This commit is contained in:
Felföldi Zsolt 2026-02-25 22:00:30 -08:00 committed by GitHub
commit c5d41efffe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 979 additions and 2 deletions

View file

@ -27,7 +27,11 @@ type Range[T uint32 | uint64] struct {
// NewRange creates a new range based on the first element and the number of
// elements. It panics if first+count overflows T, because the exclusive upper
// bound of such a range cannot be represented.
func NewRange[T uint32 | uint64](first, count T) Range[T] {
	afterLast := first + count
	// Unsigned addition wraps around silently; a wrapped sum is always smaller
	// than the first operand, which is what this check detects.
	if afterLast < first {
		panic("range overflow")
	}
	return Range[T]{first, afterLast}
}
// First returns the first element of the range.
@ -97,6 +101,12 @@ func (r Range[T]) Intersection(q Range[T]) Range[T] {
// Union returns the union of two ranges. Panics for gapped ranges.
func (r Range[T]) Union(q Range[T]) Range[T] {
if r.IsEmpty() {
return q
}
if q.IsEmpty() {
return r
}
if max(r.first, q.first) > min(r.afterLast, q.afterLast) {
panic("cannot create union; gap between ranges")
}

View file

@ -335,6 +335,7 @@ type BlockChain struct {
blockProcCounter int32
scope event.SubscriptionScope
genesisBlock *types.Block
indexServers indexServers
// This mutex synchronizes chain write operations.
// Readers don't need to take it, they can just read the database.
@ -569,9 +570,15 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine,
log.Info("Failed to setup size tracker", "err", err)
}
}
bc.indexServers.init(bc)
return bc, nil
}
// RegisterIndexer attaches the given indexer to the chain's index servers
// under the specified name. The needBodies and needReceipts flags are handed
// over unchanged together with the indexer.
func (bc *BlockChain) RegisterIndexer(indexer Indexer, name string, needBodies, needReceipts bool) {
	bc.indexServers.register(indexer, name, needBodies, needReceipts)
}
func (bc *BlockChain) setupSnapshot() {
// Short circuit if the chain is established with path scheme, as the
// state snapshot has been integrated into path database natively.
@ -684,6 +691,7 @@ func (bc *BlockChain) loadLastState() error {
if head := rawdb.ReadFinalizedBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFinalBlock.Store(block.Header())
bc.indexServers.setFinalBlock(block.NumberU64())
headFinalizedBlockGauge.Update(int64(block.NumberU64()))
bc.currentSafeBlock.Store(block.Header())
headSafeBlockGauge.Update(int64(block.NumberU64()))
@ -731,6 +739,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
return errors.New("unexpected database tail")
}
bc.historyPrunePoint.Store(predefinedPoint)
bc.indexServers.setHistoryCutoff(predefinedPoint.BlockNumber)
return nil
case history.KeepPostMerge:
@ -752,6 +761,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
return errors.New("unexpected database tail")
}
bc.historyPrunePoint.Store(predefinedPoint)
bc.indexServers.setHistoryCutoff(predefinedPoint.BlockNumber)
return nil
default:
@ -805,9 +815,11 @@ func (bc *BlockChain) SetFinalized(header *types.Header) {
if header != nil {
rawdb.WriteFinalizedBlockHash(bc.db, header.Hash())
headFinalizedBlockGauge.Update(int64(header.Number.Uint64()))
bc.indexServers.setFinalBlock(header.Number.Uint64())
} else {
rawdb.WriteFinalizedBlockHash(bc.db, common.Hash{})
headFinalizedBlockGauge.Update(0)
bc.indexServers.setFinalBlock(0)
}
}
@ -1115,6 +1127,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
bc.indexServers.revert(bc.CurrentBlock())
// Clear safe block, finalized block if needed
headBlock := bc.CurrentBlock()
@ -1203,6 +1216,7 @@ func (bc *BlockChain) Reset() error {
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
// specified genesis state.
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
bc.indexServers.revert(genesis.Header())
// Dump the entire block chain and purge the caches
if err := bc.SetHead(0); err != nil {
return err
@ -1219,6 +1233,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
log.Crit("Failed to write genesis block", "err", err)
}
bc.writeHeadBlock(genesis)
bc.indexServers.broadcast(genesis)
// Last update all in-memory chain markers
bc.genesisBlock = genesis
@ -1335,6 +1350,7 @@ func (bc *BlockChain) stopWithoutSaving() {
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
func (bc *BlockChain) Stop() {
bc.indexServers.stop()
bc.stopWithoutSaving()
// Ensure that the entirety of the state snapshot is journaled to disk.
@ -1636,6 +1652,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
}
}
bc.writeHeadBlock(block)
bc.indexServers.broadcast(block)
return nil
}
@ -1654,7 +1671,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
start = time.Now()
)
rawdb.WriteBlock(batch, block)
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
blockHash := block.Hash()
rawdb.WriteReceipts(batch, blockHash, block.NumberU64(), receipts)
bc.indexServers.cacheRawReceipts(blockHash, receipts)
rawdb.WritePreimages(batch, statedb.Preimages())
if err := batch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
@ -1768,6 +1787,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
// Set new head.
bc.writeHeadBlock(block)
bc.indexServers.broadcast(block)
bc.chainFeed.Send(ChainEvent{
Header: block.Header(),
@ -1839,10 +1859,12 @@ func (bc *BlockChain) insertChain(ctx context.Context, chain types.Blocks, setHe
}
if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 {
bc.indexServers.setBlockProcessing(true)
bc.blockProcFeed.Send(true)
}
defer func() {
if atomic.AddInt32(&bc.blockProcCounter, -1) == 0 {
bc.indexServers.setBlockProcessing(false)
bc.blockProcFeed.Send(false)
}
}()
@ -2522,6 +2544,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
return errInvalidNewChain
}
}
bc.indexServers.revert(commonBlock)
// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Info
@ -2619,6 +2642,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
}
// Update the head block
bc.writeHeadBlock(block)
bc.indexServers.broadcast(block)
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
@ -2696,6 +2720,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
}
}
bc.writeHeadBlock(head)
bc.indexServers.broadcast(head)
// Emit events
receipts, logs := bc.collectReceiptsAndLogs(head, false)

