diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index f2e0d5f9d2..f8021e00c4 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -116,6 +116,8 @@ const ( announceThreshold = -1 ) +var errLegacyTx = errors.New("legacy transaction format") + // blobTxMeta is the minimal subset of types.BlobTx necessary to validate and // schedule the blob transactions into the following blocks. Only ever add the // bare minimum needed fields to keep the size down (and thus number of entries @@ -147,28 +149,137 @@ type blobTxMeta struct { evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces } -// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction -// and assembles a helper struct to track in memory. -// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar). -func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta { - if tx.BlobTxSidecar() == nil { - // This should never happen, as the pool only admits blob transactions with a sidecar +// blobTxForPool is the storage representation of a blob transaction in the +// blobpool. +type blobTxForPool struct { + Tx *types.Transaction // tx without sidecar + Version byte + Commitments []kzg4844.Commitment + Proofs []kzg4844.Proof + Blobs []kzg4844.Blob +} + +// Sidecar returns BlobTxSidecar of ptx. +func (ptx *blobTxForPool) Sidecar() *types.BlobTxSidecar { + return types.NewBlobTxSidecar(ptx.Version, ptx.Blobs, ptx.Commitments, ptx.Proofs) +} + +// ApplySidecar copies the sidecar's fields into the flat fields. +func (ptx *blobTxForPool) ApplySidecar(sc *types.BlobTxSidecar) { + ptx.Version = sc.Version + ptx.Commitments = sc.Commitments + ptx.Proofs = sc.Proofs + ptx.Blobs = sc.Blobs +} + +// TxSize returns the transaction size on the network without +// reconstructing the transaction. +func (ptx *blobTxForPool) TxSize() uint64 { + var blobs, commitments, proofs uint64 + for i := range ptx.Blobs { + blobs += rlp.BytesSize(ptx.Blobs[i][:]) + } + for i := range ptx.Commitments { + commitments += rlp.BytesSize(ptx.Commitments[i][:]) + } + for i := range ptx.Proofs { + proofs += rlp.BytesSize(ptx.Proofs[i][:]) + } + return ptx.Tx.Size() + rlp.ListSize(rlp.ListSize(blobs)+rlp.ListSize(commitments)+rlp.ListSize(proofs)) +} + +// ToTx reconstructs a full Transaction with the sidecar attached. +func (ptx *blobTxForPool) ToTx() *types.Transaction { + return ptx.Tx.WithBlobTxSidecar(ptx.Sidecar()) +} + +// newBlobTxForPool decomposes a blob transaction into blobTxForPool type. +func newBlobTxForPool(tx *types.Transaction) *blobTxForPool { + sc := tx.BlobTxSidecar() + if sc == nil { panic("missing blob tx sidecar") } + return &blobTxForPool{ + Tx: tx.WithoutBlobTxSidecar(), + Version: sc.Version, + Commitments: sc.Commitments, + Proofs: sc.Proofs, + Blobs: sc.Blobs, + } +} + +// encodeForNetwork transforms stored blobTxForPool RLP into the standard +// network transaction encoding. This is used for getRLP. +// +// Stored RLP: [type_byte || tx_fields, version, [comms], [proofs], [blobs]] +// V0: type_byte || rlp([tx_fields, [blobs], [comms], [proofs]]) +// V1: type_byte || rlp([tx_fields, version, [blobs], [comms], [proofs]]) +func encodeForNetwork(storedRLP []byte) ([]byte, error) { + elems, err := rlp.SplitListValues(storedRLP) + if err != nil { + return nil, fmt.Errorf("invalid blobTxForPool RLP: %w", err) + } + if len(elems) < 5 { + return nil, fmt.Errorf("blobTxForPool has %d elements, need at least 5", len(elems)) + } + + // 1. Extract tx byte and other tx fields + txBytes, _, err := rlp.SplitString(elems[0]) + if err != nil { + return nil, fmt.Errorf("invalid tx bytes: %w", err) + } + if len(txBytes) < 2 { + return nil, errors.New("tx bytes too short") + } + typeByte := txBytes[0] + txRLP := txBytes[1:] + + // 2. Find the version of sidecar. + version, _, err := rlp.SplitUint64(elems[1]) + if err != nil || version > 255 { + return nil, fmt.Errorf("invalid version: %w", err) + } + versionByte := byte(version) + // 3. Extract sidecar elements. + commitmentsRLP := elems[2] + proofsRLP := elems[3] + blobsRLP := elems[4] + + // 4. Reconstruct into the network format. + var outer [][]byte + if versionByte == types.BlobSidecarVersion0 { + outer = [][]byte{txRLP, blobsRLP, commitmentsRLP, proofsRLP} + } else { + outer = [][]byte{txRLP, elems[1], blobsRLP, commitmentsRLP, proofsRLP} + } + body, err := rlp.MergeListValues(outer) + if err != nil { + return nil, err + } + // Prepend type byte and wrap as an RLP string. + inner := make([]byte, 1+len(body)) + inner[0] = typeByte + copy(inner[1:], body) + return rlp.EncodeToBytes(inner) +} + +// newBlobTxMeta retrieves the indexed metadata fields from a pooled blob +// transaction and assembles a helper struct to track in memory. +func newBlobTxMeta(id uint64, size uint64, storageSize uint32, ptx *blobTxForPool) *blobTxMeta { meta := &blobTxMeta{ - hash: tx.Hash(), - vhashes: tx.BlobHashes(), - version: tx.BlobTxSidecar().Version, + hash: ptx.Tx.Hash(), + vhashes: ptx.Tx.BlobHashes(), + version: ptx.Version, id: id, storageSize: storageSize, size: size, - nonce: tx.Nonce(), - costCap: uint256.MustFromBig(tx.Cost()), - execTipCap: uint256.MustFromBig(tx.GasTipCap()), - execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), - blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), - execGas: tx.Gas(), - blobGas: tx.BlobGas(), + nonce: ptx.Tx.Nonce(), + costCap: uint256.MustFromBig(ptx.Tx.Cost()), + execTipCap: uint256.MustFromBig(ptx.Tx.GasTipCap()), + execFeeCap: uint256.MustFromBig(ptx.Tx.GasFeeCap()), + blobFeeCap: uint256.MustFromBig(ptx.Tx.BlobGasFeeCap()), + execGas: ptx.Tx.Gas(), + blobGas: ptx.Tx.BlobGas(), } meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.blobfeeJumps = dynamicBlobFeeJumps(meta.blobFeeCap) @@ -460,10 +571,17 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser return err } // Index all transactions on disk and delete anything unprocessable - var fails []uint64 + var ( + toDelete []uint64 + convertTxs []uint64 + ) index := func(id uint64, size uint32, blob []byte) { - if p.parseTransaction(id, size, blob) != nil { - fails = append(fails, id) + err := p.parseTransaction(id, size, blob) + if err != nil { + toDelete = append(toDelete, id) + } + if errors.Is(err, errLegacyTx) { + convertTxs = append(convertTxs, id) } } store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, slotter, index) @@ -472,17 +590,58 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser } p.store = store - if len(fails) > 0 { - log.Warn("Dropping invalidated blob transactions", "ids", fails) - dropInvalidMeter.Mark(int64(len(fails))) + // Migrate legacy transactions (types.Transaction) to pooledBlobTx format. + if len(convertTxs) > 0 { + for _, id := range convertTxs { + var tx types.Transaction + data, err := p.store.Get(id) + if err != nil { + continue + } + err = rlp.DecodeBytes(data, &tx) + if err != nil { + continue + } + if tx.BlobTxSidecar() == nil { + continue + } + ptx := newBlobTxForPool(&tx) + blob, err := rlp.EncodeToBytes(ptx) + if err != nil { + continue + } + id, err := p.store.Put(blob) + if err != nil { + continue + } + meta := newBlobTxMeta(id, ptx.TxSize(), p.store.Size(id), ptx) - for _, id := range fails { + // If the newly inserted transaction fails to be tracked, + // it should also be removed with those in `toDelete` + sender, err := types.Sender(p.signer, ptx.Tx) + if err != nil { + toDelete = append(toDelete, id) + continue + } + if err := p.trackTransaction(meta, sender); err != nil { + toDelete = append(toDelete, id) + continue + } + } + } + + if len(toDelete) > 0 { + log.Warn("Dropping invalidated blob transactions", "ids", toDelete) + dropInvalidMeter.Mark(int64(len(toDelete))) + + for _, id := range toDelete { if err := p.store.Delete(id); err != nil { p.Close() return err } } } + // Sort the indexed transactions by nonce and delete anything gapped, create // the eviction heap of anyone still standing for addr := range p.index { @@ -558,36 +717,33 @@ func (p *BlobPool) Close() error { // parseTransaction is a callback method on pool creation that gets called for // each transaction on disk to create the in-memory metadata index. -// Announced state is not initialized here, it needs to be iniitalized seprately. +// Return value `bool` is set to true when the entry has old Transaction type. func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { - tx := new(types.Transaction) - if err := rlp.DecodeBytes(blob, tx); err != nil { - // This path is impossible unless the disk data representation changes - // across restarts. For that ever improbable case, recover gracefully - // by ignoring this data entry. - log.Error("Failed to decode blob pool entry", "id", id, "err", err) + var ptx blobTxForPool + if err := rlp.DecodeBytes(blob, &ptx); err != nil { + kind, content, _, splitErr := rlp.Split(blob) + // check whether it is legacy tx type + if splitErr == nil && kind == rlp.String && len(content) > 1 && content[0] == 3 { + return errLegacyTx + } return err } - if tx.BlobTxSidecar() == nil { - log.Error("Missing sidecar in blob pool entry", "id", id, "hash", tx.Hash()) - return errors.New("missing blob sidecar") + meta := newBlobTxMeta(id, ptx.TxSize(), size, &ptx) + sender, err := types.Sender(p.signer, ptx.Tx) + if err != nil { + return err } + return p.trackTransaction(meta, sender) +} - meta := newBlobTxMeta(id, tx.Size(), size, tx) +// trackTransaction registers a transaction's metadata in the pool's indices. +func (p *BlobPool) trackTransaction(meta *blobTxMeta, sender common.Address) error { if p.lookup.exists(meta.hash) { // This path is only possible after a crash, where deleted items are not // removed via the normal shutdown-startup procedure and thus may get // partially resurrected. - log.Error("Rejecting duplicate blob pool entry", "id", id, "hash", tx.Hash()) - return errors.New("duplicate blob entry") - } - sender, err := types.Sender(p.signer, tx) - if err != nil { - // This path is impossible unless the signature validity changes across - // restarts. For that ever improbable case, recover gracefully by ignoring - // this data entry. - log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err) - return err + log.Error("Rejecting duplicate blob pool entry", "id", meta.id, "hash", meta.hash) + return fmt.Errorf("duplicate blob entry %d, %s", meta.id, meta.hash) } if _, ok := p.index[sender]; !ok { if err := p.reserver.Hold(sender); err != nil { @@ -863,17 +1019,17 @@ func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusi log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) return } - var tx types.Transaction - if err = rlp.DecodeBytes(data, &tx); err != nil { + var ptx blobTxForPool + if err := rlp.DecodeBytes(data, &ptx); err != nil { log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) return } - block, ok := inclusions[tx.Hash()] + block, ok := inclusions[ptx.Tx.Hash()] if !ok { log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id) return } - if err := p.limbo.push(&tx, block); err != nil { + if err := p.limbo.push(&ptx, block); err != nil { log.Warn("Failed to offload blob tx into limbo", "err", err) return } @@ -1108,7 +1264,7 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]* func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { // Retrieve the associated blob from the limbo. Without the blobs, we cannot // add the transaction back into the pool as it is not mineable. - tx, err := p.limbo.pull(txhash) + ptx, err := p.limbo.pull(txhash) if err != nil { log.Error("Blobs unavailable, dropping reorged tx", "err", err) return err @@ -1124,30 +1280,29 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { // could theoretically halt a Geth node for ~1.2s by reorging per block. However, // this attack is financially inefficient to execute. head := p.head.Load() - if p.chain.Config().IsOsaka(head.Number, head.Time) && tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 { - if err := tx.BlobTxSidecar().ToV1(); err != nil { + if p.chain.Config().IsOsaka(head.Number, head.Time) && ptx.Version == types.BlobSidecarVersion0 { + sc := ptx.Sidecar() + if err := sc.ToV1(); err != nil { log.Error("Failed to convert the legacy sidecar", "err", err) return err } - log.Info("Legacy blob transaction is reorged", "hash", tx.Hash()) + ptx.ApplySidecar(sc) + log.Info("Legacy blob transaction is reorged", "hash", ptx.Tx.Hash()) } - // Serialize the transaction back into the primary datastore. - blob, err := rlp.EncodeToBytes(tx) + blob, err := rlp.EncodeToBytes(ptx) if err != nil { - log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) + log.Error("Failed to encode transaction for storage", "hash", ptx.Tx.Hash(), "err", err) return err } id, err := p.store.Put(blob) if err != nil { - log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err) + log.Error("Failed to write transaction into storage", "hash", ptx.Tx.Hash(), "err", err) return err } - - // Update the indices and metrics - meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) + meta := newBlobTxMeta(id, ptx.TxSize(), p.store.Size(id), ptx) if _, ok := p.index[addr]; !ok { if err := p.reserver.Hold(addr); err != nil { - log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) + log.Warn("Failed to reserve account for blob pool", "tx", ptx.Tx.Hash(), "from", addr, "err", err) return err } p.index[addr] = []*blobTxMeta{meta} @@ -1404,20 +1559,29 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { if len(data) == 0 { return nil } - item := new(types.Transaction) - if err := rlp.DecodeBytes(data, item); err != nil { + var ptx blobTxForPool + if err := rlp.DecodeBytes(data, &ptx); err != nil { id, _ := p.lookup.storeidOfTx(hash) log.Error("Blobs corrupted for traced transaction", "hash", hash, "id", id, "err", err) return nil } - return item + return ptx.ToTx() } -// GetRLP returns a RLP-encoded transaction if it is contained in the pool. +// GetRLP returns a RLP-encoded transaction for network if it is contained in the pool. +// It converts the pool's internal type to the RLP format used by the eth protocol: +// e.g. type_byte || [..., version, [blobs], [comms], [proofs]] func (p *BlobPool) GetRLP(hash common.Hash) []byte { - return p.getRLP(hash) + data := p.getRLP(hash) + rlp, err := encodeForNetwork(data) + if err != nil { + log.Error("Failed to encode pooled tx into the network type", "hash", hash, "err", err) + return nil + } + + return rlp } // GetMetadata returns the transaction type and transaction size with the @@ -1486,18 +1650,14 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blo } // Decode the blob transaction - tx := new(types.Transaction) - if err := rlp.DecodeBytes(data, tx); err != nil { + var ptx blobTxForPool + if err := rlp.DecodeBytes(data, &ptx); err != nil { log.Error("Blobs corrupted for traced transaction", "id", txID, "err", err) continue } - sidecar := tx.BlobTxSidecar() - if sidecar == nil { - log.Error("Blob tx without sidecar", "hash", tx.Hash(), "id", txID) - continue - } + sidecar := ptx.Sidecar() // Traverse the blobs in the transaction - for i, hash := range tx.BlobHashes() { + for i, hash := range ptx.Tx.BlobHashes() { list, ok := indices[hash] if !ok { continue // non-interesting blob @@ -1644,7 +1804,8 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error } // Transaction permitted into the pool from a nonce and cost perspective, // insert it into the database and update the indices - blob, err := rlp.EncodeToBytes(tx) + ptx := newBlobTxForPool(tx) + blob, err := rlp.EncodeToBytes(ptx) if err != nil { log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) return err @@ -1653,7 +1814,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error if err != nil { return err } - meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) + meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), ptx) var ( next = p.state.GetNonce(from) diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 7c57755401..8032e21e8a 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -235,6 +235,12 @@ func makeTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx) } +// encodeForPool encodes a blob transaction in the blobTxForPool storage format. +func encodeForPool(tx *types.Transaction) []byte { + blob, _ := rlp.EncodeToBytes(newBlobTxForPool(tx)) + return blob +} + // makeMultiBlobTx is a utility method to construct a ramdom blob tx with // certain number of blobs in its sidecar. func makeMultiBlobTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey, version byte) *types.Transaction { @@ -530,7 +536,7 @@ func TestOpenDrops(t *testing.T) { ) for _, nonce := range []uint64{0, 1, 3, 4, 6, 7} { // first gap at #2, another at #5 tx := makeTx(nonce, 1, 1, 1, gapper) - blob, _ := rlp.EncodeToBytes(tx) + blob := encodeForPool(tx) id, _ := store.Put(blob) if nonce < 2 { @@ -547,7 +553,7 @@ func TestOpenDrops(t *testing.T) { ) for _, nonce := range []uint64{1, 2, 3} { // first gap at #0, all set dangling tx := makeTx(nonce, 1, 1, 1, dangler) - blob, _ := rlp.EncodeToBytes(tx) + blob := encodeForPool(tx) id, _ := store.Put(blob) dangling[id] = struct{}{} @@ -560,7 +566,7 @@ func TestOpenDrops(t *testing.T) { ) for _, nonce := range []uint64{0, 1, 2} { // account nonce at 3, all set filled tx := makeTx(nonce, 1, 1, 1, filler) - blob, _ := rlp.EncodeToBytes(tx) + blob := encodeForPool(tx) id, _ := store.Put(blob) filled[id] = struct{}{} @@ -573,7 +579,7 @@ func TestOpenDrops(t *testing.T) { ) for _, nonce := range []uint64{0, 1, 2, 3} { // account nonce at 2, half filled tx := makeTx(nonce, 1, 1, 1, overlapper) - blob, _ := rlp.EncodeToBytes(tx) + blob := encodeForPool(tx) id, _ := store.Put(blob) if nonce >= 2 { @@ -595,7 +601,7 @@ func TestOpenDrops(t *testing.T) { } else { tx = makeTx(uint64(i), 1, 1, 1, underpayer) } - blob, _ := rlp.EncodeToBytes(tx) + blob := encodeForPool(tx) id, _ := store.Put(blob) underpaid[id] = struct{}{} @@ -614,7 +620,7 @@ func TestOpenDrops(t *testing.T) { } else { tx = makeTx(uint64(i), 1, 1, 1, outpricer) } - blob, _ := rlp.EncodeToBytes(tx) + blob := encodeForPool(tx) id, _ := store.Put(blob) if i < 2 { @@ -636,7 +642,7 @@ func TestOpenDrops(t *testing.T) { } else { tx = makeTx(nonce, 1, 1, 1, exceeder) } - blob, _ := rlp.EncodeToBytes(tx) + blob := encodeForPool(tx) id, _ := store.Put(blob) exceeded[id] = struct{}{} @@ -654,7 +660,7 @@ func TestOpenDrops(t *testing.T) { } else { tx = makeTx(nonce, 1, 1, 1, overdrafter) } - blob, _ := rlp.EncodeToBytes(tx) + blob := encodeForPool(tx) id, _ := store.Put(blob) if nonce < 1 { @@ -670,7 +676,7 @@ func TestOpenDrops(t *testing.T) { overcapped = make(map[uint64]struct{}) ) for nonce := uint64(0); nonce < maxTxsPerAccount+3; nonce++ { - blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, 1, 1, overcapper)) + blob := encodeForPool(makeTx(nonce, 1, 1, 1, overcapper)) id, _ := store.Put(blob) if nonce < maxTxsPerAccount { @@ -686,7 +692,7 @@ func TestOpenDrops(t *testing.T) { duplicated = make(map[uint64]struct{}) ) for _, nonce := range []uint64{0, 1, 2} { - blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, 1, 1, duplicater)) + blob := encodeForPool(makeTx(nonce, 1, 1, 1, duplicater)) for i := 0; i < int(nonce)+1; i++ { id, _ := store.Put(blob) @@ -705,7 +711,7 @@ func TestOpenDrops(t *testing.T) { ) for _, nonce := range []uint64{0, 1, 2} { for i := 0; i < int(nonce)+1; i++ { - blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, uint64(i)+1 /* unique hashes */, 1, repeater)) + blob := encodeForPool(makeTx(nonce, 1, uint64(i)+1 /* unique hashes */, 1, repeater)) id, _ := store.Put(blob) if i == 0 { @@ -842,7 +848,7 @@ func TestOpenIndex(t *testing.T) { ) for _, i := range []int{5, 3, 4, 2, 0, 1} { // Randomize the tx insertion order to force sorting on load tx := makeTx(uint64(i), txExecTipCaps[i], txExecFeeCaps[i], txBlobFeeCaps[i], key) - blob, _ := rlp.EncodeToBytes(tx) + blob := encodeForPool(tx) store.Put(blob) } store.Close() @@ -934,9 +940,9 @@ func TestOpenHeap(t *testing.T) { tx2 = makeTx(0, 1, 800, 70, key2) tx3 = makeTx(0, 1, 1500, 110, key3) - blob1, _ = rlp.EncodeToBytes(tx1) - blob2, _ = rlp.EncodeToBytes(tx2) - blob3, _ = rlp.EncodeToBytes(tx3) + blob1 = encodeForPool(tx1) + blob2 = encodeForPool(tx2) + blob3 = encodeForPool(tx3) heapOrder = []common.Address{addr2, addr1, addr3} heapIndex = map[common.Address]int{addr2: 0, addr1: 1, addr3: 2} @@ -1009,9 +1015,9 @@ func TestOpenCap(t *testing.T) { tx2 = makeTx(0, 1, 800, 70, key2) tx3 = makeTx(0, 1, 1500, 110, key3) - blob1, _ = rlp.EncodeToBytes(tx1) - blob2, _ = rlp.EncodeToBytes(tx2) - blob3, _ = rlp.EncodeToBytes(tx3) + blob1 = encodeForPool(tx1) + blob2 = encodeForPool(tx2) + blob3 = encodeForPool(tx3) keep = []common.Address{addr1, addr3} drop = []common.Address{addr2} @@ -1098,8 +1104,8 @@ func TestChangingSlotterSize(t *testing.T) { tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0) tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0) - blob1, _ = rlp.EncodeToBytes(tx1) - blob2, _ = rlp.EncodeToBytes(tx2) + blob1 = encodeForPool(tx1) + blob2 = encodeForPool(tx2) ) // Write the two safely sized txs to store. note: although the store is @@ -1201,8 +1207,8 @@ func TestBillyMigration(t *testing.T) { tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0) tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0) - blob1, _ = rlp.EncodeToBytes(tx1) - blob2, _ = rlp.EncodeToBytes(tx2) + blob1 = encodeForPool(tx1) + blob2 = encodeForPool(tx2) ) // Write the two safely sized txs to store. note: although the store is @@ -1281,6 +1287,85 @@ func TestBillyMigration(t *testing.T) { } } +// TestLegacyTxConversion verifies that on Init, transactions stored in the +// legacy *types.Transaction RLP format are detected and migrated into the new +// blobTxForPool storage format, and that they remain retrievable via the pool +// API after the conversion. +func TestLegacyTxConversion(t *testing.T) { + storage := t.TempDir() + os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700) + os.MkdirAll(filepath.Join(storage, limboedTransactionStore), 0700) + + // Initialize the pending store with two blob transactions encoded in the + // legacy format. + queuedir := filepath.Join(storage, pendingTransactionStore) + store, err := billy.Open(billy.Options{Path: queuedir}, newSlotter(testMaxBlobsPerBlock), nil) + if err != nil { + t.Fatalf("failed to open billy: %v", err) + } + + key1, _ := crypto.GenerateKey() + key2, _ := crypto.GenerateKey() + addr1 := crypto.PubkeyToAddress(key1.PublicKey) + addr2 := crypto.PubkeyToAddress(key2.PublicKey) + + tx1 := makeMultiBlobTx(0, 1, 1000, 100, 2, 0, key1, types.BlobSidecarVersion0) + tx2 := makeMultiBlobTx(0, 1, 1000, 100, 2, 2, key2, types.BlobSidecarVersion0) + + for _, tx := range []*types.Transaction{tx1, tx2} { + legacy, err := rlp.EncodeToBytes(tx) + if err != nil { + t.Fatalf("failed to legacy-encode tx: %v", err) + } + if _, err := store.Put(legacy); err != nil { + t.Fatalf("failed to put legacy blob: %v", err) + } + } + store.Close() + + // Init should migrate the legacy entries into the new storage format. + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) + statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) + statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) + statedb.Commit(0, true, false) + + chain := &testBlockChain{ + config: params.MainnetChainConfig, + basefee: uint256.NewInt(params.InitialBaseFee), + blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice), + statedb: statedb, + } + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { + t.Fatalf("failed to create blob pool: %v", err) + } + defer pool.Close() + + // Both transactions should be retrievable. + for _, want := range []*types.Transaction{tx1, tx2} { + got := pool.Get(want.Hash()) + if got == nil { + t.Fatalf("migrated tx %s not found in pool", want.Hash()) + } + if got.BlobTxSidecar() == nil { + t.Fatalf("migrated tx %s lost its sidecar", want.Hash()) + } + if got.Hash() != want.Hash() { + t.Fatalf("migrated tx hash mismatch: have %s, want %s", got.Hash(), want.Hash()) + } + } + + // Legacy formats should not exist on pool.store + pool.store.Iterate(func(id uint64, size uint32, blob []byte) { + var ptx blobTxForPool + if err := rlp.DecodeBytes(blob, &ptx); err != nil { + t.Errorf("entry %d not in new blobTxForPool format: %v", id, err) + } + }) + + verifyPoolInternals(t, pool) +} + // TestBlobCountLimit tests the blobpool enforced limits on the max blob count. func TestBlobCountLimit(t *testing.T) { var ( @@ -1746,7 +1831,7 @@ func TestAdd(t *testing.T) { // Sign the seed transactions and store them in the data store for _, tx := range seed.txs { signed := types.MustSignNewTx(keys[acc], types.LatestSigner(params.MainnetChainConfig), tx) - blob, _ := rlp.EncodeToBytes(signed) + blob := encodeForPool(signed) store.Put(blob) } } @@ -1853,9 +1938,9 @@ func TestGetBlobs(t *testing.T) { tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion1) // [6, 12) tx3 = makeMultiBlobTx(0, 1, 800, 110, 6, 12, key3, types.BlobSidecarVersion0) // [12, 18) - blob1, _ = rlp.EncodeToBytes(tx1) - blob2, _ = rlp.EncodeToBytes(tx2) - blob3, _ = rlp.EncodeToBytes(tx3) + blob1 = encodeForPool(tx1) + blob2 = encodeForPool(tx2) + blob3 = encodeForPool(tx3) ) // Write the two safely sized txs to store. note: although the store is @@ -2055,6 +2140,32 @@ func TestGetBlobs(t *testing.T) { pool.Close() } +// TestEncodeForNetwork verifies that encodeForNetwork produces output identical +// to rlp.EncodeToBytes on the original transaction, for both V0 and V1 sidecars. +func TestEncodeForNetwork(t *testing.T) { + t.Run("v0", func(t *testing.T) { testEncodeForNetwork(t, types.BlobSidecarVersion0) }) + t.Run("v1", func(t *testing.T) { testEncodeForNetwork(t, types.BlobSidecarVersion1) }) +} + +func testEncodeForNetwork(t *testing.T, version byte) { + key, _ := crypto.GenerateKey() + tx := makeMultiBlobTx(0, 1, 1, 1, 1, 0, key, version) + + wantRLP, err := rlp.EncodeToBytes(tx) + if err != nil { + t.Fatalf("failed to encode tx: %v", err) + } + storedRLP := encodeForPool(tx) + + gotRLP, err := encodeForNetwork(storedRLP) + if err != nil { + t.Fatalf("encodeForNetwork failed: %v", err) + } + if !bytes.Equal(gotRLP, wantRLP) { + t.Fatalf("network encoding mismatch (version %d): got %d bytes, want %d bytes", version, len(gotRLP), len(wantRLP)) + } +} + // fakeBilly is a billy.Database implementation which just drops data on the floor. type fakeBilly struct { billy.Database diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 36284d6a03..b8bee2f22a 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -33,7 +33,7 @@ import ( type limboBlob struct { TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs Block uint64 // Block in which the blob transaction was included - Tx *types.Transaction + Ptx *blobTxForPool } // limbo is a light, indexed database to temporarily store recently included @@ -146,15 +146,14 @@ func (l *limbo) finalize(final *types.Header) { // push stores a new blob transaction into the limbo, waiting until finality for // it to be automatically evicted. -func (l *limbo) push(tx *types.Transaction, block uint64) error { - // If the blobs are already tracked by the limbo, consider it a programming - // error. There's not much to do against it, but be loud. - if _, ok := l.index[tx.Hash()]; ok { - log.Error("Limbo cannot push already tracked blobs", "tx", tx.Hash()) +func (l *limbo) push(ptx *blobTxForPool, block uint64) error { + hash := ptx.Tx.Hash() + if _, ok := l.index[hash]; ok { + log.Error("Limbo cannot push already tracked blobs", "tx", hash) return errors.New("already tracked blob transaction") } - if err := l.setAndIndex(tx, block); err != nil { - log.Error("Failed to set and index limboed blobs", "tx", tx.Hash(), "err", err) + if err := l.setAndIndex(ptx, block); err != nil { + log.Error("Failed to set and index limboed blobs", "tx", hash, "err", err) return err } return nil @@ -163,7 +162,7 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error { // pull retrieves a previously pushed set of blobs back from the limbo, removing // it at the same time. This method should be used when a previously included blob // transaction gets reorged out. -func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) { +func (l *limbo) pull(tx common.Hash) (*blobTxForPool, error) { // If the blobs are not tracked by the limbo, there's not much to do. This // can happen for example if a blob transaction is mined without pushing it // into the network first. @@ -177,7 +176,7 @@ func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) { log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err) return nil, err } - return item.Tx, nil + return item.Ptx, nil } // update changes the block number under which a blob transaction is tracked. This @@ -209,7 +208,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) { log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err) return } - if err := l.setAndIndex(item.Tx, block); err != nil { + if err := l.setAndIndex(item.Ptx, block); err != nil { log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err) return } @@ -240,12 +239,12 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) { // setAndIndex assembles a limbo blob database entry and stores it, also updating // the in-memory indices. -func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error { - txhash := tx.Hash() +func (l *limbo) setAndIndex(ptx *blobTxForPool, block uint64) error { + txhash := ptx.Tx.Hash() item := &limboBlob{ TxHash: txhash, Block: block, - Tx: tx, + Ptx: ptx, } data, err := rlp.EncodeToBytes(item) if err != nil {