Merge pull request #779 from gzliudan/core_blockchain

upgrade the core package for reorg
This commit is contained in:
Daniel Liu 2024-12-28 13:05:40 +08:00 committed by GitHub
commit 911fe219df
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 772 additions and 312 deletions

View file

@ -48,6 +48,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/ethclient"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/internal/syncx"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/XinFinOrg/XDPoSChain/params"
@ -81,6 +82,9 @@ var (
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
errInsertionInterrupted = errors.New("insertion is interrupted")
errChainStopped = errors.New("blockchain is stopped")
CheckpointCh = make(chan int)
)
@ -149,9 +153,11 @@ type BlockChain struct {
scope event.SubscriptionScope
genesisBlock *types.Block
mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock
procmu sync.RWMutex // block processor lock
// This mutex synchronizes chain write operations.
// Readers don't need to take it, they can just read the database.
chainmu *syncx.ClosableMutex
procmu sync.RWMutex // block processor lock
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
@ -170,10 +176,10 @@ type BlockChain struct {
// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]
wg sync.WaitGroup // chain processing wait group for shutting down
quit chan struct{} // shutdown signal, closed in Stop.
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop.
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing
engine consensus.Engine
processor Processor // block processor interface
@ -212,6 +218,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
triegc: prque.New[int64, common.Hash](nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, types.Receipts](receiptsCacheLimit),
@ -262,8 +269,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
}
}
// Take ownership of this particular state
go bc.update()
// Start future block processor.
bc.wg.Add(1)
go bc.futureBlocksLoop()
return bc, nil
}
@ -411,15 +421,74 @@ func (bc *BlockChain) loadLastState() error {
func (bc *BlockChain) SetHead(head uint64) error {
log.Warn("Rewinding blockchain", "target", head)
bc.mu.Lock()
defer bc.mu.Unlock()
if !bc.chainmu.TryLock() {
return errChainStopped
}
defer bc.chainmu.Unlock()
updateFn := func(db ethdb.KeyValueWriter, header *types.Header) {
// Rewind the block chain, ensuring we don't end up with a stateless head block
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
if newHeadBlock == nil {
newHeadBlock = bc.genesisBlock
} else {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
// Rewound state missing, rolled back to before pivot, reset to genesis
newHeadBlock = bc.genesisBlock
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of SetHead is from high
// to low, so it's safe the update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}
// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
// If either blocks reached nil, reset to the genesis state
if newHeadFastBlock == nil {
newHeadFastBlock = bc.genesisBlock
}
rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of SetHead is from high
// to low, so it's safe the update in-memory markers directly.
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
}
// Rewind the header chain, deleting all block bodies until then
delFn := func(hash common.Hash, num uint64) {
DeleteBody(bc.db, hash, num)
delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if err := bc.db.TruncateAncients(num + 1); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
} else {
// Remove relative body and receipts from the active store.
// The header, total difficulty and canonical hash will be
// removed in the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
// Todo(rjl493456442) txlookup, bloombits, etc
}
bc.hc.SetHead(head, delFn)
currentHeader := bc.hc.CurrentHeader()
bc.hc.SetHead(head, updateFn, delFn)
// Clear out any stale content from the caches
bc.bodyCache.Purge()
@ -429,38 +498,6 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.futureBlocks.Purge()
bc.blocksHashCache.Purge()
// Rewind the block chain, ensuring we don't end up with a stateless head block
if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() {
bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
headBlockGauge.Update(int64(currentHeader.Number.Uint64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock != nil {
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
// Rewound state missing, rolled back to before pivot, reset to genesis
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
}
// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() {
bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
headFastBlockGauge.Update(int64(currentHeader.Number.Uint64()))
}
// If either blocks reached nil, reset to the genesis state
if currentBlock := bc.CurrentBlock(); currentBlock == nil {
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil {
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
}
currentBlock := bc.CurrentBlock()
currentFastBlock := bc.CurrentFastBlock()
rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
if err := WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil {
log.Crit("Failed to reset head fast block", "err", err)
}
return bc.loadLastState()
}
@ -475,11 +512,14 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB()); err != nil {
return err
}
// If all checks out, manually set the head block
bc.mu.Lock()
// If all checks out, manually set the head block.
if !bc.chainmu.TryLock() {
return errChainStopped
}
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
bc.mu.Unlock()
bc.chainmu.Unlock()
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
return nil
@ -598,24 +638,28 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
if err := bc.SetHead(0); err != nil {
return err
}
bc.mu.Lock()
defer bc.mu.Unlock()
if !bc.chainmu.TryLock() {
return errChainStopped
}
defer bc.chainmu.Unlock()
// Prepare the genesis block and reinitialise the chain
if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
log.Crit("Failed to write genesis block TD", "err", err)
batch := bc.db.NewBatch()
rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
rawdb.WriteBlock(batch, genesis)
if err := batch.Write(); err != nil {
log.Crit("Failed to write genesis block", "err", err)
}
rawdb.WriteBlock(bc.db, genesis)
bc.writeHeadBlock(genesis, false)
// Last update all in-memory chain markers
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock, false)
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
return nil
}
@ -672,8 +716,10 @@ func (bc *BlockChain) Export(w io.Writer) error {
// ExportN writes a subset of the active chain to the given writer.
func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
bc.mu.RLock()
defer bc.mu.RUnlock()
if !bc.chainmu.TryLock() {
return errChainStopped
}
defer bc.chainmu.Unlock()
if first > last {
return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
@ -690,31 +736,41 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
return err
}
}
return nil
}
// insert injects a new head block into the current block chain. This method
// writeHeadBlock injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) insert(block *types.Block, writeBlock bool) {
func (bc *BlockChain) writeHeadBlock(block *types.Block, writeBlock bool) {
blockHash := block.Hash()
blockNumberU64 := block.NumberU64()
// If the block is on a side chain or an unknown one, force other heads onto it too
updateHeads := GetCanonicalHash(bc.db, blockNumberU64) != blockHash
// Add the block to the canonical chain number scheme and mark as the head
rawdb.WriteCanonicalHash(bc.db, blockHash, blockNumberU64)
rawdb.WriteHeadBlockHash(bc.db, blockHash)
batch := bc.db.NewBatch()
rawdb.WriteHeadHeaderHash(batch, blockHash)
rawdb.WriteHeadFastBlockHash(batch, blockHash)
rawdb.WriteCanonicalHash(batch, blockHash, blockNumberU64)
rawdb.WriteTxLookupEntriesByBlock(batch, block)
rawdb.WriteHeadBlockHash(batch, blockHash)
if writeBlock {
rawdb.WriteBlock(bc.db, block)
rawdb.WriteBlock(batch, block)
}
// Flush the whole batch into the disk, exit the node if failed
if err := batch.Write(); err != nil {
log.Crit("Failed to update chain indexes and markers", "err", err)
}
// Update all in-memory chain markers in the last step
bc.hc.SetCurrentHeader(block.Header())
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(blockNumberU64))
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
@ -725,17 +781,6 @@ func (bc *BlockChain) insert(block *types.Block, writeBlock bool) {
engine.CacheNoneTIPSigningTxs(block.Header(), block.Transactions(), bc.GetReceiptsByHash(blockHash))
}
}
// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
if err := WriteHeadFastBlockHash(bc.db, blockHash); err != nil {
log.Crit("Failed to insert head fast block hash", "err", err)
}
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
}
// Genesis retrieves the chain's genesis block.
@ -1001,15 +1046,38 @@ func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
return
}
// Unsubscribe all subscriptions registered from blockchain
// Unsubscribe all subscriptions registered from blockchain.
bc.scope.Close()
// Signal shutdown to all goroutines.
close(bc.quit)
atomic.StoreInt32(&bc.procInterrupt, 1)
bc.StopInsert()
// Now wait for all chain modifications to end and persistent goroutines to exit.
//
// Note: Close waits for the mutex to become available, i.e. any running chain
// modification will have exited when Close returns. Since we also called StopInsert,
// the mutex should become available quickly. It cannot be taken again after Close has
// returned.
bc.chainmu.Close()
bc.wg.Wait()
bc.saveData()
log.Info("Blockchain manager stopped")
}
// StopInsert interrupts all insertion methods, causing them to return
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
// calling this method.
func (bc *BlockChain) StopInsert() {
atomic.StoreInt32(&bc.procInterrupt, 1)
}
// insertStopped returns true after StopInsert has been called.
func (bc *BlockChain) insertStopped() bool {
return atomic.LoadInt32(&bc.procInterrupt) == 1
}
func (bc *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
for _, hash := range bc.futureBlocks.Keys() {
@ -1053,34 +1121,49 @@ const (
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (bc *BlockChain) Rollback(chain []common.Hash) {
bc.mu.Lock()
defer bc.mu.Unlock()
if !bc.chainmu.TryLock() {
return
}
defer bc.chainmu.Unlock()
batch := bc.db.NewBatch()
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of rollback is from high
// to low, so it's safe the update in-memory markers directly.
currentHeader := bc.hc.CurrentHeader()
if currentHeader.Hash() == hash {
bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
newHeadHeader := bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)
rawdb.WriteHeadHeaderHash(batch, currentHeader.ParentHash)
bc.hc.SetCurrentHeader(newHeadHeader)
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
rawdb.WriteHeadFastBlockHash(batch, currentFastBlock.ParentHash())
bc.currentFastBlock.Store(newFastBlock)
WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
rawdb.WriteHeadBlockHash(batch, currentBlock.ParentHash())
bc.currentBlock.Store(newBlock)
headBlockGauge.Update(int64(newBlock.NumberU64()))
rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
}
}
if err := batch.Write(); err != nil {
log.Crit("Failed to rollback chain markers", "err", err)
}
// TODO: Truncate ancient data which exceeds the current header.
}
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
// We don't require the chainMu here since we want to maximize the
// concurrency of header insertion and receipt insertion.
bc.wg.Add(1)
defer bc.wg.Done()
@ -1128,8 +1211,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := WriteTxLookupEntries(batch, block); err != nil {
return i, fmt.Errorf("failed to write lookup metadata: %v", err)
}
stats.processed++
// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
@ -1137,7 +1222,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
bytes += batch.ValueSize()
batch.Reset()
}
stats.processed++
}
// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
if batch.ValueSize() > 0 {
bytes += batch.ValueSize()
if err := batch.Write(); err != nil {
@ -1146,7 +1235,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Update the head fast sync block if better
bc.mu.Lock()
if !bc.chainmu.TryLock() {
return 0, errChainStopped
}
head := blockChain[len(blockChain)-1]
if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
currentFastBlock := bc.CurrentFastBlock()
@ -1158,7 +1249,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
headFastBlockGauge.Update(int64(head.NumberU64()))
}
}
bc.mu.Unlock()
bc.chainmu.Unlock()
log.Info("Imported new block receipts",
"count", stats.processed,
@ -1172,24 +1263,38 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var lastWrite uint64
// WriteBlockWithoutState writes only the block and its metadata to the database,
// writeBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) {
bc.wg.Add(1)
defer bc.wg.Done()
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil {
return err
func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) {
if bc.insertStopped() {
return errInsertionInterrupted
}
batch := bc.db.NewBatch()
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
rawdb.WriteBlock(batch, block)
if err := batch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
rawdb.WriteBlock(bc.db, block)
return nil
}
// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()
if !bc.chainmu.TryLock() {
return NonStatTy, errInsertionInterrupted
}
defer bc.chainmu.Unlock()
return bc.writeBlockWithState(block, receipts, state, tradingState, lendingState)
}
// writeBlockWithState writes the block and all associated state to the database,
// but is expects the chain mutex to be held.
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) {
if bc.insertStopped() {
return NonStatTy, errInsertionInterrupted
}
// Calculate the total difficulty of the block
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
@ -1197,24 +1302,29 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, consensus.ErrUnknownAncestor
}
// Make sure no inconsistent state is leaked during insertion
bc.mu.Lock()
defer bc.mu.Unlock()
currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(block.Difficulty(), ptd)
// Irrelevant of the canonical status, write the block itself to the database
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
return NonStatTy, err
// Irrelevant of the canonical status, write the block itself to the database.
//
// Note all the components of block(td, hash->number map, header, body, receipts)
// should be written atomically. BlockBatch is used for containing all components.
blockBatch := bc.db.NewBatch()
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
rawdb.WritePreimages(blockBatch, state.Preimages())
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
// Write other block data using a batch.
batch := bc.db.NewBatch()
rawdb.WriteBlock(batch, block)
// Commit all cached state changes into underlying memory database.
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, err
}
triedb := bc.stateCache.TrieDB()
tradingRoot := common.Hash{}
if tradingState != nil {
tradingRoot, err = tradingState.Commit()
@ -1229,6 +1339,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, err
}
}
engine, _ := bc.Engine().(*XDPoS.XDPoS)
var tradingTrieDb *trie.Database
var tradingService utils.TradingService
@ -1244,7 +1355,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
lendingTrieDb = lendingService.GetStateCache().TrieDB()
}
}
triedb := bc.stateCache.TrieDB()
// If we're running an archive node, always flush
if bc.cacheConfig.Disabled {
if err := triedb.Commit(root, false); err != nil {
@ -1352,9 +1463,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
}
}
}
if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
return NonStatTy, err
}
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
@ -1364,24 +1473,6 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
// Split same-difficulty blocks by number
reorg = block.NumberU64() > currentBlock.NumberU64()
}
// This is the ETH fix. We shall ultimately have this workflow,
// but due to below code has diverged significantly between ETH and XDC, and current issue we have,
// it's best to have it in a different PR with more investigations.
// if reorg {
// // Write the positional metadata for transaction and receipt lookups
// if err := WriteTxLookupEntries(batch, block); err != nil {
// return NonStatTy, err
// }
// // Write hash preimages
// if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil {
// return NonStatTy, err
// }
// }
// if err := batch.Write(); err != nil {
// return NonStatTy, err
// }
if reorg {
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != currentBlock.Hash() {
@ -1389,26 +1480,15 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, err
}
}
// Write the positional metadata for transaction and receipt lookups
if err := WriteTxLookupEntries(batch, block); err != nil {
return NonStatTy, err
}
// Write hash preimages
if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil {
return NonStatTy, err
}
status = CanonStatTy
} else {
status = SideStatTy
}
if err := batch.Write(); err != nil {
return NonStatTy, err
}
// Set new head.
if status == CanonStatTy {
// WriteBlock has already been called, no need to write again
bc.insert(block, false)
bc.writeHeadBlock(block, false)
// prepare set of masternodes for the next epoch
if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
err := bc.UpdateM1()
@ -1435,38 +1515,51 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
//
// After insertion is done, all accumulated events will be fired.
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil
}
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
block, prev := chain[i], chain[i-1]
if block.NumberU64() != chain[i-1].NumberU64()+1 || block.ParentHash() != chain[i-1].Hash() {
// Chain broke ancestry, log a messge (programming error) and skip insertion
log.Error("Non contiguous block insert",
"number", block.Number(),
"hash", block.Hash(),
"parent", block.ParentHash(),
"prevnumber", prev.Number(),
"prevhash", prev.Hash())
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, prev.NumberU64(),
prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4])
}
}
// Pre-check passed, start the full block imports.
if !bc.chainmu.TryLock() {
return 0, errChainStopped
}
defer bc.chainmu.Unlock()
n, events, logs, err := bc.insertChain(chain, true)
bc.PostChainEvents(events, logs)
return n, err
}
// insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
// insertChain is the internal implementation of InsertChain, which assumes that
// 1) chains are contiguous, and 2) The chain mutex is held.
//
// This method is split out so that import batches that require re-injecting
// historical blocks can do so without releasing the lock, which could lead to
// racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
// If the chain is terminating, don't even bother starting up.
if bc.insertStopped() {
return 0, nil, nil, nil
}
engine, _ := bc.Engine().(*XDPoS.XDPoS)
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
// Chain broke ancestry, log a messge (programming error) and skip insertion
log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, chain[i-1].NumberU64(),
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
}
}
// Pre-checks passed, start the full block imports
bc.wg.Add(1)
defer bc.wg.Done()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
@ -1543,7 +1636,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
if localTd.Cmp(externTd) > 0 {
if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
if err = bc.writeBlockWithoutState(block, externTd); err != nil {
return i, events, coalescedLogs, err
}
continue
@ -1561,10 +1654,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
}
log.Debug("Number block need calculated again", "number", block.NumberU64(), "hash", block.Hash().Hex(), "winners", len(winner))
// Import all the pruned blocks to make the state available
bc.chainmu.Unlock()
// During reorg, we use verifySeals=false
_, evs, logs, err := bc.insertChain(winner, false)
bc.chainmu.Lock()
events, coalescedLogs = evs, logs
if err != nil {
@ -1593,6 +1684,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
var tradingService utils.TradingService
var lendingService utils.LendingService
isSDKNode := false
engine, _ := bc.Engine().(*XDPoS.XDPoS)
if bc.Config().IsTIPXDCXReceiver(block.Number()) && bc.chainConfig.XDPoS != nil && engine != nil && block.NumberU64() > bc.chainConfig.XDPoS.Epoch {
author, err := bc.Engine().Author(block.Header()) // Ignore error, we're past header validation
if err != nil {
@ -1710,7 +1802,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
proctime := time.Since(bstart)
// Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(block, receipts, statedb, tradingState, lendingState)
status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState)
t3 := time.Now()
if err != nil {
return i, events, coalescedLogs, err
@ -2056,12 +2148,14 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L
bc.wg.Add(1)
defer bc.wg.Done()
// Write the block to the chain and get the status.
bc.chainmu.Lock()
if !bc.chainmu.TryLock() {
return nil, nil, errChainStopped
}
defer bc.chainmu.Unlock()
if bc.HasBlockAndFullState(block.Hash(), block.NumberU64()) {
return events, coalescedLogs, nil
}
status, err := bc.WriteBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState)
status, err := bc.writeBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState)
if err != nil {
return events, coalescedLogs, err
@ -2169,23 +2263,25 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
var logs []*types.Log
for _, receipt := range receipts {
for _, log := range receipt.Logs {
l := *log
l.Removed = removed
logs = append(logs, &l)
if removed {
log.Removed = true
}
logs = append(logs, log)
}
}
return logs
}
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain and accumulates potential missing transactions and post an
// event about them
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
// blocks and inserts them to be part of the new canonical chain and accumulates
// potential missing transactions and post an event about them.
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
var (
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
deletedTxs types.Transactions
addedTxs types.Transactions
deletedLogs []*types.Log
)
log.Warn("Reorg", "oldBlock hash", oldBlock.Hash().Hex(), "number", oldBlock.NumberU64(), "newBlock hash", newBlock.Hash().Hex(), "number", newBlock.NumberU64())
@ -2234,6 +2330,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return errors.New("invalid new chain")
}
}
// Ensure XDPoS engine committed block will be not reverted
if xdpos, ok := bc.Engine().(*XDPoS.XDPoS); ok {
latestCommittedBlock := xdpos.EngineV2.GetLatestCommittedBlockInfo()
@ -2259,6 +2356,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
}
}
}
// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Warn
@ -2273,16 +2371,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
} else {
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
}
// Insert the new chain, taking care of the proper incremental order
var addedTxs types.Transactions
// Insert the new chain(except the head block(reverse order)),
// taking care of the proper incremental order.
for i := len(newChain) - 1; i >= 0; i-- {
// insert the block in the canonical way, re-writing history
bc.insert(newChain[i], true)
// write lookup entries for hash based transaction/receipt searches
if err := WriteTxLookupEntries(bc.db, newChain[i]); err != nil {
return err
}
bc.writeHeadBlock(newChain[i], true)
// Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
// prepare set of masternodes for the next epoch
if bc.chainConfig.XDPoS != nil && ((newChain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
err := bc.UpdateM1()
@ -2291,20 +2389,36 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
}
}
}
// calculate the difference between deleted and added transactions
diff := types.TxDifference(deletedTxs, addedTxs)
// When transactions get deleted from the database that means the
// receipts that were created in the fork must also be deleted
for _, tx := range diff {
DeleteTxLookupEntry(bc.db, tx.Hash())
// Delete useless indexes right now which includes the non-canonical
// transaction indexes, canonical chain indexes which above the head.
indexesBatch := bc.db.NewBatch()
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
}
// Delete any canonical number assignments above the new head
number := bc.CurrentBlock().NumberU64()
for i := number + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(bc.db, i)
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(indexesBatch, i)
}
if err := indexesBatch.Write(); err != nil {
log.Crit("Failed to delete useless indexes", "err", err)
}
// If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle
// networks where performance is not an issue either way.
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(oldChain) > 0 {
go func() {
for _, block := range oldChain {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
for i := len(oldChain) - 1; i >= 0; i-- {
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
}
}()
}
@ -2333,7 +2447,10 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
}
}
func (bc *BlockChain) update() {
// futureBlocksLoop processes the 'future block' queue.
func (bc *BlockChain) futureBlocksLoop() {
defer bc.wg.Done()
futureTimer := time.NewTicker(10 * time.Millisecond)
defer futureTimer.Stop()
for {
@ -2417,17 +2534,12 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
return i, err
}
// Make sure only one thread manipulates the chain at once
bc.chainmu.Lock()
if !bc.chainmu.TryLock() {
return 0, errChainStopped
}
defer bc.chainmu.Unlock()
bc.wg.Add(1)
defer bc.wg.Done()
whFunc := func(header *types.Header) error {
bc.mu.Lock()
defer bc.mu.Unlock()
_, err := bc.hc.WriteHeader(header)
return err
}

View file

@ -25,10 +25,9 @@ import (
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/core/vm"
@ -129,11 +128,11 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
blockchain.reportBlock(block, receipts, err)
return err
}
blockchain.mu.Lock()
blockchain.chainmu.MustLock()
WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash())))
rawdb.WriteBlock(blockchain.db, block)
statedb.Commit(true)
blockchain.mu.Unlock()
blockchain.chainmu.Unlock()
}
return nil
}
@ -147,10 +146,10 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error
return err
}
// Manually insert the header into the database, but don't reorganise (allows subsequent testing)
blockchain.mu.Lock()
blockchain.chainmu.MustLock()
WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash)))
rawdb.WriteHeader(blockchain.db, header)
blockchain.mu.Unlock()
blockchain.chainmu.Unlock()
}
return nil
}

