core/txpool, eth: add GetRLP to transaction pool (#31307)

Currently, when answering GetPooledTransaction request, txpool.Get() is
used. When the requested hash is blob transaction, blobpool.Get() is
called. This function loads the RLP-encoded transaction from limbo then
decodes and returns. Later, in answerGetPooledTransactions, we need to
RLP encode again. This decode then encode is wasteful. This commit adds
GetRLP to transaction pool interface so that answerGetPooledTransactions
can use the RLP-encoded from limbo directly.

---------

Co-authored-by: Gary Rong <garyrong0905@gmail.com>
This commit is contained in:
minh-bq 2025-03-19 13:20:50 +07:00 committed by GitHub
parent c4f0450710
commit 930836ed66
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 184 additions and 23 deletions

View file

@ -1189,8 +1189,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
return p.lookup.exists(hash)
}
// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
func (p *BlobPool) getRLP(hash common.Hash) []byte {
// Track the amount of time waiting to retrieve a fully resolved blob tx from
// the pool and the amount of time actually spent on pulling the data from disk.
getStart := time.Now()
@ -1212,14 +1211,31 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err)
return nil
}
return data
}
// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
data := p.getRLP(hash)
if len(data) == 0 {
return nil
}
item := new(types.Transaction)
if err = rlp.DecodeBytes(data, item); err != nil {
log.Error("Blobs corrupted for traced transaction", "hash", hash, "id", id, "err", err)
if err := rlp.DecodeBytes(data, item); err != nil {
id, _ := p.lookup.storeidOfTx(hash)
log.Error("Blobs corrupted for traced transaction",
"hash", hash, "id", id, "err", err)
return nil
}
return item
}
// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
func (p *BlobPool) GetRLP(hash common.Hash) []byte {
return p.getRLP(hash)
}
// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.

View file

