eth/filters: add transactionReceipts subscription (#32697)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

- Introduce a new subscription kind `transactionReceipts` to allow clients to
  receive transaction receipts over WebSocket as soon as they are available.
- Accept optional `transactionHashes` filter to subscribe to receipts for specific
  transactions; an empty or omitted filter subscribes to all receipts.
- Preserve the same receipt format as returned by `eth_getTransactionReceipt`.
- Avoid additional HTTP polling, reducing RPC load and latency.

---------

Co-authored-by: Sina Mahmoodi <itz.s1na@gmail.com>
This commit is contained in:
10gic 2025-10-09 20:14:53 +08:00 committed by GitHub
parent a1b8e4880d
commit 11208553dd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 341 additions and 9 deletions

View file

@ -1690,7 +1690,12 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
// Set new head.
bc.writeHeadBlock(block)
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
bc.chainFeed.Send(ChainEvent{
Header: block.Header(),
Receipts: receipts,
Transactions: block.Transactions(),
})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}
@ -2342,6 +2347,13 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co
// collectLogs collects the logs that were generated or removed during the
// processing of a block. These logs are later announced as deleted or reborn.
func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
_, logs := bc.collectReceiptsAndLogs(b, removed)
return logs
}
// collectReceiptsAndLogs retrieves receipts from the database and returns both receipts and logs.
// This avoids duplicate database reads when both are needed.
func (bc *BlockChain) collectReceiptsAndLogs(b *types.Block, removed bool) ([]*types.Receipt, []*types.Log) {
var blobGasPrice *big.Int
if b.ExcessBlobGas() != nil {
blobGasPrice = eip4844.CalcBlobFee(bc.chainConfig, b.Header())
@ -2359,7 +2371,7 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
logs = append(logs, log)
}
}
return logs
return receipts, logs
}
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
@ -2588,8 +2600,14 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
bc.writeHeadBlock(head)
// Emit events
logs := bc.collectLogs(head, false)
bc.chainFeed.Send(ChainEvent{Header: head.Header()})
receipts, logs := bc.collectReceiptsAndLogs(head, false)
bc.chainFeed.Send(ChainEvent{
Header: head.Header(),
Receipts: receipts,
Transactions: head.Transactions(),
})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
}

View file

@ -27,7 +27,9 @@ type NewTxsEvent struct{ Txs []*types.Transaction }
type RemovedLogsEvent struct{ Logs []*types.Log }
type ChainEvent struct {
Header *types.Header
Header *types.Header
Receipts []*types.Receipt
Transactions []*types.Transaction
}
type ChainHeadEvent struct {

View file

@ -43,6 +43,7 @@ var (
errPendingLogsUnsupported = errors.New("pending logs are not supported")
errExceedMaxTopics = errors.New("exceed max topics")
errExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position")
errExceedMaxTxHashes = errors.New("exceed max number of transaction hashes allowed per transactionReceipts subscription")
)
const (
@ -50,6 +51,8 @@ const (
maxTopics = 4
// The maximum number of allowed topics within a topic criteria
maxSubTopics = 1000
// The maximum number of transaction hash criteria allowed in a single subscription
maxTxHashes = 200
)
// filter is a helper struct that holds meta information over the filter type
@ -296,6 +299,71 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
return rpcSub, nil
}
// TransactionReceiptsFilter defines criteria for transaction receipts subscription.
// If TransactionHashes is nil or empty, receipts for all transactions included in new blocks will be delivered.
// Otherwise, only receipts for the specified transactions will be delivered.
type TransactionReceiptsFilter struct {
TransactionHashes []common.Hash `json:"transactionHashes,omitempty"`
}
// TransactionReceipts creates a subscription that fires transaction receipts when transactions are included in blocks.
func (api *FilterAPI) TransactionReceipts(ctx context.Context, filter *TransactionReceiptsFilter) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
// Validate transaction hashes limit
if filter != nil && len(filter.TransactionHashes) > maxTxHashes {
return nil, errExceedMaxTxHashes
}
var (
rpcSub = notifier.CreateSubscription()
matchedReceipts = make(chan []*ReceiptWithTx)
txHashes []common.Hash
)
if filter != nil {
txHashes = filter.TransactionHashes
}
receiptsSub := api.events.SubscribeTransactionReceipts(txHashes, matchedReceipts)
go func() {
defer receiptsSub.Unsubscribe()
signer := types.LatestSigner(api.sys.backend.ChainConfig())
for {
select {
case receiptsWithTxs := <-matchedReceipts:
if len(receiptsWithTxs) > 0 {
// Convert to the same format as eth_getTransactionReceipt
marshaledReceipts := make([]map[string]interface{}, len(receiptsWithTxs))
for i, receiptWithTx := range receiptsWithTxs {
marshaledReceipts[i] = ethapi.MarshalReceipt(
receiptWithTx.Receipt,
receiptWithTx.Receipt.BlockHash,
receiptWithTx.Receipt.BlockNumber.Uint64(),
signer,
receiptWithTx.Transaction,
int(receiptWithTx.Receipt.TransactionIndex),
)
}
// Send a batch of tx receipts in one notification
notifier.Notify(rpcSub.ID, marshaledReceipts)
}
case <-rpcSub.Err():
return
}
}
}()
return rpcSub, nil
}
// FilterCriteria represents a request to create a new filter.
// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
type FilterCriteria ethereum.FilterQuery

