mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 13:21:37 +00:00
Merge 6702bfb432 into 7c9032dff6
This commit is contained in:
commit
7c55e55ad5
2 changed files with 72 additions and 7 deletions
|
|
@ -76,8 +76,16 @@ func (tr *tableRevalidation) nodeEndpointChanged(tab *Table, n *tableNode) {
|
||||||
// run performs node revalidation.
|
// run performs node revalidation.
|
||||||
// It returns the next time it should be invoked, which is used in the Table main loop
|
// It returns the next time it should be invoked, which is used in the Table main loop
|
||||||
// to schedule a timer. However, run can be called at any time.
|
// to schedule a timer. However, run can be called at any time.
|
||||||
|
//
|
||||||
|
// run acquires tab.mutex to synchronize with node additions performed from
|
||||||
|
// the doRefresh background goroutine, which reach this file through
|
||||||
|
// tab.handleAddNode -> tr.nodeAdded -> list.push -> list.schedule and therefore
|
||||||
|
// mutate the same revalidationList fields that run reads.
|
||||||
func (tr *tableRevalidation) run(tab *Table, now mclock.AbsTime) (nextTime mclock.AbsTime) {
|
func (tr *tableRevalidation) run(tab *Table, now mclock.AbsTime) (nextTime mclock.AbsTime) {
|
||||||
reval := func(list *revalidationList) {
|
tab.mutex.Lock()
|
||||||
|
defer tab.mutex.Unlock()
|
||||||
|
|
||||||
|
runList := func(list *revalidationList) {
|
||||||
if list.nextTime <= now {
|
if list.nextTime <= now {
|
||||||
if n := list.get(&tab.rand, tr.activeReq); n != nil {
|
if n := list.get(&tab.rand, tr.activeReq); n != nil {
|
||||||
tr.startRequest(tab, n)
|
tr.startRequest(tab, n)
|
||||||
|
|
@ -87,24 +95,21 @@ func (tr *tableRevalidation) run(tab *Table, now mclock.AbsTime) (nextTime mcloc
|
||||||
list.schedule(now, &tab.rand)
|
list.schedule(now, &tab.rand)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
reval(&tr.fast)
|
runList(&tr.fast)
|
||||||
reval(&tr.slow)
|
runList(&tr.slow)
|
||||||
|
|
||||||
return min(tr.fast.nextTime, tr.slow.nextTime)
|
return min(tr.fast.nextTime, tr.slow.nextTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
// startRequest spawns a revalidation request for node n.
|
// startRequest spawns a revalidation request for node n.
|
||||||
|
// The caller must hold tab.mutex.
|
||||||
func (tr *tableRevalidation) startRequest(tab *Table, n *tableNode) {
|
func (tr *tableRevalidation) startRequest(tab *Table, n *tableNode) {
|
||||||
if _, ok := tr.activeReq[n.ID()]; ok {
|
if _, ok := tr.activeReq[n.ID()]; ok {
|
||||||
panic(fmt.Errorf("duplicate startRequest (node %v)", n.ID()))
|
panic(fmt.Errorf("duplicate startRequest (node %v)", n.ID()))
|
||||||
}
|
}
|
||||||
tr.activeReq[n.ID()] = struct{}{}
|
tr.activeReq[n.ID()] = struct{}{}
|
||||||
resp := revalidationResponse{n: n}
|
resp := revalidationResponse{n: n}
|
||||||
|
|
||||||
// Fetch the node while holding lock.
|
|
||||||
tab.mutex.Lock()
|
|
||||||
node := n.Node
|
node := n.Node
|
||||||
tab.mutex.Unlock()
|
|
||||||
|
|
||||||
go tab.doRevalidate(resp, node)
|
go tab.doRevalidate(resp, node)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ package discover
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -117,3 +118,62 @@ func TestRevalidation_endpointUpdate(t *testing.T) {
|
||||||
t.Fatal("node is marked live after endpoint change")
|
t.Fatal("node is marked live after endpoint change")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestRevalidation_concurrentAddAndRun reproduces the data race between the
|
||||||
|
// Table.loop goroutine (which calls tableRevalidation.run) and the doRefresh
|
||||||
|
// goroutine (which reaches tableRevalidation.nodeAdded via handleAddNode) on
|
||||||
|
// revalidationList.nextTime and revalidationList.nodes. See issue #31460.
|
||||||
|
//
|
||||||
|
// Without proper locking, this test reliably flags a race under "go test -race".
|
||||||
|
func TestRevalidation_concurrentAddAndRun(t *testing.T) {
|
||||||
|
var (
|
||||||
|
transport = newPingRecorder()
|
||||||
|
// Real clock + small ping interval so that list.schedule produces
|
||||||
|
// nextTime values close to now, and tr.run repeatedly enters the body.
|
||||||
|
tab, db = newInactiveTestTable(transport, Config{PingInterval: time.Millisecond})
|
||||||
|
)
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
// Seed the fast list so tr.run has something to iterate over.
|
||||||
|
for i := 0; i < 16; i++ {
|
||||||
|
n := nodeAtDistance(tab.self().ID(), 255, net.IP{10, 0, 0, byte(i)})
|
||||||
|
tab.mutex.Lock()
|
||||||
|
tab.handleAddNode(addNodeOp{node: n})
|
||||||
|
tab.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// A barrier so both goroutines start their loops together.
|
||||||
|
start := make(chan struct{})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2)
|
||||||
|
|
||||||
|
const iterations = 2000
|
||||||
|
|
||||||
|
// Background goroutine: simulate doRefresh -> loadSeedNodes -> handleAddNode,
|
||||||
|
// which reaches tr.nodeAdded and appends to list.nodes (and may write
|
||||||
|
// list.nextTime via list.schedule).
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-start
|
||||||
|
for i := 0; i < iterations; i++ {
|
||||||
|
n := nodeAtDistance(tab.self().ID(), 200, net.IP{11, 0, byte(i >> 8), byte(i)})
|
||||||
|
tab.mutex.Lock()
|
||||||
|
tab.handleAddNode(addNodeOp{node: n})
|
||||||
|
tab.mutex.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Foreground goroutine: simulate Table.loop repeatedly calling tr.run,
|
||||||
|
// which reads list.nextTime and list.nodes and may write list.nextTime
|
||||||
|
// via list.schedule.
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-start
|
||||||
|
for i := 0; i < iterations; i++ {
|
||||||
|
tab.revalidation.run(tab, mclock.System{}.Now())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue