Merge pull request #881 from gzliudan/upgrade_downloader

upgrade downloader
This commit is contained in:
Daniel Liu 2025-02-26 15:06:44 +08:00 committed by GitHub
commit 4fd2360d3f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 435 additions and 170 deletions

View file

@ -852,6 +852,17 @@ func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool {
return rawdb.HasBody(bc.db, hash, number)
}
// HasFastBlock checks if a fast block is fully present in the database or not.
func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool {
if !bc.HasBlock(hash, number) {
return false
}
if bc.receiptsCache.Contains(hash) {
return true
}
return rawdb.HasReceipts(bc.db, hash, number)
}
// HasFullState checks if state trie is fully present in the database or not.
func (bc *BlockChain) HasFullState(block *types.Block) bool {
_, err := bc.stateCache.OpenTrie(block.Root())

View file

@ -370,6 +370,18 @@ func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
}
}
// HasReceipts verifies the existence of all the transaction receipts belonging
// to a block.
func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool {
if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
return true
}
if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil {
return false
}
return true
}
// ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding.
func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash

View file

@ -63,6 +63,9 @@ var (
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
@ -81,23 +84,18 @@ var (
errPeersUnavailable = errors.New("no peers available or all tried for download")
errInvalidAncestor = errors.New("retrieved ancestor is invalid")
errInvalidChain = errors.New("retrieved hash chain is invalid")
errInvalidBlock = errors.New("retrieved block is invalid")
errInvalidBody = errors.New("retrieved block body is invalid")
errInvalidReceipt = errors.New("retrieved receipt is invalid")
errCancelBlockFetch = errors.New("block download canceled (requested)")
errCancelHeaderFetch = errors.New("block header download canceled (requested)")
errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)")
errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
errCancelContentProcessing = errors.New("content processing canceled (requested)")
errCanceled = errors.New("syncing canceled (requested)")
errNoSyncActive = errors.New("no sync active")
errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
errEnoughBlock = errors.New("downloader download enough block")
)
type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle)
mux *event.TypeMux // Event multiplexer to announce sync operation events
queue *queue // Scheduler for selecting the hashes to download
@ -184,6 +182,9 @@ type BlockChain interface {
// HasBlock verifies a block's presence in the local chain.
HasBlock(common.Hash, uint64) bool
// HasFastBlock verifies a fast block's presence in the local chain.
HasFastBlock(common.Hash, uint64) bool
// GetBlockByHash retrieves a block from the local chain.
GetBlockByHash(common.Hash) *types.Block
@ -204,13 +205,12 @@ type BlockChain interface {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, handleProposedBlock proposeBlockHandlerFn) *Downloader {
func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, handleProposedBlock proposeBlockHandlerFn) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
mode: mode,
stateDB: stateDb,
mux: mux,
queue: newQueue(),
@ -253,13 +253,16 @@ func (d *Downloader) Progress() XDPoSChain.SyncProgress {
defer d.syncStatsLock.RUnlock()
current := uint64(0)
switch d.mode {
case FullSync:
mode := d.getMode()
switch {
case d.blockchain != nil && mode == FullSync:
current = d.blockchain.CurrentBlock().NumberU64()
case FastSync:
case d.blockchain != nil && mode == FastSync:
current = d.blockchain.CurrentFastBlock().NumberU64()
case LightSync:
case d.lightchain != nil:
current = d.lightchain.CurrentHeader().Number.Uint64()
default:
log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode)
}
return XDPoSChain.SyncProgress{
StartingBlock: d.syncStatsChainOrigin,
@ -307,14 +310,6 @@ func (d *Downloader) UnregisterPeer(id string) error {
}
d.queue.Revoke(id)
// If this peer was the master peer, abort sync immediately
d.cancelLock.RLock()
master := id == d.cancelPeer
d.cancelLock.RUnlock()
if master {
d.cancel()
}
return nil
}
@ -322,13 +317,28 @@ func (d *Downloader) UnregisterPeer(id string) error {
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
err := d.synchronise(id, head, td, mode)
switch err {
case nil:
case errBusy:
switch err {
case nil, errBusy, errCanceled:
return err
}
if errors.Is(err, errInvalidChain) {
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
} else {
d.dropPeer(id)
}
return err
}
switch err {
case errTimeout, errBadPeer, errStallingPeer,
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
errInvalidAncestor, errInvalidChain:
errInvalidAncestor:
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
@ -395,8 +405,8 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
defer d.Cancel() // No matter what, we can't leave the cancel channel open
// Set the requested sync mode, unless it's forbidden
d.mode = mode
// Atomically set the requested sync mode
atomic.StoreUint32(&d.mode, uint32(mode))
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
@ -406,6 +416,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
return d.syncWithPeer(p, hash, td)
}
func (d *Downloader) getMode() SyncMode {
return SyncMode(atomic.LoadUint32(&d.mode))
}
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
@ -421,8 +435,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if p.version < 62 {
return errTooOld
}
mode := d.getMode()
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
defer func(start time.Time) {
log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
}(time.Now())
@ -434,7 +449,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
}
height := latest.Number.Uint64()
origin, err := d.findAncestor(p, height)
origin, err := d.findAncestor(p, latest)
if err != nil {
return err
}
@ -447,7 +462,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// Ensure our origin point is below any fast sync pivot point
pivot := uint64(0)
if d.mode == FastSync {
if mode == FastSync {
if height <= uint64(fsMinFullBlocks) {
origin = 0
} else {
@ -458,11 +473,11 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
}
}
d.committed = 1
if d.mode == FastSync && pivot != 0 {
if mode == FastSync && pivot != 0 {
d.committed = 0
}
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, d.mode)
d.queue.Prepare(origin+1, mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
@ -473,9 +488,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, pivot, td) },
}
if d.mode == FastSync {
if mode == FastSync {
fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
} else if d.mode == FullSync {
} else if mode == FullSync {
fetchers = append(fetchers, func() error { return d.processFullSyncContent(height) })
}
return d.spawnSync(fetchers)
@ -499,7 +514,7 @@ func (d *Downloader) spawnSync(fetchers []func() error) error {
// it has processed the queue.
d.queue.Close()
}
if err = <-errc; err != nil {
if err = <-errc; err != nil && err != errCanceled {
break
}
}
@ -563,7 +578,7 @@ func (d *Downloader) fetchHeight(p *peerConnection, hash common.Hash) (*types.He
for {
select {
case <-d.cancelCh:
return nil, errCancelBlockFetch
return nil, errCanceled
case packet := <-d.headerCh:
// Discard anything not from the origin peer
@ -592,41 +607,90 @@ func (d *Downloader) fetchHeight(p *peerConnection, hash common.Hash) (*types.He
}
}
// calculateRequestSpan calculates what headers to request from a peer when trying to determine the
// common ancestor.
// It returns parameters to be used for peer.RequestHeadersByNumber:
//
// from - starting block number
// count - number of headers to request
// skip - number of headers to skip
//
// and also returns 'max', the last block which is expected to be returned by the remote peers,
// given the (from,count,skip)
func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, uint64) {
var (
from int
count int
MaxCount = MaxHeaderFetch / 16
)
// requestHead is the highest block that we will ask for. If requestHead is not offset,
// the highest block that we will get is 16 blocks back from head, which means we
// will fetch 14 or 15 blocks unnecessarily in the case the height difference
// between us and the peer is 1-2 blocks, which is most common
requestHead := int(remoteHeight) - 1
if requestHead < 0 {
requestHead = 0
}
// requestBottom is the lowest block we want included in the query
// Ideally, we want to include the one just below our own head
requestBottom := int(localHeight - 1)
if requestBottom < 0 {
requestBottom = 0
}
totalSpan := requestHead - requestBottom
span := 1 + totalSpan/MaxCount
if span < 2 {
span = 2
}
if span > 16 {
span = 16
}
count = 1 + totalSpan/span
if count > MaxCount {
count = MaxCount
}
if count < 2 {
count = 2
}
from = requestHead - (count-1)*span
if from < 0 {
from = 0
}
max := from + (count-1)*span
return int64(from), count, span - 1, uint64(max)
}
// findAncestor tries to locate the common ancestor link of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks
floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
var (
floor = int64(-1)
localHeight uint64
remoteHeight = remoteHeader.Number.Uint64()
)
mode := d.getMode()
switch mode {
case FullSync:
localHeight = d.blockchain.CurrentBlock().NumberU64()
case FastSync:
localHeight = d.blockchain.CurrentFastBlock().NumberU64()
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
}
p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)
if d.mode == FullSync {
ceil = d.blockchain.CurrentBlock().NumberU64()
} else if d.mode == FastSync {
ceil = d.blockchain.CurrentFastBlock().NumberU64()
if localHeight >= MaxForkAncestry {
floor = int64(localHeight - MaxForkAncestry)
}
if ceil >= MaxForkAncestry {
floor = int64(ceil - MaxForkAncestry)
}
p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)
// Request the topmost blocks to short circuit binary ancestor lookup
head := ceil
if head > height {
head = height
}
from := int64(head) - int64(MaxHeaderFetch)
if from < 0 {
from = 0
}
// Span out with 15 block gaps into the future to catch bad head reports
limit := 2 * MaxHeaderFetch / 16
count := 1 + int((int64(ceil)-from)/16)
if count > limit {
count = limit
}
go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip)
go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false)
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
@ -637,8 +701,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
for finished := false; !finished; {
select {
case <-d.cancelCh:
return 0, errCancelHeaderFetch
return 0, errCanceled
case packet := <-d.headerCh:
// Discard anything not from the origin peer
@ -653,28 +716,35 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
return 0, errEmptyHeaderSet
}
// Make sure the peer's reply conforms to the request
for i := 0; i < len(headers); i++ {
if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
return 0, errInvalidChain
for i, header := range headers {
expectNumber := from + int64(i)*int64((skip+1))
if number := header.Number.Int64(); number != expectNumber {
p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering"))
}
}
// Check if a common ancestor was found
finished = true
for i := len(headers) - 1; i >= 0; i-- {
// Skip any headers that underflow/overflow our requested set
if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max {
continue
}
// Otherwise check if we already know the header or not
if (d.mode == FullSync && d.blockchain.HasBlock(headers[i].Hash(), headers[i].Number.Uint64())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
h := headers[i].Hash()
n := headers[i].Number.Uint64()
// If every header is known, even future ones, the peer straight out lied about its head
if number > height && i == limit-1 {
p.log.Warn("Lied about chain head", "reported", height, "found", number)
return 0, errStallingPeer
}
var known bool
switch mode {
case FullSync:
known = d.blockchain.HasBlock(h, n)
case FastSync:
known = d.blockchain.HasFastBlock(h, n)
default:
known = d.lightchain.HasHeader(h, n)
}
if known {
number, hash = n, h
break
}
}
@ -698,10 +768,12 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
return number, nil
}
// Ancestor not found, we need to binary search over our chain
start, end := uint64(0), head
start, end := uint64(0), remoteHeight
if floor > 0 {
start = uint64(floor)
}
p.log.Trace("Binary searching for common ancestor", "start", start, "end", end)
for start+1 < end {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2
@ -715,7 +787,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
for arrived := false; !arrived; {
select {
case <-d.cancelCh:
return 0, errCancelHeaderFetch
return 0, errCanceled
case packer := <-d.headerCh:
// Discard anything not from the origin peer
@ -732,11 +804,23 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
arrived = true
// Modify the search interval based on the response
if (d.mode == FullSync && !d.blockchain.HasBlock(headers[0].Hash(), headers[0].Number.Uint64())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
h := headers[0].Hash()
n := headers[0].Number.Uint64()
var known bool
switch mode {
case FullSync:
known = d.blockchain.HasBlock(h, n)
case FastSync:
known = d.blockchain.HasFastBlock(h, n)
default:
known = d.lightchain.HasHeader(h, n)
}
if !known {
end = check
break
}
header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists
header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
if header.Number.Uint64() != check {
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, errBadPeer
@ -799,10 +883,11 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
// Start pulling the header chain skeleton until all is done
getHeaders(from)
mode := d.getMode()
for {
select {
case <-d.cancelCh:
return errCancelHeaderFetch
return errCanceled
case packet := <-d.headerCh:
// Make sure the active peer is giving us the skeleton headers
@ -829,7 +914,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
getHeaders(from)
continue
case <-d.cancelCh:
return errCancelHeaderFetch
return errCanceled
}
}
// Pivot done (or not in fast sync) and no more headers, terminate the process
@ -838,7 +923,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
case d.headerProcCh <- nil:
return nil
case <-d.cancelCh:
return errCancelHeaderFetch
return errCanceled
}
}
headers := packet.(*headerPack).headers
@ -848,10 +933,34 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
p.log.Debug("Skeleton chain invalid", "err", err)
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
headers = filled[proced:]
from += uint64(proced)
} else {
// If we're closing in on the chain head, but haven't yet reached it, delay
// the last few headers so mini reorgs on the head don't cause invalid hash
// chain errors.
if n := len(headers); n > 0 {
// Retrieve the current head we're at
head := uint64(0)
if mode == LightSync {
head = d.lightchain.CurrentHeader().Number.Uint64()
} else {
head = d.blockchain.CurrentFastBlock().NumberU64()
if full := d.blockchain.CurrentBlock().NumberU64(); head < full {
head = full
}
}
// If the head is way older than this batch, delay the last few headers
if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() {
delay := reorgProtHeaderDelay
if delay > n {
delay = n
}
headers = headers[:n-delay]
}
}
}
// Insert all the new headers and fetch the next batch
if len(headers) > 0 {
@ -859,11 +968,21 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
select {
case d.headerProcCh <- headers:
case <-d.cancelCh:
return errCancelHeaderFetch
return errCanceled
}
from += uint64(len(headers))
getHeaders(from)
} else {
// No headers delivered, or all of them being delayed, sleep a bit and retry
p.log.Trace("All headers delayed, waiting")
select {
case <-time.After(fsHeaderContCheck):
getHeaders(from)
continue
case <-d.cancelCh:
return errCanceled
}
}
getHeaders(from)
case <-timeout.C:
if d.dropPeer == nil {
@ -920,7 +1039,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
@ -946,7 +1065,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
@ -970,7 +1089,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
@ -1003,7 +1122,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log mesages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
@ -1019,7 +1138,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
for {
select {
case <-d.cancelCh:
return errCancel
return errCanceled
case packet := <-deliveryCh:
// If the peer was previously banned and failed to deliver its pack
@ -1027,13 +1146,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of data and check chain validity
accepted, err := deliver(packet)
if err == errInvalidChain {
if errors.Is(err, errInvalidChain) {
return err
}
// Unless a peer delivered something completely else than requested (usually
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if err != errStaleDelivery {
if !errors.Is(err, errStaleDelivery) {
setIdle(peer, accepted)
}
// Issue a log to the user to see what's going on
@ -1090,12 +1209,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
setIdle(peer, 0)
} else {
peer.log.Debug("Stalling delivery, dropping", "type", kind)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
} else {
d.dropPeer(pid)
// If this peer was the master peer, abort sync immediately
d.cancelLock.RLock()
master := pid == d.cancelPeer
d.cancelLock.RUnlock()
if master {
d.cancel()
return errTimeout
}
}
}
}
@ -1169,6 +1299,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
rollback := []*types.Header{}
mode := d.getMode()
defer func() {
if len(rollback) > 0 {
// Flatten the headers and roll them back
@ -1177,13 +1308,13 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
hashes[i] = header.Hash()
}
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if d.mode != LightSync {
if mode != LightSync {
lastFastBlock = d.blockchain.CurrentFastBlock().Number()
lastBlock = d.blockchain.CurrentBlock().Number()
}
d.lightchain.Rollback(hashes)
curFastBlock, curBlock := common.Big0, common.Big0
if d.mode != LightSync {
if mode != LightSync {
curFastBlock = d.blockchain.CurrentFastBlock().Number()
curBlock = d.blockchain.CurrentBlock().Number()
}
@ -1200,7 +1331,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
for {
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
return errCanceled
case headers := <-d.headerProcCh:
// Terminate header processing if we synced up
@ -1214,7 +1345,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
}
// If no headers were retrieved at all, the peer violated its TD promise that it had a
// better chain compared to ours. The only exception is if its promised blocks were
// already imported by other means (e.g. fecher):
// already imported by other means (e.g. fetcher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
@ -1224,7 +1355,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if d.mode != LightSync {
if mode != LightSync {
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
return errStallingPeer
@ -1237,7 +1368,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if d.mode == FastSync || d.mode == LightSync {
if mode == FastSync || mode == LightSync {
head := d.lightchain.CurrentHeader()
if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
@ -1249,12 +1380,11 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
}
// Otherwise split the chunk of headers into batches and process them
gotHeaders = true
for len(headers) > 0 {
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
return errCanceled
default:
}
// Select the next chunk of headers to import
@ -1263,9 +1393,8 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
limit = len(headers)
}
chunk := headers[:limit]
// In case of header only syncing, validate the chunk immediately
if d.mode == FastSync || d.mode == LightSync {
if mode == FastSync || mode == LightSync {
// Collect the yet unknown headers to mark them as uncertain
unknown := make([]*types.Header, 0, len(headers))
for _, header := range chunk {
@ -1284,7 +1413,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
rollback = append(rollback, chunk[:n]...)
}
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, store newly found uncertain headers
rollback = append(rollback, unknown...)
@ -1293,12 +1422,12 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
if mode == FullSync || mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
return errCanceled
case <-time.After(time.Second):
}
}
@ -1402,8 +1531,16 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
}
if index, err := d.blockchain.InsertChain(blocks); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
if index < len(results) {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
} else {
// The InsertChain method in blockchain.go will sometimes return an out-of-bounds index,
// when it needs to preprocess blocks to import a sidechain.
// The importer will put together a new list of blocks to import, which is a superset
// of the blocks delivered from the downloader, and the indexing will be off.
log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
}
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
if d.handleProposedBlock != nil {
header := blocks[len(blocks)-1].Header()
@ -1423,8 +1560,8 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync := d.syncState(latest.Root)
defer stateSync.Cancel()
go func() {
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
d.queue.Close() // wake up WaitResults
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
d.queue.Close() // wake up Results
}
}()
// Figure out the ideal pivot block. Note, that this goalpost may move if the
@ -1451,6 +1588,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
// If sync failed, stop
select {
case <-d.cancelCh:
stateSync.Cancel()
return stateSync.Cancel()
default:
}
@ -1481,8 +1619,8 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync = d.syncState(P.Header.Root)
defer stateSync.Cancel()
go func() {
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
d.queue.Close() // wake up WaitResults
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
d.queue.Close() // wake up Results
}
}()
oldPivot = P
@ -1553,7 +1691,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
}
if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
return nil
}

View file

@ -21,16 +21,16 @@ import (
"fmt"
"math"
"math/big"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/ethdb"
@ -99,7 +99,7 @@ func newTester() *downloadTester {
tester.stateDb = rawdb.NewMemoryDatabase()
tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer, tester.handleProposedBlock)
tester.downloader = New(tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer, tester.handleProposedBlock)
return tester
}
@ -230,6 +230,15 @@ func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool {
return dl.GetBlockByHash(hash) != nil
}
// HasFastBlock checks if a block is present in the testers canonical chain.
func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool {
dl.lock.RLock()
defer dl.lock.RUnlock()
_, ok := dl.ownReceipts[hash]
return ok
}
// GetHeader retrieves a header from the testers canonical chain.
func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
dl.lock.RLock()
@ -309,27 +318,32 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (int, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
// Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok {
return 0, errors.New("unknown parent")
return 0, errors.New("InsertHeaderChain: unknown parent at first position")
}
var hashes []common.Hash
for i := 1; i < len(headers); i++ {
hash := headers[i-1].Hash()
if headers[i].ParentHash != headers[i-1].Hash() {
return i, errors.New("unknown parent")
return i, fmt.Errorf("non-contiguous import at position %d", i)
}
hashes = append(hashes, hash)
}
hashes = append(hashes, headers[len(headers)-1].Hash())
// Do a full insert if pre-checks passed
for i, header := range headers {
if _, ok := dl.ownHeaders[header.Hash()]; ok {
hash := hashes[i]
if _, ok := dl.ownHeaders[hash]; ok {
continue
}
if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
return i, errors.New("unknown parent")
// This _should_ be impossible, due to precheck and induction
return i, fmt.Errorf("InsertHeaderChain: unknown parent at position %d", i)
}
dl.ownHashes = append(dl.ownHashes, header.Hash())
dl.ownHeaders[header.Hash()] = header
dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
dl.ownHashes = append(dl.ownHashes, hash)
dl.ownHeaders[hash] = header
dl.ownChainTd[hash] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
}
return len(headers), nil
}
@ -341,15 +355,16 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (int, error) {
for i, block := range blocks {
if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
return i, errors.New("unknown parent")
return i, fmt.Errorf("InsertChain: unknown parent at position %d / %d", i, len(blocks))
} else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err)
return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err)
}
if _, ok := dl.ownHeaders[block.Hash()]; !ok {
dl.ownHashes = append(dl.ownHashes, block.Hash())
dl.ownHeaders[block.Hash()] = block.Header()
}
dl.ownBlocks[block.Hash()] = block
dl.ownReceipts[block.Hash()] = make(types.Receipts, 0)
dl.stateDb.Put(block.Root().Bytes(), []byte{0x00})
dl.ownChainTd[block.Hash()] = new(big.Int).Add(dl.ownChainTd[block.ParentHash()], block.Difficulty())
}
@ -366,7 +381,7 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ
return i, errors.New("unknown owner")
}
if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
return i, errors.New("unknown parent")
return i, errors.New("InsertReceiptChain: unknown parent")
}
dl.ownBlocks[blocks[i].Hash()] = blocks[i]
dl.ownReceipts[blocks[i].Hash()] = receipts[i]
@ -622,28 +637,28 @@ func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
// assertOwnChain checks if the local chain contains the correct number of items
// of the various chain components.
func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
// Mark this method as a helper to report errors at callsite, not in here
t.Helper()
assertOwnForkedChain(t, tester, 1, []int{length})
}
// assertOwnForkedChain checks if the local forked chain contains the correct
// number of items of the various chain components.
func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
// Initialize the counters for the first fork
headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-fsMinFullBlocks
// Mark this method as a helper to report errors at callsite, not in here
t.Helper()
// Initialize the counters for the first fork
headers, blocks, receipts := lengths[0], lengths[0], lengths[0]
if receipts < 0 {
receipts = 1
}
// Update the counters for each subsequent fork
for _, length := range lengths[1:] {
headers += length - common
blocks += length - common
receipts += length - common - fsMinFullBlocks
receipts += length - common
}
switch tester.downloader.mode {
case FullSync:
receipts = 1
case LightSync:
if tester.downloader.getMode() == LightSync {
blocks, receipts = 1, 1
}
if hs := len(tester.ownHeaders); hs != headers {
@ -762,7 +777,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
tester.downloader.queue.lock.Unlock()
tester.lock.Unlock()
if cached == blockCacheItems || retrieved+cached+frozen == targetBlocks+1 {
if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
break
}
}
@ -772,7 +787,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
tester.lock.RLock()
retrieved = len(tester.ownBlocks)
tester.lock.RUnlock()
if cached != blockCacheItems && retrieved+cached+frozen != targetBlocks+1 {
if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
}
// Permit the blocked blocks to import
@ -1338,14 +1353,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
{errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
{errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
{errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
{errInvalidBody, false}, // A bad peer was detected, but not the sync origin
{errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
@ -1773,3 +1782,78 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
tester.downloader.peers.peers["peer"].peer.(*floodingTestPeer).pend.Wait()
}
}
func TestRemoteHeaderRequestSpan(t *testing.T) {
testCases := []struct {
remoteHeight uint64
localHeight uint64
expected []int
}{
// Remote is way higher. We should ask for the remote head and go backwards
{1500, 1000,
[]int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499},
},
{15000, 13006,
[]int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999},
},
//Remote is pretty close to us. We don't have to fetch as many
{1200, 1150,
[]int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199},
},
// Remote is equal to us (so on a fork with higher td)
// We should get the closest couple of ancestors
{1500, 1500,
[]int{1497, 1499},
},
// We're higher than the remote! Odd
{1000, 1500,
[]int{997, 999},
},
// Check some weird edgecases that it behaves somewhat rationally
{0, 1500,
[]int{0, 2},
},
{6000000, 0,
[]int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999},
},
{0, 0,
[]int{0, 2},
},
}
reqs := func(from, count, span int) []int {
var r []int
num := from
for len(r) < count {
r = append(r, num)
num += span + 1
}
return r
}
for i, tt := range testCases {
from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight)
data := reqs(int(from), count, span)
if max != uint64(data[len(data)-1]) {
t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max)
}
failed := false
if len(data) != len(tt.expected) {
failed = true
t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data))
} else {
for j, n := range data {
if n != tt.expected[j] {
failed = true
break
}
}
}
if failed {
res := strings.Replace(fmt.Sprint(data), " ", ",", -1)
exp := strings.Replace(fmt.Sprint(tt.expected), " ", ",", -1)
fmt.Printf("got: %v\n", res)
fmt.Printf("exp: %v\n", exp)
t.Errorf("test %d: wrong values", i)
}
}
}

