diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index db965bc71a..f4aa406e2a 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -21,10 +21,12 @@ import ( "container/heap" "errors" "fmt" + "maps" "math" "math/big" "os" "path/filepath" + "slices" "sort" "sync" "sync/atomic" @@ -337,7 +339,7 @@ type BlobPool struct { signer types.Signer // Transaction signer to use for sender recovery chain BlockChain // Chain object to access the state through - cQueue *conversionQueue // The queue for performing legacy sidecar conversion + cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka) head atomic.Pointer[types.Header] // Current head of the chain state *state.StateDB // Current state at the head of the chain @@ -883,6 +885,172 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { basefeeGauge.Update(int64(basefee.Uint64())) blobfeeGauge.Update(int64(blobfee.Uint64())) p.updateStorageMetrics() + + // Perform the conversion logic at the fork boundary (old head is pre-Osaka, new head is Osaka) + if !p.chain.Config().IsOsaka(oldHead.Number, oldHead.Time) && p.chain.Config().IsOsaka(newHead.Number, newHead.Time) { + // Deep copy the storage id and hash of all indexed transactions, keyed by sender and nonce. + var ( + ids = make(map[common.Address]map[uint64]uint64) + txs = make(map[common.Address]map[uint64]common.Hash) + ) + for sender, list := range p.index { + ids[sender] = make(map[uint64]uint64) + txs[sender] = make(map[uint64]common.Hash) + for _, m := range list { + ids[sender][m.nonce] = m.id + txs[sender][m.nonce] = m.hash + } + } + // Initiate the background conversion thread. NOTE(review): the error returned by launchBillyConversion is dropped here; confirm that is intended. + p.cQueue.launchBillyConversion(func() { + p.convertLegacySidecars(ids, txs) + }) + } +} + +// compareAndSwap checks if the specified transaction is still tracked in the pool +// and replaces its metadata accordingly. It should only be used in the fork boundary +// bulk conversion. 
If it fails for some reason, the subsequent txs won't be dropped +// for simplicity, as we assume a failure is very unlikely to happen. +// +// The swap is only performed while the tracked entry still references oldID. +// Storage slots can be reused once freed, so if the transaction was evicted (and +// possibly re-added) while it was being converted, oldID may now hold unrelated +// data that must be neither deleted nor accounted for. +// +// The returned flag indicates whether the replacement succeeded. +func (p *BlobPool) compareAndSwap(address common.Address, hash common.Hash, blob []byte, oldID uint64, oldStorageSize uint32) bool { + p.lock.Lock() + defer p.lock.Unlock() + + // Locate the tracked metadata and verify it still points at the legacy slot + // before touching the store. + tracked := -1 + for i, meta := range p.index[address] { + if meta.hash == hash { + tracked = i + break + } + } + if tracked < 0 || p.index[address][tracked].id != oldID { + return false + } + newID, err := p.store.Put(blob) + if err != nil { + log.Error("Failed to store transaction", "hash", hash, "err", err) + return false + } + newSize := uint64(len(blob)) + newStorageSize := p.store.Size(newID) + + // Terminate the procedure if the lookup no longer tracks the transaction. The + // newly added blob should be removed before return. + if !p.lookup.update(hash, newID, newSize) { + if derr := p.store.Delete(newID); derr != nil { + log.Error("Failed to delete the dangling blob tx", "err", derr) + } else { + log.Warn("Deleted the dangling blob tx", "id", newID) + } + return false + } + // Update the metadata of blob transaction and the storage accounting + meta := p.index[address][tracked] + meta.id = newID + meta.version = types.BlobSidecarVersion1 + meta.storageSize = newStorageSize + meta.size = newSize + + p.stored += uint64(newStorageSize) + p.stored -= uint64(oldStorageSize) + + // The legacy slot is no longer referenced by the pool, release it. + if err := p.store.Delete(oldID); err != nil { + log.Error("Failed to delete the legacy transaction", "hash", hash, "id", oldID, "err", err) + } + return true +} + +// convertLegacySidecar fetches transaction data from the store and performs an +// on-the-fly conversion. This function is intended for use only during the +// Osaka fork transition period. +// +// The returned flag indicates whether the replacement succeeds or not. 
+func (p *BlobPool) convertLegacySidecar(sender common.Address, hash common.Hash, id uint64) bool { + start := time.Now() + + // Retrieves the legacy blob transaction from the underlying store with + // read lock held, preventing any potential data race around the slot + // specified by the id. + p.lock.RLock() + data, err := p.store.Get(id) + if err != nil { + p.lock.RUnlock() + // The transaction may have been evicted simultaneously, safe to skip conversion. + log.Debug("Blob transaction is missing", "hash", hash, "id", id, "err", err) + return false + } + oldStorageSize := p.store.Size(id) + p.lock.RUnlock() + + // Decode the transaction, the failure is not expected and report the error + // loudly if possible. If the blob transaction in this slot is corrupted, + // leave it in the store, it will be dropped during the next pool + // initialization. + var tx types.Transaction + if err = rlp.DecodeBytes(data, &tx); err != nil { + log.Error("Blob transaction is corrupted", "hash", hash, "id", id, "err", err) + return false + } + + // Skip conversion if the transaction does not match the expected hash, or if it was + // already converted. This can occur if the original transaction was evicted from the + // pool and the slot was reused by a new one. + if tx.Hash() != hash { + log.Warn("Blob transaction was replaced", "hash", hash, "id", id, "stored", tx.Hash()) + return false + } + sc := tx.BlobTxSidecar() + if sc == nil { + // BlobTxSidecar may return nil (e.g. for a transaction stored without its + // sidecar); treat that like a corrupted entry instead of dereferencing nil. + log.Error("Blob transaction is missing sidecar", "hash", hash, "id", id) + return false + } + if sc.Version >= types.BlobSidecarVersion1 { + log.Debug("Skipping conversion of blob tx", "hash", hash, "id", id) + return false + } + + // Perform the sidecar conversion, the failure is not expected and report the error + // loudly if possible. The sidecar is converted in place, so tx carries the v1 + // sidecar afterwards. + if err := sc.ToV1(); err != nil { + log.Error("Failed to convert blob transaction", "hash", hash, "err", err) + return false + } + + // Encode the converted transaction, the failure is not expected and report + // the error loudly if possible. 
+ blob, err := rlp.EncodeToBytes(&tx) + if err != nil { + log.Error("Failed to encode blob transaction", "hash", tx.Hash(), "err", err) + return false + } + + // Replace the legacy blob transaction with the converted format. + if !p.compareAndSwap(sender, hash, blob, id, oldStorageSize) { + log.Error("Failed to replace the legacy transaction", "hash", hash) + return false + } + log.Debug("Converted legacy blob transaction", "hash", hash, "elapsed", common.PrettyDuration(time.Since(start))) + return true +} + +// convertLegacySidecars converts all given transactions to sidecar version 1. +// +// If any of them fails to be converted, the subsequent transactions will still +// be processed, as we assume the failure is very unlikely to happen. If that happens, +// these transactions will be stuck in the pool until eviction. +func (p *BlobPool) convertLegacySidecars(ids map[common.Address]map[uint64]uint64, txs map[common.Address]map[uint64]common.Hash) { + var ( + start = time.Now() + success int + failure int + ) + for addr, list := range txs { + // Transactions evicted from the pool must be contiguous, if in any case, + // the transactions are gapped with each other, they will be discarded. 
+ nonces := slices.Collect(maps.Keys(list)) + slices.Sort(nonces) + + // Convert the txs with nonce order + for _, nonce := range nonces { + if p.convertLegacySidecar(addr, list[nonce], ids[addr][nonce]) { + success++ + } else { + failure++ + } + } + } + log.Info("Completed blob transaction conversion", "discarded", failure, "injected", success, "elapsed", common.PrettyDuration(time.Since(start))) } // reorg assembles all the transactors and missing transactions between an old diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 75a87940bd..f0f00c8055 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -2098,6 +2098,185 @@ func TestGetBlobs(t *testing.T) { pool.Close() } +// TestSidecarConversion will verify that after the Osaka fork, all legacy +// sidecars in the pool are successfully converted to v1 sidecars. +func TestSidecarConversion(t *testing.T) { + // log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true))) + + // Create a temporary folder for the persistent backend + storage := t.TempDir() + os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700) + + var ( + preOsakaTxs = make(types.Transactions, 10) + postOsakaTxs = make(types.Transactions, 3) + keys = make([]*ecdsa.PrivateKey, len(preOsakaTxs)+len(postOsakaTxs)) + addrs = make([]common.Address, len(preOsakaTxs)+len(postOsakaTxs)) + statedb, _ = state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) + ) + for i := range keys { + keys[i], _ = crypto.GenerateKey() + addrs[i] = crypto.PubkeyToAddress(keys[i].PublicKey) + statedb.AddBalance(addrs[i], uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) + } + for i := range preOsakaTxs { + preOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 2, 0, keys[i], types.BlobSidecarVersion0) + } + for i := range postOsakaTxs { + if i == 0 { + // First has a v0 sidecar. 
+ postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion0) + // Skip the v1 assignment below, otherwise the v0 tx would be overwritten. + continue + } + postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion1) + } + statedb.Commit(0, true, false) + + // Test plan: + // 1) Create a bunch of v0 sidecar txs and add to pool before Osaka. + // 2) Pass in new Osaka header to activate the conversion thread. + // 3) Continue adding both v0 and v1 transactions to the pool. + // 4) Verify that as additional blocks come in, transactions involved in the + // migration are correctly discarded. + + config := &params.ChainConfig{ + ChainID: big.NewInt(1), + LondonBlock: big.NewInt(0), + BerlinBlock: big.NewInt(0), + CancunTime: newUint64(0), + PragueTime: newUint64(0), + OsakaTime: newUint64(1), + BlobScheduleConfig: params.DefaultBlobSchedule, + } + chain := &testBlockChain{ + config: config, + basefee: uint256.NewInt(1050), + blobfee: uint256.NewInt(105), + statedb: statedb, + blocks: make(map[uint64]*types.Block), + } + + // Create 3 blocks: + // - the current block, before Osaka + // - the first block after Osaka + // - another post-Osaka block with several transactions in it + header0 := chain.CurrentBlock() + header0.Time = 0 + chain.blocks[0] = types.NewBlockWithHeader(header0) + + header1 := chain.CurrentBlock() + header1.Number = big.NewInt(1) + header1.Time = 1 + chain.blocks[1] = types.NewBlockWithHeader(header1) + + header2 := chain.CurrentBlock() + header2.Time = 2 + header2.Number = big.NewInt(2) + + // Make a copy of one of the pre-Osaka transactions and convert it to v1 here + // so that we can add it to the pool later and ensure a duplicate is not added + // by the conversion queue. 
+ tx := preOsakaTxs[len(preOsakaTxs)-1] + sc := *tx.BlobTxSidecar() // copy sidecar + if err := sc.ToV1(); err != nil { + t.Fatalf("failed to convert sidecar to v1: %v", err) + } + // WithBlobTxSidecar returns a new transaction, keep it instead of dropping it. + tx = tx.WithBlobTxSidecar(&sc) + + block2 := types.NewBlockWithHeader(header2).WithBody(types.Body{Transactions: append(postOsakaTxs, tx)}) + chain.blocks[2] = block2 + + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, header0, newReserver()); err != nil { + t.Fatalf("failed to create blob pool: %v", err) + } + + errs := pool.Add(preOsakaTxs, true) + for i, err := range errs { + if err != nil { + t.Errorf("failed to insert blob tx from %s: %s", addrs[i], errs[i]) + } + } + + // Kick off migration. + pool.Reset(header0, header1) + + // Add the v0 sidecar tx, but don't block so we can keep doing other stuff + // while it converts the sidecar. + addDone := make(chan struct{}) + go func() { + pool.Add(types.Transactions{postOsakaTxs[0]}, false) + close(addDone) + }() + + // Add the post-Osaka v1 sidecar txs. + errs = pool.Add(postOsakaTxs[1:], false) + for _, err := range errs { + if err != nil { + t.Fatalf("expected tx add to succeed: %v", err) + } + } + + // Wait for the first tx's conversion to complete, then check that all + // transactions added after Osaka can be accounted for in the pool. + <-addDone + pending := pool.Pending(txpool.PendingFilter{BlobTxs: true, BlobVersion: types.BlobSidecarVersion1}) + for _, tx := range postOsakaTxs { + from, _ := pool.signer.Sender(tx) + if len(pending[from]) != 1 || pending[from][0].Hash != tx.Hash() { + t.Fatalf("expected post-Osaka txs to be pending") + } + } + + // Now update the pool with the next block. This should cause the pool to + // clear out the post-Osaka txs since they were included in block 2. Since the + // test blockchain doesn't manage nonces, we'll just do that manually before + // the reset is called. Don't forget about the pre-Osaka transaction we also + // added to block 2! 
+ for i := range postOsakaTxs { + statedb.SetNonce(addrs[len(preOsakaTxs)+i], 1, tracing.NonceChangeEoACall) + } + statedb.SetNonce(addrs[len(preOsakaTxs)-1], 1, tracing.NonceChangeEoACall) + pool.Reset(header1, block2.Header()) + + // Now verify no post-Osaka transactions are tracked by the pool. + for i, tx := range postOsakaTxs { + if pool.Get(tx.Hash()) != nil { + t.Fatalf("expected txs added post-osaka to have been placed in limbo due to inclusion in a block: index %d, hash %s", i, tx.Hash()) + } + } + + // Wait for the pool migration to complete. + <-pool.cQueue.anyBillyConversionDone + + // Verify all transactions in the pool were converted and verify the + // subsequent cell proofs. + count, _ := pool.Stats() + if count != len(preOsakaTxs)-1 { + t.Errorf("expected pending count to match initial tx count: pending=%d, expected=%d", count, len(preOsakaTxs)-1) + } + for addr, acc := range pool.index { + for _, m := range acc { + if m.version != types.BlobSidecarVersion1 { + t.Errorf("expected sidecar to have been converted: from %s, hash %s", addr, m.hash) + } + tx := pool.Get(m.hash) + if tx == nil { + t.Errorf("failed to get tx by hash: %s", m.hash) + // Nothing to verify for a missing tx, avoid dereferencing nil below. + continue + } + sc := tx.BlobTxSidecar() + if err := kzg4844.VerifyCellProofs(sc.Blobs, sc.Commitments, sc.Proofs); err != nil { + t.Errorf("failed to verify cell proofs for tx %s after conversion: %s", m.hash, err) + } + } + } + + verifyPoolInternals(t, pool) + + // Launch conversion a second time. + // This is just a sanity check to ensure we can handle it. + pool.Reset(header0, header1) + + pool.Close() +} + // fakeBilly is a billy.Database implementation which just drops data on the floor. 
type fakeBilly struct { billy.Database @@ -2180,3 +2359,5 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { } } } + +// newUint64 returns a pointer to a copy of val. +func newUint64(val uint64) *uint64 { return &val } diff --git a/core/txpool/blobpool/conversion.go b/core/txpool/blobpool/conversion.go index 5026892fc8..95828d83b2 100644 --- a/core/txpool/blobpool/conversion.go +++ b/core/txpool/blobpool/conversion.go @@ -32,8 +32,8 @@ import ( // with 6 blobs each) would consume approximately 1.5GB of memory. const maxPendingConversionTasks = 2048 -// cTask represents a conversion task with an attached legacy blob transaction. -type cTask struct { +// txConvert represents a conversion task with an attached legacy blob transaction. +type txConvert struct { tx *types.Transaction // Legacy blob transaction done chan error // Channel for signaling back if the conversion succeeds } @@ -43,17 +43,27 @@ type cTask struct { // it is performed in the background by a single thread, ensuring the main Geth // process is not overloaded. type conversionQueue struct { - tasks chan *cTask - quit chan struct{} - closed chan struct{} + tasks chan *txConvert // Single-transaction conversion requests submitted via convert + startBilly chan func() // Bulk conversion functions submitted via launchBillyConversion + quit chan struct{} // Received by the loop to terminate + closed chan struct{} // Observed by convert and launchBillyConversion to reject requests + + billyQueue []func() // Bulk conversion functions waiting to be run by the loop + billyTaskDone chan struct{} // Closed when the launched bulk conversion returns, nil if none is tracked + + // This channel will be closed when the first billy conversion finishes. + // It's added for unit tests to synchronize with the conversion progress. + anyBillyConversionDone chan struct{} } // newConversionQueue constructs the conversion queue. 
func newConversionQueue() *conversionQueue { q := &conversionQueue{ - tasks: make(chan *cTask), - quit: make(chan struct{}), - closed: make(chan struct{}), + tasks: make(chan *txConvert), + startBilly: make(chan func()), + quit: make(chan struct{}), + closed: make(chan struct{}), + anyBillyConversionDone: make(chan struct{}), } go q.loop() return q @@ -66,13 +76,23 @@ func newConversionQueue() *conversionQueue { func (q *conversionQueue) convert(tx *types.Transaction) error { done := make(chan error, 1) select { - case q.tasks <- &cTask{tx: tx, done: done}: + case q.tasks <- &txConvert{tx: tx, done: done}: return <-done case <-q.closed: return errors.New("conversion queue closed") } } +// launchBillyConversion hands fn to the queue loop, which runs it in a background +// goroutine once no other bulk conversion is in flight. It returns an error if the +// queue has been closed. +func (q *conversionQueue) launchBillyConversion(fn func()) error { + select { + case q.startBilly <- fn: + return nil + case <-q.closed: + return errors.New("conversion queue closed") + } +} + // close terminates the conversion queue. func (q *conversionQueue) close() { select { @@ -85,7 +105,7 @@ func (q *conversionQueue) close() { } // run converts a batch of legacy blob txs to the new cell proof format. -func (q *conversionQueue) run(tasks []*cTask, done chan struct{}, interrupt *atomic.Int32) { +func (q *conversionQueue) run(tasks []*txConvert, done chan struct{}, interrupt *atomic.Int32) { defer close(done) for _, t := range tasks { @@ -116,37 +136,68 @@ func (q *conversionQueue) loop() { // The pending tasks for sidecar conversion. We assume the number of legacy // blob transactions requiring conversion will not be excessive. However, // a hard cap is applied as a protective measure. 
- cTasks []*cTask + txTasks []*txConvert + + firstBilly = true ) + for { select { case t := <-q.tasks: - if len(cTasks) >= maxPendingConversionTasks { + if len(txTasks) >= maxPendingConversionTasks { t.done <- errors.New("conversion queue is overloaded") continue } - cTasks = append(cTasks, t) + txTasks = append(txTasks, t) // Launch the background conversion thread if it's idle if done == nil { done, interrupt = make(chan struct{}), new(atomic.Int32) - tasks := slices.Clone(cTasks) - cTasks = cTasks[:0] + tasks := slices.Clone(txTasks) + txTasks = txTasks[:0] go q.run(tasks, done, interrupt) } case <-done: done, interrupt = nil, nil + + // Tasks queued while the previous batch was running would otherwise + // wait for the next submission, so launch them right away. + if len(txTasks) > 0 { + done, interrupt = make(chan struct{}), new(atomic.Int32) + + tasks := slices.Clone(txTasks) + txTasks = txTasks[:0] + go q.run(tasks, done, interrupt) + } - case <-q.quit: - if done == nil { - return + case fn := <-q.startBilly: + q.billyQueue = append(q.billyQueue, fn) + + // Only start it right away if no bulk conversion is in flight. Otherwise + // it is picked up when the running one signals completion, which keeps + // the conversions serialized and the tracked done channel intact. + if q.billyTaskDone == nil { + q.runNextBillyTask() + } + + case <-q.billyTaskDone: + if firstBilly { + close(q.anyBillyConversionDone) + firstBilly = false + } + q.runNextBillyTask() + + case <-q.quit: + if done != nil { + log.Debug("Waiting for blob proof conversion to exit") + interrupt.Store(1) + <-done + } + if q.billyTaskDone != nil { + log.Debug("Waiting for blobpool billy conversion to exit") + <-q.billyTaskDone } - interrupt.Store(1) - log.Debug("Waiting for blob proof conversion to exit") - <-done return } } } + +// runNextBillyTask pops the oldest queued bulk conversion and runs it in a new +// goroutine, recording its done channel in billyTaskDone. If nothing is queued, +// billyTaskDone is reset to nil. It must only be called from the loop while no +// bulk conversion is running. +func (q *conversionQueue) runNextBillyTask() { + if len(q.billyQueue) == 0 { + q.billyTaskDone = nil + return + } + + fn := q.billyQueue[0] + q.billyQueue = append(q.billyQueue[:0], q.billyQueue[1:]...) + + done := make(chan struct{}) + go func() { defer close(done); fn() }() + q.billyTaskDone = done +} diff --git a/core/txpool/blobpool/lookup.go b/core/txpool/blobpool/lookup.go index 7607cd487a..874ca85b8c 100644 --- a/core/txpool/blobpool/lookup.go +++ b/core/txpool/blobpool/lookup.go @@ -110,3 +110,13 @@ func (l *lookup) untrack(tx *blobTxMeta) { } } } + +// update updates the transaction index. It should only be used in the conversion. 
+func (l *lookup) update(hash common.Hash, id uint64, size uint64) bool { + meta, exists := l.txIndex[hash] + if !exists { + // The hash is not present in the transaction index, report failure to the caller. + return false + } + // Overwrite the id and size recorded for the hash with the given values. + meta.id, meta.size = id, size + return true +}