miner: add OpenTelemetry spans for block building path (#33773)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

Instruments the block building path with OpenTelemetry tracing spans.

- Added spans along the path forkchoiceUpdated -> buildPayload -> background
payload loop -> generateWork iterations. The resulting span tree looks like this:

```
jsonrpc.engine/forkchoiceUpdatedV3
|- rpc.runMethod
|  |- engine.forkchoiceUpdated
|     |- miner.buildPayload [payload.id, parent.hash, timestamp]
|        |- miner.generateWork [txs.count, gas.used, fees] (empty block)
|        |  |- miner.prepareWork
|        |  |- miner.FinalizeAndAssemble
|        |     |- consensus.beacon.FinalizeAndAssemble [block.number, txs.count, withdrawals.count]
|        |        |- consensus.beacon.Finalize
|        |        |- consensus.beacon.IntermediateRoot
|        |        |- consensus.beacon.NewBlock
|        |- miner.background [block.number, iterations.total, exit.reason, empty.delivered]
|           |- miner.buildIteration [iteration, update.accepted]
|           |  |- miner.generateWork [txs.count, gas.used, fees]
|           |     |- miner.prepareWork
|           |     |- miner.fillTransactions [pending.plain.count, pending.blob.count]
|           |     |  |- miner.commitTransactions.priority (if prio txs exist)
|           |     |  |  |- miner.commitTransactions
|           |     |  |     |- miner.commitTransaction (per tx)
|           |     |  |- miner.commitTransactions.normal (if normal txs exist)
|           |     |     |- miner.commitTransactions
|           |     |        |- miner.commitTransaction (per tx)
|           |     |- miner.FinalizeAndAssemble
|           |        |- consensus.beacon.FinalizeAndAssemble [block.number, txs.count, withdrawals.count]
|           |           |- consensus.beacon.Finalize
|           |           |- consensus.beacon.IntermediateRoot
|           |           |- consensus.beacon.NewBlock
|           |- miner.buildIteration [iteration, update.accepted]
|           |  |- ...
|           |- ...

```

- Added simulated server spans in SimulatedBeacon.sealBlock so that dev mode
(geth --dev) produces traces mirroring the Engine API calls a real
consensus client would make in production.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Jonny Rhea 2026-03-16 13:24:41 -05:00 committed by GitHub
parent a7d09cc14f
commit 98b13f342f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 245 additions and 106 deletions

View file

@ -17,6 +17,7 @@
package beacon
import (
"context"
"errors"
"fmt"
"math/big"
@ -29,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/core/tracing"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/internal/telemetry"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
@ -351,9 +353,17 @@ func (beacon *Beacon) Finalize(chain consensus.ChainHeaderReader, header *types.
// FinalizeAndAssemble implements consensus.Engine, setting the final state and
// assembling the block.
func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
func (beacon *Beacon) FinalizeAndAssemble(ctx context.Context, chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (result *types.Block, err error) {
ctx, _, spanEnd := telemetry.StartSpan(ctx, "consensus.beacon.FinalizeAndAssemble",
telemetry.Int64Attribute("block.number", int64(header.Number.Uint64())),
telemetry.Int64Attribute("txs.count", int64(len(body.Transactions))),
telemetry.Int64Attribute("withdrawals.count", int64(len(body.Withdrawals))),
)
defer spanEnd(&err)
if !beacon.IsPoSHeader(header) {
return beacon.ethone.FinalizeAndAssemble(chain, header, state, body, receipts)
block, delegateErr := beacon.ethone.FinalizeAndAssemble(ctx, chain, header, state, body, receipts)
return block, delegateErr
}
shanghai := chain.Config().IsShanghai(header.Number, header.Time)
if shanghai {
@ -367,13 +377,20 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
}
}
// Finalize and assemble the block.
_, _, finalizeSpanEnd := telemetry.StartSpan(ctx, "consensus.beacon.Finalize")
beacon.Finalize(chain, header, state, body)
finalizeSpanEnd(nil)
// Assign the final state root to header.
_, _, rootSpanEnd := telemetry.StartSpan(ctx, "consensus.beacon.IntermediateRoot")
header.Root = state.IntermediateRoot(true)
rootSpanEnd(nil)
// Assemble the final block.
return types.NewBlock(header, body, receipts, trie.NewStackTrie(nil)), nil
_, _, blockSpanEnd := telemetry.StartSpan(ctx, "consensus.beacon.NewBlock")
block := types.NewBlock(header, body, receipts, trie.NewStackTrie(nil))
blockSpanEnd(nil)
return block, nil
}
// Seal generates a new sealing request for the given input block and pushes

View file

@ -19,6 +19,7 @@ package clique
import (
"bytes"
"context"
"errors"
"fmt"
"io"
@ -581,7 +582,7 @@ func (c *Clique) Finalize(chain consensus.ChainHeaderReader, header *types.Heade
// FinalizeAndAssemble implements consensus.Engine, ensuring no uncles are set,
// nor block rewards given, and returns the final block.
func (c *Clique) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
func (c *Clique) FinalizeAndAssemble(ctx context.Context, chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
if len(body.Withdrawals) > 0 {
return nil, errors.New("clique does not support withdrawals")
}

View file

@ -18,6 +18,7 @@
package consensus
import (
"context"
"math/big"
"github.com/ethereum/go-ethereum/common"
@ -92,7 +93,7 @@ type Engine interface {
//
// Note: The block header and state database might be updated to reflect any
// consensus rules that happen at finalization (e.g. block rewards).
FinalizeAndAssemble(chain ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error)
FinalizeAndAssemble(ctx context.Context, chain ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error)
// Seal generates a new sealing request for the given input block and pushes
// the result into the given channel.

View file

@ -17,6 +17,7 @@
package ethash
import (
"context"
"errors"
"fmt"
"math/big"
@ -513,7 +514,7 @@ func (ethash *Ethash) Finalize(chain consensus.ChainHeaderReader, header *types.
// FinalizeAndAssemble implements consensus.Engine, accumulating the block and
// uncle rewards, setting the final state and assembling the block.
func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
func (ethash *Ethash) FinalizeAndAssemble(ctx context.Context, chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) {
if len(body.Withdrawals) > 0 {
return nil, errors.New("ethash does not support withdrawals")
}

View file

@ -17,6 +17,7 @@
package core
import (
"context"
"fmt"
"math/big"
@ -411,7 +412,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
}
body := types.Body{Transactions: b.txs, Uncles: b.uncles, Withdrawals: b.withdrawals}
block, err := b.engine.FinalizeAndAssemble(cm, b.header, statedb, &body, b.receipts)
block, err := b.engine.FinalizeAndAssemble(context.Background(), cm, b.header, statedb, &body, b.receipts)
if err != nil {
panic(err)
}

View file

@ -1865,11 +1865,11 @@ func (p *BlobPool) drop() {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (p *BlobPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) {
// If only plain transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if !filter.BlobTxs {
return nil
return nil, 0
}
// Track the amount of time waiting to retrieve the list of pending blob txs
// from the pool and the amount of time actually spent on assembling the data.
@ -1885,6 +1885,7 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx
pendtimeHist.Update(time.Since(execStart).Nanoseconds())
}()
var count int
pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index))
for addr, txs := range p.index {
lazies := make([]*txpool.LazyTransaction, 0, len(txs))
@ -1930,9 +1931,10 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx
}
if len(lazies) > 0 {
pending[addr] = lazies
count += len(lazies)
}
}
return pending
return pending, count
}
// updateStorageMetrics retrieves a bunch of stats from the data store and pushes

View file

@ -2122,7 +2122,7 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p := pool.Pending(txpool.PendingFilter{
p, _ := pool.Pending(txpool.PendingFilter{
MinTip: uint256.NewInt(1),
BaseFee: chain.basefee,
BlobFee: chain.blobfee,

View file

@ -494,15 +494,16 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) {
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.BlobTxs {
return nil
return nil, 0
}
pool.mu.Lock()
defer pool.mu.Unlock()
var count int
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()
@ -539,9 +540,10 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
}
}
pending[addr] = lazies
count += len(lazies)
}
}
return pending
return pending, count
}
// ValidateTxBasics checks whether a transaction is valid according to the consensus

View file

@ -154,7 +154,7 @@ type SubPool interface {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
Pending(filter PendingFilter) map[common.Address][]*LazyTransaction
Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int)
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions

View file

@ -359,14 +359,17 @@ func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction {
func (p *TxPool) Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int) {
var count int
txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools {
for addr, set := range subpool.Pending(filter) {
txs[addr] = set
set, n := subpool.Pending(filter)
for addr, list := range set {
txs[addr] = list
}
count += n
}
return txs
return txs, count
}
// SubscribeTransactions registers a subscription for new transaction events,

View file

@ -347,7 +347,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
}
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(txpool.PendingFilter{})
pending, _ := b.eth.txPool.Pending(txpool.PendingFilter{})
var txs types.Transactions
for _, batch := range pending {
for _, lazy := range batch {

View file

@ -162,7 +162,7 @@ func newConsensusAPIWithoutHeartbeat(eth *eth.Ethereum) *ConsensusAPI {
//
// If there are payloadAttributes: we try to assemble a block with the payloadAttributes
// and return its payloadID.
func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV1(ctx context.Context, update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if payloadAttributes != nil {
switch {
case payloadAttributes.Withdrawals != nil || payloadAttributes.BeaconRoot != nil:
@ -171,12 +171,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, paramsErr("fcuV1 called post-shanghai")
}
}
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, false)
return api.forkchoiceUpdated(ctx, update, payloadAttributes, engine.PayloadV1, false)
}
// ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload
// attributes. It supports both PayloadAttributesV1 and PayloadAttributesV2.
func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV2(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if params != nil {
switch {
case params.BeaconRoot != nil:
@ -189,12 +189,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, unsupportedForkErr("fcuV2 must only be called with paris or shanghai payloads")
}
}
return api.forkchoiceUpdated(update, params, engine.PayloadV2, false)
return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV2, false)
}
// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root
// in the payload attributes. It supports only PayloadAttributesV3.
func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV3(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if params != nil {
switch {
case params.Withdrawals == nil:
@ -209,12 +209,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params, engine.PayloadV3, false)
return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV3, false)
}
// ForkchoiceUpdatedV4 is equivalent to V3 with the addition of slot number
// in the payload attributes. It supports only PayloadAttributesV4.
func (api *ConsensusAPI) ForkchoiceUpdatedV4(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV4(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if params != nil {
switch {
case params.Withdrawals == nil:
@ -231,10 +231,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV4(update engine.ForkchoiceStateV1, pa
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params, engine.PayloadV4, false)
return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV4, false)
}
func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, payloadWitness bool) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) forkchoiceUpdated(ctx context.Context, update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, payloadWitness bool) (result engine.ForkChoiceResponse, err error) {
ctx, _, spanEnd := telemetry.StartSpan(ctx, "engine.forkchoiceUpdated")
defer spanEnd(&err)
api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock()
@ -375,7 +377,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
if api.localBlocks.has(id) {
return valid(&id), nil
}
payload, err := api.eth.Miner().BuildPayload(args, payloadWitness)
payload, err := api.eth.Miner().BuildPayload(ctx, args, payloadWitness)
if err != nil {
log.Error("Failed to build payload", "err", err)
return valid(nil), engine.InvalidPayloadAttributes.With(err)

View file

@ -190,7 +190,7 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
_, err := api.ForkchoiceUpdatedV1(fcState, &blockParams)
_, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, &blockParams)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
@ -270,7 +270,7 @@ func TestInvalidPayloadTimestamp(t *testing.T) {
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
_, err := api.ForkchoiceUpdatedV1(fcState, &params)
_, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, &params)
if test.shouldErr && err == nil {
t.Fatalf("expected error preparing payload with invalid timestamp, err=%v", err)
} else if !test.shouldErr && err != nil {
@ -329,7 +329,7 @@ func TestEth2NewBlock(t *testing.T) {
SafeBlockHash: block.Hash(),
FinalizedBlockHash: block.Hash(),
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil {
t.Fatalf("Failed to insert block: %v", err)
}
if have, want := ethservice.BlockChain().CurrentBlock().Number.Uint64(), block.NumberU64(); have != want {
@ -369,7 +369,7 @@ func TestEth2NewBlock(t *testing.T) {
SafeBlockHash: block.Hash(),
FinalizedBlockHash: block.Hash(),
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil {
t.Fatalf("Failed to insert block: %v", err)
}
if ethservice.BlockChain().CurrentBlock().Number.Uint64() != block.NumberU64() {
@ -515,7 +515,7 @@ func setupBlocks(t *testing.T, ethservice *eth.Ethereum, n int, parent *types.He
SafeBlockHash: payload.ParentHash,
FinalizedBlockHash: payload.ParentHash,
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil {
t.Fatalf("Failed to insert block: %v", err)
}
if ethservice.BlockChain().CurrentBlock().Number.Uint64() != payload.Number {
@ -629,7 +629,7 @@ func TestNewPayloadOnInvalidChain(t *testing.T) {
err error
)
for i := 0; ; i++ {
if resp, err = api.ForkchoiceUpdatedV1(fcState, &params); err != nil {
if resp, err = api.ForkchoiceUpdatedV1(context.Background(), fcState, &params); err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
if resp.PayloadStatus.Status != engine.VALID {
@ -660,7 +660,7 @@ func TestNewPayloadOnInvalidChain(t *testing.T) {
SafeBlockHash: payload.ExecutionPayload.ParentHash,
FinalizedBlockHash: payload.ExecutionPayload.ParentHash,
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil {
t.Fatalf("Failed to insert block: %v", err)
}
if ethservice.BlockChain().CurrentBlock().Number.Uint64() != payload.ExecutionPayload.Number {
@ -679,7 +679,7 @@ func assembleEnvelope(api *ConsensusAPI, parentHash common.Hash, params *engine.
Withdrawals: params.Withdrawals,
BeaconRoot: params.BeaconRoot,
}
payload, err := api.eth.Miner().BuildPayload(args, false)
payload, err := api.eth.Miner().BuildPayload(context.Background(), args, false)
if err != nil {
return nil, err
}
@ -867,7 +867,7 @@ func TestTrickRemoteBlockCache(t *testing.T) {
t.Error("invalid status: VALID on an invalid chain")
}
// Now reorg to the head of the invalid chain
resp, err := apiB.ForkchoiceUpdatedV1(engine.ForkchoiceStateV1{HeadBlockHash: payload.BlockHash, SafeBlockHash: payload.BlockHash, FinalizedBlockHash: payload.ParentHash}, nil)
resp, err := apiB.ForkchoiceUpdatedV1(context.Background(), engine.ForkchoiceStateV1{HeadBlockHash: payload.BlockHash, SafeBlockHash: payload.BlockHash, FinalizedBlockHash: payload.ParentHash}, nil)
if err != nil {
t.Fatal(err)
}
@ -970,7 +970,7 @@ func TestSimultaneousNewBlock(t *testing.T) {
for ii := 0; ii < 10; ii++ {
go func() {
defer wg.Done()
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil {
errMu.Lock()
testErr = fmt.Errorf("failed to insert block: %w", err)
errMu.Unlock()
@ -1011,7 +1011,7 @@ func TestWithdrawals(t *testing.T) {
fcState := engine.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
}
resp, err := api.ForkchoiceUpdatedV2(fcState, &blockParams)
resp, err := api.ForkchoiceUpdatedV2(context.Background(), fcState, &blockParams)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
@ -1063,7 +1063,7 @@ func TestWithdrawals(t *testing.T) {
},
}
fcState.HeadBlockHash = execData.ExecutionPayload.BlockHash
_, err = api.ForkchoiceUpdatedV2(fcState, &blockParams)
_, err = api.ForkchoiceUpdatedV2(context.Background(), fcState, &blockParams)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
@ -1090,7 +1090,7 @@ func TestWithdrawals(t *testing.T) {
// 11: set block as head.
fcState.HeadBlockHash = execData.ExecutionPayload.BlockHash
_, err = api.ForkchoiceUpdatedV2(fcState, nil)
_, err = api.ForkchoiceUpdatedV2(context.Background(), fcState, nil)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
@ -1196,10 +1196,10 @@ func TestNilWithdrawals(t *testing.T) {
)
if !shanghai {
payloadVersion = engine.PayloadV1
_, err = api.ForkchoiceUpdatedV1(fcState, &test.blockParams)
_, err = api.ForkchoiceUpdatedV1(context.Background(), fcState, &test.blockParams)
} else {
payloadVersion = engine.PayloadV2
_, err = api.ForkchoiceUpdatedV2(fcState, &test.blockParams)
_, err = api.ForkchoiceUpdatedV2(context.Background(), fcState, &test.blockParams)
}
if test.wantErr {
if err == nil {
@ -1579,7 +1579,7 @@ func TestParentBeaconBlockRoot(t *testing.T) {
fcState := engine.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
}
resp, err := api.ForkchoiceUpdatedV3(fcState, &blockParams)
resp, err := api.ForkchoiceUpdatedV3(context.Background(), fcState, &blockParams)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err.(*engine.EngineAPIError).ErrorData())
}
@ -1610,7 +1610,7 @@ func TestParentBeaconBlockRoot(t *testing.T) {
}
fcState.HeadBlockHash = execData.ExecutionPayload.BlockHash
resp, err = api.ForkchoiceUpdatedV3(fcState, nil)
resp, err = api.ForkchoiceUpdatedV3(context.Background(), fcState, nil)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err.(*engine.EngineAPIError).ErrorData())
}
@ -1666,7 +1666,7 @@ func TestWitnessCreationAndConsumption(t *testing.T) {
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
_, err := api.ForkchoiceUpdatedWithWitnessV3(fcState, &blockParams)
_, err := api.ForkchoiceUpdatedWithWitnessV3(context.Background(), fcState, &blockParams)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}

View file

@ -126,7 +126,7 @@ func NewSimulatedBeacon(period uint64, feeRecipient common.Address, eth *eth.Eth
// if genesis block, send forkchoiceUpdated to trigger transition to PoS
if block.Number.Sign() == 0 {
version := payloadVersion(eth.BlockChain().Config(), block.Time)
if _, err := engineAPI.forkchoiceUpdated(current, nil, version, false); err != nil {
if _, err := engineAPI.forkchoiceUpdated(context.Background(), current, nil, version, false); err != nil {
return nil, err
}
}
@ -212,7 +212,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
slotNumber := uint64(0)
attribute.SlotNumber = &slotNumber
}
fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, attribute, version, false)
// Create a server span for forkchoiceUpdated with payload attributes,
// simulating an incoming engine API request from a real consensus client.
fcCtx, fcSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{
System: "jsonrpc",
Service: "engine",
Method: "forkchoiceUpdatedV" + fmt.Sprintf("%d", version),
})
fcResponse, err := c.engineAPI.forkchoiceUpdated(fcCtx, c.curForkchoiceState, attribute, version, false)
fcSpanEnd(&err)
if err != nil {
return err
}
@ -226,7 +235,15 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
return nil
}
// Create a server span for getPayload, simulating the consensus client
// coming back to retrieve the built payload.
_, gpSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{
System: "jsonrpc",
Service: "engine",
Method: "getPayloadV" + fmt.Sprintf("%d", version),
})
envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true, nil, nil)
gpSpanEnd(&err)
if err != nil {
return err
}
@ -274,6 +291,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
Service: "engine",
Method: "newPayloadV" + fmt.Sprintf("%d", version),
})
// Mark the payload as canon
_, err = c.engineAPI.newPayload(npCtx, *payload, blobHashes, beaconRoot, requests, false)
npSpanEnd(&err)
@ -282,8 +300,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
}
c.setCurrentState(payload.BlockHash, finalizedHash)
// Mark the block containing the payload as canonical
if _, err = c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, nil, version, false); err != nil {
// Create a server span for the final forkchoiceUpdated (no payload attributes),
// which sets the new block as the canonical chain head.
fcuCtx, fcuSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{
System: "jsonrpc",
Service: "engine",
Method: "forkchoiceUpdatedV" + fmt.Sprintf("%d", version),
})
_, err = c.engineAPI.forkchoiceUpdated(fcuCtx, c.curForkchoiceState, nil, version, false)
fcuSpanEnd(&err)
if err != nil {
return err
}
c.lastBlockTime = payload.Timestamp
@ -349,7 +375,7 @@ func (c *SimulatedBeacon) Rollback() {
func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
// Ensure no pending transactions.
c.eth.TxPool().Sync()
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
if pending, _ := c.eth.TxPool().Pending(txpool.PendingFilter{}); len(pending) != 0 {
return errors.New("pending block dirty")
}
@ -363,7 +389,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
// AdjustTime creates a new block with an adjusted timestamp.
func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
if pending, _ := c.eth.TxPool().Pending(txpool.PendingFilter{}); len(pending) != 0 {
return errors.New("could not adjust time on non-empty block")
}
parent := c.eth.BlockChain().CurrentBlock()

View file

@ -35,7 +35,7 @@ import (
// ForkchoiceUpdatedWithWitnessV1 is analogous to ForkchoiceUpdatedV1, only it
// generates an execution witness too if block building was requested.
func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV1(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV1(ctx context.Context, update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if payloadAttributes != nil {
switch {
case payloadAttributes.Withdrawals != nil || payloadAttributes.BeaconRoot != nil:
@ -44,12 +44,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV1(update engine.Forkchoice
return engine.STATUS_INVALID, paramsErr("fcuV1 called post-shanghai")
}
}
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, true)
return api.forkchoiceUpdated(ctx, update, payloadAttributes, engine.PayloadV1, true)
}
// ForkchoiceUpdatedWithWitnessV2 is analogous to ForkchoiceUpdatedV2, only it
// generates an execution witness too if block building was requested.
func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV2(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV2(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if params != nil {
switch {
case params.BeaconRoot != nil:
@ -62,12 +62,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV2(update engine.Forkchoice
return engine.STATUS_INVALID, unsupportedForkErr("fcuV2 must only be called with paris or shanghai payloads")
}
}
return api.forkchoiceUpdated(update, params, engine.PayloadV2, true)
return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV2, true)
}
// ForkchoiceUpdatedWithWitnessV3 is analogous to ForkchoiceUpdatedV3, only it
// generates an execution witness too if block building was requested.
func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV3(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV3(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if params != nil {
switch {
case params.Withdrawals == nil:
@ -82,7 +82,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV3(update engine.Forkchoice
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params, engine.PayloadV3, true)
return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV3, true)
}
// NewPayloadWithWitnessV1 is analogous to NewPayloadV1, only it also generates

View file

@ -86,7 +86,7 @@ type txPool interface {
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction
Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int)
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions

View file

@ -128,10 +128,11 @@ func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error {
}
// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (p *testTxPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) {
p.lock.RLock()
defer p.lock.RUnlock()
var count int
batches := make(map[common.Address][]*types.Transaction)
for _, tx := range p.pool {
from, _ := types.Sender(types.HomesteadSigner{}, tx)
@ -152,9 +153,10 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
})
count++
}
}
return pending
return pending, count
}
// SubscribeTransactions should return an event subscription of NewTxsEvent and

View file

@ -25,7 +25,8 @@ import (
// syncTransactions starts sending all currently pending transactions to the given peer.
func (h *handler) syncTransactions(p *eth.Peer) {
var hashes []common.Hash
for _, batch := range h.txpool.Pending(txpool.PendingFilter{BlobTxs: false}) {
pending, _ := h.txpool.Pending(txpool.PendingFilter{BlobTxs: false})
for _, batch := range pending {
for _, tx := range batch {
hashes = append(hashes, tx.Hash)
}

View file

@ -406,7 +406,7 @@ func (sim *simulator) processBlock(ctx context.Context, block *simBlock, header,
Withdrawals: *block.BlockOverrides.Withdrawals,
}
chainHeadReader := &simChainHeadReader{ctx, sim.b}
b, err := sim.b.Engine().FinalizeAndAssemble(chainHeadReader, header, sim.state, blockBody, receipts)
b, err := sim.b.Engine().FinalizeAndAssemble(ctx, chainHeadReader, header, sim.state, blockBody, receipts)
if err != nil {
return nil, nil, nil, err
}

View file

@ -18,6 +18,7 @@
package miner
import (
"context"
"fmt"
"math/big"
"sync"
@ -135,8 +136,8 @@ func (miner *Miner) SetGasTip(tip *big.Int) error {
}
// BuildPayload builds the payload according to the provided parameters.
func (miner *Miner) BuildPayload(args *BuildPayloadArgs, witness bool) (*Payload, error) {
return miner.buildPayload(args, witness)
func (miner *Miner) BuildPayload(ctx context.Context, args *BuildPayloadArgs, witness bool) (*Payload, error) {
return miner.buildPayload(ctx, args, witness)
}
// getPending retrieves the pending block based on the current head block.
@ -156,16 +157,17 @@ func (miner *Miner) getPending() *newPayloadResult {
if miner.chainConfig.IsShanghai(new(big.Int).Add(header.Number, big.NewInt(1)), timestamp) {
withdrawal = []*types.Withdrawal{}
}
ret := miner.generateWork(&generateParams{
timestamp: timestamp,
forceTime: false,
parentHash: header.Hash(),
coinbase: miner.config.PendingFeeRecipient,
random: common.Hash{},
withdrawals: withdrawal,
beaconRoot: nil,
noTxs: false,
}, false) // we will never make a witness for a pending block
ret := miner.generateWork(context.Background(),
&generateParams{
timestamp: timestamp,
forceTime: false,
parentHash: header.Hash(),
coinbase: miner.config.PendingFeeRecipient,
random: common.Hash{},
withdrawals: withdrawal,
beaconRoot: nil,
noTxs: false,
}, false) // we will never make a witness for a pending block
if ret.err != nil {
return nil
}

View file

@ -17,6 +17,7 @@
package miner
import (
"context"
"crypto/sha256"
"encoding/binary"
"math/big"
@ -28,9 +29,11 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/stateless"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/telemetry"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"go.opentelemetry.io/otel/trace"
)
// BuildPayloadArgs contains the provided parameters for building payload.
@ -101,14 +104,15 @@ func newPayload(empty *types.Block, emptyRequests [][]byte, witness *stateless.W
return payload
}
// update updates the full-block with latest built version.
func (payload *Payload) update(r *newPayloadResult, elapsed time.Duration) {
// update updates the full-block with latest built version. It returns true if
// the update was accepted (i.e. the new block has higher fees than the previous).
func (payload *Payload) update(r *newPayloadResult, elapsed time.Duration) (result bool) {
payload.lock.Lock()
defer payload.lock.Unlock()
select {
case <-payload.stop:
return // reject stale update
return false // reject stale update
default:
}
// Ensure the newly provided full block has a higher transaction fee.
@ -133,8 +137,10 @@ func (payload *Payload) update(r *newPayloadResult, elapsed time.Duration) {
"root", r.block.Root(),
"elapsed", common.PrettyDuration(elapsed),
)
result = true
}
payload.cond.Broadcast() // fire signal for notifying full block
return
}
// Resolve returns the latest built payload and also terminates the background
@ -209,8 +215,33 @@ func (payload *Payload) ResolveFull() *engine.ExecutionPayloadEnvelope {
return envelope
}
// runBuildIteration performs one background payload-building pass: it
// generates a fresh block candidate and, on success, offers it to the payload
// as a replacement (accepted only when it carries higher fees). The whole
// pass is wrapped in a "miner.buildIteration" tracing span annotated with the
// iteration index and whether the produced block was accepted.
func (miner *Miner) runBuildIteration(ctx context.Context, start time.Time, iteration int, payload *Payload, params *generateParams, witness bool) {
	ctx, span, spanEnd := telemetry.StartSpan(ctx, "miner.buildIteration",
		telemetry.Int64Attribute("iteration", int64(iteration)),
	)
	var err error
	defer spanEnd(&err)

	res := miner.generateWork(ctx, params, witness)
	if err = res.err; err != nil {
		log.Info("Error while generating work", "id", payload.id, "err", err)
		return
	}
	// payload.update rejects stale or lower-fee blocks; surface the decision
	// on the span so traces show which iterations actually improved the payload.
	accepted := payload.update(res, time.Since(start))
	span.SetAttributes(telemetry.BoolAttribute("update.accepted", accepted))
}
// buildPayload builds the payload according to the provided parameters.
func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload, error) {
func (miner *Miner) buildPayload(ctx context.Context, args *BuildPayloadArgs, witness bool) (result *Payload, err error) {
payloadID := args.Id()
ctx, _, spanEnd := telemetry.StartSpan(ctx, "miner.buildPayload",
telemetry.StringAttribute("payload.id", payloadID.String()),
telemetry.StringAttribute("parent.hash", args.Parent.String()),
telemetry.Int64Attribute("timestamp", int64(args.Timestamp)),
)
defer spanEnd(&err)
// Build the initial version with no transaction included. It should be fast
// enough to run. The empty payload can at least make sure there is something
// to deliver for not missing slot.
@ -225,16 +256,25 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload
slotNum: args.SlotNum,
noTxs: true,
}
empty := miner.generateWork(emptyParams, witness)
empty := miner.generateWork(ctx, emptyParams, witness)
if empty.err != nil {
return nil, empty.err
}
// Construct a payload object for return.
payload := newPayload(empty.block, empty.requests, empty.witness, args.Id())
payload := newPayload(empty.block, empty.requests, empty.witness, payloadID)
// Spin up a routine for updating the payload in background. This strategy
// can maximum the revenue for including transactions with highest fee.
go func() {
var iteration int
bCtx, bSpan, bSpanEnd := telemetry.StartSpan(ctx, "miner.background",
telemetry.Int64Attribute("block.number", int64(empty.block.NumberU64())),
)
defer func() {
bSpan.SetAttributes(telemetry.Int64Attribute("iterations.total", int64(iteration)))
bSpanEnd(nil)
}()
// Setup the timer for re-building the payload. The initial clock is kept
// for triggering process immediately.
timer := time.NewTimer(0)
@ -256,7 +296,6 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload
slotNum: args.SlotNum,
noTxs: false,
}
for {
select {
case <-timer.C:
@ -267,22 +306,21 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload
// Check payload.stop first to avoid an unnecessary generateWork.
select {
case <-payload.stop:
payload.updateSpanForDelivery(bSpan)
log.Info("Stopping work on payload", "id", payload.id, "reason", "delivery")
return
default:
}
start := time.Now()
r := miner.generateWork(fullParams, witness)
if r.err == nil {
payload.update(r, time.Since(start))
} else {
log.Info("Error while generating work", "id", payload.id, "err", r.err)
}
iteration++
miner.runBuildIteration(bCtx, start, iteration, payload, fullParams, witness)
timer.Reset(max(0, miner.config.Recommit-time.Since(start)))
case <-payload.stop:
payload.updateSpanForDelivery(bSpan)
log.Info("Stopping work on payload", "id", payload.id, "reason", "delivery")
return
case <-endTimer.C:
bSpan.SetAttributes(telemetry.StringAttribute("exit.reason", "timeout"))
log.Info("Stopping work on payload", "id", payload.id, "reason", "timeout")
return
}
@ -291,6 +329,16 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload
return payload, nil
}
// updateSpanForDelivery annotates the background-build span when the payload
// is resolved for delivery: it records "delivery" as the exit reason and
// whether only the empty (transaction-less) block was ever produced.
func (payload *Payload) updateSpanForDelivery(bSpan trace.Span) {
	// Snapshot payload.full under the lock, but do not hold the lock while
	// touching the span.
	payload.lock.Lock()
	onlyEmpty := payload.full == nil
	payload.lock.Unlock()

	bSpan.SetAttributes(
		telemetry.StringAttribute("exit.reason", "delivery"),
		telemetry.BoolAttribute("empty.delivered", onlyEmpty),
	)
}
// BuildTestingPayload is for testing_buildBlockV*. It creates a block with the exact content given
// by the parameters instead of using the locally available transactions.
func (miner *Miner) BuildTestingPayload(args *BuildPayloadArgs, transactions []*types.Transaction, empty bool, extraData []byte) (*engine.ExecutionPayloadEnvelope, error) {
@ -307,7 +355,7 @@ func (miner *Miner) BuildTestingPayload(args *BuildPayloadArgs, transactions []*
overrideExtraData: extraData,
overrideTxs: transactions,
}
res := miner.generateWork(fullParams, false)
res := miner.generateWork(context.Background(), fullParams, false)
if res.err != nil {
return nil, res.err
}

View file

@ -17,6 +17,7 @@
package miner
import (
"context"
"math/big"
"reflect"
"testing"
@ -156,7 +157,7 @@ func TestBuildPayload(t *testing.T) {
Random: common.Hash{},
FeeRecipient: recipient,
}
payload, err := w.buildPayload(args, false)
payload, err := w.buildPayload(context.Background(), args, false)
if err != nil {
t.Fatalf("Failed to build payload %v", err)
}

View file

@ -17,6 +17,7 @@
package miner
import (
"context"
"errors"
"fmt"
"math/big"
@ -32,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/internal/telemetry"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
@ -125,8 +127,23 @@ type generateParams struct {
}
// generateWork generates a sealing block based on the given parameters.
func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPayloadResult {
work, err := miner.prepareWork(genParam, witness)
func (miner *Miner) generateWork(ctx context.Context, genParam *generateParams, witness bool) (result *newPayloadResult) {
ctx, span, spanEnd := telemetry.StartSpan(ctx, "miner.generateWork")
defer func() {
if result != nil && result.err == nil {
span.SetAttributes(
telemetry.Int64Attribute("txs.count", int64(len(result.block.Transactions()))),
telemetry.Int64Attribute("gas.used", int64(result.block.GasUsed())),
telemetry.StringAttribute("fees", result.fees.String()),
)
}
if result != nil {
spanEnd(&result.err)
} else {
spanEnd(nil)
}
}()
work, err := miner.prepareWork(ctx, genParam, witness)
if err != nil {
return &newPayloadResult{err: err}
}
@ -148,7 +165,7 @@ func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPay
if genParam.forceOverrides && len(genParam.overrideTxs) > 0 {
for _, tx := range genParam.overrideTxs {
work.state.SetTxContext(tx.Hash(), work.tcount)
if err := miner.commitTransaction(work, tx); err != nil {
if err := miner.commitTransaction(ctx, work, tx); err != nil {
// all passed transactions HAVE to be valid at this point
return &newPayloadResult{err: err}
}
@ -160,7 +177,7 @@ func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPay
})
defer timer.Stop()
err := miner.fillTransactions(interrupt, work)
err := miner.fillTransactions(ctx, interrupt, work)
if errors.Is(err, errBlockInterruptedByTimeout) {
log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(miner.config.Recommit))
}
@ -195,7 +212,7 @@ func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPay
work.header.RequestsHash = &reqHash
}
block, err := miner.engine.FinalizeAndAssemble(miner.chain, work.header, work.state, &body, work.receipts)
block, err := miner.engine.FinalizeAndAssemble(ctx, miner.chain, work.header, work.state, &body, work.receipts)
if err != nil {
return &newPayloadResult{err: err}
}
@ -213,7 +230,9 @@ func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPay
// prepareWork constructs the sealing task according to the given parameters,
// either based on the last chain head or specified parent. In this function
// the pending transactions are not filled yet, only the empty task returned.
func (miner *Miner) prepareWork(genParams *generateParams, witness bool) (*environment, error) {
func (miner *Miner) prepareWork(ctx context.Context, genParams *generateParams, witness bool) (result *environment, err error) {
_, _, spanEnd := telemetry.StartSpan(ctx, "miner.prepareWork")
defer spanEnd(&err)
miner.confMu.RLock()
defer miner.confMu.RUnlock()
@ -330,7 +349,9 @@ func (miner *Miner) makeEnv(parent *types.Header, header *types.Header, coinbase
}, nil
}
func (miner *Miner) commitTransaction(env *environment, tx *types.Transaction) error {
func (miner *Miner) commitTransaction(ctx context.Context, env *environment, tx *types.Transaction) (err error) {
_, _, spanEnd := telemetry.StartSpan(ctx, "miner.commitTransaction")
defer spanEnd(&err)
if tx.Type() == types.BlobTxType {
return miner.commitBlobTransaction(env, tx)
}
@ -389,7 +410,9 @@ func (miner *Miner) applyTransaction(env *environment, tx *types.Transaction) (*
return receipt, nil
}
func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error {
func (miner *Miner) commitTransactions(ctx context.Context, env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error {
ctx, _, spanEnd := telemetry.StartSpan(ctx, "miner.commitTransactions")
defer spanEnd(nil)
isCancun := miner.chainConfig.IsCancun(env.header.Number, env.header.Time)
for {
// Check interruption signal and abort building if it's fired.
@ -479,7 +502,7 @@ func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *tran
// Start executing the transaction
env.state.SetTxContext(tx.Hash(), env.tcount)
err := miner.commitTransaction(env, tx)
err := miner.commitTransaction(ctx, env, tx)
switch {
case errors.Is(err, core.ErrNonceTooLow):
// New head notification data race between the transaction pool and miner, shift
@ -503,7 +526,9 @@ func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *tran
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) error {
func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int32, env *environment) (err error) {
ctx, span, spanEnd := telemetry.StartSpan(ctx, "miner.fillTransactions")
defer spanEnd(&err)
miner.confMu.RLock()
tip := miner.config.GasPrice
prio := miner.prio
@ -523,7 +548,7 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment)
filter.GasLimitCap = params.MaxTxGas
}
filter.BlobTxs = false
pendingPlainTxs := miner.txpool.Pending(filter)
pendingPlainTxs, plainTxCount := miner.txpool.Pending(filter)
filter.BlobTxs = true
if miner.chainConfig.IsOsaka(env.header.Number, env.header.Time) {
@ -531,7 +556,11 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment)
} else {
filter.BlobVersion = types.BlobSidecarVersion0
}
pendingBlobTxs := miner.txpool.Pending(filter)
pendingBlobTxs, blobTxCount := miner.txpool.Pending(filter)
span.SetAttributes(
telemetry.Int64Attribute("pending.plain.count", int64(plainTxCount)),
telemetry.Int64Attribute("pending.blob.count", int64(blobTxCount)),
)
// Split the pending transactions into locals and remotes.
prioPlainTxs, normalPlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
@ -552,7 +581,7 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment)
plainTxs := newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee)
if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
if err := miner.commitTransactions(ctx, env, plainTxs, blobTxs, interrupt); err != nil {
return err
}
}
@ -560,7 +589,7 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment)
plainTxs := newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee)
if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
if err := miner.commitTransactions(ctx, env, plainTxs, blobTxs, interrupt); err != nil {
return err
}
}