diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 8c38e9d0c5..b3d89a8e9e 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -941,6 +941,127 @@ func TestSkeletonSyncRetrievals(t *testing.T) { } } +// Tests that when a forced head event arrives during the suspend window of a +// sync restart (errSyncReorged), it is captured and used as the restart head +// instead of the stale reorg head. This prevents a death spiral on chains with +// fast block times where suspend() blocks long enough for new heads to arrive. +func TestSkeletonSyncDeferredHead(t *testing.T) { + // Create a short chain and a peer that can serve it + chain := []*types.Header{{Number: big.NewInt(0)}} + for i := 1; i <= 100; i++ { + chain = append(chain, &types.Header{ + ParentHash: chain[i-1].Hash(), + Number: big.NewInt(int64(i)), + }) + } + // Create a reorg head at the same height but with a non-linking parent hash. + // This triggers errChainForked in processNewHead, which with force=true + // becomes errSyncReorged. + reorgHead := &types.Header{ + ParentHash: common.Hash{0xff}, // doesn't match chain[99] + Number: big.NewInt(100), + Extra: []byte("reorg"), + } + // Create a later "deferred" head that should be used instead of reorgHead. + // It's on a completely separate chain so the skeleton reinits with it. + deferredChain := []*types.Header{{Number: big.NewInt(0)}} + for i := 1; i <= 200; i++ { + deferredChain = append(deferredChain, &types.Header{ + ParentHash: deferredChain[i-1].Hash(), + Number: big.NewInt(int64(i)), + Extra: []byte("deferred"), + }) + } + deferredHead := deferredChain[200] + + // Set up database with genesis + db := rawdb.NewMemoryDatabase() + rawdb.WriteBlock(db, types.NewBlockWithHeader(chain[0])) + rawdb.WriteReceipts(db, chain[0].Hash(), 0, types.Receipts{}) + + // Channels to coordinate the suspend hook timing. The first suspend() call + // happens during errSyncLinked (initial header fill). We use it as a + // synchronization point: once it fires, we know the skeleton has linked + // and restarted into a linked sync loop. Only the second suspend (triggered + // by errSyncReorged) is blocked to create the window for deferred heads. + firstSuspendDone := make(chan struct{}) + suspendStarted := make(chan struct{}) + suspendUnblock := make(chan struct{}) + var suspendCount atomic.Int32 + + filler := &hookedBackfiller{ + resumeHook: func() {}, + suspendHook: func() *types.Header { + if suspendCount.Add(1) < 2 { + close(firstSuspendDone) // signal that initial fill completed + return nil + } + // Signal that the reorg-triggered suspend started, then block + select { + case suspendStarted <- struct{}{}: + default: + } + <-suspendUnblock + return nil + }, + } + peer := newSkeletonTestPeer("test-peer", chain) + peerset := newPeerSet() + peerset.Register(newPeerConnection(peer.id, eth.ETH69, peer, log.New("id", peer.id))) + + skeleton := newSkeleton(db, peerset, nil, filler, &fakeChainReader{}) + + // Start sync and wait for the first suspend to confirm the chain linked + // and the skeleton restarted into a linked sync loop. + skeleton.Sync(chain[100], nil, true) + + select { + case <-firstSuspendDone: + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for initial sync to link") + } + // Send the reorg head — this triggers errSyncReorged, whose defer calls + // filler.suspend() which will block on our hook + go skeleton.Sync(reorgHead, nil, true) + + // Wait for suspend to start (meaning we're in the defer draining head events) + select { + case <-suspendStarted: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for suspend to start") + } + // While suspend is blocked, send a new forced head. This should be captured + // as the deferred head. + go skeleton.Sync(deferredHead, nil, true) + + // Give the head event a moment to be received by the drain loop + time.Sleep(100 * time.Millisecond) + + // Unblock suspend — the skeleton should restart with deferredHead + close(suspendUnblock) + + // Wait for the skeleton to process the deferred head and re-init + waitStart := time.Now() + for waitTime := 20 * time.Millisecond; time.Since(waitStart) < 5*time.Second; waitTime = waitTime * 2 { + time.Sleep(waitTime) + var progress skeletonProgress + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + if len(progress.Subchains) > 0 && progress.Subchains[0].Head == 200 { + break + } + } + var progress skeletonProgress + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + + if len(progress.Subchains) == 0 { + t.Fatal("no subchains after deferred head sync") + } + if progress.Subchains[0].Head != 200 { + t.Errorf("skeleton restarted with wrong head: have %d, want %d (deferred head was not used)", progress.Subchains[0].Head, 200) + } + skeleton.Terminate() +} + func checkSkeletonProgress(db ethdb.KeyValueReader, unpredictable bool, peers []*skeletonTestPeer, expected skeletonExpect) error { var progress skeletonProgress // Check the post-init end state if it matches the required results