pass block timestamp for fork aware validation

This commit is contained in:
healthykim 2026-03-19 02:26:29 +09:00
parent bb449e5763
commit 7926650d0b
11 changed files with 64 additions and 38 deletions

View file

@ -259,7 +259,7 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
// RequestReceipts constructs a getReceipts method associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan *eth.Response) (*eth.Request, error) {
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, timestamps []uint64, sink chan *eth.Response) (*eth.Request, error) {
blobs := eth.ServiceGetReceiptsQuery69(dlp.chain, hashes)
receipts := make([]types.Receipts, blobs.Len())

View file

@ -79,14 +79,16 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
q.receiptFetchHook(req.Headers)
}
var (
gasUsed = make([]uint64, 0, len(req.Headers))
hashes = make([]common.Hash, 0, len(req.Headers))
gasUsed = make([]uint64, 0, len(req.Headers))
timestamps = make([]uint64, 0, len(req.Headers))
hashes = make([]common.Hash, 0, len(req.Headers))
)
for _, header := range req.Headers {
hashes = append(hashes, header.Hash())
gasUsed = append(gasUsed, header.GasUsed)
timestamps = append(timestamps, header.Time)
}
return peer.peer.RequestReceipts(hashes, gasUsed, resCh)
return peer.peer.RequestReceipts(hashes, gasUsed, timestamps, resCh)
}
// deliver is responsible for taking a generic response packet from the concurrent

View file

@ -60,7 +60,7 @@ type Peer interface {
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, []uint64, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error)
}
// newPeerConnection creates a new downloader peer.

View file

@ -208,7 +208,7 @@ func (p *skeletonTestPeer) RequestBodies([]common.Hash, chan *eth.Response) (*et
panic("skeleton sync must not request block bodies")
}
func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, chan *eth.Response) (*eth.Request, error) {
func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error) {
panic("skeleton sync must not request receipts")
}

View file

@ -137,8 +137,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
defer p2pNoFork.Close()
defer p2pProFork.Close()
peerNoFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil)
peerProFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil)
peerNoFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil, nil)
peerProFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil, nil)
defer peerNoFork.Close()
defer peerProFork.Close()
@ -168,8 +168,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
defer p2pNoFork.Close()
defer p2pProFork.Close()
peerNoFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil)
peerProFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil)
peerNoFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil, nil)
peerProFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil, nil)
defer peerNoFork.Close()
defer peerProFork.Close()
@ -199,8 +199,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
defer p2pNoFork.Close()
defer p2pProFork.Close()
peerNoFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil)
peerProFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil)
peerNoFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil, nil)
peerProFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil, nil)
defer peerNoFork.Close()
defer peerProFork.Close()
@ -249,8 +249,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {
defer p2pSrc.Close()
defer p2pSink.Close()
src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool)
sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool)
src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool, nil)
sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool, nil)
defer src.Close()
defer sink.Close()
@ -305,8 +305,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
defer p2pSrc.Close()
defer p2pSink.Close()
src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool)
sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool)
src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool, nil)
sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool, nil)
defer src.Close()
defer sink.Close()
@ -380,8 +380,8 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
defer sourcePipe.Close()
defer sinkPipe.Close()
sourcePeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{byte(i + 1)}, "", nil, sourcePipe), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, sink.txpool)
sourcePeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{byte(i + 1)}, "", nil, sourcePipe), sourcePipe, source.txpool, nil)
sinkPeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, sink.txpool, nil)
defer sourcePeer.Close()
defer sinkPeer.Close()

View file

@ -315,7 +315,7 @@ func createTestPeers(rand *rand.Rand, n int) []*ethPeer {
var id enode.ID
rand.Read(id[:])
p2pPeer := p2p.NewPeer(id, "test", nil)
ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil)
ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil, nil)
peers[i] = &ethPeer{Peer: ep}
}
return peers

View file

@ -110,7 +110,7 @@ func MakeProtocols(backend Backend, network uint64, disc enode.Iterator) []p2p.P
Version: version,
Length: protocolLengths[version],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := NewPeer(version, p, rw, backend.TxPool())
peer := NewPeer(version, p, rw, backend.TxPool(), backend.Chain().Config())
defer peer.Close()
return backend.RunPeer(peer, func(peer *Peer) error {

View file

@ -77,7 +77,7 @@ func testHandshake(t *testing.T, protocol uint) {
defer app.Close()
defer net.Close()
peer := NewPeer(protocol, p2p.NewPeer(enode.ID{}, "peer", nil), net, nil)
peer := NewPeer(protocol, p2p.NewPeer(enode.ID{}, "peer", nil), net, nil, nil)
defer peer.Close()
// Send the junk test with one peer, check the handshake failure

View file

@ -51,6 +51,7 @@ const (
type receiptRequest struct {
request []common.Hash // block hashes corresponding to the requested receipts
gasUsed []uint64 // block gas used corresponding to the requested receipts
timestamps []uint64 // block timestamps corresponding to the requested receipts
list []*ReceiptList // list of partially collected receipts
lastLogSize uint64 // log size of last receipt list
}
@ -75,6 +76,8 @@ type Peer struct {
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them
chainConfig *params.ChainConfig // Chain configuration for fork-aware validation
receiptBuffer map[uint64]*receiptRequest // Previously requested receipts to buffer partial receipts
receiptBufferLock sync.Mutex // Lock for protecting the receiptBuffer
@ -83,7 +86,7 @@ type Peer struct {
// NewPeer creates a wrapper for a network connection and negotiated protocol
// version.
func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer {
func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool, chainConfig *params.ChainConfig) *Peer {
cap := p2p.Cap{Name: ProtocolName, Version: version}
id := p.ID().String()
peer := &Peer{
@ -99,6 +102,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
reqCancel: make(chan *cancel),
resDispatch: make(chan *response),
txpool: txpool,
chainConfig: chainConfig,
receiptBuffer: make(map[uint64]*receiptRequest),
term: make(chan struct{}),
}
@ -355,9 +359,9 @@ func (p *Peer) RequestBodies(hashes []common.Hash, sink chan *Response) (*Reques
}
// RequestReceipts fetches a batch of transaction receipts from a remote node.
// gasUsed provides the total gas used per block, used to estimate the maximum
// log byte size.
func (p *Peer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan *Response) (*Request, error) {
// `gasUsed` provides the total gas used per block, used to estimate the maximum
// log byte size. `timestamps` provides the block timestamps for fork aware validation.
func (p *Peer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, timestamps []uint64, sink chan *Response) (*Request, error) {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
id := rand.Uint64()
@ -377,8 +381,9 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan
}
p.receiptBufferLock.Lock()
p.receiptBuffer[id] = &receiptRequest{
request: hashes,
gasUsed: gasUsed,
request: hashes,
gasUsed: gasUsed,
timestamps: timestamps,
}
p.receiptBufferLock.Unlock()
} else {
@ -459,7 +464,8 @@ func (p *Peer) bufferReceipts(requestId uint64, receiptLists []*ReceiptList, las
lastBlock += len(buffer.list) - 1
}
gasUsed := buffer.gasUsed[lastBlock]
logSize, err := p.validateLastBlockReceipt(receiptLists, requestId, gasUsed)
timestamp := buffer.timestamps[lastBlock]
logSize, err := p.validateLastBlockReceipt(receiptLists, requestId, gasUsed, timestamp)
if err != nil {
delete(p.receiptBuffer, requestId)
return err
@ -508,7 +514,7 @@ func (p *Peer) flushReceipts(requestId uint64) []*ReceiptList {
// Note that the last receipt response (which completes receiptLists of a pending block)
// is not verified here. Those response doesn't need hueristics below since they can be
// verified by its trie root.
func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList, id uint64, gasUsed uint64) (uint64, error) {
func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList, id uint64, gasUsed uint64, timestamp uint64) (uint64, error) {
lastReceipts := receiptLists[len(receiptLists)-1]
// If the receipt is in the middle of retrieval, use the buffered data.
@ -525,7 +531,13 @@ func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList, id uint64,
}
// Verify that the total number of transactions delivered is under the limit.
if uint64(previousTxs+lastReceipts.items.Len()) > gasUsed/21_000 {
var minTxGas uint64
if p.chainConfig.AmsterdamTime != nil && *p.chainConfig.AmsterdamTime <= timestamp {
minTxGas = 4500
} else {
minTxGas = 21000
}
if uint64(previousTxs+lastReceipts.items.Len()) > gasUsed/minTxGas {
// should be dropped, don't clear the buffer
return 0, fmt.Errorf("total number of tx exceeded limit")
}

View file

@ -52,7 +52,7 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan
var id enode.ID
rand.Read(id[:])
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool())
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool(), nil)
errc := make(chan error, 1)
go func() {
defer app.Close()
@ -115,7 +115,7 @@ func TestPartialReceipt(t *testing.T) {
t.Fatalf("failed to create random peer: %v", err)
}
peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil)
peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil, nil)
packetCh := make(chan *GetReceiptsPacket70, 1)
go func() {
@ -149,9 +149,15 @@ func TestPartialReceipt(t *testing.T) {
backend.chain.GetBlockByNumber(3).GasUsed(),
backend.chain.GetBlockByNumber(4).GasUsed(),
}
timestamps := []uint64{
backend.chain.GetBlockByNumber(1).Time(),
backend.chain.GetBlockByNumber(2).Time(),
backend.chain.GetBlockByNumber(3).Time(),
backend.chain.GetBlockByNumber(4).Time(),
}
sink := make(chan *Response, 1)
req, err := peer.RequestReceipts(hashes, gasUsed, sink)
req, err := peer.RequestReceipts(hashes, gasUsed, timestamps, sink)
if err != nil {
t.Fatalf("RequestReceipts failed: %v", err)
}
@ -313,7 +319,7 @@ func TestPartialReceiptFailure(t *testing.T) {
t.Fatalf("failed to create random peer: %v", err)
}
peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil)
peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil, nil)
packetCh := make(chan *GetReceiptsPacket70, 1)
go func() {
@ -370,10 +376,16 @@ func TestPartialReceiptFailure(t *testing.T) {
backend.chain.GetBlockByNumber(3).GasUsed(),
backend.chain.GetBlockByNumber(4).GasUsed(),
}
timestamps := []uint64{
backend.chain.GetBlockByNumber(1).Time(),
backend.chain.GetBlockByNumber(2).Time(),
backend.chain.GetBlockByNumber(3).Time(),
backend.chain.GetBlockByNumber(4).Time(),
}
// Case 1 ) The number of receipts exceeds maximum tx count
sink := make(chan *Response, 1)
req, err := peer.RequestReceipts(hashes, gasUsed, sink)
req, err := peer.RequestReceipts(hashes, gasUsed, timestamps, sink)
if err != nil {
t.Fatalf("RequestReceipts failed: %v", err)
}
@ -406,7 +418,7 @@ func TestPartialReceiptFailure(t *testing.T) {
}
// Case 2 ) Total receipt size exceeds the block gas limit
req, err = peer.RequestReceipts(hashes, gasUsed, sink)
req, err = peer.RequestReceipts(hashes, gasUsed, timestamps, sink)
if err != nil {
t.Fatalf("RequestReceipts failed: %v", err)
}

View file

@ -50,8 +50,8 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) {
defer emptyPipeEth.Close()
defer fullPipeEth.Close()
emptyPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{1}, "", caps), emptyPipeEth, empty.txpool)
fullPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{2}, "", caps), fullPipeEth, full.txpool)
emptyPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{1}, "", caps), emptyPipeEth, empty.txpool, nil)
fullPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{2}, "", caps), fullPipeEth, full.txpool, nil)
defer emptyPeerEth.Close()
defer fullPeerEth.Close()