mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-07 23:48:36 +00:00
core: respect history cutoff in txindexer (#31393)
In #31384 we unindex TXes prior to the merge block. However when the node starts up it will try to re-index those back if the config is to index the whole chain. This change makes the indexer aware of the history cutoff block, avoiding reindexing in that segment. --------- Co-authored-by: Gary Rong <garyrong0905@gmail.com> Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
parent
07cca7ab9f
commit
1886922264
4 changed files with 543 additions and 202 deletions
|
|
@ -277,6 +277,14 @@ func WriteTxIndexTail(db ethdb.KeyValueWriter, number uint64) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteTxIndexTail deletes the number of oldest indexed block
|
||||||
|
// from database.
|
||||||
|
func DeleteTxIndexTail(db ethdb.KeyValueWriter) {
|
||||||
|
if err := db.Delete(txIndexTailKey); err != nil {
|
||||||
|
log.Crit("Failed to delete the transaction index tail", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ReadHeaderRange returns the rlp-encoded headers, starting at 'number', and going
|
// ReadHeaderRange returns the rlp-encoded headers, starting at 'number', and going
|
||||||
// backwards towards genesis. This method assumes that the caller already has
|
// backwards towards genesis. This method assumes that the caller already has
|
||||||
// placed a cap on count, to prevent DoS issues.
|
// placed a cap on count, to prevent DoS issues.
|
||||||
|
|
|
||||||
|
|
@ -30,13 +30,8 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReadTxLookupEntry retrieves the positional metadata associated with a transaction
|
// DecodeTxLookupEntry decodes the supplied tx lookup data.
|
||||||
// hash to allow retrieving the transaction or receipt by hash.
|
func DecodeTxLookupEntry(data []byte, db ethdb.Reader) *uint64 {
|
||||||
func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
|
|
||||||
data, _ := db.Get(txLookupKey(hash))
|
|
||||||
if len(data) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// Database v6 tx lookup just stores the block number
|
// Database v6 tx lookup just stores the block number
|
||||||
if len(data) < common.HashLength {
|
if len(data) < common.HashLength {
|
||||||
number := new(big.Int).SetBytes(data).Uint64()
|
number := new(big.Int).SetBytes(data).Uint64()
|
||||||
|
|
@ -49,12 +44,22 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
|
||||||
// Finally try database v3 tx lookup format
|
// Finally try database v3 tx lookup format
|
||||||
var entry LegacyTxLookupEntry
|
var entry LegacyTxLookupEntry
|
||||||
if err := rlp.DecodeBytes(data, &entry); err != nil {
|
if err := rlp.DecodeBytes(data, &entry); err != nil {
|
||||||
log.Error("Invalid transaction lookup entry RLP", "hash", hash, "blob", data, "err", err)
|
log.Error("Invalid transaction lookup entry RLP", "blob", data, "err", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return &entry.BlockIndex
|
return &entry.BlockIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadTxLookupEntry retrieves the positional metadata associated with a transaction
|
||||||
|
// hash to allow retrieving the transaction or receipt by hash.
|
||||||
|
func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
|
||||||
|
data, _ := db.Get(txLookupKey(hash))
|
||||||
|
if len(data) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return DecodeTxLookupEntry(data, db)
|
||||||
|
}
|
||||||
|
|
||||||
// writeTxLookupEntry stores a positional metadata for a transaction,
|
// writeTxLookupEntry stores a positional metadata for a transaction,
|
||||||
// enabling hash based transaction and receipt lookups.
|
// enabling hash based transaction and receipt lookups.
|
||||||
func writeTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash, numberBytes []byte) {
|
func writeTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash, numberBytes []byte) {
|
||||||
|
|
@ -95,6 +100,33 @@ func DeleteTxLookupEntries(db ethdb.KeyValueWriter, hashes []common.Hash) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteAllTxLookupEntries purges all the transaction indexes in the database.
|
||||||
|
// If condition is specified, only the entry with condition as True will be
|
||||||
|
// removed; If condition is not specified, the entry is deleted.
|
||||||
|
func DeleteAllTxLookupEntries(db ethdb.KeyValueStore, condition func([]byte) bool) {
|
||||||
|
iter := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, nil), common.HashLength+len(txLookupPrefix))
|
||||||
|
defer iter.Release()
|
||||||
|
|
||||||
|
batch := db.NewBatch()
|
||||||
|
for iter.Next() {
|
||||||
|
if condition == nil || condition(iter.Value()) {
|
||||||
|
batch.Delete(iter.Key())
|
||||||
|
}
|
||||||
|
if batch.ValueSize() >= ethdb.IdealBatchSize {
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
log.Crit("Failed to delete transaction lookup entries", "err", err)
|
||||||
|
}
|
||||||
|
batch.Reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if batch.ValueSize() > 0 {
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
log.Crit("Failed to delete transaction lookup entries", "err", err)
|
||||||
|
}
|
||||||
|
batch.Reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ReadTransaction retrieves a specific transaction from the database, along with
|
// ReadTransaction retrieves a specific transaction from the database, along with
|
||||||
// its added positional metadata.
|
// its added positional metadata.
|
||||||
func ReadTransaction(db ethdb.Reader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) {
|
func ReadTransaction(db ethdb.Reader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) {
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,11 @@ type txIndexer struct {
|
||||||
// * 0: means the entire chain should be indexed
|
// * 0: means the entire chain should be indexed
|
||||||
// * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed
|
// * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed
|
||||||
// and all others shouldn't.
|
// and all others shouldn't.
|
||||||
limit uint64
|
limit uint64
|
||||||
|
|
||||||
|
// cutoff denotes the block number before which the chain segment should
|
||||||
|
// be pruned and not available locally.
|
||||||
|
cutoff uint64
|
||||||
db ethdb.Database
|
db ethdb.Database
|
||||||
progress chan chan TxIndexProgress
|
progress chan chan TxIndexProgress
|
||||||
term chan chan struct{}
|
term chan chan struct{}
|
||||||
|
|
@ -55,6 +59,7 @@ type txIndexer struct {
|
||||||
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
|
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
|
||||||
indexer := &txIndexer{
|
indexer := &txIndexer{
|
||||||
limit: limit,
|
limit: limit,
|
||||||
|
cutoff: chain.HistoryPruningCutoff(),
|
||||||
db: chain.db,
|
db: chain.db,
|
||||||
progress: make(chan chan TxIndexProgress),
|
progress: make(chan chan TxIndexProgress),
|
||||||
term: make(chan chan struct{}),
|
term: make(chan chan struct{}),
|
||||||
|
|
@ -64,7 +69,11 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
|
||||||
|
|
||||||
var msg string
|
var msg string
|
||||||
if limit == 0 {
|
if limit == 0 {
|
||||||
msg = "entire chain"
|
if indexer.cutoff == 0 {
|
||||||
|
msg = "entire chain"
|
||||||
|
} else {
|
||||||
|
msg = fmt.Sprintf("blocks since #%d", indexer.cutoff)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
msg = fmt.Sprintf("last %d blocks", limit)
|
msg = fmt.Sprintf("last %d blocks", limit)
|
||||||
}
|
}
|
||||||
|
|
@ -74,23 +83,31 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// run executes the scheduled indexing/unindexing task in a separate thread.
|
// run executes the scheduled indexing/unindexing task in a separate thread.
|
||||||
// If the stop channel is closed, the task should be terminated as soon as
|
// If the stop channel is closed, the task should terminate as soon as possible.
|
||||||
// possible, the done channel will be closed once the task is finished.
|
// The done channel will be closed once the task is complete.
|
||||||
func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) {
|
//
|
||||||
|
// Existing transaction indexes are assumed to be valid, with both the head
|
||||||
|
// and tail above the configured cutoff.
|
||||||
|
func (indexer *txIndexer) run(head uint64, stop chan struct{}, done chan struct{}) {
|
||||||
defer func() { close(done) }()
|
defer func() { close(done) }()
|
||||||
|
|
||||||
// Short circuit if chain is empty and nothing to index.
|
// Short circuit if the chain is either empty, or entirely below the
|
||||||
if head == 0 {
|
// cutoff point.
|
||||||
|
if head == 0 || head < indexer.cutoff {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// The tail flag is not existent, it means the node is just initialized
|
// The tail flag is not existent, it means the node is just initialized
|
||||||
// and all blocks in the chain (part of them may from ancient store) are
|
// and all blocks in the chain (part of them may from ancient store) are
|
||||||
// not indexed yet, index the chain according to the configured limit.
|
// not indexed yet, index the chain according to the configured limit.
|
||||||
|
tail := rawdb.ReadTxIndexTail(indexer.db)
|
||||||
if tail == nil {
|
if tail == nil {
|
||||||
|
// Determine the first block for transaction indexing, taking the
|
||||||
|
// configured cutoff point into account.
|
||||||
from := uint64(0)
|
from := uint64(0)
|
||||||
if indexer.limit != 0 && head >= indexer.limit {
|
if indexer.limit != 0 && head >= indexer.limit {
|
||||||
from = head - indexer.limit + 1
|
from = head - indexer.limit + 1
|
||||||
}
|
}
|
||||||
|
from = max(from, indexer.cutoff)
|
||||||
rawdb.IndexTransactions(indexer.db, from, head+1, stop, true)
|
rawdb.IndexTransactions(indexer.db, from, head+1, stop, true)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -98,25 +115,82 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don
|
||||||
// present), while the whole chain are requested for indexing.
|
// present), while the whole chain are requested for indexing.
|
||||||
if indexer.limit == 0 || head < indexer.limit {
|
if indexer.limit == 0 || head < indexer.limit {
|
||||||
if *tail > 0 {
|
if *tail > 0 {
|
||||||
// It can happen when chain is rewound to a historical point which
|
from := max(uint64(0), indexer.cutoff)
|
||||||
// is even lower than the indexes tail, recap the indexing target
|
rawdb.IndexTransactions(indexer.db, from, *tail, stop, true)
|
||||||
// to new head to avoid reading non-existent block bodies.
|
|
||||||
end := *tail
|
|
||||||
if end > head+1 {
|
|
||||||
end = head + 1
|
|
||||||
}
|
|
||||||
rawdb.IndexTransactions(indexer.db, 0, end, stop, true)
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// The tail flag is existent, adjust the index range according to configured
|
// The tail flag is existent, adjust the index range according to configured
|
||||||
// limit and the latest chain head.
|
// limit and the latest chain head.
|
||||||
if head-indexer.limit+1 < *tail {
|
from := head - indexer.limit + 1
|
||||||
|
from = max(from, indexer.cutoff)
|
||||||
|
if from < *tail {
|
||||||
// Reindex a part of missing indices and rewind index tail to HEAD-limit
|
// Reindex a part of missing indices and rewind index tail to HEAD-limit
|
||||||
rawdb.IndexTransactions(indexer.db, head-indexer.limit+1, *tail, stop, true)
|
rawdb.IndexTransactions(indexer.db, from, *tail, stop, true)
|
||||||
} else {
|
} else {
|
||||||
// Unindex a part of stale indices and forward index tail to HEAD-limit
|
// Unindex a part of stale indices and forward index tail to HEAD-limit
|
||||||
rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false)
|
rawdb.UnindexTransactions(indexer.db, *tail, from, stop, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// repair ensures that transaction indexes are in a valid state and invalidates
|
||||||
|
// them if they are not. The following cases are considered invalid:
|
||||||
|
// * The index tail is higher than the chain head.
|
||||||
|
// * The chain head is below the configured cutoff, but the index tail is not empty.
|
||||||
|
// * The index tail is below the configured cutoff, but it is not empty.
|
||||||
|
func (indexer *txIndexer) repair(head uint64) {
|
||||||
|
// If the transactions haven't been indexed yet, nothing to repair
|
||||||
|
tail := rawdb.ReadTxIndexTail(indexer.db)
|
||||||
|
if tail == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// The transaction index tail is higher than the chain head, which may occur
|
||||||
|
// when the chain is rewound to a historical height below the index tail.
|
||||||
|
// Purge the transaction indexes from the database. **It's not a common case
|
||||||
|
// to rewind the chain head below the index tail**.
|
||||||
|
if *tail > head {
|
||||||
|
// A crash may occur between the two delete operations,
|
||||||
|
// potentially leaving dangling indexes in the database.
|
||||||
|
// However, this is considered acceptable.
|
||||||
|
rawdb.DeleteTxIndexTail(indexer.db)
|
||||||
|
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
|
||||||
|
log.Warn("Purge transaction indexes", "head", head, "tail", *tail)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the entire chain is below the configured cutoff point,
|
||||||
|
// removing the tail of transaction indexing and purges the
|
||||||
|
// transaction indexes. **It's not a common case, as the cutoff
|
||||||
|
// is usually defined below the chain head**.
|
||||||
|
if head < indexer.cutoff {
|
||||||
|
// A crash may occur between the two delete operations,
|
||||||
|
// potentially leaving dangling indexes in the database.
|
||||||
|
// However, this is considered acceptable.
|
||||||
|
//
|
||||||
|
// The leftover indexes can't be unindexed by scanning
|
||||||
|
// the blocks as they are not guaranteed to be available.
|
||||||
|
// Traversing the database directly within the transaction
|
||||||
|
// index namespace might be slow and expensive, but we
|
||||||
|
// have no choice.
|
||||||
|
rawdb.DeleteTxIndexTail(indexer.db)
|
||||||
|
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
|
||||||
|
log.Warn("Purge transaction indexes", "head", head, "cutoff", indexer.cutoff)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// The chain head is above the cutoff while the tail is below the
|
||||||
|
// cutoff. Shift the tail to the cutoff point and remove the indexes
|
||||||
|
// below.
|
||||||
|
if *tail < indexer.cutoff {
|
||||||
|
// A crash may occur between the two delete operations,
|
||||||
|
// potentially leaving dangling indexes in the database.
|
||||||
|
// However, this is considered acceptable.
|
||||||
|
rawdb.WriteTxIndexTail(indexer.db, indexer.cutoff)
|
||||||
|
rawdb.DeleteAllTxLookupEntries(indexer.db, func(blob []byte) bool {
|
||||||
|
n := rawdb.DecodeTxLookupEntry(blob, indexer.db)
|
||||||
|
return n != nil && *n < indexer.cutoff
|
||||||
|
})
|
||||||
|
log.Warn("Purge transaction indexes below cutoff", "tail", *tail, "cutoff", indexer.cutoff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -127,39 +201,39 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
|
||||||
|
|
||||||
// Listening to chain events and manipulate the transaction indexes.
|
// Listening to chain events and manipulate the transaction indexes.
|
||||||
var (
|
var (
|
||||||
stop chan struct{} // Non-nil if background routine is active.
|
stop chan struct{} // Non-nil if background routine is active
|
||||||
done chan struct{} // Non-nil if background routine is active.
|
done chan struct{} // Non-nil if background routine is active
|
||||||
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
|
head = rawdb.ReadHeadBlock(indexer.db).NumberU64() // The latest announced chain head
|
||||||
lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed
|
|
||||||
|
|
||||||
headCh = make(chan ChainHeadEvent)
|
headCh = make(chan ChainHeadEvent)
|
||||||
sub = chain.SubscribeChainHeadEvent(headCh)
|
sub = chain.SubscribeChainHeadEvent(headCh)
|
||||||
)
|
)
|
||||||
defer sub.Unsubscribe()
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
// Validate the transaction indexes and repair if necessary
|
||||||
|
indexer.repair(head)
|
||||||
|
|
||||||
// Launch the initial processing if chain is not empty (head != genesis).
|
// Launch the initial processing if chain is not empty (head != genesis).
|
||||||
// This step is useful in these scenarios that chain has no progress.
|
// This step is useful in these scenarios that chain has no progress.
|
||||||
if head := rawdb.ReadHeadBlock(indexer.db); head != nil && head.Number().Uint64() != 0 {
|
if head != 0 {
|
||||||
stop = make(chan struct{})
|
stop = make(chan struct{})
|
||||||
done = make(chan struct{})
|
done = make(chan struct{})
|
||||||
lastHead = head.Number().Uint64()
|
go indexer.run(head, stop, done)
|
||||||
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.NumberU64(), stop, done)
|
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case head := <-headCh:
|
case h := <-headCh:
|
||||||
if done == nil {
|
if done == nil {
|
||||||
stop = make(chan struct{})
|
stop = make(chan struct{})
|
||||||
done = make(chan struct{})
|
done = make(chan struct{})
|
||||||
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Header.Number.Uint64(), stop, done)
|
go indexer.run(h.Header.Number.Uint64(), stop, done)
|
||||||
}
|
}
|
||||||
lastHead = head.Header.Number.Uint64()
|
head = h.Header.Number.Uint64()
|
||||||
case <-done:
|
case <-done:
|
||||||
stop = nil
|
stop = nil
|
||||||
done = nil
|
done = nil
|
||||||
lastTail = rawdb.ReadTxIndexTail(indexer.db)
|
|
||||||
case ch := <-indexer.progress:
|
case ch := <-indexer.progress:
|
||||||
ch <- indexer.report(lastHead, lastTail)
|
ch <- indexer.report(head)
|
||||||
case ch := <-indexer.term:
|
case ch := <-indexer.term:
|
||||||
if stop != nil {
|
if stop != nil {
|
||||||
close(stop)
|
close(stop)
|
||||||
|
|
@ -175,12 +249,27 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// report returns the tx indexing progress.
|
// report returns the tx indexing progress.
|
||||||
func (indexer *txIndexer) report(head uint64, tail *uint64) TxIndexProgress {
|
func (indexer *txIndexer) report(head uint64) TxIndexProgress {
|
||||||
|
// Special case if the head is even below the cutoff,
|
||||||
|
// nothing to index.
|
||||||
|
if head < indexer.cutoff {
|
||||||
|
return TxIndexProgress{
|
||||||
|
Indexed: 0,
|
||||||
|
Remaining: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Compute how many blocks are supposed to be indexed
|
||||||
total := indexer.limit
|
total := indexer.limit
|
||||||
if indexer.limit == 0 || total > head {
|
if indexer.limit == 0 || total > head {
|
||||||
total = head + 1 // genesis included
|
total = head + 1 // genesis included
|
||||||
}
|
}
|
||||||
|
length := head - indexer.cutoff + 1 // all available chain for indexing
|
||||||
|
if total > length {
|
||||||
|
total = length
|
||||||
|
}
|
||||||
|
// Compute how many blocks have been indexed
|
||||||
var indexed uint64
|
var indexed uint64
|
||||||
|
tail := rawdb.ReadTxIndexTail(indexer.db)
|
||||||
if tail != nil {
|
if tail != nil {
|
||||||
indexed = head - *tail + 1
|
indexed = head - *tail + 1
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,46 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func verifyIndexes(t *testing.T, db ethdb.Database, block *types.Block, exist bool) {
|
||||||
|
for _, tx := range block.Transactions() {
|
||||||
|
lookup := rawdb.ReadTxLookupEntry(db, tx.Hash())
|
||||||
|
if exist && lookup == nil {
|
||||||
|
t.Fatalf("missing %d %x", block.NumberU64(), tx.Hash().Hex())
|
||||||
|
}
|
||||||
|
if !exist && lookup != nil {
|
||||||
|
t.Fatalf("unexpected %d %x", block.NumberU64(), tx.Hash().Hex())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func verify(t *testing.T, db ethdb.Database, blocks []*types.Block, expTail uint64) {
|
||||||
|
tail := rawdb.ReadTxIndexTail(db)
|
||||||
|
if tail == nil {
|
||||||
|
t.Fatal("Failed to write tx index tail")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if *tail != expTail {
|
||||||
|
t.Fatalf("Unexpected tx index tail, want %v, got %d", expTail, *tail)
|
||||||
|
}
|
||||||
|
for _, b := range blocks {
|
||||||
|
if b.Number().Uint64() < *tail {
|
||||||
|
verifyIndexes(t, db, b, false)
|
||||||
|
} else {
|
||||||
|
verifyIndexes(t, db, b, true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyNoIndex(t *testing.T, db ethdb.Database, blocks []*types.Block) {
|
||||||
|
tail := rawdb.ReadTxIndexTail(db)
|
||||||
|
if tail != nil {
|
||||||
|
t.Fatalf("Unexpected tx index tail %d", *tail)
|
||||||
|
}
|
||||||
|
for _, b := range blocks {
|
||||||
|
verifyIndexes(t, db, b, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestTxIndexer tests the functionalities for managing transaction indexes.
|
// TestTxIndexer tests the functionalities for managing transaction indexes.
|
||||||
func TestTxIndexer(t *testing.T) {
|
func TestTxIndexer(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
|
|
@ -50,163 +90,29 @@ func TestTxIndexer(t *testing.T) {
|
||||||
gen.AddTx(tx)
|
gen.AddTx(tx)
|
||||||
nonce += 1
|
nonce += 1
|
||||||
})
|
})
|
||||||
|
|
||||||
// verifyIndexes checks if the transaction indexes are present or not
|
|
||||||
// of the specified block.
|
|
||||||
verifyIndexes := func(db ethdb.Database, number uint64, exist bool) {
|
|
||||||
if number == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
block := blocks[number-1]
|
|
||||||
for _, tx := range block.Transactions() {
|
|
||||||
lookup := rawdb.ReadTxLookupEntry(db, tx.Hash())
|
|
||||||
if exist && lookup == nil {
|
|
||||||
t.Fatalf("missing %d %x", number, tx.Hash().Hex())
|
|
||||||
}
|
|
||||||
if !exist && lookup != nil {
|
|
||||||
t.Fatalf("unexpected %d %x", number, tx.Hash().Hex())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
verify := func(db ethdb.Database, expTail uint64, indexer *txIndexer) {
|
|
||||||
tail := rawdb.ReadTxIndexTail(db)
|
|
||||||
if tail == nil {
|
|
||||||
t.Fatal("Failed to write tx index tail")
|
|
||||||
}
|
|
||||||
if *tail != expTail {
|
|
||||||
t.Fatalf("Unexpected tx index tail, want %v, got %d", expTail, *tail)
|
|
||||||
}
|
|
||||||
if *tail != 0 {
|
|
||||||
for number := uint64(0); number < *tail; number += 1 {
|
|
||||||
verifyIndexes(db, number, false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for number := *tail; number <= chainHead; number += 1 {
|
|
||||||
verifyIndexes(db, number, true)
|
|
||||||
}
|
|
||||||
progress := indexer.report(chainHead, tail)
|
|
||||||
if !progress.Done() {
|
|
||||||
t.Fatalf("Expect fully indexed")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var cases = []struct {
|
var cases = []struct {
|
||||||
limitA uint64
|
limits []uint64
|
||||||
tailA uint64
|
tails []uint64
|
||||||
limitB uint64
|
|
||||||
tailB uint64
|
|
||||||
limitC uint64
|
|
||||||
tailC uint64
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
// LimitA: 0
|
limits: []uint64{0, 1, 64, 129, 0},
|
||||||
// TailA: 0
|
tails: []uint64{0, 128, 65, 0, 0},
|
||||||
//
|
|
||||||
// all blocks are indexed
|
|
||||||
limitA: 0,
|
|
||||||
tailA: 0,
|
|
||||||
|
|
||||||
// LimitB: 1
|
|
||||||
// TailB: 128
|
|
||||||
//
|
|
||||||
// block-128 is indexed
|
|
||||||
limitB: 1,
|
|
||||||
tailB: 128,
|
|
||||||
|
|
||||||
// LimitB: 64
|
|
||||||
// TailB: 65
|
|
||||||
//
|
|
||||||
// block [65, 128] are indexed
|
|
||||||
limitC: 64,
|
|
||||||
tailC: 65,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
// LimitA: 64
|
limits: []uint64{64, 1, 64, 0},
|
||||||
// TailA: 65
|
tails: []uint64{65, 128, 65, 0},
|
||||||
//
|
|
||||||
// block [65, 128] are indexed
|
|
||||||
limitA: 64,
|
|
||||||
tailA: 65,
|
|
||||||
|
|
||||||
// LimitB: 1
|
|
||||||
// TailB: 128
|
|
||||||
//
|
|
||||||
// block-128 is indexed
|
|
||||||
limitB: 1,
|
|
||||||
tailB: 128,
|
|
||||||
|
|
||||||
// LimitB: 64
|
|
||||||
// TailB: 65
|
|
||||||
//
|
|
||||||
// block [65, 128] are indexed
|
|
||||||
limitC: 64,
|
|
||||||
tailC: 65,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
// LimitA: 127
|
limits: []uint64{127, 1, 64, 0},
|
||||||
// TailA: 2
|
tails: []uint64{2, 128, 65, 0},
|
||||||
//
|
|
||||||
// block [2, 128] are indexed
|
|
||||||
limitA: 127,
|
|
||||||
tailA: 2,
|
|
||||||
|
|
||||||
// LimitB: 1
|
|
||||||
// TailB: 128
|
|
||||||
//
|
|
||||||
// block-128 is indexed
|
|
||||||
limitB: 1,
|
|
||||||
tailB: 128,
|
|
||||||
|
|
||||||
// LimitB: 64
|
|
||||||
// TailB: 65
|
|
||||||
//
|
|
||||||
// block [65, 128] are indexed
|
|
||||||
limitC: 64,
|
|
||||||
tailC: 65,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
// LimitA: 128
|
limits: []uint64{128, 1, 64, 0},
|
||||||
// TailA: 1
|
tails: []uint64{1, 128, 65, 0},
|
||||||
//
|
|
||||||
// block [2, 128] are indexed
|
|
||||||
limitA: 128,
|
|
||||||
tailA: 1,
|
|
||||||
|
|
||||||
// LimitB: 1
|
|
||||||
// TailB: 128
|
|
||||||
//
|
|
||||||
// block-128 is indexed
|
|
||||||
limitB: 1,
|
|
||||||
tailB: 128,
|
|
||||||
|
|
||||||
// LimitB: 64
|
|
||||||
// TailB: 65
|
|
||||||
//
|
|
||||||
// block [65, 128] are indexed
|
|
||||||
limitC: 64,
|
|
||||||
tailC: 65,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
// LimitA: 129
|
limits: []uint64{129, 1, 64, 0},
|
||||||
// TailA: 0
|
tails: []uint64{0, 128, 65, 0},
|
||||||
//
|
|
||||||
// block [0, 128] are indexed
|
|
||||||
limitA: 129,
|
|
||||||
tailA: 0,
|
|
||||||
|
|
||||||
// LimitB: 1
|
|
||||||
// TailB: 128
|
|
||||||
//
|
|
||||||
// block-128 is indexed
|
|
||||||
limitB: 1,
|
|
||||||
tailB: 128,
|
|
||||||
|
|
||||||
// LimitB: 64
|
|
||||||
// TailB: 65
|
|
||||||
//
|
|
||||||
// block [65, 128] are indexed
|
|
||||||
limitC: 64,
|
|
||||||
tailC: 65,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
|
|
@ -215,26 +121,332 @@ func TestTxIndexer(t *testing.T) {
|
||||||
|
|
||||||
// Index the initial blocks from ancient store
|
// Index the initial blocks from ancient store
|
||||||
indexer := &txIndexer{
|
indexer := &txIndexer{
|
||||||
limit: c.limitA,
|
limit: 0,
|
||||||
db: db,
|
db: db,
|
||||||
progress: make(chan chan TxIndexProgress),
|
progress: make(chan chan TxIndexProgress),
|
||||||
}
|
}
|
||||||
indexer.run(nil, 128, make(chan struct{}), make(chan struct{}))
|
for i, limit := range c.limits {
|
||||||
verify(db, c.tailA, indexer)
|
indexer.limit = limit
|
||||||
|
indexer.run(chainHead, make(chan struct{}), make(chan struct{}))
|
||||||
indexer.limit = c.limitB
|
verify(t, db, blocks, c.tails[i])
|
||||||
indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{}))
|
}
|
||||||
verify(db, c.tailB, indexer)
|
db.Close()
|
||||||
|
}
|
||||||
indexer.limit = c.limitC
|
}
|
||||||
indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{}))
|
|
||||||
verify(db, c.tailC, indexer)
|
func TestTxIndexerRepair(t *testing.T) {
|
||||||
|
var (
|
||||||
// Recover all indexes
|
testBankKey, _ = crypto.GenerateKey()
|
||||||
indexer.limit = 0
|
testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
|
||||||
indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{}))
|
testBankFunds = big.NewInt(1000000000000000000)
|
||||||
verify(db, 0, indexer)
|
|
||||||
|
gspec = &Genesis{
|
||||||
|
Config: params.TestChainConfig,
|
||||||
|
Alloc: types.GenesisAlloc{testBankAddress: {Balance: testBankFunds}},
|
||||||
|
BaseFee: big.NewInt(params.InitialBaseFee),
|
||||||
|
}
|
||||||
|
engine = ethash.NewFaker()
|
||||||
|
nonce = uint64(0)
|
||||||
|
chainHead = uint64(128)
|
||||||
|
)
|
||||||
|
_, blocks, receipts := GenerateChainWithGenesis(gspec, engine, int(chainHead), func(i int, gen *BlockGen) {
|
||||||
|
tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0xdeadbeef"), big.NewInt(1000), params.TxGas, big.NewInt(10*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
|
||||||
|
gen.AddTx(tx)
|
||||||
|
nonce += 1
|
||||||
|
})
|
||||||
|
tailPointer := func(n uint64) *uint64 {
|
||||||
|
return &n
|
||||||
|
}
|
||||||
|
var cases = []struct {
|
||||||
|
limit uint64
|
||||||
|
head uint64
|
||||||
|
cutoff uint64
|
||||||
|
expTail *uint64
|
||||||
|
}{
|
||||||
|
// if *tail > head => purge indexes
|
||||||
|
{
|
||||||
|
limit: 0,
|
||||||
|
head: chainHead / 2,
|
||||||
|
cutoff: 0,
|
||||||
|
expTail: tailPointer(0),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
limit: 1, // tail = 128
|
||||||
|
head: chainHead / 2, // newhead = 64
|
||||||
|
cutoff: 0,
|
||||||
|
expTail: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
limit: 64, // tail = 65
|
||||||
|
head: chainHead / 2, // newhead = 64
|
||||||
|
cutoff: 0,
|
||||||
|
expTail: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
limit: 65, // tail = 64
|
||||||
|
head: chainHead / 2, // newhead = 64
|
||||||
|
cutoff: 0,
|
||||||
|
expTail: tailPointer(64),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
limit: 66, // tail = 63
|
||||||
|
head: chainHead / 2, // newhead = 64
|
||||||
|
cutoff: 0,
|
||||||
|
expTail: tailPointer(63),
|
||||||
|
},
|
||||||
|
|
||||||
|
// if tail < cutoff => remove indexes below cutoff
|
||||||
|
{
|
||||||
|
limit: 0, // tail = 0
|
||||||
|
head: chainHead, // head = 128
|
||||||
|
cutoff: chainHead, // cutoff = 128
|
||||||
|
expTail: tailPointer(chainHead),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
limit: 1, // tail = 128
|
||||||
|
head: chainHead, // head = 128
|
||||||
|
cutoff: chainHead, // cutoff = 128
|
||||||
|
expTail: tailPointer(128),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
limit: 2, // tail = 127
|
||||||
|
head: chainHead, // head = 128
|
||||||
|
cutoff: chainHead, // cutoff = 128
|
||||||
|
expTail: tailPointer(chainHead),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
limit: 2, // tail = 127
|
||||||
|
head: chainHead, // head = 128
|
||||||
|
cutoff: chainHead / 2, // cutoff = 64
|
||||||
|
expTail: tailPointer(127),
|
||||||
|
},
|
||||||
|
|
||||||
|
// if head < cutoff => purge indexes
|
||||||
|
{
|
||||||
|
limit: 0, // tail = 0
|
||||||
|
head: chainHead, // head = 128
|
||||||
|
cutoff: 2 * chainHead, // cutoff = 256
|
||||||
|
expTail: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
limit: 64, // tail = 65
|
||||||
|
head: chainHead, // head = 128
|
||||||
|
cutoff: chainHead / 2, // cutoff = 64
|
||||||
|
expTail: tailPointer(65),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false)
|
||||||
|
rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...))
|
||||||
|
|
||||||
|
// Index the initial blocks from ancient store
|
||||||
|
indexer := &txIndexer{
|
||||||
|
limit: c.limit,
|
||||||
|
db: db,
|
||||||
|
progress: make(chan chan TxIndexProgress),
|
||||||
|
}
|
||||||
|
indexer.run(chainHead, make(chan struct{}), make(chan struct{}))
|
||||||
|
|
||||||
|
indexer.cutoff = c.cutoff
|
||||||
|
indexer.repair(c.head)
|
||||||
|
|
||||||
|
if c.expTail == nil {
|
||||||
|
verifyNoIndex(t, db, blocks)
|
||||||
|
} else {
|
||||||
|
verify(t, db, blocks, *c.expTail)
|
||||||
|
}
|
||||||
|
db.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTxIndexerReport(t *testing.T) {
|
||||||
|
var (
|
||||||
|
testBankKey, _ = crypto.GenerateKey()
|
||||||
|
testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
|
||||||
|
testBankFunds = big.NewInt(1000000000000000000)
|
||||||
|
|
||||||
|
gspec = &Genesis{
|
||||||
|
Config: params.TestChainConfig,
|
||||||
|
Alloc: types.GenesisAlloc{testBankAddress: {Balance: testBankFunds}},
|
||||||
|
BaseFee: big.NewInt(params.InitialBaseFee),
|
||||||
|
}
|
||||||
|
engine = ethash.NewFaker()
|
||||||
|
nonce = uint64(0)
|
||||||
|
chainHead = uint64(128)
|
||||||
|
)
|
||||||
|
_, blocks, receipts := GenerateChainWithGenesis(gspec, engine, int(chainHead), func(i int, gen *BlockGen) {
|
||||||
|
tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0xdeadbeef"), big.NewInt(1000), params.TxGas, big.NewInt(10*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
|
||||||
|
gen.AddTx(tx)
|
||||||
|
nonce += 1
|
||||||
|
})
|
||||||
|
tailPointer := func(n uint64) *uint64 {
|
||||||
|
return &n
|
||||||
|
}
|
||||||
|
var cases = []struct {
|
||||||
|
head uint64
|
||||||
|
limit uint64
|
||||||
|
cutoff uint64
|
||||||
|
tail *uint64
|
||||||
|
expIndexed uint64
|
||||||
|
expRemaining uint64
|
||||||
|
}{
|
||||||
|
// The entire chain is supposed to be indexed
|
||||||
|
{
|
||||||
|
// head = 128, limit = 0, cutoff = 0 => all: 129
|
||||||
|
head: chainHead,
|
||||||
|
limit: 0,
|
||||||
|
cutoff: 0,
|
||||||
|
|
||||||
|
// tail = 0
|
||||||
|
tail: tailPointer(0),
|
||||||
|
expIndexed: 129,
|
||||||
|
expRemaining: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// head = 128, limit = 0, cutoff = 0 => all: 129
|
||||||
|
head: chainHead,
|
||||||
|
limit: 0,
|
||||||
|
cutoff: 0,
|
||||||
|
|
||||||
|
// tail = 1
|
||||||
|
tail: tailPointer(1),
|
||||||
|
expIndexed: 128,
|
||||||
|
expRemaining: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// head = 128, limit = 0, cutoff = 0 => all: 129
|
||||||
|
head: chainHead,
|
||||||
|
limit: 0,
|
||||||
|
cutoff: 0,
|
||||||
|
|
||||||
|
// tail = 128
|
||||||
|
tail: tailPointer(chainHead),
|
||||||
|
expIndexed: 1,
|
||||||
|
expRemaining: 128,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// head = 128, limit = 256, cutoff = 0 => all: 129
|
||||||
|
head: chainHead,
|
||||||
|
limit: 256,
|
||||||
|
cutoff: 0,
|
||||||
|
|
||||||
|
// tail = 0
|
||||||
|
tail: tailPointer(0),
|
||||||
|
expIndexed: 129,
|
||||||
|
expRemaining: 0,
|
||||||
|
},
|
||||||
|
|
||||||
|
// The chain with specific range is supposed to be indexed
|
||||||
|
{
|
||||||
|
// head = 128, limit = 64, cutoff = 0 => index: [65, 128]
|
||||||
|
head: chainHead,
|
||||||
|
limit: 64,
|
||||||
|
cutoff: 0,
|
||||||
|
|
||||||
|
// tail = 0, part of them need to be unindexed
|
||||||
|
tail: tailPointer(0),
|
||||||
|
expIndexed: 129,
|
||||||
|
expRemaining: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// head = 128, limit = 64, cutoff = 0 => index: [65, 128]
|
||||||
|
head: chainHead,
|
||||||
|
limit: 64,
|
||||||
|
cutoff: 0,
|
||||||
|
|
||||||
|
// tail = 64, one of them needs to be unindexed
|
||||||
|
tail: tailPointer(64),
|
||||||
|
expIndexed: 65,
|
||||||
|
expRemaining: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// head = 128, limit = 64, cutoff = 0 => index: [65, 128]
|
||||||
|
head: chainHead,
|
||||||
|
limit: 64,
|
||||||
|
cutoff: 0,
|
||||||
|
|
||||||
|
// tail = 65, all of them have been indexed
|
||||||
|
tail: tailPointer(65),
|
||||||
|
expIndexed: 64,
|
||||||
|
expRemaining: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// head = 128, limit = 64, cutoff = 0 => index: [65, 128]
|
||||||
|
head: chainHead,
|
||||||
|
limit: 64,
|
||||||
|
cutoff: 0,
|
||||||
|
|
||||||
|
// tail = 66, one of them has to be indexed
|
||||||
|
tail: tailPointer(66),
|
||||||
|
expIndexed: 63,
|
||||||
|
expRemaining: 1,
|
||||||
|
},
|
||||||
|
|
||||||
|
// The chain with configured cutoff, the chain range could be capped
|
||||||
|
{
|
||||||
|
// head = 128, limit = 64, cutoff = 66 => index: [66, 128]
|
||||||
|
head: chainHead,
|
||||||
|
limit: 64,
|
||||||
|
cutoff: 66,
|
||||||
|
|
||||||
|
// tail = 0, part of them need to be unindexed
|
||||||
|
tail: tailPointer(0),
|
||||||
|
expIndexed: 129,
|
||||||
|
expRemaining: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// head = 128, limit = 64, cutoff = 66 => index: [66, 128]
|
||||||
|
head: chainHead,
|
||||||
|
limit: 64,
|
||||||
|
cutoff: 66,
|
||||||
|
|
||||||
|
// tail = 66, all of them have been indexed
|
||||||
|
tail: tailPointer(66),
|
||||||
|
expIndexed: 63,
|
||||||
|
expRemaining: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// head = 128, limit = 64, cutoff = 66 => index: [66, 128]
|
||||||
|
head: chainHead,
|
||||||
|
limit: 64,
|
||||||
|
cutoff: 66,
|
||||||
|
|
||||||
|
// tail = 67, one of them has to be indexed
|
||||||
|
tail: tailPointer(67),
|
||||||
|
expIndexed: 62,
|
||||||
|
expRemaining: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// head = 128, limit = 64, cutoff = 256 => index: [66, 128]
|
||||||
|
head: chainHead,
|
||||||
|
limit: 0,
|
||||||
|
cutoff: 256,
|
||||||
|
tail: nil,
|
||||||
|
expIndexed: 0,
|
||||||
|
expRemaining: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false)
|
||||||
|
rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...))
|
||||||
|
|
||||||
|
// Index the initial blocks from ancient store
|
||||||
|
indexer := &txIndexer{
|
||||||
|
limit: c.limit,
|
||||||
|
cutoff: c.cutoff,
|
||||||
|
db: db,
|
||||||
|
progress: make(chan chan TxIndexProgress),
|
||||||
|
}
|
||||||
|
if c.tail != nil {
|
||||||
|
rawdb.WriteTxIndexTail(db, *c.tail)
|
||||||
|
}
|
||||||
|
p := indexer.report(c.head)
|
||||||
|
if p.Indexed != c.expIndexed {
|
||||||
|
t.Fatalf("Unexpected indexed: %d, expected: %d", p.Indexed, c.expIndexed)
|
||||||
|
}
|
||||||
|
if p.Remaining != c.expRemaining {
|
||||||
|
t.Fatalf("Unexpected remaining: %d, expected: %d", p.Remaining, c.expRemaining)
|
||||||
|
}
|
||||||
db.Close()
|
db.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue