go-ethereum/p2p/discover/table_reval.go
Moshe Malawach 6702bfb432 p2p/discover: fix data race on revalidationList fields
Table.loop calls (*tableRevalidation).run without holding tab.mutex,
while doRefresh -> loadSeedNodes -> handleAddNode reaches the same
tableRevalidation state (nodeAdded -> list.push -> list.schedule) in a
separate goroutine under tab.mutex. The two paths race on
revalidationList.nextTime and list.nodes: the loop goroutine reads
nextTime in run, schedules (writes it) and iterates list.nodes via
list.get; the refresh goroutine appends to list.nodes and, on the first
push, writes nextTime.

Acquire tab.mutex for the full duration of run and document that
startRequest must be called with the lock held. Remove the now-redundant
internal lock/unlock pair in startRequest.

Add a regression test that triggers the race deterministically under
'go test -race'.

Closes #31460.
2026-04-21 22:11:01 +02:00

253 lines
7.3 KiB
Go

// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package discover
import (
"fmt"
"math"
"slices"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/p2p/enode"
)
const never = mclock.AbsTime(math.MaxInt64)
const slowRevalidationFactor = 3
// tableRevalidation implements the node revalidation process.
// It tracks all nodes contained in Table, and schedules sending PING to them.
type tableRevalidation struct {
fast revalidationList
slow revalidationList
activeReq map[enode.ID]struct{}
}
type revalidationResponse struct {
n *tableNode
newRecord *enode.Node
didRespond bool
}
func (tr *tableRevalidation) init(cfg *Config) {
tr.activeReq = make(map[enode.ID]struct{})
tr.fast.nextTime = never
tr.fast.interval = cfg.PingInterval
tr.fast.name = "fast"
tr.slow.nextTime = never
tr.slow.interval = cfg.PingInterval * slowRevalidationFactor
tr.slow.name = "slow"
}
// nodeAdded is called when the table receives a new node.
func (tr *tableRevalidation) nodeAdded(tab *Table, n *tableNode) {
tr.fast.push(n, tab.cfg.Clock.Now(), &tab.rand)
}
// nodeRemoved is called when a node was removed from the table.
func (tr *tableRevalidation) nodeRemoved(n *tableNode) {
if n.revalList == nil {
panic(fmt.Errorf("removed node %v has nil revalList", n.ID()))
}
n.revalList.remove(n)
}
// nodeEndpointChanged is called when a change in IP or port is detected.
func (tr *tableRevalidation) nodeEndpointChanged(tab *Table, n *tableNode) {
n.isValidatedLive = false
tr.moveToList(&tr.fast, n, tab.cfg.Clock.Now(), &tab.rand)
}
// run performs node revalidation.
// It returns the next time it should be invoked, which is used in the Table main loop
// to schedule a timer. However, run can be called at any time.
//
// run acquires tab.mutex to synchronize with node additions performed from
// the doRefresh background goroutine, which reach this file through
// tab.handleAddNode -> tr.nodeAdded -> list.push -> list.schedule and therefore
// mutate the same revalidationList fields that run reads.
func (tr *tableRevalidation) run(tab *Table, now mclock.AbsTime) (nextTime mclock.AbsTime) {
tab.mutex.Lock()
defer tab.mutex.Unlock()
runList := func(list *revalidationList) {
if list.nextTime <= now {
if n := list.get(&tab.rand, tr.activeReq); n != nil {
tr.startRequest(tab, n)
}
// Update nextTime regardless if any requests were started because
// current value has passed.
list.schedule(now, &tab.rand)
}
}
runList(&tr.fast)
runList(&tr.slow)
return min(tr.fast.nextTime, tr.slow.nextTime)
}
// startRequest spawns a revalidation request for node n.
// The caller must hold tab.mutex.
func (tr *tableRevalidation) startRequest(tab *Table, n *tableNode) {
if _, ok := tr.activeReq[n.ID()]; ok {
panic(fmt.Errorf("duplicate startRequest (node %v)", n.ID()))
}
tr.activeReq[n.ID()] = struct{}{}
resp := revalidationResponse{n: n}
node := n.Node
go tab.doRevalidate(resp, node)
}
func (tab *Table) doRevalidate(resp revalidationResponse, node *enode.Node) {
// Ping the selected node and wait for a pong response.
remoteSeq, err := tab.net.ping(node)
resp.didRespond = err == nil
// Also fetch record if the node replied and returned a higher sequence number.
if remoteSeq > node.Seq() {
newrec, err := tab.net.RequestENR(node)
if err != nil {
tab.log.Debug("ENR request failed", "id", node.ID(), "err", err)
} else {
resp.newRecord = newrec
}
}
select {
case tab.revalResponseCh <- resp:
case <-tab.closed:
}
}
// handleResponse processes the result of a revalidation request.
func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationResponse) {
var (
now = tab.cfg.Clock.Now()
n = resp.n
b = tab.bucket(n.ID())
)
delete(tr.activeReq, n.ID())
// If the node was removed from the table while getting checked, we need to stop
// processing here to avoid re-adding it.
if n.revalList == nil {
return
}
// Store potential seeds in database.
// This is done via defer to avoid holding Table lock while writing to DB.
defer func() {
if n.isValidatedLive && n.livenessChecks > 5 {
tab.db.UpdateNode(resp.n.Node)
}
}()
// Remaining logic needs access to Table internals.
tab.mutex.Lock()
defer tab.mutex.Unlock()
if !resp.didRespond {
n.livenessChecks /= 3
if n.livenessChecks <= 0 {
tab.deleteInBucket(b, n.ID())
} else {
tab.log.Debug("Node revalidation failed", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name)
tr.moveToList(&tr.fast, n, now, &tab.rand)
}
return
}
// The node responded.
n.livenessChecks++
n.isValidatedLive = true
tab.log.Debug("Node revalidated", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name)
var endpointChanged bool
if resp.newRecord != nil {
_, endpointChanged = tab.bumpInBucket(b, resp.newRecord, false)
}
// Node moves to slow list if it passed and hasn't changed.
if !endpointChanged {
tr.moveToList(&tr.slow, n, now, &tab.rand)
}
}
// moveToList ensures n is in the 'dest' list.
func (tr *tableRevalidation) moveToList(dest *revalidationList, n *tableNode, now mclock.AbsTime, rand randomSource) {
if n.revalList == dest {
return
}
if n.revalList != nil {
n.revalList.remove(n)
}
dest.push(n, now, rand)
}
// revalidationList holds a list nodes and the next revalidation time.
type revalidationList struct {
nodes []*tableNode
nextTime mclock.AbsTime
interval time.Duration
name string
}
// get returns a random node from the queue. Nodes in the 'exclude' map are not returned.
func (list *revalidationList) get(rand randomSource, exclude map[enode.ID]struct{}) *tableNode {
if len(list.nodes) == 0 {
return nil
}
for i := 0; i < len(list.nodes)*3; i++ {
n := list.nodes[rand.Intn(len(list.nodes))]
_, excluded := exclude[n.ID()]
if !excluded {
return n
}
}
return nil
}
func (list *revalidationList) schedule(now mclock.AbsTime, rand randomSource) {
list.nextTime = now.Add(time.Duration(rand.Int63n(int64(list.interval))))
}
func (list *revalidationList) push(n *tableNode, now mclock.AbsTime, rand randomSource) {
list.nodes = append(list.nodes, n)
if list.nextTime == never {
list.schedule(now, rand)
}
n.revalList = list
}
func (list *revalidationList) remove(n *tableNode) {
i := slices.Index(list.nodes, n)
if i == -1 {
panic(fmt.Errorf("node %v not found in list", n.ID()))
}
list.nodes = slices.Delete(list.nodes, i, i+1)
if len(list.nodes) == 0 {
list.nextTime = never
}
n.revalList = nil
}
func (list *revalidationList) contains(id enode.ID) bool {
return slices.ContainsFunc(list.nodes, func(n *tableNode) bool {
return n.ID() == id
})
}