diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 2229f1544c..1bf48cf949 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -27,6 +27,7 @@ import ( "path/filepath" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -61,11 +62,11 @@ const ( // small buffer is added to the proof overhead. txBlobOverhead = uint32(kzg4844.CellProofsPerBlob*len(kzg4844.Proof{}) + 64) - // txMaxSize is the maximum size a single transaction can have, outside - // the included blobs. Since blob transactions are pulled instead of pushed, - // and only a small metadata is kept in ram, the rest is on disk, there is - // no critical limit that should be enforced. Still, capping it to some sane - // limit can never hurt. + // txMaxSize is the maximum size a single transaction can have, including the + // blobs. Since blob transactions are pulled instead of pushed, and only a + // small metadata is kept in ram, the rest is on disk, there is no critical + // limit that should be enforced. Still, capping it to some sane limit can + // never hurt, which is aligned with maxBlobsPerTx constraint enforced internally. txMaxSize = 1024 * 1024 // maxBlobsPerTx is the maximum number of blobs that a single transaction can @@ -93,6 +94,11 @@ const ( // storeVersion is the current slotter layout used for the billy.Database // store. storeVersion = 1 + + // conversionTimeWindow defines the period after the Osaka fork during which + // the pool will still accept and convert legacy blob transactions. After this + // window, all legacy blob transactions will be rejected. + conversionTimeWindow = time.Hour * 2 ) // blobTxMeta is the minimal subset of types.BlobTx necessary to validate and @@ -329,12 +335,13 @@ type BlobPool struct { stored uint64 // Useful data size of all transactions on disk limbo *limbo // Persistent data store for the non-finalized blobs - signer types.Signer // Transaction signer to use for sender recovery - chain BlockChain // Chain object to access the state through + signer types.Signer // Transaction signer to use for sender recovery + chain BlockChain // Chain object to access the state through + cQueue *conversionQueue // The queue for performing legacy sidecar conversion - head *types.Header // Current head of the chain - state *state.StateDB // Current state at the head of the chain - gasTip *uint256.Int // Currently accepted minimum gas tip + head atomic.Pointer[types.Header] // Current head of the chain + state *state.StateDB // Current state at the head of the chain + gasTip atomic.Pointer[uint256.Int] // Currently accepted minimum gas tip lookup *lookup // Lookup table mapping blobs to txs and txs to billy entries index map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce @@ -359,6 +366,7 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo hasPendingAuth: hasPendingAuth, signer: types.LatestSigner(chain.Config()), chain: chain, + cQueue: newConversionQueue(), // Deprecate it after the osaka fork lookup: newLookup(), index: make(map[common.Address][]*blobTxMeta), spent: make(map[common.Address]*uint256.Int), @@ -400,7 +408,8 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser if err != nil { return err } - p.head, p.state = head, state + p.head.Store(head) + p.state = state // Create new slotter for pre-Osaka blob configuration. slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config())) @@ -440,11 +449,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser p.recheck(addr, nil) } var ( - basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head)) + basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), head)) blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice) ) - if p.head.ExcessBlobGas != nil { - blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), p.head)) + if head.ExcessBlobGas != nil { + blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), head)) } p.evict = newPriceHeap(basefee, blobfee, p.index) @@ -474,6 +483,9 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser // Close closes down the underlying persistent store. func (p *BlobPool) Close() error { + // Terminate the conversion queue + p.cQueue.close() + var errs []error if p.limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set if err := p.limbo.Close(); err != nil { @@ -832,7 +844,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { log.Error("Failed to reset blobpool state", "err", err) return } - p.head = newHead + p.head.Store(newHead) p.state = statedb // Run the reorg between the old and new head and figure out which accounts @@ -855,7 +867,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { } } // Flush out any blobs from limbo that are older than the latest finality - if p.chain.Config().IsCancun(p.head.Number, p.head.Time) { + if p.chain.Config().IsCancun(newHead.Number, newHead.Time) { p.limbo.finalize(p.chain.CurrentFinalBlock()) } // Reset the price heap for the new set of basefee/blobfee pairs @@ -1056,14 +1068,15 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { defer p.lock.Unlock() // Store the new minimum gas tip - old := p.gasTip - p.gasTip = uint256.MustFromBig(tip) + old := p.gasTip.Load() + newTip := uint256.MustFromBig(tip) + p.gasTip.Store(newTip) // If the min miner fee increased, remove transactions below the new threshold - if old == nil || p.gasTip.Cmp(old) > 0 { + if old == nil || newTip.Cmp(old) > 0 { for addr, txs := range p.index { for i, tx := range txs { - if tx.execTipCap.Cmp(p.gasTip) < 0 { + if tx.execTipCap.Cmp(newTip) < 0 { // Drop the offending transaction var ( ids = []uint64{tx.id} @@ -1123,10 +1136,10 @@ func (p *BlobPool) ValidateTxBasics(tx *types.Transaction) error { Config: p.chain.Config(), Accept: 1 << types.BlobTxType, MaxSize: txMaxSize, - MinTip: p.gasTip.ToBig(), + MinTip: p.gasTip.Load().ToBig(), MaxBlobCount: maxBlobsPerTx, } - return txpool.ValidateTransaction(tx, p.head, p.signer, opts) + return txpool.ValidateTransaction(tx, p.head.Load(), p.signer, opts) } // checkDelegationLimit determines if the tx sender is delegated or has a @@ -1164,10 +1177,10 @@ func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error { // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). +// +// This function assumes the static validation has been performed already and +// only runs the stateful checks with lock protection. func (p *BlobPool) validateTx(tx *types.Transaction) error { - if err := p.ValidateTxBasics(tx); err != nil { - return err - } // Ensure the transaction adheres to the stateful pool filters (nonce, balance) stateOpts := &txpool.ValidationOptionsWithState{ State: p.state, @@ -1444,17 +1457,67 @@ func (p *BlobPool) AvailableBlobs(vhashes []common.Hash) int { return available } +// preCheck performs the static validation upon the provided txs and converts +// the legacy sidecars if Osaka fork has been activated with a short time window. +// +// This function is pure static and lock free. +func (p *BlobPool) preCheck(txs []*types.Transaction) ([]*types.Transaction, []error) { + var ( + head = p.head.Load() + isOsaka = p.chain.Config().IsOsaka(head.Number, head.Time) + deadline time.Time + ) + if isOsaka { + deadline = time.Unix(int64(*p.chain.Config().OsakaTime), 0).Add(conversionTimeWindow) + } + var errs []error + for _, tx := range txs { + // Validate the transaction statically at first to avoid unnecessary + // conversion. This step doesn't require lock protection. + if err := p.ValidateTxBasics(tx); err != nil { + errs = append(errs, err) + continue + } + // Before the Osaka fork, reject the blob txs with cell proofs + if !isOsaka { + if tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 { + errs = append(errs, nil) + } else { + errs = append(errs, errors.New("cell proof is not supported yet")) + } + continue + } + // After the Osaka fork, reject the legacy blob txs if the conversion + // time window is passed. + if tx.BlobTxSidecar().Version == types.BlobSidecarVersion1 { + errs = append(errs, nil) + continue + } + if head.Time > uint64(deadline.Unix()) { + errs = append(errs, errors.New("legacy blob tx is not supported")) + continue + } + // Convert the legacy sidecar after Osaka fork. This could be a long + // procedure which takes a few seconds, even minutes if there is a long + // queue. Fortunately it will only block the routine of the source peer + // announcing the tx, without affecting other parts. + errs = append(errs, p.cQueue.convert(tx)) + } + return txs, errs +} + // Add inserts a set of blob transactions into the pool if they pass validation (both // consensus validity and pool restrictions). -// -// Note, if sync is set the method will block until all internal maintenance -// related to the add is finished. Only use this during tests for determinism. func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error { var ( - errs = make([]error, len(txs)) + errs []error adds = make([]*types.Transaction, 0, len(txs)) ) + txs, errs = p.preCheck(txs) for i, tx := range txs { + if errs[i] != nil { + continue + } errs[i] = p.add(tx) if errs[i] == nil { adds = append(adds, tx.WithoutBlobTxSidecar()) @@ -1949,7 +2012,7 @@ func (p *BlobPool) Clear() { p.spent = make(map[common.Address]*uint256.Int) var ( - basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head)) + basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head.Load())) blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice) ) p.evict = newPriceHeap(basefee, blobfee, p.index) diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 57d27962ce..75a87940bd 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -88,6 +88,12 @@ type testBlockChain struct { statedb *state.StateDB blocks map[uint64]*types.Block + + blockTime *uint64 +} + +func (bc *testBlockChain) setHeadTime(time uint64) { + bc.blockTime = &time } func (bc *testBlockChain) Config() *params.ChainConfig { @@ -105,6 +111,10 @@ func (bc *testBlockChain) CurrentBlock() *types.Header { blockTime = *bc.config.CancunTime + 1 gasLimit = uint64(30_000_000) ) + if bc.blockTime != nil { + blockTime = *bc.blockTime + } + lo := new(big.Int) hi := new(big.Int).Mul(big.NewInt(5714), new(big.Int).Exp(big.NewInt(10), big.NewInt(18), nil)) @@ -1748,8 +1758,8 @@ func TestAdd(t *testing.T) { // Add each transaction one by one, verifying the pool internals in between for j, add := range tt.adds { signed, _ := types.SignNewTx(keys[add.from], types.LatestSigner(params.MainnetChainConfig), add.tx) - if err := pool.add(signed); !errors.Is(err, add.err) { - t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, err, add.err) + if errs := pool.Add([]*types.Transaction{signed}, true); !errors.Is(errs[0], add.err) { + t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, errs[0], add.err) } if add.err == nil { size, exist := pool.lookup.sizeOfTx(signed.Hash()) @@ -1796,8 +1806,14 @@ func TestAdd(t *testing.T) { } } -// Tests adding transactions with legacy sidecars are correctly rejected. +// Tests that transactions with legacy sidecars are accepted within the +// conversion window but rejected after it has passed. func TestAddLegacyBlobTx(t *testing.T) { + testAddLegacyBlobTx(t, true) // conversion window has not yet passed + testAddLegacyBlobTx(t, false) // conversion window passed +} + +func testAddLegacyBlobTx(t *testing.T, accept bool) { var ( key1, _ = crypto.GenerateKey() key2, _ = crypto.GenerateKey() @@ -1817,6 +1833,15 @@ func TestAddLegacyBlobTx(t *testing.T) { blobfee: uint256.NewInt(105), statedb: statedb, } + var timeDiff uint64 + if accept { + timeDiff = uint64(conversionTimeWindow.Seconds()) - 1 + } else { + timeDiff = uint64(conversionTimeWindow.Seconds()) + 1 + } + time := *params.MergedTestChainConfig.OsakaTime + timeDiff + chain.setHeadTime(time) + pool := New(Config{Datadir: t.TempDir()}, chain, nil) if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) @@ -1826,12 +1851,15 @@ func TestAddLegacyBlobTx(t *testing.T) { var ( tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0) tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion0) - tx3 = makeMultiBlobTx(1, 1, 800, 70, 6, 12, key2, types.BlobSidecarVersion1) + txs = []*types.Transaction{tx1, tx2} ) - errs := pool.Add([]*types.Transaction{tx1, tx2, tx3}, true) + errs := pool.Add(txs, true) for _, err := range errs { - if err == nil { - t.Fatalf("expected tx add to fail") + if accept && err != nil { + t.Fatalf("expected tx add to succeed, %v", err) + } + if !accept && err == nil { + t.Fatal("expected tx add to fail") } } verifyPoolInternals(t, pool) diff --git a/core/txpool/blobpool/conversion.go b/core/txpool/blobpool/conversion.go new file mode 100644 index 0000000000..5026892fc8 --- /dev/null +++ b/core/txpool/blobpool/conversion.go @@ -0,0 +1,152 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package blobpool + +import ( + "errors" + "slices" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// maxPendingConversionTasks caps the number of pending conversion tasks. This +// prevents excessive memory usage; the worst-case scenario (2k transactions +// with 6 blobs each) would consume approximately 1.5GB of memory. +const maxPendingConversionTasks = 2048 + +// cTask represents a conversion task with an attached legacy blob transaction. +type cTask struct { + tx *types.Transaction // Legacy blob transaction + done chan error // Channel for signaling back if the conversion succeeds +} + +// conversionQueue is a dedicated queue for converting legacy blob transactions +// received from the network after the Osaka fork. Since conversion is expensive, +// it is performed in the background by a single thread, ensuring the main Geth +// process is not overloaded. +type conversionQueue struct { + tasks chan *cTask + quit chan struct{} + closed chan struct{} +} + +// newConversionQueue constructs the conversion queue. +func newConversionQueue() *conversionQueue { + q := &conversionQueue{ + tasks: make(chan *cTask), + quit: make(chan struct{}), + closed: make(chan struct{}), + } + go q.loop() + return q +} + +// convert accepts a legacy blob transaction with version-0 blobs and queues it +// for conversion. +// +// This function may block for a long time until the transaction is processed. +func (q *conversionQueue) convert(tx *types.Transaction) error { + done := make(chan error, 1) + select { + case q.tasks <- &cTask{tx: tx, done: done}: + return <-done + case <-q.closed: + return errors.New("conversion queue closed") + } +} + +// close terminates the conversion queue. +func (q *conversionQueue) close() { + select { + case <-q.closed: + return + default: + close(q.quit) + <-q.closed + } +} + +// run converts a batch of legacy blob txs to the new cell proof format. +func (q *conversionQueue) run(tasks []*cTask, done chan struct{}, interrupt *atomic.Int32) { + defer close(done) + + for _, t := range tasks { + if interrupt != nil && interrupt.Load() != 0 { + t.done <- errors.New("conversion is interrupted") + continue + } + sidecar := t.tx.BlobTxSidecar() + if sidecar == nil { + t.done <- errors.New("tx without sidecar") + continue + } + // Run the conversion, the original sidecar will be mutated in place + start := time.Now() + err := sidecar.ToV1() + t.done <- err + log.Trace("Converted legacy blob tx", "hash", t.tx.Hash(), "err", err, "elapsed", common.PrettyDuration(time.Since(start))) + } +} + +func (q *conversionQueue) loop() { + defer close(q.closed) + + var ( + done chan struct{} // Non-nil if background routine is active + interrupt *atomic.Int32 // Flag to signal conversion interruption + + // The pending tasks for sidecar conversion. We assume the number of legacy + // blob transactions requiring conversion will not be excessive. However, + // a hard cap is applied as a protective measure. + cTasks []*cTask + ) + for { + select { + case t := <-q.tasks: + if len(cTasks) >= maxPendingConversionTasks { + t.done <- errors.New("conversion queue is overloaded") + continue + } + cTasks = append(cTasks, t) + + // Launch the background conversion thread if it's idle + if done == nil { + done, interrupt = make(chan struct{}), new(atomic.Int32) + + tasks := slices.Clone(cTasks) + cTasks = cTasks[:0] + go q.run(tasks, done, interrupt) + } + + case <-done: + done, interrupt = nil, nil + + case <-q.quit: + if done == nil { + return + } + interrupt.Store(1) + log.Debug("Waiting for blob proof conversion to exit") + <-done + return + } + } +} diff --git a/core/txpool/blobpool/conversion_test.go b/core/txpool/blobpool/conversion_test.go new file mode 100644 index 0000000000..a9fd26dbaf --- /dev/null +++ b/core/txpool/blobpool/conversion_test.go @@ -0,0 +1,101 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package blobpool + +import ( + "crypto/ecdsa" + "crypto/sha256" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/params" + "github.com/holiman/uint256" +) + +// createV1BlobTx creates a blob transaction with version 1 sidecar for testing. +func createV1BlobTx(nonce uint64, key *ecdsa.PrivateKey) *types.Transaction { + blob := &kzg4844.Blob{byte(nonce)} + commitment, _ := kzg4844.BlobToCommitment(blob) + cellProofs, _ := kzg4844.ComputeCellProofs(blob) + + blobtx := &types.BlobTx{ + ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID), + Nonce: nonce, + GasTipCap: uint256.NewInt(1), + GasFeeCap: uint256.NewInt(1000), + Gas: 21000, + BlobFeeCap: uint256.NewInt(100), + BlobHashes: []common.Hash{kzg4844.CalcBlobHashV1(sha256.New(), &commitment)}, + Value: uint256.NewInt(100), + Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion1, []kzg4844.Blob{*blob}, []kzg4844.Commitment{commitment}, cellProofs), + } + return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx) +} + +func TestConversionQueueBasic(t *testing.T) { + queue := newConversionQueue() + defer queue.close() + + key, _ := crypto.GenerateKey() + tx := makeTx(0, 1, 1, 1, key) + if err := queue.convert(tx); err != nil { + t.Fatalf("Expected successful conversion, got error: %v", err) + } + if tx.BlobTxSidecar().Version != types.BlobSidecarVersion1 { + t.Errorf("Expected sidecar version to be %d, got %d", types.BlobSidecarVersion1, tx.BlobTxSidecar().Version) + } +} + +func TestConversionQueueV1BlobTx(t *testing.T) { + queue := newConversionQueue() + defer queue.close() + + key, _ := crypto.GenerateKey() + tx := createV1BlobTx(0, key) + version := tx.BlobTxSidecar().Version + + err := queue.convert(tx) + if err != nil { + t.Fatalf("Expected successful conversion, got error: %v", err) + } + if tx.BlobTxSidecar().Version != version { + t.Errorf("Expected sidecar version to remain %d, got %d", version, tx.BlobTxSidecar().Version) + } +} + +func TestConversionQueueClosed(t *testing.T) { + queue := newConversionQueue() + + // Close the queue first + queue.close() + key, _ := crypto.GenerateKey() + tx := makeTx(0, 1, 1, 1, key) + + err := queue.convert(tx) + if err == nil { + t.Fatal("Expected error when converting on closed queue, got nil") + } +} + +func TestConversionQueueDoubleClose(t *testing.T) { + queue := newConversionQueue() + queue.close() + queue.close() // Should not panic +} diff --git a/core/txpool/validation.go b/core/txpool/validation.go index 46974fad3c..df53f30a86 100644 --- a/core/txpool/validation.go +++ b/core/txpool/validation.go @@ -176,16 +176,14 @@ func validateBlobTx(tx *types.Transaction, head *types.Header, opts *ValidationO return err } // Fork-specific sidecar checks, including proof verification. - if opts.Config.IsOsaka(head.Number, head.Time) { + if sidecar.Version == types.BlobSidecarVersion1 { return validateBlobSidecarOsaka(sidecar, hashes) + } else { + return validateBlobSidecarLegacy(sidecar, hashes) } - return validateBlobSidecarLegacy(sidecar, hashes) } func validateBlobSidecarLegacy(sidecar *types.BlobTxSidecar, hashes []common.Hash) error { - if sidecar.Version != types.BlobSidecarVersion0 { - return fmt.Errorf("invalid sidecar version pre-osaka: %v", sidecar.Version) - } if len(sidecar.Proofs) != len(hashes) { return fmt.Errorf("invalid number of %d blob proofs expected %d", len(sidecar.Proofs), len(hashes)) } @@ -198,9 +196,6 @@ func validateBlobSidecarLegacy(sidecar *types.BlobTxSidecar, hashes []common.Has } func validateBlobSidecarOsaka(sidecar *types.BlobTxSidecar, hashes []common.Hash) error { - if sidecar.Version != types.BlobSidecarVersion1 { - return fmt.Errorf("invalid sidecar version post-osaka: %v", sidecar.Version) - } if len(sidecar.Proofs) != len(hashes)*kzg4844.CellProofsPerBlob { return fmt.Errorf("invalid number of %d blob proofs expected %d", len(sidecar.Proofs), len(hashes)*kzg4844.CellProofsPerBlob) }