View file

@ -487,11 +487,6 @@ func DeleteBlockReceipts(db DatabaseDeleter, hash common.Hash, number uint64) {
db.Delete(append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...))
}
// DeleteTxLookupEntry removes all transaction data associated with a hash.
func DeleteTxLookupEntry(db DatabaseDeleter, hash common.Hash) {
db.Delete(append(lookupPrefix, hash.Bytes()...))
}
// PreimageTable returns a Database instance with the key prefix for preimage entries.
func PreimageTable(db ethdb.Database) ethdb.Database {
return rawdb.NewTable(db, preimagePrefix)

View file

@ -24,8 +24,8 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/types"
"golang.org/x/crypto/sha3"
"github.com/XinFinOrg/XDPoSChain/rlp"
"golang.org/x/crypto/sha3"
)
// Tests block header storage and retrieval operations.
@ -304,7 +304,7 @@ func TestLookupStorage(t *testing.T) {
}
// Delete the transactions and check purge
for i, tx := range txs {
DeleteTxLookupEntry(db, tx.Hash())
rawdb.DeleteTxLookupEntry(db, tx.Hash())
if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil {
t.Fatalf("tx #%d [%x]: deleted transaction returned: %v", i, tx.Hash(), txn)
}

View file

@ -45,6 +45,14 @@ const (
// HeaderChain implements the basic block header chain logic that is shared by
// core.BlockChain and light.LightChain. It is not usable in itself, only as
// a part of either structure.
//
// HeaderChain is responsible for maintaining the header chain including the
// header query and updating.
//
// The components maintained by headerchain includes: (1) total difficult
// (2) header (3) block hash -> number mapping (4) canonical number -> hash mapping
// and (5) head header flag.
//
// It is not thread safe either, the encapsulating chain structures should do
// the necessary mutex locking/unlocking.
type HeaderChain struct {
@ -66,11 +74,8 @@ type HeaderChain struct {
engine consensus.Engine
}
// NewHeaderChain creates a new HeaderChain structure.
//
// getValidator should return the parent's validator
// procInterrupt points to the parent's interrupt semaphore
// wg points to the parent's shutdown wait group
// NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points
// to the parent's interrupt semaphore.
func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, procInterrupt func() bool) (*HeaderChain, error) {
// Seed a fast but crypto originating random generator
seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
@ -143,41 +148,54 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
externTd := new(big.Int).Add(header.Difficulty, ptd)
// Irrelevant of the canonical status, write the td and header to the database
if err := hc.WriteTd(hash, number, externTd); err != nil {
log.Crit("Failed to write header total difficulty", "err", err)
//
// Note all the components of header(td, hash->number index and header) should
// be written atomically.
headerBatch := hc.chainDb.NewBatch()
rawdb.WriteTd(headerBatch, hash, number, externTd)
rawdb.WriteHeader(headerBatch, header)
if err := headerBatch.Write(); err != nil {
log.Crit("Failed to write header into disk", "err", err)
}
rawdb.WriteHeader(hc.chainDb, header)
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
// If the header can be added into canonical chain, adjust the
// header chain markers(canonical indexes and head header flag).
//
// Note all markers should be written atomically.
// Delete any canonical number assignments above the new head
markerBatch := hc.chainDb.NewBatch()
for i := number + 1; ; i++ {
hash := GetCanonicalHash(hc.chainDb, i)
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
if hash == (common.Hash{}) {
break
}
DeleteCanonicalHash(hc.chainDb, i)
rawdb.DeleteCanonicalHash(markerBatch, i)
}
// Overwrite any stale canonical number assignments
var (
headHash = header.ParentHash
headNumber = header.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
)
for GetCanonicalHash(hc.chainDb, headNumber) != headHash {
rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber)
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber)
headHash = headHeader.ParentHash
headNumber = headHeader.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
}
// Extend the canonical chain with the new header
rawdb.WriteCanonicalHash(hc.chainDb, hash, number)
if err := WriteHeadHeaderHash(hc.chainDb, hash); err != nil {
log.Crit("Failed to insert head header hash", "err", err)
rawdb.WriteCanonicalHash(markerBatch, hash, number)
rawdb.WriteHeadHeaderHash(markerBatch, hash)
if err := markerBatch.Write(); err != nil {
log.Crit("Failed to write header markers into disk", "err", err)
}
// Last step update all in-memory head header markers
hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header))
headHeaderGauge.Update(header.Number.Int64())
@ -186,10 +204,9 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
} else {
status = SideStatTy
}
hc.tdCache.Add(hash, externTd)
hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)
return
}
@ -328,16 +345,6 @@ func (hc *HeaderChain) GetTdByHash(hash common.Hash) *big.Int {
return hc.GetTd(hash, hc.GetBlockNumber(hash))
}
// WriteTd stores a block's total difficulty into the database, also caching it
// along the way.
func (hc *HeaderChain) WriteTd(hash common.Hash, number uint64, td *big.Int) error {
if err := WriteTd(hc.chainDb, hash, number, td); err != nil {
return err
}
hc.tdCache.Add(hash, new(big.Int).Set(td))
return nil
}
// GetHeader retrieves a block header from the database by hash and number,
// caching it if found.
func (hc *HeaderChain) GetHeader(hash common.Hash, number uint64) *types.Header {
@ -361,12 +368,13 @@ func (hc *HeaderChain) GetHeaderByHash(hash common.Hash) *types.Header {
}
// HasHeader checks if a block header is present in the database or not.
// In theory, if header is present in the database, all relative components
// like td and hash->number should be present too.
func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool {
if hc.numberCache.Contains(hash) || hc.headerCache.Contains(hash) {
return true
}
ok, _ := hc.chainDb.Has(headerKey(hash, number))
return ok
return rawdb.HasHeader(hc.chainDb, hash, number)
}
// GetHeaderByNumber retrieves a block header from the database by number,
@ -390,58 +398,79 @@ func (hc *HeaderChain) CurrentHeader() *types.Header {
return hc.currentHeader.Load().(*types.Header)
}
// SetCurrentHeader sets the current head header of the canonical chain.
// SetCurrentHeader sets the in-memory head header marker of the canonical chan
// as the given header.
func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
if err := WriteHeadHeaderHash(hc.chainDb, head.Hash()); err != nil {
log.Crit("Failed to insert head header hash", "err", err)
}
hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash()
headHeaderGauge.Update(head.Number.Int64())
}
// DeleteCallback is a callback function that is called by SetHead before
// each header is deleted.
type DeleteCallback func(common.Hash, uint64)
type (
// UpdateHeadBlocksCallback is a callback function that is called by SetHead
// before head header is updated.
UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header)
// SetHead rewinds the local chain to a new head. Everything above the new head
// will be deleted and the new one set.
func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
height := uint64(0)
if hdr := hc.CurrentHeader(); hdr != nil {
height = hdr.Number.Uint64()
}
// DeleteBlockContentCallback is a callback function that is called by SetHead
// before each header is deleted.
DeleteBlockContentCallback func(ethdb.KeyValueWriter, common.Hash, uint64)
)
// SetHead rewinds the local chain to a new head. In the case of headers, everything
// above the new head will be deleted and the new one set. In the case of blocks
// though, the head may be further rewound if block bodies are missing (non-archive
// nodes after a fast sync).
func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, delFn DeleteBlockContentCallback) {
var (
parentHash common.Hash
batch = hc.chainDb.NewBatch()
)
for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
hash := hdr.Hash()
num := hdr.Number.Uint64()
if delFn != nil {
delFn(hash, num)
hash, num := hdr.Hash(), hdr.Number.Uint64()
// Rewind block chain to new head.
parent := hc.GetHeader(hdr.ParentHash, num-1)
if parent == nil {
parent = hc.genesisHeader
}
DeleteHeader(hc.chainDb, hash, num)
DeleteTd(hc.chainDb, hash, num)
hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1))
parentHash = hdr.ParentHash
// Notably, since geth has the possibility for setting the head to a low
// height which is even lower than ancient head.
// In order to ensure that the head is always no higher than the data in
// the database(ancient store or active store), we need to update head
// first then remove the relative data from the database.
//
// Update head first(head fast block, head full block) before deleting the data.
markerBatch := hc.chainDb.NewBatch()
if updateFn != nil {
updateFn(markerBatch, parent)
}
// Update head header then.
rawdb.WriteHeadHeaderHash(markerBatch, parentHash)
if err := markerBatch.Write(); err != nil {
log.Crit("Failed to update chain markers", "error", err)
}
hc.currentHeader.Store(parent)
hc.currentHeaderHash = parentHash
headHeaderGauge.Update(parent.Number.Int64())
// Remove the relative data from the database.
if delFn != nil {
delFn(batch, hash, num)
}
// Rewind header chain to new head.
rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(batch, hash, num)
rawdb.DeleteCanonicalHash(batch, num)
}
// Roll back the canonical chain numbering
for i := height; i > head; i-- {
DeleteCanonicalHash(hc.chainDb, i)
// Flush all accumulated deletions.
if err := batch.Write(); err != nil {
log.Crit("Failed to rewind block", "error", err)
}
// Clear out any stale content from the caches
hc.headerCache.Purge()
hc.tdCache.Purge()
hc.numberCache.Purge()
if hc.CurrentHeader() == nil {
hc.currentHeader.Store(hc.genesisHeader)
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()
headHeaderGauge.Update(hc.CurrentHeader().Number.Int64())
if err := WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash); err != nil {
log.Crit("Failed to reset head header hash", "err", err)
}
}
// SetGenesis sets a new genesis block header for the chain

