add new subscribe function

This commit is contained in:
healthykim 2026-02-08 12:21:21 -05:00
parent 777265620d
commit 0943eb826f
11 changed files with 130 additions and 54 deletions

View file

@ -26,6 +26,16 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool. // NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction } type NewTxsEvent struct{ Txs []*types.Transaction }
type NewTxHashesEvent struct{ Hashes []common.Hash }
func NewTxHashesEventFromTxs(txs []*types.Transaction) NewTxHashesEvent {
hashes := make([]common.Hash, 0, len(txs))
for _, tx := range txs {
hashes = append(hashes, tx.Hash())
}
return NewTxHashesEvent{Hashes: hashes}
}
// RemovedLogsEvent is posted when a reorg happens // RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log } type RemovedLogsEvent struct{ Logs []*types.Log }

View file

@ -133,12 +133,14 @@ type blobTxMeta struct {
evictionExecTip *uint256.Int // Worst gas tip across all previous nonces evictionExecTip *uint256.Int // Worst gas tip across all previous nonces
evictionExecFeeJumps float64 // Worst base fee (converted to fee jumps) across all previous nonces evictionExecFeeJumps float64 // Worst base fee (converted to fee jumps) across all previous nonces
evictionBlobFeeJumps float64 // Worst blob fee (converted to fee jumps) across all previous nonces evictionBlobFeeJumps float64 // Worst blob fee (converted to fee jumps) across all previous nonces
sender common.Address // Sender of the transaction
} }
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction // newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
// and assembles a helper struct to track in memory. // and assembles a helper struct to track in memory.
// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar). // Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar).
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta { func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction, sender common.Address) *blobTxMeta {
if tx.BlobTxSidecar() == nil { if tx.BlobTxSidecar() == nil {
// This should never happen, as the pool only admits blob transactions with a sidecar // This should never happen, as the pool only admits blob transactions with a sidecar
panic("missing blob tx sidecar") panic("missing blob tx sidecar")
@ -157,6 +159,7 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
execGas: tx.Gas(), execGas: tx.Gas(),
blobGas: tx.BlobGas(), blobGas: tx.BlobGas(),
sender: sender,
} }
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap) meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
@ -531,8 +534,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
return errors.New("missing blob sidecar") return errors.New("missing blob sidecar")
} }
meta := newBlobTxMeta(id, tx.Size(), size, tx) if p.lookup.exists(tx.Hash()) {
if p.lookup.exists(meta.hash) {
// This path is only possible after a crash, where deleted items are not // This path is only possible after a crash, where deleted items are not
// removed via the normal shutdown-startup procedure and thus may get // removed via the normal shutdown-startup procedure and thus may get
// partially resurrected. // partially resurrected.
@ -547,6 +549,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err) log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err)
return err return err
} }
meta := newBlobTxMeta(id, tx.Size(), size, tx, sender)
if _, ok := p.index[sender]; !ok { if _, ok := p.index[sender]; !ok {
if err := p.reserver.Hold(sender); err != nil { if err := p.reserver.Hold(sender); err != nil {
return err return err
@ -1070,8 +1073,13 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
return err return err
} }
sender, err := types.Sender(p.signer, tx)
if err != nil {
return err
}
// Update the indices and metrics // Update the indices and metrics
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx, sender)
if _, ok := p.index[addr]; !ok { if _, ok := p.index[addr]; !ok {
if err := p.reserver.Hold(addr); err != nil { if err := p.reserver.Hold(addr); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
@ -1360,9 +1368,15 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata {
if !ok { if !ok {
return nil return nil
} }
sender, ok := p.lookup.senderOfTx(hash)
if !ok {
return nil
}
return &txpool.TxMetadata{ return &txpool.TxMetadata{
Type: types.BlobTxType, Type: types.BlobTxType,
Size: size, Size: size,
Sender: sender,
} }
} }
@ -1582,7 +1596,13 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
if err != nil { if err != nil {
return err return err
} }
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
sender, err := types.Sender(p.signer, tx)
if err != nil {
return err
}
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx, sender)
var ( var (
next = p.state.GetNonce(from) next = p.state.GetNonce(from)
@ -1679,7 +1699,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
addValidMeter.Mark(1) addValidMeter.Mark(1)
// Notify all listeners of the new arrival // Notify all listeners of the new arrival
p.discoverFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}}) p.discoverFeed.Send(core.NewTxHashesEventFromTxs([]*types.Transaction{tx.WithoutBlobTxSidecar()}))
p.insertFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}}) p.insertFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}})
//check the gapped queue for this account and try to promote //check the gapped queue for this account and try to promote
@ -1939,12 +1959,13 @@ func (p *BlobPool) updateLimboMetrics() {
// SubscribeTransactions registers a subscription for new transaction events, // SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions. // supporting feeding only newly seen or also resurrected transactions.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
if reorgs { return p.insertFeed.Subscribe(ch)
return p.insertFeed.Subscribe(ch) }
} else {
return p.discoverFeed.Subscribe(ch) // TODO: comment
} func (p *BlobPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription {
return p.discoverFeed.Subscribe(ch)
} }
// Nonce returns the next nonce of an account, with all transactions executable // Nonce returns the next nonce of an account, with all transactions executable

