From 7926650d0b48b4144151bcaf7ff5ce430b22077e Mon Sep 17 00:00:00 2001 From: healthykim Date: Thu, 19 Mar 2026 02:26:29 +0900 Subject: [PATCH] pass block timestamp for fork aware validation --- eth/downloader/downloader_test.go | 2 +- .../fetchers_concurrent_receipts.go | 8 +++-- eth/downloader/peer.go | 2 +- eth/downloader/skeleton_test.go | 2 +- eth/handler_eth_test.go | 24 +++++++-------- eth/handler_test.go | 2 +- eth/protocols/eth/handler.go | 2 +- eth/protocols/eth/handshake_test.go | 2 +- eth/protocols/eth/peer.go | 30 +++++++++++++------ eth/protocols/eth/peer_test.go | 24 +++++++++++---- eth/sync_test.go | 4 +-- 11 files changed, 64 insertions(+), 38 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 0012ae2780..01a994dbfd 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -259,7 +259,7 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et // RequestReceipts constructs a getReceipts method associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of block receipts from the particularly requested peer. -func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan *eth.Response) (*eth.Request, error) { +func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, timestamps []uint64, sink chan *eth.Response) (*eth.Request, error) { blobs := eth.ServiceGetReceiptsQuery69(dlp.chain, hashes) receipts := make([]types.Receipts, blobs.Len()) diff --git a/eth/downloader/fetchers_concurrent_receipts.go b/eth/downloader/fetchers_concurrent_receipts.go index 01a5888aee..74dbc67af3 100644 --- a/eth/downloader/fetchers_concurrent_receipts.go +++ b/eth/downloader/fetchers_concurrent_receipts.go @@ -79,14 +79,16 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch q.receiptFetchHook(req.Headers) } var ( - gasUsed = make([]uint64, 0, len(req.Headers)) - hashes = make([]common.Hash, 0, len(req.Headers)) + gasUsed = make([]uint64, 0, len(req.Headers)) + timestamps = make([]uint64, 0, len(req.Headers)) + hashes = make([]common.Hash, 0, len(req.Headers)) ) for _, header := range req.Headers { hashes = append(hashes, header.Hash()) gasUsed = append(gasUsed, header.GasUsed) + timestamps = append(timestamps, header.Time) } - return peer.peer.RequestReceipts(hashes, gasUsed, resCh) + return peer.peer.RequestReceipts(hashes, gasUsed, timestamps, resCh) } // deliver is responsible for taking a generic response packet from the concurrent diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 9dc91a99d7..d20bda69e9 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -60,7 +60,7 @@ type Peer interface { RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error) RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error) - RequestReceipts([]common.Hash, []uint64, chan *eth.Response) (*eth.Request, error) + RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error) } // newPeerConnection creates a new downloader peer. diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 5d9f266db6..b9e7dba6ee 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -208,7 +208,7 @@ func (p *skeletonTestPeer) RequestBodies([]common.Hash, chan *eth.Response) (*et panic("skeleton sync must not request block bodies") } -func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, chan *eth.Response) (*eth.Request, error) { +func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error) { panic("skeleton sync must not request receipts") } diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 68e91fa897..4f74f7672f 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -137,8 +137,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { defer p2pNoFork.Close() defer p2pProFork.Close() - peerNoFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil) - peerProFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil) + peerNoFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil, nil) + peerProFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil, nil) defer peerNoFork.Close() defer peerProFork.Close() @@ -168,8 +168,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { defer p2pNoFork.Close() defer p2pProFork.Close() - peerNoFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil) - peerProFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil) + peerNoFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil, nil) + peerProFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil, nil) defer peerNoFork.Close() defer peerProFork.Close() @@ -199,8 +199,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { defer p2pNoFork.Close() defer p2pProFork.Close() - peerNoFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil) - peerProFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil) + peerNoFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil, nil) + peerProFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil, nil) defer peerNoFork.Close() defer peerProFork.Close() @@ -249,8 +249,8 @@ func testRecvTransactions(t *testing.T, protocol uint) { defer p2pSrc.Close() defer p2pSink.Close() - src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool) - sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool) + src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool, nil) + sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool, nil) defer src.Close() defer sink.Close() @@ -305,8 +305,8 @@ func testSendTransactions(t *testing.T, protocol uint) { defer p2pSrc.Close() defer p2pSink.Close() - src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool) - sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool) + src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool, nil) + sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool, nil) defer src.Close() defer sink.Close() @@ -380,8 +380,8 @@ func testTransactionPropagation(t *testing.T, protocol uint) { defer sourcePipe.Close() defer sinkPipe.Close() - sourcePeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{byte(i + 1)}, "", nil, sourcePipe), sourcePipe, source.txpool) - sinkPeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, sink.txpool) + sourcePeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{byte(i + 1)}, "", nil, sourcePipe), sourcePipe, source.txpool, nil) + sinkPeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, sink.txpool, nil) defer sourcePeer.Close() defer sinkPeer.Close() diff --git a/eth/handler_test.go b/eth/handler_test.go index 3470452980..205c06cd6b 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -315,7 +315,7 @@ func createTestPeers(rand *rand.Rand, n int) []*ethPeer { var id enode.ID rand.Read(id[:]) p2pPeer := p2p.NewPeer(id, "test", nil) - ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil) + ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil, nil) peers[i] = ðPeer{Peer: ep} } return peers diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 5fe2e76ecf..59512f5be7 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -110,7 +110,7 @@ func MakeProtocols(backend Backend, network uint64, disc enode.Iterator) []p2p.P Version: version, Length: protocolLengths[version], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := NewPeer(version, p, rw, backend.TxPool()) + peer := NewPeer(version, p, rw, backend.TxPool(), backend.Chain().Config()) defer peer.Close() return backend.RunPeer(peer, func(peer *Peer) error { diff --git a/eth/protocols/eth/handshake_test.go b/eth/protocols/eth/handshake_test.go index e2f1e7592a..5746d5896d 100644 --- a/eth/protocols/eth/handshake_test.go +++ b/eth/protocols/eth/handshake_test.go @@ -77,7 +77,7 @@ func testHandshake(t *testing.T, protocol uint) { defer app.Close() defer net.Close() - peer := NewPeer(protocol, p2p.NewPeer(enode.ID{}, "peer", nil), net, nil) + peer := NewPeer(protocol, p2p.NewPeer(enode.ID{}, "peer", nil), net, nil, nil) defer peer.Close() // Send the junk test with one peer, check the handshake failure diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index b58aa4699a..1db1fa24d4 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -51,6 +51,7 @@ const ( type receiptRequest struct { request []common.Hash // block hashes corresponding to the requested receipts gasUsed []uint64 // block gas used corresponding to the requested receipts + timestamps []uint64 // block timestamps corresponding to the requested receipts list []*ReceiptList // list of partially collected receipts lastLogSize uint64 // log size of last receipt list } @@ -75,6 +76,8 @@ type Peer struct { reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them + chainConfig *params.ChainConfig // Chain configuration for fork-aware validation + receiptBuffer map[uint64]*receiptRequest // Previously requested receipts to buffer partial receipts receiptBufferLock sync.Mutex // Lock for protecting the receiptBuffer @@ -83,7 +86,7 @@ type Peer struct { // NewPeer creates a wrapper for a network connection and negotiated protocol // version. -func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer { +func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool, chainConfig *params.ChainConfig) *Peer { cap := p2p.Cap{Name: ProtocolName, Version: version} id := p.ID().String() peer := &Peer{ @@ -99,6 +102,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe reqCancel: make(chan *cancel), resDispatch: make(chan *response), txpool: txpool, + chainConfig: chainConfig, receiptBuffer: make(map[uint64]*receiptRequest), term: make(chan struct{}), } @@ -355,9 +359,9 @@ func (p *Peer) RequestBodies(hashes []common.Hash, sink chan *Response) (*Reques } // RequestReceipts fetches a batch of transaction receipts from a remote node. -// gasUsed provides the total gas used per block, used to estimate the maximum -// log byte size. -func (p *Peer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan *Response) (*Request, error) { +// `gasUsed` provides the total gas used per block, used to estimate the maximum +// log byte size. `timestamps` provides the block timestamps for fork aware validation. +func (p *Peer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, timestamps []uint64, sink chan *Response) (*Request, error) { p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) id := rand.Uint64() @@ -377,8 +381,9 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan } p.receiptBufferLock.Lock() p.receiptBuffer[id] = &receiptRequest{ - request: hashes, - gasUsed: gasUsed, + request: hashes, + gasUsed: gasUsed, + timestamps: timestamps, } p.receiptBufferLock.Unlock() } else { @@ -459,7 +464,8 @@ func (p *Peer) bufferReceipts(requestId uint64, receiptLists []*ReceiptList, las lastBlock += len(buffer.list) - 1 } gasUsed := buffer.gasUsed[lastBlock] - logSize, err := p.validateLastBlockReceipt(receiptLists, requestId, gasUsed) + timestamp := buffer.timestamps[lastBlock] + logSize, err := p.validateLastBlockReceipt(receiptLists, requestId, gasUsed, timestamp) if err != nil { delete(p.receiptBuffer, requestId) return err @@ -508,7 +514,7 @@ func (p *Peer) flushReceipts(requestId uint64) []*ReceiptList { // Note that the last receipt response (which completes receiptLists of a pending block) // is not verified here. Those response doesn't need hueristics below since they can be // verified by its trie root. -func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList, id uint64, gasUsed uint64) (uint64, error) { +func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList, id uint64, gasUsed uint64, timestamp uint64) (uint64, error) { lastReceipts := receiptLists[len(receiptLists)-1] // If the receipt is in the middle of retrieval, use the buffered data. @@ -525,7 +531,13 @@ func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList, id uint64, } // Verify that the total number of transactions delivered is under the limit. - if uint64(previousTxs+lastReceipts.items.Len()) > gasUsed/21_000 { + var minTxGas uint64 + if p.chainConfig.AmsterdamTime != nil && *p.chainConfig.AmsterdamTime <= timestamp { + minTxGas = 4500 + } else { + minTxGas = 21000 + } + if uint64(previousTxs+lastReceipts.items.Len()) > gasUsed/minTxGas { // should be dropped, don't clear the buffer return 0, fmt.Errorf("total number of tx exceeded limit") } diff --git a/eth/protocols/eth/peer_test.go b/eth/protocols/eth/peer_test.go index ab7b006d36..cf531cee11 100644 --- a/eth/protocols/eth/peer_test.go +++ b/eth/protocols/eth/peer_test.go @@ -52,7 +52,7 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan var id enode.ID rand.Read(id[:]) - peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool()) + peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool(), nil) errc := make(chan error, 1) go func() { defer app.Close() @@ -115,7 +115,7 @@ func TestPartialReceipt(t *testing.T) { t.Fatalf("failed to create random peer: %v", err) } - peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil) + peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil, nil) packetCh := make(chan *GetReceiptsPacket70, 1) go func() { @@ -149,9 +149,15 @@ func TestPartialReceipt(t *testing.T) { backend.chain.GetBlockByNumber(3).GasUsed(), backend.chain.GetBlockByNumber(4).GasUsed(), } + timestamps := []uint64{ + backend.chain.GetBlockByNumber(1).Time(), + backend.chain.GetBlockByNumber(2).Time(), + backend.chain.GetBlockByNumber(3).Time(), + backend.chain.GetBlockByNumber(4).Time(), + } sink := make(chan *Response, 1) - req, err := peer.RequestReceipts(hashes, gasUsed, sink) + req, err := peer.RequestReceipts(hashes, gasUsed, timestamps, sink) if err != nil { t.Fatalf("RequestReceipts failed: %v", err) } @@ -313,7 +319,7 @@ func TestPartialReceiptFailure(t *testing.T) { t.Fatalf("failed to create random peer: %v", err) } - peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil) + peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil, nil) packetCh := make(chan *GetReceiptsPacket70, 1) go func() { @@ -370,10 +376,16 @@ func TestPartialReceiptFailure(t *testing.T) { backend.chain.GetBlockByNumber(3).GasUsed(), backend.chain.GetBlockByNumber(4).GasUsed(), } + timestamps := []uint64{ + backend.chain.GetBlockByNumber(1).Time(), + backend.chain.GetBlockByNumber(2).Time(), + backend.chain.GetBlockByNumber(3).Time(), + backend.chain.GetBlockByNumber(4).Time(), + } // Case 1 ) The number of receipts exceeds maximum tx count sink := make(chan *Response, 1) - req, err := peer.RequestReceipts(hashes, gasUsed, sink) + req, err := peer.RequestReceipts(hashes, gasUsed, timestamps, sink) if err != nil { t.Fatalf("RequestReceipts failed: %v", err) } @@ -406,7 +418,7 @@ func TestPartialReceiptFailure(t *testing.T) { } // Case 2 ) Total receipt size exceeds the block gas limit - req, err = peer.RequestReceipts(hashes, gasUsed, sink) + req, err = peer.RequestReceipts(hashes, gasUsed, timestamps, sink) if err != nil { t.Fatalf("RequestReceipts failed: %v", err) } diff --git a/eth/sync_test.go b/eth/sync_test.go index 77a50bf6d3..e22c495275 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -50,8 +50,8 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) { defer emptyPipeEth.Close() defer fullPipeEth.Close() - emptyPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{1}, "", caps), emptyPipeEth, empty.txpool) - fullPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{2}, "", caps), fullPipeEth, full.txpool) + emptyPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{1}, "", caps), emptyPipeEth, empty.txpool, nil) + fullPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{2}, "", caps), fullPipeEth, full.txpool, nil) defer emptyPeerEth.Close() defer fullPeerEth.Close()