core/txpool/blobpool: atomic update in limbo via peek/dropStore

Split getAndDrop into peek + dropStore to implement atomic update:
- peek() reads without deleting
- setAndIndex() creates new entry first
- dropStore() removes old entry only on success

On setAndIndex failure, old entry remains untouched - no data loss.
dropStore() deletes from store first to avoid orphaned index entries.

Suggested-by: AgwaB
This commit is contained in:
Mayveskii 2026-04-21 15:51:28 +03:00 committed by cisco
parent 077d83387a
commit f44892c679

View file

@ -191,31 +191,64 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
// If the blobs are not tracked by the limbo, there's not much to do. This
// can happen for example if a blob transaction is mined without pushing it
// into the network first.
id, ok := l.index[txhash]
oldID, ok := l.index[txhash]
if !ok {
log.Trace("Limbo cannot update non-tracked blobs", "tx", txhash)
return
}
// If there was no change in the blob's inclusion block, don't mess around
// with heavy database operations.
if _, ok := l.groups[block][id]; ok {
// If the blob is already in the target block, nothing to do.
if _, dup := l.groups[block][oldID]; dup {
log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block)
return
}
// Retrieve the old blobs from the data store and write them back with a new
// block number. IF anything fails, there's not much to do, go on.
item, err := l.getAndDrop(id)
// Peek reads the item without deleting it, preserving the old entry
// if setAndIndex fails below.
item, err := l.peek(oldID)
if err != nil {
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err)
log.Error("Failed to peek limboed blobs", "tx", txhash, "id", oldID, "err", err)
return
}
// Create new entry at new block number first. On failure, the old entry
// remains untouched - no data loss.
if err := l.setAndIndex(item.Tx, block); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
log.Error("failed to move limboed blobs, keeping old entry", "id", oldID, "err", err)
return
}
// New entry created successfully, now drop the old one.
if err := l.dropStore(oldID, item); err != nil {
log.Error("failed to drop old limboed blob slot", "id", oldID, "err", err)
}
log.Trace("Blob transaction updated in limbo", "tx", txhash, "old-block", item.Block, "new-block", block)
}
// peek retrieves a blob item from the limbo store without deleting it.
// Used by update to read the item before setAndIndex.
func (l *limbo) peek(id uint64) (*limboBlob, error) {
data, err := l.store.Get(id)
if err != nil {
return nil, err
}
item := new(limboBlob)
if err = rlp.DecodeBytes(data, item); err != nil {
return nil, err
}
return item, nil
}
// dropStore deletes a blob item from the store and indices.
// Used by update to remove the old entry after successful setAndIndex.
func (l *limbo) dropStore(id uint64, item *limboBlob) error {
if err := l.store.Delete(id); err != nil {
return err
}
delete(l.index, item.TxHash)
delete(l.groups[item.Block], id)
if len(l.groups[item.Block]) == 0 {
delete(l.groups, item.Block)
}
return nil
}
// getAndDrop retrieves a blob item from the limbo store and deletes it both from
// the store and indices.
func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
@ -227,12 +260,7 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
if err = rlp.DecodeBytes(data, item); err != nil {
return nil, err
}
delete(l.index, item.TxHash)
delete(l.groups[item.Block], id)
if len(l.groups[item.Block]) == 0 {
delete(l.groups, item.Block)
}
if err := l.store.Delete(id); err != nil {
if err := l.dropStore(id, item); err != nil {
return nil, err
}
return item, nil