core: add BlobTxForPool type

This commit is contained in:
healthykim 2026-05-05 23:27:19 +02:00
parent d422ab39d5
commit bddf792428
5 changed files with 325 additions and 112 deletions

View file

@ -147,28 +147,39 @@ type blobTxMeta struct {
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
} }
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction // newBlobTxForPool decomposes a blob transaction into BlobTxForPool
// and assembles a helper struct to track in memory. // type.
// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar). func newBlobTxForPool(tx *types.Transaction) *types.BlobTxForPool {
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta { sc := tx.BlobTxSidecar()
if tx.BlobTxSidecar() == nil { if sc == nil {
// This should never happen, as the pool only admits blob transactions with a sidecar
panic("missing blob tx sidecar") panic("missing blob tx sidecar")
} }
return &types.BlobTxForPool{
Tx: tx.WithoutBlobTxSidecar(),
Version: sc.Version,
Commitments: sc.Commitments,
Proofs: sc.Proofs,
Blobs: sc.Blobs,
}
}
// newBlobTxMeta retrieves the indexed metadata fields from a pooled blob
// transaction and assembles a helper struct to track in memory.
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, ptx *types.BlobTxForPool) *blobTxMeta {
meta := &blobTxMeta{ meta := &blobTxMeta{
hash: tx.Hash(), hash: ptx.Tx.Hash(),
vhashes: tx.BlobHashes(), vhashes: ptx.Tx.BlobHashes(),
version: tx.BlobTxSidecar().Version, version: ptx.Version,
id: id, id: id,
storageSize: storageSize, storageSize: storageSize,
size: size, size: size,
nonce: tx.Nonce(), nonce: ptx.Tx.Nonce(),
costCap: uint256.MustFromBig(tx.Cost()), costCap: uint256.MustFromBig(ptx.Tx.Cost()),
execTipCap: uint256.MustFromBig(tx.GasTipCap()), execTipCap: uint256.MustFromBig(ptx.Tx.GasTipCap()),
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), execFeeCap: uint256.MustFromBig(ptx.Tx.GasFeeCap()),
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), blobFeeCap: uint256.MustFromBig(ptx.Tx.BlobGasFeeCap()),
execGas: tx.Gas(), execGas: ptx.Tx.Gas(),
blobGas: tx.BlobGas(), blobGas: ptx.Tx.BlobGas(),
} }
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicBlobFeeJumps(meta.blobFeeCap) meta.blobfeeJumps = dynamicBlobFeeJumps(meta.blobFeeCap)
@ -460,10 +471,20 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
return err return err
} }
// Index all transactions on disk and delete anything unprocessable // Index all transactions on disk and delete anything unprocessable
var fails []uint64 var (
fails []uint64
convertTxs []*types.Transaction
)
index := func(id uint64, size uint32, blob []byte) { index := func(id uint64, size uint32, blob []byte) {
if p.parseTransaction(id, size, blob) != nil { legacy, err := p.parseTransaction(id, size, blob)
if err != nil {
fails = append(fails, id) fails = append(fails, id)
} else if legacy {
fails = append(fails, id)
tx := new(types.Transaction)
if err := rlp.DecodeBytes(blob, tx); err != nil {
convertTxs = append(convertTxs, tx)
}
} }
} }
store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, slotter, index) store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, slotter, index)
@ -472,6 +493,32 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
} }
p.store = store p.store = store
// Migrate legacy transactions (types.Transaction) to pooledBlobTx format.
if len(convertTxs) > 0 {
for _, tx := range convertTxs {
ptx := newBlobTxForPool(tx)
blob, err := rlp.EncodeToBytes(ptx)
if err != nil {
continue
}
id, err := p.store.Put(blob)
if err != nil {
continue
}
meta := newBlobTxMeta(id, ptx.TxSize(), p.store.Size(id), ptx)
sender, err := types.Sender(p.signer, ptx.Tx)
if err != nil {
fails = append(fails, id)
continue
}
if err := p.trackTransaction(meta, sender); err != nil {
fails = append(fails, id)
continue
}
}
}
if len(fails) > 0 { if len(fails) > 0 {
log.Warn("Dropping invalidated blob transactions", "ids", fails) log.Warn("Dropping invalidated blob transactions", "ids", fails)
dropInvalidMeter.Mark(int64(len(fails))) dropInvalidMeter.Mark(int64(len(fails)))
@ -483,6 +530,7 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
} }
} }
} }
// Sort the indexed transactions by nonce and delete anything gapped, create // Sort the indexed transactions by nonce and delete anything gapped, create
// the eviction heap of anyone still standing // the eviction heap of anyone still standing
for addr := range p.index { for addr := range p.index {
@ -558,36 +606,38 @@ func (p *BlobPool) Close() error {
// parseTransaction is a callback method on pool creation that gets called for // parseTransaction is a callback method on pool creation that gets called for
// each transaction on disk to create the in-memory metadata index. // each transaction on disk to create the in-memory metadata index.
// Announced state is not initialized here, it needs to be iniitalized seprately. // Return value `bool` is set to true when the entry has old Transaction type.
func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) (bool, error) {
tx := new(types.Transaction) ptx := new(types.BlobTxForPool)
if err := rlp.DecodeBytes(blob, tx); err != nil { if err := rlp.DecodeBytes(blob, ptx); err != nil {
// This path is impossible unless the disk data representation changes tx := new(types.Transaction)
// across restarts. For that ever improbable case, recover gracefully if err := rlp.DecodeBytes(blob, tx); err != nil {
// by ignoring this data entry. return false, err
log.Error("Failed to decode blob pool entry", "id", id, "err", err) }
return err if tx.BlobTxSidecar() == nil {
return false, errors.New("missing blob sidecar")
}
return true, nil
} }
if tx.BlobTxSidecar() == nil { meta := newBlobTxMeta(id, ptx.TxSize(), size, ptx)
log.Error("Missing sidecar in blob pool entry", "id", id, "hash", tx.Hash()) if p.lookup.exists(meta.hash) {
return errors.New("missing blob sidecar") return false, errors.New("duplicate blob entry")
} }
sender, err := types.Sender(p.signer, ptx.Tx)
if err != nil {
return false, err
}
return false, p.trackTransaction(meta, sender)
}
meta := newBlobTxMeta(id, tx.Size(), size, tx) // trackTransaction registers a transaction's metadata in the pool's indices.
func (p *BlobPool) trackTransaction(meta *blobTxMeta, sender common.Address) error {
if p.lookup.exists(meta.hash) { if p.lookup.exists(meta.hash) {
// This path is only possible after a crash, where deleted items are not // This path is only possible after a crash, where deleted items are not
// removed via the normal shutdown-startup procedure and thus may get // removed via the normal shutdown-startup procedure and thus may get
// partially resurrected. // partially resurrected.
log.Error("Rejecting duplicate blob pool entry", "id", id, "hash", tx.Hash()) log.Error("Rejecting duplicate blob pool entry", "id", meta.id, "hash", meta.hash)
return errors.New("duplicate blob entry") return fmt.Errorf("duplicate blob entry %d, %s", meta.id, meta.hash)
}
sender, err := types.Sender(p.signer, tx)
if err != nil {
// This path is impossible unless the signature validity changes across
// restarts. For that ever improbable case, recover gracefully by ignoring
// this data entry.
log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err)
return err
} }
if _, ok := p.index[sender]; !ok { if _, ok := p.index[sender]; !ok {
if err := p.reserver.Hold(sender); err != nil { if err := p.reserver.Hold(sender); err != nil {
@ -604,6 +654,9 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
return nil return nil
} }
// recheck verifies the pool's content for a specific account and drops anything
// that does not
// recheck verifies the pool's content for a specific account and drops anything // recheck verifies the pool's content for a specific account and drops anything
// that does not fit anymore (dangling or filled nonce, overdraft). // that does not fit anymore (dangling or filled nonce, overdraft).
func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint64) { func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint64) {
@ -863,17 +916,17 @@ func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusi
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return return
} }
var tx types.Transaction ptx := new(types.BlobTxForPool)
if err = rlp.DecodeBytes(data, &tx); err != nil { if err := rlp.DecodeBytes(data, ptx); err != nil {
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return return
} }
block, ok := inclusions[tx.Hash()] block, ok := inclusions[ptx.Tx.Hash()]
if !ok { if !ok {
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id) log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
return return
} }
if err := p.limbo.push(&tx, block); err != nil { if err := p.limbo.push(ptx, block); err != nil {
log.Warn("Failed to offload blob tx into limbo", "err", err) log.Warn("Failed to offload blob tx into limbo", "err", err)
return return
} }
@ -1108,7 +1161,7 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot // Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable. // add the transaction back into the pool as it is not mineable.
tx, err := p.limbo.pull(txhash) ptx, err := p.limbo.pull(txhash)
if err != nil { if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err) log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return err return err
@ -1124,30 +1177,29 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// could theoretically halt a Geth node for ~1.2s by reorging per block. However, // could theoretically halt a Geth node for ~1.2s by reorging per block. However,
// this attack is financially inefficient to execute. // this attack is financially inefficient to execute.
head := p.head.Load() head := p.head.Load()
if p.chain.Config().IsOsaka(head.Number, head.Time) && tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 { if p.chain.Config().IsOsaka(head.Number, head.Time) && ptx.Version == types.BlobSidecarVersion0 {
if err := tx.BlobTxSidecar().ToV1(); err != nil { sc := ptx.Sidecar()
if err := sc.ToV1(); err != nil {
log.Error("Failed to convert the legacy sidecar", "err", err) log.Error("Failed to convert the legacy sidecar", "err", err)
return err return err
} }
log.Info("Legacy blob transaction is reorged", "hash", tx.Hash()) ptx.WithSidecar(sc)
log.Info("Legacy blob transaction is reorged", "hash", ptx.Tx.Hash())
} }
// Serialize the transaction back into the primary datastore. blob, err := rlp.EncodeToBytes(ptx)
blob, err := rlp.EncodeToBytes(tx)
if err != nil { if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) log.Error("Failed to encode transaction for storage", "hash", ptx.Tx.Hash(), "err", err)
return err return err
} }
id, err := p.store.Put(blob) id, err := p.store.Put(blob)
if err != nil { if err != nil {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err) log.Error("Failed to write transaction into storage", "hash", ptx.Tx.Hash(), "err", err)
return err return err
} }
meta := newBlobTxMeta(id, ptx.TxSize(), p.store.Size(id), ptx)
// Update the indices and metrics
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
if _, ok := p.index[addr]; !ok { if _, ok := p.index[addr]; !ok {
if err := p.reserver.Hold(addr); err != nil { if err := p.reserver.Hold(addr); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) log.Warn("Failed to reserve account for blob pool", "tx", ptx.Tx.Hash(), "from", addr, "err", err)
return err return err
} }
p.index[addr] = []*blobTxMeta{meta} p.index[addr] = []*blobTxMeta{meta}
@ -1404,20 +1456,27 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
if len(data) == 0 { if len(data) == 0 {
return nil return nil
} }
item := new(types.Transaction) ptx := new(types.BlobTxForPool)
if err := rlp.DecodeBytes(data, item); err != nil { if err := rlp.DecodeBytes(data, ptx); err != nil {
id, _ := p.lookup.storeidOfTx(hash) id, _ := p.lookup.storeidOfTx(hash)
log.Error("Blobs corrupted for traced transaction", log.Error("Blobs corrupted for traced transaction",
"hash", hash, "id", id, "err", err) "hash", hash, "id", id, "err", err)
return nil return nil
} }
return item return ptx.ToTx()
} }
// GetRLP returns a RLP-encoded transaction if it is contained in the pool. // GetRLP returns a RLP-encoded transaction for network if it is contained in the pool.
func (p *BlobPool) GetRLP(hash common.Hash) []byte { func (p *BlobPool) GetRLP(hash common.Hash) []byte {
return p.getRLP(hash) data := p.getRLP(hash)
rlp, err := types.EncodeForNetwork(data)
if err != nil {
log.Error("Failed to encode pooled tx into the network type", "hash", hash, "err", err)
return nil
}
return rlp
} }
// GetMetadata returns the transaction type and transaction size with the // GetMetadata returns the transaction type and transaction size with the
@ -1486,18 +1545,14 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blo
} }
// Decode the blob transaction // Decode the blob transaction
tx := new(types.Transaction) ptx := new(types.BlobTxForPool)
if err := rlp.DecodeBytes(data, tx); err != nil { if err := rlp.DecodeBytes(data, ptx); err != nil {
log.Error("Blobs corrupted for traced transaction", "id", txID, "err", err) log.Error("Blobs corrupted for traced transaction", "id", txID, "err", err)
continue continue
} }
sidecar := tx.BlobTxSidecar() sidecar := ptx.Sidecar()
if sidecar == nil {
log.Error("Blob tx without sidecar", "hash", tx.Hash(), "id", txID)
continue
}
// Traverse the blobs in the transaction // Traverse the blobs in the transaction
for i, hash := range tx.BlobHashes() { for i, hash := range ptx.Tx.BlobHashes() {
list, ok := indices[hash] list, ok := indices[hash]
if !ok { if !ok {
continue // non-interesting blob continue // non-interesting blob
@ -1641,7 +1696,8 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
} }
// Transaction permitted into the pool from a nonce and cost perspective, // Transaction permitted into the pool from a nonce and cost perspective,
// insert it into the database and update the indices // insert it into the database and update the indices
blob, err := rlp.EncodeToBytes(tx) ptx := newBlobTxForPool(tx)
blob, err := rlp.EncodeToBytes(ptx)
if err != nil { if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return err return err
@ -1650,7 +1706,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
if err != nil { if err != nil {
return err return err
} }
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), ptx)
var ( var (
next = p.state.GetNonce(from) next = p.state.GetNonce(from)

View file

@ -235,6 +235,12 @@ func makeTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64,
return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx) return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx)
} }
// encodeForPool encodes a blob transaction in the BlobTxForPool storage format.
func encodeForPool(tx *types.Transaction) []byte {
blob, _ := rlp.EncodeToBytes(newBlobTxForPool(tx))
return blob
}
// makeMultiBlobTx is a utility method to construct a ramdom blob tx with // makeMultiBlobTx is a utility method to construct a ramdom blob tx with
// certain number of blobs in its sidecar. // certain number of blobs in its sidecar.
func makeMultiBlobTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey, version byte) *types.Transaction { func makeMultiBlobTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey, version byte) *types.Transaction {
@ -530,7 +536,7 @@ func TestOpenDrops(t *testing.T) {
) )
for _, nonce := range []uint64{0, 1, 3, 4, 6, 7} { // first gap at #2, another at #5 for _, nonce := range []uint64{0, 1, 3, 4, 6, 7} { // first gap at #2, another at #5
tx := makeTx(nonce, 1, 1, 1, gapper) tx := makeTx(nonce, 1, 1, 1, gapper)
blob, _ := rlp.EncodeToBytes(tx) blob := encodeForPool(tx)
id, _ := store.Put(blob) id, _ := store.Put(blob)
if nonce < 2 { if nonce < 2 {
@ -547,7 +553,7 @@ func TestOpenDrops(t *testing.T) {
) )
for _, nonce := range []uint64{1, 2, 3} { // first gap at #0, all set dangling for _, nonce := range []uint64{1, 2, 3} { // first gap at #0, all set dangling
tx := makeTx(nonce, 1, 1, 1, dangler) tx := makeTx(nonce, 1, 1, 1, dangler)
blob, _ := rlp.EncodeToBytes(tx) blob := encodeForPool(tx)
id, _ := store.Put(blob) id, _ := store.Put(blob)
dangling[id] = struct{}{} dangling[id] = struct{}{}
@ -560,7 +566,7 @@ func TestOpenDrops(t *testing.T) {
) )
for _, nonce := range []uint64{0, 1, 2} { // account nonce at 3, all set filled for _, nonce := range []uint64{0, 1, 2} { // account nonce at 3, all set filled
tx := makeTx(nonce, 1, 1, 1, filler) tx := makeTx(nonce, 1, 1, 1, filler)
blob, _ := rlp.EncodeToBytes(tx) blob := encodeForPool(tx)
id, _ := store.Put(blob) id, _ := store.Put(blob)
filled[id] = struct{}{} filled[id] = struct{}{}
@ -573,7 +579,7 @@ func TestOpenDrops(t *testing.T) {
) )
for _, nonce := range []uint64{0, 1, 2, 3} { // account nonce at 2, half filled for _, nonce := range []uint64{0, 1, 2, 3} { // account nonce at 2, half filled
tx := makeTx(nonce, 1, 1, 1, overlapper) tx := makeTx(nonce, 1, 1, 1, overlapper)
blob, _ := rlp.EncodeToBytes(tx) blob := encodeForPool(tx)
id, _ := store.Put(blob) id, _ := store.Put(blob)
if nonce >= 2 { if nonce >= 2 {
@ -595,7 +601,7 @@ func TestOpenDrops(t *testing.T) {
} else { } else {
tx = makeTx(uint64(i), 1, 1, 1, underpayer) tx = makeTx(uint64(i), 1, 1, 1, underpayer)
} }
blob, _ := rlp.EncodeToBytes(tx) blob := encodeForPool(tx)
id, _ := store.Put(blob) id, _ := store.Put(blob)
underpaid[id] = struct{}{} underpaid[id] = struct{}{}
@ -614,7 +620,7 @@ func TestOpenDrops(t *testing.T) {
} else { } else {
tx = makeTx(uint64(i), 1, 1, 1, outpricer) tx = makeTx(uint64(i), 1, 1, 1, outpricer)
} }
blob, _ := rlp.EncodeToBytes(tx) blob := encodeForPool(tx)
id, _ := store.Put(blob) id, _ := store.Put(blob)
if i < 2 { if i < 2 {
@ -636,7 +642,7 @@ func TestOpenDrops(t *testing.T) {
} else { } else {
tx = makeTx(nonce, 1, 1, 1, exceeder) tx = makeTx(nonce, 1, 1, 1, exceeder)
} }
blob, _ := rlp.EncodeToBytes(tx) blob := encodeForPool(tx)
id, _ := store.Put(blob) id, _ := store.Put(blob)
exceeded[id] = struct{}{} exceeded[id] = struct{}{}
@ -654,7 +660,7 @@ func TestOpenDrops(t *testing.T) {
} else { } else {
tx = makeTx(nonce, 1, 1, 1, overdrafter) tx = makeTx(nonce, 1, 1, 1, overdrafter)
} }
blob, _ := rlp.EncodeToBytes(tx) blob := encodeForPool(tx)
id, _ := store.Put(blob) id, _ := store.Put(blob)
if nonce < 1 { if nonce < 1 {
@ -670,7 +676,7 @@ func TestOpenDrops(t *testing.T) {
overcapped = make(map[uint64]struct{}) overcapped = make(map[uint64]struct{})
) )
for nonce := uint64(0); nonce < maxTxsPerAccount+3; nonce++ { for nonce := uint64(0); nonce < maxTxsPerAccount+3; nonce++ {
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, 1, 1, overcapper)) blob := encodeForPool(makeTx(nonce, 1, 1, 1, overcapper))
id, _ := store.Put(blob) id, _ := store.Put(blob)
if nonce < maxTxsPerAccount { if nonce < maxTxsPerAccount {
@ -686,7 +692,7 @@ func TestOpenDrops(t *testing.T) {
duplicated = make(map[uint64]struct{}) duplicated = make(map[uint64]struct{})
) )
for _, nonce := range []uint64{0, 1, 2} { for _, nonce := range []uint64{0, 1, 2} {
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, 1, 1, duplicater)) blob := encodeForPool(makeTx(nonce, 1, 1, 1, duplicater))
for i := 0; i < int(nonce)+1; i++ { for i := 0; i < int(nonce)+1; i++ {
id, _ := store.Put(blob) id, _ := store.Put(blob)
@ -705,7 +711,7 @@ func TestOpenDrops(t *testing.T) {
) )
for _, nonce := range []uint64{0, 1, 2} { for _, nonce := range []uint64{0, 1, 2} {
for i := 0; i < int(nonce)+1; i++ { for i := 0; i < int(nonce)+1; i++ {
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, uint64(i)+1 /* unique hashes */, 1, repeater)) blob := encodeForPool(makeTx(nonce, 1, uint64(i)+1 /* unique hashes */, 1, repeater))
id, _ := store.Put(blob) id, _ := store.Put(blob)
if i == 0 { if i == 0 {
@ -842,7 +848,7 @@ func TestOpenIndex(t *testing.T) {
) )
for _, i := range []int{5, 3, 4, 2, 0, 1} { // Randomize the tx insertion order to force sorting on load for _, i := range []int{5, 3, 4, 2, 0, 1} { // Randomize the tx insertion order to force sorting on load
tx := makeTx(uint64(i), txExecTipCaps[i], txExecFeeCaps[i], txBlobFeeCaps[i], key) tx := makeTx(uint64(i), txExecTipCaps[i], txExecFeeCaps[i], txBlobFeeCaps[i], key)
blob, _ := rlp.EncodeToBytes(tx) blob := encodeForPool(tx)
store.Put(blob) store.Put(blob)
} }
store.Close() store.Close()
@ -934,9 +940,9 @@ func TestOpenHeap(t *testing.T) {
tx2 = makeTx(0, 1, 800, 70, key2) tx2 = makeTx(0, 1, 800, 70, key2)
tx3 = makeTx(0, 1, 1500, 110, key3) tx3 = makeTx(0, 1, 1500, 110, key3)
blob1, _ = rlp.EncodeToBytes(tx1) blob1 = encodeForPool(tx1)
blob2, _ = rlp.EncodeToBytes(tx2) blob2 = encodeForPool(tx2)
blob3, _ = rlp.EncodeToBytes(tx3) blob3 = encodeForPool(tx3)
heapOrder = []common.Address{addr2, addr1, addr3} heapOrder = []common.Address{addr2, addr1, addr3}
heapIndex = map[common.Address]int{addr2: 0, addr1: 1, addr3: 2} heapIndex = map[common.Address]int{addr2: 0, addr1: 1, addr3: 2}
@ -1009,9 +1015,9 @@ func TestOpenCap(t *testing.T) {
tx2 = makeTx(0, 1, 800, 70, key2) tx2 = makeTx(0, 1, 800, 70, key2)
tx3 = makeTx(0, 1, 1500, 110, key3) tx3 = makeTx(0, 1, 1500, 110, key3)
blob1, _ = rlp.EncodeToBytes(tx1) blob1 = encodeForPool(tx1)
blob2, _ = rlp.EncodeToBytes(tx2) blob2 = encodeForPool(tx2)
blob3, _ = rlp.EncodeToBytes(tx3) blob3 = encodeForPool(tx3)
keep = []common.Address{addr1, addr3} keep = []common.Address{addr1, addr3}
drop = []common.Address{addr2} drop = []common.Address{addr2}
@ -1098,8 +1104,8 @@ func TestChangingSlotterSize(t *testing.T) {
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0) tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0)
tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0) tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0)
blob1, _ = rlp.EncodeToBytes(tx1) blob1 = encodeForPool(tx1)
blob2, _ = rlp.EncodeToBytes(tx2) blob2 = encodeForPool(tx2)
) )
// Write the two safely sized txs to store. note: although the store is // Write the two safely sized txs to store. note: although the store is
@ -1201,8 +1207,8 @@ func TestBillyMigration(t *testing.T) {
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0) tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0)
tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0) tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0)
blob1, _ = rlp.EncodeToBytes(tx1) blob1 = encodeForPool(tx1)
blob2, _ = rlp.EncodeToBytes(tx2) blob2 = encodeForPool(tx2)
) )
// Write the two safely sized txs to store. note: although the store is // Write the two safely sized txs to store. note: although the store is
@ -1746,7 +1752,7 @@ func TestAdd(t *testing.T) {
// Sign the seed transactions and store them in the data store // Sign the seed transactions and store them in the data store
for _, tx := range seed.txs { for _, tx := range seed.txs {
signed := types.MustSignNewTx(keys[acc], types.LatestSigner(params.MainnetChainConfig), tx) signed := types.MustSignNewTx(keys[acc], types.LatestSigner(params.MainnetChainConfig), tx)
blob, _ := rlp.EncodeToBytes(signed) blob := encodeForPool(signed)
store.Put(blob) store.Put(blob)
} }
} }
@ -1853,9 +1859,9 @@ func TestGetBlobs(t *testing.T) {
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion1) // [6, 12) tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion1) // [6, 12)
tx3 = makeMultiBlobTx(0, 1, 800, 110, 6, 12, key3, types.BlobSidecarVersion0) // [12, 18) tx3 = makeMultiBlobTx(0, 1, 800, 110, 6, 12, key3, types.BlobSidecarVersion0) // [12, 18)
blob1, _ = rlp.EncodeToBytes(tx1) blob1 = encodeForPool(tx1)
blob2, _ = rlp.EncodeToBytes(tx2) blob2 = encodeForPool(tx2)
blob3, _ = rlp.EncodeToBytes(tx3) blob3 = encodeForPool(tx3)
) )
// Write the two safely sized txs to store. note: although the store is // Write the two safely sized txs to store. note: although the store is

