From ee30681a8d4d176a3561db20e9c8867dafe97441 Mon Sep 17 00:00:00 2001 From: minh-bq Date: Wed, 2 Apr 2025 14:47:56 +0700 Subject: [PATCH] core/txpool: add GetMetadata to transaction pool (#31433) This is an alternative to #31309 With eth/68, transaction announcement must have transaction type and size. So in announceTransactions, we need to query the transaction from transaction pool with its hash. This creates overhead in case of blob transaction which needs to load data from billy and RLP decode. This commit creates a lightweight lookup from transaction hash to transaction size and a function GetMetadata to query transaction type and transaction size given the transaction hash. --------- Co-authored-by: Gary Rong --- core/txpool/blobpool/blobpool.go | 81 ++++++++++++++++---------- core/txpool/blobpool/blobpool_test.go | 12 +++- core/txpool/blobpool/evictheap_test.go | 8 +-- core/txpool/blobpool/lookup.go | 35 ++++++++--- core/txpool/legacypool/legacypool.go | 13 +++++ core/txpool/subpool.go | 10 ++++ core/txpool/txpool.go | 11 ++++ eth/handler.go | 4 ++ eth/handler_test.go | 16 +++++ eth/protocols/eth/broadcast.go | 6 +- eth/protocols/eth/handler.go | 5 ++ 11 files changed, 156 insertions(+), 45 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 59a5645040..5a20c3ce5a 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -87,8 +87,9 @@ type blobTxMeta struct { hash common.Hash // Transaction hash to maintain the lookup table vhashes []common.Hash // Blob versioned hashes to maintain the lookup table - id uint64 // Storage ID in the pool's persistent store - size uint32 // Byte size in the pool's persistent store + id uint64 // Storage ID in the pool's persistent store + storageSize uint32 // Byte size in the pool's persistent store + size uint64 // RLP-encoded size of transaction including the attached blob nonce uint64 // Needed to prioritize inclusion order within an account costCap *uint256.Int // Needed to validate cumulative balance sufficiency @@ -108,19 +109,20 @@ type blobTxMeta struct { // newBlobTxMeta retrieves the indexed metadata fields from a blob transaction // and assembles a helper struct to track in memory. -func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta { +func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta { meta := &blobTxMeta{ - hash: tx.Hash(), - vhashes: tx.BlobHashes(), - id: id, - size: size, - nonce: tx.Nonce(), - costCap: uint256.MustFromBig(tx.Cost()), - execTipCap: uint256.MustFromBig(tx.GasTipCap()), - execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), - blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), - execGas: tx.Gas(), - blobGas: tx.BlobGas(), + hash: tx.Hash(), + vhashes: tx.BlobHashes(), + id: id, + storageSize: storageSize, + size: size, + nonce: tx.Nonce(), + costCap: uint256.MustFromBig(tx.Cost()), + execTipCap: uint256.MustFromBig(tx.GasTipCap()), + execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), + blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), + execGas: tx.Gas(), + blobGas: tx.BlobGas(), } meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap) @@ -480,7 +482,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { return errors.New("missing blob sidecar") } - meta := newBlobTxMeta(id, size, tx) + meta := newBlobTxMeta(id, tx.Size(), size, tx) if p.lookup.exists(meta.hash) { // This path is only possible after a crash, where deleted items are not // removed via the normal shutdown-startup procedure and thus may get @@ -507,7 +509,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { p.spent[sender] = new(uint256.Int).Add(p.spent[sender], meta.costCap) p.lookup.track(meta) - p.stored += uint64(meta.size) + p.stored += uint64(meta.storageSize) return nil } @@ -539,7 +541,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 ids = append(ids, txs[i].id) nonces = append(nonces, txs[i].nonce) - p.stored -= uint64(txs[i].size) + p.stored -= uint64(txs[i].storageSize) p.lookup.untrack(txs[i]) // Included transactions blobs need to be moved to the limbo @@ -580,7 +582,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 nonces = append(nonces, txs[0].nonce) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[0].costCap) - p.stored -= uint64(txs[0].size) + p.stored -= uint64(txs[0].storageSize) p.lookup.untrack(txs[0]) // Included transactions blobs need to be moved to the limbo @@ -636,7 +638,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 dropRepeatedMeter.Mark(1) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap) - p.stored -= uint64(txs[i].size) + p.stored -= uint64(txs[i].storageSize) p.lookup.untrack(txs[i]) if err := p.store.Delete(id); err != nil { @@ -658,7 +660,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 nonces = append(nonces, txs[j].nonce) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[j].costCap) - p.stored -= uint64(txs[j].size) + p.stored -= uint64(txs[j].storageSize) p.lookup.untrack(txs[j]) } txs = txs[:i] @@ -696,7 +698,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 nonces = append(nonces, last.nonce) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap) - p.stored -= uint64(last.size) + p.stored -= uint64(last.storageSize) p.lookup.untrack(last) } if len(txs) == 0 { @@ -736,7 +738,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 nonces = append(nonces, last.nonce) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap) - p.stored -= uint64(last.size) + p.stored -= uint64(last.storageSize) p.lookup.untrack(last) } p.index[addr] = txs @@ -1002,7 +1004,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { } // Update the indices and metrics - meta := newBlobTxMeta(id, p.store.Size(id), tx) + meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) if _, ok := p.index[addr]; !ok { if err := p.reserve(addr, true); err != nil { log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) @@ -1016,7 +1018,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { p.spent[addr] = new(uint256.Int).Add(p.spent[addr], meta.costCap) } p.lookup.track(meta) - p.stored += uint64(meta.size) + p.stored += uint64(meta.storageSize) return nil } @@ -1041,7 +1043,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { nonces = []uint64{tx.nonce} ) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap) - p.stored -= uint64(tx.size) + p.stored -= uint64(tx.storageSize) p.lookup.untrack(tx) txs[i] = nil @@ -1051,7 +1053,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { nonces = append(nonces, tx.nonce) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap) - p.stored -= uint64(tx.size) + p.stored -= uint64(tx.storageSize) p.lookup.untrack(tx) txs[i+1+j] = nil } @@ -1236,6 +1238,25 @@ func (p *BlobPool) GetRLP(hash common.Hash) []byte { return p.getRLP(hash) } +// GetMetadata returns the transaction type and transaction size with the +// given transaction hash. +// +// The size refers the length of the 'rlp encoding' of a blob transaction +// including the attached blobs. +func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata { + p.lock.RLock() + defer p.lock.RUnlock() + + size, ok := p.lookup.sizeOfTx(hash) + if !ok { + return nil + } + return &txpool.TxMetadata{ + Type: types.BlobTxType, + Size: size, + } +} + // GetBlobs returns a number of blobs are proofs for the given versioned hashes. // This is a utility method for the engine API, enabling consensus clients to // retrieve blobs from the pools directly instead of the network. @@ -1375,7 +1396,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { if err != nil { return err } - meta := newBlobTxMeta(id, p.store.Size(id), tx) + meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) var ( next = p.state.GetNonce(from) @@ -1403,7 +1424,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { p.lookup.untrack(prev) p.lookup.track(meta) - p.stored += uint64(meta.size) - uint64(prev.size) + p.stored += uint64(meta.storageSize) - uint64(prev.storageSize) } else { // Transaction extends previously scheduled ones p.index[from] = append(p.index[from], meta) @@ -1413,7 +1434,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { } p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap) p.lookup.track(meta) - p.stored += uint64(meta.size) + p.stored += uint64(meta.storageSize) } // Recompute the rolling eviction fields. In case of a replacement, this will // recompute all subsequent fields. In case of an append, this will only do @@ -1500,7 +1521,7 @@ func (p *BlobPool) drop() { p.index[from] = txs p.spent[from] = new(uint256.Int).Sub(p.spent[from], drop.costCap) } - p.stored -= uint64(drop.size) + p.stored -= uint64(drop.storageSize) p.lookup.untrack(drop) // Remove the transaction from the pool's eviction heap: diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index d9137cb679..b7c6cfa51e 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -376,7 +376,7 @@ func verifyPoolInternals(t *testing.T, pool *BlobPool) { var stored uint64 for _, txs := range pool.index { for _, tx := range txs { - stored += uint64(tx.size) + stored += uint64(tx.storageSize) } } if pool.stored != stored { @@ -1553,6 +1553,16 @@ func TestAdd(t *testing.T) { if err := pool.add(signed); !errors.Is(err, add.err) { t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, err, add.err) } + if add.err == nil { + size, exist := pool.lookup.sizeOfTx(signed.Hash()) + if !exist { + t.Errorf("test %d, tx %d: failed to lookup transaction's size", i, j) + } + if size != signed.Size() { + t.Errorf("test %d, tx %d: transaction's size mismatches: have %v, want %v", + i, j, size, signed.Size()) + } + } verifyPoolInternals(t, pool) } verifyPoolInternals(t, pool) diff --git a/core/txpool/blobpool/evictheap_test.go b/core/txpool/blobpool/evictheap_test.go index e392932401..de4076e298 100644 --- a/core/txpool/blobpool/evictheap_test.go +++ b/core/txpool/blobpool/evictheap_test.go @@ -146,7 +146,7 @@ func TestPriceHeapSorting(t *testing.T) { ) index[addr] = []*blobTxMeta{{ id: uint64(j), - size: 128 * 1024, + storageSize: 128 * 1024, nonce: 0, execTipCap: execTip, execFeeCap: execFee, @@ -205,7 +205,7 @@ func benchmarkPriceHeapReinit(b *testing.B, datacap uint64) { ) index[addr] = []*blobTxMeta{{ id: uint64(i), - size: 128 * 1024, + storageSize: 128 * 1024, nonce: 0, execTipCap: execTip, execFeeCap: execFee, @@ -281,7 +281,7 @@ func benchmarkPriceHeapOverflow(b *testing.B, datacap uint64) { ) index[addr] = []*blobTxMeta{{ id: uint64(i), - size: 128 * 1024, + storageSize: 128 * 1024, nonce: 0, execTipCap: execTip, execFeeCap: execFee, @@ -312,7 +312,7 @@ func benchmarkPriceHeapOverflow(b *testing.B, datacap uint64) { ) metas[i] = &blobTxMeta{ id: uint64(int(blobs) + i), - size: 128 * 1024, + storageSize: 128 * 1024, nonce: 0, execTipCap: execTip, execFeeCap: execFee, diff --git a/core/txpool/blobpool/lookup.go b/core/txpool/blobpool/lookup.go index b5cf4d3799..7607cd487a 100644 --- a/core/txpool/blobpool/lookup.go +++ b/core/txpool/blobpool/lookup.go @@ -20,18 +20,24 @@ import ( "github.com/ethereum/go-ethereum/common" ) +type txMetadata struct { + id uint64 // the billy id of transction + size uint64 // the RLP encoded size of transaction (blobs are included) +} + // lookup maps blob versioned hashes to transaction hashes that include them, -// and transaction hashes to billy entries that include them. +// transaction hashes to billy entries that include them, transaction hashes +// to the transaction size type lookup struct { blobIndex map[common.Hash]map[common.Hash]struct{} - txIndex map[common.Hash]uint64 + txIndex map[common.Hash]*txMetadata } // newLookup creates a new index for tracking blob to tx; and tx to billy mappings. func newLookup() *lookup { return &lookup{ blobIndex: make(map[common.Hash]map[common.Hash]struct{}), - txIndex: make(map[common.Hash]uint64), + txIndex: make(map[common.Hash]*txMetadata), } } @@ -43,8 +49,11 @@ func (l *lookup) exists(txhash common.Hash) bool { // storeidOfTx returns the datastore storage item id of a transaction. func (l *lookup) storeidOfTx(txhash common.Hash) (uint64, bool) { - id, ok := l.txIndex[txhash] - return id, ok + meta, ok := l.txIndex[txhash] + if !ok { + return 0, false + } + return meta.id, true } // storeidOfBlob returns the datastore storage item id of a blob. @@ -61,6 +70,15 @@ func (l *lookup) storeidOfBlob(vhash common.Hash) (uint64, bool) { return 0, false // Weird, don't choke } +// sizeOfTx returns the RLP-encoded size of transaction +func (l *lookup) sizeOfTx(txhash common.Hash) (uint64, bool) { + meta, ok := l.txIndex[txhash] + if !ok { + return 0, false + } + return meta.size, true +} + // track inserts a new set of mappings from blob versioned hashes to transaction // hashes; and from transaction hashes to datastore storage item ids. func (l *lookup) track(tx *blobTxMeta) { @@ -71,8 +89,11 @@ func (l *lookup) track(tx *blobTxMeta) { } l.blobIndex[vhash][tx.hash] = struct{}{} // may be double mapped if a tx contains the same blob twice } - // Map the transaction hash to the datastore id - l.txIndex[tx.hash] = tx.id + // Map the transaction hash to the datastore id and RLP-encoded transaction size + l.txIndex[tx.hash] = &txMetadata{ + id: tx.id, + size: tx.size, + } } // untrack removes a set of mappings from blob versioned hashes to transaction diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index dafd185836..9066f3e16b 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -1035,6 +1035,19 @@ func (pool *LegacyPool) GetRLP(hash common.Hash) []byte { return encoded } +// GetMetadata returns the transaction type and transaction size with the +// given transaction hash. +func (pool *LegacyPool) GetMetadata(hash common.Hash) *txpool.TxMetadata { + tx := pool.all.Get(hash) + if tx == nil { + return nil + } + return &txpool.TxMetadata{ + Type: tx.Type(), + Size: tx.Size(), + } +} + // GetBlobs is not supported by the legacy transaction pool, it is just here to // implement the txpool.SubPool interface. func (pool *LegacyPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) { diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 1392cfb274..f5cb852d8f 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -86,6 +86,12 @@ type PendingFilter struct { OnlyBlobTxs bool // Return only blob transactions (block blob-space filling) } +// TxMetadata denotes the metadata of a transaction. +type TxMetadata struct { + Type uint8 // The type of the transaction + Size uint64 // The length of the 'rlp encoding' of a transaction +} + // SubPool represents a specialized transaction pool that lives on its own (e.g. // blob pool). Since independent of how many specialized pools we have, they do // need to be updated in lockstep and assemble into one coherent view for block @@ -127,6 +133,10 @@ type SubPool interface { // GetRLP returns a RLP-encoded transaction if it is contained in the pool. GetRLP(hash common.Hash) []byte + // GetMetadata returns the transaction type and transaction size with the + // given transaction hash. + GetMetadata(hash common.Hash) *TxMetadata + // GetBlobs returns a number of blobs are proofs for the given versioned hashes. // This is a utility method for the engine API, enabling consensus clients to // retrieve blobs from the pools directly instead of the network. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 649c5d1a78..083aac92c6 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -354,6 +354,17 @@ func (p *TxPool) GetRLP(hash common.Hash) []byte { return nil } +// GetMetadata returns the transaction type and transaction size with the given +// hash. +func (p *TxPool) GetMetadata(hash common.Hash) *TxMetadata { + for _, subpool := range p.subpools { + if meta := subpool.GetMetadata(hash); meta != nil { + return meta + } + } + return nil +} + // GetBlobs returns a number of blobs are proofs for the given versioned hashes. // This is a utility method for the engine API, enabling consensus clients to // retrieve blobs from the pools directly instead of the network. diff --git a/eth/handler.go b/eth/handler.go index 7179c9980b..b2ad6effdb 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -71,6 +71,10 @@ type txPool interface { // with given tx hash. GetRLP(hash common.Hash) []byte + // GetMetadata returns the transaction type and transaction size with the + // given transaction hash. + GetMetadata(hash common.Hash) *txpool.TxMetadata + // Add should add the given transactions to the pool. Add(txs []*types.Transaction, sync bool) []error diff --git a/eth/handler_test.go b/eth/handler_test.go index 0c6b9854e6..fb3103f241 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -93,6 +93,22 @@ func (p *testTxPool) GetRLP(hash common.Hash) []byte { return nil } +// GetMetadata returns the transaction type and transaction size with the given +// hash. +func (p *testTxPool) GetMetadata(hash common.Hash) *txpool.TxMetadata { + p.lock.Lock() + defer p.lock.Unlock() + + tx := p.pool[hash] + if tx != nil { + return &txpool.TxMetadata{ + Type: tx.Type(), + Size: tx.Size(), + } + } + return nil +} + // Add appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error { diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index f0ed1d6bc9..21cea0d4ef 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -116,10 +116,10 @@ func (p *Peer) announceTransactions() { size common.StorageSize ) for count = 0; count < len(queue) && size < maxTxPacketSize; count++ { - if tx := p.txpool.Get(queue[count]); tx != nil { + if meta := p.txpool.GetMetadata(queue[count]); meta != nil { pending = append(pending, queue[count]) - pendingTypes = append(pendingTypes, tx.Type()) - pendingSizes = append(pendingSizes, uint32(tx.Size())) + pendingTypes = append(pendingTypes, meta.Type) + pendingSizes = append(pendingSizes, uint32(meta.Size)) size += common.HashLength } } diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index eca6777bd6..f2a3cb0292 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" @@ -90,6 +91,10 @@ type TxPool interface { // GetRLP retrieves the RLP-encoded transaction from the local txpool with // the given hash. GetRLP(hash common.Hash) []byte + + // GetMetadata returns the transaction type and transaction size with the + // given transaction hash. + GetMetadata(hash common.Hash) *txpool.TxMetadata } // MakeProtocols constructs the P2P protocol definitions for `eth`.