mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-03 14:52:55 +00:00
upgrade blobpool
This commit is contained in:
parent
7ad2e1102f
commit
524ba61b04
2 changed files with 168 additions and 90 deletions
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
|
|
@ -113,6 +114,64 @@ type blobTxMeta struct {
|
||||||
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
|
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type blobTxMetaMarshal struct {
|
||||||
|
Hash common.Hash
|
||||||
|
Vhashes []common.Hash
|
||||||
|
|
||||||
|
ID uint64
|
||||||
|
StorageSize uint32
|
||||||
|
Size uint64
|
||||||
|
|
||||||
|
Nonce uint64
|
||||||
|
CostCap *uint256.Int
|
||||||
|
ExecTipCap *uint256.Int
|
||||||
|
ExecFeeCap *uint256.Int
|
||||||
|
BlobFeeCap *uint256.Int
|
||||||
|
ExecGas uint64
|
||||||
|
BlobGas uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// EncodeRLP encodes the blobTxMeta into the given writer.
|
||||||
|
func (b *blobTxMeta) EncodeRLP(w io.Writer) error {
|
||||||
|
return rlp.Encode(w, blobTxMetaMarshal{
|
||||||
|
Hash: b.hash,
|
||||||
|
Vhashes: b.vhashes,
|
||||||
|
ID: b.id,
|
||||||
|
StorageSize: b.storageSize,
|
||||||
|
Size: b.size,
|
||||||
|
Nonce: b.nonce,
|
||||||
|
CostCap: b.costCap,
|
||||||
|
ExecTipCap: b.execTipCap,
|
||||||
|
ExecFeeCap: b.execFeeCap,
|
||||||
|
BlobFeeCap: b.blobFeeCap,
|
||||||
|
ExecGas: b.execGas,
|
||||||
|
BlobGas: b.blobGas,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// DecodeRLP decodes the blobTxMeta from the given stream.
|
||||||
|
func (b *blobTxMeta) DecodeRLP(s *rlp.Stream) error {
|
||||||
|
var meta blobTxMetaMarshal
|
||||||
|
if err := s.Decode(&meta); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.hash = meta.Hash
|
||||||
|
b.vhashes = meta.Vhashes
|
||||||
|
b.id = meta.ID
|
||||||
|
b.storageSize = meta.StorageSize
|
||||||
|
b.size = meta.Size
|
||||||
|
b.nonce = meta.Nonce
|
||||||
|
b.costCap = meta.CostCap
|
||||||
|
b.execTipCap = meta.ExecTipCap
|
||||||
|
b.execFeeCap = meta.ExecFeeCap
|
||||||
|
b.blobFeeCap = meta.BlobFeeCap
|
||||||
|
b.execGas = meta.ExecGas
|
||||||
|
b.blobGas = meta.BlobGas
|
||||||
|
b.basefeeJumps = dynamicFeeJumps(meta.ExecFeeCap)
|
||||||
|
b.blobfeeJumps = dynamicFeeJumps(meta.BlobFeeCap)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
|
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
|
||||||
// and assembles a helper struct to track in memory.
|
// and assembles a helper struct to track in memory.
|
||||||
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta {
|
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta {
|
||||||
|
|
@ -373,6 +432,16 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pool initialized, attach the blob limbo to it to track blobs included
|
||||||
|
// recently but not yet finalized
|
||||||
|
limbo, err := newLimbo(limbodir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
|
||||||
|
if err != nil {
|
||||||
|
p.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.limbo = limbo
|
||||||
|
|
||||||
// Initialize the state with head block, or fallback to empty one in
|
// Initialize the state with head block, or fallback to empty one in
|
||||||
// case the head state is not available (might occur when node is not
|
// case the head state is not available (might occur when node is not
|
||||||
// fully synced).
|
// fully synced).
|
||||||
|
|
@ -399,6 +468,11 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
|
||||||
}
|
}
|
||||||
p.store = store
|
p.store = store
|
||||||
|
|
||||||
|
// If still exit blob txs in limbo, try to repair them. This is needed.
|
||||||
|
if err = p.limbo.tryRepair(p.store); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if len(fails) > 0 {
|
if len(fails) > 0 {
|
||||||
log.Warn("Dropping invalidated blob transactions", "ids", fails)
|
log.Warn("Dropping invalidated blob transactions", "ids", fails)
|
||||||
dropInvalidMeter.Mark(int64(len(fails)))
|
dropInvalidMeter.Mark(int64(len(fails)))
|
||||||
|
|
@ -423,14 +497,6 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
|
||||||
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), p.head))
|
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(p.chain.Config(), p.head))
|
||||||
}
|
}
|
||||||
p.evict = newPriceHeap(basefee, blobfee, p.index)
|
p.evict = newPriceHeap(basefee, blobfee, p.index)
|
||||||
|
|
||||||
// Pool initialized, attach the blob limbo to it to track blobs included
|
|
||||||
// recently but not yet finalized
|
|
||||||
p.limbo, err = newLimbo(limbodir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config()))
|
|
||||||
if err != nil {
|
|
||||||
p.Close()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Set the configured gas tip, triggering a filtering of anything just loaded
|
// Set the configured gas tip, triggering a filtering of anything just loaded
|
||||||
basefeeGauge.Update(int64(basefee.Uint64()))
|
basefeeGauge.Update(int64(basefee.Uint64()))
|
||||||
blobfeeGauge.Update(int64(blobfee.Uint64()))
|
blobfeeGauge.Update(int64(blobfee.Uint64()))
|
||||||
|
|
@ -486,6 +552,10 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
meta := newBlobTxMeta(id, tx.Size(), size, tx)
|
meta := newBlobTxMeta(id, tx.Size(), size, tx)
|
||||||
|
if p.limbo.existsAndSet(meta) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if p.lookup.exists(meta.hash) {
|
if p.lookup.exists(meta.hash) {
|
||||||
// This path is only possible after a crash, where deleted items are not
|
// This path is only possible after a crash, where deleted items are not
|
||||||
// removed via the normal shutdown-startup procedure and thus may get
|
// removed via the normal shutdown-startup procedure and thus may get
|
||||||
|
|
@ -547,9 +617,11 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
||||||
p.stored -= uint64(txs[i].storageSize)
|
p.stored -= uint64(txs[i].storageSize)
|
||||||
p.lookup.untrack(txs[i])
|
p.lookup.untrack(txs[i])
|
||||||
|
|
||||||
// Included transactions blobs need to be moved to the limbo
|
|
||||||
if filled && inclusions != nil {
|
if filled && inclusions != nil {
|
||||||
p.offload(addr, txs[i].nonce, txs[i].id, inclusions)
|
// If the tx metadata is recorded by limbo, we don't need to delete the tx from db.
|
||||||
|
if p.offload(addr, txs[i], inclusions) {
|
||||||
|
ids = ids[:len(ids)-1]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delete(p.index, addr)
|
delete(p.index, addr)
|
||||||
|
|
@ -590,7 +662,10 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
||||||
|
|
||||||
// Included transactions blobs need to be moved to the limbo
|
// Included transactions blobs need to be moved to the limbo
|
||||||
if inclusions != nil {
|
if inclusions != nil {
|
||||||
p.offload(addr, txs[0].nonce, txs[0].id, inclusions)
|
// If the tx metadata is recorded by limbo, we don't need to delete the tx from db.
|
||||||
|
if p.offload(addr, txs[0], inclusions) {
|
||||||
|
ids = ids[:len(ids)-1]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
txs = txs[1:]
|
txs = txs[1:]
|
||||||
}
|
}
|
||||||
|
|
@ -769,26 +844,18 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
||||||
// any of it since there's no clear error case. Some errors may be due to coding
|
// any of it since there's no clear error case. Some errors may be due to coding
|
||||||
// issues, others caused by signers mining MEV stuff or swapping transactions. In
|
// issues, others caused by signers mining MEV stuff or swapping transactions. In
|
||||||
// all cases, the pool needs to continue operating.
|
// all cases, the pool needs to continue operating.
|
||||||
func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) {
|
// Return bool type to let caller know whether the tx is offloaded to limbo successfully.
|
||||||
data, err := p.store.Get(id)
|
func (p *BlobPool) offload(addr common.Address, meta *blobTxMeta, inclusions map[common.Hash]uint64) bool {
|
||||||
if err != nil {
|
block, ok := inclusions[meta.hash]
|
||||||
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var tx types.Transaction
|
|
||||||
if err = rlp.DecodeBytes(data, &tx); err != nil {
|
|
||||||
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
block, ok := inclusions[tx.Hash()]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
|
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", meta.nonce, "id", meta.id)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
if err := p.limbo.push(&tx, block); err != nil {
|
if err := p.limbo.push(meta, block); err != nil {
|
||||||
log.Warn("Failed to offload blob tx into limbo", "err", err)
|
log.Warn("Failed to offload blob tx into limbo", "err", err)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset implements txpool.SubPool, allowing the blob pool's internal state to be
|
// Reset implements txpool.SubPool, allowing the blob pool's internal state to be
|
||||||
|
|
@ -832,7 +899,11 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
|
||||||
}
|
}
|
||||||
// Flush out any blobs from limbo that are older than the latest finality
|
// Flush out any blobs from limbo that are older than the latest finality
|
||||||
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
|
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
|
||||||
p.limbo.finalize(p.chain.CurrentFinalBlock())
|
p.limbo.finalize(p.chain.CurrentFinalBlock(), func(id uint64, txHash common.Hash) {
|
||||||
|
if err := p.store.Delete(id); err != nil {
|
||||||
|
log.Error("Failed to delete blob transaction", "hash", txHash, "id", id, "err", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
// Reset the price heap for the new set of basefee/blobfee pairs
|
// Reset the price heap for the new set of basefee/blobfee pairs
|
||||||
var (
|
var (
|
||||||
|
|
@ -986,31 +1057,14 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
|
||||||
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
|
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
|
||||||
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
|
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
|
||||||
// add the transaction back into the pool as it is not mineable.
|
// add the transaction back into the pool as it is not mineable.
|
||||||
tx, err := p.limbo.pull(txhash)
|
meta, err := p.limbo.pull(txhash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
|
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// TODO: seems like an easy optimization here would be getting the serialized tx
|
|
||||||
// from limbo instead of re-serializing it here.
|
|
||||||
|
|
||||||
// Serialize the transaction back into the primary datastore.
|
|
||||||
blob, err := rlp.EncodeToBytes(tx)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
id, err := p.store.Put(blob)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the indices and metrics
|
|
||||||
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
|
|
||||||
if _, ok := p.index[addr]; !ok {
|
if _, ok := p.index[addr]; !ok {
|
||||||
if err := p.reserver.Hold(addr); err != nil {
|
if err := p.reserver.Hold(addr); err != nil {
|
||||||
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
|
log.Warn("Failed to reserve account for blob pool", "tx", meta.hash, "from", addr, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
p.index[addr] = []*blobTxMeta{meta}
|
p.index[addr] = []*blobTxMeta{meta}
|
||||||
|
|
|
||||||
|
|
@ -30,10 +30,12 @@ import (
|
||||||
// to which it belongs as well as the block number in which it was included for
|
// to which it belongs as well as the block number in which it was included for
|
||||||
// finality eviction.
|
// finality eviction.
|
||||||
type limboBlob struct {
|
type limboBlob struct {
|
||||||
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
|
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
|
||||||
Block uint64 // Block in which the blob transaction was included
|
Block uint64 // Block in which the blob transaction was included
|
||||||
Tx *types.Transaction
|
Tx *types.Transaction `rlp:"omitempty"`
|
||||||
id uint64 // the billy id of transction
|
// Optional blob transaction metadata.
|
||||||
|
TxMeta *blobTxMeta `rlp:"omitempty"`
|
||||||
|
id uint64 // the billy id of limboBlob
|
||||||
}
|
}
|
||||||
|
|
||||||
// limbo is a light, indexed database to temporarily store recently included
|
// limbo is a light, indexed database to temporarily store recently included
|
||||||
|
|
@ -100,14 +102,53 @@ func (l *limbo) parseBlob(id uint64, data []byte) error {
|
||||||
return errors.New("duplicate blob")
|
return errors.New("duplicate blob")
|
||||||
}
|
}
|
||||||
// Delete tx and set id.
|
// Delete tx and set id.
|
||||||
item.id, item.Tx = id, nil
|
item.id = id
|
||||||
l.index[item.TxHash] = item
|
l.index[item.TxHash] = item
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// existsAndSet checks whether a blob transaction is already tracked by the limbo.
|
||||||
|
func (l *limbo) existsAndSet(meta *blobTxMeta) bool {
|
||||||
|
if item := l.index[meta.hash]; item != nil {
|
||||||
|
if item.Tx != nil {
|
||||||
|
item.TxMeta, item.Tx = meta, nil
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// tryRepair attempts to repair the limbo by re-encoding all transactions that are
|
||||||
|
// currently in the limbo, but not yet stored in the database. This is useful
|
||||||
|
// when the limbo is created from a previous state, and the transactions are not
|
||||||
|
// yet stored in the database. The method will re-encode all transactions and
|
||||||
|
// store them in the database, updating the in-memory indices at the same time.
|
||||||
|
func (l *limbo) tryRepair(store billy.Database) error {
|
||||||
|
for _, item := range l.index {
|
||||||
|
if item.Tx == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tx := item.Tx
|
||||||
|
// Transaction permitted into the pool from a nonce and cost perspective,
|
||||||
|
// insert it into the database and update the indices
|
||||||
|
blob, err := rlp.EncodeToBytes(tx)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
id, err := store.Put(blob)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
meta := newBlobTxMeta(id, tx.Size(), store.Size(id), tx)
|
||||||
|
item.TxMeta, item.Tx = meta, nil
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// finalize evicts all blobs belonging to a recently finalized block or older.
|
// finalize evicts all blobs belonging to a recently finalized block or older.
|
||||||
func (l *limbo) finalize(final *types.Header) {
|
func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.Hash)) {
|
||||||
// Just in case there's no final block yet (network not yet merged, weird
|
// Just in case there's no final block yet (network not yet merged, weird
|
||||||
// restart, sethead, etc), fail gracefully.
|
// restart, sethead, etc), fail gracefully.
|
||||||
if final == nil {
|
if final == nil {
|
||||||
|
|
@ -122,20 +163,24 @@ func (l *limbo) finalize(final *types.Header) {
|
||||||
log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err)
|
log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err)
|
||||||
}
|
}
|
||||||
delete(l.index, item.TxHash)
|
delete(l.index, item.TxHash)
|
||||||
|
if fn != nil {
|
||||||
|
meta := item.TxMeta
|
||||||
|
fn(meta.id, meta.hash)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// push stores a new blob transaction into the limbo, waiting until finality for
|
// push stores a new blob transaction into the limbo, waiting until finality for
|
||||||
// it to be automatically evicted.
|
// it to be automatically evicted.
|
||||||
func (l *limbo) push(tx *types.Transaction, block uint64) error {
|
func (l *limbo) push(meta *blobTxMeta, block uint64) error {
|
||||||
// If the blobs are already tracked by the limbo, consider it a programming
|
// If the blobs are already tracked by the limbo, consider it a programming
|
||||||
// error. There's not much to do against it, but be loud.
|
// error. There's not much to do against it, but be loud.
|
||||||
if _, ok := l.index[tx.Hash()]; ok {
|
if _, ok := l.index[meta.hash]; ok {
|
||||||
log.Error("Limbo cannot push already tracked blobs", "tx", tx.Hash())
|
log.Error("Limbo cannot push already tracked blobs", "tx", meta.hash)
|
||||||
return errors.New("already tracked blob transaction")
|
return errors.New("already tracked blob transaction")
|
||||||
}
|
}
|
||||||
if err := l.setAndIndex(tx, block); err != nil {
|
if err := l.setAndIndex(meta, block); err != nil {
|
||||||
log.Error("Failed to set and index limboed blobs", "tx", tx.Hash(), "err", err)
|
log.Error("Failed to set and index limboed blobs", "tx", meta.hash, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -144,7 +189,7 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error {
|
||||||
// pull retrieves a previously pushed set of blobs back from the limbo, removing
|
// pull retrieves a previously pushed set of blobs back from the limbo, removing
|
||||||
// it at the same time. This method should be used when a previously included blob
|
// it at the same time. This method should be used when a previously included blob
|
||||||
// transaction gets reorged out.
|
// transaction gets reorged out.
|
||||||
func (l *limbo) pull(txhash common.Hash) (*types.Transaction, error) {
|
func (l *limbo) pull(txhash common.Hash) (*blobTxMeta, error) {
|
||||||
// If the blobs are not tracked by the limbo, there's not much to do. This
|
// If the blobs are not tracked by the limbo, there's not much to do. This
|
||||||
// can happen for example if a blob transaction is mined without pushing it
|
// can happen for example if a blob transaction is mined without pushing it
|
||||||
// into the network first.
|
// into the network first.
|
||||||
|
|
@ -153,12 +198,10 @@ func (l *limbo) pull(txhash common.Hash) (*types.Transaction, error) {
|
||||||
log.Trace("Limbo cannot pull non-tracked blobs", "tx", txhash)
|
log.Trace("Limbo cannot pull non-tracked blobs", "tx", txhash)
|
||||||
return nil, errors.New("unseen blob transaction")
|
return nil, errors.New("unseen blob transaction")
|
||||||
}
|
}
|
||||||
tx, err := l.getAndDrop(item.id)
|
if err := l.store.Delete(item.id); err != nil {
|
||||||
if err != nil {
|
|
||||||
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", item.id, "err", err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return tx, nil
|
return item.TxMeta, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// update changes the block number under which a blob transaction is tracked. This
|
// update changes the block number under which a blob transaction is tracked. This
|
||||||
|
|
@ -185,45 +228,26 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
|
||||||
}
|
}
|
||||||
// Retrieve the old blobs from the data store and write them back with a new
|
// Retrieve the old blobs from the data store and write them back with a new
|
||||||
// block number. IF anything fails, there's not much to do, go on.
|
// block number. IF anything fails, there's not much to do, go on.
|
||||||
tx, err := l.getAndDrop(item.id)
|
if err := l.store.Delete(item.id); err != nil {
|
||||||
if err != nil {
|
log.Error("Failed to drop old limboed blobs", "tx", txhash, "err", err)
|
||||||
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", item.id, "err", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := l.setAndIndex(tx, block); err != nil {
|
if err := l.setAndIndex(item.TxMeta, block); err != nil {
|
||||||
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
|
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Trace("Blob transaction updated in limbo", "tx", txhash, "old-block", item.Block, "new-block", block)
|
log.Trace("Blob transaction updated in limbo", "tx", txhash, "old-block", item.Block, "new-block", block)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getAndDrop retrieves a blob item from the limbo store and deletes it both from
|
|
||||||
// the store and indices.
|
|
||||||
func (l *limbo) getAndDrop(id uint64) (*types.Transaction, error) {
|
|
||||||
data, err := l.store.Get(id)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
item := new(limboBlob)
|
|
||||||
if err = rlp.DecodeBytes(data, item); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
delete(l.index, item.TxHash)
|
|
||||||
if err := l.store.Delete(id); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return item.Tx, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// setAndIndex assembles a limbo blob database entry and stores it, also updating
|
// setAndIndex assembles a limbo blob database entry and stores it, also updating
|
||||||
// the in-memory indices.
|
// the in-memory indices.
|
||||||
func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
|
func (l *limbo) setAndIndex(meta *blobTxMeta, block uint64) error {
|
||||||
txhash := tx.Hash()
|
txhash := meta.hash
|
||||||
item := &limboBlob{
|
item := &limboBlob{
|
||||||
TxHash: txhash,
|
TxHash: txhash,
|
||||||
Block: block,
|
Block: block,
|
||||||
Tx: tx,
|
TxMeta: meta,
|
||||||
|
Tx: nil,
|
||||||
}
|
}
|
||||||
data, err := rlp.EncodeToBytes(item)
|
data, err := rlp.EncodeToBytes(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -234,7 +258,7 @@ func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Delete tx and set id.
|
// Delete tx and set id.
|
||||||
item.id, item.Tx = id, nil
|
item.id = id
|
||||||
l.index[txhash] = item
|
l.index[txhash] = item
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue