From 930836ed66c9ff8eaffc8cd168cd9c76fad82c29 Mon Sep 17 00:00:00 2001 From: minh-bq Date: Wed, 19 Mar 2025 13:20:50 +0700 Subject: [PATCH] core/txpool, eth: add GetRLP to transaction pool (#31307) Currently, when answering GetPooledTransaction request, txpool.Get() is used. When the requested hash is blob transaction, blobpool.Get() is called. This function loads the RLP-encoded transaction from limbo then decodes and returns. Later, in answerGetPooledTransactions, we need to RLP encode again. This decode then encode is wasteful. This commit adds GetRLP to transaction pool interface so that answerGetPooledTransactions can use the RLP-encoded from limbo directly. --------- Co-authored-by: Gary Rong --- core/txpool/blobpool/blobpool.go | 24 +++++- core/txpool/legacypool/legacypool.go | 15 ++++ core/txpool/subpool.go | 3 + core/txpool/txpool.go | 11 +++ eth/handler.go | 4 + eth/handler_test.go | 15 ++++ eth/protocols/eth/handler.go | 4 + eth/protocols/eth/handler_test.go | 116 ++++++++++++++++++++++++--- eth/protocols/eth/handlers.go | 15 ++-- 9 files changed, 184 insertions(+), 23 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 7ad95612bf..59a5645040 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1189,8 +1189,7 @@ func (p *BlobPool) Has(hash common.Hash) bool { return p.lookup.exists(hash) } -// Get returns a transaction if it is contained in the pool, or nil otherwise. -func (p *BlobPool) Get(hash common.Hash) *types.Transaction { +func (p *BlobPool) getRLP(hash common.Hash) []byte { // Track the amount of time waiting to retrieve a fully resolved blob tx from // the pool and the amount of time actually spent on pulling the data from disk. getStart := time.Now() @@ -1212,14 +1211,31 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err) return nil } + return data +} + +// Get returns a transaction if it is contained in the pool, or nil otherwise. +func (p *BlobPool) Get(hash common.Hash) *types.Transaction { + data := p.getRLP(hash) + if len(data) == 0 { + return nil + } item := new(types.Transaction) - if err = rlp.DecodeBytes(data, item); err != nil { - log.Error("Blobs corrupted for traced transaction", "hash", hash, "id", id, "err", err) + if err := rlp.DecodeBytes(data, item); err != nil { + id, _ := p.lookup.storeidOfTx(hash) + + log.Error("Blobs corrupted for traced transaction", + "hash", hash, "id", id, "err", err) return nil } return item } +// GetRLP returns a RLP-encoded transaction if it is contained in the pool. +func (p *BlobPool) GetRLP(hash common.Hash) []byte { + return p.getRLP(hash) +} + // GetBlobs returns a number of blobs are proofs for the given versioned hashes. // This is a utility method for the engine API, enabling consensus clients to // retrieve blobs from the pools directly instead of the network. diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 8c09d48695..78be81480f 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -40,6 +40,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" "github.com/holiman/uint256" ) @@ -1010,6 +1011,20 @@ func (pool *LegacyPool) get(hash common.Hash) *types.Transaction { return pool.all.Get(hash) } +// GetRLP returns a RLP-encoded transaction if it is contained in the pool. +func (pool *LegacyPool) GetRLP(hash common.Hash) []byte { + tx := pool.all.Get(hash) + if tx == nil { + return nil + } + encoded, err := rlp.EncodeToBytes(tx) + if err != nil { + log.Error("Failed to encoded transaction in legacy pool", "hash", hash, "err", err) + return nil + } + return encoded +} + // GetBlobs is not supported by the legacy transaction pool, it is just here to // implement the txpool.SubPool interface. func (pool *LegacyPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) { diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 242af02136..1392cfb274 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -124,6 +124,9 @@ type SubPool interface { // Get returns a transaction if it is contained in the pool, or nil otherwise. Get(hash common.Hash) *types.Transaction + // GetRLP returns a RLP-encoded transaction if it is contained in the pool. + GetRLP(hash common.Hash) []byte + // GetBlobs returns a number of blobs are proofs for the given versioned hashes. // This is a utility method for the engine API, enabling consensus clients to // retrieve blobs from the pools directly instead of the network. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 55812415f9..042e3d36d9 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -309,6 +309,17 @@ func (p *TxPool) Get(hash common.Hash) *types.Transaction { return nil } +// GetRLP returns a RLP-encoded transaction if it is contained in the pool. +func (p *TxPool) GetRLP(hash common.Hash) []byte { + for _, subpool := range p.subpools { + encoded := subpool.GetRLP(hash) + if len(encoded) != 0 { + return encoded + } + } + return nil +} + // GetBlobs returns a number of blobs are proofs for the given versioned hashes. // This is a utility method for the engine API, enabling consensus clients to // retrieve blobs from the pools directly instead of the network. diff --git a/eth/handler.go b/eth/handler.go index 6ac890902b..4c83f5613c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -67,6 +67,10 @@ type txPool interface { // tx hash. Get(hash common.Hash) *types.Transaction + // GetRLP retrieves the RLP-encoded transaction from local txpool + // with given tx hash. + GetRLP(hash common.Hash) []byte + // Add should add the given transactions to the pool. Add(txs []*types.Transaction, sync bool) []error diff --git a/eth/handler_test.go b/eth/handler_test.go index d5d46a3c65..0c6b9854e6 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" "github.com/holiman/uint256" ) @@ -78,6 +79,20 @@ func (p *testTxPool) Get(hash common.Hash) *types.Transaction { return p.pool[hash] } +// Get retrieves the transaction from local txpool with given +// tx hash. +func (p *testTxPool) GetRLP(hash common.Hash) []byte { + p.lock.Lock() + defer p.lock.Unlock() + + tx := p.pool[hash] + if tx != nil { + blob, _ := rlp.EncodeToBytes(tx) + return blob + } + return nil +} + // Add appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error { diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index a0848137fa..eca6777bd6 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -86,6 +86,10 @@ type Backend interface { type TxPool interface { // Get retrieves the transaction from the local txpool with the given hash. Get(hash common.Hash) *types.Transaction + + // GetRLP retrieves the RLP-encoded transaction from the local txpool with + // the given hash. + GetRLP(hash common.Hash) []byte } // MakeProtocols constructs the P2P protocol definitions for `eth`. diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index f92599dba7..36f5e90c9a 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -18,9 +18,11 @@ package eth import ( "bytes" + "crypto/sha256" "math" "math/big" "math/rand" + "os" "testing" "time" @@ -30,15 +32,18 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/holiman/uint256" ) var ( @@ -62,12 +67,12 @@ type testBackend struct { // newTestBackend creates an empty chain and wraps it into a mock backend. func newTestBackend(blocks int) *testBackend { - return newTestBackendWithGenerator(blocks, false, nil) + return newTestBackendWithGenerator(blocks, false, false, nil) } // newTestBackendWithGenerator creates a chain with a number of explicitly defined blocks and // wraps it into a mock backend. -func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int, *core.BlockGen)) *testBackend { +func newTestBackendWithGenerator(blocks int, shanghai bool, cancun bool, generator func(int, *core.BlockGen)) *testBackend { var ( // Create a database pre-initialize with a genesis block db = rawdb.NewMemoryDatabase() @@ -99,9 +104,21 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int, } } + if cancun { + config.CancunTime = u64(0) + config.BlobScheduleConfig = ¶ms.BlobScheduleConfig{ + Cancun: ¶ms.BlobConfig{ + Target: 3, + Max: 6, + UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction, + }, + } + } + gspec := &core.Genesis{ - Config: config, - Alloc: types.GenesisAlloc{testAddr: {Balance: big.NewInt(100_000_000_000_000_000)}}, + Config: config, + Alloc: types.GenesisAlloc{testAddr: {Balance: big.NewInt(100_000_000_000_000_000)}}, + Difficulty: common.Big0, } chain, _ := core.NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil) @@ -115,8 +132,12 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int, txconfig := legacypool.DefaultConfig txconfig.Journal = "" // Don't litter the disk with test journals - pool := legacypool.New(txconfig, chain) - txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{pool}) + storage, _ := os.MkdirTemp("", "blobpool-") + defer os.RemoveAll(storage) + + blobPool := blobpool.New(blobpool.Config{Datadir: storage}, chain) + legacyPool := legacypool.New(txconfig, chain) + txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{legacyPool, blobPool}) return &testBackend{ db: db, @@ -351,7 +372,7 @@ func testGetBlockBodies(t *testing.T, protocol uint) { } } - backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen) + backend := newTestBackendWithGenerator(maxBodiesServe+15, true, false, gen) defer backend.close() peer, _ := newTestPeer("peer", protocol, backend) @@ -471,7 +492,7 @@ func testGetBlockReceipts(t *testing.T, protocol uint) { } } // Assemble the test environment - backend := newTestBackendWithGenerator(4, false, generator) + backend := newTestBackendWithGenerator(4, false, false, generator) defer backend.close() peer, _ := newTestPeer("peer", protocol, backend) @@ -548,7 +569,7 @@ func setup() (*testBackend, *testPeer) { block.SetExtra([]byte("yeehaw")) } } - backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen) + backend := newTestBackendWithGenerator(maxBodiesServe+15, true, false, gen) peer, _ := newTestPeer("peer", ETH68, backend) // Discard all messages go func() { @@ -573,3 +594,80 @@ func FuzzEthProtocolHandlers(f *testing.F) { handler(backend, decoder{msg: msg}, peer.Peer) }) } + +func TestGetPooledTransaction(t *testing.T) { + t.Run("blobTx", func(t *testing.T) { + testGetPooledTransaction(t, true) + }) + t.Run("legacyTx", func(t *testing.T) { + testGetPooledTransaction(t, false) + }) +} + +func testGetPooledTransaction(t *testing.T, blobTx bool) { + var ( + emptyBlob = kzg4844.Blob{} + emptyBlobs = []kzg4844.Blob{emptyBlob} + emptyBlobCommit, _ = kzg4844.BlobToCommitment(&emptyBlob) + emptyBlobProof, _ = kzg4844.ComputeBlobProof(&emptyBlob, emptyBlobCommit) + emptyBlobHash = kzg4844.CalcBlobHashV1(sha256.New(), &emptyBlobCommit) + ) + backend := newTestBackendWithGenerator(0, true, true, nil) + defer backend.close() + + peer, _ := newTestPeer("peer", ETH68, backend) + defer peer.close() + + var ( + tx *types.Transaction + err error + signer = types.NewCancunSigner(params.TestChainConfig.ChainID) + ) + if blobTx { + tx, err = types.SignNewTx(testKey, signer, &types.BlobTx{ + ChainID: uint256.MustFromBig(params.TestChainConfig.ChainID), + Nonce: 0, + GasTipCap: uint256.NewInt(20_000_000_000), + GasFeeCap: uint256.NewInt(21_000_000_000), + Gas: 21000, + To: testAddr, + BlobHashes: []common.Hash{emptyBlobHash}, + BlobFeeCap: uint256.MustFromBig(common.Big1), + Sidecar: &types.BlobTxSidecar{ + Blobs: emptyBlobs, + Commitments: []kzg4844.Commitment{emptyBlobCommit}, + Proofs: []kzg4844.Proof{emptyBlobProof}, + }, + }) + if err != nil { + t.Fatal(err) + } + } else { + tx, err = types.SignTx( + types.NewTransaction(0, testAddr, big.NewInt(10_000), params.TxGas, big.NewInt(1_000_000_000), nil), + signer, + testKey, + ) + if err != nil { + t.Fatal(err) + } + } + errs := backend.txpool.Add([]*types.Transaction{tx}, true) + for _, err := range errs { + if err != nil { + t.Fatal(err) + } + } + + // Send the hash request and verify the response + p2p.Send(peer.app, GetPooledTransactionsMsg, GetPooledTransactionsPacket{ + RequestId: 123, + GetPooledTransactionsRequest: []common.Hash{tx.Hash()}, + }) + if err := p2p.ExpectMsg(peer.app, PooledTransactionsMsg, PooledTransactionsPacket{ + RequestId: 123, + PooledTransactionsResponse: []*types.Transaction{tx}, + }); err != nil { + t.Errorf("pooled transaction mismatch: %v", err) + } +} diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index b3886270f3..ab56be7ffc 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -397,18 +397,13 @@ func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsReq break } // Retrieve the requested transaction, skipping if unknown to us - tx := backend.TxPool().Get(hash) - if tx == nil { + encoded := backend.TxPool().GetRLP(hash) + if len(encoded) == 0 { continue } - // If known, encode and queue for response packet - if encoded, err := rlp.EncodeToBytes(tx); err != nil { - log.Error("Failed to encode transaction", "err", err) - } else { - hashes = append(hashes, hash) - txs = append(txs, encoded) - bytes += len(encoded) - } + hashes = append(hashes, hash) + txs = append(txs, encoded) + bytes += len(encoded) } return hashes, txs }