This commit is contained in:
cui 2026-02-24 21:54:54 -08:00 committed by GitHub
commit c90f198a7f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 80 additions and 24 deletions

View file

@ -378,8 +378,12 @@ loop:
case op := <-tab.addNodeCh:
tab.mutex.Lock()
ok := tab.handleAddNode(op)
ok, addedNode := tab.handleAddNode(op)
tab.mutex.Unlock()
// Send notification outside of mutex to avoid deadlock.
if addedNode != nil {
tab.nodeFeed.Send(addedNode)
}
tab.addNodeHandled <- ok
case op := <-tab.trackRequestCh:
@ -454,8 +458,13 @@ func (tab *Table) loadSeedNodes() {
tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", addr, "age", age)
}
tab.mutex.Lock()
tab.handleAddNode(addNodeOp{node: seed, isInbound: false})
_, addedNode := tab.handleAddNode(addNodeOp{node: seed, isInbound: false})
tab.mutex.Unlock()
// Send notification outside of mutex to avoid deadlock.
if addedNode != nil {
tab.nodeFeed.Send(addedNode)
}
}
}
@ -506,30 +515,34 @@ func (tab *Table) removeIP(b *bucket, ip netip.Addr) {
// handleAddNode adds the node in the request to the table, if there is space.
// The caller must hold tab.mutex.
func (tab *Table) handleAddNode(req addNodeOp) bool {
//
// Returns (true, addedNode) if the node was added, (false, nil) otherwise.
// The caller should call nodeFeed.Send(addedNode) AFTER releasing the mutex
// to notify subscribers about the new node.
func (tab *Table) handleAddNode(req addNodeOp) (bool, *enode.Node) {
if req.node.ID() == tab.self().ID() {
return false
return false, nil
}
// For nodes from inbound contact, there is an additional safety measure: if the table
// is still initializing the node is not added.
if req.isInbound && !tab.isInitDone() {
return false
return false, nil
}
b := tab.bucket(req.node.ID())
n, _ := tab.bumpInBucket(b, req.node, req.isInbound)
if n != nil {
// Already in bucket.
return false
return false, nil
}
if len(b.entries) >= bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, req.node)
return false
return false, nil
}
if !tab.addIP(b, req.node.IPAddr()) {
// Can't add: IP limit reached.
return false
return false, nil
}
// Add to bucket.
@ -540,8 +553,8 @@ func (tab *Table) handleAddNode(req addNodeOp) bool {
}
b.entries = append(b.entries, wn)
b.replacements = deleteNode(b.replacements, wn.ID())
tab.nodeAdded(b, wn)
return true
addedNode := tab.nodeAdded(b, wn)
return true, addedNode
}
// addReplacement adds n to the replacement cache of bucket b.
@ -562,20 +575,28 @@ func (tab *Table) addReplacement(b *bucket, n *enode.Node) {
}
}
func (tab *Table) nodeAdded(b *bucket, n *tableNode) {
// nodeAdded is called when a node is added to a bucket.
// It returns the added node for the caller to send a notification
// via nodeFeed.Send() AFTER releasing the mutex.
//
// The caller must hold tab.mutex.
func (tab *Table) nodeAdded(b *bucket, n *tableNode) *enode.Node {
if n.addedToTable.IsZero() {
n.addedToTable = time.Now()
}
n.addedToBucket = time.Now()
tab.revalidation.nodeAdded(tab, n)
tab.nodeFeed.Send(n.Node)
// NOTE: nodeFeed.Send() is NOT called here to avoid deadlock.
// The caller must send the notification after releasing the mutex.
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(b, n)
}
if metrics.Enabled() {
bucketsCounter[b.index].Inc(1)
}
return n.Node
}
func (tab *Table) nodeRemoved(b *bucket, n *tableNode) {
@ -590,11 +611,18 @@ func (tab *Table) nodeRemoved(b *bucket, n *tableNode) {
// deleteInBucket removes node n from the table.
// If there are replacement nodes in the bucket, the node is replaced.
func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *tableNode {
//
// Returns (replacementNode, nodeToNotify) where:
// - replacementNode is the tableNode that replaced the deleted node (or nil)
// - nodeToNotify is the enode.Node that should be sent via nodeFeed.Send() AFTER
// releasing the mutex (or nil if no notification needed)
//
// The caller must hold tab.mutex.
func (tab *Table) deleteInBucket(b *bucket, id enode.ID) (*tableNode, *enode.Node) {
index := slices.IndexFunc(b.entries, func(e *tableNode) bool { return e.ID() == id })
if index == -1 {
// Entry has been removed already.
return nil
return nil, nil
}
// Remove the node.
@ -606,15 +634,15 @@ func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *tableNode {
// Add replacement.
if len(b.replacements) == 0 {
tab.log.Debug("Removed dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr())
return nil
return nil, nil
}
rindex := tab.rand.Intn(len(b.replacements))
rep := b.replacements[rindex]
b.replacements = slices.Delete(b.replacements, rindex, rindex+1)
b.entries = append(b.entries, rep)
tab.nodeAdded(b, rep)
nodeToNotify := tab.nodeAdded(b, rep)
tab.log.Debug("Replaced dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr(), "r", rep.ID(), "rip", rep.IPAddr())
return rep
return rep, nodeToNotify
}
// bumpInBucket updates a node record if it exists in the bucket.
@ -669,8 +697,10 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) {
tab.db.UpdateFindFails(op.node.ID(), op.node.IPAddr(), fails)
}
// Collect nodes to notify after releasing mutex.
var nodesToNotify []*enode.Node
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(op.node.ID())
// Remove the node from the local table if it fails to return anything useful too
@ -678,12 +708,25 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) {
// condition specifically exists to make bootstrapping in smaller test networks more
// reliable.
if fails >= maxFindnodeFailures && len(b.entries) >= bucketSize/4 {
tab.deleteInBucket(b, op.node.ID())
_, nodeToNotify := tab.deleteInBucket(b, op.node.ID())
if nodeToNotify != nil {
nodesToNotify = append(nodesToNotify, nodeToNotify)
}
}
// Add found nodes.
for _, n := range op.foundNodes {
tab.handleAddNode(addNodeOp{n, false, false})
_, addedNode := tab.handleAddNode(addNodeOp{n, false, false})
if addedNode != nil {
nodesToNotify = append(nodesToNotify, addedNode)
}
}
tab.mutex.Unlock()
// Send notifications outside of mutex to avoid deadlock.
for _, n := range nodesToNotify {
tab.nodeFeed.Send(n)
}
}
@ -701,9 +744,14 @@ func pushNode(list []*tableNode, n *tableNode, max int) ([]*tableNode, *tableNod
// deleteNode removes a node from the table.
func (tab *Table) deleteNode(n *enode.Node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(n.ID())
tab.deleteInBucket(b, n.ID())
_, nodeToNotify := tab.deleteInBucket(b, n.ID())
tab.mutex.Unlock()
// Send notification outside of mutex to avoid deadlock.
if nodeToNotify != nil {
tab.nodeFeed.Send(nodeToNotify)
}
}
// waitForNodes blocks until the table contains at least n nodes.

View file

@ -154,17 +154,23 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
}()
// Remaining logic needs access to Table internals.
var nodeToNotify *enode.Node
tab.mutex.Lock()
defer tab.mutex.Unlock()
if !resp.didRespond {
n.livenessChecks /= 3
if n.livenessChecks <= 0 {
tab.deleteInBucket(b, n.ID())
_, nodeToNotify = tab.deleteInBucket(b, n.ID())
} else {
tab.log.Debug("Node revalidation failed", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name)
tr.moveToList(&tr.fast, n, now, &tab.rand)
}
tab.mutex.Unlock()
// Send notification outside of mutex to avoid deadlock.
if nodeToNotify != nil {
tab.nodeFeed.Send(nodeToNotify)
}
return
}
@ -181,6 +187,8 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
if !endpointChanged {
tr.moveToList(&tr.slow, n, now, &tab.rand)
}
tab.mutex.Unlock()
}
// moveToList ensures n is in the 'dest' list.