diff --git a/core/state/sync.go b/core/state/sync.go index 6938a636dd..01fab8e303 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -27,7 +27,7 @@ import ( ) // NewStateSync create a new state trie download scheduler. -func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.SyncBloom, onLeaf func(paths [][]byte, leaf []byte) error) *trie.Sync { +func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(paths [][]byte, leaf []byte) error) *trie.Sync { // Register the storage slot callback if the external callback is specified. var onSlot func(paths [][]byte, hexpath []byte, leaf []byte, parent common.Hash) error if onLeaf != nil { @@ -52,6 +52,6 @@ func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.S syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), hexpath, parent) return nil } - syncer = trie.NewSync(root, database, onAccount, bloom) + syncer = trie.NewSync(root, database, onAccount) return syncer } diff --git a/core/state/sync_test.go b/core/state/sync_test.go index e4a039f92d..a5172e9065 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -26,7 +26,6 @@ import ( "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/ethdb" - "github.com/XinFinOrg/XDPoSChain/ethdb/memorydb" "github.com/XinFinOrg/XDPoSChain/rlp" "github.com/XinFinOrg/XDPoSChain/trie" ) @@ -133,7 +132,7 @@ func checkStateConsistency(db ethdb.Database, root common.Hash) error { // Tests that an empty state is not scheduled for syncing. func TestEmptyStateSync(t *testing.T) { - sync := NewStateSync(types.EmptyRootHash, rawdb.NewMemoryDatabase(), trie.NewSyncBloom(1, memorydb.New()), nil) + sync := NewStateSync(types.EmptyRootHash, rawdb.NewMemoryDatabase(), nil) if nodes, paths, codes := sync.Missing(1); len(nodes) != 0 || len(paths) != 0 || len(codes) != 0 { t.Errorf(" content requested for empty state: %v, %v, %v", nodes, paths, codes) } @@ -170,7 +169,7 @@ func testIterativeStateSync(t *testing.T, count int, commit bool, bypath bool) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) nodes, paths, codes := sched.Missing(count) var ( @@ -249,7 +248,7 @@ func TestIterativeDelayedStateSync(t *testing.T) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) nodes, _, codes := sched.Missing(0) queue := append(append([]common.Hash{}, nodes...), codes...) @@ -297,7 +296,7 @@ func testIterativeRandomStateSync(t *testing.T, count int) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) queue := make(map[common.Hash]struct{}) nodes, _, codes := sched.Missing(count) @@ -347,7 +346,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) queue := make(map[common.Hash]struct{}) nodes, _, codes := sched.Missing(0) @@ -414,7 +413,7 @@ func TestIncompleteStateSync(t *testing.T) { // Create a destination state and sync with the scheduler dstDb := rawdb.NewMemoryDatabase() - sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil) + sched := NewStateSync(srcRoot, dstDb, nil) var added []common.Hash diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index 24d51f5c1b..f6857283ee 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -26,7 +26,6 @@ import ( "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/state" "github.com/XinFinOrg/XDPoSChain/ethdb" - "github.com/XinFinOrg/XDPoSChain/ethdb/memorydb" "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/trie" "golang.org/x/crypto/sha3" @@ -294,7 +293,7 @@ type codeTask struct { func newStateSync(d *Downloader, root common.Hash) *stateSync { return &stateSync{ d: d, - sched: state.NewStateSync(root, d.stateDB, trie.NewSyncBloom(1, memorydb.New()), nil), + sched: state.NewStateSync(root, d.stateDB, nil), keccak: sha3.NewLegacyKeccak256(), trieTasks: make(map[common.Hash]*trieTask), codeTasks: make(map[common.Hash]*codeTask), diff --git a/trie/sync.go b/trie/sync.go index afe40970db..80c69f9cce 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -129,11 +129,10 @@ type Sync struct { codeReqs map[common.Hash]*request // Pending requests pertaining to a code hash queue *prque.Prque[int64, any] // Priority queue with the pending requests fetches map[int]int // Number of active fetches per trie node depth - bloom *SyncBloom // Bloom filter for fast state existence checks } // NewSync creates a new trie data download scheduler. -func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback, bloom *SyncBloom) *Sync { +func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback) *Sync { ts := &Sync{ database: database, membatch: newSyncMemBatch(), @@ -141,7 +140,6 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb codeReqs: make(map[common.Hash]*request), queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy fetches: make(map[int]int), - bloom: bloom, } ts.AddSubTrie(root, nil, common.Hash{}, callback) return ts @@ -156,16 +154,11 @@ func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, cal if s.membatch.hasNode(root) { return } - if s.bloom == nil || s.bloom.Contains(root[:]) { - // Bloom filter says this might be a duplicate, double check. - // If database says yes, then at least the trie node is present - // and we hold the assumption that it's NOT legacy contract code. - blob := rawdb.ReadTrieNode(s.database, root) - if len(blob) > 0 { - return - } - // False positive, bump fault meter - bloomFaultMeter.Mark(1) + // If database says this is a duplicate, then at least the trie node is + // present, and we hold the assumption that it's NOT legacy contract code. + blob := rawdb.ReadTrieNode(s.database, root) + if len(blob) > 0 { + return } // Assemble the new sub-trie sync request req := &request{ @@ -196,18 +189,13 @@ func (s *Sync) AddCodeEntry(hash common.Hash, path []byte, parent common.Hash) { if s.membatch.hasCode(hash) { return } - if s.bloom == nil || s.bloom.Contains(hash[:]) { - // Bloom filter says this might be a duplicate, double check. - // If database says yes, the blob is present for sure. - // Note we only check the existence with new code scheme, fast - // sync is expected to run with a fresh new node. Even there - // exists the code with legacy format, fetch and store with - // new scheme anyway. - if blob := rawdb.ReadCodeWithPrefix(s.database, hash); len(blob) > 0 { - return - } - // False positive, bump fault meter - bloomFaultMeter.Mark(1) + // If database says duplicate, the blob is present for sure. + // Note we only check the existence with new code scheme, fast + // sync is expected to run with a fresh new node. Even there + // exists the code with legacy format, fetch and store with + // new scheme anyway. + if blob := rawdb.ReadCodeWithPrefix(s.database, hash); len(blob) > 0 { + return } // Assemble the new sub-trie sync request req := &request{ @@ -314,15 +302,9 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { // Dump the membatch into a database dbw for key, value := range s.membatch.nodes { rawdb.WriteTrieNode(dbw, key, value) - if s.bloom != nil { - s.bloom.Add(key[:]) - } } for key, value := range s.membatch.codes { rawdb.WriteCode(dbw, key, value) - if s.bloom != nil { - s.bloom.Add(key[:]) - } } // Drop the membatch data and return s.membatch = newSyncMemBatch() @@ -418,15 +400,10 @@ func (s *Sync) children(req *request, object node) ([]*request, error) { if s.membatch.hasNode(hash) { continue } - if s.bloom == nil || s.bloom.Contains(node) { - // Bloom filter says this might be a duplicate, double check. - // If database says yes, then at least the trie node is present - // and we hold the assumption that it's NOT legacy contract code. - if blob := rawdb.ReadTrieNode(s.database, common.BytesToHash(node)); len(blob) > 0 { - continue - } - // False positive, bump fault meter - bloomFaultMeter.Mark(1) + // If database says duplicate, then at least the trie node is present + // and we hold the assumption that it's NOT legacy contract code. + if blob := rawdb.ReadTrieNode(s.database, hash); len(blob) > 0 { + continue } // Locally unknown Node, schedule for retrieval requests = append(requests, &request{ diff --git a/trie/sync_bloom.go b/trie/sync_bloom.go deleted file mode 100644 index 7901d53d5b..0000000000 --- a/trie/sync_bloom.go +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package trie - -import ( - "encoding/binary" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/XinFinOrg/XDPoSChain/common" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" - "github.com/XinFinOrg/XDPoSChain/ethdb" - "github.com/XinFinOrg/XDPoSChain/log" - "github.com/XinFinOrg/XDPoSChain/metrics" - bloomfilter "github.com/holiman/bloomfilter/v2" -) - -var ( - bloomAddMeter = metrics.NewRegisteredMeter("trie/bloom/add", nil) - bloomLoadMeter = metrics.NewRegisteredMeter("trie/bloom/load", nil) - bloomTestMeter = metrics.NewRegisteredMeter("trie/bloom/test", nil) - bloomMissMeter = metrics.NewRegisteredMeter("trie/bloom/miss", nil) - bloomFaultMeter = metrics.NewRegisteredMeter("trie/bloom/fault", nil) - bloomErrorGauge = metrics.NewRegisteredGauge("trie/bloom/error", nil) -) - -// SyncBloom is a bloom filter used during fast sync to quickly decide if a trie -// node or contract code already exists on disk or not. It self populates from the -// provided disk database on creation in a background thread and will only start -// returning live results once that's finished. -type SyncBloom struct { - bloom *bloomfilter.Filter - inited uint32 - closer sync.Once - closed uint32 - pend sync.WaitGroup - closeCh chan struct{} -} - -// NewSyncBloom creates a new bloom filter of the given size (in megabytes) and -// initializes it from the database. The bloom is hard coded to use 3 filters. -func NewSyncBloom(memory uint64, database ethdb.Iteratee) *SyncBloom { - // Create the bloom filter to track known trie nodes - bloom, err := bloomfilter.New(memory*1024*1024*8, 4) - if err != nil { - panic(fmt.Sprintf("failed to create bloom: %v", err)) - } - log.Info("Allocated fast sync bloom", "size", common.StorageSize(memory*1024*1024)) - - // Assemble the fast sync bloom and init it from previous sessions - b := &SyncBloom{ - bloom: bloom, - closeCh: make(chan struct{}), - } - b.pend.Add(2) - go func() { - defer b.pend.Done() - b.init(database) - }() - go func() { - defer b.pend.Done() - b.meter() - }() - return b -} - -// init iterates over the database, pushing every trie hash into the bloom filter. -func (b *SyncBloom) init(database ethdb.Iteratee) { - // Iterate over the database, but restart every now and again to avoid holding - // a persistent snapshot since fast sync can push a ton of data concurrently, - // bloating the disk. - // - // Note, this is fine, because everything inserted into leveldb by fast sync is - // also pushed into the bloom directly, so we're not missing anything when the - // iterator is swapped out for a new one. - it := database.NewIterator(nil, nil) - - var ( - start = time.Now() - swap = time.Now() - ) - for it.Next() && atomic.LoadUint32(&b.closed) == 0 { - // If the database entry is a trie node, add it to the bloom - key := it.Key() - if len(key) == common.HashLength { - b.bloom.AddHash(binary.BigEndian.Uint64(key)) - bloomLoadMeter.Mark(1) - } else if ok, hash := rawdb.IsCodeKey(key); ok { - // If the database entry is a contract code, add it to the bloom - b.bloom.AddHash(binary.BigEndian.Uint64(hash)) - bloomLoadMeter.Mark(1) - } - // If enough time elapsed since the last iterator swap, restart - if time.Since(swap) > 8*time.Second { - key := common.CopyBytes(it.Key()) - - it.Release() - it = database.NewIterator(nil, key) - - log.Info("Initializing state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability(), "elapsed", common.PrettyDuration(time.Since(start))) - swap = time.Now() - } - } - it.Release() - - // Mark the bloom filter inited and return - log.Info("Initialized state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability(), "elapsed", common.PrettyDuration(time.Since(start))) - atomic.StoreUint32(&b.inited, 1) -} - -// meter periodically recalculates the false positive error rate of the bloom -// filter and reports it in a metric. -func (b *SyncBloom) meter() { - // check every second - tick := time.NewTicker(1 * time.Second) - defer tick.Stop() - - for { - select { - case <-tick.C: - // Report the current error ration. No floats, lame, scale it up. - bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000)) - case <-b.closeCh: - return - } - } -} - -// Close terminates any background initializer still running and releases all the -// memory allocated for the bloom. -func (b *SyncBloom) Close() error { - b.closer.Do(func() { - // Ensure the initializer is stopped - atomic.StoreUint32(&b.closed, 1) - close(b.closeCh) - b.pend.Wait() - - // Wipe the bloom, but mark it "uninited" just in case someone attempts an access - log.Info("Deallocated state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability()) - - atomic.StoreUint32(&b.inited, 0) - b.bloom = nil - }) - return nil -} - -// Add inserts a new trie node hash into the bloom filter. -func (b *SyncBloom) Add(hash []byte) { - if atomic.LoadUint32(&b.closed) == 1 { - return - } - b.bloom.AddHash(binary.BigEndian.Uint64(hash)) - bloomAddMeter.Mark(1) -} - -// Contains tests if the bloom filter contains the given hash: -// - false: the bloom definitely does not contain hash -// - true: the bloom maybe contains hash -// -// While the bloom is being initialized, any query will return true. -func (b *SyncBloom) Contains(hash []byte) bool { - bloomTestMeter.Mark(1) - if atomic.LoadUint32(&b.inited) == 0 { - // We didn't load all the trie nodes from the previous run of Geth yet. As - // such, we can't say for sure if a hash is not present for anything. Until - // the init is done, we're faking "possible presence" for everything. - return true - } - // Bloom initialized, check the real one and report any successful misses - maybe := b.bloom.ContainsHash(binary.BigEndian.Uint64(hash)) - if !maybe { - bloomMissMeter.Mark(1) - } - return maybe -} diff --git a/trie/sync_test.go b/trie/sync_test.go index a3832c3937..374dd9cffd 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -96,7 +96,7 @@ func TestEmptySync(t *testing.T) { emptyB, _ := New(types.EmptyRootHash, dbB) for i, trie := range []*Trie{emptyA, emptyB} { - sync := NewSync(trie.Hash(), memorydb.New(), nil, NewSyncBloom(1, memorydb.New())) + sync := NewSync(trie.Hash(), memorydb.New(), nil) if nodes, paths, codes := sync.Missing(1); len(nodes) != 0 || len(paths) != 0 || len(codes) != 0 { t.Errorf("test %d: content requested for empty trie: %v, %v, %v", i, nodes, paths, codes) } @@ -117,7 +117,7 @@ func testIterativeSync(t *testing.T, count int, bypath bool) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) nodes, paths, codes := sched.Missing(count) var ( @@ -178,7 +178,7 @@ func TestIterativeDelayedSync(t *testing.T) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) nodes, _, codes := sched.Missing(10000) queue := append(append([]common.Hash{}, nodes...), codes...) @@ -224,7 +224,7 @@ func testIterativeRandomSync(t *testing.T, count int) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) queue := make(map[common.Hash]struct{}) nodes, _, codes := sched.Missing(count) @@ -272,7 +272,7 @@ func TestIterativeRandomDelayedSync(t *testing.T) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) queue := make(map[common.Hash]struct{}) nodes, _, codes := sched.Missing(10000) @@ -325,7 +325,7 @@ func TestDuplicateAvoidanceSync(t *testing.T) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) nodes, _, codes := sched.Missing(0) queue := append(append([]common.Hash{}, nodes...), codes...) @@ -372,7 +372,7 @@ func TestIncompleteSync(t *testing.T) { // Create a destination trie and sync with the scheduler diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) var added []common.Hash @@ -432,7 +432,7 @@ func TestSyncOrdering(t *testing.T) { // Create a destination trie and sync with the scheduler, tracking the requests diskdb := memorydb.New() triedb := NewDatabase(diskdb) - sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb)) + sched := NewSync(srcTrie.Hash(), diskdb, nil) nodes, paths, _ := sched.Missing(1) queue := append([]common.Hash{}, nodes...)