mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-24 08:49:29 +00:00
Merge 3934c45261 into 12eabbd76d
This commit is contained in:
commit
0e48776959
2 changed files with 97 additions and 65 deletions
|
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// InitDatabaseFromFreezer reinitializes an empty database from a previous batch
|
||||
|
|
@ -383,31 +384,76 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) {
|
|||
if tail == nil || *tail > pruneBlock {
|
||||
return // no index, or index ends above pruneBlock
|
||||
}
|
||||
// There are blocks below pruneBlock in the index. Iterate the entire index to remove
|
||||
// their entries. Note if this fails, the index is messed up, but tail still points to
|
||||
// the old tail.
|
||||
var count, removed int
|
||||
DeleteAllTxLookupEntries(db, func(txhash common.Hash, v []byte) bool {
|
||||
count++
|
||||
if count%10000000 == 0 {
|
||||
log.Info("Pruning tx index", "count", count, "removed", removed)
|
||||
|
||||
start := time.Now()
|
||||
var (
|
||||
eg errgroup.Group
|
||||
removed atomic.Int64
|
||||
scanned atomic.Int64
|
||||
)
|
||||
eg.SetLimit(runtime.NumCPU())
|
||||
|
||||
// Periodically log progress from the main goroutine.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
ticker := time.NewTicker(8 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
log.Info("Pruning tx index", "scanned", scanned.Load(), "removed", removed.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(v) > 8 {
|
||||
log.Error("Skipping legacy tx index entry", "hash", txhash)
|
||||
return false
|
||||
}
|
||||
bn := decodeNumber(v)
|
||||
if bn < pruneBlock {
|
||||
removed++
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
}()
|
||||
// Split the keyspace into 256 ranges by the first byte of the tx hash.
|
||||
// Tx hashes are uniformly distributed so each range gets roughly equal
|
||||
// work. The errgroup limits concurrency to NumCPU.
|
||||
for i := 0; i < 256; i++ {
|
||||
prefix := byte(i)
|
||||
eg.Go(func() error {
|
||||
it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, []byte{prefix}), common.HashLength+len(txLookupPrefix))
|
||||
defer it.Release()
|
||||
|
||||
batch := db.NewBatch()
|
||||
for it.Next() {
|
||||
if it.Key()[len(txLookupPrefix)] != prefix {
|
||||
break
|
||||
}
|
||||
scanned.Add(1)
|
||||
v := it.Value()
|
||||
if len(v) > 8 {
|
||||
continue // skip legacy format entries
|
||||
}
|
||||
if decodeBlockNumber(v) < pruneBlock {
|
||||
batch.Delete(it.Key())
|
||||
removed.Add(1)
|
||||
}
|
||||
if batch.ValueSize() >= ethdb.IdealBatchSize {
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Failed to delete tx index entries", "err", err)
|
||||
}
|
||||
batch.Reset()
|
||||
}
|
||||
}
|
||||
if batch.ValueSize() > 0 {
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Failed to delete tx index entries", "err", err)
|
||||
}
|
||||
batch.Reset()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
eg.Wait()
|
||||
close(done)
|
||||
WriteTxIndexTail(db, pruneBlock)
|
||||
log.Info("Pruned transaction index", "removed", removed.Load(), "scanned", scanned.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
}
|
||||
|
||||
func decodeNumber(b []byte) uint64 {
|
||||
var numBuffer [8]byte
|
||||
copy(numBuffer[8-len(b):], b)
|
||||
return binary.BigEndian.Uint64(numBuffer[:])
|
||||
func decodeBlockNumber(b []byte) uint64 {
|
||||
var buf [8]byte
|
||||
copy(buf[8-len(b):], b)
|
||||
return binary.BigEndian.Uint64(buf[:])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,69 +103,52 @@ func TestChainIterator(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func initDatabaseWithTransactions(db ethdb.Database) ([]*types.Block, []*types.Transaction) {
|
||||
var blocks []*types.Block
|
||||
var txs []*types.Transaction
|
||||
func initDatabaseWithTransactions(db ethdb.Database, numBlocks, txsPerBlock int) []*types.Block {
|
||||
to := common.BytesToAddress([]byte{0x11})
|
||||
|
||||
// Write empty genesis block
|
||||
block := types.NewBlock(&types.Header{Number: big.NewInt(int64(0))}, nil, nil, newTestHasher())
|
||||
var blocks []*types.Block
|
||||
block := types.NewBlock(&types.Header{Number: big.NewInt(0)}, nil, nil, newTestHasher())
|
||||
WriteBlock(db, block)
|
||||
WriteCanonicalHash(db, block.Hash(), block.NumberU64())
|
||||
blocks = append(blocks, block)
|
||||
|
||||
// Create transactions.
|
||||
for i := uint64(1); i <= 10; i++ {
|
||||
var tx *types.Transaction
|
||||
if i%2 == 0 {
|
||||
tx = types.NewTx(&types.LegacyTx{
|
||||
Nonce: i,
|
||||
for i := 1; i <= numBlocks; i++ {
|
||||
txs := make([]*types.Transaction, txsPerBlock)
|
||||
for j := range txs {
|
||||
txs[j] = types.NewTx(&types.LegacyTx{
|
||||
Nonce: uint64(i*txsPerBlock + j),
|
||||
GasPrice: big.NewInt(11111),
|
||||
Gas: 1111,
|
||||
To: &to,
|
||||
Value: big.NewInt(111),
|
||||
Data: []byte{0x11, 0x11, 0x11},
|
||||
})
|
||||
} else {
|
||||
tx = types.NewTx(&types.AccessListTx{
|
||||
ChainID: big.NewInt(1337),
|
||||
Nonce: i,
|
||||
GasPrice: big.NewInt(11111),
|
||||
Gas: 1111,
|
||||
To: &to,
|
||||
Value: big.NewInt(111),
|
||||
Data: []byte{0x11, 0x11, 0x11},
|
||||
})
|
||||
}
|
||||
txs = append(txs, tx)
|
||||
block := types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, &types.Body{Transactions: types.Transactions{tx}}, nil, newTestHasher())
|
||||
block := types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, &types.Body{Transactions: types.Transactions(txs)}, nil, newTestHasher())
|
||||
WriteBlock(db, block)
|
||||
WriteCanonicalHash(db, block.Hash(), block.NumberU64())
|
||||
blocks = append(blocks, block)
|
||||
}
|
||||
|
||||
return blocks, txs
|
||||
return blocks
|
||||
}
|
||||
|
||||
func TestIndexTransactions(t *testing.T) {
|
||||
// Construct test chain db
|
||||
chainDB := NewMemoryDatabase()
|
||||
|
||||
_, txs := initDatabaseWithTransactions(chainDB)
|
||||
blocks := initDatabaseWithTransactions(chainDB, 10, 1)
|
||||
|
||||
// verify checks whether the tx indices in the range [from, to)
|
||||
// is expected.
|
||||
// verify checks whether the tx indices in the block range [from, to)
|
||||
// are present or absent.
|
||||
verify := func(from, to int, exist bool, tail uint64) {
|
||||
for i := from; i < to; i++ {
|
||||
if i == 0 {
|
||||
continue
|
||||
}
|
||||
number := ReadTxLookupEntry(chainDB, txs[i-1].Hash())
|
||||
if exist && number == nil {
|
||||
t.Fatalf("Transaction index %d missing", i)
|
||||
}
|
||||
if !exist && number != nil {
|
||||
t.Fatalf("Transaction index %d is not deleted", i)
|
||||
for _, block := range blocks[from:to] {
|
||||
for _, tx := range block.Transactions() {
|
||||
number := ReadTxLookupEntry(chainDB, tx.Hash())
|
||||
if exist && number == nil {
|
||||
t.Fatalf("Transaction index missing for block %d", block.NumberU64())
|
||||
}
|
||||
if !exist && number != nil {
|
||||
t.Fatalf("Transaction index not deleted for block %d", block.NumberU64())
|
||||
}
|
||||
}
|
||||
}
|
||||
number := ReadTxIndexTail(chainDB)
|
||||
|
|
@ -221,7 +204,7 @@ func TestIndexTransactions(t *testing.T) {
|
|||
func TestUnindexTransactionsMissingBody(t *testing.T) {
|
||||
// Construct test chain db
|
||||
chainDB := NewMemoryDatabase()
|
||||
blocks, _ := initDatabaseWithTransactions(chainDB)
|
||||
blocks := initDatabaseWithTransactions(chainDB, 10, 1)
|
||||
|
||||
// Index the entire chain.
|
||||
lastBlock := blocks[len(blocks)-1].NumberU64()
|
||||
|
|
@ -250,9 +233,12 @@ func TestUnindexTransactionsMissingBody(t *testing.T) {
|
|||
|
||||
func TestPruneTransactionIndex(t *testing.T) {
|
||||
chainDB := NewMemoryDatabase()
|
||||
blocks, _ := initDatabaseWithTransactions(chainDB)
|
||||
|
||||
// Create 100 blocks with 8 txs each (800 total tx index entries) so that
|
||||
// parallel workers each get a meaningful share of the keyspace.
|
||||
blocks := initDatabaseWithTransactions(chainDB, 100, 8)
|
||||
lastBlock := blocks[len(blocks)-1].NumberU64()
|
||||
pruneBlock := lastBlock - 3
|
||||
pruneBlock := lastBlock / 2 // prune the first half
|
||||
|
||||
IndexTransactions(chainDB, 0, lastBlock+1, nil, false)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue