core: implement BAL reader

This commit is contained in:
Gary Rong 2026-01-30 14:46:36 +08:00
parent e26fa207f2
commit adce9fbb34
4 changed files with 533 additions and 5 deletions

View file

@ -2093,7 +2093,7 @@ func (bc *BlockChain) ProcessBlock(parentRoot common.Hash, block *types.Block, s
//
// Note: the main processor and prefetcher share the same reader with a local
// cache for mitigating the overhead of state access.
prefetch, process, err := bc.statedb.ReadersWithCacheStats(parentRoot)
prefetch, process, err := bc.statedb.ReadersWithCache(parentRoot)
if err != nil {
return nil, err
}

View file

@ -221,10 +221,10 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
return newReader(newCachingCodeReader(db.disk, db.codeCache, db.codeSizeCache), sr), nil
}
// ReadersWithCacheStats creates a pair of state readers that share the same
// underlying state reader and internal state cache, while maintaining separate
// statistics respectively.
func (db *CachingDB) ReadersWithCacheStats(stateRoot common.Hash) (Reader, Reader, error) {
// ReadersWithCache creates a pair of state readers that share the same
// underlying state reader and internal state cache, while maintaining
// separate statistics respectively.
func (db *CachingDB) ReadersWithCache(stateRoot common.Hash) (Reader, Reader, error) {
r, err := db.StateReader(stateRoot)
if err != nil {
return nil, nil, err
@ -235,6 +235,21 @@ func (db *CachingDB) ReadersWithCacheStats(stateRoot common.Hash) (Reader, Reade
return ra, rb, nil
}
// ReaderEIP7928 creates a state reader with the manner of Block-level accessList.
func (db *CachingDB) ReaderEIP7928(stateRoot common.Hash, accessList map[common.Address][]common.Hash, threads int) (Reader, error) {
base, err := db.StateReader(stateRoot)
if err != nil {
return nil, err
}
// Construct the state reader with native cache and associated statistics
r := newStateReaderWithStats(newStateReaderWithCache(base))
// Construct the state reader with background prefetching
pr := newPrefetchStateReader(r, accessList, threads)
return newReader(newCachingCodeReader(db.disk, db.codeCache, db.codeSizeCache), pr), nil
}
// OpenTrie opens the main account trie at a specific root hash.
func (db *CachingDB) OpenTrie(root common.Hash) (Trie, error) {
if db.triedb.IsVerkle() {

View file

@ -0,0 +1,312 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package state
// The EIP27928 reader utilizes a hierarchical architecture to optimize state
// access during block execution:
//
// - Base layer: The reader is initialized with the pre-transition state root,
// providing the access of the state.
//
// - Prefetching Layer: This base reader is wrapped by newPrefetchStateReader.
// Using an Access List hint, it asynchronously fetches required state data
// in the background, minimizing I/O blocking during transaction processing.
//
// - Execution Layer: To support parallel transaction execution within the EIP
// 7928 context, readers are wrapped in ReaderWithBlockLevelAccessList.
// This layer provides a "unified view" by merging the pre-transition state
// with mutated states from preceding transactions in the block.
//
// - Tracking Layer: Finally, the readerTracker wraps the execution reader to
// capture all state accesses made during a specific transaction. These individual
// access are subsequently merged to construct a comprehensive access list
// for the entire block.
//
// The architecture can be illustrated by the diagram below:
// [ Block Level Access List ] <────────────────┐
// ▲ │ (Merge)
// │ │
// ┌───────┴───────┐ ┌───────┴───────┐
// │ readerTracker │ │ readerTracker │ (Access Tracking)
// └───────┬───────┘ └───────┬───────┘
// │ │
// ┌──────────────┴──────────────┐ ┌──────────────┴──────────────┐
// │ ReaderWithBlockLevelAL │ │ ReaderWithBlockLevelAL │ (Unified View)
// │ (Pre-state + Mutations) │ │ (Pre-state + Mutations) │
// └──────────────┬──────────────┘ └──────────────┬──────────────┘
// │ │
// └────────────────┬─────────────────┘
// │
// ┌──────────────┴──────────────┐
// │ newPrefetchStateReader │ (Async I/O)
// │ (Access List Hint driven) │
// └──────────────┬──────────────┘
// │
// ┌──────────────┴──────────────┐
// │ Base Reader │ (State Root)
// │ (State & Contract Code) │
// └─────────────────────────────┘
import (
"maps"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/bal"
)
type fetchTask struct {
addr common.Address
slots []common.Hash
}
func (t *fetchTask) weight() int { return 1 + len(t.slots) }
type prefetchStateReader struct {
StateReader
tasks []*fetchTask
nThreads int
done chan struct{}
term chan struct{}
closeOnce sync.Once
}
func newPrefetchStateReader(reader StateReader, accessList map[common.Address][]common.Hash, nThreads int) *prefetchStateReader {
tasks := make([]*fetchTask, 0, len(accessList))
for addr, slots := range accessList {
tasks = append(tasks, &fetchTask{
addr: addr,
slots: slots,
})
}
return newPrefetchStateReaderInternal(reader, tasks, nThreads)
}
func newPrefetchStateReaderInternal(reader StateReader, tasks []*fetchTask, nThreads int) *prefetchStateReader {
r := &prefetchStateReader{
StateReader: reader,
tasks: tasks,
nThreads: nThreads,
done: make(chan struct{}),
term: make(chan struct{}),
}
go r.prefetch()
return r
}
func (r *prefetchStateReader) Close() {
r.closeOnce.Do(func() {
close(r.term)
<-r.done
})
}
func (r *prefetchStateReader) Wait() error {
select {
case <-r.term:
return nil
case <-r.done:
return nil
}
}
func (r *prefetchStateReader) prefetch() {
defer close(r.done)
if len(r.tasks) == 0 {
return
}
var total int
for _, t := range r.tasks {
total += t.weight()
}
var (
wg sync.WaitGroup
unit = (total + r.nThreads - 1) / r.nThreads // round-up the per worker unit
)
for i := 0; i < r.nThreads; i++ {
start := i * unit
if start >= total {
break
}
limit := (i + 1) * unit
if i == r.nThreads-1 {
limit = total
}
// Schedule the worker for prefetching, the items on the range [start, limit)
// is exclusively assigned for this worker.
wg.Add(1)
go func(workerID, startW, endW int) {
r.process(startW, endW)
wg.Done()
}(i, start, limit)
}
wg.Wait()
}
func (r *prefetchStateReader) process(start, limit int) {
var total = 0
for _, t := range r.tasks {
tw := t.weight()
if total+tw > start {
s := 0
if start > total {
s = start - total
}
l := tw
if limit < total+tw {
l = limit - total
}
for j := s; j < l; j++ {
select {
case <-r.term:
return
default:
if j == 0 {
r.StateReader.Account(t.addr)
} else {
r.StateReader.Storage(t.addr, t.slots[j-1])
}
}
}
}
total += tw
if total >= limit {
return
}
}
}
// ReaderWithBlockLevelAccessList provides state access that reflects the
// pre-transition state combined with the mutations made by transactions
// prior to TxIndex.
type ReaderWithBlockLevelAccessList struct {
Reader
AccessList *bal.ConstructionBlockAccessList
TxIndex int
}
func NewReaderWithBlockLevelAccessList(base Reader, accessList *bal.ConstructionBlockAccessList, txIndex int) *ReaderWithBlockLevelAccessList {
return &ReaderWithBlockLevelAccessList{
Reader: base,
AccessList: accessList,
TxIndex: txIndex,
}
}
// Account implements Reader, returning the account with the specific address.
func (r *ReaderWithBlockLevelAccessList) Account(addr common.Address) (*types.StateAccount, error) {
panic("implement me")
}
// Storage implements Reader, returning the storage slot with the specific
// address and slot key.
func (r *ReaderWithBlockLevelAccessList) Storage(addr common.Address, slot common.Hash) (common.Hash, error) {
panic("implement me")
}
// Has implements Reader, returning the flag indicating whether the contract
// code with specified address and hash exists or not.
func (r *ReaderWithBlockLevelAccessList) Has(addr common.Address, codeHash common.Hash) bool {
panic("implement me")
}
// Code implements Reader, returning the contract code with specified address
// and hash.
func (r *ReaderWithBlockLevelAccessList) Code(addr common.Address, codeHash common.Hash) ([]byte, error) {
panic("implement me")
}
// CodeSize implements Reader, returning the contract code size with specified
// address and hash.
func (r *ReaderWithBlockLevelAccessList) CodeSize(addr common.Address, codeHash common.Hash) (int, error) {
panic("implement me")
}
// StorageAccessList represents a set of storage slots accessed within an account.
type StorageAccessList map[common.Hash]struct{}
// StateAccessList maps account addresses to their respective accessed storage slots.
type StateAccessList map[common.Address]StorageAccessList
// Merge merges the entries from the other StateAccessList into the receiver.
func (s StateAccessList) Merge(other StateAccessList) {
for addr, otherSlots := range other {
slots, exists := s[addr]
if !exists {
s[addr] = otherSlots
continue
}
maps.Copy(slots, otherSlots)
}
}
// StateReaderTracker defines the capability to retrieve the access footprint
// recorded during state reading operations.
type StateReaderTracker interface {
GetStateAccessList() StateAccessList
}
type readerTracker struct {
Reader
access StateAccessList
lock sync.RWMutex
}
func newReaderTracker(reader Reader) *readerTracker {
return &readerTracker{
Reader: reader,
access: make(StateAccessList),
}
}
// Account implements StateReader, tracking the accessed address locally.
func (r *readerTracker) Account(addr common.Address) (*types.StateAccount, error) {
r.lock.Lock()
defer r.lock.Unlock()
_, exists := r.access[addr]
if !exists {
r.access[addr] = make(StorageAccessList)
}
return r.Reader.Account(addr)
}
// Storage implements StateReader, tracking the accessed slot identifier locally.
func (r *readerTracker) Storage(addr common.Address, slot common.Hash) (common.Hash, error) {
r.lock.Lock()
defer r.lock.Unlock()
list, exists := r.access[addr]
if !exists {
list = make(StorageAccessList)
r.access[addr] = list
}
list[slot] = struct{}{}
return r.Reader.Storage(addr, slot)
}
// GetStateAccessList implements StateReaderTracker, returning the access footprint.
func (r *readerTracker) GetStateAccessList() StateAccessList {
r.lock.RLock()
defer r.lock.RUnlock()
return r.access
}

View file

@ -0,0 +1,201 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package state
import (
"fmt"
"maps"
"math/rand"
"sync"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/testrand"
)
type countingStateReader struct {
accounts map[common.Address]int
storages map[common.Address]map[common.Hash]int
lock sync.Mutex
}
func newRefStateReader() *countingStateReader {
return &countingStateReader{
accounts: make(map[common.Address]int),
storages: make(map[common.Address]map[common.Hash]int),
}
}
func (r *countingStateReader) validate(total int) error {
var sum int
for addr, n := range r.accounts {
if n != 1 {
return fmt.Errorf("duplicated account access: %x-%d", addr, n)
}
sum += 1
slots, exists := r.storages[addr]
if !exists {
continue
}
for key, n := range slots {
if n != 1 {
return fmt.Errorf("duplicated storage access: %x-%x-%d", addr, key, n)
}
sum += 1
}
}
for addr := range r.storages {
_, exists := r.accounts[addr]
if !exists {
return fmt.Errorf("dangling storage access: %x", addr)
}
}
if sum != total {
return fmt.Errorf("unexpected number of access, want: %d, got: %d", total, sum)
}
return nil
}
func (r *countingStateReader) Account(addr common.Address) (*types.StateAccount, error) {
r.lock.Lock()
defer r.lock.Unlock()
r.accounts[addr] += 1
return nil, nil
}
func (r *countingStateReader) Storage(addr common.Address, slot common.Hash) (common.Hash, error) {
r.lock.Lock()
defer r.lock.Unlock()
slots, exists := r.storages[addr]
if !exists {
slots = make(map[common.Hash]int)
r.storages[addr] = slots
}
slots[slot] += 1
return common.Hash{}, nil
}
func makeFetchTasks(n int) ([]*fetchTask, int) {
var (
total int
tasks []*fetchTask
)
for i := 0; i < n; i++ {
var slots []common.Hash
if rand.Intn(3) != 0 {
for j := 0; j < rand.Intn(100); j++ {
slots = append(slots, testrand.Hash())
}
}
tasks = append(tasks, &fetchTask{
addr: testrand.Address(),
slots: slots,
})
total += len(slots) + 1
}
return tasks, total
}
func TestPrefetchReader(t *testing.T) {
type suite struct {
tasks []*fetchTask
threads int
total int
}
var suites []suite
for i := 0; i < 100; i++ {
tasks, total := makeFetchTasks(100)
suites = append(suites, suite{
tasks: tasks,
threads: rand.Intn(30) + 1,
total: total,
})
}
// num(tasks) < num(threads)
tasks, total := makeFetchTasks(1)
suites = append(suites, suite{
tasks: tasks,
threads: 100,
total: total,
})
for _, s := range suites {
r := newRefStateReader()
pr := newPrefetchStateReaderInternal(r, s.tasks, s.threads)
pr.Wait()
if err := r.validate(s.total); err != nil {
t.Fatal(err)
}
}
}
func makeFakeSlots(n int) map[common.Hash]struct{} {
slots := make(map[common.Hash]struct{})
for i := 0; i < n; i++ {
slots[testrand.Hash()] = struct{}{}
}
return slots
}
type noopStateReader struct{}
func (r *noopStateReader) Account(addr common.Address) (*types.StateAccount, error) { return nil, nil }
func (r *noopStateReader) Storage(addr common.Address, slot common.Hash) (common.Hash, error) {
return common.Hash{}, nil
}
type noopCodeReader struct{}
func (r *noopCodeReader) Has(addr common.Address, codeHash common.Hash) bool { return false }
func (r *noopCodeReader) Code(addr common.Address, codeHash common.Hash) ([]byte, error) {
return nil, nil
}
func (r *noopCodeReader) CodeSize(addr common.Address, codeHash common.Hash) (int, error) {
return 0, nil
}
func TestReaderWithTracker(t *testing.T) {
var r Reader = newReaderTracker(newReader(&noopCodeReader{}, &noopStateReader{}))
accesses := map[common.Address]map[common.Hash]struct{}{
testrand.Address(): makeFakeSlots(10),
testrand.Address(): makeFakeSlots(0),
}
for addr, slots := range accesses {
r.Account(addr)
for slot := range slots {
r.Storage(addr, slot)
}
}
got := r.(StateReaderTracker).GetStateAccessList()
if len(got) != len(accesses) {
t.Fatalf("Unexpected access list, want: %d, got: %d", len(accesses), len(got))
}
for addr, slots := range got {
entry, ok := accesses[addr]
if !ok {
t.Fatal("Unexpected access list")
}
if !maps.Equal(slots, entry) {
t.Fatal("Unexpected slots")
}
}
}