eth/catalyst: avoid loading the same blob tx multiple times (#32190)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

- If all the `vhashes` belong to the same `sidecar`, the previous code would load the
same blob tx many times. This PR fixes that by resolving each transaction only once.

---------

Co-authored-by: Gary Rong <garyrong0905@gmail.com>
This commit is contained in:
maskpp 2025-08-05 13:07:45 +08:00 committed by GitHub
parent cf50026466
commit e9dca3b181
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 306 additions and 124 deletions

View file

@ -36,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
@ -1299,32 +1300,86 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata {
// GetBlobs returns a number of blobs and proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.
func (p *BlobPool) GetBlobs(vhashes []common.Hash) []*types.BlobTxSidecar {
sidecars := make([]*types.BlobTxSidecar, len(vhashes))
for idx, vhash := range vhashes {
// Retrieve the datastore item (in a short lock)
p.lock.RLock()
id, exists := p.lookup.storeidOfBlob(vhash)
if !exists {
p.lock.RUnlock()
continue
}
data, err := p.store.Get(id)
p.lock.RUnlock()
func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) {
var (
blobs = make([]*kzg4844.Blob, len(vhashes))
commitments = make([]kzg4844.Commitment, len(vhashes))
proofs = make([][]kzg4844.Proof, len(vhashes))
// After releasing the lock, try to fill any blobs requested
if err != nil {
log.Error("Tracked blob transaction missing from store", "id", id, "err", err)
continue
}
item := new(types.Transaction)
if err = rlp.DecodeBytes(data, item); err != nil {
log.Error("Blobs corrupted for traced transaction", "id", id, "err", err)
continue
}
sidecars[idx] = item.BlobTxSidecar()
indices = make(map[common.Hash][]int)
filled = make(map[common.Hash]struct{})
)
for i, h := range vhashes {
indices[h] = append(indices[h], i)
}
return sidecars
for _, vhash := range vhashes {
// Skip duplicate vhash that was already resolved in a previous iteration
if _, ok := filled[vhash]; ok {
continue
}
// Retrieve the corresponding blob tx with the vhash
p.lock.RLock()
txID, exists := p.lookup.storeidOfBlob(vhash)
p.lock.RUnlock()
if !exists {
return nil, nil, nil, fmt.Errorf("blob with vhash %x is not found", vhash)
}
data, err := p.store.Get(txID)
if err != nil {
return nil, nil, nil, err
}
// Decode the blob transaction
tx := new(types.Transaction)
if err := rlp.DecodeBytes(data, tx); err != nil {
return nil, nil, nil, err
}
sidecar := tx.BlobTxSidecar()
if sidecar == nil {
return nil, nil, nil, fmt.Errorf("blob tx without sidecar %x", tx.Hash())
}
// Traverse the blobs in the transaction
for i, hash := range tx.BlobHashes() {
list, ok := indices[hash]
if !ok {
continue // non-interesting blob
}
var pf []kzg4844.Proof
switch version {
case types.BlobSidecarVersion0:
if sidecar.Version == types.BlobSidecarVersion0 {
pf = []kzg4844.Proof{sidecar.Proofs[i]}
} else {
proof, err := kzg4844.ComputeBlobProof(&sidecar.Blobs[i], sidecar.Commitments[i])
if err != nil {
return nil, nil, nil, err
}
pf = []kzg4844.Proof{proof}
}
case types.BlobSidecarVersion1:
if sidecar.Version == types.BlobSidecarVersion0 {
cellProofs, err := kzg4844.ComputeCellProofs(&sidecar.Blobs[i])
if err != nil {
return nil, nil, nil, err
}
pf = cellProofs
} else {
cellProofs, err := sidecar.CellProofsAt(i)
if err != nil {
return nil, nil, nil, err
}
pf = cellProofs
}
}
for _, index := range list {
blobs[index] = &sidecar.Blobs[i]
commitments[index] = sidecar.Commitments[i]
proofs[index] = pf
}
filled[hash] = struct{}{}
}
}
return blobs, commitments, proofs, nil
}
// AvailableBlobs returns the number of blobs that are available in the subpool.

View file

@ -26,6 +26,7 @@ import (
"math/big"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
@ -50,6 +51,7 @@ var (
testBlobCommits []kzg4844.Commitment
testBlobProofs []kzg4844.Proof
testBlobVHashes [][32]byte
testBlobIndices = make(map[[32]byte]int)
)
const testMaxBlobsPerBlock = 6
@ -66,6 +68,7 @@ func init() {
testBlobProofs = append(testBlobProofs, testBlobProof)
testBlobVHash := kzg4844.CalcBlobHashV1(sha256.New(), &testBlobCommit)
testBlobIndices[testBlobVHash] = len(testBlobVHashes)
testBlobVHashes = append(testBlobVHashes, testBlobVHash)
}
}
@ -216,7 +219,7 @@ func makeTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64,
// makeMultiBlobTx is a utility method to construct a random blob tx with a
// certain number of blobs in its sidecar.
func makeMultiBlobTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, blobCount int, key *ecdsa.PrivateKey) *types.Transaction {
func makeMultiBlobTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey, version byte) *types.Transaction {
var (
blobs []kzg4844.Blob
blobHashes []common.Hash
@ -224,10 +227,15 @@ func makeMultiBlobTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCa
proofs []kzg4844.Proof
)
for i := 0; i < blobCount; i++ {
blobs = append(blobs, *testBlobs[i])
commitments = append(commitments, testBlobCommits[i])
proofs = append(proofs, testBlobProofs[i])
blobHashes = append(blobHashes, testBlobVHashes[i])
blobs = append(blobs, *testBlobs[blobOffset+i])
commitments = append(commitments, testBlobCommits[blobOffset+i])
if version == types.BlobSidecarVersion0 {
proofs = append(proofs, testBlobProofs[blobOffset+i])
} else {
cellProofs, _ := kzg4844.ComputeCellProofs(testBlobs[blobOffset+i])
proofs = append(proofs, cellProofs...)
}
blobHashes = append(blobHashes, testBlobVHashes[blobOffset+i])
}
blobtx := &types.BlobTx{
ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID),
@ -238,7 +246,7 @@ func makeMultiBlobTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCa
BlobFeeCap: uint256.NewInt(blobFeeCap),
BlobHashes: blobHashes,
Value: uint256.NewInt(100),
Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion0, blobs, commitments, proofs),
Sidecar: types.NewBlobTxSidecar(version, blobs, commitments, proofs),
}
return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx)
}
@ -396,35 +404,21 @@ func verifyPoolInternals(t *testing.T, pool *BlobPool) {
// whatever is in the pool, it can be retrieved correctly.
func verifyBlobRetrievals(t *testing.T, pool *BlobPool) {
// Collect all the blobs tracked by the pool
known := make(map[common.Hash]struct{})
var (
hashes []common.Hash
known = make(map[common.Hash]struct{})
)
for _, txs := range pool.index {
for _, tx := range txs {
for _, vhash := range tx.vhashes {
known[vhash] = struct{}{}
}
hashes = append(hashes, tx.vhashes...)
}
}
// Attempt to retrieve all test blobs
hashes := make([]common.Hash, len(testBlobVHashes))
for i := range testBlobVHashes {
copy(hashes[i][:], testBlobVHashes[i][:])
}
sidecars := pool.GetBlobs(hashes)
var blobs []*kzg4844.Blob
var proofs []*kzg4844.Proof
for idx, sidecar := range sidecars {
if sidecar == nil {
blobs = append(blobs, nil)
proofs = append(proofs, nil)
continue
}
blobHashes := sidecar.BlobHashes()
for i, hash := range blobHashes {
if hash == hashes[idx] {
blobs = append(blobs, &sidecar.Blobs[i])
proofs = append(proofs, &sidecar.Proofs[i])
}
}
blobs, _, proofs, err := pool.GetBlobs(hashes, types.BlobSidecarVersion0)
if err != nil {
t.Fatal(err)
}
// Cross validate what we received vs what we wanted
if len(blobs) != len(hashes) || len(proofs) != len(hashes) {
@ -434,13 +428,12 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) {
for i, hash := range hashes {
// If an item is missing, but shouldn't, error
if blobs[i] == nil || proofs[i] == nil {
if _, ok := known[hash]; ok {
t.Errorf("tracked blob retrieval failed: item %d, hash %x", i, hash)
}
t.Errorf("tracked blob retrieval failed: item %d, hash %x", i, hash)
continue
}
// Item retrieved, make sure it matches the expectation
if *blobs[i] != *testBlobs[i] || *proofs[i] != testBlobProofs[i] {
index := testBlobIndices[hash]
if *blobs[i] != *testBlobs[index] || proofs[i][0] != testBlobProofs[index] {
t.Errorf("retrieved blob or proof mismatch: item %d, hash %x", i, hash)
continue
}
@ -1071,9 +1064,9 @@ func TestChangingSlotterSize(t *testing.T) {
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
addr3 = crypto.PubkeyToAddress(key3.PublicKey)
tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, key1)
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, key2)
tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, key3)
tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0)
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0)
tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0)
blob1, _ = rlp.EncodeToBytes(tx1)
blob2, _ = rlp.EncodeToBytes(tx2)
@ -1191,8 +1184,8 @@ func TestBlobCountLimit(t *testing.T) {
// Attempt to add transactions.
var (
tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, key1)
tx2 = makeMultiBlobTx(0, 1, 800, 70, 7, key2)
tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0)
tx2 = makeMultiBlobTx(0, 1, 800, 70, 7, 0, key2, types.BlobSidecarVersion0)
)
errs := pool.Add([]*types.Transaction{tx1, tx2}, true)
@ -1675,6 +1668,181 @@ func TestAdd(t *testing.T) {
}
}
// TestGetBlobs exercises BlobPool.GetBlobs for both blob sidecar versions:
// it seeds the pool's store with three multi-blob transactions covering
// disjoint slices of the global test blob set, then queries overlapping
// hash ranges and checks the returned blobs and proofs, including the
// error path for hashes the pool does not track.
func TestGetBlobs(t *testing.T) {
	//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))

	// Create a temporary folder for the persistent backend
	storage := t.TempDir()

	os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700)
	store, _ := billy.Open(billy.Options{Path: filepath.Join(storage, pendingTransactionStore)}, newSlotter(params.BlobTxMaxBlobs), nil)

	// Create transactions from a few accounts.
	var (
		key1, _ = crypto.GenerateKey()
		key2, _ = crypto.GenerateKey()
		key3, _ = crypto.GenerateKey()

		addr1 = crypto.PubkeyToAddress(key1.PublicKey)
		addr2 = crypto.PubkeyToAddress(key2.PublicKey)
		addr3 = crypto.PubkeyToAddress(key3.PublicKey)

		// Each tx takes a disjoint window of the shared test blobs; tx2 uses
		// the version-1 (cell proof) sidecar encoding, the others version 0.
		tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0) // [0, 6)
		tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion1)   // [6, 12)
		tx3 = makeMultiBlobTx(0, 1, 800, 110, 6, 12, key3, types.BlobSidecarVersion0) // [12, 18)

		blob1, _ = rlp.EncodeToBytes(tx1)
		blob2, _ = rlp.EncodeToBytes(tx2)
		blob3, _ = rlp.EncodeToBytes(tx3)
	)
	// Write the three txs to store. note: although the store is configured
	// for a blob count of 6, it can also support around ~1mb of call data -
	// all this to say that we aren't using the absolute largest shelf
	// available.
	store.Put(blob1)
	store.Put(blob2)
	store.Put(blob3)
	store.Close()

	// Fund the three senders so the pool accepts their txs on Init.
	statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
	statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
	statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
	statedb.AddBalance(addr3, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
	statedb.Commit(0, true, false)

	// Custom chain config with Cancun active and a raised blob maximum, so
	// all three seeded transactions fit in the pool at once.
	cancunTime := uint64(0)
	config := &params.ChainConfig{
		ChainID:     big.NewInt(1),
		LondonBlock: big.NewInt(0),
		BerlinBlock: big.NewInt(0),
		CancunTime:  &cancunTime,
		BlobScheduleConfig: &params.BlobScheduleConfig{
			Cancun: &params.BlobConfig{
				Target:         12,
				Max:            24,
				UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction,
			},
		},
	}
	chain := &testBlockChain{
		config:  config,
		basefee: uint256.NewInt(1050),
		blobfee: uint256.NewInt(105),
		statedb: statedb,
	}
	// Initialize a pool from the pre-populated store.
	pool := New(Config{Datadir: storage}, chain, nil)
	if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
		t.Fatalf("failed to create blob pool: %v", err)
	}
	// Verify the regular three txs are always available.
	if got := pool.Get(tx1.Hash()); got == nil {
		t.Errorf("expected tx %s from %s in pool", tx1.Hash(), addr1)
	}
	if got := pool.Get(tx2.Hash()); got == nil {
		t.Errorf("expected tx %s from %s in pool", tx2.Hash(), addr2)
	}
	if got := pool.Get(tx3.Hash()); got == nil {
		t.Errorf("expected tx %s from %s in pool", tx3.Hash(), addr3)
	}
	// Each case requests the test blob hashes in [start, limit) at a given
	// sidecar version; the final case requests untracked hashes ([18, 20))
	// and must error.
	cases := []struct {
		start   int
		limit   int
		version byte
		expErr  bool
	}{
		{
			start: 0, limit: 6,
			version: types.BlobSidecarVersion0,
		},
		{
			start: 0, limit: 6,
			version: types.BlobSidecarVersion1,
		},
		{
			start: 3, limit: 9,
			version: types.BlobSidecarVersion0,
		},
		{
			start: 3, limit: 9,
			version: types.BlobSidecarVersion1,
		},
		{
			start: 3, limit: 15,
			version: types.BlobSidecarVersion0,
		},
		{
			start: 3, limit: 15,
			version: types.BlobSidecarVersion1,
		},
		{
			start: 0, limit: 18,
			version: types.BlobSidecarVersion0,
		},
		{
			start: 0, limit: 18,
			version: types.BlobSidecarVersion1,
		},
		{
			start: 18, limit: 20,
			version: types.BlobSidecarVersion0,
			expErr:  true,
		},
	}
	for i, c := range cases {
		var vhashes []common.Hash
		for j := c.start; j < c.limit; j++ {
			vhashes = append(vhashes, testBlobVHashes[j])
		}
		blobs, _, proofs, err := pool.GetBlobs(vhashes, c.version)
		if c.expErr {
			if err == nil {
				t.Errorf("Unexpected return, want error for case %d", i)
			}
		} else {
			if err != nil {
				t.Errorf("Unexpected error for case %d, %v", i, err)
			}
			// Cross validate what we received vs what we wanted
			length := c.limit - c.start
			if len(blobs) != length || len(proofs) != length {
				t.Errorf("retrieved blobs/proofs size mismatch: have %d/%d, want %d", len(blobs), len(proofs), length)
				continue
			}
			for j := 0; j < len(blobs); j++ {
				// If an item is missing, but shouldn't, error
				if blobs[j] == nil || proofs[j] == nil {
					t.Errorf("tracked blob retrieval failed: item %d, hash %x", j, vhashes[j])
					continue
				}
				// Item retrieved, make sure the blob matches the expectation
				if *blobs[j] != *testBlobs[c.start+j] {
					t.Errorf("retrieved blob mismatch: item %d, hash %x", j, vhashes[j])
					continue
				}
				// Item retrieved, make sure the proof matches the expectation.
				// Version 0 returns a single blob proof; version 1 returns the
				// full cell proof list, recomputed here for comparison.
				if c.version == types.BlobSidecarVersion0 {
					if proofs[j][0] != testBlobProofs[c.start+j] {
						t.Errorf("retrieved proof mismatch: item %d, hash %x", j, vhashes[j])
					}
				} else {
					want, _ := kzg4844.ComputeCellProofs(blobs[j])
					if !reflect.DeepEqual(want, proofs[j]) {
						t.Errorf("retrieved proof mismatch: item %d, hash %x", j, vhashes[j])
					}
				}
			}
		}
	}
	pool.Close()
}
// fakeBilly is a billy.Database implementation which just drops data on the floor.
type fakeBilly struct {
billy.Database

View file

@ -18,7 +18,6 @@
package catalyst
import (
"crypto/sha256"
"errors"
"fmt"
"strconv"
@ -31,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/internal/version"
@ -120,10 +118,13 @@ var caps = []string{
var (
// Number of blobs requested via getBlobsV2
getBlobsRequestedCounter = metrics.NewRegisteredCounter("engine/getblobs/requested", nil)
// Number of blobs requested via getBlobsV2 that are present in the blobpool
getBlobsAvailableCounter = metrics.NewRegisteredCounter("engine/getblobs/available", nil)
// Number of times getBlobsV2 responded with “hit”
getBlobsV2RequestHit = metrics.NewRegisteredCounter("engine/getblobs/hit", nil)
// Number of times getBlobsV2 responded with “miss”
getBlobsV2RequestMiss = metrics.NewRegisteredCounter("engine/getblobs/miss", nil)
)
@ -494,29 +495,15 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo
if len(hashes) > 128 {
return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes)))
}
var (
res = make([]*engine.BlobAndProofV1, len(hashes))
hasher = sha256.New()
index = make(map[common.Hash]int)
sidecars = api.eth.BlobTxPool().GetBlobs(hashes)
)
for i, hash := range hashes {
index[hash] = i
blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion0)
if err != nil {
return nil, engine.InvalidParams.With(err)
}
for i, sidecar := range sidecars {
if res[i] != nil || sidecar == nil {
// already filled
continue
}
for cIdx, commitment := range sidecar.Commitments {
computed := kzg4844.CalcBlobHashV1(hasher, &commitment)
if idx, ok := index[computed]; ok {
res[idx] = &engine.BlobAndProofV1{
Blob: sidecar.Blobs[cIdx][:],
Proof: sidecar.Proofs[cIdx][:],
}
}
res := make([]*engine.BlobAndProofV1, len(hashes))
for i := 0; i < len(blobs); i++ {
res[i] = &engine.BlobAndProofV1{
Blob: blobs[i][:],
Proof: proofs[i][0][:],
}
}
return res, nil
@ -538,47 +525,19 @@ func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProo
}
getBlobsV2RequestHit.Inc(1)
// pull up the blob hashes
var (
res = make([]*engine.BlobAndProofV2, len(hashes))
index = make(map[common.Hash][]int)
sidecars = api.eth.BlobTxPool().GetBlobs(hashes)
)
for i, hash := range hashes {
index[hash] = append(index[hash], i)
blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion1)
if err != nil {
return nil, engine.InvalidParams.With(err)
}
for i, sidecar := range sidecars {
if res[i] != nil {
// already filled
continue
res := make([]*engine.BlobAndProofV2, len(hashes))
for i := 0; i < len(blobs); i++ {
var cellProofs []hexutil.Bytes
for _, proof := range proofs[i] {
cellProofs = append(cellProofs, proof[:])
}
if sidecar == nil {
// not found, return empty response
return nil, nil
}
if sidecar.Version != types.BlobSidecarVersion1 {
log.Info("GetBlobs queried V0 transaction: index %v, blobhashes %v", index, sidecar.BlobHashes())
return nil, nil
}
blobHashes := sidecar.BlobHashes()
for bIdx, hash := range blobHashes {
if idxes, ok := index[hash]; ok {
proofs, err := sidecar.CellProofsAt(bIdx)
if err != nil {
return nil, engine.InvalidParams.With(err)
}
var cellProofs []hexutil.Bytes
for _, proof := range proofs {
cellProofs = append(cellProofs, proof[:])
}
for _, idx := range idxes {
res[idx] = &engine.BlobAndProofV2{
Blob: sidecar.Blobs[bIdx][:],
CellProofs: cellProofs,
}
}
}
res[i] = &engine.BlobAndProofV2{
Blob: blobs[i][:],
CellProofs: cellProofs,
}
}
return res, nil