diff --git a/eth/filters/api.go b/eth/filters/api.go index 333a3bea3b..201e01dea7 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -45,6 +45,7 @@ type filter struct { typ Type deadline *time.Timer // filter is inactiv when deadline triggers hashes []common.Hash + txs []*types.Transaction crit FilterCriteria logs []*types.Log s *Subscription // associated subscription in event system @@ -102,7 +103,7 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) { } } -// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// NewPendingTransactionFilter creates a filter that fetches pending transactions // as transactions enter the pending state. // // It is part of the filter package because this filter can be used through the @@ -111,21 +112,21 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) { // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID { var ( - pendingTxs = make(chan []common.Hash) + pendingTxs = make(chan []*types.Transaction) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) api.filtersMu.Lock() - api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub} api.filtersMu.Unlock() go func() { for { select { - case ph := <-pendingTxs: + case pTx := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { - f.hashes = append(f.hashes, ph...) + f.txs = append(f.txs, pTx...) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -140,9 +141,10 @@ func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID { return pendingTxSub.ID } -// NewPendingTransactions creates a subscription that is triggered each time a transaction -// enters the transaction pool and was signed from one of the transactions this nodes manages. -func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { +// NewPendingTransactions creates a subscription that is triggered each time a +// transaction enters the transaction pool. If fullTx is true the full tx is +// sent to the client, otherwise the hash is sent. +func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported @@ -151,16 +153,20 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscrip rpcSub := notifier.CreateSubscription() go func() { - txHashes := make(chan []common.Hash, 128) - pendingTxSub := api.events.SubscribePendingTxs(txHashes) + txs := make(chan []*types.Transaction, 128) + pendingTxSub := api.events.SubscribePendingTxs(txs) for { select { - case hashes := <-txHashes: + case txs := <-txs: // To keep the original behaviour, send a single tx hash in one notification. // TODO(rjl493456442) Send a batch of tx hashes in one notification - for _, h := range hashes { - notifier.Notify(rpcSub.ID, h) + for _, tx := range txs { + if fullTx != nil && *fullTx { + notifier.Notify(rpcSub.ID, tx) + } else { + notifier.Notify(rpcSub.ID, tx.Hash()) + } } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() @@ -426,10 +432,14 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.deadline.Reset(api.timeout) switch f.typ { - case PendingTransactionsSubscription, BlocksSubscription: + case BlocksSubscription: hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil + case PendingTransactionsSubscription: + txs := f.txs + f.txs = nil + return txs, nil case LogsSubscription, MinedAndPendingLogsSubscription: logs := f.logs f.logs = nil diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index e1167d409d..fdc7d85410 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -157,8 +157,8 @@ const ( PendingLogsSubscription // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. MinedAndPendingLogsSubscription - // PendingTransactionsSubscription queries tx hashes for pending - // transactions entering the pending state + // PendingTransactionsSubscription queries for pending transactions entering + // the pending state PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription @@ -184,7 +184,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - hashes chan []common.Hash + txs chan []*types.Transaction headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -278,7 +278,7 @@ func (sub *Subscription) Unsubscribe() { case sub.es.uninstall <- sub.f: break uninstallLoop case <-sub.f.logs: - case <-sub.f.hashes: + case <-sub.f.txs: case <-sub.f.headers: } } @@ -345,7 +345,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -362,7 +362,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -379,7 +379,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -395,7 +395,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan []common.Hash), + txs: make(chan []*types.Transaction), headers: headers, installed: make(chan struct{}), err: make(chan error), @@ -403,15 +403,15 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti return es.subscribe(sub) } -// SubscribePendingTxs creates a subscription that writes transaction hashes for +// SubscribePendingTxs creates a subscription that writes transactions for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { +func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: hashes, + txs: txs, headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -455,12 +455,8 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog } func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { - hashes := make([]common.Hash, 0, len(ev.Txs)) - for _, tx := range ev.Txs { - hashes = append(hashes, tx.Hash()) - } for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- hashes + f.txs <- ev.Txs } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 1b4457b00e..02365288b8 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -251,7 +251,7 @@ func TestPendingTxFilter(t *testing.T) { types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), } - hashes []common.Hash + txs []*types.Transaction ) fid0 := api.NewPendingTransactionFilter() @@ -266,9 +266,9 @@ func TestPendingTxFilter(t *testing.T) { t.Fatalf("Unable to retrieve logs: %v", err) } - h := results.([]common.Hash) - hashes = append(hashes, h...) - if len(hashes) >= len(transactions) { + tx := results.([]*types.Transaction) + txs = append(txs, tx...) + if len(txs) >= len(transactions) { break } // check timeout @@ -279,13 +279,13 @@ func TestPendingTxFilter(t *testing.T) { time.Sleep(100 * time.Millisecond) } - if len(hashes) != len(transactions) { - t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) + if len(txs) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs)) return } - for i := range hashes { - if hashes[i] != transactions[i].Hash() { - t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) + for i := range txs { + if txs[i].Hash() != transactions[i].Hash() { + t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash()) } } } @@ -850,11 +850,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) { fids[i] = fid // Wait for at least one tx to arrive in filter for { - hashes, err := api.GetFilterChanges(fid) + txs, err := api.GetFilterChanges(fid) if err != nil { t.Fatalf("Filter should exist: %v\n", err) } - if len(hashes.([]common.Hash)) > 0 { + if len(txs.([]*types.Transaction)) > 0 { break } runtime.Gosched() diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 05e8fed2da..84e3bb6e12 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -344,7 +344,7 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) // State Access -// NetworkID returns the network ID (also known as the chain ID) for this chain. +// NetworkID returns the network ID for this client. func (ec *Client) NetworkID(ctx context.Context) (*big.Int, error) { version := new(big.Int) var ver string diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index 0de01595d2..a3d4d64125 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -167,7 +167,12 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) { return &result, err } -// SubscribePendingTransactions subscribes to new pending transactions. +// SubscribeFullPendingTransactions subscribes to new pending transactions. +func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true) +} + +// SubscribePendingTransactions subscribes to new pending transaction hashes. func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) { return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") }