diff --git a/core/blockchain.go b/core/blockchain.go index b6d328cc64..6496abd2f9 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -149,7 +149,6 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock procmu sync.RWMutex // block processor lock @@ -411,8 +410,8 @@ func (bc *BlockChain) loadLastState() error { func (bc *BlockChain) SetHead(head uint64) error { log.Warn("Rewinding blockchain", "target", head) - bc.mu.Lock() - defer bc.mu.Unlock() + bc.chainmu.Lock() + defer bc.chainmu.Unlock() // Rewind the header chain, deleting all block bodies until then delFn := func(hash common.Hash, num uint64) { @@ -476,10 +475,10 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { return err } // If all checks out, manually set the head block - bc.mu.Lock() + bc.chainmu.Lock() bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) - bc.mu.Unlock() + bc.chainmu.Unlock() log.Info("Committed new head block", "number", block.Number(), "hash", hash) return nil @@ -598,8 +597,8 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.SetHead(0); err != nil { return err } - bc.mu.Lock() - defer bc.mu.Unlock() + bc.chainmu.Lock() + defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { @@ -672,8 +671,8 @@ func (bc *BlockChain) Export(w io.Writer) error { // ExportN writes a subset of the active chain to the given writer. func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { - bc.mu.RLock() - defer bc.mu.RUnlock() + bc.chainmu.RLock() + defer bc.chainmu.RUnlock() if first > last { return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last) @@ -690,7 +689,6 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { return err } } - return nil } @@ -1053,8 +1051,8 @@ const ( // Rollback is designed to remove a chain of links from the database that aren't // certain enough to be valid. func (bc *BlockChain) Rollback(chain []common.Hash) { - bc.mu.Lock() - defer bc.mu.Unlock() + bc.chainmu.Lock() + defer bc.chainmu.Unlock() for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] @@ -1146,7 +1144,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Update the head fast sync block if better - bc.mu.Lock() + bc.chainmu.Lock() head := blockChain[len(blockChain)-1] if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case currentFastBlock := bc.CurrentFastBlock() @@ -1158,7 +1156,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ headFastBlockGauge.Update(int64(head.NumberU64())) } } - bc.mu.Unlock() + bc.chainmu.Unlock() log.Info("Imported new block receipts", "count", stats.processed, @@ -1188,6 +1186,15 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e // WriteBlockWithState writes the block and all associated state to the database. func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) { + bc.chainmu.Lock() + defer bc.chainmu.Unlock() + + return bc.writeBlockWithState(block, receipts, state, tradingState, lendingState) +} + +// writeBlockWithState writes the block and all associated state to the database, +// but is expects the chain mutex to be held. +func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) { bc.wg.Add(1) defer bc.wg.Done() @@ -1197,9 +1204,6 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. return NonStatTy, consensus.ErrUnknownAncestor } // Make sure no inconsistent state is leaked during insertion - bc.mu.Lock() - defer bc.mu.Unlock() - currentBlock := bc.CurrentBlock() localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) externTd := new(big.Int).Add(block.Difficulty(), ptd) @@ -1710,7 +1714,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] proctime := time.Since(bstart) // Write the block to the chain and get the status. - status, err := bc.WriteBlockWithState(block, receipts, statedb, tradingState, lendingState) + status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState) t3 := time.Now() if err != nil { return i, events, coalescedLogs, err @@ -2061,7 +2065,7 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L if bc.HasBlockAndFullState(block.Hash(), block.NumberU64()) { return events, coalescedLogs, nil } - status, err := bc.WriteBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState) + status, err := bc.writeBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState) if err != nil { return events, coalescedLogs, err @@ -2425,9 +2429,6 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i defer bc.wg.Done() whFunc := func(header *types.Header) error { - bc.mu.Lock() - defer bc.mu.Unlock() - _, err := bc.hc.WriteHeader(header) return err } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index de15d9ca6c..41283aa592 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -25,10 +25,9 @@ import ( "testing" "time" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" - "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/state" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/core/vm" @@ -129,11 +128,11 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.reportBlock(block, receipts, err) return err } - blockchain.mu.Lock() + blockchain.chainmu.Lock() WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) rawdb.WriteBlock(blockchain.db, block) statedb.Commit(true) - blockchain.mu.Unlock() + blockchain.chainmu.Unlock() } return nil } @@ -147,10 +146,10 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error return err } // Manually insert the header into the database, but don't reorganise (allows subsequent testing) - blockchain.mu.Lock() + blockchain.chainmu.Lock() WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) rawdb.WriteHeader(blockchain.db, header) - blockchain.mu.Unlock() + blockchain.chainmu.Unlock() } return nil } diff --git a/light/lightchain.go b/light/lightchain.go index 9cbf95ddf9..7664750615 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -56,7 +56,6 @@ type LightChain struct { scope event.SubscriptionScope genesisBlock *types.Block - mu sync.RWMutex chainmu sync.RWMutex bodyCache *lru.Cache[common.Hash, *types.Body] @@ -159,8 +158,8 @@ func (lc *LightChain) loadLastState() error { // SetHead rewinds the local chain to a new head. Everything above the new // head will be deleted and the new one set. func (lc *LightChain) SetHead(head uint64) { - lc.mu.Lock() - defer lc.mu.Unlock() + lc.chainmu.Lock() + defer lc.chainmu.Unlock() lc.hc.SetHead(head, nil) lc.loadLastState() @@ -182,8 +181,8 @@ func (lc *LightChain) ResetWithGenesisBlock(genesis *types.Block) { // Dump the entire block chain and purge the caches lc.SetHead(0) - lc.mu.Lock() - defer lc.mu.Unlock() + lc.chainmu.Lock() + defer lc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain if err := core.WriteTd(lc.chainDb, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { @@ -297,8 +296,8 @@ func (lc *LightChain) Stop() { // Rollback is designed to remove a chain of links from the database that aren't // certain enough to be valid. func (lc *LightChain) Rollback(chain []common.Hash) { - lc.mu.Lock() - defer lc.mu.Unlock() + lc.chainmu.Lock() + defer lc.chainmu.Unlock() for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] @@ -344,19 +343,13 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i // Make sure only one thread manipulates the chain at once lc.chainmu.Lock() - defer func() { - lc.chainmu.Unlock() - time.Sleep(time.Millisecond * 10) // ugly hack; do not hog chain lock in case syncing is CPU-limited by validation - }() + defer lc.chainmu.Unlock() lc.wg.Add(1) defer lc.wg.Done() var events []interface{} whFunc := func(header *types.Header) error { - lc.mu.Lock() - defer lc.mu.Unlock() - status, err := lc.hc.WriteHeader(header) switch status { @@ -450,11 +443,12 @@ func (lc *LightChain) SyncCht(ctx context.Context) bool { num := chtCount*CHTFrequencyClient - 1 header, err := GetHeaderByNumber(ctx, lc.odr, num) if header != nil && err == nil { - lc.mu.Lock() + lc.chainmu.Lock() + defer lc.chainmu.Unlock() + if lc.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() { lc.hc.SetCurrentHeader(header) } - lc.mu.Unlock() return true } } diff --git a/light/lightchain_test.go b/light/lightchain_test.go index 7d762e2e3f..4733810e94 100644 --- a/light/lightchain_test.go +++ b/light/lightchain_test.go @@ -18,10 +18,11 @@ package light import ( "context" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "math/big" "testing" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" "github.com/XinFinOrg/XDPoSChain/core" @@ -122,10 +123,10 @@ func testHeaderChainImport(chain []*types.Header, lightchain *LightChain) error return err } // Manually insert the header into the database, but don't reorganize (allows subsequent testing) - lightchain.mu.Lock() + lightchain.chainmu.Lock() core.WriteTd(lightchain.chainDb, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, lightchain.GetTdByHash(header.ParentHash))) rawdb.WriteHeader(lightchain.chainDb, header) - lightchain.mu.Unlock() + lightchain.chainmu.Unlock() } return nil }