View file

@ -18,8 +18,8 @@ package downloader
import "fmt"
// SyncMode represents the synchronisation mode of the downloader.
type SyncMode int
// It is a uint32 as it is used with atomic operations.
type SyncMode uint32
const (
FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks

View file

@ -143,7 +143,7 @@ func (q *queue) Reset() {
q.resultOffset = 0
}
// Close marks the end of the sync, unblocking WaitResults.
// Close marks the end of the sync, unblocking Results.
// It may be called even if the queue is already closed.
func (q *queue) Close() {
q.lock.Lock()
@ -510,7 +510,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
log.Error("index allocation went beyond available resultCache space", "index", index, "len.resultCache", len(q.resultCache), "blockNum", header.Number.Int64(), "resultOffset", q.resultOffset)
return nil, false, errInvalidChain
return nil, false, fmt.Errorf("%w: index allocation went beyond available resultCache space", errInvalidChain)
}
if q.resultCache[index] == nil {
components := 1
@ -545,7 +545,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
if progress {
// Wake WaitResults, resultCache was modified
// Wake Results, resultCache was modified
q.active.Signal()
}
// Assemble and return the block download request
@ -860,19 +860,21 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
// Wake up WaitResults
// Wake up Results
if accepted > 0 {
q.active.Signal()
}
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
return accepted, failure
case useful:
return accepted, fmt.Errorf("partial failure: %v", failure)
default:
return accepted, errStaleDelivery
if failure == nil {
return accepted, nil
}
if errors.Is(failure, errInvalidChain) {
return accepted, failure
}
if useful {
return accepted, fmt.Errorf("partial failure: %v", failure)
}
return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery)
}
// Prepare configures the result cache to allow accepting and caching inbound

View file

@ -306,14 +306,31 @@ func (s *stateSync) loop() error {
// 2 items are the minimum requested, if even that times out, we've no use of
// this peer at the moment.
log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
s.d.dropPeer(req.peer.id)
if s.d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id)
} else {
s.d.dropPeer(req.peer.id)
// If this peer was the master peer, abort sync immediately
s.d.cancelLock.RLock()
master := req.peer.id == s.d.cancelPeer
s.d.cancelLock.RUnlock()
if master {
s.d.cancel()
return errTimeout
}
}
}
// Process all the received blobs and check for stale delivery
if err := s.process(req); err != nil {
delivered, err := s.process(req)
if err != nil {
log.Warn("Node data write error", "err", err)
return err
}
req.peer.SetNodeDataIdle(len(req.response))
req.peer.SetNodeDataIdle(delivered)
}
}
return s.commit(true)
@ -391,10 +408,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
// delivered.
func (s *stateSync) process(req *stateReq) error {
// delivered. Returns whether the peer actually managed to deliver anything of
// value, and any error that occurred.
func (s *stateSync) process(req *stateReq) (int, error) {
// Collect processing stats and update progress if valid data was received
duplicate, unexpected := 0, 0
duplicate, unexpected, successful := 0, 0, 0
defer func(start time.Time) {
if duplicate > 0 || unexpected > 0 {
@ -404,7 +422,6 @@ func (s *stateSync) process(req *stateReq) error {
// Iterate over all the delivered data and inject one-by-one into the trie
progress := false
for _, blob := range req.response {
prog, hash, err := s.processNodeData(blob)
switch err {
@ -412,12 +429,13 @@ func (s *stateSync) process(req *stateReq) error {
s.numUncommitted++
s.bytesUncommitted += len(blob)
progress = progress || prog
successful++
case trie.ErrNotRequested:
unexpected++
case trie.ErrAlreadyProcessed:
duplicate++
default:
return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
delete(req.tasks, hash)
}
@ -433,12 +451,12 @@ func (s *stateSync) process(req *stateReq) error {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.tasks[hash] = task
}
return nil
return successful, nil
}
// processNodeData tries to inject a trie node data blob delivered from a remote

View file

@ -209,7 +209,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
}
// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer, handleProposedBlock)
manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain, nil, manager.removePeer, handleProposedBlock)
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)

View file

@ -208,7 +208,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
}
if lightSync {
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer, manager.handleProposedBlock)
manager.downloader = downloader.New(chainDb, manager.eventMux, nil, blockchain, removePeer, manager.handleProposedBlock)
manager.peers.notify((*downloaderPeerNotify)(manager))
manager.fetcher = newLightFetcher(manager)
}