View file

@ -25,6 +25,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/history"
"github.com/ethereum/go-ethereum/core/types"
@ -551,3 +552,70 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
}
return true
}
// ReceiptWithTx contains a receipt and its corresponding transaction
type ReceiptWithTx struct {
Receipt *types.Receipt
Transaction *types.Transaction
}
// filterReceipts returns the receipts matching the given criteria
// In addition to returning receipts, it also returns the corresponding transactions.
// This is because receipts only contain low-level data, while user-facing data
// may require additional information from the Transaction.
func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx {
var ret []*ReceiptWithTx
receipts := ev.Receipts
txs := ev.Transactions
if len(receipts) != len(txs) {
log.Warn("Receipts and transactions length mismatch", "receipts", len(receipts), "transactions", len(txs))
return ret
}
if len(txHashes) == 0 {
// No filter, send all receipts with their transactions.
ret = make([]*ReceiptWithTx, len(receipts))
for i, receipt := range receipts {
ret[i] = &ReceiptWithTx{
Receipt: receipt,
Transaction: txs[i],
}
}
} else if len(txHashes) == 1 {
// Filter by single transaction hash.
// This is a common case, so we distinguish it from filtering by multiple tx hashes and made a small optimization.
for i, receipt := range receipts {
if receipt.TxHash == txHashes[0] {
ret = append(ret, &ReceiptWithTx{
Receipt: receipt,
Transaction: txs[i],
})
break
}
}
} else {
// Filter by multiple transaction hashes.
txHashMap := make(map[common.Hash]bool, len(txHashes))
for _, hash := range txHashes {
txHashMap[hash] = true
}
for i, receipt := range receipts {
if txHashMap[receipt.TxHash] {
ret = append(ret, &ReceiptWithTx{
Receipt: receipt,
Transaction: txs[i],
})
// Early exit if all receipts are found
if len(ret) == len(txHashes) {
break
}
}
}
}
return ret
}

View file

@ -158,6 +158,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// TransactionReceiptsSubscription queries for transaction receipts when transactions are included in blocks
TransactionReceiptsSubscription
// LastIndexSubscription keeps track of the last index
LastIndexSubscription
)
@ -182,6 +184,8 @@ type subscription struct {
logs chan []*types.Log
txs chan []*types.Transaction
headers chan *types.Header
receipts chan []*ReceiptWithTx
txHashes []common.Hash // contains transaction hashes for transactionReceipts subscription filtering
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}
@ -268,6 +272,7 @@ func (sub *Subscription) Unsubscribe() {
case <-sub.f.logs:
case <-sub.f.txs:
case <-sub.f.headers:
case <-sub.f.receipts:
}
}
@ -353,6 +358,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
logs: logs,
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
receipts: make(chan []*ReceiptWithTx),
installed: make(chan struct{}),
err: make(chan error),
}
@ -369,6 +375,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
logs: make(chan []*types.Log),
txs: make(chan []*types.Transaction),
headers: headers,
receipts: make(chan []*ReceiptWithTx),
installed: make(chan struct{}),
err: make(chan error),
}
@ -385,6 +392,26 @@ func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subsc
logs: make(chan []*types.Log),
txs: txs,
headers: make(chan *types.Header),
receipts: make(chan []*ReceiptWithTx),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// SubscribeTransactionReceipts creates a subscription that writes transaction receipts for
// transactions when they are included in blocks. If txHashes is provided, only receipts
// for those specific transaction hashes will be delivered.
func (es *EventSystem) SubscribeTransactionReceipts(txHashes []common.Hash, receipts chan []*ReceiptWithTx) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: TransactionReceiptsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
receipts: receipts,
txHashes: txHashes,
installed: make(chan struct{}),
err: make(chan error),
}
@ -415,6 +442,14 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Header
}
// Handle transaction receipts subscriptions when a new block is added
for _, f := range filters[TransactionReceiptsSubscription] {
matchedReceipts := filterReceipts(f.txHashes, ev)
if len(matchedReceipts) > 0 {
f.receipts <- matchedReceipts
}
}
}
// eventLoop (un)installs filters and processes mux events.

