From 859312d1f529d62c63f25f5c4dc9249435535aae Mon Sep 17 00:00:00 2001 From: weiihann Date: Thu, 12 Feb 2026 17:16:29 +0800 Subject: [PATCH] nomt/bitbox: add Phase 4 WAL, sync controller, and crash recovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement crash-safe persistence for the Bitbox hash table: - wal.go: WAL format with START/CLEAR/UPDATE/END entries, builder/reader - sync.go: 3-phase sync protocol (BeginSync → WriteWAL → CommitSync) - recover.go: WAL replay for crash recovery The WAL records page diffs (not full pages) for compact logging. The 3-phase protocol ensures: WAL fsynced before HT modification, HT fsynced before WAL truncation, providing at-least-once delivery of page updates. Co-Authored-By: Claude Opus 4.6 --- nomt/bitbox/recover.go | 73 ++++++++++ nomt/bitbox/sync.go | 131 ++++++++++++++++++ nomt/bitbox/wal.go | 296 ++++++++++++++++++++++++++++++++++++++++ nomt/bitbox/wal_test.go | 217 +++++++++++++++++++++++++++++ 4 files changed, 717 insertions(+) create mode 100644 nomt/bitbox/recover.go create mode 100644 nomt/bitbox/sync.go create mode 100644 nomt/bitbox/wal.go create mode 100644 nomt/bitbox/wal_test.go diff --git a/nomt/bitbox/recover.go b/nomt/bitbox/recover.go new file mode 100644 index 0000000000..fb31ca91e6 --- /dev/null +++ b/nomt/bitbox/recover.go @@ -0,0 +1,73 @@ +package bitbox + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/nomt/core" +) + +// Recover replays the WAL file to restore the database to a consistent state. +// Returns the sync sequence number from the WAL, or 0 if no recovery was +// needed. +func (db *DB) Recover(walPath string) (uint32, error) { + data, err := ReadWALFile(walPath) + if err != nil { + return 0, fmt.Errorf("bitbox/recover: %w", err) + } + + if len(data) == 0 { + return 0, nil // No recovery needed. + } + + syncSeqn, entries, err := ReadWAL(data) + if err != nil { + return 0, fmt.Errorf("bitbox/recover: parse: %w", err) + } + + for _, entry := range entries { + switch entry.Kind { + case WALEntryClear: + db.metaMap.Set(entry.ClearBucket, MetaTombstone) + + case WALEntryUpdate: + // Read the existing data page at this bucket (or use a fresh one). + page, readErr := db.readDataPage(entry.UpdateBucket) + if readErr != nil { + page = new(core.RawPage) + } + + // Apply the diff: unpack changed nodes into the page. + entry.Diff.UnpackChangedNodes(entry.ChangedNodes, page) + + // Set elided children and page ID. + page.SetElidedChildren(entry.ElidedChildren) + page.SetPageIDBytes(entry.PageID) + + // Write data page. + if err := db.writeDataPage(entry.UpdateBucket, page); err != nil { + return 0, fmt.Errorf("bitbox/recover: write page: %w", err) + } + + // Update meta byte. + hash := HashPageIDBytes(db.seed, entry.PageID) + db.metaMap.Set(entry.UpdateBucket, MakeOccupied(hash)) + } + } + + // Write dirty meta pages. + if err := db.FlushMeta(); err != nil { + return 0, fmt.Errorf("bitbox/recover: flush meta: %w", err) + } + + // fsync HT file. + if err := db.file.Sync(); err != nil { + return 0, fmt.Errorf("bitbox/recover: fsync: %w", err) + } + + // Truncate WAL. + if err := TruncateWALFile(walPath); err != nil { + return 0, fmt.Errorf("bitbox/recover: truncate WAL: %w", err) + } + + return syncSeqn, nil +} diff --git a/nomt/bitbox/sync.go b/nomt/bitbox/sync.go new file mode 100644 index 0000000000..dcd76acad8 --- /dev/null +++ b/nomt/bitbox/sync.go @@ -0,0 +1,131 @@ +package bitbox + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/nomt/core" + "github.com/ethereum/go-ethereum/nomt/merkle" +) + +// SyncPlan holds the pre-computed work for a sync operation. +type SyncPlan struct { + walData []byte + dataWrites []dataWrite + syncSeqn uint32 +} + +type dataWrite struct { + bucket uint64 + page *core.RawPage +} + +// BeginSync prepares a sync plan from a set of updated pages. It allocates +// or reuses buckets, builds the WAL, and returns a SyncPlan. +// +// This is Phase 1 of the 3-phase sync protocol. +func (db *DB) BeginSync( + walPath string, + syncSeqn uint32, + updates []merkle.UpdatedPage, +) (*SyncPlan, error) { + wal := NewWALBuilder() + writes := make([]dataWrite, 0, len(updates)) + + for _, up := range updates { + if up.Diff.IsCleared() { + // Page was cleared — tombstone its bucket. + _, bucket, found, err := db.LoadPage(up.PageID) + if err != nil { + return nil, fmt.Errorf("bitbox/sync: load for clear: %w", err) + } + if found { + db.metaMap.Set(bucket, MetaTombstone) + db.occupied.Add(-1) + wal.AddClear(bucket) + } + continue + } + + // Encode the PageID into the page data. + encodedID := up.PageID.Encode() + up.Page.SetPageIDBytes(encodedID) + up.Page.SetElidedChildren(up.Page.ElidedChildren()) + + // Allocate or reuse a bucket. + bucket, err := db.StorePage(up.PageID, up.Page) + if err != nil { + return nil, fmt.Errorf("bitbox/sync: store page: %w", err) + } + + // Pack changed nodes from diff. + changedNodes := up.Diff.PackChangedNodes(up.Page) + + wal.AddUpdate( + encodedID, + up.Diff, + changedNodes, + up.Page.ElidedChildren(), + bucket, + ) + writes = append(writes, dataWrite{bucket: bucket, page: up.Page}) + } + + walData := wal.Finish(syncSeqn) + + return &SyncPlan{ + walData: walData, + dataWrites: writes, + syncSeqn: syncSeqn, + }, nil +} + +// WriteWAL writes the WAL to disk and fsyncs it. +// +// This is Phase 2 of the 3-phase sync protocol. +func (db *DB) WriteWAL(walPath string, plan *SyncPlan) error { + return WriteWALFile(walPath, plan.walData) +} + +// CommitSync writes dirty HT data + meta pages, fsyncs the HT file, and +// truncates the WAL. +// +// This is Phase 3 of the 3-phase sync protocol. +func (db *DB) CommitSync(walPath string, plan *SyncPlan) error { + // Write data pages. + for _, dw := range plan.dataWrites { + if err := db.writeDataPage(dw.bucket, dw.page); err != nil { + return fmt.Errorf("bitbox/sync: write data: %w", err) + } + } + + // Write dirty meta pages. + if err := db.FlushMeta(); err != nil { + return fmt.Errorf("bitbox/sync: flush meta: %w", err) + } + + // fsync the HT file. + if err := db.file.Sync(); err != nil { + return fmt.Errorf("bitbox/sync: fsync HT: %w", err) + } + + // Truncate WAL — no fsync needed. + return TruncateWALFile(walPath) +} + +// FullSync runs all three phases of the sync protocol. +func (db *DB) FullSync( + walPath string, + syncSeqn uint32, + updates []merkle.UpdatedPage, +) error { + plan, err := db.BeginSync(walPath, syncSeqn, updates) + if err != nil { + return err + } + + if err := db.WriteWAL(walPath, plan); err != nil { + return err + } + + return db.CommitSync(walPath, plan) +} diff --git a/nomt/bitbox/wal.go b/nomt/bitbox/wal.go new file mode 100644 index 0000000000..8b5322c1a1 --- /dev/null +++ b/nomt/bitbox/wal.go @@ -0,0 +1,296 @@ +package bitbox + +import ( + "encoding/binary" + "fmt" + "os" + + "github.com/ethereum/go-ethereum/nomt/core" +) + +// WAL entry type tags. +const ( + walTagStart byte = 0x01 + walTagClear byte = 0x02 + walTagUpdate byte = 0x03 + walTagEnd byte = 0x04 +) + +// WALEntryKind distinguishes the types of WAL entries. +type WALEntryKind int + +const ( + WALEntryClear WALEntryKind = iota + WALEntryUpdate +) + +// WALEntry represents a single entry in the WAL. +type WALEntry struct { + Kind WALEntryKind + + // For Clear entries: + ClearBucket uint64 + + // For Update entries: + PageID [32]byte + Diff core.PageDiff + ChangedNodes []core.Node + ElidedChildren uint64 + UpdateBucket uint64 +} + +// WALBuilder accumulates WAL entries in memory before serializing. +type WALBuilder struct { + entries []WALEntry +} + +// NewWALBuilder creates an empty WAL builder. +func NewWALBuilder() *WALBuilder { + return &WALBuilder{ + entries: make([]WALEntry, 0, 64), + } +} + +// AddClear adds a CLEAR entry (tombstone a bucket). +func (b *WALBuilder) AddClear(bucket uint64) { + b.entries = append(b.entries, WALEntry{ + Kind: WALEntryClear, + ClearBucket: bucket, + }) +} + +// AddUpdate adds an UPDATE entry. +func (b *WALBuilder) AddUpdate( + pageID [32]byte, + diff core.PageDiff, + changedNodes []core.Node, + elidedChildren uint64, + bucket uint64, +) { + b.entries = append(b.entries, WALEntry{ + Kind: WALEntryUpdate, + PageID: pageID, + Diff: diff, + ChangedNodes: changedNodes, + ElidedChildren: elidedChildren, + UpdateBucket: bucket, + }) +} + +// Finish serializes the WAL with a START and END record, padded to a +// multiple of pageSize. +func (b *WALBuilder) Finish(syncSeqn uint32) []byte { + // Estimate size: START(5) + entries + END(1). + estimatedSize := 5 + 1 + for _, e := range b.entries { + switch e.Kind { + case WALEntryClear: + estimatedSize += 1 + 8 // tag + bucket + case WALEntryUpdate: + estimatedSize += 1 + 32 + 16 + len(e.ChangedNodes)*32 + 8 + 8 + } + } + + buf := make([]byte, 0, estimatedSize+pageSize) + + // START: tag(1) + syncSeqn(4) + buf = append(buf, walTagStart) + var seqBuf [4]byte + binary.LittleEndian.PutUint32(seqBuf[:], syncSeqn) + buf = append(buf, seqBuf[:]...) + + // Entries. + var u64Buf [8]byte + for _, e := range b.entries { + switch e.Kind { + case WALEntryClear: + buf = append(buf, walTagClear) + binary.LittleEndian.PutUint64(u64Buf[:], e.ClearBucket) + buf = append(buf, u64Buf[:]...) + + case WALEntryUpdate: + buf = append(buf, walTagUpdate) + buf = append(buf, e.PageID[:]...) + encoded := e.Diff.Encode() + buf = append(buf, encoded[:]...) + for _, n := range e.ChangedNodes { + buf = append(buf, n[:]...) + } + binary.LittleEndian.PutUint64(u64Buf[:], e.ElidedChildren) + buf = append(buf, u64Buf[:]...) + binary.LittleEndian.PutUint64(u64Buf[:], e.UpdateBucket) + buf = append(buf, u64Buf[:]...) + } + } + + // END tag. + buf = append(buf, walTagEnd) + + // Pad to page boundary. + if rem := len(buf) % pageSize; rem != 0 { + padding := make([]byte, pageSize-rem) + buf = append(buf, padding...) + } + + return buf +} + +// ReadWAL parses a WAL from raw bytes. Returns the sync sequence number +// and the list of entries. Returns an error if the WAL is malformed. +func ReadWAL(data []byte) (uint32, []WALEntry, error) { + if len(data) == 0 { + return 0, nil, nil // Empty WAL = no recovery needed. + } + + pos := 0 + read := func(n int) ([]byte, error) { + if pos+n > len(data) { + return nil, fmt.Errorf("bitbox/wal: unexpected EOF at offset %d", pos) + } + b := data[pos : pos+n] + pos += n + return b, nil + } + + readByte := func() (byte, error) { + b, err := read(1) + if err != nil { + return 0, err + } + return b[0], nil + } + + readU32 := func() (uint32, error) { + b, err := read(4) + if err != nil { + return 0, err + } + return binary.LittleEndian.Uint32(b), nil + } + + readU64 := func() (uint64, error) { + b, err := read(8) + if err != nil { + return 0, err + } + return binary.LittleEndian.Uint64(b), nil + } + + // Read START. + tag, err := readByte() + if err != nil { + return 0, nil, err + } + if tag != walTagStart { + return 0, nil, fmt.Errorf("bitbox/wal: expected START tag, got 0x%02x", tag) + } + + syncSeqn, err := readU32() + if err != nil { + return 0, nil, err + } + + var entries []WALEntry + + for { + tag, err := readByte() + if err != nil { + return 0, nil, err + } + + switch tag { + case walTagEnd: + return syncSeqn, entries, nil + + case walTagClear: + bucket, err := readU64() + if err != nil { + return 0, nil, err + } + entries = append(entries, WALEntry{ + Kind: WALEntryClear, + ClearBucket: bucket, + }) + + case walTagUpdate: + pidBytes, err := read(32) + if err != nil { + return 0, nil, err + } + var pageID [32]byte + copy(pageID[:], pidBytes) + + diffBytes, err := read(16) + if err != nil { + return 0, nil, err + } + var diffBuf [16]byte + copy(diffBuf[:], diffBytes) + diff := core.DecodePageDiff(diffBuf) + + nodeCount := diff.Count() + nodes := make([]core.Node, nodeCount) + for i := range nodeCount { + nodeBytes, err := read(32) + if err != nil { + return 0, nil, err + } + copy(nodes[i][:], nodeBytes) + } + + elidedChildren, err := readU64() + if err != nil { + return 0, nil, err + } + bucket, err := readU64() + if err != nil { + return 0, nil, err + } + + entries = append(entries, WALEntry{ + Kind: WALEntryUpdate, + PageID: pageID, + Diff: diff, + ChangedNodes: nodes, + ElidedChildren: elidedChildren, + UpdateBucket: bucket, + }) + + default: + return 0, nil, fmt.Errorf("bitbox/wal: unknown tag 0x%02x at offset %d", + tag, pos-1) + } + } +} + +// WriteWALFile writes a WAL to a file, creating or truncating it. +func WriteWALFile(path string, data []byte) error { + if err := os.WriteFile(path, data, 0644); err != nil { + return fmt.Errorf("bitbox/wal: write: %w", err) + } + // fsync via re-open. + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("bitbox/wal: open for sync: %w", err) + } + defer f.Close() + return f.Sync() +} + +// ReadWALFile reads a WAL file. Returns nil data if the file doesn't exist +// or is empty. +func ReadWALFile(path string) ([]byte, error) { + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, fmt.Errorf("bitbox/wal: read: %w", err) + } + return data, nil +} + +// TruncateWALFile empties the WAL file. +func TruncateWALFile(path string) error { + return os.Truncate(path, 0) +} diff --git a/nomt/bitbox/wal_test.go b/nomt/bitbox/wal_test.go new file mode 100644 index 0000000000..da01d7f820 --- /dev/null +++ b/nomt/bitbox/wal_test.go @@ -0,0 +1,217 @@ +package bitbox + +import ( + "path/filepath" + "testing" + + "github.com/ethereum/go-ethereum/nomt/core" + "github.com/ethereum/go-ethereum/nomt/merkle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --- WAL Builder/Reader Tests --- + +func TestWALEmptyRoundTrip(t *testing.T) { + b := NewWALBuilder() + data := b.Finish(42) + + // Should be padded to page boundary. + assert.Equal(t, 0, len(data)%pageSize) + + seqn, entries, err := ReadWAL(data) + require.NoError(t, err) + assert.Equal(t, uint32(42), seqn) + assert.Empty(t, entries) +} + +func TestWALClearEntryRoundTrip(t *testing.T) { + b := NewWALBuilder() + b.AddClear(123) + b.AddClear(456) + data := b.Finish(1) + + seqn, entries, err := ReadWAL(data) + require.NoError(t, err) + assert.Equal(t, uint32(1), seqn) + require.Len(t, entries, 2) + + assert.Equal(t, WALEntryClear, entries[0].Kind) + assert.Equal(t, uint64(123), entries[0].ClearBucket) + assert.Equal(t, uint64(456), entries[1].ClearBucket) +} + +func TestWALUpdateEntryRoundTrip(t *testing.T) { + var pageID [32]byte + pageID[0] = 0xAB + + var diff core.PageDiff + diff.SetChanged(5) + diff.SetChanged(70) + + nodes := []core.Node{{0x01}, {0x02}} + + b := NewWALBuilder() + b.AddUpdate(pageID, diff, nodes, 0xFF, 99) + data := b.Finish(7) + + seqn, entries, err := ReadWAL(data) + require.NoError(t, err) + assert.Equal(t, uint32(7), seqn) + require.Len(t, entries, 1) + + e := entries[0] + assert.Equal(t, WALEntryUpdate, e.Kind) + assert.Equal(t, pageID, e.PageID) + assert.True(t, e.Diff.IsChanged(5)) + assert.True(t, e.Diff.IsChanged(70)) + require.Len(t, e.ChangedNodes, 2) + assert.Equal(t, core.Node{0x01}, e.ChangedNodes[0]) + assert.Equal(t, core.Node{0x02}, e.ChangedNodes[1]) + assert.Equal(t, uint64(0xFF), e.ElidedChildren) + assert.Equal(t, uint64(99), e.UpdateBucket) +} + +func TestWALMixedEntries(t *testing.T) { + b := NewWALBuilder() + b.AddClear(10) + + var pid [32]byte + var diff core.PageDiff + diff.SetChanged(0) + b.AddUpdate(pid, diff, []core.Node{{0xAA}}, 0, 20) + + b.AddClear(30) + data := b.Finish(100) + + _, entries, err := ReadWAL(data) + require.NoError(t, err) + require.Len(t, entries, 3) + assert.Equal(t, WALEntryClear, entries[0].Kind) + assert.Equal(t, WALEntryUpdate, entries[1].Kind) + assert.Equal(t, WALEntryClear, entries[2].Kind) +} + +func TestReadWALEmpty(t *testing.T) { + seqn, entries, err := ReadWAL(nil) + require.NoError(t, err) + assert.Equal(t, uint32(0), seqn) + assert.Nil(t, entries) +} + +func TestWALFilePersistence(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.wal") + + b := NewWALBuilder() + b.AddClear(42) + data := b.Finish(5) + + require.NoError(t, WriteWALFile(path, data)) + + loaded, err := ReadWALFile(path) + require.NoError(t, err) + assert.Equal(t, data, loaded) + + require.NoError(t, TruncateWALFile(path)) + loaded2, err := ReadWALFile(path) + require.NoError(t, err) + assert.Empty(t, loaded2) +} + +// --- Sync Controller Tests --- + +func TestFullSyncCycle(t *testing.T) { + dir := t.TempDir() + htPath := filepath.Join(dir, "test.bitbox") + walPath := filepath.Join(dir, "test.wal") + + seed := HashSeedFromUint64(1, 2) + db, err := Create(htPath, 1024, seed) + require.NoError(t, err) + defer db.Close() + + rootID := core.RootPageID() + page := new(core.RawPage) + page.SetNodeAt(0, core.Node{0xAA}) + + var diff core.PageDiff + diff.SetChanged(0) + + updates := []merkle.UpdatedPage{{ + PageID: rootID, + Page: page, + Diff: diff, + }} + + require.NoError(t, db.FullSync(walPath, 1, updates)) + + // Verify page is persisted. + loaded, _, found, err := db.LoadPage(rootID) + require.NoError(t, err) + assert.True(t, found) + assert.Equal(t, core.Node{0xAA}, loaded.NodeAt(0)) +} + +// --- Recovery Tests --- + +func TestRecoverFromWAL(t *testing.T) { + dir := t.TempDir() + htPath := filepath.Join(dir, "test.bitbox") + walPath := filepath.Join(dir, "test.wal") + + seed := HashSeedFromUint64(1, 2) + + // Create DB and write a WAL but don't commit Phase 3. + db, err := Create(htPath, 1024, seed) + require.NoError(t, err) + + rootID := core.RootPageID() + page := new(core.RawPage) + page.SetNodeAt(0, core.Node{0xBB}) + + var diff core.PageDiff + diff.SetChanged(0) + + updates := []merkle.UpdatedPage{{ + PageID: rootID, + Page: page, + Diff: diff, + }} + + // Phase 1 + 2 only (simulate crash before Phase 3). + plan, err := db.BeginSync(walPath, 5, updates) + require.NoError(t, err) + require.NoError(t, db.WriteWAL(walPath, plan)) + db.Close() + + // Reopen and recover. + db2, err := Open(htPath) + require.NoError(t, err) + defer db2.Close() + + seqn, err := db2.Recover(walPath) + require.NoError(t, err) + assert.Equal(t, uint32(5), seqn) + + // Verify the page was recovered. + loaded, _, found, err := db2.LoadPage(rootID) + require.NoError(t, err) + assert.True(t, found) + assert.Equal(t, core.Node{0xBB}, loaded.NodeAt(0)) +} + +func TestRecoverNoWAL(t *testing.T) { + dir := t.TempDir() + htPath := filepath.Join(dir, "test.bitbox") + walPath := filepath.Join(dir, "test.wal") + + seed := HashSeedFromUint64(1, 2) + db, err := Create(htPath, 1024, seed) + require.NoError(t, err) + defer db.Close() + + seqn, err := db.Recover(walPath) + require.NoError(t, err) + assert.Equal(t, uint32(0), seqn, "no recovery needed") +}