720
core/index_server.go Normal file
View file

@ -0,0 +1,720 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package core implements the Ethereum consensus protocol.
package core
import (
"math"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// Tuning parameters of the index server.
const (
busyDelay = time.Second // indexer status polling frequency when not ready
maxHistoricPrefetch = 16 // size of block data pre-fetch queue
rawReceiptsCacheSize = 8 // capacity of the raw receipts LRU cache (keyed by block hash)
logFrequency = time.Second * 20 // log info frequency during long indexing/unindexing process
headLogDelay = time.Second // head indexing log info delay (do not log if finished faster)
)
// Indexer is implemented by chain indexers that are fed by the index server.
// It receives new head blocks, revert notifications and, on request,
// historical block data.
type Indexer interface {
// AddBlockData delivers the header, body and receipts belonging to a block
// that is either a direct descendant of the latest delivered head or the
// first one in the last requested range. The body and/or receipts are nil
// if the indexer was registered without needing them.
// The current ready/busy status and the requested historic range are returned.
// Note that the indexer should never block even if it is busy processing.
// It is allowed to re-request the delivered blocks later if the indexer could
// not process them when first delivered.
AddBlockData(header *types.Header, body *types.Body, receipts types.Receipts) (ready bool, needBlocks common.Range[uint64])
// Revert rewinds the index to the given head block number. Subsequent
// AddBlockData calls will deliver blocks starting from this point.
Revert(blockNumber uint64)
// Status returns the current ready/busy status and the requested historic range.
// Only the new head blocks are delivered if the indexer reports busy status.
Status() (ready bool, needBlocks common.Range[uint64])
// SetHistoryCutoff signals the historical cutoff point to the indexer.
// Note that any block number that is consistently being requested in the
// needBlocks response that is not older than the cutoff point is guaranteed
// to be delivered eventually. If the required data belonging to certain
// block numbers is missing then the cutoff point is moved after the missing
// section in order to maintain this guarantee.
SetHistoryCutoff(blockNumber uint64)
// SetFinalized signals the latest finalized block number to the indexer.
SetFinalized(blockNumber uint64)
// Suspended signals to the indexer that block processing has started and
// any non-essential asynchronous tasks of the indexer should be suspended.
// The next AddBlockData call signals the end of the suspended state.
// Note that if multiple blocks are inserted then the indexer is only
// suspended once, before the first block processing begins, so according
// to the rule above it will not be suspended while processing the rest of
// the batch. This behavior should be fine because indexing can happen in
// parallel with forward syncing, the purpose of the suspend mechanism is
// to handle historical index backfilling with a lower priority so that it
// does not increase block latency.
Suspended()
// Stop initiates indexer shutdown. No subsequent calls are made through this
// interface after Stop.
Stop()
}
// indexServers operates as a part of BlockChain and can serve multiple chain
// indexers that implement the Indexer interface.
type indexServers struct {
lock sync.Mutex // held by every method that touches the fields below, except cacheRawReceipts
servers []*indexServer // one entry per registered Indexer
chain *BlockChain // source of headers, bodies and raw receipts
rawReceiptsCache *lru.Cache[common.Hash, []*types.Receipt] // raw receipts keyed by block hash
// Latest head data; replayed to indexers at registration time when all
// three values are available.
lastHead *types.Header
lastHeadBody *types.Body
lastHeadReceipts types.Receipts
finalBlock, historyCutoff uint64 // last values announced to the indexers
closeCh chan struct{} // closed by stop to terminate the server goroutines
closeWg sync.WaitGroup // counts the running server goroutines (two per server)
}
// init binds indexServers to the given chain, records the current head (with
// its body and raw receipts, when a head exists) and allocates the shutdown
// channel and the raw receipts cache.
func (f *indexServers) init(chain *BlockChain) {
	f.lock.Lock()
	defer f.lock.Unlock()

	f.chain = chain
	head := chain.CurrentBlock()
	f.lastHead = head
	if head != nil {
		// Remember the head data so that it can be replayed to indexers
		// that register later.
		f.lastHeadBody = chain.GetBody(head.Hash())
		f.lastHeadReceipts = chain.GetRawReceipts(head.Hash(), head.Number.Uint64())
	}
	f.closeCh = make(chan struct{})
	f.rawReceiptsCache = lru.NewCache[common.Hash, []*types.Receipt](rawReceiptsCacheSize)
}
// stop terminates the serving goroutines of every registered Indexer and
// waits until all of them have exited before dropping the server list.
func (f *indexServers) stop() {
	f.lock.Lock()
	defer f.lock.Unlock()

	// Closing closeCh is the exit signal observed by both loops of each server.
	close(f.closeCh)
	// Each loop calls closeWg.Done on its way out.
	f.closeWg.Wait()
	f.servers = nil
}
// register creates a server for the given Indexer, announces the current
// history cutoff and finalized block to it, replays the latest head block (if
// its header, body and receipts are all known) and finally starts the two
// goroutines serving historical block data.
func (f *indexServers) register(indexer Indexer, name string, needBodies, needReceipts bool) {
	f.lock.Lock()
	defer f.lock.Unlock()

	srv := &indexServer{
		parent:       f,
		indexer:      indexer,
		sendTimer:    time.NewTimer(0),
		name:         name,
		statusCh:     make(chan indexerStatus, 1),
		blockDataCh:  make(chan blockData, maxHistoricPrefetch),
		suspendCh:    make(chan bool, 1),
		needBodies:   needBodies,
		needReceipts: needReceipts,
	}
	f.servers = append(f.servers, srv)
	// Two goroutines are started per server below.
	f.closeWg.Add(2)

	indexer.SetHistoryCutoff(f.historyCutoff)
	indexer.SetFinalized(f.finalBlock)

	headKnown := f.lastHead != nil && f.lastHeadBody != nil && f.lastHeadReceipts != nil
	if headKnown {
		srv.sendHeadBlockData(f.lastHead, f.lastHeadBody, f.lastHeadReceipts)
	}
	go srv.historicReadLoop()
	go srv.historicSendLoop()
}
// cacheRawReceipts stores the raw receipts of a block in the receipts cache
// under its block hash, so that a later broadcast of that block can pick them
// up without reading them back from the database.
func (f *indexServers) cacheRawReceipts(blockHash common.Hash, blockReceipts types.Receipts) {
	f.rawReceiptsCache.Add(blockHash, blockReceipts)
}
// broadcast delivers a new head block to every registered Indexer and
// remembers it as the latest head. If the raw receipts of the block cannot be
// found, an error is logged and nothing is delivered.
func (f *indexServers) broadcast(block *types.Block) {
	f.lock.Lock()
	defer f.lock.Unlock()

	// Receipts are always fetched, even though individual servers may drop
	// bodies and receipts before delivery: in the typical case they were
	// cached during block processing, so the lookup is cheap.
	hash := block.Hash()
	receipts, _ := f.rawReceiptsCache.Get(hash)
	if receipts == nil {
		// Not cached (or cached as nil); fall back to the chain.
		if receipts = f.chain.GetRawReceipts(hash, block.NumberU64()); receipts == nil {
			log.Error("Receipts belonging to new head are missing", "number", block.NumberU64(), "hash", block.Hash())
			return
		}
		f.rawReceiptsCache.Add(hash, receipts)
	}
	f.lastHead, f.lastHeadBody, f.lastHeadReceipts = block.Header(), block.Body(), receipts
	for i := range f.servers {
		f.servers[i].sendHeadBlockData(block.Header(), block.Body(), receipts)
	}
}
// revert tells every registered Indexer server that the chain has been rolled
// back to the given header (a new head or the last common ancestor).
//
// NOTE(review): lastHead, lastHeadBody and lastHeadReceipts are not touched
// here, so they keep pointing at the pre-revert head until the next broadcast.
// Confirm that an Indexer registered in between cannot be handed stale data.
func (f *indexServers) revert(header *types.Header) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for i := range f.servers {
		f.servers[i].revert(header)
	}
}
// setFinalBlock records the latest finalized block number and forwards it to
// every Indexer server. Nothing happens if the number has not changed.
func (f *indexServers) setFinalBlock(blockNumber uint64) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.finalBlock != blockNumber {
		f.finalBlock = blockNumber
		for i := range f.servers {
			f.servers[i].setFinalBlock(blockNumber)
		}
	}
}
// setHistoryCutoff records the history cutoff point and forwards it to every
// Indexer server. Nothing happens if the cutoff has not changed.
// Indexers cannot expect any data to be delivered if needBlocks.First() is
// before this point.
func (f *indexServers) setHistoryCutoff(blockNumber uint64) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.historyCutoff != blockNumber {
		f.historyCutoff = blockNumber
		for i := range f.servers {
			f.servers[i].setHistoryCutoff(blockNumber)
		}
	}
}
// setBlockProcessing forwards the block processing state to every Indexer
// server. While a chain segment is being processed and added to the chain
// (processing == true), serving of historical blocks is suspended.
func (f *indexServers) setBlockProcessing(processing bool) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for i := range f.servers {
		f.servers[i].setBlockProcessing(processing)
	}
}
// indexServer sends updates to a single Indexer instance. It sends all new heads
// and reorg events, and also sends historical block data upon request.
// It guarantees that Indexer functions are never called concurrently and also
// they always present a consistent view of the chain to the indexer.
type indexServer struct {
lock sync.Mutex // serializes Indexer calls and protects the send side state
parent *indexServers // owner; provides the chain, receipts cache and shutdown signal
indexer Indexer // always call under mutex lock; never call after stopped
stopped bool // set by historicSendLoop on exit, after calling indexer.Stop
lastHead *types.Header // last head delivered or reverted to
sendStatus indexerStatus // status maintained by the send side
statusCh chan indexerStatus // latest sendStatus for the read loop, capacity = 1
blockDataCh chan blockData // pre-fetched block data queue, capacity = maxHistoricPrefetch
suspendCh chan bool // block processing started/finished signals, capacity = 1
testSuspendHookCh chan struct{} // initialized by test, capacity = 1
sendTimer *time.Timer // triggers a status poll while the indexer is not ready
historyCutoff, missingBlockCutoff uint64 // the larger of the two is reported to the indexer
needBodies, needReceipts bool // which parts of the block data are delivered
readStatus indexerStatus // read loop's copy of the last received status
readPointer uint64 // next block to be queued
name string // used in progress log messages
processed uint64 // blocks counted by logDelivered since the last logFinished
logged bool // true once a progress message has been printed
startedAt, lastLoggedAt time.Time // progress logging timestamps
lastHistoryErrorLog time.Time // time of the last rate-limited error log
}
// indexerStatus is maintained by the historicSendLoop goroutine and all changes
// are sent to the historicReadLoop goroutine through statusCh.
type indexerStatus struct {
ready bool // last feedback received from the indexer
needBlocks common.Range[uint64] // last feedback received from the indexer
suspended bool // suspend historic block delivery during block processing
resetQueueCount uint64 // total number of queue resets
// revertCount is incremented on every revert and lastRevertBlock is the
// block number of the most recent one; together they are used to
// detect entries potentially expired by a revert/reorg
revertCount, lastRevertBlock uint64
}
// isNextExpected reports whether the received blockData, which may have been
// collected under an earlier indexerStatus, is the block the indexer expects
// next and is still guaranteed to be valid under the current status.
func (s *indexerStatus) isNextExpected(b blockData) bool {
	// No historical blocks are expected at all.
	if s.needBlocks.IsEmpty() {
		return false
	}
	// Not the expected next block number.
	if s.needBlocks.First() != b.blockNumber {
		return false
	}
	// The number matches; make sure no reorg could have invalidated the data.
	if s.revertCount == b.revertCount {
		// No reorg happened since the block data was collected.
		return true
	}
	if s.revertCount == b.revertCount+1 {
		// Exactly one reorg happened, back to s.lastRevertBlock; the data is
		// still valid if the block is not newer than that.
		return b.blockNumber <= s.lastRevertBlock
	}
	// Several reorgs happened and earlier revert points are not remembered,
	// so validity cannot be established and the data has to be discarded.
	return false
}
// blockData represents the indexable data of a single block being sent from the
// reader to the sender goroutine and optionally queued in blockDataCh between.
// It also includes the latest revertCount known before reading the block data,
// which allows the sender to guarantee that all sent block data is always
// consistent with the indexer's canonical chain view while the reading of block
// data can still happen asynchronously.
type blockData struct {
blockNumber, revertCount uint64
valid bool // false if a requested body or receipts could not be loaded
header *types.Header // nil if the header could not be loaded
body *types.Body // only loaded if the server needs bodies
receipts types.Receipts // only loaded if the server needs receipts
}
// sendHeadBlockData delivers the latest head block to the indexer right away
// and refreshes the state of the historical block serving mechanism with the
// feedback received. The call is a no-op if the server has stopped or the
// header is the same as the last delivered head. Body and receipts are
// replaced by nil if the indexer did not ask for them at registration.
//
// NOTE(review): lastHead is still nil on the first call made from register;
// the comparison below relies on Header.Hash tolerating a nil receiver —
// confirm.
func (s *indexServer) sendHeadBlockData(header *types.Header, body *types.Body, receipts types.Receipts) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if s.stopped || header.Hash() == s.lastHead.Hash() {
		return
	}
	s.lastHead = header

	// Only pass on the parts the indexer is interested in.
	var (
		sentBody     *types.Body
		sentReceipts types.Receipts
	)
	if s.needBodies {
		sentBody = body
	}
	if s.needReceipts {
		sentReceipts = receipts
	}
	ready, needBlocks := s.indexer.AddBlockData(header, sentBody, sentReceipts)
	s.updateIndexerStatus(ready, needBlocks, 0)
	s.updateSendStatus()
}
// updateIndexerStatus stores the latest ready / needBlocks feedback in the
// send loop status. addedBlocks is the number of historic blocks delivered
// since the previous update; during continuous historical delivery the start
// of the needBlocks range is expected to advance by exactly that amount. If
// it did not, a reset of the blockDataCh queue is requested.
func (s *indexServer) updateIndexerStatus(ready bool, needBlocks common.Range[uint64], addedBlocks uint64) {
	expectedFirst := s.sendStatus.needBlocks.First() + addedBlocks
	if needBlocks.First() != expectedFirst {
		// request queue reset
		s.sendStatus.resetQueueCount++
	}
	s.sendStatus.ready = ready
	s.sendStatus.needBlocks = needBlocks
}
// revert rolls the indexer back to the given block right away and updates the
// state of the historical block serving mechanism accordingly. The call is a
// no-op if the server has stopped, no head has been delivered yet, or the
// header is the current head. It panics if the target block is not below the
// current head.
func (s *indexServer) revert(header *types.Header) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if s.stopped || s.lastHead == nil {
		return
	}
	if header.Hash() == s.lastHead.Hash() {
		return
	}
	target := header.Number.Uint64()
	if target >= s.lastHead.Number.Uint64() {
		panic("invalid indexer revert")
	}
	s.lastHead = header
	// Record the revert so that block data fetched before it can be
	// recognized as potentially stale.
	s.sendStatus.revertCount++
	s.sendStatus.lastRevertBlock = target
	s.updateSendStatus()
	s.indexer.Revert(target)
}
// suspendOrStop blocks the send loop until it is unsuspended or the parent
// chain is stopped. It also notifies the indexer by calling Suspended and
// suspends the read loop through updateSendStatus.
// The argument is the value received from suspendCh and must be true. The
// return value is true if the parent was stopped while waiting, in which case
// the caller is expected to exit.
func (s *indexServer) suspendOrStop(suspended bool) bool {
if !suspended {
panic("unexpected 'false' signal on suspendCh")
}
s.lock.Lock()
s.sendStatus.suspended = true
s.updateSendStatus()
s.indexer.Suspended()
s.lock.Unlock()
// wait without holding the lock until either shutdown or the matching
// 'false' signal arrives
select {
case <-s.parent.closeCh:
return true
case suspended = <-s.suspendCh:
}
if suspended {
panic("unexpected 'true' signal on suspendCh")
}
s.lock.Lock()
s.sendStatus.suspended = false
s.updateSendStatus()
s.lock.Unlock()
return false
}
// historicSendLoop is the main event loop that interacts with the indexer in
// case when historical block data is requested. It sends status updates to
// the reader goroutine through statusCh and feeds the fetched data coming from
// blockDataCh into the indexer.
// On exit it stops the indexer, marks the server as stopped and signals the
// parent's wait group.
func (s *indexServer) historicSendLoop() {
defer func() {
s.lock.Lock()
s.indexer.Stop()
s.stopped = true
s.lock.Unlock()
s.parent.closeWg.Done()
}()
for {
select {
// do a separate non-blocking select to ensure that a suspend attempt
// during the previous historical AddBlockData will be caught in the
// next round.
case suspend := <-s.suspendCh:
if s.suspendOrStop(suspend) {
return
}
default:
}
// wait for the next event: shutdown, suspend request, pre-fetched
// block data or the status polling timer
select {
case <-s.parent.closeCh:
return
case suspend := <-s.suspendCh:
if s.suspendOrStop(suspend) {
return
}
case nextBlockData := <-s.blockDataCh:
s.addHistoricBlockData(nextBlockData)
case <-s.sendTimer.C:
s.handleHistoricLoopTimer()
}
}
}
// handleHistoricLoopTimer polls the indexer status again when the last known
// status was "not ready". The subsequent updateSendStatus call re-arms the
// timer if the indexer is still not ready. Nothing is done if the indexer was
// already known to be ready.
func (s *indexServer) handleHistoricLoopTimer() {
	s.lock.Lock()
	defer s.lock.Unlock()

	if s.sendStatus.ready {
		return
	}
	ready, needBlocks := s.indexer.Status()
	s.updateIndexerStatus(ready, needBlocks, 0)
	s.updateSendStatus()
}
// addHistoricBlockData checks if the next blockData fetched by the asynchronous
// historicReadLoop is still the one to be delivered next to the indexer and
// delivers it if possible. If the requested block range has changed since or
// a reorg might have made the fetched data invalid then it triggers a queue
// reset by increasing resetQueueCount. This ensures that the read loop discards
// queued blockData and starts sending newly fetched data according to the new
// needBlocks range.
// If the block data is the expected one but could not be loaded from the
// database then the missing block cutoff is moved past it and the indexer is
// notified through SetHistoryCutoff.
func (s *indexServer) addHistoricBlockData(nextBlockData blockData) {
s.lock.Lock()
defer s.lock.Unlock()
// check if received block data is indeed from the next expected
// block and is still guaranteed to be canonical; ignore and request
// a queue reset otherwise.
if s.sendStatus.isNextExpected(nextBlockData) {
// check if the block data has actually been found in the database
if nextBlockData.valid {
ready, needBlocks := s.indexer.AddBlockData(nextBlockData.header, nextBlockData.body, nextBlockData.receipts)
s.updateIndexerStatus(ready, needBlocks, 1)
// only count the block as delivered if the indexer has either
// finished or moved on to the next block number
if s.sendStatus.needBlocks.IsEmpty() {
s.logDelivered(nextBlockData.blockNumber)
s.logFinished()
} else if s.sendStatus.needBlocks.First() == nextBlockData.blockNumber+1 {
s.logDelivered(nextBlockData.blockNumber)
}
} else {
// report error and update missingBlockCutoff in order to
// avoid spinning forever on the same error.
// the error message is printed at most once every 10 seconds.
if time.Since(s.lastHistoryErrorLog) >= time.Second*10 {
s.lastHistoryErrorLog = time.Now()
if nextBlockData.header == nil {
log.Error("Historical header is missing", "number", nextBlockData.blockNumber)
} else if s.needBodies && nextBlockData.body == nil {
log.Error("Historical block body is missing", "number", nextBlockData.blockNumber, "hash", nextBlockData.header.Hash())
} else if s.needReceipts && nextBlockData.receipts == nil {
log.Error("Historical receipts are missing", "number", nextBlockData.blockNumber, "hash", nextBlockData.header.Hash())
}
}
s.missingBlockCutoff = max(s.missingBlockCutoff, nextBlockData.blockNumber+1)
s.indexer.SetHistoryCutoff(max(s.historyCutoff, s.missingBlockCutoff))
ready, needBlocks := s.indexer.Status()
s.updateIndexerStatus(ready, needBlocks, 0)
}
} else {
// trigger resetting the queue and sending blockData from needBlocks.First()
s.sendStatus.resetQueueCount++
}
s.updateSendStatus()
}
// updateSendStatus updates the asynchronous reader goroutine's status based on
// the latest indexer status. If necessary then it trims the needBlocks range
// based on the locally available block range. If there is already an unread
// status update waiting on statusCh then it is replaced by the new one.
// It also stops the status polling timer if the indexer is ready or the server
// is suspended, and restarts it with busyDelay otherwise.
func (s *indexServer) updateSendStatus() {
if s.sendStatus.ready || s.sendStatus.suspended {
s.sendTimer.Stop()
} else {
s.sendTimer.Reset(busyDelay)
}
// head number is treated as zero if no head has been delivered yet
var headNumber uint64
if s.lastHead != nil {
headNumber = s.lastHead.Number.Uint64()
}
// do not request blocks beyond the current head
if headNumber+1 < s.sendStatus.needBlocks.AfterLast() {
s.sendStatus.needBlocks.SetLast(headNumber)
}
// nothing can be served if the range is empty or starts before the
// effective history cutoff
if s.sendStatus.needBlocks.IsEmpty() || max(s.historyCutoff, s.missingBlockCutoff) > s.sendStatus.needBlocks.First() {
s.sendStatus.needBlocks = common.Range[uint64]{}
}
// drop a pending unread status (if any) before sending the new one
select {
case <-s.statusCh:
default:
}
s.statusCh <- s.sendStatus
}
// setBlockProcessing suspends serving historical blocks requested by the indexer
// while a chain segment is being processed and added to the chain.
// The new state is signaled to the send loop through suspendCh. If the
// previous (opposite) signal has not been read by the send loop yet then it is
// pulled back from the channel instead and nothing is sent. It panics if the
// value pulled back equals the new one.
func (s *indexServer) setBlockProcessing(suspended bool) {
select {
case old := <-s.suspendCh:
if old == suspended {
panic("unexpected value pulled back from suspendCh")
}
default:
// only send new suspended flag if previous (opposite) value has been
// read by the send loop already
s.suspendCh <- suspended
}
// notify the test hook (if installed) about a suspend request, without
// blocking if a previous notification is still pending
if suspended && s.testSuspendHookCh != nil {
select {
case s.testSuspendHookCh <- struct{}{}:
default:
}
}
}
// clearBlockQueue drains blockDataCh without blocking, returning as soon as
// no queued entry is immediately available.
func (s *indexServer) clearBlockQueue() {
	for {
		select {
		case <-s.blockDataCh:
			// Entry discarded; look for the next one.
			continue
		default:
		}
		return
	}
}
// updateReadStatus replaces readStatus with the newly received status. If the
// send loop has requested a queue reset since the previous status, the block
// queue is emptied and readPointer is moved back to the first block of the new
// needBlocks range.
// Note that the blocks between needBlocks.First() and readPointer-1 are
// assumed to be queued in blockDataCh already. The send loop requests a queue
// reset if needBlocks.First() does not advance with each delivered block or if
// it receives an expired blockData.
func (s *indexServer) updateReadStatus(newStatus indexerStatus) {
	resetRequested := newStatus.resetQueueCount != s.readStatus.resetQueueCount
	if resetRequested {
		s.clearBlockQueue()
		s.readPointer = newStatus.needBlocks.First()
	}
	s.readStatus = newStatus
}
// canQueueNextBlock reports whether readPointer is still inside the needBlocks
// range and fewer than maxHistoricPrefetch blocks ahead of its start, i.e.
// the capacity of blockDataCh has not been reached yet.
// Note that the latter check assumes that the blocks between
// needBlocks.First() and readPointer-1 are queued.
func (s *indexServer) canQueueNextBlock() bool {
	if !s.readStatus.needBlocks.Includes(s.readPointer) {
		return false
	}
	return s.readPointer < s.readStatus.needBlocks.First()+maxHistoricPrefetch
}
// historicReadLoop reads requested historical block data asynchronously.
// It receives indexer status updates on statusCh and sends block data to
// blockDataCh. If the latest status indicates that the server is not
// suspended then it is guaranteed that eventually a corresponding block data
// response will be sent unless a new status update is received before this
// happens.
// Note that blockDataCh can queue multiple block data pre-fetched by
// historicReadLoop. If the requested range is changed while there is still
// queued data in the channel that corresponds to the previous requested range
// then the receiver sends a new status update with increased resetQueueCount.
// In this case historicReadLoop removes all remaining entries from the queue
// and starts sending block data from the beginning of the new range.
// The loop exits when the parent's closeCh is closed.
func (s *indexServer) historicReadLoop() {
	defer s.parent.closeWg.Done()

	// Start from a value that differs from the counter of a fresh send
	// status, so that the first status received resets the read pointer.
	s.readStatus.resetQueueCount = math.MaxUint64
	for {
		if !s.readStatus.suspended && s.canQueueNextBlock() {
			// Send next item to the queue.
			bd := blockData{blockNumber: s.readPointer, revertCount: s.readStatus.revertCount, valid: true}
			if bd.header = s.parent.chain.GetHeaderByNumber(bd.blockNumber); bd.header != nil {
				blockHash := bd.header.Hash()
				if s.needBodies {
					bd.body = s.parent.chain.GetBody(blockHash)
					if bd.body == nil {
						bd.valid = false
					}
				}
				if s.needReceipts {
					bd.receipts, _ = s.parent.rawReceiptsCache.Get(blockHash)
					if bd.receipts == nil {
						// Note: we do not cache historical receipts because typically
						// each indexer requests them at different times.
						bd.receipts = s.parent.chain.GetRawReceipts(blockHash, bd.blockNumber)
						if bd.receipts == nil {
							bd.valid = false
						}
					}
				}
			}
			// Note that a response with missing block data is still sent in case of
			// a read error, signaling to the sender logic that something is missing.
			// This might be either due to a database error or a reorg.
			select {
			case s.blockDataCh <- bd:
				s.readPointer++
			default:
				// Note: updateIndexerStatus in the send loop and canQueueNextBlock
				// in the read loop should ensure that no send is attempted at
				// blockDataCh when it is filled to full capacity. If it happens
				// anyway then we print an error and set the suspended flag to
				// true until the next status update.
				//
				// lastHistoryErrorLog is also accessed by the send loop (in
				// addHistoricBlockData) under s.lock, so take the same lock
				// here to avoid a data race on it.
				s.lock.Lock()
				shouldLog := time.Since(s.lastHistoryErrorLog) >= time.Second*10
				if shouldLog {
					s.lastHistoryErrorLog = time.Now()
				}
				s.lock.Unlock()
				if shouldLog {
					log.Error("Historical block queue is full")
				}
				s.readStatus.suspended = true
			}
			// Keep checking status updates without blocking as long as there is
			// something to do.
			select {
			case <-s.parent.closeCh:
				return
			case status := <-s.statusCh:
				s.updateReadStatus(status)
			default:
			}
		} else {
			// There was nothing to do; wait for a next status update.
			select {
			case <-s.parent.closeCh:
				return
			case status := <-s.statusCh:
				s.updateReadStatus(status)
			}
		}
	}
}
// logDelivered counts a delivered block and periodically prints a log message
// reporting the progress of the indexing process. It should be called after
// each new block has been processed. The first message is held back until
// headLogDelay has passed since the first counted block; subsequent messages
// are printed at most once every logFrequency.
func (s *indexServer) logDelivered(position uint64) {
	if s.processed == 0 {
		s.startedAt = time.Now()
	}
	s.processed++

	switch {
	case s.logged && time.Since(s.lastLoggedAt) < logFrequency:
		// Already logging, but the last message is too recent.
		return
	case !s.logged && time.Since(s.startedAt) < headLogDelay:
		// Not logging yet and the process has not run long enough.
		return
	case !s.logged:
		s.logged = true
	}
	s.lastLoggedAt = time.Now()
	log.Info("Generating "+s.name, "block", position, "processed", s.processed, "elapsed", time.Since(s.startedAt))
}
// logFinished reports the end of the indexing process and resets the progress
// counter. The message is only printed if a progress message has been printed
// before, which only happens if the process took longer than headLogDelay.
func (s *indexServer) logFinished() {
	if s.logged {
		log.Info("Finished "+s.name, "processed", s.processed, "elapsed", time.Since(s.startedAt))
		s.logged = false
	}
	// Start counting from scratch for the next indexing run.
	s.processed = 0
}
// setFinalBlock forwards the latest finalized block number to the Indexer
// instance, unless the server has already stopped.
func (s *indexServer) setFinalBlock(blockNumber uint64) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if !s.stopped {
		s.indexer.SetFinalized(blockNumber)
	}
}
// setHistoryCutoff stores the new history cutoff point, reports the effective
// cutoff to the Indexer instance and refreshes the serving status with the
// indexer's updated feedback. Nothing is done if the server has stopped.
// The indexer cannot expect any data to be delivered if needBlocks.First() is
// before the cutoff point.
// Note that the cutoff reported to the indexer is the larger of the given
// point and missingBlockCutoff, which is raised when some historical block
// data could not be loaded from the database. This workaround ensures that
// the indexing process does not get stuck permanently on missing data.
func (s *indexServer) setHistoryCutoff(blockNumber uint64) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if s.stopped {
		return
	}
	s.historyCutoff = blockNumber
	effectiveCutoff := max(s.historyCutoff, s.missingBlockCutoff)
	s.indexer.SetHistoryCutoff(effectiveCutoff)

	ready, needBlocks := s.indexer.Status()
	s.updateIndexerStatus(ready, needBlocks, 0)
	s.updateSendStatus()
}