View file

@ -21,8 +21,9 @@ import (
) )
type txMetadata struct { type txMetadata struct {
id uint64 // the billy id of transaction id uint64 // the billy id of transaction
size uint64 // the RLP encoded size of transaction (blobs are included) size uint64 // the RLP encoded size of transaction (blobs are included)
sender common.Address
} }
// lookup maps blob versioned hashes to transaction hashes that include them, // lookup maps blob versioned hashes to transaction hashes that include them,
@ -79,6 +80,16 @@ func (l *lookup) sizeOfTx(txhash common.Hash) (uint64, bool) {
return meta.size, true return meta.size, true
} }
// senderOfTx returns sender of transaction
func (l *lookup) senderOfTx(txhash common.Hash) (common.Address, bool) {
meta, ok := l.txIndex[txhash]
if !ok {
return common.Address{}, false
}
return meta.sender, true
}
// track inserts a new set of mappings from blob versioned hashes to transaction // track inserts a new set of mappings from blob versioned hashes to transaction
// hashes; and from transaction hashes to datastore storage item ids. // hashes; and from transaction hashes to datastore storage item ids.
func (l *lookup) track(tx *blobTxMeta) { func (l *lookup) track(tx *blobTxMeta) {
@ -91,8 +102,9 @@ func (l *lookup) track(tx *blobTxMeta) {
} }
// Map the transaction hash to the datastore id and RLP-encoded transaction size // Map the transaction hash to the datastore id and RLP-encoded transaction size
l.txIndex[tx.hash] = &txMetadata{ l.txIndex[tx.hash] = &txMetadata{
id: tx.id, id: tx.id,
size: tx.size, size: tx.size,
sender: tx.sender,
} }
} }

View file