View file

@ -33,7 +33,7 @@ import (
type limboBlob struct { type limboBlob struct {
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
Block uint64 // Block in which the blob transaction was included Block uint64 // Block in which the blob transaction was included
Tx *types.Transaction Ptx *types.BlobTxForPool
} }
// limbo is a light, indexed database to temporarily store recently included // limbo is a light, indexed database to temporarily store recently included
@ -146,15 +146,14 @@ func (l *limbo) finalize(final *types.Header) {
// push stores a new blob transaction into the limbo, waiting until finality for // push stores a new blob transaction into the limbo, waiting until finality for
// it to be automatically evicted. // it to be automatically evicted.
func (l *limbo) push(tx *types.Transaction, block uint64) error { func (l *limbo) push(ptx *types.BlobTxForPool, block uint64) error {
// If the blobs are already tracked by the limbo, consider it a programming hash := ptx.Tx.Hash()
// error. There's not much to do against it, but be loud. if _, ok := l.index[hash]; ok {
if _, ok := l.index[tx.Hash()]; ok { log.Error("Limbo cannot push already tracked blobs", "tx", hash)
log.Error("Limbo cannot push already tracked blobs", "tx", tx.Hash())
return errors.New("already tracked blob transaction") return errors.New("already tracked blob transaction")
} }
if err := l.setAndIndex(tx, block); err != nil { if err := l.setAndIndex(ptx, block); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", tx.Hash(), "err", err) log.Error("Failed to set and index limboed blobs", "tx", hash, "err", err)
return err return err
} }
return nil return nil
@ -163,7 +162,7 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error {
// pull retrieves a previously pushed set of blobs back from the limbo, removing // pull retrieves a previously pushed set of blobs back from the limbo, removing
// it at the same time. This method should be used when a previously included blob // it at the same time. This method should be used when a previously included blob
// transaction gets reorged out. // transaction gets reorged out.
func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) { func (l *limbo) pull(tx common.Hash) (*types.BlobTxForPool, error) {
// If the blobs are not tracked by the limbo, there's not much to do. This // If the blobs are not tracked by the limbo, there's not much to do. This
// can happen for example if a blob transaction is mined without pushing it // can happen for example if a blob transaction is mined without pushing it
// into the network first. // into the network first.
@ -177,7 +176,7 @@ func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) {
log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err) log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err)
return nil, err return nil, err
} }
return item.Tx, nil return item.Ptx, nil
} }
// update changes the block number under which a blob transaction is tracked. This // update changes the block number under which a blob transaction is tracked. This
@ -209,7 +208,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err) log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err)
return return
} }
if err := l.setAndIndex(item.Tx, block); err != nil { if err := l.setAndIndex(item.Ptx, block); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err) log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
return return
} }
@ -240,12 +239,12 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
// setAndIndex assembles a limbo blob database entry and stores it, also updating // setAndIndex assembles a limbo blob database entry and stores it, also updating
// the in-memory indices. // the in-memory indices.
func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error { func (l *limbo) setAndIndex(ptx *types.BlobTxForPool, block uint64) error {
txhash := tx.Hash() txhash := ptx.Tx.Hash()
item := &limboBlob{ item := &limboBlob{
TxHash: txhash, TxHash: txhash,
Block: block, Block: block,
Tx: tx, Ptx: ptx,
} }
data, err := rlp.EncodeToBytes(item) data, err := rlp.EncodeToBytes(item)
if err != nil { if err != nil {

View file

@ -176,6 +176,112 @@ func (sc *BlobTxSidecar) Copy() *BlobTxSidecar {
} }
} }
// BlobTxForPool is a type used for blob transaction in the blobpool.
type BlobTxForPool struct {
Tx *Transaction // tx without sidecar
Version byte
Commitments []kzg4844.Commitment
Proofs []kzg4844.Proof
Blobs []kzg4844.Blob
}
// Sidecar returns BlobTxSidecar of ptx.
func (ptx *BlobTxForPool) Sidecar() *BlobTxSidecar {
return NewBlobTxSidecar(ptx.Version, ptx.Blobs, ptx.Commitments, ptx.Proofs)
}
// WithSidecar copies the sidecar's fields into the flat fields.
func (ptx *BlobTxForPool) WithSidecar(sc *BlobTxSidecar) {
ptx.Version = sc.Version
ptx.Commitments = sc.Commitments
ptx.Proofs = sc.Proofs
ptx.Blobs = sc.Blobs
}
// TxSize returns the transaction size on the network without
// reconstructing the transaction.
func (ptx *BlobTxForPool) TxSize() uint64 {
var blobs, commitments, proofs uint64
for i := range ptx.Blobs {
blobs += rlp.BytesSize(ptx.Blobs[i][:])
}
for i := range ptx.Commitments {
commitments += rlp.BytesSize(ptx.Commitments[i][:])
}
for i := range ptx.Proofs {
proofs += rlp.BytesSize(ptx.Proofs[i][:])
}
return ptx.Tx.Size() + rlp.ListSize(rlp.ListSize(blobs)+rlp.ListSize(commitments)+rlp.ListSize(proofs))
}
// ToTx reconstructs a full Transaction with the sidecar attached.
func (ptx *BlobTxForPool) ToTx() *Transaction {
return ptx.Tx.WithBlobTxSidecar(ptx.Sidecar())
}
// EncodeForNetwork transforms stored BlobTxForPool RLP into the standard
// network transaction encoding.
//
// Stored RLP: [type_byte || tx_fields, version, [comms], [proofs], [blobs]]
// V0: type_byte || rlp([tx_fields, [blobs], [comms], [proofs]])
// V1: type_byte || rlp([tx_fields, version, [blobs], [comms], [proofs]])
func EncodeForNetwork(storedRLP []byte) ([]byte, error) {
elems, err := rlp.SplitListValues(storedRLP)
if err != nil {
return nil, fmt.Errorf("invalid BlobTxForPool RLP: %w", err)
}
if len(elems) < 5 {
return nil, fmt.Errorf("BlobTxForPool has %d elements, need at least 5", len(elems))
}
// 1. Extract tx byte and other tx fields
txBytes, _, err := rlp.SplitString(elems[0])
if err != nil {
return nil, fmt.Errorf("invalid tx bytes: %w", err)
}
if len(txBytes) < 2 {
return nil, errors.New("tx bytes too short")
}
typeByte := txBytes[0]
txRLP := txBytes[1:]
// 2. Find the version of sidecar.
version, _, err := rlp.SplitString(elems[1])
if err != nil {
return nil, fmt.Errorf("invalid version: %w", err)
}
var versionByte byte
switch len(version) {
case 0:
versionByte = 0
case 1:
versionByte = version[0]
default:
return nil, fmt.Errorf("invalid version length: %d", len(version))
}
// 3. Extract sidecar elements.
commitmentsRLP := elems[2]
proofsRLP := elems[3]
blobsRLP := elems[4]
// 4. Reconstruct into the network format.
var outer [][]byte
if versionByte == BlobSidecarVersion0 {
outer = [][]byte{txRLP, blobsRLP, commitmentsRLP, proofsRLP}
} else {
outer = [][]byte{txRLP, elems[1], blobsRLP, commitmentsRLP, proofsRLP}
}
body, err := rlp.MergeListValues(outer)
if err != nil {
return nil, err
}
// Prepend type byte and wrap as an RLP string.
inner := make([]byte, 1+len(body))
inner[0] = typeByte
copy(inner[1:], body)
return rlp.EncodeToBytes(inner)
}
// blobTxWithBlobs represents blob tx with its corresponding sidecar. // blobTxWithBlobs represents blob tx with its corresponding sidecar.
// This is an interface because sidecars are versioned. // This is an interface because sidecars are versioned.
type blobTxWithBlobs interface { type blobTxWithBlobs interface {

View file

@ -17,12 +17,14 @@
package types package types
import ( import (
"bytes"
"crypto/ecdsa" "crypto/ecdsa"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256" "github.com/holiman/uint256"
) )
@ -86,6 +88,50 @@ func createEmptyBlobTx(key *ecdsa.PrivateKey, withSidecar bool) *Transaction {
return MustSignNewTx(key, signer, blobtx) return MustSignNewTx(key, signer, blobtx)
} }
// TestEncodeForNetwork verifies that EncodeForNetwork produces output identical
// to rlp.EncodeToBytes on the original transaction, for both V0 and V1 sidecars.
func TestEncodeForNetwork(t *testing.T) {
t.Run("v0", func(t *testing.T) { testEncodeForNetwork(t, BlobSidecarVersion0) })
t.Run("v1", func(t *testing.T) { testEncodeForNetwork(t, BlobSidecarVersion1) })
}
func testEncodeForNetwork(t *testing.T, version byte) {
key, _ := crypto.GenerateKey()
tx := createEmptyBlobTx(key, true)
if version == BlobSidecarVersion1 {
if err := tx.BlobTxSidecar().ToV1(); err != nil {
t.Fatalf("failed to convert sidecar to v1: %v", err)
}
}
wantRLP, err := rlp.EncodeToBytes(tx)
if err != nil {
t.Fatalf("failed to encode tx: %v", err)
}
sc := tx.BlobTxSidecar()
ptx := &BlobTxForPool{
Tx: tx.WithoutBlobTxSidecar(),
Version: sc.Version,
Commitments: sc.Commitments,
Proofs: sc.Proofs,
Blobs: sc.Blobs,
}
storedRLP, err := rlp.EncodeToBytes(ptx)
if err != nil {
t.Fatalf("failed to encode BlobTxForPool: %v", err)
}
gotRLP, err := EncodeForNetwork(storedRLP)
if err != nil {
t.Fatalf("EncodeForNetwork failed: %v", err)
}
if !bytes.Equal(gotRLP, wantRLP) {
t.Fatalf("network encoding mismatch (version %d): got %d bytes, want %d bytes", version, len(gotRLP), len(wantRLP))
}
}
func createEmptyBlobTxInner(withSidecar bool) *BlobTx { func createEmptyBlobTxInner(withSidecar bool) *BlobTx {
sidecar := NewBlobTxSidecar(BlobSidecarVersion0, []kzg4844.Blob{*emptyBlob}, []kzg4844.Commitment{emptyBlobCommit}, []kzg4844.Proof{emptyBlobProof}) sidecar := NewBlobTxSidecar(BlobSidecarVersion0, []kzg4844.Blob{*emptyBlob}, []kzg4844.Commitment{emptyBlobCommit}, []kzg4844.Proof{emptyBlobProof})
blobtx := &BlobTx{ blobtx := &BlobTx{