From 14d576c002309e38864f9afd95e7305e35a68035 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Mon, 31 Mar 2025 14:47:56 +0200 Subject: [PATCH] core/filtermaps: hashdb safe delete range (#31525) This PR adds `rawdb.SafeDeleteRange` and uses it for range deletion in `core/filtermaps`. This includes deleting the old bloombits database, resetting the log index database and removing index data for unindexed tail epochs (which previously weren't properly implemented for the fallback case). `SafeDeleteRange` either calls `ethdb.DeleteRange` if the node uses the new path based state scheme or uses an iterator based fallback method that safely skips trie nodes in the range if the old hash based state scheme is used. Note that `ethdb.DeleteRange` also has its own iterator based fallback implementation in `ethdb/leveldb`. If a path based state scheme is used and the backing db is pebble (as it is on the majority of new nodes) then `rawdb.SafeDeleteRange` uses the fast native range delete. Also note that `rawdb.SafeDeleteRange` has different semantics from `ethdb.DeleteRange`, it does not automatically return if the operation takes a long time. Instead it receives a `stopCallback` that can interrupt the process if necessary. This is because in the safe mode potentially a lot of entries are iterated without being deleted (this is definitely the case when deleting the old bloombits database which has a single byte prefix) and therefore restarting the process every time a fixed number of entries have been iterated would result in a quadratic run time in the number of skipped entries. When running in safe mode, unindexing an epoch takes about a second, removing bloombits takes around 10s while resetting a full log index might take a few minutes. If a range delete operation takes a significant amount of time then log messages are printed. 
Also, any range delete operation can be interrupted by shutdown (tail unindexing can also be interrupted by head indexing, similarly to how tail indexing works). If the last unindexed epoch might have "dirty" index data left then the indexed map range points to the first valid epoch and `cleanedEpochsBefore` points to the previous, potentially dirty one. At startup it is always assumed that the epoch before the first fully indexed one might be dirty. New tail maps are never rendered and also no further maps are unindexed before the previous unindexing is properly cleaned up. --------- Co-authored-by: Gary Rong Co-authored-by: Felix Lange --- core/filtermaps/filtermaps.go | 210 ++++++++++++++++++++------ core/filtermaps/indexer.go | 32 ++--- core/rawdb/accessors_indexes.go | 38 +++--- core/rawdb/database.go | 73 +++++++++++ eth/backend.go | 7 +- ethdb/database.go | 9 +- ethdb/leveldb/leveldb.go | 4 +- 7 files changed, 256 insertions(+), 117 deletions(-) diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index db7ab0a426..5722f17daa 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/ethdb/leveldb" "github.com/ethereum/go-ethereum/log" ) @@ -59,6 +58,7 @@ type FilterMaps struct { closeCh chan struct{} closeWg sync.WaitGroup history uint64 + hashScheme bool // use hashdb-safe delete range method exportFileName string Params @@ -67,10 +67,11 @@ // fields written by the indexer and read by matcher backend. Indexer can // read them without a lock and write them under indexLock write lock. // Matcher backend can read them under indexLock read lock. 
- indexLock sync.RWMutex - indexedRange filterMapsRange - indexedView *ChainView // always consistent with the log index - hasTempRange bool + indexLock sync.RWMutex + indexedRange filterMapsRange + cleanedEpochsBefore uint32 // all unindexed data cleaned before this point + indexedView *ChainView // always consistent with the log index + hasTempRange bool // also accessed by indexer and matcher backend but no locking needed. filterMapCache *lru.Cache[uint32, filterMap] @@ -180,6 +181,10 @@ type Config struct { // This option enables the checkpoint JSON file generator. // If set, the given file will be updated with checkpoint information. ExportFileName string + + // expect trie nodes of hash based state scheme in the filtermaps key range; + // use safe iterator based implementation of DeleteRange that skips them + HashScheme bool } // NewFilterMaps creates a new FilterMaps and starts the indexer. @@ -197,6 +202,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f blockProcessingCh: make(chan bool, 1), history: config.History, disabled: config.Disabled, + hashScheme: config.HashScheme, disabledCh: make(chan struct{}), exportFileName: config.ExportFileName, Params: params, @@ -208,15 +214,17 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f maps: common.NewRange(rs.MapsFirst, rs.MapsAfterLast-rs.MapsFirst), tailPartialEpoch: rs.TailPartialEpoch, }, - historyCutoff: historyCutoff, - finalBlock: finalBlock, - matcherSyncCh: make(chan *FilterMapsMatcherBackend), - matchers: make(map[*FilterMapsMatcherBackend]struct{}), - filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps), - lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks), - lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers), - baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows), - renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots), + // deleting last unindexed epoch 
might have been interrupted by shutdown + cleanedEpochsBefore: max(rs.MapsFirst>>params.logMapsPerEpoch, 1) - 1, + historyCutoff: historyCutoff, + finalBlock: finalBlock, + matcherSyncCh: make(chan *FilterMapsMatcherBackend), + matchers: make(map[*FilterMapsMatcherBackend]struct{}), + filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps), + lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks), + lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers), + baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows), + renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots), } // Set initial indexer target. @@ -301,14 +309,24 @@ func (f *FilterMaps) reset() { // deleting the range first ensures that resetDb will be called again at next // startup and any leftover data will be removed even if it cannot finish now. rawdb.DeleteFilterMapsRange(f.db) - f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") + f.safeDeleteWithLogs(rawdb.DeleteFilterMapsDb, "Resetting log index database", f.isShuttingDown) +} + +// isShuttingDown returns true if FilterMaps is shutting down. +func (f *FilterMaps) isShuttingDown() bool { + select { + case <-f.closeCh: + return true + default: + return false + } } // init initializes an empty log index according to the current targetView. func (f *FilterMaps) init() error { // ensure that there is no remaining data in the filter maps key range - if !f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") { - return errors.New("could not reset log index database") + if err := f.safeDeleteWithLogs(rawdb.DeleteFilterMapsDb, "Resetting log index database", f.isShuttingDown); err != nil { + return err } f.indexLock.Lock() @@ -358,38 +376,37 @@ func (f *FilterMaps) init() error { // removeBloomBits removes old bloom bits data from the database. 
func (f *FilterMaps) removeBloomBits() { - f.safeDeleteRange(rawdb.DeleteBloomBitsDb, "Removing old bloom bits database") + f.safeDeleteWithLogs(rawdb.DeleteBloomBitsDb, "Removing old bloom bits database", f.isShuttingDown) f.closeWg.Done() } -// safeDeleteRange calls the specified database range deleter function -// repeatedly as long as it returns leveldb.ErrTooManyKeys. -// This wrapper is necessary because of the leveldb fallback implementation -// of DeleteRange. -func (f *FilterMaps) safeDeleteRange(removeFn func(ethdb.KeyValueRangeDeleter) error, action string) bool { - start := time.Now() - var retry bool - for { - err := removeFn(f.db) - if err == nil { - if retry { - log.Info(action+" finished", "elapsed", time.Since(start)) - } - return true +// safeDeleteWithLogs is a wrapper for a function that performs a safe range +// delete operation using rawdb.SafeDeleteRange. It emits log messages if the +// process takes long enough to call the stop callback. +func (f *FilterMaps) safeDeleteWithLogs(deleteFn func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error, action string, stopCb func() bool) error { + var ( + start = time.Now() + logPrinted bool + lastLogPrinted = start + ) + switch err := deleteFn(f.db, f.hashScheme, func(deleted bool) bool { + if deleted && !logPrinted || time.Since(lastLogPrinted) > time.Second*10 { + log.Info(action+" in progress...", "elapsed", common.PrettyDuration(time.Since(start))) + logPrinted, lastLogPrinted = true, time.Now() } - if err != leveldb.ErrTooManyKeys { - log.Error(action+" failed", "error", err) - return false - } - select { - case <-f.closeCh: - return false - default: - } - if !retry { - log.Info(action+" in progress...", "elapsed", time.Since(start)) - retry = true + return stopCb() + }); { + case err == nil: + if logPrinted { + log.Info(action+" finished", "elapsed", common.PrettyDuration(time.Since(start))) } + return nil + case errors.Is(err, rawdb.ErrDeleteRangeInterrupted): + 
log.Warn(action+" interrupted", "elapsed", common.PrettyDuration(time.Since(start))) + return err + default: + log.Error(action+" failed", "error", err) + return err } } @@ -658,54 +675,97 @@ func (f *FilterMaps) deleteLastBlockOfMap(batch ethdb.Batch, mapIndex uint32) { rawdb.DeleteFilterMapLastBlock(batch, mapIndex) } -// deleteTailEpoch deletes index data from the earliest, either fully or partially -// indexed epoch. The last block pointer for the last map of the epoch and the -// corresponding block log value pointer are retained as these are always assumed -// to be available for each epoch. -func (f *FilterMaps) deleteTailEpoch(epoch uint32) error { +// deleteTailEpoch deletes index data from the specified epoch. The last block +// pointer for the last map of the epoch and the corresponding block log value +// pointer are retained as these are always assumed to be available for each +// epoch as boundary markers. +// The function returns true if all index data related to the epoch (except for +// the boundary markers) has been fully removed. 
+func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) { f.indexLock.Lock() defer f.indexLock.Unlock() + // determine epoch boundaries firstMap := epoch << f.logMapsPerEpoch lastBlock, _, err := f.getLastBlockOfMap(firstMap + f.mapsPerEpoch - 1) if err != nil { - return fmt.Errorf("failed to retrieve last block of deleted epoch %d: %v", epoch, err) + return false, fmt.Errorf("failed to retrieve last block of deleted epoch %d: %v", epoch, err) } var firstBlock uint64 if epoch > 0 { firstBlock, _, err = f.getLastBlockOfMap(firstMap - 1) if err != nil { - return fmt.Errorf("failed to retrieve last block before deleted epoch %d: %v", epoch, err) + return false, fmt.Errorf("failed to retrieve last block before deleted epoch %d: %v", epoch, err) } firstBlock++ } - fmr := f.indexedRange - if f.indexedRange.maps.First() == firstMap && - f.indexedRange.maps.AfterLast() > firstMap+f.mapsPerEpoch && - f.indexedRange.tailPartialEpoch == 0 { - fmr.maps.SetFirst(firstMap + f.mapsPerEpoch) - fmr.blocks.SetFirst(lastBlock + 1) - } else if f.indexedRange.maps.First() == firstMap+f.mapsPerEpoch { + // update rendered range if necessary + var ( + fmr = f.indexedRange + firstEpoch = f.indexedRange.maps.First() >> f.logMapsPerEpoch + afterLastEpoch = (f.indexedRange.maps.AfterLast() + f.mapsPerEpoch - 1) >> f.logMapsPerEpoch + ) + if f.indexedRange.tailPartialEpoch != 0 && firstEpoch > 0 { + firstEpoch-- + } + switch { + case epoch < firstEpoch: + // cleanup of already unindexed epoch; range not affected + case epoch == firstEpoch && epoch+1 < afterLastEpoch: + // first fully or partially rendered epoch and there is at least one + // rendered map in the next epoch; remove from indexed range fmr.tailPartialEpoch = 0 + fmr.maps.SetFirst((epoch + 1) << f.logMapsPerEpoch) + fmr.blocks.SetFirst(lastBlock + 1) + f.setRange(f.db, f.indexedView, fmr, false) + default: + // cannot be cleaned or unindexed; return with error + return false, errors.New("invalid tail epoch number") + } + 
// remove index data + if err := f.safeDeleteWithLogs(func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error { + first := f.mapRowIndex(firstMap, 0) + count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first + if err := rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count), hashScheme, stopCb); err != nil { + return err + } + for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch; mapIndex++ { + f.filterMapCache.Remove(mapIndex) + } + delMapRange := common.NewRange(firstMap, f.mapsPerEpoch-1) // keep last entry + if err := rawdb.DeleteFilterMapLastBlocks(f.db, delMapRange, hashScheme, stopCb); err != nil { + return err + } + for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch-1; mapIndex++ { + f.lastBlockCache.Remove(mapIndex) + } + delBlockRange := common.NewRange(firstBlock, lastBlock-firstBlock) // keep last entry + if err := rawdb.DeleteBlockLvPointers(f.db, delBlockRange, hashScheme, stopCb); err != nil { + return err + } + for blockNumber := firstBlock; blockNumber < lastBlock; blockNumber++ { + f.lvPointerCache.Remove(blockNumber) + } + return nil + }, fmt.Sprintf("Deleting tail epoch #%d", epoch), func() bool { + f.processEvents() + return f.stop || !f.targetHeadIndexed() + }); err == nil { + // everything removed; mark as cleaned and report success + if f.cleanedEpochsBefore == epoch { + f.cleanedEpochsBefore = epoch + 1 + } + return true, nil } else { - return errors.New("invalid tail epoch number") + // more data left in epoch range; mark as dirty and report unfinished + if f.cleanedEpochsBefore > epoch { + f.cleanedEpochsBefore = epoch + } + if errors.Is(err, rawdb.ErrDeleteRangeInterrupted) { + return false, nil + } + return false, err } - f.setRange(f.db, f.indexedView, fmr, false) - first := f.mapRowIndex(firstMap, 0) - count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first - rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count)) - for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch; 
mapIndex++ { - f.filterMapCache.Remove(mapIndex) - } - rawdb.DeleteFilterMapLastBlocks(f.db, common.NewRange(firstMap, f.mapsPerEpoch-1)) // keep last enrty - for mapIndex := firstMap; mapIndex < firstMap+f.mapsPerEpoch-1; mapIndex++ { - f.lastBlockCache.Remove(mapIndex) - } - rawdb.DeleteBlockLvPointers(f.db, common.NewRange(firstBlock, lastBlock-firstBlock)) // keep last enrty - for blockNumber := firstBlock; blockNumber < lastBlock; blockNumber++ { - f.lvPointerCache.Remove(blockNumber) - } - return nil } // exportCheckpoints exports epoch checkpoints in the format used by checkpoints.go. diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 6732dc85ea..9a5424da4a 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -67,14 +67,17 @@ func (f *FilterMaps) indexerLoop() { } f.lastFinal = f.finalBlock } - if done, err := f.tryIndexTail(); err != nil { - f.disableForError("tail rendering", err) + // always attempt unindexing before indexing the tail in order to + // ensure that a potentially dirty previously unindexed epoch is + // always cleaned up before any new maps are rendered. + if done, err := f.tryUnindexTail(); err != nil { + f.disableForError("tail unindexing", err) return } else if !done { continue } - if done, err := f.tryUnindexTail(); err != nil { - f.disableForError("tail unindexing", err) + if done, err := f.tryIndexTail(); err != nil { + f.disableForError("tail rendering", err) return } else if !done { continue @@ -349,25 +352,24 @@ func (f *FilterMaps) tryIndexTail() (bool, error) { // Note that unindexing is very quick as it only removes continuous ranges of // data from the database and is also called while running head indexing. 
func (f *FilterMaps) tryUnindexTail() (bool, error) { - for { - firstEpoch := (f.indexedRange.maps.First() - f.indexedRange.tailPartialEpoch) >> f.logMapsPerEpoch - if f.needTailEpoch(firstEpoch) { - break - } - f.processEvents() - if f.stop { - return false, nil - } + firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch + if f.indexedRange.tailPartialEpoch > 0 && firstEpoch > 0 { + firstEpoch-- + } + for epoch := min(firstEpoch, f.cleanedEpochsBefore); !f.needTailEpoch(epoch); epoch++ { if !f.startedTailUnindex { f.startedTailUnindexAt = time.Now() f.startedTailUnindex = true f.ptrTailUnindexMap = f.indexedRange.maps.First() - f.indexedRange.tailPartialEpoch f.ptrTailUnindexBlock = f.indexedRange.blocks.First() - f.tailPartialBlocks() } - if err := f.deleteTailEpoch(firstEpoch); err != nil { - log.Error("Log index tail epoch unindexing failed", "error", err) + if done, err := f.deleteTailEpoch(epoch); !done { return false, err } + f.processEvents() + if f.stop || !f.targetHeadIndexed() { + return false, nil + } } if f.startedTailUnindex && f.indexedRange.hasIndexedBlocks() { log.Info("Log index tail unindexing finished", diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index 297e339c83..c413839b7b 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -354,10 +354,8 @@ func WriteFilterMapBaseRows(db ethdb.KeyValueWriter, mapRowIndex uint64, rows [] } } -func DeleteFilterMapRows(db ethdb.KeyValueRangeDeleter, mapRows common.Range[uint64]) { - if err := db.DeleteRange(filterMapRowKey(mapRows.First(), false), filterMapRowKey(mapRows.AfterLast(), false)); err != nil { - log.Crit("Failed to delete range of filter map rows", "err", err) - } +func DeleteFilterMapRows(db ethdb.KeyValueStore, mapRows common.Range[uint64], hashScheme bool, stopCallback func(bool) bool) error { + return SafeDeleteRange(db, filterMapRowKey(mapRows.First(), false), filterMapRowKey(mapRows.AfterLast(), false), hashScheme, 
stopCallback) } // ReadFilterMapLastBlock retrieves the number of the block that generated the @@ -368,7 +366,7 @@ func ReadFilterMapLastBlock(db ethdb.KeyValueReader, mapIndex uint32) (uint64, c return 0, common.Hash{}, err } if len(enc) != 40 { - return 0, common.Hash{}, errors.New("Invalid block number and id encoding") + return 0, common.Hash{}, errors.New("invalid block number and id encoding") } var id common.Hash copy(id[:], enc[8:]) @@ -394,10 +392,8 @@ func DeleteFilterMapLastBlock(db ethdb.KeyValueWriter, mapIndex uint32) { } } -func DeleteFilterMapLastBlocks(db ethdb.KeyValueRangeDeleter, maps common.Range[uint32]) { - if err := db.DeleteRange(filterMapLastBlockKey(maps.First()), filterMapLastBlockKey(maps.AfterLast())); err != nil { - log.Crit("Failed to delete range of filter map last block pointers", "err", err) - } +func DeleteFilterMapLastBlocks(db ethdb.KeyValueStore, maps common.Range[uint32], hashScheme bool, stopCallback func(bool) bool) error { + return SafeDeleteRange(db, filterMapLastBlockKey(maps.First()), filterMapLastBlockKey(maps.AfterLast()), hashScheme, stopCallback) } // ReadBlockLvPointer retrieves the starting log value index where the log values @@ -408,7 +404,7 @@ func ReadBlockLvPointer(db ethdb.KeyValueReader, blockNumber uint64) (uint64, er return 0, err } if len(encPtr) != 8 { - return 0, errors.New("Invalid log value pointer encoding") + return 0, errors.New("invalid log value pointer encoding") } return binary.BigEndian.Uint64(encPtr), nil } @@ -431,10 +427,8 @@ func DeleteBlockLvPointer(db ethdb.KeyValueWriter, blockNumber uint64) { } } -func DeleteBlockLvPointers(db ethdb.KeyValueRangeDeleter, blocks common.Range[uint64]) { - if err := db.DeleteRange(filterMapBlockLVKey(blocks.First()), filterMapBlockLVKey(blocks.AfterLast())); err != nil { - log.Crit("Failed to delete range of block log value pointers", "err", err) - } +func DeleteBlockLvPointers(db ethdb.KeyValueStore, blocks common.Range[uint64], hashScheme bool, 
stopCallback func(bool) bool) error { + return SafeDeleteRange(db, filterMapBlockLVKey(blocks.First()), filterMapBlockLVKey(blocks.AfterLast()), hashScheme, stopCallback) } // FilterMapsRange is a storage representation of the block range covered by the @@ -485,22 +479,22 @@ func DeleteFilterMapsRange(db ethdb.KeyValueWriter) { } // deletePrefixRange deletes everything with the given prefix from the database. -func deletePrefixRange(db ethdb.KeyValueRangeDeleter, prefix []byte) error { +func deletePrefixRange(db ethdb.KeyValueStore, prefix []byte, hashScheme bool, stopCallback func(bool) bool) error { end := bytes.Clone(prefix) end[len(end)-1]++ - return db.DeleteRange(prefix, end) + return SafeDeleteRange(db, prefix, end, hashScheme, stopCallback) } // DeleteFilterMapsDb removes the entire filter maps database -func DeleteFilterMapsDb(db ethdb.KeyValueRangeDeleter) error { - return deletePrefixRange(db, []byte(filterMapsPrefix)) +func DeleteFilterMapsDb(db ethdb.KeyValueStore, hashScheme bool, stopCallback func(bool) bool) error { + return deletePrefixRange(db, []byte(filterMapsPrefix), hashScheme, stopCallback) } -// DeleteFilterMapsDb removes the old bloombits database and the associated +// DeleteBloomBitsDb removes the old bloombits database and the associated // chain indexer database. 
-func DeleteBloomBitsDb(db ethdb.KeyValueRangeDeleter) error { - if err := deletePrefixRange(db, bloomBitsPrefix); err != nil { +func DeleteBloomBitsDb(db ethdb.KeyValueStore, hashScheme bool, stopCallback func(bool) bool) error { + if err := deletePrefixRange(db, bloomBitsPrefix, hashScheme, stopCallback); err != nil { return err } - return deletePrefixRange(db, bloomBitsMetaPrefix) + return deletePrefixRange(db, bloomBitsMetaPrefix, hashScheme, stopCallback) } diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 7fca822155..2a50e3f9ee 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -28,12 +28,15 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/log" "github.com/olekukonko/tablewriter" ) +var ErrDeleteRangeInterrupted = errors.New("safe delete range operation interrupted") + // freezerdb is a database wrapper that enables ancient chain segment freezing. type freezerdb struct { ethdb.KeyValueStore @@ -607,3 +610,73 @@ func ReadChainMetadata(db ethdb.KeyValueStore) [][]string { } return data } + +// SafeDeleteRange deletes all of the keys (and values) in the range +// [start,end) (inclusive on start, exclusive on end). +// If hashScheme is true then it always uses an iterator and skips hashdb trie +// node entries. If it is false and the backing db is pebble db then it uses +// the fast native range delete. +// In case of fallback mode (hashdb or leveldb) the range deletion might be +// very slow depending on the number of entries. In this case stopCallback +// is periodically called and if it returns true then SafeDeleteRange +// stops and returns ErrDeleteRangeInterrupted. The callback is not called if native +// range delete is used or there are a small number of keys only. 
The bool +// argument passed to the callback is true if enrties have actually been +// deleted already. +func SafeDeleteRange(db ethdb.KeyValueStore, start, end []byte, hashScheme bool, stopCallback func(bool) bool) error { + if !hashScheme { + // delete entire range; use fast native range delete on pebble db + for { + switch err := db.DeleteRange(start, end); { + case err == nil: + return nil + case errors.Is(err, ethdb.ErrTooManyKeys): + if stopCallback(true) { + return ErrDeleteRangeInterrupted + } + default: + return err + } + } + } + + var ( + count, deleted, skipped int + buff = crypto.NewKeccakState() + startTime = time.Now() + ) + + batch := db.NewBatch() + it := db.NewIterator(nil, start) + defer func() { + it.Release() // it might be replaced during the process + log.Debug("SafeDeleteRange finished", "deleted", deleted, "skipped", skipped, "elapsed", common.PrettyDuration(time.Since(startTime))) + }() + + for it.Next() && bytes.Compare(end, it.Key()) > 0 { + // Prevent deletion for trie nodes in hash mode + if len(it.Key()) != 32 || crypto.HashData(buff, it.Value()) != common.BytesToHash(it.Key()) { + if err := batch.Delete(it.Key()); err != nil { + return err + } + deleted++ + } else { + skipped++ + } + count++ + if count > 10000 { // should not block for more than a second + if err := batch.Write(); err != nil { + return err + } + if stopCallback(deleted != 0) { + return ErrDeleteRangeInterrupted + } + start = append(bytes.Clone(it.Key()), 0) // appending a zero gives us the next possible key + it.Release() + batch = db.NewBatch() + it = db.NewIterator(nil, start) + count = 0 + } + } + return batch.Write() +} diff --git a/eth/backend.go b/eth/backend.go index 909d153a2b..ab612b1de7 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -239,7 +239,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } - fmConfig := filtermaps.Config{History: config.LogHistory, Disabled: config.LogNoHistory, 
ExportFileName: config.LogExportCheckpoints} + fmConfig := filtermaps.Config{ + History: config.LogHistory, + Disabled: config.LogNoHistory, + ExportFileName: config.LogExportCheckpoints, + HashScheme: scheme == rawdb.HashScheme, + } chainView := eth.newChainView(eth.blockchain.CurrentBlock()) historyCutoff := eth.blockchain.HistoryPruningCutoff() var finalBlock uint64 diff --git a/ethdb/database.go b/ethdb/database.go index b1577512f3..f2d458b85f 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -17,7 +17,10 @@ // Package ethdb defines the interfaces for an Ethereum data store. package ethdb -import "io" +import ( + "errors" + "io" +) // KeyValueReader wraps the Has and Get method of a backing data store. type KeyValueReader interface { @@ -37,10 +40,14 @@ type KeyValueWriter interface { Delete(key []byte) error } +var ErrTooManyKeys = errors.New("too many keys in deleted range") + // KeyValueRangeDeleter wraps the DeleteRange method of a backing data store. type KeyValueRangeDeleter interface { // DeleteRange deletes all of the keys (and values) in the range [start,end) // (inclusive on start, exclusive on end). + // Some implementations of DeleteRange may return ErrTooManyKeys after + // partially deleting entries in the given range. DeleteRange(start, end []byte) error } diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index 7f47523b82..ef02e91822 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -207,8 +207,6 @@ func (db *Database) Delete(key []byte) error { return db.db.Delete(key, nil) } -var ErrTooManyKeys = errors.New("too many keys in deleted range") - // DeleteRange deletes all of the keys (and values) in the range [start,end) // (inclusive on start, exclusive on end). 
// Note that this is a fallback implementation as leveldb does not natively @@ -228,7 +226,7 @@ func (db *Database) DeleteRange(start, end []byte) error { if err := batch.Write(); err != nil { return err } - return ErrTooManyKeys + return ethdb.ErrTooManyKeys } if err := batch.Delete(it.Key()); err != nil { return err