mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 21:31:37 +00:00
eth/protocols/snap: fix catchup stall (#35158)
This PR fixes an issue that when peers legitimately lack a requested BAL, empty (0x80) is delivered and this BAL entry will be refetched over and over again. A `refused` tracker is added and catchUp will fail if this BAL is unavailable against the entire peerset.
This commit is contained in:
parent
e2164cc78c
commit
6ed112aee0
1 changed files with 87 additions and 40 deletions
|
|
@ -69,6 +69,10 @@ const (
|
|||
// are still hashes left to fetch.
|
||||
var errAccessListPeersExhausted = errors.New("all peers exhausted for BAL requests")
|
||||
|
||||
// errAccessListUnavailable is returned from the BAL catch-up when some gap
|
||||
// block's access list cannot be retrieved against the current peerset.
|
||||
var errAccessListUnavailable = errors.New("block access lists unavailable")
|
||||
|
||||
// accountRequestV2 tracks a pending account range request to ensure responses are
|
||||
// to actual requests and to validate any security constraints.
|
||||
//
|
||||
|
|
@ -823,25 +827,22 @@ func (s *syncerV2) fetchAccessLists(hashes []common.Hash, headers map[common.Has
|
|||
}
|
||||
fetched := make(map[common.Hash]rlp.RawValue, len(hashes))
|
||||
|
||||
// refused tracks the mapping between BAL and peerset which doesn't have
|
||||
// it available.
|
||||
refused := make(map[common.Hash]map[string]struct{})
|
||||
|
||||
var (
|
||||
accessListReqFails = make(chan *accessListRequest)
|
||||
accessListResps = make(chan *accessListResponse)
|
||||
lastStallLog = time.Now()
|
||||
)
|
||||
for len(fetched) < len(hashes) {
|
||||
// Assign BAL retrieval tasks to idle peers
|
||||
s.assignAccessListTasks(pending, accessListResps, accessListReqFails, cancel)
|
||||
|
||||
// If every peer is now stateless and nothing is in flight, no event
|
||||
// short of cancel or a new peer joining can move us forward. Surface
|
||||
// this so the caller can return and let a higher-level retry happen
|
||||
// against a fresh peer set.
|
||||
//
|
||||
// TODO(rjl, jonny) add a time allowance before returning the error.
|
||||
if s.accessListPeersExhausted() {
|
||||
log.Warn("BAL peers exhausted, stopping catch-up early", "fetched", len(fetched), "remaining", len(pending))
|
||||
return nil, errAccessListPeersExhausted
|
||||
if err := s.checkAccessListProgress(pending, refused); err != nil {
|
||||
log.Warn("BAL fetch cannot progress", "err", err, "fetched", len(fetched), "remaining", len(pending))
|
||||
return nil, err
|
||||
}
|
||||
// Assign BAL retrieval tasks to idle peers
|
||||
s.assignAccessListTasks(pending, refused, accessListResps, accessListReqFails, cancel)
|
||||
|
||||
// Periodic visibility while stalled with peers connected but idle.
|
||||
if len(pending) > 0 && time.Since(lastStallLog) > 30*time.Second {
|
||||
|
|
@ -857,12 +858,18 @@ func (s *syncerV2) fetchAccessLists(hashes []common.Hash, headers map[common.Has
|
|||
// A new peer joined, try to assign it work
|
||||
case id := <-peerDrop:
|
||||
s.revertBALRequests(id, pending)
|
||||
for h, set := range refused {
|
||||
delete(set, id)
|
||||
if len(set) == 0 {
|
||||
delete(refused, h)
|
||||
}
|
||||
}
|
||||
case <-cancel:
|
||||
return nil, ErrCancelled
|
||||
case req := <-accessListReqFails:
|
||||
s.revertAccessListRequest(req, pending)
|
||||
case res := <-accessListResps:
|
||||
s.processAccessListResponse(res, headers, pending, fetched)
|
||||
s.processAccessListResponse(res, headers, pending, fetched, refused)
|
||||
}
|
||||
}
|
||||
// Assemble results in input order
|
||||
|
|
@ -874,8 +881,9 @@ func (s *syncerV2) fetchAccessLists(hashes []common.Hash, headers map[common.Has
|
|||
}
|
||||
|
||||
// assignAccessListTasks attempts to assign BAL fetch requests to idle
|
||||
// peers for any hashes still in pending.
|
||||
func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, success chan *accessListResponse, fail chan *accessListRequest, cancel chan struct{}) {
|
||||
// peers for any hashes still in pending. Hashes a peer has already refused
|
||||
// (recorded in refused) are not assigned back to that same peer.
|
||||
func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, refused map[common.Hash]map[string]struct{}, success chan *accessListResponse, fail chan *accessListRequest, cancel chan struct{}) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
|
|
@ -889,6 +897,33 @@ func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, succe
|
|||
)
|
||||
idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
|
||||
|
||||
// Collect hashes to fetch, capped by peer capacity and the
|
||||
// EIP-8189 2 MiB response soft limit (~72 KiB/BAL -> 28 blocks).
|
||||
if cap > maxAccessListRequestCount {
|
||||
cap = maxAccessListRequestCount
|
||||
}
|
||||
batch := make([]common.Hash, 0, cap)
|
||||
for h := range pending {
|
||||
// Skip hashes this peer has already refused; another peer
|
||||
// must serve them.
|
||||
if set := refused[h]; set != nil {
|
||||
if _, ok := set[idle]; ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
delete(pending, h)
|
||||
|
||||
batch = append(batch, h)
|
||||
if len(batch) >= cap {
|
||||
break
|
||||
}
|
||||
}
|
||||
// The peer has already refused every pending hash; leave them in
|
||||
// pending for another peer and move on without a wasted request.
|
||||
if len(batch) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Generate a unique request ID
|
||||
var reqid uint64
|
||||
for {
|
||||
|
|
@ -901,21 +936,6 @@ func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, succe
|
|||
}
|
||||
break
|
||||
}
|
||||
|
||||
// Collect hashes to fetch, capped by peer capacity and the
|
||||
// EIP-8189 2 MiB response soft limit (~72 KiB/BAL -> 28 blocks).
|
||||
if cap > maxAccessListRequestCount {
|
||||
cap = maxAccessListRequestCount
|
||||
}
|
||||
batch := make([]common.Hash, 0, cap)
|
||||
for h := range pending {
|
||||
delete(pending, h)
|
||||
|
||||
batch = append(batch, h)
|
||||
if len(batch) >= cap {
|
||||
break
|
||||
}
|
||||
}
|
||||
req := &accessListRequest{
|
||||
peer: idle,
|
||||
id: reqid,
|
||||
|
|
@ -950,7 +970,7 @@ func (s *syncerV2) assignAccessListTasks(pending map[common.Hash]struct{}, succe
|
|||
// processAccessListResponse handles a successful BAL response. It
|
||||
// verifies each non-empty BAL against the corresponding block header and
|
||||
// stores the verified ones in fetched.
|
||||
func (s *syncerV2) processAccessListResponse(res *accessListResponse, headers map[common.Hash]*types.Header, pending map[common.Hash]struct{}, fetched map[common.Hash]rlp.RawValue) {
|
||||
func (s *syncerV2) processAccessListResponse(res *accessListResponse, headers map[common.Hash]*types.Header, pending map[common.Hash]struct{}, fetched map[common.Hash]rlp.RawValue, refused map[common.Hash]map[string]struct{}) {
|
||||
var (
|
||||
stateless bool
|
||||
valid = make(map[common.Hash]rlp.RawValue)
|
||||
|
|
@ -959,8 +979,14 @@ func (s *syncerV2) processAccessListResponse(res *accessListResponse, headers ma
|
|||
for i, raw := range res.accessLists {
|
||||
h := res.req.hashes[i]
|
||||
|
||||
// Peer doesn't have this BAL. Add it back to pending for retry.
|
||||
// Peer doesn't have this BAL (a legitimate reply, e.g. the block is
|
||||
// outside its retention window). Record the refusal and add the hash
|
||||
// back to pending for a retry against other peers.
|
||||
if bytes.Equal(raw, rlp.EmptyString) {
|
||||
if refused[h] == nil {
|
||||
refused[h] = make(map[string]struct{})
|
||||
}
|
||||
refused[h][res.req.peer] = struct{}{}
|
||||
continue
|
||||
}
|
||||
var b bal.BlockAccessList
|
||||
|
|
@ -984,6 +1010,7 @@ func (s *syncerV2) processAccessListResponse(res *accessListResponse, headers ma
|
|||
// Re-add hashes that were not served back or invalid to pending
|
||||
for i := 0; i < len(res.req.hashes); i++ {
|
||||
if _, ok := valid[res.req.hashes[i]]; ok {
|
||||
delete(refused, res.req.hashes[i])
|
||||
continue
|
||||
}
|
||||
pending[res.req.hashes[i]] = struct{}{}
|
||||
|
|
@ -2632,25 +2659,45 @@ func (s *syncerV2) reportSyncProgressV2(force bool) {
|
|||
"accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(estTime-elapsed))
|
||||
}
|
||||
|
||||
// accessListPeersExhausted reports whether forward progress on BAL fetches is
|
||||
// impossible: at least one peer is connected, every connected peer is marked
|
||||
// stateless, and no BAL requests are in flight.
|
||||
func (s *syncerV2) accessListPeersExhausted() bool {
|
||||
// checkAccessListProgress reports whether the BAL fetch can still make
|
||||
// forward progress against the current peer set.
|
||||
func (s *syncerV2) checkAccessListProgress(pending map[common.Hash]struct{}, refused map[common.Hash]map[string]struct{}) error {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
if len(s.peers) == 0 {
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
if len(s.accessListReqs) > 0 {
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
serviceable := make(map[string]struct{}, len(s.peers))
|
||||
for id := range s.peers {
|
||||
if _, ok := s.statelessPeers[id]; !ok {
|
||||
return false
|
||||
serviceable[id] = struct{}{}
|
||||
}
|
||||
}
|
||||
return true
|
||||
if len(serviceable) == 0 {
|
||||
return errAccessListPeersExhausted
|
||||
}
|
||||
for h, set := range refused {
|
||||
// Delivered by some other peer after all
|
||||
if _, ok := pending[h]; !ok {
|
||||
continue
|
||||
}
|
||||
unobtainable := true
|
||||
for id := range serviceable {
|
||||
if _, ok := set[id]; !ok {
|
||||
unobtainable = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if unobtainable {
|
||||
log.Warn("Access list unavailable from all peers", "hash", h)
|
||||
return errAccessListUnavailable
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sortIdlePeers builds a list of idle peers sorted by download capacity
|
||||
|
|
|
|||
Loading…
Reference in a new issue