1
0
Fork 0
forked from forks/go-ethereum

core, ethdb: introduce database sync function (#31703)

This pull request introduces a SyncKeyValue function to the ethdb.KeyValueStore interface, providing the ability to forcibly flush all previous writes to disk.

This functionality is critical for go-ethereum, which internally uses two independent database engines: a key-value store (such as Pebble, LevelDB, or memoryDB for testing) and a flat-file–based freezer. To ensure write-order consistency between these engines, the key-value store must be explicitly synced before writing to the freezer, and vice versa.

Fixes:
- https://github.com/ethereum/go-ethereum/issues/31405
- https://github.com/ethereum/go-ethereum/issues/29819
This commit is contained in:
rjl493456442 2025-05-08 19:10:26 +08:00 committed by GitHub
parent 07d073bc5a
commit 10519768a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 194 additions and 60 deletions

View file

@ -183,7 +183,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
if !disk {
db = rawdb.NewMemoryDatabase()
} else {
pdb, err := pebble.New(b.TempDir(), 128, 128, "", false, true)
pdb, err := pebble.New(b.TempDir(), 128, 128, "", false)
if err != nil {
b.Fatalf("cannot create temporary database: %v", err)
}
@ -303,7 +303,7 @@ func makeChainForBench(db ethdb.Database, genesis *Genesis, full bool, count uin
func benchWriteChain(b *testing.B, full bool, count uint64) {
genesis := &Genesis{Config: params.AllEthashProtocolChanges}
for i := 0; i < b.N; i++ {
pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false, true)
pdb, err := pebble.New(b.TempDir(), 1024, 128, "", false)
if err != nil {
b.Fatalf("error opening database: %v", err)
}
@ -316,7 +316,7 @@ func benchWriteChain(b *testing.B, full bool, count uint64) {
func benchReadChain(b *testing.B, full bool, count uint64) {
dir := b.TempDir()
pdb, err := pebble.New(dir, 1024, 128, "", false, true)
pdb, err := pebble.New(dir, 1024, 128, "", false)
if err != nil {
b.Fatalf("error opening database: %v", err)
}
@ -332,7 +332,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
pdb, err = pebble.New(dir, 1024, 128, "", false, true)
pdb, err = pebble.New(dir, 1024, 128, "", false)
if err != nil {
b.Fatalf("error opening database: %v", err)
}

View file

@ -979,17 +979,16 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if _, err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
// The chain segment, such as the block header, canonical hash,
// body, and receipt, will be removed from the ancient store
// in one go.
//
// The hash-to-number mapping in the key-value store will be
// removed by the hc.SetHead function.
} else {
// Remove relative body and receipts from the active store.
// The header, total difficulty and canonical hash will be
// removed in the hc.SetHead function.
// Remove the associated body and receipts from the key-value store.
// The header, hash-to-number mapping, and canonical hash will be
// removed by the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
@ -1361,7 +1360,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size += writeSize
// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
if err := bc.db.SyncAncient(); err != nil {
return 0, err
}
// Write hash to number mappings
@ -2627,7 +2626,8 @@ func (bc *BlockChain) InsertHeadersBeforeCutoff(headers []*types.Header) (int, e
if err != nil {
return 0, err
}
if err := bc.db.Sync(); err != nil {
// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.SyncAncient(); err != nil {
return 0, err
}
// Write hash to number mappings

View file

@ -1765,7 +1765,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")
pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
@ -1850,7 +1850,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
chain.stopWithoutSaving()
// Start a new blockchain back up and see where the repair leads us
pdb, err = pebble.New(datadir, 0, 0, "", false, true)
pdb, err = pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to reopen persistent key-value database: %v", err)
}
@ -1915,7 +1915,7 @@ func testIssue23496(t *testing.T, scheme string) {
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")
pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
@ -1973,7 +1973,7 @@ func testIssue23496(t *testing.T, scheme string) {
chain.stopWithoutSaving()
// Start a new blockchain back up and see where the repair leads us
pdb, err = pebble.New(datadir, 0, 0, "", false, true)
pdb, err = pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to reopen persistent key-value database: %v", err)
}

View file

@ -1969,7 +1969,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")
pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}

View file

@ -66,7 +66,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
datadir := t.TempDir()
ancient := filepath.Join(datadir, "ancient")
pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}
@ -257,7 +257,7 @@ func (snaptest *crashSnapshotTest) test(t *testing.T) {
chain.triedb.Close()
// Start a new blockchain back up and see where the repair leads us
pdb, err := pebble.New(snaptest.datadir, 0, 0, "", false, true)
pdb, err := pebble.New(snaptest.datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}

View file

@ -2492,7 +2492,7 @@ func testSideImportPrunedBlocks(t *testing.T, scheme string) {
datadir := t.TempDir()
ancient := path.Join(datadir, "ancient")
pdb, err := pebble.New(datadir, 0, 0, "", false, true)
pdb, err := pebble.New(datadir, 0, 0, "", false)
if err != nil {
t.Fatalf("Failed to create persistent key-value database: %v", err)
}

View file

@ -591,17 +591,50 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
hashes = append(hashes, hdr.Hash())
}
for _, hash := range hashes {
// Remove the associated block body and receipts if required.
//
// If the block is in the chain freezer, then this delete operation
// is actually ineffective.
if delFn != nil {
delFn(batch, hash, num)
}
// Remove the hash->number mapping along with the header itself
rawdb.DeleteHeader(batch, hash, num)
}
// Remove the number->hash mapping
rawdb.DeleteCanonicalHash(batch, num)
}
}
// Flush all accumulated deletions.
if err := batch.Write(); err != nil {
log.Crit("Failed to rewind block", "error", err)
log.Crit("Failed to commit batch in setHead", "err", err)
}
// Explicitly flush the pending writes in the key-value store to disk, ensuring
// data durability of the previous deletions.
if err := hc.chainDb.SyncKeyValue(); err != nil {
log.Crit("Failed to sync the key-value store in setHead", "err", err)
}
// Truncate the excessive chain segments in the ancient store.
// These are actually deferred deletions from the loop above.
//
// This step must be performed after synchronizing the key-value store;
// otherwise, in the event of a panic, it's theoretically possible to
// lose recent key-value store writes while the ancient store deletions
// remain, leading to data inconsistency, e.g., the gap between the key
// value store and ancient can be created due to unclean shutdown.
if delFn != nil {
// Ignore the error here since light client won't hit this path
frozen, _ := hc.chainDb.Ancients()
header := hc.CurrentHeader()
// Truncate the excessive chain segment above the current chain head
// in the ancient store.
if header.Number.Uint64()+1 < frozen {
_, err := hc.chainDb.TruncateHead(header.Number.Uint64() + 1)
if err != nil {
log.Crit("Failed to truncate head block", "err", err)
}
}
}
// Clear out any stale content from the caches
hc.headerCache.Purge()

View file

@ -205,7 +205,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
continue
}
// Batch of blocks have been frozen, flush them before wiping from key-value store
if err := f.Sync(); err != nil {
if err := f.SyncAncient(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}
// Wipe out all data from the active database

View file

@ -131,8 +131,8 @@ func (db *nofreezedb) TruncateTail(items uint64) (uint64, error) {
return 0, errNotSupported
}
// Sync returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Sync() error {
// SyncAncient returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) SyncAncient() error {
return errNotSupported
}

View file

@ -325,8 +325,8 @@ func (f *Freezer) TruncateTail(tail uint64) (uint64, error) {
return old, nil
}
// Sync flushes all data tables to disk.
func (f *Freezer) Sync() error {
// SyncAncient flushes all data tables to disk.
func (f *Freezer) SyncAncient() error {
var errs []error
for _, table := range f.tables {
if err := table.Sync(); err != nil {

View file

@ -395,8 +395,8 @@ func (f *MemoryFreezer) TruncateTail(tail uint64) (uint64, error) {
return old, nil
}
// Sync flushes all data tables to disk.
func (f *MemoryFreezer) Sync() error {
// SyncAncient flushes all data tables to disk.
func (f *MemoryFreezer) SyncAncient() error {
return nil
}

View file

@ -194,12 +194,12 @@ func (f *resettableFreezer) TruncateTail(tail uint64) (uint64, error) {
return f.freezer.TruncateTail(tail)
}
// Sync flushes all data tables to disk.
func (f *resettableFreezer) Sync() error {
// SyncAncient flushes all data tables to disk.
func (f *resettableFreezer) SyncAncient() error {
f.lock.RLock()
defer f.lock.RUnlock()
return f.freezer.Sync()
return f.freezer.SyncAncient()
}
// AncientDatadir returns the path of the ancient store.

View file

@ -392,7 +392,7 @@ func TestFreezerCloseSync(t *testing.T) {
if err := f.Close(); err != nil {
t.Fatal(err)
}
if err := f.Sync(); err == nil {
if err := f.SyncAncient(); err == nil {
t.Fatalf("want error, have nil")
} else if have, want := err.Error(), "[closed closed]"; have != want {
t.Fatalf("want %v, have %v", have, want)

View file

@ -107,10 +107,10 @@ func (t *table) TruncateTail(items uint64) (uint64, error) {
return t.db.TruncateTail(items)
}
// Sync is a noop passthrough that just forwards the request to the underlying
// SyncAncient is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) Sync() error {
return t.db.Sync()
func (t *table) SyncAncient() error {
return t.db.SyncAncient()
}
// AncientDatadir returns the ancient datadir of the underlying database.
@ -188,6 +188,12 @@ func (t *table) Compact(start []byte, limit []byte) error {
return t.db.Compact(start, limit)
}
// SyncKeyValue ensures that all pending writes are flushed to disk,
// guaranteeing data durability up to the point.
func (t *table) SyncKeyValue() error {
return t.db.SyncKeyValue()
}
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called, each operation prefixing all keys with the
// pre-configured string.

View file

@ -57,6 +57,13 @@ type KeyValueStater interface {
Stat() (string, error)
}
// KeyValueSyncer wraps the SyncKeyValue method of a backing data store.
type KeyValueSyncer interface {
// SyncKeyValue ensures that all pending writes are flushed to disk,
// guaranteeing data durability up to the point.
SyncKeyValue() error
}
// Compacter wraps the Compact method of a backing data store.
type Compacter interface {
// Compact flattens the underlying data store for the given key range. In essence,
@ -75,6 +82,7 @@ type KeyValueStore interface {
KeyValueReader
KeyValueWriter
KeyValueStater
KeyValueSyncer
KeyValueRangeDeleter
Batcher
Iteratee
@ -126,6 +134,9 @@ type AncientWriter interface {
// The integer return value is the total size of the written data.
ModifyAncients(func(AncientWriteOp) error) (int64, error)
// SyncAncient flushes all in-memory ancient store data to disk.
SyncAncient() error
// TruncateHead discards all but the first n ancient data from the ancient store.
// After the truncation, the latest item can be accessed it item_n-1(start from 0).
TruncateHead(n uint64) (uint64, error)
@ -138,9 +149,6 @@ type AncientWriter interface {
//
// Note that data marked as non-prunable will still be retained and remain accessible.
TruncateTail(n uint64) (uint64, error)
// Sync flushes all in-memory ancient store data to disk.
Sync() error
}
// AncientWriteOp is given to the function argument of ModifyAncients.

View file

@ -324,6 +324,22 @@ func (db *Database) Path() string {
return db.fn
}
// SyncKeyValue flushes all pending writes in the write-ahead-log to disk,
// ensuring data durability up to that point.
func (db *Database) SyncKeyValue() error {
// In theory, the WAL (Write-Ahead Log) can be explicitly synchronized using
// a write operation with SYNC=true. However, there is no dedicated key reserved
// for this purpose, and even a nil key (key=nil) is considered a valid
// database entry.
//
// In LevelDB, writes are blocked until the data is written to the WAL, meaning
// recent writes won't be lost unless a power failure or system crash occurs.
// Additionally, LevelDB is no longer the default database engine and is likely
// only used by hash-mode archive nodes. Given this, the durability guarantees
// without explicit sync are acceptable in the context of LevelDB.
return nil
}
// meter periodically retrieves internal leveldb counters and reports them to
// the metrics subsystem.
func (db *Database) meter(refresh time.Duration, namespace string) {

View file

@ -199,6 +199,12 @@ func (db *Database) Compact(start []byte, limit []byte) error {
return nil
}
// SyncKeyValue ensures that all pending writes are flushed to disk,
// guaranteeing data durability up to the point.
func (db *Database) SyncKeyValue() error {
return nil
}
// Len returns the number of entries currently present in the memory database.
//
// Note, this method is only used for testing (i.e. not public in general) and

View file

@ -144,7 +144,7 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) {
// New returns a wrapped pebble DB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (*Database, error) {
func New(file string, cache int, handles int, namespace string, readonly bool) (*Database, error) {
// Ensure we have some minimal caching and file guarantees
if cache < minCache {
cache = minCache
@ -182,10 +182,18 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
memTableSize = maxMemTableSize - 1
}
db := &Database{
fn: file,
log: logger,
quitChan: make(chan chan error),
writeOptions: &pebble.WriteOptions{Sync: !ephemeral},
fn: file,
log: logger,
quitChan: make(chan chan error),
// Use asynchronous write mode by default. Otherwise, the overhead of frequent fsync
// operations can be significant, especially on platforms with slow fsync performance
// (e.g., macOS) or less capable SSDs.
//
// Note that enabling async writes means recent data may be lost in the event of an
// application-level panic (writes will also be lost on a machine-level failure,
// of course). Geth is expected to handle recovery from an unclean shutdown.
writeOptions: pebble.NoSync,
}
opt := &pebble.Options{
// Pebble has a single combined cache area and the write
@ -228,6 +236,15 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
WriteStallEnd: db.onWriteStallEnd,
},
Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble
// Pebble is configured to use asynchronous write mode, meaning write operations
// return as soon as the data is cached in memory, without waiting for the WAL
// to be written. This mode offers better write performance but risks losing
// recent writes if the application crashes or a power failure/system crash occurs.
//
// By setting the WALBytesPerSync, the cached WAL writes will be periodically
// flushed at the background if the accumulated size exceeds this threshold.
WALBytesPerSync: 5 * ethdb.IdealBatchSize,
}
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
// for more details.
@ -414,6 +431,18 @@ func (d *Database) Path() string {
return d.fn
}
// SyncKeyValue flushes all pending writes in the write-ahead-log to disk,
// ensuring data durability up to that point.
func (d *Database) SyncKeyValue() error {
// The entry (value=nil) is not written to the database; it is only
// added to the WAL. Writing this special log entry in sync mode
// automatically flushes all previous writes, ensuring database
// durability up to this point.
b := d.db.NewBatch()
b.LogData(nil, nil)
return d.db.Apply(b, pebble.Sync)
}
// meter periodically retrieves internal pebble counters and reports them to
// the metrics subsystem.
func (d *Database) meter(refresh time.Duration, namespace string) {

View file

@ -17,6 +17,7 @@
package pebble
import (
"errors"
"testing"
"github.com/cockroachdb/pebble"
@ -54,3 +55,26 @@ func BenchmarkPebbleDB(b *testing.B) {
}
})
}
func TestPebbleLogData(t *testing.T) {
db, err := pebble.Open("", &pebble.Options{
FS: vfs.NewMem(),
})
if err != nil {
t.Fatal(err)
}
_, _, err = db.Get(nil)
if !errors.Is(err, pebble.ErrNotFound) {
t.Fatal("Unknown database entry")
}
b := db.NewBatch()
b.LogData(nil, nil)
db.Apply(b, pebble.Sync)
_, _, err = db.Get(nil)
if !errors.Is(err, pebble.ErrNotFound) {
t.Fatal("Unknown database entry")
}
}

View file

@ -110,7 +110,7 @@ func (db *Database) TruncateTail(n uint64) (uint64, error) {
panic("not supported")
}
func (db *Database) Sync() error {
func (db *Database) SyncAncient() error {
return nil
}
@ -138,6 +138,10 @@ func (db *Database) Compact(start []byte, limit []byte) error {
return nil
}
func (db *Database) SyncKeyValue() error {
return nil
}
func (db *Database) Close() error {
db.remote.Close()
return nil

View file

@ -36,11 +36,6 @@ type openOptions struct {
Cache int // the capacity(in megabytes) of the data caching
Handles int // number of files to be open simultaneously
ReadOnly bool
// Ephemeral means that filesystem sync operations should be avoided:
// data integrity in the face of a crash is not important. This option
// should typically be used in tests.
Ephemeral bool
}
// openDatabase opens both a disk-based key-value database such as leveldb or pebble, but also
@ -83,7 +78,7 @@ func openKeyValueDatabase(o openOptions) (ethdb.Database, error) {
}
if o.Type == rawdb.DBPebble || existingDb == rawdb.DBPebble {
log.Info("Using pebble as the backing database")
return newPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral)
return newPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly)
}
if o.Type == rawdb.DBLeveldb || existingDb == rawdb.DBLeveldb {
log.Info("Using leveldb as the backing database")
@ -91,7 +86,7 @@ func openKeyValueDatabase(o openOptions) (ethdb.Database, error) {
}
// No pre-existing database, no user-requested one either. Default to Pebble.
log.Info("Defaulting to pebble as the backing database")
return newPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral)
return newPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly)
}
// newLevelDBDatabase creates a persistent key-value database without a freezer
@ -107,8 +102,8 @@ func newLevelDBDatabase(file string, cache int, handles int, namespace string, r
// newPebbleDBDatabase creates a persistent key-value database without a freezer
// moving immutable chain segments into cold storage.
func newPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (ethdb.Database, error) {
db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral)
func newPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly bool) (ethdb.Database, error) {
db, err := pebble.New(file, cache, handles, namespace, readonly)
if err != nil {
return nil, err
}

View file

@ -830,6 +830,7 @@ func (s *spongeDb) NewBatch() ethdb.Batch { return &spongeBat
func (s *spongeDb) NewBatchWithSize(size int) ethdb.Batch { return &spongeBatch{s} }
func (s *spongeDb) Stat() (string, error) { panic("implement me") }
func (s *spongeDb) Compact(start []byte, limit []byte) error { panic("implement me") }
func (s *spongeDb) SyncKeyValue() error { return nil }
func (s *spongeDb) Close() error { return nil }
func (s *spongeDb) Put(key []byte, value []byte) error {
var (

View file

@ -135,10 +135,13 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node
start = time.Now()
batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff
)
// Explicitly sync the state freezer, ensuring that all written
// data is transferred to disk before updating the key-value store.
// Explicitly sync the state freezer to ensure all written data is persisted to disk
// before updating the key-value store.
//
// This step is crucial to guarantee that the corresponding state history remains
// available for state rollback.
if freezer != nil {
if err := freezer.Sync(); err != nil {
if err := freezer.SyncAncient(); err != nil {
return err
}
}

View file

@ -454,6 +454,15 @@ func (db *Database) Recover(root common.Hash) error {
db.tree.reset(dl)
}
rawdb.DeleteTrieJournal(db.diskdb)
// Explicitly sync the key-value store to ensure all recent writes are
// flushed to disk. This step is crucial to prevent a scenario where
// recent key-value writes are lost due to an application panic, while
// the associated state histories have already been removed, resulting
// in the inability to perform a state rollback.
if err := db.diskdb.SyncKeyValue(); err != nil {
return err
}
_, err := truncateFromHead(db.diskdb, db.freezer, dl.stateID())
if err != nil {
return err