diff --git a/common/range.go b/common/range.go
index c3a26ea7f5..aefd0c16d9 100644
--- a/common/range.go
+++ b/common/range.go
@@ -27,7 +27,11 @@ type Range[T uint32 | uint64] struct {
// NewRange creates a new range based of first element and number of elements.
func NewRange[T uint32 | uint64](first, count T) Range[T] {
- return Range[T]{first, first + count}
+ afterLast := first + count
+ if afterLast < first {
+ panic("range overflow")
+ }
+ return Range[T]{first, afterLast}
}
// First returns the first element of the range.
@@ -97,6 +101,12 @@ func (r Range[T]) Intersection(q Range[T]) Range[T] {
// Union returns the union of two ranges. Panics for gapped ranges.
func (r Range[T]) Union(q Range[T]) Range[T] {
+ if r.IsEmpty() {
+ return q
+ }
+ if q.IsEmpty() {
+ return r
+ }
if max(r.first, q.first) > min(r.afterLast, q.afterLast) {
panic("cannot create union; gap between ranges")
}
diff --git a/core/blockchain.go b/core/blockchain.go
index d41f301243..2b097c82e9 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -335,6 +335,7 @@ type BlockChain struct {
blockProcCounter int32
scope event.SubscriptionScope
genesisBlock *types.Block
+ indexServers indexServers
// This mutex synchronizes chain write operations.
// Readers don't need to take it, they can just read the database.
@@ -569,9 +570,15 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine,
log.Info("Failed to setup size tracker", "err", err)
}
}
+ bc.indexServers.init(bc)
return bc, nil
}
+// RegisterIndexer registers a new indexer to the chain.
+func (bc *BlockChain) RegisterIndexer(indexer Indexer, name string, needBodies, needReceipts bool) {
+ bc.indexServers.register(indexer, name, needBodies, needReceipts)
+}
+
func (bc *BlockChain) setupSnapshot() {
// Short circuit if the chain is established with path scheme, as the
// state snapshot has been integrated into path database natively.
@@ -684,6 +691,7 @@ func (bc *BlockChain) loadLastState() error {
if head := rawdb.ReadFinalizedBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFinalBlock.Store(block.Header())
+ bc.indexServers.setFinalBlock(block.NumberU64())
headFinalizedBlockGauge.Update(int64(block.NumberU64()))
bc.currentSafeBlock.Store(block.Header())
headSafeBlockGauge.Update(int64(block.NumberU64()))
@@ -731,6 +739,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
return errors.New("unexpected database tail")
}
bc.historyPrunePoint.Store(predefinedPoint)
+ bc.indexServers.setHistoryCutoff(predefinedPoint.BlockNumber)
return nil
case history.KeepPostMerge:
@@ -752,6 +761,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
return errors.New("unexpected database tail")
}
bc.historyPrunePoint.Store(predefinedPoint)
+ bc.indexServers.setHistoryCutoff(predefinedPoint.BlockNumber)
return nil
default:
@@ -805,9 +815,11 @@ func (bc *BlockChain) SetFinalized(header *types.Header) {
if header != nil {
rawdb.WriteFinalizedBlockHash(bc.db, header.Hash())
headFinalizedBlockGauge.Update(int64(header.Number.Uint64()))
+ bc.indexServers.setFinalBlock(header.Number.Uint64())
} else {
rawdb.WriteFinalizedBlockHash(bc.db, common.Hash{})
headFinalizedBlockGauge.Update(0)
+ bc.indexServers.setFinalBlock(0)
}
}
@@ -1115,6 +1127,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
+ bc.indexServers.revert(bc.CurrentBlock())
// Clear safe block, finalized block if needed
headBlock := bc.CurrentBlock()
@@ -1203,6 +1216,7 @@ func (bc *BlockChain) Reset() error {
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
// specified genesis state.
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
+ bc.indexServers.revert(genesis.Header())
// Dump the entire block chain and purge the caches
if err := bc.SetHead(0); err != nil {
return err
@@ -1219,6 +1233,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
log.Crit("Failed to write genesis block", "err", err)
}
bc.writeHeadBlock(genesis)
+ bc.indexServers.broadcast(genesis)
// Last update all in-memory chain markers
bc.genesisBlock = genesis
@@ -1335,6 +1350,7 @@ func (bc *BlockChain) stopWithoutSaving() {
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
func (bc *BlockChain) Stop() {
+ bc.indexServers.stop()
bc.stopWithoutSaving()
// Ensure that the entirety of the state snapshot is journaled to disk.
@@ -1636,6 +1652,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
}
}
bc.writeHeadBlock(block)
+ bc.indexServers.broadcast(block)
return nil
}
@@ -1654,7 +1671,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
start = time.Now()
)
rawdb.WriteBlock(batch, block)
- rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
+ blockHash := block.Hash()
+ rawdb.WriteReceipts(batch, blockHash, block.NumberU64(), receipts)
+ bc.indexServers.cacheRawReceipts(blockHash, receipts)
rawdb.WritePreimages(batch, statedb.Preimages())
if err := batch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
@@ -1768,6 +1787,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
// Set new head.
bc.writeHeadBlock(block)
+ bc.indexServers.broadcast(block)
bc.chainFeed.Send(ChainEvent{
Header: block.Header(),
@@ -1839,10 +1859,12 @@ func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, setHe
}
if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 {
+ bc.indexServers.setBlockProcessing(true)
bc.blockProcFeed.Send(true)
}
defer func() {
if atomic.AddInt32(&bc.blockProcCounter, -1) == 0 {
+ bc.indexServers.setBlockProcessing(false)
bc.blockProcFeed.Send(false)
}
}()
@@ -2522,6 +2544,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
return errInvalidNewChain
}
}
+ bc.indexServers.revert(commonBlock)
// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Info
@@ -2619,6 +2642,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
}
// Update the head block
bc.writeHeadBlock(block)
+ bc.indexServers.broadcast(block)
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
@@ -2696,6 +2720,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
}
}
bc.writeHeadBlock(head)
+ bc.indexServers.broadcast(head)
// Emit events
receipts, logs := bc.collectReceiptsAndLogs(head, false)
diff --git a/core/index_server.go b/core/index_server.go
new file mode 100644
index 0000000000..f0bc82d6a1
--- /dev/null
+++ b/core/index_server.go
@@ -0,0 +1,720 @@
+// Copyright 2025 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package core implements the Ethereum consensus protocol.
+package core
+
+import (
+ "math"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/lru"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+const (
+ busyDelay = time.Second // indexer status polling frequency when not ready
+ maxHistoricPrefetch = 16 // size of block data pre-fetch queue
+ rawReceiptsCacheSize = 8
+ logFrequency = time.Second * 20 // log info frequency during long indexing/unindexing process
+ headLogDelay = time.Second // head indexing log info delay (do not log if finished faster)
+)
+
+type Indexer interface {
+ // AddBlockData delivers a header and receipts belonging to a block that is
+ // either a direct descendant of the latest delivered head or the first one
+ // in the last requested range.
+ // The current ready/busy status and the requested historic range are returned.
+ // Note that the indexer should never block even if it is busy processing.
+ // It is allowed to re-request the delivered blocks later if the indexer could
+ // not process them when first delivered.
+ AddBlockData(header *types.Header, body *types.Body, receipts types.Receipts) (ready bool, needBlocks common.Range[uint64])
+ // Revert rewinds the index to the given head block number. Subsequent
+ // AddBlockData calls will deliver blocks starting from this point.
+ Revert(blockNumber uint64)
+ // Status returns the current ready/busy status and the requested historic range.
+ // Only the new head blocks are delivered if the indexer reports busy status.
+ Status() (ready bool, needBlocks common.Range[uint64])
+ // SetHistoryCutoff signals the historical cutoff point to the indexer.
+ // Note that any block number that is consistently being requested in the
+ // needBlocks response that is not older than the cutoff point is guaranteed
+ // to be delivered eventually. If the required data belonging to certain
+ // block numbers is missing then the cutoff point is moved after the missing
+ // section in order to maintain this guarantee.
+ SetHistoryCutoff(blockNumber uint64)
+ // SetFinalized signals the latest finalized block number to the indexer.
+ SetFinalized(blockNumber uint64)
+ // Suspended signals to the indexer that block processing has started and
+ // any non-essential asynchronous tasks of the indexer should be suspended.
+ // The next AddBlockData call signals the end of the suspended state.
+ // Note that if multiple blocks are inserted then the indexer is only
+ // suspended once, before the first block processing begins, so according
+ // to the rule above it will not be suspended while processing the rest of
+ // the batch. This behavior should be fine because indexing can happen in
+ // parallel with forward syncing, the purpose of the suspend mechanism is
+ // to handle historical index backfilling with a lower priority so that it
+ // does not increase block latency.
+ Suspended()
+ // Stop initiates indexer shutdown. No subsequent calls are made through this
+ // interface after Stop.
+ Stop()
+}
+
+// indexServers operates as a part of BlockChain and can serve multiple chain
+// indexers that implement the Indexer interface.
+type indexServers struct {
+ lock sync.Mutex
+ servers []*indexServer
+ chain *BlockChain
+ rawReceiptsCache *lru.Cache[common.Hash, []*types.Receipt]
+
+ lastHead *types.Header
+ lastHeadBody *types.Body
+ lastHeadReceipts types.Receipts
+ finalBlock, historyCutoff uint64
+
+ closeCh chan struct{}
+ closeWg sync.WaitGroup
+}
+
+// init initializes indexServers.
+func (f *indexServers) init(chain *BlockChain) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ f.chain = chain
+ f.lastHead = chain.CurrentBlock()
+ if f.lastHead != nil {
+ f.lastHeadBody = chain.GetBody(f.lastHead.Hash())
+ f.lastHeadReceipts = chain.GetRawReceipts(f.lastHead.Hash(), f.lastHead.Number.Uint64())
+ }
+ f.closeCh = make(chan struct{})
+ f.rawReceiptsCache = lru.NewCache[common.Hash, []*types.Receipt](rawReceiptsCacheSize)
+}
+
+// stop shuts down all registered Indexers and their serving goroutines.
+func (f *indexServers) stop() {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ close(f.closeCh)
+ f.closeWg.Wait()
+ f.servers = nil
+}
+
+// register adds a new Indexer to the chain.
+func (f *indexServers) register(indexer Indexer, name string, needBodies, needReceipts bool) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ server := &indexServer{
+ parent: f,
+ indexer: indexer,
+ sendTimer: time.NewTimer(0),
+ name: name,
+ statusCh: make(chan indexerStatus, 1),
+ blockDataCh: make(chan blockData, maxHistoricPrefetch),
+ suspendCh: make(chan bool, 1),
+ needBodies: needBodies,
+ needReceipts: needReceipts,
+ }
+ f.servers = append(f.servers, server)
+ f.closeWg.Add(2)
+ indexer.SetHistoryCutoff(f.historyCutoff)
+ indexer.SetFinalized(f.finalBlock)
+ if f.lastHead != nil && f.lastHeadBody != nil && f.lastHeadReceipts != nil {
+ server.sendHeadBlockData(f.lastHead, f.lastHeadBody, f.lastHeadReceipts)
+ }
+ go server.historicReadLoop()
+ go server.historicSendLoop()
+}
+
+// cacheRawReceipts caches a set of raw receipts during block processing in order
+// to avoid having to read it back from the database during broadcast.
+func (f *indexServers) cacheRawReceipts(blockHash common.Hash, blockReceipts types.Receipts) {
+ f.rawReceiptsCache.Add(blockHash, blockReceipts)
+}
+
+// broadcast sends a new head block to all registered Indexer instances.
+func (f *indexServers) broadcast(block *types.Block) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ // Note that individual Indexer servers might ignore block bodies and
+ // receipts. We still always fetch receipts for simplicity because in the
+ // typical case it is cached during block processing and costs nothing.
+ blockHash := block.Hash()
+ blockReceipts, _ := f.rawReceiptsCache.Get(blockHash)
+ if blockReceipts == nil {
+ blockReceipts = f.chain.GetRawReceipts(blockHash, block.NumberU64())
+ if blockReceipts == nil {
+ log.Error("Receipts belonging to new head are missing", "number", block.NumberU64(), "hash", block.Hash())
+ return
+ }
+ f.rawReceiptsCache.Add(blockHash, blockReceipts)
+ }
+ f.lastHead, f.lastHeadBody, f.lastHeadReceipts = block.Header(), block.Body(), blockReceipts
+ for _, server := range f.servers {
+ server.sendHeadBlockData(block.Header(), block.Body(), blockReceipts)
+ }
+}
+
+// revert notifies all registered Indexer instances about the chain being rolled
+// back to the given head or last common ancestor.
+func (f *indexServers) revert(header *types.Header) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ for _, server := range f.servers {
+ server.revert(header)
+ }
+}
+
+// setFinalBlock notifies all Indexer instances about the latest finalized block.
+func (f *indexServers) setFinalBlock(blockNumber uint64) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if f.finalBlock == blockNumber {
+ return
+ }
+ f.finalBlock = blockNumber
+ for _, server := range f.servers {
+ server.setFinalBlock(blockNumber)
+ }
+}
+
+// setHistoryCutoff notifies all Indexer instances about the history cutoff point.
+// The indexers cannot expect any data being delivered if needBlocks.First() is
+// before this point.
+func (f *indexServers) setHistoryCutoff(blockNumber uint64) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if f.historyCutoff == blockNumber {
+ return
+ }
+ f.historyCutoff = blockNumber
+ for _, server := range f.servers {
+ server.setHistoryCutoff(blockNumber)
+ }
+}
+
+// setBlockProcessing suspends serving historical blocks requested by the indexers
+// while a chain segment is being processed and added to the chain.
+func (f *indexServers) setBlockProcessing(processing bool) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ for _, server := range f.servers {
+ server.setBlockProcessing(processing)
+ }
+}
+
+// indexServer sends updates to a single Indexer instance. It sends all new heads
+// and reorg events, and also sends historical block data upon request.
+// It guarantees that Indexer functions are never called concurrently and also
+// they always present a consistent view of the chain to the indexer.
+type indexServer struct {
+ lock sync.Mutex
+ parent *indexServers
+ indexer Indexer // always call under mutex lock; never call after stopped
+ stopped bool
+
+ lastHead *types.Header
+ sendStatus indexerStatus
+ statusCh chan indexerStatus
+ blockDataCh chan blockData
+ suspendCh chan bool
+ testSuspendHookCh chan struct{} // initialized by test, capacity = 1
+ sendTimer *time.Timer
+ historyCutoff, missingBlockCutoff uint64
+ needBodies, needReceipts bool
+
+ readStatus indexerStatus
+ readPointer uint64 // next block to be queued
+
+ name string
+ processed uint64
+ logged bool
+ startedAt, lastLoggedAt time.Time
+ lastHistoryErrorLog time.Time
+}
+
+// indexerStatus is maintained by the historicSendLoop goroutine and all changes
+// are sent to the historicReadLoop goroutine through statusCh.
+type indexerStatus struct {
+ ready bool // last feedback received from the indexer
+ needBlocks common.Range[uint64] // last feedback received from the indexer
+ suspended bool // suspend historic block delivery during block processing
+ resetQueueCount uint64 // total number of queue resets
+ revertCount, lastRevertBlock uint64 // detect entries potentially expired by a revert/reorg
+}
+
+// isNextExpected returns true if the received blockData (potentially based on a
+// previously sent indexerStatus) is still guaranteed to be valid and the one
+// expected by the indexer according to the latest indexerStatus.
+func (s *indexerStatus) isNextExpected(b blockData) bool {
+ if s.needBlocks.IsEmpty() || s.needBlocks.First() != b.blockNumber {
+ return false // not the expected next block number or no historical blocks expected at all
+ }
+ // block number is the expected one; check if a reorg might have invalidated it
+ switch s.revertCount {
+ case b.revertCount:
+ return true // no reorgs happened since the collection of block data
+ case b.revertCount + 1:
+ // one reorg happened to s.lastRevertBlock; b is valid if not newer than this
+ return b.blockNumber <= s.lastRevertBlock
+ default:
+ // multiple reorgs happened; previous revert blocks are not remembered
+ // so we don't know if b is still valid and therefore we have to discard it.
+ return false
+ }
+}
+
+// blockData represents the indexable data of a single block being sent from the
+// reader to the sender goroutine and optionally queued in blockDataCh between.
+// It also includes the latest revertCount known before reading the block data,
+// which allows the sender to guarantee that all sent block data is always
+// consistent with the indexer's canonical chain view while the reading of block
+// data can still happen asynchronously.
+type blockData struct {
+ blockNumber, revertCount uint64
+ valid bool
+ header *types.Header
+ body *types.Body
+ receipts types.Receipts
+}
+
+// sendHeadBlockData immediately sends the latest head block data to the indexer
+// and updates the status of the historical block data serving mechanism
+// accordingly.
+func (s *indexServer) sendHeadBlockData(header *types.Header, body *types.Body, receipts types.Receipts) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.stopped {
+ return
+ }
+ if header.Hash() == s.lastHead.Hash() {
+ return
+ }
+ s.lastHead = header
+ if !s.needBodies {
+ body = nil
+ }
+ if !s.needReceipts {
+ receipts = nil
+ }
+ ready, needBlocks := s.indexer.AddBlockData(header, body, receipts)
+ s.updateIndexerStatus(ready, needBlocks, 0)
+ s.updateSendStatus()
+}
+
+// updateIndexerStatus updates the ready / needBlocks fields in the send loop
+// status. The number of historic blocks added since the last update is
+// specified in addedBlocks. During continuous historical block range delivery
+// the starting point of the new needBlocks range is expected to advance with
+// each new block added. If the new range does not match the expectation then
+// a blockDataCh queue reset is requested.
+func (s *indexServer) updateIndexerStatus(ready bool, needBlocks common.Range[uint64], addedBlocks uint64) {
+ if needBlocks.First() != s.sendStatus.needBlocks.First()+addedBlocks {
+ s.sendStatus.resetQueueCount++ // request queue reset
+ }
+ s.sendStatus.ready, s.sendStatus.needBlocks = ready, needBlocks
+}
+
+// revert immediately reverts the indexer to the given block and updates the
+// status of the historical block data serving mechanism accordingly.
+func (s *indexServer) revert(header *types.Header) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.stopped || s.lastHead == nil {
+ return
+ }
+ if header.Hash() == s.lastHead.Hash() {
+ return
+ }
+ blockNumber := header.Number.Uint64()
+ if blockNumber >= s.lastHead.Number.Uint64() {
+ panic("invalid indexer revert")
+ }
+ s.lastHead = header
+ s.sendStatus.revertCount++
+ s.sendStatus.lastRevertBlock = blockNumber
+ s.updateSendStatus()
+ s.indexer.Revert(blockNumber)
+}
+
+// suspendOrStop blocks the send loop until it is unsuspended or the parent
+// chain is stopped. It also notifies the indexer by calling Suspended and
+// suspends the read loop through updateSendStatus.
+func (s *indexServer) suspendOrStop(suspended bool) bool {
+ if !suspended {
+ panic("unexpected 'false' signal on suspendCh")
+ }
+ s.lock.Lock()
+ s.sendStatus.suspended = true
+ s.updateSendStatus()
+ s.indexer.Suspended()
+ s.lock.Unlock()
+ select {
+ case <-s.parent.closeCh:
+ return true
+ case suspended = <-s.suspendCh:
+ }
+ if suspended {
+ panic("unexpected 'true' signal on suspendCh")
+ }
+ s.lock.Lock()
+ s.sendStatus.suspended = false
+ s.updateSendStatus()
+ s.lock.Unlock()
+ return false
+}
+
+// historicSendLoop is the main event loop that interacts with the indexer in
+// case when historical block data is requested. It sends status updates to
+// the reader goroutine through statusCh and feeds the fetched data coming from
+// blockDataCh into the indexer.
+func (s *indexServer) historicSendLoop() {
+ defer func() {
+ s.lock.Lock()
+ s.indexer.Stop()
+ s.stopped = true
+ s.lock.Unlock()
+ s.parent.closeWg.Done()
+ }()
+
+ for {
+ select {
+ // do a separate non-blocking select to ensure that a suspend attempt
+ // during the previous historical AddBlockData will be caught in the
+ // next round.
+ case suspend := <-s.suspendCh:
+ if s.suspendOrStop(suspend) {
+ return
+ }
+ default:
+ }
+ select {
+ case <-s.parent.closeCh:
+ return
+ case suspend := <-s.suspendCh:
+ if s.suspendOrStop(suspend) {
+ return
+ }
+ case nextBlockData := <-s.blockDataCh:
+ s.addHistoricBlockData(nextBlockData)
+ case <-s.sendTimer.C:
+ s.handleHistoricLoopTimer()
+ }
+ }
+}
+
+// handleHistoricLoopTimer queries the indexer status again if the last known
+// status was "not ready". By calling updateSendStatus it also restarts the
+// timer if the indexer is still not ready.
+func (s *indexServer) handleHistoricLoopTimer() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if !s.sendStatus.ready {
+ ready, needBlocks := s.indexer.Status()
+ s.updateIndexerStatus(ready, needBlocks, 0)
+ s.updateSendStatus()
+ }
+}
+
+// addHistoricBlockData checks if the next blockData fetched by the asynchronous
+// historicReadLoop is still the one to be delivered next to the indexer and
+// delivers it if possible. If the requested block range has changed since or
+// a reorg might have made the fetched data invalid then it triggers a queue
+// reset by increasing resetQueueCount. This ensures that the read loop discards
+// queued blockData and starts sending newly fetched data according to the new
+// needBlocks range.
+func (s *indexServer) addHistoricBlockData(nextBlockData blockData) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ // check if received block data is indeed from the next expected
+ // block and is still guaranteed to be canonical; ignore and request
+ // a queue reset otherwise.
+ if s.sendStatus.isNextExpected(nextBlockData) {
+ // check if the block data has actually been found in the database
+ if nextBlockData.valid {
+ ready, needBlocks := s.indexer.AddBlockData(nextBlockData.header, nextBlockData.body, nextBlockData.receipts)
+ s.updateIndexerStatus(ready, needBlocks, 1)
+ if s.sendStatus.needBlocks.IsEmpty() {
+ s.logDelivered(nextBlockData.blockNumber)
+ s.logFinished()
+ } else if s.sendStatus.needBlocks.First() == nextBlockData.blockNumber+1 {
+ s.logDelivered(nextBlockData.blockNumber)
+ }
+ } else {
+ // report error and update missingBlockCutoff in order to
+ // avoid spinning forever on the same error.
+ if time.Since(s.lastHistoryErrorLog) >= time.Second*10 {
+ s.lastHistoryErrorLog = time.Now()
+ if nextBlockData.header == nil {
+ log.Error("Historical header is missing", "number", nextBlockData.blockNumber)
+ } else if s.needBodies && nextBlockData.body == nil {
+ log.Error("Historical block body is missing", "number", nextBlockData.blockNumber, "hash", nextBlockData.header.Hash())
+ } else if s.needReceipts && nextBlockData.receipts == nil {
+ log.Error("Historical receipts are missing", "number", nextBlockData.blockNumber, "hash", nextBlockData.header.Hash())
+ }
+ }
+ s.missingBlockCutoff = max(s.missingBlockCutoff, nextBlockData.blockNumber+1)
+ s.indexer.SetHistoryCutoff(max(s.historyCutoff, s.missingBlockCutoff))
+ ready, needBlocks := s.indexer.Status()
+ s.updateIndexerStatus(ready, needBlocks, 0)
+ }
+ } else {
+ // trigger resetting the queue and sending blockData from needBlocks.First()
+ s.sendStatus.resetQueueCount++
+ }
+ s.updateSendStatus()
+}
+
+// updateSendStatus updates the asynchronous reader goroutine's status based on the
+// latest indexer status. If necessary then it trims the needBlocks range based
+// on the locally available block range. If there is already an unread status
+// update waiting on statusCh then it is replaced by the new one.
+func (s *indexServer) updateSendStatus() {
+ if s.sendStatus.ready || s.sendStatus.suspended {
+ s.sendTimer.Stop()
+ } else {
+ s.sendTimer.Reset(busyDelay)
+ }
+ var headNumber uint64
+ if s.lastHead != nil {
+ headNumber = s.lastHead.Number.Uint64()
+ }
+ if headNumber+1 < s.sendStatus.needBlocks.AfterLast() {
+ s.sendStatus.needBlocks.SetLast(headNumber)
+ }
+ if s.sendStatus.needBlocks.IsEmpty() || max(s.historyCutoff, s.missingBlockCutoff) > s.sendStatus.needBlocks.First() {
+ s.sendStatus.needBlocks = common.Range[uint64]{}
+ }
+ select {
+ case <-s.statusCh:
+ default:
+ }
+ s.statusCh <- s.sendStatus
+}
+
+// setBlockProcessing suspends serving historical blocks requested by the indexer
+// while a chain segment is being processed and added to the chain.
+func (s *indexServer) setBlockProcessing(suspended bool) {
+ select {
+ case old := <-s.suspendCh:
+ if old == suspended {
+ panic("unexpected value pulled back from suspendCh")
+ }
+ default:
+ // only send new suspended flag if previous (opposite) value has been
+ // read by the send loop already
+ s.suspendCh <- suspended
+ }
+ if suspended && s.testSuspendHookCh != nil {
+ select {
+ case s.testSuspendHookCh <- struct{}{}:
+ default:
+ }
+ }
+}
+
+// clearBlockQueue removes all entries from blockDataCh.
+func (s *indexServer) clearBlockQueue() {
+ for {
+ select {
+ case <-s.blockDataCh:
+ default:
+ return
+ }
+ }
+}
+
+// updateReadStatus updates readStatus and checks whether a queue reset has been
+// requested by the send loop. In that case it empties the queue and resets the
+// readPointer to the first block of the needBlocks range.
+// Note that the blocks between needBlocks.First() and readPointer-1 are assumed
+// to already be queued in blockDataCh. If needBlocks.First() does not advance
+// with each delivered block or an expired blockData is received by the send
+// loop then a queue reset is requested.
+func (s *indexServer) updateReadStatus(newStatus indexerStatus) {
+ if newStatus.resetQueueCount != s.readStatus.resetQueueCount {
+ s.clearBlockQueue()
+ s.readPointer = newStatus.needBlocks.First()
+ }
+ s.readStatus = newStatus
+}
+
+// canQueueNextBlock returns true if there are more blocks to read in the
+// needBlocks range and we have not yet reached the capacity of blockDataCh.
+// Note that the latter check assumes that blocks between needBlocks.First() and
+// readPointer-1 are queued.
+func (s *indexServer) canQueueNextBlock() bool {
+ return s.readStatus.needBlocks.Includes(s.readPointer) &&
+ s.readPointer < s.readStatus.needBlocks.First()+maxHistoricPrefetch
+}
+
+// historicReadLoop reads requested historical block data asynchronously.
+// It receives indexer status updates on statusCh and sends block data to
+// blockDataCh. If the latest status indicates that the server is not
+// suspended then it is guaranteed that eventually a corresponding block data
+// response will be sent unless a new status update is received before this
+// happens.
+// Note that blockDataCh can queue multiple block data pre-fetched by
+// historicReadLoop. If the requested range is changed while there is still
+// queued data in the channel that corresponds to the previous requested range
+// then the receiver sends a new status update with increased resetQueueCount.
+// In this case historicReadLoop removes all remaining entries from the queue
+// and starts sending block data from the beginning of the new range.
+func (s *indexServer) historicReadLoop() {
+ defer s.parent.closeWg.Done()
+
+ s.readStatus.resetQueueCount = math.MaxUint64
+ for {
+ if !s.readStatus.suspended && s.canQueueNextBlock() {
+ // Send next item to the queue.
+ bd := blockData{blockNumber: s.readPointer, revertCount: s.readStatus.revertCount, valid: true}
+ if bd.header = s.parent.chain.GetHeaderByNumber(bd.blockNumber); bd.header != nil {
+ blockHash := bd.header.Hash()
+ if s.needBodies {
+ bd.body = s.parent.chain.GetBody(blockHash)
+ if bd.body == nil {
+ bd.valid = false
+ }
+ }
+ if s.needReceipts {
+ bd.receipts, _ = s.parent.rawReceiptsCache.Get(blockHash)
+ if bd.receipts == nil {
+ // Note: we do not cache historical receipts because typically
+ // each indexer requests them at different times.
+ bd.receipts = s.parent.chain.GetRawReceipts(blockHash, bd.blockNumber)
+ if bd.receipts == nil {
+ bd.valid = false
+ }
+ }
+ }
+ }
+ // Note that a response with missing block data is still sent in case of
+ // a read error, signaling to the sender logic that something is missing.
+ // This might be either due to a database error or a reorg.
+ select {
+ case s.blockDataCh <- bd:
+ s.readPointer++
+ default:
+ // Note: updateIndexerStatus in the send loop and canQueueNextBlock
+ // in the read loop should ensure that no send is attempted at
+ // blockDataCh when it is filled to full capacity. If it happens
+ // anyway then we print an error and set the suspended flag to
+ // true until the next status update.
+ if time.Since(s.lastHistoryErrorLog) >= time.Second*10 {
+ s.lastHistoryErrorLog = time.Now()
+ log.Error("Historical block queue is full")
+ }
+ s.readStatus.suspended = true
+ }
+ // Keep checking status updates without blocking as long as there is
+ // something to do.
+ select {
+ case <-s.parent.closeCh:
+ return
+ case status := <-s.statusCh:
+ s.updateReadStatus(status)
+ default:
+ }
+ } else {
+ // There was nothing to do; wait for a next status update.
+ select {
+ case <-s.parent.closeCh:
+ return
+ case status := <-s.statusCh:
+ s.updateReadStatus(status)
+ }
+ }
+ }
+}
+
+// logDelivered periodically prints log messages that report the current state
+// of the indexing process. It should be called after processing each new block.
+func (s *indexServer) logDelivered(position uint64) {
+ if s.processed == 0 {
+ s.startedAt = time.Now()
+ }
+ s.processed++
+ if s.logged {
+ if time.Since(s.lastLoggedAt) < logFrequency {
+ return
+ }
+ } else {
+ if time.Since(s.startedAt) < headLogDelay {
+ return
+ }
+ s.logged = true
+ }
+ s.lastLoggedAt = time.Now()
+ log.Info("Generating "+s.name, "block", position, "processed", s.processed, "elapsed", time.Since(s.startedAt))
+}
+
+// logFinished prints a log message that reports the end of the indexing process.
+// Note that any log message is only printed if the process took longer than
+// headLogDelay.
+func (s *indexServer) logFinished() {
+ if s.logged {
+ log.Info("Finished "+s.name, "processed", s.processed, "elapsed", time.Since(s.startedAt))
+ s.logged = false
+ }
+ s.processed = 0
+}
+
+// setFinalBlock notifies the Indexer instance about the latest finalized block.
+func (s *indexServer) setFinalBlock(blockNumber uint64) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.stopped {
+ return
+ }
+ s.indexer.SetFinalized(blockNumber)
+}
+
+// setHistoryCutoff notifies the Indexer instance about the history cutoff point.
+// The indexer cannot expect any data being delivered if needBlocks.First() is
+// before this point.
+// Note that if some historical block data could not be loaded from the database
+// then the historical cutoff point reported to the indexer might be modified by
+// missingBlockCutoff. This workaround ensures that the indexing process does not
+// get stuck permanently in case of missing data.
+func (s *indexServer) setHistoryCutoff(blockNumber uint64) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.stopped {
+ return
+ }
+ s.historyCutoff = blockNumber
+ s.indexer.SetHistoryCutoff(max(s.historyCutoff, s.missingBlockCutoff))
+ ready, needBlocks := s.indexer.Status()
+ s.updateIndexerStatus(ready, needBlocks, 0)
+ s.updateSendStatus()
+}
diff --git a/core/index_server_test.go b/core/index_server_test.go
new file mode 100644
index 0000000000..6143ddd40f
--- /dev/null
+++ b/core/index_server_test.go
@@ -0,0 +1,222 @@
+// Copyright 2025 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package core implements the Ethereum consensus protocol.
+package core
+
+import (
+ "math/big"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus/ethash"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+// TestIndexServer runs the index server scenario for every combination of the
+// needBodies/needReceipts flags.
+func TestIndexServer(t *testing.T) {
+	for _, needBodies := range []bool{false, true} {
+		for _, needReceipts := range []bool{false, true} {
+			testIndexServer(t, needBodies, needReceipts)
+		}
+	}
+}
+
+// testIndexServer exercises the chain index server end-to-end with a mock
+// indexer: head block delivery, historical backfill, suspension on new chain
+// activity, reverts via SetHead, and restart from a persisted database.
+func testIndexServer(t *testing.T, needBodies, needReceipts bool) {
+	ti := &testIndexer{
+		t:        t,
+		eventCh:  make(chan testIndexerEvent),
+		statusCh: make(chan testIndexerStatus),
+	}
+	var blockchain *BlockChain
+	doneCh := make(chan struct{})
+	// expDone waits for the completion of the last operation started via run.
+	expDone := func() {
+		select {
+		case <-doneCh:
+		case <-time.After(time.Second * 5):
+			t.Fatalf("Expected chain operation done but not finished yet")
+		}
+	}
+	testSuspendHookCh := make(chan struct{}, 1)
+	// run starts a blocking chain operation in the background so that the test
+	// goroutine can keep serving indexer callbacks in the meantime.
+	run := func(fn func()) {
+		go func() {
+			fn()
+			doneCh <- struct{}{}
+		}()
+	}
+	insert := func(chain []*types.Block) {
+		run(func() {
+			if i, err := blockchain.InsertChain(chain); err != nil {
+				// Errorf instead of Fatalf: FailNow must not be called from a
+				// goroutine other than the one running the test function.
+				t.Errorf("failed to insert chain[%d]: %v", i, err)
+			}
+		})
+	}
+	waitSuspend := func() {
+		select {
+		case <-testSuspendHookCh:
+		case <-time.After(time.Second * 5):
+			t.Fatalf("Expected index server suspend but suspended state not reached")
+		}
+	}
+
+	gspec := &Genesis{
+		Config:  params.TestChainConfig,
+		BaseFee: big.NewInt(params.InitialBaseFee),
+	}
+	db := rawdb.NewMemoryDatabase()
+	var err error
+	blockchain, err = NewBlockChain(db, gspec, ethash.NewFaker(), DefaultConfig())
+	if err != nil {
+		t.Fatalf("failed to create blockchain: %v", err)
+	}
+	chain := []*types.Block{gspec.ToBlock()}
+	blocks, _ := GenerateChain(gspec.Config, chain[0], ethash.NewFaker(), db, 110, func(i int, gen *BlockGen) {})
+	chain = append(chain, blocks...)
+
+	run(func() {
+		blockchain.RegisterIndexer(ti, "", needBodies, needReceipts)
+		blockchain.indexServers.servers[0].testSuspendHookCh = testSuspendHookCh
+	})
+	ti.expEvent(testIndexerEvent{ev: "SetHistoryCutoff", blockNumber: 0})
+	ti.expEvent(testIndexerEvent{ev: "SetFinalized", blockNumber: 0})
+	ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: 0, blockHash: chain[0].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
+	ti.status(testIndexerStatus{ready: true})
+	expDone()
+
+	insert(chain[1:101])
+	waitSuspend()
+	ti.expEvent(testIndexerEvent{ev: "Suspended"})
+	for i := uint64(1); i <= 100; i++ {
+		ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: i, blockHash: chain[i].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
+		ti.status(testIndexerStatus{ready: true})
+	}
+	expDone()
+
+	run(blockchain.Stop)
+	ti.expEvent(testIndexerEvent{ev: "Stop"})
+	expDone()
+
+	// Restart the chain on the same database; the indexer should be resumed
+	// from the persisted head and asked to backfill the historical range.
+	blockchain, err = NewBlockChain(db, gspec, ethash.NewFaker(), DefaultConfig())
+	if err != nil {
+		t.Fatalf("failed to create blockchain: %v", err)
+	}
+	run(func() {
+		blockchain.RegisterIndexer(ti, "", needBodies, needReceipts)
+		blockchain.indexServers.servers[0].testSuspendHookCh = testSuspendHookCh
+	})
+	ti.expEvent(testIndexerEvent{ev: "SetHistoryCutoff", blockNumber: 0})
+	ti.expEvent(testIndexerEvent{ev: "SetFinalized", blockNumber: 0})
+	ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: 100, blockHash: chain[100].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
+	ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](0, 100)})
+	expDone()
+	// request entire chain as historical range, add a new block in the middle and check suspend mechanism
+	for i := uint64(0); i <= 49; i++ {
+		ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: i, blockHash: chain[i].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
+		ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](i+1, 99-i)})
+	}
+	ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: 50, blockHash: chain[50].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
+	insert(chain[101:102])
+	waitSuspend()
+	ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](51, 49)})
+	ti.expEvent(testIndexerEvent{ev: "Suspended"})
+	ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: 101, blockHash: chain[101].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
+	ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](51, 50)})
+	expDone()
+	for i := uint64(51); i <= 100; i++ {
+		ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: i, blockHash: chain[i].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
+		ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](i+1, 100-i)})
+	}
+
+	// Revert to block 80 and extend the chain with a different branch.
+	run(func() {
+		if err := blockchain.SetHead(80); err != nil {
+			t.Errorf("failed to set head: %v", err)
+		}
+	})
+	ti.expEvent(testIndexerEvent{ev: "Revert", blockNumber: 80})
+	expDone()
+	chain = chain[:81]
+	blocks, _ = GenerateChain(gspec.Config, chain[80], ethash.NewFaker(), db, 45, func(i int, gen *BlockGen) {})
+	chain = append(chain, blocks...)
+	insert(chain[81:121])
+	waitSuspend()
+	ti.expEvent(testIndexerEvent{ev: "Suspended"})
+	for i := uint64(81); i <= 120; i++ {
+		ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: i, blockHash: chain[i].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
+		ti.status(testIndexerStatus{ready: true})
+	}
+	expDone()
+
+	run(blockchain.Stop)
+	ti.expEvent(testIndexerEvent{ev: "Stop"})
+	expDone()
+}
+
+// testIndexer is a mock Indexer implementation that forwards every callback it
+// receives as a testIndexerEvent on eventCh and, for callbacks with a return
+// value, blocks until the test supplies the reply on statusCh. This lets the
+// test verify the exact callback sequence produced by the index server.
+type testIndexer struct {
+ t *testing.T
+ eventCh chan testIndexerEvent // one entry per indexer callback, unbuffered
+ statusCh chan testIndexerStatus // reply for AddBlockData/Status, supplied by the test
+}
+
+// testIndexerEvent records a single callback made by the index server to the
+// mock indexer, together with the block parameters where applicable.
+type testIndexerEvent struct {
+ ev string // callback name, e.g. "AddBlockData", "Revert", "Suspended", "Stop"
+ blockNumber uint64
+ blockHash common.Hash
+ hasBody, hasReceipts bool // whether body/receipts were delivered with AddBlockData
+}
+
+// testIndexerStatus is the reply the mock indexer returns from AddBlockData
+// and Status: whether it is ready and which historical range it still needs.
+type testIndexerStatus struct {
+ ready bool
+ needBlocks common.Range[uint64]
+}
+
+func (ti *testIndexer) expEvent(exp testIndexerEvent) {
+ var got testIndexerEvent
+ select {
+ case got = <-ti.eventCh:
+ case <-time.After(time.Second * 5):
+ }
+ if got != exp {
+ ti.t.Fatalf("Wrong indexer event received (expected: %v, got: %v)", exp, got)
+ }
+}
+
+// status hands the given reply to the indexer callback currently blocked in
+// AddBlockData or Status.
+func (ti *testIndexer) status(status testIndexerStatus) {
+ ti.statusCh <- status
+}
+
+// AddBlockData implements Indexer; it forwards the delivered block data to the
+// test and blocks until the test supplies the (ready, needBlocks) reply.
+func (ti *testIndexer) AddBlockData(header *types.Header, body *types.Body, receipts types.Receipts) (ready bool, needBlocks common.Range[uint64]) {
+	ev := testIndexerEvent{
+		ev:          "AddBlockData",
+		blockNumber: header.Number.Uint64(),
+		blockHash:   header.Hash(),
+		hasBody:     body != nil,
+		hasReceipts: receipts != nil,
+	}
+	ti.eventCh <- ev
+	status := <-ti.statusCh
+	return status.ready, status.needBlocks
+}
+
+// Revert implements Indexer; it reports the rollback to the test.
+func (ti *testIndexer) Revert(blockNumber uint64) {
+ ti.eventCh <- testIndexerEvent{ev: "Revert", blockNumber: blockNumber}
+}
+
+// Status implements Indexer; it reports the query to the test and blocks until
+// the test supplies the status to return.
+func (ti *testIndexer) Status() (ready bool, needBlocks common.Range[uint64]) {
+ ti.eventCh <- testIndexerEvent{ev: "Status"}
+ status := <-ti.statusCh
+ return status.ready, status.needBlocks
+}
+
+// SetHistoryCutoff implements Indexer; it reports the new cutoff to the test.
+func (ti *testIndexer) SetHistoryCutoff(blockNumber uint64) {
+ ti.eventCh <- testIndexerEvent{ev: "SetHistoryCutoff", blockNumber: blockNumber}
+}
+
+// SetFinalized implements Indexer; it reports the finalized block to the test.
+func (ti *testIndexer) SetFinalized(blockNumber uint64) {
+ ti.eventCh <- testIndexerEvent{ev: "SetFinalized", blockNumber: blockNumber}
+}
+
+// Suspended implements Indexer; it reports the suspension to the test.
+func (ti *testIndexer) Suspended() {
+ ti.eventCh <- testIndexerEvent{ev: "Suspended"}
+}
+
+// Stop implements Indexer; it reports the shutdown to the test.
+func (ti *testIndexer) Stop() {
+ ti.eventCh <- testIndexerEvent{ev: "Stop"}
+}