p2p/discover: fix a deadlock

This commit is contained in:
weixie.cui 2026-01-22 20:37:30 +08:00
parent d0af257aa2
commit c4d7cc6c37
2 changed files with 82 additions and 24 deletions

View file

@ -378,8 +378,12 @@ loop:
case op := <-tab.addNodeCh: case op := <-tab.addNodeCh:
tab.mutex.Lock() tab.mutex.Lock()
ok := tab.handleAddNode(op) ok, addedNode := tab.handleAddNode(op)
tab.mutex.Unlock() tab.mutex.Unlock()
// Send notification outside of mutex to avoid deadlock.
if addedNode != nil {
tab.nodeFeed.Send(addedNode)
}
tab.addNodeHandled <- ok tab.addNodeHandled <- ok
case op := <-tab.trackRequestCh: case op := <-tab.trackRequestCh:
@ -454,8 +458,14 @@ func (tab *Table) loadSeedNodes() {
tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", addr, "age", age) tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", addr, "age", age)
} }
tab.mutex.Lock() tab.mutex.Lock()
tab.handleAddNode(addNodeOp{node: seed, isInbound: false}) _, addedNode := tab.handleAddNode(addNodeOp{node: seed, isInbound: false})
tab.mutex.Unlock() tab.mutex.Unlock()
// Send notification outside of mutex: calling nodeFeed.Send while holding
// tab.mutex can deadlock.
if addedNode != nil {
tab.nodeFeed.Send(addedNode)
}
} }
} }
@ -506,30 +516,34 @@ func (tab *Table) removeIP(b *bucket, ip netip.Addr) {
// handleAddNode adds the node in the request to the table, if there is space. // handleAddNode adds the node in the request to the table, if there is space.
// The caller must hold tab.mutex. // The caller must hold tab.mutex.
func (tab *Table) handleAddNode(req addNodeOp) bool { //
// Returns (true, addedNode) if the node was added, (false, nil) otherwise.
// The caller must call tab.nodeFeed.Send(addedNode) AFTER releasing tab.mutex
// to notify subscribers about the new node (addedNode is non-nil only when the node was added).
func (tab *Table) handleAddNode(req addNodeOp) (bool, *enode.Node) {
if req.node.ID() == tab.self().ID() { if req.node.ID() == tab.self().ID() {
return false return false, nil
} }
// For nodes from inbound contact, there is an additional safety measure: if the table // For nodes from inbound contact, there is an additional safety measure: if the table
// is still initializing the node is not added. // is still initializing the node is not added.
if req.isInbound && !tab.isInitDone() { if req.isInbound && !tab.isInitDone() {
return false return false, nil
} }
b := tab.bucket(req.node.ID()) b := tab.bucket(req.node.ID())
n, _ := tab.bumpInBucket(b, req.node, req.isInbound) n, _ := tab.bumpInBucket(b, req.node, req.isInbound)
if n != nil { if n != nil {
// Already in bucket. // Already in bucket.
return false return false, nil
} }
if len(b.entries) >= bucketSize { if len(b.entries) >= bucketSize {
// Bucket full, maybe add as replacement. // Bucket full, maybe add as replacement.
tab.addReplacement(b, req.node) tab.addReplacement(b, req.node)
return false return false, nil
} }
if !tab.addIP(b, req.node.IPAddr()) { if !tab.addIP(b, req.node.IPAddr()) {
// Can't add: IP limit reached. // Can't add: IP limit reached.
return false return false, nil
} }
// Add to bucket. // Add to bucket.
@ -540,8 +554,8 @@ func (tab *Table) handleAddNode(req addNodeOp) bool {
} }
b.entries = append(b.entries, wn) b.entries = append(b.entries, wn)
b.replacements = deleteNode(b.replacements, wn.ID()) b.replacements = deleteNode(b.replacements, wn.ID())
tab.nodeAdded(b, wn) addedNode := tab.nodeAdded(b, wn)
return true return true, addedNode
} }
// addReplacement adds n to the replacement cache of bucket b. // addReplacement adds n to the replacement cache of bucket b.
@ -562,20 +576,29 @@ func (tab *Table) addReplacement(b *bucket, n *enode.Node) {
} }
} }
func (tab *Table) nodeAdded(b *bucket, n *tableNode) { // nodeAdded is called when a node is added to a bucket.
// It returns the added node for the caller to send a notification
// via nodeFeed.Send() AFTER releasing the mutex.
//
// The caller must hold tab.mutex.
func (tab *Table) nodeAdded(b *bucket, n *tableNode) *enode.Node {
if n.addedToTable.IsZero() { if n.addedToTable.IsZero() {
n.addedToTable = time.Now() n.addedToTable = time.Now()
} }
n.addedToBucket = time.Now() n.addedToBucket = time.Now()
tab.revalidation.nodeAdded(tab, n) tab.revalidation.nodeAdded(tab, n)
tab.nodeFeed.Send(n.Node) // NOTE: nodeFeed.Send() is NOT called here to avoid deadlock.
// The caller must send the notification after releasing the mutex.
// See: goroutine_deadlock_analysis_97c19bb9.plan.md
if tab.nodeAddedHook != nil { if tab.nodeAddedHook != nil {
tab.nodeAddedHook(b, n) tab.nodeAddedHook(b, n)
} }
if metrics.Enabled() { if metrics.Enabled() {
bucketsCounter[b.index].Inc(1) bucketsCounter[b.index].Inc(1)
} }
return n.Node
} }
func (tab *Table) nodeRemoved(b *bucket, n *tableNode) { func (tab *Table) nodeRemoved(b *bucket, n *tableNode) {
@ -590,11 +613,18 @@ func (tab *Table) nodeRemoved(b *bucket, n *tableNode) {
// deleteInBucket removes node n from the table. // deleteInBucket removes node n from the table.
// If there are replacement nodes in the bucket, the node is replaced. // If there are replacement nodes in the bucket, the node is replaced.
func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *tableNode { //
// Returns (replacementNode, nodeToNotify) where:
// - replacementNode is the tableNode that replaced the deleted node (or nil)
// - nodeToNotify is the enode.Node that should be sent via nodeFeed.Send() AFTER
// releasing the mutex (or nil if no notification needed)
//
// The caller must hold tab.mutex.
func (tab *Table) deleteInBucket(b *bucket, id enode.ID) (*tableNode, *enode.Node) {
index := slices.IndexFunc(b.entries, func(e *tableNode) bool { return e.ID() == id }) index := slices.IndexFunc(b.entries, func(e *tableNode) bool { return e.ID() == id })
if index == -1 { if index == -1 {
// Entry has been removed already. // Entry has been removed already.
return nil return nil, nil
} }
// Remove the node. // Remove the node.
@ -606,15 +636,15 @@ func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *tableNode {
// Add replacement. // Add replacement.
if len(b.replacements) == 0 { if len(b.replacements) == 0 {
tab.log.Debug("Removed dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr()) tab.log.Debug("Removed dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr())
return nil return nil, nil
} }
rindex := tab.rand.Intn(len(b.replacements)) rindex := tab.rand.Intn(len(b.replacements))
rep := b.replacements[rindex] rep := b.replacements[rindex]
b.replacements = slices.Delete(b.replacements, rindex, rindex+1) b.replacements = slices.Delete(b.replacements, rindex, rindex+1)
b.entries = append(b.entries, rep) b.entries = append(b.entries, rep)
tab.nodeAdded(b, rep) nodeToNotify := tab.nodeAdded(b, rep)
tab.log.Debug("Replaced dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr(), "r", rep.ID(), "rip", rep.IPAddr()) tab.log.Debug("Replaced dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr(), "r", rep.ID(), "rip", rep.IPAddr())
return rep return rep, nodeToNotify
} }
// bumpInBucket updates a node record if it exists in the bucket. // bumpInBucket updates a node record if it exists in the bucket.
@ -669,8 +699,10 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) {
tab.db.UpdateFindFails(op.node.ID(), op.node.IPAddr(), fails) tab.db.UpdateFindFails(op.node.ID(), op.node.IPAddr(), fails)
} }
// Collect nodes to notify after releasing mutex.
var nodesToNotify []*enode.Node
tab.mutex.Lock() tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(op.node.ID()) b := tab.bucket(op.node.ID())
// Remove the node from the local table if it fails to return anything useful too // Remove the node from the local table if it fails to return anything useful too
@ -678,12 +710,25 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) {
// condition specifically exists to make bootstrapping in smaller test networks more // condition specifically exists to make bootstrapping in smaller test networks more
// reliable. // reliable.
if fails >= maxFindnodeFailures && len(b.entries) >= bucketSize/4 { if fails >= maxFindnodeFailures && len(b.entries) >= bucketSize/4 {
tab.deleteInBucket(b, op.node.ID()) _, nodeToNotify := tab.deleteInBucket(b, op.node.ID())
if nodeToNotify != nil {
nodesToNotify = append(nodesToNotify, nodeToNotify)
}
} }
// Add found nodes. // Add found nodes.
for _, n := range op.foundNodes { for _, n := range op.foundNodes {
tab.handleAddNode(addNodeOp{n, false, false}) _, addedNode := tab.handleAddNode(addNodeOp{n, false, false})
if addedNode != nil {
nodesToNotify = append(nodesToNotify, addedNode)
}
}
tab.mutex.Unlock()
// Send notifications outside of mutex to avoid deadlock.
for _, n := range nodesToNotify {
tab.nodeFeed.Send(n)
} }
} }
@ -701,9 +746,14 @@ func pushNode(list []*tableNode, n *tableNode, max int) ([]*tableNode, *tableNod
// deleteNode removes a node from the table. // deleteNode removes a node from the table.
func (tab *Table) deleteNode(n *enode.Node) { func (tab *Table) deleteNode(n *enode.Node) {
tab.mutex.Lock() tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(n.ID()) b := tab.bucket(n.ID())
tab.deleteInBucket(b, n.ID()) _, nodeToNotify := tab.deleteInBucket(b, n.ID())
tab.mutex.Unlock()
// Send notification outside of mutex to avoid deadlock.
if nodeToNotify != nil {
tab.nodeFeed.Send(nodeToNotify)
}
} }
// waitForNodes blocks until the table contains at least n nodes. // waitForNodes blocks until the table contains at least n nodes.

View file

@ -154,17 +154,23 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
}() }()
// Remaining logic needs access to Table internals. // Remaining logic needs access to Table internals.
var nodeToNotify *enode.Node
tab.mutex.Lock() tab.mutex.Lock()
defer tab.mutex.Unlock()
if !resp.didRespond { if !resp.didRespond {
n.livenessChecks /= 3 n.livenessChecks /= 3
if n.livenessChecks <= 0 { if n.livenessChecks <= 0 {
tab.deleteInBucket(b, n.ID()) _, nodeToNotify = tab.deleteInBucket(b, n.ID())
} else { } else {
tab.log.Debug("Node revalidation failed", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name) tab.log.Debug("Node revalidation failed", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name)
tr.moveToList(&tr.fast, n, now, &tab.rand) tr.moveToList(&tr.fast, n, now, &tab.rand)
} }
tab.mutex.Unlock()
// Send notification outside of mutex to avoid deadlock.
if nodeToNotify != nil {
tab.nodeFeed.Send(nodeToNotify)
}
return return
} }
@ -181,6 +187,8 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
if !endpointChanged { if !endpointChanged {
tr.moveToList(&tr.slow, n, now, &tab.rand) tr.moveToList(&tr.slow, n, now, &tab.rand)
} }
tab.mutex.Unlock()
} }
// moveToList ensures n is in the 'dest' list. // moveToList ensures n is in the 'dest' list.