mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-03-12 06:09:08 +00:00
core/rawdb, triedb/pathdb: enable trienode history alongside existing data (#33934)
Fixes https://github.com/ethereum/go-ethereum/issues/33907 Notably there is a behavioral change: - Previously Geth will refuse to restart if the existing trienode history is gapped with the state data - With this PR, the gapped trienode history will be entirely reset and being constructed from scratch
This commit is contained in:
parent
59512b1849
commit
7d13acd030
8 changed files with 261 additions and 51 deletions
|
|
@ -260,6 +260,46 @@ func basicWrite(t *testing.T, newFn func(kinds []string) ethdb.AncientStore) {
|
|||
if err != nil {
|
||||
t.Fatalf("Failed to write ancient data %v", err)
|
||||
}
|
||||
|
||||
// Write should work after truncating from tail but over the head
|
||||
db.TruncateTail(200)
|
||||
head, err := db.Ancients()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve head ancients %v", err)
|
||||
}
|
||||
tail, err := db.Tail()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve tail ancients %v", err)
|
||||
}
|
||||
if head != 200 || tail != 200 {
|
||||
t.Fatalf("Ancient head and tail are not expected")
|
||||
}
|
||||
_, err = db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
|
||||
offset := uint64(200)
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := op.AppendRaw("a", offset+uint64(i), dataA[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := op.AppendRaw("b", offset+uint64(i), dataB[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to write ancient data %v", err)
|
||||
}
|
||||
head, err = db.Ancients()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve head ancients %v", err)
|
||||
}
|
||||
tail, err = db.Tail()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve tail ancients %v", err)
|
||||
}
|
||||
if head != 300 || tail != 200 {
|
||||
t.Fatalf("Ancient head and tail are not expected")
|
||||
}
|
||||
}
|
||||
|
||||
func nonMutable(t *testing.T, newFn func(kinds []string) ethdb.AncientStore) {
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ const freezerTableSize = 2 * 1000 * 1000 * 1000
|
|||
// - The in-order data ensures that disk reads are always optimized.
|
||||
type Freezer struct {
|
||||
datadir string
|
||||
frozen atomic.Uint64 // Number of items already frozen
|
||||
head atomic.Uint64 // Number of items stored (including items removed from tail)
|
||||
tail atomic.Uint64 // Number of the first stored item in the freezer
|
||||
|
||||
// This lock synchronizes writers and the truncate operation, as well as
|
||||
|
|
@ -97,12 +97,12 @@ func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize ui
|
|||
return nil, errSymlinkDatadir
|
||||
}
|
||||
}
|
||||
// Leveldb/Pebble uses LOCK as the filelock filename. To prevent the
|
||||
// name collision, we use FLOCK as the lock name.
|
||||
flockFile := filepath.Join(datadir, "FLOCK")
|
||||
if err := os.MkdirAll(filepath.Dir(flockFile), 0755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Leveldb uses LOCK as the filelock filename. To prevent the
|
||||
// name collision, we use FLOCK as the lock name.
|
||||
lock := flock.New(flockFile)
|
||||
tryLock := lock.TryLock
|
||||
if readonly {
|
||||
|
|
@ -213,7 +213,7 @@ func (f *Freezer) AncientBytes(kind string, id, offset, length uint64) ([]byte,
|
|||
|
||||
// Ancients returns the length of the frozen items.
|
||||
func (f *Freezer) Ancients() (uint64, error) {
|
||||
return f.frozen.Load(), nil
|
||||
return f.head.Load(), nil
|
||||
}
|
||||
|
||||
// Tail returns the number of first stored item in the freezer.
|
||||
|
|
@ -252,7 +252,7 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
|
|||
defer f.writeLock.Unlock()
|
||||
|
||||
// Roll back all tables to the starting position in case of error.
|
||||
prevItem := f.frozen.Load()
|
||||
prevItem := f.head.Load()
|
||||
defer func() {
|
||||
if err != nil {
|
||||
// The write operation has failed. Go back to the previous item position.
|
||||
|
|
@ -273,7 +273,7 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
f.frozen.Store(item)
|
||||
f.head.Store(item)
|
||||
return writeSize, nil
|
||||
}
|
||||
|
||||
|
|
@ -286,7 +286,7 @@ func (f *Freezer) TruncateHead(items uint64) (uint64, error) {
|
|||
f.writeLock.Lock()
|
||||
defer f.writeLock.Unlock()
|
||||
|
||||
oitems := f.frozen.Load()
|
||||
oitems := f.head.Load()
|
||||
if oitems <= items {
|
||||
return oitems, nil
|
||||
}
|
||||
|
|
@ -295,7 +295,7 @@ func (f *Freezer) TruncateHead(items uint64) (uint64, error) {
|
|||
return 0, err
|
||||
}
|
||||
}
|
||||
f.frozen.Store(items)
|
||||
f.head.Store(items)
|
||||
return oitems, nil
|
||||
}
|
||||
|
||||
|
|
@ -320,6 +320,11 @@ func (f *Freezer) TruncateTail(tail uint64) (uint64, error) {
|
|||
}
|
||||
}
|
||||
f.tail.Store(tail)
|
||||
|
||||
// Update the head if the requested tail exceeds the current head
|
||||
if f.head.Load() < tail {
|
||||
f.head.Store(tail)
|
||||
}
|
||||
return old, nil
|
||||
}
|
||||
|
||||
|
|
@ -379,7 +384,7 @@ func (f *Freezer) validate() error {
|
|||
prunedTail = &tmp
|
||||
}
|
||||
|
||||
f.frozen.Store(head)
|
||||
f.head.Store(head)
|
||||
f.tail.Store(*prunedTail)
|
||||
return nil
|
||||
}
|
||||
|
|
@ -414,7 +419,7 @@ func (f *Freezer) repair() error {
|
|||
}
|
||||
}
|
||||
|
||||
f.frozen.Store(head)
|
||||
f.head.Store(head)
|
||||
f.tail.Store(prunedTail)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ func (t *memoryTable) truncateTail(items uint64) error {
|
|||
return nil
|
||||
}
|
||||
if t.items < items {
|
||||
return errors.New("truncation above head")
|
||||
return t.reset(items)
|
||||
}
|
||||
for i := uint64(0); i < items-t.offset; i++ {
|
||||
if t.size > uint64(len(t.data[i])) {
|
||||
|
|
@ -127,6 +127,16 @@ func (t *memoryTable) truncateTail(items uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// reset clears the entire table and sets both the head and tail to the given
|
||||
// value. It assumes the caller holds the lock and that tail > t.items.
|
||||
func (t *memoryTable) reset(offset uint64) error {
|
||||
t.size = 0
|
||||
t.data = nil
|
||||
t.items = offset
|
||||
t.offset = offset
|
||||
return nil
|
||||
}
|
||||
|
||||
// commit merges the given item batch into table. It's presumed that the
|
||||
// batch is ordered and continuous with table.
|
||||
func (t *memoryTable) commit(batch [][]byte) error {
|
||||
|
|
@ -387,6 +397,9 @@ func (f *MemoryFreezer) TruncateTail(tail uint64) (uint64, error) {
|
|||
}
|
||||
}
|
||||
f.tail = tail
|
||||
if f.items < tail {
|
||||
f.items = tail
|
||||
}
|
||||
return old, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -707,12 +707,13 @@ func (t *freezerTable) truncateTail(items uint64) error {
|
|||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
// Ensure the given truncate target falls in the correct range
|
||||
// Short-circuit if the requested tail deletion points to a stale position
|
||||
if t.itemHidden.Load() >= items {
|
||||
return nil
|
||||
}
|
||||
// If the requested tail exceeds the current head, reset the entire table
|
||||
if t.items.Load() < items {
|
||||
return errors.New("truncation above head")
|
||||
return t.resetTo(items)
|
||||
}
|
||||
// Load the new tail index by the given new tail position
|
||||
var (
|
||||
|
|
@ -822,10 +823,9 @@ func (t *freezerTable) truncateTail(items uint64) error {
|
|||
shorten := indexEntrySize * int64(newDeleted-deleted)
|
||||
if t.metadata.flushOffset <= shorten {
|
||||
return fmt.Errorf("invalid index flush offset: %d, shorten: %d", t.metadata.flushOffset, shorten)
|
||||
} else {
|
||||
if err := t.metadata.setFlushOffset(t.metadata.flushOffset-shorten, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := t.metadata.setFlushOffset(t.metadata.flushOffset-shorten, true); err != nil {
|
||||
return err
|
||||
}
|
||||
// Retrieve the new size and update the total size counter
|
||||
newSize, err := t.sizeNolock()
|
||||
|
|
@ -836,6 +836,59 @@ func (t *freezerTable) truncateTail(items uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// resetTo clears the entire table and sets both the head and tail to the given
|
||||
// value. It assumes the caller holds the lock and that tail > t.items.
|
||||
func (t *freezerTable) resetTo(tail uint64) error {
|
||||
// Sync the entire table before resetting, eliminating the potential
|
||||
// data corruption.
|
||||
err := t.doSync()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Update the index file to reflect the new offset
|
||||
if err := t.index.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
entry := &indexEntry{
|
||||
filenum: t.headId + 1,
|
||||
offset: uint32(tail),
|
||||
}
|
||||
if err := reset(t.index.Name(), entry.append(nil)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := t.metadata.setVirtualTail(tail, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := t.metadata.setFlushOffset(indexEntrySize, true); err != nil {
|
||||
return err
|
||||
}
|
||||
t.index, err = openFreezerFileForAppend(t.index.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Purge all the existing data file
|
||||
if err := t.head.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
t.headId = t.headId + 1
|
||||
t.tailId = t.headId
|
||||
t.headBytes = 0
|
||||
|
||||
t.head, err = t.openFile(t.headId, openFreezerFileTruncated)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.releaseFilesBefore(t.headId, true)
|
||||
|
||||
t.items.Store(tail)
|
||||
t.itemOffset.Store(tail)
|
||||
t.itemHidden.Store(tail)
|
||||
t.sizeGauge.Update(0)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes all opened files and finalizes the freezer table for use.
|
||||
// This operation must be completed before shutdown to prevent the loss of
|
||||
// recent writes.
|
||||
|
|
@ -1247,25 +1300,20 @@ func (t *freezerTable) doSync() error {
|
|||
if t.index == nil || t.head == nil || t.metadata.file == nil {
|
||||
return errClosed
|
||||
}
|
||||
var err error
|
||||
trackError := func(e error) {
|
||||
if e != nil && err == nil {
|
||||
err = e
|
||||
}
|
||||
if err := t.index.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := t.head.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
trackError(t.index.Sync())
|
||||
trackError(t.head.Sync())
|
||||
|
||||
// A crash may occur before the offset is updated, leaving the offset
|
||||
// points to a old position. If so, the extra items above the offset
|
||||
// points to an old position. If so, the extra items above the offset
|
||||
// will be truncated during the next run.
|
||||
stat, err := t.index.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
offset := stat.Size()
|
||||
trackError(t.metadata.setFlushOffset(offset, true))
|
||||
return err
|
||||
return t.metadata.setFlushOffset(stat.Size(), true)
|
||||
}
|
||||
|
||||
func (t *freezerTable) dumpIndexStdout(start, stop int64) {
|
||||
|
|
|
|||
|
|
@ -1139,6 +1139,7 @@ const (
|
|||
opTruncateHeadAll
|
||||
opTruncateTail
|
||||
opTruncateTailAll
|
||||
opTruncateTailOverHead
|
||||
opCheckAll
|
||||
opMax // boundary value, not an actual op
|
||||
)
|
||||
|
|
@ -1226,6 +1227,11 @@ func (randTest) Generate(r *rand.Rand, size int) reflect.Value {
|
|||
step.target = deleted + uint64(len(items))
|
||||
items = items[:0]
|
||||
deleted = step.target
|
||||
case opTruncateTailOverHead:
|
||||
newDeleted := deleted + uint64(len(items)) + 10
|
||||
step.target = newDeleted
|
||||
deleted = newDeleted
|
||||
items = items[:0]
|
||||
}
|
||||
steps = append(steps, step)
|
||||
}
|
||||
|
|
@ -1268,7 +1274,7 @@ func runRandTest(rt randTest) bool {
|
|||
for i := 0; i < len(step.items); i++ {
|
||||
batch.AppendRaw(step.items[i], step.blobs[i])
|
||||
}
|
||||
batch.commit()
|
||||
rt[i].err = batch.commit()
|
||||
values = append(values, step.blobs...)
|
||||
|
||||
case opRetrieve:
|
||||
|
|
@ -1290,24 +1296,28 @@ func runRandTest(rt randTest) bool {
|
|||
}
|
||||
|
||||
case opTruncateHead:
|
||||
f.truncateHead(step.target)
|
||||
rt[i].err = f.truncateHead(step.target)
|
||||
|
||||
length := f.items.Load() - f.itemHidden.Load()
|
||||
values = values[:length]
|
||||
|
||||
case opTruncateHeadAll:
|
||||
f.truncateHead(step.target)
|
||||
rt[i].err = f.truncateHead(step.target)
|
||||
values = nil
|
||||
|
||||
case opTruncateTail:
|
||||
prev := f.itemHidden.Load()
|
||||
f.truncateTail(step.target)
|
||||
rt[i].err = f.truncateTail(step.target)
|
||||
|
||||
truncated := f.itemHidden.Load() - prev
|
||||
values = values[truncated:]
|
||||
|
||||
case opTruncateTailAll:
|
||||
f.truncateTail(step.target)
|
||||
rt[i].err = f.truncateTail(step.target)
|
||||
values = nil
|
||||
|
||||
case opTruncateTailOverHead:
|
||||
rt[i].err = f.truncateTail(step.target)
|
||||
values = nil
|
||||
}
|
||||
// Abort the test on error.
|
||||
|
|
@ -1633,3 +1643,43 @@ func TestFreezerAncientBytes(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTruncateOverHead(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
fn := fmt.Sprintf("t-%d", rand.Uint64())
|
||||
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, freezerTableConfig{noSnappy: true}, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Tail truncation on an empty table
|
||||
if err := f.truncateTail(10); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
batch := f.newBatch()
|
||||
data := getChunk(10, 1)
|
||||
require.NoError(t, batch.AppendRaw(uint64(10), data))
|
||||
require.NoError(t, batch.commit())
|
||||
|
||||
got, err := f.RetrieveItems(uint64(10), 1, 0)
|
||||
require.NoError(t, err)
|
||||
if !bytes.Equal(got[0], data) {
|
||||
t.Fatalf("Unexpected bytes, want: %v, got: %v", data, got[0])
|
||||
}
|
||||
|
||||
// Tail truncation on the non-empty table
|
||||
if err := f.truncateTail(20); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
batch = f.newBatch()
|
||||
data = getChunk(10, 1)
|
||||
require.NoError(t, batch.AppendRaw(uint64(20), data))
|
||||
require.NoError(t, batch.commit())
|
||||
|
||||
got, err = f.RetrieveItems(uint64(20), 1, 0)
|
||||
require.NoError(t, err)
|
||||
if !bytes.Equal(got[0], data) {
|
||||
t.Fatalf("Unexpected bytes, want: %v, got: %v", data, got[0])
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,19 @@ import (
|
|||
"path/filepath"
|
||||
)
|
||||
|
||||
func atomicRename(src, dest string) error {
|
||||
if err := os.Rename(src, dest); err != nil {
|
||||
return err
|
||||
}
|
||||
dir, err := os.Open(filepath.Dir(src))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer dir.Close()
|
||||
|
||||
return dir.Sync()
|
||||
}
|
||||
|
||||
// copyFrom copies data from 'srcPath' at offset 'offset' into 'destPath'.
|
||||
// The 'destPath' is created if it doesn't exist, otherwise it is overwritten.
|
||||
// Before the copy is executed, there is a callback can be registered to
|
||||
|
|
@ -73,13 +86,48 @@ func copyFrom(srcPath, destPath string, offset uint64, before func(f *os.File) e
|
|||
return err
|
||||
}
|
||||
f = nil
|
||||
return os.Rename(fname, destPath)
|
||||
|
||||
return atomicRename(fname, destPath)
|
||||
}
|
||||
|
||||
// reset atomically replaces the file at the given path with the provided content.
|
||||
func reset(path string, content []byte) error {
|
||||
// Create a temp file in the same dir where we want it to wind up
|
||||
f, err := os.CreateTemp(filepath.Dir(path), "*")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fname := f.Name()
|
||||
|
||||
// Clean up the leftover file
|
||||
defer func() {
|
||||
if f != nil {
|
||||
f.Close()
|
||||
}
|
||||
os.Remove(fname)
|
||||
}()
|
||||
|
||||
// Write the content into the temp file
|
||||
_, err = f.Write(content)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Permanently persist the content into disk
|
||||
if err := f.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
f = nil
|
||||
|
||||
return atomicRename(fname, path)
|
||||
}
|
||||
|
||||
// openFreezerFileForAppend opens a freezer table file and seeks to the end
|
||||
func openFreezerFileForAppend(filename string) (*os.File, error) {
|
||||
// Open the file without the O_APPEND flag
|
||||
// because it has differing behaviour during Truncate operations
|
||||
// because it has differing behavior during Truncate operations
|
||||
// on different OS's
|
||||
file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -412,28 +412,34 @@ func repairHistory(db ethdb.Database, isVerkle bool, readOnly bool, stateID uint
|
|||
// Truncate excessive history entries in either the state history or
|
||||
// the trienode history, ensuring both histories remain aligned with
|
||||
// the state.
|
||||
head, err := states.Ancients()
|
||||
shead, err := states.Ancients()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if stateID > head {
|
||||
return nil, nil, fmt.Errorf("gap between state [#%d] and state history [#%d]", stateID, head)
|
||||
if stateID > shead { // Gap is not permitted in the state history
|
||||
return nil, nil, fmt.Errorf("gap between state [#%d] and state history [#%d]", stateID, shead)
|
||||
}
|
||||
truncTo := min(shead, stateID)
|
||||
|
||||
if trienodes != nil {
|
||||
th, err := trienodes.Ancients()
|
||||
thead, err := trienodes.Ancients()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if stateID > th {
|
||||
return nil, nil, fmt.Errorf("gap between state [#%d] and trienode history [#%d]", stateID, th)
|
||||
}
|
||||
if th != head {
|
||||
log.Info("Histories are not aligned with each other", "state", head, "trienode", th)
|
||||
head = min(head, th)
|
||||
if stateID <= thead {
|
||||
truncTo = min(truncTo, thead)
|
||||
} else {
|
||||
if thead == 0 {
|
||||
_, err = trienodes.TruncateTail(stateID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
log.Warn("Initialized trienode history")
|
||||
} else {
|
||||
return nil, nil, fmt.Errorf("gap between state [#%d] and trienode history [#%d]", stateID, thead)
|
||||
}
|
||||
}
|
||||
}
|
||||
head = min(head, stateID)
|
||||
|
||||
// Truncate the extra history elements above in freezer in case it's not
|
||||
// aligned with the state. It might happen after an unclean shutdown.
|
||||
truncate := func(store ethdb.AncientStore, typ historyType, nhead uint64) {
|
||||
|
|
@ -448,7 +454,7 @@ func repairHistory(db ethdb.Database, isVerkle bool, readOnly bool, stateID uint
|
|||
log.Warn("Truncated extra histories", "typ", typ, "number", pruned)
|
||||
}
|
||||
}
|
||||
truncate(states, typeStateHistory, head)
|
||||
truncate(trienodes, typeTrienodeHistory, head)
|
||||
truncate(states, typeStateHistory, truncTo)
|
||||
truncate(trienodes, typeTrienodeHistory, truncTo)
|
||||
return states, trienodes, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -349,7 +349,7 @@ func (db *Database) HistoricNodeReader(root common.Hash) (*HistoricalNodeReader,
|
|||
// are not accessible.
|
||||
meta, err := readTrienodeMetadata(db.trienodeFreezer, *id+1)
|
||||
if err != nil {
|
||||
return nil, err // e.g., the referred trienode history has been pruned
|
||||
return nil, fmt.Errorf("state %#x is not available", root) // e.g., the referred trienode history has been pruned
|
||||
}
|
||||
if meta.parent != root {
|
||||
return nil, fmt.Errorf("state %#x is not canonincal", root)
|
||||
|
|
|
|||
Loading…
Reference in a new issue