1
0
Fork 0
forked from forks/go-ethereum

eth/protocols/eth: implement eth/69 (#29158)

This PR implements eth/69. This protocol version drops the bloom filter
from receipts messages, reducing the amount of data needed for a sync
by ~530GB (2.3B txs * 256 byte) uncompressed. Compressed this will
be reduced to ~100GB

The new version also changes the Status message and introduces the
BlockRangeUpdate message to relay information about the available history
range.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Marius van der Wijden 2025-05-16 17:10:47 +02:00 committed by GitHub
parent 892a661ee2
commit 7e79254605
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
35 changed files with 1522 additions and 398 deletions

View file

@ -66,10 +66,9 @@ func (s *Suite) dialAs(key *ecdsa.PrivateKey) (*Conn, error) {
return nil, err return nil, err
} }
conn.caps = []p2p.Cap{ conn.caps = []p2p.Cap{
{Name: "eth", Version: 67}, {Name: "eth", Version: 69},
{Name: "eth", Version: 68},
} }
conn.ourHighestProtoVersion = 68 conn.ourHighestProtoVersion = 69
return &conn, nil return &conn, nil
} }
@ -156,7 +155,7 @@ func (c *Conn) ReadEth() (any, error) {
var msg any var msg any
switch int(code) { switch int(code) {
case eth.StatusMsg: case eth.StatusMsg:
msg = new(eth.StatusPacket) msg = new(eth.StatusPacket69)
case eth.GetBlockHeadersMsg: case eth.GetBlockHeadersMsg:
msg = new(eth.GetBlockHeadersPacket) msg = new(eth.GetBlockHeadersPacket)
case eth.BlockHeadersMsg: case eth.BlockHeadersMsg:
@ -231,7 +230,7 @@ func (c *Conn) ReadSnap() (any, error) {
// peer performs both the protocol handshake and the status message // peer performs both the protocol handshake and the status message
// exchange with the node in order to peer with it. // exchange with the node in order to peer with it.
func (c *Conn) peer(chain *Chain, status *eth.StatusPacket) error { func (c *Conn) peer(chain *Chain, status *eth.StatusPacket69) error {
if err := c.handshake(); err != nil { if err := c.handshake(); err != nil {
return fmt.Errorf("handshake failed: %v", err) return fmt.Errorf("handshake failed: %v", err)
} }
@ -304,7 +303,7 @@ func (c *Conn) negotiateEthProtocol(caps []p2p.Cap) {
} }
// statusExchange performs a `Status` message exchange with the given node. // statusExchange performs a `Status` message exchange with the given node.
func (c *Conn) statusExchange(chain *Chain, status *eth.StatusPacket) error { func (c *Conn) statusExchange(chain *Chain, status *eth.StatusPacket69) error {
loop: loop:
for { for {
code, data, err := c.Read() code, data, err := c.Read()
@ -313,11 +312,15 @@ loop:
} }
switch code { switch code {
case eth.StatusMsg + protoOffset(ethProto): case eth.StatusMsg + protoOffset(ethProto):
msg := new(eth.StatusPacket) msg := new(eth.StatusPacket69)
if err := rlp.DecodeBytes(data, &msg); err != nil { if err := rlp.DecodeBytes(data, &msg); err != nil {
return fmt.Errorf("error decoding status packet: %w", err) return fmt.Errorf("error decoding status packet: %w", err)
} }
if have, want := msg.Head, chain.blocks[chain.Len()-1].Hash(); have != want { if have, want := msg.LatestBlock, chain.blocks[chain.Len()-1].NumberU64(); have != want {
return fmt.Errorf("wrong head block in status, want: %d, have %d",
want, have)
}
if have, want := msg.LatestBlockHash, chain.blocks[chain.Len()-1].Hash(); have != want {
return fmt.Errorf("wrong head block in status, want: %#x (block %d) have %#x", return fmt.Errorf("wrong head block in status, want: %#x (block %d) have %#x",
want, chain.blocks[chain.Len()-1].NumberU64(), have) want, chain.blocks[chain.Len()-1].NumberU64(), have)
} }
@ -348,13 +351,14 @@ loop:
} }
if status == nil { if status == nil {
// default status message // default status message
status = &eth.StatusPacket{ status = &eth.StatusPacket69{
ProtocolVersion: uint32(c.negotiatedProtoVersion), ProtocolVersion: uint32(c.negotiatedProtoVersion),
NetworkID: chain.config.ChainID.Uint64(), NetworkID: chain.config.ChainID.Uint64(),
TD: chain.TD(),
Head: chain.blocks[chain.Len()-1].Hash(),
Genesis: chain.blocks[0].Hash(), Genesis: chain.blocks[0].Hash(),
ForkID: chain.ForkID(), ForkID: chain.ForkID(),
EarliestBlock: 0,
LatestBlock: chain.blocks[chain.Len()-1].NumberU64(),
LatestBlockHash: chain.blocks[chain.Len()-1].Hash(),
} }
} }
if err := c.Write(ethProto, eth.StatusMsg, status); err != nil { if err := c.Write(ethProto, eth.StatusMsg, status); err != nil {

View file

@ -32,7 +32,7 @@ const (
// Unexported devp2p protocol lengths from p2p package. // Unexported devp2p protocol lengths from p2p package.
const ( const (
baseProtoLen = 16 baseProtoLen = 16
ethProtoLen = 17 ethProtoLen = 18
snapProtoLen = 8 snapProtoLen = 8
) )

View file

@ -74,8 +74,9 @@ func (s *Suite) EthTests() []utesting.Test {
{Name: "SimultaneousRequests", Fn: s.TestSimultaneousRequests}, {Name: "SimultaneousRequests", Fn: s.TestSimultaneousRequests},
{Name: "SameRequestID", Fn: s.TestSameRequestID}, {Name: "SameRequestID", Fn: s.TestSameRequestID},
{Name: "ZeroRequestID", Fn: s.TestZeroRequestID}, {Name: "ZeroRequestID", Fn: s.TestZeroRequestID},
// get block bodies // get history
{Name: "GetBlockBodies", Fn: s.TestGetBlockBodies}, {Name: "GetBlockBodies", Fn: s.TestGetBlockBodies},
{Name: "GetReceipts", Fn: s.TestGetReceipts},
// // malicious handshakes + status // // malicious handshakes + status
{Name: "MaliciousHandshake", Fn: s.TestMaliciousHandshake}, {Name: "MaliciousHandshake", Fn: s.TestMaliciousHandshake},
// test transactions // test transactions
@ -418,6 +419,51 @@ func (s *Suite) TestGetBlockBodies(t *utesting.T) {
} }
} }
func (s *Suite) TestGetReceipts(t *utesting.T) {
t.Log(`This test sends GetReceipts requests to the node for known blocks in the test chain.`)
conn, err := s.dial()
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn.Close()
if err := conn.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}
// Find some blocks containing receipts.
var hashes = make([]common.Hash, 0, 3)
for i := range s.chain.Len() {
block := s.chain.GetBlock(i)
if len(block.Transactions()) > 0 {
hashes = append(hashes, block.Hash())
}
if len(hashes) == cap(hashes) {
break
}
}
// Create block bodies request.
req := &eth.GetReceiptsPacket{
RequestId: 66,
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
}
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
t.Fatalf("could not write to connection: %v", err)
}
// Wait for response.
resp := new(eth.ReceiptsPacket[*eth.ReceiptList69])
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
t.Fatalf("error reading block bodies msg: %v", err)
}
if got, want := resp.RequestId, req.RequestId; got != want {
t.Fatalf("unexpected request id in respond", got, want)
}
if len(resp.List) != len(req.GetReceiptsRequest) {
t.Fatalf("wrong bodies in response: expected %d bodies, got %d", len(req.GetReceiptsRequest), len(resp.List))
}
}
// randBuf makes a random buffer size kilobytes large. // randBuf makes a random buffer size kilobytes large.
func randBuf(size int) []byte { func randBuf(size int) []byte {
buf := make([]byte, size*1024) buf := make([]byte, size*1024)
@ -500,6 +546,31 @@ func (s *Suite) TestMaliciousHandshake(t *utesting.T) {
} }
} }
func (s *Suite) TestInvalidBlockRangeUpdate(t *utesting.T) {
t.Log(`This test sends an invalid BlockRangeUpdate message to the node and expects to be disconnected.`)
conn, err := s.dial()
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn.Close()
if err := conn.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}
conn.Write(ethProto, eth.BlockRangeUpdateMsg, &eth.BlockRangeUpdatePacket{
EarliestBlock: 10,
LatestBlock: 8,
LatestBlockHash: s.chain.GetBlock(8).Hash(),
})
if code, _, err := conn.Read(); err != nil {
t.Fatalf("expected disconnect, got err: %v", err)
} else if code != discMsg {
t.Fatalf("expected disconnect message, got msg code %d", code)
}
}
func (s *Suite) TestTransaction(t *utesting.T) { func (s *Suite) TestTransaction(t *utesting.T) {
t.Log(`This test sends a valid transaction to the node and checks if the t.Log(`This test sends a valid transaction to the node and checks if the
transaction gets propagated.`) transaction gets propagated.`)

View file

@ -309,7 +309,8 @@ func ImportHistory(chain *core.BlockChain, dir string, network string) error {
if err != nil { if err != nil {
return fmt.Errorf("error reading receipts %d: %w", it.Number(), err) return fmt.Errorf("error reading receipts %d: %w", it.Number(), err)
} }
if _, err := chain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{receipts}, 2^64-1); err != nil { encReceipts := types.EncodeBlockReceiptLists([]types.Receipts{receipts})
if _, err := chain.InsertReceiptChain([]*types.Block{block}, encReceipts, 2^64-1); err != nil {
return fmt.Errorf("error inserting body %d: %w", it.Number(), err) return fmt.Errorf("error inserting body %d: %w", it.Number(), err)
} }
imported += 1 imported += 1

View file

@ -1309,12 +1309,11 @@ const (
// //
// The optional ancientLimit can also be specified and chain segment before that // The optional ancientLimit can also be specified and chain segment before that
// will be directly stored in the ancient, getting rid of the chain migration. // will be directly stored in the ancient, getting rid of the chain migration.
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) { func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []rlp.RawValue, ancientLimit uint64) (int, error) {
// Verify the supplied headers before insertion without lock // Verify the supplied headers before insertion without lock
var headers []*types.Header var headers []*types.Header
for _, block := range blockChain { for _, block := range blockChain {
headers = append(headers, block.Header()) headers = append(headers, block.Header())
// Here we also validate that blob transactions in the block do not // Here we also validate that blob transactions in the block do not
// contain a sidecar. While the sidecar does not affect the block hash // contain a sidecar. While the sidecar does not affect the block hash
// or tx hash, sending blobs within a block is not allowed. // or tx hash, sending blobs within a block is not allowed.
@ -1357,11 +1356,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// //
// this function only accepts canonical chain data. All side chain will be reverted // this function only accepts canonical chain data. All side chain will be reverted
// eventually. // eventually.
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { writeAncient := func(blockChain types.Blocks, receiptChain []rlp.RawValue) (int, error) {
// Ensure genesis is in the ancient store // Ensure genesis is in the ancient store
if blockChain[0].NumberU64() == 1 { if blockChain[0].NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 { if frozen, _ := bc.db.Ancients(); frozen == 0 {
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}) writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []rlp.RawValue{rlp.EmptyList})
if err != nil { if err != nil {
log.Error("Error writing genesis to ancients", "err", err) log.Error("Error writing genesis to ancients", "err", err)
return 0, err return 0, err
@ -1404,7 +1403,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// existing local chain segments (reorg around the chain tip). The reorganized part // existing local chain segments (reorg around the chain tip). The reorganized part
// will be included in the provided chain segment, and stale canonical markers will be // will be included in the provided chain segment, and stale canonical markers will be
// silently rewritten. Therefore, no explicit reorg logic is needed. // silently rewritten. Therefore, no explicit reorg logic is needed.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { writeLive := func(blockChain types.Blocks, receiptChain []rlp.RawValue) (int, error) {
var ( var (
skipPresenceCheck = false skipPresenceCheck = false
batch = bc.db.NewBatch() batch = bc.db.NewBatch()
@ -1429,7 +1428,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all the data out into the database // Write all the data out into the database
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
rawdb.WriteBlock(batch, block) rawdb.WriteBlock(batch, block)
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) rawdb.WriteRawReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
// Write everything belongs to the blocks into the database. So that // Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts) // we can ensure all components of body is completed(body, receipts)
@ -2650,7 +2649,7 @@ func (bc *BlockChain) InsertHeadersBeforeCutoff(headers []*types.Header) (int, e
first = headers[0].Number.Uint64() first = headers[0].Number.Uint64()
) )
if first == 1 && frozen == 0 { if first == 1 && frozen == 0 {
_, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}) _, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []rlp.RawValue{rlp.EmptyList})
if err != nil { if err != nil {
log.Error("Error writing genesis to ancients", "err", err) log.Error("Error writing genesis to ancients", "err", err)
return 0, err return 0, err

View file

@ -234,12 +234,22 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
return receipts return receipts
} }
func (bc *BlockChain) GetRawReceiptsByHash(hash common.Hash) types.Receipts { // GetRawReceipts retrieves the receipts for all transactions in a given block
// without deriving the internal fields and the Bloom.
func (bc *BlockChain) GetRawReceipts(hash common.Hash, number uint64) types.Receipts {
if receipts, ok := bc.receiptsCache.Get(hash); ok {
return receipts
}
return rawdb.ReadRawReceipts(bc.db, hash, number)
}
// GetReceiptsRLP retrieves the receipts of a block.
func (bc *BlockChain) GetReceiptsRLP(hash common.Hash) rlp.RawValue {
number := rawdb.ReadHeaderNumber(bc.db, hash) number := rawdb.ReadHeaderNumber(bc.db, hash)
if number == nil { if number == nil {
return nil return nil
} }
return rawdb.ReadRawReceipts(bc.db, hash, *number) return rawdb.ReadReceiptsRLP(bc.db, hash, *number)
} }
// GetUnclesInChain retrieves all the uncles from a given block backwards until // GetUnclesInChain retrieves all the uncles from a given block backwards until

View file

@ -734,7 +734,7 @@ func testFastVsFullChains(t *testing.T, scheme string) {
fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer fast.Stop() defer fast.Stop()
if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil { if n, err := fast.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), 0); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
// Freezer style fast import the chain. // Freezer style fast import the chain.
@ -747,7 +747,7 @@ func testFastVsFullChains(t *testing.T, scheme string) {
ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer ancient.Stop() defer ancient.Stop()
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil { if n, err := ancient.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), uint64(len(blocks)/2)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
@ -871,7 +871,7 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) {
fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer fast.Stop() defer fast.Stop()
if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil { if n, err := fast.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), 0); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
assert(t, "fast", fast, height, height, 0) assert(t, "fast", fast, height, height, 0)
@ -884,7 +884,7 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) {
ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer ancient.Stop() defer ancient.Stop()
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { if n, err := ancient.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
assert(t, "ancient", ancient, height, height, 0) assert(t, "ancient", ancient, height, height, 0)
@ -1696,7 +1696,7 @@ func testBlockchainRecovery(t *testing.T, scheme string) {
defer ancientDb.Close() defer ancientDb.Close()
ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { if n, err := ancient.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
rawdb.WriteLastPivotNumber(ancientDb, blocks[len(blocks)-1].NumberU64()) // Force fast sync behavior rawdb.WriteLastPivotNumber(ancientDb, blocks[len(blocks)-1].NumberU64()) // Force fast sync behavior
@ -1991,7 +1991,7 @@ func testInsertKnownChainData(t *testing.T, typ string, scheme string) {
} }
} else if typ == "receipts" { } else if typ == "receipts" {
inserter = func(blocks []*types.Block, receipts []types.Receipts) error { inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
_, err = chain.InsertReceiptChain(blocks, receipts, 0) _, err = chain.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), 0)
return err return err
} }
asserter = func(t *testing.T, block *types.Block) { asserter = func(t *testing.T, block *types.Block) {
@ -2157,7 +2157,7 @@ func testInsertKnownChainDataWithMerging(t *testing.T, typ string, mergeHeight i
} }
} else if typ == "receipts" { } else if typ == "receipts" {
inserter = func(blocks []*types.Block, receipts []types.Receipts) error { inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
_, err = chain.InsertReceiptChain(blocks, receipts, 0) _, err = chain.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), 0)
return err return err
} }
asserter = func(t *testing.T, block *types.Block) { asserter = func(t *testing.T, block *types.Block) {
@ -4205,10 +4205,10 @@ func testChainReorgSnapSync(t *testing.T, ancientLimit uint64) {
chain, _ := NewBlockChain(db, DefaultCacheConfigWithScheme(rawdb.PathScheme), gspec, nil, beacon.New(ethash.NewFaker()), vm.Config{}, nil) chain, _ := NewBlockChain(db, DefaultCacheConfigWithScheme(rawdb.PathScheme), gspec, nil, beacon.New(ethash.NewFaker()), vm.Config{}, nil)
defer chain.Stop() defer chain.Stop()
if n, err := chain.InsertReceiptChain(blocks, receipts, ancientLimit); err != nil { if n, err := chain.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), ancientLimit); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
if n, err := chain.InsertReceiptChain(chainA, receiptsA, ancientLimit); err != nil { if n, err := chain.InsertReceiptChain(chainA, types.EncodeBlockReceiptLists(receiptsA), ancientLimit); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
// If the common ancestor is below the ancient limit, rewind the chain head. // If the common ancestor is below the ancient limit, rewind the chain head.
@ -4218,7 +4218,7 @@ func testChainReorgSnapSync(t *testing.T, ancientLimit uint64) {
rawdb.WriteLastPivotNumber(db, ancestor) rawdb.WriteLastPivotNumber(db, ancestor)
chain.SetHead(ancestor) chain.SetHead(ancestor)
} }
if n, err := chain.InsertReceiptChain(chainB, receiptsB, ancientLimit); err != nil { if n, err := chain.InsertReceiptChain(chainB, types.EncodeBlockReceiptLists(receiptsB), ancientLimit); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
head := chain.CurrentSnapBlock() head := chain.CurrentSnapBlock()
@ -4336,7 +4336,7 @@ func testInsertChainWithCutoff(t *testing.T, cutoff uint64, ancientLimit uint64,
if n, err := chain.InsertHeadersBeforeCutoff(headersBefore); err != nil { if n, err := chain.InsertHeadersBeforeCutoff(headersBefore); err != nil {
t.Fatalf("failed to insert headers before cutoff %d: %v", n, err) t.Fatalf("failed to insert headers before cutoff %d: %v", n, err)
} }
if n, err := chain.InsertReceiptChain(blocksAfter, receiptsAfter, ancientLimit); err != nil { if n, err := chain.InsertReceiptChain(blocksAfter, types.EncodeBlockReceiptLists(receiptsAfter), ancientLimit); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err) t.Fatalf("failed to insert receipt %d: %v", n, err)
} }
headSnap := chain.CurrentSnapBlock() headSnap := chain.CurrentSnapBlock()

View file

@ -29,7 +29,7 @@ type blockchain interface {
GetHeader(hash common.Hash, number uint64) *types.Header GetHeader(hash common.Hash, number uint64) *types.Header
GetCanonicalHash(number uint64) common.Hash GetCanonicalHash(number uint64) common.Hash
GetReceiptsByHash(hash common.Hash) types.Receipts GetReceiptsByHash(hash common.Hash) types.Receipts
GetRawReceiptsByHash(hash common.Hash) types.Receipts GetRawReceipts(hash common.Hash, number uint64) types.Receipts
} }
// ChainView represents an immutable view of a chain with a block id and a set // ChainView represents an immutable view of a chain with a block id and a set
@ -117,7 +117,7 @@ func (cv *ChainView) RawReceipts(number uint64) types.Receipts {
log.Error("Chain view: block hash unavailable", "number", number, "head", cv.headNumber) log.Error("Chain view: block hash unavailable", "number", number, "head", cv.headNumber)
return nil return nil
} }
return cv.chain.GetRawReceiptsByHash(blockHash) return cv.chain.GetRawReceipts(blockHash, number)
} }
// SharedRange returns the block range shared by two chain views. // SharedRange returns the block range shared by two chain views.

View file

@ -515,7 +515,7 @@ func (tc *testChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
return tc.receipts[hash] return tc.receipts[hash]
} }
func (tc *testChain) GetRawReceiptsByHash(hash common.Hash) types.Receipts { func (tc *testChain) GetRawReceipts(hash common.Hash, number uint64) types.Receipts {
tc.lock.RLock() tc.lock.RLock()
defer tc.lock.RUnlock() defer tc.lock.RUnlock()

View file

@ -545,8 +545,8 @@ func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawVa
} }
// ReadRawReceipts retrieves all the transaction receipts belonging to a block. // ReadRawReceipts retrieves all the transaction receipts belonging to a block.
// The receipt metadata fields are not guaranteed to be populated, so they // The receipt metadata fields and the Bloom are not guaranteed to be populated,
// should not be used. Use ReadReceipts instead if the metadata is needed. // so they should not be used. Use ReadReceipts instead if the metadata is needed.
func ReadRawReceipts(db ethdb.Reader, hash common.Hash, number uint64) types.Receipts { func ReadRawReceipts(db ethdb.Reader, hash common.Hash, number uint64) types.Receipts {
// Retrieve the flattened receipt slice // Retrieve the flattened receipt slice
data := ReadReceiptsRLP(db, hash, number) data := ReadReceiptsRLP(db, hash, number)
@ -621,6 +621,14 @@ func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rec
} }
} }
// WriteRawReceipts stores all the transaction receipts belonging to a block.
func WriteRawReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, receipts rlp.RawValue) {
// Store the flattened receipt slice
if err := db.Put(blockReceiptsKey(number, hash), receipts); err != nil {
log.Crit("Failed to store block receipts", "err", err)
}
}
// DeleteReceipts removes all receipt data associated with a block hash. // DeleteReceipts removes all receipt data associated with a block hash.
func DeleteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { func DeleteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(blockReceiptsKey(number, hash)); err != nil { if err := db.Delete(blockReceiptsKey(number, hash)); err != nil {
@ -701,18 +709,11 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
} }
// WriteAncientBlocks writes entire block data into ancient store and returns the total written size. // WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts) (int64, error) { func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []rlp.RawValue) (int64, error) {
var stReceipts []*types.ReceiptForStorage
return db.ModifyAncients(func(op ethdb.AncientWriteOp) error { return db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i, block := range blocks { for i, block := range blocks {
// Convert receipts to storage format and sum up total difficulty.
stReceipts = stReceipts[:0]
for _, receipt := range receipts[i] {
stReceipts = append(stReceipts, (*types.ReceiptForStorage)(receipt))
}
header := block.Header() header := block.Header()
if err := writeAncientBlock(op, block, header, stReceipts); err != nil { if err := writeAncientBlock(op, block, header, receipts[i]); err != nil {
return err return err
} }
} }
@ -720,7 +721,7 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts
}) })
} }
func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage) error { func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts rlp.RawValue) error {
num := block.NumberU64() num := block.NumberU64()
if err := op.AppendRaw(ChainFreezerHashTable, num, block.Hash().Bytes()); err != nil { if err := op.AppendRaw(ChainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
return fmt.Errorf("can't add block %d hash: %v", num, err) return fmt.Errorf("can't add block %d hash: %v", num, err)

View file

@ -377,7 +377,11 @@ func TestBlockReceiptStorage(t *testing.T) {
t.Fatalf("receipts returned when body was deleted: %v", rs) t.Fatalf("receipts returned when body was deleted: %v", rs)
} }
// Ensure that receipts without metadata can be returned without the block body too // Ensure that receipts without metadata can be returned without the block body too
if err := checkReceiptsRLP(ReadRawReceipts(db, hash, 0), receipts); err != nil { raw := ReadRawReceipts(db, hash, 0)
for _, r := range raw {
r.Bloom = types.CreateBloom(r)
}
if err := checkReceiptsRLP(raw, receipts); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Sanity check that body alone without the receipt is a full purge // Sanity check that body alone without the receipt is a full purge
@ -439,7 +443,7 @@ func TestAncientStorage(t *testing.T) {
} }
// Write and verify the header in the database // Write and verify the header in the database
WriteAncientBlocks(db, []*types.Block{block}, []types.Receipts{nil}) WriteAncientBlocks(db, []*types.Block{block}, types.EncodeBlockReceiptLists([]types.Receipts{nil}))
if blob := ReadHeaderRLP(db, hash, number); len(blob) == 0 { if blob := ReadHeaderRLP(db, hash, number); len(blob) == 0 {
t.Fatalf("no header returned") t.Fatalf("no header returned")
@ -609,7 +613,7 @@ func BenchmarkWriteAncientBlocks(b *testing.B) {
blocks := allBlocks[i : i+length] blocks := allBlocks[i : i+length]
receipts := batchReceipts[:length] receipts := batchReceipts[:length]
writeSize, err := WriteAncientBlocks(db, blocks, receipts) writeSize, err := WriteAncientBlocks(db, blocks, types.EncodeBlockReceiptLists(receipts))
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
@ -909,7 +913,7 @@ func TestHeadersRLPStorage(t *testing.T) {
} }
receipts := make([]types.Receipts, 100) receipts := make([]types.Receipts, 100)
// Write first half to ancients // Write first half to ancients
WriteAncientBlocks(db, chain[:50], receipts[:50]) WriteAncientBlocks(db, chain[:50], types.EncodeBlockReceiptLists(receipts[:50]))
// Write second half to db // Write second half to db
for i := 50; i < 100; i++ { for i := 50; i < 100; i++ {
WriteCanonicalHash(db, chain[i].Hash(), chain[i].NumberU64()) WriteCanonicalHash(db, chain[i].Hash(), chain[i].NumberU64())

View file

@ -117,7 +117,7 @@ func TestTxIndexer(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false) db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false)
rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...)) rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), types.EncodeBlockReceiptLists(append([]types.Receipts{{}}, receipts...)))
// Index the initial blocks from ancient store // Index the initial blocks from ancient store
indexer := &txIndexer{ indexer := &txIndexer{
@ -236,7 +236,8 @@ func TestTxIndexerRepair(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false) db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false)
rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...)) encReceipts := types.EncodeBlockReceiptLists(append([]types.Receipts{{}}, receipts...))
rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), encReceipts)
// Index the initial blocks from ancient store // Index the initial blocks from ancient store
indexer := &txIndexer{ indexer := &txIndexer{
@ -426,7 +427,8 @@ func TestTxIndexerReport(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false) db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false)
rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...)) encReceipts := types.EncodeBlockReceiptLists(append([]types.Receipts{{}}, receipts...))
rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), encReceipts)
// Index the initial blocks from ancient store // Index the initial blocks from ancient store
indexer := &txIndexer{ indexer := &txIndexer{

View file

@ -59,11 +59,12 @@ func (b *Bloom) SetBytes(d []byte) {
// Add adds d to the filter. Future calls of Test(d) will return true. // Add adds d to the filter. Future calls of Test(d) will return true.
func (b *Bloom) Add(d []byte) { func (b *Bloom) Add(d []byte) {
b.add(d, make([]byte, 6)) var buf [6]byte
b.AddWithBuffer(d, &buf)
} }
// add is internal version of Add, which takes a scratch buffer for reuse (needs to be at least 6 bytes) // add is internal version of Add, which takes a scratch buffer for reuse (needs to be at least 6 bytes)
func (b *Bloom) add(d []byte, buf []byte) { func (b *Bloom) AddWithBuffer(d []byte, buf *[6]byte) {
i1, v1, i2, v2, i3, v3 := bloomValues(d, buf) i1, v1, i2, v2, i3, v3 := bloomValues(d, buf)
b[i1] |= v1 b[i1] |= v1
b[i2] |= v2 b[i2] |= v2
@ -84,7 +85,8 @@ func (b Bloom) Bytes() []byte {
// Test checks if the given topic is present in the bloom filter // Test checks if the given topic is present in the bloom filter
func (b Bloom) Test(topic []byte) bool { func (b Bloom) Test(topic []byte) bool {
i1, v1, i2, v2, i3, v3 := bloomValues(topic, make([]byte, 6)) var buf [6]byte
i1, v1, i2, v2, i3, v3 := bloomValues(topic, &buf)
return v1 == v1&b[i1] && return v1 == v1&b[i1] &&
v2 == v2&b[i2] && v2 == v2&b[i2] &&
v3 == v3&b[i3] v3 == v3&b[i3]
@ -104,12 +106,12 @@ func (b *Bloom) UnmarshalText(input []byte) error {
func CreateBloom(receipt *Receipt) Bloom { func CreateBloom(receipt *Receipt) Bloom {
var ( var (
bin Bloom bin Bloom
buf = make([]byte, 6) buf [6]byte
) )
for _, log := range receipt.Logs { for _, log := range receipt.Logs {
bin.add(log.Address.Bytes(), buf) bin.AddWithBuffer(log.Address.Bytes(), &buf)
for _, b := range log.Topics { for _, b := range log.Topics {
bin.add(b[:], buf) bin.AddWithBuffer(b[:], &buf)
} }
} }
return bin return bin
@ -139,21 +141,20 @@ func Bloom9(data []byte) []byte {
} }
// bloomValues returns the bytes (index-value pairs) to set for the given data // bloomValues returns the bytes (index-value pairs) to set for the given data
func bloomValues(data []byte, hashbuf []byte) (uint, byte, uint, byte, uint, byte) { func bloomValues(data []byte, hashbuf *[6]byte) (uint, byte, uint, byte, uint, byte) {
sha := hasherPool.Get().(crypto.KeccakState) sha := hasherPool.Get().(crypto.KeccakState)
sha.Reset() sha.Reset()
sha.Write(data) sha.Write(data)
sha.Read(hashbuf) sha.Read(hashbuf[:])
hasherPool.Put(sha) hasherPool.Put(sha)
// The actual bits to flip // The actual bits to flip
v1 := byte(1 << (hashbuf[1] & 0x7)) v1 := byte(1 << (hashbuf[1] & 0x7))
v2 := byte(1 << (hashbuf[3] & 0x7)) v2 := byte(1 << (hashbuf[3] & 0x7))
v3 := byte(1 << (hashbuf[5] & 0x7)) v3 := byte(1 << (hashbuf[5] & 0x7))
// The indices for the bytes to OR in // The indices for the bytes to OR in
i1 := BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf)&0x7ff)>>3) - 1 i1 := BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[0:])&0x7ff)>>3) - 1
i2 := BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[2:])&0x7ff)>>3) - 1 i2 := BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[2:])&0x7ff)>>3) - 1
i3 := BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[4:])&0x7ff)>>3) - 1 i3 := BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[4:])&0x7ff)>>3) - 1
return i1, v1, i2, v2, i3, v3 return i1, v1, i2, v2, i3, v3
} }

