From d7d1473df5bd411be31181d4dc9029ba70ce5e3c Mon Sep 17 00:00:00 2001 From: CodeByAnkita Date: Fri, 13 Mar 2026 11:25:32 +0700 Subject: [PATCH] p2p/enode: migrate node database from leveldb to pebble --- p2p/enode/nodedb.go | 142 ++++++++++++++++++++++++++------------------ 1 file changed, 83 insertions(+), 59 deletions(-) diff --git a/p2p/enode/nodedb.go b/p2p/enode/nodedb.go index 2cd211e2c2..2f4fc0ab6f 100644 --- a/p2p/enode/nodedb.go +++ b/p2p/enode/nodedb.go @@ -20,20 +20,17 @@ import ( "bytes" "crypto/rand" "encoding/binary" + "errors" "fmt" "net/netip" "os" "sync" "time" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/rlp" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/storage" - "github.com/syndtr/goleveldb/leveldb/util" ) // Keys in the node database. @@ -62,16 +59,14 @@ const ( dbVersion = 9 ) -var ( - errInvalidIP = errors.New("invalid IP") -) +var errInvalidIP = errors.New("invalid IP") var zeroIP = netip.IPv6Unspecified() // DB is the node database, storing previously seen nodes and any collected metadata about // them for QoS purposes. type DB struct { - lvl *leveldb.DB // Interface to the database itself + lvl *pebble.DB // Pebble database instance storing node records runner sync.Once // Ensures we can start at most one expirer quit chan struct{} // Channel to signal the expiring thread to stop } @@ -87,7 +82,9 @@ func OpenDB(path string) (*DB, error) { // newMemoryDB creates a new in-memory node database without a persistent backend. func newMemoryDB() (*DB, error) { - db, err := leveldb.Open(storage.NewMemStorage(), nil) + db, err := pebble.Open("", &pebble.Options{ + FS: vfs.NewMem(), //in-memory filesystem for testing + }) if err != nil { return nil, err } @@ -95,40 +92,44 @@ func newMemoryDB() (*DB, error) { } // newPersistentDB creates/opens a leveldb backed persistent node database, -// also flushing its contents in case of a version mismatch. +// and flushes its contents in case of a version mismatch. func newPersistentDB(path string) (*DB, error) { - opts := &opt.Options{OpenFilesCacheCapacity: 5} - db, err := leveldb.OpenFile(path, opts) - if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted { - db, err = leveldb.RecoverFile(path, nil) + opts := &pebble.Options{ + MaxOpenFiles: 5, } + db, err := pebble.Open(path, opts) if err != nil { return nil, err } + // The nodes contained in the cache correspond to a certain protocol version. // Flush all nodes if the version doesn't match. currentVer := make([]byte, binary.MaxVarintLen64) currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))] - blob, err := db.Get([]byte(dbVersionKey), nil) + + blob, closer, err := db.Get([]byte(dbVersionKey)) switch err { - case leveldb.ErrNotFound: - // Version not found (i.e. empty cache), insert it - if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil { + case pebble.ErrNotFound: + // version missing → store it + if err := db.Set([]byte(dbVersionKey), currentVer, pebble.Sync); err != nil { db.Close() return nil, err } - case nil: - // Version present, flush if different + + defer closer.Close() + if !bytes.Equal(blob, currentVer) { db.Close() - if err = os.RemoveAll(path); err != nil { - return nil, err - } + os.RemoveAll(path) return newPersistentDB(path) } + default: + db.Close() + return nil, err } + return &DB{lvl: db, quit: make(chan struct{})}, nil } @@ -169,7 +170,7 @@ func splitNodeItemKey(key []byte) (id ID, ip netip.Addr, field string) { key = key[len(dbDiscoverRoot)+1:] // Split out the IP. ip, _ = netip.AddrFromSlice(key[:16]) - key = key[16+1:] + key = key[17:] // Field is the remainder of key. field = string(key) return id, ip, field @@ -196,10 +197,12 @@ func localItemKey(id ID, field string) []byte { // fetchInt64 retrieves an integer associated with a particular key. func (db *DB) fetchInt64(key []byte) int64 { - blob, err := db.lvl.Get(key, nil) + blob, closer, err := db.lvl.Get(key) if err != nil { return 0 } + defer closer.Close() + val, read := binary.Varint(blob) if read <= 0 { return 0 @@ -209,34 +212,39 @@ func (db *DB) fetchInt64(key []byte) int64 { // storeInt64 stores an integer in the given key. func (db *DB) storeInt64(key []byte, n int64) error { + blob := make([]byte, binary.MaxVarintLen64) blob = blob[:binary.PutVarint(blob, n)] - return db.lvl.Put(key, blob, nil) + + return db.lvl.Set(key, blob, pebble.Sync) } // fetchUint64 retrieves an integer associated with a particular key. func (db *DB) fetchUint64(key []byte) uint64 { - blob, err := db.lvl.Get(key, nil) + blob, closer, err := db.lvl.Get(key) if err != nil { return 0 } + defer closer.Close() val, _ := binary.Uvarint(blob) return val } // storeUint64 stores an integer in the given key. func (db *DB) storeUint64(key []byte, n uint64) error { + blob := make([]byte, binary.MaxVarintLen64) blob = blob[:binary.PutUvarint(blob, n)] - return db.lvl.Put(key, blob, nil) + return db.lvl.Set(key, blob, pebble.Sync) } // Node retrieves a node with a given id from the database. func (db *DB) Node(id ID) *Node { - blob, err := db.lvl.Get(nodeKey(id), nil) + blob, closer, err := db.lvl.Get(nodeKey(id)) if err != nil { return nil } + defer closer.Close() return mustDecodeNode(id[:], blob) } @@ -245,9 +253,7 @@ func mustDecodeNode(id, data []byte) *Node { if err := rlp.DecodeBytes(data, &r); err != nil { panic(fmt.Errorf("p2p/enode: can't decode node %x in DB: %v", id, err)) } - if len(id) != len(ID{}) { - panic(fmt.Errorf("invalid id length %d", len(id))) - } + return newNodeWithID(&r, ID(id)) } @@ -260,7 +266,7 @@ func (db *DB) UpdateNode(node *Node) error { if err != nil { return err } - if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil { + if err := db.lvl.Set(nodeKey(node.ID()), blob, pebble.Sync); err != nil { return err } return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq()) @@ -285,11 +291,20 @@ func (db *DB) DeleteNode(id ID) { deleteRange(db.lvl, nodeKey(id)) } -func deleteRange(db *leveldb.DB, prefix []byte) { - it := db.NewIterator(util.BytesPrefix(prefix), nil) - defer it.Release() - for it.Next() { - db.Delete(it.Key(), nil) +func deleteRange(db *pebble.DB, prefix []byte) { + iter, err := db.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + }) + if err != nil { + return + } + defer iter.Close() + for iter.First(); iter.Valid(); iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, prefix) { + break + } + _ = db.Delete(key, pebble.Sync) } } @@ -324,9 +339,14 @@ func (db *DB) expirer() { // expireNodes iterates over the database and deletes all nodes that have not // been seen (i.e. received a pong from) for some time. func (db *DB) expireNodes() { - it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil) - defer it.Release() - if !it.Next() { + iter, err := db.lvl.NewIter(&pebble.IterOptions{ + LowerBound: []byte(dbNodePrefix), + }) + if err != nil { + return + } + defer iter.Close() + if !iter.First() { return } @@ -336,19 +356,19 @@ func (db *DB) expireNodes() { atEnd = false ) for !atEnd { - id, ip, field := splitNodeItemKey(it.Key()) + id, ip, field := splitNodeItemKey(iter.Key()) if field == dbNodePong { - time, _ := binary.Varint(it.Value()) - if time > youngestPong { - youngestPong = time + pongTime, _ := binary.Varint(iter.Value()) + if pongTime > youngestPong { + youngestPong = pongTime } - if time < threshold { + if pongTime < threshold { // Last pong from this IP older than threshold, remove fields belonging to it. deleteRange(db.lvl, nodeItemKey(id, ip, "")) } } - atEnd = !it.Next() - nextID, _ := splitNodeKey(it.Key()) + atEnd = !iter.Next() + nextID, _ := splitNodeKey(iter.Key()) if atEnd || nextID != id { // We've moved beyond the last entry of the current ID. // Remove everything if there was no recent enough pong. @@ -448,10 +468,14 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node { var ( now = time.Now() nodes = make([]*Node, 0, n) - it = db.lvl.NewIterator(nil, nil) id ID ) - defer it.Release() + + iter, err := db.lvl.NewIter(nil) + if err != nil { + return nil + } + defer iter.Close() seek: for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ { @@ -461,29 +485,29 @@ seek: ctr := id[0] rand.Read(id[:]) id[0] = ctr + id[0]%16 - it.Seek(nodeKey(id)) + iter.SeekGE(nodeKey(id)) - n := nextNode(it) - if n == nil { + node := nextNode(iter) + if node == nil { id[0] = 0 continue seek // iterator exhausted } - if now.Sub(db.LastPongReceived(n.ID(), n.IPAddr())) > maxAge { + if now.Sub(db.LastPongReceived(node.ID(), node.IPAddr())) > maxAge { continue seek } for i := range nodes { - if nodes[i].ID() == n.ID() { + if nodes[i].ID() == node.ID() { continue seek // duplicate } } - nodes = append(nodes, n) + nodes = append(nodes, node) } return nodes } // reads the next node record from the iterator, skipping over other // database entries. -func nextNode(it iterator.Iterator) *Node { +func nextNode(it *pebble.Iterator) *Node { for end := false; !end; end = !it.Next() { id, rest := splitNodeKey(it.Key()) if string(rest) != dbDiscoverRoot {