mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-17 13:36:37 +00:00
core/txpool/blobpool: add blobTxForPool type (#34882)
This PR introduces a separate transaction pool type for sparse blobpool. In sparse blobpool, PooledTransactions message delivers transactions without blobs, partial or full cells are downloaded by Cells message. Blobpool no longer stores transactions with complete sidecars, and it stores transactions without blobs, along with the corresponding cells. Because of this, a dedicated type distinct from types.Transaction is required. This PR introduces a type called `BlobTxForPool` and stores each sidecar field independently, in order to bypass the assumption that a sidecar always exists as a complete unit. Reintroducing the conversion queue was considered, but was ultimately omitted because type conversion should be sufficiently fast. With sparse blobpool, blob -> cell computation would take about ~13ms per blob. Not sure whether this is fast enough, but otherwise we can add the conversion queue later on the sparse blobpool branch.
This commit is contained in:
parent
ab28bda83e
commit
726d657a4a
3 changed files with 386 additions and 115 deletions
|
|
@ -116,6 +116,8 @@ const (
|
|||
announceThreshold = -1
|
||||
)
|
||||
|
||||
var errLegacyTx = errors.New("legacy transaction format")
|
||||
|
||||
// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
|
||||
// schedule the blob transactions into the following blocks. Only ever add the
|
||||
// bare minimum needed fields to keep the size down (and thus number of entries
|
||||
|
|
@ -147,28 +149,137 @@ type blobTxMeta struct {
|
|||
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
|
||||
}
|
||||
|
||||
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
|
||||
// and assembles a helper struct to track in memory.
|
||||
// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar).
|
||||
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta {
|
||||
if tx.BlobTxSidecar() == nil {
|
||||
// This should never happen, as the pool only admits blob transactions with a sidecar
|
||||
// blobTxForPool is the storage representation of a blob transaction in the
|
||||
// blobpool.
|
||||
type blobTxForPool struct {
|
||||
Tx *types.Transaction // tx without sidecar
|
||||
Version byte
|
||||
Commitments []kzg4844.Commitment
|
||||
Proofs []kzg4844.Proof
|
||||
Blobs []kzg4844.Blob
|
||||
}
|
||||
|
||||
// Sidecar returns BlobTxSidecar of ptx.
|
||||
func (ptx *blobTxForPool) Sidecar() *types.BlobTxSidecar {
|
||||
return types.NewBlobTxSidecar(ptx.Version, ptx.Blobs, ptx.Commitments, ptx.Proofs)
|
||||
}
|
||||
|
||||
// ApplySidecar copies the sidecar's fields into the flat fields.
|
||||
func (ptx *blobTxForPool) ApplySidecar(sc *types.BlobTxSidecar) {
|
||||
ptx.Version = sc.Version
|
||||
ptx.Commitments = sc.Commitments
|
||||
ptx.Proofs = sc.Proofs
|
||||
ptx.Blobs = sc.Blobs
|
||||
}
|
||||
|
||||
// TxSize returns the transaction size on the network without
|
||||
// reconstructing the transaction.
|
||||
func (ptx *blobTxForPool) TxSize() uint64 {
|
||||
var blobs, commitments, proofs uint64
|
||||
for i := range ptx.Blobs {
|
||||
blobs += rlp.BytesSize(ptx.Blobs[i][:])
|
||||
}
|
||||
for i := range ptx.Commitments {
|
||||
commitments += rlp.BytesSize(ptx.Commitments[i][:])
|
||||
}
|
||||
for i := range ptx.Proofs {
|
||||
proofs += rlp.BytesSize(ptx.Proofs[i][:])
|
||||
}
|
||||
return ptx.Tx.Size() + rlp.ListSize(rlp.ListSize(blobs)+rlp.ListSize(commitments)+rlp.ListSize(proofs))
|
||||
}
|
||||
|
||||
// ToTx reconstructs a full Transaction with the sidecar attached.
|
||||
func (ptx *blobTxForPool) ToTx() *types.Transaction {
|
||||
return ptx.Tx.WithBlobTxSidecar(ptx.Sidecar())
|
||||
}
|
||||
|
||||
// newBlobTxForPool decomposes a blob transaction into blobTxForPool type.
|
||||
func newBlobTxForPool(tx *types.Transaction) *blobTxForPool {
|
||||
sc := tx.BlobTxSidecar()
|
||||
if sc == nil {
|
||||
panic("missing blob tx sidecar")
|
||||
}
|
||||
return &blobTxForPool{
|
||||
Tx: tx.WithoutBlobTxSidecar(),
|
||||
Version: sc.Version,
|
||||
Commitments: sc.Commitments,
|
||||
Proofs: sc.Proofs,
|
||||
Blobs: sc.Blobs,
|
||||
}
|
||||
}
|
||||
|
||||
// encodeForNetwork transforms stored blobTxForPool RLP into the standard
|
||||
// network transaction encoding. This is used for getRLP.
|
||||
//
|
||||
// Stored RLP: [type_byte || tx_fields, version, [comms], [proofs], [blobs]]
|
||||
// V0: type_byte || rlp([tx_fields, [blobs], [comms], [proofs]])
|
||||
// V1: type_byte || rlp([tx_fields, version, [blobs], [comms], [proofs]])
|
||||
func encodeForNetwork(storedRLP []byte) ([]byte, error) {
|
||||
elems, err := rlp.SplitListValues(storedRLP)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid blobTxForPool RLP: %w", err)
|
||||
}
|
||||
if len(elems) < 5 {
|
||||
return nil, fmt.Errorf("blobTxForPool has %d elements, need at least 5", len(elems))
|
||||
}
|
||||
|
||||
// 1. Extract tx byte and other tx fields
|
||||
txBytes, _, err := rlp.SplitString(elems[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid tx bytes: %w", err)
|
||||
}
|
||||
if len(txBytes) < 2 {
|
||||
return nil, errors.New("tx bytes too short")
|
||||
}
|
||||
typeByte := txBytes[0]
|
||||
txRLP := txBytes[1:]
|
||||
|
||||
// 2. Find the version of sidecar.
|
||||
version, _, err := rlp.SplitUint64(elems[1])
|
||||
if err != nil || version > 255 {
|
||||
return nil, fmt.Errorf("invalid version: %w", err)
|
||||
}
|
||||
versionByte := byte(version)
|
||||
// 3. Extract sidecar elements.
|
||||
commitmentsRLP := elems[2]
|
||||
proofsRLP := elems[3]
|
||||
blobsRLP := elems[4]
|
||||
|
||||
// 4. Reconstruct into the network format.
|
||||
var outer [][]byte
|
||||
if versionByte == types.BlobSidecarVersion0 {
|
||||
outer = [][]byte{txRLP, blobsRLP, commitmentsRLP, proofsRLP}
|
||||
} else {
|
||||
outer = [][]byte{txRLP, elems[1], blobsRLP, commitmentsRLP, proofsRLP}
|
||||
}
|
||||
body, err := rlp.MergeListValues(outer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Prepend type byte and wrap as an RLP string.
|
||||
inner := make([]byte, 1+len(body))
|
||||
inner[0] = typeByte
|
||||
copy(inner[1:], body)
|
||||
return rlp.EncodeToBytes(inner)
|
||||
}
|
||||
|
||||
// newBlobTxMeta retrieves the indexed metadata fields from a pooled blob
|
||||
// transaction and assembles a helper struct to track in memory.
|
||||
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, ptx *blobTxForPool) *blobTxMeta {
|
||||
meta := &blobTxMeta{
|
||||
hash: tx.Hash(),
|
||||
vhashes: tx.BlobHashes(),
|
||||
version: tx.BlobTxSidecar().Version,
|
||||
hash: ptx.Tx.Hash(),
|
||||
vhashes: ptx.Tx.BlobHashes(),
|
||||
version: ptx.Version,
|
||||
id: id,
|
||||
storageSize: storageSize,
|
||||
size: size,
|
||||
nonce: tx.Nonce(),
|
||||
costCap: uint256.MustFromBig(tx.Cost()),
|
||||
execTipCap: uint256.MustFromBig(tx.GasTipCap()),
|
||||
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
|
||||
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
|
||||
execGas: tx.Gas(),
|
||||
blobGas: tx.BlobGas(),
|
||||
nonce: ptx.Tx.Nonce(),
|
||||
costCap: uint256.MustFromBig(ptx.Tx.Cost()),
|
||||
execTipCap: uint256.MustFromBig(ptx.Tx.GasTipCap()),
|
||||
execFeeCap: uint256.MustFromBig(ptx.Tx.GasFeeCap()),
|
||||
blobFeeCap: uint256.MustFromBig(ptx.Tx.BlobGasFeeCap()),
|
||||
execGas: ptx.Tx.Gas(),
|
||||
blobGas: ptx.Tx.BlobGas(),
|
||||
}
|
||||
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
|
||||
meta.blobfeeJumps = dynamicBlobFeeJumps(meta.blobFeeCap)
|
||||
|
|
@ -460,10 +571,17 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
|
|||
return err
|
||||
}
|
||||
// Index all transactions on disk and delete anything unprocessable
|
||||
var fails []uint64
|
||||
var (
|
||||
toDelete []uint64
|
||||
convertTxs []uint64
|
||||
)
|
||||
index := func(id uint64, size uint32, blob []byte) {
|
||||
if p.parseTransaction(id, size, blob) != nil {
|
||||
fails = append(fails, id)
|
||||
err := p.parseTransaction(id, size, blob)
|
||||
if err != nil {
|
||||
toDelete = append(toDelete, id)
|
||||
}
|
||||
if errors.Is(err, errLegacyTx) {
|
||||
convertTxs = append(convertTxs, id)
|
||||
}
|
||||
}
|
||||
store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, slotter, index)
|
||||
|
|
@ -472,17 +590,58 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
|
|||
}
|
||||
p.store = store
|
||||
|
||||
if len(fails) > 0 {
|
||||
log.Warn("Dropping invalidated blob transactions", "ids", fails)
|
||||
dropInvalidMeter.Mark(int64(len(fails)))
|
||||
// Migrate legacy transactions (types.Transaction) to pooledBlobTx format.
|
||||
if len(convertTxs) > 0 {
|
||||
for _, id := range convertTxs {
|
||||
var tx types.Transaction
|
||||
data, err := p.store.Get(id)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
err = rlp.DecodeBytes(data, &tx)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if tx.BlobTxSidecar() == nil {
|
||||
continue
|
||||
}
|
||||
ptx := newBlobTxForPool(&tx)
|
||||
blob, err := rlp.EncodeToBytes(ptx)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
id, err := p.store.Put(blob)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
meta := newBlobTxMeta(id, ptx.TxSize(), p.store.Size(id), ptx)
|
||||
|
||||
for _, id := range fails {
|
||||
// If the newly inserted transaction fails to be tracked,
|
||||
// it should also be removed with those in `toDelete`
|
||||
sender, err := types.Sender(p.signer, ptx.Tx)
|
||||
if err != nil {
|
||||
toDelete = append(toDelete, id)
|
||||
continue
|
||||
}
|
||||
if err := p.trackTransaction(meta, sender); err != nil {
|
||||
toDelete = append(toDelete, id)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(toDelete) > 0 {
|
||||
log.Warn("Dropping invalidated blob transactions", "ids", toDelete)
|
||||
dropInvalidMeter.Mark(int64(len(toDelete)))
|
||||
|
||||
for _, id := range toDelete {
|
||||
if err := p.store.Delete(id); err != nil {
|
||||
p.Close()
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the indexed transactions by nonce and delete anything gapped, create
|
||||
// the eviction heap of anyone still standing
|
||||
for addr := range p.index {
|
||||
|
|
@ -558,36 +717,33 @@ func (p *BlobPool) Close() error {
|
|||
|
||||
// parseTransaction is a callback method on pool creation that gets called for
|
||||
// each transaction on disk to create the in-memory metadata index.
|
||||
// Announced state is not initialized here, it needs to be initialized separately.
|
||||
// Returns errLegacyTx when the entry is stored in the old Transaction format.
|
||||
func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
|
||||
tx := new(types.Transaction)
|
||||
if err := rlp.DecodeBytes(blob, tx); err != nil {
|
||||
// This path is impossible unless the disk data representation changes
|
||||
// across restarts. For that ever improbable case, recover gracefully
|
||||
// by ignoring this data entry.
|
||||
log.Error("Failed to decode blob pool entry", "id", id, "err", err)
|
||||
var ptx blobTxForPool
|
||||
if err := rlp.DecodeBytes(blob, &ptx); err != nil {
|
||||
kind, content, _, splitErr := rlp.Split(blob)
|
||||
// check whether it is legacy tx type
|
||||
if splitErr == nil && kind == rlp.String && len(content) > 1 && content[0] == 3 {
|
||||
return errLegacyTx
|
||||
}
|
||||
return err
|
||||
}
|
||||
if tx.BlobTxSidecar() == nil {
|
||||
log.Error("Missing sidecar in blob pool entry", "id", id, "hash", tx.Hash())
|
||||
return errors.New("missing blob sidecar")
|
||||
meta := newBlobTxMeta(id, ptx.TxSize(), size, &ptx)
|
||||
sender, err := types.Sender(p.signer, ptx.Tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.trackTransaction(meta, sender)
|
||||
}
|
||||
|
||||
meta := newBlobTxMeta(id, tx.Size(), size, tx)
|
||||
// trackTransaction registers a transaction's metadata in the pool's indices.
|
||||
func (p *BlobPool) trackTransaction(meta *blobTxMeta, sender common.Address) error {
|
||||
if p.lookup.exists(meta.hash) {
|
||||
// This path is only possible after a crash, where deleted items are not
|
||||
// removed via the normal shutdown-startup procedure and thus may get
|
||||
// partially resurrected.
|
||||
log.Error("Rejecting duplicate blob pool entry", "id", id, "hash", tx.Hash())
|
||||
return errors.New("duplicate blob entry")
|
||||
}
|
||||
sender, err := types.Sender(p.signer, tx)
|
||||
if err != nil {
|
||||
// This path is impossible unless the signature validity changes across
|
||||
// restarts. For that ever improbable case, recover gracefully by ignoring
|
||||
// this data entry.
|
||||
log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err)
|
||||
return err
|
||||
log.Error("Rejecting duplicate blob pool entry", "id", meta.id, "hash", meta.hash)
|
||||
return fmt.Errorf("duplicate blob entry %d, %s", meta.id, meta.hash)
|
||||
}
|
||||
if _, ok := p.index[sender]; !ok {
|
||||
if err := p.reserver.Hold(sender); err != nil {
|
||||
|
|
@ -863,17 +1019,17 @@ func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusi
|
|||
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
var tx types.Transaction
|
||||
if err = rlp.DecodeBytes(data, &tx); err != nil {
|
||||
var ptx blobTxForPool
|
||||
if err := rlp.DecodeBytes(data, &ptx); err != nil {
|
||||
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
block, ok := inclusions[tx.Hash()]
|
||||
block, ok := inclusions[ptx.Tx.Hash()]
|
||||
if !ok {
|
||||
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
|
||||
return
|
||||
}
|
||||
if err := p.limbo.push(&tx, block); err != nil {
|
||||
if err := p.limbo.push(&ptx, block); err != nil {
|
||||
log.Warn("Failed to offload blob tx into limbo", "err", err)
|
||||
return
|
||||
}
|
||||
|
|
@ -1108,7 +1264,7 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
|
|||
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
|
||||
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
|
||||
// add the transaction back into the pool as it is not mineable.
|
||||
tx, err := p.limbo.pull(txhash)
|
||||
ptx, err := p.limbo.pull(txhash)
|
||||
if err != nil {
|
||||
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
|
||||
return err
|
||||
|
|
@ -1124,30 +1280,29 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
|
|||
// could theoretically halt a Geth node for ~1.2s by reorging per block. However,
|
||||
// this attack is financially inefficient to execute.
|
||||
head := p.head.Load()
|
||||
if p.chain.Config().IsOsaka(head.Number, head.Time) && tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
|
||||
if err := tx.BlobTxSidecar().ToV1(); err != nil {
|
||||
if p.chain.Config().IsOsaka(head.Number, head.Time) && ptx.Version == types.BlobSidecarVersion0 {
|
||||
sc := ptx.Sidecar()
|
||||
if err := sc.ToV1(); err != nil {
|
||||
log.Error("Failed to convert the legacy sidecar", "err", err)
|
||||
return err
|
||||
}
|
||||
log.Info("Legacy blob transaction is reorged", "hash", tx.Hash())
|
||||
ptx.ApplySidecar(sc)
|
||||
log.Info("Legacy blob transaction is reorged", "hash", ptx.Tx.Hash())
|
||||
}
|
||||
// Serialize the transaction back into the primary datastore.
|
||||
blob, err := rlp.EncodeToBytes(tx)
|
||||
blob, err := rlp.EncodeToBytes(ptx)
|
||||
if err != nil {
|
||||
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
|
||||
log.Error("Failed to encode transaction for storage", "hash", ptx.Tx.Hash(), "err", err)
|
||||
return err
|
||||
}
|
||||
id, err := p.store.Put(blob)
|
||||
if err != nil {
|
||||
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
|
||||
log.Error("Failed to write transaction into storage", "hash", ptx.Tx.Hash(), "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Update the indices and metrics
|
||||
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
|
||||
meta := newBlobTxMeta(id, ptx.TxSize(), p.store.Size(id), ptx)
|
||||
if _, ok := p.index[addr]; !ok {
|
||||
if err := p.reserver.Hold(addr); err != nil {
|
||||
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
|
||||
log.Warn("Failed to reserve account for blob pool", "tx", ptx.Tx.Hash(), "from", addr, "err", err)
|
||||
return err
|
||||
}
|
||||
p.index[addr] = []*blobTxMeta{meta}
|
||||
|
|
@ -1404,20 +1559,29 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
|
|||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
item := new(types.Transaction)
|
||||
if err := rlp.DecodeBytes(data, item); err != nil {
|
||||
var ptx blobTxForPool
|
||||
if err := rlp.DecodeBytes(data, &ptx); err != nil {
|
||||
id, _ := p.lookup.storeidOfTx(hash)
|
||||
|
||||
log.Error("Blobs corrupted for traced transaction",
|
||||
"hash", hash, "id", id, "err", err)
|
||||
return nil
|
||||
}
|
||||
return item
|
||||
return ptx.ToTx()
|
||||
}
|
||||
|
||||
// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
|
||||
// GetRLP returns a RLP-encoded transaction for network if it is contained in the pool.
|
||||
// It converts the pool's internal type to the RLP format used by the eth protocol:
|
||||
// e.g. type_byte || [..., version, [blobs], [comms], [proofs]]
|
||||
func (p *BlobPool) GetRLP(hash common.Hash) []byte {
|
||||
return p.getRLP(hash)
|
||||
data := p.getRLP(hash)
|
||||
rlp, err := encodeForNetwork(data)
|
||||
if err != nil {
|
||||
log.Error("Failed to encode pooled tx into the network type", "hash", hash, "err", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return rlp
|
||||
}
|
||||
|
||||
// GetMetadata returns the transaction type and transaction size with the
|
||||
|
|
@ -1486,18 +1650,14 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blo
|
|||
}
|
||||
|
||||
// Decode the blob transaction
|
||||
tx := new(types.Transaction)
|
||||
if err := rlp.DecodeBytes(data, tx); err != nil {
|
||||
var ptx blobTxForPool
|
||||
if err := rlp.DecodeBytes(data, &ptx); err != nil {
|
||||
log.Error("Blobs corrupted for traced transaction", "id", txID, "err", err)
|
||||
continue
|
||||
}
|
||||
sidecar := tx.BlobTxSidecar()
|
||||
if sidecar == nil {
|
||||
log.Error("Blob tx without sidecar", "hash", tx.Hash(), "id", txID)
|
||||
continue
|
||||
}
|
||||
sidecar := ptx.Sidecar()
|
||||
// Traverse the blobs in the transaction
|
||||
for i, hash := range tx.BlobHashes() {
|
||||
for i, hash := range ptx.Tx.BlobHashes() {
|
||||
list, ok := indices[hash]
|
||||
if !ok {
|
||||
continue // non-interesting blob
|
||||
|
|
@ -1644,7 +1804,8 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
|
|||
}
|
||||
// Transaction permitted into the pool from a nonce and cost perspective,
|
||||
// insert it into the database and update the indices
|
||||
blob, err := rlp.EncodeToBytes(tx)
|
||||
ptx := newBlobTxForPool(tx)
|
||||
blob, err := rlp.EncodeToBytes(ptx)
|
||||
if err != nil {
|
||||
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
|
||||
return err
|
||||
|
|
@ -1653,7 +1814,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
|
||||
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), ptx)
|
||||
|
||||
var (
|
||||
next = p.state.GetNonce(from)
|
||||
|
|
|
|||
|
|
@ -235,6 +235,12 @@ func makeTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64,
|
|||
return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx)
|
||||
}
|
||||
|
||||
// encodeForPool encodes a blob transaction in the blobTxForPool storage format.
|
||||
func encodeForPool(tx *types.Transaction) []byte {
|
||||
blob, _ := rlp.EncodeToBytes(newBlobTxForPool(tx))
|
||||
return blob
|
||||
}
|
||||
|
||||
// makeMultiBlobTx is a utility method to construct a random blob tx with
|
||||
// certain number of blobs in its sidecar.
|
||||
func makeMultiBlobTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey, version byte) *types.Transaction {
|
||||
|
|
@ -530,7 +536,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
)
|
||||
for _, nonce := range []uint64{0, 1, 3, 4, 6, 7} { // first gap at #2, another at #5
|
||||
tx := makeTx(nonce, 1, 1, 1, gapper)
|
||||
blob, _ := rlp.EncodeToBytes(tx)
|
||||
blob := encodeForPool(tx)
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
if nonce < 2 {
|
||||
|
|
@ -547,7 +553,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
)
|
||||
for _, nonce := range []uint64{1, 2, 3} { // first gap at #0, all set dangling
|
||||
tx := makeTx(nonce, 1, 1, 1, dangler)
|
||||
blob, _ := rlp.EncodeToBytes(tx)
|
||||
blob := encodeForPool(tx)
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
dangling[id] = struct{}{}
|
||||
|
|
@ -560,7 +566,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
)
|
||||
for _, nonce := range []uint64{0, 1, 2} { // account nonce at 3, all set filled
|
||||
tx := makeTx(nonce, 1, 1, 1, filler)
|
||||
blob, _ := rlp.EncodeToBytes(tx)
|
||||
blob := encodeForPool(tx)
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
filled[id] = struct{}{}
|
||||
|
|
@ -573,7 +579,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
)
|
||||
for _, nonce := range []uint64{0, 1, 2, 3} { // account nonce at 2, half filled
|
||||
tx := makeTx(nonce, 1, 1, 1, overlapper)
|
||||
blob, _ := rlp.EncodeToBytes(tx)
|
||||
blob := encodeForPool(tx)
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
if nonce >= 2 {
|
||||
|
|
@ -595,7 +601,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
} else {
|
||||
tx = makeTx(uint64(i), 1, 1, 1, underpayer)
|
||||
}
|
||||
blob, _ := rlp.EncodeToBytes(tx)
|
||||
blob := encodeForPool(tx)
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
underpaid[id] = struct{}{}
|
||||
|
|
@ -614,7 +620,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
} else {
|
||||
tx = makeTx(uint64(i), 1, 1, 1, outpricer)
|
||||
}
|
||||
blob, _ := rlp.EncodeToBytes(tx)
|
||||
blob := encodeForPool(tx)
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
if i < 2 {
|
||||
|
|
@ -636,7 +642,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
} else {
|
||||
tx = makeTx(nonce, 1, 1, 1, exceeder)
|
||||
}
|
||||
blob, _ := rlp.EncodeToBytes(tx)
|
||||
blob := encodeForPool(tx)
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
exceeded[id] = struct{}{}
|
||||
|
|
@ -654,7 +660,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
} else {
|
||||
tx = makeTx(nonce, 1, 1, 1, overdrafter)
|
||||
}
|
||||
blob, _ := rlp.EncodeToBytes(tx)
|
||||
blob := encodeForPool(tx)
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
if nonce < 1 {
|
||||
|
|
@ -670,7 +676,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
overcapped = make(map[uint64]struct{})
|
||||
)
|
||||
for nonce := uint64(0); nonce < maxTxsPerAccount+3; nonce++ {
|
||||
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, 1, 1, overcapper))
|
||||
blob := encodeForPool(makeTx(nonce, 1, 1, 1, overcapper))
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
if nonce < maxTxsPerAccount {
|
||||
|
|
@ -686,7 +692,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
duplicated = make(map[uint64]struct{})
|
||||
)
|
||||
for _, nonce := range []uint64{0, 1, 2} {
|
||||
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, 1, 1, duplicater))
|
||||
blob := encodeForPool(makeTx(nonce, 1, 1, 1, duplicater))
|
||||
|
||||
for i := 0; i < int(nonce)+1; i++ {
|
||||
id, _ := store.Put(blob)
|
||||
|
|
@ -705,7 +711,7 @@ func TestOpenDrops(t *testing.T) {
|
|||
)
|
||||
for _, nonce := range []uint64{0, 1, 2} {
|
||||
for i := 0; i < int(nonce)+1; i++ {
|
||||
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, uint64(i)+1 /* unique hashes */, 1, repeater))
|
||||
blob := encodeForPool(makeTx(nonce, 1, uint64(i)+1 /* unique hashes */, 1, repeater))
|
||||
|
||||
id, _ := store.Put(blob)
|
||||
if i == 0 {
|
||||
|
|
@ -842,7 +848,7 @@ func TestOpenIndex(t *testing.T) {
|
|||
)
|
||||
for _, i := range []int{5, 3, 4, 2, 0, 1} { // Randomize the tx insertion order to force sorting on load
|
||||
tx := makeTx(uint64(i), txExecTipCaps[i], txExecFeeCaps[i], txBlobFeeCaps[i], key)
|
||||
blob, _ := rlp.EncodeToBytes(tx)
|
||||
blob := encodeForPool(tx)
|
||||
store.Put(blob)
|
||||
}
|
||||
store.Close()
|
||||
|
|
@ -934,9 +940,9 @@ func TestOpenHeap(t *testing.T) {
|
|||
tx2 = makeTx(0, 1, 800, 70, key2)
|
||||
tx3 = makeTx(0, 1, 1500, 110, key3)
|
||||
|
||||
blob1, _ = rlp.EncodeToBytes(tx1)
|
||||
blob2, _ = rlp.EncodeToBytes(tx2)
|
||||
blob3, _ = rlp.EncodeToBytes(tx3)
|
||||
blob1 = encodeForPool(tx1)
|
||||
blob2 = encodeForPool(tx2)
|
||||
blob3 = encodeForPool(tx3)
|
||||
|
||||
heapOrder = []common.Address{addr2, addr1, addr3}
|
||||
heapIndex = map[common.Address]int{addr2: 0, addr1: 1, addr3: 2}
|
||||
|
|
@ -1009,9 +1015,9 @@ func TestOpenCap(t *testing.T) {
|
|||
tx2 = makeTx(0, 1, 800, 70, key2)
|
||||
tx3 = makeTx(0, 1, 1500, 110, key3)
|
||||
|
||||
blob1, _ = rlp.EncodeToBytes(tx1)
|
||||
blob2, _ = rlp.EncodeToBytes(tx2)
|
||||
blob3, _ = rlp.EncodeToBytes(tx3)
|
||||
blob1 = encodeForPool(tx1)
|
||||
blob2 = encodeForPool(tx2)
|
||||
blob3 = encodeForPool(tx3)
|
||||
|
||||
keep = []common.Address{addr1, addr3}
|
||||
drop = []common.Address{addr2}
|
||||
|
|
@ -1098,8 +1104,8 @@ func TestChangingSlotterSize(t *testing.T) {
|
|||
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0)
|
||||
tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0)
|
||||
|
||||
blob1, _ = rlp.EncodeToBytes(tx1)
|
||||
blob2, _ = rlp.EncodeToBytes(tx2)
|
||||
blob1 = encodeForPool(tx1)
|
||||
blob2 = encodeForPool(tx2)
|
||||
)
|
||||
|
||||
// Write the two safely sized txs to store. note: although the store is
|
||||
|
|
@ -1201,8 +1207,8 @@ func TestBillyMigration(t *testing.T) {
|
|||
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0)
|
||||
tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0)
|
||||
|
||||
blob1, _ = rlp.EncodeToBytes(tx1)
|
||||
blob2, _ = rlp.EncodeToBytes(tx2)
|
||||
blob1 = encodeForPool(tx1)
|
||||
blob2 = encodeForPool(tx2)
|
||||
)
|
||||
|
||||
// Write the two safely sized txs to store. note: although the store is
|
||||
|
|
@ -1281,6 +1287,85 @@ func TestBillyMigration(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestLegacyTxConversion verifies that on Init, transactions stored in the
|
||||
// legacy *types.Transaction RLP format are detected and migrated into the new
|
||||
// blobTxForPool storage format, and that they remain retrievable via the pool
|
||||
// API after the conversion.
|
||||
func TestLegacyTxConversion(t *testing.T) {
|
||||
storage := t.TempDir()
|
||||
os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700)
|
||||
os.MkdirAll(filepath.Join(storage, limboedTransactionStore), 0700)
|
||||
|
||||
// Initialize the pending store with two blob transactions encoded in the
|
||||
// legacy format.
|
||||
queuedir := filepath.Join(storage, pendingTransactionStore)
|
||||
store, err := billy.Open(billy.Options{Path: queuedir}, newSlotter(testMaxBlobsPerBlock), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open billy: %v", err)
|
||||
}
|
||||
|
||||
key1, _ := crypto.GenerateKey()
|
||||
key2, _ := crypto.GenerateKey()
|
||||
addr1 := crypto.PubkeyToAddress(key1.PublicKey)
|
||||
addr2 := crypto.PubkeyToAddress(key2.PublicKey)
|
||||
|
||||
tx1 := makeMultiBlobTx(0, 1, 1000, 100, 2, 0, key1, types.BlobSidecarVersion0)
|
||||
tx2 := makeMultiBlobTx(0, 1, 1000, 100, 2, 2, key2, types.BlobSidecarVersion0)
|
||||
|
||||
for _, tx := range []*types.Transaction{tx1, tx2} {
|
||||
legacy, err := rlp.EncodeToBytes(tx)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to legacy-encode tx: %v", err)
|
||||
}
|
||||
if _, err := store.Put(legacy); err != nil {
|
||||
t.Fatalf("failed to put legacy blob: %v", err)
|
||||
}
|
||||
}
|
||||
store.Close()
|
||||
|
||||
// Init should migrate the legacy entries into the new storage format.
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
|
||||
statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
|
||||
statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
|
||||
statedb.Commit(0, true, false)
|
||||
|
||||
chain := &testBlockChain{
|
||||
config: params.MainnetChainConfig,
|
||||
basefee: uint256.NewInt(params.InitialBaseFee),
|
||||
blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice),
|
||||
statedb: statedb,
|
||||
}
|
||||
pool := New(Config{Datadir: storage}, chain, nil)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
|
||||
t.Fatalf("failed to create blob pool: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
// Both transactions should be retrievable.
|
||||
for _, want := range []*types.Transaction{tx1, tx2} {
|
||||
got := pool.Get(want.Hash())
|
||||
if got == nil {
|
||||
t.Fatalf("migrated tx %s not found in pool", want.Hash())
|
||||
}
|
||||
if got.BlobTxSidecar() == nil {
|
||||
t.Fatalf("migrated tx %s lost its sidecar", want.Hash())
|
||||
}
|
||||
if got.Hash() != want.Hash() {
|
||||
t.Fatalf("migrated tx hash mismatch: have %s, want %s", got.Hash(), want.Hash())
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy formats should not exist on pool.store
|
||||
pool.store.Iterate(func(id uint64, size uint32, blob []byte) {
|
||||
var ptx blobTxForPool
|
||||
if err := rlp.DecodeBytes(blob, &ptx); err != nil {
|
||||
t.Errorf("entry %d not in new blobTxForPool format: %v", id, err)
|
||||
}
|
||||
})
|
||||
|
||||
verifyPoolInternals(t, pool)
|
||||
}
|
||||
|
||||
// TestBlobCountLimit tests the blobpool enforced limits on the max blob count.
|
||||
func TestBlobCountLimit(t *testing.T) {
|
||||
var (
|
||||
|
|
@ -1746,7 +1831,7 @@ func TestAdd(t *testing.T) {
|
|||
// Sign the seed transactions and store them in the data store
|
||||
for _, tx := range seed.txs {
|
||||
signed := types.MustSignNewTx(keys[acc], types.LatestSigner(params.MainnetChainConfig), tx)
|
||||
blob, _ := rlp.EncodeToBytes(signed)
|
||||
blob := encodeForPool(signed)
|
||||
store.Put(blob)
|
||||
}
|
||||
}
|
||||
|
|
@ -1853,9 +1938,9 @@ func TestGetBlobs(t *testing.T) {
|
|||
tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion1) // [6, 12)
|
||||
tx3 = makeMultiBlobTx(0, 1, 800, 110, 6, 12, key3, types.BlobSidecarVersion0) // [12, 18)
|
||||
|
||||
blob1, _ = rlp.EncodeToBytes(tx1)
|
||||
blob2, _ = rlp.EncodeToBytes(tx2)
|
||||
blob3, _ = rlp.EncodeToBytes(tx3)
|
||||
blob1 = encodeForPool(tx1)
|
||||
blob2 = encodeForPool(tx2)
|
||||
blob3 = encodeForPool(tx3)
|
||||
)
|
||||
|
||||
// Write the two safely sized txs to store. note: although the store is
|
||||
|
|
@ -2055,6 +2140,32 @@ func TestGetBlobs(t *testing.T) {
|
|||
pool.Close()
|
||||
}
|
||||
|
||||
// TestEncodeForNetwork verifies that encodeForNetwork produces output identical
|
||||
// to rlp.EncodeToBytes on the original transaction, for both V0 and V1 sidecars.
|
||||
func TestEncodeForNetwork(t *testing.T) {
|
||||
t.Run("v0", func(t *testing.T) { testEncodeForNetwork(t, types.BlobSidecarVersion0) })
|
||||
t.Run("v1", func(t *testing.T) { testEncodeForNetwork(t, types.BlobSidecarVersion1) })
|
||||
}
|
||||
|
||||
func testEncodeForNetwork(t *testing.T, version byte) {
|
||||
key, _ := crypto.GenerateKey()
|
||||
tx := makeMultiBlobTx(0, 1, 1, 1, 1, 0, key, version)
|
||||
|
||||
wantRLP, err := rlp.EncodeToBytes(tx)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to encode tx: %v", err)
|
||||
}
|
||||
storedRLP := encodeForPool(tx)
|
||||
|
||||
gotRLP, err := encodeForNetwork(storedRLP)
|
||||
if err != nil {
|
||||
t.Fatalf("encodeForNetwork failed: %v", err)
|
||||
}
|
||||
if !bytes.Equal(gotRLP, wantRLP) {
|
||||
t.Fatalf("network encoding mismatch (version %d): got %d bytes, want %d bytes", version, len(gotRLP), len(wantRLP))
|
||||
}
|
||||
}
|
||||
|
||||
// fakeBilly is a billy.Database implementation which just drops data on the floor.
|
||||
type fakeBilly struct {
|
||||
billy.Database
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ import (
|
|||
type limboBlob struct {
|
||||
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
|
||||
Block uint64 // Block in which the blob transaction was included
|
||||
Tx *types.Transaction
|
||||
Ptx *blobTxForPool
|
||||
}
|
||||
|
||||
// limbo is a light, indexed database to temporarily store recently included
|
||||
|
|
@ -146,15 +146,14 @@ func (l *limbo) finalize(final *types.Header) {
|
|||
|
||||
// push stores a new blob transaction into the limbo, waiting until finality for
|
||||
// it to be automatically evicted.
|
||||
func (l *limbo) push(tx *types.Transaction, block uint64) error {
|
||||
// If the blobs are already tracked by the limbo, consider it a programming
|
||||
// error. There's not much to do against it, but be loud.
|
||||
if _, ok := l.index[tx.Hash()]; ok {
|
||||
log.Error("Limbo cannot push already tracked blobs", "tx", tx.Hash())
|
||||
func (l *limbo) push(ptx *blobTxForPool, block uint64) error {
|
||||
hash := ptx.Tx.Hash()
|
||||
if _, ok := l.index[hash]; ok {
|
||||
log.Error("Limbo cannot push already tracked blobs", "tx", hash)
|
||||
return errors.New("already tracked blob transaction")
|
||||
}
|
||||
if err := l.setAndIndex(tx, block); err != nil {
|
||||
log.Error("Failed to set and index limboed blobs", "tx", tx.Hash(), "err", err)
|
||||
if err := l.setAndIndex(ptx, block); err != nil {
|
||||
log.Error("Failed to set and index limboed blobs", "tx", hash, "err", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
@ -163,7 +162,7 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error {
|
|||
// pull retrieves a previously pushed set of blobs back from the limbo, removing
|
||||
// it at the same time. This method should be used when a previously included blob
|
||||
// transaction gets reorged out.
|
||||
func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) {
|
||||
func (l *limbo) pull(tx common.Hash) (*blobTxForPool, error) {
|
||||
// If the blobs are not tracked by the limbo, there's not much to do. This
|
||||
// can happen for example if a blob transaction is mined without pushing it
|
||||
// into the network first.
|
||||
|
|
@ -177,7 +176,7 @@ func (l *limbo) pull(tx common.Hash) (*types.Transaction, error) {
|
|||
log.Error("Failed to get and drop limboed blobs", "tx", tx, "id", id, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
return item.Tx, nil
|
||||
return item.Ptx, nil
|
||||
}
|
||||
|
||||
// update changes the block number under which a blob transaction is tracked. This
|
||||
|
|
@ -209,7 +208,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
|
|||
log.Error("Failed to get and drop limboed blobs", "tx", txhash, "id", id, "err", err)
|
||||
return
|
||||
}
|
||||
if err := l.setAndIndex(item.Tx, block); err != nil {
|
||||
if err := l.setAndIndex(item.Ptx, block); err != nil {
|
||||
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
|
||||
return
|
||||
}
|
||||
|
|
@ -240,12 +239,12 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
|
|||
|
||||
// setAndIndex assembles a limbo blob database entry and stores it, also updating
|
||||
// the in-memory indices.
|
||||
func (l *limbo) setAndIndex(tx *types.Transaction, block uint64) error {
|
||||
txhash := tx.Hash()
|
||||
func (l *limbo) setAndIndex(ptx *blobTxForPool, block uint64) error {
|
||||
txhash := ptx.Tx.Hash()
|
||||
item := &limboBlob{
|
||||
TxHash: txhash,
|
||||
Block: block,
|
||||
Tx: tx,
|
||||
Ptx: ptx,
|
||||
}
|
||||
data, err := rlp.EncodeToBytes(item)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in a new issue