View file

@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
@ -258,7 +259,7 @@ func (r *Receipt) Size() common.StorageSize {
} }
// ReceiptForStorage is a wrapper around a Receipt with RLP serialization // ReceiptForStorage is a wrapper around a Receipt with RLP serialization
// that omits the Bloom field and deserialization that re-computes it. // that omits the Bloom field. The Bloom field is recomputed by DeriveFields.
type ReceiptForStorage Receipt type ReceiptForStorage Receipt
// EncodeRLP implements rlp.Encoder, and flattens all content fields of a receipt // EncodeRLP implements rlp.Encoder, and flattens all content fields of a receipt
@ -291,7 +292,6 @@ func (r *ReceiptForStorage) DecodeRLP(s *rlp.Stream) error {
} }
r.CumulativeGasUsed = stored.CumulativeGasUsed r.CumulativeGasUsed = stored.CumulativeGasUsed
r.Logs = stored.Logs r.Logs = stored.Logs
r.Bloom = CreateBloom((*Receipt)(r))
return nil return nil
} }
@ -372,6 +372,26 @@ func (rs Receipts) DeriveFields(config *params.ChainConfig, hash common.Hash, nu
rs[i].Logs[j].Index = logIndex rs[i].Logs[j].Index = logIndex
logIndex++ logIndex++
} }
// also derive the Bloom if not derived yet
rs[i].Bloom = CreateBloom(rs[i])
} }
return nil return nil
} }
// EncodeBlockReceiptLists encodes a list of block receipt lists into RLP.
func EncodeBlockReceiptLists(receipts []Receipts) []rlp.RawValue {
var storageReceipts []*ReceiptForStorage
result := make([]rlp.RawValue, len(receipts))
for i, receipt := range receipts {
storageReceipts = storageReceipts[:0]
for _, r := range receipt {
storageReceipts = append(storageReceipts, (*ReceiptForStorage)(r))
}
bytes, err := rlp.EncodeToBytes(storageReceipts)
if err != nil {
log.Crit("Failed to encode block receipts", "err", err)
}
result[i] = bytes
}
return result
}

