diff --git a/core/blockchain.go b/core/blockchain.go index aa9ebd316d..f74b6c1513 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -202,8 +202,8 @@ type BlockChain struct { wg sync.WaitGroup quit chan struct{} // shutdown signal, closed in Stop. - running int32 // 0 if chain is running, 1 when stopped - procInterrupt int32 // interrupt signaler for block processing + stopping atomic.Bool // false if chain is running, true when stopped + procInterrupt atomic.Bool // interrupt signaler for block processing engine consensus.Engine validator Validator // Block and state validator interface @@ -1101,7 +1101,7 @@ func (bc *BlockChain) saveData() { // Stop stops the blockchain service. If any imports are currently in progress // it will abort them using the procInterrupt. func (bc *BlockChain) Stop() { - if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { + if !bc.stopping.CompareAndSwap(false, true) { return } @@ -1128,12 +1128,12 @@ func (bc *BlockChain) Stop() { // errInsertionInterrupted as soon as possible. Insertion is permanently disabled after // calling this method. func (bc *BlockChain) StopInsert() { - atomic.StoreInt32(&bc.procInterrupt, 1) + bc.procInterrupt.Store(true) } // insertStopped returns true after StopInsert has been called. func (bc *BlockChain) insertStopped() bool { - return atomic.LoadInt32(&bc.procInterrupt) == 1 + return bc.procInterrupt.Load() } func (bc *BlockChain) procFutureBlocks() { @@ -1244,7 +1244,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ for i, block := range blockChain { receipts := receiptChain[i] // Short circuit insertion if shutting down or processing failed - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.insertStopped() { return 0, nil } blockHash, blockNumber := block.Hash(), block.NumberU64() @@ -1698,7 +1698,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // No validation errors for the first block (or chain prefix skipped) for ; block != nil && err == nil; block, err = it.next() { // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.insertStopped() { log.Debug("Premature abort during blocks processing") break } @@ -1722,7 +1722,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. - var followupInterrupt uint32 + var followupInterrupt atomic.Bool if !bc.cacheConfig.TrieCleanNoPrefetch { if followup, err := it.peek(); followup != nil && err == nil { go func(start time.Time) { @@ -1730,7 +1730,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) blockPrefetchExecuteTimer.Update(time.Since(start)) - if atomic.LoadUint32(&followupInterrupt) == 1 { + if followupInterrupt.Load() { blockPrefetchInterruptMeter.Mark(1) } }(time.Now()) @@ -1743,7 +1743,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] tradingState, lendingState, err := bc.processTradingAndLendingStates(isTIPXDCXReceiver, block, parent, statedb) if err != nil { bc.reportBlock(block, nil, err) - atomic.StoreUint32(&followupInterrupt, 1) + followupInterrupt.Store(true) return it.index, events, coalescedLogs, err } feeCapacity := state.GetTRC21FeeCapacityFromStateWithCache(parent.Root, statedb) @@ -1751,7 +1751,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] t1 := time.Now() if err != nil { bc.reportBlock(block, receipts, err) - atomic.StoreUint32(&followupInterrupt, 1) + followupInterrupt.Store(true) return it.index, events, coalescedLogs, err } // Validate the state using the default validator @@ -1766,7 +1766,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // Write the block to the chain and get the status. status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState) t3 := time.Now() - atomic.StoreUint32(&followupInterrupt, 1) + followupInterrupt.Store(true) if err != nil { return it.index, events, coalescedLogs, err } @@ -1955,8 +1955,8 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i blocks, memory = blocks[:0], 0 // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - log.Debug("Premature abort during blocks processing") + if bc.insertStopped() { + log.Debug("Abort during blocks processing") return 0, nil, nil, nil } } @@ -2017,7 +2017,7 @@ func (bc *BlockChain) getResultBlock(block *types.Block, verifiedM2 bool) (*Resu bc.calculatingBlock.Add(block.HashNoValidator(), calculatedBlock) // Start the parallel header verifier // If the chain is terminating, stop processing blocks - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + if bc.insertStopped() { log.Debug("Premature abort during blocks processing") return nil, ErrBlacklistedHash } diff --git a/core/chain_indexer.go b/core/chain_indexer.go index b1e7dbfcbf..e1e7018723 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -73,7 +73,7 @@ type ChainIndexer struct { backend ChainIndexerBackend // Background processor generating the index data content children []*ChainIndexer // Child indexers to cascade chain updates to - active uint32 // Flag whether the event loop was started + active atomic.Bool // Flag whether the event loop was started update chan struct{} // Notification channel that headers should be processed quit chan chan error // Quit channel to tear down running goroutines ctx context.Context @@ -153,7 +153,7 @@ func (c *ChainIndexer) Close() error { errs = append(errs, err) } // If needed, tear down the secondary event loop - if atomic.LoadUint32(&c.active) != 0 { + if c.active.Load() { c.quit <- errc if err := <-errc; err != nil { errs = append(errs, err) @@ -183,7 +183,7 @@ func (c *ChainIndexer) Close() error { // queue. func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown - atomic.StoreUint32(&c.active, 1) + c.active.Store(true) defer sub.Unsubscribe() diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 10f455e2f5..62910989d5 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -48,7 +48,7 @@ func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine conse // Prefetch processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and state trie nodes. -func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) { +func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) { var ( header = block.Header() gaspool = new(GasPool).AddGas(block.GasLimit()) @@ -60,7 +60,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c byzantium := p.config.IsByzantium(block.Number()) for i, tx := range block.Transactions() { // If block precaching was interrupted, abort - if interrupt != nil && atomic.LoadUint32(interrupt) == 1 { + if interrupt != nil && interrupt.Load() { return } // Convert the transaction into an executable message and pre-cache its sender diff --git a/core/types.go b/core/types.go index ff03807408..3befed7067 100644 --- a/core/types.go +++ b/core/types.go @@ -18,6 +18,7 @@ package core import ( "math/big" + "sync/atomic" "github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate" "github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate" @@ -48,7 +49,7 @@ type Prefetcher interface { // Prefetch processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and state trie nodes. - Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) + Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) } // Processor is an interface for processing blocks using a given initial state.