mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 15:47:21 +00:00
core/txpool/blobpool: fork boundary conversion 3 (#32716)
This implements the conversion of existing blob transactions to the new proof version. Conversion is triggered at the Osaka fork boundary. The conversion is designed to be idempotent, and may be triggered multiple times in case of a reorg around the fork boundary. This change is the last missing piece that completes our strategy for the blobpool conversion. After the Osaka fork, - new transactions will be converted on-the-fly upon entry to the pool - reorged transactions will be converted while being reinjected - (this change) existing transactions will be converted in the background --------- Co-authored-by: Gary Rong <garyrong0905@gmail.com> Co-authored-by: lightclient <lightclient@protonmail.com>
This commit is contained in:
parent
bc451546ae
commit
ad55a3e07f
4 changed files with 432 additions and 22 deletions
|
|
@ -21,10 +21,12 @@ import (
|
|||
"container/heap"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"math"
|
||||
"math/big"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
|
@ -337,7 +339,7 @@ type BlobPool struct {
|
|||
|
||||
signer types.Signer // Transaction signer to use for sender recovery
|
||||
chain BlockChain // Chain object to access the state through
|
||||
cQueue *conversionQueue // The queue for performing legacy sidecar conversion
|
||||
cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka)
|
||||
|
||||
head atomic.Pointer[types.Header] // Current head of the chain
|
||||
state *state.StateDB // Current state at the head of the chain
|
||||
|
|
@ -883,6 +885,172 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
|
|||
basefeeGauge.Update(int64(basefee.Uint64()))
|
||||
blobfeeGauge.Update(int64(blobfee.Uint64()))
|
||||
p.updateStorageMetrics()
|
||||
|
||||
// Perform the conversion logic at the fork boundary
|
||||
if !p.chain.Config().IsOsaka(oldHead.Number, oldHead.Time) && p.chain.Config().IsOsaka(newHead.Number, newHead.Time) {
|
||||
// Deep copy all indexed transaction metadata.
|
||||
var (
|
||||
ids = make(map[common.Address]map[uint64]uint64)
|
||||
txs = make(map[common.Address]map[uint64]common.Hash)
|
||||
)
|
||||
for sender, list := range p.index {
|
||||
ids[sender] = make(map[uint64]uint64)
|
||||
txs[sender] = make(map[uint64]common.Hash)
|
||||
for _, m := range list {
|
||||
ids[sender][m.nonce] = m.id
|
||||
txs[sender][m.nonce] = m.hash
|
||||
}
|
||||
}
|
||||
// Initiate the background conversion thread.
|
||||
p.cQueue.launchBillyConversion(func() {
|
||||
p.convertLegacySidecars(ids, txs)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// compareAndSwap checks if the specified transaction is still tracked in the pool
|
||||
// and replace the metadata accordingly. It should only be used in the fork boundary
|
||||
// bulk conversion. If it fails for some reason, the subsequent txs won't be dropped
|
||||
// for simplicity which we assume it's very likely to happen.
|
||||
//
|
||||
// The returned flag indicates whether the replacement succeeded.
|
||||
func (p *BlobPool) compareAndSwap(address common.Address, hash common.Hash, blob []byte, oldID uint64, oldStorageSize uint32) bool {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
newId, err := p.store.Put(blob)
|
||||
if err != nil {
|
||||
log.Error("Failed to store transaction", "hash", hash, "err", err)
|
||||
return false
|
||||
}
|
||||
newSize := uint64(len(blob))
|
||||
newStorageSize := p.store.Size(newId)
|
||||
|
||||
// Terminate the procedure if the transaction was already evicted. The
|
||||
// newly added blob should be removed before return.
|
||||
if !p.lookup.update(hash, newId, newSize) {
|
||||
if derr := p.store.Delete(newId); derr != nil {
|
||||
log.Error("Failed to delete the dangling blob tx", "err", derr)
|
||||
} else {
|
||||
log.Warn("Deleted the dangling blob tx", "id", newId)
|
||||
}
|
||||
return false
|
||||
}
|
||||
// Update the metadata of blob transaction
|
||||
for _, meta := range p.index[address] {
|
||||
if meta.hash == hash {
|
||||
meta.id = newId
|
||||
meta.version = types.BlobSidecarVersion1
|
||||
meta.storageSize = newStorageSize
|
||||
meta.size = newSize
|
||||
|
||||
p.stored += uint64(newStorageSize)
|
||||
p.stored -= uint64(oldStorageSize)
|
||||
break
|
||||
}
|
||||
}
|
||||
if err := p.store.Delete(oldID); err != nil {
|
||||
log.Error("Failed to delete the legacy transaction", "hash", hash, "id", oldID, "err", err)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// convertLegacySidecar fetches transaction data from the store, performs an
|
||||
// on-the-fly conversion. This function is intended for use only during the
|
||||
// Osaka fork transition period.
|
||||
//
|
||||
// The returned flag indicates whether the replacement succeeds or not.
|
||||
func (p *BlobPool) convertLegacySidecar(sender common.Address, hash common.Hash, id uint64) bool {
|
||||
start := time.Now()
|
||||
|
||||
// Retrieves the legacy blob transaction from the underlying store with
|
||||
// read lock held, preventing any potential data race around the slot
|
||||
// specified by the id.
|
||||
p.lock.RLock()
|
||||
data, err := p.store.Get(id)
|
||||
if err != nil {
|
||||
p.lock.RUnlock()
|
||||
// The transaction may have been evicted simultaneously, safe to skip conversion.
|
||||
log.Debug("Blob transaction is missing", "hash", hash, "id", id, "err", err)
|
||||
return false
|
||||
}
|
||||
oldStorageSize := p.store.Size(id)
|
||||
p.lock.RUnlock()
|
||||
|
||||
// Decode the transaction, the failure is not expected and report the error
|
||||
// loudly if possible. If the blob transaction in this slot is corrupted,
|
||||
// leave it in the store, it will be dropped during the next pool
|
||||
// initialization.
|
||||
var tx types.Transaction
|
||||
if err = rlp.DecodeBytes(data, &tx); err != nil {
|
||||
log.Error("Blob transaction is corrupted", "hash", hash, "id", id, "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Skip conversion if the transaction does not match the expected hash, or if it was
|
||||
// already converted. This can occur if the original transaction was evicted from the
|
||||
// pool and the slot was reused by a new one.
|
||||
if tx.Hash() != hash {
|
||||
log.Warn("Blob transaction was replaced", "hash", hash, "id", id, "stored", tx.Hash())
|
||||
return false
|
||||
}
|
||||
sc := tx.BlobTxSidecar()
|
||||
if sc.Version >= types.BlobSidecarVersion1 {
|
||||
log.Debug("Skipping conversion of blob tx", "hash", hash, "id", id)
|
||||
return false
|
||||
}
|
||||
|
||||
// Perform the sidecar conversion, the failure is not expected and report the error
|
||||
// loudly if possible.
|
||||
if err := tx.BlobTxSidecar().ToV1(); err != nil {
|
||||
log.Error("Failed to convert blob transaction", "hash", hash, "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Encode the converted transaction, the failure is not expected and report
|
||||
// the error loudly if possible.
|
||||
blob, err := rlp.EncodeToBytes(&tx)
|
||||
if err != nil {
|
||||
log.Error("Failed to encode blob transaction", "hash", tx.Hash(), "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Replace the legacy blob transaction with the converted format.
|
||||
if !p.compareAndSwap(sender, hash, blob, id, oldStorageSize) {
|
||||
log.Error("Failed to replace the legacy transaction", "hash", hash)
|
||||
return false
|
||||
}
|
||||
log.Debug("Converted legacy blob transaction", "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return true
|
||||
}
|
||||
|
||||
// convertLegacySidecars converts all given transactions to sidecar version 1.
|
||||
//
|
||||
// If any of them fails to be converted, the subsequent transactions will still
|
||||
// be processed, as we assume the failure is very unlikely to happen. If happens,
|
||||
// these transactions will be stuck in the pool until eviction.
|
||||
func (p *BlobPool) convertLegacySidecars(ids map[common.Address]map[uint64]uint64, txs map[common.Address]map[uint64]common.Hash) {
|
||||
var (
|
||||
start = time.Now()
|
||||
success int
|
||||
failure int
|
||||
)
|
||||
for addr, list := range txs {
|
||||
// Transactions evicted from the pool must be contiguous, if in any case,
|
||||
// the transactions are gapped with each other, they will be discarded.
|
||||
nonces := slices.Collect(maps.Keys(list))
|
||||
slices.Sort(nonces)
|
||||
|
||||
// Convert the txs with nonce order
|
||||
for _, nonce := range nonces {
|
||||
if p.convertLegacySidecar(addr, list[nonce], ids[addr][nonce]) {
|
||||
success++
|
||||
} else {
|
||||
failure++
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Info("Completed blob transaction conversion", "discarded", failure, "injected", success, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
}
|
||||
|
||||
// reorg assembles all the transactors and missing transactions between an old
|
||||
|
|
|
|||
|
|
@ -2098,6 +2098,185 @@ func TestGetBlobs(t *testing.T) {
|
|||
pool.Close()
|
||||
}
|
||||
|
||||
// TestSidecarConversion will verify that after the Osaka fork, all legacy
|
||||
// sidecars in the pool are successfully convert to v1 sidecars.
|
||||
func TestSidecarConversion(t *testing.T) {
|
||||
// log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))
|
||||
|
||||
// Create a temporary folder for the persistent backend
|
||||
storage := t.TempDir()
|
||||
os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700)
|
||||
|
||||
var (
|
||||
preOsakaTxs = make(types.Transactions, 10)
|
||||
postOsakaTxs = make(types.Transactions, 3)
|
||||
keys = make([]*ecdsa.PrivateKey, len(preOsakaTxs)+len(postOsakaTxs))
|
||||
addrs = make([]common.Address, len(preOsakaTxs)+len(postOsakaTxs))
|
||||
statedb, _ = state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
|
||||
)
|
||||
for i := range keys {
|
||||
keys[i], _ = crypto.GenerateKey()
|
||||
addrs[i] = crypto.PubkeyToAddress(keys[i].PublicKey)
|
||||
statedb.AddBalance(addrs[i], uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
|
||||
}
|
||||
for i := range preOsakaTxs {
|
||||
preOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 2, 0, keys[i], types.BlobSidecarVersion0)
|
||||
}
|
||||
for i := range postOsakaTxs {
|
||||
if i == 0 {
|
||||
// First has a v0 sidecar.
|
||||
postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion0)
|
||||
}
|
||||
postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion1)
|
||||
}
|
||||
statedb.Commit(0, true, false)
|
||||
|
||||
// Test plan:
|
||||
// 1) Create a bunch v0 sidecar txs and add to pool before Osaka.
|
||||
// 2) Pass in new Osaka header to activate the conversion thread.
|
||||
// 3) Continue adding both v0 and v1 transactions to the pool.
|
||||
// 4) Verify that as additional blocks come in, transactions involved in the
|
||||
// migration are correctly discarded.
|
||||
|
||||
config := ¶ms.ChainConfig{
|
||||
ChainID: big.NewInt(1),
|
||||
LondonBlock: big.NewInt(0),
|
||||
BerlinBlock: big.NewInt(0),
|
||||
CancunTime: newUint64(0),
|
||||
PragueTime: newUint64(0),
|
||||
OsakaTime: newUint64(1),
|
||||
BlobScheduleConfig: params.DefaultBlobSchedule,
|
||||
}
|
||||
chain := &testBlockChain{
|
||||
config: config,
|
||||
basefee: uint256.NewInt(1050),
|
||||
blobfee: uint256.NewInt(105),
|
||||
statedb: statedb,
|
||||
blocks: make(map[uint64]*types.Block),
|
||||
}
|
||||
|
||||
// Create 3 blocks:
|
||||
// - the current block, before Osaka
|
||||
// - the first block after Osaka
|
||||
// - another post-Osaka block with several transactions in it
|
||||
header0 := chain.CurrentBlock()
|
||||
header0.Time = 0
|
||||
chain.blocks[0] = types.NewBlockWithHeader(header0)
|
||||
|
||||
header1 := chain.CurrentBlock()
|
||||
header1.Number = big.NewInt(1)
|
||||
header1.Time = 1
|
||||
chain.blocks[1] = types.NewBlockWithHeader(header1)
|
||||
|
||||
header2 := chain.CurrentBlock()
|
||||
header2.Time = 2
|
||||
header2.Number = big.NewInt(2)
|
||||
|
||||
// Make a copy of one of the pre-Osaka transactions and convert it to v1 here
|
||||
// so that we can add it to the pool later and ensure a duplicate is not added
|
||||
// by the conversion queue.
|
||||
tx := preOsakaTxs[len(preOsakaTxs)-1]
|
||||
sc := *tx.BlobTxSidecar() // copy sidecar
|
||||
sc.ToV1()
|
||||
tx.WithBlobTxSidecar(&sc)
|
||||
|
||||
block2 := types.NewBlockWithHeader(header2).WithBody(types.Body{Transactions: append(postOsakaTxs, tx)})
|
||||
chain.blocks[2] = block2
|
||||
|
||||
pool := New(Config{Datadir: storage}, chain, nil)
|
||||
if err := pool.Init(1, header0, newReserver()); err != nil {
|
||||
t.Fatalf("failed to create blob pool: %v", err)
|
||||
}
|
||||
|
||||
errs := pool.Add(preOsakaTxs, true)
|
||||
for i, err := range errs {
|
||||
if err != nil {
|
||||
t.Errorf("failed to insert blob tx from %s: %s", addrs[i], errs[i])
|
||||
}
|
||||
}
|
||||
|
||||
// Kick off migration.
|
||||
pool.Reset(header0, header1)
|
||||
|
||||
// Add the v0 sidecar tx, but don't block so we can keep doing other stuff
|
||||
// while it converts the sidecar.
|
||||
addDone := make(chan struct{})
|
||||
go func() {
|
||||
pool.Add(types.Transactions{postOsakaTxs[0]}, false)
|
||||
close(addDone)
|
||||
}()
|
||||
|
||||
// Add the post-Osaka v1 sidecar txs.
|
||||
errs = pool.Add(postOsakaTxs[1:], false)
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
t.Fatalf("expected tx add to succeed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the first tx's conversion to complete, then check that all
|
||||
// transactions added after Osaka can be accounted for in the pool.
|
||||
<-addDone
|
||||
pending := pool.Pending(txpool.PendingFilter{BlobTxs: true, BlobVersion: types.BlobSidecarVersion1})
|
||||
for _, tx := range postOsakaTxs {
|
||||
from, _ := pool.signer.Sender(tx)
|
||||
if len(pending[from]) != 1 || pending[from][0].Hash != tx.Hash() {
|
||||
t.Fatalf("expected post-Osaka txs to be pending")
|
||||
}
|
||||
}
|
||||
|
||||
// Now update the pool with the next block. This should cause the pool to
|
||||
// clear out the post-Osaka txs since they were included in block 2. Since the
|
||||
// test blockchain doesn't manage nonces, we'll just do that manually before
|
||||
// the reset is called. Don't forget about the pre-Osaka transaction we also
|
||||
// added to block 2!
|
||||
for i := range postOsakaTxs {
|
||||
statedb.SetNonce(addrs[len(preOsakaTxs)+i], 1, tracing.NonceChangeEoACall)
|
||||
}
|
||||
statedb.SetNonce(addrs[len(preOsakaTxs)-1], 1, tracing.NonceChangeEoACall)
|
||||
pool.Reset(header1, block2.Header())
|
||||
|
||||
// Now verify no post-Osaka transactions are tracked by the pool.
|
||||
for i, tx := range postOsakaTxs {
|
||||
if pool.Get(tx.Hash()) != nil {
|
||||
t.Fatalf("expected txs added post-osaka to have been placed in limbo due to inclusion in a block: index %d, hash %s", i, tx.Hash())
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the pool migration to complete.
|
||||
<-pool.cQueue.anyBillyConversionDone
|
||||
|
||||
// Verify all transactions in the pool were converted and verify the
|
||||
// subsequent cell proofs.
|
||||
count, _ := pool.Stats()
|
||||
if count != len(preOsakaTxs)-1 {
|
||||
t.Errorf("expected pending count to match initial tx count: pending=%d, expected=%d", count, len(preOsakaTxs)-1)
|
||||
}
|
||||
for addr, acc := range pool.index {
|
||||
for _, m := range acc {
|
||||
if m.version != types.BlobSidecarVersion1 {
|
||||
t.Errorf("expected sidecar to have been converted: from %s, hash %s", addr, m.hash)
|
||||
}
|
||||
tx := pool.Get(m.hash)
|
||||
if tx == nil {
|
||||
t.Errorf("failed to get tx by hash: %s", m.hash)
|
||||
}
|
||||
sc := tx.BlobTxSidecar()
|
||||
if err := kzg4844.VerifyCellProofs(sc.Blobs, sc.Commitments, sc.Proofs); err != nil {
|
||||
t.Errorf("failed to verify cell proofs for tx %s after conversion: %s", m.hash, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
verifyPoolInternals(t, pool)
|
||||
|
||||
// Launch conversion a second time.
|
||||
// This is just a sanity check to ensure we can handle it.
|
||||
pool.Reset(header0, header1)
|
||||
|
||||
pool.Close()
|
||||
}
|
||||
|
||||
// fakeBilly is a billy.Database implementation which just drops data on the floor.
|
||||
type fakeBilly struct {
|
||||
billy.Database
|
||||
|
|
@ -2180,3 +2359,5 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newUint64(val uint64) *uint64 { return &val }
|
||||
|
|
|
|||
|
|
@ -32,8 +32,8 @@ import (
|
|||
// with 6 blobs each) would consume approximately 1.5GB of memory.
|
||||
const maxPendingConversionTasks = 2048
|
||||
|
||||
// cTask represents a conversion task with an attached legacy blob transaction.
|
||||
type cTask struct {
|
||||
// txConvert represents a conversion task with an attached legacy blob transaction.
|
||||
type txConvert struct {
|
||||
tx *types.Transaction // Legacy blob transaction
|
||||
done chan error // Channel for signaling back if the conversion succeeds
|
||||
}
|
||||
|
|
@ -43,17 +43,27 @@ type cTask struct {
|
|||
// it is performed in the background by a single thread, ensuring the main Geth
|
||||
// process is not overloaded.
|
||||
type conversionQueue struct {
|
||||
tasks chan *cTask
|
||||
quit chan struct{}
|
||||
closed chan struct{}
|
||||
tasks chan *txConvert
|
||||
startBilly chan func()
|
||||
quit chan struct{}
|
||||
closed chan struct{}
|
||||
|
||||
billyQueue []func()
|
||||
billyTaskDone chan struct{}
|
||||
|
||||
// This channel will be closed when the first billy conversion finishes.
|
||||
// It's added for unit tests to synchronize with the conversion progress.
|
||||
anyBillyConversionDone chan struct{}
|
||||
}
|
||||
|
||||
// newConversionQueue constructs the conversion queue.
|
||||
func newConversionQueue() *conversionQueue {
|
||||
q := &conversionQueue{
|
||||
tasks: make(chan *cTask),
|
||||
quit: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
tasks: make(chan *txConvert),
|
||||
startBilly: make(chan func()),
|
||||
quit: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
anyBillyConversionDone: make(chan struct{}),
|
||||
}
|
||||
go q.loop()
|
||||
return q
|
||||
|
|
@ -66,13 +76,23 @@ func newConversionQueue() *conversionQueue {
|
|||
func (q *conversionQueue) convert(tx *types.Transaction) error {
|
||||
done := make(chan error, 1)
|
||||
select {
|
||||
case q.tasks <- &cTask{tx: tx, done: done}:
|
||||
case q.tasks <- &txConvert{tx: tx, done: done}:
|
||||
return <-done
|
||||
case <-q.closed:
|
||||
return errors.New("conversion queue closed")
|
||||
}
|
||||
}
|
||||
|
||||
// launchBillyConversion starts a conversion task in the background.
|
||||
func (q *conversionQueue) launchBillyConversion(fn func()) error {
|
||||
select {
|
||||
case q.startBilly <- fn:
|
||||
return nil
|
||||
case <-q.closed:
|
||||
return errors.New("conversion queue closed")
|
||||
}
|
||||
}
|
||||
|
||||
// close terminates the conversion queue.
|
||||
func (q *conversionQueue) close() {
|
||||
select {
|
||||
|
|
@ -85,7 +105,7 @@ func (q *conversionQueue) close() {
|
|||
}
|
||||
|
||||
// run converts a batch of legacy blob txs to the new cell proof format.
|
||||
func (q *conversionQueue) run(tasks []*cTask, done chan struct{}, interrupt *atomic.Int32) {
|
||||
func (q *conversionQueue) run(tasks []*txConvert, done chan struct{}, interrupt *atomic.Int32) {
|
||||
defer close(done)
|
||||
|
||||
for _, t := range tasks {
|
||||
|
|
@ -116,37 +136,68 @@ func (q *conversionQueue) loop() {
|
|||
// The pending tasks for sidecar conversion. We assume the number of legacy
|
||||
// blob transactions requiring conversion will not be excessive. However,
|
||||
// a hard cap is applied as a protective measure.
|
||||
cTasks []*cTask
|
||||
txTasks []*txConvert
|
||||
|
||||
firstBilly = true
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case t := <-q.tasks:
|
||||
if len(cTasks) >= maxPendingConversionTasks {
|
||||
if len(txTasks) >= maxPendingConversionTasks {
|
||||
t.done <- errors.New("conversion queue is overloaded")
|
||||
continue
|
||||
}
|
||||
cTasks = append(cTasks, t)
|
||||
txTasks = append(txTasks, t)
|
||||
|
||||
// Launch the background conversion thread if it's idle
|
||||
if done == nil {
|
||||
done, interrupt = make(chan struct{}), new(atomic.Int32)
|
||||
|
||||
tasks := slices.Clone(cTasks)
|
||||
cTasks = cTasks[:0]
|
||||
tasks := slices.Clone(txTasks)
|
||||
txTasks = txTasks[:0]
|
||||
go q.run(tasks, done, interrupt)
|
||||
}
|
||||
|
||||
case <-done:
|
||||
done, interrupt = nil, nil
|
||||
|
||||
case <-q.quit:
|
||||
if done == nil {
|
||||
return
|
||||
case fn := <-q.startBilly:
|
||||
q.billyQueue = append(q.billyQueue, fn)
|
||||
q.runNextBillyTask()
|
||||
|
||||
case <-q.billyTaskDone:
|
||||
if firstBilly {
|
||||
close(q.anyBillyConversionDone)
|
||||
firstBilly = false
|
||||
}
|
||||
q.runNextBillyTask()
|
||||
|
||||
case <-q.quit:
|
||||
if done != nil {
|
||||
log.Debug("Waiting for blob proof conversion to exit")
|
||||
interrupt.Store(1)
|
||||
<-done
|
||||
}
|
||||
if q.billyTaskDone != nil {
|
||||
log.Debug("Waiting for blobpool billy conversion to exit")
|
||||
<-q.billyTaskDone
|
||||
}
|
||||
interrupt.Store(1)
|
||||
log.Debug("Waiting for blob proof conversion to exit")
|
||||
<-done
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *conversionQueue) runNextBillyTask() {
|
||||
if len(q.billyQueue) == 0 {
|
||||
q.billyTaskDone = nil
|
||||
return
|
||||
}
|
||||
|
||||
fn := q.billyQueue[0]
|
||||
q.billyQueue = append(q.billyQueue[:0], q.billyQueue[1:]...)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() { defer close(done); fn() }()
|
||||
q.billyTaskDone = done
|
||||
}
|
||||
|
|
|
|||
|
|
@ -110,3 +110,13 @@ func (l *lookup) untrack(tx *blobTxMeta) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// update updates the transaction index. It should only be used in the conversion.
|
||||
func (l *lookup) update(hash common.Hash, id uint64, size uint64) bool {
|
||||
meta, exists := l.txIndex[hash]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
meta.id, meta.size = id, size
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue