core/txpool/blobpool: introduce sidecar conversion for legacy blob transactions (#32656)

This pull request introduces a queue for legacy sidecar conversion to
handle transactions that persist after the Osaka fork. Simply dropping 
these transactions would significantly harm the user experience.

To balance usability with system complexity, we have introduced a
conversion time window of two hours after the Osaka fork. During this
period, the system will accept legacy blob transactions and convert them
in a background process.

After the window, all legacy transactions will be rejected. Notably, all
blob transactions are validated statically before the conversion, and all
conversions are performed in a single thread, minimizing the risk of DoS
attacks.

We believe this two-hour window provides sufficient time to process
in-flight legacy transactions and allows submitters to migrate to the
new format.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
rjl493456442 2025-09-20 10:19:55 +08:00 committed by GitHub
parent fd65f56031
commit 684f0db4a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 384 additions and 45 deletions

View file

@ -27,6 +27,7 @@ import (
"path/filepath"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@ -61,11 +62,11 @@ const (
// small buffer is added to the proof overhead.
txBlobOverhead = uint32(kzg4844.CellProofsPerBlob*len(kzg4844.Proof{}) + 64)
// txMaxSize is the maximum size a single transaction can have, outside
// the included blobs. Since blob transactions are pulled instead of pushed,
// and only a small metadata is kept in ram, the rest is on disk, there is
// no critical limit that should be enforced. Still, capping it to some sane
// limit can never hurt.
// txMaxSize is the maximum size a single transaction can have, including the
// blobs. Since blob transactions are pulled instead of pushed, and only a
// small metadata is kept in ram, the rest is on disk, there is no critical
// limit that should be enforced. Still, capping it to some sane limit can
// never hurt, which is aligned with maxBlobsPerTx constraint enforced internally.
txMaxSize = 1024 * 1024
// maxBlobsPerTx is the maximum number of blobs that a single transaction can
@ -93,6 +94,11 @@ const (
// storeVersion is the current slotter layout used for the billy.Database
// store.
storeVersion = 1
// conversionTimeWindow defines the period after the Osaka fork during which
// the pool will still accept and convert legacy blob transactions. After this
// window, all legacy blob transactions will be rejected.
conversionTimeWindow = time.Hour * 2
)
// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
@ -329,12 +335,13 @@ type BlobPool struct {
stored uint64 // Useful data size of all transactions on disk
limbo *limbo // Persistent data store for the non-finalized blobs
signer types.Signer // Transaction signer to use for sender recovery
chain BlockChain // Chain object to access the state through
signer types.Signer // Transaction signer to use for sender recovery
chain BlockChain // Chain object to access the state through
cQueue *conversionQueue // The queue for performing legacy sidecar conversion
head *types.Header // Current head of the chain
state *state.StateDB // Current state at the head of the chain
gasTip *uint256.Int // Currently accepted minimum gas tip
head atomic.Pointer[types.Header] // Current head of the chain
state *state.StateDB // Current state at the head of the chain
gasTip atomic.Pointer[uint256.Int] // Currently accepted minimum gas tip
lookup *lookup // Lookup table mapping blobs to txs and txs to billy entries
index map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
@ -359,6 +366,7 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo
hasPendingAuth: hasPendingAuth,
signer: types.LatestSigner(chain.Config()),
chain: chain,
cQueue: newConversionQueue(), // Deprecate it after the osaka fork
lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
@ -400,7 +408,8 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
if err != nil {
return err
}
p.head, p.state = head, state
p.head.Store(head)
p.state = state
// Create new slotter for pre-Osaka blob configuration.
slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
@ -440,11 +449,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
p.recheck(addr, nil)
}
var (
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head))
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), head))
blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice)
)
if p.head.ExcessBlobGas != nil {
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), p.head))
if head.ExcessBlobGas != nil {
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), head))
}
p.evict = newPriceHeap(basefee, blobfee, p.index)
@ -474,6 +483,9 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
// Close closes down the underlying persistent store.
func (p *BlobPool) Close() error {
// Terminate the conversion queue
p.cQueue.close()
var errs []error
if p.limbo != nil { // Close might be invoked due to an error in the constructor, before p.limbo is set
if err := p.limbo.Close(); err != nil {
@ -832,7 +844,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
log.Error("Failed to reset blobpool state", "err", err)
return
}
p.head = newHead
p.head.Store(newHead)
p.state = statedb
// Run the reorg between the old and new head and figure out which accounts
@ -855,7 +867,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
}
}
// Flush out any blobs from limbo that are older than the latest finality
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
if p.chain.Config().IsCancun(newHead.Number, newHead.Time) {
p.limbo.finalize(p.chain.CurrentFinalBlock())
}
// Reset the price heap for the new set of basefee/blobfee pairs
@ -1056,14 +1068,15 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
defer p.lock.Unlock()
// Store the new minimum gas tip
old := p.gasTip
p.gasTip = uint256.MustFromBig(tip)
old := p.gasTip.Load()
newTip := uint256.MustFromBig(tip)
p.gasTip.Store(newTip)
// If the min miner fee increased, remove transactions below the new threshold
if old == nil || p.gasTip.Cmp(old) > 0 {
if old == nil || newTip.Cmp(old) > 0 {
for addr, txs := range p.index {
for i, tx := range txs {
if tx.execTipCap.Cmp(p.gasTip) < 0 {
if tx.execTipCap.Cmp(newTip) < 0 {
// Drop the offending transaction
var (
ids = []uint64{tx.id}
@ -1123,10 +1136,10 @@ func (p *BlobPool) ValidateTxBasics(tx *types.Transaction) error {
Config: p.chain.Config(),
Accept: 1 << types.BlobTxType,
MaxSize: txMaxSize,
MinTip: p.gasTip.ToBig(),
MinTip: p.gasTip.Load().ToBig(),
MaxBlobCount: maxBlobsPerTx,
}
return txpool.ValidateTransaction(tx, p.head, p.signer, opts)
return txpool.ValidateTransaction(tx, p.head.Load(), p.signer, opts)
}
// checkDelegationLimit determines if the tx sender is delegated or has a
@ -1164,10 +1177,10 @@ func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error {
// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
//
// This function assumes the static validation has been performed already and
// only runs the stateful checks with lock protection.
func (p *BlobPool) validateTx(tx *types.Transaction) error {
if err := p.ValidateTxBasics(tx); err != nil {
return err
}
// Ensure the transaction adheres to the stateful pool filters (nonce, balance)
stateOpts := &txpool.ValidationOptionsWithState{
State: p.state,
@ -1444,17 +1457,67 @@ func (p *BlobPool) AvailableBlobs(vhashes []common.Hash) int {
return available
}
// preCheck performs the static validation upon the provided txs and converts
// the legacy sidecars if Osaka fork has been activated with a short time window.
//
// This function is pure static and lock free.
func (p *BlobPool) preCheck(txs []*types.Transaction) ([]*types.Transaction, []error) {
	var (
		head    = p.head.Load()
		config  = p.chain.Config()
		isOsaka = config.IsOsaka(head.Number, head.Time)
		cutoff  uint64 // Unix timestamp marking the end of the conversion window
	)
	if isOsaka {
		cutoff = uint64(time.Unix(int64(*config.OsakaTime), 0).Add(conversionTimeWindow).Unix())
	}
	errs := make([]error, len(txs))
	for i, tx := range txs {
		// Validate the transaction statically at first to avoid unnecessary
		// conversion. This step doesn't require lock protection.
		if err := p.ValidateTxBasics(tx); err != nil {
			errs[i] = err
			continue
		}
		version := tx.BlobTxSidecar().Version
		switch {
		case !isOsaka && version == types.BlobSidecarVersion0:
			// Legacy sidecars are the only acceptable format pre-Osaka.

		case !isOsaka:
			// Before the Osaka fork, reject the blob txs with cell proofs.
			errs[i] = errors.New("cell proof is not supported yet")

		case version == types.BlobSidecarVersion1:
			// Already in the post-Osaka cell-proof format, nothing to do.

		case head.Time > cutoff:
			// After the Osaka fork, reject the legacy blob txs if the
			// conversion time window has passed.
			errs[i] = errors.New("legacy blob tx is not supported")

		default:
			// Convert the legacy sidecar after the Osaka fork. This could be
			// a long procedure taking a few seconds, even minutes if there is
			// a long queue. Fortunately it only blocks the routine of the
			// source peer announcing the tx, without affecting other parts.
			errs[i] = p.cQueue.convert(tx)
		}
	}
	return txs, errs
}
// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
//
// Note, if sync is set the method will block until all internal maintenance
// related to the add is finished. Only use this during tests for determinism.
func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
var (
errs = make([]error, len(txs))
errs []error
adds = make([]*types.Transaction, 0, len(txs))
)
txs, errs = p.preCheck(txs)
for i, tx := range txs {
if errs[i] != nil {
continue
}
errs[i] = p.add(tx)
if errs[i] == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
@ -1949,7 +2012,7 @@ func (p *BlobPool) Clear() {
p.spent = make(map[common.Address]*uint256.Int)
var (
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head))
basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head.Load()))
blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice)
)
p.evict = newPriceHeap(basefee, blobfee, p.index)

View file

@ -88,6 +88,12 @@ type testBlockChain struct {
statedb *state.StateDB
blocks map[uint64]*types.Block
blockTime *uint64
}
// setHeadTime pins the timestamp reported for the current head, letting tests
// simulate chains before/after fork activation windows.
func (bc *testBlockChain) setHeadTime(time uint64) {
	headTime := time
	bc.blockTime = &headTime
}
func (bc *testBlockChain) Config() *params.ChainConfig {
@ -105,6 +111,10 @@ func (bc *testBlockChain) CurrentBlock() *types.Header {
blockTime = *bc.config.CancunTime + 1
gasLimit = uint64(30_000_000)
)
if bc.blockTime != nil {
blockTime = *bc.blockTime
}
lo := new(big.Int)
hi := new(big.Int).Mul(big.NewInt(5714), new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil))
@ -1748,8 +1758,8 @@ func TestAdd(t *testing.T) {
// Add each transaction one by one, verifying the pool internals in between
for j, add := range tt.adds {
signed, _ := types.SignNewTx(keys[add.from], types.LatestSigner(params.MainnetChainConfig), add.tx)
if err := pool.add(signed); !errors.Is(err, add.err) {
t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, err, add.err)
if errs := pool.Add([]*types.Transaction{signed}, true); !errors.Is(errs[0], add.err) {
t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, errs[0], add.err)
}
if add.err == nil {
size, exist := pool.lookup.sizeOfTx(signed.Hash())
@ -1796,8 +1806,14 @@ func TestAdd(t *testing.T) {
}
}
// Tests that transactions with legacy sidecars are accepted within the
// conversion window but rejected after it has passed.
func TestAddLegacyBlobTx(t *testing.T) {
	testAddLegacyBlobTx(t, true)  // conversion window has not yet passed
	testAddLegacyBlobTx(t, false) // conversion window passed
}
func testAddLegacyBlobTx(t *testing.T, accept bool) {
var (
key1, _ = crypto.GenerateKey()
key2, _ = crypto.GenerateKey()
@ -1817,6 +1833,15 @@ func TestAddLegacyBlobTx(t *testing.T) {
blobfee: uint256.NewInt(105),
statedb: statedb,
}
var timeDiff uint64
if accept {
timeDiff = uint64(conversionTimeWindow.Seconds()) - 1
} else {
timeDiff = uint64(conversionTimeWindow.Seconds()) + 1
}
time := *params.MergedTestChainConfig.OsakaTime + timeDiff
chain.setHeadTime(time)
pool := New(Config{Datadir: t.TempDir()}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
@ -1826,12 +1851,15 @@ func TestAddLegacyBlobTx(t *testing.T) {
var (
tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0)
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion0)
tx3 = makeMultiBlobTx(1, 1, 800, 70, 6, 12, key2, types.BlobSidecarVersion1)
txs = []*types.Transaction{tx1, tx2}
)
errs := pool.Add([]*types.Transaction{tx1, tx2, tx3}, true)
errs := pool.Add(txs, true)
for _, err := range errs {
if err == nil {
t.Fatalf("expected tx add to fail")
if accept && err != nil {
t.Fatalf("expected tx add to succeed, %v", err)
}
if !accept && err == nil {
t.Fatal("expected tx add to fail")
}
}
verifyPoolInternals(t, pool)

View file

@ -0,0 +1,152 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"errors"
"slices"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// maxPendingConversionTasks caps the number of pending conversion tasks. This
// prevents excessive memory usage; the worst-case scenario (2k transactions
// with 6 blobs each) would consume approximately 1.5GB of memory.
const maxPendingConversionTasks = 2048
// cTask represents a conversion task with an attached legacy blob transaction.
// The done channel is created buffered (capacity 1) by the submitter, so the
// runner never blocks when reporting the result.
type cTask struct {
	tx   *types.Transaction // Legacy blob transaction
	done chan error         // Channel for signaling back if the conversion succeeds
}
// conversionQueue is a dedicated queue for converting legacy blob transactions
// received from the network after the Osaka fork. Since conversion is expensive,
// it is performed in the background by a single thread, ensuring the main Geth
// process is not overloaded.
type conversionQueue struct {
	tasks  chan *cTask   // Unbuffered channel handing tasks to the scheduling loop
	quit   chan struct{} // Closed by close() to request loop shutdown
	closed chan struct{} // Closed by the loop on exit; acts as the terminated signal
}
// newConversionQueue constructs the conversion queue and spins up its
// background scheduling loop.
func newConversionQueue() *conversionQueue {
	queue := &conversionQueue{
		tasks:  make(chan *cTask),
		quit:   make(chan struct{}),
		closed: make(chan struct{}),
	}
	go queue.loop()
	return queue
}
// convert accepts a legacy blob transaction with version-0 blobs and queues it
// for conversion.
//
// This function may block for a long time until the transaction is processed.
func (q *conversionQueue) convert(tx *types.Transaction) error {
	// Buffer the result channel so the runner can always report without
	// blocking, even if this caller has already given up.
	result := make(chan error, 1)
	select {
	case <-q.closed:
		return errors.New("conversion queue closed")
	case q.tasks <- &cTask{tx: tx, done: result}:
		return <-result
	}
}
// close terminates the conversion queue and waits until the scheduling loop
// (including any in-flight conversion batch) has fully shut down. Calling
// close on an already-closed queue is a no-op.
//
// NOTE(review): the check-then-close below is not safe for *concurrent*
// callers — two goroutines may both take the default branch and double-close
// q.quit, which panics. Sequential double-close is fine (exercised by
// TestConversionQueueDoubleClose). Confirm all callers serialize shutdown.
func (q *conversionQueue) close() {
	select {
	case <-q.closed:
		// Loop has already exited; nothing to do.
		return
	default:
		// Signal the loop to stop, then block until it acknowledges the
		// shutdown by closing q.closed.
		close(q.quit)
		<-q.closed
	}
}
// run converts a batch of legacy blob txs to the new cell proof format.
func (q *conversionQueue) run(tasks []*cTask, done chan struct{}, interrupt *atomic.Int32) {
	defer close(done)

	for _, task := range tasks {
		// Skip the remaining work if an interruption was requested.
		if interrupt != nil && interrupt.Load() != 0 {
			task.done <- errors.New("conversion is interrupted")
			continue
		}
		sidecar := task.tx.BlobTxSidecar()
		if sidecar == nil {
			task.done <- errors.New("tx without sidecar")
			continue
		}
		// Run the conversion, mutating the original sidecar in place.
		var (
			start = time.Now()
			err   = sidecar.ToV1()
		)
		task.done <- err
		log.Trace("Converted legacy blob tx", "hash", task.tx.Hash(), "err", err, "elapsed", common.PrettyDuration(time.Since(start)))
	}
}
// loop is the scheduler of the conversion queue: it accepts incoming tasks,
// batches them, and hands them to a single background runner so that at most
// one conversion routine is ever active.
func (q *conversionQueue) loop() {
	defer close(q.closed)

	var (
		done      chan struct{} // Non-nil if background routine is active
		interrupt *atomic.Int32 // Flag to signal conversion interruption

		// The pending tasks for sidecar conversion. We assume the number of legacy
		// blob transactions requiring conversion will not be excessive. However,
		// a hard cap is applied as a protective measure.
		cTasks []*cTask
	)
	// launch hands all accumulated tasks to a fresh background runner.
	launch := func() {
		done, interrupt = make(chan struct{}), new(atomic.Int32)
		tasks := slices.Clone(cTasks)
		cTasks = cTasks[:0]
		go q.run(tasks, done, interrupt)
	}
	for {
		select {
		case t := <-q.tasks:
			if len(cTasks) >= maxPendingConversionTasks {
				t.done <- errors.New("conversion queue is overloaded")
				continue
			}
			cTasks = append(cTasks, t)

			// Launch the background conversion thread if it's idle
			if done == nil {
				launch()
			}
		case <-done:
			done, interrupt = nil, nil

			// Tasks may have queued up while the previous batch was running;
			// without relaunching here they would stall until the next task
			// arrival (or never be processed at all before shutdown).
			if len(cTasks) > 0 {
				launch()
			}
		case <-q.quit:
			if done != nil {
				interrupt.Store(1)
				log.Debug("Waiting for blob proof conversion to exit")
				<-done
			}
			// Fail any tasks that were queued but never dispatched, otherwise
			// their submitters would block forever waiting on the done channel.
			// The per-task channels are buffered, so these sends cannot block.
			for _, t := range cTasks {
				t.done <- errors.New("conversion queue closed")
			}
			return
		}
	}
}

View file

@ -0,0 +1,101 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"crypto/ecdsa"
"crypto/sha256"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)
// createV1BlobTx creates a blob transaction with version 1 sidecar for testing.
func createV1BlobTx(nonce uint64, key *ecdsa.PrivateKey) *types.Transaction {
	var (
		blob          = &kzg4844.Blob{byte(nonce)}
		commitment, _ = kzg4844.BlobToCommitment(blob)
		cellProofs, _ = kzg4844.ComputeCellProofs(blob)
	)
	sidecar := types.NewBlobTxSidecar(types.BlobSidecarVersion1, []kzg4844.Blob{*blob}, []kzg4844.Commitment{commitment}, cellProofs)
	inner := &types.BlobTx{
		ChainID:    uint256.MustFromBig(params.MainnetChainConfig.ChainID),
		Nonce:      nonce,
		GasTipCap:  uint256.NewInt(1),
		GasFeeCap:  uint256.NewInt(1000),
		Gas:        21000,
		BlobFeeCap: uint256.NewInt(100),
		BlobHashes: []common.Hash{kzg4844.CalcBlobHashV1(sha256.New(), &commitment)},
		Value:      uint256.NewInt(100),
		Sidecar:    sidecar,
	}
	return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), inner)
}
// TestConversionQueueBasic checks that a legacy (version-0) blob transaction
// submitted to the queue is converted to the version-1 cell-proof format.
func TestConversionQueueBasic(t *testing.T) {
	queue := newConversionQueue()
	defer queue.close()

	key, _ := crypto.GenerateKey()
	tx := makeTx(0, 1, 1, 1, key)
	if err := queue.convert(tx); err != nil {
		t.Fatalf("Expected successful conversion, got error: %v", err)
	}
	if have, want := tx.BlobTxSidecar().Version, types.BlobSidecarVersion1; have != want {
		t.Errorf("Expected sidecar version to be %d, got %d", want, have)
	}
}
// TestConversionQueueV1BlobTx checks that a transaction already carrying a
// version-1 sidecar passes through the queue unchanged.
func TestConversionQueueV1BlobTx(t *testing.T) {
	queue := newConversionQueue()
	defer queue.close()

	key, _ := crypto.GenerateKey()
	tx := createV1BlobTx(0, key)

	before := tx.BlobTxSidecar().Version
	if err := queue.convert(tx); err != nil {
		t.Fatalf("Expected successful conversion, got error: %v", err)
	}
	if after := tx.BlobTxSidecar().Version; after != before {
		t.Errorf("Expected sidecar version to remain %d, got %d", before, after)
	}
}
// TestConversionQueueClosed checks that submitting to an already-closed queue
// surfaces an error instead of blocking.
func TestConversionQueueClosed(t *testing.T) {
	queue := newConversionQueue()
	queue.close() // Close the queue first

	key, _ := crypto.GenerateKey()
	if err := queue.convert(makeTx(0, 1, 1, 1, key)); err == nil {
		t.Fatal("Expected error when converting on closed queue, got nil")
	}
}
// TestConversionQueueDoubleClose verifies that close is idempotent when
// invoked sequentially (the second call observes the closed signal and
// returns without touching q.quit).
func TestConversionQueueDoubleClose(t *testing.T) {
	queue := newConversionQueue()
	queue.close()
	queue.close() // Should not panic
}

View file

@ -176,16 +176,14 @@ func validateBlobTx(tx *types.Transaction, head *types.Header, opts *ValidationO
return err
}
// Fork-specific sidecar checks, including proof verification.
if opts.Config.IsOsaka(head.Number, head.Time) {
if sidecar.Version == types.BlobSidecarVersion1 {
return validateBlobSidecarOsaka(sidecar, hashes)
} else {
return validateBlobSidecarLegacy(sidecar, hashes)
}
return validateBlobSidecarLegacy(sidecar, hashes)
}
func validateBlobSidecarLegacy(sidecar *types.BlobTxSidecar, hashes []common.Hash) error {
if sidecar.Version != types.BlobSidecarVersion0 {
return fmt.Errorf("invalid sidecar version pre-osaka: %v", sidecar.Version)
}
if len(sidecar.Proofs) != len(hashes) {
return fmt.Errorf("invalid number of %d blob proofs expected %d", len(sidecar.Proofs), len(hashes))
}
@ -198,9 +196,6 @@ func validateBlobSidecarLegacy(sidecar *types.BlobTxSidecar, hashes []common.Has
}
func validateBlobSidecarOsaka(sidecar *types.BlobTxSidecar, hashes []common.Hash) error {
if sidecar.Version != types.BlobSidecarVersion1 {
return fmt.Errorf("invalid sidecar version post-osaka: %v", sidecar.Version)
}
if len(sidecar.Proofs) != len(hashes)*kzg4844.CellProofsPerBlob {
return fmt.Errorf("invalid number of %d blob proofs expected %d", len(sidecar.Proofs), len(hashes)*kzg4844.CellProofsPerBlob)
}