View file

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
@ -781,3 +782,143 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
}
}
}
// TestTransactionReceiptsSubscription tests the transaction receipts subscription functionality
func TestTransactionReceiptsSubscription(t *testing.T) {
t.Parallel()
const txNum = 5
// Setup test environment
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{})
api = NewFilterAPI(sys)
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
signer = types.NewLondonSigner(big.NewInt(1))
genesis = &core.Genesis{
Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(1000000000000000000)}}, // 1 ETH
Config: params.TestChainConfig,
BaseFee: big.NewInt(params.InitialBaseFee),
}
_, chain, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 1, func(i int, gen *core.BlockGen) {
// Add transactions to the block
for j := 0; j < txNum; j++ {
toAddr := common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268")
tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{
Nonce: uint64(j),
GasPrice: gen.BaseFee(),
Gas: 21000,
To: &toAddr,
Value: big.NewInt(1000),
Data: nil,
}), signer, key1)
gen.AddTx(tx)
}
})
)
// Insert the blocks into the chain
blockchain, err := core.NewBlockChain(db, genesis, ethash.NewFaker(), nil)
if err != nil {
t.Fatalf("failed to create tester chain: %v", err)
}
if n, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("block %d: failed to insert into chain: %v", n, err)
}
// Prepare test data
receipts := blockchain.GetReceiptsByHash(chain[0].Hash())
if receipts == nil {
t.Fatalf("failed to get receipts")
}
chainEvent := core.ChainEvent{
Header: chain[0].Header(),
Receipts: receipts,
Transactions: chain[0].Transactions(),
}
txHashes := make([]common.Hash, txNum)
for i := 0; i < txNum; i++ {
txHashes[i] = chain[0].Transactions()[i].Hash()
}
testCases := []struct {
name string
filterTxHashes []common.Hash
expectedReceiptTxHashes []common.Hash
expectError bool
}{
{
name: "no filter - should return all receipts",
filterTxHashes: nil,
expectedReceiptTxHashes: txHashes,
expectError: false,
},
{
name: "single tx hash filter",
filterTxHashes: []common.Hash{txHashes[0]},
expectedReceiptTxHashes: []common.Hash{txHashes[0]},
expectError: false,
},
{
name: "multiple tx hashes filter",
filterTxHashes: []common.Hash{txHashes[0], txHashes[1], txHashes[2]},
expectedReceiptTxHashes: []common.Hash{txHashes[0], txHashes[1], txHashes[2]},
expectError: false,
},
}
// Run test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
receiptsChan := make(chan []*ReceiptWithTx)
sub := api.events.SubscribeTransactionReceipts(tc.filterTxHashes, receiptsChan)
// Send chain event
backend.chainFeed.Send(chainEvent)
// Wait for receipts
timeout := time.After(1 * time.Second)
var receivedReceipts []*types.Receipt
for {
select {
case receiptsWithTx := <-receiptsChan:
for _, receiptWithTx := range receiptsWithTx {
receivedReceipts = append(receivedReceipts, receiptWithTx.Receipt)
}
case <-timeout:
t.Fatalf("timeout waiting for receipts")
}
if len(receivedReceipts) >= len(tc.expectedReceiptTxHashes) {
break
}
}
// Verify receipt count
if len(receivedReceipts) != len(tc.expectedReceiptTxHashes) {
t.Errorf("Expected %d receipts, got %d", len(tc.expectedReceiptTxHashes), len(receivedReceipts))
}
// Verify specific transaction hashes are present
if tc.expectedReceiptTxHashes != nil {
receivedHashes := make(map[common.Hash]bool)
for _, receipt := range receivedReceipts {
receivedHashes[receipt.TxHash] = true
}
for _, expectedHash := range tc.expectedReceiptTxHashes {
if !receivedHashes[expectedHash] {
t.Errorf("Expected receipt for tx %x not found", expectedHash)
}
}
}
// Cleanup
sub.Unsubscribe()
<-sub.Err()
})
}
}

View file

@ -627,7 +627,7 @@ func (api *BlockChainAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rp
result := make([]map[string]interface{}, len(receipts))
for i, receipt := range receipts {
result[i] = marshalReceipt(receipt, block.Hash(), block.NumberU64(), signer, txs[i], i)
result[i] = MarshalReceipt(receipt, block.Hash(), block.NumberU64(), signer, txs[i], i)
}
return result, nil
}
@ -1488,11 +1488,11 @@ func (api *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash commo
return nil, err
}
// Derive the sender.
return marshalReceipt(receipt, blockHash, blockNumber, api.signer, tx, int(index)), nil
return MarshalReceipt(receipt, blockHash, blockNumber, api.signer, tx, int(index)), nil
}
// marshalReceipt marshals a transaction receipt into a JSON object.
func marshalReceipt(receipt *types.Receipt, blockHash common.Hash, blockNumber uint64, signer types.Signer, tx *types.Transaction, txIndex int) map[string]interface{} {
// MarshalReceipt marshals a transaction receipt into a JSON object.
func MarshalReceipt(receipt *types.Receipt, blockHash common.Hash, blockNumber uint64, signer types.Signer, tx *types.Transaction, txIndex int) map[string]interface{} {
from, _ := types.Sender(signer, tx)
fields := map[string]interface{}{