ethdb: Implement DeleteRange in batch (#31947)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Docker Image (push) Waiting to run

implement #31945

---------

Co-authored-by: prpeh <prpeh@proton.me>
Co-authored-by: Gary Rong <garyrong0905@gmail.com>
This commit is contained in:
Ha DANG 2025-06-20 18:40:41 +07:00 committed by GitHub
parent f26b5653e8
commit 846d13a31a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 591 additions and 20 deletions

View file

@ -126,6 +126,11 @@ func (t *table) Delete(key []byte) error {
// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
func (t *table) DeleteRange(start, end []byte) error {
	// A nil end means "until the very end of the keyspace"; prefixing it would
	// silently lose that meaning, so substitute the explicit maximum-key
	// marker before adding the table prefix.
	if end == nil {
		end = ethdb.MaximumKey
	}
	prefixedStart := append([]byte(t.prefix), start...)
	prefixedEnd := append([]byte(t.prefix), end...)
	return t.db.DeleteRange(prefixedStart, prefixedEnd)
}
@ -217,6 +222,16 @@ func (b *tableBatch) Delete(key []byte) error {
return b.batch.Delete(append([]byte(b.prefix), key...))
}
// DeleteRange removes all keys in the range [start, end) from the batch for later committing.
func (b *tableBatch) DeleteRange(start, end []byte) error {
	// The nilness of end (meaning "after all keys") would be lost once the
	// table prefix is prepended, so convert it to the explicit maximum-key
	// marker first.
	if end == nil {
		end = ethdb.MaximumKey
	}
	from := append([]byte(b.prefix), start...)
	to := append([]byte(b.prefix), end...)
	return b.batch.DeleteRange(from, to)
}
// ValueSize retrieves the amount of data queued up for writing.
func (b *tableBatch) ValueSize() int {
return b.batch.ValueSize()

View file

@ -125,4 +125,28 @@ func testTableDatabase(t *testing.T, prefix string) {
// Test iterators with prefix and start point
check(db.NewIterator([]byte{0xee}, nil), 0, 0)
check(db.NewIterator(nil, []byte{0x00}), 6, 0)
// Test range deletion
db.DeleteRange(nil, nil)
for _, entry := range entries {
_, err := db.Get(entry.key)
if err == nil {
t.Fatal("Unexpected item after deletion")
}
}
// Test range deletion by batch
batch = db.NewBatch()
for _, entry := range entries {
batch.Put(entry.key, entry.value)
}
batch.Write()
batch.Reset()
batch.DeleteRange(nil, nil)
batch.Write()
for _, entry := range entries {
_, err := db.Get(entry.key)
if err == nil {
t.Fatal("Unexpected item after deletion")
}
}
}

View file

@ -24,6 +24,7 @@ const IdealBatchSize = 100 * 1024
// when Write is called. A batch cannot be used concurrently.
type Batch interface {
KeyValueWriter
KeyValueRangeDeleter
// ValueSize retrieves the amount of data queued up for writing.
ValueSize() int
@ -53,8 +54,9 @@ type Batcher interface {
type HookedBatch struct {
Batch
OnPut func(key []byte, value []byte) // Callback if a key is inserted
OnDelete func(key []byte) // Callback if a key is deleted
OnPut func(key []byte, value []byte) // Callback if a key is inserted
OnDelete func(key []byte) // Callback if a key is deleted
OnDeleteRange func(start, end []byte) // Callback if a range of keys is deleted
}
// Put inserts the given value into the key-value data store.
@ -72,3 +74,11 @@ func (b HookedBatch) Delete(key []byte) error {
}
return b.Batch.Delete(key)
}
// DeleteRange removes all keys in the range [start, end) from the key-value data store.
func (b HookedBatch) DeleteRange(start, end []byte) error {
	// Fire the user callback (if any) before delegating to the wrapped batch.
	if hook := b.OnDeleteRange; hook != nil {
		hook(start, end)
	}
	return b.Batch.DeleteRange(start, end)
}

View file

@ -18,10 +18,23 @@
package ethdb
import (
"bytes"
"errors"
"io"
)
var (
	// MaximumKey is a special marker representing the largest possible key
	// in the database.
	//
	// All prefixed database entries will be smaller than this marker.
	// For trie nodes in hash mode, we use a 32-byte slice filled with 0xFF
	// because there may be shared prefixes starting with multiple 0xFF bytes.
	// Using 32 bytes ensures that only a hash collision could potentially
	// match or exceed it. It is used wherever a backend (e.g. pebble) has no
	// native "end of keyspace" sentinel and a concrete upper bound is needed.
	MaximumKey = bytes.Repeat([]byte{0xff}, 32)
)
// KeyValueReader wraps the Has and Get method of a backing data store.
type KeyValueReader interface {
// Has retrieves if a key is present in the key-value data store.
@ -46,6 +59,11 @@ var ErrTooManyKeys = errors.New("too many keys in deleted range")
type KeyValueRangeDeleter interface {
// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
//
// A nil start is treated as a key before all keys in the data store; a nil
// end is treated as a key after all keys in the data store. If both are nil
// then the entire data store will be purged.
//
// Some implementations of DeleteRange may return ErrTooManyKeys after
// partially deleting entries in the given range.
DeleteRange(start, end []byte) error

View file

@ -401,6 +401,308 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) {
db.DeleteRange([]byte(""), []byte("a"))
checkRange(1, 999, false)
addRange(1, 999)
db.DeleteRange(nil, nil)
checkRange(1, 999, false)
})
t.Run("BatchDeleteRange", func(t *testing.T) {
db := New()
defer db.Close()
// Helper to add keys
addKeys := func(start, stop int) {
for i := start; i <= stop; i++ {
if err := db.Put([]byte(strconv.Itoa(i)), []byte("val-"+strconv.Itoa(i))); err != nil {
t.Fatal(err)
}
}
}
// Helper to check if keys exist
checkKeys := func(start, stop int, shouldExist bool) {
for i := start; i <= stop; i++ {
key := []byte(strconv.Itoa(i))
has, err := db.Has(key)
if err != nil {
t.Fatal(err)
}
if has != shouldExist {
if shouldExist {
t.Fatalf("key %s should exist but doesn't", key)
} else {
t.Fatalf("key %s shouldn't exist but does", key)
}
}
}
}
// Test 1: Basic range deletion in batch
addKeys(1, 10)
checkKeys(1, 10, true)
batch := db.NewBatch()
if err := batch.DeleteRange([]byte("3"), []byte("8")); err != nil {
t.Fatal(err)
}
// Keys shouldn't be deleted until Write is called
checkKeys(1, 10, true)
if err := batch.Write(); err != nil {
t.Fatal(err)
}
// After Write, keys in range should be deleted
// Range is [start, end) - inclusive of start, exclusive of end
checkKeys(1, 2, true) // These should still exist
checkKeys(3, 7, false) // These should be deleted (3 to 7 inclusive)
checkKeys(8, 10, true) // These should still exist (8 is the end boundary, exclusive)
// Test 2: Delete range with special markers
addKeys(3, 7)
batch = db.NewBatch()
if err := batch.DeleteRange(nil, nil); err != nil {
t.Fatal(err)
}
if err := batch.Write(); err != nil {
t.Fatal(err)
}
checkKeys(1, 10, false)
// Test 3: Mix Put, Delete, and DeleteRange in a batch
// Reset database for next test by adding back deleted keys
addKeys(1, 10)
checkKeys(1, 10, true)
// Create a new batch with multiple operations
batch = db.NewBatch()
if err := batch.Put([]byte("5"), []byte("new-val-5")); err != nil {
t.Fatal(err)
}
if err := batch.Delete([]byte("9")); err != nil {
t.Fatal(err)
}
if err := batch.DeleteRange([]byte("1"), []byte("3")); err != nil {
t.Fatal(err)
}
if err := batch.Write(); err != nil {
t.Fatal(err)
}
// Check results after batch operations
// Keys 1-2 should be deleted by DeleteRange
checkKeys(1, 2, false)
// Key 3 should exist (exclusive of end)
has, err := db.Has([]byte("3"))
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatalf("key 3 should exist after DeleteRange(1,3)")
}
// Key 5 should have a new value
val, err := db.Get([]byte("5"))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(val, []byte("new-val-5")) {
t.Fatalf("key 5 has wrong value: got %s, want %s", val, "new-val-5")
}
// Key 9 should be deleted
has, err = db.Has([]byte("9"))
if err != nil {
t.Fatal(err)
}
if has {
t.Fatalf("key 9 should be deleted")
}
// Test 4: Reset batch
batch.Reset()
// Individual deletes work better with both string and numeric comparisons
if err := batch.Delete([]byte("8")); err != nil {
t.Fatal(err)
}
if err := batch.Delete([]byte("10")); err != nil {
t.Fatal(err)
}
if err := batch.Delete([]byte("11")); err != nil {
t.Fatal(err)
}
if err := batch.Write(); err != nil {
t.Fatal(err)
}
// Key 8 should be deleted
has, err = db.Has([]byte("8"))
if err != nil {
t.Fatal(err)
}
if has {
t.Fatalf("key 8 should be deleted")
}
// Keys 3-7 should still exist
checkKeys(3, 7, true)
// Key 10 should be deleted
has, err = db.Has([]byte("10"))
if err != nil {
t.Fatal(err)
}
if has {
t.Fatalf("key 10 should be deleted")
}
// Test 5: Empty range
batch = db.NewBatch()
if err := batch.DeleteRange([]byte("100"), []byte("100")); err != nil {
t.Fatal(err)
}
if err := batch.Write(); err != nil {
t.Fatal(err)
}
// No existing keys should be affected
checkKeys(3, 7, true)
// Test 6: Test entire keyspace deletion
// First clear any existing keys
for i := 1; i <= 100; i++ {
db.Delete([]byte(strconv.Itoa(i)))
}
// Then add some fresh test keys
addKeys(50, 60)
// Verify keys exist before deletion
checkKeys(50, 60, true)
batch = db.NewBatch()
if err := batch.DeleteRange([]byte(""), []byte("z")); err != nil {
t.Fatal(err)
}
if err := batch.Write(); err != nil {
t.Fatal(err)
}
// All keys should be deleted
checkKeys(50, 60, false)
// Test 7: overlapping range deletion
addKeys(50, 60)
batch = db.NewBatch()
if err := batch.DeleteRange([]byte("50"), []byte("55")); err != nil {
t.Fatal(err)
}
if err := batch.DeleteRange([]byte("52"), []byte("58")); err != nil {
t.Fatal(err)
}
if err := batch.Write(); err != nil {
t.Fatal(err)
}
checkKeys(50, 57, false)
checkKeys(58, 60, true)
})
t.Run("BatchReplayWithDeleteRange", func(t *testing.T) {
db := New()
defer db.Close()
// Setup some initial data
for i := 1; i <= 10; i++ {
if err := db.Put([]byte(strconv.Itoa(i)), []byte("val-"+strconv.Itoa(i))); err != nil {
t.Fatal(err)
}
}
// Create batch with multiple operations including DeleteRange
batch1 := db.NewBatch()
batch1.Put([]byte("new-key-1"), []byte("new-val-1"))
batch1.DeleteRange([]byte("3"), []byte("7")) // Should delete keys 3-6 but not 7
batch1.Delete([]byte("8"))
batch1.Put([]byte("new-key-2"), []byte("new-val-2"))
// Create a second batch to replay into
batch2 := db.NewBatch()
if err := batch1.Replay(batch2); err != nil {
t.Fatal(err)
}
// Write the second batch
if err := batch2.Write(); err != nil {
t.Fatal(err)
}
// Verify results
// Original keys 3-6 should be deleted (inclusive of start, exclusive of end)
for i := 3; i <= 6; i++ {
has, err := db.Has([]byte(strconv.Itoa(i)))
if err != nil {
t.Fatal(err)
}
if has {
t.Fatalf("key %d should be deleted", i)
}
}
// Key 7 should NOT be deleted (exclusive of end)
has, err := db.Has([]byte("7"))
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatalf("key 7 should NOT be deleted (exclusive of end)")
}
// Key 8 should be deleted
has, err = db.Has([]byte("8"))
if err != nil {
t.Fatal(err)
}
if has {
t.Fatalf("key 8 should be deleted")
}
// New keys should be added
for _, key := range []string{"new-key-1", "new-key-2"} {
has, err := db.Has([]byte(key))
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatalf("key %s should exist", key)
}
}
// Create a third batch for direct replay to database
batch3 := db.NewBatch()
batch3.DeleteRange([]byte("1"), []byte("3")) // Should delete keys 1-2 but not 3
// Replay directly to the database
if err := batch3.Replay(db); err != nil {
t.Fatal(err)
}
// Verify keys 1-2 are now deleted
for i := 1; i <= 2; i++ {
has, err := db.Has([]byte(strconv.Itoa(i)))
if err != nil {
t.Fatal(err)
}
if has {
t.Fatalf("key %d should be deleted after direct replay", i)
}
}
// Verify key 3 remains deleted: batch1's DeleteRange("3", "7") removed it earlier,
// and batch3's range [1, 3) excludes it, so it must not have reappeared
has, err = db.Has([]byte("3"))
if err != nil {
t.Fatal(err)
}
if has {
t.Fatalf("key 3 should still be deleted from previous operation")
}
})
}
@ -520,6 +822,81 @@ func BenchDatabaseSuite(b *testing.B, New func() ethdb.KeyValueStore) {
benchDeleteRange(b, 10000)
})
})
b.Run("BatchDeleteRange", func(b *testing.B) {
benchBatchDeleteRange := func(b *testing.B, count int) {
db := New()
defer db.Close()
// Prepare data
for i := 0; i < count; i++ {
db.Put([]byte(strconv.Itoa(i)), nil)
}
b.ResetTimer()
b.ReportAllocs()
// Create batch and delete range
batch := db.NewBatch()
batch.DeleteRange([]byte("0"), []byte("999999999"))
batch.Write()
}
b.Run("BatchDeleteRange100", func(b *testing.B) {
benchBatchDeleteRange(b, 100)
})
b.Run("BatchDeleteRange1k", func(b *testing.B) {
benchBatchDeleteRange(b, 1000)
})
b.Run("BatchDeleteRange10k", func(b *testing.B) {
benchBatchDeleteRange(b, 10000)
})
})
b.Run("BatchMixedOps", func(b *testing.B) {
benchBatchMixedOps := func(b *testing.B, count int) {
db := New()
defer db.Close()
// Prepare initial data
for i := 0; i < count; i++ {
db.Put([]byte(strconv.Itoa(i)), []byte("val"))
}
b.ResetTimer()
b.ReportAllocs()
// Create batch with mixed operations
batch := db.NewBatch()
// Add some new keys
for i := 0; i < count/10; i++ {
batch.Put([]byte(strconv.Itoa(count+i)), []byte("new-val"))
}
// Delete some individual keys
for i := 0; i < count/20; i++ {
batch.Delete([]byte(strconv.Itoa(i * 2)))
}
// Delete range of keys
rangeStart := count / 2
rangeEnd := count * 3 / 4
batch.DeleteRange([]byte(strconv.Itoa(rangeStart)), []byte(strconv.Itoa(rangeEnd)))
// Write the batch
batch.Write()
}
b.Run("BatchMixedOps100", func(b *testing.B) {
benchBatchMixedOps(b, 100)
})
b.Run("BatchMixedOps1k", func(b *testing.B) {
benchBatchMixedOps(b, 1000)
})
b.Run("BatchMixedOps10k", func(b *testing.B) {
benchBatchMixedOps(b, 10000)
})
})
}
func iterateKeys(it ethdb.Iterator) []string {

View file

@ -220,7 +220,7 @@ func (db *Database) DeleteRange(start, end []byte) error {
defer it.Release()
var count int
for it.Next() && bytes.Compare(end, it.Key()) > 0 {
for it.Next() && (end == nil || bytes.Compare(end, it.Key()) > 0) {
count++
if count > 10000 { // should not block for more than a second
if err := batch.Write(); err != nil {
@ -461,6 +461,38 @@ func (b *batch) Delete(key []byte) error {
return nil
}
// DeleteRange removes all keys in the range [start, end) from the batch for
// later committing, inclusive on start, exclusive on end.
//
// Note that this is a fallback implementation as leveldb does not natively
// support range deletion in batches. It iterates through the database to find
// keys in the range and adds them to the batch for deletion.
func (b *batch) DeleteRange(start, end []byte) error {
	// A nil Start/Limit already means "before all keys"/"after all keys" to
	// leveldb, so the bounds can be handed over verbatim.
	it := b.db.NewIterator(&util.Range{Start: start, Limit: end}, nil)
	defer it.Release()

	var deleted int
	for it.Next() {
		deleted++
		if deleted > 10000 { // should not block for more than a second
			return ethdb.ErrTooManyKeys
		}
		key := it.Key()
		// Queue this key for deletion when the batch is written.
		b.b.Delete(key)
		b.size += len(key)
	}
	// Surface any iterator failure; nil if the scan completed cleanly.
	return it.Error()
}
// ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int {
return b.size
@ -506,6 +538,20 @@ func (r *replayer) Delete(key []byte) {
r.failure = r.writer.Delete(key)
}
// DeleteRange removes all keys in the range [start, end) from the key-value data store.
func (r *replayer) DeleteRange(start, end []byte) {
	// Stop executing ops if the replay already failed earlier.
	if r.failure != nil {
		return
	}
	// Range deletion is optional on the target writer; fail the replay if it
	// is not supported rather than silently dropping the operation.
	rd, ok := r.writer.(ethdb.KeyValueRangeDeleter)
	if !ok {
		r.failure = fmt.Errorf("ethdb.KeyValueWriter does not implement DeleteRange")
		return
	}
	r.failure = rd.DeleteRange(start, end)
}
// bytesPrefixRange returns key range that satisfy
// - the given prefix, and
// - the given seek position

View file

@ -18,7 +18,9 @@
package memorydb
import (
"bytes"
"errors"
"fmt"
"sort"
"strings"
"sync"
@ -122,18 +124,24 @@ func (db *Database) Delete(key []byte) error {
}
// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
// (inclusive on start, exclusive on end). If the start is nil, it represents
// the key before all keys; if the end is nil, it represents the key after
// all keys.
func (db *Database) DeleteRange(start, end []byte) error {
db.lock.Lock()
defer db.lock.Unlock()
if db.db == nil {
return errMemorydbClosed
}
for key := range db.db {
if key >= string(start) && key < string(end) {
delete(db.db, key)
if start != nil && key < string(start) {
continue
}
if end != nil && key >= string(end) {
continue
}
delete(db.db, key)
}
return nil
}
@ -222,6 +230,9 @@ type keyvalue struct {
key string
value []byte
delete bool
rangeFrom []byte
rangeTo []byte
}
// batch is a write-only memory batch that commits changes to its host
@ -234,18 +245,29 @@ type batch struct {
// Put inserts the given value into the batch for later committing. The value
// is copied so the caller may reuse its buffer after the call returns.
//
// The rendered diff retained both the old positional-literal append and the
// new field-named one, which would queue every write twice; only the new form
// is kept here.
func (b *batch) Put(key, value []byte) error {
	b.writes = append(b.writes, keyvalue{key: string(key), value: common.CopyBytes(value)})
	b.size += len(key) + len(value)
	return nil
}
// Delete inserts the key removal into the batch for later committing.
//
// The rendered diff retained both the old positional-literal append and the
// new field-named one, which would queue every deletion twice; only the new
// form is kept here.
func (b *batch) Delete(key []byte) error {
	b.writes = append(b.writes, keyvalue{key: string(key), delete: true})
	b.size += len(key)
	return nil
}
// DeleteRange removes all keys in the range [start, end) from the batch for later committing.
func (b *batch) DeleteRange(start, end []byte) error {
	// Clone the bounds so later caller-side mutation cannot corrupt the queued
	// operation; nil bounds stay nil and keep their open-ended meaning.
	entry := keyvalue{
		delete:    true,
		rangeFrom: bytes.Clone(start),
		rangeTo:   bytes.Clone(end),
	}
	b.writes = append(b.writes, entry)
	b.size += len(start) + len(end)
	return nil
}
// ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int {
return b.size
@ -259,12 +281,26 @@ func (b *batch) Write() error {
if b.db.db == nil {
return errMemorydbClosed
}
for _, keyvalue := range b.writes {
if keyvalue.delete {
delete(b.db.db, keyvalue.key)
for _, entry := range b.writes {
if entry.delete {
if entry.key != "" {
// Single key deletion
delete(b.db.db, entry.key)
} else {
// Range deletion (inclusive of start, exclusive of end)
for key := range b.db.db {
if entry.rangeFrom != nil && key < string(entry.rangeFrom) {
continue
}
if entry.rangeTo != nil && key >= string(entry.rangeTo) {
continue
}
delete(b.db.db, key)
}
}
continue
}
b.db.db[keyvalue.key] = keyvalue.value
b.db.db[entry.key] = entry.value
}
return nil
}
@ -277,14 +313,26 @@ func (b *batch) Reset() {
// Replay replays the batch contents.
func (b *batch) Replay(w ethdb.KeyValueWriter) error {
for _, keyvalue := range b.writes {
if keyvalue.delete {
if err := w.Delete([]byte(keyvalue.key)); err != nil {
return err
for _, entry := range b.writes {
if entry.delete {
if entry.key != "" {
// Single key deletion
if err := w.Delete([]byte(entry.key)); err != nil {
return err
}
} else {
// Range deletion
if rangeDeleter, ok := w.(ethdb.KeyValueRangeDeleter); ok {
if err := rangeDeleter.DeleteRange(entry.rangeFrom, entry.rangeTo); err != nil {
return err
}
} else {
return fmt.Errorf("ethdb.KeyValueWriter does not implement DeleteRange")
}
}
continue
}
if err := w.Put([]byte(keyvalue.key), keyvalue.value); err != nil {
if err := w.Put([]byte(entry.key), entry.value); err != nil {
return err
}
}

View file

@ -18,7 +18,6 @@
package pebble
import (
"bytes"
"fmt"
"runtime"
"strings"
@ -417,9 +416,16 @@ func (d *Database) Delete(key []byte) error {
func (d *Database) DeleteRange(start, end []byte) error {
	d.quitLock.RLock()
	defer d.quitLock.RUnlock()

	if d.closed {
		return pebble.ErrClosed
	}
	// Pebble has no sentinel for "end of the keyspace" (leveldb accepts nil),
	// so fall back to the shared maximum-key marker as the upper bound.
	if end == nil {
		end = ethdb.MaximumKey
	}
	return d.db.DeleteRange(start, end, d.writeOptions)
}
@ -478,7 +484,7 @@ func (d *Database) Compact(start []byte, limit []byte) error {
// 0xff-s, so 32 ensures than only a hash collision could touch it.
// https://github.com/cockroachdb/pebble/issues/2359#issuecomment-1443995833
if limit == nil {
limit = bytes.Repeat([]byte{0xff}, 32)
limit = ethdb.MaximumKey
}
return d.db.Compact(start, limit, true) // Parallelization is preferred
}
@ -633,6 +639,23 @@ func (b *batch) Delete(key []byte) error {
return nil
}
// DeleteRange removes all keys in the range [start, end) from the batch for
// later committing, inclusive on start, exclusive on end.
func (b *batch) DeleteRange(start, end []byte) error {
	// Pebble lacks a nil "end of keyspace" marker (leveldb has one), so
	// substitute the shared maximum key as the exclusive upper bound.
	upper := end
	if upper == nil {
		upper = ethdb.MaximumKey
	}
	if err := b.b.DeleteRange(start, upper, nil); err != nil {
		return err
	}
	// Track an approximate size impact: just the key lengths.
	b.size += len(start) + len(upper)
	return nil
}
// ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int {
return b.size
@ -672,6 +695,15 @@ func (b *batch) Replay(w ethdb.KeyValueWriter) error {
if err = w.Delete(k); err != nil {
return err
}
} else if kind == pebble.InternalKeyKindRangeDelete {
// For range deletion, k is the start key and v is the end key
if rangeDeleter, ok := w.(ethdb.KeyValueRangeDeleter); ok {
if err = rangeDeleter.DeleteRange(k, v); err != nil {
return err
}
} else {
return fmt.Errorf("ethdb.KeyValueWriter does not implement DeleteRange")
}
} else {
return fmt.Errorf("unhandled operation, keytype: %v", kind)
}

View file

@ -876,6 +876,7 @@ func (b *spongeBatch) Put(key, value []byte) error {
return nil
}
func (b *spongeBatch) Delete(key []byte) error { panic("implement me") }
func (b *spongeBatch) DeleteRange(start, end []byte) error { panic("implement me") }
func (b *spongeBatch) ValueSize() int { return 100 }
func (b *spongeBatch) Write() error { return nil }
func (b *spongeBatch) Reset() {}