This commit is contained in:
Bosul Mun 2026-02-24 21:54:25 -08:00 committed by GitHub
commit 39aa1963e4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 145 additions and 69 deletions

View file

@ -26,6 +26,16 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }
type NewTxHashesEvent struct{ Hashes []common.Hash }
// NewTxHashesEventFromTxs builds a NewTxHashesEvent containing the hashes of
// the given transactions, in the same order as the input slice.
func NewTxHashesEventFromTxs(txs []*types.Transaction) NewTxHashesEvent {
hashes := make([]common.Hash, 0, len(txs))
for _, tx := range txs {
hashes = append(hashes, tx.Hash())
}
return NewTxHashesEvent{Hashes: hashes}
}
// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }

View file

@ -133,12 +133,14 @@ type blobTxMeta struct {
evictionExecTip *uint256.Int // Worst gas tip across all previous nonces
evictionExecFeeJumps float64 // Worst base fee (converted to fee jumps) across all previous nonces
evictionBlobFeeJumps float64 // Worst blob fee (converted to fee jumps) across all previous nonces
sender common.Address // Sender of the transaction
}
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
// and assembles a helper struct to track in memory.
// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar).
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta {
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction, sender common.Address) *blobTxMeta {
if tx.BlobTxSidecar() == nil {
// This should never happen, as the pool only admits blob transactions with a sidecar
panic("missing blob tx sidecar")
@ -157,6 +159,7 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
execGas: tx.Gas(),
blobGas: tx.BlobGas(),
sender: sender,
}
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
@ -531,8 +534,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
return errors.New("missing blob sidecar")
}
meta := newBlobTxMeta(id, tx.Size(), size, tx)
if p.lookup.exists(meta.hash) {
if p.lookup.exists(tx.Hash()) {
// This path is only possible after a crash, where deleted items are not
// removed via the normal shutdown-startup procedure and thus may get
// partially resurrected.
@ -547,6 +549,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err)
return err
}
meta := newBlobTxMeta(id, tx.Size(), size, tx, sender)
if _, ok := p.index[sender]; !ok {
if err := p.reserver.Hold(sender); err != nil {
return err
@ -1070,8 +1073,13 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
return err
}
sender, err := types.Sender(p.signer, tx)
if err != nil {
return err
}
// Update the indices and metrics
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx, sender)
if _, ok := p.index[addr]; !ok {
if err := p.reserver.Hold(addr); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
@ -1360,9 +1368,15 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata {
if !ok {
return nil
}
sender, ok := p.lookup.senderOfTx(hash)
if !ok {
return nil
}
return &txpool.TxMetadata{
Type: types.BlobTxType,
Size: size,
Type: types.BlobTxType,
Size: size,
Sender: sender,
}
}
@ -1577,7 +1591,13 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
if err != nil {
return err
}
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
sender, err := types.Sender(p.signer, tx)
if err != nil {
return err
}
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx, sender)
var (
next = p.state.GetNonce(from)
@ -1674,7 +1694,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
addValidMeter.Mark(1)
// Notify all listeners of the new arrival
p.discoverFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}})
p.discoverFeed.Send(core.NewTxHashesEventFromTxs([]*types.Transaction{tx.WithoutBlobTxSidecar()}))
p.insertFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}})
//check the gapped queue for this account and try to promote
@ -1934,12 +1954,13 @@ func (p *BlobPool) updateLimboMetrics() {
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
if reorgs {
return p.insertFeed.Subscribe(ch)
} else {
return p.discoverFeed.Subscribe(ch)
}
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return p.insertFeed.Subscribe(ch)
}
// SubscribePropagationHashes registers a subscription for transaction hash
// events meant for propagation, covering newly seen and reorged transactions.
func (p *BlobPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription {
return p.discoverFeed.Subscribe(ch)
}
// Nonce returns the next nonce of an account, with all transactions executable

View file

@ -21,8 +21,9 @@ import (
)
type txMetadata struct {
id uint64 // the billy id of transaction
size uint64 // the RLP encoded size of transaction (blobs are included)
id uint64 // the billy id of transaction
size uint64 // the RLP encoded size of transaction (blobs are included)
sender common.Address
}
// lookup maps blob versioned hashes to transaction hashes that include them,
@ -79,6 +80,15 @@ func (l *lookup) sizeOfTx(txhash common.Hash) (uint64, bool) {
return meta.size, true
}
// senderOfTx returns the sender address recorded for the transaction with the
// given hash. The second return value is false if the hash is not tracked.
func (l *lookup) senderOfTx(txhash common.Hash) (common.Address, bool) {
meta, ok := l.txIndex[txhash]
if !ok {
return common.Address{}, false
}
return meta.sender, true
}
// track inserts a new set of mappings from blob versioned hashes to transaction
// hashes; and from transaction hashes to datastore storage item ids.
func (l *lookup) track(tx *blobTxMeta) {
@ -91,8 +101,9 @@ func (l *lookup) track(tx *blobTxMeta) {
}
// Map the transaction hash to the datastore id and RLP-encoded transaction size
l.txIndex[tx.hash] = &txMetadata{
id: tx.id,
size: tx.size,
id: tx.id,
size: tx.size,
sender: tx.sender,
}
}

View file

@ -232,6 +232,7 @@ type LegacyPool struct {
chain BlockChain
gasTip atomic.Pointer[uint256.Int]
txFeed event.Feed
hashFeed event.Feed
signer types.Signer
mu sync.RWMutex
@ -400,9 +401,10 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
<-wait
}
// SubscribeTransactions registers a subscription for new transaction events,
// feeding all transactions inserted into the pool.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
// The legacy pool has a very messed up internal shuffling, so it's kind of
// hard to separate newly discovered transaction from resurrected ones. This
// is because the new txs are added to the queue, resurrected ones too and
@ -410,6 +412,10 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs
return pool.txFeed.Subscribe(ch)
}
func (pool *LegacyPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription {
return pool.hashFeed.Subscribe(ch)
}
// SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (pool *LegacyPool) SetGasTip(tip *big.Int) {
@ -1029,9 +1035,11 @@ func (pool *LegacyPool) GetMetadata(hash common.Hash) *txpool.TxMetadata {
if tx == nil {
return nil
}
sender, _ := types.Sender(pool.signer, tx)
return &txpool.TxMetadata{
Type: tx.Type(),
Size: tx.Size(),
Type: tx.Type(),
Size: tx.Size(),
Sender: sender,
}
}
@ -1294,6 +1302,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
pool.hashFeed.Send(core.NewTxHashesEventFromTxs(txs))
}
}

View file

@ -86,8 +86,11 @@ type PendingFilter struct {
// TxMetadata denotes the metadata of a transaction.
type TxMetadata struct {
Type uint8 // The type of the transaction
Size uint64 // The length of the 'rlp encoding' of a transaction
Type uint8 // The type of the transaction
Size uint64 // The length of the 'rlp encoding' of a transaction
Sender common.Address // The sender of the transaction.
//TODO: This is only for the broadcast decision. can we use something else for this ?
//TODO: Legacypool would have to resolve this every time GetMetadata() is called.
}
// SubPool represents a specialized transaction pool that lives on its own (e.g.
@ -157,9 +160,13 @@ type SubPool interface {
Pending(filter PendingFilter) map[common.Address][]*LazyTransaction
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
// would receive only newly seen transactions.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
// SubscribePropagationHashes subscribes to transaction hashes events, especially
// the ones to be propagated. This includes newly seen transactions and reorged
// transactions.
SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription
// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.

View file

@ -371,10 +371,18 @@ func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransac
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools {
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
subs[i] = subpool.SubscribeTransactions(ch)
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}
func (p *TxPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools {
subs[i] = subpool.SubscribePropagationHashes(ch)
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}

View file

@ -405,7 +405,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
}
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.txPool.SubscribeTransactions(ch, true)
return b.eth.txPool.SubscribeTransactions(ch)
}
func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress {

View file

@ -48,7 +48,7 @@ func (a *simulatedBeaconAPI) loop() {
var (
newTxs = make(chan core.NewTxsEvent)
newWxs = make(chan newWithdrawalsEvent)
newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs)
newWxsSub = a.sim.withdrawals.subscribe(newWxs)
doCommit = make(chan struct{}, 1)
)

View file

@ -91,7 +91,11 @@ type txPool interface {
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
// SubscribeTransactions subscribes to events for newly seen transactions.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
// SubscribePropagationHashes subscribes to hash events for transactions to be
// propagated, including newly seen and reorged-out transactions.
SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription
// FilterType returns whether the given tx type is supported by the txPool.
FilterType(kind byte) bool
@ -127,7 +131,7 @@ type handler struct {
txBroadcastKey [16]byte
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsCh chan core.NewTxHashesEvent
txsSub event.Subscription
blockRange *blockRangeState
@ -414,8 +418,8 @@ func (h *handler) Start(maxPeers int) {
// broadcast and announce transactions (only new ones, not resurrected ones)
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
h.txsCh = make(chan core.NewTxHashesEvent, txChanSize)
h.txsSub = h.txpool.SubscribePropagationHashes(h.txsCh)
go h.txBroadcastLoop()
// broadcast block range
@ -455,7 +459,7 @@ func (h *handler) Stop() {
// - To a square root of all peers for non-blob transactions
// - And, separately, as announcements to all peers which are not known to
// already have the given transaction.
func (h *handler) BroadcastTransactions(txs types.Transactions) {
func (h *handler) BroadcastTransactions(txs []common.Hash) {
var (
blobTxs int // Number of blob transactions to announce only
largeTxs int // Number of large transactions to announce only
@ -466,35 +470,35 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
signer = types.LatestSigner(h.chain.Config())
choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey)
peers = h.peers.all()
)
for _, tx := range txs {
var directSet map[*ethPeer]struct{}
switch {
case tx.Type() == types.BlobTxType:
blobTxs++
case tx.Size() > txMaxBroadcastSize:
largeTxs++
default:
// Get transaction sender address. Here we can ignore any error
// since we're just interested in any value.
txSender, _ := types.Sender(signer, tx)
directSet = choice.choosePeers(peers, txSender)
}
for _, peer := range peers {
if peer.KnownTransaction(tx.Hash()) {
continue
if meta := h.txpool.GetMetadata(tx); meta != nil {
var directSet map[*ethPeer]struct{}
switch {
case meta.Type == types.BlobTxType:
blobTxs++
case meta.Size > txMaxBroadcastSize:
largeTxs++
default:
// Get transaction sender address. Here we can ignore any error
// since we're just interested in any value.
directSet = choice.choosePeers(peers, meta.Sender)
}
if _, ok := directSet[peer]; ok {
// Send direct.
txset[peer] = append(txset[peer], tx.Hash())
} else {
// Send announcement.
annos[peer] = append(annos[peer], tx.Hash())
for _, peer := range peers {
if peer.KnownTransaction(tx) {
continue
}
if _, ok := directSet[peer]; ok {
// Send direct.
txset[peer] = append(txset[peer], tx)
} else {
// Send announcement.
annos[peer] = append(annos[peer], tx)
}
}
}
}
@ -517,7 +521,7 @@ func (h *handler) txBroadcastLoop() {
for {
select {
case event := <-h.txsCh:
h.BroadcastTransactions(event.Txs)
h.BroadcastTransactions(event.Hashes)
case <-h.txsSub.Err():
return
}

View file

@ -245,8 +245,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {
handler.handler.synced.Store(true) // mark synced to accept transactions
txs := make(chan core.NewTxsEvent)
sub := handler.txpool.SubscribeTransactions(txs, false)
txs := make(chan core.NewTxHashesEvent)
sub := handler.txpool.SubscribePropagationHashes(txs)
defer sub.Unsubscribe()
// Create a source peer to send messages through and a sink handler to receive them
@ -275,10 +275,10 @@ func testRecvTransactions(t *testing.T, protocol uint) {
}
select {
case event := <-txs:
if len(event.Txs) != 1 {
t.Errorf("wrong number of added transactions: got %d, want 1", len(event.Txs))
} else if event.Txs[0].Hash() != tx.Hash() {
t.Errorf("added wrong tx hash: got %v, want %v", event.Txs[0].Hash(), tx.Hash())
if len(event.Hashes) != 1 {
t.Errorf("wrong number of added transactions: got %d, want 1", len(event.Hashes))
} else if event.Hashes[0] != tx.Hash() {
t.Errorf("added wrong tx hash: got %v, want %v", event.Hashes[0], tx.Hash())
}
case <-time.After(2 * time.Second):
t.Errorf("no NewTxsEvent received within 2 seconds")
@ -406,7 +406,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
for i := 0; i < len(sinks); i++ {
txChs[i] = make(chan core.NewTxsEvent, 1024)
sub := sinks[i].txpool.SubscribeTransactions(txChs[i], false)
sub := sinks[i].txpool.SubscribeTransactions(txChs[i])
defer sub.Unsubscribe()
}
// Fill the source pool with transactions and wait for them at the sinks

View file

@ -56,8 +56,9 @@ var (
type testTxPool struct {
pool map[common.Hash]*types.Transaction // Hash map of collected transactions
txFeed event.Feed // Notification feed to allow waiting for inclusion
lock sync.RWMutex // Protects the transaction pool
txFeed event.Feed // Notification feed to allow waiting for inclusion
hashFeed event.Feed
lock sync.RWMutex // Protects the transaction pool
}
// newTestTxPool creates a mock transaction pool.
@ -124,6 +125,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error {
p.pool[tx.Hash()] = tx
}
p.txFeed.Send(core.NewTxsEvent{Txs: txs})
p.hashFeed.Send(core.NewTxHashesEventFromTxs(txs))
return make([]error, len(txs))
}
@ -159,10 +161,14 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*
// SubscribeTransactions should return an event subscription of NewTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return p.txFeed.Subscribe(ch)
}
func (p *testTxPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription {
return p.hashFeed.Subscribe(ch)
}
// FilterType should check whether the pool supports the given type of transactions.
func (p *testTxPool) FilterType(kind byte) bool {
switch kind {