mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-28 17:27:36 +00:00
feat: parallel package for precompile pre-processing (#228)
## Why this should be merged EVM parallel co-processors that interface with the regular transaction path via precompiles. ## How this works Introduces the `parallel.Processor`, which orchestrates a set of `parallel.Handler`s. Each `Handler` performs arbitrary, strongly typed processing of any sub-set of transactions in a block and makes its results available to a precompile and/or a post-block method for persisting state. Although stateful, `Handler`s can only read the pre-block and post-block state, which isolates them from conflicts with the regular transaction path. There is deliberately no support for a precompile to "write" to a `Handler`, only to "read". This is because the transaction might still revert, which would also have to be communicated to the `Handler`, resulting in unnecessary complexity. Logs/events are the recommended approach for precompile -> `Handler` communication, to be read from the `types.Receipts` at the end of the block. ## How this was tested Integration tests covering: 1. Selection of transactions to process + end-to-end plumbing of data through a `Handler`. 2. Registration as a precompile, exercised with actual transaction processing, and demonstrating log + return-data correctness. --------- Signed-off-by: Arran Schlosberg <519948+ARR4N@users.noreply.github.com>
This commit is contained in:
parent
6804c3c0f6
commit
98a792673a
6 changed files with 1307 additions and 0 deletions
|
|
@ -56,6 +56,9 @@ type StateReader interface {
|
|||
|
||||
AddressInAccessList(addr common.Address) bool
|
||||
SlotInAccessList(addr common.Address, slot common.Hash) (addressOk bool, slotOk bool)
|
||||
|
||||
TxHash() common.Hash
|
||||
TxIndex() int
|
||||
}
|
||||
|
||||
// AddressContext carries addresses available to contexts such as calls and
|
||||
|
|
|
|||
52
libevm/precompiles/parallel/eventual.go
Normal file
52
libevm/precompiles/parallel/eventual.go
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
// Copyright 2025-2026 the libevm authors.
|
||||
//
|
||||
// The libevm additions to go-ethereum are free software: you can redistribute
|
||||
// them and/or modify them under the terms of the GNU Lesser General Public License
|
||||
// as published by the Free Software Foundation, either version 3 of the License,
|
||||
// or (at your option) any later version.
|
||||
//
|
||||
// The libevm additions are distributed in the hope that they will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
|
||||
// General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see
|
||||
// <http://www.gnu.org/licenses/>.
|
||||
|
||||
package parallel
|
||||
|
||||
// An eventual type holds a value that is set at some unknown point in the
|
||||
// future and used, possibly concurrently, by one or more peekers or a single
|
||||
// taker (together, "getters"). The zero value is NOT ready for use.
|
||||
type eventual[T any] struct {
|
||||
ch chan T
|
||||
}
|
||||
|
||||
// eventually returns a new eventual value.
|
||||
func eventually[T any]() eventual[T] {
|
||||
return eventual[T]{
|
||||
ch: make(chan T, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// put sets the value, unblocking any current and future getters. put itself is
|
||||
// non-blocking, however it is NOT possible to overwrite the value without an
|
||||
// intervening call to [eventual.take].
|
||||
func (e eventual[T]) put(v T) {
|
||||
e.ch <- v
|
||||
}
|
||||
|
||||
// peek returns the value after making it available for other getters. Although
|
||||
// the act of peeking is threadsafe, the returned value might not be.
|
||||
func (e eventual[T]) peek() T {
|
||||
v := <-e.ch
|
||||
e.ch <- v
|
||||
return v
|
||||
}
|
||||
|
||||
// take returns the value and resets e to its default state as if immediately
|
||||
// after construction.
|
||||
func (e eventual[T]) take() T {
|
||||
return <-e.ch
|
||||
}
|
||||
297
libevm/precompiles/parallel/handler.go
Normal file
297
libevm/precompiles/parallel/handler.go
Normal file
|
|
@ -0,0 +1,297 @@
|
|||
// Copyright 2025-2026 the libevm authors.
|
||||
//
|
||||
// The libevm additions to go-ethereum are free software: you can redistribute
|
||||
// them and/or modify them under the terms of the GNU Lesser General Public License
|
||||
// as published by the Free Software Foundation, either version 3 of the License,
|
||||
// or (at your option) any later version.
|
||||
//
|
||||
// The libevm additions are distributed in the hope that they will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
|
||||
// General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see
|
||||
// <http://www.gnu.org/licenses/>.
|
||||
|
||||
package parallel
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ava-labs/libevm/common"
|
||||
"github.com/ava-labs/libevm/core/types"
|
||||
"github.com/ava-labs/libevm/core/vm"
|
||||
"github.com/ava-labs/libevm/libevm"
|
||||
"github.com/ava-labs/libevm/libevm/stateconf"
|
||||
)
|
||||
|
||||
// A Handler is responsible for processing [types.Transactions] in an
|
||||
// embarrassingly parallel fashion. It is the responsibility of the Handler to
|
||||
// determine whether this is possible, typically only so if one of the following
|
||||
// is true with respect to a precompile associated with the Handler:
|
||||
//
|
||||
// 1. The destination address is that of the precompile; or
|
||||
// 2. At least one [types.AccessTuple] references the precompile's address.
|
||||
//
|
||||
// Scenario (2) allows precompile access to be determined through inspection of
|
||||
// the [types.Transaction] alone, without the need for execution.
|
||||
//
|
||||
// A [Processor] will orchestrate calling of Handler methods as follows:
|
||||
//
|
||||
// | - Prefetch(i) - Process(i)
|
||||
// | / /
|
||||
// | BeforeBlock() - ShouldProcess(0..n) - PostProcess() - AfterBlock()
|
||||
// | \ \
|
||||
// | - Prefetch(j) - Process(j)
|
||||
//
|
||||
// IntRA-Handler guarantees:
|
||||
//
|
||||
// 1. BeforeBlock() precedes all ShouldProcess() calls.
|
||||
// 2. ShouldProcess() calls are sequential, in the same order as transactions in the block.
|
||||
// 3. Prefetch() precedes the respective Process() call. Not called if ShouldProcess() returns false.
|
||||
// 4. PostProcess() precedes AfterBlock().
|
||||
//
|
||||
// Note that PostProcess() MAY be called at any time after BeforeBlock(), and
|
||||
// implementations MUST synchronise with Process() by using the [Results]. There
|
||||
// are no intER-Handler guarantees except that AfterBlock() methods are called
|
||||
// sequentially, in the same order as they were registered with [AddHandler].
|
||||
//
|
||||
// All [libevm.StateReader] instances are opened to the state at the beginning
|
||||
// of the block. The [StateDB] is the same one used to execute the block, before
|
||||
// being committed, and MAY be written to.
|
||||
type Handler[CommonData, Data, Result, Aggregated any] interface {
|
||||
// BeforeBlock is called before all calls to ShouldProcess() on this
|
||||
// Handler.
|
||||
BeforeBlock(libevm.StateReader, *types.Header) CommonData
|
||||
// ShouldProcess reports whether the Handler SHOULD receive the transaction
|
||||
// for processing and, if so, how much gas to charge. Processing is
|
||||
// performed i.f.f. the returned boolean is true and there is sufficient gas
|
||||
// limit to cover intrinsic gas for all Handlers that returned true. If
|
||||
// there is insufficient gas for processing then the transaction will result
|
||||
// in [vm.ErrOutOfGas] as long as the [Processor] is registered with
|
||||
// [vm.RegisterHooks] as a [vm.Preprocessor].
|
||||
//
|
||||
// Implementations MUST NOT perform any meaningful computation
|
||||
// but MAY perform inter-transaction checks such as, for example,
|
||||
// deduplication of work.
|
||||
ShouldProcess(IndexedTx, CommonData) (do bool, gas uint64)
|
||||
// Prefetch is called before the respective call to Process() on this
|
||||
// Handler. It MUST NOT perform any meaningful computation beyond what is
|
||||
// necessary to determine the necessary state to propagate to Process().
|
||||
Prefetch(libevm.StateReader, IndexedTx, CommonData) Data
|
||||
// Process is responsible for performing all meaningful, per-transaction
|
||||
// computation. It receives the common data returned by the single call to
|
||||
// BeforeBlock() as well as the data from the respective call to Prefetch().
|
||||
// The returned result is propagated to PostProcess() and any calls to the
|
||||
// function returned by [AddHandler].
|
||||
//
|
||||
// NOTE: if the result is exposed to the EVM via a precompile then said
|
||||
// precompile will block until Process() returns. While this guarantees the
|
||||
// availability of pre-processed results, it is also the hot path for EVM
|
||||
// transactions.
|
||||
Process(libevm.StateReader, IndexedTx, CommonData, Data) Result
|
||||
// PostProcess is called concurrently with all calls to Process(). It allows
|
||||
// for online aggregation of results into a format ready for writing to
|
||||
// state.
|
||||
//
|
||||
// NOTE: although PostProcess() MAY perform computation, it will block the
|
||||
// calling of AfterBlock() and hence also the execution of the next block.
|
||||
PostProcess(CommonData, Results[Result]) Aggregated
|
||||
// AfterBlock is called after PostProcess() returns and all regular EVM
|
||||
// transaction processing is complete. It MUST NOT perform any meaningful
|
||||
// computation beyond what is necessary to (a) parse receipts, and (b)
|
||||
// persist aggregated results.
|
||||
AfterBlock(StateDB, Aggregated, *types.Block, types.Receipts)
|
||||
}
|
||||
|
||||
// An IndexedTx couples a [types.Transaction] with its index in a block.
|
||||
type IndexedTx struct {
|
||||
Index int
|
||||
*types.Transaction
|
||||
}
|
||||
|
||||
// Results provides mechanisms for blocking on the output of [Handler.Process].
|
||||
type Results[R any] struct {
|
||||
WaitForAll func()
|
||||
TxOrder, ProcessOrder <-chan TxResult[R]
|
||||
}
|
||||
|
||||
// A TxResult couples an [IndexedTx] with its respective result from
|
||||
// [Handler.Process].
|
||||
type TxResult[R any] struct {
|
||||
Tx IndexedTx
|
||||
Result R
|
||||
}
|
||||
|
||||
// StateDB is the subset of [state.StateDB] methods that MAY be called by
|
||||
// [Handler.AfterBlock].
|
||||
type StateDB interface {
|
||||
libevm.StateReader
|
||||
SetState(_ common.Address, key, val common.Hash, _ ...stateconf.StateDBStateOption)
|
||||
}
|
||||
|
||||
var _ handler = (*wrapper[any, any, any, any])(nil)
|
||||
|
||||
// A wrapper exposes the generic functionality of a [Handler] in a non-generic
|
||||
// manner, allowing [Processor] to be free of type parameters.
|
||||
type wrapper[CD, D, R, A any] struct {
|
||||
Handler[CD, D, R, A]
|
||||
|
||||
totalTxsInBlock int
|
||||
txsBeingProcessed sync.WaitGroup
|
||||
|
||||
common eventual[CD]
|
||||
data []eventual[D]
|
||||
|
||||
results []eventual[result[R]]
|
||||
whenProcessed, txOrder chan TxResult[R]
|
||||
|
||||
aggregated eventual[A]
|
||||
}
|
||||
|
||||
// AddHandler registers the [Handler] with the [Processor] and returns a
|
||||
// function to fetch the [TxResult] for the i'th transaction passed to
|
||||
// [Processor.StartBlock].
|
||||
//
|
||||
// The returned function until the respective transaction has had its result
|
||||
// processed, and then returns the value returned by the [Handler]. The returned
|
||||
// boolean will be false if no processing occurred, either because the [Handler]
|
||||
// indicated as such or because the transaction supplied insufficient gas.
|
||||
//
|
||||
// Multiple calls to Result with the same argument are allowed. Callers MUST NOT
|
||||
// charge the gas price for preprocessing as this is handled by
|
||||
// [Processor.PreprocessingGasCharge] if registered as a [vm.Preprocessor].
|
||||
//
|
||||
// Within the scope of a given block, the same value will be returned by each
|
||||
// call with the same argument, such that if R is a pointer then modifications
|
||||
// will persist between calls. However, the caller does NOT have mutually
|
||||
// exclusive access to the [TxResult] so SHOULD NOT modify it, especially since
|
||||
// the result MAY also be accessed by [Handler.PostProcess], with no ordering
|
||||
// guarantees.
|
||||
func AddHandler[CD, D, R, A any](p *Processor, h Handler[CD, D, R, A]) func(txIndex int) (TxResult[R], bool) {
|
||||
w := &wrapper[CD, D, R, A]{
|
||||
Handler: h,
|
||||
common: eventually[CD](),
|
||||
aggregated: eventually[A](),
|
||||
}
|
||||
p.handlers = append(p.handlers, w)
|
||||
return w.result
|
||||
}
|
||||
|
||||
func (w *wrapper[CD, D, R, A]) beforeBlock(sdb libevm.StateReader, b *types.Block) {
|
||||
w.totalTxsInBlock = len(b.Transactions())
|
||||
// We can reuse the channels already in the data and results slices because
|
||||
// they're emptied by [wrapper.process] and [wrapper.finishBlock]
|
||||
// respectively.
|
||||
for i := len(w.results); i < w.totalTxsInBlock; i++ {
|
||||
w.data = append(w.data, eventually[D]())
|
||||
w.results = append(w.results, eventually[result[R]]())
|
||||
}
|
||||
|
||||
go func() {
|
||||
// goroutine guaranteed to have completed by the time a respective
|
||||
// getter unblocks (i.e. in any call to [wrapper.prefetch]).
|
||||
w.common.put(w.BeforeBlock(sdb, types.CopyHeader(b.Header())))
|
||||
}()
|
||||
}
|
||||
|
||||
func (w *wrapper[CD, D, R, A]) shouldProcess(tx IndexedTx) (do bool, gas uint64) {
|
||||
return w.Handler.ShouldProcess(tx, w.common.peek())
|
||||
}
|
||||
|
||||
func (w *wrapper[CD, D, R, A]) beforeWork(jobs int) {
|
||||
w.txsBeingProcessed.Add(jobs)
|
||||
w.whenProcessed = make(chan TxResult[R], jobs)
|
||||
w.txOrder = make(chan TxResult[R], jobs)
|
||||
go func() {
|
||||
w.txsBeingProcessed.Wait()
|
||||
close(w.whenProcessed)
|
||||
}()
|
||||
}
|
||||
|
||||
func (w *wrapper[CD, D, R, A]) prefetch(sdb libevm.StateReader, job *prefetch) {
|
||||
w.data[job.tx.Index].put(w.Prefetch(sdb, job.tx, w.common.peek()))
|
||||
}
|
||||
|
||||
func (w *wrapper[CD, D, R, A]) process(sdb libevm.StateReader, job *process) {
|
||||
defer w.txsBeingProcessed.Done()
|
||||
|
||||
idx := job.tx.Index
|
||||
val := w.Process(sdb, job.tx, w.common.peek(), w.data[idx].take())
|
||||
r := result[R]{
|
||||
tx: job.tx,
|
||||
val: &val,
|
||||
}
|
||||
w.results[idx].put(r)
|
||||
w.whenProcessed <- TxResult[R]{
|
||||
Tx: job.tx,
|
||||
Result: val,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *wrapper[CD, D, R, A]) nullResult(job *job) {
|
||||
w.results[job.tx.Index].put(result[R]{
|
||||
tx: job.tx,
|
||||
val: nil,
|
||||
})
|
||||
}
|
||||
|
||||
func (w *wrapper[CD, D, R, A]) result(i int) (TxResult[R], bool) {
|
||||
r := w.results[i].peek()
|
||||
|
||||
txr := TxResult[R]{
|
||||
Tx: r.tx,
|
||||
}
|
||||
if r.val == nil {
|
||||
return txr, false
|
||||
}
|
||||
txr.Result = *r.val
|
||||
return txr, true
|
||||
}
|
||||
|
||||
func (w *wrapper[CD, D, R, A]) postProcess() {
|
||||
go func() {
|
||||
defer close(w.txOrder)
|
||||
for i := range w.totalTxsInBlock {
|
||||
r, ok := w.result(i)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
w.txOrder <- r
|
||||
}
|
||||
}()
|
||||
|
||||
res := Results[R]{
|
||||
WaitForAll: w.txsBeingProcessed.Wait,
|
||||
TxOrder: w.txOrder,
|
||||
ProcessOrder: w.whenProcessed,
|
||||
}
|
||||
w.aggregated.put(w.PostProcess(w.common.peek(), res))
|
||||
}
|
||||
|
||||
func (w *wrapper[CD, D, R, A]) finishBlock(sdb vm.StateDB, b *types.Block, rs types.Receipts) {
|
||||
w.AfterBlock(sdb, w.aggregated.take(), b, rs)
|
||||
|
||||
// [wrapper.postProcess] is guaranteed to have finished because it sets
|
||||
// [wrapper.aggregated], from which we have just read. However
|
||||
// [Handler.PostProcess] is under no obligation to block on anything, and
|
||||
// the goroutine filling [wrapper.txOrder] might still be reading results.
|
||||
// We therefore guarantee its completion before "getting and keeping" all of
|
||||
// [wrapper.results] otherwise said goroutine can leak.
|
||||
for range w.txOrder {
|
||||
// Nobody needs these anymore, but we need to know that the channel has
|
||||
// been closed.
|
||||
}
|
||||
// Although we know this will unblock effectively immediately, it's safer to
|
||||
// verify the intuition than to rely on complex reasoning.
|
||||
w.txsBeingProcessed.Wait()
|
||||
|
||||
w.common.take()
|
||||
for _, v := range w.results[:w.totalTxsInBlock] {
|
||||
// Every result channel is guaranteed to have some value in its buffer
|
||||
// because [Processor.BeforeBlock] either sends a nil *R or it
|
||||
// dispatches a job, which will send a non-nil *R.
|
||||
v.take()
|
||||
}
|
||||
}
|
||||
333
libevm/precompiles/parallel/parallel.go
Normal file
333
libevm/precompiles/parallel/parallel.go
Normal file
|
|
@ -0,0 +1,333 @@
|
|||
// Copyright 2025-2026 the libevm authors.
|
||||
//
|
||||
// The libevm additions to go-ethereum are free software: you can redistribute
|
||||
// them and/or modify them under the terms of the GNU Lesser General Public License
|
||||
// as published by the Free Software Foundation, either version 3 of the License,
|
||||
// or (at your option) any later version.
|
||||
//
|
||||
// The libevm additions are distributed in the hope that they will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
|
||||
// General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see
|
||||
// <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package parallel provides functionality for precompiled contracts with
|
||||
// lifespans of an entire block.
|
||||
package parallel
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/ava-labs/libevm/common"
|
||||
"github.com/ava-labs/libevm/core"
|
||||
"github.com/ava-labs/libevm/core/state"
|
||||
"github.com/ava-labs/libevm/core/types"
|
||||
"github.com/ava-labs/libevm/core/vm"
|
||||
"github.com/ava-labs/libevm/libevm"
|
||||
"github.com/ava-labs/libevm/params"
|
||||
)
|
||||
|
||||
// A handler is the non-generic equivalent of a [Handler], exposed by [wrapper].
|
||||
type handler interface {
|
||||
beforeBlock(libevm.StateReader, *types.Block)
|
||||
shouldProcess(IndexedTx) (do bool, gas uint64)
|
||||
beforeWork(jobs int)
|
||||
prefetch(libevm.StateReader, *prefetch)
|
||||
nullResult(*job)
|
||||
process(libevm.StateReader, *process)
|
||||
postProcess()
|
||||
finishBlock(vm.StateDB, *types.Block, types.Receipts)
|
||||
}
|
||||
|
||||
// A Processor orchestrates dispatch and collection of results from one or more
|
||||
// [Handler] instances.
|
||||
type Processor struct {
|
||||
handlers []handler
|
||||
|
||||
workers sync.WaitGroup
|
||||
stateShare stateDBSharer
|
||||
prefetch chan *prefetch
|
||||
process chan *process
|
||||
|
||||
txGas map[common.Hash]uint64
|
||||
}
|
||||
|
||||
type (
|
||||
// job is an alias to allow it to be used as an "underlying type" for
|
||||
// generic type parameters, while prefetch and process are explicitly *not*
|
||||
// aliases, to guarantee that they aren't considered equivalent.
|
||||
job = struct {
|
||||
handler handler
|
||||
tx IndexedTx
|
||||
}
|
||||
prefetch job
|
||||
process job
|
||||
)
|
||||
|
||||
type result[T any] struct {
|
||||
tx IndexedTx
|
||||
val *T
|
||||
}
|
||||
|
||||
// New constructs a new [Processor] with the specified number of concurrent
|
||||
// prefetching and processing workers. As prefetching is typically IO-bound, it
|
||||
// is reasonable to have more prefetchers than processors. The number of
|
||||
// processors SHOULD be determined from GOMAXPROCS. Pipelining in such a fashion
|
||||
// stops prefetching for later transactions being blocked by earlier,
|
||||
// long-running processing; see the respective methods on [Handler] for more
|
||||
// context.
|
||||
//
|
||||
// [Processor.Close] MUST be called after the final call to
|
||||
// [Processor.FinishBlock] to avoid leaking goroutines.
|
||||
func New(prefetchers, processors int) *Processor {
|
||||
prefetchers = max(prefetchers, 1)
|
||||
processors = max(processors, 1)
|
||||
workers := prefetchers + processors
|
||||
|
||||
p := &Processor{
|
||||
stateShare: stateDBSharer{
|
||||
workers: workers,
|
||||
available: make(chan struct{}),
|
||||
sdb: make(chan *state.StateDB, 1),
|
||||
},
|
||||
prefetch: make(chan *prefetch),
|
||||
process: make(chan *process),
|
||||
txGas: make(map[common.Hash]uint64),
|
||||
}
|
||||
|
||||
p.workers.Add(workers) // for shutdown via [Processor.Close]
|
||||
p.stateShare.wg.Add(workers) // for readiness of [Processor.worker] loops
|
||||
for range prefetchers {
|
||||
go worker(p, p.prefetch, func(sdb libevm.StateReader, job *prefetch) {
|
||||
job.handler.prefetch(sdb, job)
|
||||
})
|
||||
}
|
||||
for range processors {
|
||||
go worker(p, p.process, func(sdb libevm.StateReader, job *process) {
|
||||
job.handler.process(sdb, job)
|
||||
})
|
||||
}
|
||||
p.stateShare.wg.Wait()
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// A stateDBSharer allows concurrent workers to make copies of a primary
|
||||
// database. When the `available` channel is closed, all workers call
|
||||
// [state.StateDB.Copy] then signal completion on the [sync.WaitGroup]. The
|
||||
// channel is replaced for each round of distribution.
|
||||
type stateDBSharer struct {
|
||||
available chan struct{}
|
||||
sdb chan *state.StateDB
|
||||
workers int
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (s *stateDBSharer) distribute(sdb *state.StateDB) {
|
||||
ch := s.available // already copied by [Processor.worker], which is waiting for it to close
|
||||
s.available = make(chan struct{}) // will be copied, ready for the next distribution
|
||||
|
||||
s.sdb <- sdb
|
||||
s.wg.Add(s.workers)
|
||||
close(ch) // Take a moment to enjoy the symmetry :)
|
||||
s.wg.Wait()
|
||||
<-s.sdb
|
||||
}
|
||||
|
||||
func worker[J ~job](p *Processor, work <-chan *J, do func(libevm.StateReader, *J)) {
|
||||
defer p.workers.Done()
|
||||
|
||||
var sdb *state.StateDB
|
||||
share := &p.stateShare
|
||||
stateAvailable := share.available
|
||||
// Without this signal of readiness, a premature call to
|
||||
// [Processor.StartBlock] could replace `share.nextAvailable` before we've
|
||||
// copied it.
|
||||
share.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stateAvailable: // guaranteed at the beginning of each block
|
||||
// [state.StateDB.Copy] is a complex method that isn't explicitly
|
||||
// documented as being threadsafe.
|
||||
sdb = (<-share.sdb).Copy()
|
||||
share.sdb <- sdb // no need to return the original as each worker copies
|
||||
|
||||
stateAvailable = share.available
|
||||
share.wg.Done()
|
||||
|
||||
case w, ok := <-work:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
do(sdb, w)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts down the [Processor], after which it can no longer be used.
|
||||
func (p *Processor) Close() {
|
||||
close(p.prefetch)
|
||||
close(p.process)
|
||||
p.workers.Wait()
|
||||
}
|
||||
|
||||
// StartBlock dispatches transactions to every [Handler] but returns immediately
|
||||
// after performing preliminary setup. It MUST be paired with a call to
|
||||
// [Processor.FinishBlock], without overlap of blocks.
|
||||
func (p *Processor) StartBlock(sdb *state.StateDB, rules params.Rules, b *types.Block) error {
|
||||
// The distribution mechanism copies the StateDB so we don't need to do it
|
||||
// here, but [wrapper.beforeBlock] doesn't make its own copy. Note that even
|
||||
// reading from a [state.StateDB] is not threadsafe.
|
||||
p.stateShare.distribute(sdb)
|
||||
for _, h := range p.handlers {
|
||||
h.beforeBlock(sdb.Copy(), b)
|
||||
}
|
||||
|
||||
txs := b.Transactions()
|
||||
jobs := make([]*job, 0, len(p.handlers)*len(txs))
|
||||
workloads := make([]int, len(p.handlers))
|
||||
|
||||
for txIdx, rawTx := range txs {
|
||||
tx := IndexedTx{
|
||||
Index: txIdx,
|
||||
Transaction: rawTx,
|
||||
}
|
||||
|
||||
do, err := p.shouldProcess(tx, rules) // MUST NOT be concurrent within a Handler
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i, h := range p.handlers {
|
||||
j := &job{
|
||||
tx: tx,
|
||||
handler: h,
|
||||
}
|
||||
if !do[i] {
|
||||
h.nullResult(j)
|
||||
continue
|
||||
}
|
||||
workloads[i]++
|
||||
jobs = append(jobs, j)
|
||||
}
|
||||
}
|
||||
|
||||
for i, w := range workloads {
|
||||
p.handlers[i].beforeWork(w)
|
||||
}
|
||||
// All of the following goroutines are dependent on the one(s) preceding
|
||||
// them, while [wrapper.finishBlock] is dependent on [wrapper.postProcess].
|
||||
// The return of [Processor.FinishBlock] is therefore a guarantee of the end
|
||||
// of the lifespans of all of these goroutines.
|
||||
go func() {
|
||||
for _, j := range jobs {
|
||||
p.prefetch <- (*prefetch)(j)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
for _, j := range jobs {
|
||||
p.process <- (*process)(j)
|
||||
}
|
||||
}()
|
||||
for _, h := range p.handlers {
|
||||
go h.postProcess()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FinishBlock propagates its arguments to every [Handler] and resets the
|
||||
// [Processor] to a state ready for the next block. A return from FinishBlock
|
||||
// guarantees that all dispatched work from the respective call to
|
||||
// [Processor.StartBlock] has been completed.
|
||||
func (p *Processor) FinishBlock(sdb vm.StateDB, b *types.Block, rs types.Receipts) {
|
||||
// [Handler.FinishBlock] is allowed to write to state, so these MUST NOT be
|
||||
// concurrent.
|
||||
for _, h := range p.handlers {
|
||||
h.finishBlock(sdb, b, rs)
|
||||
}
|
||||
for tx := range p.txGas {
|
||||
delete(p.txGas, tx)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Processor) shouldProcess(tx IndexedTx, rules params.Rules) (process []bool, retErr error) {
|
||||
// An explicit 0 is necessary to avoid [Processor.PreprocessingGasCharge]
|
||||
// returning [ErrTxUnknown].
|
||||
p.txGas[tx.Hash()] = 0
|
||||
|
||||
process = make([]bool, len(p.handlers))
|
||||
var totalCost uint64
|
||||
for i, h := range p.handlers {
|
||||
do, cost := h.shouldProcess(tx)
|
||||
if !do {
|
||||
continue
|
||||
}
|
||||
process[i] = true
|
||||
// It's safe to cap total cost at [math.MaxUint64] because intrinsic gas
|
||||
// is always non-zero and the tx would therefore OOG. Not that we could
|
||||
// reasonably expect such high gas consumption though ¯\_(ツ)_/¯
|
||||
totalCost += min(cost, math.MaxUint64-totalCost)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if retErr == nil {
|
||||
p.txGas[tx.Hash()] = totalCost
|
||||
}
|
||||
}()
|
||||
|
||||
spent, err := txIntrinsicGas(tx.Transaction, &rules)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("calculating intrinsic gas of %#x: %v", tx.Hash(), err)
|
||||
}
|
||||
if spent > tx.Gas() {
|
||||
// If this happens then consensus has a bug because the tx shouldn't
|
||||
// have been included. We include the check, however, for completeness
|
||||
// as we would otherwise underflow below.
|
||||
return nil, core.ErrIntrinsicGas
|
||||
}
|
||||
if remain := tx.Gas() - spent; remain < totalCost {
|
||||
for i := range process {
|
||||
process[i] = false
|
||||
}
|
||||
}
|
||||
return process, nil
|
||||
}
|
||||
|
||||
func txIntrinsicGas(tx *types.Transaction, rules *params.Rules) (uint64, error) {
|
||||
return intrinsicGas(tx.Data(), tx.AccessList(), tx.To(), rules)
|
||||
}
|
||||
|
||||
func intrinsicGas(data []byte, access types.AccessList, txTo *common.Address, rules *params.Rules) (uint64, error) {
|
||||
create := txTo == nil
|
||||
return core.IntrinsicGas(
|
||||
data,
|
||||
access,
|
||||
create,
|
||||
rules.IsHomestead,
|
||||
rules.IsIstanbul, // EIP-2028
|
||||
rules.IsShanghai, // EIP-3860
|
||||
)
|
||||
}
|
||||
|
||||
// ErrTxUnknown is returned by [Processor.PreprocessingGasCharge] if it is
|
||||
// called with a transaction hash that wasn't in the last block passed to
|
||||
// [Processor.StartBlock].
|
||||
var ErrTxUnknown = errors.New("transaction unknown by parallel preprocessor")
|
||||
|
||||
// PreprocessingGasCharge implements the [vm.Preprocessor] interface and MUST be
|
||||
// registered via [vm.RegisterHooks] to ensure proper gas accounting.
|
||||
func (p *Processor) PreprocessingGasCharge(tx common.Hash) (uint64, error) {
|
||||
g, ok := p.txGas[tx]
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("%w: %v", ErrTxUnknown, tx)
|
||||
}
|
||||
return g, nil
|
||||
}
|
||||
|
||||
var _ vm.Preprocessor = (*Processor)(nil)
|
||||
576
libevm/precompiles/parallel/parallel_test.go
Normal file
576
libevm/precompiles/parallel/parallel_test.go
Normal file
|
|
@ -0,0 +1,576 @@
|
|||
// Copyright 2025-2026 the libevm authors.
|
||||
//
|
||||
// The libevm additions to go-ethereum are free software: you can redistribute
|
||||
// them and/or modify them under the terms of the GNU Lesser General Public License
|
||||
// as published by the Free Software Foundation, either version 3 of the License,
|
||||
// or (at your option) any later version.
|
||||
//
|
||||
// The libevm additions are distributed in the hope that they will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
|
||||
// General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see
|
||||
// <http://www.gnu.org/licenses/>.
|
||||
|
||||
package parallel
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/big"
|
||||
"math/rand/v2"
|
||||
"slices"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/holiman/uint256"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
|
||||
"github.com/ava-labs/libevm/common"
|
||||
"github.com/ava-labs/libevm/core"
|
||||
"github.com/ava-labs/libevm/core/types"
|
||||
"github.com/ava-labs/libevm/core/vm"
|
||||
"github.com/ava-labs/libevm/crypto"
|
||||
"github.com/ava-labs/libevm/libevm"
|
||||
"github.com/ava-labs/libevm/libevm/ethtest"
|
||||
"github.com/ava-labs/libevm/libevm/hookstest"
|
||||
"github.com/ava-labs/libevm/params"
|
||||
"github.com/ava-labs/libevm/trie"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
goleak.VerifyTestMain(m, goleak.IgnoreCurrent())
|
||||
}
|
||||
|
||||
type recorder struct {
|
||||
tb testing.TB
|
||||
|
||||
gas uint64
|
||||
addr common.Address
|
||||
blockKey, prefetchKey, processKey common.Hash
|
||||
|
||||
gotReceipts types.Receipts
|
||||
gotAggregated aggregated
|
||||
}
|
||||
|
||||
type aggregated struct {
|
||||
txOrder, processOrder []TxResult[recorded]
|
||||
}
|
||||
|
||||
type recorded struct {
|
||||
TxData []byte
|
||||
Prefetch, Process common.Hash
|
||||
Common commonData
|
||||
}
|
||||
|
||||
type commonData struct {
|
||||
HeaderExtra []byte
|
||||
BeforeBlockStateVal common.Hash
|
||||
}
|
||||
|
||||
func (r *recorder) BeforeBlock(sdb libevm.StateReader, h *types.Header) commonData {
|
||||
return commonData{
|
||||
HeaderExtra: slices.Clone(h.Extra),
|
||||
BeforeBlockStateVal: sdb.GetState(r.addr, r.blockKey),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recorder) ShouldProcess(tx IndexedTx, _ commonData) (bool, uint64) {
|
||||
// TODO(arr4n) test that the [commonData] received here is the same as that
|
||||
// returned by [recorder.BeforeBlock].
|
||||
if to := tx.To(); to != nil && *to == r.addr {
|
||||
return true, r.gas
|
||||
}
|
||||
return false, 0
|
||||
}
|
||||
|
||||
type prefetched struct {
|
||||
prefetchStateVal common.Hash
|
||||
common commonData
|
||||
}
|
||||
|
||||
func (r *recorder) Prefetch(sdb libevm.StateReader, tx IndexedTx, cd commonData) prefetched {
|
||||
return prefetched{
|
||||
common: cd,
|
||||
prefetchStateVal: sdb.GetState(r.addr, r.prefetchKey),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recorder) Process(sdb libevm.StateReader, tx IndexedTx, cd commonData, data prefetched) recorded {
|
||||
if diff := cmp.Diff(cd, data.common); diff != "" {
|
||||
r.tb.Errorf("Mismatched CommonData propagation to Handler methods; diff (-Process, +Prefetch):\n%s", diff)
|
||||
}
|
||||
|
||||
return recorded{
|
||||
TxData: slices.Clone(tx.Data()),
|
||||
Prefetch: data.prefetchStateVal,
|
||||
Process: sdb.GetState(r.addr, r.processKey),
|
||||
Common: cd,
|
||||
}
|
||||
}
|
||||
|
||||
var _ PrecompileResult = recorded{}
|
||||
|
||||
func (r recorded) PrecompileOutput(env vm.PrecompileEnvironment, input []byte) ([]byte, error) {
|
||||
l := r.asLog()
|
||||
l.Address = env.Addresses().EVMSemantic.Self
|
||||
env.StateDB().AddLog(l)
|
||||
return r.precompileReturnData(), nil
|
||||
}
|
||||
|
||||
func (r recorded) precompileReturnData() []byte {
|
||||
return slices.Concat(r.Common.HeaderExtra, []byte("|"), r.TxData)
|
||||
}
|
||||
|
||||
func (r recorded) asLog() *types.Log {
|
||||
return &types.Log{
|
||||
Topics: []common.Hash{r.Common.BeforeBlockStateVal, r.Prefetch, r.Process},
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recorder) PostProcess(cd commonData, res Results[recorded]) aggregated {
|
||||
// Although unnecessary because of the ranging over both channels, this just
|
||||
// demonstrates that it's non-blocking.
|
||||
defer res.WaitForAll()
|
||||
|
||||
var out aggregated
|
||||
for res := range res.TxOrder {
|
||||
out.txOrder = append(out.txOrder, res)
|
||||
}
|
||||
for res := range res.ProcessOrder {
|
||||
out.processOrder = append(out.processOrder, res)
|
||||
}
|
||||
|
||||
if len(out.txOrder) > 0 {
|
||||
if diff := cmp.Diff(cd, out.txOrder[0].Result.Common); diff != "" {
|
||||
r.tb.Errorf("Mismatched CommonData propagation to Handler methods; diff (-PostProcess, +Process):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (r *recorder) AfterBlock(_ StateDB, agg aggregated, _ *types.Block, rs types.Receipts) {
|
||||
r.gotReceipts = slices.Clone(rs)
|
||||
r.gotAggregated = agg
|
||||
}
|
||||
|
||||
func asHash(s string) (h common.Hash) {
|
||||
copy(h[:], []byte(s))
|
||||
return
|
||||
}
|
||||
|
||||
func TestProcessor(t *testing.T) {
|
||||
handler := &recorder{
|
||||
tb: t,
|
||||
addr: common.Address{'c', 'o', 'n', 'c', 'a', 't'},
|
||||
gas: 1e6,
|
||||
blockKey: asHash("block"),
|
||||
prefetchKey: asHash("prefetch"),
|
||||
processKey: asHash("process"),
|
||||
}
|
||||
p := New(8, 8)
|
||||
getResult := AddHandler(p, handler)
|
||||
t.Cleanup(p.Close)
|
||||
|
||||
type blockParams struct {
|
||||
numTxs int
|
||||
sendToAddrEvery, sufficientGasEvery int
|
||||
}
|
||||
|
||||
// Each set of params is effectively a test case, but they are all run on
|
||||
// the same [Processor].
|
||||
tests := []blockParams{
|
||||
{
|
||||
numTxs: 0,
|
||||
},
|
||||
{
|
||||
numTxs: 500,
|
||||
sendToAddrEvery: 7,
|
||||
sufficientGasEvery: 5,
|
||||
},
|
||||
{
|
||||
numTxs: 1_000,
|
||||
sendToAddrEvery: 7,
|
||||
sufficientGasEvery: 5,
|
||||
},
|
||||
{
|
||||
numTxs: 1_000,
|
||||
sendToAddrEvery: 11,
|
||||
sufficientGasEvery: 3,
|
||||
},
|
||||
{
|
||||
numTxs: 100,
|
||||
sendToAddrEvery: 1,
|
||||
sufficientGasEvery: 1,
|
||||
},
|
||||
{
|
||||
numTxs: 0,
|
||||
},
|
||||
}
|
||||
|
||||
rng := rand.New(rand.NewPCG(0, 0)) //nolint:gosec // Reproducibility is useful for testing
|
||||
for range 100 {
|
||||
tests = append(tests, blockParams{
|
||||
numTxs: rng.IntN(1000),
|
||||
sendToAddrEvery: 1 + rng.IntN(30),
|
||||
sufficientGasEvery: 1 + rng.IntN(30),
|
||||
})
|
||||
}
|
||||
|
||||
_, _, sdb := ethtest.NewEmptyStateDB(t)
|
||||
h := handler
|
||||
blockVal := asHash("block_val")
|
||||
sdb.SetState(h.addr, h.blockKey, blockVal)
|
||||
prefetchVal := asHash("prefetch_val")
|
||||
sdb.SetState(h.addr, h.prefetchKey, prefetchVal)
|
||||
processVal := asHash("process_val")
|
||||
sdb.SetState(h.addr, h.processKey, processVal)
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run("", func(t *testing.T) {
|
||||
t.Logf("%+v", tt)
|
||||
|
||||
var rules params.Rules
|
||||
txs := make(types.Transactions, tt.numTxs)
|
||||
wantProcessed := make([]bool, tt.numTxs)
|
||||
for i := range len(txs) {
|
||||
var (
|
||||
to common.Address
|
||||
extraGas uint64
|
||||
)
|
||||
|
||||
wantProcessed[i] = true
|
||||
if i%tt.sendToAddrEvery == 0 {
|
||||
to = handler.addr
|
||||
} else {
|
||||
wantProcessed[i] = false
|
||||
}
|
||||
if i%tt.sufficientGasEvery == 0 {
|
||||
extraGas = handler.gas
|
||||
} else {
|
||||
wantProcessed[i] = false
|
||||
}
|
||||
|
||||
data := binary.BigEndian.AppendUint64(nil, uint64(i)) //nolint:gosec // Known to be positive
|
||||
gas, err := intrinsicGas(data, types.AccessList{}, &handler.addr, &rules)
|
||||
require.NoError(t, err, "core.IntrinsicGas(%#x, nil, false, ...)", data)
|
||||
|
||||
txs[i] = types.NewTx(&types.LegacyTx{
|
||||
To: &to,
|
||||
Data: data,
|
||||
Gas: gas + extraGas,
|
||||
})
|
||||
}
|
||||
|
||||
extra := []byte("extra")
|
||||
block := types.NewBlock(&types.Header{Extra: extra}, txs, nil, nil, trie.NewStackTrie(nil))
|
||||
require.NoError(t, p.StartBlock(sdb, rules, block), "StartBlock()")
|
||||
|
||||
var wantPerTx []TxResult[recorded]
|
||||
for i, tx := range txs {
|
||||
wantOK := wantProcessed[i]
|
||||
|
||||
var want recorded
|
||||
if wantOK {
|
||||
want = recorded{
|
||||
Common: commonData{
|
||||
HeaderExtra: extra,
|
||||
BeforeBlockStateVal: blockVal,
|
||||
},
|
||||
Prefetch: prefetchVal,
|
||||
Process: processVal,
|
||||
TxData: tx.Data(),
|
||||
}
|
||||
wantPerTx = append(wantPerTx, TxResult[recorded]{
|
||||
Tx: IndexedTx{
|
||||
Index: i,
|
||||
Transaction: tx,
|
||||
},
|
||||
Result: want,
|
||||
})
|
||||
}
|
||||
|
||||
got, gotOK := getResult(i)
|
||||
if gotOK != wantOK {
|
||||
t.Errorf("Result(%d) got ok %t; want %t", i, gotOK, wantOK)
|
||||
continue
|
||||
}
|
||||
if diff := cmp.Diff(want, got.Result); diff != "" {
|
||||
t.Errorf("Result(%d) diff (-want +got):\n%s", i, diff)
|
||||
}
|
||||
}
|
||||
|
||||
p.FinishBlock(sdb, block, nil)
|
||||
tests := []struct {
|
||||
name string
|
||||
got []TxResult[recorded]
|
||||
opt cmp.Option
|
||||
}{
|
||||
{
|
||||
name: "in_transaction_order",
|
||||
got: h.gotAggregated.txOrder,
|
||||
},
|
||||
{
|
||||
name: "in_process_order",
|
||||
got: h.gotAggregated.processOrder,
|
||||
opt: cmpopts.SortSlices(func(a, b TxResult[recorded]) bool {
|
||||
return a.Tx.Index < b.Tx.Index
|
||||
}),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
opts := cmp.Options{
|
||||
tt.opt,
|
||||
cmp.Comparer(func(a, b *types.Transaction) bool {
|
||||
return a.Hash() == b.Hash()
|
||||
}),
|
||||
}
|
||||
if diff := cmp.Diff(wantPerTx, tt.got, opts); diff != "" {
|
||||
t.Errorf("handler.PostProcess() argument diff (-want +got):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
if t.Failed() {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type vmHooks struct {
|
||||
vm.Preprocessor // the [Processor]
|
||||
vm.NOOPHooks
|
||||
}
|
||||
|
||||
func (h *vmHooks) PreprocessingGasCharge(tx common.Hash) (uint64, error) {
|
||||
return h.Preprocessor.PreprocessingGasCharge(tx)
|
||||
}
|
||||
|
||||
func TestIntegration(t *testing.T) {
|
||||
const handlerGas = 500
|
||||
handler := &recorder{
|
||||
tb: t,
|
||||
addr: common.Address{'c', 'o', 'n', 'c', 'a', 't'},
|
||||
gas: handlerGas,
|
||||
}
|
||||
sut := New(8, 8)
|
||||
precompile := AddAsPrecompile(sut, handler)
|
||||
t.Cleanup(sut.Close)
|
||||
|
||||
vm.RegisterHooks(&vmHooks{Preprocessor: sut})
|
||||
t.Cleanup(vm.TestOnlyClearRegisteredHooks)
|
||||
|
||||
stub := &hookstest.Stub{
|
||||
PrecompileOverrides: map[common.Address]libevm.PrecompiledContract{
|
||||
handler.addr: vm.NewStatefulPrecompile(precompile),
|
||||
},
|
||||
}
|
||||
stub.Register(t)
|
||||
|
||||
key, err := crypto.GenerateKey()
|
||||
require.NoErrorf(t, err, "crypto.GenerateKey()")
|
||||
eoa := crypto.PubkeyToAddress(key.PublicKey)
|
||||
|
||||
state, evm := ethtest.NewZeroEVM(t)
|
||||
state.CreateAccount(eoa)
|
||||
state.SetBalance(eoa, new(uint256.Int).SetAllOne())
|
||||
|
||||
var (
|
||||
txs types.Transactions
|
||||
wantReturnData [][]byte
|
||||
wantReceipts types.Receipts
|
||||
)
|
||||
ignore := cmp.Options{
|
||||
cmpopts.IgnoreFields(
|
||||
types.Receipt{},
|
||||
"PostState", "CumulativeGasUsed", "BlockNumber", "BlockHash", "Bloom",
|
||||
),
|
||||
cmpopts.IgnoreFields(types.Log{}, "BlockHash"),
|
||||
}
|
||||
|
||||
header := &types.Header{
|
||||
Number: big.NewInt(0),
|
||||
BaseFee: big.NewInt(0),
|
||||
}
|
||||
config := evm.ChainConfig()
|
||||
rules := config.Rules(header.Number, true, header.Time)
|
||||
signer := types.MakeSigner(config, header.Number, header.Time)
|
||||
|
||||
for i, addr := range []common.Address{
|
||||
{'o', 't', 'h', 'e', 'r'},
|
||||
handler.addr,
|
||||
} {
|
||||
ui := uint(i) //nolint:gosec // Known to be positive
|
||||
data := []byte("hello, world")
|
||||
|
||||
gas, err := intrinsicGas(data, types.AccessList{}, &addr, &rules)
|
||||
require.NoError(t, err, "core.IntrinsicGas(%#x, nil, false, ...)", data)
|
||||
if addr == handler.addr {
|
||||
gas += handlerGas
|
||||
}
|
||||
|
||||
tx := types.MustSignNewTx(key, signer, &types.LegacyTx{
|
||||
Nonce: uint64(ui),
|
||||
To: &addr,
|
||||
Data: data,
|
||||
Gas: gas,
|
||||
})
|
||||
txs = append(txs, tx)
|
||||
|
||||
wantR := &types.Receipt{
|
||||
Status: types.ReceiptStatusSuccessful,
|
||||
TxHash: tx.Hash(),
|
||||
GasUsed: gas,
|
||||
TransactionIndex: ui,
|
||||
}
|
||||
if addr != handler.addr {
|
||||
wantReturnData = append(wantReturnData, []byte{})
|
||||
} else {
|
||||
rec := &recorded{
|
||||
Common: commonData{
|
||||
HeaderExtra: slices.Clone(header.Extra),
|
||||
},
|
||||
TxData: tx.Data(),
|
||||
}
|
||||
wantReturnData = append(wantReturnData, rec.precompileReturnData())
|
||||
|
||||
want := rec.asLog()
|
||||
|
||||
want.Address = handler.addr
|
||||
want.TxHash = tx.Hash()
|
||||
want.TxIndex = ui
|
||||
|
||||
wantR.Logs = []*types.Log{want}
|
||||
}
|
||||
wantReceipts = append(wantReceipts, wantR)
|
||||
}
|
||||
|
||||
block := types.NewBlock(header, txs, nil, nil, trie.NewStackTrie(nil))
|
||||
require.NoError(t, sut.StartBlock(state, rules, block), "StartBlock()")
|
||||
|
||||
pool := core.GasPool(math.MaxUint64)
|
||||
var receipts types.Receipts
|
||||
for i, tx := range txs {
|
||||
state.SetTxContext(tx.Hash(), i)
|
||||
|
||||
t.Run("precompile_return_data", func(t *testing.T) {
|
||||
// Although [core.ApplyTransaction] is used to get receipts, it
|
||||
// doesn't provide access to return data. We therefore *also* use
|
||||
// [core.ApplyMessage] but MUST avoid repeating the same state
|
||||
// transition as it would fail the second time.
|
||||
id := evm.StateDB.Snapshot()
|
||||
t.Cleanup(func() {
|
||||
evm.StateDB.RevertToSnapshot(id)
|
||||
})
|
||||
|
||||
msg, err := core.TransactionToMessage(tx, signer, big.NewInt(0))
|
||||
require.NoError(t, err, "core.TransactionToMessage()")
|
||||
|
||||
got, err := core.ApplyMessage(evm, msg, &pool)
|
||||
require.NoError(t, err, "core.ApplyMessage()")
|
||||
if diff := cmp.Diff(wantReturnData[i], got.ReturnData, cmpopts.EquateEmpty()); diff != "" {
|
||||
t.Errorf("Return data from precompile (-want +got):\n%s", diff)
|
||||
}
|
||||
})
|
||||
|
||||
var usedGas uint64
|
||||
receipt, err := core.ApplyTransaction(
|
||||
evm.ChainConfig(),
|
||||
ethtest.DummyChainContext(),
|
||||
&block.Header().Coinbase,
|
||||
&pool,
|
||||
state,
|
||||
block.Header(),
|
||||
tx,
|
||||
&usedGas,
|
||||
vm.Config{},
|
||||
)
|
||||
require.NoError(t, err, "ApplyTransaction([%d])", i)
|
||||
receipts = append(receipts, receipt)
|
||||
}
|
||||
sut.FinishBlock(state, block, receipts)
|
||||
|
||||
if diff := cmp.Diff(wantReceipts, handler.gotReceipts, ignore); diff != "" {
|
||||
t.Errorf("%T diff (-want +got):\n%s", receipts, diff)
|
||||
}
|
||||
}
|
||||
|
||||
type expensive struct {
|
||||
gasCost uint64
|
||||
}
|
||||
|
||||
func (expensive) BeforeBlock(libevm.StateReader, *types.Header) int { return 0 }
|
||||
func (e expensive) ShouldProcess(IndexedTx, int) (do bool, gas uint64) { return true, e.gasCost }
|
||||
func (expensive) Prefetch(libevm.StateReader, IndexedTx, int) int { return 0 }
|
||||
func (expensive) Process(libevm.StateReader, IndexedTx, int, int) int { return 0 }
|
||||
func (expensive) PostProcess(int, Results[int]) int { return 0 }
|
||||
func (expensive) AfterBlock(StateDB, int, *types.Block, types.Receipts) {}
|
||||
|
||||
func TestTotalCost(t *testing.T) {
|
||||
tx := types.NewTx(&types.LegacyTx{
|
||||
To: &common.Address{},
|
||||
Gas: params.TxGas,
|
||||
})
|
||||
b := types.NewBlock(
|
||||
&types.Header{Number: big.NewInt(0)},
|
||||
types.Transactions{tx},
|
||||
nil, nil,
|
||||
trie.NewStackTrie(nil),
|
||||
)
|
||||
rules := params.MergedTestChainConfig.Rules(big.NewInt(0), true, 0)
|
||||
_, _, sdb := ethtest.NewEmptyStateDB(t)
|
||||
|
||||
tests := []struct {
|
||||
costs []uint64
|
||||
want uint64
|
||||
}{
|
||||
{
|
||||
costs: []uint64{1},
|
||||
want: 1,
|
||||
},
|
||||
{
|
||||
costs: []uint64{1, 0},
|
||||
want: 1,
|
||||
},
|
||||
{
|
||||
costs: []uint64{1, 1},
|
||||
want: 2,
|
||||
},
|
||||
{
|
||||
costs: []uint64{math.MaxUint64 - 42, 41},
|
||||
want: math.MaxUint64 - 1,
|
||||
},
|
||||
{
|
||||
costs: []uint64{math.MaxUint64 - 42, 43},
|
||||
want: math.MaxUint64,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(fmt.Sprintf("%d", tt.costs), func(t *testing.T) {
|
||||
p := New(1, 1)
|
||||
t.Cleanup(p.Close)
|
||||
|
||||
for _, c := range tt.costs {
|
||||
AddHandler(p, expensive{gasCost: c})
|
||||
}
|
||||
require.NoError(t, p.StartBlock(sdb, rules, b), "StartBlock()")
|
||||
t.Cleanup(func() { p.FinishBlock(sdb, b, nil) })
|
||||
|
||||
got, err := p.PreprocessingGasCharge(tx.Hash())
|
||||
if err != nil || got != tt.want {
|
||||
t.Errorf("PreprocessingGasCharge() got (%d, %v); want (%d, nil)", got, err, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(arr4n) unit test for [AddPrecompile] unhappy paths.
|
||||
46
libevm/precompiles/parallel/precompile.go
Normal file
46
libevm/precompiles/parallel/precompile.go
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
// Copyright 2025-2026 the libevm authors.
|
||||
//
|
||||
// The libevm additions to go-ethereum are free software: you can redistribute
|
||||
// them and/or modify them under the terms of the GNU Lesser General Public License
|
||||
// as published by the Free Software Foundation, either version 3 of the License,
|
||||
// or (at your option) any later version.
|
||||
//
|
||||
// The libevm additions are distributed in the hope that they will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
|
||||
// General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see
|
||||
// <http://www.gnu.org/licenses/>.
|
||||
|
||||
package parallel
|
||||
|
||||
import "github.com/ava-labs/libevm/core/vm"
|
||||
|
||||
// PrecompileResult is the interface required for a [Handler] to be converted
|
||||
// into a [vm.PrecompiledStatefulContract].
|
||||
type PrecompileResult interface {
|
||||
// PrecompileOutput's arguments match those of
|
||||
// [vm.PrecompiledStatefulContract]. It MUST NOT re-charge the `Gas()`
|
||||
// amount returned by the [Handler], but MAY charge for other computation as
|
||||
// necessary.
|
||||
PrecompileOutput(vm.PrecompileEnvironment, []byte) ([]byte, error)
|
||||
}
|
||||
|
||||
// AddAsPrecompile is equivalent to [AddHandler] except that the returned
|
||||
// function is a [vm.PrecompiledStatefulContract] instead of a raw result
|
||||
// fetcher. If the function returned by [AddHandler] returns `false` then the
|
||||
// precompile returns [vm.ErrExecutionReverted].
|
||||
func AddAsPrecompile[CD, D any, R PrecompileResult, A any](p *Processor, h Handler[CD, D, R, A]) vm.PrecompiledStatefulContract {
|
||||
results := AddHandler(p, h)
|
||||
|
||||
return func(env vm.PrecompileEnvironment, input []byte) ([]byte, error) {
|
||||
res, ok := results(env.ReadOnlyState().TxIndex())
|
||||
if !ok {
|
||||
// TODO(arr4n) add revert data to match a Solidity-style error
|
||||
return nil, vm.ErrExecutionReverted
|
||||
}
|
||||
return res.Result.PrecompileOutput(env, input)
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue