mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-22 07:49:26 +00:00
eth/catalyst, core/txpool/blobpool: add tracing to GetBlobs endpoints (#35026)
- Adds tracing to the `GetBlobsV1/V2/V3` - Adds `blobs.requested` and `blobs.filled` attributes to `GetBlobsV1/V2/V3` spans. - Adds tracing to `BlobTxPool().GetBlobs()`
This commit is contained in:
parent
4daaaadfc4
commit
2522b716f4
5 changed files with 51 additions and 21 deletions
|
|
@ -19,6 +19,7 @@ package blobpool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
|
@ -39,6 +40,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto/kzg4844"
|
"github.com/ethereum/go-ethereum/crypto/kzg4844"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
|
"github.com/ethereum/go-ethereum/internal/telemetry"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/metrics"
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
|
|
@ -1620,7 +1622,10 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata {
|
||||||
// The version argument specifies the type of proofs to return, either the
|
// The version argument specifies the type of proofs to return, either the
|
||||||
// blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is
|
// blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is
|
||||||
// CPU intensive and prohibited in the blobpool explicitly.
|
// CPU intensive and prohibited in the blobpool explicitly.
|
||||||
func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) {
|
func (p *BlobPool) GetBlobs(ctx context.Context, vhashes []common.Hash, version byte) (_ []*kzg4844.Blob, _ []kzg4844.Commitment, _ [][]kzg4844.Proof, err error) {
|
||||||
|
_, _, spanEnd := telemetry.StartSpan(ctx, "blobpool.GetBlobs")
|
||||||
|
defer spanEnd(&err)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
blobs = make([]*kzg4844.Blob, len(vhashes))
|
blobs = make([]*kzg4844.Blob, len(vhashes))
|
||||||
commitments = make([]kzg4844.Commitment, len(vhashes))
|
commitments = make([]kzg4844.Commitment, len(vhashes))
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ package blobpool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
@ -440,11 +441,11 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) {
|
||||||
hashes = append(hashes, tx.vhashes...)
|
hashes = append(hashes, tx.vhashes...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
blobs1, _, proofs1, err := pool.GetBlobs(hashes, types.BlobSidecarVersion0)
|
blobs1, _, proofs1, err := pool.GetBlobs(context.Background(), hashes, types.BlobSidecarVersion0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
blobs2, _, proofs2, err := pool.GetBlobs(hashes, types.BlobSidecarVersion1)
|
blobs2, _, proofs2, err := pool.GetBlobs(context.Background(), hashes, types.BlobSidecarVersion1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
@ -2087,7 +2088,7 @@ func TestGetBlobs(t *testing.T) {
|
||||||
filled[len(vhashes)] = struct{}{}
|
filled[len(vhashes)] = struct{}{}
|
||||||
vhashes = append(vhashes, testrand.Hash())
|
vhashes = append(vhashes, testrand.Hash())
|
||||||
}
|
}
|
||||||
blobs, _, proofs, err := pool.GetBlobs(vhashes, c.version)
|
blobs, _, proofs, err := pool.GetBlobs(context.Background(), vhashes, c.version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error for case %d, %v", i, err)
|
t.Errorf("Unexpected error for case %d, %v", i, err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -552,7 +552,19 @@ func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool, versi
|
||||||
//
|
//
|
||||||
// Client software MAY return an array of all null entries if syncing or otherwise
|
// Client software MAY return an array of all null entries if syncing or otherwise
|
||||||
// unable to serve blob pool data.
|
// unable to serve blob pool data.
|
||||||
func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofListV1, error) {
|
func (api *ConsensusAPI) GetBlobsV1(ctx context.Context, hashes []common.Hash) (result engine.BlobAndProofListV1, err error) {
|
||||||
|
var (
|
||||||
|
filled int
|
||||||
|
attrs = []telemetry.Attribute{
|
||||||
|
telemetry.Int64Attribute("blobs.requested", int64(len(hashes))),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
ctx, span, spanEnd := telemetry.StartSpan(ctx, "engine.getBlobsV1", attrs...)
|
||||||
|
defer func() {
|
||||||
|
span.SetAttributes(telemetry.Int64Attribute("blobs.filled", int64(filled)))
|
||||||
|
spanEnd(&err)
|
||||||
|
}()
|
||||||
|
|
||||||
// Reject the request if Osaka has been activated.
|
// Reject the request if Osaka has been activated.
|
||||||
// follow https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#cancun-api
|
// follow https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#cancun-api
|
||||||
head := api.eth.BlockChain().CurrentHeader()
|
head := api.eth.BlockChain().CurrentHeader()
|
||||||
|
|
@ -562,7 +574,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofLi
|
||||||
if len(hashes) > 128 {
|
if len(hashes) > 128 {
|
||||||
return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes)))
|
return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes)))
|
||||||
}
|
}
|
||||||
blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion0)
|
blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(ctx, hashes, types.BlobSidecarVersion0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, engine.InvalidParams.With(err)
|
return nil, engine.InvalidParams.With(err)
|
||||||
}
|
}
|
||||||
|
|
@ -576,6 +588,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofLi
|
||||||
Blob: blobs[i][:],
|
Blob: blobs[i][:],
|
||||||
Proof: proofs[i][0][:],
|
Proof: proofs[i][0][:],
|
||||||
}
|
}
|
||||||
|
filled++
|
||||||
}
|
}
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
@ -605,28 +618,40 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofLi
|
||||||
//
|
//
|
||||||
// Client software MUST return null if syncing or otherwise unable to serve
|
// Client software MUST return null if syncing or otherwise unable to serve
|
||||||
// blob pool data.
|
// blob pool data.
|
||||||
func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) (engine.BlobAndProofListV2, error) {
|
func (api *ConsensusAPI) GetBlobsV2(ctx context.Context, hashes []common.Hash) (engine.BlobAndProofListV2, error) {
|
||||||
head := api.eth.BlockChain().CurrentHeader()
|
head := api.eth.BlockChain().CurrentHeader()
|
||||||
if api.config().LatestFork(head.Time) < forks.Osaka {
|
if api.config().LatestFork(head.Time) < forks.Osaka {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return api.getBlobs(hashes, true)
|
return api.getBlobs(ctx, hashes, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBlobsV3 returns a set of blobs from the transaction pool. Same as
|
// GetBlobsV3 returns a set of blobs from the transaction pool. Same as
|
||||||
// GetBlobsV2, except will return partial responses in case there is a missing
|
// GetBlobsV2, except will return partial responses in case there is a missing
|
||||||
// blob.
|
// blob.
|
||||||
func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) (engine.BlobAndProofListV2, error) {
|
func (api *ConsensusAPI) GetBlobsV3(ctx context.Context, hashes []common.Hash) (engine.BlobAndProofListV2, error) {
|
||||||
head := api.eth.BlockChain().CurrentHeader()
|
head := api.eth.BlockChain().CurrentHeader()
|
||||||
if api.config().LatestFork(head.Time) < forks.Osaka {
|
if api.config().LatestFork(head.Time) < forks.Osaka {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return api.getBlobs(hashes, false)
|
return api.getBlobs(ctx, hashes, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBlobs returns all available blobs. In v2, partial responses are not allowed,
|
// getBlobs returns all available blobs. In v2, partial responses are not allowed,
|
||||||
// while v3 supports partial responses.
|
// while v3 supports partial responses.
|
||||||
func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) (engine.BlobAndProofListV2, error) {
|
func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2 bool) (result engine.BlobAndProofListV2, err error) {
|
||||||
|
var (
|
||||||
|
filled int
|
||||||
|
attrs = []telemetry.Attribute{
|
||||||
|
telemetry.Int64Attribute("blobs.requested", int64(len(hashes))),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
ctx, span, spanEnd := telemetry.StartSpan(ctx, "engine.getBlobs", attrs...)
|
||||||
|
defer func() {
|
||||||
|
span.SetAttributes(telemetry.Int64Attribute("blobs.filled", int64(filled)))
|
||||||
|
spanEnd(&err)
|
||||||
|
}()
|
||||||
|
|
||||||
if len(hashes) > 128 {
|
if len(hashes) > 128 {
|
||||||
return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes)))
|
return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes)))
|
||||||
}
|
}
|
||||||
|
|
@ -641,12 +666,11 @@ func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) (engine.BlobAnd
|
||||||
}
|
}
|
||||||
// Retrieve blobs from the pool. This operation is expensive and may involve
|
// Retrieve blobs from the pool. This operation is expensive and may involve
|
||||||
// heavy disk I/O.
|
// heavy disk I/O.
|
||||||
blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion1)
|
blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(ctx, hashes, types.BlobSidecarVersion1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, engine.InvalidParams.With(err)
|
return nil, engine.InvalidParams.With(err)
|
||||||
}
|
}
|
||||||
// Validate the blobs from the pool and assemble the response
|
// Validate the blobs from the pool and assemble the response
|
||||||
filled := 0
|
|
||||||
res := make(engine.BlobAndProofListV2, len(hashes))
|
res := make(engine.BlobAndProofListV2, len(hashes))
|
||||||
for i := range blobs {
|
for i := range blobs {
|
||||||
// The blob has been evicted since the last AvailableBlobs call.
|
// The blob has been evicted since the last AvailableBlobs call.
|
||||||
|
|
|
||||||
|
|
@ -302,7 +302,7 @@ func BenchmarkGetBlobsV1(b *testing.B) {
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for b.Loop() {
|
for b.Loop() {
|
||||||
result, err := env.api.GetBlobsV1(env.vhashes)
|
result, err := env.api.GetBlobsV1(context.Background(), env.vhashes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("GetBlobsV1 failed: %v", err)
|
b.Fatalf("GetBlobsV1 failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -329,7 +329,7 @@ func BenchmarkGetBlobsV2Extended(b *testing.B) {
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for b.Loop() {
|
for b.Loop() {
|
||||||
result, err := env.api.GetBlobsV2(env.vhashes)
|
result, err := env.api.GetBlobsV2(context.Background(), env.vhashes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("GetBlobsV2 failed: %v", err)
|
b.Fatalf("GetBlobsV2 failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -356,7 +356,7 @@ func BenchmarkGetBlobsV3(b *testing.B) {
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for b.Loop() {
|
for b.Loop() {
|
||||||
result, err := env.api.GetBlobsV3(env.vhashes)
|
result, err := env.api.GetBlobsV3(context.Background(), env.vhashes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("GetBlobsV3 failed: %v", err)
|
b.Fatalf("GetBlobsV3 failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -708,7 +708,7 @@ func BenchmarkGetBlobsV3RPCServerOnly(b *testing.B) {
|
||||||
rpcServer.RegisterName("engine", env.api)
|
rpcServer.RegisterName("engine", env.api)
|
||||||
|
|
||||||
// Verify the blobs are available via the direct API first.
|
// Verify the blobs are available via the direct API first.
|
||||||
result, err := env.api.GetBlobsV3(env.vhashes)
|
result, err := env.api.GetBlobsV3(context.Background(), env.vhashes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("GetBlobsV3 failed: %v", err)
|
b.Fatalf("GetBlobsV3 failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1987,7 +1987,7 @@ func TestGetBlobsV1(t *testing.T) {
|
||||||
vhashes = append(vhashes, testrand.Hash())
|
vhashes = append(vhashes, testrand.Hash())
|
||||||
expect = append(expect, nil)
|
expect = append(expect, nil)
|
||||||
}
|
}
|
||||||
result, err := api.GetBlobsV1(vhashes)
|
result, err := api.GetBlobsV1(context.Background(), vhashes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error for case %d, %v", i, err)
|
t.Errorf("Unexpected error for case %d, %v", i, err)
|
||||||
}
|
}
|
||||||
|
|
@ -2009,7 +2009,7 @@ func TestGetBlobsV1AfterOsakaFork(t *testing.T) {
|
||||||
|
|
||||||
var engineErr *engine.EngineAPIError
|
var engineErr *engine.EngineAPIError
|
||||||
api := newConsensusAPIWithoutHeartbeat(ethServ)
|
api := newConsensusAPIWithoutHeartbeat(ethServ)
|
||||||
_, err := api.GetBlobsV1([]common.Hash{testrand.Hash()})
|
_, err := api.GetBlobsV1(context.Background(), []common.Hash{testrand.Hash()})
|
||||||
if !errors.As(err, &engineErr) {
|
if !errors.As(err, &engineErr) {
|
||||||
t.Fatalf("Unexpected error: %T", err)
|
t.Fatalf("Unexpected error: %T", err)
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -2073,7 +2073,7 @@ func BenchmarkGetBlobsV2(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type getBlobsFn func(hashes []common.Hash) (engine.BlobAndProofListV2, error)
|
type getBlobsFn func(ctx context.Context, hashes []common.Hash) (engine.BlobAndProofListV2, error)
|
||||||
|
|
||||||
func runGetBlobs(t testing.TB, getBlobs getBlobsFn, start, limit int, fillRandom bool, expectPartialResponse bool, name string) {
|
func runGetBlobs(t testing.TB, getBlobs getBlobsFn, start, limit int, fillRandom bool, expectPartialResponse bool, name string) {
|
||||||
// Fill the request for retrieving blobs
|
// Fill the request for retrieving blobs
|
||||||
|
|
@ -2096,7 +2096,7 @@ func runGetBlobs(t testing.TB, getBlobs getBlobsFn, start, limit int, fillRandom
|
||||||
if fillRandom {
|
if fillRandom {
|
||||||
vhashes = append(vhashes, testrand.Hash())
|
vhashes = append(vhashes, testrand.Hash())
|
||||||
}
|
}
|
||||||
result, err := getBlobs(vhashes)
|
result, err := getBlobs(context.Background(), vhashes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error for case %s, %v", name, err)
|
t.Errorf("Unexpected error for case %s, %v", name, err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue