From a826f0c6f950896ddbd4ce8659e0f6141954b1d5 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 18 Jun 2026 12:13:58 +0800 Subject: [PATCH] trie: fallback to sequential mode if collapse is possible --- trie/trie.go | 51 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/trie/trie.go b/trie/trie.go index 7e69a90823..4041fe4e78 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -480,6 +480,11 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error } } +// parallelUpdateThreshold is the minimum number of entries a batch must contain +// before UpdateBatch processes them concurrently; smaller batches are applied +// sequentially to avoid the goroutine overhead. +const parallelUpdateThreshold = 4 + // UpdateBatch updates a batch of entries concurrently. func (t *Trie) UpdateBatch(keys [][]byte, values [][]byte) error { // Short circuit if the trie is already committed and unusable. @@ -492,28 +497,38 @@ func (t *Trie) UpdateBatch(keys [][]byte, values [][]byte) error { // Insert the entries sequentially if there are not too many // trie nodes in the trie. fn, ok := t.root.(*fullNode) - if !ok || len(keys) < 4 { // TODO(rjl493456442) the parallelism threshold should be twisted - for i, key := range keys { - err := t.Update(key, values[i]) - if err != nil { - return err - } - } - return nil + if !ok || len(keys) < parallelUpdateThreshold { + return t.updateSequential(keys, values) } + // Group the entries by the first nibble of the (hex) key, so that each group + // can be applied to a distinct child of the root independently. var ( - ikeys = make(map[byte][][]byte) - ivals = make(map[byte][][]byte) - eg errgroup.Group + ikeys = make(map[byte][][]byte) + ivals = make(map[byte][][]byte) + deleted [17]bool // child positions receiving at least one deletion + eg errgroup.Group ) for i, key := range keys { hkey := keybytesToHex(key) ikeys[hkey[0]] = append(ikeys[hkey[0]], hkey) ivals[hkey[0]] = append(ivals[hkey[0]], values[i]) + if len(values[i]) == 0 { + deleted[hkey[0]] = true + } } - if len(keys) > 0 { - fn.flags = t.newFlag() + // If the root may be collapsed after applying the updates, fallback + // to the sequential mode to eliminate the additional complexity. + survivors := 0 + for i, child := range &fn.Children { + if child != nil && !deleted[i] { + survivors++ + } } + if survivors < 2 { + return t.updateSequential(keys, values) + } + // Execute the trie updates in concurrent mode + fn.flags = t.newFlag() for pos, ks := range ikeys { eg.Go(func() error { vs := ivals[pos] @@ -543,6 +558,16 @@ func (t *Trie) UpdateBatch(keys [][]byte, values [][]byte) error { return nil } +// updateSequential applies the given entries to the trie one by one. +func (t *Trie) updateSequential(keys [][]byte, values [][]byte) error { + for i, key := range keys { + if err := t.Update(key, values[i]); err != nil { + return err + } + } + return nil +} + // MustDelete is a wrapper of Delete and will omit any encountered error but // just print out an error message. func (t *Trie) MustDelete(key []byte) {