diff --git a/XDCx/tradingstate/state_liquidationprice.go b/XDCx/tradingstate/state_liquidationprice.go index 184c3fbc37..fe49d372fa 100644 --- a/XDCx/tradingstate/state_liquidationprice.go +++ b/XDCx/tradingstate/state_liquidationprice.go @@ -136,7 +136,7 @@ func (l *liquidationPriceState) updateRoot(db Database) error { if l.dbErr != nil { return l.dbErr } - root, err := l.trie.Commit(func(leaf []byte, parent common.Hash) error { + root, err := l.trie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var orderList orderList if err := rlp.DecodeBytes(leaf, &orderList); err != nil { return nil diff --git a/XDCx/tradingstate/state_orderbook.go b/XDCx/tradingstate/state_orderbook.go index ebb6d314de..bbf16f62b4 100644 --- a/XDCx/tradingstate/state_orderbook.go +++ b/XDCx/tradingstate/state_orderbook.go @@ -245,7 +245,7 @@ func (te *tradingExchanges) CommitAsksTrie(db Database) error { if te.dbErr != nil { return te.dbErr } - root, err := te.asksTrie.Commit(func(leaf []byte, parent common.Hash) error { + root, err := te.asksTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var orderList orderList if err := rlp.DecodeBytes(leaf, &orderList); err != nil { return nil @@ -307,7 +307,7 @@ func (te *tradingExchanges) CommitBidsTrie(db Database) error { if te.dbErr != nil { return te.dbErr } - root, err := te.bidsTrie.Commit(func(leaf []byte, parent common.Hash) error { + root, err := te.bidsTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var orderList orderList if err := rlp.DecodeBytes(leaf, &orderList); err != nil { return nil @@ -783,7 +783,7 @@ func (t *tradingExchanges) CommitLiquidationPriceTrie(db Database) error { if t.dbErr != nil { return t.dbErr } - root, err := t.liquidationPriceTrie.Commit(func(leaf []byte, parent common.Hash) error { + root, err := t.liquidationPriceTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var orderList orderList if err := rlp.DecodeBytes(leaf, &orderList); err != nil { return nil diff --git a/XDCx/tradingstate/statedb.go b/XDCx/tradingstate/statedb.go index 91010e9bff..52dbeccb86 100644 --- a/XDCx/tradingstate/statedb.go +++ b/XDCx/tradingstate/statedb.go @@ -589,7 +589,7 @@ func (t *TradingStateDB) Commit() (root common.Hash, err error) { } } // Write trie changes. - root, err = t.trie.Commit(func(leaf []byte, parent common.Hash) error { + root, err = t.trie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var exchange tradingExchangeObject if err := rlp.DecodeBytes(leaf, &exchange); err != nil { return nil diff --git a/XDCxlending/lendingstate/state_lendingbook.go b/XDCxlending/lendingstate/state_lendingbook.go index c607d0eff9..d92631bac5 100644 --- a/XDCxlending/lendingstate/state_lendingbook.go +++ b/XDCxlending/lendingstate/state_lendingbook.go @@ -472,7 +472,7 @@ func (le *lendingExchangeState) CommitInvestingTrie(db Database) error { if le.dbErr != nil { return le.dbErr } - root, err := le.investingTrie.Commit(func(leaf []byte, parent common.Hash) error { + root, err := le.investingTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var orderList itemList if err := rlp.DecodeBytes(leaf, &orderList); err != nil { return nil @@ -493,7 +493,7 @@ func (le *lendingExchangeState) CommitBorrowingTrie(db Database) error { if le.dbErr != nil { return le.dbErr } - root, err := le.borrowingTrie.Commit(func(leaf []byte, parent common.Hash) error { + root, err := le.borrowingTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var orderList itemList if err := rlp.DecodeBytes(leaf, &orderList); err != nil { return nil @@ -514,7 +514,7 @@ func (le *lendingExchangeState) CommitLiquidationTimeTrie(db Database) error { if le.dbErr != nil { return le.dbErr } - root, err := le.liquidationTimeTrie.Commit(func(leaf []byte, parent common.Hash) error { + root, err := le.liquidationTimeTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var orderList itemList if err := rlp.DecodeBytes(leaf, &orderList); err != nil { return nil diff --git a/XDCxlending/lendingstate/statedb.go b/XDCxlending/lendingstate/statedb.go index 146cf4a8c8..0d734f32b3 100644 --- a/XDCxlending/lendingstate/statedb.go +++ b/XDCxlending/lendingstate/statedb.go @@ -578,7 +578,7 @@ func (ls *LendingStateDB) Commit() (root common.Hash, err error) { } } // Write trie changes. - root, err = ls.trie.Commit(func(leaf []byte, parent common.Hash) error { + root, err = ls.trie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var exchange lendingObject if err := rlp.DecodeBytes(leaf, &exchange); err != nil { return nil diff --git a/core/state/statedb.go b/core/state/statedb.go index 8661982cb3..85a3628b6f 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -840,7 +840,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { // Write the account trie changes, measuing the amount of wasted time defer func(start time.Time) { s.AccountCommits += time.Since(start) }(time.Now()) - return s.trie.Commit(func(leaf []byte, parent common.Hash) error { + return s.trie.Commit(func(path []byte, leaf []byte, parent common.Hash) error { var account Account if err := rlp.DecodeBytes(leaf, &account); err != nil { return nil diff --git a/core/state/sync.go b/core/state/sync.go index 460e0094ae..8a8597eee0 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -28,13 +28,13 @@ import ( // NewStateSync create a new state trie download scheduler. func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.SyncBloom) *trie.Sync { var syncer *trie.Sync - callback := func(leaf []byte, parent common.Hash) error { + callback := func(path []byte, leaf []byte, parent common.Hash) error { var obj Account if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { return err } - syncer.AddSubTrie(obj.Root, 64, parent, nil) - syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), 64, parent) + syncer.AddSubTrie(obj.Root, path, parent, nil) + syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), path, parent) return nil } syncer = trie.NewSync(root, database, callback, bloom) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d207e9bca1..57b9d6d93a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1545,7 +1545,13 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { // Start syncing state of the reported head block. This should get us most of // the state of the pivot block. sync := d.syncState(latest.Root) - defer sync.Cancel() + defer func() { + // The `sync` object is replaced every time the pivot moves. We need to + // defer close the very last active one, hence the lazy evaluation vs. + // calling defer sync.Cancel() !!! + sync.Cancel() + }() + closeOnErr := func(s *stateSync) { if err := s.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled { d.queue.Close() // wake up Results @@ -1603,9 +1609,8 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { // If new pivot block found, cancel old state retrieval and restart if oldPivot != P { sync.Cancel() - sync = d.syncState(P.Header.Root) - defer sync.Cancel() + go closeOnErr(sync) oldPivot = P } diff --git a/trie/committer.go b/trie/committer.go index 21a2d6f19d..a64e48b2c8 100644 --- a/trie/committer.go +++ b/trie/committer.go @@ -218,13 +218,13 @@ func (c *committer) commitLoop(db *Database) { switch n := n.(type) { case *shortNode: if child, ok := n.Val.(valueNode); ok { - c.onleaf(child, hash) + c.onleaf(nil, child, hash) } case *fullNode: // For children in range [0, 15], it's impossible // to contain valuenode. Only check the 17th child. if n.Children[16] != nil { - c.onleaf(n.Children[16].(valueNode), hash) + c.onleaf(nil, n.Children[16].(valueNode), hash) } } } diff --git a/trie/sync.go b/trie/sync.go index 9c14c7f72b..a1ac09dfa9 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -35,14 +35,19 @@ var ErrNotRequested = errors.New("not requested") // Node it already processed previously. var ErrAlreadyProcessed = errors.New("already processed") +// maxFetchesPerDepth is the maximum number of pending trie nodes per depth. The +// role of this value is to limit the number of trie nodes that get expanded in +// memory if the node was configured with a significant number of peers. +const maxFetchesPerDepth = 16384 + // request represents a scheduled or already in-flight state retrieval request. type request struct { + path []byte // Merkle path leading to this node for prioritization hash common.Hash // Hash of the Node data content to retrieve data []byte // Data content of the Node, cached until all subtrees complete code bool // Whether this is a code entry parents []*request // Parent state nodes referencing this entry (notify all upon completion) - depth int // Depth level within the trie the Node is located to prioritise DFS deps int // Number of dependencies before allowed to commit this Node callback LeafCallback // Callback to invoke if a leaf Node it reached on this branch @@ -90,6 +95,7 @@ type Sync struct { nodeReqs map[common.Hash]*request // Pending requests pertaining to a trie node hash codeReqs map[common.Hash]*request // Pending requests pertaining to a code hash queue *prque.Prque[int64, any] // Priority queue with the pending requests + fetches map[int]int // Number of active fetches per trie node depth bloom *SyncBloom // Bloom filter for fast state existence checks } @@ -101,14 +107,15 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb nodeReqs: make(map[common.Hash]*request), codeReqs: make(map[common.Hash]*request), queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy + fetches: make(map[int]int), bloom: bloom, } - ts.AddSubTrie(root, 0, common.Hash{}, callback) + ts.AddSubTrie(root, nil, common.Hash{}, callback) return ts } // AddSubTrie registers a new trie to the sync code, rooted at the designated parent. -func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) { +func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, callback LeafCallback) { // Short circuit if the trie is empty or already known if root == types.EmptyRootHash { return @@ -129,8 +136,8 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb } // Assemble the new sub-trie sync request req := &request{ + path: path, hash: root, - depth: depth, callback: callback, } // If this sub-trie has a designated parent, link them together @@ -148,7 +155,7 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb // AddCodeEntry schedules the direct retrieval of a contract code that should not // be interpreted as a trie node, but rather accepted and stored into the database // as is. -func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) { +func (s *Sync) AddCodeEntry(hash common.Hash, path []byte, parent common.Hash) { // Short circuit if the entry is empty or already known if hash == types.EmptyCodeHash { return @@ -171,9 +178,9 @@ func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) { } // Assemble the new sub-trie sync request req := &request{ - hash: hash, - code: true, - depth: depth, + path: path, + hash: hash, + code: true, } // If this sub-trie has a designated parent, link them together if parent != (common.Hash{}) { @@ -191,7 +198,18 @@ func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) { func (s *Sync) Missing(max int) []common.Hash { var requests []common.Hash for !s.queue.Empty() && (max == 0 || len(requests) < max) { - requests = append(requests, s.queue.PopItem().(common.Hash)) + // Retrieve th enext item in line + item, prio := s.queue.Peek() + + // If we have too many already-pending tasks for this depth, throttle + depth := int(prio >> 56) + if s.fetches[depth] > maxFetchesPerDepth { + break + } + // Item is allowed to be scheduled, add it to the task list + s.queue.Pop() + s.fetches[depth]++ + requests = append(requests, item.(common.Hash)) } return requests } @@ -286,7 +304,11 @@ func (s *Sync) schedule(req *request) { // is a trie node and code has same hash. In this case two elements // with same hash and same or different depth will be pushed. But it's // ok the worst case is the second response will be treated as duplicated. - s.queue.Push(req.hash, int64(req.depth)) + prio := int64(len(req.path)) << 56 // depth >= 128 will never happen, storage leaves will be included in their parents + for i := 0; i < 14 && i < len(req.path); i++ { + prio |= int64(15-req.path[i]) << (52 - i*4) // 15-nibble => lexicographic order + } + s.queue.Push(req.hash, prio) } // children retrieves all the missing children of a state trie entry for future @@ -294,23 +316,23 @@ func (s *Sync) schedule(req *request) { func (s *Sync) children(req *request, object node) ([]*request, error) { // Gather all the children of the Node, irrelevant whether known or not type child struct { - node node - depth int + path []byte + node node } var children []child switch node := (object).(type) { case *shortNode: children = []child{{ - node: node.Val, - depth: req.depth + len(node.Key), + node: node.Val, + path: append(append([]byte(nil), req.path...), node.Key...), }} case *fullNode: for i := 0; i < 17; i++ { if node.Children[i] != nil { children = append(children, child{ - node: node.Children[i], - depth: req.depth + 1, + node: node.Children[i], + path: append(append([]byte(nil), req.path...), byte(i)), }) } } @@ -323,7 +345,7 @@ func (s *Sync) children(req *request, object node) ([]*request, error) { // Notify any external watcher of a new key/value Node if req.callback != nil { if node, ok := (child.node).(valueNode); ok { - if err := req.callback(node, req.hash); err != nil { + if err := req.callback(req.path, node, req.hash); err != nil { return nil, err } } @@ -347,9 +369,9 @@ func (s *Sync) children(req *request, object node) ([]*request, error) { } // Locally unknown Node, schedule for retrieval requests = append(requests, &request{ + path: child.path, hash: hash, parents: []*request{req}, - depth: child.depth, callback: req.callback, }) } @@ -365,9 +387,11 @@ func (s *Sync) commit(req *request) (err error) { if req.code { s.membatch.codes[req.hash] = req.data delete(s.codeReqs, req.hash) + s.fetches[len(req.path)]-- } else { s.membatch.nodes[req.hash] = req.data delete(s.nodeReqs, req.hash) + s.fetches[len(req.path)]-- } // Check all parents for completion for _, parent := range req.parents { diff --git a/trie/trie.go b/trie/trie.go index 0e49b4a7a8..695b38c70d 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -30,7 +30,7 @@ import ( // LeafCallback is a callback type invoked when a trie operation reaches a leaf // Node. It's used by state sync and commit to allow handling external references // between account and storage tries. -type LeafCallback func(leaf []byte, parent common.Hash) error +type LeafCallback func(path []byte, leaf []byte, parent common.Hash) error // Trie is a Merkle Patricia Trie. // The zero value is an empty trie with no database.