diff --git a/eth/protocols/snap/syncv2.go b/eth/protocols/snap/syncv2.go index ac84008697..bad52de184 100644 --- a/eth/protocols/snap/syncv2.go +++ b/eth/protocols/snap/syncv2.go @@ -69,6 +69,10 @@ const ( // are still hashes left to fetch. var errAccessListPeersExhausted = errors.New("all peers exhausted for BAL requests") +// errAccessListUnavailable is returned from the BAL catch-up when some gap +// block's access list cannot be retrieved against the current peerset. +var errAccessListUnavailable = errors.New("block access lists unavailable") + // accountRequestV2 tracks a pending account range request to ensure responses are // to actual requests and to validate any security constraints. // @@ -823,25 +827,22 @@ func (s *syncerV2) fetchAccessLists(hashes []common.Hash, headers map[common.Has } fetched := make(map[common.Hash]rlp.RawValue, len(hashes)) + // refused tracks the mapping between BAL and peerset which doesn't have + // it available. + refused := make(map[common.Hash]map[string]struct{}) + var ( accessListReqFails = make(chan *accessListRequest) accessListResps = make(chan *accessListResponse) lastStallLog = time.Now() ) for len(fetched) < len(hashes) { - // Assign BAL retrieval tasks to idle peers - s.assignAccessListTasks(pending, accessListResps, accessListReqFails, cancel) - - // If every peer is now stateless and nothing is in flight, no event - // short of cancel or a new peer joining can move us forward. Surface - // this so the caller can return and let a higher-level retry happen - // against a fresh peer set. - // - // TODO(rjl, jonny) add a time allowance before returning the error. - if s.accessListPeersExhausted() { - log.Warn("BAL peers exhausted, stopping catch-up early", "fetched", len(fetched), "remaining", len(pending)) - return nil, errAccessListPeersExhausted + if err := s.checkAccessListProgress(pending, refused); err != nil { + log.Warn("BAL fetch cannot progress", "err", err, "fetched", len(fetched), "remaining", len(pending)) + return nil, err } + // Assign BAL retrieval tasks to idle peers + s.assignAccessListTasks(pending, refused, accessListResps, accessListReqFails, cancel) // Periodic visibility while stalled with peers connected but idle. if len(pending) > 0 && time.Since(lastStallLog) > 30*time.Second { @@ -857,12 +858,18 @@ func (s *syncerV2) fetchAccessLists(hashes []common.Hash, headers map[common.Has // A new peer joined, try to assign it work case id := <-peerDrop: s.revertBALRequests(id, pending) + for h, set := range refused { + delete(set, id) + if len(set) == 0 { + delete(refused, h) + } + } case <-cancel: return nil, ErrCancelled case req := <-accessListReqFails: s.revertAccessListRequest(req, pending) case res := <-accessListResps: - s.processAccessListResponse(res, headers, pending, fetched) + s.processAccessListResponse(res, headers, pending, fetched, refused) } } // Assemble results in input order @@ -874,8 +881,9 @@ func (s *syncerV2) fetchAccessLists(hashes []common.Hash, headers map[common.Has } // assignAccessListTasks attempts to assign BAL fetch requests to idle -// peers for any hashes still in pending. -func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, success chan *accessListResponse, fail chan *accessListRequest, cancel chan struct{}) { +// peers for any hashes still in pending. Hashes a peer has already refused +// (recorded in refused) are not assigned back to that same peer. +func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, refused map[common.Hash]map[string]struct{}, success chan *accessListResponse, fail chan *accessListRequest, cancel chan struct{}) { s.lock.Lock() defer s.lock.Unlock() @@ -889,6 +897,33 @@ func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, succe ) idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:] + // Collect hashes to fetch, capped by peer capacity and the + // EIP-8189 2 MiB response soft limit (~72 KiB/BAL -> 28 blocks). + if cap > maxAccessListRequestCount { + cap = maxAccessListRequestCount + } + batch := make([]common.Hash, 0, cap) + for h := range pending { + // Skip hashes this peer has already refused; another peer + // must serve them. + if set := refused[h]; set != nil { + if _, ok := set[idle]; ok { + continue + } + } + delete(pending, h) + + batch = append(batch, h) + if len(batch) >= cap { + break + } + } + // The peer has already refused every pending hash; leave them in + // pending for another peer and move on without a wasted request. + if len(batch) == 0 { + continue + } + // Generate a unique request ID var reqid uint64 for { @@ -901,21 +936,6 @@ func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, succe } break } - - // Collect hashes to fetch, capped by peer capacity and the - // EIP-8189 2 MiB response soft limit (~72 KiB/BAL -> 28 blocks). - if cap > maxAccessListRequestCount { - cap = maxAccessListRequestCount - } - batch := make([]common.Hash, 0, cap) - for h := range pending { - delete(pending, h) - - batch = append(batch, h) - if len(batch) >= cap { - break - } - } req := &accessListRequest{ peer: idle, id: reqid, @@ -950,7 +970,7 @@ func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, succe // processAccessListResponse handles a successful BAL response. It // verifies each non-empty BAL against the corresponding block header and // stores the verified ones in fetched. -func (s *syncerV2) processAccessListResponse(res *accessListResponse, headers map[common.Hash]*types.Header, pending map[common.Hash]struct{}, fetched map[common.Hash]rlp.RawValue) { +func (s *syncerV2) processAccessListResponse(res *accessListResponse, headers map[common.Hash]*types.Header, pending map[common.Hash]struct{}, fetched map[common.Hash]rlp.RawValue, refused map[common.Hash]map[string]struct{}) { var ( stateless bool valid = make(map[common.Hash]rlp.RawValue) @@ -959,8 +979,14 @@ func (s *syncerV2) processAccessListResponse(res *accessListResponse, headers ma for i, raw := range res.accessLists { h := res.req.hashes[i] - // Peer doesn't have this BAL. Add it back to pending for retry. + // Peer doesn't have this BAL (a legitimate reply, e.g. the block is + // outside its retention window). Record the refusal and add the hash + // back to pending for a retry against other peers. if bytes.Equal(raw, rlp.EmptyString) { + if refused[h] == nil { + refused[h] = make(map[string]struct{}) + } + refused[h][res.req.peer] = struct{}{} continue } var b bal.BlockAccessList @@ -984,6 +1010,7 @@ func (s *syncerV2) processAccessListResponse(res *accessListResponse, headers ma // Re-add hashes that were not served back or invalid to pending for i := 0; i < len(res.req.hashes); i++ { if _, ok := valid[res.req.hashes[i]]; ok { + delete(refused, res.req.hashes[i]) continue } pending[res.req.hashes[i]] = struct{}{} @@ -2632,25 +2659,45 @@ func (s *syncerV2) reportSyncProgressV2(force bool) { "accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(estTime-elapsed)) } -// accessListPeersExhausted reports whether forward progress on BAL fetches is -// impossible: at least one peer is connected, every connected peer is marked -// stateless, and no BAL requests are in flight. -func (s *syncerV2) accessListPeersExhausted() bool { +// checkAccessListProgress reports whether the BAL fetch can still make +// forward progress against the current peer set. +func (s *syncerV2) checkAccessListProgress(pending map[common.Hash]struct{}, refused map[common.Hash]map[string]struct{}) error { s.lock.RLock() defer s.lock.RUnlock() if len(s.peers) == 0 { - return false + return nil } if len(s.accessListReqs) > 0 { - return false + return nil } + serviceable := make(map[string]struct{}, len(s.peers)) for id := range s.peers { if _, ok := s.statelessPeers[id]; !ok { - return false + serviceable[id] = struct{}{} } } - return true + if len(serviceable) == 0 { + return errAccessListPeersExhausted + } + for h, set := range refused { + // Delivered by some other peer after all + if _, ok := pending[h]; !ok { + continue + } + unobtainable := true + for id := range serviceable { + if _, ok := set[id]; !ok { + unobtainable = false + break + } + } + if unobtainable { + log.Warn("Access list unavailable from all peers", "hash", h) + return errAccessListUnavailable + } + } + return nil } // sortIdlePeers builds a list of idle peers sorted by download capacity