View file

@ -22,6 +22,7 @@ import (
"math" "math"
"math/big" "math/big"
"reflect" "reflect"
"sync"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -154,9 +155,16 @@ var (
blockNumber = big.NewInt(1) blockNumber = big.NewInt(1)
blockTime = uint64(2) blockTime = uint64(2)
blockHash = common.BytesToHash([]byte{0x03, 0x14}) blockHash = common.BytesToHash([]byte{0x03, 0x14})
)
var receiptsOnce sync.Once
var testReceipts Receipts
func getTestReceipts() Receipts {
// Compute the blooms only once
receiptsOnce.Do(func() {
// Create the corresponding receipts // Create the corresponding receipts
receipts = Receipts{ r := Receipts{
&Receipt{ &Receipt{
Status: ReceiptStatusFailed, Status: ReceiptStatusFailed,
CumulativeGasUsed: 1, CumulativeGasUsed: 1,
@ -294,7 +302,13 @@ var (
TransactionIndex: 6, TransactionIndex: 6,
}, },
} }
) for _, receipt := range r {
receipt.Bloom = CreateBloom(receipt)
}
testReceipts = r
})
return testReceipts
}
func TestDecodeEmptyTypedReceipt(t *testing.T) { func TestDecodeEmptyTypedReceipt(t *testing.T) {
input := []byte{0x80} input := []byte{0x80}
@ -310,6 +324,7 @@ func TestDeriveFields(t *testing.T) {
// Re-derive receipts. // Re-derive receipts.
basefee := big.NewInt(1000) basefee := big.NewInt(1000)
blobGasPrice := big.NewInt(920) blobGasPrice := big.NewInt(920)
receipts := getTestReceipts()
derivedReceipts := clearComputedFieldsOnReceipts(receipts) derivedReceipts := clearComputedFieldsOnReceipts(receipts)
err := Receipts(derivedReceipts).DeriveFields(params.TestChainConfig, blockHash, blockNumber.Uint64(), blockTime, basefee, blobGasPrice, txs) err := Receipts(derivedReceipts).DeriveFields(params.TestChainConfig, blockHash, blockNumber.Uint64(), blockTime, basefee, blobGasPrice, txs)
if err != nil { if err != nil {
@ -335,6 +350,7 @@ func TestDeriveFields(t *testing.T) {
// Test that we can marshal/unmarshal receipts to/from json without errors. // Test that we can marshal/unmarshal receipts to/from json without errors.
// This also confirms that our test receipts contain all the required fields. // This also confirms that our test receipts contain all the required fields.
func TestReceiptJSON(t *testing.T) { func TestReceiptJSON(t *testing.T) {
receipts := getTestReceipts()
for i := range receipts { for i := range receipts {
b, err := receipts[i].MarshalJSON() b, err := receipts[i].MarshalJSON()
if err != nil { if err != nil {
@ -351,6 +367,7 @@ func TestReceiptJSON(t *testing.T) {
// Test we can still parse receipt without EffectiveGasPrice for backwards compatibility, even // Test we can still parse receipt without EffectiveGasPrice for backwards compatibility, even
// though it is required per the spec. // though it is required per the spec.
func TestEffectiveGasPriceNotRequired(t *testing.T) { func TestEffectiveGasPriceNotRequired(t *testing.T) {
receipts := getTestReceipts()
r := *receipts[0] r := *receipts[0]
r.EffectiveGasPrice = nil r.EffectiveGasPrice = nil
b, err := r.MarshalJSON() b, err := r.MarshalJSON()
@ -511,6 +528,7 @@ func clearComputedFieldsOnReceipt(receipt *Receipt) *Receipt {
cpy.EffectiveGasPrice = big.NewInt(0) cpy.EffectiveGasPrice = big.NewInt(0)
cpy.BlobGasUsed = 0 cpy.BlobGasUsed = 0
cpy.BlobGasPrice = nil cpy.BlobGasPrice = nil
cpy.Bloom = CreateBloom(&cpy)
return &cpy return &cpy
} }

View file

@ -36,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/triedb" "github.com/ethereum/go-ethereum/triedb"
) )
@ -202,7 +203,7 @@ type BlockChain interface {
// into the local chain. Blocks older than the specified `ancientLimit` // into the local chain. Blocks older than the specified `ancientLimit`
// are stored directly in the ancient store, while newer blocks are stored // are stored directly in the ancient store, while newer blocks are stored
// in the live key-value store. // in the live key-value store.
InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error) InsertReceiptChain(types.Blocks, []rlp.RawValue, uint64) (int, error)
// Snapshots returns the blockchain snapshot tree to paused it during sync. // Snapshots returns the blockchain snapshot tree to paused it during sync.
Snapshots() *snapshot.Tree Snapshots() *snapshot.Tree
@ -1034,7 +1035,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state
"lastnumn", last.Number, "lasthash", last.Hash(), "lastnumn", last.Number, "lasthash", last.Hash(),
) )
blocks := make([]*types.Block, len(results)) blocks := make([]*types.Block, len(results))
receipts := make([]types.Receipts, len(results)) receipts := make([]rlp.RawValue, len(results))
for i, result := range results { for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.body()) blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.body())
receipts[i] = result.Receipts receipts[i] = result.Receipts
@ -1051,7 +1052,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
log.Debug("Committing snap sync pivot as new head", "number", block.Number(), "hash", block.Hash()) log.Debug("Committing snap sync pivot as new head", "number", block.Number(), "hash", block.Hash())
// Commit the pivot block as the new head, will require full sync from here on // Commit the pivot block as the new head, will require full sync from here on
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}, d.ancientLimit); err != nil { if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []rlp.RawValue{result.Receipts}, d.ancientLimit); err != nil {
return err return err
} }
if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil { if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil {

View file

@ -255,23 +255,24 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
// peer in the download tester. The returned function can be used to retrieve // peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer. // batches of block receipts from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) { func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
blobs := eth.ServiceGetReceiptsQuery(dlp.chain, hashes) blobs := eth.ServiceGetReceiptsQuery68(dlp.chain, hashes)
receipts := make([][]*types.Receipt, len(blobs)) receipts := make([]types.Receipts, len(blobs))
for i, blob := range blobs { for i, blob := range blobs {
rlp.DecodeBytes(blob, &receipts[i]) rlp.DecodeBytes(blob, &receipts[i])
} }
hasher := trie.NewStackTrie(nil) hasher := trie.NewStackTrie(nil)
hashes = make([]common.Hash, len(receipts)) hashes = make([]common.Hash, len(receipts))
for i, receipt := range receipts { for i, receipt := range receipts {
hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher) hashes[i] = types.DeriveSha(receipt, hasher)
} }
req := &eth.Request{ req := &eth.Request{
Peer: dlp.id, Peer: dlp.id,
} }
resp := eth.ReceiptsRLPResponse(types.EncodeBlockReceiptLists(receipts))
res := &eth.Response{ res := &eth.Response{
Req: req, Req: req,
Res: (*eth.ReceiptsResponse)(&receipts), Res: &resp,
Meta: hashes, Meta: hashes,
Time: 1, Time: 1,
Done: make(chan error, 1), // Ignore the returned status Done: make(chan error, 1), // Ignore the returned status

View file

@ -88,7 +88,7 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
// deliver is responsible for taking a generic response packet from the concurrent // deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the receipt data and delivering it to the downloader's queue. // fetcher, unpacking the receipt data and delivering it to the downloader's queue.
func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
receipts := *packet.Res.(*eth.ReceiptsResponse) receipts := *packet.Res.(*eth.ReceiptsRLPResponse)
hashes := packet.Meta.([]common.Hash) // {receipt hashes} hashes := packet.Meta.([]common.Hash) // {receipt hashes}
accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes) accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes)

View file

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
) )
const ( const (
@ -69,7 +70,7 @@ type fetchResult struct {
Header *types.Header Header *types.Header
Uncles []*types.Header Uncles []*types.Header
Transactions types.Transactions Transactions types.Transactions
Receipts types.Receipts Receipts rlp.RawValue
Withdrawals types.Withdrawals Withdrawals types.Withdrawals
} }
@ -318,9 +319,7 @@ func (q *queue) Results(block bool) []*fetchResult {
for _, uncle := range result.Uncles { for _, uncle := range result.Uncles {
size += uncle.Size() size += uncle.Size()
} }
for _, receipt := range result.Receipts { size += common.StorageSize(len(result.Receipts))
size += receipt.Size()
}
for _, tx := range result.Transactions { for _, tx := range result.Transactions {
size += common.StorageSize(tx.Size()) size += common.StorageSize(tx.Size())
} }
@ -631,7 +630,7 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
// DeliverReceipts injects a receipt retrieval response into the results queue. // DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery // The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery. // and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, receiptListHashes []common.Hash) (int, error) { func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()

View file

@ -358,16 +358,16 @@ func XTestDelivery(t *testing.T) {
for { for {
f, _, _ := q.ReserveReceipts(peer, rand.Intn(50)) f, _, _ := q.ReserveReceipts(peer, rand.Intn(50))
if f != nil { if f != nil {
var rcs [][]*types.Receipt var rcs []types.Receipts
for _, hdr := range f.Headers { for _, hdr := range f.Headers {
rcs = append(rcs, world.getReceipts(hdr.Number.Uint64())) rcs = append(rcs, world.getReceipts(hdr.Number.Uint64()))
} }
hasher := trie.NewStackTrie(nil) hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(rcs)) hashes := make([]common.Hash, len(rcs))
for i, receipt := range rcs { for i, receipt := range rcs {
hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher) hashes[i] = types.DeriveSha(receipt, hasher)
} }
_, err := q.DeliverReceipts(peer.id, rcs, hashes) _, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes)
if err != nil { if err != nil {
fmt.Printf("delivered %d receipts %v\n", len(rcs), err) fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
} }

View file

@ -80,11 +80,8 @@ func (b *testBackend) GetReceiptsByHash(hash common.Hash) types.Receipts {
return r return r
} }
func (b *testBackend) GetRawReceiptsByHash(hash common.Hash) types.Receipts { func (b *testBackend) GetRawReceipts(hash common.Hash, number uint64) types.Receipts {
if number := rawdb.ReadHeaderNumber(b.db, hash); number != nil { return rawdb.ReadRawReceipts(b.db, hash, number)
return rawdb.ReadRawReceipts(b.db, hash, *number)
}
return nil
} }
func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {

View file

@ -18,15 +18,16 @@ package eth
import ( import (
"errors" "errors"
"maps"
"math" "math"
"math/big" "math/big"
"slices"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -49,6 +50,9 @@ const (
// The number is referenced from the size of tx pool. // The number is referenced from the size of tx pool.
txChanSize = 4096 txChanSize = 4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 128
// txMaxBroadcastSize is the max size of a transaction that will be broadcasted. // txMaxBroadcastSize is the max size of a transaction that will be broadcasted.
// All transactions with a higher size will be announced and need to be fetched // All transactions with a higher size will be announced and need to be fetched
// by the peer. // by the peer.
@ -106,7 +110,6 @@ type handlerConfig struct {
type handler struct { type handler struct {
nodeID enode.ID nodeID enode.ID
networkID uint64 networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks) snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing) synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
@ -123,6 +126,7 @@ type handler struct {
eventMux *event.TypeMux eventMux *event.TypeMux
txsCh chan core.NewTxsEvent txsCh chan core.NewTxsEvent
txsSub event.Subscription txsSub event.Subscription
blockRange *blockRangeState
requiredBlocks map[uint64]common.Hash requiredBlocks map[uint64]common.Hash
@ -144,7 +148,6 @@ func newHandler(config *handlerConfig) (*handler, error) {
h := &handler{ h := &handler{
nodeID: config.NodeID, nodeID: config.NodeID,
networkID: config.Network, networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux, eventMux: config.EventMux,
database: config.Database, database: config.Database,
txpool: config.TxPool, txpool: config.TxPool,
@ -257,14 +260,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
} }
// Execute the Ethereum handshake // Execute the Ethereum handshake
var ( if err := peer.Handshake(h.networkID, h.chain, h.blockRange.currentRange()); err != nil {
genesis = h.chain.Genesis()
head = h.chain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
)
forkID := forkid.NewID(h.chain.Config(), genesis, number, head.Time)
if err := peer.Handshake(h.networkID, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err) peer.Log().Debug("Ethereum handshake failed", "err", err)
return err return err
} }
@ -435,6 +431,11 @@ func (h *handler) Start(maxPeers int) {
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false) h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
go h.txBroadcastLoop() go h.txBroadcastLoop()
// broadcast block range
h.wg.Add(1)
h.blockRange = newBlockRangeState(h.chain, h.eventMux)
go h.blockRangeLoop(h.blockRange)
// start sync handlers // start sync handlers
h.txFetcher.Start() h.txFetcher.Start()
@ -445,6 +446,7 @@ func (h *handler) Start(maxPeers int) {
func (h *handler) Stop() { func (h *handler) Stop() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.blockRange.stop()
h.txFetcher.Stop() h.txFetcher.Stop()
h.downloader.Terminate() h.downloader.Terminate()
@ -566,3 +568,129 @@ func (h *handler) enableSyncedFeatures() {
h.snapSync.Store(false) h.snapSync.Store(false)
} }
} }
// blockRangeState holds the state of the block range update broadcasting mechanism.
type blockRangeState struct {
prev eth.BlockRangeUpdatePacket
next atomic.Pointer[eth.BlockRangeUpdatePacket]
headCh chan core.ChainHeadEvent
headSub event.Subscription
syncSub *event.TypeMuxSubscription
}
func newBlockRangeState(chain *core.BlockChain, typeMux *event.TypeMux) *blockRangeState {
headCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
headSub := chain.SubscribeChainHeadEvent(headCh)
syncSub := typeMux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
st := &blockRangeState{
headCh: headCh,
headSub: headSub,
syncSub: syncSub,
}
st.update(chain, chain.CurrentBlock())
st.prev = *st.next.Load()
return st
}
// blockRangeBroadcastLoop announces changes in locally-available block range to peers.
// The range to announce is the range that is available in the store, so it's not just
// about imported blocks.
func (h *handler) blockRangeLoop(st *blockRangeState) {
defer h.wg.Done()
for {
select {
case ev := <-st.syncSub.Chan():
if ev == nil {
continue
}
if _, ok := ev.Data.(downloader.StartEvent); ok && h.snapSync.Load() {
h.blockRangeWhileSnapSyncing(st)
}
case <-st.headCh:
st.update(h.chain, h.chain.CurrentBlock())
if st.shouldSend() {
h.broadcastBlockRange(st)
}
case <-st.headSub.Err():
return
}
}
}
// blockRangeWhileSnapSyncing announces block range updates during snap sync.
// Here we poll the CurrentSnapBlock on a timer and announce updates to it.
func (h *handler) blockRangeWhileSnapSyncing(st *blockRangeState) {
tick := time.NewTicker(1 * time.Minute)
defer tick.Stop()
for {
select {
case <-tick.C:
st.update(h.chain, h.chain.CurrentSnapBlock())
if st.shouldSend() {
h.broadcastBlockRange(st)
}
// back to processing head block updates when sync is done
case ev := <-st.syncSub.Chan():
if ev == nil {
continue
}
switch ev.Data.(type) {
case downloader.FailedEvent, downloader.DoneEvent:
return
}
// ignore head updates, but exit when the subscription ends
case <-st.headCh:
case <-st.headSub.Err():
return
}
}
}
// broadcastBlockRange sends a range update when one is due.
func (h *handler) broadcastBlockRange(state *blockRangeState) {
h.peers.lock.Lock()
peerlist := slices.Collect(maps.Values(h.peers.peers))
h.peers.lock.Unlock()
if len(peerlist) == 0 {
return
}
msg := state.currentRange()
log.Debug("Sending BlockRangeUpdate", "peers", len(peerlist), "earliest", msg.EarliestBlock, "latest", msg.LatestBlock)
for _, p := range peerlist {
p.SendBlockRangeUpdate(msg)
}
state.prev = *state.next.Load()
}
// update assigns the values of the next block range update from the chain.
func (st *blockRangeState) update(chain *core.BlockChain, latest *types.Header) {
earliest, _ := chain.HistoryPruningCutoff()
st.next.Store(&eth.BlockRangeUpdatePacket{
EarliestBlock: min(latest.Number.Uint64(), earliest),
LatestBlock: latest.Number.Uint64(),
LatestBlockHash: latest.Hash(),
})
}
// shouldSend decides whether it is time to send a block range update. We don't want to
// send these updates constantly, so they will usually only be sent every 32 blocks.
// However, there is a special case: if the range would move back, i.e. due to SetHead, we
// want to send it immediately.
func (st *blockRangeState) shouldSend() bool {
next := st.next.Load()
return next.LatestBlock < st.prev.LatestBlock ||
next.LatestBlock-st.prev.LatestBlock >= 32
}
func (st *blockRangeState) stop() {
st.syncSub.Unsubscribe()
st.headSub.Unsubscribe()
}
// currentRange returns the current block range.
// This is safe to call from any goroutine.
func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket {
return *st.next.Load()
}

View file

@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
@ -257,11 +256,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
return eth.Handle((*ethHandler)(handler.handler), peer) return eth.Handle((*ethHandler)(handler.handler), peer)
}) })
// Run the handshake locally to avoid spinning up a source handler // Run the handshake locally to avoid spinning up a source handler
var ( if err := src.Handshake(1, handler.chain, eth.BlockRangeUpdatePacket{}); err != nil {
genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock()
)
if err := src.Handshake(1, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
t.Fatalf("failed to run protocol handshake") t.Fatalf("failed to run protocol handshake")
} }
// Send the transaction to the sink and verify that it's added to the tx pool // Send the transaction to the sink and verify that it's added to the tx pool
@ -316,11 +311,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
return eth.Handle((*ethHandler)(handler.handler), peer) return eth.Handle((*ethHandler)(handler.handler), peer)
}) })
// Run the handshake locally to avoid spinning up a source handler // Run the handshake locally to avoid spinning up a source handler
var ( if err := sink.Handshake(1, handler.chain, eth.BlockRangeUpdatePacket{}); err != nil {
genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock()
)
if err := sink.Handshake(1, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
t.Fatalf("failed to run protocol handshake") t.Fatalf("failed to run protocol handshake")
} }
// After the handshake completes, the source handler should stream the sink // After the handshake completes, the source handler should stream the sink

View file

@ -17,6 +17,7 @@
package eth package eth
import ( import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/eth/protocols/snap"
) )
@ -25,6 +26,13 @@ import (
// about a connected peer. // about a connected peer.
type ethPeerInfo struct { type ethPeerInfo struct {
Version uint `json:"version"` // Ethereum protocol version negotiated Version uint `json:"version"` // Ethereum protocol version negotiated
*peerBlockRange
}
type peerBlockRange struct {
Earliest uint64 `json:"earliestBlock"`
Latest uint64 `json:"latestBlock"`
LatestHash common.Hash `json:"latestBlockHash"`
} }
// ethPeer is a wrapper around eth.Peer to maintain a few extra metadata. // ethPeer is a wrapper around eth.Peer to maintain a few extra metadata.
@ -35,9 +43,15 @@ type ethPeer struct {
// info gathers and returns some `eth` protocol metadata known about a peer. // info gathers and returns some `eth` protocol metadata known about a peer.
func (p *ethPeer) info() *ethPeerInfo { func (p *ethPeer) info() *ethPeerInfo {
return &ethPeerInfo{ info := &ethPeerInfo{Version: p.Version()}
Version: p.Version(), if br := p.BlockRange(); br != nil {
info.peerBlockRange = &peerBlockRange{
Earliest: br.EarliestBlock,
Latest: br.LatestBlock,
LatestHash: br.LatestBlockHash,
} }
}
return info
} }
// snapPeerInfo represents a short summary of the `snap` sub-protocol metadata known // snapPeerInfo represents a short summary of the `snap` sub-protocol metadata known

View file

@ -175,12 +175,26 @@ var eth68 = map[uint64]msgHandler{
BlockHeadersMsg: handleBlockHeaders, BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies, GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies, BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts, GetReceiptsMsg: handleGetReceipts68,
ReceiptsMsg: handleReceipts, ReceiptsMsg: handleReceipts[*ReceiptList68],
GetPooledTransactionsMsg: handleGetPooledTransactions, GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions, PooledTransactionsMsg: handlePooledTransactions,
} }
var eth69 = map[uint64]msgHandler{
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetBlockHeadersMsg: handleGetBlockHeaders,
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts69,
ReceiptsMsg: handleReceipts[*ReceiptList69],
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
BlockRangeUpdateMsg: handleBlockRangeUpdate,
}
// handleMessage is invoked whenever an inbound message is received from a remote // handleMessage is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error. // peer. The remote connection is torn down upon returning any error.
func handleMessage(backend Backend, peer *Peer) error { func handleMessage(backend Backend, peer *Peer) error {
@ -194,7 +208,14 @@ func handleMessage(backend Backend, peer *Peer) error {
} }
defer msg.Discard() defer msg.Discard()
var handlers = eth68 var handlers map[uint64]msgHandler
if peer.version == ETH68 {
handlers = eth68
} else if peer.version == ETH69 {
handlers = eth69
} else {
return fmt.Errorf("unknown eth protocol version: %v", peer.version)
}
// Track the amount of time it takes to serve the request and run the handler // Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled() { if metrics.Enabled() {

View file

@ -529,22 +529,23 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
// Collect the hashes to request, and the response to expect // Collect the hashes to request, and the response to expect
var ( var (
hashes []common.Hash hashes []common.Hash
receipts [][]*types.Receipt receipts []*ReceiptList68
) )
for i := uint64(0); i <= backend.chain.CurrentBlock().Number.Uint64(); i++ { for i := uint64(0); i <= backend.chain.CurrentBlock().Number.Uint64(); i++ {
block := backend.chain.GetBlockByNumber(i) block := backend.chain.GetBlockByNumber(i)
hashes = append(hashes, block.Hash()) hashes = append(hashes, block.Hash())
receipts = append(receipts, backend.chain.GetReceiptsByHash(block.Hash())) trs := backend.chain.GetReceiptsByHash(block.Hash())
receipts = append(receipts, NewReceiptList68(trs))
} }
// Send the hash request and verify the response // Send the hash request and verify the response
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket{ p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket{
RequestId: 123, RequestId: 123,
GetReceiptsRequest: hashes, GetReceiptsRequest: hashes,
}) })
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket{ if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket[*ReceiptList68]{
RequestId: 123, RequestId: 123,
ReceiptsResponse: receipts, List: receipts,
}); err != nil { }); err != nil {
t.Errorf("receipts mismatch: %v", err) t.Errorf("receipts mismatch: %v", err)
} }
@ -612,10 +613,10 @@ func setup() (*testBackend, *testPeer) {
} }
func FuzzEthProtocolHandlers(f *testing.F) { func FuzzEthProtocolHandlers(f *testing.F) {
handlers := eth68 handlers := eth69
backend, peer := setup() backend, peer := setup()
f.Fuzz(func(t *testing.T, code byte, msg []byte) { f.Fuzz(func(t *testing.T, code byte, msg []byte) {
handler := handlers[uint64(code)%protocolLengths[ETH68]] handler := handlers[uint64(code)%protocolLengths[ETH69]]
if handler == nil { if handler == nil {
return return
} }

View file

@ -20,20 +20,25 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/tracker"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
) )
// requestTracker is a singleton tracker for eth/66 and newer request times.
var requestTracker = tracker.New(ProtocolName, 5*time.Minute)
func handleGetBlockHeaders(backend Backend, msg Decoder, peer *Peer) error { func handleGetBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
// Decode the complex header query // Decode the complex header query
var query GetBlockHeadersPacket var query GetBlockHeadersPacket
if err := msg.Decode(&query); err != nil { if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
response := ServiceGetBlockHeadersQuery(backend.Chain(), query.GetBlockHeadersRequest, peer) response := ServiceGetBlockHeadersQuery(backend.Chain(), query.GetBlockHeadersRequest, peer)
return peer.ReplyBlockHeadersRLP(query.RequestId, response) return peer.ReplyBlockHeadersRLP(query.RequestId, response)
@ -216,7 +221,7 @@ func handleGetBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block body retrieval message // Decode the block body retrieval message
var query GetBlockBodiesPacket var query GetBlockBodiesPacket
if err := msg.Decode(&query); err != nil { if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
response := ServiceGetBlockBodiesQuery(backend.Chain(), query.GetBlockBodiesRequest) response := ServiceGetBlockBodiesQuery(backend.Chain(), query.GetBlockBodiesRequest)
return peer.ReplyBlockBodiesRLP(query.RequestId, response) return peer.ReplyBlockBodiesRLP(query.RequestId, response)
@ -243,19 +248,29 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ
return bodies return bodies
} }
func handleGetReceipts(backend Backend, msg Decoder, peer *Peer) error { func handleGetReceipts68(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message // Decode the block receipts retrieval message
var query GetReceiptsPacket var query GetReceiptsPacket
if err := msg.Decode(&query); err != nil { if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
response := ServiceGetReceiptsQuery(backend.Chain(), query.GetReceiptsRequest) response := ServiceGetReceiptsQuery68(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP(query.RequestId, response) return peer.ReplyReceiptsRLP(query.RequestId, response)
} }
// ServiceGetReceiptsQuery assembles the response to a receipt query. It is func handleGetReceipts69(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
if err := msg.Decode(&query); err != nil {
return err
}
response := serviceGetReceiptsQuery69(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP(query.RequestId, response)
}
// ServiceGetReceiptsQuery68 assembles the response to a receipt query. It is
// exposed to allow external packages to test protocol behavior. // exposed to allow external packages to test protocol behavior.
func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue { func ServiceGetReceiptsQuery68(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue {
// Gather state data until the fetch or network limits is reached // Gather state data until the fetch or network limits is reached
var ( var (
bytes int bytes int
@ -267,19 +282,62 @@ func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) [
break break
} }
// Retrieve the requested block's receipts // Retrieve the requested block's receipts
results := chain.GetReceiptsByHash(hash) results := chain.GetReceiptsRLP(hash)
if results == nil { if results == nil {
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue continue
} }
}
// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
log.Error("Failed to encode receipt", "err", err)
} else { } else {
receipts = append(receipts, encoded) body := chain.GetBodyRLP(hash)
bytes += len(encoded) if body == nil {
continue
} }
var err error
results, err = blockReceiptsToNetwork68(results, body)
if err != nil {
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
continue
}
}
receipts = append(receipts, results)
bytes += len(results)
}
return receipts
}
// serviceGetReceiptsQuery69 assembles the response to a receipt query.
// It does not send the bloom filters for the receipts
func serviceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue {
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
)
for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
break
}
// Retrieve the requested block's receipts
results := chain.GetReceiptsRLP(hash)
if results == nil {
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
} else {
body := chain.GetBodyRLP(hash)
if body == nil {
continue
}
var err error
results, err = blockReceiptsToNetwork69(results, body)
if err != nil {
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
continue
}
}
receipts = append(receipts, results)
bytes += len(results)
} }
return receipts return receipts
} }
@ -296,7 +354,7 @@ func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
// A batch of headers arrived to one of our previous requests // A batch of headers arrived to one of our previous requests
res := new(BlockHeadersPacket) res := new(BlockHeadersPacket)
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
metadata := func() interface{} { metadata := func() interface{} {
hashes := make([]common.Hash, len(res.BlockHeadersRequest)) hashes := make([]common.Hash, len(res.BlockHeadersRequest))
@ -316,7 +374,7 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
// A batch of block bodies arrived to one of our previous requests // A batch of block bodies arrived to one of our previous requests
res := new(BlockBodiesPacket) res := new(BlockBodiesPacket)
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
metadata := func() interface{} { metadata := func() interface{} {
var ( var (
@ -341,24 +399,35 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
}, metadata) }, metadata)
} }
func handleReceipts(backend Backend, msg Decoder, peer *Peer) error { func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests // A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket) res := new(ReceiptsPacket[L])
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
// Assign temporary hashing buffer to each list item, the same buffer is shared
// between all receipt list instances.
buffers := new(receiptListBuffers)
for i := range res.List {
res.List[i].setBuffers(buffers)
}
metadata := func() interface{} { metadata := func() interface{} {
hasher := trie.NewStackTrie(nil) hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(res.ReceiptsResponse)) hashes := make([]common.Hash, len(res.List))
for i, receipt := range res.ReceiptsResponse { for i := range res.List {
hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher) hashes[i] = types.DeriveSha(res.List[i], hasher)
} }
return hashes return hashes
} }
var enc ReceiptsRLPResponse
for i := range res.List {
enc = append(enc, res.List[i].EncodeForStorage())
}
return peer.dispatchResponse(&Response{ return peer.dispatchResponse(&Response{
id: res.RequestId, id: res.RequestId,
code: ReceiptsMsg, code: ReceiptsMsg,
Res: &res.ReceiptsResponse, Res: &enc,
}, metadata) }, metadata)
} }
@ -370,10 +439,10 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
} }
ann := new(NewPooledTransactionHashesPacket) ann := new(NewPooledTransactionHashesPacket)
if err := msg.Decode(ann); err != nil { if err := msg.Decode(ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) { if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) {
return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes)) return fmt.Errorf("NewPooledTransactionHashes: invalid len of fields in %v %v %v", len(ann.Hashes), len(ann.Types), len(ann.Sizes))
} }
// Schedule all the unknown hashes for retrieval // Schedule all the unknown hashes for retrieval
for _, hash := range ann.Hashes { for _, hash := range ann.Hashes {
@ -386,7 +455,7 @@ func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error
// Decode the pooled transactions retrieval message // Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket var query GetPooledTransactionsPacket
if err := msg.Decode(&query); err != nil { if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsRequest) hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsRequest)
return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs) return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs)
@ -423,12 +492,12 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions can be processed, parse all of them and deliver to the pool // Transactions can be processed, parse all of them and deliver to the pool
var txs TransactionsPacket var txs TransactionsPacket
if err := msg.Decode(&txs); err != nil { if err := msg.Decode(&txs); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
for i, tx := range txs { for i, tx := range txs {
// Validate and mark the remote transaction // Validate and mark the remote transaction
if tx == nil { if tx == nil {
return fmt.Errorf("%w: transaction %d is nil", errDecode, i) return fmt.Errorf("Transactions: transaction %d is nil", i)
} }
peer.markTransaction(tx.Hash()) peer.markTransaction(tx.Hash())
} }
@ -443,12 +512,12 @@ func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions can be processed, parse all of them and deliver to the pool // Transactions can be processed, parse all of them and deliver to the pool
var txs PooledTransactionsPacket var txs PooledTransactionsPacket
if err := msg.Decode(&txs); err != nil { if err := msg.Decode(&txs); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return err
} }
for i, tx := range txs.PooledTransactionsResponse { for i, tx := range txs.PooledTransactionsResponse {
// Validate and mark the remote transaction // Validate and mark the remote transaction
if tx == nil { if tx == nil {
return fmt.Errorf("%w: transaction %d is nil", errDecode, i) return fmt.Errorf("PooledTransactions: transaction %d is nil", i)
} }
peer.markTransaction(tx.Hash()) peer.markTransaction(tx.Hash())
} }
@ -456,3 +525,16 @@ func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &txs.PooledTransactionsResponse) return backend.Handle(peer, &txs.PooledTransactionsResponse)
} }
func handleBlockRangeUpdate(backend Backend, msg Decoder, peer *Peer) error {
var update BlockRangeUpdatePacket
if err := msg.Decode(&update); err != nil {
return err
}
if err := update.Validate(); err != nil {
return err
}
// We don't do anything with these messages for now, just store them on the peer.
peer.lastRange.Store(&update)
return nil
}

