mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 13:21:37 +00:00
p2p/enode: migrate node database from leveldb to pebble
This commit is contained in:
parent
943a30d1ee
commit
d7d1473df5
1 changed files with 83 additions and 59 deletions
|
|
@ -20,20 +20,17 @@ import (
|
|||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/cockroachdb/pebble/vfs"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
"github.com/syndtr/goleveldb/leveldb/storage"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
|
||||
// Keys in the node database.
|
||||
|
|
@ -62,16 +59,14 @@ const (
|
|||
dbVersion = 9
|
||||
)
|
||||
|
||||
var (
|
||||
errInvalidIP = errors.New("invalid IP")
|
||||
)
|
||||
var errInvalidIP = errors.New("invalid IP")
|
||||
|
||||
var zeroIP = netip.IPv6Unspecified()
|
||||
|
||||
// DB is the node database, storing previously seen nodes and any collected metadata about
|
||||
// them for QoS purposes.
|
||||
type DB struct {
|
||||
lvl *leveldb.DB // Interface to the database itself
|
||||
lvl *pebble.DB // Pebble database instance storing node records
|
||||
runner sync.Once // Ensures we can start at most one expirer
|
||||
quit chan struct{} // Channel to signal the expiring thread to stop
|
||||
}
|
||||
|
|
@ -87,7 +82,9 @@ func OpenDB(path string) (*DB, error) {
|
|||
|
||||
// newMemoryDB creates a new in-memory node database without a persistent backend.
|
||||
func newMemoryDB() (*DB, error) {
|
||||
db, err := leveldb.Open(storage.NewMemStorage(), nil)
|
||||
db, err := pebble.Open("", &pebble.Options{
|
||||
FS: vfs.NewMem(), //in-memory filesystem for testing
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -95,40 +92,44 @@ func newMemoryDB() (*DB, error) {
|
|||
}
|
||||
|
||||
// newPersistentDB creates/opens a leveldb backed persistent node database,
|
||||
// also flushing its contents in case of a version mismatch.
|
||||
// and flushes its contents in case of a version mismatch.
|
||||
func newPersistentDB(path string) (*DB, error) {
|
||||
opts := &opt.Options{OpenFilesCacheCapacity: 5}
|
||||
db, err := leveldb.OpenFile(path, opts)
|
||||
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
|
||||
db, err = leveldb.RecoverFile(path, nil)
|
||||
opts := &pebble.Options{
|
||||
MaxOpenFiles: 5,
|
||||
}
|
||||
db, err := pebble.Open(path, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// The nodes contained in the cache correspond to a certain protocol version.
|
||||
// Flush all nodes if the version doesn't match.
|
||||
currentVer := make([]byte, binary.MaxVarintLen64)
|
||||
currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
|
||||
|
||||
blob, err := db.Get([]byte(dbVersionKey), nil)
|
||||
|
||||
blob, closer, err := db.Get([]byte(dbVersionKey))
|
||||
switch err {
|
||||
case leveldb.ErrNotFound:
|
||||
// Version not found (i.e. empty cache), insert it
|
||||
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
|
||||
case pebble.ErrNotFound:
|
||||
// version missing → store it
|
||||
if err := db.Set([]byte(dbVersionKey), currentVer, pebble.Sync); err != nil {
|
||||
db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
case nil:
|
||||
// Version present, flush if different
|
||||
|
||||
defer closer.Close()
|
||||
|
||||
if !bytes.Equal(blob, currentVer) {
|
||||
db.Close()
|
||||
if err = os.RemoveAll(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
os.RemoveAll(path)
|
||||
return newPersistentDB(path)
|
||||
}
|
||||
default:
|
||||
db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &DB{lvl: db, quit: make(chan struct{})}, nil
|
||||
}
|
||||
|
||||
|
|
@ -169,7 +170,7 @@ func splitNodeItemKey(key []byte) (id ID, ip netip.Addr, field string) {
|
|||
key = key[len(dbDiscoverRoot)+1:]
|
||||
// Split out the IP.
|
||||
ip, _ = netip.AddrFromSlice(key[:16])
|
||||
key = key[16+1:]
|
||||
key = key[17:]
|
||||
// Field is the remainder of key.
|
||||
field = string(key)
|
||||
return id, ip, field
|
||||
|
|
@ -196,10 +197,12 @@ func localItemKey(id ID, field string) []byte {
|
|||
|
||||
// fetchInt64 retrieves an integer associated with a particular key.
|
||||
func (db *DB) fetchInt64(key []byte) int64 {
|
||||
blob, err := db.lvl.Get(key, nil)
|
||||
blob, closer, err := db.lvl.Get(key)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
defer closer.Close()
|
||||
|
||||
val, read := binary.Varint(blob)
|
||||
if read <= 0 {
|
||||
return 0
|
||||
|
|
@ -209,34 +212,39 @@ func (db *DB) fetchInt64(key []byte) int64 {
|
|||
|
||||
// storeInt64 stores an integer in the given key.
|
||||
func (db *DB) storeInt64(key []byte, n int64) error {
|
||||
|
||||
blob := make([]byte, binary.MaxVarintLen64)
|
||||
blob = blob[:binary.PutVarint(blob, n)]
|
||||
return db.lvl.Put(key, blob, nil)
|
||||
|
||||
return db.lvl.Set(key, blob, pebble.Sync)
|
||||
}
|
||||
|
||||
// fetchUint64 retrieves an integer associated with a particular key.
|
||||
func (db *DB) fetchUint64(key []byte) uint64 {
|
||||
blob, err := db.lvl.Get(key, nil)
|
||||
blob, closer, err := db.lvl.Get(key)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
defer closer.Close()
|
||||
val, _ := binary.Uvarint(blob)
|
||||
return val
|
||||
}
|
||||
|
||||
// storeUint64 stores an integer in the given key.
|
||||
func (db *DB) storeUint64(key []byte, n uint64) error {
|
||||
|
||||
blob := make([]byte, binary.MaxVarintLen64)
|
||||
blob = blob[:binary.PutUvarint(blob, n)]
|
||||
return db.lvl.Put(key, blob, nil)
|
||||
return db.lvl.Set(key, blob, pebble.Sync)
|
||||
}
|
||||
|
||||
// Node retrieves a node with a given id from the database.
|
||||
func (db *DB) Node(id ID) *Node {
|
||||
blob, err := db.lvl.Get(nodeKey(id), nil)
|
||||
blob, closer, err := db.lvl.Get(nodeKey(id))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer closer.Close()
|
||||
return mustDecodeNode(id[:], blob)
|
||||
}
|
||||
|
||||
|
|
@ -245,9 +253,7 @@ func mustDecodeNode(id, data []byte) *Node {
|
|||
if err := rlp.DecodeBytes(data, &r); err != nil {
|
||||
panic(fmt.Errorf("p2p/enode: can't decode node %x in DB: %v", id, err))
|
||||
}
|
||||
if len(id) != len(ID{}) {
|
||||
panic(fmt.Errorf("invalid id length %d", len(id)))
|
||||
}
|
||||
|
||||
return newNodeWithID(&r, ID(id))
|
||||
}
|
||||
|
||||
|
|
@ -260,7 +266,7 @@ func (db *DB) UpdateNode(node *Node) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
|
||||
if err := db.lvl.Set(nodeKey(node.ID()), blob, pebble.Sync); err != nil {
|
||||
return err
|
||||
}
|
||||
return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
|
||||
|
|
@ -285,11 +291,20 @@ func (db *DB) DeleteNode(id ID) {
|
|||
deleteRange(db.lvl, nodeKey(id))
|
||||
}
|
||||
|
||||
func deleteRange(db *leveldb.DB, prefix []byte) {
|
||||
it := db.NewIterator(util.BytesPrefix(prefix), nil)
|
||||
defer it.Release()
|
||||
for it.Next() {
|
||||
db.Delete(it.Key(), nil)
|
||||
func deleteRange(db *pebble.DB, prefix []byte) {
|
||||
iter, err := db.NewIter(&pebble.IterOptions{
|
||||
LowerBound: prefix,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer iter.Close()
|
||||
for iter.First(); iter.Valid(); iter.Next() {
|
||||
key := iter.Key()
|
||||
if !bytes.HasPrefix(key, prefix) {
|
||||
break
|
||||
}
|
||||
_ = db.Delete(key, pebble.Sync)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -324,9 +339,14 @@ func (db *DB) expirer() {
|
|||
// expireNodes iterates over the database and deletes all nodes that have not
|
||||
// been seen (i.e. received a pong from) for some time.
|
||||
func (db *DB) expireNodes() {
|
||||
it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
|
||||
defer it.Release()
|
||||
if !it.Next() {
|
||||
iter, err := db.lvl.NewIter(&pebble.IterOptions{
|
||||
LowerBound: []byte(dbNodePrefix),
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer iter.Close()
|
||||
if !iter.First() {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -336,19 +356,19 @@ func (db *DB) expireNodes() {
|
|||
atEnd = false
|
||||
)
|
||||
for !atEnd {
|
||||
id, ip, field := splitNodeItemKey(it.Key())
|
||||
id, ip, field := splitNodeItemKey(iter.Key())
|
||||
if field == dbNodePong {
|
||||
time, _ := binary.Varint(it.Value())
|
||||
if time > youngestPong {
|
||||
youngestPong = time
|
||||
pongTime, _ := binary.Varint(iter.Value())
|
||||
if pongTime > youngestPong {
|
||||
youngestPong = pongTime
|
||||
}
|
||||
if time < threshold {
|
||||
if pongTime < threshold {
|
||||
// Last pong from this IP older than threshold, remove fields belonging to it.
|
||||
deleteRange(db.lvl, nodeItemKey(id, ip, ""))
|
||||
}
|
||||
}
|
||||
atEnd = !it.Next()
|
||||
nextID, _ := splitNodeKey(it.Key())
|
||||
atEnd = !iter.Next()
|
||||
nextID, _ := splitNodeKey(iter.Key())
|
||||
if atEnd || nextID != id {
|
||||
// We've moved beyond the last entry of the current ID.
|
||||
// Remove everything if there was no recent enough pong.
|
||||
|
|
@ -448,10 +468,14 @@ func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
|
|||
var (
|
||||
now = time.Now()
|
||||
nodes = make([]*Node, 0, n)
|
||||
it = db.lvl.NewIterator(nil, nil)
|
||||
id ID
|
||||
)
|
||||
defer it.Release()
|
||||
|
||||
iter, err := db.lvl.NewIter(nil)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
seek:
|
||||
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
|
||||
|
|
@ -461,29 +485,29 @@ seek:
|
|||
ctr := id[0]
|
||||
rand.Read(id[:])
|
||||
id[0] = ctr + id[0]%16
|
||||
it.Seek(nodeKey(id))
|
||||
iter.SeekGE(nodeKey(id))
|
||||
|
||||
n := nextNode(it)
|
||||
if n == nil {
|
||||
node := nextNode(iter)
|
||||
if node == nil {
|
||||
id[0] = 0
|
||||
continue seek // iterator exhausted
|
||||
}
|
||||
if now.Sub(db.LastPongReceived(n.ID(), n.IPAddr())) > maxAge {
|
||||
if now.Sub(db.LastPongReceived(node.ID(), node.IPAddr())) > maxAge {
|
||||
continue seek
|
||||
}
|
||||
for i := range nodes {
|
||||
if nodes[i].ID() == n.ID() {
|
||||
if nodes[i].ID() == node.ID() {
|
||||
continue seek // duplicate
|
||||
}
|
||||
}
|
||||
nodes = append(nodes, n)
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
// reads the next node record from the iterator, skipping over other
|
||||
// database entries.
|
||||
func nextNode(it iterator.Iterator) *Node {
|
||||
func nextNode(it *pebble.Iterator) *Node {
|
||||
for end := false; !end; end = !it.Next() {
|
||||
id, rest := splitNodeKey(it.Key())
|
||||
if string(rest) != dbDiscoverRoot {
|
||||
|
|
|
|||
Loading…
Reference in a new issue