eth/downloader: add test for deferred head during skeleton sync restart

Test that when a forced head event arrives during the suspend window
of a sync restart, it is captured and used as the restart target
instead of the stale reorg head.
This commit is contained in:
Karl Bartel 2026-03-11 12:42:06 +01:00
parent 913720b201
commit be444c29a3
No known key found for this signature in database

View file

@ -941,6 +941,127 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
}
}
// Tests that when a forced head event arrives during the suspend window of a
// sync restart (errSyncReorged), it is captured and used as the restart head
// instead of the stale reorg head. This prevents a death spiral on chains with
// fast block times where suspend() blocks long enough for new heads to arrive.
func TestSkeletonSyncDeferredHead(t *testing.T) {
// Create a short chain and a peer that can serve it
chain := []*types.Header{{Number: big.NewInt(0)}}
for i := 1; i <= 100; i++ {
chain = append(chain, &types.Header{
ParentHash: chain[i-1].Hash(),
Number: big.NewInt(int64(i)),
})
}
// Create a reorg head at the same height but with a non-linking parent hash.
// This triggers errChainForked in processNewHead, which with force=true
// becomes errSyncReorged.
reorgHead := &types.Header{
ParentHash: common.Hash{0xff}, // doesn't match chain[99]
Number: big.NewInt(100),
Extra: []byte("reorg"),
}
// Create a later "deferred" head that should be used instead of reorgHead.
// It's on a completely separate chain so the skeleton reinits with it.
deferredChain := []*types.Header{{Number: big.NewInt(0)}}
for i := 1; i <= 200; i++ {
deferredChain = append(deferredChain, &types.Header{
ParentHash: deferredChain[i-1].Hash(),
Number: big.NewInt(int64(i)),
Extra: []byte("deferred"),
})
}
deferredHead := deferredChain[200]
// Set up database with genesis
db := rawdb.NewMemoryDatabase()
rawdb.WriteBlock(db, types.NewBlockWithHeader(chain[0]))
rawdb.WriteReceipts(db, chain[0].Hash(), 0, types.Receipts{})
// Channels to coordinate the suspend hook timing. The first suspend() call
// happens during errSyncLinked (initial header fill). We use it as a
// synchronization point: once it fires, we know the skeleton has linked
// and restarted into a linked sync loop. Only the second suspend (triggered
// by errSyncReorged) is blocked to create the window for deferred heads.
firstSuspendDone := make(chan struct{})
suspendStarted := make(chan struct{})
suspendUnblock := make(chan struct{})
var suspendCount atomic.Int32
filler := &hookedBackfiller{
resumeHook: func() {},
suspendHook: func() *types.Header {
if suspendCount.Add(1) < 2 {
close(firstSuspendDone) // signal that initial fill completed
return nil
}
// Signal that the reorg-triggered suspend started, then block
select {
case suspendStarted <- struct{}{}:
default:
}
<-suspendUnblock
return nil
},
}
peer := newSkeletonTestPeer("test-peer", chain)
peerset := newPeerSet()
peerset.Register(newPeerConnection(peer.id, eth.ETH69, peer, log.New("id", peer.id)))
skeleton := newSkeleton(db, peerset, nil, filler, &fakeChainReader{})
// Start sync and wait for the first suspend to confirm the chain linked
// and the skeleton restarted into a linked sync loop.
skeleton.Sync(chain[100], nil, true)
select {
case <-firstSuspendDone:
case <-time.After(30 * time.Second):
t.Fatal("timed out waiting for initial sync to link")
}
// Send the reorg head — this triggers errSyncReorged, whose defer calls
// filler.suspend() which will block on our hook
go skeleton.Sync(reorgHead, nil, true)
// Wait for suspend to start (meaning we're in the defer draining head events)
select {
case <-suspendStarted:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for suspend to start")
}
// While suspend is blocked, send a new forced head. This should be captured
// as the deferred head.
go skeleton.Sync(deferredHead, nil, true)
// Give the head event a moment to be received by the drain loop
time.Sleep(100 * time.Millisecond)
// Unblock suspend — the skeleton should restart with deferredHead
close(suspendUnblock)
// Wait for the skeleton to process the deferred head and re-init
waitStart := time.Now()
for waitTime := 20 * time.Millisecond; time.Since(waitStart) < 5*time.Second; waitTime = waitTime * 2 {
time.Sleep(waitTime)
var progress skeletonProgress
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
if len(progress.Subchains) > 0 && progress.Subchains[0].Head == 200 {
break
}
}
var progress skeletonProgress
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
if len(progress.Subchains) == 0 {
t.Fatal("no subchains after deferred head sync")
}
if progress.Subchains[0].Head != 200 {
t.Errorf("skeleton restarted with wrong head: have %d, want %d (deferred head was not used)", progress.Subchains[0].Head, 200)
}
skeleton.Terminate()
}
func checkSkeletonProgress(db ethdb.KeyValueReader, unpredictable bool, peers []*skeletonTestPeer, expected skeletonExpect) error {
var progress skeletonProgress
// Check the post-init end state if it matches the required results