View file

@ -19,10 +19,10 @@ package eth
import ( import (
"errors" "errors"
"fmt" "fmt"
"math/big"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
@ -36,44 +36,122 @@ const (
// Handshake executes the eth protocol handshake, negotiating version number, // Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks. // network IDs, difficulties, head and genesis blocks.
func (p *Peer) Handshake(network uint64, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter) error { func (p *Peer) Handshake(networkID uint64, chain *core.BlockChain, rangeMsg BlockRangeUpdatePacket) error {
// Send out own handshake in a new thread switch p.version {
case ETH69:
return p.handshake69(networkID, chain, rangeMsg)
case ETH68:
return p.handshake68(networkID, chain)
default:
return errors.New("unsupported protocol version")
}
}
func (p *Peer) handshake68(networkID uint64, chain *core.BlockChain) error {
var (
genesis = chain.Genesis()
latest = chain.CurrentBlock()
forkID = forkid.NewID(chain.Config(), genesis, latest.Number.Uint64(), latest.Time)
forkFilter = forkid.NewFilter(chain)
)
errc := make(chan error, 2) errc := make(chan error, 2)
var status StatusPacket // safe to read after two values have been received from errc
go func() { go func() {
errc <- p2p.Send(p.rw, StatusMsg, &StatusPacket{ pkt := &StatusPacket68{
ProtocolVersion: uint32(p.version), ProtocolVersion: uint32(p.version),
NetworkID: network, NetworkID: networkID,
TD: new(big.Int), // unknown for post-merge tail=pruned networks Head: latest.Hash(),
Head: head, Genesis: genesis.Hash(),
Genesis: genesis,
ForkID: forkID, ForkID: forkID,
}) }
errc <- p2p.Send(p.rw, StatusMsg, pkt)
}() }()
var status StatusPacket68 // safe to read after two values have been received from errc
go func() { go func() {
errc <- p.readStatus(network, &status, genesis, forkFilter) errc <- p.readStatus68(networkID, &status, genesis.Hash(), forkFilter)
}() }()
timeout := time.NewTimer(handshakeTimeout)
defer timeout.Stop() return waitForHandshake(errc, p)
for i := 0; i < 2; i++ { }
select {
case err := <-errc: func (p *Peer) readStatus68(networkID uint64, status *StatusPacket68, genesis common.Hash, forkFilter forkid.Filter) error {
if err != nil { if err := p.readStatusMsg(status); err != nil {
markError(p, err)
return err return err
} }
case <-timeout.C: if status.NetworkID != networkID {
markError(p, p2p.DiscReadTimeout) return fmt.Errorf("%w: %d (!= %d)", errNetworkIDMismatch, status.NetworkID, networkID)
return p2p.DiscReadTimeout
} }
if uint(status.ProtocolVersion) != p.version {
return fmt.Errorf("%w: %d (!= %d)", errProtocolVersionMismatch, status.ProtocolVersion, p.version)
}
if status.Genesis != genesis {
return fmt.Errorf("%w: %x (!= %x)", errGenesisMismatch, status.Genesis, genesis)
}
if err := forkFilter(status.ForkID); err != nil {
return fmt.Errorf("%w: %v", errForkIDRejected, err)
} }
return nil return nil
} }
// readStatus reads the remote handshake message. func (p *Peer) handshake69(networkID uint64, chain *core.BlockChain, rangeMsg BlockRangeUpdatePacket) error {
func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.Hash, forkFilter forkid.Filter) error { var (
genesis = chain.Genesis()
latest = chain.CurrentBlock()
forkID = forkid.NewID(chain.Config(), genesis, latest.Number.Uint64(), latest.Time)
forkFilter = forkid.NewFilter(chain)
)
errc := make(chan error, 2)
go func() {
pkt := &StatusPacket69{
ProtocolVersion: uint32(p.version),
NetworkID: networkID,
Genesis: genesis.Hash(),
ForkID: forkID,
EarliestBlock: rangeMsg.EarliestBlock,
LatestBlock: rangeMsg.LatestBlock,
LatestBlockHash: rangeMsg.LatestBlockHash,
}
errc <- p2p.Send(p.rw, StatusMsg, pkt)
}()
var status StatusPacket69 // safe to read after two values have been received from errc
go func() {
errc <- p.readStatus69(networkID, &status, genesis.Hash(), forkFilter)
}()
return waitForHandshake(errc, p)
}
func (p *Peer) readStatus69(networkID uint64, status *StatusPacket69, genesis common.Hash, forkFilter forkid.Filter) error {
if err := p.readStatusMsg(status); err != nil {
return err
}
if status.NetworkID != networkID {
return fmt.Errorf("%w: %d (!= %d)", errNetworkIDMismatch, status.NetworkID, networkID)
}
if uint(status.ProtocolVersion) != p.version {
return fmt.Errorf("%w: %d (!= %d)", errProtocolVersionMismatch, status.ProtocolVersion, p.version)
}
if status.Genesis != genesis {
return fmt.Errorf("%w: %x (!= %x)", errGenesisMismatch, status.Genesis, genesis)
}
if err := forkFilter(status.ForkID); err != nil {
return fmt.Errorf("%w: %v", errForkIDRejected, err)
}
// Handle initial block range.
initRange := &BlockRangeUpdatePacket{
EarliestBlock: status.EarliestBlock,
LatestBlock: status.LatestBlock,
LatestBlockHash: status.LatestBlockHash,
}
if err := initRange.Validate(); err != nil {
return fmt.Errorf("%w: %v", errInvalidBlockRange, err)
}
p.lastRange.Store(initRange)
return nil
}
// readStatusMsg reads the first message on the connection.
func (p *Peer) readStatusMsg(dst any) error {
msg, err := p.rw.ReadMsg() msg, err := p.rw.ReadMsg()
if err != nil { if err != nil {
return err return err
@ -84,21 +162,26 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H
if msg.Size > maxMessageSize { if msg.Size > maxMessageSize {
return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize) return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize)
} }
// Decode the handshake and make sure everything matches if err := msg.Decode(dst); err != nil {
if err := msg.Decode(&status); err != nil { return err
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
if status.NetworkID != network { return nil
return fmt.Errorf("%w: %d (!= %d)", errNetworkIDMismatch, status.NetworkID, network) }
func waitForHandshake(errc <-chan error, p *Peer) error {
timeout := time.NewTimer(handshakeTimeout)
defer timeout.Stop()
for range 2 {
select {
case err := <-errc:
if err != nil {
markError(p, err)
return err
} }
if uint(status.ProtocolVersion) != p.version { case <-timeout.C:
return fmt.Errorf("%w: %d (!= %d)", errProtocolVersionMismatch, status.ProtocolVersion, p.version) markError(p, p2p.DiscReadTimeout)
return p2p.DiscReadTimeout
} }
if status.Genesis != genesis {
return fmt.Errorf("%w: %x (!= %x)", errGenesisMismatch, status.Genesis, genesis)
}
if err := forkFilter(status.ForkID); err != nil {
return fmt.Errorf("%w: %v", errForkIDRejected, err)
} }
return nil return nil
} }
@ -124,3 +207,14 @@ func markError(p *Peer, err error) {
m.peerError.Mark(1) m.peerError.Mark(1)
} }
} }
// Validate checks basic validity of a block range announcement.
func (p *BlockRangeUpdatePacket) Validate() error {
if p.EarliestBlock > p.LatestBlock {
return errors.New("earliest > latest")
}
if p.LatestBlockHash == (common.Hash{}) {
return errors.New("zero latest hash")
}
return nil
}

View file

@ -52,19 +52,19 @@ func testHandshake(t *testing.T, protocol uint) {
want: errNoStatusMsg, want: errNoStatusMsg,
}, },
{ {
code: StatusMsg, data: StatusPacket{10, 1, new(big.Int), head.Hash(), genesis.Hash(), forkID}, code: StatusMsg, data: StatusPacket68{10, 1, new(big.Int), head.Hash(), genesis.Hash(), forkID},
want: errProtocolVersionMismatch, want: errProtocolVersionMismatch,
}, },
{ {
code: StatusMsg, data: StatusPacket{uint32(protocol), 999, new(big.Int), head.Hash(), genesis.Hash(), forkID}, code: StatusMsg, data: StatusPacket68{uint32(protocol), 999, new(big.Int), head.Hash(), genesis.Hash(), forkID},
want: errNetworkIDMismatch, want: errNetworkIDMismatch,
}, },
{ {
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, new(big.Int), head.Hash(), common.Hash{3}, forkID}, code: StatusMsg, data: StatusPacket68{uint32(protocol), 1, new(big.Int), head.Hash(), common.Hash{3}, forkID},
want: errGenesisMismatch, want: errGenesisMismatch,
}, },
{ {
code: StatusMsg, data: StatusPacket{uint32(protocol), 1, new(big.Int), head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}}, code: StatusMsg, data: StatusPacket68{uint32(protocol), 1, new(big.Int), head.Hash(), genesis.Hash(), forkid.ID{Hash: [4]byte{0x00, 0x01, 0x02, 0x03}}},
want: errForkIDRejected, want: errForkIDRejected,
}, },
} }
@ -80,7 +80,7 @@ func testHandshake(t *testing.T, protocol uint) {
// Send the junk test with one peer, check the handshake failure // Send the junk test with one peer, check the handshake failure
go p2p.Send(app, test.code, test.data) go p2p.Send(app, test.code, test.data)
err := peer.Handshake(1, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain)) err := peer.Handshake(1, backend.chain, BlockRangeUpdatePacket{})
if err == nil { if err == nil {
t.Errorf("test %d: protocol returned nil error, want %q", i, test.want) t.Errorf("test %d: protocol returned nil error, want %q", i, test.want)
} else if !errors.Is(err, test.want) { } else if !errors.Is(err, test.want) {

View file

@ -18,6 +18,7 @@ package eth
import ( import (
"math/rand" "math/rand"
"sync/atomic"
mapset "github.com/deckarep/golang-set/v2" mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -47,6 +48,7 @@ type Peer struct {
*p2p.Peer // The embedded P2P package peer *p2p.Peer // The embedded P2P package peer
rw p2p.MsgReadWriter // Input/output streams for snap rw p2p.MsgReadWriter // Input/output streams for snap
version uint // Protocol version negotiated version uint // Protocol version negotiated
lastRange atomic.Pointer[BlockRangeUpdatePacket]
txpool TxPool // Transaction pool used by the broadcasters for liveness checks txpool TxPool // Transaction pool used by the broadcasters for liveness checks
knownTxs *knownCache // Set of transaction hashes known to be known by this peer knownTxs *knownCache // Set of transaction hashes known to be known by this peer
@ -102,6 +104,12 @@ func (p *Peer) Version() uint {
return p.version return p.version
} }
// BlockRange returns the latest announced block range.
// This will be nil for peers below protocol version eth/69.
func (p *Peer) BlockRange() *BlockRangeUpdatePacket {
return p.lastRange.Load()
}
// KnownTransaction returns whether peer is known to already have a transaction. // KnownTransaction returns whether peer is known to already have a transaction.
func (p *Peer) KnownTransaction(hash common.Hash) bool { func (p *Peer) KnownTransaction(hash common.Hash) bool {
return p.knownTxs.Contains(hash) return p.knownTxs.Contains(hash)
@ -343,6 +351,14 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
}) })
} }
// SendBlockRangeUpdate sends a notification about our available block range to the peer.
func (p *Peer) SendBlockRangeUpdate(msg BlockRangeUpdatePacket) error {
if p.version < ETH69 {
return nil
}
return p2p.Send(p.rw, BlockRangeUpdateMsg, &msg)
}
// knownCache is a cache for known hashes. // knownCache is a cache for known hashes.
type knownCache struct { type knownCache struct {
hashes mapset.Set[common.Hash] hashes mapset.Set[common.Hash]

View file

@ -31,6 +31,7 @@ import (
// Constants to match up protocol versions and messages // Constants to match up protocol versions and messages
const ( const (
ETH68 = 68 ETH68 = 68
ETH69 = 69
) )
// ProtocolName is the official short name of the `eth` protocol used during // ProtocolName is the official short name of the `eth` protocol used during
@ -39,11 +40,11 @@ const ProtocolName = "eth"
// ProtocolVersions are the supported versions of the `eth` protocol (first // ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary). // is primary).
var ProtocolVersions = []uint{ETH68} var ProtocolVersions = []uint{ETH69, ETH68}
// protocolLengths are the number of implemented message corresponding to // protocolLengths are the number of implemented message corresponding to
// different protocol versions. // different protocol versions.
var protocolLengths = map[uint]uint64{ETH68: 17} var protocolLengths = map[uint]uint64{ETH68: 17, ETH69: 18}
// maxMessageSize is the maximum cap on the size of a protocol message. // maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024 const maxMessageSize = 10 * 1024 * 1024
@ -62,17 +63,19 @@ const (
PooledTransactionsMsg = 0x0a PooledTransactionsMsg = 0x0a
GetReceiptsMsg = 0x0f GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10 ReceiptsMsg = 0x10
BlockRangeUpdateMsg = 0x11
) )
var ( var (
errNoStatusMsg = errors.New("no status message")
errMsgTooLarge = errors.New("message too long") errMsgTooLarge = errors.New("message too long")
errDecode = errors.New("invalid message")
errInvalidMsgCode = errors.New("invalid message code") errInvalidMsgCode = errors.New("invalid message code")
errProtocolVersionMismatch = errors.New("protocol version mismatch") errProtocolVersionMismatch = errors.New("protocol version mismatch")
// handshake errors
errNoStatusMsg = errors.New("no status message")
errNetworkIDMismatch = errors.New("network ID mismatch") errNetworkIDMismatch = errors.New("network ID mismatch")
errGenesisMismatch = errors.New("genesis mismatch") errGenesisMismatch = errors.New("genesis mismatch")
errForkIDRejected = errors.New("fork ID rejected") errForkIDRejected = errors.New("fork ID rejected")
errInvalidBlockRange = errors.New("invalid block range in status")
) )
// Packet represents a p2p message in the `eth` protocol. // Packet represents a p2p message in the `eth` protocol.
@ -82,7 +85,7 @@ type Packet interface {
} }
// StatusPacket is the network packet for the status message. // StatusPacket is the network packet for the status message.
type StatusPacket struct { type StatusPacket68 struct {
ProtocolVersion uint32 ProtocolVersion uint32
NetworkID uint64 NetworkID uint64
TD *big.Int TD *big.Int
@ -91,6 +94,18 @@ type StatusPacket struct {
ForkID forkid.ID ForkID forkid.ID
} }
// StatusPacket69 is the network packet for the status message.
type StatusPacket69 struct {
ProtocolVersion uint32
NetworkID uint64
Genesis common.Hash
ForkID forkid.ID
// initial available block range
EarliestBlock uint64
LatestBlock uint64
LatestBlockHash common.Hash
}
// NewBlockHashesPacket is the network packet for the block announcements. // NewBlockHashesPacket is the network packet for the block announcements.
type NewBlockHashesPacket []struct { type NewBlockHashesPacket []struct {
Hash common.Hash // Hash of one particular block being announced Hash common.Hash // Hash of one particular block being announced
@ -250,13 +265,21 @@ type GetReceiptsPacket struct {
} }
// ReceiptsResponse is the network packet for block receipts distribution. // ReceiptsResponse is the network packet for block receipts distribution.
type ReceiptsResponse [][]*types.Receipt type ReceiptsResponse []types.Receipts
// ReceiptsList is a type constraint for block receceipt list types.
type ReceiptsList interface {
*ReceiptList68 | *ReceiptList69
setBuffers(*receiptListBuffers)
EncodeForStorage() rlp.RawValue
types.DerivableList
}
// ReceiptsPacket is the network packet for block receipts distribution with // ReceiptsPacket is the network packet for block receipts distribution with
// request ID wrapping. // request ID wrapping.
type ReceiptsPacket struct { type ReceiptsPacket[L ReceiptsList] struct {
RequestId uint64 RequestId uint64
ReceiptsResponse List []L
} }
// ReceiptsRLPResponse is used for receipts, when we already have it encoded // ReceiptsRLPResponse is used for receipts, when we already have it encoded
@ -304,8 +327,18 @@ type PooledTransactionsRLPPacket struct {
PooledTransactionsRLPResponse PooledTransactionsRLPResponse
} }
func (*StatusPacket) Name() string { return "Status" } // BlockRangeUpdatePacket is an announcement of the node's available block range.
func (*StatusPacket) Kind() byte { return StatusMsg } type BlockRangeUpdatePacket struct {
EarliestBlock uint64
LatestBlock uint64
LatestBlockHash common.Hash
}
func (*StatusPacket68) Name() string { return "Status" }
func (*StatusPacket68) Kind() byte { return StatusMsg }
func (*StatusPacket69) Name() string { return "Status" }
func (*StatusPacket69) Kind() byte { return StatusMsg }
func (*NewBlockHashesPacket) Name() string { return "NewBlockHashes" } func (*NewBlockHashesPacket) Name() string { return "NewBlockHashes" }
func (*NewBlockHashesPacket) Kind() byte { return NewBlockHashesMsg } func (*NewBlockHashesPacket) Kind() byte { return NewBlockHashesMsg }
@ -342,3 +375,9 @@ func (*GetReceiptsRequest) Kind() byte { return GetReceiptsMsg }
func (*ReceiptsResponse) Name() string { return "Receipts" } func (*ReceiptsResponse) Name() string { return "Receipts" }
func (*ReceiptsResponse) Kind() byte { return ReceiptsMsg } func (*ReceiptsResponse) Kind() byte { return ReceiptsMsg }
func (*ReceiptsRLPResponse) Name() string { return "Receipts" }
func (*ReceiptsRLPResponse) Kind() byte { return ReceiptsMsg }
func (*BlockRangeUpdatePacket) Name() string { return "BlockRangeUpdate" }
func (*BlockRangeUpdatePacket) Kind() byte { return BlockRangeUpdateMsg }

View file

@ -75,7 +75,7 @@ func TestEmptyMessages(t *testing.T) {
// All empty messages encodes to the same format // All empty messages encodes to the same format
want := common.FromHex("c4820457c0") want := common.FromHex("c4820457c0")
for i, msg := range []interface{}{ for i, msg := range []any{
// Headers // Headers
GetBlockHeadersPacket{1111, nil}, GetBlockHeadersPacket{1111, nil},
BlockHeadersPacket{1111, nil}, BlockHeadersPacket{1111, nil},
@ -85,7 +85,6 @@ func TestEmptyMessages(t *testing.T) {
BlockBodiesRLPPacket{1111, nil}, BlockBodiesRLPPacket{1111, nil},
// Receipts // Receipts
GetReceiptsPacket{1111, nil}, GetReceiptsPacket{1111, nil},
ReceiptsPacket{1111, nil},
// Transactions // Transactions
GetPooledTransactionsPacket{1111, nil}, GetPooledTransactionsPacket{1111, nil},
PooledTransactionsPacket{1111, nil}, PooledTransactionsPacket{1111, nil},
@ -99,7 +98,8 @@ func TestEmptyMessages(t *testing.T) {
BlockBodiesRLPPacket{1111, BlockBodiesRLPResponse([]rlp.RawValue{})}, BlockBodiesRLPPacket{1111, BlockBodiesRLPResponse([]rlp.RawValue{})},
// Receipts // Receipts
GetReceiptsPacket{1111, GetReceiptsRequest([]common.Hash{})}, GetReceiptsPacket{1111, GetReceiptsRequest([]common.Hash{})},
ReceiptsPacket{1111, ReceiptsResponse([][]*types.Receipt{})}, ReceiptsPacket[*ReceiptList68]{1111, []*ReceiptList68{}},
ReceiptsPacket[*ReceiptList69]{1111, []*ReceiptList69{}},
// Transactions // Transactions
GetPooledTransactionsPacket{1111, GetPooledTransactionsRequest([]common.Hash{})}, GetPooledTransactionsPacket{1111, GetPooledTransactionsRequest([]common.Hash{})},
PooledTransactionsPacket{1111, PooledTransactionsResponse([]*types.Transaction{})}, PooledTransactionsPacket{1111, PooledTransactionsResponse([]*types.Transaction{})},
@ -168,7 +168,7 @@ func TestMessages(t *testing.T) {
receipts = []*types.Receipt{ receipts = []*types.Receipt{
{ {
Status: types.ReceiptStatusFailed, Status: types.ReceiptStatusFailed,
CumulativeGasUsed: 1, CumulativeGasUsed: 333,
Logs: []*types.Log{ Logs: []*types.Log{
{ {
Address: common.BytesToAddress([]byte{0x11}), Address: common.BytesToAddress([]byte{0x11}),
@ -176,11 +176,21 @@ func TestMessages(t *testing.T) {
Data: []byte{0x01, 0x00, 0xff}, Data: []byte{0x01, 0x00, 0xff},
}, },
}, },
TxHash: hashes[0], },
ContractAddress: common.BytesToAddress([]byte{0x01, 0x11, 0x11}), {
GasUsed: 111111, Status: types.ReceiptStatusSuccessful,
CumulativeGasUsed: 444,
Logs: []*types.Log{
{
Address: common.BytesToAddress([]byte{0x22}),
Topics: []common.Hash{common.HexToHash("05668"), common.HexToHash("9773")},
Data: []byte{0x02, 0x0f, 0x0f, 0x0f, 0x06, 0x08},
},
},
}, },
} }
miniDeriveFields(receipts[0], 0)
miniDeriveFields(receipts[1], 1)
rlpData, err := rlp.EncodeToBytes(receipts) rlpData, err := rlp.EncodeToBytes(receipts)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -221,12 +231,17 @@ func TestMessages(t *testing.T) {
common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"), common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"),
}, },
{ {
ReceiptsPacket{1111, ReceiptsResponse([][]*types.Receipt{receipts})}, ReceiptsPacket[*ReceiptList68]{1111, []*ReceiptList68{NewReceiptList68(receipts)}},
common.FromHex("f90172820457f9016cf90169f901668001b9010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100ff"), common.FromHex("f902e6820457f902e0f902ddf901688082014db9010000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000014000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000004000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000f85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100ffb9016f01f9016b018201bcb9010000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000040000000001000000000000000000000000000000000000000000000000040000000000000000000000000004000000000000000000000000000000000000000000000000000000008000400000000000000000000000000000000000000000000000000000000000000000000000000000040f862f860940000000000000000000000000000000000000022f842a00000000000000000000000000000000000000000000000000000000000005668a0000000000000000000000000000000000000000000000000000000000000977386020f0f0f0608"),
}, },
{ {
// Identical to the eth/68 encoding above.
ReceiptsRLPPacket{1111, ReceiptsRLPResponse([]rlp.RawValue{receiptsRlp})}, ReceiptsRLPPacket{1111, ReceiptsRLPResponse([]rlp.RawValue{receiptsRlp})},
common.FromHex("f90172820457f9016cf90169f901668001b9010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100ff"), common.FromHex("f902e6820457f902e0f902ddf901688082014db9010000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000014000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000004000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000f85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100ffb9016f01f9016b018201bcb9010000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000040000000001000000000000000000000000000000000000000000000000040000000000000000000000000004000000000000000000000000000000000000000000000000000000008000400000000000000000000000000000000000000000000000000000000000000000000000000000040f862f860940000000000000000000000000000000000000022f842a00000000000000000000000000000000000000000000000000000000000005668a0000000000000000000000000000000000000000000000000000000000000977386020f0f0f0608"),
},
{
ReceiptsPacket[*ReceiptList69]{1111, []*ReceiptList69{NewReceiptList69(receipts)}},
common.FromHex("f8da820457f8d5f8d3f866808082014df85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100fff86901018201bcf862f860940000000000000000000000000000000000000022f842a00000000000000000000000000000000000000000000000000000000000005668a0000000000000000000000000000000000000000000000000000000000000977386020f0f0f0608"),
}, },
{ {
GetPooledTransactionsPacket{1111, GetPooledTransactionsRequest(hashes)}, GetPooledTransactionsPacket{1111, GetPooledTransactionsRequest(hashes)},

View file

@ -0,0 +1,462 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"bytes"
"fmt"
"io"
"iter"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
// This is just a sanity limit for the size of a single receipt.
const maxReceiptSize = 16 * 1024 * 1024
// Receipt is the representation of receipts for networking purposes.
type Receipt struct {
TxType byte
PostStateOrStatus []byte
GasUsed uint64
Logs rlp.RawValue
}
func newReceipt(tr *types.Receipt) Receipt {
r := Receipt{TxType: tr.Type, GasUsed: tr.CumulativeGasUsed}
if tr.PostState != nil {
r.PostStateOrStatus = tr.PostState
} else {
r.PostStateOrStatus = new(big.Int).SetUint64(tr.Status).Bytes()
}
r.Logs, _ = rlp.EncodeToBytes(tr.Logs)
return r
}
// decode68 parses a receipt in the eth/68 network encoding.
func (r *Receipt) decode68(buf *receiptListBuffers, s *rlp.Stream) error {
k, size, err := s.Kind()
if err != nil {
return err
}
*r = Receipt{}
if k == rlp.List {
// Legacy receipt.
return r.decodeInnerList(s, false, true)
}
// Typed receipt.
if size < 2 || size > maxReceiptSize {
return fmt.Errorf("invalid receipt size %d", size)
}
buf.tmp.Reset()
buf.tmp.Grow(int(size))
payload := buf.tmp.Bytes()[:int(size)]
if err := s.ReadBytes(payload); err != nil {
return err
}
r.TxType = payload[0]
s2 := rlp.NewStream(bytes.NewReader(payload[1:]), 0)
return r.decodeInnerList(s2, false, true)
}
// decode69 parses a receipt in the eth/69 network encoding.
func (r *Receipt) decode69(s *rlp.Stream) error {
*r = Receipt{}
return r.decodeInnerList(s, true, false)
}
// decodeDatabase parses a receipt in the basic database encoding.
func (r *Receipt) decodeDatabase(txType byte, s *rlp.Stream) error {
*r = Receipt{TxType: txType}
return r.decodeInnerList(s, false, false)
}
func (r *Receipt) decodeInnerList(s *rlp.Stream, readTxType, readBloom bool) error {
_, err := s.List()
if err != nil {
return err
}
if readTxType {
r.TxType, err = s.Uint8()
if err != nil {
return fmt.Errorf("invalid txType: %w", err)
}
}
r.PostStateOrStatus, err = s.Bytes()
if err != nil {
return fmt.Errorf("invalid postStateOrStatus: %w", err)
}
r.GasUsed, err = s.Uint64()
if err != nil {
return fmt.Errorf("invalid gasUsed: %w", err)
}
if readBloom {
var b types.Bloom
if err := s.ReadBytes(b[:]); err != nil {
return fmt.Errorf("invalid bloom: %v", err)
}
}
r.Logs, err = s.Raw()
if err != nil {
return fmt.Errorf("invalid logs: %w", err)
}
return s.ListEnd()
}
// encodeForStorage produces the the storage encoding, i.e. the result matches
// the RLP encoding of types.ReceiptForStorage.
func (r *Receipt) encodeForStorage(w *rlp.EncoderBuffer) {
list := w.List()
w.WriteBytes(r.PostStateOrStatus)
w.WriteUint64(r.GasUsed)
w.Write(r.Logs)
w.ListEnd(list)
}
// encodeForNetwork68 produces the eth/68 network protocol encoding of a receipt.
// Note this recomputes the bloom filter of the receipt.
func (r *Receipt) encodeForNetwork68(buf *receiptListBuffers, w *rlp.EncoderBuffer) {
writeInner := func(w *rlp.EncoderBuffer) {
list := w.List()
w.WriteBytes(r.PostStateOrStatus)
w.WriteUint64(r.GasUsed)
bloom := r.bloom(&buf.bloom)
w.WriteBytes(bloom[:])
w.Write(r.Logs)
w.ListEnd(list)
}
if r.TxType == 0 {
writeInner(w)
} else {
buf.tmp.Reset()
buf.tmp.WriteByte(r.TxType)
buf.enc.Reset(&buf.tmp)
writeInner(&buf.enc)
buf.enc.Flush()
w.WriteBytes(buf.tmp.Bytes())
}
}
// encodeForNetwork69 produces the eth/69 network protocol encoding of a receipt.
func (r *Receipt) encodeForNetwork69(w *rlp.EncoderBuffer) {
list := w.List()
w.WriteUint64(uint64(r.TxType))
w.WriteBytes(r.PostStateOrStatus)
w.WriteUint64(r.GasUsed)
w.Write(r.Logs)
w.ListEnd(list)
}
// encodeForHash encodes a receipt for the block receiptsRoot derivation.
func (r *Receipt) encodeForHash(buf *receiptListBuffers, out *bytes.Buffer) {
// For typed receipts, add the tx type.
if r.TxType != 0 {
out.WriteByte(r.TxType)
}
// Encode list = [postStateOrStatus, gasUsed, bloom, logs].
w := &buf.enc
w.Reset(out)
l := w.List()
w.WriteBytes(r.PostStateOrStatus)
w.WriteUint64(r.GasUsed)
bloom := r.bloom(&buf.bloom)
w.WriteBytes(bloom[:])
w.Write(r.Logs)
w.ListEnd(l)
w.Flush()
}
// bloom computes the bloom filter of the receipt.
// Note this doesn't check the validity of encoding, and will produce an invalid filter
// for invalid input. This is acceptable for the purpose of this function, which is
// recomputing the receipt hash.
func (r *Receipt) bloom(buffer *[6]byte) types.Bloom {
var b types.Bloom
logsIter, err := rlp.NewListIterator(r.Logs)
if err != nil {
return b
}
for logsIter.Next() {
log, _, _ := rlp.SplitList(logsIter.Value())
address, log, _ := rlp.SplitString(log)
b.AddWithBuffer(address, buffer)
topicsIter, err := rlp.NewListIterator(log)
if err != nil {
return b
}
for topicsIter.Next() {
topic, _, _ := rlp.SplitString(topicsIter.Value())
b.AddWithBuffer(topic, buffer)
}
}
return b
}
type receiptListBuffers struct {
enc rlp.EncoderBuffer
bloom [6]byte
tmp bytes.Buffer
}
func initBuffers(buf **receiptListBuffers) {
if *buf == nil {
*buf = new(receiptListBuffers)
}
}
// encodeForStorage encodes a list of receipts for the database.
func (buf *receiptListBuffers) encodeForStorage(rs []Receipt) rlp.RawValue {
var out bytes.Buffer
w := &buf.enc
w.Reset(&out)
outer := w.List()
for _, receipts := range rs {
receipts.encodeForStorage(w)
}
w.ListEnd(outer)
w.Flush()
return out.Bytes()
}
// ReceiptList68 is a block receipt list as downloaded by eth/68.
// This also implements types.DerivableList for validation purposes.
type ReceiptList68 struct {
buf *receiptListBuffers
items []Receipt
}
// NewReceiptList68 creates a receipt list.
// This is slow, and exists for testing purposes.
func NewReceiptList68(trs []*types.Receipt) *ReceiptList68 {
rl := &ReceiptList68{items: make([]Receipt, len(trs))}
for i, tr := range trs {
rl.items[i] = newReceipt(tr)
}
return rl
}
func blockReceiptsToNetwork68(blockReceipts, blockBody rlp.RawValue) ([]byte, error) {
txTypesIter, err := txTypesInBody(blockBody)
if err != nil {
return nil, fmt.Errorf("invalid block body: %v", err)
}
nextTxType, stopTxTypes := iter.Pull(txTypesIter)
defer stopTxTypes()
var (
out bytes.Buffer
buf receiptListBuffers
)
blockReceiptIter, _ := rlp.NewListIterator(blockReceipts)
innerReader := bytes.NewReader(nil)
innerStream := rlp.NewStream(innerReader, 0)
w := rlp.NewEncoderBuffer(&out)
outer := w.List()
for i := 0; blockReceiptIter.Next(); i++ {
content := blockReceiptIter.Value()
innerReader.Reset(content)
innerStream.Reset(innerReader, uint64(len(content)))
var r Receipt
txType, _ := nextTxType()
if err := r.decodeDatabase(txType, innerStream); err != nil {
return nil, fmt.Errorf("invalid database receipt %d: %v", i, err)
}
r.encodeForNetwork68(&buf, &w)
}
w.ListEnd(outer)
w.Flush()
return out.Bytes(), nil
}
// setBuffers implements ReceiptsList.
func (rl *ReceiptList68) setBuffers(buf *receiptListBuffers) {
rl.buf = buf
}
// EncodeForStorage encodes the receipts for storage into the database.
func (rl *ReceiptList68) EncodeForStorage() rlp.RawValue {
initBuffers(&rl.buf)
return rl.buf.encodeForStorage(rl.items)
}
// Len implements types.DerivableList.
func (rl *ReceiptList68) Len() int {
return len(rl.items)
}
// EncodeIndex implements types.DerivableList.
func (rl *ReceiptList68) EncodeIndex(i int, out *bytes.Buffer) {
initBuffers(&rl.buf)
rl.items[i].encodeForHash(rl.buf, out)
}
// DecodeRLP decodes a list of receipts from the network format.
func (rl *ReceiptList68) DecodeRLP(s *rlp.Stream) error {
initBuffers(&rl.buf)
if _, err := s.List(); err != nil {
return err
}
for i := 0; s.MoreDataInList(); i++ {
var item Receipt
err := item.decode68(rl.buf, s)
if err != nil {
return fmt.Errorf("receipt %d: %v", i, err)
}
rl.items = append(rl.items, item)
}
return s.ListEnd()
}
// EncodeRLP encodes the list into the network format of eth/68.
func (rl *ReceiptList68) EncodeRLP(_w io.Writer) error {
initBuffers(&rl.buf)
w := rlp.NewEncoderBuffer(_w)
outer := w.List()
for i := range rl.items {
rl.items[i].encodeForNetwork68(rl.buf, &w)
}
w.ListEnd(outer)
return w.Flush()
}
// ReceiptList69 is the block receipt list as downloaded by eth/69.
// This implements types.DerivableList for validation purposes.
type ReceiptList69 struct {
buf *receiptListBuffers
items []Receipt
}
// NewReceiptList69 creates a receipt list.
// This is slow, and exists for testing purposes.
func NewReceiptList69(trs []*types.Receipt) *ReceiptList69 {
rl := &ReceiptList69{items: make([]Receipt, len(trs))}
for i, tr := range trs {
rl.items[i] = newReceipt(tr)
}
return rl
}
// setBuffers implements ReceiptsList.
func (rl *ReceiptList69) setBuffers(buf *receiptListBuffers) {
rl.buf = buf
}
// EncodeForStorage encodes the receipts for storage into the database.
func (rl *ReceiptList69) EncodeForStorage() rlp.RawValue {
initBuffers(&rl.buf)
return rl.buf.encodeForStorage(rl.items)
}
// Len implements types.DerivableList.
func (rl *ReceiptList69) Len() int {
return len(rl.items)
}
// EncodeIndex implements types.DerivableList.
func (rl *ReceiptList69) EncodeIndex(i int, out *bytes.Buffer) {
initBuffers(&rl.buf)
rl.items[i].encodeForHash(rl.buf, out)
}
// DecodeRLP decodes a list receipts from the network format.
func (rl *ReceiptList69) DecodeRLP(s *rlp.Stream) error {
if _, err := s.List(); err != nil {
return err
}
for i := 0; s.MoreDataInList(); i++ {
var item Receipt
err := item.decode69(s)
if err != nil {
return fmt.Errorf("receipt %d: %v", i, err)
}
rl.items = append(rl.items, item)
}
return s.ListEnd()
}
// EncodeRLP encodes the list into the network format of eth/69.
func (rl *ReceiptList69) EncodeRLP(_w io.Writer) error {
w := rlp.NewEncoderBuffer(_w)
outer := w.List()
for i := range rl.items {
rl.items[i].encodeForNetwork69(&w)
}
w.ListEnd(outer)
return w.Flush()
}
// blockReceiptsToNetwork69 takes a slice of rlp-encoded receipts, and transactions,
// and applies the type-encoding on the receipts (for non-legacy receipts).
// e.g. for non-legacy receipts: receipt-data -> {tx-type || receipt-data}
func blockReceiptsToNetwork69(blockReceipts, blockBody rlp.RawValue) ([]byte, error) {
txTypesIter, err := txTypesInBody(blockBody)
if err != nil {
return nil, fmt.Errorf("invalid block body: %v", err)
}
nextTxType, stopTxTypes := iter.Pull(txTypesIter)
defer stopTxTypes()
var (
out bytes.Buffer
enc = rlp.NewEncoderBuffer(&out)
it, _ = rlp.NewListIterator(blockReceipts)
)
outer := enc.List()
for i := 0; it.Next(); i++ {
txType, _ := nextTxType()
content, _, _ := rlp.SplitList(it.Value())
receiptList := enc.List()
enc.WriteUint64(uint64(txType))
enc.Write(content)
enc.ListEnd(receiptList)
}
enc.ListEnd(outer)
enc.Flush()
return out.Bytes(), nil
}
// txTypesInBody parses the transactions list of an encoded block body, returning just the types.
func txTypesInBody(body rlp.RawValue) (iter.Seq[byte], error) {
bodyFields, _, err := rlp.SplitList(body)
if err != nil {
return nil, err
}
txsIter, err := rlp.NewListIterator(bodyFields)
if err != nil {
return nil, err
}
return func(yield func(byte) bool) {
for txsIter.Next() {
var txType byte
switch k, content, _, _ := rlp.Split(txsIter.Value()); k {
case rlp.List:
txType = 0
case rlp.String:
if len(content) > 0 {
txType = content[0]
}
}
if !yield(txType) {
return
}
}
}, nil
}

View file

@ -0,0 +1,158 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"bytes"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
// miniDeriveFields derives the necessary receipt fields to make types.DeriveSha work.
func miniDeriveFields(r *types.Receipt, txType byte) {
r.Type = txType
r.Bloom = types.CreateBloom(r)
}
var receiptsTestLogs1 = []*types.Log{{Address: common.Address{1}, Topics: []common.Hash{{1}}}}
var receiptsTestLogs2 = []*types.Log{
{Address: common.Address{2}, Topics: []common.Hash{{21}, {22}}, Data: []byte{2, 2, 32, 32}},
{Address: common.Address{3}, Topics: []common.Hash{{31}, {32}}, Data: []byte{3, 3, 32, 32}},
}
var receiptsTests = []struct {
input []types.ReceiptForStorage
txs []*types.Transaction
root common.Hash
}{
{
input: []types.ReceiptForStorage{{CumulativeGasUsed: 555, Status: 1, Logs: nil}},
txs: []*types.Transaction{types.NewTx(&types.LegacyTx{})},
},
{
input: []types.ReceiptForStorage{{CumulativeGasUsed: 555, Status: 1, Logs: nil}},
txs: []*types.Transaction{types.NewTx(&types.DynamicFeeTx{})},
},
{
input: []types.ReceiptForStorage{{CumulativeGasUsed: 555, Status: 1, Logs: nil}},
txs: []*types.Transaction{types.NewTx(&types.AccessListTx{})},
},
{
input: []types.ReceiptForStorage{{CumulativeGasUsed: 555, Status: 1, Logs: receiptsTestLogs1}},
txs: []*types.Transaction{types.NewTx(&types.LegacyTx{})},
},
{
input: []types.ReceiptForStorage{{CumulativeGasUsed: 555, Status: 1, Logs: receiptsTestLogs2}},
txs: []*types.Transaction{types.NewTx(&types.AccessListTx{})},
},
}
func init() {
for i := range receiptsTests {
// derive basic fields
for j := range receiptsTests[i].input {
r := (*types.Receipt)(&receiptsTests[i].input[j])
txType := receiptsTests[i].txs[j].Type()
miniDeriveFields(r, txType)
}
// compute expected root
receipts := make(types.Receipts, len(receiptsTests[i].input))
for j, sr := range receiptsTests[i].input {
r := types.Receipt(sr)
receipts[j] = &r
}
receiptsTests[i].root = types.DeriveSha(receipts, trie.NewStackTrie(nil))
}
}
func TestReceiptList69(t *testing.T) {
for i, test := range receiptsTests {
// encode receipts from types.ReceiptForStorage object.
canonDB, _ := rlp.EncodeToBytes(test.input)
// encode block body from types object.
blockBody := types.Body{Transactions: test.txs}
canonBody, _ := rlp.EncodeToBytes(blockBody)
// convert from storage encoding to network encoding
network, err := blockReceiptsToNetwork69(canonDB, canonBody)
if err != nil {
t.Fatalf("test[%d]: blockReceiptsToNetwork69 error: %v", i, err)
}
// parse as Receipts response list from network encoding
var rl ReceiptList69
if err := rlp.DecodeBytes(network, &rl); err != nil {
t.Fatalf("test[%d]: can't decode network receipts: %v", i, err)
}
rlStorageEnc := rl.EncodeForStorage()
if !bytes.Equal(rlStorageEnc, canonDB) {
t.Fatalf("test[%d]: re-encoded receipts not equal\nhave: %x\nwant: %x", i, rlStorageEnc, canonDB)
}
rlNetworkEnc, _ := rlp.EncodeToBytes(&rl)
if !bytes.Equal(rlNetworkEnc, network) {
t.Fatalf("test[%d]: re-encoded network receipt list not equal\nhave: %x\nwant: %x", i, rlNetworkEnc, network)
}
// compute root hash from ReceiptList69 and compare.
responseHash := types.DeriveSha(&rl, trie.NewStackTrie(nil))
if responseHash != test.root {
t.Fatalf("test[%d]: wrong root hash from ReceiptList69\nhave: %v\nwant: %v", i, responseHash, test.root)
}
}
}
func TestReceiptList68(t *testing.T) {
for i, test := range receiptsTests {
// encode receipts from types.ReceiptForStorage object.
canonDB, _ := rlp.EncodeToBytes(test.input)
// encode block body from types object.
blockBody := types.Body{Transactions: test.txs}
canonBody, _ := rlp.EncodeToBytes(blockBody)
// convert from storage encoding to network encoding
network, err := blockReceiptsToNetwork68(canonDB, canonBody)
if err != nil {
t.Fatalf("test[%d]: blockReceiptsToNetwork68 error: %v", i, err)
}
// parse as Receipts response list from network encoding
var rl ReceiptList68
if err := rlp.DecodeBytes(network, &rl); err != nil {
t.Fatalf("test[%d]: can't decode network receipts: %v", i, err)
}
rlStorageEnc := rl.EncodeForStorage()
if !bytes.Equal(rlStorageEnc, canonDB) {
t.Fatalf("test[%d]: re-encoded receipts not equal\nhave: %x\nwant: %x", i, rlStorageEnc, canonDB)
}
rlNetworkEnc, _ := rlp.EncodeToBytes(&rl)
if !bytes.Equal(rlNetworkEnc, network) {
t.Fatalf("test[%d]: re-encoded network receipt list not equal\nhave: %x\nwant: %x", i, rlNetworkEnc, network)
}
// compute root hash from ReceiptList68 and compare.
responseHash := types.DeriveSha(&rl, trie.NewStackTrie(nil))
if responseHash != test.root {
t.Fatalf("test[%d]: wrong root hash from ReceiptList68\nhave: %v\nwant: %v", i, responseHash, test.root)
}
}
}

View file

@ -1,26 +0,0 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"time"
"github.com/ethereum/go-ethereum/p2p/tracker"
)
// requestTracker is a singleton tracker for eth/66 and newer request times.
var requestTracker = tracker.New(ProtocolName, 5*time.Minute)