@ -232,6 +232,7 @@ type LegacyPool struct {
chain BlockChain chain BlockChain
gasTip atomic.Pointer[uint256.Int] gasTip atomic.Pointer[uint256.Int]
txFeed event.Feed txFeed event.Feed
hashFeed event.Feed
signer types.Signer signer types.Signer
mu sync.RWMutex mu sync.RWMutex
@ -400,9 +401,10 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
<-wait <-wait
} }
// TODO: comment
// SubscribeTransactions registers a subscription for new transaction events, // SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions. // supporting feeding only newly seen or also resurrected transactions.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
// The legacy pool has a very messed up internal shuffling, so it's kind of // The legacy pool has a very messed up internal shuffling, so it's kind of
// hard to separate newly discovered transaction from resurrected ones. This // hard to separate newly discovered transaction from resurrected ones. This
// is because the new txs are added to the queue, resurrected ones too and // is because the new txs are added to the queue, resurrected ones too and
@ -410,6 +412,10 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs
return pool.txFeed.Subscribe(ch) return pool.txFeed.Subscribe(ch)
} }
func (pool *LegacyPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription {
return pool.hashFeed.Subscribe(ch)
}
// SetGasTip updates the minimum gas tip required by the transaction pool for a // SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold. // new transaction, and drops all transactions below this threshold.
func (pool *LegacyPool) SetGasTip(tip *big.Int) { func (pool *LegacyPool) SetGasTip(tip *big.Int) {
@ -1029,9 +1035,11 @@ func (pool *LegacyPool) GetMetadata(hash common.Hash) *txpool.TxMetadata {
if tx == nil { if tx == nil {
return nil return nil
} }
sender, _ := types.Sender(pool.signer, tx)
return &txpool.TxMetadata{ return &txpool.TxMetadata{
Type: tx.Type(), Type: tx.Type(),
Size: tx.Size(), Size: tx.Size(),
Sender: sender,
} }
} }
@ -1294,6 +1302,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
txs = append(txs, set.Flatten()...) txs = append(txs, set.Flatten()...)
} }
pool.txFeed.Send(core.NewTxsEvent{Txs: txs}) pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
pool.hashFeed.Send(core.NewTxHashesEventFromTxs(txs))
} }
} }

View file