222
core/index_server_test.go Normal file
View file

@ -0,0 +1,222 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package core implements the Ethereum consensus protocol.
package core
import (
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
)
// TestIndexServer runs the index server scenario for every combination of the
// needBodies and needReceipts registration flags. Each combination runs as a
// named subtest so that a failure identifies the configuration that broke and
// does not prevent the remaining configurations from running.
func TestIndexServer(t *testing.T) {
	cases := []struct {
		name         string
		needBodies   bool
		needReceipts bool
	}{
		{"HeadersOnly", false, false},
		{"WithReceipts", false, true},
		{"WithBodies", true, false},
		{"WithBodiesAndReceipts", true, true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			testIndexServer(t, tc.needBodies, tc.needReceipts)
		})
	}
}
// testIndexServer drives a single registered test indexer through a scripted
// scenario and checks every callback the index server makes, in order:
//
//  1. registration on a chain that only has the genesis block,
//  2. insertion of blocks 1..100 as new head blocks,
//  3. shutdown, then re-registration on a new BlockChain over the same database,
//  4. delivery of a requested historical range, interrupted by a new head block,
//  5. a chain rollback followed by insertion of a new branch,
//  6. final shutdown.
//
// needBodies and needReceipts are passed to RegisterIndexer and determine
// whether the delivered block data is expected to carry bodies and receipts.
//
// Chain operations that trigger indexer callbacks are executed on a separate
// goroutine (see run) because the test indexer blocks in every callback until
// the test goroutine has consumed the corresponding event.
func testIndexServer(t *testing.T, needBodies, needReceipts bool) {
	const timeout = 5 * time.Second

	ti := &testIndexer{
		t:        t,
		eventCh:  make(chan testIndexerEvent),
		statusCh: make(chan testIndexerStatus),
	}
	var blockchain *BlockChain
	doneCh := make(chan struct{})
	// expDone waits until the operation started by the last run call has returned.
	expDone := func() {
		t.Helper()
		select {
		case <-doneCh:
		case <-time.After(timeout):
			t.Fatalf("Expected chain operation done but not finished yet")
		}
	}
	testSuspendHookCh := make(chan struct{}, 1)
	// run executes fn on a new goroutine and signals doneCh once it has returned.
	run := func(fn func()) {
		go func() {
			fn()
			doneCh <- struct{}{}
		}()
	}
	// insert inserts the given blocks into the chain on a separate goroutine.
	insert := func(chain []*types.Block) {
		run(func() {
			// FailNow/Fatalf must only be called from the goroutine running the
			// test, so report the failure with Errorf here. This also lets the
			// goroutine go on to signal doneCh instead of exiting early.
			if i, err := blockchain.InsertChain(chain); err != nil {
				t.Errorf("failed to insert chain[%d]: %v", i, err)
			}
		})
	}
	// waitSuspend waits for the signal on the index server's test suspend hook channel.
	waitSuspend := func() {
		t.Helper()
		select {
		case <-testSuspendHookCh:
		case <-time.After(timeout):
			t.Fatalf("Expected index server suspend but suspended state not reached")
		}
	}
	gspec := &Genesis{
		Config:  params.TestChainConfig,
		BaseFee: big.NewInt(params.InitialBaseFee),
	}
	db := rawdb.NewMemoryDatabase()
	// newChain creates a BlockChain on top of db and aborts the test on failure
	// rather than leaving a nil chain to be dereferenced later.
	newChain := func() *BlockChain {
		t.Helper()
		bc, err := NewBlockChain(db, gspec, ethash.NewFaker(), DefaultConfig())
		if err != nil {
			t.Fatalf("failed to create blockchain: %v", err)
		}
		return bc
	}
	blockchain = newChain()
	chain := []*types.Block{gspec.ToBlock()}
	blocks, _ := GenerateChain(gspec.Config, chain[0], ethash.NewFaker(), db, 110, func(i int, gen *BlockGen) {})
	chain = append(chain, blocks...)

	// register the indexer on the genesis-only chain
	run(func() {
		blockchain.RegisterIndexer(ti, "", needBodies, needReceipts)
		blockchain.indexServers.servers[0].testSuspendHookCh = testSuspendHookCh
	})
	ti.expEvent(testIndexerEvent{ev: "SetHistoryCutoff", blockNumber: 0})
	ti.expEvent(testIndexerEvent{ev: "SetFinalized", blockNumber: 0})
	ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: 0, blockHash: chain[0].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
	ti.status(testIndexerStatus{ready: true})
	expDone()

	// insert blocks 1..100 and expect each of them to be delivered in order
	insert(chain[1:101])
	waitSuspend()
	ti.expEvent(testIndexerEvent{ev: "Suspended"})
	for i := uint64(1); i <= 100; i++ {
		ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: i, blockHash: chain[i].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
		ti.status(testIndexerStatus{ready: true})
	}
	expDone()

	// stop the chain; the indexer is expected to be stopped too
	run(blockchain.Stop)
	ti.expEvent(testIndexerEvent{ev: "Stop"})
	expDone()

	// create a new chain instance over the same database and register again;
	// the head block (100) is expected to be delivered first
	blockchain = newChain()
	run(func() {
		blockchain.RegisterIndexer(ti, "", needBodies, needReceipts)
		blockchain.indexServers.servers[0].testSuspendHookCh = testSuspendHookCh
	})
	ti.expEvent(testIndexerEvent{ev: "SetHistoryCutoff", blockNumber: 0})
	ti.expEvent(testIndexerEvent{ev: "SetFinalized", blockNumber: 0})
	ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: 100, blockHash: chain[100].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
	ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](0, 100)})
	expDone()

	// request entire chain as historical range, add a new block in the middle and check suspend mechanism
	for i := uint64(0); i <= 49; i++ {
		ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: i, blockHash: chain[i].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
		ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](i+1, 99-i)})
	}
	// block 50 is delivered, but the reply is held back until block 101 has
	// been submitted for insertion and the suspend hook has fired
	ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: 50, blockHash: chain[50].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
	insert(chain[101:102])
	waitSuspend()
	ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](51, 49)})
	ti.expEvent(testIndexerEvent{ev: "Suspended"})
	ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: 101, blockHash: chain[101].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
	ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](51, 50)})
	expDone()
	// the remaining historical blocks are expected after the new head block
	for i := uint64(51); i <= 100; i++ {
		ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: i, blockHash: chain[i].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
		ti.status(testIndexerStatus{ready: true, needBlocks: common.NewRange[uint64](i+1, 100-i)})
	}

	// roll the chain back to block 80 and expect a matching Revert callback
	run(func() {
		blockchain.SetHead(80)
	})
	ti.expEvent(testIndexerEvent{ev: "Revert", blockNumber: 80})
	expDone()

	// build a new branch on top of block 80 and insert its first 40 blocks
	chain = chain[:81]
	blocks, _ = GenerateChain(gspec.Config, chain[80], ethash.NewFaker(), db, 45, func(i int, gen *BlockGen) {})
	chain = append(chain, blocks...)
	insert(chain[81:121])
	waitSuspend()
	ti.expEvent(testIndexerEvent{ev: "Suspended"})
	for i := uint64(81); i <= 120; i++ {
		ti.expEvent(testIndexerEvent{ev: "AddBlockData", blockNumber: i, blockHash: chain[i].Hash(), hasBody: needBodies, hasReceipts: needReceipts})
		ti.status(testIndexerStatus{ready: true})
	}
	expDone()

	// final shutdown
	run(blockchain.Stop)
	ti.expEvent(testIndexerEvent{ev: "Stop"})
	expDone()
}
// testIndexer is a scripted indexer used by the index server test. Every
// callback it receives is published on eventCh; callbacks that return a
// status (AddBlockData and Status) then take their return values from
// statusCh. This lets the test goroutine observe and control each callback.
type testIndexer struct {
	t        *testing.T             // test instance used to report failures
	eventCh  chan testIndexerEvent  // callbacks reported to the test goroutine
	statusCh chan testIndexerStatus // replies supplied by the test goroutine
}
// testIndexerEvent describes a single callback received by testIndexer. Events
// are compared with == / !=, so fields not relevant for a given callback are
// left at their zero value.
type testIndexerEvent struct {
	ev                   string      // name of the callback, e.g. "AddBlockData" or "Revert"
	blockNumber          uint64      // block number passed to the callback, if any
	blockHash            common.Hash // header hash (set by AddBlockData only)
	hasBody, hasReceipts bool        // whether AddBlockData received a non-nil body / receipts
}
// testIndexerStatus is the reply testIndexer returns from AddBlockData and
// Status; its fields map one-to-one to the return values of those callbacks.
type testIndexerStatus struct {
	ready      bool                 // returned as the ready result
	needBlocks common.Range[uint64] // returned as the needBlocks result
}
// expEvent waits for the next callback reported by the indexer and fails the
// test if it differs from exp or if no callback arrives within five seconds.
// A timeout is reported as such instead of being treated as a zero-valued
// event. It must be called from the goroutine running the test.
func (ti *testIndexer) expEvent(exp testIndexerEvent) {
	ti.t.Helper()
	select {
	case got := <-ti.eventCh:
		if got != exp {
			ti.t.Fatalf("Wrong indexer event received (expected: %v, got: %v)", exp, got)
		}
	case <-time.After(time.Second * 5):
		ti.t.Fatalf("Timed out waiting for indexer event (expected: %v)", exp)
	}
}
// status supplies the reply for the callback that is currently waiting on
// statusCh (AddBlockData or Status).
func (ti *testIndexer) status(status testIndexerStatus) {
	reply := status
	ti.statusCh <- reply
}
// AddBlockData reports the delivered block (number, header hash and whether a
// body and receipts were supplied) on eventCh, then waits for the reply on
// statusCh and returns it.
func (ti *testIndexer) AddBlockData(header *types.Header, body *types.Body, receipts types.Receipts) (ready bool, needBlocks common.Range[uint64]) {
	event := testIndexerEvent{
		ev:          "AddBlockData",
		blockNumber: header.Number.Uint64(),
		blockHash:   header.Hash(),
		hasBody:     body != nil,
		hasReceipts: receipts != nil,
	}
	ti.eventCh <- event

	reply := <-ti.statusCh
	ready, needBlocks = reply.ready, reply.needBlocks
	return ready, needBlocks
}
// Revert reports a "Revert" event carrying the given block number on eventCh.
func (ti *testIndexer) Revert(blockNumber uint64) {
	event := testIndexerEvent{
		ev:          "Revert",
		blockNumber: blockNumber,
	}
	ti.eventCh <- event
}
// Status reports a "Status" event on eventCh, then waits for the reply on
// statusCh and returns it.
func (ti *testIndexer) Status() (ready bool, needBlocks common.Range[uint64]) {
	event := testIndexerEvent{ev: "Status"}
	ti.eventCh <- event

	reply := <-ti.statusCh
	ready, needBlocks = reply.ready, reply.needBlocks
	return ready, needBlocks
}
// SetHistoryCutoff reports a "SetHistoryCutoff" event carrying the given block
// number on eventCh.
func (ti *testIndexer) SetHistoryCutoff(blockNumber uint64) {
	event := testIndexerEvent{
		ev:          "SetHistoryCutoff",
		blockNumber: blockNumber,
	}
	ti.eventCh <- event
}
// SetFinalized reports a "SetFinalized" event carrying the given block number
// on eventCh.
func (ti *testIndexer) SetFinalized(blockNumber uint64) {
	event := testIndexerEvent{
		ev:          "SetFinalized",
		blockNumber: blockNumber,
	}
	ti.eventCh <- event
}
// Suspended reports a "Suspended" event on eventCh.
func (ti *testIndexer) Suspended() {
	event := testIndexerEvent{ev: "Suspended"}
	ti.eventCh <- event
}
// Stop reports a "Stop" event on eventCh.
func (ti *testIndexer) Stop() {
	event := testIndexerEvent{ev: "Stop"}
	ti.eventCh <- event
}