mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-22 07:49:26 +00:00
p2p/enode: add Pebble database support
This commit is contained in:
parent
ebc7dc9e37
commit
07c7dcb44b
3 changed files with 144 additions and 84 deletions
|
|
@ -149,3 +149,24 @@ func TrimRightZeroes(s []byte) []byte {
|
||||||
}
|
}
|
||||||
return s[:idx]
|
return s[:idx]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpperBound returns the upper bound for iteration over keys with the given prefix.
|
||||||
|
// It returns the next key in lexicographic order that is greater than all keys with
|
||||||
|
// the given prefix. This is useful for setting iteration bounds in databases.
|
||||||
|
// Returns nil if no such upper bound exists (e.g., if prefix is empty or all 0xff bytes).
|
||||||
|
func UpperBound(prefix []byte) []byte {
|
||||||
|
if len(prefix) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var limit []byte
|
||||||
|
for i := len(prefix) - 1; i >= 0; i-- {
|
||||||
|
c := prefix[i]
|
||||||
|
if c < 0xff {
|
||||||
|
limit = make([]byte, i+1)
|
||||||
|
copy(limit, prefix)
|
||||||
|
limit[i] = c + 1
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return limit
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -450,21 +450,6 @@ func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// upperBound returns the upper bound for the given prefix
|
|
||||||
func upperBound(prefix []byte) (limit []byte) {
|
|
||||||
for i := len(prefix) - 1; i >= 0; i-- {
|
|
||||||
c := prefix[i]
|
|
||||||
if c == 0xff {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
limit = make([]byte, i+1)
|
|
||||||
copy(limit, prefix)
|
|
||||||
limit[i] = c + 1
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return limit
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stat returns the internal metrics of Pebble in a text format. It's a developer
|
// Stat returns the internal metrics of Pebble in a text format. It's a developer
|
||||||
// method to read everything there is to read, independent of Pebble version.
|
// method to read everything there is to read, independent of Pebble version.
|
||||||
func (d *Database) Stat() (string, error) {
|
func (d *Database) Stat() (string, error) {
|
||||||
|
|
@ -731,7 +716,7 @@ type pebbleIterator struct {
|
||||||
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
|
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
|
||||||
iter, _ := d.db.NewIter(&pebble.IterOptions{
|
iter, _ := d.db.NewIter(&pebble.IterOptions{
|
||||||
LowerBound: append(prefix, start...),
|
LowerBound: append(prefix, start...),
|
||||||
UpperBound: upperBound(prefix),
|
UpperBound: common.UpperBound(prefix),
|
||||||
})
|
})
|
||||||
iter.First()
|
iter.First()
|
||||||
return &pebbleIterator{iter: iter, moved: true, released: false}
|
return &pebbleIterator{iter: iter, moved: true, released: false}
|
||||||
|
|
|
||||||
|
|
@ -20,20 +20,20 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/cockroachdb/pebble"
|
||||||
|
"github.com/cockroachdb/pebble/vfs"
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
|
||||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
|
||||||
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
||||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
||||||
"github.com/syndtr/goleveldb/leveldb/storage"
|
|
||||||
"github.com/syndtr/goleveldb/leveldb/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Keys in the node database.
|
// Keys in the node database.
|
||||||
|
|
@ -59,7 +59,7 @@ const (
|
||||||
const (
|
const (
|
||||||
dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
|
dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
|
||||||
dbCleanupCycle = time.Hour // Time period for running the expiration task.
|
dbCleanupCycle = time.Hour // Time period for running the expiration task.
|
||||||
dbVersion = 9
|
dbVersion = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -71,7 +71,7 @@ var zeroIP = netip.IPv6Unspecified()
|
||||||
// DB is the node database, storing previously seen nodes and any collected metadata about
|
// DB is the node database, storing previously seen nodes and any collected metadata about
|
||||||
// them for QoS purposes.
|
// them for QoS purposes.
|
||||||
type DB struct {
|
type DB struct {
|
||||||
lvl *leveldb.DB // Interface to the database itself
|
db *pebble.DB // The pebble database
|
||||||
runner sync.Once // Ensures we can start at most one expirer
|
runner sync.Once // Ensures we can start at most one expirer
|
||||||
quit chan struct{} // Channel to signal the expiring thread to stop
|
quit chan struct{} // Channel to signal the expiring thread to stop
|
||||||
}
|
}
|
||||||
|
|
@ -87,49 +87,81 @@ func OpenDB(path string) (*DB, error) {
|
||||||
|
|
||||||
// newMemoryDB creates a new in-memory node database without a persistent backend.
|
// newMemoryDB creates a new in-memory node database without a persistent backend.
|
||||||
func newMemoryDB() (*DB, error) {
|
func newMemoryDB() (*DB, error) {
|
||||||
db, err := leveldb.Open(storage.NewMemStorage(), nil)
|
db, err := pebble.Open("", &pebble.Options{
|
||||||
|
FS: vfs.NewMem(),
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &DB{lvl: db, quit: make(chan struct{})}, nil
|
return &DB{db: db, quit: make(chan struct{})}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// newPersistentDB creates/opens a leveldb backed persistent node database,
|
// newPersistentDB creates/opens a pebble-backed persistent node database.
|
||||||
// also flushing its contents in case of a version mismatch.
|
// If an old leveldb database is detected, it will be removed and replaced with pebble.
|
||||||
func newPersistentDB(path string) (*DB, error) {
|
func newPersistentDB(path string) (*DB, error) {
|
||||||
opts := &opt.Options{OpenFilesCacheCapacity: 5}
|
// Check if there's a pre-existing leveldb database
|
||||||
db, err := leveldb.OpenFile(path, opts)
|
if hasLevelDB(path) {
|
||||||
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
|
log.Info("Detected old leveldb node database, migrating to pebble", "path", path)
|
||||||
db, err = leveldb.RecoverFile(path, nil)
|
if err := os.RemoveAll(path); err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to remove old leveldb database: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
opts := &pebble.Options{MaxOpenFiles: 5}
|
||||||
|
db, err := pebble.Open(path, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
// If the database is corrupted, remove it and try again.
|
||||||
|
if pebble.IsCorruptionError(err) {
|
||||||
|
log.Warn("Corrupted node database detected, removing and recreating", "path", path, "err", err)
|
||||||
|
if removeErr := os.RemoveAll(path); removeErr != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to remove corrupted database: %w", removeErr)
|
||||||
|
}
|
||||||
|
db, err = pebble.Open(path, opts)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// The nodes contained in the cache correspond to a certain protocol version.
|
// The nodes contained in the cache correspond to a certain protocol version.
|
||||||
// Flush all nodes if the version doesn't match.
|
// Flush all nodes if the version doesn't match.
|
||||||
currentVer := make([]byte, binary.MaxVarintLen64)
|
currentVer := encodeVarint(int64(dbVersion))
|
||||||
currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
|
blob, closer, err := db.Get([]byte(dbVersionKey))
|
||||||
|
if err == pebble.ErrNotFound {
|
||||||
blob, err := db.Get([]byte(dbVersionKey), nil)
|
// New database, write version
|
||||||
switch err {
|
if err := db.Set([]byte(dbVersionKey), currentVer, pebble.Sync); err != nil {
|
||||||
case leveldb.ErrNotFound:
|
|
||||||
// Version not found (i.e. empty cache), insert it
|
|
||||||
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
|
|
||||||
db.Close()
|
db.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
} else if err != nil {
|
||||||
case nil:
|
db.Close()
|
||||||
// Version present, flush if different
|
return nil, err
|
||||||
if !bytes.Equal(blob, currentVer) {
|
} else {
|
||||||
|
// Check version match
|
||||||
|
match := bytes.Equal(blob, currentVer)
|
||||||
|
closer.Close()
|
||||||
|
if !match {
|
||||||
|
// Version mismatch, reset database
|
||||||
db.Close()
|
db.Close()
|
||||||
if err = os.RemoveAll(path); err != nil {
|
if err := os.RemoveAll(path); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return newPersistentDB(path)
|
return newPersistentDB(path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &DB{lvl: db, quit: make(chan struct{})}, nil
|
return &DB{db: db, quit: make(chan struct{})}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// hasLevelDB checks if the path contains a leveldb database.
|
||||||
|
func hasLevelDB(path string) bool {
|
||||||
|
// Check for CURRENT file
|
||||||
|
if _, err := os.Stat(filepath.Join(path, "CURRENT")); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Check for absence of OPTIONS* files (which indicates pebble)
|
||||||
|
matches, err := filepath.Glob(filepath.Join(path, "OPTIONS*"))
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return len(matches) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// nodeKey returns the database key for a node record.
|
// nodeKey returns the database key for a node record.
|
||||||
|
|
@ -194,12 +226,27 @@ func localItemKey(id ID, field string) []byte {
|
||||||
return key
|
return key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// encodeVarint encodes an int64 as a varint.
|
||||||
|
func encodeVarint(n int64) []byte {
|
||||||
|
buf := make([]byte, binary.MaxVarintLen64)
|
||||||
|
buf = buf[:binary.PutVarint(buf, n)]
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// encodeUvarint encodes a uint64 as a varint.
|
||||||
|
func encodeUvarint(n uint64) []byte {
|
||||||
|
buf := make([]byte, binary.MaxVarintLen64)
|
||||||
|
buf = buf[:binary.PutUvarint(buf, n)]
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
// fetchInt64 retrieves an integer associated with a particular key.
|
// fetchInt64 retrieves an integer associated with a particular key.
|
||||||
func (db *DB) fetchInt64(key []byte) int64 {
|
func (db *DB) fetchInt64(key []byte) int64 {
|
||||||
blob, err := db.lvl.Get(key, nil)
|
blob, closer, err := db.db.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
defer closer.Close()
|
||||||
val, read := binary.Varint(blob)
|
val, read := binary.Varint(blob)
|
||||||
if read <= 0 {
|
if read <= 0 {
|
||||||
return 0
|
return 0
|
||||||
|
|
@ -209,34 +256,32 @@ func (db *DB) fetchInt64(key []byte) int64 {
|
||||||
|
|
||||||
// storeInt64 stores an integer in the given key.
|
// storeInt64 stores an integer in the given key.
|
||||||
func (db *DB) storeInt64(key []byte, n int64) error {
|
func (db *DB) storeInt64(key []byte, n int64) error {
|
||||||
blob := make([]byte, binary.MaxVarintLen64)
|
return db.db.Set(key, encodeVarint(n), pebble.Sync)
|
||||||
blob = blob[:binary.PutVarint(blob, n)]
|
|
||||||
return db.lvl.Put(key, blob, nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchUint64 retrieves an integer associated with a particular key.
|
// fetchUint64 retrieves an integer associated with a particular key.
|
||||||
func (db *DB) fetchUint64(key []byte) uint64 {
|
func (db *DB) fetchUint64(key []byte) uint64 {
|
||||||
blob, err := db.lvl.Get(key, nil)
|
blob, closer, err := db.db.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
defer closer.Close()
|
||||||
val, _ := binary.Uvarint(blob)
|
val, _ := binary.Uvarint(blob)
|
||||||
return val
|
return val
|
||||||
}
|
}
|
||||||
|
|
||||||
// storeUint64 stores an integer in the given key.
|
// storeUint64 stores an integer in the given key.
|
||||||
func (db *DB) storeUint64(key []byte, n uint64) error {
|
func (db *DB) storeUint64(key []byte, n uint64) error {
|
||||||
blob := make([]byte, binary.MaxVarintLen64)
|
return db.db.Set(key, encodeUvarint(n), pebble.Sync)
|
||||||
blob = blob[:binary.PutUvarint(blob, n)]
|
|
||||||
return db.lvl.Put(key, blob, nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Node retrieves a node with a given id from the database.
|
// Node retrieves a node with a given id from the database.
|
||||||
func (db *DB) Node(id ID) *Node {
|
func (db *DB) Node(id ID) *Node {
|
||||||
blob, err := db.lvl.Get(nodeKey(id), nil)
|
blob, closer, err := db.db.Get(nodeKey(id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
defer closer.Close()
|
||||||
return mustDecodeNode(id[:], blob)
|
return mustDecodeNode(id[:], blob)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -260,7 +305,7 @@ func (db *DB) UpdateNode(node *Node) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
|
if err := db.db.Set(nodeKey(node.ID()), blob, pebble.Sync); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
|
return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
|
||||||
|
|
@ -282,14 +327,17 @@ func (db *DB) Resolve(n *Node) *Node {
|
||||||
|
|
||||||
// DeleteNode deletes all information associated with a node.
|
// DeleteNode deletes all information associated with a node.
|
||||||
func (db *DB) DeleteNode(id ID) {
|
func (db *DB) DeleteNode(id ID) {
|
||||||
deleteRange(db.lvl, nodeKey(id))
|
deleteRange(db.db, nodeKey(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteRange(db *leveldb.DB, prefix []byte) {
|
func deleteRange(db *pebble.DB, prefix []byte) {
|
||||||
it := db.NewIterator(util.BytesPrefix(prefix), nil)
|
iter, _ := db.NewIter(&pebble.IterOptions{
|
||||||
defer it.Release()
|
LowerBound: prefix,
|
||||||
for it.Next() {
|
UpperBound: common.UpperBound(prefix),
|
||||||
db.Delete(it.Key(), nil)
|
})
|
||||||
|
defer iter.Close()
|
||||||
|
for iter.First(); iter.Valid(); iter.Next() {
|
||||||
|
db.Delete(iter.Key(), pebble.Sync)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -324,9 +372,13 @@ func (db *DB) expirer() {
|
||||||
// expireNodes iterates over the database and deletes all nodes that have not
|
// expireNodes iterates over the database and deletes all nodes that have not
|
||||||
// been seen (i.e. received a pong from) for some time.
|
// been seen (i.e. received a pong from) for some time.
|
||||||
func (db *DB) expireNodes() {
|
func (db *DB) expireNodes() {
|
||||||
it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
|
iter, _ := db.db.NewIter(&pebble.IterOptions{
|
||||||
defer it.Release()
|
LowerBound: []byte(dbNodePrefix),
|
||||||
if !it.Next() {
|
UpperBound: common.UpperBound([]byte(dbNodePrefix)),
|
||||||
|
})
|
||||||
|
defer iter.Close()
|
||||||
|
|
||||||
|
if !iter.First() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -336,24 +388,24 @@ func (db *DB) expireNodes() {
|
||||||
atEnd = false
|
atEnd = false
|
||||||
)
|
)
|
||||||
for !atEnd {
|
for !atEnd {
|
||||||
id, ip, field := splitNodeItemKey(it.Key())
|
id, ip, field := splitNodeItemKey(iter.Key())
|
||||||
if field == dbNodePong {
|
if field == dbNodePong {
|
||||||
time, _ := binary.Varint(it.Value())
|
time, _ := binary.Varint(iter.Value())
|
||||||
if time > youngestPong {
|
if time > youngestPong {
|
||||||
youngestPong = time
|
youngestPong = time
|
||||||
}
|
}
|
||||||
if time < threshold {
|
if time < threshold {
|
||||||
// Last pong from this IP older than threshold, remove fields belonging to it.
|
// Last pong from this IP older than threshold, remove fields belonging to it.
|
||||||
deleteRange(db.lvl, nodeItemKey(id, ip, ""))
|
deleteRange(db.db, nodeItemKey(id, ip, ""))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
atEnd = !it.Next()
|
atEnd = !iter.Next()
|
||||||
nextID, _ := splitNodeKey(it.Key())
|
nextID, _ := splitNodeKey(iter.Key())
|
||||||
if atEnd || nextID != id {
|
if atEnd || nextID != id {
|
||||||
// We've moved beyond the last entry of the current ID.
|
// We've moved beyond the last entry of the current ID.
|
||||||
// Remove everything if there was no recent enough pong.
|
// Remove everything if there was no recent enough pong.
|
||||||
if youngestPong > 0 && youngestPong < threshold {
|
if youngestPong > 0 && youngestPong < threshold {
|
||||||
deleteRange(db.lvl, nodeKey(id))
|
deleteRange(db.db, nodeKey(id))
|
||||||
}
|
}
|
||||||
youngestPong = 0
|
youngestPong = 0
|
||||||
}
|
}
|
||||||
|
|
@ -390,7 +442,7 @@ func (db *DB) LastPongReceived(id ID, ip netip.Addr) time.Time {
|
||||||
// UpdateLastPongReceived updates the last pong time of a node.
|
// UpdateLastPongReceived updates the last pong time of a node.
|
||||||
func (db *DB) UpdateLastPongReceived(id ID, ip netip.Addr, instance time.Time) error {
|
func (db *DB) UpdateLastPongReceived(id ID, ip netip.Addr, instance time.Time) error {
|
||||||
if !ip.IsValid() {
|
if !ip.IsValid() {
|
||||||
return errInvalidIP
|
return fmt.Errorf("invalid IP")
|
||||||
}
|
}
|
||||||
return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())
|
return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())
|
||||||
}
|
}
|
||||||
|
|
@ -406,7 +458,7 @@ func (db *DB) FindFails(id ID, ip netip.Addr) int {
|
||||||
// UpdateFindFails updates the number of findnode failures since bonding.
|
// UpdateFindFails updates the number of findnode failures since bonding.
|
||||||
func (db *DB) UpdateFindFails(id ID, ip netip.Addr, fails int) error {
|
func (db *DB) UpdateFindFails(id ID, ip netip.Addr, fails int) error {
|
||||||
if !ip.IsValid() {
|
if !ip.IsValid() {
|
||||||
return errInvalidIP
|
return fmt.Errorf("invalid IP")
|
||||||
}
|
}
|
||||||
return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))
|
return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))
|
||||||
}
|
}
|
||||||
|
|
@ -422,7 +474,7 @@ func (db *DB) FindFailsV5(id ID, ip netip.Addr) int {
|
||||||
// UpdateFindFailsV5 stores the discv5 findnode failure counter.
|
// UpdateFindFailsV5 stores the discv5 findnode failure counter.
|
||||||
func (db *DB) UpdateFindFailsV5(id ID, ip netip.Addr, fails int) error {
|
func (db *DB) UpdateFindFailsV5(id ID, ip netip.Addr, fails int) error {
|
||||||
if !ip.IsValid() {
|
if !ip.IsValid() {
|
||||||
return errInvalidIP
|
return fmt.Errorf("invalid IP")
|
||||||
}
|
}
|
||||||
return db.storeInt64(v5Key(id, ip, dbNodeFindFails), int64(fails))
|
return db.storeInt64(v5Key(id, ip, dbNodeFindFails), int64(fails))
|
||||||
}
|
}
|
||||||
|
|
@ -448,10 +500,10 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
|
||||||
var (
|
var (
|
||||||
now = time.Now()
|
now = time.Now()
|
||||||
nodes = make([]*Node, 0, n)
|
nodes = make([]*Node, 0, n)
|
||||||
it = db.lvl.NewIterator(nil, nil)
|
it, _ = db.db.NewIter(nil)
|
||||||
id ID
|
id ID
|
||||||
)
|
)
|
||||||
defer it.Release()
|
defer it.Close()
|
||||||
|
|
||||||
seek:
|
seek:
|
||||||
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
|
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
|
||||||
|
|
@ -461,7 +513,7 @@ seek:
|
||||||
ctr := id[0]
|
ctr := id[0]
|
||||||
rand.Read(id[:])
|
rand.Read(id[:])
|
||||||
id[0] = ctr + id[0]%16
|
id[0] = ctr + id[0]%16
|
||||||
it.Seek(nodeKey(id))
|
it.SeekGE(nodeKey(id))
|
||||||
|
|
||||||
n := nextNode(it)
|
n := nextNode(it)
|
||||||
if n == nil {
|
if n == nil {
|
||||||
|
|
@ -483,13 +535,15 @@ seek:
|
||||||
|
|
||||||
// reads the next node record from the iterator, skipping over other
|
// reads the next node record from the iterator, skipping over other
|
||||||
// database entries.
|
// database entries.
|
||||||
func nextNode(it iterator.Iterator) *Node {
|
func nextNode(it *pebble.Iterator) *Node {
|
||||||
for end := false; !end; end = !it.Next() {
|
for it.Valid() {
|
||||||
id, rest := splitNodeKey(it.Key())
|
id, rest := splitNodeKey(it.Key())
|
||||||
if string(rest) != dbDiscoverRoot {
|
if string(rest) == dbDiscoverRoot {
|
||||||
continue
|
return mustDecodeNode(id[:], it.Value())
|
||||||
|
}
|
||||||
|
if !it.Next() {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
return mustDecodeNode(id[:], it.Value())
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -501,5 +555,5 @@ func (db *DB) Close() {
|
||||||
default:
|
default:
|
||||||
close(db.quit)
|
close(db.quit)
|
||||||
}
|
}
|
||||||
db.lvl.Close()
|
db.db.Close()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue