diff --git a/core/events.go b/core/events.go index ed853f1790..e4abb36066 100644 --- a/core/events.go +++ b/core/events.go @@ -26,6 +26,16 @@ import ( // NewTxsEvent is posted when a batch of transactions enter the transaction pool. type NewTxsEvent struct{ Txs []*types.Transaction } +type NewTxHashesEvent struct{ Hashes []common.Hash } + +func NewTxHashesEventFromTxs(txs []*types.Transaction) NewTxHashesEvent { + hashes := make([]common.Hash, 0, len(txs)) + for _, tx := range txs { + hashes = append(hashes, tx.Hash()) + } + return NewTxHashesEvent{Hashes: hashes} +} + // RemovedLogsEvent is posted when a reorg happens type RemovedLogsEvent struct{ Logs []*types.Log } diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 6022514750..0f91c62a95 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -133,12 +133,14 @@ type blobTxMeta struct { evictionExecTip *uint256.Int // Worst gas tip across all previous nonces evictionExecFeeJumps float64 // Worst base fee (converted to fee jumps) across all previous nonces evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces + + sender common.Address // Sender of the transaction } // newBlobTxMeta retrieves the indexed metadata fields from a blob transaction // and assembles a helper struct to track in memory. // Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar). -func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta { +func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction, sender common.Address) *blobTxMeta { if tx.BlobTxSidecar() == nil { // This should never happen, as the pool only admits blob transactions with a sidecar panic("missing blob tx sidecar") @@ -157,6 +159,7 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), execGas: tx.Gas(), blobGas: tx.BlobGas(), + sender: sender, } meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap) @@ -531,8 +534,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { return errors.New("missing blob sidecar") } - meta := newBlobTxMeta(id, tx.Size(), size, tx) - if p.lookup.exists(meta.hash) { + if p.lookup.exists(tx.Hash()) { // This path is only possible after a crash, where deleted items are not // removed via the normal shutdown-startup procedure and thus may get // partially resurrected. @@ -547,6 +549,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err) return err } + meta := newBlobTxMeta(id, tx.Size(), size, tx, sender) if _, ok := p.index[sender]; !ok { if err := p.reserver.Hold(sender); err != nil { return err @@ -1070,8 +1073,13 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { return err } + sender, err := types.Sender(p.signer, tx) + if err != nil { + return err + } + // Update the indices and metrics - meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) + meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx, sender) if _, ok := p.index[addr]; !ok { if err := p.reserver.Hold(addr); err != nil { log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) @@ -1360,9 +1368,15 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata { if !ok { return nil } + + sender, ok := p.lookup.senderOfTx(hash) + if !ok { + return nil + } return &txpool.TxMetadata{ - Type: types.BlobTxType, - Size: size, + Type: types.BlobTxType, + Size: size, + Sender: sender, } } @@ -1577,7 +1591,13 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error if err != nil { return err } - meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) + + sender, err := types.Sender(p.signer, tx) + if err != nil { + return err + } + + meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx, sender) var ( next = p.state.GetNonce(from) @@ -1674,7 +1694,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error addValidMeter.Mark(1) // Notify all listeners of the new arrival - p.discoverFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}}) + p.discoverFeed.Send(core.NewTxHashesEventFromTxs([]*types.Transaction{tx.WithoutBlobTxSidecar()})) p.insertFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}}) //check the gapped queue for this account and try to promote @@ -1934,12 +1954,13 @@ func (p *BlobPool) updateLimboMetrics() { // SubscribeTransactions registers a subscription for new transaction events, // supporting feeding only newly seen or also resurrected transactions. -func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { - if reorgs { - return p.insertFeed.Subscribe(ch) - } else { - return p.discoverFeed.Subscribe(ch) - } +func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription { + return p.insertFeed.Subscribe(ch) +} + +// TODO: comment +func (p *BlobPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription { + return p.discoverFeed.Subscribe(ch) } // Nonce returns the next nonce of an account, with all transactions executable diff --git a/core/txpool/blobpool/lookup.go b/core/txpool/blobpool/lookup.go index 7607cd487a..fbaf19597a 100644 --- a/core/txpool/blobpool/lookup.go +++ b/core/txpool/blobpool/lookup.go @@ -21,8 +21,9 @@ import ( ) type txMetadata struct { - id uint64 // the billy id of transction - size uint64 // the RLP encoded size of transaction (blobs are included) + id uint64 // the billy id of transction + size uint64 // the RLP encoded size of transaction (blobs are included) + sender common.Address } // lookup maps blob versioned hashes to transaction hashes that include them, @@ -79,6 +80,15 @@ func (l *lookup) sizeOfTx(txhash common.Hash) (uint64, bool) { return meta.size, true } +// senderOfTx returns sender of transaction +func (l *lookup) senderOfTx(txhash common.Hash) (common.Address, bool) { + meta, ok := l.txIndex[txhash] + if !ok { + return common.Address{}, false + } + return meta.sender, true +} + // track inserts a new set of mappings from blob versioned hashes to transaction // hashes; and from transaction hashes to datastore storage item ids. func (l *lookup) track(tx *blobTxMeta) { @@ -91,8 +101,9 @@ func (l *lookup) track(tx *blobTxMeta) { } // Map the transaction hash to the datastore id and RLP-encoded transaction size l.txIndex[tx.hash] = &txMetadata{ - id: tx.id, - size: tx.size, + id: tx.id, + size: tx.size, + sender: tx.sender, } } diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 36970c820e..a8fa66ef78 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -232,6 +232,7 @@ type LegacyPool struct { chain BlockChain gasTip atomic.Pointer[uint256.Int] txFeed event.Feed + hashFeed event.Feed signer types.Signer mu sync.RWMutex @@ -400,9 +401,10 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) { <-wait } +// TODO: comment // SubscribeTransactions registers a subscription for new transaction events, // supporting feeding only newly seen or also resurrected transactions. -func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { +func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription { // The legacy pool has a very messed up internal shuffling, so it's kind of // hard to separate newly discovered transaction from resurrected ones. This // is because the new txs are added to the queue, resurrected ones too and @@ -410,6 +412,10 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs return pool.txFeed.Subscribe(ch) } +func (pool *LegacyPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription { + return pool.hashFeed.Subscribe(ch) +} + // SetGasTip updates the minimum gas tip required by the transaction pool for a // new transaction, and drops all transactions below this threshold. func (pool *LegacyPool) SetGasTip(tip *big.Int) { @@ -1029,9 +1035,11 @@ func (pool *LegacyPool) GetMetadata(hash common.Hash) *txpool.TxMetadata { if tx == nil { return nil } + sender, _ := types.Sender(pool.signer, tx) return &txpool.TxMetadata{ - Type: tx.Type(), - Size: tx.Size(), + Type: tx.Type(), + Size: tx.Size(), + Sender: sender, } } @@ -1294,6 +1302,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, txs = append(txs, set.Flatten()...) } pool.txFeed.Send(core.NewTxsEvent{Txs: txs}) + pool.hashFeed.Send(core.NewTxHashesEventFromTxs(txs)) } } diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index db099ddf98..39c4d48a74 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -86,8 +86,11 @@ type PendingFilter struct { // TxMetadata denotes the metadata of a transaction. type TxMetadata struct { - Type uint8 // The type of the transaction - Size uint64 // The length of the 'rlp encoding' of a transaction + Type uint8 // The type of the transaction + Size uint64 // The length of the 'rlp encoding' of a transaction + Sender common.Address // The sender of the transaction. + //TODO: This is only for the broadcast decision. can we use something else for this ? + //TODO: Legacypool would have to resolve this every time GetMetadata() is called. } // SubPool represents a specialized transaction pool that lives on its own (e.g. @@ -157,9 +160,13 @@ type SubPool interface { Pending(filter PendingFilter) map[common.Address][]*LazyTransaction // SubscribeTransactions subscribes to new transaction events. The subscriber - // can decide whether to receive notifications only for newly seen transactions - // or also for reorged out ones. - SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + // would receive only newly seen transactions. + SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription + + // SubscribePropagationHashes subscribes to transaction hashes events, especially + // the ones to be propagated. This includes newly seen transactions and reorged + // transactions. + SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index a314a83f1b..0bdb806cb7 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -371,10 +371,18 @@ func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransac // SubscribeTransactions registers a subscription for new transaction events, // supporting feeding only newly seen or also resurrected transactions. -func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { +func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription { subs := make([]event.Subscription, len(p.subpools)) for i, subpool := range p.subpools { - subs[i] = subpool.SubscribeTransactions(ch, reorgs) + subs[i] = subpool.SubscribeTransactions(ch) + } + return p.subs.Track(event.JoinSubscriptions(subs...)) +} + +func (p *TxPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription { + subs := make([]event.Subscription, len(p.subpools)) + for i, subpool := range p.subpools { + subs[i] = subpool.SubscribePropagationHashes(ch) } return p.subs.Track(event.JoinSubscriptions(subs...)) } diff --git a/eth/api_backend.go b/eth/api_backend.go index 3f826b7861..46012260de 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -405,7 +405,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool { } func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return b.eth.txPool.SubscribeTransactions(ch, true) + return b.eth.txPool.SubscribeTransactions(ch) } func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress { diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index d0115efaa6..1b741246cc 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -48,7 +48,7 @@ func (a *simulatedBeaconAPI) loop() { var ( newTxs = make(chan core.NewTxsEvent) newWxs = make(chan newWithdrawalsEvent) - newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true) + newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs) newWxsSub = a.sim.withdrawals.subscribe(newWxs) doCommit = make(chan struct{}, 1) ) diff --git a/eth/handler.go b/eth/handler.go index bb2cd5f88b..7f4d0eaad1 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -91,7 +91,11 @@ type txPool interface { // SubscribeTransactions subscribes to new transaction events. The subscriber // can decide whether to receive notifications only for newly seen transactions // or also for reorged out ones. - SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + // TODO: comment + SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription + + // TODO: comment + SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription // FilterType returns whether the given tx type is supported by the txPool. FilterType(kind byte) bool @@ -127,7 +131,7 @@ type handler struct { txBroadcastKey [16]byte eventMux *event.TypeMux - txsCh chan core.NewTxsEvent + txsCh chan core.NewTxHashesEvent txsSub event.Subscription blockRange *blockRangeState @@ -414,8 +418,8 @@ func (h *handler) Start(maxPeers int) { // broadcast and announce transactions (only new ones, not resurrected ones) h.wg.Add(1) - h.txsCh = make(chan core.NewTxsEvent, txChanSize) - h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false) + h.txsCh = make(chan core.NewTxHashesEvent, txChanSize) + h.txsSub = h.txpool.SubscribePropagationHashes(h.txsCh) go h.txBroadcastLoop() // broadcast block range @@ -455,7 +459,7 @@ func (h *handler) Stop() { // - To a square root of all peers for non-blob transactions // - And, separately, as announcements to all peers which are not known to // already have the given transaction. -func (h *handler) BroadcastTransactions(txs types.Transactions) { +func (h *handler) BroadcastTransactions(txs []common.Hash) { var ( blobTxs int // Number of blob transactions to announce only largeTxs int // Number of large transactions to announce only @@ -466,35 +470,35 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce - signer = types.LatestSigner(h.chain.Config()) choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey) peers = h.peers.all() ) for _, tx := range txs { - var directSet map[*ethPeer]struct{} - switch { - case tx.Type() == types.BlobTxType: - blobTxs++ - case tx.Size() > txMaxBroadcastSize: - largeTxs++ - default: - // Get transaction sender address. Here we can ignore any error - // since we're just interested in any value. - txSender, _ := types.Sender(signer, tx) - directSet = choice.choosePeers(peers, txSender) - } - - for _, peer := range peers { - if peer.KnownTransaction(tx.Hash()) { - continue + if meta := h.txpool.GetMetadata(tx); meta != nil { + var directSet map[*ethPeer]struct{} + switch { + case meta.Type == types.BlobTxType: + blobTxs++ + case meta.Size > txMaxBroadcastSize: + largeTxs++ + default: + // Get transaction sender address. Here we can ignore any error + // since we're just interested in any value. + directSet = choice.choosePeers(peers, meta.Sender) } - if _, ok := directSet[peer]; ok { - // Send direct. - txset[peer] = append(txset[peer], tx.Hash()) - } else { - // Send announcement. - annos[peer] = append(annos[peer], tx.Hash()) + + for _, peer := range peers { + if peer.KnownTransaction(tx) { + continue + } + if _, ok := directSet[peer]; ok { + // Send direct. + txset[peer] = append(txset[peer], tx) + } else { + // Send announcement. + annos[peer] = append(annos[peer], tx) + } } } } @@ -517,7 +521,7 @@ func (h *handler) txBroadcastLoop() { for { select { case event := <-h.txsCh: - h.BroadcastTransactions(event.Txs) + h.BroadcastTransactions(event.Hashes) case <-h.txsSub.Err(): return } diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 0330713071..cfc2823ba2 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -245,8 +245,8 @@ func testRecvTransactions(t *testing.T, protocol uint) { handler.handler.synced.Store(true) // mark synced to accept transactions - txs := make(chan core.NewTxsEvent) - sub := handler.txpool.SubscribeTransactions(txs, false) + txs := make(chan core.NewTxHashesEvent) + sub := handler.txpool.SubscribePropagationHashes(txs) defer sub.Unsubscribe() // Create a source peer to send messages through and a sink handler to receive them @@ -275,10 +275,10 @@ func testRecvTransactions(t *testing.T, protocol uint) { } select { case event := <-txs: - if len(event.Txs) != 1 { - t.Errorf("wrong number of added transactions: got %d, want 1", len(event.Txs)) - } else if event.Txs[0].Hash() != tx.Hash() { - t.Errorf("added wrong tx hash: got %v, want %v", event.Txs[0].Hash(), tx.Hash()) + if len(event.Hashes) != 1 { + t.Errorf("wrong number of added transactions: got %d, want 1", len(event.Hashes)) + } else if event.Hashes[0] != tx.Hash() { + t.Errorf("added wrong tx hash: got %v, want %v", event.Hashes[0], tx.Hash()) } case <-time.After(2 * time.Second): t.Errorf("no NewTxsEvent received within 2 seconds") @@ -406,7 +406,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) { for i := 0; i < len(sinks); i++ { txChs[i] = make(chan core.NewTxsEvent, 1024) - sub := sinks[i].txpool.SubscribeTransactions(txChs[i], false) + sub := sinks[i].txpool.SubscribeTransactions(txChs[i]) defer sub.Unsubscribe() } // Fill the source pool with transactions and wait for them at the sinks diff --git a/eth/handler_test.go b/eth/handler_test.go index 3470452980..22a8379899 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -56,8 +56,9 @@ var ( type testTxPool struct { pool map[common.Hash]*types.Transaction // Hash map of collected transactions - txFeed event.Feed // Notification feed to allow waiting for inclusion - lock sync.RWMutex // Protects the transaction pool + txFeed event.Feed // Notification feed to allow waiting for inclusion + hashFeed event.Feed + lock sync.RWMutex // Protects the transaction pool } // newTestTxPool creates a mock transaction pool. @@ -124,6 +125,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error { p.pool[tx.Hash()] = tx } p.txFeed.Send(core.NewTxsEvent{Txs: txs}) + p.hashFeed.Send(core.NewTxHashesEventFromTxs(txs)) return make([]error, len(txs)) } @@ -159,10 +161,14 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]* // SubscribeTransactions should return an event subscription of NewTxsEvent and // send events to the given channel. -func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { +func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription { return p.txFeed.Subscribe(ch) } +func (p *testTxPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription { + return p.hashFeed.Subscribe(ch) +} + // FilterType should check whether the pool supports the given type of transactions. func (p *testTxPool) FilterType(kind byte) bool { switch kind {