mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-03 21:48:36 +00:00
core,eth,triedb: route snap/2 completion through pathdb.AdoptSyncedState
This commit is contained in:
parent
4fd4120450
commit
e18088ea6e
6 changed files with 66 additions and 47 deletions
|
|
@ -337,6 +337,8 @@ func (s *Suite) snapGetAccessLists(t *utesting.T, tc *accessListsTest) error {
|
||||||
for _, p := range tc.mustBeEmptyAt {
|
for _, p := range tc.mustBeEmptyAt {
|
||||||
mustEmpty[p] = struct{}{}
|
mustEmpty[p] = struct{}{}
|
||||||
}
|
}
|
||||||
|
head := s.chain.Head().Header()
|
||||||
|
rules := s.chain.config.Rules(head.Number, true, head.Time)
|
||||||
|
|
||||||
for it.Next() {
|
for it.Next() {
|
||||||
raw := it.Value()
|
raw := it.Value()
|
||||||
|
|
@ -383,7 +385,7 @@ func (s *Suite) snapGetAccessLists(t *utesting.T, tc *accessListsTest) error {
|
||||||
if err := rlp.DecodeBytes(raw, &accessList); err != nil {
|
if err := rlp.DecodeBytes(raw, &accessList); err != nil {
|
||||||
return fmt.Errorf("entry %d: invalid BAL RLP: %v", idx, err)
|
return fmt.Errorf("entry %d: invalid BAL RLP: %v", idx, err)
|
||||||
}
|
}
|
||||||
if err := accessList.Validate(); err != nil {
|
if err := accessList.Validate(rules); err != nil {
|
||||||
return fmt.Errorf("entry %d: BAL failed validation: %v", idx, err)
|
return fmt.Errorf("entry %d: BAL failed validation: %v", idx, err)
|
||||||
}
|
}
|
||||||
idx++
|
idx++
|
||||||
|
|
|
||||||
|
|
@ -1158,7 +1158,7 @@ func (bc *BlockChain) SnapSyncStart() error {
|
||||||
// given hash, regardless of the chain contents prior to snap sync. It is
|
// given hash, regardless of the chain contents prior to snap sync. It is
|
||||||
// invoked once snap sync completes and assumes that SnapSyncStart was called
|
// invoked once snap sync completes and assumes that SnapSyncStart was called
|
||||||
// previously.
|
// previously.
|
||||||
func (bc *BlockChain) SnapSyncComplete(hash common.Hash) error {
|
func (bc *BlockChain) SnapSyncComplete(hash common.Hash, flatStateReady bool) error {
|
||||||
// Make sure that both the block as well at its state trie exists
|
// Make sure that both the block as well at its state trie exists
|
||||||
block := bc.GetBlockByHash(hash)
|
block := bc.GetBlockByHash(hash)
|
||||||
if block == nil {
|
if block == nil {
|
||||||
|
|
@ -1169,20 +1169,29 @@ func (bc *BlockChain) SnapSyncComplete(hash common.Hash) error {
|
||||||
}
|
}
|
||||||
defer bc.chainmu.Unlock()
|
defer bc.chainmu.Unlock()
|
||||||
|
|
||||||
// Reset the trie database with the fresh snap synced state.
|
// Reset the trie database with the fresh snap synced state. Snap/1 needs
|
||||||
|
// a full trie-to-flat regeneration; snap/2 adopts the already-consistent
|
||||||
|
// flat state and skips that work.
|
||||||
root := block.Root()
|
root := block.Root()
|
||||||
if bc.triedb.Scheme() == rawdb.PathScheme {
|
if bc.triedb.Scheme() == rawdb.PathScheme {
|
||||||
if err := bc.triedb.Enable(root); err != nil {
|
if flatStateReady {
|
||||||
return err
|
if err := bc.triedb.AdoptSyncedState(root); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := bc.triedb.Enable(root); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !bc.HasState(root) {
|
if !bc.HasState(root) {
|
||||||
return fmt.Errorf("non existent state [%x..]", root[:4])
|
return fmt.Errorf("non existent state [%x..]", root[:4])
|
||||||
}
|
}
|
||||||
// Set up the snapshot tree from the synced flat state. Snap/2 downloads
|
|
||||||
// flat state directly as the snapshot.
|
// The legacy snapshot tree needs to be wiped and rebuilt from the trie
|
||||||
if bc.snaps != nil {
|
// after a snap/1 sync.
|
||||||
bc.snaps.InitFromSyncedState(root)
|
if !flatStateReady && bc.snaps != nil {
|
||||||
|
bc.snaps.Rebuild(root)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If all checks out, manually set the head block.
|
// If all checks out, manually set the head block.
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/fastcache"
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
|
@ -727,35 +726,6 @@ func (t *Tree) Rebuild(root common.Hash) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitFromSyncedState sets up the snapshot tree to use flat state that was
|
|
||||||
// already downloaded by snap sync. Unlike Rebuild, it does NOT regenerate the
|
|
||||||
// snapshot from the trie.
|
|
||||||
func (t *Tree) InitFromSyncedState(root common.Hash) {
|
|
||||||
t.lock.Lock()
|
|
||||||
defer t.lock.Unlock()
|
|
||||||
|
|
||||||
// Delete any recovery flag in the database.
|
|
||||||
rawdb.DeleteSnapshotRecoveryNumber(t.diskdb)
|
|
||||||
rawdb.DeleteSnapshotDisabled(t.diskdb)
|
|
||||||
|
|
||||||
// Write the new root.
|
|
||||||
rawdb.WriteSnapshotRoot(t.diskdb, root)
|
|
||||||
|
|
||||||
// Clear the journal.
|
|
||||||
journalProgress(t.diskdb, nil, nil)
|
|
||||||
log.Info("Setting up snapshot from synced state", "root", root)
|
|
||||||
|
|
||||||
// Replace t.layers with a single diskLayer pointing at the root.
|
|
||||||
t.layers = map[common.Hash]snapshot{
|
|
||||||
root: &diskLayer{
|
|
||||||
diskdb: t.diskdb,
|
|
||||||
triedb: t.triedb,
|
|
||||||
cache: fastcache.New(t.config.CacheSize * 1024 * 1024),
|
|
||||||
root: root,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AccountIterator creates a new account iterator for the specified root hash and
|
// AccountIterator creates a new account iterator for the specified root hash and
|
||||||
// seeks to a starting account hash.
|
// seeks to a starting account hash.
|
||||||
func (t *Tree) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {
|
func (t *Tree) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {
|
||||||
|
|
|
||||||
|
|
@ -198,7 +198,7 @@ type BlockChain interface {
|
||||||
SnapSyncStart() error
|
SnapSyncStart() error
|
||||||
|
|
||||||
// SnapSyncComplete directly commits the head block to a certain entity.
|
// SnapSyncComplete directly commits the head block to a certain entity.
|
||||||
SnapSyncComplete(common.Hash) error
|
SnapSyncComplete(hash common.Hash, flatStateReady bool) error
|
||||||
|
|
||||||
// InsertHeadersBeforeCutoff inserts a batch of headers before the configured
|
// InsertHeadersBeforeCutoff inserts a batch of headers before the configured
|
||||||
// chain cutoff into the ancient store.
|
// chain cutoff into the ancient store.
|
||||||
|
|
@ -1052,7 +1052,9 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
|
||||||
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []rlp.RawValue{result.Receipts}, d.ancientLimit); err != nil {
|
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []rlp.RawValue{result.Receipts}, d.ancientLimit); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := d.blockchain.SnapSyncComplete(block.Hash()); err != nil {
|
|
||||||
|
// TODO JR: This needs to pass trie for snap/2 and false for snap/1
|
||||||
|
if err := d.blockchain.SnapSyncComplete(block.Hash(), true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
d.committed.Store(true)
|
d.committed.Store(true)
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ func (s *Syncer) applyAccessList(b *bal.BlockAccessList) error {
|
||||||
batch := s.db.NewBatch()
|
batch := s.db.NewBatch()
|
||||||
|
|
||||||
// Iterate over all accounts in the access list
|
// Iterate over all accounts in the access list
|
||||||
for _, access := range b.Accesses {
|
for _, access := range *b {
|
||||||
addr := common.Address(access.Address)
|
addr := common.Address(access.Address)
|
||||||
accountHash := crypto.Keccak256Hash(addr[:])
|
accountHash := crypto.Keccak256Hash(addr[:])
|
||||||
|
|
||||||
|
|
@ -91,8 +91,7 @@ func (s *Syncer) applyAccessList(b *bal.BlockAccessList) error {
|
||||||
|
|
||||||
// Apply balance change (last entry = post-block state)
|
// Apply balance change (last entry = post-block state)
|
||||||
if n := len(access.BalanceChanges); n > 0 {
|
if n := len(access.BalanceChanges); n > 0 {
|
||||||
raw := access.BalanceChanges[n-1].Balance
|
account.Balance = new(uint256.Int).Set(access.BalanceChanges[n-1].Balance)
|
||||||
account.Balance = new(uint256.Int).SetBytes(raw[:])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply nonce change (last entry = post-block state)
|
// Apply nonce change (last entry = post-block state)
|
||||||
|
|
@ -116,11 +115,13 @@ func (s *Syncer) applyAccessList(b *bal.BlockAccessList) error {
|
||||||
for _, slotWrites := range access.StorageWrites {
|
for _, slotWrites := range access.StorageWrites {
|
||||||
if n := len(slotWrites.Accesses); n > 0 {
|
if n := len(slotWrites.Accesses); n > 0 {
|
||||||
value := slotWrites.Accesses[n-1].ValueAfter
|
value := slotWrites.Accesses[n-1].ValueAfter
|
||||||
storageHash := crypto.Keccak256Hash(slotWrites.Slot[:])
|
slotKey := slotWrites.Slot.Bytes32()
|
||||||
if value == (common.Hash{}) {
|
storageHash := crypto.Keccak256Hash(slotKey[:])
|
||||||
|
if value.IsZero() {
|
||||||
rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash)
|
rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash)
|
||||||
} else {
|
} else {
|
||||||
rawdb.WriteStorageSnapshot(batch, accountHash, storageHash, value[:])
|
valBytes := value.Bytes32()
|
||||||
|
rawdb.WriteStorageSnapshot(batch, accountHash, storageHash, valBytes[:])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -666,6 +666,41 @@ func testSync(t *testing.T, scheme string) {
|
||||||
t.Fatalf("sync failed: %v", err)
|
t.Fatalf("sync failed: %v", err)
|
||||||
}
|
}
|
||||||
verifyTrie(scheme, syncer.db, sourceAccountTrie.Hash(), t)
|
verifyTrie(scheme, syncer.db, sourceAccountTrie.Hash(), t)
|
||||||
|
verifyAdoptedSyncedState(scheme, syncer.db, sourceAccountTrie.Hash(), elems, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verifyAdoptedSyncedState exercises the snap/2 completion contract end-to-end:
|
||||||
|
// after a real sync, opening a fresh triedb and calling AdoptSyncedState must
|
||||||
|
// (a) succeed and (b) leave flat-state reads serving immediately, with no
|
||||||
|
// background regeneration gating them.
|
||||||
|
func verifyAdoptedSyncedState(scheme string, db ethdb.KeyValueStore, root common.Hash, elems []*kv, t *testing.T) {
|
||||||
|
t.Helper()
|
||||||
|
if scheme != rawdb.PathScheme {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tdb := triedb.NewDatabase(rawdb.NewDatabase(db), newDbConfig(scheme))
|
||||||
|
defer tdb.Close()
|
||||||
|
|
||||||
|
if err := tdb.AdoptSyncedState(root); err != nil {
|
||||||
|
t.Fatalf("AdoptSyncedState failed: %v", err)
|
||||||
|
}
|
||||||
|
// Read one of the synced accounts via the public flat-state API. If this
|
||||||
|
// returned errNotCoveredYet we'd know AdoptSyncedState left a generator
|
||||||
|
// gating reads, exactly the bug we're trying to prevent.
|
||||||
|
sr, err := tdb.StateReader(root)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("StateReader: %v", err)
|
||||||
|
}
|
||||||
|
if len(elems) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
acc, err := sr.Account(common.BytesToHash(elems[0].k))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("flat-state read failed after AdoptSyncedState: %v", err)
|
||||||
|
}
|
||||||
|
if acc == nil {
|
||||||
|
t.Fatal("flat-state read returned nil account; sync did not populate the snapshot namespace")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestSyncTinyTriePanic tests a basic sync with one peer, and a tiny trie. This caused a
|
// TestSyncTinyTriePanic tests a basic sync with one peer, and a tiny trie. This caused a
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue