Merge pull request #902 from gzliudan/fix_peer_tracking

eth/downloader: fix peer idleness tracking when restarting statesync
This commit is contained in:
Daniel Liu 2025-03-11 16:37:51 +08:00 committed by GitHub
commit d4f8f434b1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 67 additions and 34 deletions

View file

@ -323,7 +323,9 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
return err
}
if errors.Is(err, errInvalidChain) {
if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
errors.Is(err, errStallingPeer) || errors.Is(err, errEmptyHeaderSet) ||
errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
@ -334,22 +336,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
}
return err
}
switch err {
case errTimeout, errBadPeer, errStallingPeer,
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
errInvalidAncestor:
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
} else {
d.dropPeer(id)
}
default:
log.Warn("Synchronisation failed, retrying", "err", err)
}
log.Warn("Synchronisation failed, retrying", "err", err)
return err
}
@ -590,7 +577,7 @@ func (d *Downloader) fetchHeight(p *peerConnection, hash common.Hash) (*types.He
headers := packet.(*headerPack).headers
if len(headers) != 1 {
p.log.Debug("Multiple headers for single request", "headers", len(headers))
return nil, errBadPeer
return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
}
head := headers[0]
p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
@ -799,7 +786,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
headers := packer.(*headerPack).headers
if len(headers) != 1 {
p.log.Debug("Multiple headers for single request", "headers", len(headers))
return 0, errBadPeer
return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
}
arrived = true
@ -823,7 +810,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
if header.Number.Uint64() != check {
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, errBadPeer
return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
}
start = check
hash = h
@ -1008,7 +995,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
case d.headerProcCh <- nil:
case <-d.cancelCh:
}
return errBadPeer
return fmt.Errorf("%w: header request timed out", errBadPeer)
}
}
}
@ -1436,7 +1423,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
log.Debug("Stale headers")
return errBadPeer
return fmt.Errorf("%w: stale headers", errBadPeer)
}
}
headers = headers[limit:]

View file

@ -63,6 +63,10 @@ func (d *Downloader) syncState(root common.Hash) *stateSync {
s := newStateSync(d, root)
select {
case d.stateSyncStart <- s:
// If we tell the statesync to restart with a new root, we also need
// to wait for it to actually start -- which happens once the old
// requests have timed out or been delivered
<-s.started
case <-d.quitCh:
s.err = errCancelStateFetch
close(s.done)
@ -95,15 +99,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
finished []*stateReq // Completed or failed requests
timeout = make(chan *stateReq) // Timed out active requests
)
defer func() {
// Cancel active request timers on exit. Also set peers to idle so they're
// available for the next sync.
for _, req := range active {
req.timer.Stop()
req.peer.SetNodeDataIdle(len(req.items))
}
}()
// Run the state sync.
log.Trace("State sync starting", "root", s.root)
go s.run()
defer s.Cancel()
@ -126,9 +124,11 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
select {
// The stateSync lifecycle:
case next := <-d.stateSyncStart:
d.spindownStateSync(active, finished, timeout, peerDrop)
return next
case <-s.done:
d.spindownStateSync(active, finished, timeout, peerDrop)
return nil
// Send the next finished request to the current sync:
@ -189,11 +189,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
// causes valid requests to go missing and sync to get stuck.
if old := active[req.peer.id]; old != nil {
log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
// Make sure the previous one doesn't get siletly lost
// Move the previous request to the finished set
old.timer.Stop()
old.dropped = true
finished = append(finished, old)
}
// Start a timer to notify the sync loop if the peer stalled.
@ -210,6 +208,46 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
}
}
// spindownStateSync drains every request still tracked in active before the
// caller hands over to the next state sync. It blocks until each active entry
// has been resolved by one of three events: a state pack arriving from the
// peer (the payload itself is discarded), the peer being dropped, or the
// request being reported on the timeout channel. Every resolved request has
// its timer stopped, is removed from active, and its peer is marked idle.
//
// Requests in finished were waiting to be handed to processing; they are not
// processed here, but their peers are marked idle as well.
func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []*stateReq, timeout chan *stateReq, peerDrop chan *peerConnection) {
	log.Trace("State sync spinning down", "active", len(active), "finished", len(finished))

	for len(active) != 0 {
		var (
			pending *stateReq
			cause   string
		)
		select {
		case delivery := <-d.stateCh:
			// A response arrived; only the sender's identity matters here.
			pending, cause = active[delivery.PeerId()], "delivered"
		case gone := <-peerDrop:
			// The peer disconnected while its request was in flight.
			pending, cause = active[gone.id], "peerdrop"
		case pending = <-timeout:
			// The request's timer reported it as timed out.
			cause = "timeout"
		}
		// Events for peers without a tracked request carry nothing to clean up.
		if pending == nil {
			continue
		}
		pending.peer.log.Trace("State peer marked idle (spindown)", "req.items", len(pending.items), "reason", cause)
		pending.timer.Stop()
		delete(active, pending.peer.id)
		pending.peer.SetNodeDataIdle(len(pending.items))
	}

	// Deliveries queued for processing are moot now, yet their peers would only
	// have been set idle after processing, so do that here instead.
	for i := range finished {
		done := finished[i]
		done.peer.SetNodeDataIdle(len(done.items))
	}
}
// stateSync schedules requests for downloading a particular state trie defined
// by a given state root.
type stateSync struct {
@ -222,11 +260,15 @@ type stateSync struct {
numUncommitted int
bytesUncommitted int
started chan struct{} // Started is signalled once the sync loop starts
deliver chan *stateReq // Delivery channel multiplexing peer responses
cancel chan struct{} // Channel to signal a termination request
cancelOnce sync.Once // Ensures cancel only ever gets called once
done chan struct{} // Channel to signal termination completion
err error // Any error hit during sync (set before completion)
root common.Hash
}
// stateTask represents a single trie node download task, containing a set of
@ -247,6 +289,8 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
deliver: make(chan *stateReq),
cancel: make(chan struct{}),
done: make(chan struct{}),
started: make(chan struct{}),
root: root,
}
}
@ -277,6 +321,7 @@ func (s *stateSync) Cancel() error {
// pushed here async. The reason is to decouple processing from data receipt
// and timeouts.
func (s *stateSync) loop() error {
close(s.started)
// Listen for new peer events to assign tasks to them
newPeer := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribeNewPeers(newPeer)
@ -326,6 +371,7 @@ func (s *stateSync) loop() error {
}
// Process all the received blobs and check for stale delivery
delivered, err := s.process(req)
req.peer.SetNodeDataIdle(delivered)
if err != nil {
log.Warn("Node data write error", "err", err)
return err
@ -365,7 +411,7 @@ func (s *stateSync) assignTasks() {
// If the peer was assigned tasks to fetch, send the network request
if len(req.items) > 0 {
req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items), "root", s.root)
select {
case s.d.trackStateReq <- req:
req.peer.FetchNodeData(req.items)