mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-31 20:18:37 +00:00
Merge 6ad047e6b4 into d446676fc4
This commit is contained in:
commit
24d51cc059
3 changed files with 334 additions and 130 deletions
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
|
|
@ -147,6 +148,67 @@ type blobTxMeta struct {
|
||||||
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
|
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type blobTxMetaMarshal struct {
|
||||||
|
Hash common.Hash
|
||||||
|
Vhashes []common.Hash
|
||||||
|
Version byte
|
||||||
|
|
||||||
|
ID uint64
|
||||||
|
StorageSize uint32
|
||||||
|
Size uint64
|
||||||
|
|
||||||
|
Nonce uint64
|
||||||
|
CostCap *uint256.Int
|
||||||
|
ExecTipCap *uint256.Int
|
||||||
|
ExecFeeCap *uint256.Int
|
||||||
|
BlobFeeCap *uint256.Int
|
||||||
|
ExecGas uint64
|
||||||
|
BlobGas uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// EncodeRLP encodes the blobTxMeta into the given writer.
|
||||||
|
func (b *blobTxMeta) EncodeRLP(w io.Writer) error {
|
||||||
|
return rlp.Encode(w, &blobTxMetaMarshal{
|
||||||
|
Hash: b.hash,
|
||||||
|
Vhashes: b.vhashes,
|
||||||
|
Version: b.version,
|
||||||
|
ID: b.id,
|
||||||
|
StorageSize: b.storageSize,
|
||||||
|
Size: b.size,
|
||||||
|
Nonce: b.nonce,
|
||||||
|
CostCap: b.costCap,
|
||||||
|
ExecTipCap: b.execTipCap,
|
||||||
|
ExecFeeCap: b.execFeeCap,
|
||||||
|
BlobFeeCap: b.blobFeeCap,
|
||||||
|
ExecGas: b.execGas,
|
||||||
|
BlobGas: b.blobGas,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// DecodeRLP decodes the blobTxMeta from the given stream.
|
||||||
|
func (b *blobTxMeta) DecodeRLP(s *rlp.Stream) error {
|
||||||
|
var meta blobTxMetaMarshal
|
||||||
|
if err := s.Decode(&meta); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.hash = meta.Hash
|
||||||
|
b.vhashes = meta.Vhashes
|
||||||
|
b.version = meta.Version
|
||||||
|
b.id = meta.ID
|
||||||
|
b.storageSize = meta.StorageSize
|
||||||
|
b.size = meta.Size
|
||||||
|
b.nonce = meta.Nonce
|
||||||
|
b.costCap = meta.CostCap
|
||||||
|
b.execTipCap = meta.ExecTipCap
|
||||||
|
b.execFeeCap = meta.ExecFeeCap
|
||||||
|
b.blobFeeCap = meta.BlobFeeCap
|
||||||
|
b.execGas = meta.ExecGas
|
||||||
|
b.blobGas = meta.BlobGas
|
||||||
|
b.basefeeJumps = dynamicFeeJumps(meta.ExecFeeCap)
|
||||||
|
b.blobfeeJumps = dynamicFeeJumps(meta.BlobFeeCap)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
|
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
|
||||||
// and assembles a helper struct to track in memory.
|
// and assembles a helper struct to track in memory.
|
||||||
// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar).
|
// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar).
|
||||||
|
|
@ -518,6 +580,7 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
|
||||||
p.Close()
|
p.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the configured gas tip, triggering a filtering of anything just loaded
|
// Set the configured gas tip, triggering a filtering of anything just loaded
|
||||||
basefeeGauge.Update(int64(basefee.Uint64()))
|
basefeeGauge.Update(int64(basefee.Uint64()))
|
||||||
blobfeeGauge.Update(int64(blobfee.Uint64()))
|
blobfeeGauge.Update(int64(blobfee.Uint64()))
|
||||||
|
|
@ -625,8 +688,9 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
||||||
)
|
)
|
||||||
if gapped || filled {
|
if gapped || filled {
|
||||||
var (
|
var (
|
||||||
ids []uint64
|
ids []uint64
|
||||||
nonces []uint64
|
deleteID []uint64
|
||||||
|
nonces []uint64
|
||||||
)
|
)
|
||||||
for i := 0; i < len(txs); i++ {
|
for i := 0; i < len(txs); i++ {
|
||||||
ids = append(ids, txs[i].id)
|
ids = append(ids, txs[i].id)
|
||||||
|
|
@ -636,8 +700,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
||||||
p.lookup.untrack(txs[i])
|
p.lookup.untrack(txs[i])
|
||||||
|
|
||||||
// Included transactions blobs need to be moved to the limbo
|
// Included transactions blobs need to be moved to the limbo
|
||||||
if filled && inclusions != nil {
|
if !(filled && inclusions != nil && p.offload(addr, txs[i], inclusions)) {
|
||||||
p.offload(addr, txs[i].nonce, txs[i].id, inclusions)
|
deleteID = append(deleteID, txs[i].id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delete(p.index, addr)
|
delete(p.index, addr)
|
||||||
|
|
@ -654,7 +718,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
||||||
log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids)
|
log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids)
|
||||||
dropFilledMeter.Mark(int64(len(ids)))
|
dropFilledMeter.Mark(int64(len(ids)))
|
||||||
}
|
}
|
||||||
for _, id := range ids {
|
for _, id := range deleteID {
|
||||||
if err := p.store.Delete(id); err != nil {
|
if err := p.store.Delete(id); err != nil {
|
||||||
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
||||||
}
|
}
|
||||||
|
|
@ -665,8 +729,9 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
||||||
// anything below the current state
|
// anything below the current state
|
||||||
if txs[0].nonce < next {
|
if txs[0].nonce < next {
|
||||||
var (
|
var (
|
||||||
ids []uint64
|
ids []uint64
|
||||||
nonces []uint64
|
deleteID []uint64
|
||||||
|
nonces []uint64
|
||||||
)
|
)
|
||||||
for len(txs) > 0 && txs[0].nonce < next {
|
for len(txs) > 0 && txs[0].nonce < next {
|
||||||
ids = append(ids, txs[0].id)
|
ids = append(ids, txs[0].id)
|
||||||
|
|
@ -677,15 +742,15 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
||||||
p.lookup.untrack(txs[0])
|
p.lookup.untrack(txs[0])
|
||||||
|
|
||||||
// Included transactions blobs need to be moved to the limbo
|
// Included transactions blobs need to be moved to the limbo
|
||||||
if inclusions != nil {
|
if !(inclusions != nil && p.offload(addr, txs[0], inclusions)) {
|
||||||
p.offload(addr, txs[0].nonce, txs[0].id, inclusions)
|
deleteID = append(deleteID, txs[0].id)
|
||||||
}
|
}
|
||||||
txs = txs[1:]
|
txs = txs[1:]
|
||||||
}
|
}
|
||||||
log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs))
|
log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs))
|
||||||
dropOverlappedMeter.Mark(int64(len(ids)))
|
dropOverlappedMeter.Mark(int64(len(ids)))
|
||||||
|
|
||||||
for _, id := range ids {
|
for _, id := range deleteID {
|
||||||
if err := p.store.Delete(id); err != nil {
|
if err := p.store.Delete(id); err != nil {
|
||||||
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
|
||||||
}
|
}
|
||||||
|
|
@ -857,26 +922,29 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
||||||
// any of it since there's no clear error case. Some errors may be due to coding
|
// any of it since there's no clear error case. Some errors may be due to coding
|
||||||
// issues, others caused by signers mining MEV stuff or swapping transactions. In
|
// issues, others caused by signers mining MEV stuff or swapping transactions. In
|
||||||
// all cases, the pool needs to continue operating.
|
// all cases, the pool needs to continue operating.
|
||||||
func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) {
|
func (p *BlobPool) offload(addr common.Address, meta *blobTxMeta, inclusions map[common.Hash]uint64) bool {
|
||||||
data, err := p.store.Get(id)
|
block, ok := inclusions[meta.hash]
|
||||||
if err != nil {
|
|
||||||
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var tx types.Transaction
|
|
||||||
if err = rlp.DecodeBytes(data, &tx); err != nil {
|
|
||||||
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
block, ok := inclusions[tx.Hash()]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
|
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", meta.nonce, "id", meta.id)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
if err := p.limbo.push(&tx, block); err != nil {
|
raw, err := p.store.Get(meta.id)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Blobs missing for included transaction", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if err := p.limbo.push(raw, meta, block); err != nil {
|
||||||
log.Warn("Failed to offload blob tx into limbo", "err", err)
|
log.Warn("Failed to offload blob tx into limbo", "err", err)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
if err := p.store.Delete(meta.id); err != nil {
|
||||||
|
log.Error("Failed to delete blob transaction", "from", addr, "id", meta.id, "err", err)
|
||||||
|
if rollbackErr := p.limbo.drop(meta.hash); rollbackErr != nil {
|
||||||
|
log.Error("Failed to rollback limboed blob", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", rollbackErr)
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset implements txpool.SubPool, allowing the blob pool's internal state to be
|
// Reset implements txpool.SubPool, allowing the blob pool's internal state to be
|
||||||
|
|
@ -1108,46 +1176,74 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
|
||||||
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
|
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
|
||||||
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
|
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
|
||||||
// add the transaction back into the pool as it is not mineable.
|
// add the transaction back into the pool as it is not mineable.
|
||||||
tx, err := p.limbo.pull(txhash)
|
item, err := p.limbo.pull(txhash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
|
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// TODO: seems like an easy optimization here would be getting the serialized tx
|
var (
|
||||||
// from limbo instead of re-serializing it here.
|
meta = item.TxMeta
|
||||||
|
raw = item.Raw
|
||||||
|
tx = item.Tx
|
||||||
|
)
|
||||||
|
switch {
|
||||||
|
case len(raw) > 0:
|
||||||
|
case tx != nil:
|
||||||
|
raw, err = rlp.EncodeToBytes(tx)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Failed to encode transaction for reinjection", "hash", tx.Hash(), "err", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case meta != nil:
|
||||||
|
log.Error("Blobs unavailable for metadata-only limbo entry", "hash", meta.hash)
|
||||||
|
return errors.New("missing blob payload")
|
||||||
|
default:
|
||||||
|
log.Error("invalid limbo entry")
|
||||||
|
}
|
||||||
|
head := p.head.Load()
|
||||||
|
isOsaka := p.chain.Config().IsOsaka(head.Number, head.Time)
|
||||||
|
if tx == nil && (isOsaka || meta == nil) {
|
||||||
|
tx = new(types.Transaction)
|
||||||
|
if err := rlp.DecodeBytes(raw, tx); err != nil {
|
||||||
|
log.Error("Failed to decode transaction for reinjection", "hash", txhash, "err", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
// Converts reorged-out legacy blob transactions to the new format to prevent
|
// Converts reorged-out legacy blob transactions to the new format to prevent
|
||||||
// them from becoming stuck in the pool until eviction.
|
// them from becoming stuct in the pool until eviction.
|
||||||
//
|
//
|
||||||
// Performance note: Conversion takes ~140ms (Mac M1 Pro). Since a maximum of
|
// Performance note: Conversion takes ~140ms (Mac M1 Pro). Since a maximum of
|
||||||
// 9 legacy blob transactions are allowed in a block pre-Osaka, an adversary
|
// 9 legacy blob transactions are allowed in a block pre-Osaka, an adversary
|
||||||
// could theoretically halt a Geth node for ~1.2s by reorging per block. However,
|
// could theoretically halt a Geth node for ~1.2s by reorging per block. However,
|
||||||
// this attack is financially inefficient to execute.
|
// this attack if financially inefficient to execute.
|
||||||
head := p.head.Load()
|
if isOsaka && tx != nil && tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
|
||||||
if p.chain.Config().IsOsaka(head.Number, head.Time) && tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
|
|
||||||
if err := tx.BlobTxSidecar().ToV1(); err != nil {
|
if err := tx.BlobTxSidecar().ToV1(); err != nil {
|
||||||
log.Error("Failed to convert the legacy sidecar", "err", err)
|
log.Error("Failed to convert the legacy sidecar", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Info("Legacy blob transaction is reorged", "hash", tx.Hash())
|
raw, err = rlp.EncodeToBytes(tx)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Failed to encode transaction for reinjection", "hash", txhash, "err", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Info("Reinjecting legacy sidecar", "hash", txhash, "raw", raw)
|
||||||
|
meta = nil // Force metadata regeneration after sidecar upgrade.
|
||||||
}
|
}
|
||||||
// Serialize the transaction back into the primary datastore.
|
id, err := p.store.Put(raw)
|
||||||
blob, err := rlp.EncodeToBytes(tx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
|
log.Error("Failed to store transaction for reinjection", "hash", txhash, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
id, err := p.store.Put(blob)
|
if meta == nil {
|
||||||
if err != nil {
|
meta = newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
|
||||||
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
|
} else {
|
||||||
return err
|
meta.id = id
|
||||||
|
meta.storageSize = p.store.Size(id)
|
||||||
|
meta.size = uint64(len(raw))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the indices and metrics
|
|
||||||
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
|
|
||||||
if _, ok := p.index[addr]; !ok {
|
if _, ok := p.index[addr]; !ok {
|
||||||
if err := p.reserver.Hold(addr); err != nil {
|
if err := p.reserver.Hold(addr); err != nil {
|
||||||
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
|
log.Warn("Failed to reserve account for blob pool", "tx", meta.hash, "from", addr, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
p.index[addr] = []*blobTxMeta{meta}
|
p.index[addr] = []*blobTxMeta{meta}
|
||||||
|
|
@ -2008,10 +2104,15 @@ func (p *BlobPool) updateLimboMetrics() {
|
||||||
datareal += slotDataused + slotDatagaps
|
datareal += slotDataused + slotDatagaps
|
||||||
slotused += shelf.FilledSlots
|
slotused += shelf.FilledSlots
|
||||||
|
|
||||||
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused))
|
// Skip per-shelf metrics for the 1KB compatibility shelf (used for legacy
|
||||||
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps))
|
//metadata-only entries). shelf.SlotSize/blobSize would be 0 for that
|
||||||
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots))
|
// shelf, producing a misleading gauge name.
|
||||||
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots))
|
if blobCount := shelf.SlotSize / blobSize; blobCount > 0 {
|
||||||
|
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused))
|
||||||
|
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps))
|
||||||
|
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots))
|
||||||
|
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
limboDatausedGauge.Update(int64(dataused))
|
limboDatausedGauge.Update(int64(dataused))
|
||||||
limboDatarealGauge.Update(int64(datareal))
|
limboDatarealGauge.Update(int64(datareal))
|
||||||
|
|
|
||||||
|
|
@ -2138,3 +2138,87 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBlobpoolReinjectRestoresPayloadAcrossRestart(t *testing.T) {
|
||||||
|
storage := t.TempDir()
|
||||||
|
|
||||||
|
key, _ := crypto.GenerateKey()
|
||||||
|
addr := crypto.PubkeyToAddress(key.PublicKey)
|
||||||
|
tx := makeMultiBlobTx(0, 1, 1000, 100, 1, 0, key, types.BlobSidecarVersion0)
|
||||||
|
|
||||||
|
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
|
||||||
|
statedb.AddBalance(addr, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
|
||||||
|
statedb.Commit(0, true, false)
|
||||||
|
|
||||||
|
chain := &testBlockChain{
|
||||||
|
config: params.MainnetChainConfig,
|
||||||
|
basefee: uint256.NewInt(1050),
|
||||||
|
blobfee: uint256.NewInt(105),
|
||||||
|
statedb: statedb,
|
||||||
|
}
|
||||||
|
currentHead := chain.CurrentBlock()
|
||||||
|
|
||||||
|
pool := New(Config{Datadir: storage}, chain, nil)
|
||||||
|
if err := pool.Init(1, currentHead, newReserver()); err != nil {
|
||||||
|
t.Fatalf("failed to create blob pool: %v", err)
|
||||||
|
}
|
||||||
|
if errs := pool.Add([]*types.Transaction{tx}, true); errs[0] != nil {
|
||||||
|
t.Fatalf("failed to add tx to pool: %v", errs[0])
|
||||||
|
}
|
||||||
|
wantRLP := pool.getRLP(tx.Hash())
|
||||||
|
if len(wantRLP) == 0 {
|
||||||
|
t.Fatalf("missing blob tx payload before offload")
|
||||||
|
}
|
||||||
|
|
||||||
|
includeHead := &types.Header{
|
||||||
|
Number: big.NewInt(int64(currentHead.Number.Uint64() + 1)),
|
||||||
|
Difficulty: common.Big0,
|
||||||
|
BaseFee: currentHead.BaseFee,
|
||||||
|
}
|
||||||
|
chain.blocks = map[uint64]*types.Block{
|
||||||
|
includeHead.Number.Uint64(): types.NewBlockWithHeader(includeHead).WithBody(types.Body{
|
||||||
|
Transactions: []*types.Transaction{tx},
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
chain.statedb.SetNonce(addr, tx.Nonce()+1, tracing.NonceChangeUnspecified)
|
||||||
|
pool.Reset(currentHead, includeHead)
|
||||||
|
|
||||||
|
if got := pool.Get(tx.Hash()); got != nil {
|
||||||
|
t.Fatalf("got non-nil blob tx after offload")
|
||||||
|
}
|
||||||
|
if err := pool.Close(); err != nil {
|
||||||
|
t.Fatalf("failed to close pool: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pool = New(Config{Datadir: storage}, chain, nil)
|
||||||
|
if err := pool.Init(1, includeHead, newReserver()); err != nil {
|
||||||
|
t.Fatalf("failed to create blob pool: %v", err)
|
||||||
|
}
|
||||||
|
if got := pool.Get(tx.Hash()); got != nil {
|
||||||
|
t.Fatalf("got non-nil blob tx after offload")
|
||||||
|
}
|
||||||
|
|
||||||
|
chain.statedb.SetNonce(addr, tx.Nonce(), tracing.NonceChangeUnspecified)
|
||||||
|
pool.Reset(includeHead, currentHead)
|
||||||
|
|
||||||
|
got := pool.Get(tx.Hash())
|
||||||
|
if got == nil {
|
||||||
|
t.Fatalf("got nil blob tx after offload")
|
||||||
|
}
|
||||||
|
if !bytes.Equal(pool.GetRLP(tx.Hash()), wantRLP) {
|
||||||
|
t.Fatalf("got blob tx after offload")
|
||||||
|
}
|
||||||
|
blobs, _, proofs, err := pool.GetBlobs(tx.BlobHashes(), types.BlobSidecarVersion0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to get blobs: %v", err)
|
||||||
|
}
|
||||||
|
if len(blobs) != 1 || blobs[0] == nil {
|
||||||
|
t.Fatalf("missing blob after reinjection")
|
||||||
|
}
|
||||||
|
if len(proofs) != 1 || len(proofs[0]) != 1 {
|
||||||
|
t.Fatalf("missing proof after reinjection")
|
||||||
|
}
|
||||||
|
verifyBlobRetrievals(t, pool)
|
||||||
|
|
||||||
|
pool.Close()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ package blobpool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"sort"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
|
@ -31,38 +32,29 @@ import (
|
||||||
// to which it belongs as well as the block number in which it was included for
|
// to which it belongs as well as the block number in which it was included for
|
||||||
// finality eviction.
|
// finality eviction.
|
||||||
type limboBlob struct {
|
type limboBlob struct {
|
||||||
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
|
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
|
||||||
Block uint64 // Block in which the blob transaction was included
|
Block uint64 // Block in which the blob transaction was included
|
||||||
Tx *types.Transaction
|
Tx *types.Transaction `rlp:"nil"` // Optional full blob transaction (old storage style)
|
||||||
|
TxMeta *blobTxMeta // the blob transaction metadata.
|
||||||
|
Raw []byte // Canonical raw blob transaction payload for reinjection
|
||||||
|
id uint64 // the billy id of limboBlob
|
||||||
}
|
}
|
||||||
|
|
||||||
// limbo is a light, indexed database to temporarily store recently included
|
// limbo is a light, indexed database to temporarily store recently included
|
||||||
// blobs until they are finalized. The purpose is to support small reorgs, which
|
// blobs until they are finalized. The purpose is to support small reorgs, which
|
||||||
// would require pulling back up old blobs (which aren't part of the chain).
|
// would require pulling back up old blobs (which aren't part of the chain).
|
||||||
//
|
|
||||||
// TODO(karalabe): Currently updating the inclusion block of a blob needs a full db rewrite. Can we do without?
|
|
||||||
type limbo struct {
|
type limbo struct {
|
||||||
store billy.Database // Persistent data store for limboed blobs
|
store billy.Database // Persistent data store for limboed blobs
|
||||||
|
index map[common.Hash]*limboBlob // Mappings from tx hashes to datastore ids
|
||||||
index map[common.Hash]uint64 // Mappings from tx hashes to datastore ids
|
|
||||||
groups map[uint64]map[uint64]common.Hash // Set of txs included in past blocks
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLimbo opens and indexes a set of limboed blob transactions.
|
// newLimbo opens and indexes a set of limboed blob transactions.
|
||||||
func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) {
|
func newLimbo(_ *params.ChainConfig, datadir string) (*limbo, error) {
|
||||||
l := &limbo{
|
l := &limbo{
|
||||||
index: make(map[common.Hash]uint64),
|
index: make(map[common.Hash]*limboBlob),
|
||||||
groups: make(map[uint64]map[uint64]common.Hash),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create new slotter for pre-Osaka blob configuration.
|
slotter := newLimboSlotter()
|
||||||
slotter := newSlotter(params.BlobTxMaxBlobs)
|
|
||||||
|
|
||||||
// See if we need to migrate the limbo after fusaka.
|
|
||||||
slotter, err := tryMigrate(config, slotter, datadir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Index all limboed blobs on disk and delete anything unprocessable
|
// Index all limboed blobs on disk and delete anything unprocessable
|
||||||
var fails []uint64
|
var fails []uint64
|
||||||
|
|
@ -86,9 +78,46 @@ func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return l, nil
|
return l, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// newLimboSlotter returns a shelf layout that can read the legacy limbo formats
|
||||||
|
// and also store full blob transaction payloads for reinjection.
|
||||||
|
func newLimboSlotter() billy.SlotSizeFn {
|
||||||
|
var (
|
||||||
|
sizes []uint32
|
||||||
|
seen = make(map[uint32]struct{})
|
||||||
|
)
|
||||||
|
addSlotter := func(slotter billy.SlotSizeFn) {
|
||||||
|
for {
|
||||||
|
size, done := slotter()
|
||||||
|
if _, ok := seen[size]; !ok {
|
||||||
|
seen[size] = struct{}{}
|
||||||
|
sizes = append(sizes, size)
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Preserve compatibility with the metadata-only limbo format introduced on
|
||||||
|
// this branch while also supporting the legacy and restored full payloads.
|
||||||
|
sizes = append(sizes, 1024)
|
||||||
|
seen[1024] = struct{}{}
|
||||||
|
|
||||||
|
addSlotter(newSlotter(params.BlobTxMaxBlobs))
|
||||||
|
addSlotter(newSlotterEIP7594(params.BlobTxMaxBlobs))
|
||||||
|
|
||||||
|
sort.Slice(sizes, func(i, j int) bool { return sizes[i] < sizes[j] })
|
||||||
|
var idx int
|
||||||
|
return func() (size uint32, done bool) {
|
||||||
|
size = sizes[idx]
|
||||||
|
idx++
|
||||||
|
return size, idx == len(sizes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Close closes down the underlying persistent store.
|
// Close closes down the underlying persistent store.
|
||||||
func (l *limbo) Close() error {
|
func (l *limbo) Close() error {
|
||||||
return l.store.Close()
|
return l.store.Close()
|
||||||
|
|
@ -112,12 +141,8 @@ func (l *limbo) parseBlob(id uint64, data []byte) error {
|
||||||
log.Error("Dropping duplicate blob limbo entry", "owner", item.TxHash, "id", id)
|
log.Error("Dropping duplicate blob limbo entry", "owner", item.TxHash, "id", id)
|
||||||
return errors.New("duplicate blob")
|
return errors.New("duplicate blob")
|
||||||
}
|
}
|
||||||
l.index[item.TxHash] = id
|
item.id = id
|
||||||
|
l.index[item.TxHash] = item
|
||||||
if _, ok := l.groups[item.Block]; !ok {
|
|
||||||
l.groups[item.Block] = make(map[uint64]common.Hash)
|
|
||||||
}
|
|
||||||
l.groups[item.Block][id] = item.TxHash
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -130,31 +155,30 @@ func (l *limbo) finalize(final *types.Header) {
|
||||||
log.Warn("Nil finalized block cannot evict old blobs")
|
log.Warn("Nil finalized block cannot evict old blobs")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for block, ids := range l.groups {
|
// Note: deleting keys from a map during range is explicitly safe in Go.
|
||||||
if block > final.Number.Uint64() {
|
// Any key deleted mid-iteration may or may not be visited; entries missed
|
||||||
|
// here will be cleaned up on the next finalize call.
|
||||||
|
for _, item := range l.index {
|
||||||
|
if item.Block > final.Number.Uint64() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for id, owner := range ids {
|
if err := l.drop(item.TxHash); err != nil {
|
||||||
if err := l.store.Delete(id); err != nil {
|
log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err)
|
||||||
log.Error("Failed to drop finalized blob", "block", block, "id", id, "err", err)
|
|
||||||
}
|
|
||||||
delete(l.index, owner)
|
|
||||||
}
|
}
|
||||||
delete(l.groups, block)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// push stores a new blob transaction into the limbo, waiting until finality for
|
// push stores a new blob transaction into the limbo, waiting until finality for
|
||||||
// it to be automatically evicted.
|
// it to be automatically evicted.
|
||||||
func (l *limbo) push(tx *types.Transaction, block uint64) error {
|
func (l *limbo) push(raw []byte, meta *blobTxMeta, block uint64) error {
|
||||||
// If the blobs are already tracked by the limbo, consider it a programming
|
// If the blobs are already tracked by the limbo, consider it a programming
|
||||||
// error. There's not much to do against it, but be loud.
|
// error. There's not much to do against it, but be loud.
|
||||||
if _, ok := l.index[tx.Hash()]; ok {
|
if _, ok := l.index[meta.hash]; ok {
|
||||||
log.Error("Limbo cannot push already tracked blobs", "tx", tx.Hash())
|
log.Error("Limbo cannot push already tracked blobs", "tx", meta.hash)
|
||||||
return errors.New("already tracked blob transaction")
|
return errors.New("already tracked blob transaction")
|
||||||
}
|
}
|
||||||
if err := l.setAndIndex(tx, block); err != nil {
|
if err := l.setAndIndex(raw, nil, meta, block); err != nil {
|
||||||
log.Error("Failed to set and index limboed blobs", "tx", tx.Hash(), "err", err)
|
log.Error("Failed to set and index limboed blobs", "tx", meta.hash, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -163,21 +187,19 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error {
|
||||||
// pull retrieves a previously pushed set of blobs back from the limbo, removing
|
// pull retrieves a previously pushed set of blobs back from the limbo, removing
|
||||||
// it at the same time. This method should be used when a previously included blob
|
// it at the same time. This method should be used when a previously included blob
|
||||||
// transaction gets reorged out.
|
// transaction gets reorged out.
|
||||||
func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) {
|
func (l *limbo) pull(tx common.Hash) (*limboBlob, error) {
|
||||||
// If the blobs are not tracked by the limbo, there's not much to do. This
|
// If the blobs are not tracked by the limbo, there's not much to do. This
|
||||||
// can happen for example if a blob transaction is mined without pushing it
|
// can happen for example if a blob transaction is mined without pushing it
|
||||||
// into the network first.
|
// into the network first.
|
||||||
id, ok := l.index[tx]
|
item, ok := l.index[tx]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Trace("Limbo cannot pull non-tracked blobs", "tx", tx)
|
log.Trace("Limbo cannot pull non-tracked blobs", "tx", tx)
|
||||||
return nil, errors.New("unseen blob transaction")
|
return nil, errors.New("unseen blob transaction")
|
||||||
}
|
}
|
||||||
item, err := l.getAndDrop(id)
|
if err := l.drop(item.TxHash); err != nil {
|
||||||
if err != nil {
|
|
||||||
log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return item.Tx, nil
|
return item, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// update changes the block number under which a blob transaction is tracked. This
|
// update changes the block number under which a blob transaction is tracked. This
|
||||||
|
|
@ -191,61 +213,59 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
|
||||||
// If the blobs are not tracked by the limbo, there's not much to do. This
|
// If the blobs are not tracked by the limbo, there's not much to do. This
|
||||||
// can happen for example if a blob transaction is mined without pushing it
|
// can happen for example if a blob transaction is mined without pushing it
|
||||||
// into the network first.
|
// into the network first.
|
||||||
id, ok := l.index[txhash]
|
item, ok := l.index[txhash]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Trace("Limbo cannot update non-tracked blobs", "tx", txhash)
|
log.Trace("Limbo cannot update non-tracked blobs", "tx", txhash)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// If there was no change in the blob's inclusion block, don't mess around
|
// If there was no change in the blob's inclusion block, don't mess around
|
||||||
// with heavy database operations.
|
// with heavy database operations.
|
||||||
if _, ok := l.groups[block][id]; ok {
|
if item.Block == block {
|
||||||
log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block)
|
log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Retrieve the old blobs from the data store and write them back with a new
|
if err := l.drop(txhash); err != nil {
|
||||||
// block number. IF anything fails, there's not much to do, go on.
|
log.Error("Failed to drop old limboed metadata", "tx", txhash, "err", err)
|
||||||
item, err := l.getAndDrop(id)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := l.setAndIndex(item.Tx, block); err != nil {
|
if err := l.setAndIndex(item.Raw, item.Tx, item.TxMeta, block); err != nil {
|
||||||
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
|
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Trace("Blob transaction updated in limbo", "tx", txhash, "old-block", item.Block, "new-block", block)
|
log.Trace("Blob transaction updated in limbo", "tx", txhash, "old-block", item.Block, "new-block", block)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getAndDrop retrieves a blob item from the limbo store and deletes it both from
|
// drop removes the blob metadata from the limbo.
|
||||||
// the store and indices.
|
func (l *limbo) drop(txhash common.Hash) error {
|
||||||
func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
|
if item, ok := l.index[txhash]; ok {
|
||||||
data, err := l.store.Get(id)
|
// Remove the blob metadata entry from the limbo store. If anything fails,
|
||||||
if err != nil {
|
// there's not much to do besides logging the error and returning.
|
||||||
return nil, err
|
if err := l.store.Delete(item.id); err != nil {
|
||||||
|
log.Error("Failed to drop old limboed blobs", "tx", txhash, "err", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
delete(l.index, txhash)
|
||||||
}
|
}
|
||||||
item := new(limboBlob)
|
return nil
|
||||||
if err = rlp.DecodeBytes(data, item); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
delete(l.index, item.TxHash)
|
|
||||||
delete(l.groups[item.Block], id)
|
|
||||||
if len(l.groups[item.Block]) == 0 {
|
|
||||||
delete(l.groups, item.Block)
|
|
||||||
}
|
|
||||||
if err := l.store.Delete(id); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return item, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setAndIndex assembles a limbo blob database entry and stores it, also updating
|
// setAndIndex assembles a limbo blob database entry and stores it, also updating
|
||||||
// the in-memory indices.
|
// the in-memory indices.
|
||||||
func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
|
func (l *limbo) setAndIndex(raw []byte, tx *types.Transaction, meta *blobTxMeta, block uint64) error {
|
||||||
txhash := tx.Hash()
|
var txhash common.Hash
|
||||||
|
switch {
|
||||||
|
case meta != nil:
|
||||||
|
txhash = meta.hash
|
||||||
|
case tx != nil:
|
||||||
|
txhash = tx.Hash()
|
||||||
|
default:
|
||||||
|
return errors.New("missing limbo payload")
|
||||||
|
}
|
||||||
item := &limboBlob{
|
item := &limboBlob{
|
||||||
TxHash: txhash,
|
TxHash: txhash,
|
||||||
Block: block,
|
Block: block,
|
||||||
Tx: tx,
|
Raw: raw,
|
||||||
|
TxMeta: meta,
|
||||||
}
|
}
|
||||||
data, err := rlp.EncodeToBytes(item)
|
data, err := rlp.EncodeToBytes(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -255,10 +275,9 @@ func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
l.index[txhash] = id
|
// Set the in-memory index
|
||||||
if _, ok := l.groups[block]; !ok {
|
item.id = id
|
||||||
l.groups[block] = make(map[uint64]common.Hash)
|
l.index[txhash] = item
|
||||||
}
|
|
||||||
l.groups[block][id] = txhash
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue