core/txpool/blobpool: defer replacement delete out of addLocked

This commit is contained in:
agwab 2026-04-27 23:31:39 +09:00
parent a5f41e51af
commit 07757faec0

View file

@ -1590,19 +1590,30 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
waitStart := time.Now() waitStart := time.Now()
p.lock.Lock() p.lock.Lock()
addwaitHist.Update(time.Since(waitStart).Nanoseconds()) addwaitHist.Update(time.Since(waitStart).Nanoseconds())
defer p.lock.Unlock()
defer func(start time.Time) { defer func(start time.Time) {
addtimeHist.Update(time.Since(start).Nanoseconds()) addtimeHist.Update(time.Since(start).Nanoseconds())
}(time.Now()) }(time.Now())
return p.addLocked(tx, true) toDelete, err := p.addLocked(tx, true)
p.lock.Unlock()
for _, id := range toDelete {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "id", id, "err", err)
}
}
if err == nil {
p.updateStorageMetrics()
}
return err
} }
// addLocked inserts a new blob transaction into the pool if it passes validation (both // addLocked inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restrictions). It must be called with the pool lock held. // consensus validity and pool restrictions). It must be called with the pool lock held.
// Only for internal use. // Only for internal use.
func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error) { //
// Returns billy slot ids that the caller must delete after releasing the lock.
func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (toDelete []uint64, err error) {
// Ensure the transaction is valid from all perspectives // Ensure the transaction is valid from all perspectives
if err := p.validateTx(tx); err != nil { if err := p.validateTx(tx); err != nil {
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err) log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
@ -1622,7 +1633,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
p.gapped[from] = append(p.gapped[from], tx) p.gapped[from] = append(p.gapped[from], tx)
p.gappedSource[tx.Hash()] = from p.gappedSource[tx.Hash()] = from
log.Trace("added tx to gapped blob queue", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) log.Trace("added tx to gapped blob queue", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
return nil return nil, nil
} else { } else {
// if maxGapped is reached, it is better to give time to gapped // if maxGapped is reached, it is better to give time to gapped
// transactions by keeping the old and dropping this one. // transactions by keeping the old and dropping this one.
@ -1639,7 +1650,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
default: default:
addInvalidMeter.Mark(1) addInvalidMeter.Mark(1)
} }
return err return nil, err
} }
// If the address is not yet known, request exclusivity to track the account // If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted // only by this subpool until all transactions are evicted
@ -1647,7 +1658,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
if _, ok := p.index[from]; !ok { if _, ok := p.index[from]; !ok {
if err := p.reserver.Hold(from); err != nil { if err := p.reserver.Hold(from); err != nil {
addNonExclusiveMeter.Mark(1) addNonExclusiveMeter.Mark(1)
return err return nil, err
} }
defer func() { defer func() {
// If the transaction is rejected by some post-validation check, remove // If the transaction is rejected by some post-validation check, remove
@ -1666,11 +1677,11 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
blob, err := rlp.EncodeToBytes(tx) blob, err := rlp.EncodeToBytes(tx)
if err != nil { if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return err return nil, err
} }
id, err := p.store.Put(blob) id, err := p.store.Put(blob)
if err != nil { if err != nil {
return err return nil, err
} }
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
@ -1689,10 +1700,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
dropReplacedMeter.Mark(1) dropReplacedMeter.Mark(1)
prev := p.index[from][offset] prev := p.index[from][offset]
if err := p.store.Delete(prev.id); err != nil { toDelete = append(toDelete, prev.id)
// Shitty situation, but try to recover gracefully instead of going boom
log.Error("Failed to delete replaced transaction", "id", prev.id, "err", err)
}
// Update the transaction index // Update the transaction index
p.index[from][offset] = meta p.index[from][offset] = meta
p.spent[from] = new(uint256.Int).Sub(p.spent[from], prev.costCap) p.spent[from] = new(uint256.Int).Sub(p.spent[from], prev.costCap)
@ -1764,7 +1772,6 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
for p.stored > p.config.Datacap { for p.stored > p.config.Datacap {
p.drop() p.drop()
} }
p.updateStorageMetrics()
addValidMeter.Mark(1) addValidMeter.Mark(1)
@ -1812,7 +1819,9 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
// If we hit the pending range, including the first gap, add it and continue to try to add more. // If we hit the pending range, including the first gap, add it and continue to try to add more.
// We do not recurse here, but continue to loop instead. // We do not recurse here, but continue to loop instead.
// We are under lock, so we can add the transaction directly. // We are under lock, so we can add the transaction directly.
if err := p.addLocked(tx, false); err == nil { subDelete, err := p.addLocked(tx, false)
toDelete = append(toDelete, subDelete...)
if err == nil {
log.Trace("Gapped blob transaction added to pool", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) log.Trace("Gapped blob transaction added to pool", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
} else { } else {
log.Trace("Gapped blob transaction not accepted", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "err", err) log.Trace("Gapped blob transaction not accepted", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "err", err)
@ -1825,7 +1834,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
p.gapped[from] = gtxs p.gapped[from] = gtxs
} }
} }
return nil return toDelete, nil
} }
// drop removes the worst transaction from the pool. It is primarily used when a // drop removes the worst transaction from the pool. It is primarily used when a