diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 750c224230..914e1dfada 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -36,7 +36,6 @@ type beaconBackfiller struct { downloader *Downloader // Downloader to direct via this callback implementation success func() // Callback to run on successful sync cycle completion filling bool // Flag whether the downloader is backfilling or not - filled *types.Header // Last header filled by the last terminated sync loop started chan struct{} // Notification channel whether the downloader inited lock sync.Mutex // Mutex protecting the sync lock } @@ -56,13 +55,15 @@ func (b *beaconBackfiller) suspend() *types.Header { // If no filling is running, don't waste cycles b.lock.Lock() filling := b.filling - filled := b.filled started := b.started b.lock.Unlock() if !filling { + // Sync cycle was inactive, retrieve and return the latest snap block + // as the filled header. log.Debug("Backfiller was inactive") - return filled // Return the filled header on the previous sync completion + + return b.downloader.blockchain.CurrentSnapBlock() } // A previous filling should be running, though it may happen that it hasn't // yet started (being done on a new goroutine). Many concurrent beacon head @@ -77,7 +78,6 @@ func (b *beaconBackfiller) suspend() *types.Header { log.Debug("Backfiller has been suspended") // Sync cycle was just terminated, retrieve and return the last filled header. - // Can't use `filled` as that contains a stale value from before cancellation. return b.downloader.blockchain.CurrentSnapBlock() } @@ -92,7 +92,6 @@ func (b *beaconBackfiller) resume() { return } b.filling = true - b.filled = nil b.started = make(chan struct{}) b.lock.Unlock() @@ -103,7 +102,6 @@ func (b *beaconBackfiller) resume() { defer func() { b.lock.Lock() b.filling = false - b.filled = b.downloader.blockchain.CurrentSnapBlock() b.lock.Unlock() }() // If the downloader fails, report an error as in beacon chain mode there @@ -113,7 +111,7 @@ func (b *beaconBackfiller) resume() { return } // Synchronization succeeded. Since this happens async, notify the outer - // context to disable snap syncing and enable transaction propagation. + // context to enable transaction propagation. if b.success != nil { b.success() } @@ -188,6 +186,8 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) { log.Error("Failed to retrieve beacon bounds", "err", err) return 0, err } + log.Debug("Searching beacon ancestor", "local", number, "beaconhead", beaconHead.Number, "beacontail", beaconTail.Number) + var linked bool switch d.getMode() { case ethconfig.FullSync: @@ -241,6 +241,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) { } start = check } + log.Debug("Found beacon ancestor", "number", start) return start, nil } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index e16014be95..caeb3d64dd 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -248,7 +248,7 @@ func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, ch syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(), } // Create the post-merge skeleton syncer and start the process - dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) + dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success), chain) go dl.stateFetcher() return dl diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index c498ac84ec..e693bfc066 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -207,6 +207,7 @@ type backfiller interface { type skeleton struct { db ethdb.Database // Database backing the skeleton filler backfiller // Chain syncer suspended/resumed by head events + chain chainReader // Underlying block chain peers *peerSet // Set of peers we can sync from idles map[string]*peerConnection // Set of idle peers in the current sync cycle @@ -231,12 +232,19 @@ type skeleton struct { syncStarting func() // callback triggered after a sync cycle is inited but before started } +// chainReader wraps the method to retrieve the head of the local chain. +type chainReader interface { + // CurrentSnapBlock retrieves the head snap block from the local chain. + CurrentSnapBlock() *types.Header +} + // newSkeleton creates a new sync skeleton that tracks a potentially dangling // header chain until it's linked into an existing set of blocks. -func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton { +func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller, chain chainReader) *skeleton { sk := &skeleton{ db: db, filler: filler, + chain: chain, peers: peers, drop: drop, requests: make(map[uint64]*headerRequest), @@ -354,6 +362,29 @@ func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) err } } +// linked returns the flag indicating whether the skeleton has been linked with +// the local chain. +func (s *skeleton) linked(number uint64, hash common.Hash) bool { + linked := rawdb.HasHeader(s.db, hash, number) && + rawdb.HasBody(s.db, hash, number) && + rawdb.HasReceipts(s.db, hash, number) + + // Ensure the skeleton chain links to the local chain below the chain head. + // This accounts for edge cases where leftover chain segments above the head + // may still link to the skeleton chain. In such cases, synchronization is + // likely to fail due to potentially missing segments in the middle. + // + // You can try to produce the edge case by these steps: + // - sync the chain + // - debug.setHead(`0x1`) + // - kill the geth process (the chain segment will be left with chain head rewound) + // - restart + if s.chain.CurrentSnapBlock() != nil { + linked = linked && s.chain.CurrentSnapBlock().Number.Uint64() >= number + } + return linked +} + // sync is the internal version of Sync that executes a single sync cycle, either // until some termination condition is reached, or until the current cycle merges // with a previously aborted run. @@ -378,10 +409,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { // If the sync is already done, resume the backfiller. When the loop stops, // terminate the backfiller too. - linked := len(s.progress.Subchains) == 1 && - rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.scratchHead) && - rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) && - rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead) + linked := len(s.progress.Subchains) == 1 && s.linked(s.scratchHead, s.progress.Subchains[0].Next) if linked { s.filler.resume() } @@ -497,12 +525,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { // is still running, it will pick it up. If it already terminated, // a new cycle needs to be spun up. if linked { - linked = len(s.progress.Subchains) == 1 && - rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.scratchHead) && - rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) && - rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead) - - if linked { + if len(s.progress.Subchains) == 1 && s.linked(s.scratchHead, s.progress.Subchains[0].Next) { // The skeleton chain has been extended and is still linked with the local // chain, try to re-schedule the backfiller if it's already terminated. s.filler.resume() @@ -946,6 +969,45 @@ func (s *skeleton) revertRequest(req *headerRequest) { s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = "" } +// mergeSubchains is invoked once certain beacon headers have been persisted locally +// and the subchains should be merged in case there are some overlaps between. An +// indicator will be returned if the last subchain is merged with previous subchain. +func (s *skeleton) mergeSubchains() bool { + // If the subchain extended into the next subchain, we need to handle + // the overlap. Since there could be many overlaps, do this in a loop. + var merged bool + for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail { + // Extract some stats from the second subchain + head := s.progress.Subchains[1].Head + tail := s.progress.Subchains[1].Tail + next := s.progress.Subchains[1].Next + + // Since we just overwrote part of the next subchain, we need to trim + // its head independent of matching or mismatching content + if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail { + // Fully overwritten, get rid of the subchain as a whole + log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next) + s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) + continue + } else { + // Partially overwritten, trim the head to the overwritten size + log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next) + s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1 + } + // If the old subchain is an extension of the new one, merge the two + // and let the skeleton syncer restart (to clean internal state) + if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next { + log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next) + s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail + s.progress.Subchains[0].Next = s.progress.Subchains[1].Next + + s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) + merged = true + } + } + return merged +} + func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) { res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers)) @@ -1019,10 +1081,9 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo // processing is done, so it's just one more "needless" check. // // The weird cascading checks are done to minimize the database reads. - linked = rawdb.HasHeader(s.db, header.ParentHash, header.Number.Uint64()-1) && - rawdb.HasBody(s.db, header.ParentHash, header.Number.Uint64()-1) && - rawdb.HasReceipts(s.db, header.ParentHash, header.Number.Uint64()-1) + linked = s.linked(header.Number.Uint64()-1, header.ParentHash) if linked { + log.Debug("Primary subchain linked", "number", header.Number.Uint64()-1, "hash", header.ParentHash) break } } @@ -1036,6 +1097,9 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo // If the beacon chain was linked to the local chain, completely swap out // all internal progress and abort header synchronization. if linked { + // Merge all overlapped subchains beforehand + s.mergeSubchains() + // Linking into the local chain should also mean that there are no // leftover subchains, but in the case of importing the blocks via // the engine API, we will not push the subchains forward. This will @@ -1093,41 +1157,10 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo s.scratchHead -= uint64(consumed) - // If the subchain extended into the next subchain, we need to handle - // the overlap. Since there could be many overlaps (come on), do this - // in a loop. - for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail { - // Extract some stats from the second subchain - head := s.progress.Subchains[1].Head - tail := s.progress.Subchains[1].Tail - next := s.progress.Subchains[1].Next - - // Since we just overwrote part of the next subchain, we need to trim - // its head independent of matching or mismatching content - if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail { - // Fully overwritten, get rid of the subchain as a whole - log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next) - s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) - continue - } else { - // Partially overwritten, trim the head to the overwritten size - log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next) - s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1 - } - // If the old subchain is an extension of the new one, merge the two - // and let the skeleton syncer restart (to clean internal state) - if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next { - log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next) - s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail - s.progress.Subchains[0].Next = s.progress.Subchains[1].Next - - s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) - merged = true - } - } // If subchains were merged, all further available headers in the scratch // space are invalid since we skipped ahead. Stop processing the scratch // space to avoid dropping peers thinking they delivered invalid data. + merged = s.mergeSubchains() if merged { break } @@ -1158,15 +1191,17 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo // due to the downloader backfilling past the tracked tail. func (s *skeleton) cleanStales(filled *types.Header) error { number := filled.Number.Uint64() - log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash()) + log.Debug("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash()) - // If the filled header is below the linked subchain, something's corrupted - // internally. Report and error and refuse to do anything. + // If the filled header is below the subchain, it means the skeleton is not + // linked with local chain yet, don't bother to do cleanup. if number+1 < s.progress.Subchains[0].Tail { - return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail) + log.Debug("filled header below beacon header tail", "filled", number, "tail", s.progress.Subchains[0].Tail) + return nil } // If nothing in subchain is filled, don't bother to do cleanup. if number+1 == s.progress.Subchains[0].Tail { + log.Debug("Skeleton chain not yet consumed", "filled", number, "hash", filled.Hash(), "tail", s.progress.Subchains[0].Tail) return nil } // If the latest fill was on a different subchain, it means the backfiller diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 4aa97cf1f7..5c54b4b5c2 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -20,6 +20,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/big" "sync/atomic" "testing" @@ -71,6 +72,12 @@ func (hf *hookedBackfiller) resume() { } } +type fakeChainReader struct{} + +func (fc *fakeChainReader) CurrentSnapBlock() *types.Header { + return &types.Header{Number: big.NewInt(math.MaxInt64)} +} + // skeletonTestPeer is a mock peer that can only serve header requests from a // pre-perated header chain (which may be arbitrarily wrong for testing). // @@ -369,7 +376,7 @@ func TestSkeletonSyncInit(t *testing.T) { // Create a skeleton sync and run a cycle wait := make(chan struct{}) - skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) + skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller(), &fakeChainReader{}) skeleton.syncStarting = func() { close(wait) } skeleton.Sync(tt.head, nil, true) @@ -472,7 +479,7 @@ func TestSkeletonSyncExtend(t *testing.T) { // Create a skeleton sync and run a cycle wait := make(chan struct{}) - skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) + skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller(), &fakeChainReader{}) skeleton.syncStarting = func() { close(wait) } skeleton.Sync(tt.head, nil, true) @@ -885,7 +892,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) { } } // Create a skeleton sync and run a cycle - skeleton := newSkeleton(db, peerset, drop, filler) + skeleton := newSkeleton(db, peerset, drop, filler, &fakeChainReader{}) skeleton.Sync(tt.head, nil, true) // Wait a bit (bleah) for the initial sync loop to go to idle. This might