@ -86,8 +86,11 @@ type PendingFilter struct {
// TxMetadata denotes the metadata of a transaction. // TxMetadata denotes the metadata of a transaction.
type TxMetadata struct { type TxMetadata struct {
Type uint8 // The type of the transaction Type uint8 // The type of the transaction
Size uint64 // The length of the 'rlp encoding' of a transaction Size uint64 // The length of the 'rlp encoding' of a transaction
Sender common.Address // The sender of the transaction.
//TODO: This is only for the broadcast decision. can we use something else for this ?
//TODO: Legacypool would have to resolve this every time GetMetadata() is called.
} }
// SubPool represents a specialized transaction pool that lives on its own (e.g. // SubPool represents a specialized transaction pool that lives on its own (e.g.
@ -157,9 +160,13 @@ type SubPool interface {
Pending(filter PendingFilter) map[common.Address][]*LazyTransaction Pending(filter PendingFilter) map[common.Address][]*LazyTransaction
// SubscribeTransactions subscribes to new transaction events. The subscriber // SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions // would receive only newly seen transactions.
// or also for reorged out ones. SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
// SubscribePropagationHashes subscribes to transaction hashes events, especially
// the ones to be propagated. This includes newly seen transactions and reorged
// transactions.
SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription
// Nonce returns the next nonce of an account, with all transactions executable // Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top. // by the pool already applied on top.

View file

@ -371,10 +371,18 @@ func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransac
// SubscribeTransactions registers a subscription for new transaction events, // SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions. // supporting feeding only newly seen or also resurrected transactions.
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
subs := make([]event.Subscription, len(p.subpools)) subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools { for i, subpool := range p.subpools {
subs[i] = subpool.SubscribeTransactions(ch, reorgs) subs[i] = subpool.SubscribeTransactions(ch)
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}
func (p *TxPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools {
subs[i] = subpool.SubscribePropagationHashes(ch)
} }
return p.subs.Track(event.JoinSubscriptions(subs...)) return p.subs.Track(event.JoinSubscriptions(subs...))
} }

View file

@ -405,7 +405,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
} }
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.txPool.SubscribeTransactions(ch, true) return b.eth.txPool.SubscribeTransactions(ch)
} }
func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress { func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress {

View file

@ -48,7 +48,7 @@ func (a *simulatedBeaconAPI) loop() {
var ( var (
newTxs = make(chan core.NewTxsEvent) newTxs = make(chan core.NewTxsEvent)
newWxs = make(chan newWithdrawalsEvent) newWxs = make(chan newWithdrawalsEvent)
newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true) newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs)
newWxsSub = a.sim.withdrawals.subscribe(newWxs) newWxsSub = a.sim.withdrawals.subscribe(newWxs)
doCommit = make(chan struct{}, 1) doCommit = make(chan struct{}, 1)
) )

View file

@ -91,7 +91,11 @@ type txPool interface {
// SubscribeTransactions subscribes to new transaction events. The subscriber // SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions // can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones. // or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription // TODO: comment
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
// TODO: comment
SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription
// FilterType returns whether the given tx type is supported by the txPool. // FilterType returns whether the given tx type is supported by the txPool.
FilterType(kind byte) bool FilterType(kind byte) bool
@ -127,7 +131,7 @@ type handler struct {
txBroadcastKey [16]byte txBroadcastKey [16]byte
eventMux *event.TypeMux eventMux *event.TypeMux
txsCh chan core.NewTxsEvent txsCh chan core.NewTxHashesEvent
txsSub event.Subscription txsSub event.Subscription
blockRange *blockRangeState blockRange *blockRangeState
@ -416,8 +420,8 @@ func (h *handler) Start(maxPeers int) {
// broadcast and announce transactions (only new ones, not resurrected ones) // broadcast and announce transactions (only new ones, not resurrected ones)
h.wg.Add(1) h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize) h.txsCh = make(chan core.NewTxHashesEvent, txChanSize)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false) h.txsSub = h.txpool.SubscribePropagationHashes(h.txsCh)
go h.txBroadcastLoop() go h.txBroadcastLoop()
// broadcast block range // broadcast block range
@ -457,7 +461,7 @@ func (h *handler) Stop() {
// - To a square root of all peers for non-blob transactions // - To a square root of all peers for non-blob transactions
// - And, separately, as announcements to all peers which are not known to // - And, separately, as announcements to all peers which are not known to
// already have the given transaction. // already have the given transaction.
func (h *handler) BroadcastTransactions(txs types.Transactions) { func (h *handler) BroadcastTransactions(txs []common.Hash) {
var ( var (
blobTxs int // Number of blob transactions to announce only blobTxs int // Number of blob transactions to announce only
largeTxs int // Number of large transactions to announce only largeTxs int // Number of large transactions to announce only
@ -468,35 +472,34 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
signer = types.LatestSigner(h.chain.Config())
choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey) choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey)
peers = h.peers.all() peers = h.peers.all()
) )
for _, tx := range txs { for _, tx := range txs {
var directSet map[*ethPeer]struct{} var directSet map[*ethPeer]struct{}
meta := h.txpool.GetMetadata(tx)
switch { switch {
case tx.Type() == types.BlobTxType: case meta.Type == types.BlobTxType:
blobTxs++ blobTxs++
case tx.Size() > txMaxBroadcastSize: case meta.Size > txMaxBroadcastSize:
largeTxs++ largeTxs++
default: default:
// Get transaction sender address. Here we can ignore any error // Get transaction sender address. Here we can ignore any error
// since we're just interested in any value. // since we're just interested in any value.
txSender, _ := types.Sender(signer, tx) directSet = choice.choosePeers(peers, meta.Sender)
directSet = choice.choosePeers(peers, txSender)
} }
for _, peer := range peers { for _, peer := range peers {
if peer.KnownTransaction(tx.Hash()) { if peer.KnownTransaction(tx) {
continue continue
} }
if _, ok := directSet[peer]; ok { if _, ok := directSet[peer]; ok {
// Send direct. // Send direct.
txset[peer] = append(txset[peer], tx.Hash()) txset[peer] = append(txset[peer], tx)
} else { } else {
// Send announcement. // Send announcement.
annos[peer] = append(annos[peer], tx.Hash()) annos[peer] = append(annos[peer], tx)
} }
} }
} }
@ -519,7 +522,7 @@ func (h *handler) txBroadcastLoop() {
for { for {
select { select {
case event := <-h.txsCh: case event := <-h.txsCh:
h.BroadcastTransactions(event.Txs) h.BroadcastTransactions(event.Hashes)
case <-h.txsSub.Err(): case <-h.txsSub.Err():
return return
} }

View file

@ -237,8 +237,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {
handler.handler.synced.Store(true) // mark synced to accept transactions handler.handler.synced.Store(true) // mark synced to accept transactions
txs := make(chan core.NewTxsEvent) txs := make(chan core.NewTxHashesEvent)
sub := handler.txpool.SubscribeTransactions(txs, false) sub := handler.txpool.SubscribePropagationHashes(txs)
defer sub.Unsubscribe() defer sub.Unsubscribe()
// Create a source peer to send messages through and a sink handler to receive them // Create a source peer to send messages through and a sink handler to receive them
@ -267,10 +267,10 @@ func testRecvTransactions(t *testing.T, protocol uint) {
} }
select { select {
case event := <-txs: case event := <-txs:
if len(event.Txs) != 1 { if len(event.Hashes) != 1 {
t.Errorf("wrong number of added transactions: got %d, want 1", len(event.Txs)) t.Errorf("wrong number of added transactions: got %d, want 1", len(event.Hashes))
} else if event.Txs[0].Hash() != tx.Hash() { } else if event.Hashes[0] != tx.Hash() {
t.Errorf("added wrong tx hash: got %v, want %v", event.Txs[0].Hash(), tx.Hash()) t.Errorf("added wrong tx hash: got %v, want %v", event.Hashes[0], tx.Hash())
} }
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Errorf("no NewTxsEvent received within 2 seconds") t.Errorf("no NewTxsEvent received within 2 seconds")
@ -398,7 +398,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
for i := 0; i < len(sinks); i++ { for i := 0; i < len(sinks); i++ {
txChs[i] = make(chan core.NewTxsEvent, 1024) txChs[i] = make(chan core.NewTxsEvent, 1024)
sub := sinks[i].txpool.SubscribeTransactions(txChs[i], false) sub := sinks[i].txpool.SubscribeTransactions(txChs[i])
defer sub.Unsubscribe() defer sub.Unsubscribe()
} }
// Fill the source pool with transactions and wait for them at the sinks // Fill the source pool with transactions and wait for them at the sinks

View file

@ -56,8 +56,9 @@ var (
type testTxPool struct { type testTxPool struct {
pool map[common.Hash]*types.Transaction // Hash map of collected transactions pool map[common.Hash]*types.Transaction // Hash map of collected transactions
txFeed event.Feed // Notification feed to allow waiting for inclusion txFeed event.Feed // Notification feed to allow waiting for inclusion
lock sync.RWMutex // Protects the transaction pool hashFeed event.Feed
lock sync.RWMutex // Protects the transaction pool
} }
// newTestTxPool creates a mock transaction pool. // newTestTxPool creates a mock transaction pool.
@ -124,6 +125,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error {
p.pool[tx.Hash()] = tx p.pool[tx.Hash()] = tx
} }
p.txFeed.Send(core.NewTxsEvent{Txs: txs}) p.txFeed.Send(core.NewTxsEvent{Txs: txs})
p.hashFeed.Send(core.NewTxHashesEventFromTxs(txs))
return make([]error, len(txs)) return make([]error, len(txs))
} }
@ -159,10 +161,14 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*
// SubscribeTransactions should return an event subscription of NewTxsEvent and // SubscribeTransactions should return an event subscription of NewTxsEvent and
// send events to the given channel. // send events to the given channel.
func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return p.txFeed.Subscribe(ch) return p.txFeed.Subscribe(ch)
} }
func (p *testTxPool) SubscribePropagationHashes(ch chan<- core.NewTxHashesEvent) event.Subscription {
return p.hashFeed.Subscribe(ch)
}
// FilterType should check whether the pool supports the given type of transactions. // FilterType should check whether the pool supports the given type of transactions.
func (p *testTxPool) FilterType(kind byte) bool { func (p *testTxPool) FilterType(kind byte) bool {
switch kind { switch kind {