nomt/bitbox: add Phase 4 WAL, sync controller, and crash recovery

Implement crash-safe persistence for the Bitbox hash table:
- wal.go: WAL format with START/CLEAR/UPDATE/END entries, builder/reader
- sync.go: 3-phase sync protocol (BeginSync → WriteWAL → CommitSync)
- recover.go: WAL replay for crash recovery

The WAL records page diffs (not full pages) for compact logging. The
3-phase protocol ensures: WAL fsynced before HT modification, HT fsynced
before WAL truncation, providing at-least-once delivery of page updates.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
weiihann 2026-02-12 17:16:29 +08:00
parent fef1ed4c4f
commit 859312d1f5
4 changed files with 717 additions and 0 deletions

73
nomt/bitbox/recover.go Normal file
View file

@ -0,0 +1,73 @@
package bitbox
import (
"fmt"
"github.com/ethereum/go-ethereum/nomt/core"
)
// Recover replays the WAL stored at walPath into the hash table and then
// empties the WAL file.
//
// If ReadWALFile yields no data (missing or empty file), nothing is replayed
// and (0, nil) is returned. Otherwise each CLEAR entry tombstones its bucket
// in the meta map, and each UPDATE entry has its diff applied on top of the
// bucket's current data page, which is written back and marked occupied.
// Once every entry is applied, the meta pages are flushed, the HT file is
// fsynced, and only then is the WAL truncated.
//
// On success the sync sequence number recorded in the WAL is returned.
func (db *DB) Recover(walPath string) (uint32, error) {
	raw, err := ReadWALFile(walPath)
	if err != nil {
		return 0, fmt.Errorf("bitbox/recover: %w", err)
	}
	if len(raw) == 0 {
		// Nothing on disk to replay.
		return 0, nil
	}

	seqn, entries, err := ReadWAL(raw)
	if err != nil {
		return 0, fmt.Errorf("bitbox/recover: parse: %w", err)
	}

	for i := range entries {
		e := &entries[i]

		if e.Kind == WALEntryClear {
			db.metaMap.Set(e.ClearBucket, MetaTombstone)
			continue
		}
		if e.Kind != WALEntryUpdate {
			continue
		}

		// Start from whatever is stored in the bucket; if it cannot be
		// read, fall back to a zero-valued page.
		target, readErr := db.readDataPage(e.UpdateBucket)
		if readErr != nil {
			target = new(core.RawPage)
		}

		// Overlay the logged nodes, then restore the page trailer fields.
		e.Diff.UnpackChangedNodes(e.ChangedNodes, target)
		target.SetElidedChildren(e.ElidedChildren)
		target.SetPageIDBytes(e.PageID)

		if err := db.writeDataPage(e.UpdateBucket, target); err != nil {
			return 0, fmt.Errorf("bitbox/recover: write page: %w", err)
		}

		// Mark the bucket occupied with the hash derived from the page ID.
		db.metaMap.Set(e.UpdateBucket, MakeOccupied(HashPageIDBytes(db.seed, e.PageID)))
	}

	// Persist the meta map changes made above.
	if err := db.FlushMeta(); err != nil {
		return 0, fmt.Errorf("bitbox/recover: flush meta: %w", err)
	}
	// The HT file must be synced before the WAL is discarded.
	if err := db.file.Sync(); err != nil {
		return 0, fmt.Errorf("bitbox/recover: fsync: %w", err)
	}
	if err := TruncateWALFile(walPath); err != nil {
		return 0, fmt.Errorf("bitbox/recover: truncate WAL: %w", err)
	}
	return seqn, nil
}

131
nomt/bitbox/sync.go Normal file
View file

