diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 28326ae605..27441ac2e2 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -94,6 +94,16 @@ const ( // storeVersion is the current slotter layout used for the billy.Database // store. storeVersion = 1 + + // gappedLifetime is the approximate duration for which nonce-gapped transactions + // are kept before being dropped. Since gapped is only a reorder buffer and it + // is expected that the original transactions were inserted in the mempool in + // nonce order, the duration is kept short to avoid DoS vectors. + gappedLifetime = 1 * time.Minute + + // maxGapped is the maximum number of accounts with gapped transactions kept overall. + // This is a safety limit to avoid DoS vectors. + maxGapped = 128 ) // blobTxMeta is the minimal subset of types.BlobTx necessary to validate and @@ -330,6 +340,9 @@ type BlobPool struct { stored uint64 // Useful data size of all transactions on disk limbo *limbo // Persistent data store for the non-finalized blobs + gapped map[common.Address][]*types.Transaction // Transactions that are currently gapped (nonce too high) + gappedSource map[common.Hash]common.Address // Source of gapped transactions to allow rechecking on inclusion + signer types.Signer // Transaction signer to use for sender recovery chain BlockChain // Chain object to access the state through @@ -363,6 +376,8 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo lookup: newLookup(), index: make(map[common.Address][]*blobTxMeta), spent: make(map[common.Address]*uint256.Int), + gapped: make(map[common.Address][]*types.Transaction), + gappedSource: make(map[common.Hash]common.Address), } } @@ -834,6 +849,9 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { resettimeHist.Update(time.Since(start).Nanoseconds()) }(time.Now()) + // Handle reorg buffer timeouts evicting old gapped transactions + p.evictGapped() + statedb, err 
:= p.chain.StateAt(newHead.Root) if err != nil { log.Error("Failed to reset blobpool state", "err", err) @@ -1196,7 +1214,9 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error { State: p.state, FirstNonceGap: func(addr common.Address) uint64 { - // Nonce gaps are not permitted in the blob pool, the first gap will + // Nonce gaps are permitted in the blob pool, but only as part of the + // in-memory 'gapped' buffer. We expose the gap here to validateTx, + // then handle the error by adding to the buffer. The first gap will // be the next nonce shifted by however many transactions we already // have pooled. return p.state.GetNonce(addr) + uint64(len(p.index[addr])) @@ -1275,7 +1295,9 @@ func (p *BlobPool) Has(hash common.Hash) bool { p.lock.RLock() defer p.lock.RUnlock() - return p.lookup.exists(hash) + poolHas := p.lookup.exists(hash) + _, gapped := p.gappedSource[hash] + return poolHas || gapped } func (p *BlobPool) getRLP(hash common.Hash) []byte { @@ -1466,10 +1488,6 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error { adds = append(adds, tx.WithoutBlobTxSidecar()) } } - if len(adds) > 0 { - p.discoverFeed.Send(core.NewTxsEvent{Txs: adds}) - p.insertFeed.Send(core.NewTxsEvent{Txs: adds}) - } return errs } @@ -1488,6 +1506,13 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { addtimeHist.Update(time.Since(start).Nanoseconds()) }(time.Now()) + return p.addLocked(tx, true) +} + +// addLocked inserts a new blob transaction into the pool if it passes validation (both +// consensus validity and pool restrictions). It must be called with the pool lock held. +// Only for internal use. 
+func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error) { // Ensure the transaction is valid from all perspectives if err := p.validateTx(tx); err != nil { log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err) @@ -1500,6 +1525,21 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { addStaleMeter.Mark(1) case errors.Is(err, core.ErrNonceTooHigh): addGappedMeter.Mark(1) + // Store the tx in memory, and revalidate later + from, _ := types.Sender(p.signer, tx) + allowance := p.gappedAllowance(from) + if allowance >= 1 && len(p.gapped) < maxGapped { + p.gapped[from] = append(p.gapped[from], tx) + p.gappedSource[tx.Hash()] = from + log.Trace("added tx to gapped blob queue", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) + return nil + } else { + // if maxGapped is reached, it is better to give time to gapped + // transactions by keeping the old and dropping this one. + // Thus replacing a gapped transaction with another gapped transaction + // is discouraged. 
+ log.Trace("no gapped blob queue allowance", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) + } case errors.Is(err, core.ErrInsufficientFunds): addOverdraftedMeter.Mark(1) case errors.Is(err, txpool.ErrAccountLimitExceeded): @@ -1637,6 +1677,58 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { p.updateStorageMetrics() addValidMeter.Mark(1) + + // Notify all listeners of the new arrival + p.discoverFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}}) + p.insertFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx.WithoutBlobTxSidecar()}}) + + // Check the gapped queue for this account and try to promote + if gtxs, ok := p.gapped[from]; checkGapped && ok && len(gtxs) > 0 { + // We have to add in nonce order, but we want to stable sort to cater for situations + // where transactions are replaced, keeping the original receive order for same nonce + sort.SliceStable(gtxs, func(i, j int) bool { + return gtxs[i].Nonce() < gtxs[j].Nonce() + }) + for len(gtxs) > 0 { + stateNonce := p.state.GetNonce(from) + firstgap := stateNonce + uint64(len(p.index[from])) + + if gtxs[0].Nonce() > firstgap { + // Anything beyond the first gap is not addable yet + break + } + + // Drop any buffered transactions that became stale in the meantime (included in chain or replaced) + // If we reach a transaction in the pending range (between the state nonce and the first gap), we + // try to add it now while removing it from here. + tx := gtxs[0] + gtxs[0] = nil + gtxs = gtxs[1:] + delete(p.gappedSource, tx.Hash()) + + if tx.Nonce() < stateNonce { + // Stale, drop it. Eventually we could add to limbo here if hash matches. 
+ log.Trace("Gapped blob transaction became stale", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "state", stateNonce, "qlen", len(p.gapped[from])) + continue + } + + if tx.Nonce() <= firstgap { + // If we hit the pending range, including the first gap, add it and continue to try to add more. + // We do not recurse here, but continue to loop instead. + // We are under lock, so we can add the transaction directly. + if err := p.addLocked(tx, false); err == nil { + log.Trace("Gapped blob transaction added to pool", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) + } else { + log.Trace("Gapped blob transaction not accepted", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "err", err) + } + } + } + if len(gtxs) == 0 { + delete(p.gapped, from) + } else { + p.gapped[from] = gtxs + } + } return nil } @@ -1868,6 +1960,50 @@ func (p *BlobPool) Nonce(addr common.Address) uint64 { return p.state.GetNonce(addr) } +// gappedAllowance returns the number of gapped transactions still +// allowed for the given account. Allowance is based on a slow-start +// logic, allowing more gaps (resource usage) to accounts with a +// higher nonce. Can also return negative values. +func (p *BlobPool) gappedAllowance(addr common.Address) int { + // Gaps happen, but we don't want to allow too many. + // Use log10(nonce+1) as the allowance, with a minimum of 0. + nonce := p.state.GetNonce(addr) + allowance := int(math.Log10(float64(nonce + 1))) + // Cap the allowance to the remaining pool space + return min(allowance, maxTxsPerAccount-len(p.index[addr])) - len(p.gapped[addr]) +} + +// evictGapped removes the old transactions from the gapped reorder buffer. +// Concurrency: The caller must hold the pool lock before calling this function. +func (p *BlobPool) evictGapped() { + cutoff := time.Now().Add(-gappedLifetime) + for from, txs := range p.gapped { + nonce := p.state.GetNonce(from) + // Reuse the original slice to avoid extra allocations. 
+ // This is safe because we only keep references to the original transaction objects, + // and we overwrite the slice for this account after filtering. + keep := txs[:0] + for i, gtx := range txs { + if gtx.Time().Before(cutoff) || gtx.Nonce() < nonce { + // Evict old or stale transactions + // Should we add stale to limbo here if it would belong? + delete(p.gappedSource, gtx.Hash()) + txs[i] = nil // Explicitly nil out evicted element + } else { + keep = append(keep, gtx) + } + } + if len(keep) < len(txs) { + log.Trace("Evicting old gapped blob transactions", "count", len(txs)-len(keep), "from", from) + } + if len(keep) == 0 { + delete(p.gapped, from) + } else { + p.gapped[from] = keep + } + } +} + // Stats retrieves the current pool stats, namely the number of pending and the // number of queued (non-executable) transactions. func (p *BlobPool) Stats() (int, int) { @@ -1902,9 +2038,15 @@ func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*ty // Status returns the known status (unknown/pending/queued) of a transaction // identified by their hashes. 
func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus { - if p.Has(hash) { + p.lock.RLock() + defer p.lock.RUnlock() + + if p.lookup.exists(hash) { return txpool.TxStatusPending } + if _, gapped := p.gappedSource[hash]; gapped { + return txpool.TxStatusQueued + } return txpool.TxStatusUnknown } diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index eda87008c3..4bb3567b69 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -1352,9 +1352,10 @@ func TestAdd(t *testing.T) { } // addtx is a helper sender/tx tuple to represent a new tx addition type addtx struct { - from string - tx *types.BlobTx - err error + from string + tx *types.BlobTx + err error + check func(*BlobPool, *types.Transaction) bool } tests := []struct { @@ -1371,6 +1372,7 @@ func TestAdd(t *testing.T) { "bob": {balance: 21100 + blobSize, nonce: 1}, "claire": {balance: 21100 + blobSize}, "dave": {balance: 21100 + blobSize, nonce: 1}, + "eve": {balance: 21100 + blobSize, nonce: 10}, // High nonce to test gapped acceptance }, adds: []addtx{ { // New account, no previous txs: accept nonce 0 @@ -1398,6 +1400,14 @@ func TestAdd(t *testing.T) { tx: makeUnsignedTx(2, 1, 1, 1), err: core.ErrNonceTooHigh, }, + { // Old account, 10 txs in chain: 0 pending: accept nonce 11 as gapped + from: "eve", + tx: makeUnsignedTx(11, 1, 1, 1), + err: nil, + check: func(pool *BlobPool, tx *types.Transaction) bool { + return pool.Status(tx.Hash()) == txpool.TxStatusQueued + }, + }, }, }, // Transactions from already pooled accounts should only be accepted if @@ -1758,15 +1768,28 @@ func TestAdd(t *testing.T) { t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, errs[0], add.err) } if add.err == nil { - size, exist := pool.lookup.sizeOfTx(signed.Hash()) - if !exist { - t.Errorf("test %d, tx %d: failed to lookup transaction's size", i, j) + // first check if tx is in the pool (reorder queue or pending) + 
if !pool.Has(signed.Hash()) { + t.Errorf("test %d, tx %d: added transaction not found in pool", i, j) } - if size != signed.Size() { - t.Errorf("test %d, tx %d: transaction's size mismatches: have %v, want %v", - i, j, size, signed.Size()) + // if it is pending, check if size matches + if pool.Status(signed.Hash()) == txpool.TxStatusPending { + size, exist := pool.lookup.sizeOfTx(signed.Hash()) + if !exist { + t.Errorf("test %d, tx %d: failed to lookup transaction's size", i, j) + } + if size != signed.Size() { + t.Errorf("test %d, tx %d: transaction's size mismatches: have %v, want %v", + i, j, size, signed.Size()) + } } } + if add.check != nil { + if !add.check(pool, signed) { + t.Errorf("test %d, tx %d: custom check failed", i, j) + } + } + // Verify the pool internals after each addition verifyPoolInternals(t, pool) } verifyPoolInternals(t, pool)