core/txpool/blobpool: remove legacy sidecar conversion (#33352)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

This PR removes the legacy sidecar conversion logic.

After the Osaka fork, the blobpool will accept only blob sidecar version
1.
Any remaining version 0 blob transactions, if they still exist, will no
longer
be eligible for inclusion.

Note that conversion at the RPC layer is still supported, and version 0
blob
transactions will be automatically converted to version 1 there.
This commit is contained in:
rjl493456442 2025-12-19 03:33:07 +08:00 committed by GitHub
parent ffe9dc97e5
commit bd77b77ede
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 30 additions and 907 deletions

View file

@ -21,12 +21,10 @@ import (
"container/heap" "container/heap"
"errors" "errors"
"fmt" "fmt"
"maps"
"math" "math"
"math/big" "math/big"
"os" "os"
"path/filepath" "path/filepath"
"slices"
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -96,11 +94,6 @@ const (
// storeVersion is the current slotter layout used for the billy.Database // storeVersion is the current slotter layout used for the billy.Database
// store. // store.
storeVersion = 1 storeVersion = 1
// conversionTimeWindow defines the period after the Osaka fork during which
// the pool will still accept and convert legacy blob transactions. After this
// window, all legacy blob transactions will be rejected.
conversionTimeWindow = time.Hour * 2
) )
// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and // blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
@ -337,9 +330,8 @@ type BlobPool struct {
stored uint64 // Useful data size of all transactions on disk stored uint64 // Useful data size of all transactions on disk
limbo *limbo // Persistent data store for the non-finalized blobs limbo *limbo // Persistent data store for the non-finalized blobs
signer types.Signer // Transaction signer to use for sender recovery signer types.Signer // Transaction signer to use for sender recovery
chain BlockChain // Chain object to access the state through chain BlockChain // Chain object to access the state through
cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka)
head atomic.Pointer[types.Header] // Current head of the chain head atomic.Pointer[types.Header] // Current head of the chain
state *state.StateDB // Current state at the head of the chain state *state.StateDB // Current state at the head of the chain
@ -368,7 +360,6 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo
hasPendingAuth: hasPendingAuth, hasPendingAuth: hasPendingAuth,
signer: types.LatestSigner(chain.Config()), signer: types.LatestSigner(chain.Config()),
chain: chain, chain: chain,
cQueue: newConversionQueue(), // Deprecate it after the osaka fork
lookup: newLookup(), lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta), index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int), spent: make(map[common.Address]*uint256.Int),
@ -490,9 +481,6 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
// Close closes down the underlying persistent store. // Close closes down the underlying persistent store.
func (p *BlobPool) Close() error { func (p *BlobPool) Close() error {
// Terminate the conversion queue
p.cQueue.close()
var errs []error var errs []error
if p.limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set if p.limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set
if err := p.limbo.Close(); err != nil { if err := p.limbo.Close(); err != nil {
@ -890,172 +878,6 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
basefeeGauge.Update(int64(basefee.Uint64())) basefeeGauge.Update(int64(basefee.Uint64()))
blobfeeGauge.Update(int64(blobfee.Uint64())) blobfeeGauge.Update(int64(blobfee.Uint64()))
p.updateStorageMetrics() p.updateStorageMetrics()
// Perform the conversion logic at the fork boundary
if !p.chain.Config().IsOsaka(oldHead.Number, oldHead.Time) && p.chain.Config().IsOsaka(newHead.Number, newHead.Time) {
// Deep copy all indexed transaction metadata.
var (
ids = make(map[common.Address]map[uint64]uint64)
txs = make(map[common.Address]map[uint64]common.Hash)
)
for sender, list := range p.index {
ids[sender] = make(map[uint64]uint64)
txs[sender] = make(map[uint64]common.Hash)
for _, m := range list {
ids[sender][m.nonce] = m.id
txs[sender][m.nonce] = m.hash
}
}
// Initiate the background conversion thread.
p.cQueue.launchBillyConversion(func() {
p.convertLegacySidecars(ids, txs)
})
}
}
// compareAndSwap checks if the specified transaction is still tracked in the pool
// and replace the metadata accordingly. It should only be used in the fork boundary
// bulk conversion. If it fails for some reason, the subsequent txs won't be dropped
// for simplicity which we assume it's very likely to happen.
//
// The returned flag indicates whether the replacement succeeded.
func (p *BlobPool) compareAndSwap(address common.Address, hash common.Hash, blob []byte, oldID uint64, oldStorageSize uint32) bool {
p.lock.Lock()
defer p.lock.Unlock()
newId, err := p.store.Put(blob)
if err != nil {
log.Error("Failed to store transaction", "hash", hash, "err", err)
return false
}
newSize := uint64(len(blob))
newStorageSize := p.store.Size(newId)
// Terminate the procedure if the transaction was already evicted. The
// newly added blob should be removed before return.
if !p.lookup.update(hash, newId, newSize) {
if derr := p.store.Delete(newId); derr != nil {
log.Error("Failed to delete the dangling blob tx", "err", derr)
} else {
log.Warn("Deleted the dangling blob tx", "id", newId)
}
return false
}
// Update the metadata of blob transaction
for _, meta := range p.index[address] {
if meta.hash == hash {
meta.id = newId
meta.version = types.BlobSidecarVersion1
meta.storageSize = newStorageSize
meta.size = newSize
p.stored += uint64(newStorageSize)
p.stored -= uint64(oldStorageSize)
break
}
}
if err := p.store.Delete(oldID); err != nil {
log.Error("Failed to delete the legacy transaction", "hash", hash, "id", oldID, "err", err)
}
return true
}
// convertLegacySidecar fetches transaction data from the store, performs an
// on-the-fly conversion. This function is intended for use only during the
// Osaka fork transition period.
//
// The returned flag indicates whether the replacement succeeds or not.
func (p *BlobPool) convertLegacySidecar(sender common.Address, hash common.Hash, id uint64) bool {
start := time.Now()
// Retrieves the legacy blob transaction from the underlying store with
// read lock held, preventing any potential data race around the slot
// specified by the id.
p.lock.RLock()
data, err := p.store.Get(id)
if err != nil {
p.lock.RUnlock()
// The transaction may have been evicted simultaneously, safe to skip conversion.
log.Debug("Blob transaction is missing", "hash", hash, "id", id, "err", err)
return false
}
oldStorageSize := p.store.Size(id)
p.lock.RUnlock()
// Decode the transaction, the failure is not expected and report the error
// loudly if possible. If the blob transaction in this slot is corrupted,
// leave it in the store, it will be dropped during the next pool
// initialization.
var tx types.Transaction
if err = rlp.DecodeBytes(data, &tx); err != nil {
log.Error("Blob transaction is corrupted", "hash", hash, "id", id, "err", err)
return false
}
// Skip conversion if the transaction does not match the expected hash, or if it was
// already converted. This can occur if the original transaction was evicted from the
// pool and the slot was reused by a new one.
if tx.Hash() != hash {
log.Warn("Blob transaction was replaced", "hash", hash, "id", id, "stored", tx.Hash())
return false
}
sc := tx.BlobTxSidecar()
if sc.Version >= types.BlobSidecarVersion1 {
log.Debug("Skipping conversion of blob tx", "hash", hash, "id", id)
return false
}
// Perform the sidecar conversion, the failure is not expected and report the error
// loudly if possible.
if err := tx.BlobTxSidecar().ToV1(); err != nil {
log.Error("Failed to convert blob transaction", "hash", hash, "err", err)
return false
}
// Encode the converted transaction, the failure is not expected and report
// the error loudly if possible.
blob, err := rlp.EncodeToBytes(&tx)
if err != nil {
log.Error("Failed to encode blob transaction", "hash", tx.Hash(), "err", err)
return false
}
// Replace the legacy blob transaction with the converted format.
if !p.compareAndSwap(sender, hash, blob, id, oldStorageSize) {
log.Error("Failed to replace the legacy transaction", "hash", hash)
return false
}
log.Debug("Converted legacy blob transaction", "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
return true
}
// convertLegacySidecars converts all given transactions to sidecar version 1.
//
// If any of them fails to be converted, the subsequent transactions will still
// be processed, as we assume the failure is very unlikely to happen. If happens,
// these transactions will be stuck in the pool until eviction.
func (p *BlobPool) convertLegacySidecars(ids map[common.Address]map[uint64]uint64, txs map[common.Address]map[uint64]common.Hash) {
var (
start = time.Now()
success int
failure int
)
for addr, list := range txs {
// Transactions evicted from the pool must be contiguous, if in any case,
// the transactions are gapped with each other, they will be discarded.
nonces := slices.Collect(maps.Keys(list))
slices.Sort(nonces)
// Convert the txs with nonce order
for _, nonce := range nonces {
if p.convertLegacySidecar(addr, list[nonce], ids[addr][nonce]) {
success++
} else {
failure++
}
}
}
log.Info("Completed blob transaction conversion", "discarded", failure, "injected", success, "elapsed", common.PrettyDuration(time.Since(start)))
} }
// reorg assembles all the transactors and missing transactions between an old // reorg assembles all the transactors and missing transactions between an old
@ -1535,8 +1357,8 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata {
// //
// The version argument specifies the type of proofs to return, either the // The version argument specifies the type of proofs to return, either the
// blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is // blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is
// CPU intensive, so only done if explicitly requested with the convert flag. // CPU intensive and prohibited in the blobpool explicitly.
func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte, convert bool) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) { func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) {
var ( var (
blobs = make([]*kzg4844.Blob, len(vhashes)) blobs = make([]*kzg4844.Blob, len(vhashes))
commitments = make([]kzg4844.Commitment, len(vhashes)) commitments = make([]kzg4844.Commitment, len(vhashes))
@ -1587,7 +1409,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte, convert bool) (
} }
// Mark hash as seen. // Mark hash as seen.
filled[hash] = struct{}{} filled[hash] = struct{}{}
if sidecar.Version != version && !convert { if sidecar.Version != version {
// Skip blobs with incompatible version. Note we still track the blob hash // Skip blobs with incompatible version. Note we still track the blob hash
// in `filled` here, ensuring that we do not resolve this tx another time. // in `filled` here, ensuring that we do not resolve this tx another time.
continue continue
@ -1596,29 +1418,13 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte, convert bool) (
var pf []kzg4844.Proof var pf []kzg4844.Proof
switch version { switch version {
case types.BlobSidecarVersion0: case types.BlobSidecarVersion0:
if sidecar.Version == types.BlobSidecarVersion0 { pf = []kzg4844.Proof{sidecar.Proofs[i]}
pf = []kzg4844.Proof{sidecar.Proofs[i]}
} else {
proof, err := kzg4844.ComputeBlobProof(&sidecar.Blobs[i], sidecar.Commitments[i])
if err != nil {
return nil, nil, nil, err
}
pf = []kzg4844.Proof{proof}
}
case types.BlobSidecarVersion1: case types.BlobSidecarVersion1:
if sidecar.Version == types.BlobSidecarVersion0 { cellProofs, err := sidecar.CellProofsAt(i)
cellProofs, err := kzg4844.ComputeCellProofs(&sidecar.Blobs[i]) if err != nil {
if err != nil { return nil, nil, nil, err
return nil, nil, nil, err
}
pf = cellProofs
} else {
cellProofs, err := sidecar.CellProofsAt(i)
if err != nil {
return nil, nil, nil, err
}
pf = cellProofs
} }
pf = cellProofs
} }
for _, index := range list { for _, index := range list {
blobs[index] = &sidecar.Blobs[i] blobs[index] = &sidecar.Blobs[i]
@ -1645,56 +1451,15 @@ func (p *BlobPool) AvailableBlobs(vhashes []common.Hash) int {
return available return available
} }
// preCheck performs the static validation upon the provided tx and converts
// the legacy sidecars if Osaka fork has been activated with a short time window.
//
// This function is pure static and lock free.
func (p *BlobPool) preCheck(tx *types.Transaction) error {
var (
head = p.head.Load()
isOsaka = p.chain.Config().IsOsaka(head.Number, head.Time)
deadline time.Time
)
if isOsaka {
deadline = time.Unix(int64(*p.chain.Config().OsakaTime), 0).Add(conversionTimeWindow)
}
// Validate the transaction statically at first to avoid unnecessary
// conversion. This step doesn't require lock protection.
if err := p.ValidateTxBasics(tx); err != nil {
return err
}
// Before the Osaka fork, reject the blob txs with cell proofs
if !isOsaka {
if tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
return nil
} else {
return errors.New("cell proof is not supported yet")
}
}
// After the Osaka fork, reject the legacy blob txs if the conversion
// time window is passed.
if tx.BlobTxSidecar().Version == types.BlobSidecarVersion1 {
return nil
}
if head.Time > uint64(deadline.Unix()) {
return errors.New("legacy blob tx is not supported")
}
// Convert the legacy sidecar after Osaka fork. This could be a long
// procedure which takes a few seconds, even minutes if there is a long
// queue. Fortunately it will only block the routine of the source peer
// announcing the tx, without affecting other parts.
return p.cQueue.convert(tx)
}
// Add inserts a set of blob transactions into the pool if they pass validation (both // Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions). // consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error { func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
var ( var (
errs []error = make([]error, len(txs)) errs = make([]error, len(txs))
adds = make([]*types.Transaction, 0, len(txs)) adds = make([]*types.Transaction, 0, len(txs))
) )
for i, tx := range txs { for i, tx := range txs {
if errs[i] = p.preCheck(tx); errs[i] != nil { if errs[i] = p.ValidateTxBasics(tx); errs[i] != nil {
continue continue
} }
if errs[i] = p.add(tx); errs[i] == nil { if errs[i] = p.add(tx); errs[i] == nil {

View file

@ -92,10 +92,6 @@ type testBlockChain struct {
blockTime *uint64 blockTime *uint64
} }
func (bc *testBlockChain) setHeadTime(time uint64) {
bc.blockTime = &time
}
func (bc *testBlockChain) Config() *params.ChainConfig { func (bc *testBlockChain) Config() *params.ChainConfig {
return bc.config return bc.config
} }
@ -433,11 +429,11 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) {
hashes = append(hashes, tx.vhashes...) hashes = append(hashes, tx.vhashes...)
} }
} }
blobs1, _, proofs1, err := pool.GetBlobs(hashes, types.BlobSidecarVersion0, false) blobs1, _, proofs1, err := pool.GetBlobs(hashes, types.BlobSidecarVersion0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
blobs2, _, proofs2, err := pool.GetBlobs(hashes, types.BlobSidecarVersion1, false) blobs2, _, proofs2, err := pool.GetBlobs(hashes, types.BlobSidecarVersion1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1329,7 +1325,7 @@ func TestBlobCountLimit(t *testing.T) {
// Check that first succeeds second fails. // Check that first succeeds second fails.
if errs[0] != nil { if errs[0] != nil {
t.Fatalf("expected tx with 7 blobs to succeed") t.Fatalf("expected tx with 7 blobs to succeed, got %v", errs[0])
} }
if !errors.Is(errs[1], txpool.ErrTxBlobLimitExceeded) { if !errors.Is(errs[1], txpool.ErrTxBlobLimitExceeded) {
t.Fatalf("expected tx with 8 blobs to fail, got: %v", errs[1]) t.Fatalf("expected tx with 8 blobs to fail, got: %v", errs[1])
@ -1806,66 +1802,6 @@ func TestAdd(t *testing.T) {
} }
} }
// Tests that transactions with legacy sidecars are accepted within the
// conversion window but rejected after it has passed.
func TestAddLegacyBlobTx(t *testing.T) {
testAddLegacyBlobTx(t, true) // conversion window has not yet passed
testAddLegacyBlobTx(t, false) // conversion window passed
}
func testAddLegacyBlobTx(t *testing.T, accept bool) {
var (
key1, _ = crypto.GenerateKey()
key2, _ = crypto.GenerateKey()
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
)
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
statedb.Commit(0, true, false)
chain := &testBlockChain{
config: params.MergedTestChainConfig,
basefee: uint256.NewInt(1050),
blobfee: uint256.NewInt(105),
statedb: statedb,
}
var timeDiff uint64
if accept {
timeDiff = uint64(conversionTimeWindow.Seconds()) - 1
} else {
timeDiff = uint64(conversionTimeWindow.Seconds()) + 1
}
time := *params.MergedTestChainConfig.OsakaTime + timeDiff
chain.setHeadTime(time)
pool := New(Config{Datadir: t.TempDir()}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
// Attempt to add legacy blob transactions.
var (
tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0)
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion0)
txs = []*types.Transaction{tx1, tx2}
)
errs := pool.Add(txs, true)
for _, err := range errs {
if accept && err != nil {
t.Fatalf("expected tx add to succeed, %v", err)
}
if !accept && err == nil {
t.Fatal("expected tx add to fail")
}
}
verifyPoolInternals(t, pool)
pool.Close()
}
func TestGetBlobs(t *testing.T) { func TestGetBlobs(t *testing.T) {
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true))) //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))
@ -1952,7 +1888,6 @@ func TestGetBlobs(t *testing.T) {
limit int limit int
fillRandom bool // Whether to randomly fill some of the requested blobs with unknowns fillRandom bool // Whether to randomly fill some of the requested blobs with unknowns
version byte // Blob sidecar version to request version byte // Blob sidecar version to request
convert bool // Whether to convert version on retrieval
}{ }{
{ {
start: 0, limit: 6, start: 0, limit: 6,
@ -2018,11 +1953,6 @@ func TestGetBlobs(t *testing.T) {
start: 0, limit: 18, fillRandom: true, start: 0, limit: 18, fillRandom: true,
version: types.BlobSidecarVersion1, version: types.BlobSidecarVersion1,
}, },
{
start: 0, limit: 18, fillRandom: true,
version: types.BlobSidecarVersion1,
convert: true, // Convert some version 0 blobs to version 1 while retrieving
},
} }
for i, c := range cases { for i, c := range cases {
var ( var (
@ -2044,7 +1974,7 @@ func TestGetBlobs(t *testing.T) {
filled[len(vhashes)] = struct{}{} filled[len(vhashes)] = struct{}{}
vhashes = append(vhashes, testrand.Hash()) vhashes = append(vhashes, testrand.Hash())
} }
blobs, _, proofs, err := pool.GetBlobs(vhashes, c.version, c.convert) blobs, _, proofs, err := pool.GetBlobs(vhashes, c.version)
if err != nil { if err != nil {
t.Errorf("Unexpected error for case %d, %v", i, err) t.Errorf("Unexpected error for case %d, %v", i, err)
} }
@ -2070,8 +2000,7 @@ func TestGetBlobs(t *testing.T) {
// If an item is missing, but shouldn't, error // If an item is missing, but shouldn't, error
if blobs[j] == nil || proofs[j] == nil { if blobs[j] == nil || proofs[j] == nil {
// This is only an error if there was no version mismatch // This is only an error if there was no version mismatch
if c.convert || if (c.version == types.BlobSidecarVersion1 && 6 <= testBlobIndex && testBlobIndex < 12) ||
(c.version == types.BlobSidecarVersion1 && 6 <= testBlobIndex && testBlobIndex < 12) ||
(c.version == types.BlobSidecarVersion0 && (testBlobIndex < 6 || 12 <= testBlobIndex)) { (c.version == types.BlobSidecarVersion0 && (testBlobIndex < 6 || 12 <= testBlobIndex)) {
t.Errorf("tracked blob retrieval failed: item %d, hash %x", j, vhashes[j]) t.Errorf("tracked blob retrieval failed: item %d, hash %x", j, vhashes[j])
} }
@ -2098,185 +2027,6 @@ func TestGetBlobs(t *testing.T) {
pool.Close() pool.Close()
} }
// TestSidecarConversion will verify that after the Osaka fork, all legacy
// sidecars in the pool are successfully convert to v1 sidecars.
func TestSidecarConversion(t *testing.T) {
// log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))
// Create a temporary folder for the persistent backend
storage := t.TempDir()
os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700)
var (
preOsakaTxs = make(types.Transactions, 10)
postOsakaTxs = make(types.Transactions, 3)
keys = make([]*ecdsa.PrivateKey, len(preOsakaTxs)+len(postOsakaTxs))
addrs = make([]common.Address, len(preOsakaTxs)+len(postOsakaTxs))
statedb, _ = state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
)
for i := range keys {
keys[i], _ = crypto.GenerateKey()
addrs[i] = crypto.PubkeyToAddress(keys[i].PublicKey)
statedb.AddBalance(addrs[i], uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
}
for i := range preOsakaTxs {
preOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 2, 0, keys[i], types.BlobSidecarVersion0)
}
for i := range postOsakaTxs {
if i == 0 {
// First has a v0 sidecar.
postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion0)
}
postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion1)
}
statedb.Commit(0, true, false)
// Test plan:
// 1) Create a bunch v0 sidecar txs and add to pool before Osaka.
// 2) Pass in new Osaka header to activate the conversion thread.
// 3) Continue adding both v0 and v1 transactions to the pool.
// 4) Verify that as additional blocks come in, transactions involved in the
// migration are correctly discarded.
config := &params.ChainConfig{
ChainID: big.NewInt(1),
LondonBlock: big.NewInt(0),
BerlinBlock: big.NewInt(0),
CancunTime: newUint64(0),
PragueTime: newUint64(0),
OsakaTime: newUint64(1),
BlobScheduleConfig: params.DefaultBlobSchedule,
}
chain := &testBlockChain{
config: config,
basefee: uint256.NewInt(1050),
blobfee: uint256.NewInt(105),
statedb: statedb,
blocks: make(map[uint64]*types.Block),
}
// Create 3 blocks:
// - the current block, before Osaka
// - the first block after Osaka
// - another post-Osaka block with several transactions in it
header0 := chain.CurrentBlock()
header0.Time = 0
chain.blocks[0] = types.NewBlockWithHeader(header0)
header1 := chain.CurrentBlock()
header1.Number = big.NewInt(1)
header1.Time = 1
chain.blocks[1] = types.NewBlockWithHeader(header1)
header2 := chain.CurrentBlock()
header2.Time = 2
header2.Number = big.NewInt(2)
// Make a copy of one of the pre-Osaka transactions and convert it to v1 here
// so that we can add it to the pool later and ensure a duplicate is not added
// by the conversion queue.
tx := preOsakaTxs[len(preOsakaTxs)-1]
sc := *tx.BlobTxSidecar() // copy sidecar
sc.ToV1()
tx.WithBlobTxSidecar(&sc)
block2 := types.NewBlockWithHeader(header2).WithBody(types.Body{Transactions: append(postOsakaTxs, tx)})
chain.blocks[2] = block2
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, header0, newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
errs := pool.Add(preOsakaTxs, true)
for i, err := range errs {
if err != nil {
t.Errorf("failed to insert blob tx from %s: %s", addrs[i], errs[i])
}
}
// Kick off migration.
pool.Reset(header0, header1)
// Add the v0 sidecar tx, but don't block so we can keep doing other stuff
// while it converts the sidecar.
addDone := make(chan struct{})
go func() {
pool.Add(types.Transactions{postOsakaTxs[0]}, false)
close(addDone)
}()
// Add the post-Osaka v1 sidecar txs.
errs = pool.Add(postOsakaTxs[1:], false)
for _, err := range errs {
if err != nil {
t.Fatalf("expected tx add to succeed: %v", err)
}
}
// Wait for the first tx's conversion to complete, then check that all
// transactions added after Osaka can be accounted for in the pool.
<-addDone
pending := pool.Pending(txpool.PendingFilter{BlobTxs: true, BlobVersion: types.BlobSidecarVersion1})
for _, tx := range postOsakaTxs {
from, _ := pool.signer.Sender(tx)
if len(pending[from]) != 1 || pending[from][0].Hash != tx.Hash() {
t.Fatalf("expected post-Osaka txs to be pending")
}
}
// Now update the pool with the next block. This should cause the pool to
// clear out the post-Osaka txs since they were included in block 2. Since the
// test blockchain doesn't manage nonces, we'll just do that manually before
// the reset is called. Don't forget about the pre-Osaka transaction we also
// added to block 2!
for i := range postOsakaTxs {
statedb.SetNonce(addrs[len(preOsakaTxs)+i], 1, tracing.NonceChangeEoACall)
}
statedb.SetNonce(addrs[len(preOsakaTxs)-1], 1, tracing.NonceChangeEoACall)
pool.Reset(header1, block2.Header())
// Now verify no post-Osaka transactions are tracked by the pool.
for i, tx := range postOsakaTxs {
if pool.Get(tx.Hash()) != nil {
t.Fatalf("expected txs added post-osaka to have been placed in limbo due to inclusion in a block: index %d, hash %s", i, tx.Hash())
}
}
// Wait for the pool migration to complete.
<-pool.cQueue.anyBillyConversionDone
// Verify all transactions in the pool were converted and verify the
// subsequent cell proofs.
count, _ := pool.Stats()
if count != len(preOsakaTxs)-1 {
t.Errorf("expected pending count to match initial tx count: pending=%d, expected=%d", count, len(preOsakaTxs)-1)
}
for addr, acc := range pool.index {
for _, m := range acc {
if m.version != types.BlobSidecarVersion1 {
t.Errorf("expected sidecar to have been converted: from %s, hash %s", addr, m.hash)
}
tx := pool.Get(m.hash)
if tx == nil {
t.Errorf("failed to get tx by hash: %s", m.hash)
}
sc := tx.BlobTxSidecar()
if err := kzg4844.VerifyCellProofs(sc.Blobs, sc.Commitments, sc.Proofs); err != nil {
t.Errorf("failed to verify cell proofs for tx %s after conversion: %s", m.hash, err)
}
}
}
verifyPoolInternals(t, pool)
// Launch conversion a second time.
// This is just a sanity check to ensure we can handle it.
pool.Reset(header0, header1)
pool.Close()
}
// fakeBilly is a billy.Database implementation which just drops data on the floor. // fakeBilly is a billy.Database implementation which just drops data on the floor.
type fakeBilly struct { type fakeBilly struct {
billy.Database billy.Database
@ -2360,5 +2110,3 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
} }
} }
} }
func newUint64(val uint64) *uint64 { return &val }

View file

@ -1,218 +0,0 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"errors"
"slices"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// maxPendingConversionTasks caps the number of pending conversion tasks. This
// prevents excessive memory usage; the worst-case scenario (2k transactions
// with 6 blobs each) would consume approximately 1.5GB of memory.
const maxPendingConversionTasks = 2048
// txConvert represents a conversion task with an attached legacy blob transaction.
type txConvert struct {
tx *types.Transaction // Legacy blob transaction
done chan error // Channel for signaling back if the conversion succeeds
}
// conversionQueue is a dedicated queue for converting legacy blob transactions
// received from the network after the Osaka fork. Since conversion is expensive,
// it is performed in the background by a single thread, ensuring the main Geth
// process is not overloaded.
type conversionQueue struct {
tasks chan *txConvert
startBilly chan func()
quit chan struct{}
closed chan struct{}
billyQueue []func()
billyTaskDone chan struct{}
// This channel will be closed when the first billy conversion finishes.
// It's added for unit tests to synchronize with the conversion progress.
anyBillyConversionDone chan struct{}
}
// newConversionQueue constructs the conversion queue.
func newConversionQueue() *conversionQueue {
q := &conversionQueue{
tasks: make(chan *txConvert),
startBilly: make(chan func()),
quit: make(chan struct{}),
closed: make(chan struct{}),
anyBillyConversionDone: make(chan struct{}),
}
go q.loop()
return q
}
// convert accepts a legacy blob transaction with version-0 blobs and queues it
// for conversion.
//
// This function may block for a long time until the transaction is processed.
func (q *conversionQueue) convert(tx *types.Transaction) error {
done := make(chan error, 1)
select {
case q.tasks <- &txConvert{tx: tx, done: done}:
return <-done
case <-q.closed:
return errors.New("conversion queue closed")
}
}
// launchBillyConversion starts a conversion task in the background.
func (q *conversionQueue) launchBillyConversion(fn func()) error {
select {
case q.startBilly <- fn:
return nil
case <-q.closed:
return errors.New("conversion queue closed")
}
}
// close terminates the conversion queue.
func (q *conversionQueue) close() {
select {
case <-q.closed:
return
default:
close(q.quit)
<-q.closed
}
}
// run converts a batch of legacy blob txs to the new cell proof format.
func (q *conversionQueue) run(tasks []*txConvert, done chan struct{}, interrupt *atomic.Int32) {
defer close(done)
for _, t := range tasks {
if interrupt != nil && interrupt.Load() != 0 {
t.done <- errors.New("conversion is interrupted")
continue
}
sidecar := t.tx.BlobTxSidecar()
if sidecar == nil {
t.done <- errors.New("tx without sidecar")
continue
}
// Run the conversion, the original sidecar will be mutated in place
start := time.Now()
err := sidecar.ToV1()
t.done <- err
log.Trace("Converted legacy blob tx", "hash", t.tx.Hash(), "err", err, "elapsed", common.PrettyDuration(time.Since(start)))
}
}
func (q *conversionQueue) loop() {
defer close(q.closed)
var (
done chan struct{} // Non-nil if background routine is active
interrupt *atomic.Int32 // Flag to signal conversion interruption
// The pending tasks for sidecar conversion. We assume the number of legacy
// blob transactions requiring conversion will not be excessive. However,
// a hard cap is applied as a protective measure.
txTasks []*txConvert
firstBilly = true
)
for {
select {
case t := <-q.tasks:
if len(txTasks) >= maxPendingConversionTasks {
t.done <- errors.New("conversion queue is overloaded")
continue
}
txTasks = append(txTasks, t)
// Launch the background conversion thread if it's idle
if done == nil {
done, interrupt = make(chan struct{}), new(atomic.Int32)
tasks := slices.Clone(txTasks)
txTasks = txTasks[:0]
go q.run(tasks, done, interrupt)
}
case <-done:
done, interrupt = nil, nil
if len(txTasks) > 0 {
done, interrupt = make(chan struct{}), new(atomic.Int32)
tasks := slices.Clone(txTasks)
txTasks = txTasks[:0]
go q.run(tasks, done, interrupt)
}
case fn := <-q.startBilly:
q.billyQueue = append(q.billyQueue, fn)
q.runNextBillyTask()
case <-q.billyTaskDone:
if firstBilly {
close(q.anyBillyConversionDone)
firstBilly = false
}
q.runNextBillyTask()
case <-q.quit:
if done != nil {
log.Debug("Waiting for blob proof conversion to exit")
interrupt.Store(1)
<-done
}
if q.billyTaskDone != nil {
log.Debug("Waiting for blobpool billy conversion to exit")
<-q.billyTaskDone
}
// Signal any tasks that were queued for the next batch but never started
// so callers blocked in convert() receive an error instead of hanging.
for _, t := range txTasks {
// Best-effort notify; t.done is a buffered channel of size 1
// created by convert(), and we send exactly once per task.
t.done <- errors.New("conversion queue closed")
}
// Drop references to allow GC of the backing array.
txTasks = txTasks[:0]
return
}
}
}
func (q *conversionQueue) runNextBillyTask() {
if len(q.billyQueue) == 0 {
q.billyTaskDone = nil
return
}
fn := q.billyQueue[0]
q.billyQueue = append(q.billyQueue[:0], q.billyQueue[1:]...)
done := make(chan struct{})
go func() { defer close(done); fn() }()
q.billyTaskDone = done
}

View file

@ -1,171 +0,0 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"crypto/ecdsa"
"crypto/sha256"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)
// createV1BlobTx creates a blob transaction with version 1 sidecar for testing.
func createV1BlobTx(nonce uint64, key *ecdsa.PrivateKey) *types.Transaction {
blob := &kzg4844.Blob{byte(nonce)}
commitment, _ := kzg4844.BlobToCommitment(blob)
cellProofs, _ := kzg4844.ComputeCellProofs(blob)
blobtx := &types.BlobTx{
ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID),
Nonce: nonce,
GasTipCap: uint256.NewInt(1),
GasFeeCap: uint256.NewInt(1000),
Gas: 21000,
BlobFeeCap: uint256.NewInt(100),
BlobHashes: []common.Hash{kzg4844.CalcBlobHashV1(sha256.New(), &commitment)},
Value: uint256.NewInt(100),
Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion1, []kzg4844.Blob{*blob}, []kzg4844.Commitment{commitment}, cellProofs),
}
return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx)
}
func TestConversionQueueBasic(t *testing.T) {
queue := newConversionQueue()
defer queue.close()
key, _ := crypto.GenerateKey()
tx := makeTx(0, 1, 1, 1, key)
if err := queue.convert(tx); err != nil {
t.Fatalf("Expected successful conversion, got error: %v", err)
}
if tx.BlobTxSidecar().Version != types.BlobSidecarVersion1 {
t.Errorf("Expected sidecar version to be %d, got %d", types.BlobSidecarVersion1, tx.BlobTxSidecar().Version)
}
}
func TestConversionQueueV1BlobTx(t *testing.T) {
queue := newConversionQueue()
defer queue.close()
key, _ := crypto.GenerateKey()
tx := createV1BlobTx(0, key)
version := tx.BlobTxSidecar().Version
err := queue.convert(tx)
if err != nil {
t.Fatalf("Expected successful conversion, got error: %v", err)
}
if tx.BlobTxSidecar().Version != version {
t.Errorf("Expected sidecar version to remain %d, got %d", version, tx.BlobTxSidecar().Version)
}
}
func TestConversionQueueClosed(t *testing.T) {
queue := newConversionQueue()
// Close the queue first
queue.close()
key, _ := crypto.GenerateKey()
tx := makeTx(0, 1, 1, 1, key)
err := queue.convert(tx)
if err == nil {
t.Fatal("Expected error when converting on closed queue, got nil")
}
}
func TestConversionQueueDoubleClose(t *testing.T) {
queue := newConversionQueue()
queue.close()
queue.close() // Should not panic
}
func TestConversionQueueAutoRestartBatch(t *testing.T) {
queue := newConversionQueue()
defer queue.close()
key, _ := crypto.GenerateKey()
// Create a heavy transaction to ensure the first batch runs long enough
// for subsequent tasks to be queued while it is active.
heavy := makeMultiBlobTx(0, 1, 1, 1, int(params.BlobTxMaxBlobs), 0, key, types.BlobSidecarVersion0)
var wg sync.WaitGroup
wg.Add(1)
heavyDone := make(chan error, 1)
go func() {
defer wg.Done()
heavyDone <- queue.convert(heavy)
}()
// Give the conversion worker a head start so that the following tasks are
// enqueued while the first batch is running.
time.Sleep(200 * time.Millisecond)
tx1 := makeTx(1, 1, 1, 1, key)
tx2 := makeTx(2, 1, 1, 1, key)
wg.Add(2)
done1 := make(chan error, 1)
done2 := make(chan error, 1)
go func() { defer wg.Done(); done1 <- queue.convert(tx1) }()
go func() { defer wg.Done(); done2 <- queue.convert(tx2) }()
select {
case err := <-done1:
if err != nil {
t.Fatalf("tx1 conversion error: %v", err)
}
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for tx1 conversion")
}
select {
case err := <-done2:
if err != nil {
t.Fatalf("tx2 conversion error: %v", err)
}
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for tx2 conversion")
}
select {
case err := <-heavyDone:
if err != nil {
t.Fatalf("heavy conversion error: %v", err)
}
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for heavy conversion")
}
wg.Wait()
if tx1.BlobTxSidecar().Version != types.BlobSidecarVersion1 {
t.Fatalf("tx1 sidecar version mismatch: have %d, want %d", tx1.BlobTxSidecar().Version, types.BlobSidecarVersion1)
}
if tx2.BlobTxSidecar().Version != types.BlobSidecarVersion1 {
t.Fatalf("tx2 sidecar version mismatch: have %d, want %d", tx2.BlobTxSidecar().Version, types.BlobSidecarVersion1)
}
}

View file

@ -110,13 +110,3 @@ func (l *lookup) untrack(tx *blobTxMeta) {
} }
} }
} }
// update updates the transaction index. It should only be used in the conversion.
func (l *lookup) update(hash common.Hash, id uint64, size uint64) bool {
meta, exists := l.txIndex[hash]
if !exists {
return false
}
meta.id, meta.size = id, size
return true
}

View file

@ -130,7 +130,7 @@ func ValidateTransaction(tx *types.Transaction, head *types.Header, signer types
return fmt.Errorf("%w: gas %v, minimum needed %v", core.ErrIntrinsicGas, tx.Gas(), intrGas) return fmt.Errorf("%w: gas %v, minimum needed %v", core.ErrIntrinsicGas, tx.Gas(), intrGas)
} }
// Ensure the transaction can cover floor data gas. // Ensure the transaction can cover floor data gas.
if opts.Config.IsPrague(head.Number, head.Time) { if rules.IsPrague {
floorDataGas, err := core.FloorDataGas(tx.Data()) floorDataGas, err := core.FloorDataGas(tx.Data())
if err != nil { if err != nil {
return err return err
@ -160,6 +160,15 @@ func validateBlobTx(tx *types.Transaction, head *types.Header, opts *ValidationO
if sidecar == nil { if sidecar == nil {
return errors.New("missing sidecar in blob transaction") return errors.New("missing sidecar in blob transaction")
} }
// Ensure the sidecar is constructed with the correct version, consistent
// with the current fork.
version := types.BlobSidecarVersion0
if opts.Config.IsOsaka(head.Number, head.Time) {
version = types.BlobSidecarVersion1
}
if sidecar.Version != version {
return fmt.Errorf("unexpected sidecar version, want: %d, got: %d", version, sidecar.Version)
}
// Ensure the blob fee cap satisfies the minimum blob gas price // Ensure the blob fee cap satisfies the minimum blob gas price
if tx.BlobGasFeeCapIntCmp(blobTxMinBlobGasPrice) < 0 { if tx.BlobGasFeeCapIntCmp(blobTxMinBlobGasPrice) < 0 {
return fmt.Errorf("%w: blob fee cap %v, minimum needed %v", ErrTxGasPriceTooLow, tx.BlobGasFeeCap(), blobTxMinBlobGasPrice) return fmt.Errorf("%w: blob fee cap %v, minimum needed %v", ErrTxGasPriceTooLow, tx.BlobGasFeeCap(), blobTxMinBlobGasPrice)

View file

@ -517,7 +517,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo
if len(hashes) > 128 { if len(hashes) > 128 {
return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes)))
} }
blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion0, false) blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion0)
if err != nil { if err != nil {
return nil, engine.InvalidParams.With(err) return nil, engine.InvalidParams.With(err)
} }
@ -578,7 +578,7 @@ func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProo
return nil, nil return nil, nil
} }
blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion1, false) blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion1)
if err != nil { if err != nil {
return nil, engine.InvalidParams.With(err) return nil, engine.InvalidParams.With(err)
} }