diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index da54952674..15f4430cc6 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -19,6 +19,7 @@ package blobpool import ( "container/heap" + "context" "errors" "fmt" "math" @@ -39,6 +40,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/internal/telemetry" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -1620,7 +1622,10 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata { // The version argument specifies the type of proofs to return, either the // blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is // CPU intensive and prohibited in the blobpool explicitly. -func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) { +func (p *BlobPool) GetBlobs(ctx context.Context, vhashes []common.Hash, version byte) (_ []*kzg4844.Blob, _ []kzg4844.Commitment, _ [][]kzg4844.Proof, err error) { + _, _, spanEnd := telemetry.StartSpan(ctx, "blobpool.GetBlobs") + defer spanEnd(&err) + var ( blobs = make([]*kzg4844.Blob, len(vhashes)) commitments = make([]kzg4844.Commitment, len(vhashes)) diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 8032e21e8a..49ff4bfe1c 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -18,6 +18,7 @@ package blobpool import ( "bytes" + "context" "crypto/ecdsa" "crypto/sha256" "errors" @@ -440,11 +441,11 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) { hashes = append(hashes, tx.vhashes...) } } - blobs1, _, proofs1, err := pool.GetBlobs(hashes, types.BlobSidecarVersion0) + blobs1, _, proofs1, err := pool.GetBlobs(context.Background(), hashes, types.BlobSidecarVersion0) if err != nil { t.Fatal(err) } - blobs2, _, proofs2, err := pool.GetBlobs(hashes, types.BlobSidecarVersion1) + blobs2, _, proofs2, err := pool.GetBlobs(context.Background(), hashes, types.BlobSidecarVersion1) if err != nil { t.Fatal(err) } @@ -2087,7 +2088,7 @@ func TestGetBlobs(t *testing.T) { filled[len(vhashes)] = struct{}{} vhashes = append(vhashes, testrand.Hash()) } - blobs, _, proofs, err := pool.GetBlobs(vhashes, c.version) + blobs, _, proofs, err := pool.GetBlobs(context.Background(), vhashes, c.version) if err != nil { t.Errorf("Unexpected error for case %d, %v", i, err) } diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index a1f9673de8..4109971dc8 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -552,7 +552,19 @@ func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool, versi // // Client software MAY return an array of all null entries if syncing or otherwise // unable to serve blob pool data. -func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofListV1, error) { +func (api *ConsensusAPI) GetBlobsV1(ctx context.Context, hashes []common.Hash) (result engine.BlobAndProofListV1, err error) { + var ( + filled int + attrs = []telemetry.Attribute{ + telemetry.Int64Attribute("blobs.requested", int64(len(hashes))), + } + ) + ctx, span, spanEnd := telemetry.StartSpan(ctx, "engine.getBlobsV1", attrs...) + defer func() { + span.SetAttributes(telemetry.Int64Attribute("blobs.filled", int64(filled))) + spanEnd(&err) + }() + // Reject the request if Osaka has been activated. // follow https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#cancun-api head := api.eth.BlockChain().CurrentHeader() @@ -562,7 +574,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofLi if len(hashes) > 128 { return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) } - blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion0) + blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(ctx, hashes, types.BlobSidecarVersion0) if err != nil { return nil, engine.InvalidParams.With(err) } @@ -576,6 +588,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofLi Blob: blobs[i][:], Proof: proofs[i][0][:], } + filled++ } return res, nil } @@ -605,28 +618,40 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofLi // // Client software MUST return null if syncing or otherwise unable to serve // blob pool data. -func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) (engine.BlobAndProofListV2, error) { +func (api *ConsensusAPI) GetBlobsV2(ctx context.Context, hashes []common.Hash) (engine.BlobAndProofListV2, error) { head := api.eth.BlockChain().CurrentHeader() if api.config().LatestFork(head.Time) < forks.Osaka { return nil, nil } - return api.getBlobs(hashes, true) + return api.getBlobs(ctx, hashes, true) } // GetBlobsV3 returns a set of blobs from the transaction pool. Same as // GetBlobsV2, except will return partial responses in case there is a missing // blob. -func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) (engine.BlobAndProofListV2, error) { +func (api *ConsensusAPI) GetBlobsV3(ctx context.Context, hashes []common.Hash) (engine.BlobAndProofListV2, error) { head := api.eth.BlockChain().CurrentHeader() if api.config().LatestFork(head.Time) < forks.Osaka { return nil, nil } - return api.getBlobs(hashes, false) + return api.getBlobs(ctx, hashes, false) } // getBlobs returns all available blobs. In v2, partial responses are not allowed, // while v3 supports partial responses. -func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) (engine.BlobAndProofListV2, error) { +func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2 bool) (result engine.BlobAndProofListV2, err error) { + var ( + filled int + attrs = []telemetry.Attribute{ + telemetry.Int64Attribute("blobs.requested", int64(len(hashes))), + } + ) + ctx, span, spanEnd := telemetry.StartSpan(ctx, "engine.getBlobs", attrs...) + defer func() { + span.SetAttributes(telemetry.Int64Attribute("blobs.filled", int64(filled))) + spanEnd(&err) + }() + if len(hashes) > 128 { return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) } @@ -641,12 +666,11 @@ func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) (engine.BlobAnd } // Retrieve blobs from the pool. This operation is expensive and may involve // heavy disk I/O. - blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion1) + blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(ctx, hashes, types.BlobSidecarVersion1) if err != nil { return nil, engine.InvalidParams.With(err) } // Validate the blobs from the pool and assemble the response - filled := 0 res := make(engine.BlobAndProofListV2, len(hashes)) for i := range blobs { // The blob has been evicted since the last AvailableBlobs call. diff --git a/eth/catalyst/api_benchmark_test.go b/eth/catalyst/api_benchmark_test.go index 377e5caa43..ee0a0a4888 100644 --- a/eth/catalyst/api_benchmark_test.go +++ b/eth/catalyst/api_benchmark_test.go @@ -302,7 +302,7 @@ func BenchmarkGetBlobsV1(b *testing.B) { b.ResetTimer() for b.Loop() { - result, err := env.api.GetBlobsV1(env.vhashes) + result, err := env.api.GetBlobsV1(context.Background(), env.vhashes) if err != nil { b.Fatalf("GetBlobsV1 failed: %v", err) } @@ -329,7 +329,7 @@ func BenchmarkGetBlobsV2Extended(b *testing.B) { b.ResetTimer() for b.Loop() { - result, err := env.api.GetBlobsV2(env.vhashes) + result, err := env.api.GetBlobsV2(context.Background(), env.vhashes) if err != nil { b.Fatalf("GetBlobsV2 failed: %v", err) } @@ -356,7 +356,7 @@ func BenchmarkGetBlobsV3(b *testing.B) { b.ResetTimer() for b.Loop() { - result, err := env.api.GetBlobsV3(env.vhashes) + result, err := env.api.GetBlobsV3(context.Background(), env.vhashes) if err != nil { b.Fatalf("GetBlobsV3 failed: %v", err) } @@ -708,7 +708,7 @@ func BenchmarkGetBlobsV3RPCServerOnly(b *testing.B) { rpcServer.RegisterName("engine", env.api) // Verify the blobs are available via the direct API first. - result, err := env.api.GetBlobsV3(env.vhashes) + result, err := env.api.GetBlobsV3(context.Background(), env.vhashes) if err != nil { b.Fatalf("GetBlobsV3 failed: %v", err) } diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 05d688ed9f..0cf2c5c8e6 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -1987,7 +1987,7 @@ func TestGetBlobsV1(t *testing.T) { vhashes = append(vhashes, testrand.Hash()) expect = append(expect, nil) } - result, err := api.GetBlobsV1(vhashes) + result, err := api.GetBlobsV1(context.Background(), vhashes) if err != nil { t.Errorf("Unexpected error for case %d, %v", i, err) } @@ -2009,7 +2009,7 @@ func TestGetBlobsV1AfterOsakaFork(t *testing.T) { var engineErr *engine.EngineAPIError api := newConsensusAPIWithoutHeartbeat(ethServ) - _, err := api.GetBlobsV1([]common.Hash{testrand.Hash()}) + _, err := api.GetBlobsV1(context.Background(), []common.Hash{testrand.Hash()}) if !errors.As(err, &engineErr) { t.Fatalf("Unexpected error: %T", err) } else { @@ -2073,7 +2073,7 @@ func BenchmarkGetBlobsV2(b *testing.B) { } } -type getBlobsFn func(hashes []common.Hash) (engine.BlobAndProofListV2, error) +type getBlobsFn func(ctx context.Context, hashes []common.Hash) (engine.BlobAndProofListV2, error) func runGetBlobs(t testing.TB, getBlobs getBlobsFn, start, limit int, fillRandom bool, expectPartialResponse bool, name string) { // Fill the request for retrieving blobs @@ -2096,7 +2096,7 @@ func runGetBlobs(t testing.TB, getBlobs getBlobsFn, start, limit int, fillRandom if fillRandom { vhashes = append(vhashes, testrand.Hash()) } - result, err := getBlobs(vhashes) + result, err := getBlobs(context.Background(), vhashes) if err != nil { t.Errorf("Unexpected error for case %s, %v", name, err) }