core: use atomic type #27011

This commit is contained in:
Daniel Liu 2025-04-18 19:58:36 +08:00 committed by Daniel Liu
parent 9a06e129b8
commit 57c40154be
4 changed files with 22 additions and 21 deletions

View file

@ -202,8 +202,8 @@ type BlockChain struct {
wg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop.
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing
engine consensus.Engine
validator Validator // Block and state validator interface
@ -1101,7 +1101,7 @@ func (bc *BlockChain) saveData() {
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
if !bc.stopping.CompareAndSwap(false, true) {
return
}
@ -1128,12 +1128,12 @@ func (bc *BlockChain) Stop() {
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
// calling this method.
func (bc *BlockChain) StopInsert() {
atomic.StoreInt32(&bc.procInterrupt, 1)
bc.procInterrupt.Store(true)
}
// insertStopped returns true after StopInsert has been called.
func (bc *BlockChain) insertStopped() bool {
return atomic.LoadInt32(&bc.procInterrupt) == 1
return bc.procInterrupt.Load()
}
func (bc *BlockChain) procFutureBlocks() {
@ -1244,7 +1244,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
for i, block := range blockChain {
receipts := receiptChain[i]
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.insertStopped() {
return 0, nil
}
blockHash, blockNumber := block.Hash(), block.NumberU64()
@ -1698,7 +1698,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// No validation errors for the first block (or chain prefix skipped)
for ; block != nil && err == nil; block, err = it.next() {
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.insertStopped() {
log.Debug("Premature abort during blocks processing")
break
}
@ -1722,7 +1722,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt uint32
var followupInterrupt atomic.Bool
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
go func(start time.Time) {
@ -1730,7 +1730,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
blockPrefetchExecuteTimer.Update(time.Since(start))
if atomic.LoadUint32(&followupInterrupt) == 1 {
if followupInterrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now())
@ -1743,7 +1743,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
tradingState, lendingState, err := bc.processTradingAndLendingStates(isTIPXDCXReceiver, block, parent, statedb)
if err != nil {
bc.reportBlock(block, nil, err)
atomic.StoreUint32(&followupInterrupt, 1)
followupInterrupt.Store(true)
return it.index, events, coalescedLogs, err
}
feeCapacity := state.GetTRC21FeeCapacityFromStateWithCache(parent.Root, statedb)
@ -1751,7 +1751,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
t1 := time.Now()
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
followupInterrupt.Store(true)
return it.index, events, coalescedLogs, err
}
// Validate the state using the default validator
@ -1766,7 +1766,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// Write the block to the chain and get the status.
status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState)
t3 := time.Now()
atomic.StoreUint32(&followupInterrupt, 1)
followupInterrupt.Store(true)
if err != nil {
return it.index, events, coalescedLogs, err
}
@ -1955,8 +1955,8 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i
blocks, memory = blocks[:0], 0
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
if bc.insertStopped() {
log.Debug("Abort during blocks processing")
return 0, nil, nil, nil
}
}
@ -2017,7 +2017,7 @@ func (bc *BlockChain) getResultBlock(block *types.Block, verifiedM2 bool) (*Resu
bc.calculatingBlock.Add(block.HashNoValidator(), calculatedBlock)
// Start the parallel header verifier
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.insertStopped() {
log.Debug("Premature abort during blocks processing")
return nil, ErrBlacklistedHash
}

View file

@ -73,7 +73,7 @@ type ChainIndexer struct {
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to
active uint32 // Flag whether the event loop was started
active atomic.Bool // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed
quit chan chan error // Quit channel to tear down running goroutines
ctx context.Context
@ -153,7 +153,7 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
// If needed, tear down the secondary event loop
if atomic.LoadUint32(&c.active) != 0 {
if c.active.Load() {
c.quit <- errc
if err := <-errc; err != nil {
errs = append(errs, err)
@ -183,7 +183,7 @@ func (c *ChainIndexer) Close() error {
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)
c.active.Store(true)
defer sub.Unsubscribe()

View file

@ -48,7 +48,7 @@ func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine conse
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) {
var (
header = block.Header()
gaspool = new(GasPool).AddGas(block.GasLimit())
@ -60,7 +60,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
byzantium := p.config.IsByzantium(block.Number())
for i, tx := range block.Transactions() {
// If block precaching was interrupted, abort
if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
if interrupt != nil && interrupt.Load() {
return
}
// Convert the transaction into an executable message and pre-cache its sender

View file

@ -18,6 +18,7 @@ package core
import (
"math/big"
"sync/atomic"
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
@ -48,7 +49,7 @@ type Prefetcher interface {
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool)
}
// Processor is an interface for processing blocks using a given initial state.