@ -0,0 +1,131 @@
package bitbox
import (
"fmt"
"github.com/ethereum/go-ethereum/nomt/core"
"github.com/ethereum/go-ethereum/nomt/merkle"
)
// SyncPlan holds the pre-computed work for a sync operation. It is built by
// BeginSync and consumed by WriteWAL and CommitSync.
type SyncPlan struct {
// walData is the serialized WAL returned by WALBuilder.Finish.
walData []byte
// dataWrites lists the data pages that CommitSync writes to the HT file.
dataWrites []dataWrite
// syncSeqn is the sync sequence number that was passed to BeginSync.
syncSeqn uint32
}
// dataWrite pairs a page with the bucket it is to be written to.
type dataWrite struct {
// bucket is the bucket index handed to writeDataPage.
bucket uint64
// page is the page content written to that bucket.
page *core.RawPage
}
// BeginSync is Phase 1 of the 3-phase sync protocol: it turns a batch of
// updated pages into a SyncPlan.
//
// For a cleared page the existing bucket (if the page is found) is
// tombstoned in the meta map, the occupied counter is decremented and a
// CLEAR entry is logged. For any other page the encoded page ID is stamped
// into the page, a bucket is obtained through StorePage, and an UPDATE entry
// carrying the changed nodes is logged; the page is also queued for the
// data-page write performed later by CommitSync.
//
// walPath is not used by this phase. syncSeqn is recorded in the WAL and in
// the returned plan. An error from LoadPage or StorePage aborts the call;
// meta map changes already made for earlier updates are not undone.
func (db *DB) BeginSync(
	walPath string,
	syncSeqn uint32,
	updates []merkle.UpdatedPage,
) (*SyncPlan, error) {
	var (
		builder = NewWALBuilder()
		pending = make([]dataWrite, 0, len(updates))
	)

	for _, upd := range updates {
		if upd.Diff.IsCleared() {
			_, bucket, found, err := db.LoadPage(upd.PageID)
			if err != nil {
				return nil, fmt.Errorf("bitbox/sync: load for clear: %w", err)
			}
			if !found {
				// Page is not stored; nothing to tombstone.
				continue
			}
			db.metaMap.Set(bucket, MetaTombstone)
			db.occupied.Add(-1)
			builder.AddClear(bucket)
			continue
		}

		// Stamp the encoded page ID (and elided-children word) into the page.
		idBytes := upd.PageID.Encode()
		upd.Page.SetPageIDBytes(idBytes)
		upd.Page.SetElidedChildren(upd.Page.ElidedChildren())

		bucket, err := db.StorePage(upd.PageID, upd.Page)
		if err != nil {
			return nil, fmt.Errorf("bitbox/sync: store page: %w", err)
		}

		// Only the nodes flagged in the diff go into the WAL.
		packed := upd.Diff.PackChangedNodes(upd.Page)
		builder.AddUpdate(idBytes, upd.Diff, packed, upd.Page.ElidedChildren(), bucket)

		pending = append(pending, dataWrite{bucket: bucket, page: upd.Page})
	}

	plan := new(SyncPlan)
	plan.walData = builder.Finish(syncSeqn)
	plan.dataWrites = pending
	plan.syncSeqn = syncSeqn
	return plan, nil
}
// WriteWAL is Phase 2 of the 3-phase sync protocol: it hands the plan's
// serialized WAL to WriteWALFile, which writes it to walPath and fsyncs it.
func (db *DB) WriteWAL(walPath string, plan *SyncPlan) error {
	payload := plan.walData
	return WriteWALFile(walPath, payload)
}
// CommitSync is Phase 3 of the 3-phase sync protocol. It writes every data
// page queued in plan, flushes the dirty meta pages, fsyncs the HT file and
// finally truncates the WAL at walPath.
//
// The WAL is truncated only after the HT fsync has succeeded, so a failure
// at any earlier step leaves the WAL in place. Every failure, including the
// truncation, is returned wrapped with a "bitbox/sync:" prefix.
func (db *DB) CommitSync(walPath string, plan *SyncPlan) error {
	// Write data pages.
	for _, dw := range plan.dataWrites {
		if err := db.writeDataPage(dw.bucket, dw.page); err != nil {
			return fmt.Errorf("bitbox/sync: write data: %w", err)
		}
	}
	// Write dirty meta pages.
	if err := db.FlushMeta(); err != nil {
		return fmt.Errorf("bitbox/sync: flush meta: %w", err)
	}
	// fsync the HT file.
	if err := db.file.Sync(); err != nil {
		return fmt.Errorf("bitbox/sync: fsync HT: %w", err)
	}
	// Truncate WAL — no fsync needed.
	if err := TruncateWALFile(walPath); err != nil {
		return fmt.Errorf("bitbox/sync: truncate WAL: %w", err)
	}
	return nil
}
// FullSync chains the three sync phases: BeginSync, WriteWAL and
// CommitSync. The first phase that fails stops the sequence and its error
// is returned unchanged.
func (db *DB) FullSync(
	walPath string,
	syncSeqn uint32,
	updates []merkle.UpdatedPage,
) error {
	plan, err := db.BeginSync(walPath, syncSeqn, updates)
	if err == nil {
		err = db.WriteWAL(walPath, plan)
	}
	if err != nil {
		return err
	}
	return db.CommitSync(walPath, plan)
}

