diff --git a/eth/api_backend.go b/eth/api_backend.go index 3f826b7861..e96837000a 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -63,6 +63,7 @@ func (b *EthAPIBackend) CurrentBlock() *types.Header { func (b *EthAPIBackend) SetHead(number uint64) { b.eth.handler.downloader.Cancel() + b.eth.handler.downloader.ResetSkeleton() b.eth.blockchain.SetHead(number) } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index caeb3d64dd..3db366a7ec 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -116,6 +116,7 @@ type Downloader struct { // Callbacks dropPeer peerDropFn // Drops a peer for misbehaving badBlock badBlockFn // Reports a block as rejected by the chain + success func() // Callback to signal successful sync completion // Status synchronising atomic.Bool @@ -241,6 +242,7 @@ func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, ch chainCutoffNumber: cutoffNumber, chainCutoffHash: cutoffHash, dropPeer: dropPeer, + success: success, headerProcCh: make(chan *headerTask, 1), quitCh: make(chan struct{}), SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()), @@ -659,6 +661,14 @@ func (d *Downloader) Cancel() { d.blockchain.InterruptInsert(false) } +// ResetSkeleton terminates the skeleton syncer and reinitializes it. +func (d *Downloader) ResetSkeleton() { + log.Debug("Resetting skeleton syncer due to chain rewind") + d.skeleton.Terminate() + rawdb.DeleteSkeletonSyncStatus(d.stateDB) + d.skeleton = newSkeleton(d.stateDB, d.peers, d.dropPeer, newBeaconBackfiller(d, d.success)) +} + // Terminate interrupts the downloader, canceling all pending operations. // The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e5d4a7c59b..febf2ed204 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -743,3 +743,46 @@ func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Fatalf("Failed to sync chain in three seconds") } } + +// TestSkeletonResetAfterSetHead tests that the skeleton syncer is properly reset +// when the chain is rewound using SetHead, preventing data inconsistency issues. +func TestSkeletonResetAfterSetHead(t *testing.T) { + tester := newTester(t, ethconfig.SnapSync) + defer tester.terminate() + + chain := testChainBase.shorten(800) + tester.newPeer("peer", eth.ETH68, chain.blocks[1:]) + + if _, err := tester.chain.InsertChain(chain.blocks[1:401]); err != nil { + t.Fatalf("Failed to insert chain: %v", err) + } + + // Start beacon sync to populate the skeleton + header := chain.blocks[400].Header() + if err := tester.downloader.BeaconSync(header, header); err != nil { + t.Fatalf("Failed to start beacon sync: %v", err) + } + + // Wait for the skeleton state + time.Sleep(20 * time.Millisecond) + + // Check skeleton sync status exists in database before SetHead + if skeleton := rawdb.ReadSkeletonSyncStatus(tester.downloader.stateDB); len(skeleton) == 0 { + t.Fatal("Skeleton sync status should exist in database before SetHead") + } + + // Simulate chain rewind by calling SetHead + tester.downloader.Cancel() + tester.downloader.ResetSkeleton() + tester.chain.SetHead(200) + + // Verify skeleton sync status was cleared from database + if skeleton := rawdb.ReadSkeletonSyncStatus(tester.downloader.stateDB); len(skeleton) != 0 { + t.Fatal("Skeleton sync status should be cleared from database after SetHead") + } + + // Verify we can start a new sync after reset + if err := tester.downloader.BeaconSync(header, header); err != nil { + t.Fatalf("Failed to start beacon sync after reset: %v", err) + } +}