core/state, eth, trie: stabilize memory use, fix memory leak #21491 (#1040)

This commit is contained in:
Daniel Liu 2025-05-20 15:14:47 +08:00 committed by GitHub
parent b94b29b8d0
commit 6395c15280
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 66 additions and 37 deletions

View file

@ -136,7 +136,7 @@ func (l *liquidationPriceState) updateRoot(db Database) error {
if l.dbErr != nil {
return l.dbErr
}
root, err := l.trie.Commit(func(leaf []byte, parent common.Hash) error {
root, err := l.trie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var orderList orderList
if err := rlp.DecodeBytes(leaf, &orderList); err != nil {
return nil

View file

@ -245,7 +245,7 @@ func (te *tradingExchanges) CommitAsksTrie(db Database) error {
if te.dbErr != nil {
return te.dbErr
}
root, err := te.asksTrie.Commit(func(leaf []byte, parent common.Hash) error {
root, err := te.asksTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var orderList orderList
if err := rlp.DecodeBytes(leaf, &orderList); err != nil {
return nil
@ -307,7 +307,7 @@ func (te *tradingExchanges) CommitBidsTrie(db Database) error {
if te.dbErr != nil {
return te.dbErr
}
root, err := te.bidsTrie.Commit(func(leaf []byte, parent common.Hash) error {
root, err := te.bidsTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var orderList orderList
if err := rlp.DecodeBytes(leaf, &orderList); err != nil {
return nil
@ -783,7 +783,7 @@ func (t *tradingExchanges) CommitLiquidationPriceTrie(db Database) error {
if t.dbErr != nil {
return t.dbErr
}
root, err := t.liquidationPriceTrie.Commit(func(leaf []byte, parent common.Hash) error {
root, err := t.liquidationPriceTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var orderList orderList
if err := rlp.DecodeBytes(leaf, &orderList); err != nil {
return nil

View file

@ -589,7 +589,7 @@ func (t *TradingStateDB) Commit() (root common.Hash, err error) {
}
}
// Write trie changes.
root, err = t.trie.Commit(func(leaf []byte, parent common.Hash) error {
root, err = t.trie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var exchange tradingExchangeObject
if err := rlp.DecodeBytes(leaf, &exchange); err != nil {
return nil

View file

@ -472,7 +472,7 @@ func (le *lendingExchangeState) CommitInvestingTrie(db Database) error {
if le.dbErr != nil {
return le.dbErr
}
root, err := le.investingTrie.Commit(func(leaf []byte, parent common.Hash) error {
root, err := le.investingTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var orderList itemList
if err := rlp.DecodeBytes(leaf, &orderList); err != nil {
return nil
@ -493,7 +493,7 @@ func (le *lendingExchangeState) CommitBorrowingTrie(db Database) error {
if le.dbErr != nil {
return le.dbErr
}
root, err := le.borrowingTrie.Commit(func(leaf []byte, parent common.Hash) error {
root, err := le.borrowingTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var orderList itemList
if err := rlp.DecodeBytes(leaf, &orderList); err != nil {
return nil
@ -514,7 +514,7 @@ func (le *lendingExchangeState) CommitLiquidationTimeTrie(db Database) error {
if le.dbErr != nil {
return le.dbErr
}
root, err := le.liquidationTimeTrie.Commit(func(leaf []byte, parent common.Hash) error {
root, err := le.liquidationTimeTrie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var orderList itemList
if err := rlp.DecodeBytes(leaf, &orderList); err != nil {
return nil

View file

@ -578,7 +578,7 @@ func (ls *LendingStateDB) Commit() (root common.Hash, err error) {
}
}
// Write trie changes.
root, err = ls.trie.Commit(func(leaf []byte, parent common.Hash) error {
root, err = ls.trie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var exchange lendingObject
if err := rlp.DecodeBytes(leaf, &exchange); err != nil {
return nil

View file

@ -840,7 +840,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
// Write the account trie changes, measuing the amount of wasted time
defer func(start time.Time) { s.AccountCommits += time.Since(start) }(time.Now())
return s.trie.Commit(func(leaf []byte, parent common.Hash) error {
return s.trie.Commit(func(path []byte, leaf []byte, parent common.Hash) error {
var account Account
if err := rlp.DecodeBytes(leaf, &account); err != nil {
return nil

View file

@ -28,13 +28,13 @@ import (
// NewStateSync create a new state trie download scheduler.
func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.SyncBloom) *trie.Sync {
var syncer *trie.Sync
callback := func(leaf []byte, parent common.Hash) error {
callback := func(path []byte, leaf []byte, parent common.Hash) error {
var obj Account
if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
return err
}
syncer.AddSubTrie(obj.Root, 64, parent, nil)
syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), 64, parent)
syncer.AddSubTrie(obj.Root, path, parent, nil)
syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), path, parent)
return nil
}
syncer = trie.NewSync(root, database, callback, bloom)

View file

@ -1545,7 +1545,13 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
// Start syncing state of the reported head block. This should get us most of
// the state of the pivot block.
sync := d.syncState(latest.Root)
defer sync.Cancel()
defer func() {
// The `sync` object is replaced every time the pivot moves. We need to
// defer close the very last active one, hence the lazy evaluation vs.
// calling defer sync.Cancel() !!!
sync.Cancel()
}()
closeOnErr := func(s *stateSync) {
if err := s.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
d.queue.Close() // wake up Results
@ -1603,9 +1609,8 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
// If new pivot block found, cancel old state retrieval and restart
if oldPivot != P {
sync.Cancel()
sync = d.syncState(P.Header.Root)
defer sync.Cancel()
go closeOnErr(sync)
oldPivot = P
}

View file

@ -218,13 +218,13 @@ func (c *committer) commitLoop(db *Database) {
switch n := n.(type) {
case *shortNode:
if child, ok := n.Val.(valueNode); ok {
c.onleaf(child, hash)
c.onleaf(nil, child, hash)
}
case *fullNode:
// For children in range [0, 15], it's impossible
// to contain valuenode. Only check the 17th child.
if n.Children[16] != nil {
c.onleaf(n.Children[16].(valueNode), hash)
c.onleaf(nil, n.Children[16].(valueNode), hash)
}
}
}

View file

@ -35,14 +35,19 @@ var ErrNotRequested = errors.New("not requested")
// Node it already processed previously.
var ErrAlreadyProcessed = errors.New("already processed")
// maxFetchesPerDepth is the maximum number of pending trie nodes per depth. The
// role of this value is to limit the number of trie nodes that get expanded in
// memory if the node was configured with a significant number of peers.
const maxFetchesPerDepth = 16384
// request represents a scheduled or already in-flight state retrieval request.
type request struct {
path []byte // Merkle path leading to this node for prioritization
hash common.Hash // Hash of the Node data content to retrieve
data []byte // Data content of the Node, cached until all subtrees complete
code bool // Whether this is a code entry
parents []*request // Parent state nodes referencing this entry (notify all upon completion)
depth int // Depth level within the trie the Node is located to prioritise DFS
deps int // Number of dependencies before allowed to commit this Node
callback LeafCallback // Callback to invoke if a leaf Node it reached on this branch
@ -90,6 +95,7 @@ type Sync struct {
nodeReqs map[common.Hash]*request // Pending requests pertaining to a trie node hash
codeReqs map[common.Hash]*request // Pending requests pertaining to a code hash
queue *prque.Prque[int64, any] // Priority queue with the pending requests
fetches map[int]int // Number of active fetches per trie node depth
bloom *SyncBloom // Bloom filter for fast state existence checks
}
@ -101,14 +107,15 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
nodeReqs: make(map[common.Hash]*request),
codeReqs: make(map[common.Hash]*request),
queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy
fetches: make(map[int]int),
bloom: bloom,
}
ts.AddSubTrie(root, 0, common.Hash{}, callback)
ts.AddSubTrie(root, nil, common.Hash{}, callback)
return ts
}
// AddSubTrie registers a new trie to the sync code, rooted at the designated parent.
func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) {
func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, callback LeafCallback) {
// Short circuit if the trie is empty or already known
if root == types.EmptyRootHash {
return
@ -129,8 +136,8 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
}
// Assemble the new sub-trie sync request
req := &request{
path: path,
hash: root,
depth: depth,
callback: callback,
}
// If this sub-trie has a designated parent, link them together
@ -148,7 +155,7 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
// AddCodeEntry schedules the direct retrieval of a contract code that should not
// be interpreted as a trie node, but rather accepted and stored into the database
// as is.
func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) {
func (s *Sync) AddCodeEntry(hash common.Hash, path []byte, parent common.Hash) {
// Short circuit if the entry is empty or already known
if hash == types.EmptyCodeHash {
return
@ -171,9 +178,9 @@ func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) {
}
// Assemble the new sub-trie sync request
req := &request{
hash: hash,
code: true,
depth: depth,
path: path,
hash: hash,
code: true,
}
// If this sub-trie has a designated parent, link them together
if parent != (common.Hash{}) {
@ -191,7 +198,18 @@ func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) {
func (s *Sync) Missing(max int) []common.Hash {
var requests []common.Hash
for !s.queue.Empty() && (max == 0 || len(requests) < max) {
requests = append(requests, s.queue.PopItem().(common.Hash))
// Retrieve th enext item in line
item, prio := s.queue.Peek()
// If we have too many already-pending tasks for this depth, throttle
depth := int(prio >> 56)
if s.fetches[depth] > maxFetchesPerDepth {
break
}
// Item is allowed to be scheduled, add it to the task list
s.queue.Pop()
s.fetches[depth]++
requests = append(requests, item.(common.Hash))
}
return requests
}
@ -286,7 +304,11 @@ func (s *Sync) schedule(req *request) {
// is a trie node and code has same hash. In this case two elements
// with same hash and same or different depth will be pushed. But it's
// ok the worst case is the second response will be treated as duplicated.
s.queue.Push(req.hash, int64(req.depth))
prio := int64(len(req.path)) << 56 // depth >= 128 will never happen, storage leaves will be included in their parents
for i := 0; i < 14 && i < len(req.path); i++ {
prio |= int64(15-req.path[i]) << (52 - i*4) // 15-nibble => lexicographic order
}
s.queue.Push(req.hash, prio)
}
// children retrieves all the missing children of a state trie entry for future
@ -294,23 +316,23 @@ func (s *Sync) schedule(req *request) {
func (s *Sync) children(req *request, object node) ([]*request, error) {
// Gather all the children of the Node, irrelevant whether known or not
type child struct {
node node
depth int
path []byte
node node
}
var children []child
switch node := (object).(type) {
case *shortNode:
children = []child{{
node: node.Val,
depth: req.depth + len(node.Key),
node: node.Val,
path: append(append([]byte(nil), req.path...), node.Key...),
}}
case *fullNode:
for i := 0; i < 17; i++ {
if node.Children[i] != nil {
children = append(children, child{
node: node.Children[i],
depth: req.depth + 1,
node: node.Children[i],
path: append(append([]byte(nil), req.path...), byte(i)),
})
}
}
@ -323,7 +345,7 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
// Notify any external watcher of a new key/value Node
if req.callback != nil {
if node, ok := (child.node).(valueNode); ok {
if err := req.callback(node, req.hash); err != nil {
if err := req.callback(req.path, node, req.hash); err != nil {
return nil, err
}
}
@ -347,9 +369,9 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
}
// Locally unknown Node, schedule for retrieval
requests = append(requests, &request{
path: child.path,
hash: hash,
parents: []*request{req},
depth: child.depth,
callback: req.callback,
})
}
@ -365,9 +387,11 @@ func (s *Sync) commit(req *request) (err error) {
if req.code {
s.membatch.codes[req.hash] = req.data
delete(s.codeReqs, req.hash)
s.fetches[len(req.path)]--
} else {
s.membatch.nodes[req.hash] = req.data
delete(s.nodeReqs, req.hash)
s.fetches[len(req.path)]--
}
// Check all parents for completion
for _, parent := range req.parents {

View file

@ -30,7 +30,7 @@ import (
// LeafCallback is a callback type invoked when a trie operation reaches a leaf
// Node. It's used by state sync and commit to allow handling external references
// between account and storage tries.
type LeafCallback func(leaf []byte, parent common.Hash) error
type LeafCallback func(path []byte, leaf []byte, parent common.Hash) error
// Trie is a Merkle Patricia Trie.
// The zero value is an empty trie with no database.