From 17903fedf0374be940f1e75260a8eacbbef62d0a Mon Sep 17 00:00:00 2001 From: Delweng Date: Tue, 15 Jul 2025 11:45:20 +0800 Subject: [PATCH] triedb/pathdb: introduce file-based state journal (#32060) Introduce file-based state journal in path database, fixing the Pebble restriction when the journal size exceeds 4GB. --------- Signed-off-by: jsvisa Co-authored-by: Gary Rong --- cmd/utils/flags.go | 6 ++ core/blockchain.go | 10 +-- core/rawdb/accessors_state.go | 8 --- eth/backend.go | 6 +- triedb/pathdb/database.go | 22 ++++++- triedb/pathdb/database_test.go | 52 +++++++++++----- triedb/pathdb/fileutils_unix.go | 57 +++++++++++++++++ triedb/pathdb/fileutils_windows.go | 25 ++++++++ triedb/pathdb/history_reader_test.go | 2 +- triedb/pathdb/journal.go | 93 +++++++++++++++++++++++++--- 10 files changed, 240 insertions(+), 41 deletions(-) create mode 100644 triedb/pathdb/fileutils_unix.go create mode 100644 triedb/pathdb/fileutils_windows.go diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index d3e842149a..b86970651f 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -2198,6 +2198,12 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh StateHistory: ctx.Uint64(StateHistoryFlag.Name), // Disable transaction indexing/unindexing. TxLookupLimit: -1, + + // Enables file journaling for the trie database. The journal files will be stored + // within the data directory. The corresponding paths will be either: + // - DATADIR/triedb/merkle.journal + // - DATADIR/triedb/verkle.journal + TrieJournalDirectory: stack.ResolvePath("triedb"), } if options.ArchiveMode && !options.Preimages { options.Preimages = true diff --git a/core/blockchain.go b/core/blockchain.go index 2290b6d3cd..d52990ec5a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -162,10 +162,11 @@ const ( // BlockChainConfig contains the configuration of the BlockChain object. 
type BlockChainConfig struct { // Trie database related options - TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory - TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk - TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk - TrieNoAsyncFlush bool // Whether the asynchronous buffer flushing is disallowed + TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory + TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk + TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + TrieNoAsyncFlush bool // Whether the asynchronous buffer flushing is disallowed + TrieJournalDirectory string // Directory path to the journal used for persisting trie data across node restarts Preimages bool // Whether to store preimage of trie key to the disk StateHistory uint64 // Number of blocks from head whose state histories are reserved. @@ -246,6 +247,7 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config { EnableStateIndexing: cfg.ArchiveMode, TrieCleanSize: cfg.TrieCleanLimit * 1024 * 1024, StateCleanSize: cfg.SnapshotLimit * 1024 * 1024, + JournalDirectory: cfg.TrieJournalDirectory, // TODO(rjl493456442): The write buffer represents the memory limit used // for flushing both trie data and state data to disk. The config name diff --git a/core/rawdb/accessors_state.go b/core/rawdb/accessors_state.go index 7d7b37641b..44f041d82e 100644 --- a/core/rawdb/accessors_state.go +++ b/core/rawdb/accessors_state.go @@ -157,14 +157,6 @@ func WriteTrieJournal(db ethdb.KeyValueWriter, journal []byte) { } } -// DeleteTrieJournal deletes the serialized in-memory trie nodes of layers saved at -// the last shutdown. 
-func DeleteTrieJournal(db ethdb.KeyValueWriter) { - if err := db.Delete(trieJournalKey); err != nil { - log.Crit("Failed to remove tries journal", "err", err) - } -} - // ReadStateHistoryMeta retrieves the metadata corresponding to the specified // state history. Compute the position of state history in freezer by minus // one since the id of first state history starts from one(zero for initial diff --git a/eth/backend.go b/eth/backend.go index 80ffd301ce..7616ec9d31 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -236,9 +236,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { VmConfig: vm.Config{ EnablePreimageRecording: config.EnablePreimageRecording, }, + // Enables file journaling for the trie database. The journal files will be stored + // within the data directory. The corresponding paths will be either: + // - DATADIR/triedb/merkle.journal + // - DATADIR/triedb/verkle.journal + TrieJournalDirectory: stack.ResolvePath("triedb"), } ) - if config.VMTrace != "" { traceConfig := json.RawMessage("{}") if config.VMTraceJsonConfig != "" { diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 23a7d383b5..e323a7449e 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + "path/filepath" "sync" "time" @@ -120,6 +121,7 @@ type Config struct { StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer ReadOnly bool // Flag whether the database is opened in read only mode + JournalDirectory string // Absolute path of journal directory (null means the journal data is persisted in key-value store) // Testing configurations SnapshotNoBuild bool // Flag Whether the state generation is allowed @@ -156,6 +158,9 @@ func (c *Config) fields() []interface{} { } else { list = append(list, "history", fmt.Sprintf("last %d blocks", c.StateHistory)) } + if 
c.JournalDirectory != "" { + list = append(list, "journal-dir", c.JournalDirectory) + } return list } @@ -493,7 +498,6 @@ func (db *Database) Enable(root common.Hash) error { // Drop the stale state journal in persistent database and // reset the persistent state id back to zero. batch := db.diskdb.NewBatch() - rawdb.DeleteTrieJournal(batch) rawdb.DeleteSnapshotRoot(batch) rawdb.WritePersistentStateID(batch, 0) if err := batch.Write(); err != nil { @@ -573,8 +577,6 @@ func (db *Database) Recover(root common.Hash) error { // disk layer won't be accessible from outside. db.tree.init(dl) } - rawdb.DeleteTrieJournal(db.diskdb) - // Explicitly sync the key-value store to ensure all recent writes are // flushed to disk. This step is crucial to prevent a scenario where // recent key-value writes are lost due to an application panic, while @@ -680,6 +682,20 @@ func (db *Database) modifyAllowed() error { return nil } +// journalPath returns the absolute path of journal for persisting state data. +func (db *Database) journalPath() string { + if db.config.JournalDirectory == "" { + return "" + } + var fname string + if db.isVerkle { + fname = fmt.Sprintf("verkle.journal") + } else { + fname = fmt.Sprintf("merkle.journal") + } + return filepath.Join(db.config.JournalDirectory, fname) +} + // AccountHistory inspects the account history within the specified range. // // Start: State ID of the first history object for the query. 
0 implies the first diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 2982202009..e9a1850ee0 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -21,12 +21,16 @@ import ( "errors" "fmt" "math/rand" + "os" + "path/filepath" + "strconv" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/internal/testrand" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -121,7 +125,7 @@ type tester struct { snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte // Keyed by the hash of account address and the hash of storage key } -func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int, enableIndex bool) *tester { +func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int, enableIndex bool, journalDir string) *tester { var ( disk, _ = rawdb.Open(rawdb.NewMemoryDatabase(), rawdb.OpenOptions{Ancient: t.TempDir()}) db = New(disk, &Config{ @@ -131,6 +135,7 @@ func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int, ena StateCleanSize: 256 * 1024, WriteBufferSize: 256 * 1024, NoAsyncFlush: true, + JournalDirectory: journalDir, }, isVerkle) obj = &tester{ @@ -466,7 +471,7 @@ func TestDatabaseRollback(t *testing.T) { }() // Verify state histories - tester := newTester(t, 0, false, 32, false) + tester := newTester(t, 0, false, 32, false, "") defer tester.release() if err := tester.verifyHistory(); err != nil { @@ -500,7 +505,7 @@ func TestDatabaseRecoverable(t *testing.T) { }() var ( - tester = newTester(t, 0, false, 12, false) + tester = newTester(t, 0, false, 12, false, "") index = tester.bottomIndex() ) defer tester.release() @@ -544,7 +549,7 @@ func TestDisable(t *testing.T) { maxDiffLayers = 128 }() - tester := 
newTester(t, 0, false, 32, false) + tester := newTester(t, 0, false, 32, false, "") defer tester.release() stored := crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)) @@ -586,7 +591,7 @@ func TestCommit(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 12, false) + tester := newTester(t, 0, false, 12, false, "") defer tester.release() if err := tester.db.Commit(tester.lastHash(), false); err != nil { @@ -610,20 +615,25 @@ func TestCommit(t *testing.T) { } func TestJournal(t *testing.T) { + testJournal(t, "") + testJournal(t, filepath.Join(t.TempDir(), strconv.Itoa(rand.Intn(10000)))) +} + +func testJournal(t *testing.T, journalDir string) { // Redefine the diff layer depth allowance for faster testing. maxDiffLayers = 4 defer func() { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 12, false) + tester := newTester(t, 0, false, 12, false, journalDir) defer tester.release() if err := tester.db.Journal(tester.lastHash()); err != nil { t.Errorf("Failed to journal, err: %v", err) } tester.db.Close() - tester.db = New(tester.db.diskdb, nil, false) + tester.db = New(tester.db.diskdb, tester.db.config, false) // Verify states including disk layer and all diff on top. 
for i := 0; i < len(tester.roots); i++ { @@ -640,13 +650,30 @@ func TestJournal(t *testing.T) { } func TestCorruptedJournal(t *testing.T) { + testCorruptedJournal(t, "", func(db ethdb.Database) { + // Mutate the journal in disk, it should be regarded as invalid + blob := rawdb.ReadTrieJournal(db) + blob[0] = 0xa + rawdb.WriteTrieJournal(db, blob) + }) + + directory := filepath.Join(t.TempDir(), strconv.Itoa(rand.Intn(10000))) + testCorruptedJournal(t, directory, func(_ ethdb.Database) { + f, _ := os.OpenFile(filepath.Join(directory, "merkle.journal"), os.O_WRONLY, 0644) + f.WriteAt([]byte{0xa}, 0) + f.Sync() + f.Close() + }) +} + +func testCorruptedJournal(t *testing.T, journalDir string, modifyFn func(database ethdb.Database)) { // Redefine the diff layer depth allowance for faster testing. maxDiffLayers = 4 defer func() { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 12, false) + tester := newTester(t, 0, false, 12, false, journalDir) defer tester.release() if err := tester.db.Journal(tester.lastHash()); err != nil { @@ -655,13 +682,10 @@ func TestCorruptedJournal(t *testing.T) { tester.db.Close() root := crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)) - // Mutate the journal in disk, it should be regarded as invalid - blob := rawdb.ReadTrieJournal(tester.db.diskdb) - blob[0] = 0xa - rawdb.WriteTrieJournal(tester.db.diskdb, blob) + modifyFn(tester.db.diskdb) // Verify states, all not-yet-written states should be discarded - tester.db = New(tester.db.diskdb, nil, false) + tester.db = New(tester.db.diskdb, tester.db.config, false) for i := 0; i < len(tester.roots); i++ { if tester.roots[i] == root { if err := tester.verifyState(root); err != nil { @@ -694,7 +718,7 @@ func TestTailTruncateHistory(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 10, false, 12, false) + tester := newTester(t, 10, false, 12, false, "") defer tester.release() tester.db.Close() diff --git a/triedb/pathdb/fileutils_unix.go 
b/triedb/pathdb/fileutils_unix.go new file mode 100644 index 0000000000..fde0bf50fa --- /dev/null +++ b/triedb/pathdb/fileutils_unix.go @@ -0,0 +1,57 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +//go:build !windows +// +build !windows + +package pathdb + +import ( + "errors" + "os" + "syscall" +) + +func isErrInvalid(err error) bool { + if errors.Is(err, os.ErrInvalid) { + return true + } + // Go >= 1.8 returns *os.PathError instead + if patherr, ok := err.(*os.PathError); ok && patherr.Err == syscall.EINVAL { + return true + } + return false +} + +func syncDir(name string) error { + // As per fsync manpage, Linux seems to expect fsync on directory, however + // some system don't support this, so we will ignore syscall.EINVAL. + // + // From fsync(2): + // Calling fsync() does not necessarily ensure that the entry in the + // directory containing the file has also reached disk. For that an + // explicit fsync() on a file descriptor for the directory is also needed.
+ f, err := os.Open(name) + if err != nil { + return err + } + defer f.Close() + + if err := f.Sync(); err != nil && !isErrInvalid(err) { + return err + } + return nil +} diff --git a/triedb/pathdb/fileutils_windows.go b/triedb/pathdb/fileutils_windows.go new file mode 100644 index 0000000000..e4c644d757 --- /dev/null +++ b/triedb/pathdb/fileutils_windows.go @@ -0,0 +1,25 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+ +//go:build windows +// +build windows + +package pathdb + +func syncDir(name string) error { + // On Windows, fsync on directories is not supported + return nil +} diff --git a/triedb/pathdb/history_reader_test.go b/triedb/pathdb/history_reader_test.go index 4356490f23..4eb93fb9c9 100644 --- a/triedb/pathdb/history_reader_test.go +++ b/triedb/pathdb/history_reader_test.go @@ -126,7 +126,7 @@ func testHistoryReader(t *testing.T, historyLimit uint64) { }() //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) - env := newTester(t, historyLimit, false, 64, true) + env := newTester(t, historyLimit, false, 64, true, "") defer env.release() waitIndexing(env.db) diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index e88b3e062f..4639932763 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + "os" "time" "github.com/ethereum/go-ethereum/common" @@ -31,6 +32,8 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +const tempJournalSuffix = ".tmp" + var ( errMissJournal = errors.New("journal not found") errMissVersion = errors.New("version not found") @@ -51,11 +54,25 @@ const journalVersion uint64 = 3 // loadJournal tries to parse the layer journal from the disk. 
func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) { - journal := rawdb.ReadTrieJournal(db.diskdb) - if len(journal) == 0 { - return nil, errMissJournal + var reader io.Reader + if path := db.journalPath(); path != "" && common.FileExist(path) { + // If a journal file is specified, read it from there + log.Info("Load database journal from file", "path", path) + f, err := os.OpenFile(path, os.O_RDONLY, 0644) + if err != nil { + return nil, fmt.Errorf("failed to read journal file %s: %w", path, err) + } + defer f.Close() + reader = f + } else { + log.Info("Load database journal from disk") + journal := rawdb.ReadTrieJournal(db.diskdb) + if len(journal) == 0 { + return nil, errMissJournal + } + reader = bytes.NewReader(journal) } - r := rlp.NewStream(bytes.NewReader(journal), 0) + r := rlp.NewStream(reader, 0) // Firstly, resolve the first element as the journal version version, err := r.Uint64() @@ -297,9 +314,9 @@ func (db *Database) Journal(root common.Hash) error { } disk := db.tree.bottom() if l, ok := l.(*diffLayer); ok { - log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers) + log.Info("Persisting dirty state", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers) } else { // disk layer only on noop runs (likely) or deep reorgs (unlikely) - log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers) + log.Info("Persisting dirty state", "root", root, "layers", disk.buffer.layers) } // Block until the background flushing is finished and terminate // the potential active state generator. 
@@ -316,8 +333,37 @@ func (db *Database) Journal(root common.Hash) error { if db.readOnly { return errDatabaseReadOnly } + + // Store the journal into the database and return + var ( + file *os.File + journal io.Writer + journalPath = db.journalPath() + ) + if journalPath != "" { + // Write into a temp file first + err := os.MkdirAll(db.config.JournalDirectory, 0755) + if err != nil { + return err + } + tmp := journalPath + tempJournalSuffix + file, err = os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return fmt.Errorf("failed to open journal file %s: %w", tmp, err) + } + defer func() { + if file != nil { + file.Close() + os.Remove(tmp) // Clean up temp file if we didn't successfully rename it + log.Warn("Removed leftover temporary journal file", "path", tmp) + } + }() + journal = file + } else { + journal = new(bytes.Buffer) + } + // Firstly write out the metadata of journal - journal := new(bytes.Buffer) if err := rlp.Encode(journal, journalVersion); err != nil { return err } @@ -334,11 +380,38 @@ func (db *Database) Journal(root common.Hash) error { if err := l.journal(journal); err != nil { return err } - // Store the journal into the database and return - rawdb.WriteTrieJournal(db.diskdb, journal.Bytes()) + // Store the journal into the database and return + if file == nil { + data := journal.(*bytes.Buffer) + size := data.Len() + rawdb.WriteTrieJournal(db.diskdb, data.Bytes()) + log.Info("Persisted dirty state to disk", "size", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) + } else { + stat, err := file.Stat() + if err != nil { + return err + } + size := int(stat.Size()) + + // Close the temporary file and atomically rename it + if err := file.Sync(); err != nil { + return fmt.Errorf("failed to fsync the journal, %v", err) + } + if err := file.Close(); err != nil { + return fmt.Errorf("failed to close the journal: %v", err) + } + // Replace the live journal with the newly generated one + if err 
:= os.Rename(journalPath+tempJournalSuffix, journalPath); err != nil { + return fmt.Errorf("failed to rename the journal: %v", err) + } + if err := syncDir(db.config.JournalDirectory); err != nil { + return fmt.Errorf("failed to fsync the dir: %v", err) + } + file = nil + log.Info("Persisted dirty state to file", "path", journalPath, "size", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) + } // Set the db in read only mode to reject all following mutations db.readOnly = true - log.Info("Persisted dirty state to disk", "size", common.StorageSize(journal.Len()), "elapsed", common.PrettyDuration(time.Since(start))) return nil }