296
nomt/bitbox/wal.go Normal file
View file

@ -0,0 +1,296 @@
package bitbox
import (
"encoding/binary"
"fmt"
"os"
"github.com/ethereum/go-ethereum/nomt/core"
)
// WAL entry type tags. Each record in the serialized WAL begins with one of
// these single-byte tags.
const (
// walTagStart opens the WAL; it is followed by the 4-byte little-endian
// sync sequence number.
walTagStart byte = 0x01
// walTagClear is followed by an 8-byte little-endian bucket index.
walTagClear byte = 0x02
// walTagUpdate is followed by the 32-byte page ID, the 16-byte encoded
// diff, the changed nodes (32 bytes each), and two 8-byte little-endian
// values: elided children and bucket index.
walTagUpdate byte = 0x03
// walTagEnd closes the WAL; bytes after it are padding.
walTagEnd byte = 0x04
)
// WALEntryKind distinguishes the types of WAL entries held in memory.
// START and END records have no in-memory entry kind.
type WALEntryKind int
const (
// WALEntryClear marks an entry that tombstones a bucket. It is the zero
// value of WALEntryKind.
WALEntryClear WALEntryKind = iota
// WALEntryUpdate marks an entry that carries a page diff for a bucket.
WALEntryUpdate
)
// WALEntry represents a single entry in the WAL. Kind selects which of the
// remaining fields are meaningful.
type WALEntry struct {
// Kind is WALEntryClear or WALEntryUpdate.
Kind WALEntryKind
// For Clear entries:
// ClearBucket is the bucket to tombstone.
ClearBucket uint64
// For Update entries:
// PageID is the 32-byte encoded page ID.
PageID [32]byte
// Diff flags which node positions of the page changed.
Diff core.PageDiff
// ChangedNodes holds the changed nodes; ReadWAL reads Diff.Count() of them.
ChangedNodes []core.Node
// ElidedChildren is the value restored via SetElidedChildren on replay.
ElidedChildren uint64
// UpdateBucket is the bucket the page is stored in.
UpdateBucket uint64
}
// WALBuilder accumulates WAL entries in memory before serializing them with
// Finish.
type WALBuilder struct {
// entries holds the CLEAR and UPDATE entries in the order they were added.
entries []WALEntry
}
// NewWALBuilder returns a builder with no entries and room pre-allocated
// for 64 of them.
func NewWALBuilder() *WALBuilder {
	b := new(WALBuilder)
	b.entries = make([]WALEntry, 0, 64)
	return b
}
// AddClear appends a CLEAR entry that tombstones the given bucket.
func (b *WALBuilder) AddClear(bucket uint64) {
	entry := WALEntry{Kind: WALEntryClear}
	entry.ClearBucket = bucket
	b.entries = append(b.entries, entry)
}
// AddUpdate appends an UPDATE entry describing the changed nodes of the page
// identified by pageID and the bucket it is stored in. The changedNodes
// slice is retained, not copied.
func (b *WALBuilder) AddUpdate(
	pageID [32]byte,
	diff core.PageDiff,
	changedNodes []core.Node,
	elidedChildren uint64,
	bucket uint64,
) {
	var entry WALEntry
	entry.Kind = WALEntryUpdate
	entry.PageID = pageID
	entry.Diff = diff
	entry.ChangedNodes = changedNodes
	entry.ElidedChildren = elidedChildren
	entry.UpdateBucket = bucket
	b.entries = append(b.entries, entry)
}
// Finish serializes the accumulated entries into a WAL image: a START record
// carrying syncSeqn, every entry in insertion order, an END tag, and zero
// padding up to the next multiple of pageSize. All integers are encoded
// little-endian.
func (b *WALBuilder) Finish(syncSeqn uint32) []byte {
	// Capacity hint: START is tag+seqn (5 bytes), END is a lone tag.
	sizeHint := 5 + 1
	for i := range b.entries {
		e := &b.entries[i]
		switch e.Kind {
		case WALEntryClear:
			sizeHint += 1 + 8 // tag + bucket
		case WALEntryUpdate:
			// tag + page ID + diff + nodes + elided children + bucket
			sizeHint += 1 + 32 + 16 + len(e.ChangedNodes)*32 + 8 + 8
		}
	}
	out := make([]byte, 0, sizeHint+pageSize)

	// START record.
	out = append(out, walTagStart)
	out = binary.LittleEndian.AppendUint32(out, syncSeqn)

	for i := range b.entries {
		e := &b.entries[i]
		switch e.Kind {
		case WALEntryClear:
			out = append(out, walTagClear)
			out = binary.LittleEndian.AppendUint64(out, e.ClearBucket)
		case WALEntryUpdate:
			out = append(out, walTagUpdate)
			out = append(out, e.PageID[:]...)
			diffBytes := e.Diff.Encode()
			out = append(out, diffBytes[:]...)
			for _, node := range e.ChangedNodes {
				out = append(out, node[:]...)
			}
			out = binary.LittleEndian.AppendUint64(out, e.ElidedChildren)
			out = binary.LittleEndian.AppendUint64(out, e.UpdateBucket)
		}
	}

	// END record, then zero-fill the final partial page.
	out = append(out, walTagEnd)
	if tail := len(out) % pageSize; tail != 0 {
		out = append(out, make([]byte, pageSize-tail)...)
	}
	return out
}
// ReadWAL decodes a serialized WAL image. It returns the sync sequence
// number from the START record and the CLEAR/UPDATE entries in file order.
//
// Empty input is not an error and yields (0, nil, nil). Parsing stops at the
// END tag; anything after it (padding) is ignored. An error is returned when
// the first byte is not the START tag, when an unknown tag is met, or when
// the data ends before the END tag.
func ReadWAL(data []byte) (uint32, []WALEntry, error) {
	if len(data) == 0 {
		return 0, nil, nil // Empty WAL = no recovery needed.
	}

	// take returns the next n bytes and advances the cursor; on a short
	// read the cursor is left untouched.
	off := 0
	take := func(n int) ([]byte, error) {
		end := off + n
		if end > len(data) {
			return nil, fmt.Errorf("bitbox/wal: unexpected EOF at offset %d", off)
		}
		chunk := data[off:end]
		off = end
		return chunk, nil
	}
	takeU64 := func() (uint64, error) {
		raw, err := take(8)
		if err != nil {
			return 0, err
		}
		return binary.LittleEndian.Uint64(raw), nil
	}

	// START record: tag followed by the 4-byte sequence number.
	head, err := take(1)
	if err != nil {
		return 0, nil, err
	}
	if head[0] != walTagStart {
		return 0, nil, fmt.Errorf("bitbox/wal: expected START tag, got 0x%02x", head[0])
	}
	seqRaw, err := take(4)
	if err != nil {
		return 0, nil, err
	}
	syncSeqn := binary.LittleEndian.Uint32(seqRaw)

	var entries []WALEntry
	for {
		tagRaw, err := take(1)
		if err != nil {
			return 0, nil, err
		}
		tag := tagRaw[0]
		if tag == walTagEnd {
			return syncSeqn, entries, nil
		}

		var entry WALEntry
		switch tag {
		case walTagClear:
			entry.Kind = WALEntryClear
			if entry.ClearBucket, err = takeU64(); err != nil {
				return 0, nil, err
			}

		case walTagUpdate:
			entry.Kind = WALEntryUpdate

			idRaw, err := take(32)
			if err != nil {
				return 0, nil, err
			}
			copy(entry.PageID[:], idRaw)

			diffRaw, err := take(16)
			if err != nil {
				return 0, nil, err
			}
			var diffBuf [16]byte
			copy(diffBuf[:], diffRaw)
			entry.Diff = core.DecodePageDiff(diffBuf)

			// The diff determines how many 32-byte nodes follow.
			count := entry.Diff.Count()
			entry.ChangedNodes = make([]core.Node, count)
			for i := range count {
				nodeRaw, err := take(32)
				if err != nil {
					return 0, nil, err
				}
				copy(entry.ChangedNodes[i][:], nodeRaw)
			}

			if entry.ElidedChildren, err = takeU64(); err != nil {
				return 0, nil, err
			}
			if entry.UpdateBucket, err = takeU64(); err != nil {
				return 0, nil, err
			}

		default:
			return 0, nil, fmt.Errorf("bitbox/wal: unknown tag 0x%02x at offset %d",
				tag, off-1)
		}
		entries = append(entries, entry)
	}
}
// WriteWALFile writes data to the file at path, creating it (mode 0644) or
// truncating it if it already exists, and fsyncs it before returning.
//
// The write and the fsync go through the same write-mode handle: syncing
// through a separately opened read-only handle is rejected on platforms
// that require write access to flush a file. Errors from open, write, sync
// and close are all reported, wrapped with a "bitbox/wal:" prefix.
func WriteWALFile(path string, data []byte) error {
	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return fmt.Errorf("bitbox/wal: write: %w", err)
	}
	if _, err := f.Write(data); err != nil {
		_ = f.Close() // Best effort; the write error is the one to report.
		return fmt.Errorf("bitbox/wal: write: %w", err)
	}
	if err := f.Sync(); err != nil {
		_ = f.Close() // Best effort; the sync error is the one to report.
		return fmt.Errorf("bitbox/wal: sync: %w", err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("bitbox/wal: close: %w", err)
	}
	return nil
}
// ReadWALFile loads the whole WAL file at path. A file that does not exist
// is reported as (nil, nil); an existing but empty file yields zero-length
// data. Any other read failure is returned wrapped.
func ReadWALFile(path string) ([]byte, error) {
	contents, err := os.ReadFile(path)
	switch {
	case err == nil:
		return contents, nil
	case os.IsNotExist(err):
		// No WAL on disk: treated the same as an empty one by callers.
		return nil, nil
	default:
		return nil, fmt.Errorf("bitbox/wal: read: %w", err)
	}
}
// TruncateWALFile shrinks the WAL file at path to zero length. The error
// from os.Truncate (for example when the file does not exist) is returned
// as-is.
func TruncateWALFile(path string) error {
	const emptySize int64 = 0
	return os.Truncate(path, emptySize)
}

