ethstats: report newPayload processing time to stats server (#33395)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

Add NewPayloadEvent to track engine API newPayload block processing
times and report them to ethstats. This enables monitoring of block
processing performance.

https://notes.ethereum.org/@savid/block-observability

related: https://github.com/ethereum/go-ethereum/pull/33231

---------

Co-authored-by: MariusVanDerWijden <m.vanderwijden@live.de>
This commit is contained in:
Andrew Davis 2026-01-06 03:49:30 +11:00 committed by GitHub
parent de5ea2ffd8
commit a8a4804895
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 89 additions and 7 deletions

View file

@ -311,6 +311,7 @@ type BlockChain struct {
chainHeadFeed event.Feed chainHeadFeed event.Feed
logsFeed event.Feed logsFeed event.Feed
blockProcFeed event.Feed blockProcFeed event.Feed
newPayloadFeed event.Feed // Feed for engine API newPayload events
blockProcCounter int32 blockProcCounter int32
scope event.SubscriptionScope scope event.SubscriptionScope
genesisBlock *types.Block genesisBlock *types.Block

View file

@ -522,3 +522,13 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
} }
// SubscribeNewPayloadEvent registers a subscription for NewPayloadEvent.
func (bc *BlockChain) SubscribeNewPayloadEvent(ch chan<- NewPayloadEvent) event.Subscription {
return bc.scope.Track(bc.newPayloadFeed.Subscribe(ch))
}
// SendNewPayloadEvent sends a NewPayloadEvent to subscribers.
func (bc *BlockChain) SendNewPayloadEvent(ev NewPayloadEvent) {
bc.newPayloadFeed.Send(ev)
}

View file

@ -17,6 +17,9 @@
package core package core
import ( import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
@ -35,3 +38,10 @@ type ChainEvent struct {
type ChainHeadEvent struct { type ChainHeadEvent struct {
Header *types.Header Header *types.Header
} }
// NewPayloadEvent is posted when engine_newPayloadVX processes a block.
type NewPayloadEvent struct {
Hash common.Hash
Number uint64
ProcessingTime time.Duration
}

View file

@ -315,6 +315,11 @@ func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) e
return b.eth.BlockChain().SubscribeChainHeadEvent(ch) return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
} }
// SubscribeNewPayloadEvent registers a subscription for NewPayloadEvent.
func (b *EthAPIBackend) SubscribeNewPayloadEvent(ch chan<- core.NewPayloadEvent) event.Subscription {
return b.eth.BlockChain().SubscribeNewPayloadEvent(ch)
}
func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.BlockChain().SubscribeLogsEvent(ch) return b.eth.BlockChain().SubscribeLogsEvent(ch)
} }

View file

@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
@ -787,7 +788,9 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
return engine.PayloadStatusV1{Status: engine.ACCEPTED}, nil return engine.PayloadStatusV1{Status: engine.ACCEPTED}, nil
} }
log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number()) log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number())
start := time.Now()
proofs, err := api.eth.BlockChain().InsertBlockWithoutSetHead(block, witness) proofs, err := api.eth.BlockChain().InsertBlockWithoutSetHead(block, witness)
processingTime := time.Since(start)
if err != nil { if err != nil {
log.Warn("NewPayload: inserting block failed", "error", err) log.Warn("NewPayload: inserting block failed", "error", err)
@ -800,6 +803,13 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
} }
hash := block.Hash() hash := block.Hash()
// Emit NewPayloadEvent for ethstats reporting
api.eth.BlockChain().SendNewPayloadEvent(core.NewPayloadEvent{
Hash: hash,
Number: block.NumberU64(),
ProcessingTime: processingTime,
})
// If witness collection was requested, inject that into the result too // If witness collection was requested, inject that into the result too
var ow *hexutil.Bytes var ow *hexutil.Bytes
if proofs != nil { if proofs != nil {

View file

@ -63,6 +63,7 @@ const (
type backend interface { type backend interface {
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription
SubscribeNewPayloadEvent(ch chan<- core.NewPayloadEvent) event.Subscription
CurrentHeader() *types.Header CurrentHeader() *types.Header
HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
Stats() (pending int, queued int) Stats() (pending int, queued int)
@ -92,8 +93,9 @@ type Service struct {
pongCh chan struct{} // Pong notifications are fed into this channel pongCh chan struct{} // Pong notifications are fed into this channel
histCh chan []uint64 // History request block numbers are fed into this channel histCh chan []uint64 // History request block numbers are fed into this channel
headSub event.Subscription headSub event.Subscription
txSub event.Subscription txSub event.Subscription
newPayloadSub event.Subscription
} }
// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the // connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
@ -198,7 +200,9 @@ func (s *Service) Start() error {
s.headSub = s.backend.SubscribeChainHeadEvent(chainHeadCh) s.headSub = s.backend.SubscribeChainHeadEvent(chainHeadCh)
txEventCh := make(chan core.NewTxsEvent, txChanSize) txEventCh := make(chan core.NewTxsEvent, txChanSize)
s.txSub = s.backend.SubscribeNewTxsEvent(txEventCh) s.txSub = s.backend.SubscribeNewTxsEvent(txEventCh)
go s.loop(chainHeadCh, txEventCh) newPayloadCh := make(chan core.NewPayloadEvent, chainHeadChanSize)
s.newPayloadSub = s.backend.SubscribeNewPayloadEvent(newPayloadCh)
go s.loop(chainHeadCh, txEventCh, newPayloadCh)
log.Info("Stats daemon started") log.Info("Stats daemon started")
return nil return nil
@ -208,18 +212,20 @@ func (s *Service) Start() error {
func (s *Service) Stop() error { func (s *Service) Stop() error {
s.headSub.Unsubscribe() s.headSub.Unsubscribe()
s.txSub.Unsubscribe() s.txSub.Unsubscribe()
s.newPayloadSub.Unsubscribe()
log.Info("Stats daemon stopped") log.Info("Stats daemon stopped")
return nil return nil
} }
// loop keeps trying to connect to the netstats server, reporting chain events // loop keeps trying to connect to the netstats server, reporting chain events
// until termination. // until termination.
func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core.NewTxsEvent) { func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core.NewTxsEvent, newPayloadCh chan core.NewPayloadEvent) {
// Start a goroutine that exhausts the subscriptions to avoid events piling up // Start a goroutine that exhausts the subscriptions to avoid events piling up
var ( var (
quitCh = make(chan struct{}) quitCh = make(chan struct{})
headCh = make(chan *types.Header, 1) headCh = make(chan *types.Header, 1)
txCh = make(chan struct{}, 1) txCh = make(chan struct{}, 1)
newPayloadEvCh = make(chan core.NewPayloadEvent, 1)
) )
go func() { go func() {
var lastTx mclock.AbsTime var lastTx mclock.AbsTime
@ -246,11 +252,20 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
default: default:
} }
// Notify of new payload events, but drop if too frequent
case ev := <-newPayloadCh:
select {
case newPayloadEvCh <- ev:
default:
}
// node stopped // node stopped
case <-s.txSub.Err(): case <-s.txSub.Err():
break HandleLoop break HandleLoop
case <-s.headSub.Err(): case <-s.headSub.Err():
break HandleLoop break HandleLoop
case <-s.newPayloadSub.Err():
break HandleLoop
} }
} }
close(quitCh) close(quitCh)
@ -336,6 +351,10 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
if err = s.reportPending(conn); err != nil { if err = s.reportPending(conn); err != nil {
log.Warn("Post-block transaction stats report failed", "err", err) log.Warn("Post-block transaction stats report failed", "err", err)
} }
case ev := <-newPayloadEvCh:
if err = s.reportNewPayload(conn, ev); err != nil {
log.Warn("New payload stats report failed", "err", err)
}
case <-txCh: case <-txCh:
if err = s.reportPending(conn); err != nil { if err = s.reportPending(conn); err != nil {
log.Warn("Transaction stats report failed", "err", err) log.Warn("Transaction stats report failed", "err", err)
@ -600,6 +619,33 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
return []byte("[]"), nil return []byte("[]"), nil
} }
// newPayloadStats is the information to report about new payload events.
type newPayloadStats struct {
Number uint64 `json:"number"`
Hash common.Hash `json:"hash"`
ProcessingTime uint64 `json:"processingTime"` // nanoseconds
}
// reportNewPayload reports a new payload event to the stats server.
func (s *Service) reportNewPayload(conn *connWrapper, ev core.NewPayloadEvent) error {
details := &newPayloadStats{
Number: ev.Number,
Hash: ev.Hash,
ProcessingTime: uint64(ev.ProcessingTime.Nanoseconds()),
}
log.Trace("Sending new payload to ethstats", "number", details.Number, "hash", details.Hash)
stats := map[string]interface{}{
"id": s.node,
"block": details,
}
report := map[string][]interface{}{
"emit": {"block_new_payload", stats},
}
return conn.WriteJSON(report)
}
// reportBlock retrieves the current chain head and reports it to the stats server. // reportBlock retrieves the current chain head and reports it to the stats server.
func (s *Service) reportBlock(conn *connWrapper, header *types.Header) error { func (s *Service) reportBlock(conn *connWrapper, header *types.Header) error {
// Gather the block details from the header or block chain // Gather the block details from the header or block chain