From f3e4866073d4650a7f461315c517333c6407ab5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Thu, 13 Mar 2025 12:35:10 +0100 Subject: [PATCH] core: update blockProcFeed in insertChain (#31065) This PR moves the updating of the `blockProcFeed` event feed from `InsertChain` to `insertChain` in order to ensure that the feed subscribers are notified whenever block processing happens. Note that this event is not subscribed to anywhere in our codebase at the moment, earlier it was used by the LES server to avoid slowing down block processing. Now I want to do the same with the log indexer, the problem is that back then every block insertion was done by `InsertChain`, now the regular payload insertion is done by `InsertBlockWithoutSetHead`. Both of these (and also `SetCanonical` if needed) calls `insertChain` so I moved the feed update there. --- core/blockchain.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 7aad5c9d26..2024c63b80 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -235,14 +235,15 @@ type BlockChain struct { statedb *state.CachingDB // State database to reuse between imports (contains state cache) txIndexer *txIndexer // Transaction indexer, might be nil if not enabled - hc *HeaderChain - rmLogsFeed event.Feed - chainFeed event.Feed - chainHeadFeed event.Feed - logsFeed event.Feed - blockProcFeed event.Feed - scope event.SubscriptionScope - genesisBlock *types.Block + hc *HeaderChain + rmLogsFeed event.Feed + chainFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + blockProcFeed event.Feed + blockProcCounter int32 + scope event.SubscriptionScope + genesisBlock *types.Block // This mutex synchronizes chain write operations. // Readers don't need to take it, they can just read the database. @@ -1574,8 +1575,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { if len(chain) == 0 { return 0, nil } - bc.blockProcFeed.Send(true) - defer bc.blockProcFeed.Send(false) // Do a sanity check that the provided chain is actually ordered and linked. for i := 1; i < len(chain); i++ { @@ -1615,6 +1614,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness if bc.insertStopped() { return nil, 0, nil } + + if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 { + bc.blockProcFeed.Send(true) + } + defer func() { + if atomic.AddInt32(&bc.blockProcCounter, -1) == 0 { + bc.blockProcFeed.Send(false) + } + }() + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) SenderCacher().RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)