217
nomt/bitbox/wal_test.go Normal file
View file

@ -0,0 +1,217 @@
package bitbox
import (
"path/filepath"
"testing"
"github.com/ethereum/go-ethereum/nomt/core"
"github.com/ethereum/go-ethereum/nomt/merkle"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// --- WAL Builder/Reader Tests ---
// TestWALEmptyRoundTrip checks that a WAL without entries serializes to a
// page-aligned buffer and parses back to its sequence number and no entries.
func TestWALEmptyRoundTrip(t *testing.T) {
	const wantSeqn uint32 = 42
	encoded := NewWALBuilder().Finish(wantSeqn)

	// Finish pads its output to a page boundary.
	assert.Equal(t, 0, len(encoded)%pageSize)

	gotSeqn, gotEntries, err := ReadWAL(encoded)
	require.NoError(t, err)
	assert.Equal(t, wantSeqn, gotSeqn)
	assert.Empty(t, gotEntries)
}
// TestWALClearEntryRoundTrip checks that CLEAR entries survive a
// serialize/parse round trip in order, with their bucket indices intact.
func TestWALClearEntryRoundTrip(t *testing.T) {
	builder := NewWALBuilder()
	builder.AddClear(123)
	builder.AddClear(456)

	gotSeqn, got, err := ReadWAL(builder.Finish(1))
	require.NoError(t, err)
	assert.Equal(t, uint32(1), gotSeqn)

	require.Len(t, got, 2)
	first, second := got[0], got[1]
	assert.Equal(t, WALEntryClear, first.Kind)
	assert.Equal(t, uint64(123), first.ClearBucket)
	assert.Equal(t, uint64(456), second.ClearBucket)
}
// TestWALUpdateEntryRoundTrip checks that every field of an UPDATE entry
// (page ID, diff bits, changed nodes, elided children, bucket) survives a
// serialize/parse round trip.
func TestWALUpdateEntryRoundTrip(t *testing.T) {
	var wantID [32]byte
	wantID[0] = 0xAB

	// Two changed positions, so the entry carries two nodes.
	var wantDiff core.PageDiff
	wantDiff.SetChanged(5)
	wantDiff.SetChanged(70)
	wantNodes := []core.Node{{0x01}, {0x02}}

	builder := NewWALBuilder()
	builder.AddUpdate(wantID, wantDiff, wantNodes, 0xFF, 99)
	encoded := builder.Finish(7)

	gotSeqn, got, err := ReadWAL(encoded)
	require.NoError(t, err)
	assert.Equal(t, uint32(7), gotSeqn)
	require.Len(t, got, 1)

	entry := got[0]
	assert.Equal(t, WALEntryUpdate, entry.Kind)
	assert.Equal(t, wantID, entry.PageID)
	assert.True(t, entry.Diff.IsChanged(5))
	assert.True(t, entry.Diff.IsChanged(70))
	require.Len(t, entry.ChangedNodes, 2)
	assert.Equal(t, core.Node{0x01}, entry.ChangedNodes[0])
	assert.Equal(t, core.Node{0x02}, entry.ChangedNodes[1])
	assert.Equal(t, uint64(0xFF), entry.ElidedChildren)
	assert.Equal(t, uint64(99), entry.UpdateBucket)
}
func TestWALMixedEntries(t *testing.T) {
b := NewWALBuilder()
b.AddClear(10)
var pid [32]byte
var diff core.PageDiff
diff.SetChanged(0)
b.AddUpdate(pid, diff, []core.Node{{0xAA}}, 0, 20)
b.AddClear(30)
data := b.Finish(100)
_, entries, err := ReadWAL(data)
require.NoError(t, err)
require.Len(t, entries, 3)
assert.Equal(t, WALEntryClear, entries[0].Kind)
assert.Equal(t, WALEntryUpdate, entries[1].Kind)
assert.Equal(t, WALEntryClear, entries[2].Kind)
}
// TestReadWALEmpty checks that parsing nil input succeeds with a zero
// sequence number and a nil entry slice.
func TestReadWALEmpty(t *testing.T) {
	gotSeqn, gotEntries, err := ReadWAL(nil)

	require.NoError(t, err)
	assert.Equal(t, uint32(0), gotSeqn)
	assert.Nil(t, gotEntries)
}
// TestWALFilePersistence checks the file helpers: a written WAL reads back
// byte-for-byte, and reads back empty after truncation.
func TestWALFilePersistence(t *testing.T) {
	walFile := filepath.Join(t.TempDir(), "test.wal")

	builder := NewWALBuilder()
	builder.AddClear(42)
	want := builder.Finish(5)

	// Write, then read back unchanged.
	require.NoError(t, WriteWALFile(walFile, want))
	got, err := ReadWALFile(walFile)
	require.NoError(t, err)
	assert.Equal(t, want, got)

	// Truncate, then read back empty.
	require.NoError(t, TruncateWALFile(walFile))
	afterTruncate, err := ReadWALFile(walFile)
	require.NoError(t, err)
	assert.Empty(t, afterTruncate)
}
// --- Sync Controller Tests ---
func TestFullSyncCycle(t *testing.T) {
dir := t.TempDir()
htPath := filepath.Join(dir, "test.bitbox")
walPath := filepath.Join(dir, "test.wal")
seed := HashSeedFromUint64(1, 2)
db, err := Create(htPath, 1024, seed)
require.NoError(t, err)
defer db.Close()
rootID := core.RootPageID()
page := new(core.RawPage)
page.SetNodeAt(0, core.Node{0xAA})
var diff core.PageDiff
diff.SetChanged(0)
updates := []merkle.UpdatedPage{{
PageID: rootID,
Page: page,
Diff: diff,
}}
require.NoError(t, db.FullSync(walPath, 1, updates))
// Verify page is persisted.
loaded, _, found, err := db.LoadPage(rootID)
require.NoError(t, err)
assert.True(t, found)
assert.Equal(t, core.Node{0xAA}, loaded.NodeAt(0))
}
// --- Recovery Tests ---
func TestRecoverFromWAL(t *testing.T) {
dir := t.TempDir()
htPath := filepath.Join(dir, "test.bitbox")
walPath := filepath.Join(dir, "test.wal")
seed := HashSeedFromUint64(1, 2)
// Create DB and write a WAL but don't commit Phase 3.
db, err := Create(htPath, 1024, seed)
require.NoError(t, err)
rootID := core.RootPageID()
page := new(core.RawPage)
page.SetNodeAt(0, core.Node{0xBB})
var diff core.PageDiff
diff.SetChanged(0)
updates := []merkle.UpdatedPage{{
PageID: rootID,
Page: page,
Diff: diff,
}}
// Phase 1 + 2 only (simulate crash before Phase 3).
plan, err := db.BeginSync(walPath, 5, updates)
require.NoError(t, err)
require.NoError(t, db.WriteWAL(walPath, plan))
db.Close()
// Reopen and recover.
db2, err := Open(htPath)
require.NoError(t, err)
defer db2.Close()
seqn, err := db2.Recover(walPath)
require.NoError(t, err)
assert.Equal(t, uint32(5), seqn)
// Verify the page was recovered.
loaded, _, found, err := db2.LoadPage(rootID)
require.NoError(t, err)
assert.True(t, found)
assert.Equal(t, core.Node{0xBB}, loaded.NodeAt(0))
}
func TestRecoverNoWAL(t *testing.T) {
dir := t.TempDir()
htPath := filepath.Join(dir, "test.bitbox")
walPath := filepath.Join(dir, "test.wal")
seed := HashSeedFromUint64(1, 2)
db, err := Create(htPath, 1024, seed)
require.NoError(t, err)
defer db.Close()
seqn, err := db.Recover(walPath)
require.NoError(t, err)
assert.Equal(t, uint32(0), seqn, "no recovery needed")
}