View file

@ -24,12 +24,32 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/XinFinOrg/XDPoSChain/rlp"
)
// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
data, _ := db.Ancient(freezerHashTable, number)
if len(data) == 0 {
data, _ = db.Get(headerHashKey(number))
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
if len(data) == 0 {
data, _ = db.Ancient(freezerHashTable, number)
}
}
if len(data) == 0 {
return common.Hash{}
}
return common.BytesToHash(data)
}
// WriteCanonicalHash stores the hash assigned to a canonical block number.
func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Put(headerHashKey(number), hash.Bytes()); err != nil {
@ -37,6 +57,13 @@ func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64
}
}
// DeleteCanonicalHash removes the number to hash canonical mapping.
func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) {
if err := db.Delete(headerHashKey(number)); err != nil {
log.Crit("Failed to delete number to hash mapping", "err", err)
}
}
// ReadHeaderNumber returns the header number assigned to a hash.
func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 {
data, _ := db.Get(headerNumberKey(hash))
@ -56,6 +83,20 @@ func WriteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64)
}
}
// DeleteHeaderNumber removes hash->number mapping.
func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) {
if err := db.Delete(headerNumberKey(hash)); err != nil {
log.Crit("Failed to delete hash to number mapping", "err", err)
}
}
// WriteHeadHeaderHash stores the hash of the current canonical head header.
func WriteHeadHeaderHash(db ethdb.KeyValueWriter, hash common.Hash) {
if err := db.Put(headHeaderKey, hash.Bytes()); err != nil {
log.Crit("Failed to store last header's hash", "err", err)
}
}
// WriteHeadBlockHash stores the head block's hash.
func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
if err := db.Put(headBlockKey, hash.Bytes()); err != nil {
@ -63,10 +104,47 @@ func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
}
}
// WriteHeadFastBlockHash stores the hash of the current fast-sync head block.
func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) {
if err := db.Put(headFastBlockKey, hash.Bytes()); err != nil {
log.Crit("Failed to store last fast block's hash", "err", err)
}
}
// ReadHeaderRLP retrieves a block header in its raw RLP database encoding.
func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
data, _ := db.Get(headerKey(number, hash))
return data
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ := db.Ancient(freezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return data
}
// Then try to look up the data in leveldb.
data, _ = db.Get(headerKey(number, hash))
if len(data) > 0 {
return data
}
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
data, _ = db.Ancient(freezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return data
}
return nil // Can't find the data anywhere.
}
// HasHeader verifies the existence of a block header corresponding to the hash.
func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool {
if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
return true
}
if has, err := db.Has(headerKey(number, hash)); !has || err != nil {
return false
}
return true
}
// ReadHeader retrieves the block header corresponding to the hash.
@ -104,6 +182,22 @@ func WriteHeader(db ethdb.KeyValueWriter, header *types.Header) {
}
}
// DeleteHeader removes all block header data associated with a hash.
func DeleteHeader(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
deleteHeaderWithoutNumber(db, hash, number)
if err := db.Delete(headerNumberKey(hash)); err != nil {
log.Crit("Failed to delete hash to number mapping", "err", err)
}
}
// deleteHeaderWithoutNumber removes only the block header but does not remove
// the hash to number mapping.
func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(headerKey(number, hash)); err != nil {
log.Crit("Failed to delete header", "err", err)
}
}
// ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding.
func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
@ -165,6 +259,31 @@ func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *t
WriteBodyRLP(db, hash, number, data)
}
// DeleteBody removes all block body data associated with a hash.
func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(blockBodyKey(number, hash)); err != nil {
log.Crit("Failed to delete block body", "err", err)
}
}
// WriteTd stores the total difficulty of a block into the database.
func WriteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64, td *big.Int) {
data, err := rlp.EncodeToBytes(td)
if err != nil {
log.Crit("Failed to RLP encode block total difficulty", "err", err)
}
if err := db.Put(headerTDKey(number, hash), data); err != nil {
log.Crit("Failed to store block total difficulty", "err", err)
}
}
// DeleteTd removes all block total difficulty data associated with a hash.
func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(headerTDKey(number, hash)); err != nil {
log.Crit("Failed to delete block total difficulty", "err", err)
}
}
// ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding.
func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
@ -270,6 +389,13 @@ func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rec
}
}
// DeleteReceipts removes all receipt data associated with a block hash.
func DeleteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(blockReceiptsKey(number, hash)); err != nil {
log.Crit("Failed to delete block receipts", "err", err)
}
}
// storedReceiptRLP is the storage encoding of a receipt.
// Re-definition in core/types/receipt.go.
type storedReceiptRLP struct {

View file

@ -0,0 +1,56 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/rlp"
)
type TxLookupEntry struct {
BlockHash common.Hash
BlockIndex uint64
Index uint64
}
// WriteTxLookupEntriesByBlock stores a positional metadata for every transaction from
// a block, enabling hash based transaction and receipt lookups.
func WriteTxLookupEntriesByBlock(db ethdb.KeyValueWriter, block *types.Block) {
// Iterate over each transaction and encode its metadata
for i, tx := range block.Transactions() {
entry := TxLookupEntry{
BlockHash: block.Hash(),
BlockIndex: block.NumberU64(),
Index: uint64(i),
}
data, err := rlp.EncodeToBytes(entry)
if err != nil {
log.Crit("Failed to RLP encode TxLookupEntry", "err", err)
}
if err := db.Put(txLookupKey(tx.Hash()), data); err != nil {
log.Crit("Failed to store tx lookup entry", "err", err)
}
}
}
// DeleteTxLookupEntry removes all transaction data associated with a hash.
func DeleteTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash) {
db.Delete(txLookupKey(hash))
}

View file

@ -0,0 +1,34 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/log"
)
// WritePreimages writes the provided set of preimages to the database.
func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) {
for hash, preimage := range preimages {
if err := db.Put(preimageKey(hash), preimage); err != nil {
log.Crit("Failed to store trie preimage", "err", err)
}
}
preimageCounter.Inc(int64(len(preimages)))
preimageHitCounter.Inc(int64(len(preimages)))
}

View file

@ -21,23 +21,41 @@ import (
"encoding/binary"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/metrics"
)
// The fields below define the low level database schema prefixing.
var (
// headHeaderKey tracks the latest known header's hash.
headHeaderKey = []byte("LastHeader")
// headBlockKey tracks the latest known full block's hash.
headBlockKey = []byte("LastBlock")
// headFastBlockKey tracks the latest known incomplete block's hash during fast sync.
headFastBlockKey = []byte("LastFast")
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
headerHashSuffix = []byte("n") // headerPrefix + num (uint64 big endian) + headerHashSuffix -> hash
headerNumberPrefix = []byte("H") // headerNumberPrefix + hash -> num (uint64 big endian)
blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body
blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata
preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage
preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil)
preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil)
)
const (
// freezerHeaderTable indicates the name of the freezer header table.
freezerHeaderTable = "headers"
// freezerHashTable indicates the name of the freezer canonical hash table.
freezerHashTable = "hashes"
@ -60,6 +78,11 @@ func headerKey(number uint64, hash common.Hash) []byte {
return append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}
// headerTDKey = headerPrefix + num (uint64 big endian) + hash + headerTDSuffix
func headerTDKey(number uint64, hash common.Hash) []byte {
return append(headerKey(number, hash), headerTDSuffix...)
}
// headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix
func headerHashKey(number uint64) []byte {
return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...)
@ -79,3 +102,13 @@ func blockBodyKey(number uint64, hash common.Hash) []byte {
func blockReceiptsKey(number uint64, hash common.Hash) []byte {
return append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}
// txLookupKey = txLookupPrefix + hash
func txLookupKey(hash common.Hash) []byte {
return append(txLookupPrefix, hash.Bytes()...)
}
// preimageKey = preimagePrefix + hash
func preimageKey(hash common.Hash) []byte {
return append(preimagePrefix, hash.Bytes()...)
}

64
internal/syncx/mutex.go Normal file
View file

@ -0,0 +1,64 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package syncx contains exotic synchronization primitives.
package syncx
// ClosableMutex is a mutex that can also be closed.
// Once closed, it can never be taken again.
type ClosableMutex struct {
ch chan struct{}
}
func NewClosableMutex() *ClosableMutex {
ch := make(chan struct{}, 1)
ch <- struct{}{}
return &ClosableMutex{ch}
}
// TryLock attempts to lock cm.
// If the mutex is closed, TryLock returns false.
func (cm *ClosableMutex) TryLock() bool {
_, ok := <-cm.ch
return ok
}
// MustLock locks cm.
// If the mutex is closed, MustLock panics.
func (cm *ClosableMutex) MustLock() {
_, ok := <-cm.ch
if !ok {
panic("mutex closed")
}
}
// Unlock unlocks cm.
func (cm *ClosableMutex) Unlock() {
select {
case cm.ch <- struct{}{}:
default:
panic("Unlock of already-unlocked ClosableMutex")
}
}
// Close locks the mutex, then closes it.
func (cm *ClosableMutex) Close() {
_, ok := <-cm.ch
if !ok {
panic("Close of already-closed ClosableMutex")
}
close(cm.ch)
}

View file

@ -56,7 +56,6 @@ type LightChain struct {
scope event.SubscriptionScope
genesisBlock *types.Block
mu sync.RWMutex
chainmu sync.RWMutex
bodyCache *lru.Cache[common.Hash, *types.Body]
@ -147,7 +146,6 @@ func (lc *LightChain) loadLastState() error {
lc.hc.SetCurrentHeader(header)
}
}
// Issue a status log and return
header := lc.hc.CurrentHeader()
headerTd := lc.GetTd(header.Hash(), header.Number.Uint64())
@ -159,10 +157,10 @@ func (lc *LightChain) loadLastState() error {
// SetHead rewinds the local chain to a new head. Everything above the new
// head will be deleted and the new one set.
func (lc *LightChain) SetHead(head uint64) {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.chainmu.Lock()
defer lc.chainmu.Unlock()
lc.hc.SetHead(head, nil)
lc.hc.SetHead(head, nil, nil)
lc.loadLastState()
}
@ -182,14 +180,17 @@ func (lc *LightChain) ResetWithGenesisBlock(genesis *types.Block) {
// Dump the entire block chain and purge the caches
lc.SetHead(0)
lc.mu.Lock()
defer lc.mu.Unlock()
lc.chainmu.Lock()
defer lc.chainmu.Unlock()
// Prepare the genesis block and reinitialise the chain
if err := core.WriteTd(lc.chainDb, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
log.Crit("Failed to write genesis block TD", "err", err)
batch := lc.chainDb.NewBatch()
rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
rawdb.WriteBlock(batch, genesis)
rawdb.WriteHeadHeaderHash(batch, genesis.Hash())
if err := batch.Write(); err != nil {
log.Crit("Failed to reset genesis block", "err", err)
}
rawdb.WriteBlock(lc.chainDb, genesis)
lc.genesisBlock = genesis
lc.hc.SetGenesis(lc.genesisBlock.Header())
lc.hc.SetCurrentHeader(lc.genesisBlock.Header())
@ -297,16 +298,25 @@ func (lc *LightChain) Stop() {
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (lc *LightChain) Rollback(chain []common.Hash) {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.chainmu.Lock()
defer lc.chainmu.Unlock()
batch := lc.chainDb.NewBatch()
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of rollback is from high
// to low, so it's safe the update in-memory markers directly.
if head := lc.hc.CurrentHeader(); head.Hash() == hash {
rawdb.WriteHeadHeaderHash(batch, head.ParentHash)
lc.hc.SetCurrentHeader(lc.GetHeader(head.ParentHash, head.Number.Uint64()-1))
}
}
if err := batch.Write(); err != nil {
log.Crit("Failed to rollback light chain", "error", err)
}
}
// postChainEvents iterates over the events generated by a chain insertion and
@ -344,19 +354,13 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
// Make sure only one thread manipulates the chain at once
lc.chainmu.Lock()
defer func() {
lc.chainmu.Unlock()
time.Sleep(time.Millisecond * 10) // ugly hack; do not hog chain lock in case syncing is CPU-limited by validation
}()
defer lc.chainmu.Unlock()
lc.wg.Add(1)
defer lc.wg.Done()
var events []interface{}
whFunc := func(header *types.Header) error {
lc.mu.Lock()
defer lc.mu.Unlock()
status, err := lc.hc.WriteHeader(header)
switch status {
@ -448,13 +452,17 @@ func (lc *LightChain) SyncCht(ctx context.Context) bool {
chtCount, _, _ := lc.odr.ChtIndexer().Sections()
if headNum+1 < chtCount*CHTFrequencyClient {
num := chtCount*CHTFrequencyClient - 1
header, err := GetHeaderByNumber(ctx, lc.odr, num)
if header != nil && err == nil {
lc.mu.Lock()
// Retrieve the latest useful header and update to it
if header, err := GetHeaderByNumber(ctx, lc.odr, num); header != nil && err == nil {
lc.chainmu.Lock()
defer lc.chainmu.Unlock()
// Ensure the chain didn't move past the latest block while retrieving it
if lc.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() {
log.Info("Updated latest header based on CHT", "number", header.Number, "hash", header.Hash())
rawdb.WriteHeadHeaderHash(lc.chainDb, header.Hash())
lc.hc.SetCurrentHeader(header)
}
lc.mu.Unlock()
return true
}
}

View file

@ -18,10 +18,11 @@ package light
import (
"context"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"math/big"
"testing"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core"
@ -122,10 +123,10 @@ func testHeaderChainImport(chain []*types.Header, lightchain *LightChain) error
return err
}
// Manually insert the header into the database, but don't reorganize (allows subsequent testing)
lightchain.mu.Lock()
lightchain.chainmu.Lock()
core.WriteTd(lightchain.chainDb, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, lightchain.GetTdByHash(header.ParentHash)))
rawdb.WriteHeader(lightchain.chainDb, header)
lightchain.mu.Unlock()
lightchain.chainmu.Unlock()
}
return nil
}

View file

@ -25,6 +25,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
@ -205,15 +206,17 @@ func (p *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, number uin
// rollbackTxs marks the transactions contained in recently rolled back blocks
// as rolled back. It also removes any positional lookup entries.
func (p *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) {
batch := p.chainDb.NewBatch()
if list, ok := p.mined[hash]; ok {
for _, tx := range list {
txHash := tx.Hash()
core.DeleteTxLookupEntry(p.chainDb, txHash)
rawdb.DeleteTxLookupEntry(batch, txHash)
p.pending[txHash] = tx
txc.setState(txHash, false)
}
delete(p.mined, hash)
}
batch.Write()
}
// reorgOnNewHead sets a new head header, processing (and rolling back if necessary)