@ -40,6 +40,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)
@ -1010,6 +1011,20 @@ func (pool *LegacyPool) get(hash common.Hash) *types.Transaction {
return pool.all.Get(hash)
}
// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
func (pool *LegacyPool) GetRLP(hash common.Hash) []byte {
tx := pool.all.Get(hash)
if tx == nil {
return nil
}
encoded, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encoded transaction in legacy pool", "hash", hash, "err", err)
return nil
}
return encoded
}
// GetBlobs is not supported by the legacy transaction pool, it is just here to
// implement the txpool.SubPool interface.
func (pool *LegacyPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) {

View file

@ -124,6 +124,9 @@ type SubPool interface {
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *types.Transaction
// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
GetRLP(hash common.Hash) []byte
// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.

View file

@ -309,6 +309,17 @@ func (p *TxPool) Get(hash common.Hash) *types.Transaction {
return nil
}
// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
func (p *TxPool) GetRLP(hash common.Hash) []byte {
for _, subpool := range p.subpools {
encoded := subpool.GetRLP(hash)
if len(encoded) != 0 {
return encoded
}
}
return nil
}
// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.

View file

@ -67,6 +67,10 @@ type txPool interface {
// tx hash.
Get(hash common.Hash) *types.Transaction
// GetRLP retrieves the RLP-encoded transaction from local txpool
// with given tx hash.
GetRLP(hash common.Hash) []byte
// Add should add the given transactions to the pool.
Add(txs []*types.Transaction, sync bool) []error

View file

@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)
@ -78,6 +79,20 @@ func (p *testTxPool) Get(hash common.Hash) *types.Transaction {
return p.pool[hash]
}
// Get retrieves the transaction from local txpool with given
// tx hash.
func (p *testTxPool) GetRLP(hash common.Hash) []byte {
p.lock.Lock()
defer p.lock.Unlock()
tx := p.pool[hash]
if tx != nil {
blob, _ := rlp.EncodeToBytes(tx)
return blob
}
return nil
}
// Add appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error {

View file

@ -86,6 +86,10 @@ type Backend interface {
type TxPool interface {
// Get retrieves the transaction from the local txpool with the given hash.
Get(hash common.Hash) *types.Transaction
// GetRLP retrieves the RLP-encoded transaction from the local txpool with
// the given hash.
GetRLP(hash common.Hash) []byte
}
// MakeProtocols constructs the P2P protocol definitions for `eth`.

View file

@ -18,9 +18,11 @@ package eth
import (
"bytes"
"crypto/sha256"
"math"
"math/big"
"math/rand"
"os"
"testing"
"time"
@ -30,15 +32,18 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)
var (
@ -62,12 +67,12 @@ type testBackend struct {
// newTestBackend creates an empty chain and wraps it into a mock backend.
func newTestBackend(blocks int) *testBackend {
return newTestBackendWithGenerator(blocks, false, nil)
return newTestBackendWithGenerator(blocks, false, false, nil)
}
// newTestBackendWithGenerator creates a chain with a number of explicitly defined blocks and
// wraps it into a mock backend.
func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int, *core.BlockGen)) *testBackend {
func newTestBackendWithGenerator(blocks int, shanghai bool, cancun bool, generator func(int, *core.BlockGen)) *testBackend {
var (
// Create a database pre-initialize with a genesis block
db = rawdb.NewMemoryDatabase()
@ -99,9 +104,21 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int,
}
}
if cancun {
config.CancunTime = u64(0)
config.BlobScheduleConfig = &params.BlobScheduleConfig{
Cancun: &params.BlobConfig{
Target: 3,
Max: 6,
UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction,
},
}
}
gspec := &core.Genesis{
Config: config,
Alloc: types.GenesisAlloc{testAddr: {Balance: big.NewInt(100_000_000_000_000_000)}},
Config: config,
Alloc: types.GenesisAlloc{testAddr: {Balance: big.NewInt(100_000_000_000_000_000)}},
Difficulty: common.Big0,
}
chain, _ := core.NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil)
@ -115,8 +132,12 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int,
txconfig := legacypool.DefaultConfig
txconfig.Journal = "" // Don't litter the disk with test journals
pool := legacypool.New(txconfig, chain)
txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{pool})
storage, _ := os.MkdirTemp("", "blobpool-")
defer os.RemoveAll(storage)
blobPool := blobpool.New(blobpool.Config{Datadir: storage}, chain)
legacyPool := legacypool.New(txconfig, chain)
txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{legacyPool, blobPool})
return &testBackend{
db: db,
@ -351,7 +372,7 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
}
}
backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen)
backend := newTestBackendWithGenerator(maxBodiesServe+15, true, false, gen)
defer backend.close()
peer, _ := newTestPeer("peer", protocol, backend)
@ -471,7 +492,7 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
}
}
// Assemble the test environment
backend := newTestBackendWithGenerator(4, false, generator)
backend := newTestBackendWithGenerator(4, false, false, generator)
defer backend.close()
peer, _ := newTestPeer("peer", protocol, backend)
@ -548,7 +569,7 @@ func setup() (*testBackend, *testPeer) {
block.SetExtra([]byte("yeehaw"))
}
}
backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen)
backend := newTestBackendWithGenerator(maxBodiesServe+15, true, false, gen)
peer, _ := newTestPeer("peer", ETH68, backend)
// Discard all messages
go func() {
@ -573,3 +594,80 @@ func FuzzEthProtocolHandlers(f *testing.F) {
handler(backend, decoder{msg: msg}, peer.Peer)
})
}
func TestGetPooledTransaction(t *testing.T) {
t.Run("blobTx", func(t *testing.T) {
testGetPooledTransaction(t, true)
})
t.Run("legacyTx", func(t *testing.T) {
testGetPooledTransaction(t, false)
})
}
func testGetPooledTransaction(t *testing.T, blobTx bool) {
var (
emptyBlob = kzg4844.Blob{}
emptyBlobs = []kzg4844.Blob{emptyBlob}
emptyBlobCommit, _ = kzg4844.BlobToCommitment(&emptyBlob)
emptyBlobProof, _ = kzg4844.ComputeBlobProof(&emptyBlob, emptyBlobCommit)
emptyBlobHash = kzg4844.CalcBlobHashV1(sha256.New(), &emptyBlobCommit)
)
backend := newTestBackendWithGenerator(0, true, true, nil)
defer backend.close()
peer, _ := newTestPeer("peer", ETH68, backend)
defer peer.close()
var (
tx *types.Transaction
err error
signer = types.NewCancunSigner(params.TestChainConfig.ChainID)
)
if blobTx {
tx, err = types.SignNewTx(testKey, signer, &types.BlobTx{
ChainID: uint256.MustFromBig(params.TestChainConfig.ChainID),
Nonce: 0,
GasTipCap: uint256.NewInt(20_000_000_000),
GasFeeCap: uint256.NewInt(21_000_000_000),
Gas: 21000,
To: testAddr,
BlobHashes: []common.Hash{emptyBlobHash},
BlobFeeCap: uint256.MustFromBig(common.Big1),
Sidecar: &types.BlobTxSidecar{
Blobs: emptyBlobs,
Commitments: []kzg4844.Commitment{emptyBlobCommit},
Proofs: []kzg4844.Proof{emptyBlobProof},
},
})
if err != nil {
t.Fatal(err)
}
} else {
tx, err = types.SignTx(
types.NewTransaction(0, testAddr, big.NewInt(10_000), params.TxGas, big.NewInt(1_000_000_000), nil),
signer,
testKey,
)
if err != nil {
t.Fatal(err)
}
}
errs := backend.txpool.Add([]*types.Transaction{tx}, true)
for _, err := range errs {
if err != nil {
t.Fatal(err)
}
}
// Send the hash request and verify the response
p2p.Send(peer.app, GetPooledTransactionsMsg, GetPooledTransactionsPacket{
RequestId: 123,
GetPooledTransactionsRequest: []common.Hash{tx.Hash()},
})
if err := p2p.ExpectMsg(peer.app, PooledTransactionsMsg, PooledTransactionsPacket{
RequestId: 123,
PooledTransactionsResponse: []*types.Transaction{tx},
}); err != nil {
t.Errorf("pooled transaction mismatch: %v", err)
}
}

View file

@ -397,18 +397,13 @@ func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsReq
break
}
// Retrieve the requested transaction, skipping if unknown to us
tx := backend.TxPool().Get(hash)
if tx == nil {
encoded := backend.TxPool().GetRLP(hash)
if len(encoded) == 0 {
continue
}
// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(tx); err != nil {
log.Error("Failed to encode transaction", "err", err)
} else {
hashes = append(hashes, hash)
txs = append(txs, encoded)
bytes += len(encoded)
}
hashes = append(hashes, hash)
txs = append(txs, encoded)
bytes += len(encoded)
}
return hashes, txs
}