From 98b13f342f046f72d0f5eca1bd4ca5b6951a8d79 Mon Sep 17 00:00:00 2001 From: Jonny Rhea <5555162+jrhea@users.noreply.github.com> Date: Mon, 16 Mar 2026 13:24:41 -0500 Subject: [PATCH] miner: add OpenTelemetry spans for block building path (#33773) Instruments the block building path with OpenTelemetry tracing spans. - added spans in forkchoiceUpdated -> buildPayload -> background payload loop -> generateWork iterations. Spans should look something like this: ``` jsonrpc.engine/forkchoiceUpdatedV3 |- rpc.runMethod | |- engine.forkchoiceUpdated | |- miner.buildPayload [payload.id, parent.hash, timestamp] | |- miner.generateWork [txs.count, gas.used, fees] (empty block) | | |- miner.prepareWork | | |- miner.FinalizeAndAssemble | | |- consensus.beacon.FinalizeAndAssemble [block.number, txs.count, withdrawals.count] | | |- consensus.beacon.Finalize | | |- consensus.beacon.IntermediateRoot | | |- consensus.beacon.NewBlock | |- miner.background [block.number, iterations.total, exit.reason, empty.delivered] | |- miner.buildIteration [iteration, update.accepted] | | |- miner.generateWork [txs.count, gas.used, fees] | | |- miner.prepareWork | | |- miner.fillTransactions [pending.plain.count, pending.blob.count] | | | |- miner.commitTransactions.priority (if prio txs exist) | | | | |- miner.commitTransactions | | | | |- miner.commitTransaction (per tx) | | | |- miner.commitTransactions.normal (if normal txs exist) | | | |- miner.commitTransactions | | | |- miner.commitTransaction (per tx) | | |- miner.FinalizeAndAssemble | | |- consensus.beacon.FinalizeAndAssemble [block.number, txs.count, withdrawals.count] | | |- consensus.beacon.Finalize | | |- consensus.beacon.IntermediateRoot | | |- consensus.beacon.NewBlock | |- miner.buildIteration [iteration, update.accepted] | | |- ... | |- ... ``` - added simulated server spans in SimulatedBeacon.sealBlock so dev mode (geth --dev) produces traces that mirror production Engine API calls from a real consensus client. --------- Co-authored-by: Felix Lange --- consensus/beacon/consensus.go | 23 ++++++-- consensus/clique/clique.go | 3 +- consensus/consensus.go | 3 +- consensus/ethash/consensus.go | 3 +- core/chain_makers.go | 3 +- core/txpool/blobpool/blobpool.go | 8 +-- core/txpool/blobpool/blobpool_test.go | 2 +- core/txpool/legacypool/legacypool.go | 8 +-- core/txpool/subpool.go | 2 +- core/txpool/txpool.go | 11 ++-- eth/api_backend.go | 2 +- eth/catalyst/api.go | 22 ++++---- eth/catalyst/api_test.go | 36 ++++++------- eth/catalyst/simulated_beacon.go | 38 +++++++++++--- eth/catalyst/witness.go | 12 ++--- eth/handler.go | 2 +- eth/handler_test.go | 6 ++- eth/sync.go | 3 +- internal/ethapi/simulate.go | 2 +- miner/miner.go | 26 ++++----- miner/payload_building.go | 76 ++++++++++++++++++++++----- miner/payload_building_test.go | 3 +- miner/worker.go | 57 +++++++++++++++----- 23 files changed, 245 insertions(+), 106 deletions(-) diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index 45a480c50e..25f4f9d2b2 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -17,6 +17,7 @@ package beacon import ( + "context" "errors" "fmt" "math/big" @@ -29,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/core/tracing" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/internal/telemetry" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" "github.com/holiman/uint256" @@ -351,9 +353,17 @@ func (beacon *Beacon) Finalize(chain consensus.ChainHeaderReader, header *types. // FinalizeAndAssemble implements consensus.Engine, setting the final state and // assembling the block. -func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) { +func (beacon *Beacon) FinalizeAndAssemble(ctx context.Context, chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (result *types.Block, err error) { + ctx, _, spanEnd := telemetry.StartSpan(ctx, "consensus.beacon.FinalizeAndAssemble", + telemetry.Int64Attribute("block.number", int64(header.Number.Uint64())), + telemetry.Int64Attribute("txs.count", int64(len(body.Transactions))), + telemetry.Int64Attribute("withdrawals.count", int64(len(body.Withdrawals))), + ) + defer spanEnd(&err) + if !beacon.IsPoSHeader(header) { - return beacon.ethone.FinalizeAndAssemble(chain, header, state, body, receipts) + block, delegateErr := beacon.ethone.FinalizeAndAssemble(ctx, chain, header, state, body, receipts) + return block, delegateErr } shanghai := chain.Config().IsShanghai(header.Number, header.Time) if shanghai { @@ -367,13 +377,20 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea } } // Finalize and assemble the block. + _, _, finalizeSpanEnd := telemetry.StartSpan(ctx, "consensus.beacon.Finalize") beacon.Finalize(chain, header, state, body) + finalizeSpanEnd(nil) // Assign the final state root to header. + _, _, rootSpanEnd := telemetry.StartSpan(ctx, "consensus.beacon.IntermediateRoot") header.Root = state.IntermediateRoot(true) + rootSpanEnd(nil) // Assemble the final block. - return types.NewBlock(header, body, receipts, trie.NewStackTrie(nil)), nil + _, _, blockSpanEnd := telemetry.StartSpan(ctx, "consensus.beacon.NewBlock") + block := types.NewBlock(header, body, receipts, trie.NewStackTrie(nil)) + blockSpanEnd(nil) + return block, nil } // Seal generates a new sealing request for the given input block and pushes diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 28095011c1..3bf79d5a62 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -19,6 +19,7 @@ package clique import ( "bytes" + "context" "errors" "fmt" "io" @@ -581,7 +582,7 @@ func (c *Clique) Finalize(chain consensus.ChainHeaderReader, header *types.Heade // FinalizeAndAssemble implements consensus.Engine, ensuring no uncles are set, // nor block rewards given, and returns the final block. -func (c *Clique) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) { +func (c *Clique) FinalizeAndAssemble(ctx context.Context, chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) { if len(body.Withdrawals) > 0 { return nil, errors.New("clique does not support withdrawals") } diff --git a/consensus/consensus.go b/consensus/consensus.go index a68351f7ff..094026b614 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -18,6 +18,7 @@ package consensus import ( + "context" "math/big" "github.com/ethereum/go-ethereum/common" @@ -92,7 +93,7 @@ type Engine interface { // // Note: The block header and state database might be updated to reflect any // consensus rules that happen at finalization (e.g. block rewards). - FinalizeAndAssemble(chain ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) + FinalizeAndAssemble(ctx context.Context, chain ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) // Seal generates a new sealing request for the given input block and pushes // the result into the given channel. diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 61371e0636..56256d1215 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -17,6 +17,7 @@ package ethash import ( + "context" "errors" "fmt" "math/big" @@ -513,7 +514,7 @@ func (ethash *Ethash) Finalize(chain consensus.ChainHeaderReader, header *types. // FinalizeAndAssemble implements consensus.Engine, accumulating the block and // uncle rewards, setting the final state and assembling the block. -func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) { +func (ethash *Ethash) FinalizeAndAssemble(ctx context.Context, chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, body *types.Body, receipts []*types.Receipt) (*types.Block, error) { if len(body.Withdrawals) > 0 { return nil, errors.New("ethash does not support withdrawals") } diff --git a/core/chain_makers.go b/core/chain_makers.go index e4b5cf964f..8f6eed1697 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -17,6 +17,7 @@ package core import ( + "context" "fmt" "math/big" @@ -411,7 +412,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse } body := types.Body{Transactions: b.txs, Uncles: b.uncles, Withdrawals: b.withdrawals} - block, err := b.engine.FinalizeAndAssemble(cm, b.header, statedb, &body, b.receipts) + block, err := b.engine.FinalizeAndAssemble(context.Background(), cm, b.header, statedb, &body, b.receipts) if err != nil { panic(err) } diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 27fdd00016..7155a67a9b 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1865,11 +1865,11 @@ func (p *BlobPool) drop() { // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. -func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { +func (p *BlobPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) { // If only plain transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if !filter.BlobTxs { - return nil + return nil, 0 } // Track the amount of time waiting to retrieve the list of pending blob txs // from the pool and the amount of time actually spent on assembling the data. @@ -1885,6 +1885,7 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx pendtimeHist.Update(time.Since(execStart).Nanoseconds()) }() + var count int pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index)) for addr, txs := range p.index { lazies := make([]*txpool.LazyTransaction, 0, len(txs)) @@ -1930,9 +1931,10 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx } if len(lazies) > 0 { pending[addr] = lazies + count += len(lazies) } } - return pending + return pending, count } // updateStorageMetrics retrieves a bunch of stats from the data store and pushes diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 6580a339e3..ba96bea8ed 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -2122,7 +2122,7 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { b.ReportAllocs() for i := 0; i < b.N; i++ { - p := pool.Pending(txpool.PendingFilter{ + p, _ := pool.Pending(txpool.PendingFilter{ MinTip: uint256.NewInt(1), BaseFee: chain.basefee, BlobFee: chain.blobfee, diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 36970c820e..25c4b13166 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -494,15 +494,16 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. -func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { +func (pool *LegacyPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) { // If only blob transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if filter.BlobTxs { - return nil + return nil, 0 } pool.mu.Lock() defer pool.mu.Unlock() + var count int pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) for addr, list := range pool.pending { txs := list.Flatten() @@ -539,9 +540,10 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] } } pending[addr] = lazies + count += len(lazies) } } - return pending + return pending, count } // ValidateTxBasics checks whether a transaction is valid according to the consensus diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index db099ddf98..4cc1b193d6 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -154,7 +154,7 @@ type SubPool interface { // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. - Pending(filter PendingFilter) map[common.Address][]*LazyTransaction + Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int) // SubscribeTransactions subscribes to new transaction events. The subscriber // can decide whether to receive notifications only for newly seen transactions diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index a314a83f1b..25647e0cce 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -359,14 +359,17 @@ func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error { // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. -func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction { +func (p *TxPool) Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int) { + var count int txs := make(map[common.Address][]*LazyTransaction) for _, subpool := range p.subpools { - for addr, set := range subpool.Pending(filter) { - txs[addr] = set + set, n := subpool.Pending(filter) + for addr, list := range set { + txs[addr] = list } + count += n } - return txs + return txs, count } // SubscribeTransactions registers a subscription for new transaction events, diff --git a/eth/api_backend.go b/eth/api_backend.go index 3f826b7861..726d8316a0 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -347,7 +347,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) } func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { - pending := b.eth.txPool.Pending(txpool.PendingFilter{}) + pending, _ := b.eth.txPool.Pending(txpool.PendingFilter{}) var txs types.Transactions for _, batch := range pending { for _, lazy := range batch { diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 96d4570561..8a4aced04b 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -162,7 +162,7 @@ func newConsensusAPIWithoutHeartbeat(eth *eth.Ethereum) *ConsensusAPI { // // If there are payloadAttributes: we try to assemble a block with the payloadAttributes // and return its payloadID. -func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) ForkchoiceUpdatedV1(ctx context.Context, update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { if payloadAttributes != nil { switch { case payloadAttributes.Withdrawals != nil || payloadAttributes.BeaconRoot != nil: @@ -171,12 +171,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa return engine.STATUS_INVALID, paramsErr("fcuV1 called post-shanghai") } } - return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, false) + return api.forkchoiceUpdated(ctx, update, payloadAttributes, engine.PayloadV1, false) } // ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload // attributes. It supports both PayloadAttributesV1 and PayloadAttributesV2. -func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) ForkchoiceUpdatedV2(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { if params != nil { switch { case params.BeaconRoot != nil: @@ -189,12 +189,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa return engine.STATUS_INVALID, unsupportedForkErr("fcuV2 must only be called with paris or shanghai payloads") } } - return api.forkchoiceUpdated(update, params, engine.PayloadV2, false) + return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV2, false) } // ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root // in the payload attributes. It supports only PayloadAttributesV3. -func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) ForkchoiceUpdatedV3(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { if params != nil { switch { case params.Withdrawals == nil: @@ -209,12 +209,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa // hash, even if params are wrong. To do this we need to split up // forkchoiceUpdate into a function that only updates the head and then a // function that kicks off block construction. - return api.forkchoiceUpdated(update, params, engine.PayloadV3, false) + return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV3, false) } // ForkchoiceUpdatedV4 is equivalent to V3 with the addition of slot number // in the payload attributes. It supports only PayloadAttributesV4. -func (api *ConsensusAPI) ForkchoiceUpdatedV4(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) ForkchoiceUpdatedV4(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { if params != nil { switch { case params.Withdrawals == nil: @@ -231,10 +231,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV4(update engine.ForkchoiceStateV1, pa // hash, even if params are wrong. To do this we need to split up // forkchoiceUpdate into a function that only updates the head and then a // function that kicks off block construction. - return api.forkchoiceUpdated(update, params, engine.PayloadV4, false) + return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV4, false) } -func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, payloadWitness bool) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) forkchoiceUpdated(ctx context.Context, update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, payloadWitness bool) (result engine.ForkChoiceResponse, err error) { + ctx, _, spanEnd := telemetry.StartSpan(ctx, "engine.forkchoiceUpdated") + defer spanEnd(&err) api.forkchoiceLock.Lock() defer api.forkchoiceLock.Unlock() @@ -375,7 +377,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl if api.localBlocks.has(id) { return valid(&id), nil } - payload, err := api.eth.Miner().BuildPayload(args, payloadWitness) + payload, err := api.eth.Miner().BuildPayload(ctx, args, payloadWitness) if err != nil { log.Error("Failed to build payload", "err", err) return valid(nil), engine.InvalidPayloadAttributes.With(err) diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index db0505101f..d126c362fe 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -190,7 +190,7 @@ func TestEth2PrepareAndGetPayload(t *testing.T) { SafeBlockHash: common.Hash{}, FinalizedBlockHash: common.Hash{}, } - _, err := api.ForkchoiceUpdatedV1(fcState, &blockParams) + _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, &blockParams) if err != nil { t.Fatalf("error preparing payload, err=%v", err) } @@ -270,7 +270,7 @@ func TestInvalidPayloadTimestamp(t *testing.T) { SafeBlockHash: common.Hash{}, FinalizedBlockHash: common.Hash{}, } - _, err := api.ForkchoiceUpdatedV1(fcState, ¶ms) + _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, ¶ms) if test.shouldErr && err == nil { t.Fatalf("expected error preparing payload with invalid timestamp, err=%v", err) } else if !test.shouldErr && err != nil { @@ -329,7 +329,7 @@ func TestEth2NewBlock(t *testing.T) { SafeBlockHash: block.Hash(), FinalizedBlockHash: block.Hash(), } - if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { + if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil { t.Fatalf("Failed to insert block: %v", err) } if have, want := ethservice.BlockChain().CurrentBlock().Number.Uint64(), block.NumberU64(); have != want { @@ -369,7 +369,7 @@ func TestEth2NewBlock(t *testing.T) { SafeBlockHash: block.Hash(), FinalizedBlockHash: block.Hash(), } - if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { + if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil { t.Fatalf("Failed to insert block: %v", err) } if ethservice.BlockChain().CurrentBlock().Number.Uint64() != block.NumberU64() { @@ -515,7 +515,7 @@ func setupBlocks(t *testing.T, ethservice *eth.Ethereum, n int, parent *types.He SafeBlockHash: payload.ParentHash, FinalizedBlockHash: payload.ParentHash, } - if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { + if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil { t.Fatalf("Failed to insert block: %v", err) } if ethservice.BlockChain().CurrentBlock().Number.Uint64() != payload.Number { @@ -629,7 +629,7 @@ func TestNewPayloadOnInvalidChain(t *testing.T) { err error ) for i := 0; ; i++ { - if resp, err = api.ForkchoiceUpdatedV1(fcState, ¶ms); err != nil { + if resp, err = api.ForkchoiceUpdatedV1(context.Background(), fcState, ¶ms); err != nil { t.Fatalf("error preparing payload, err=%v", err) } if resp.PayloadStatus.Status != engine.VALID { @@ -660,7 +660,7 @@ func TestNewPayloadOnInvalidChain(t *testing.T) { SafeBlockHash: payload.ExecutionPayload.ParentHash, FinalizedBlockHash: payload.ExecutionPayload.ParentHash, } - if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { + if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil { t.Fatalf("Failed to insert block: %v", err) } if ethservice.BlockChain().CurrentBlock().Number.Uint64() != payload.ExecutionPayload.Number { @@ -679,7 +679,7 @@ func assembleEnvelope(api *ConsensusAPI, parentHash common.Hash, params *engine. Withdrawals: params.Withdrawals, BeaconRoot: params.BeaconRoot, } - payload, err := api.eth.Miner().BuildPayload(args, false) + payload, err := api.eth.Miner().BuildPayload(context.Background(), args, false) if err != nil { return nil, err } @@ -867,7 +867,7 @@ func TestTrickRemoteBlockCache(t *testing.T) { t.Error("invalid status: VALID on an invalid chain") } // Now reorg to the head of the invalid chain - resp, err := apiB.ForkchoiceUpdatedV1(engine.ForkchoiceStateV1{HeadBlockHash: payload.BlockHash, SafeBlockHash: payload.BlockHash, FinalizedBlockHash: payload.ParentHash}, nil) + resp, err := apiB.ForkchoiceUpdatedV1(context.Background(), engine.ForkchoiceStateV1{HeadBlockHash: payload.BlockHash, SafeBlockHash: payload.BlockHash, FinalizedBlockHash: payload.ParentHash}, nil) if err != nil { t.Fatal(err) } @@ -970,7 +970,7 @@ func TestSimultaneousNewBlock(t *testing.T) { for ii := 0; ii < 10; ii++ { go func() { defer wg.Done() - if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { + if _, err := api.ForkchoiceUpdatedV1(context.Background(), fcState, nil); err != nil { errMu.Lock() testErr = fmt.Errorf("failed to insert block: %w", err) errMu.Unlock() @@ -1011,7 +1011,7 @@ func TestWithdrawals(t *testing.T) { fcState := engine.ForkchoiceStateV1{ HeadBlockHash: parent.Hash(), } - resp, err := api.ForkchoiceUpdatedV2(fcState, &blockParams) + resp, err := api.ForkchoiceUpdatedV2(context.Background(), fcState, &blockParams) if err != nil { t.Fatalf("error preparing payload, err=%v", err) } @@ -1063,7 +1063,7 @@ func TestWithdrawals(t *testing.T) { }, } fcState.HeadBlockHash = execData.ExecutionPayload.BlockHash - _, err = api.ForkchoiceUpdatedV2(fcState, &blockParams) + _, err = api.ForkchoiceUpdatedV2(context.Background(), fcState, &blockParams) if err != nil { t.Fatalf("error preparing payload, err=%v", err) } @@ -1090,7 +1090,7 @@ func TestWithdrawals(t *testing.T) { // 11: set block as head. fcState.HeadBlockHash = execData.ExecutionPayload.BlockHash - _, err = api.ForkchoiceUpdatedV2(fcState, nil) + _, err = api.ForkchoiceUpdatedV2(context.Background(), fcState, nil) if err != nil { t.Fatalf("error preparing payload, err=%v", err) } @@ -1196,10 +1196,10 @@ func TestNilWithdrawals(t *testing.T) { ) if !shanghai { payloadVersion = engine.PayloadV1 - _, err = api.ForkchoiceUpdatedV1(fcState, &test.blockParams) + _, err = api.ForkchoiceUpdatedV1(context.Background(), fcState, &test.blockParams) } else { payloadVersion = engine.PayloadV2 - _, err = api.ForkchoiceUpdatedV2(fcState, &test.blockParams) + _, err = api.ForkchoiceUpdatedV2(context.Background(), fcState, &test.blockParams) } if test.wantErr { if err == nil { @@ -1579,7 +1579,7 @@ func TestParentBeaconBlockRoot(t *testing.T) { fcState := engine.ForkchoiceStateV1{ HeadBlockHash: parent.Hash(), } - resp, err := api.ForkchoiceUpdatedV3(fcState, &blockParams) + resp, err := api.ForkchoiceUpdatedV3(context.Background(), fcState, &blockParams) if err != nil { t.Fatalf("error preparing payload, err=%v", err.(*engine.EngineAPIError).ErrorData()) } @@ -1610,7 +1610,7 @@ func TestParentBeaconBlockRoot(t *testing.T) { } fcState.HeadBlockHash = execData.ExecutionPayload.BlockHash - resp, err = api.ForkchoiceUpdatedV3(fcState, nil) + resp, err = api.ForkchoiceUpdatedV3(context.Background(), fcState, nil) if err != nil { t.Fatalf("error preparing payload, err=%v", err.(*engine.EngineAPIError).ErrorData()) } @@ -1666,7 +1666,7 @@ func TestWitnessCreationAndConsumption(t *testing.T) { SafeBlockHash: common.Hash{}, FinalizedBlockHash: common.Hash{}, } - _, err := api.ForkchoiceUpdatedWithWitnessV3(fcState, &blockParams) + _, err := api.ForkchoiceUpdatedWithWitnessV3(context.Background(), fcState, &blockParams) if err != nil { t.Fatalf("error preparing payload, err=%v", err) } diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index 452902c78c..8a77cd8abe 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -126,7 +126,7 @@ func NewSimulatedBeacon(period uint64, feeRecipient common.Address, eth *eth.Eth // if genesis block, send forkchoiceUpdated to trigger transition to PoS if block.Number.Sign() == 0 { version := payloadVersion(eth.BlockChain().Config(), block.Time) - if _, err := engineAPI.forkchoiceUpdated(current, nil, version, false); err != nil { + if _, err := engineAPI.forkchoiceUpdated(context.Background(), current, nil, version, false); err != nil { return nil, err } } @@ -212,7 +212,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u slotNumber := uint64(0) attribute.SlotNumber = &slotNumber } - fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, attribute, version, false) + + // Create a server span for forkchoiceUpdated with payload attributes, + // simulating an incoming engine API request from a real consensus client. + fcCtx, fcSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{ + System: "jsonrpc", + Service: "engine", + Method: "forkchoiceUpdatedV" + fmt.Sprintf("%d", version), + }) + fcResponse, err := c.engineAPI.forkchoiceUpdated(fcCtx, c.curForkchoiceState, attribute, version, false) + fcSpanEnd(&err) if err != nil { return err } @@ -226,7 +235,15 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u return nil } + // Create a server span for getPayload, simulating the consensus client + // coming back to retrieve the built payload. + _, gpSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{ + System: "jsonrpc", + Service: "engine", + Method: "getPayloadV" + fmt.Sprintf("%d", version), + }) envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true, nil, nil) + gpSpanEnd(&err) if err != nil { return err } @@ -274,6 +291,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u Service: "engine", Method: "newPayloadV" + fmt.Sprintf("%d", version), }) + // Mark the payload as canon _, err = c.engineAPI.newPayload(npCtx, *payload, blobHashes, beaconRoot, requests, false) npSpanEnd(&err) @@ -282,8 +300,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u } c.setCurrentState(payload.BlockHash, finalizedHash) - // Mark the block containing the payload as canonical - if _, err = c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, nil, version, false); err != nil { + // Create a server span for the final forkchoiceUpdated (no payload attributes), + // which sets the new block as the canonical chain head. + fcuCtx, fcuSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{ + System: "jsonrpc", + Service: "engine", + Method: "forkchoiceUpdatedV" + fmt.Sprintf("%d", version), + }) + _, err = c.engineAPI.forkchoiceUpdated(fcuCtx, c.curForkchoiceState, nil, version, false) + fcuSpanEnd(&err) + if err != nil { return err } c.lastBlockTime = payload.Timestamp @@ -349,7 +375,7 @@ func (c *SimulatedBeacon) Rollback() { func (c *SimulatedBeacon) Fork(parentHash common.Hash) error { // Ensure no pending transactions. c.eth.TxPool().Sync() - if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 { + if pending, _ := c.eth.TxPool().Pending(txpool.PendingFilter{}); len(pending) != 0 { return errors.New("pending block dirty") } @@ -363,7 +389,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error { // AdjustTime creates a new block with an adjusted timestamp. func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error { - if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 { + if pending, _ := c.eth.TxPool().Pending(txpool.PendingFilter{}); len(pending) != 0 { return errors.New("could not adjust time on non-empty block") } parent := c.eth.BlockChain().CurrentBlock() diff --git a/eth/catalyst/witness.go b/eth/catalyst/witness.go index 14ca29e079..fe75c66908 100644 --- a/eth/catalyst/witness.go +++ b/eth/catalyst/witness.go @@ -35,7 +35,7 @@ import ( // ForkchoiceUpdatedWithWitnessV1 is analogous to ForkchoiceUpdatedV1, only it // generates an execution witness too if block building was requested. -func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV1(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV1(ctx context.Context, update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { if payloadAttributes != nil { switch { case payloadAttributes.Withdrawals != nil || payloadAttributes.BeaconRoot != nil: @@ -44,12 +44,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV1(update engine.Forkchoice return engine.STATUS_INVALID, paramsErr("fcuV1 called post-shanghai") } } - return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, true) + return api.forkchoiceUpdated(ctx, update, payloadAttributes, engine.PayloadV1, true) } // ForkchoiceUpdatedWithWitnessV2 is analogous to ForkchoiceUpdatedV2, only it // generates an execution witness too if block building was requested. -func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV2(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV2(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { if params != nil { switch { case params.BeaconRoot != nil: @@ -62,12 +62,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV2(update engine.Forkchoice return engine.STATUS_INVALID, unsupportedForkErr("fcuV2 must only be called with paris or shanghai payloads") } } - return api.forkchoiceUpdated(update, params, engine.PayloadV2, true) + return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV2, true) } // ForkchoiceUpdatedWithWitnessV3 is analogous to ForkchoiceUpdatedV3, only it // generates an execution witness too if block building was requested. -func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV3(update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV3(ctx context.Context, update engine.ForkchoiceStateV1, params *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) { if params != nil { switch { case params.Withdrawals == nil: @@ -82,7 +82,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedWithWitnessV3(update engine.Forkchoice // hash, even if params are wrong. To do this we need to split up // forkchoiceUpdate into a function that only updates the head and then a // function that kicks off block construction. - return api.forkchoiceUpdated(update, params, engine.PayloadV3, true) + return api.forkchoiceUpdated(ctx, update, params, engine.PayloadV3, true) } // NewPayloadWithWitnessV1 is analogous to NewPayloadV1, only it also generates diff --git a/eth/handler.go b/eth/handler.go index bb2cd5f88b..27b5e60697 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -86,7 +86,7 @@ type txPool interface { // Pending should return pending transactions. // The slice should be modifiable by the caller. - Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction + Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) // SubscribeTransactions subscribes to new transaction events. The subscriber // can decide whether to receive notifications only for newly seen transactions diff --git a/eth/handler_test.go b/eth/handler_test.go index 3470452980..fee6bae138 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -128,10 +128,11 @@ func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error { } // Pending returns all the transactions known to the pool -func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { +func (p *testTxPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) { p.lock.RLock() defer p.lock.RUnlock() + var count int batches := make(map[common.Address][]*types.Transaction) for _, tx := range p.pool { from, _ := types.Sender(types.HomesteadSigner{}, tx) @@ -152,9 +153,10 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]* Gas: tx.Gas(), BlobGas: tx.BlobGas(), }) + count++ } } - return pending + return pending, count } // SubscribeTransactions should return an event subscription of NewTxsEvent and diff --git a/eth/sync.go b/eth/sync.go index ddae8443a3..8b4bd90abf 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -25,7 +25,8 @@ import ( // syncTransactions starts sending all currently pending transactions to the given peer. func (h *handler) syncTransactions(p *eth.Peer) { var hashes []common.Hash - for _, batch := range h.txpool.Pending(txpool.PendingFilter{BlobTxs: false}) { + pending, _ := h.txpool.Pending(txpool.PendingFilter{BlobTxs: false}) + for _, batch := range pending { for _, tx := range batch { hashes = append(hashes, tx.Hash) } diff --git a/internal/ethapi/simulate.go b/internal/ethapi/simulate.go index f0c69e39a4..ebe221114f 100644 --- a/internal/ethapi/simulate.go +++ b/internal/ethapi/simulate.go @@ -406,7 +406,7 @@ func (sim *simulator) processBlock(ctx context.Context, block *simBlock, header, Withdrawals: *block.BlockOverrides.Withdrawals, } chainHeadReader := &simChainHeadReader{ctx, sim.b} - b, err := sim.b.Engine().FinalizeAndAssemble(chainHeadReader, header, sim.state, blockBody, receipts) + b, err := sim.b.Engine().FinalizeAndAssemble(ctx, chainHeadReader, header, sim.state, blockBody, receipts) if err != nil { return nil, nil, nil, err } diff --git a/miner/miner.go b/miner/miner.go index ee890b5e54..0ff0237a08 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -18,6 +18,7 @@ package miner import ( + "context" "fmt" "math/big" "sync" @@ -135,8 +136,8 @@ func (miner *Miner) SetGasTip(tip *big.Int) error { } // BuildPayload builds the payload according to the provided parameters. -func (miner *Miner) BuildPayload(args *BuildPayloadArgs, witness bool) (*Payload, error) { - return miner.buildPayload(args, witness) +func (miner *Miner) BuildPayload(ctx context.Context, args *BuildPayloadArgs, witness bool) (*Payload, error) { + return miner.buildPayload(ctx, args, witness) } // getPending retrieves the pending block based on the current head block. @@ -156,16 +157,17 @@ func (miner *Miner) getPending() *newPayloadResult { if miner.chainConfig.IsShanghai(new(big.Int).Add(header.Number, big.NewInt(1)), timestamp) { withdrawal = []*types.Withdrawal{} } - ret := miner.generateWork(&generateParams{ - timestamp: timestamp, - forceTime: false, - parentHash: header.Hash(), - coinbase: miner.config.PendingFeeRecipient, - random: common.Hash{}, - withdrawals: withdrawal, - beaconRoot: nil, - noTxs: false, - }, false) // we will never make a witness for a pending block + ret := miner.generateWork(context.Background(), + &generateParams{ + timestamp: timestamp, + forceTime: false, + parentHash: header.Hash(), + coinbase: miner.config.PendingFeeRecipient, + random: common.Hash{}, + withdrawals: withdrawal, + beaconRoot: nil, + noTxs: false, + }, false) // we will never make a witness for a pending block if ret.err != nil { return nil } diff --git a/miner/payload_building.go b/miner/payload_building.go index 8bd9552338..97b4d0c509 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -17,6 +17,7 @@ package miner import ( + "context" "crypto/sha256" "encoding/binary" "math/big" @@ -28,9 +29,11 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/stateless" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/internal/telemetry" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "go.opentelemetry.io/otel/trace" ) // BuildPayloadArgs contains the provided parameters for building payload. @@ -101,14 +104,15 @@ func newPayload(empty *types.Block, emptyRequests [][]byte, witness *stateless.W return payload } -// update updates the full-block with latest built version. -func (payload *Payload) update(r *newPayloadResult, elapsed time.Duration) { +// update updates the full-block with latest built version. It returns true if +// the update was accepted (i.e. the new block has higher fees than the previous). +func (payload *Payload) update(r *newPayloadResult, elapsed time.Duration) (result bool) { payload.lock.Lock() defer payload.lock.Unlock() select { case <-payload.stop: - return // reject stale update + return false // reject stale update default: } // Ensure the newly provided full block has a higher transaction fee. @@ -133,8 +137,10 @@ func (payload *Payload) update(r *newPayloadResult, elapsed time.Duration) { "root", r.block.Root(), "elapsed", common.PrettyDuration(elapsed), ) + result = true } payload.cond.Broadcast() // fire signal for notifying full block + return } // Resolve returns the latest built payload and also terminates the background @@ -209,8 +215,33 @@ func (payload *Payload) ResolveFull() *engine.ExecutionPayloadEnvelope { return envelope } +func (miner *Miner) runBuildIteration(ctx context.Context, start time.Time, iteration int, payload *Payload, params *generateParams, witness bool) { + ctx, span, spanEnd := telemetry.StartSpan(ctx, "miner.buildIteration", + telemetry.Int64Attribute("iteration", int64(iteration)), + ) + var err error + defer spanEnd(&err) + + r := miner.generateWork(ctx, params, witness) + err = r.err + if err == nil { + accepted := payload.update(r, time.Since(start)) + span.SetAttributes(telemetry.BoolAttribute("update.accepted", accepted)) + } else { + log.Info("Error while generating work", "id", payload.id, "err", err) + } +} + // buildPayload builds the payload according to the provided parameters. -func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload, error) { +func (miner *Miner) buildPayload(ctx context.Context, args *BuildPayloadArgs, witness bool) (result *Payload, err error) { + payloadID := args.Id() + ctx, _, spanEnd := telemetry.StartSpan(ctx, "miner.buildPayload", + telemetry.StringAttribute("payload.id", payloadID.String()), + telemetry.StringAttribute("parent.hash", args.Parent.String()), + telemetry.Int64Attribute("timestamp", int64(args.Timestamp)), + ) + defer spanEnd(&err) + // Build the initial version with no transaction included. It should be fast // enough to run. The empty payload can at least make sure there is something // to deliver for not missing slot. @@ -225,16 +256,25 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload slotNum: args.SlotNum, noTxs: true, } - empty := miner.generateWork(emptyParams, witness) + empty := miner.generateWork(ctx, emptyParams, witness) if empty.err != nil { return nil, empty.err } // Construct a payload object for return. - payload := newPayload(empty.block, empty.requests, empty.witness, args.Id()) + payload := newPayload(empty.block, empty.requests, empty.witness, payloadID) // Spin up a routine for updating the payload in background. This strategy // can maximum the revenue for including transactions with highest fee. go func() { + var iteration int + bCtx, bSpan, bSpanEnd := telemetry.StartSpan(ctx, "miner.background", + telemetry.Int64Attribute("block.number", int64(empty.block.NumberU64())), + ) + defer func() { + bSpan.SetAttributes(telemetry.Int64Attribute("iterations.total", int64(iteration))) + bSpanEnd(nil) + }() + // Setup the timer for re-building the payload. The initial clock is kept // for triggering process immediately. timer := time.NewTimer(0) @@ -256,7 +296,6 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload slotNum: args.SlotNum, noTxs: false, } - for { select { case <-timer.C: @@ -267,22 +306,21 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload // Check payload.stop first to avoid an unnecessary generateWork. select { case <-payload.stop: + payload.updateSpanForDelivery(bSpan) log.Info("Stopping work on payload", "id", payload.id, "reason", "delivery") return default: } start := time.Now() - r := miner.generateWork(fullParams, witness) - if r.err == nil { - payload.update(r, time.Since(start)) - } else { - log.Info("Error while generating work", "id", payload.id, "err", r.err) - } + iteration++ + miner.runBuildIteration(bCtx, start, iteration, payload, fullParams, witness) timer.Reset(max(0, miner.config.Recommit-time.Since(start))) case <-payload.stop: + payload.updateSpanForDelivery(bSpan) log.Info("Stopping work on payload", "id", payload.id, "reason", "delivery") return case <-endTimer.C: + bSpan.SetAttributes(telemetry.StringAttribute("exit.reason", "timeout")) log.Info("Stopping work on payload", "id", payload.id, "reason", "timeout") return } @@ -291,6 +329,16 @@ func (miner *Miner) buildPayload(args *BuildPayloadArgs, witness bool) (*Payload return payload, nil } +func (payload *Payload) updateSpanForDelivery(bSpan trace.Span) { + payload.lock.Lock() + emptyDelivered := payload.full == nil + payload.lock.Unlock() + bSpan.SetAttributes( + telemetry.StringAttribute("exit.reason", "delivery"), + telemetry.BoolAttribute("empty.delivered", emptyDelivered), + ) +} + // BuildTestingPayload is for testing_buildBlockV*. It creates a block with the exact content given // by the parameters instead of using the locally available transactions. func (miner *Miner) BuildTestingPayload(args *BuildPayloadArgs, transactions []*types.Transaction, empty bool, extraData []byte) (*engine.ExecutionPayloadEnvelope, error) { @@ -307,7 +355,7 @@ func (miner *Miner) BuildTestingPayload(args *BuildPayloadArgs, transactions []* overrideExtraData: extraData, overrideTxs: transactions, } - res := miner.generateWork(fullParams, false) + res := miner.generateWork(context.Background(), fullParams, false) if res.err != nil { return nil, res.err } diff --git a/miner/payload_building_test.go b/miner/payload_building_test.go index 295962d7ef..f8e495cc99 100644 --- a/miner/payload_building_test.go +++ b/miner/payload_building_test.go @@ -17,6 +17,7 @@ package miner import ( + "context" "math/big" "reflect" "testing" @@ -156,7 +157,7 @@ func TestBuildPayload(t *testing.T) { Random: common.Hash{}, FeeRecipient: recipient, } - payload, err := w.buildPayload(args, false) + payload, err := w.buildPayload(context.Background(), args, false) if err != nil { t.Fatalf("Failed to build payload %v", err) } diff --git a/miner/worker.go b/miner/worker.go index bfeeaa248b..e82f5f6e55 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -17,6 +17,7 @@ package miner import ( + "context" "errors" "fmt" "math/big" @@ -32,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/internal/telemetry" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/holiman/uint256" @@ -125,8 +127,23 @@ type generateParams struct { } // generateWork generates a sealing block based on the given parameters. -func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPayloadResult { - work, err := miner.prepareWork(genParam, witness) +func (miner *Miner) generateWork(ctx context.Context, genParam *generateParams, witness bool) (result *newPayloadResult) { + ctx, span, spanEnd := telemetry.StartSpan(ctx, "miner.generateWork") + defer func() { + if result != nil && result.err == nil { + span.SetAttributes( + telemetry.Int64Attribute("txs.count", int64(len(result.block.Transactions()))), + telemetry.Int64Attribute("gas.used", int64(result.block.GasUsed())), + telemetry.StringAttribute("fees", result.fees.String()), + ) + } + if result != nil { + spanEnd(&result.err) + } else { + spanEnd(nil) + } + }() + work, err := miner.prepareWork(ctx, genParam, witness) if err != nil { return &newPayloadResult{err: err} } @@ -148,7 +165,7 @@ func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPay if genParam.forceOverrides && len(genParam.overrideTxs) > 0 { for _, tx := range genParam.overrideTxs { work.state.SetTxContext(tx.Hash(), work.tcount) - if err := miner.commitTransaction(work, tx); err != nil { + if err := miner.commitTransaction(ctx, work, tx); err != nil { // all passed transactions HAVE to be valid at this point return &newPayloadResult{err: err} } @@ -160,7 +177,7 @@ func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPay }) defer timer.Stop() - err := miner.fillTransactions(interrupt, work) + err := miner.fillTransactions(ctx, interrupt, work) if errors.Is(err, errBlockInterruptedByTimeout) { log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(miner.config.Recommit)) } @@ -195,7 +212,7 @@ func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPay work.header.RequestsHash = &reqHash } - block, err := miner.engine.FinalizeAndAssemble(miner.chain, work.header, work.state, &body, work.receipts) + block, err := miner.engine.FinalizeAndAssemble(ctx, miner.chain, work.header, work.state, &body, work.receipts) if err != nil { return &newPayloadResult{err: err} } @@ -213,7 +230,9 @@ func (miner *Miner) generateWork(genParam *generateParams, witness bool) *newPay // prepareWork constructs the sealing task according to the given parameters, // either based on the last chain head or specified parent. In this function // the pending transactions are not filled yet, only the empty task returned. -func (miner *Miner) prepareWork(genParams *generateParams, witness bool) (*environment, error) { +func (miner *Miner) prepareWork(ctx context.Context, genParams *generateParams, witness bool) (result *environment, err error) { + _, _, spanEnd := telemetry.StartSpan(ctx, "miner.prepareWork") + defer spanEnd(&err) miner.confMu.RLock() defer miner.confMu.RUnlock() @@ -330,7 +349,9 @@ func (miner *Miner) makeEnv(parent *types.Header, header *types.Header, coinbase }, nil } -func (miner *Miner) commitTransaction(env *environment, tx *types.Transaction) error { +func (miner *Miner) commitTransaction(ctx context.Context, env *environment, tx *types.Transaction) (err error) { + _, _, spanEnd := telemetry.StartSpan(ctx, "miner.commitTransaction") + defer spanEnd(&err) if tx.Type() == types.BlobTxType { return miner.commitBlobTransaction(env, tx) } @@ -389,7 +410,9 @@ func (miner *Miner) applyTransaction(env *environment, tx *types.Transaction) (* return receipt, nil } -func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error { +func (miner *Miner) commitTransactions(ctx context.Context, env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error { + ctx, _, spanEnd := telemetry.StartSpan(ctx, "miner.commitTransactions") + defer spanEnd(nil) isCancun := miner.chainConfig.IsCancun(env.header.Number, env.header.Time) for { // Check interruption signal and abort building if it's fired. @@ -479,7 +502,7 @@ func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *tran // Start executing the transaction env.state.SetTxContext(tx.Hash(), env.tcount) - err := miner.commitTransaction(env, tx) + err := miner.commitTransaction(ctx, env, tx) switch { case errors.Is(err, core.ErrNonceTooLow): // New head notification data race between the transaction pool and miner, shift @@ -503,7 +526,9 @@ func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *tran // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) error { +func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int32, env *environment) (err error) { + ctx, span, spanEnd := telemetry.StartSpan(ctx, "miner.fillTransactions") + defer spanEnd(&err) miner.confMu.RLock() tip := miner.config.GasPrice prio := miner.prio @@ -523,7 +548,7 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) filter.GasLimitCap = params.MaxTxGas } filter.BlobTxs = false - pendingPlainTxs := miner.txpool.Pending(filter) + pendingPlainTxs, plainTxCount := miner.txpool.Pending(filter) filter.BlobTxs = true if miner.chainConfig.IsOsaka(env.header.Number, env.header.Time) { @@ -531,7 +556,11 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) } else { filter.BlobVersion = types.BlobSidecarVersion0 } - pendingBlobTxs := miner.txpool.Pending(filter) + pendingBlobTxs, blobTxCount := miner.txpool.Pending(filter) + span.SetAttributes( + telemetry.Int64Attribute("pending.plain.count", int64(plainTxCount)), + telemetry.Int64Attribute("pending.blob.count", int64(blobTxCount)), + ) // Split the pending transactions into locals and remotes. prioPlainTxs, normalPlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs @@ -552,7 +581,7 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) plainTxs := newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee) blobTxs := newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee) - if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { + if err := miner.commitTransactions(ctx, env, plainTxs, blobTxs, interrupt); err != nil { return err } } @@ -560,7 +589,7 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) plainTxs := newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee) blobTxs := newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee) - if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { + if err := miner.commitTransactions(ctx, env, plainTxs, blobTxs, interrupt); err != nil { return err } }