mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 15:47:21 +00:00
Merge b57ec4aa7c into 2a45272408
This commit is contained in:
commit
33734557de
2 changed files with 133 additions and 130 deletions
|
|
@ -21,6 +21,7 @@ import (
|
|||
"container/heap"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"math/big"
|
||||
"os"
|
||||
|
|
@ -135,6 +136,67 @@ type blobTxMeta struct {
|
|||
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
|
||||
}
|
||||
|
||||
type blobTxMetaMarshal struct {
|
||||
Hash common.Hash
|
||||
Vhashes []common.Hash
|
||||
Version byte
|
||||
|
||||
ID uint64
|
||||
StorageSize uint32
|
||||
Size uint64
|
||||
|
||||
Nonce uint64
|
||||
CostCap *uint256.Int
|
||||
ExecTipCap *uint256.Int
|
||||
ExecFeeCap *uint256.Int
|
||||
BlobFeeCap *uint256.Int
|
||||
ExecGas uint64
|
||||
BlobGas uint64
|
||||
}
|
||||
|
||||
// EncodeRLP encodes the blobTxMeta into the given writer.
|
||||
func (b *blobTxMeta) EncodeRLP(w io.Writer) error {
|
||||
return rlp.Encode(w, &blobTxMetaMarshal{
|
||||
Hash: b.hash,
|
||||
Vhashes: b.vhashes,
|
||||
Version: b.version,
|
||||
ID: b.id,
|
||||
StorageSize: b.storageSize,
|
||||
Size: b.size,
|
||||
Nonce: b.nonce,
|
||||
CostCap: b.costCap,
|
||||
ExecTipCap: b.execTipCap,
|
||||
ExecFeeCap: b.execFeeCap,
|
||||
BlobFeeCap: b.blobFeeCap,
|
||||
ExecGas: b.execGas,
|
||||
BlobGas: b.blobGas,
|
||||
})
|
||||
}
|
||||
|
||||
// DecodeRLP decodes the blobTxMeta from the given stream.
|
||||
func (b *blobTxMeta) DecodeRLP(s *rlp.Stream) error {
|
||||
var meta blobTxMetaMarshal
|
||||
if err := s.Decode(&meta); err != nil {
|
||||
return err
|
||||
}
|
||||
b.hash = meta.Hash
|
||||
b.vhashes = meta.Vhashes
|
||||
b.version = meta.Version
|
||||
b.id = meta.ID
|
||||
b.storageSize = meta.StorageSize
|
||||
b.size = meta.Size
|
||||
b.nonce = meta.Nonce
|
||||
b.costCap = meta.CostCap
|
||||
b.execTipCap = meta.ExecTipCap
|
||||
b.execFeeCap = meta.ExecFeeCap
|
||||
b.blobFeeCap = meta.BlobFeeCap
|
||||
b.execGas = meta.ExecGas
|
||||
b.blobGas = meta.BlobGas
|
||||
b.basefeeJumps = dynamicFeeJumps(meta.ExecFeeCap)
|
||||
b.blobfeeJumps = dynamicFeeJumps(meta.BlobFeeCap)
|
||||
return nil
|
||||
}
|
||||
|
||||
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
|
||||
// and assembles a helper struct to track in memory.
|
||||
// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar).
|
||||
|
|
@ -477,6 +539,7 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
|
|||
p.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the configured gas tip, triggering a filtering of anything just loaded
|
||||
basefeeGauge.Update(int64(basefee.Uint64()))
|
||||
blobfeeGauge.Update(int64(blobfee.Uint64()))
|
||||
|
|
@ -595,7 +658,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
|||
|
||||
// Included transactions blobs need to be moved to the limbo
|
||||
if filled && inclusions != nil {
|
||||
p.offload(addr, txs[i].nonce, txs[i].id, inclusions)
|
||||
p.offload(addr, txs[i], inclusions)
|
||||
}
|
||||
}
|
||||
delete(p.index, addr)
|
||||
|
|
@ -636,7 +699,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
|||
|
||||
// Included transactions blobs need to be moved to the limbo
|
||||
if inclusions != nil {
|
||||
p.offload(addr, txs[0].nonce, txs[0].id, inclusions)
|
||||
p.offload(addr, txs[0], inclusions)
|
||||
}
|
||||
txs = txs[1:]
|
||||
}
|
||||
|
|
@ -815,23 +878,13 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
|||
// any of it since there's no clear error case. Some errors may be due to coding
|
||||
// issues, others caused by signers mining MEV stuff or swapping transactions. In
|
||||
// all cases, the pool needs to continue operating.
|
||||
func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) {
|
||||
data, err := p.store.Get(id)
|
||||
if err != nil {
|
||||
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
var tx types.Transaction
|
||||
if err = rlp.DecodeBytes(data, &tx); err != nil {
|
||||
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
block, ok := inclusions[tx.Hash()]
|
||||
func (p *BlobPool) offload(addr common.Address, meta *blobTxMeta, inclusions map[common.Hash]uint64) {
|
||||
block, ok := inclusions[meta.hash]
|
||||
if !ok {
|
||||
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
|
||||
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", meta.nonce, "id", meta.id)
|
||||
return
|
||||
}
|
||||
if err := p.limbo.push(&tx, block); err != nil {
|
||||
if err := p.limbo.push(meta, block); err != nil {
|
||||
log.Warn("Failed to offload blob tx into limbo", "err", err)
|
||||
return
|
||||
}
|
||||
|
|
@ -881,7 +934,12 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
|
|||
}
|
||||
// Flush out any blobs from limbo that are older than the latest finality
|
||||
if p.chain.Config().IsCancun(newHead.Number, newHead.Time) {
|
||||
p.limbo.finalize(p.chain.CurrentFinalBlock())
|
||||
// Delete all limboed transactions up to the finalized block.
|
||||
p.limbo.finalize(p.chain.CurrentFinalBlock(), func(id uint64, txHash common.Hash) {
|
||||
if err := p.store.Delete(id); err != nil {
|
||||
log.Error("Failed to delete blob transaction", "hash", txHash, "id", id, "err", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
// Reset the price heap for the new set of basefee/blobfee pairs
|
||||
var (
|
||||
|
|
@ -1035,46 +1093,14 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
|
|||
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
|
||||
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
|
||||
// add the transaction back into the pool as it is not mineable.
|
||||
tx, err := p.limbo.pull(txhash)
|
||||
meta, err := p.limbo.pull(txhash)
|
||||
if err != nil {
|
||||
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
|
||||
return err
|
||||
}
|
||||
// TODO: seems like an easy optimization here would be getting the serialized tx
|
||||
// from limbo instead of re-serializing it here.
|
||||
|
||||
// Converts reorged-out legacy blob transactions to the new format to prevent
|
||||
// them from becoming stuck in the pool until eviction.
|
||||
//
|
||||
// Performance note: Conversion takes ~140ms (Mac M1 Pro). Since a maximum of
|
||||
// 9 legacy blob transactions are allowed in a block pre-Osaka, an adversary
|
||||
// could theoretically halt a Geth node for ~1.2s by reorging per block. However,
|
||||
// this attack is financially inefficient to execute.
|
||||
head := p.head.Load()
|
||||
if p.chain.Config().IsOsaka(head.Number, head.Time) && tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
|
||||
if err := tx.BlobTxSidecar().ToV1(); err != nil {
|
||||
log.Error("Failed to convert the legacy sidecar", "err", err)
|
||||
return err
|
||||
}
|
||||
log.Info("Legacy blob transaction is reorged", "hash", tx.Hash())
|
||||
}
|
||||
// Serialize the transaction back into the primary datastore.
|
||||
blob, err := rlp.EncodeToBytes(tx)
|
||||
if err != nil {
|
||||
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
|
||||
return err
|
||||
}
|
||||
id, err := p.store.Put(blob)
|
||||
if err != nil {
|
||||
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Update the indices and metrics
|
||||
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
|
||||
if _, ok := p.index[addr]; !ok {
|
||||
if err := p.reserver.Hold(addr); err != nil {
|
||||
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
|
||||
log.Warn("Failed to reserve account for blob pool", "tx", meta.hash, "from", addr, "err", err)
|
||||
return err
|
||||
}
|
||||
p.index[addr] = []*blobTxMeta{meta}
|
||||
|
|
|
|||
|
|
@ -31,37 +31,30 @@ import (
|
|||
// to which it belongs as well as the block number in which it was included for
|
||||
// finality eviction.
|
||||
type limboBlob struct {
|
||||
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
|
||||
Block uint64 // Block in which the blob transaction was included
|
||||
Tx *types.Transaction
|
||||
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
|
||||
Block uint64 // Block in which the blob transaction was included
|
||||
Tx *types.Transaction `rlp:"nil"` // Optional full blob transaction (old storage style)
|
||||
TxMeta *blobTxMeta // the blob transaction metadata.
|
||||
id uint64 // the billy id of limboBlob
|
||||
}
|
||||
|
||||
// limbo is a light, indexed database to temporarily store recently included
|
||||
// blobs until they are finalized. The purpose is to support small reorgs, which
|
||||
// would require pulling back up old blobs (which aren't part of the chain).
|
||||
//
|
||||
// TODO(karalabe): Currently updating the inclusion block of a blob needs a full db rewrite. Can we do without?
|
||||
type limbo struct {
|
||||
store billy.Database // Persistent data store for limboed blobs
|
||||
|
||||
index map[common.Hash]uint64 // Mappings from tx hashes to datastore ids
|
||||
groups map[uint64]map[uint64]common.Hash // Set of txs included in past blocks
|
||||
store billy.Database // Persistent data store for limboed blobs
|
||||
index map[common.Hash]*limboBlob // Mappings from tx hashes to datastore ids
|
||||
}
|
||||
|
||||
// newLimbo opens and indexes a set of limboed blob transactions.
|
||||
func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) {
|
||||
l := &limbo{
|
||||
index: make(map[common.Hash]uint64),
|
||||
groups: make(map[uint64]map[uint64]common.Hash),
|
||||
index: make(map[common.Hash]*limboBlob),
|
||||
}
|
||||
|
||||
// Create new slotter for pre-Osaka blob configuration.
|
||||
slotter := newSlotter(params.BlobTxMaxBlobs)
|
||||
|
||||
// See if we need to migrate the limbo after fusaka.
|
||||
slotter, err := tryMigrate(config, slotter, datadir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// The limbo won't store full blobs, just store the metadata, so use a fixed size 1KB bytes is big enough.
|
||||
slotter := func() (size uint32, done bool) {
|
||||
return 1024, true
|
||||
}
|
||||
|
||||
// Index all limboed blobs on disk and delete anything unprocessable
|
||||
|
|
@ -86,6 +79,7 @@ func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return l, nil
|
||||
}
|
||||
|
||||
|
|
@ -112,49 +106,45 @@ func (l *limbo) parseBlob(id uint64, data []byte) error {
|
|||
log.Error("Dropping duplicate blob limbo entry", "owner", item.TxHash, "id", id)
|
||||
return errors.New("duplicate blob")
|
||||
}
|
||||
l.index[item.TxHash] = id
|
||||
|
||||
if _, ok := l.groups[item.Block]; !ok {
|
||||
l.groups[item.Block] = make(map[uint64]common.Hash)
|
||||
}
|
||||
l.groups[item.Block][id] = item.TxHash
|
||||
item.id = id
|
||||
l.index[item.TxHash] = item
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// finalize evicts all blobs belonging to a recently finalized block or older.
|
||||
func (l *limbo) finalize(final *types.Header) {
|
||||
func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.Hash)) {
|
||||
// Just in case there's no final block yet (network not yet merged, weird
|
||||
// restart, sethead, etc), fail gracefully.
|
||||
if final == nil {
|
||||
log.Warn("Nil finalized block cannot evict old blobs")
|
||||
return
|
||||
}
|
||||
for block, ids := range l.groups {
|
||||
if block > final.Number.Uint64() {
|
||||
for _, item := range l.index {
|
||||
if item.Block > final.Number.Uint64() {
|
||||
continue
|
||||
}
|
||||
for id, owner := range ids {
|
||||
if err := l.store.Delete(id); err != nil {
|
||||
log.Error("Failed to drop finalized blob", "block", block, "id", id, "err", err)
|
||||
}
|
||||
delete(l.index, owner)
|
||||
if err := l.drop(item.TxHash); err != nil {
|
||||
log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err)
|
||||
}
|
||||
if fn != nil {
|
||||
meta := item.TxMeta
|
||||
fn(meta.id, meta.hash)
|
||||
}
|
||||
delete(l.groups, block)
|
||||
}
|
||||
}
|
||||
|
||||
// push stores a new blob transaction into the limbo, waiting until finality for
|
||||
// it to be automatically evicted.
|
||||
func (l *limbo) push(tx *types.Transaction, block uint64) error {
|
||||
func (l *limbo) push(meta *blobTxMeta, block uint64) error {
|
||||
// If the blobs are already tracked by the limbo, consider it a programming
|
||||
// error. There's not much to do against it, but be loud.
|
||||
if _, ok := l.index[tx.Hash()]; ok {
|
||||
log.Error("Limbo cannot push already tracked blobs", "tx", tx.Hash())
|
||||
if _, ok := l.index[meta.hash]; ok {
|
||||
log.Error("Limbo cannot push already tracked blobs", "tx", meta.hash)
|
||||
return errors.New("already tracked blob transaction")
|
||||
}
|
||||
if err := l.setAndIndex(tx, block); err != nil {
|
||||
log.Error("Failed to set and index limboed blobs", "tx", tx.Hash(), "err", err)
|
||||
if err := l.setAndIndex(meta, block); err != nil {
|
||||
log.Error("Failed to set and index limboed blobs", "tx", meta.hash, "err", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -163,21 +153,19 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error {
|
|||
// pull retrieves a previously pushed set of blobs back from the limbo, removing
|
||||
// it at the same time. This method should be used when a previously included blob
|
||||
// transaction gets reorged out.
|
||||
func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) {
|
||||
func (l *limbo) pull(tx common.Hash) (*blobTxMeta, error) {
|
||||
// If the blobs are not tracked by the limbo, there's not much to do. This
|
||||
// can happen for example if a blob transaction is mined without pushing it
|
||||
// into the network first.
|
||||
id, ok := l.index[tx]
|
||||
item, ok := l.index[tx]
|
||||
if !ok {
|
||||
log.Trace("Limbo cannot pull non-tracked blobs", "tx", tx)
|
||||
return nil, errors.New("unseen blob transaction")
|
||||
}
|
||||
item, err := l.getAndDrop(id)
|
||||
if err != nil {
|
||||
log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err)
|
||||
if err := l.drop(item.TxHash); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return item.Tx, nil
|
||||
return item.TxMeta, nil
|
||||
}
|
||||
|
||||
// update changes the block number under which a blob transaction is tracked. This
|
||||
|
|
@ -191,61 +179,51 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
|
|||
// If the blobs are not tracked by the limbo, there's not much to do. This
|
||||
// can happen for example if a blob transaction is mined without pushing it
|
||||
// into the network first.
|
||||
id, ok := l.index[txhash]
|
||||
item, ok := l.index[txhash]
|
||||
if !ok {
|
||||
log.Trace("Limbo cannot update non-tracked blobs", "tx", txhash)
|
||||
return
|
||||
}
|
||||
// If there was no change in the blob's inclusion block, don't mess around
|
||||
// with heavy database operations.
|
||||
if _, ok := l.groups[block][id]; ok {
|
||||
if item.Block == block {
|
||||
log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block)
|
||||
return
|
||||
}
|
||||
// Retrieve the old blobs from the data store and write them back with a new
|
||||
// block number. IF anything fails, there's not much to do, go on.
|
||||
item, err := l.getAndDrop(id)
|
||||
if err != nil {
|
||||
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err)
|
||||
if err := l.drop(txhash); err != nil {
|
||||
log.Error("Failed to drop old limboed metadata", "tx", txhash, "err", err)
|
||||
return
|
||||
}
|
||||
if err := l.setAndIndex(item.Tx, block); err != nil {
|
||||
if err := l.setAndIndex(item.TxMeta, block); err != nil {
|
||||
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
|
||||
return
|
||||
}
|
||||
log.Trace("Blob transaction updated in limbo", "tx", txhash, "old-block", item.Block, "new-block", block)
|
||||
}
|
||||
|
||||
// getAndDrop retrieves a blob item from the limbo store and deletes it both from
|
||||
// the store and indices.
|
||||
func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
|
||||
data, err := l.store.Get(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// drop removes the blob metadata from the limbo.
|
||||
func (l *limbo) drop(txhash common.Hash) error {
|
||||
if item, ok := l.index[txhash]; ok {
|
||||
// Remove the blob metadata entry from the limbo store. If anything fails,
|
||||
// there's not much to do besides logging the error and returning.
|
||||
if err := l.store.Delete(item.id); err != nil {
|
||||
log.Error("Failed to drop old limboed blobs", "tx", txhash, "err", err)
|
||||
return err
|
||||
}
|
||||
delete(l.index, txhash)
|
||||
}
|
||||
item := new(limboBlob)
|
||||
if err = rlp.DecodeBytes(data, item); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
delete(l.index, item.TxHash)
|
||||
delete(l.groups[item.Block], id)
|
||||
if len(l.groups[item.Block]) == 0 {
|
||||
delete(l.groups, item.Block)
|
||||
}
|
||||
if err := l.store.Delete(id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return item, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// setAndIndex assembles a limbo blob database entry and stores it, also updating
|
||||
// the in-memory indices.
|
||||
func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
|
||||
txhash := tx.Hash()
|
||||
func (l *limbo) setAndIndex(meta *blobTxMeta, block uint64) error {
|
||||
txhash := meta.hash
|
||||
item := &limboBlob{
|
||||
TxHash: txhash,
|
||||
Block: block,
|
||||
Tx: tx,
|
||||
TxMeta: meta,
|
||||
Tx: nil, // The tx already stored in the blob database, not here.
|
||||
}
|
||||
data, err := rlp.EncodeToBytes(item)
|
||||
if err != nil {
|
||||
|
|
@ -255,10 +233,9 @@ func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l.index[txhash] = id
|
||||
if _, ok := l.groups[block]; !ok {
|
||||
l.groups[block] = make(map[uint64]common.Hash)
|
||||
}
|
||||
l.groups[block][id] = txhash
|
||||
// Set the in-memory index
|
||||
item.id = id
|
||||
l.index[txhash] = item
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue