triedb/pathdb: introduce file-based state journal (#32060)

Introduce file-based state journal in path database, fixing
the Pebble restriction when the journal size exceeds 4GB.

---------

Signed-off-by: jsvisa <delweng@gmail.com>
Co-authored-by: Gary Rong <garyrong0905@gmail.com>
This commit is contained in:
Delweng 2025-07-15 11:45:20 +08:00 committed by GitHub
parent fe0ae06c77
commit 17903fedf0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 240 additions and 41 deletions

View file

@ -2198,6 +2198,12 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh
StateHistory: ctx.Uint64(StateHistoryFlag.Name),
// Disable transaction indexing/unindexing.
TxLookupLimit: -1,
// Enables file journaling for the trie database. The journal files will be stored
// within the data directory. The corresponding paths will be either:
// - DATADIR/triedb/merkle.journal
// - DATADIR/triedb/verkle.journal
TrieJournalDirectory: stack.ResolvePath("triedb"),
}
if options.ArchiveMode && !options.Preimages {
options.Preimages = true

View file

@ -162,10 +162,11 @@ const (
// BlockChainConfig contains the configuration of the BlockChain object.
type BlockChainConfig struct {
// Trie database related options
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TrieNoAsyncFlush bool // Whether the asynchronous buffer flushing is disallowed
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TrieNoAsyncFlush bool // Whether the asynchronous buffer flushing is disallowed
TrieJournalDirectory string // Directory path to the journal used for persisting trie data across node restarts
Preimages bool // Whether to store preimage of trie key to the disk
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
@ -246,6 +247,7 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config {
EnableStateIndexing: cfg.ArchiveMode,
TrieCleanSize: cfg.TrieCleanLimit * 1024 * 1024,
StateCleanSize: cfg.SnapshotLimit * 1024 * 1024,
JournalDirectory: cfg.TrieJournalDirectory,
// TODO(rjl493456442): The write buffer represents the memory limit used
// for flushing both trie data and state data to disk. The config name

View file

@ -157,14 +157,6 @@ func WriteTrieJournal(db ethdb.KeyValueWriter, journal []byte) {
}
}
// DeleteTrieJournal deletes the serialized in-memory trie nodes of layers saved at
// the last shutdown.
func DeleteTrieJournal(db ethdb.KeyValueWriter) {
if err := db.Delete(trieJournalKey); err != nil {
log.Crit("Failed to remove tries journal", "err", err)
}
}
// ReadStateHistoryMeta retrieves the metadata corresponding to the specified
// state history. Compute the position of state history in freezer by minus
// one since the id of first state history starts from one(zero for initial

View file

@ -236,9 +236,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
VmConfig: vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
},
// Enables file journaling for the trie database. The journal files will be stored
// within the data directory. The corresponding paths will be either:
// - DATADIR/triedb/merkle.journal
// - DATADIR/triedb/verkle.journal
TrieJournalDirectory: stack.ResolvePath("triedb"),
}
)
if config.VMTrace != "" {
traceConfig := json.RawMessage("{}")
if config.VMTraceJsonConfig != "" {

View file

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"path/filepath"
"sync"
"time"
@ -120,6 +121,7 @@ type Config struct {
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
ReadOnly bool // Flag whether the database is opened in read only mode
JournalDirectory string // Absolute path of journal directory (null means the journal data is persisted in key-value store)
// Testing configurations
SnapshotNoBuild bool // Flag Whether the state generation is allowed
@ -156,6 +158,9 @@ func (c *Config) fields() []interface{} {
} else {
list = append(list, "history", fmt.Sprintf("last %d blocks", c.StateHistory))
}
if c.JournalDirectory != "" {
list = append(list, "journal-dir", c.JournalDirectory)
}
return list
}
@ -493,7 +498,6 @@ func (db *Database) Enable(root common.Hash) error {
// Drop the stale state journal in persistent database and
// reset the persistent state id back to zero.
batch := db.diskdb.NewBatch()
rawdb.DeleteTrieJournal(batch)
rawdb.DeleteSnapshotRoot(batch)
rawdb.WritePersistentStateID(batch, 0)
if err := batch.Write(); err != nil {
@ -573,8 +577,6 @@ func (db *Database) Recover(root common.Hash) error {
// disk layer won't be accessible from outside.
db.tree.init(dl)
}
rawdb.DeleteTrieJournal(db.diskdb)
// Explicitly sync the key-value store to ensure all recent writes are
// flushed to disk. This step is crucial to prevent a scenario where
// recent key-value writes are lost due to an application panic, while
@ -680,6 +682,20 @@ func (db *Database) modifyAllowed() error {
return nil
}
// journalPath returns the absolute path of journal for persisting state data.
func (db *Database) journalPath() string {
if db.config.JournalDirectory == "" {
return ""
}
var fname string
if db.isVerkle {
fname = fmt.Sprintf("verkle.journal")
} else {
fname = fmt.Sprintf("merkle.journal")
}
return filepath.Join(db.config.JournalDirectory, fname)
}
// AccountHistory inspects the account history within the specified range.
//
// Start: State ID of the first history object for the query. 0 implies the first

View file

@ -21,12 +21,16 @@ import (
"errors"
"fmt"
"math/rand"
"os"
"path/filepath"
"strconv"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/testrand"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
@ -121,7 +125,7 @@ type tester struct {
snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte // Keyed by the hash of account address and the hash of storage key
}
func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int, enableIndex bool) *tester {
func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int, enableIndex bool, journalDir string) *tester {
var (
disk, _ = rawdb.Open(rawdb.NewMemoryDatabase(), rawdb.OpenOptions{Ancient: t.TempDir()})
db = New(disk, &Config{
@ -131,6 +135,7 @@ func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int, ena
StateCleanSize: 256 * 1024,
WriteBufferSize: 256 * 1024,
NoAsyncFlush: true,
JournalDirectory: journalDir,
}, isVerkle)
obj = &tester{
@ -466,7 +471,7 @@ func TestDatabaseRollback(t *testing.T) {
}()
// Verify state histories
tester := newTester(t, 0, false, 32, false)
tester := newTester(t, 0, false, 32, false, "")
defer tester.release()
if err := tester.verifyHistory(); err != nil {
@ -500,7 +505,7 @@ func TestDatabaseRecoverable(t *testing.T) {
}()
var (
tester = newTester(t, 0, false, 12, false)
tester = newTester(t, 0, false, 12, false, "")
index = tester.bottomIndex()
)
defer tester.release()
@ -544,7 +549,7 @@ func TestDisable(t *testing.T) {
maxDiffLayers = 128
}()
tester := newTester(t, 0, false, 32, false)
tester := newTester(t, 0, false, 32, false, "")
defer tester.release()
stored := crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(tester.db.diskdb, nil))
@ -586,7 +591,7 @@ func TestCommit(t *testing.T) {
maxDiffLayers = 128
}()
tester := newTester(t, 0, false, 12, false)
tester := newTester(t, 0, false, 12, false, "")
defer tester.release()
if err := tester.db.Commit(tester.lastHash(), false); err != nil {
@ -610,20 +615,25 @@ func TestCommit(t *testing.T) {
}
func TestJournal(t *testing.T) {
testJournal(t, "")
testJournal(t, filepath.Join(t.TempDir(), strconv.Itoa(rand.Intn(10000))))
}
func testJournal(t *testing.T, journalDir string) {
// Redefine the diff layer depth allowance for faster testing.
maxDiffLayers = 4
defer func() {
maxDiffLayers = 128
}()
tester := newTester(t, 0, false, 12, false)
tester := newTester(t, 0, false, 12, false, journalDir)
defer tester.release()
if err := tester.db.Journal(tester.lastHash()); err != nil {
t.Errorf("Failed to journal, err: %v", err)
}
tester.db.Close()
tester.db = New(tester.db.diskdb, nil, false)
tester.db = New(tester.db.diskdb, tester.db.config, false)
// Verify states including disk layer and all diff on top.
for i := 0; i < len(tester.roots); i++ {
@ -640,13 +650,30 @@ func TestJournal(t *testing.T) {
}
func TestCorruptedJournal(t *testing.T) {
testCorruptedJournal(t, "", func(db ethdb.Database) {
// Mutate the journal in disk, it should be regarded as invalid
blob := rawdb.ReadTrieJournal(db)
blob[0] = 0xa
rawdb.WriteTrieJournal(db, blob)
})
directory := filepath.Join(t.TempDir(), strconv.Itoa(rand.Intn(10000)))
testCorruptedJournal(t, directory, func(_ ethdb.Database) {
f, _ := os.OpenFile(filepath.Join(directory, "merkle.journal"), os.O_WRONLY, 0644)
f.WriteAt([]byte{0xa}, 0)
f.Sync()
f.Close()
})
}
func testCorruptedJournal(t *testing.T, journalDir string, modifyFn func(database ethdb.Database)) {
// Redefine the diff layer depth allowance for faster testing.
maxDiffLayers = 4
defer func() {
maxDiffLayers = 128
}()
tester := newTester(t, 0, false, 12, false)
tester := newTester(t, 0, false, 12, false, journalDir)
defer tester.release()
if err := tester.db.Journal(tester.lastHash()); err != nil {
@ -655,13 +682,10 @@ func TestCorruptedJournal(t *testing.T) {
tester.db.Close()
root := crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(tester.db.diskdb, nil))
// Mutate the journal in disk, it should be regarded as invalid
blob := rawdb.ReadTrieJournal(tester.db.diskdb)
blob[0] = 0xa
rawdb.WriteTrieJournal(tester.db.diskdb, blob)
modifyFn(tester.db.diskdb)
// Verify states, all not-yet-written states should be discarded
tester.db = New(tester.db.diskdb, nil, false)
tester.db = New(tester.db.diskdb, tester.db.config, false)
for i := 0; i < len(tester.roots); i++ {
if tester.roots[i] == root {
if err := tester.verifyState(root); err != nil {
@ -694,7 +718,7 @@ func TestTailTruncateHistory(t *testing.T) {
maxDiffLayers = 128
}()
tester := newTester(t, 10, false, 12, false)
tester := newTester(t, 10, false, 12, false, "")
defer tester.release()
tester.db.Close()

View file

@ -0,0 +1,57 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
//go:build !windows
// +build !windows
package pathdb
import (
"errors"
"os"
"syscall"
)
func isErrInvalid(err error) bool {
if errors.Is(err, os.ErrInvalid) {
return true
}
// Go >= 1.8 returns *os.PathError instead
if patherr, ok := err.(*os.PathError); ok && patherr.Err == syscall.EINVAL {
return true
}
return false
}
func syncDir(name string) error {
// As per fsync manpage, Linux seems to expect fsync on directory, however
// some system don't support this, so we will ignore syscall.EINVAL.
//
// From fsync(2):
// Calling fsync() does not necessarily ensure that the entry in the
// directory containing the file has also reached disk. For that an
// explicit fsync() on a file descriptor for the directory is also needed.
f, err := os.Open(name)
if err != nil {
return err
}
defer f.Close()
if err := f.Sync(); err != nil && !isErrInvalid(err) {
return err
}
return nil
}

View file

@ -0,0 +1,25 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
//go:build windows
// +build windows
package pathdb
func syncDir(name string) error {
// On Windows, fsync on directories is not supported
return nil
}

View file

@ -126,7 +126,7 @@ func testHistoryReader(t *testing.T, historyLimit uint64) {
}()
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
env := newTester(t, historyLimit, false, 64, true)
env := newTester(t, historyLimit, false, 64, true, "")
defer env.release()
waitIndexing(env.db)

View file

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"os"
"time"
"github.com/ethereum/go-ethereum/common"
@ -31,6 +32,8 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
const tempJournalSuffix = ".tmp"
var (
errMissJournal = errors.New("journal not found")
errMissVersion = errors.New("version not found")
@ -51,11 +54,25 @@ const journalVersion uint64 = 3
// loadJournal tries to parse the layer journal from the disk.
func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
journal := rawdb.ReadTrieJournal(db.diskdb)
if len(journal) == 0 {
return nil, errMissJournal
var reader io.Reader
if path := db.journalPath(); path != "" && common.FileExist(path) {
// If a journal file is specified, read it from there
log.Info("Load database journal from file", "path", path)
f, err := os.OpenFile(path, os.O_RDONLY, 0644)
if err != nil {
return nil, fmt.Errorf("failed to read journal file %s: %w", path, err)
}
defer f.Close()
reader = f
} else {
log.Info("Load database journal from disk")
journal := rawdb.ReadTrieJournal(db.diskdb)
if len(journal) == 0 {
return nil, errMissJournal
}
reader = bytes.NewReader(journal)
}
r := rlp.NewStream(bytes.NewReader(journal), 0)
r := rlp.NewStream(reader, 0)
// Firstly, resolve the first element as the journal version
version, err := r.Uint64()
@ -297,9 +314,9 @@ func (db *Database) Journal(root common.Hash) error {
}
disk := db.tree.bottom()
if l, ok := l.(*diffLayer); ok {
log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
log.Info("Persisting dirty state", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)
log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers)
log.Info("Persisting dirty state", "root", root, "layers", disk.buffer.layers)
}
// Block until the background flushing is finished and terminate
// the potential active state generator.
@ -316,8 +333,37 @@ func (db *Database) Journal(root common.Hash) error {
if db.readOnly {
return errDatabaseReadOnly
}
// Store the journal into the database and return
var (
file *os.File
journal io.Writer
journalPath = db.journalPath()
)
if journalPath != "" {
// Write into a temp file first
err := os.MkdirAll(db.config.JournalDirectory, 0755)
if err != nil {
return err
}
tmp := journalPath + tempJournalSuffix
file, err = os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to open journal file %s: %w", tmp, err)
}
defer func() {
if file != nil {
file.Close()
os.Remove(tmp) // Clean up temp file if we didn't successfully rename it
log.Warn("Removed leftover temporary journal file", "path", tmp)
}
}()
journal = file
} else {
journal = new(bytes.Buffer)
}
// Firstly write out the metadata of journal
journal := new(bytes.Buffer)
if err := rlp.Encode(journal, journalVersion); err != nil {
return err
}
@ -334,11 +380,38 @@ func (db *Database) Journal(root common.Hash) error {
if err := l.journal(journal); err != nil {
return err
}
// Store the journal into the database and return
rawdb.WriteTrieJournal(db.diskdb, journal.Bytes())
// Store the journal into the database and return
if file == nil {
data := journal.(*bytes.Buffer)
size := data.Len()
rawdb.WriteTrieJournal(db.diskdb, data.Bytes())
log.Info("Persisted dirty state to disk", "size", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
} else {
stat, err := file.Stat()
if err != nil {
return err
}
size := int(stat.Size())
// Close the temporary file and atomically rename it
if err := file.Sync(); err != nil {
return fmt.Errorf("failed to fsync the journal, %v", err)
}
if err := file.Close(); err != nil {
return fmt.Errorf("failed to close the journal: %v", err)
}
// Replace the live journal with the newly generated one
if err := os.Rename(journalPath+tempJournalSuffix, journalPath); err != nil {
return fmt.Errorf("failed to rename the journal: %v", err)
}
if err := syncDir(db.config.JournalDirectory); err != nil {
return fmt.Errorf("failed to fsync the dir: %v", err)
}
file = nil
log.Info("Persisted dirty state to file", "path", journalPath, "size", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
}
// Set the db in read only mode to reject all following mutations
db.readOnly = true
log.Info("Persisted dirty state to disk", "size", common.StorageSize(journal.Len()), "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}