From 98a792673af1a158e8e9b47616d0e0066bd61fb9 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg <519948+ARR4N@users.noreply.github.com> Date: Wed, 18 Feb 2026 12:02:13 +0000 Subject: [PATCH] feat: `parallel` package for precompile pre-processing (#228) ## Why this should be merged EVM parallel co-processors that interface with the regular transaction path via precompiles. ## How this works Introduces the `parallel.Processor`, which orchestrates a set of `parallel.Handler`s. Each `Handler` performs arbitrary, strongly typed processing of any sub-set of transactions in a block and makes its results available to a precompile and/or a post-block method for persisting state. Although stateful, `Handler`s can only read the pre-block and post-block state, which isolates them from conflicts with the regular transaction path. There is deliberately no support for a precompile to "write" to a `Handler`, only to "read". This is because the transaction might still revert, which would also have to be communicated to the `Handler`, resulting in unnecessary complexity. Logs/events are the recommended approach for precompile -> `Handler` communication, to be read from the `types.Receipts` at the end of the block. ## How this was tested Integration tests covering: 1. Selection of transactions to process + end-to-end plumbing of data through a `Handler`. 2. Registration as a precompile, exercised with actual transaction processing, and demonstrating log + return-data correctness. --------- Signed-off-by: Arran Schlosberg <519948+ARR4N@users.noreply.github.com> --- libevm/libevm.go | 3 + libevm/precompiles/parallel/eventual.go | 52 ++ libevm/precompiles/parallel/handler.go | 297 ++++++++++ libevm/precompiles/parallel/parallel.go | 333 +++++++++++ libevm/precompiles/parallel/parallel_test.go | 576 +++++++++++++++++++ libevm/precompiles/parallel/precompile.go | 46 ++ 6 files changed, 1307 insertions(+) create mode 100644 libevm/precompiles/parallel/eventual.go create mode 100644 libevm/precompiles/parallel/handler.go create mode 100644 libevm/precompiles/parallel/parallel.go create mode 100644 libevm/precompiles/parallel/parallel_test.go create mode 100644 libevm/precompiles/parallel/precompile.go diff --git a/libevm/libevm.go b/libevm/libevm.go index a429d60487..4f9e3a363b 100644 --- a/libevm/libevm.go +++ b/libevm/libevm.go @@ -56,6 +56,9 @@ type StateReader interface { AddressInAccessList(addr common.Address) bool SlotInAccessList(addr common.Address, slot common.Hash) (addressOk bool, slotOk bool) + + TxHash() common.Hash + TxIndex() int } // AddressContext carries addresses available to contexts such as calls and diff --git a/libevm/precompiles/parallel/eventual.go b/libevm/precompiles/parallel/eventual.go new file mode 100644 index 0000000000..ba67cc2c56 --- /dev/null +++ b/libevm/precompiles/parallel/eventual.go @@ -0,0 +1,52 @@ +// Copyright 2025-2026 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +package parallel + +// An eventual type holds a value that is set at some unknown point in the +// future and used, possibly concurrently, by one or more peekers or a single +// taker (together, "getters"). The zero value is NOT ready for use. +type eventual[T any] struct { + ch chan T +} + +// eventually returns a new eventual value. +func eventually[T any]() eventual[T] { + return eventual[T]{ + ch: make(chan T, 1), + } +} + +// put sets the value, unblocking any current and future getters. put itself is +// non-blocking, however it is NOT possible to overwrite the value without an +// intervening call to [eventual.take]. +func (e eventual[T]) put(v T) { + e.ch <- v +} + +// peek returns the value after making it available for other getters. Although +// the act of peeking is threadsafe, the returned value might not be. +func (e eventual[T]) peek() T { + v := <-e.ch + e.ch <- v + return v +} + +// take returns the value and resets e to its default state as if immediately +// after construction. +func (e eventual[T]) take() T { + return <-e.ch +} diff --git a/libevm/precompiles/parallel/handler.go b/libevm/precompiles/parallel/handler.go new file mode 100644 index 0000000000..0043243140 --- /dev/null +++ b/libevm/precompiles/parallel/handler.go @@ -0,0 +1,297 @@ +// Copyright 2025-2026 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +package parallel + +import ( + "sync" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/core/vm" + "github.com/ava-labs/libevm/libevm" + "github.com/ava-labs/libevm/libevm/stateconf" +) + +// A Handler is responsible for processing [types.Transactions] in an +// embarrassingly parallel fashion. It is the responsibility of the Handler to +// determine whether this is possible, typically only so if one of the following +// is true with respect to a precompile associated with the Handler: +// +// 1. The destination address is that of the precompile; or +// 2. At least one [types.AccessTuple] references the precompile's address. +// +// Scenario (2) allows precompile access to be determined through inspection of +// the [types.Transaction] alone, without the need for execution. +// +// A [Processor] will orchestrate calling of Handler methods as follows: +// +// | - Prefetch(i) - Process(i) +// | / / +// | BeforeBlock() - ShouldProcess(0..n) - PostProcess() - AfterBlock() +// | \ \ +// | - Prefetch(j) - Process(j) +// +// IntRA-Handler guarantees: +// +// 1. BeforeBlock() precedes all ShouldProcess() calls. +// 2. ShouldProcess() calls are sequential, in the same order as transactions in the block. +// 3. Prefetch() precedes the respective Process() call. Not called if ShouldProcess() returns false. +// 4. PostProcess() precedes AfterBlock(). +// +// Note that PostProcess() MAY be called at any time after BeforeBlock(), and +// implementations MUST synchronise with Process() by using the [Results]. There +// are no intER-Handler guarantees except that AfterBlock() methods are called +// sequentially, in the same order as they were registered with [AddHandler]. +// +// All [libevm.StateReader] instances are opened to the state at the beginning +// of the block. The [StateDB] is the same one used to execute the block, before +// being committed, and MAY be written to. +type Handler[CommonData, Data, Result, Aggregated any] interface { + // BeforeBlock is called before all calls to ShouldProcess() on this + // Handler. + BeforeBlock(libevm.StateReader, *types.Header) CommonData + // ShouldProcess reports whether the Handler SHOULD receive the transaction + // for processing and, if so, how much gas to charge. Processing is + // performed i.f.f. the returned boolean is true and there is sufficient gas + // limit to cover intrinsic gas for all Handlers that returned true. If + // there is insufficient gas for processing then the transaction will result + // in [vm.ErrOutOfGas] as long as the [Processor] is registered with + // [vm.RegisterHooks] as a [vm.Preprocessor]. + // + // Implementations MUST NOT perform any meaningful computation + // but MAY perform inter-transaction checks such as, for example, + // deduplication of work. + ShouldProcess(IndexedTx, CommonData) (do bool, gas uint64) + // Prefetch is called before the respective call to Process() on this + // Handler. It MUST NOT perform any meaningful computation beyond what is + // necessary to determine the necessary state to propagate to Process(). + Prefetch(libevm.StateReader, IndexedTx, CommonData) Data + // Process is responsible for performing all meaningful, per-transaction + // computation. It receives the common data returned by the single call to + // BeforeBlock() as well as the data from the respective call to Prefetch(). + // The returned result is propagated to PostProcess() and any calls to the + // function returned by [AddHandler]. + // + // NOTE: if the result is exposed to the EVM via a precompile then said + // precompile will block until Process() returns. While this guarantees the + // availability of pre-processed results, it is also the hot path for EVM + // transactions. + Process(libevm.StateReader, IndexedTx, CommonData, Data) Result + // PostProcess is called concurrently with all calls to Process(). It allows + // for online aggregation of results into a format ready for writing to + // state. + // + // NOTE: although PostProcess() MAY perform computation, it will block the + // calling of AfterBlock() and hence also the execution of the next block. + PostProcess(CommonData, Results[Result]) Aggregated + // AfterBlock is called after PostProcess() returns and all regular EVM + // transaction processing is complete. It MUST NOT perform any meaningful + // computation beyond what is necessary to (a) parse receipts, and (b) + // persist aggregated results. + AfterBlock(StateDB, Aggregated, *types.Block, types.Receipts) +} + +// An IndexedTx couples a [types.Transaction] with its index in a block. +type IndexedTx struct { + Index int + *types.Transaction +} + +// Results provides mechanisms for blocking on the output of [Handler.Process]. +type Results[R any] struct { + WaitForAll func() + TxOrder, ProcessOrder <-chan TxResult[R] +} + +// A TxResult couples an [IndexedTx] with its respective result from +// [Handler.Process]. +type TxResult[R any] struct { + Tx IndexedTx + Result R +} + +// StateDB is the subset of [state.StateDB] methods that MAY be called by +// [Handler.AfterBlock]. +type StateDB interface { + libevm.StateReader + SetState(_ common.Address, key, val common.Hash, _ ...stateconf.StateDBStateOption) +} + +var _ handler = (*wrapper[any, any, any, any])(nil) + +// A wrapper exposes the generic functionality of a [Handler] in a non-generic +// manner, allowing [Processor] to be free of type parameters. +type wrapper[CD, D, R, A any] struct { + Handler[CD, D, R, A] + + totalTxsInBlock int + txsBeingProcessed sync.WaitGroup + + common eventual[CD] + data []eventual[D] + + results []eventual[result[R]] + whenProcessed, txOrder chan TxResult[R] + + aggregated eventual[A] +} + +// AddHandler registers the [Handler] with the [Processor] and returns a +// function to fetch the [TxResult] for the i'th transaction passed to +// [Processor.StartBlock]. +// +// The returned function until the respective transaction has had its result +// processed, and then returns the value returned by the [Handler]. The returned +// boolean will be false if no processing occurred, either because the [Handler] +// indicated as such or because the transaction supplied insufficient gas. +// +// Multiple calls to Result with the same argument are allowed. Callers MUST NOT +// charge the gas price for preprocessing as this is handled by +// [Processor.PreprocessingGasCharge] if registered as a [vm.Preprocessor]. +// +// Within the scope of a given block, the same value will be returned by each +// call with the same argument, such that if R is a pointer then modifications +// will persist between calls. However, the caller does NOT have mutually +// exclusive access to the [TxResult] so SHOULD NOT modify it, especially since +// the result MAY also be accessed by [Handler.PostProcess], with no ordering +// guarantees. +func AddHandler[CD, D, R, A any](p *Processor, h Handler[CD, D, R, A]) func(txIndex int) (TxResult[R], bool) { + w := &wrapper[CD, D, R, A]{ + Handler: h, + common: eventually[CD](), + aggregated: eventually[A](), + } + p.handlers = append(p.handlers, w) + return w.result +} + +func (w *wrapper[CD, D, R, A]) beforeBlock(sdb libevm.StateReader, b *types.Block) { + w.totalTxsInBlock = len(b.Transactions()) + // We can reuse the channels already in the data and results slices because + // they're emptied by [wrapper.process] and [wrapper.finishBlock] + // respectively. + for i := len(w.results); i < w.totalTxsInBlock; i++ { + w.data = append(w.data, eventually[D]()) + w.results = append(w.results, eventually[result[R]]()) + } + + go func() { + // goroutine guaranteed to have completed by the time a respective + // getter unblocks (i.e. in any call to [wrapper.prefetch]). + w.common.put(w.BeforeBlock(sdb, types.CopyHeader(b.Header()))) + }() +} + +func (w *wrapper[CD, D, R, A]) shouldProcess(tx IndexedTx) (do bool, gas uint64) { + return w.Handler.ShouldProcess(tx, w.common.peek()) +} + +func (w *wrapper[CD, D, R, A]) beforeWork(jobs int) { + w.txsBeingProcessed.Add(jobs) + w.whenProcessed = make(chan TxResult[R], jobs) + w.txOrder = make(chan TxResult[R], jobs) + go func() { + w.txsBeingProcessed.Wait() + close(w.whenProcessed) + }() +} + +func (w *wrapper[CD, D, R, A]) prefetch(sdb libevm.StateReader, job *prefetch) { + w.data[job.tx.Index].put(w.Prefetch(sdb, job.tx, w.common.peek())) +} + +func (w *wrapper[CD, D, R, A]) process(sdb libevm.StateReader, job *process) { + defer w.txsBeingProcessed.Done() + + idx := job.tx.Index + val := w.Process(sdb, job.tx, w.common.peek(), w.data[idx].take()) + r := result[R]{ + tx: job.tx, + val: &val, + } + w.results[idx].put(r) + w.whenProcessed <- TxResult[R]{ + Tx: job.tx, + Result: val, + } +} + +func (w *wrapper[CD, D, R, A]) nullResult(job *job) { + w.results[job.tx.Index].put(result[R]{ + tx: job.tx, + val: nil, + }) +} + +func (w *wrapper[CD, D, R, A]) result(i int) (TxResult[R], bool) { + r := w.results[i].peek() + + txr := TxResult[R]{ + Tx: r.tx, + } + if r.val == nil { + return txr, false + } + txr.Result = *r.val + return txr, true +} + +func (w *wrapper[CD, D, R, A]) postProcess() { + go func() { + defer close(w.txOrder) + for i := range w.totalTxsInBlock { + r, ok := w.result(i) + if !ok { + continue + } + w.txOrder <- r + } + }() + + res := Results[R]{ + WaitForAll: w.txsBeingProcessed.Wait, + TxOrder: w.txOrder, + ProcessOrder: w.whenProcessed, + } + w.aggregated.put(w.PostProcess(w.common.peek(), res)) +} + +func (w *wrapper[CD, D, R, A]) finishBlock(sdb vm.StateDB, b *types.Block, rs types.Receipts) { + w.AfterBlock(sdb, w.aggregated.take(), b, rs) + + // [wrapper.postProcess] is guaranteed to have finished because it sets + // [wrapper.aggregated], from which we have just read. However + // [Handler.PostProcess] is under no obligation to block on anything, and + // the goroutine filling [wrapper.txOrder] might still be reading results. + // We therefore guarantee its completion before "getting and keeping" all of + // [wrapper.results] otherwise said goroutine can leak. + for range w.txOrder { + // Nobody needs these anymore, but we need to know that the channel has + // been closed. + } + // Although we know this will unblock effectively immediately, it's safer to + // verify the intuition than to rely on complex reasoning. + w.txsBeingProcessed.Wait() + + w.common.take() + for _, v := range w.results[:w.totalTxsInBlock] { + // Every result channel is guaranteed to have some value in its buffer + // because [Processor.BeforeBlock] either sends a nil *R or it + // dispatches a job, which will send a non-nil *R. + v.take() + } +} diff --git a/libevm/precompiles/parallel/parallel.go b/libevm/precompiles/parallel/parallel.go new file mode 100644 index 0000000000..b6674d5a39 --- /dev/null +++ b/libevm/precompiles/parallel/parallel.go @@ -0,0 +1,333 @@ +// Copyright 2025-2026 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +// Package parallel provides functionality for precompiled contracts with +// lifespans of an entire block. +package parallel + +import ( + "errors" + "fmt" + "math" + "sync" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core" + "github.com/ava-labs/libevm/core/state" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/core/vm" + "github.com/ava-labs/libevm/libevm" + "github.com/ava-labs/libevm/params" +) + +// A handler is the non-generic equivalent of a [Handler], exposed by [wrapper]. +type handler interface { + beforeBlock(libevm.StateReader, *types.Block) + shouldProcess(IndexedTx) (do bool, gas uint64) + beforeWork(jobs int) + prefetch(libevm.StateReader, *prefetch) + nullResult(*job) + process(libevm.StateReader, *process) + postProcess() + finishBlock(vm.StateDB, *types.Block, types.Receipts) +} + +// A Processor orchestrates dispatch and collection of results from one or more +// [Handler] instances. +type Processor struct { + handlers []handler + + workers sync.WaitGroup + stateShare stateDBSharer + prefetch chan *prefetch + process chan *process + + txGas map[common.Hash]uint64 +} + +type ( + // job is an alias to allow it to be used as an "underlying type" for + // generic type parameters, while prefetch and process are explicitly *not* + // aliases, to guarantee that they aren't considered equivalent. + job = struct { + handler handler + tx IndexedTx + } + prefetch job + process job +) + +type result[T any] struct { + tx IndexedTx + val *T +} + +// New constructs a new [Processor] with the specified number of concurrent +// prefetching and processing workers. As prefetching is typically IO-bound, it +// is reasonable to have more prefetchers than processors. The number of +// processors SHOULD be determined from GOMAXPROCS. Pipelining in such a fashion +// stops prefetching for later transactions being blocked by earlier, +// long-running processing; see the respective methods on [Handler] for more +// context. +// +// [Processor.Close] MUST be called after the final call to +// [Processor.FinishBlock] to avoid leaking goroutines. +func New(prefetchers, processors int) *Processor { + prefetchers = max(prefetchers, 1) + processors = max(processors, 1) + workers := prefetchers + processors + + p := &Processor{ + stateShare: stateDBSharer{ + workers: workers, + available: make(chan struct{}), + sdb: make(chan *state.StateDB, 1), + }, + prefetch: make(chan *prefetch), + process: make(chan *process), + txGas: make(map[common.Hash]uint64), + } + + p.workers.Add(workers) // for shutdown via [Processor.Close] + p.stateShare.wg.Add(workers) // for readiness of [Processor.worker] loops + for range prefetchers { + go worker(p, p.prefetch, func(sdb libevm.StateReader, job *prefetch) { + job.handler.prefetch(sdb, job) + }) + } + for range processors { + go worker(p, p.process, func(sdb libevm.StateReader, job *process) { + job.handler.process(sdb, job) + }) + } + p.stateShare.wg.Wait() + + return p +} + +// A stateDBSharer allows concurrent workers to make copies of a primary +// database. When the `available` channel is closed, all workers call +// [state.StateDB.Copy] then signal completion on the [sync.WaitGroup]. The +// channel is replaced for each round of distribution. +type stateDBSharer struct { + available chan struct{} + sdb chan *state.StateDB + workers int + wg sync.WaitGroup +} + +func (s *stateDBSharer) distribute(sdb *state.StateDB) { + ch := s.available // already copied by [Processor.worker], which is waiting for it to close + s.available = make(chan struct{}) // will be copied, ready for the next distribution + + s.sdb <- sdb + s.wg.Add(s.workers) + close(ch) // Take a moment to enjoy the symmetry :) + s.wg.Wait() + <-s.sdb +} + +func worker[J ~job](p *Processor, work <-chan *J, do func(libevm.StateReader, *J)) { + defer p.workers.Done() + + var sdb *state.StateDB + share := &p.stateShare + stateAvailable := share.available + // Without this signal of readiness, a premature call to + // [Processor.StartBlock] could replace `share.nextAvailable` before we've + // copied it. + share.wg.Done() + + for { + select { + case <-stateAvailable: // guaranteed at the beginning of each block + // [state.StateDB.Copy] is a complex method that isn't explicitly + // documented as being threadsafe. + sdb = (<-share.sdb).Copy() + share.sdb <- sdb // no need to return the original as each worker copies + + stateAvailable = share.available + share.wg.Done() + + case w, ok := <-work: + if !ok { + return + } + do(sdb, w) + } + } +} + +// Close shuts down the [Processor], after which it can no longer be used. +func (p *Processor) Close() { + close(p.prefetch) + close(p.process) + p.workers.Wait() +} + +// StartBlock dispatches transactions to every [Handler] but returns immediately +// after performing preliminary setup. It MUST be paired with a call to +// [Processor.FinishBlock], without overlap of blocks. +func (p *Processor) StartBlock(sdb *state.StateDB, rules params.Rules, b *types.Block) error { + // The distribution mechanism copies the StateDB so we don't need to do it + // here, but [wrapper.beforeBlock] doesn't make its own copy. Note that even + // reading from a [state.StateDB] is not threadsafe. + p.stateShare.distribute(sdb) + for _, h := range p.handlers { + h.beforeBlock(sdb.Copy(), b) + } + + txs := b.Transactions() + jobs := make([]*job, 0, len(p.handlers)*len(txs)) + workloads := make([]int, len(p.handlers)) + + for txIdx, rawTx := range txs { + tx := IndexedTx{ + Index: txIdx, + Transaction: rawTx, + } + + do, err := p.shouldProcess(tx, rules) // MUST NOT be concurrent within a Handler + if err != nil { + return err + } + for i, h := range p.handlers { + j := &job{ + tx: tx, + handler: h, + } + if !do[i] { + h.nullResult(j) + continue + } + workloads[i]++ + jobs = append(jobs, j) + } + } + + for i, w := range workloads { + p.handlers[i].beforeWork(w) + } + // All of the following goroutines are dependent on the one(s) preceding + // them, while [wrapper.finishBlock] is dependent on [wrapper.postProcess]. + // The return of [Processor.FinishBlock] is therefore a guarantee of the end + // of the lifespans of all of these goroutines. + go func() { + for _, j := range jobs { + p.prefetch <- (*prefetch)(j) + } + }() + go func() { + for _, j := range jobs { + p.process <- (*process)(j) + } + }() + for _, h := range p.handlers { + go h.postProcess() + } + return nil +} + +// FinishBlock propagates its arguments to every [Handler] and resets the +// [Processor] to a state ready for the next block. A return from FinishBlock +// guarantees that all dispatched work from the respective call to +// [Processor.StartBlock] has been completed. +func (p *Processor) FinishBlock(sdb vm.StateDB, b *types.Block, rs types.Receipts) { + // [Handler.FinishBlock] is allowed to write to state, so these MUST NOT be + // concurrent. + for _, h := range p.handlers { + h.finishBlock(sdb, b, rs) + } + for tx := range p.txGas { + delete(p.txGas, tx) + } +} + +func (p *Processor) shouldProcess(tx IndexedTx, rules params.Rules) (process []bool, retErr error) { + // An explicit 0 is necessary to avoid [Processor.PreprocessingGasCharge] + // returning [ErrTxUnknown]. + p.txGas[tx.Hash()] = 0 + + process = make([]bool, len(p.handlers)) + var totalCost uint64 + for i, h := range p.handlers { + do, cost := h.shouldProcess(tx) + if !do { + continue + } + process[i] = true + // It's safe to cap total cost at [math.MaxUint64] because intrinsic gas + // is always non-zero and the tx would therefore OOG. Not that we could + // reasonably expect such high gas consumption though ¯\_(ツ)_/¯ + totalCost += min(cost, math.MaxUint64-totalCost) + } + + defer func() { + if retErr == nil { + p.txGas[tx.Hash()] = totalCost + } + }() + + spent, err := txIntrinsicGas(tx.Transaction, &rules) + if err != nil { + return nil, fmt.Errorf("calculating intrinsic gas of %#x: %v", tx.Hash(), err) + } + if spent > tx.Gas() { + // If this happens then consensus has a bug because the tx shouldn't + // have been included. We include the check, however, for completeness + // as we would otherwise underflow below. + return nil, core.ErrIntrinsicGas + } + if remain := tx.Gas() - spent; remain < totalCost { + for i := range process { + process[i] = false + } + } + return process, nil +} + +func txIntrinsicGas(tx *types.Transaction, rules *params.Rules) (uint64, error) { + return intrinsicGas(tx.Data(), tx.AccessList(), tx.To(), rules) +} + +func intrinsicGas(data []byte, access types.AccessList, txTo *common.Address, rules *params.Rules) (uint64, error) { + create := txTo == nil + return core.IntrinsicGas( + data, + access, + create, + rules.IsHomestead, + rules.IsIstanbul, // EIP-2028 + rules.IsShanghai, // EIP-3860 + ) +} + +// ErrTxUnknown is returned by [Processor.PreprocessingGasCharge] if it is +// called with a transaction hash that wasn't in the last block passed to +// [Processor.StartBlock]. +var ErrTxUnknown = errors.New("transaction unknown by parallel preprocessor") + +// PreprocessingGasCharge implements the [vm.Preprocessor] interface and MUST be +// registered via [vm.RegisterHooks] to ensure proper gas accounting. +func (p *Processor) PreprocessingGasCharge(tx common.Hash) (uint64, error) { + g, ok := p.txGas[tx] + if !ok { + return 0, fmt.Errorf("%w: %v", ErrTxUnknown, tx) + } + return g, nil +} + +var _ vm.Preprocessor = (*Processor)(nil) diff --git a/libevm/precompiles/parallel/parallel_test.go b/libevm/precompiles/parallel/parallel_test.go new file mode 100644 index 0000000000..a8eb1a8604 --- /dev/null +++ b/libevm/precompiles/parallel/parallel_test.go @@ -0,0 +1,576 @@ +// Copyright 2025-2026 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +package parallel + +import ( + "encoding/binary" + "fmt" + "math" + "math/big" + "math/rand/v2" + "slices" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/core/vm" + "github.com/ava-labs/libevm/crypto" + "github.com/ava-labs/libevm/libevm" + "github.com/ava-labs/libevm/libevm/ethtest" + "github.com/ava-labs/libevm/libevm/hookstest" + "github.com/ava-labs/libevm/params" + "github.com/ava-labs/libevm/trie" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, goleak.IgnoreCurrent()) +} + +type recorder struct { + tb testing.TB + + gas uint64 + addr common.Address + blockKey, prefetchKey, processKey common.Hash + + gotReceipts types.Receipts + gotAggregated aggregated +} + +type aggregated struct { + txOrder, processOrder []TxResult[recorded] +} + +type recorded struct { + TxData []byte + Prefetch, Process common.Hash + Common commonData +} + +type commonData struct { + HeaderExtra []byte + BeforeBlockStateVal common.Hash +} + +func (r *recorder) BeforeBlock(sdb libevm.StateReader, h *types.Header) commonData { + return commonData{ + HeaderExtra: slices.Clone(h.Extra), + BeforeBlockStateVal: sdb.GetState(r.addr, r.blockKey), + } +} + +func (r *recorder) ShouldProcess(tx IndexedTx, _ commonData) (bool, uint64) { + // TODO(arr4n) test that the [commonData] received here is the same as that + // returned by [recorder.BeforeBlock]. + if to := tx.To(); to != nil && *to == r.addr { + return true, r.gas + } + return false, 0 +} + +type prefetched struct { + prefetchStateVal common.Hash + common commonData +} + +func (r *recorder) Prefetch(sdb libevm.StateReader, tx IndexedTx, cd commonData) prefetched { + return prefetched{ + common: cd, + prefetchStateVal: sdb.GetState(r.addr, r.prefetchKey), + } +} + +func (r *recorder) Process(sdb libevm.StateReader, tx IndexedTx, cd commonData, data prefetched) recorded { + if diff := cmp.Diff(cd, data.common); diff != "" { + r.tb.Errorf("Mismatched CommonData propagation to Handler methods; diff (-Process, +Prefetch):\n%s", diff) + } + + return recorded{ + TxData: slices.Clone(tx.Data()), + Prefetch: data.prefetchStateVal, + Process: sdb.GetState(r.addr, r.processKey), + Common: cd, + } +} + +var _ PrecompileResult = recorded{} + +func (r recorded) PrecompileOutput(env vm.PrecompileEnvironment, input []byte) ([]byte, error) { + l := r.asLog() + l.Address = env.Addresses().EVMSemantic.Self + env.StateDB().AddLog(l) + return r.precompileReturnData(), nil +} + +func (r recorded) precompileReturnData() []byte { + return slices.Concat(r.Common.HeaderExtra, []byte("|"), r.TxData) +} + +func (r recorded) asLog() *types.Log { + return &types.Log{ + Topics: []common.Hash{r.Common.BeforeBlockStateVal, r.Prefetch, r.Process}, + } +} + +func (r *recorder) PostProcess(cd commonData, res Results[recorded]) aggregated { + // Although unnecessary because of the ranging over both channels, this just + // demonstrates that it's non-blocking. + defer res.WaitForAll() + + var out aggregated + for res := range res.TxOrder { + out.txOrder = append(out.txOrder, res) + } + for res := range res.ProcessOrder { + out.processOrder = append(out.processOrder, res) + } + + if len(out.txOrder) > 0 { + if diff := cmp.Diff(cd, out.txOrder[0].Result.Common); diff != "" { + r.tb.Errorf("Mismatched CommonData propagation to Handler methods; diff (-PostProcess, +Process):\n%s", diff) + } + } + + return out +} + +func (r *recorder) AfterBlock(_ StateDB, agg aggregated, _ *types.Block, rs types.Receipts) { + r.gotReceipts = slices.Clone(rs) + r.gotAggregated = agg +} + +func asHash(s string) (h common.Hash) { + copy(h[:], []byte(s)) + return +} + +func TestProcessor(t *testing.T) { + handler := &recorder{ + tb: t, + addr: common.Address{'c', 'o', 'n', 'c', 'a', 't'}, + gas: 1e6, + blockKey: asHash("block"), + prefetchKey: asHash("prefetch"), + processKey: asHash("process"), + } + p := New(8, 8) + getResult := AddHandler(p, handler) + t.Cleanup(p.Close) + + type blockParams struct { + numTxs int + sendToAddrEvery, sufficientGasEvery int + } + + // Each set of params is effectively a test case, but they are all run on + // the same [Processor]. + tests := []blockParams{ + { + numTxs: 0, + }, + { + numTxs: 500, + sendToAddrEvery: 7, + sufficientGasEvery: 5, + }, + { + numTxs: 1_000, + sendToAddrEvery: 7, + sufficientGasEvery: 5, + }, + { + numTxs: 1_000, + sendToAddrEvery: 11, + sufficientGasEvery: 3, + }, + { + numTxs: 100, + sendToAddrEvery: 1, + sufficientGasEvery: 1, + }, + { + numTxs: 0, + }, + } + + rng := rand.New(rand.NewPCG(0, 0)) //nolint:gosec // Reproducibility is useful for testing + for range 100 { + tests = append(tests, blockParams{ + numTxs: rng.IntN(1000), + sendToAddrEvery: 1 + rng.IntN(30), + sufficientGasEvery: 1 + rng.IntN(30), + }) + } + + _, _, sdb := ethtest.NewEmptyStateDB(t) + h := handler + blockVal := asHash("block_val") + sdb.SetState(h.addr, h.blockKey, blockVal) + prefetchVal := asHash("prefetch_val") + sdb.SetState(h.addr, h.prefetchKey, prefetchVal) + processVal := asHash("process_val") + sdb.SetState(h.addr, h.processKey, processVal) + + for _, tt := range tests { + t.Run("", func(t *testing.T) { + t.Logf("%+v", tt) + + var rules params.Rules + txs := make(types.Transactions, tt.numTxs) + wantProcessed := make([]bool, tt.numTxs) + for i := range len(txs) { + var ( + to common.Address + extraGas uint64 + ) + + wantProcessed[i] = true + if i%tt.sendToAddrEvery == 0 { + to = handler.addr + } else { + wantProcessed[i] = false + } + if i%tt.sufficientGasEvery == 0 { + extraGas = handler.gas + } else { + wantProcessed[i] = false + } + + data := binary.BigEndian.AppendUint64(nil, uint64(i)) //nolint:gosec // Known to be positive + gas, err := intrinsicGas(data, types.AccessList{}, &handler.addr, &rules) + require.NoError(t, err, "core.IntrinsicGas(%#x, nil, false, ...)", data) + + txs[i] = types.NewTx(&types.LegacyTx{ + To: &to, + Data: data, + Gas: gas + extraGas, + }) + } + + extra := []byte("extra") + block := types.NewBlock(&types.Header{Extra: extra}, txs, nil, nil, trie.NewStackTrie(nil)) + require.NoError(t, p.StartBlock(sdb, rules, block), "StartBlock()") + + var wantPerTx []TxResult[recorded] + for i, tx := range txs { + wantOK := wantProcessed[i] + + var want recorded + if wantOK { + want = recorded{ + Common: commonData{ + HeaderExtra: extra, + BeforeBlockStateVal: blockVal, + }, + Prefetch: prefetchVal, + Process: processVal, + TxData: tx.Data(), + } + wantPerTx = append(wantPerTx, TxResult[recorded]{ + Tx: IndexedTx{ + Index: i, + Transaction: tx, + }, + Result: want, + }) + } + + got, gotOK := getResult(i) + if gotOK != wantOK { + t.Errorf("Result(%d) got ok %t; want %t", i, gotOK, wantOK) + continue + } + if diff := cmp.Diff(want, got.Result); diff != "" { + t.Errorf("Result(%d) diff (-want +got):\n%s", i, diff) + } + } + + p.FinishBlock(sdb, block, nil) + tests := []struct { + name string + got []TxResult[recorded] + opt cmp.Option + }{ + { + name: "in_transaction_order", + got: h.gotAggregated.txOrder, + }, + { + name: "in_process_order", + got: h.gotAggregated.processOrder, + opt: cmpopts.SortSlices(func(a, b TxResult[recorded]) bool { + return a.Tx.Index < b.Tx.Index + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := cmp.Options{ + tt.opt, + cmp.Comparer(func(a, b *types.Transaction) bool { + return a.Hash() == b.Hash() + }), + } + if diff := cmp.Diff(wantPerTx, tt.got, opts); diff != "" { + t.Errorf("handler.PostProcess() argument diff (-want +got):\n%s", diff) + } + }) + } + }) + + if t.Failed() { + break + } + } +} + +type vmHooks struct { + vm.Preprocessor // the [Processor] + vm.NOOPHooks +} + +func (h *vmHooks) PreprocessingGasCharge(tx common.Hash) (uint64, error) { + return h.Preprocessor.PreprocessingGasCharge(tx) +} + +func TestIntegration(t *testing.T) { + const handlerGas = 500 + handler := &recorder{ + tb: t, + addr: common.Address{'c', 'o', 'n', 'c', 'a', 't'}, + gas: handlerGas, + } + sut := New(8, 8) + precompile := AddAsPrecompile(sut, handler) + t.Cleanup(sut.Close) + + vm.RegisterHooks(&vmHooks{Preprocessor: sut}) + t.Cleanup(vm.TestOnlyClearRegisteredHooks) + + stub := &hookstest.Stub{ + PrecompileOverrides: map[common.Address]libevm.PrecompiledContract{ + handler.addr: vm.NewStatefulPrecompile(precompile), + }, + } + stub.Register(t) + + key, err := crypto.GenerateKey() + require.NoErrorf(t, err, "crypto.GenerateKey()") + eoa := crypto.PubkeyToAddress(key.PublicKey) + + state, evm := ethtest.NewZeroEVM(t) + state.CreateAccount(eoa) + state.SetBalance(eoa, new(uint256.Int).SetAllOne()) + + var ( + txs types.Transactions + wantReturnData [][]byte + wantReceipts types.Receipts + ) + ignore := cmp.Options{ + cmpopts.IgnoreFields( + types.Receipt{}, + "PostState", "CumulativeGasUsed", "BlockNumber", "BlockHash", "Bloom", + ), + cmpopts.IgnoreFields(types.Log{}, "BlockHash"), + } + + header := &types.Header{ + Number: big.NewInt(0), + BaseFee: big.NewInt(0), + } + config := evm.ChainConfig() + rules := config.Rules(header.Number, true, header.Time) + signer := types.MakeSigner(config, header.Number, header.Time) + + for i, addr := range []common.Address{ + {'o', 't', 'h', 'e', 'r'}, + handler.addr, + } { + ui := uint(i) //nolint:gosec // Known to be positive + data := []byte("hello, world") + + gas, err := intrinsicGas(data, types.AccessList{}, &addr, &rules) + require.NoError(t, err, "core.IntrinsicGas(%#x, nil, false, ...)", data) + if addr == handler.addr { + gas += handlerGas + } + + tx := types.MustSignNewTx(key, signer, &types.LegacyTx{ + Nonce: uint64(ui), + To: &addr, + Data: data, + Gas: gas, + }) + txs = append(txs, tx) + + wantR := &types.Receipt{ + Status: types.ReceiptStatusSuccessful, + TxHash: tx.Hash(), + GasUsed: gas, + TransactionIndex: ui, + } + if addr != handler.addr { + wantReturnData = append(wantReturnData, []byte{}) + } else { + rec := &recorded{ + Common: commonData{ + HeaderExtra: slices.Clone(header.Extra), + }, + TxData: tx.Data(), + } + wantReturnData = append(wantReturnData, rec.precompileReturnData()) + + want := rec.asLog() + + want.Address = handler.addr + want.TxHash = tx.Hash() + want.TxIndex = ui + + wantR.Logs = []*types.Log{want} + } + wantReceipts = append(wantReceipts, wantR) + } + + block := types.NewBlock(header, txs, nil, nil, trie.NewStackTrie(nil)) + require.NoError(t, sut.StartBlock(state, rules, block), "StartBlock()") + + pool := core.GasPool(math.MaxUint64) + var receipts types.Receipts + for i, tx := range txs { + state.SetTxContext(tx.Hash(), i) + + t.Run("precompile_return_data", func(t *testing.T) { + // Although [core.ApplyTransaction] is used to get receipts, it + // doesn't provide access to return data. We therefore *also* use + // [core.ApplyMessage] but MUST avoid repeating the same state + // transition as it would fail the second time. + id := evm.StateDB.Snapshot() + t.Cleanup(func() { + evm.StateDB.RevertToSnapshot(id) + }) + + msg, err := core.TransactionToMessage(tx, signer, big.NewInt(0)) + require.NoError(t, err, "core.TransactionToMessage()") + + got, err := core.ApplyMessage(evm, msg, &pool) + require.NoError(t, err, "core.ApplyMessage()") + if diff := cmp.Diff(wantReturnData[i], got.ReturnData, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("Return data from precompile (-want +got):\n%s", diff) + } + }) + + var usedGas uint64 + receipt, err := core.ApplyTransaction( + evm.ChainConfig(), + ethtest.DummyChainContext(), + &block.Header().Coinbase, + &pool, + state, + block.Header(), + tx, + &usedGas, + vm.Config{}, + ) + require.NoError(t, err, "ApplyTransaction([%d])", i) + receipts = append(receipts, receipt) + } + sut.FinishBlock(state, block, receipts) + + if diff := cmp.Diff(wantReceipts, handler.gotReceipts, ignore); diff != "" { + t.Errorf("%T diff (-want +got):\n%s", receipts, diff) + } +} + +type expensive struct { + gasCost uint64 +} + +func (expensive) BeforeBlock(libevm.StateReader, *types.Header) int { return 0 } +func (e expensive) ShouldProcess(IndexedTx, int) (do bool, gas uint64) { return true, e.gasCost } +func (expensive) Prefetch(libevm.StateReader, IndexedTx, int) int { return 0 } +func (expensive) Process(libevm.StateReader, IndexedTx, int, int) int { return 0 } +func (expensive) PostProcess(int, Results[int]) int { return 0 } +func (expensive) AfterBlock(StateDB, int, *types.Block, types.Receipts) {} + +func TestTotalCost(t *testing.T) { + tx := types.NewTx(&types.LegacyTx{ + To: &common.Address{}, + Gas: params.TxGas, + }) + b := types.NewBlock( + &types.Header{Number: big.NewInt(0)}, + types.Transactions{tx}, + nil, nil, + trie.NewStackTrie(nil), + ) + rules := params.MergedTestChainConfig.Rules(big.NewInt(0), true, 0) + _, _, sdb := ethtest.NewEmptyStateDB(t) + + tests := []struct { + costs []uint64 + want uint64 + }{ + { + costs: []uint64{1}, + want: 1, + }, + { + costs: []uint64{1, 0}, + want: 1, + }, + { + costs: []uint64{1, 1}, + want: 2, + }, + { + costs: []uint64{math.MaxUint64 - 42, 41}, + want: math.MaxUint64 - 1, + }, + { + costs: []uint64{math.MaxUint64 - 42, 43}, + want: math.MaxUint64, + }, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("%d", tt.costs), func(t *testing.T) { + p := New(1, 1) + t.Cleanup(p.Close) + + for _, c := range tt.costs { + AddHandler(p, expensive{gasCost: c}) + } + require.NoError(t, p.StartBlock(sdb, rules, b), "StartBlock()") + t.Cleanup(func() { p.FinishBlock(sdb, b, nil) }) + + got, err := p.PreprocessingGasCharge(tx.Hash()) + if err != nil || got != tt.want { + t.Errorf("PreprocessingGasCharge() got (%d, %v); want (%d, nil)", got, err, tt.want) + } + }) + } +} + +// TODO(arr4n) unit test for [AddPrecompile] unhappy paths. diff --git a/libevm/precompiles/parallel/precompile.go b/libevm/precompiles/parallel/precompile.go new file mode 100644 index 0000000000..b4436b510d --- /dev/null +++ b/libevm/precompiles/parallel/precompile.go @@ -0,0 +1,46 @@ +// Copyright 2025-2026 the libevm authors. +// +// The libevm additions to go-ethereum are free software: you can redistribute +// them and/or modify them under the terms of the GNU Lesser General Public License +// as published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The libevm additions are distributed in the hope that they will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see +// . + +package parallel + +import "github.com/ava-labs/libevm/core/vm" + +// PrecompileResult is the interface required for a [Handler] to be converted +// into a [vm.PrecompiledStatefulContract]. +type PrecompileResult interface { + // PrecompileOutput's arguments match those of + // [vm.PrecompiledStatefulContract]. It MUST NOT re-charge the `Gas()` + // amount returned by the [Handler], but MAY charge for other computation as + // necessary. + PrecompileOutput(vm.PrecompileEnvironment, []byte) ([]byte, error) +} + +// AddAsPrecompile is equivalent to [AddHandler] except that the returned +// function is a [vm.PrecompiledStatefulContract] instead of a raw result +// fetcher. If the function returned by [AddHandler] returns `false` then the +// precompile returns [vm.ErrExecutionReverted]. +func AddAsPrecompile[CD, D any, R PrecompileResult, A any](p *Processor, h Handler[CD, D, R, A]) vm.PrecompiledStatefulContract { + results := AddHandler(p, h) + + return func(env vm.PrecompileEnvironment, input []byte) ([]byte, error) { + res, ok := results(env.ReadOnlyState().TxIndex()) + if !ok { + // TODO(arr4n) add revert data to match a Solidity-style error + return nil, vm.ErrExecutionReverted + } + return res.Result.PrecompileOutput(env, input) + } +}