trie: fallback to sequential mode if collapse is possible

This commit is contained in:
Gary Rong 2026-06-18 12:13:58 +08:00
parent 0b673454fe
commit a826f0c6f9

View file

@ -480,6 +480,11 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error
}
}
// parallelUpdateThreshold is the minimum number of entries a batch must contain
// before UpdateBatch processes them concurrently; smaller batches are applied
// sequentially to avoid the goroutine overhead.
const parallelUpdateThreshold = 4
// UpdateBatch updates a batch of entries concurrently.
func (t *Trie) UpdateBatch(keys [][]byte, values [][]byte) error {
// Short circuit if the trie is already committed and unusable.
@ -492,28 +497,38 @@ func (t *Trie) UpdateBatch(keys [][]byte, values [][]byte) error {
// Insert the entries sequentially if there are not too many
// trie nodes in the trie.
fn, ok := t.root.(*fullNode)
if !ok || len(keys) < 4 { // TODO(rjl493456442) the parallelism threshold should be twisted
for i, key := range keys {
err := t.Update(key, values[i])
if err != nil {
return err
}
}
return nil
if !ok || len(keys) < parallelUpdateThreshold {
return t.updateSequential(keys, values)
}
// Group the entries by the first nibble of the (hex) key, so that each group
// can be applied to a distinct child of the root independently.
var (
ikeys = make(map[byte][][]byte)
ivals = make(map[byte][][]byte)
eg errgroup.Group
ikeys = make(map[byte][][]byte)
ivals = make(map[byte][][]byte)
deleted [17]bool // child positions receiving at least one deletion
eg errgroup.Group
)
for i, key := range keys {
hkey := keybytesToHex(key)
ikeys[hkey[0]] = append(ikeys[hkey[0]], hkey)
ivals[hkey[0]] = append(ivals[hkey[0]], values[i])
if len(values[i]) == 0 {
deleted[hkey[0]] = true
}
}
if len(keys) > 0 {
fn.flags = t.newFlag()
// If the root may be collapsed after applying the updates, fallback
// to the sequential mode to eliminate the additional complexity.
survivors := 0
for i, child := range &fn.Children {
if child != nil && !deleted[i] {
survivors++
}
}
if survivors < 2 {
return t.updateSequential(keys, values)
}
// Execute the trie updates in concurrent mode
fn.flags = t.newFlag()
for pos, ks := range ikeys {
eg.Go(func() error {
vs := ivals[pos]
@ -543,6 +558,16 @@ func (t *Trie) UpdateBatch(keys [][]byte, values [][]byte) error {
return nil
}
// updateSequential applies the given entries to the trie one by one.
func (t *Trie) updateSequential(keys [][]byte, values [][]byte) error {
for i, key := range keys {
if err := t.Update(key, values[i]); err != nil {
return err
}
}
return nil
}
// MustDelete is a wrapper of Delete and will omit any encountered error but
// just print out an error message.
func (t *Trie) MustDelete(key []byte) {