core, eth/protocols/snap, eth/downloader: snap/2 sync logic (#34626)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

Adds snap/2 (EIP-8189), a block-access-list (BAL) based state sync, and
wires it to run side by side with snap/1. It's opt-in (for now) behind a
new --snap.v2 flag and chosen at startup.

https://eips.ethereum.org/EIPS/eip-8189

---------

Co-authored-by: Toni Wahrstätter <info@toniwahrstaetter.com>
Co-authored-by: Gary Rong <garyrong0905@gmail.com>
This commit is contained in:
Jonny Rhea 2026-06-11 01:45:07 -05:00 committed by GitHub
parent eea6242742
commit 17aab1ac9a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
29 changed files with 4327 additions and 506 deletions

View file

@ -84,6 +84,19 @@ func (s *Suite) dialSnap() (*Conn, error) {
return conn, nil
}
// dialSnap2 creates a connection advertising snap/2 as the only snap capability.
// This is used by the snap/2 (EIP-8189) test suite to force the peer to
// negotiate snap/2 rather than falling back to snap/1.
func (s *Suite) dialSnap2() (*Conn, error) {
conn, err := s.dial()
if err != nil {
return nil, fmt.Errorf("dial failed: %v", err)
}
conn.caps = append(conn.caps, p2p.Cap{Name: "snap", Version: 2})
conn.ourHighestSnapProtoVersion = 2
return conn, nil
}
// Conn represents an individual connection with a peer
type Conn struct {
*rlpx.Conn
@ -183,7 +196,10 @@ func (c *Conn) ReadEth() (any, error) {
}
}
// ReadSnap reads a snap/1 response with the given id from the connection.
// ReadSnap reads a snap protocol response from the connection. It decodes
// the full message catalog of both snap/1 and snap/2. The caller is
// expected to only receive codes that were actually valid on the
// negotiated protocol version.
func (c *Conn) ReadSnap() (any, error) {
c.SetReadDeadline(time.Now().Add(timeout))
for {
@ -215,6 +231,10 @@ func (c *Conn) ReadSnap() (any, error) {
msg = new(snap.GetTrieNodesPacket)
case snap.TrieNodesMsg:
msg = new(snap.TrieNodesPacket)
case snap.GetAccessListsMsg:
msg = new(snap.GetAccessListsPacket)
case snap.AccessListsMsg:
msg = new(snap.AccessListsPacket)
default:
panic(fmt.Errorf("unhandled snap code: %d", code))
}

View file

@ -33,7 +33,11 @@ const (
const (
baseProtoLen = 16
ethProtoLen = 18
snapProtoLen = 8
// snapProtoLen accommodates snap/2 (EIP-8189) which extends snap/1 with two
// additional message codes (GetBlockAccessLists=0x08, BlockAccessLists=0x09).
// Using 10 is safe for snap/1 connections because the extra codes are simply
// never used on that protocol version.
snapProtoLen = 10
)
// Unexported handshake structure from p2p/peer.go.

View file

@ -0,0 +1,375 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package ethtest
import (
"bytes"
"fmt"
"math/rand"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/bal"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/internal/utesting"
"github.com/ethereum/go-ethereum/rlp"
)
// Snap/2 (EIP-8189) replaces trie node healing with BAL-based state catch-up.
// It keeps 0x00..0x05 (AccountRange/StorageRanges/ByteCodes) unchanged, removes
// GetTrieNodes (0x06) / TrieNodes (0x07), and adds GetBlockAccessLists (0x08) /
// BlockAccessLists (0x09).
//
// The tests in this file focus on the wire behavior that is new or changed in
// snap/2. Tests for the unchanged messages are already covered by the snap/1
// suite in snap.go; the harness reuses the same code paths because those
// message formats are identical across versions.
// TestSnap2Status performs an RLPx+eth+snap/2 handshake against the node,
// verifying that the node advertises and negotiates snap/2.
func (s *Suite) TestSnap2Status(t *utesting.T) {
t.Log(`This test performs a snap/2 (EIP-8189) handshake. The peer is expected to
advertise snap/2 as a p2p capability and accept the connection.`)
conn, err := s.dialSnap2()
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn.Close()
if err := conn.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}
if conn.negotiatedSnapProtoVersion != 2 {
t.Fatalf("unexpected negotiated snap version: got %d, want 2", conn.negotiatedSnapProtoVersion)
}
}
type accessListsTest struct {
nBytes uint64
hashes []common.Hash
// minEntries/maxEntries bound the number of entries the response list
// MUST contain. Per EIP-8189 the server may truncate from the tail when
// the byte soft limit is reached, but MUST preserve request order.
minEntries int
maxEntries int
desc string
}
// TestSnap2GetBlockAccessLists exercises various forms of GetBlockAccessLists
// requests defined in EIP-8189. Per the spec:
//
// - Nodes MUST always respond.
// - Unavailable BALs are returned as the RLP empty string (0x80) at the
// matching position.
// - The server MAY return fewer entries than requested (respecting the byte
// soft limit or QoS limits), truncating from the tail.
// - Returned entries MUST preserve request order.
// - When a BAL is returned, its keccak256(rlp.encode(bal)) MUST match the
// block-access-list-hash field of the corresponding block header.
func (s *Suite) TestSnap2GetBlockAccessLists(t *utesting.T) {
var (
head = s.chain.Head()
headHash = head.Hash()
preHash = s.chain.blocks[s.chain.Len()-2].Hash()
unknown = common.HexToHash("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef")
)
// Collect a window of recent canonical block hashes. Limit to at most 16
// entries to keep the request small and well under any reasonable limit.
var recent []common.Hash
start := s.chain.Len() - 16
if start < 1 {
start = 1
}
for i := start; i < s.chain.Len(); i++ {
recent = append(recent, s.chain.blocks[i].Hash())
}
tests := []accessListsTest{
{
desc: `An empty request. The server must respond with an empty list and must
not disconnect.`,
nBytes: softResponseLimitSnap,
hashes: nil,
minEntries: 0,
maxEntries: 0,
},
{
desc: `A request for a single random/unknown block hash. Per the spec the
server must respond and include an RLP empty string (0x80) at that position.`,
nBytes: softResponseLimitSnap,
hashes: []common.Hash{unknown},
minEntries: 1,
maxEntries: 1,
},
{
desc: `A request for multiple random/unknown block hashes. The server must
preserve request order and return an RLP empty string for each position.`,
nBytes: softResponseLimitSnap,
hashes: []common.Hash{
unknown,
common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"),
common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002"),
},
minEntries: 3,
maxEntries: 3,
},
{
desc: `A request for the chain head. The server must respond. If the node is
post-Amsterdam and has the BAL for this block, the returned BAL must hash to
the block-access-list-hash in the header. Otherwise an empty entry is valid.`,
nBytes: softResponseLimitSnap,
hashes: []common.Hash{headHash},
minEntries: 1,
maxEntries: 1,
},
{
desc: `A request for the chain head and its parent. The server must return
exactly two entries, in request order.`,
nBytes: softResponseLimitSnap,
hashes: []common.Hash{headHash, preHash},
minEntries: 2,
maxEntries: 2,
},
{
desc: `A mixed request with known and unknown hashes. The server must
return entries in request order, with the RLP empty string at positions
corresponding to unknown hashes.`,
nBytes: softResponseLimitSnap,
hashes: []common.Hash{headHash, unknown, preHash, unknown},
// We expect exactly 4 entries — mixed responses are small and well
// under the byte limit, so truncation is not expected.
minEntries: 4,
maxEntries: 4,
},
{
desc: `A request spanning the most recent canonical window. Implementations
may serve or drop individual entries, but the entries that are returned must
preserve request order.`,
nBytes: softResponseLimitSnap,
hashes: recent,
minEntries: 0,
maxEntries: len(recent),
},
{
desc: `A request with a very small byte soft limit. The server must return
at least zero entries and no more than the requested number, truncating from
the tail. It must not disconnect.`,
nBytes: 1,
hashes: recent,
minEntries: 0,
maxEntries: len(recent),
},
{
desc: `A request with a zero byte soft limit. The server must still respond
(possibly with an empty list) and must not disconnect.`,
nBytes: 0,
hashes: recent,
minEntries: 0,
maxEntries: len(recent),
},
{
desc: `A request containing the same hash repeated. The server must treat
each position independently and preserve request order.`,
nBytes: softResponseLimitSnap,
hashes: []common.Hash{headHash, headHash, headHash},
minEntries: 3,
maxEntries: 3,
},
}
for i, tc := range tests {
if i > 0 {
t.Log("\n")
}
t.Logf("-- Test %d", i)
t.Log(tc.desc)
t.Log(" request:")
t.Logf(" hashes: %d", len(tc.hashes))
t.Logf(" responseBytes: %d", tc.nBytes)
if err := s.snapGetAccessLists(t, &tc); err != nil {
t.Errorf("test %d failed: %v", i, err)
}
}
}
// TestSnap2TrieNodesRemoved verifies that snap/2 no longer serves the
// GetTrieNodes message (0x06). Per EIP-8189, snap/2 removes GetTrieNodes and
// TrieNodes entirely. A server that negotiated snap/2 must not treat these
// codes as valid snap messages and should disconnect the peer that sends them.
func (s *Suite) TestSnap2TrieNodesRemoved(t *utesting.T) {
t.Log(`This test verifies that sending a GetTrieNodes message over a snap/2
connection causes the peer to reject the request. Per EIP-8189, GetTrieNodes
is removed in snap/2.`)
conn, err := s.dialSnap2()
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn.Close()
if err := conn.peer(s.chain, nil); err != nil {
t.Fatalf("peering failed: %v", err)
}
// Build a syntactically valid GetTrieNodes request to the head state root.
paths, err := rlp.EncodeToRawList([]snap.TrieNodePathSet{{[]byte{0}}})
if err != nil {
t.Fatalf("failed to encode paths: %v", err)
}
req := &snap.GetTrieNodesPacket{
ID: uint64(rand.Int63()),
Root: s.chain.Head().Root(),
Paths: paths,
Bytes: 5000,
}
if err := conn.Write(snapProto, snap.GetTrieNodesMsg, req); err != nil {
t.Fatalf("failed to write GetTrieNodes: %v", err)
}
// We expect either a disconnect or a read error/timeout. We must NOT
// receive a valid TrieNodes response. Loop a few times to consume any
// incidental messages the peer might send (e.g. block updates) before
// deciding.
for i := 0; i < 5; i++ {
msg, err := conn.ReadSnap()
if err != nil {
// Disconnect or read error — the peer rejected the request.
return
}
if _, ok := msg.(*snap.TrieNodesPacket); ok {
t.Fatal("peer responded with TrieNodes over snap/2; GetTrieNodes must be unsupported")
}
}
t.Fatal("peer did not reject GetTrieNodes over snap/2 within the observation window")
}
// softResponseLimitSnap mirrors the recommended 2 MiB soft limit for
// BlockAccessLists responses from EIP-8189 §"Response Size Limit".
const softResponseLimitSnap = 2 * 1024 * 1024
// snapGetAccessLists sends a GetBlockAccessLists request, validates the
// response structure against EIP-8189, and verifies BAL content against the
// block-access-list-hash field of the corresponding block header (when the
// block is known and a BAL was returned).
func (s *Suite) snapGetAccessLists(t *utesting.T, tc *accessListsTest) error {
conn, err := s.dialSnap2()
if err != nil {
return fmt.Errorf("dial failed: %v", err)
}
defer conn.Close()
if err = conn.peer(s.chain, nil); err != nil {
return fmt.Errorf("peering failed: %v", err)
}
req := &snap.GetAccessListsPacket{
ID: uint64(rand.Int63()),
Hashes: tc.hashes,
Bytes: tc.nBytes,
}
msg, err := conn.snapRequest(snap.GetAccessListsMsg, req)
if err != nil {
return fmt.Errorf("access list request failed: %v", err)
}
res, ok := msg.(*snap.AccessListsPacket)
if !ok {
return fmt.Errorf("unexpected response type: %T", msg)
}
if res.ID != req.ID {
return fmt.Errorf("request id mismatch: got %d, want %d", res.ID, req.ID)
}
// Check list length bounds.
got := res.AccessLists.Len()
if got < tc.minEntries || got > tc.maxEntries {
return fmt.Errorf("response has %d entries, want between %d and %d", got, tc.minEntries, tc.maxEntries)
}
// Build a map of request-index -> block so we can verify BAL hashes.
blocks := make(map[int]*types.Block)
for i, h := range tc.hashes {
for _, b := range s.chain.blocks {
if b.Hash() == h {
blocks[i] = b
break
}
}
}
// Iterate the response, validating each entry positionally.
var (
idx int
it = res.AccessLists.ContentIterator()
)
for it.Next() {
raw := it.Value()
block := blocks[idx]
// Empty entry: per spec, indicates BAL is unavailable for that block.
if bytes.Equal(raw, rlp.EmptyString) {
if block != nil && block.Header().BlockAccessListHash != nil {
// Not a failure — the server is allowed to legitimately not
// have the BAL. But we log it so the test output is diagnosable.
t.Logf(" entry %d: server returned empty for known post-Amsterdam block %x", idx, tc.hashes[idx])
}
idx++
continue
}
// Non-empty entry. A BAL is only legitimate for a block we know
// locally whose header commits to one; for any other hash the only
// valid response is the RLP empty string, so receiving data here
// means the server fabricated it.
if block == nil {
return fmt.Errorf("entry %d: server returned BAL data for unknown hash %x", idx, tc.hashes[idx])
}
if block.Header().BlockAccessListHash == nil {
return fmt.Errorf("entry %d: server returned BAL data for a block with no expected BAL (hash %x)", idx, tc.hashes[idx])
}
// Per EIP-8189: compute keccak256(rlp.encode(bal)) against the raw
// bytes actually received on the wire, and compare to the header
// commitment. Hashing raw bytes (rather than re-encoding after a
// decode round-trip) catches peers that send non-canonical BAL
// encodings.
have := crypto.Keccak256Hash(raw)
want := *block.Header().BlockAccessListHash
if have != want {
return fmt.Errorf("entry %d: BAL hash mismatch: have %x, want %x", idx, have, want)
}
// Decode and validate the BAL's internal structure: ordering of
// accounts/slots/changes, code-size limits, and per-entry access-index
// bounds, against the known block.
var accessList bal.BlockAccessList
if err := rlp.DecodeBytes(raw, &accessList); err != nil {
return fmt.Errorf("entry %d: invalid BAL RLP: %v", idx, err)
}
if err := accessList.Validate(block.GasLimit(), len(block.Transactions())); err != nil {
return fmt.Errorf("entry %d: BAL failed validation: %v", idx, err)
}
idx++
}
// Sanity: iterator consumed exactly the reported number of entries.
if idx != got {
return fmt.Errorf("iterator visited %d entries, expected %d", idx, got)
}
return nil
}

View file

@ -106,6 +106,16 @@ func (s *Suite) SnapTests() []utesting.Test {
}
}
// Snap2Tests returns the list of tests for the snap/2 protocol (EIP-8189).
// These tests require the peer to advertise and negotiate snap/2.
func (s *Suite) Snap2Tests() []utesting.Test {
return []utesting.Test{
{Name: "Status", Fn: s.TestSnap2Status},
{Name: "GetBlockAccessLists", Fn: s.TestSnap2GetBlockAccessLists},
{Name: "TrieNodesRemoved", Fn: s.TestSnap2TrieNodesRemoved},
}
}
func (s *Suite) TestStatus(t *utesting.T) {
t.Log(`This test is just a sanity check. It performs an eth protocol handshake.`)
conn, err := s.dialAndPeer(nil)

View file

@ -99,6 +99,31 @@ func TestSnapSuite(t *testing.T) {
}
}
func TestSnap2Suite(t *testing.T) {
jwtPath, secret, err := makeJWTSecret(t)
if err != nil {
t.Fatalf("could not make jwt secret: %v", err)
}
geth, err := runGeth("./testdata", jwtPath)
if err != nil {
t.Fatalf("could not run geth: %v", err)
}
defer geth.Close()
suite, err := NewSuite(geth.Server().Self(), "./testdata", geth.HTTPAuthEndpoint(), common.Bytes2Hex(secret[:]))
if err != nil {
t.Fatalf("could not create new test suite: %v", err)
}
for _, test := range suite.Snap2Tests() {
t.Run(test.Name, func(t *testing.T) {
result := utesting.RunTests([]utesting.Test{{Name: test.Name, Fn: test.Fn}}, os.Stdout)
if result[0].Failed {
t.Fatal()
}
})
}
}
// runGeth creates and starts a geth node
func runGeth(dir string, jwtPath string) (*node.Node, error) {
stack, err := node.New(&node.Config{
@ -141,6 +166,7 @@ func setupGeth(stack *node.Node, dir string) error {
TrieDirtyCache: 16,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 10,
SnapV2: true, // advertise snap/2 (alongside snap/1) so the snap/2 suite can negotiate it
})
if err != nil {
return err

View file

@ -64,6 +64,7 @@ var (
rlpxPingCommand,
rlpxEthTestCommand,
rlpxSnapTestCommand,
rlpxSnap2TestCommand,
},
}
rlpxPingCommand = &cli.Command{
@ -99,6 +100,20 @@ var (
testNodeEngineFlag,
},
}
rlpxSnap2TestCommand = &cli.Command{
Name: "snap2-test",
Usage: "Runs snap/2 (EIP-8189) protocol tests against a node",
ArgsUsage: "",
Action: rlpxSnap2Test,
Flags: []cli.Flag{
testPatternFlag,
testTAPFlag,
testChainDirFlag,
testNodeFlag,
testNodeJWTFlag,
testNodeEngineFlag,
},
}
)
func rlpxPing(ctx *cli.Context) error {
@ -164,6 +179,16 @@ func rlpxSnapTest(ctx *cli.Context) error {
return runTests(ctx, suite.SnapTests())
}
// rlpxSnap2Test runs the snap/2 (EIP-8189) protocol test suite.
func rlpxSnap2Test(ctx *cli.Context) error {
p := cliTestParams(ctx)
suite, err := ethtest.NewSuite(p.node, p.chainDir, p.engineAPI, p.jwt)
if err != nil {
exit(err)
}
return runTests(ctx, suite.Snap2Tests())
}
type testParams struct {
node *enode.Node
engineAPI string

View file

@ -204,6 +204,7 @@ var (
utils.MetricsInfluxDBBucketFlag,
utils.MetricsInfluxDBOrganizationFlag,
utils.StateSizeTrackingFlag,
utils.SnapV2Flag,
}
)

View file

@ -297,6 +297,12 @@ var (
Value: ethconfig.Defaults.EnableStateSizeTracking,
Category: flags.StateCategory,
}
SnapV2Flag = &cli.BoolFlag{
Name: "snap.v2",
Usage: "Enable the experimental snap/2 (EIP-8189, BAL-based) sync protocol (advertises and syncs via snap/2; not safe on public networks)",
Value: ethconfig.Defaults.SnapV2,
Category: flags.StateCategory,
}
BinTrieGroupDepthFlag = &cli.IntFlag{
Name: "bintrie.groupdepth",
Usage: "Number of levels per serialized group in binary trie (1-8, default 5). Lower values create smaller groups with more nodes.",
@ -1905,6 +1911,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(StateSizeTrackingFlag.Name) {
cfg.EnableStateSizeTracking = ctx.Bool(StateSizeTrackingFlag.Name)
}
if ctx.IsSet(SnapV2Flag.Name) {
cfg.SnapV2 = ctx.Bool(SnapV2Flag.Name)
}
// Override any default configs for hard coded networks.
switch {
case ctx.Bool(MainnetFlag.Name):

View file

@ -1164,7 +1164,7 @@ func (bc *BlockChain) SnapSyncStart() error {
// given hash, regardless of the chain contents prior to snap sync. It is
// invoked once snap sync completes and assumes that SnapSyncStart was called
// previously.
func (bc *BlockChain) SnapSyncComplete(hash common.Hash) error {
func (bc *BlockChain) SnapSyncComplete(hash common.Hash, isSnapV2 bool) error {
// Make sure that both the block as well at its state trie exists
block := bc.GetBlockByHash(hash)
if block == nil {
@ -1175,19 +1175,28 @@ func (bc *BlockChain) SnapSyncComplete(hash common.Hash) error {
}
defer bc.chainmu.Unlock()
// Reset the trie database with the fresh snap synced state.
// Reset the trie database with the fresh snap synced state. Snap/1 needs
// a full trie-to-flat regeneration; snap/2 adopts the already-consistent
// flat state and skips that work.
root := block.Root()
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Enable(root); err != nil {
return err
if isSnapV2 {
if err := bc.triedb.AdoptSyncedState(root); err != nil {
return err
}
} else {
if err := bc.triedb.Enable(root); err != nil {
return err
}
}
}
if !bc.HasState(root) {
return fmt.Errorf("non existent state [%x..]", root[:4])
}
// Destroy any existing state snapshot and regenerate it in the background,
// also resuming the normal maintenance of any previously paused snapshot.
if bc.snaps != nil {
// The legacy snapshot tree needs to be wiped and rebuilt from the trie
// after a snap/1 sync.
if !isSnapV2 && bc.snaps != nil {
bc.snaps.Rebuild(root)
}

View file

@ -250,3 +250,10 @@ func DeleteGenerateTriePartitionDone(db ethdb.KeyValueWriter, partition byte) {
log.Crit("Failed to remove generate-trie done marker", "err", err)
}
}
// DeleteSnapshotSyncStatus removes the serialized sync status from the database.
func DeleteSnapshotSyncStatus(db ethdb.KeyValueWriter) {
if err := db.Delete(snapshotSyncStatusKey); err != nil {
log.Crit("Failed to remove snapshot sync status", "err", err)
}
}

View file

@ -352,6 +352,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
RequiredBlocks: config.RequiredBlocks,
SnapV2: config.SnapV2,
}); err != nil {
return nil, err
}
@ -448,7 +449,7 @@ func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruni
func (s *Ethereum) Protocols() []p2p.Protocol {
protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.discmix)
if s.config.SnapshotCache > 0 {
protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler))...)
protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler), s.config.SnapV2)...)
}
return protos
}

View file

@ -142,7 +142,7 @@ type Downloader struct {
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
pivotLock sync.RWMutex // Lock protecting pivot header reads from updates
SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now
snapSyncer snap.Syncer // snap/1 or snap/2 state syncer, selected at construction
stateSyncStart chan *stateSync
// Cancellation and termination
@ -201,7 +201,7 @@ type BlockChain interface {
SnapSyncStart() error
// SnapSyncComplete directly commits the head block to a certain entity.
SnapSyncComplete(common.Hash) error
SnapSyncComplete(hash common.Hash, isSnapV2 bool) error
// InsertHeadersBeforeCutoff inserts a batch of headers before the configured
// chain cutoff into the ancient store.
@ -232,7 +232,7 @@ type BlockChain interface {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(stateDb ethdb.Database, mode ethconfig.SyncMode, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
func New(stateDb ethdb.Database, mode ethconfig.SyncMode, chain BlockChain, dropPeer peerDropFn, success func(), snapV2 bool) *Downloader {
cutoffNumber, cutoffHash := chain.HistoryPruningCutoff()
dl := &Downloader{
stateDB: stateDb,
@ -245,10 +245,15 @@ func New(stateDb ethdb.Database, mode ethconfig.SyncMode, chain BlockChain, drop
dropPeer: dropPeer,
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()),
stateSyncStart: make(chan *stateSync),
syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(),
}
// Select the snap/1 or snap/2 state syncer based on the feature flag.
if snapV2 {
dl.snapSyncer = snap.NewV2Syncer(stateDb, chain.TrieDB().Scheme())
} else {
dl.snapSyncer = snap.NewV1Syncer(stateDb, chain.TrieDB().Scheme())
}
// Create the post-merge skeleton syncer and start the process
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success), chain)
@ -278,7 +283,7 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
default:
log.Error("Unknown downloader mode", "mode", mode)
}
progress, pending := d.SnapSyncer.Progress()
progress := d.snapSyncer.Progress()
return ethereum.SyncProgress{
StartingBlock: d.syncStatsChainOrigin,
@ -294,8 +299,8 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
HealedTrienodeBytes: uint64(progress.TrienodeHealBytes),
HealedBytecodes: progress.BytecodeHealSynced,
HealedBytecodeBytes: uint64(progress.BytecodeHealBytes),
HealingTrienodes: pending.TrienodeHeal,
HealingBytecode: pending.BytecodeHeal,
HealingTrienodes: progress.HealingTrienodes,
HealingBytecode: progress.HealingBytecode,
}
}
@ -889,7 +894,7 @@ func (d *Downloader) processSnapSyncContent() error {
// Start syncing state of the reported head block. This should get us most of
// the state of the pivot block.
d.pivotLock.RLock()
sync := d.syncState(d.pivotHeader.Root)
sync := d.syncState(d.pivotHeader)
d.pivotLock.RUnlock()
defer func() {
@ -959,10 +964,9 @@ func (d *Downloader) processSnapSyncContent() error {
if oldPivot == nil { // no results piling up, we can move the pivot
if !d.committed.Load() { // not yet passed the pivot, we can move the pivot
if pivot.Root != sync.root { // pivot position changed, we can move the pivot
if pivot.Root != sync.pivot.Root { // pivot state root changed, we can move the pivot
sync.Cancel()
sync = d.syncState(pivot.Root)
sync = d.syncState(pivot)
go closeOnErr(sync)
}
}
@ -977,8 +981,7 @@ func (d *Downloader) processSnapSyncContent() error {
// If new pivot block found, cancel old state retrieval and restart
if oldPivot != P {
sync.Cancel()
sync = d.syncState(P.Header.Root)
sync = d.syncState(P.Header)
go closeOnErr(sync)
oldPivot = P
}
@ -1070,7 +1073,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []rlp.RawValue{result.Receipts}, d.ancientLimit); err != nil {
return err
}
if err := d.blockchain.SnapSyncComplete(block.Hash()); err != nil {
if err := d.blockchain.SnapSyncComplete(block.Hash(), d.snapSyncer.Version() == snap.SNAP2); err != nil {
return err
}
d.committed.Store(true)
@ -1086,23 +1089,46 @@ func (d *Downloader) DeliverSnapPacket(peer *snap.Peer, packet snap.Packet) erro
if err != nil {
return err
}
return d.SnapSyncer.OnAccounts(peer, packet.ID, hashes, accounts, packet.Proof)
return d.snapSyncer.OnAccounts(peer, packet.ID, hashes, accounts, packet.Proof)
case *snap.StorageRangesPacket:
hashset, slotset := packet.Unpack()
return d.SnapSyncer.OnStorage(peer, packet.ID, hashset, slotset, packet.Proof)
return d.snapSyncer.OnStorage(peer, packet.ID, hashset, slotset, packet.Proof)
case *snap.ByteCodesPacket:
return d.SnapSyncer.OnByteCodes(peer, packet.ID, packet.Codes)
return d.snapSyncer.OnByteCodes(peer, packet.ID, packet.Codes)
case *snap.TrieNodesPacket:
return d.SnapSyncer.OnTrieNodes(peer, packet.ID, packet.Nodes)
return d.snapSyncer.OnTrieNodes(peer, packet.ID, packet.Nodes)
case *snap.AccessListsPacket:
return d.snapSyncer.OnAccessLists(peer, packet.ID, packet.AccessLists)
default:
return fmt.Errorf("unexpected snap packet type: %T", packet)
}
}
// RegisterSnapPeer registers a snap peer with the active state syncer. Peers that
// negotiated a snap version below the syncer's minimum are skipped — e.g. the
// snap/2 syncer skips snap/1-only peers, which cannot answer its BAL requests.
func (d *Downloader) RegisterSnapPeer(p *snap.Peer) error {
if p.Version() < d.snapSyncer.Version() {
return nil
}
return d.snapSyncer.Register(p)
}
// UnregisterSnapPeer removes a snap peer from the active state syncer. It mirrors
// RegisterSnapPeer's version gate: a peer below the active syncer's version was
// never registered, so there is nothing to remove.
func (d *Downloader) UnregisterSnapPeer(p *snap.Peer) error {
if p.Version() < d.snapSyncer.Version() {
return nil
}
return d.snapSyncer.Unregister(p.ID())
}
// readHeaderRange returns a list of headers, using the given last header as the base,
// and going backwards towards genesis. This method assumes that the caller already has
// placed a reasonable cap on count.

View file

@ -52,8 +52,14 @@ func newTester(t *testing.T, mode ethconfig.SyncMode) *downloadTester {
return newTesterWithNotification(t, mode, nil)
}
// newTesterWithNotification creates a new downloader test mocker.
// newTesterWithNotification creates a new downloader test mocker (snap/1).
func newTesterWithNotification(t *testing.T, mode ethconfig.SyncMode, success func()) *downloadTester {
return newTesterWithSnap(t, mode, success, false)
}
// newTesterWithSnap is like newTesterWithNotification but selects the snap/2
// state syncer when snapV2 is set.
func newTesterWithSnap(t *testing.T, mode ethconfig.SyncMode, success func(), snapV2 bool) *downloadTester {
db, err := rawdb.Open(rawdb.NewMemoryDatabase(), rawdb.OpenOptions{})
if err != nil {
panic(err)
@ -74,7 +80,7 @@ func newTesterWithNotification(t *testing.T, mode ethconfig.SyncMode, success fu
chain: chain,
peers: make(map[string]*downloadTesterPeer),
}
tester.downloader = New(db, mode, tester.chain, tester.dropPeer, success)
tester.downloader = New(db, mode, tester.chain, tester.dropPeer, success, snapV2)
return tester
}
@ -102,7 +108,7 @@ func (dl *downloadTester) newPeer(id string, version uint, blocks []*types.Block
if err := dl.downloader.RegisterPeer(id, version, peer); err != nil {
panic(err)
}
if err := dl.downloader.SnapSyncer.Register(peer); err != nil {
if err := dl.downloader.snapSyncer.Register(peer); err != nil {
panic(err)
}
return peer
@ -114,7 +120,7 @@ func (dl *downloadTester) dropPeer(id string) {
defer dl.lock.Unlock()
delete(dl.peers, id)
dl.downloader.SnapSyncer.Unregister(id)
dl.downloader.snapSyncer.Unregister(id)
dl.downloader.UnregisterPeer(id)
}
@ -329,7 +335,7 @@ func (dlp *downloadTesterPeer) RequestAccountRange(id uint64, root, origin, limi
}
hashes, accounts, _ := res.Unpack()
go dlp.dl.downloader.SnapSyncer.OnAccounts(dlp, id, hashes, accounts, proofs)
go dlp.dl.downloader.snapSyncer.OnAccounts(dlp, id, hashes, accounts, proofs)
return nil
}
@ -356,7 +362,7 @@ func (dlp *downloadTesterPeer) RequestStorageRanges(id uint64, root common.Hash,
}
hashes, slots := res.Unpack()
go dlp.dl.downloader.SnapSyncer.OnStorage(dlp, id, hashes, slots, proofs)
go dlp.dl.downloader.snapSyncer.OnStorage(dlp, id, hashes, slots, proofs)
return nil
}
@ -368,11 +374,12 @@ func (dlp *downloadTesterPeer) RequestByteCodes(id uint64, hashes []common.Hash,
Bytes: uint64(bytes),
}
codes := snap.ServiceGetByteCodesQuery(dlp.chain, req)
go dlp.dl.downloader.SnapSyncer.OnByteCodes(dlp, id, codes)
go dlp.dl.downloader.snapSyncer.OnByteCodes(dlp, id, codes)
return nil
}
// RequestTrieNodes fetches a batch of account or storage trie nodes.
// RequestTrieNodes fetches a batch of trie nodes (snap/1 healing). snap/2 never
// issues these, but the method is required to satisfy snap.SyncPeerV2.
func (dlp *downloadTesterPeer) RequestTrieNodes(id uint64, root common.Hash, count int, paths []snap.TrieNodePathSet, bytes int) error {
encPaths, err := rlp.EncodeToRawList(paths)
if err != nil {
@ -385,7 +392,19 @@ func (dlp *downloadTesterPeer) RequestTrieNodes(id uint64, root common.Hash, cou
Bytes: uint64(bytes),
}
nodes, _ := snap.ServiceGetTrieNodesQuery(dlp.chain, req)
go dlp.dl.downloader.SnapSyncer.OnTrieNodes(dlp, id, nodes)
go dlp.dl.downloader.snapSyncer.OnTrieNodes(dlp, id, nodes)
return nil
}
// RequestAccessLists fetches a batch of BALs by block hash.
func (dlp *downloadTesterPeer) RequestAccessLists(id uint64, hashes []common.Hash, bytes int) error {
req := &snap.GetAccessListsPacket{
ID: id,
Hashes: hashes,
Bytes: uint64(bytes),
}
als := snap.ServiceGetAccessListsQuery(dlp.chain, req)
go dlp.dl.downloader.snapSyncer.OnAccessLists(dlp, id, als)
return nil
}
@ -412,14 +431,15 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
}
}
func TestCanonicalSynchronisationFull(t *testing.T) { testCanonSync(t, eth.ETH69, FullSync) }
func TestCanonicalSynchronisationSnap(t *testing.T) { testCanonSync(t, eth.ETH69, SnapSync) }
func TestCanonicalSynchronisationFull(t *testing.T) { testCanonSync(t, eth.ETH69, FullSync, false) }
func TestCanonicalSynchronisationSnap(t *testing.T) { testCanonSync(t, eth.ETH69, SnapSync, false) }
func TestCanonicalSynchronisationSnapV2(t *testing.T) { testCanonSync(t, eth.ETH69, SnapSync, true) }
func testCanonSync(t *testing.T, protocol uint, mode SyncMode) {
func testCanonSync(t *testing.T, protocol uint, mode SyncMode, snapV2 bool) {
success := make(chan struct{})
tester := newTesterWithNotification(t, mode, func() {
tester := newTesterWithSnap(t, mode, func() {
close(success)
})
}, snapV2)
defer tester.terminate()
// Create a small enough block chain to download

View file

@ -19,14 +19,14 @@ package downloader
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// syncState starts downloading state with the given root hash.
func (d *Downloader) syncState(root common.Hash) *stateSync {
// syncState starts downloading state with the given pivot header.
func (d *Downloader) syncState(pivot *types.Header) *stateSync {
// Create the state sync
s := newStateSync(d, root)
s := newStateSync(d, pivot)
select {
case d.stateSyncStart <- s:
// If we tell the statesync to restart with a new root, we also need
@ -58,7 +58,7 @@ func (d *Downloader) stateFetcher() {
// runStateSync runs a state synchronisation until it completes or another root
// hash is requested to be switched over to.
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
log.Trace("State sync starting", "root", s.root)
log.Trace("State sync starting", "pivot", s.pivot.Hash(), "number", s.pivot.Number)
go s.run()
defer s.Cancel()
@ -75,10 +75,10 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
}
// stateSync schedules requests for downloading a particular state trie defined
// by a given state root.
// by a given pivot header.
type stateSync struct {
d *Downloader // Downloader instance to access and manage current peerset
root common.Hash // State root currently being synced
d *Downloader // Downloader instance to access and manage current peerset
pivot *types.Header // Pivot header currently being synced
started chan struct{} // Started is signalled once the sync loop starts
cancel chan struct{} // Channel to signal a termination request
@ -89,10 +89,10 @@ type stateSync struct {
// newStateSync creates a new state trie download scheduler. This method does not
// yet start the sync. The user needs to call run to initiate.
func newStateSync(d *Downloader, root common.Hash) *stateSync {
func newStateSync(d *Downloader, pivot *types.Header) *stateSync {
return &stateSync{
d: d,
root: root,
pivot: pivot,
cancel: make(chan struct{}),
done: make(chan struct{}),
started: make(chan struct{}),
@ -104,7 +104,7 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
// finish.
func (s *stateSync) run() {
close(s.started)
s.err = s.d.SnapSyncer.Sync(s.root, s.cancel)
s.err = s.d.snapSyncer.Sync(s.pivot, s.cancel)
close(s.done)
}

View file

@ -184,6 +184,11 @@ type Config struct {
// Enables tracking of state size
EnableStateSizeTracking bool
// SnapV2 enables the experimental snap/2 (EIP-8189, BAL-based) sync protocol:
// the node advertises snap/2 on the wire and uses the snap/2 state syncer.
// It is not safe to enable on public networks yet.
SnapV2 bool
// Enables VM tracing
VMTrace string
VMTraceJsonConfig string

View file

@ -57,6 +57,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
EnableWitnessStats bool
StatelessSelfValidation bool
EnableStateSizeTracking bool
SnapV2 bool
VMTrace string
VMTraceJsonConfig string
RPCGasCap uint64
@ -111,6 +112,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
enc.EnableWitnessStats = c.EnableWitnessStats
enc.StatelessSelfValidation = c.StatelessSelfValidation
enc.EnableStateSizeTracking = c.EnableStateSizeTracking
enc.SnapV2 = c.SnapV2
enc.VMTrace = c.VMTrace
enc.VMTraceJsonConfig = c.VMTraceJsonConfig
enc.RPCGasCap = c.RPCGasCap
@ -169,6 +171,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
EnableWitnessStats *bool
StatelessSelfValidation *bool
EnableStateSizeTracking *bool
SnapV2 *bool
VMTrace *string
VMTraceJsonConfig *string
RPCGasCap *uint64
@ -306,6 +309,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.EnableStateSizeTracking != nil {
c.EnableStateSizeTracking = *dec.EnableStateSizeTracking
}
if dec.SnapV2 != nil {
c.SnapV2 = *dec.SnapV2
}
if dec.VMTrace != nil {
c.VMTrace = *dec.VMTrace
}

View file

@ -108,6 +108,7 @@ type handlerConfig struct {
Sync ethconfig.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
SnapV2 bool // Whether to advertise and sync via the snap/2 protocol
}
type handler struct {
@ -156,7 +157,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
handlerStartCh: make(chan struct{}),
}
// Construct the downloader (long sync)
h.downloader = downloader.New(config.Database, config.Sync, h.chain, h.removePeer, h.enableSyncedFeatures)
h.downloader = downloader.New(config.Database, config.Sync, h.chain, h.removePeer, h.enableSyncedFeatures, config.SnapV2)
// If snap sync is requested but snapshots are disabled, fail loudly
if h.downloader.ConfigSyncMode() == ethconfig.SnapSync && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) {
@ -278,7 +279,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
return err
}
if snap != nil {
if err := h.downloader.SnapSyncer.Register(snap); err != nil {
if err := h.downloader.RegisterSnapPeer(snap); err != nil {
peer.Log().Error("Failed to register peer in snap syncer", "err", err)
return err
}
@ -392,7 +393,7 @@ func (h *handler) unregisterPeer(id string) {
// Remove the `snap` extension if it exists
if peer.snapExt != nil {
h.downloader.SnapSyncer.Unregister(id)
h.downloader.UnregisterSnapPeer(peer.snapExt.Peer)
}
h.downloader.UnregisterPeer(id)
h.txFetcher.Drop(id)

View file

@ -0,0 +1,188 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snap
import (
"bytes"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/bal"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)
// verifyAccessList checks that the given block access list matches the hash
// committed in the block header.
func verifyAccessList(b *bal.BlockAccessList, header *types.Header) error {
if header.BlockAccessListHash == nil {
return fmt.Errorf("header %d has no access list hash", header.Number)
}
have := b.Hash()
if have != *header.BlockAccessListHash {
return fmt.Errorf("access list hash mismatch for block %d: have %v, want %v", header.Number, have, *header.BlockAccessListHash)
}
return nil
}
// isFetched tells us if accountHash has been downloaded.
func (s *syncerV2) isFetched(accountHash common.Hash) bool {
s.lock.RLock()
defer s.lock.RUnlock()
for _, task := range s.tasks {
if bytes.Compare(accountHash[:], task.Last[:]) <= 0 {
return bytes.Compare(accountHash[:], task.Next[:]) < 0
}
}
return true
}
// isStorageFetched reports whether the specified storage slot has already
// been downloaded in a previous cycle.
func (s *syncerV2) isStorageFetched(accountHash, storageHash common.Hash) bool {
s.lock.RLock()
defer s.lock.RUnlock()
for _, task := range s.tasks {
if bytes.Compare(accountHash[:], task.Last[:]) > 0 {
continue
}
// The account falls within a completed account range.
if bytes.Compare(accountHash[:], task.Next[:]) < 0 {
return true
}
// All storage for this account has been synchronized.
if _, ok := task.stateCompleted[accountHash]; ok {
return true
}
// No storage sync task exists for this account yet.
subtasks, ok := task.SubTasks[accountHash]
if !ok {
return false
}
// Check whether the slot falls within a completed storage subrange.
for _, sub := range subtasks {
if bytes.Compare(storageHash[:], sub.Last[:]) <= 0 {
return bytes.Compare(storageHash[:], sub.Next[:]) < 0
}
}
return true // All storage subranges for this slot have been completed.
}
return true // The account belongs to a completed account range.
}
// applyAccessList applies a single block's access list diffs to the flat state
// in the database. For each account, it applies the post-block values (highest
// TxIdx entry) for balance, nonce, code, and storage. The storageRoot field is
// intentionally left stale. It will be recomputed during the trie rebuild.
func (s *syncerV2) applyAccessList(b *bal.BlockAccessList, batch ethdb.Batch) error {
// Iterate over all accounts in the access list
for _, access := range *b {
addr := access.Address
accountHash := crypto.Keccak256Hash(addr[:])
for _, slotWrites := range access.StorageChanges {
if n := len(slotWrites.SlotChanges); n > 0 {
value := slotWrites.SlotChanges[n-1].PostValue
slotKey := slotWrites.Slot.Bytes32()
storageHash := crypto.Keccak256Hash(slotKey[:])
if !s.isStorageFetched(accountHash, storageHash) {
continue
}
if value.IsZero() {
rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash)
} else {
// Store the slot in the same encoding the snapshot and the
// trie rebuild use: RLP of the minimal big-endian value
// (leading zeros trimmed), matching core/state's snapshot
// writes.
blob, _ := rlp.EncodeToBytes(value.Bytes())
rawdb.WriteStorageSnapshot(batch, accountHash, storageHash, blob)
}
}
}
if !s.isFetched(accountHash) {
continue
}
// Read the existing account from flat state (may not exist yet)
var (
account types.StateAccount
isNew bool
)
if data := rawdb.ReadAccountSnapshot(s.db, accountHash); len(data) > 0 {
existing, err := types.FullAccount(data)
if err != nil {
return fmt.Errorf("failed to decode account %v: %w", addr, err)
}
account = *existing
} else {
// New account — initialize with defaults
isNew = true
account.Balance = new(uint256.Int)
account.Root = types.EmptyRootHash
account.CodeHash = types.EmptyCodeHash[:]
}
// Apply balance change (last entry = post-block state)
if n := len(access.BalanceChanges); n > 0 {
account.Balance = new(uint256.Int).Set(access.BalanceChanges[n-1].PostBalance)
}
// Apply nonce change (last entry = post-block state)
if n := len(access.NonceChanges); n > 0 {
account.Nonce = access.NonceChanges[n-1].PostNonce
}
// Apply code change (last entry = post-block state)
if n := len(access.CodeChanges); n > 0 {
code := access.CodeChanges[n-1].NewCode
if len(code) > 0 {
codeHash := crypto.Keccak256(code)
rawdb.WriteCode(batch, common.BytesToHash(codeHash), code)
account.CodeHash = codeHash
} else {
account.CodeHash = types.EmptyCodeHash[:]
}
}
// Don't create empty accounts in flat state (EIP-161).
isEmpty := account.Balance.IsZero() && account.Nonce == 0 &&
bytes.Equal(account.CodeHash, types.EmptyCodeHash[:])
switch {
case isEmpty && isNew:
// This covers cases where an account is created and destroyed within the
// same transaction, or where its net state change across the block is zero.
// The empty -> empty transition should be excluded from account update.
case isEmpty && !isNew:
// Existing account got fully drained (e.g., pre-funded
// address that gets deployed to with init code that
// self-destructs). Delete the entry so the trie rebuild
// doesn't pick it up as an empty leaf.
rawdb.DeleteAccountSnapshot(batch, accountHash)
default:
// Write the updated account (storageRoot intentionally left stale)
rawdb.WriteAccountSnapshot(batch, accountHash, types.SlimAccountRLP(account))
}
}
return nil
}

View file

@ -0,0 +1,591 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snap
import (
"bytes"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/bal"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)
// buildTestBAL constructs a BlockAccessList from a ConstructionBlockAccessList
// by RLP round-tripping (construction types use unexported encoding types).
func buildTestBAL(t *testing.T, cb *bal.ConstructionBlockAccessList) *bal.BlockAccessList {
t.Helper()
var buf bytes.Buffer
if err := cb.EncodeRLP(&buf); err != nil {
t.Fatalf("failed to encode BAL: %v", err)
}
var b bal.BlockAccessList
if err := rlp.DecodeBytes(buf.Bytes(), &b); err != nil {
t.Fatalf("failed to decode BAL: %v", err)
}
return &b
}
// applyBAL applies b to the syncer's flat state and commits it, mirroring the
// per-block batch flow used during catch-up: applyAccessList writes into a batch
// that the caller commits.
func applyBAL(t *testing.T, s *syncerV2, b *bal.BlockAccessList) {
t.Helper()
batch := s.db.NewBatch()
if err := s.applyAccessList(b, batch); err != nil {
t.Fatalf("applyAccessList failed: %v", err)
}
if err := batch.Write(); err != nil {
t.Fatalf("failed to commit BAL batch: %v", err)
}
}
// TestAccessListVerification checks that verifyAccessList accepts valid BALs
// and rejects tampered ones.
func TestAccessListVerification(t *testing.T) {
t.Parallel()
cb := bal.NewConstructionBlockAccessList()
addr := common.HexToAddress("0x01")
cb.BalanceChange(0, addr, uint256.NewInt(100))
b := buildTestBAL(t, cb)
correctHash := b.Hash()
// Valid: hash matches header
header := &types.Header{
Number: big.NewInt(1),
BlockAccessListHash: &correctHash,
}
if err := verifyAccessList(b, header); err != nil {
t.Fatalf("valid access list rejected: %v", err)
}
// Invalid: wrong hash in header
wrongHash := common.HexToHash("0xdead")
badHeader := &types.Header{
Number: big.NewInt(1),
BlockAccessListHash: &wrongHash,
}
if err := verifyAccessList(b, badHeader); err == nil {
t.Fatal("tampered access list accepted")
}
// Invalid: no hash in header
noHashHeader := &types.Header{
Number: big.NewInt(1),
}
if err := verifyAccessList(b, noHashHeader); err == nil {
t.Fatal("header without access list hash accepted")
}
}
// TestAccessListApplication verifies that applyAccessList correctly updates
// flat state (balance, nonce, code, storage) and leaves storageRoot stale.
func TestAccessListApplication(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
addr := common.HexToAddress("0x01")
accountHash := crypto.Keccak256Hash(addr[:])
// Write an existing account to flat state
original := types.StateAccount{
Nonce: 5,
Balance: uint256.NewInt(1000),
Root: common.HexToHash("0xbeef"), // intentionally non-empty
CodeHash: types.EmptyCodeHash[:],
}
rawdb.WriteAccountSnapshot(db, accountHash, types.SlimAccountRLP(original))
// Write an existing storage slot. The BAL uses raw slot keys, but the
// snapshot layer stores slots under keccak256(slot).
rawSlot := common.HexToHash("0xaa")
slotHash := crypto.Keccak256Hash(rawSlot[:])
rawdb.WriteStorageSnapshot(db, accountHash, slotHash, common.HexToHash("0x01").Bytes())
// Build a BAL that changes balance, nonce, code, and storage
cb := bal.NewConstructionBlockAccessList()
cb.BalanceChange(0, addr, uint256.NewInt(2000))
cb.NonceChange(addr, 0, 6)
cb.CodeChange(addr, 0, []byte{0x60, 0x00}) // PUSH1 0x00
cb.StorageWrite(0, addr, rawSlot, common.HexToHash("0x02"))
b := buildTestBAL(t, cb)
applyBAL(t, syncer, b)
// Verify account fields updated
data := rawdb.ReadAccountSnapshot(db, accountHash)
if len(data) == 0 {
t.Fatal("account snapshot missing after apply")
}
updated, err := types.FullAccount(data)
if err != nil {
t.Fatalf("failed to decode updated account: %v", err)
}
if updated.Balance.Cmp(uint256.NewInt(2000)) != 0 {
t.Errorf("balance wrong: got %v, want 2000", updated.Balance)
}
if updated.Nonce != 6 {
t.Errorf("nonce wrong: got %d, want 6", updated.Nonce)
}
wantCodeHash := crypto.Keccak256([]byte{0x60, 0x00})
if !bytes.Equal(updated.CodeHash, wantCodeHash) {
t.Errorf("code hash wrong: got %x, want %x", updated.CodeHash, wantCodeHash)
}
// Verify code was written
if code := rawdb.ReadCode(db, common.BytesToHash(wantCodeHash)); !bytes.Equal(code, []byte{0x60, 0x00}) {
t.Errorf("code wrong: got %x, want 6000", code)
}
// Verify storage updated. Slots are stored in the canonical snapshot
// encoding (RLP of the value with leading zeros trimmed), the same form
// the download path writes and the trie rebuild consumes.
storageVal := rawdb.ReadStorageSnapshot(db, accountHash, slotHash)
wantStorage, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(common.HexToHash("0x02").Bytes()))
if !bytes.Equal(storageVal, wantStorage) {
t.Errorf("storage wrong: got %x, want %x", storageVal, wantStorage)
}
// Verify storageRoot left stale (unchanged from original)
if updated.Root != original.Root {
t.Errorf("storageRoot should be stale: got %v, want %v", updated.Root, original.Root)
}
}
// TestAccessListApplicationMultiTx verifies that when an account has multiple
// changes at different transaction indices, only the highest index (post-block
// state) is applied.
func TestAccessListApplicationMultiTx(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
addr := common.HexToAddress("0x02")
accountHash := crypto.Keccak256Hash(addr[:])
// Write initial account
original := types.StateAccount{
Nonce: 0,
Balance: uint256.NewInt(100),
Root: types.EmptyRootHash,
CodeHash: types.EmptyCodeHash[:],
}
rawdb.WriteAccountSnapshot(db, accountHash, types.SlimAccountRLP(original))
// Build BAL with multiple balance/nonce changes at different tx indices
cb := bal.NewConstructionBlockAccessList()
cb.BalanceChange(0, addr, uint256.NewInt(200)) // tx 0
cb.BalanceChange(3, addr, uint256.NewInt(500)) // tx 3
cb.BalanceChange(7, addr, uint256.NewInt(9999)) // tx 7 (final)
cb.NonceChange(addr, 0, 1) // tx 0
cb.NonceChange(addr, 3, 2) // tx 3
cb.NonceChange(addr, 7, 3) // tx 7 (final)
b := buildTestBAL(t, cb)
applyBAL(t, syncer, b)
data := rawdb.ReadAccountSnapshot(db, accountHash)
updated, err := types.FullAccount(data)
if err != nil {
t.Fatalf("failed to decode updated account: %v", err)
}
// Only the highest tx index values should be applied
if updated.Balance.Cmp(uint256.NewInt(9999)) != 0 {
t.Errorf("balance wrong: got %v, want 9999", updated.Balance)
}
if updated.Nonce != 3 {
t.Errorf("nonce wrong: got %d, want 3", updated.Nonce)
}
}
// TestAccessListApplicationZeroStorage verifies that a BAL slot write with a
// zero post-value deletes the snapshot entry instead of writing 32 zero
// bytes.
func TestAccessListApplicationZeroStorage(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
addr := common.HexToAddress("0x06")
accountHash := crypto.Keccak256Hash(addr[:])
// Existing account with a non-zero storage slot.
original := types.StateAccount{
Nonce: 1,
Balance: uint256.NewInt(1),
Root: types.EmptyRootHash,
CodeHash: types.EmptyCodeHash[:],
}
rawdb.WriteAccountSnapshot(db, accountHash, types.SlimAccountRLP(original))
rawSlot := common.HexToHash("0xaa")
slotHash := crypto.Keccak256Hash(rawSlot[:])
rawdb.WriteStorageSnapshot(db, accountHash, slotHash, common.HexToHash("0x42").Bytes())
// BAL writes the slot to zero (deletion).
cb := bal.NewConstructionBlockAccessList()
cb.StorageWrite(0, addr, rawSlot, common.Hash{})
b := buildTestBAL(t, cb)
applyBAL(t, syncer, b)
if val := rawdb.ReadStorageSnapshot(db, accountHash, slotHash); len(val) != 0 {
t.Errorf("zeroed slot should have been deleted, got %x", val)
}
}
// TestAccessListApplicationNewAccount verifies that applyAccessList creates
// new accounts that don't exist in the DB yet.
func TestAccessListApplicationNewAccount(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
addr := common.HexToAddress("0x03")
accountHash := crypto.Keccak256Hash(addr[:])
// Verify account doesn't exist
if data := rawdb.ReadAccountSnapshot(db, accountHash); len(data) > 0 {
t.Fatal("account should not exist yet")
}
// Build BAL for a new account. BAL uses raw slot keys.
cb := bal.NewConstructionBlockAccessList()
cb.BalanceChange(0, addr, uint256.NewInt(42))
cb.NonceChange(addr, 0, 1)
rawSlot := common.HexToHash("0xbb")
cb.StorageWrite(0, addr, rawSlot, common.HexToHash("0xff"))
b := buildTestBAL(t, cb)
applyBAL(t, syncer, b)
// Verify account was created
data := rawdb.ReadAccountSnapshot(db, accountHash)
if len(data) == 0 {
t.Fatal("account should exist after apply")
}
account, err := types.FullAccount(data)
if err != nil {
t.Fatalf("failed to decode new account: %v", err)
}
if account.Balance.Cmp(uint256.NewInt(42)) != 0 {
t.Errorf("balance wrong: got %v, want 42", account.Balance)
}
if account.Nonce != 1 {
t.Errorf("nonce wrong: got %d, want 1", account.Nonce)
}
if account.Root != types.EmptyRootHash {
t.Errorf("root should be empty for new account: got %v", account.Root)
}
// Verify storage was written under keccak256(rawSlot) in the canonical
// snapshot encoding (RLP of the value with leading zeros trimmed).
slotHash := crypto.Keccak256Hash(rawSlot[:])
storageVal := rawdb.ReadStorageSnapshot(db, accountHash, slotHash)
wantStorage, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(common.HexToHash("0xff").Bytes()))
if !bytes.Equal(storageVal, wantStorage) {
t.Errorf("storage wrong: got %x, want %x", storageVal, wantStorage)
}
}
// TestAccessListApplicationSkipsUnfetched verifies that applyAccessList does
// not write account entries for addresses whose hash falls in a range that
// hasn't been downloaded yet.
func TestAccessListApplicationSkipsUnfetched(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
// Pick two addresses and order them by hash.
addrA := common.HexToAddress("0x01")
addrB := common.HexToAddress("0x02")
hashA := crypto.Keccak256Hash(addrA[:])
hashB := crypto.Keccak256Hash(addrB[:])
fetchedAddr, fetchedHash := addrA, hashA
unfetchedAddr, unfetchedHash := addrB, hashB
if bytes.Compare(hashA[:], hashB[:]) > 0 {
fetchedAddr, fetchedHash = addrB, hashB
unfetchedAddr, unfetchedHash = addrA, hashA
}
// One remaining task covering [unfetchedHash, MaxHash]: the fetched hash
// is below Next so isFetched returns true; the unfetched hash equals Next
// so isFetched returns false.
syncer.tasks = []*accountTaskV2{{
Next: unfetchedHash,
Last: common.MaxHash,
SubTasks: make(map[common.Hash][]*storageTaskV2),
stateCompleted: make(map[common.Hash]struct{}),
}}
cb := bal.NewConstructionBlockAccessList()
cb.BalanceChange(0, fetchedAddr, uint256.NewInt(100))
cb.BalanceChange(0, unfetchedAddr, uint256.NewInt(200))
b := buildTestBAL(t, cb)
applyBAL(t, syncer, b)
// The fetched account should have been written.
if data := rawdb.ReadAccountSnapshot(db, fetchedHash); len(data) == 0 {
t.Error("expected fetched account to be written")
}
// The unfetched account should not have been touched.
if data := rawdb.ReadAccountSnapshot(db, unfetchedHash); len(data) != 0 {
t.Errorf("unfetched account should not be written, got %x", data)
}
}
// TestAccessListApplicationSkipsUnfetchedStorage verifies that storage writes
// are also skipped when the parent account's hash range isn't downloaded yet.
func TestAccessListApplicationSkipsUnfetchedStorage(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
addrA := common.HexToAddress("0x01")
addrB := common.HexToAddress("0x02")
hashA := crypto.Keccak256Hash(addrA[:])
hashB := crypto.Keccak256Hash(addrB[:])
unfetchedAddr, unfetchedHash := addrB, hashB
if bytes.Compare(hashA[:], hashB[:]) > 0 {
unfetchedAddr, unfetchedHash = addrA, hashA
}
syncer.tasks = []*accountTaskV2{{
Next: unfetchedHash,
Last: common.MaxHash,
SubTasks: make(map[common.Hash][]*storageTaskV2),
stateCompleted: make(map[common.Hash]struct{}),
}}
// BAL touches an unfetched account with a storage write AND an empty
// balance mutation. Neither should result in any flat-state writes.
rawSlot := common.HexToHash("0xaa")
slotHash := crypto.Keccak256Hash(rawSlot[:])
cb := bal.NewConstructionBlockAccessList()
cb.BalanceChange(0, unfetchedAddr, uint256.NewInt(0)) // empty mutation
cb.StorageWrite(0, unfetchedAddr, rawSlot, common.HexToHash("0xff"))
b := buildTestBAL(t, cb)
applyBAL(t, syncer, b)
if data := rawdb.ReadAccountSnapshot(db, unfetchedHash); len(data) != 0 {
t.Errorf("unfetched account should not be written, got %x", data)
}
if val := rawdb.ReadStorageSnapshot(db, unfetchedHash, slotHash); len(val) != 0 {
t.Errorf("storage for unfetched account should not be written, got %x", val)
}
}
// TestAccessListApplicationPartialStorage verifies that for a large contract
// whose account hasn't been committed yet but whose storage is partially downloaded,
// applyAccessList rolls forward the slots below the active subtask frontier
// while skipping the ones above it and the account-level fields.
func TestAccessListApplicationPartialStorage(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
addr := common.HexToAddress("0xc0")
accountHash := crypto.Keccak256Hash(addr[:])
// Two slots, ordered by their storage hash. The subtask frontier sits at
// the higher one so the lower slot is fetched and the higher is not.
loRaw := common.HexToHash("0xaa")
hiRaw := common.HexToHash("0xbb")
loHash := crypto.Keccak256Hash(loRaw[:])
hiHash := crypto.Keccak256Hash(hiRaw[:])
if bytes.Compare(loHash[:], hiHash[:]) > 0 {
loRaw, hiRaw = hiRaw, loRaw
loHash, hiHash = hiHash, loHash
}
// The account sits exactly at Next (held back behind storage retrieval), so
// isFetched returns false. Its subtask has fetched everything below hiHash.
syncer.tasks = []*accountTaskV2{{
Next: accountHash,
Last: common.MaxHash,
SubTasks: map[common.Hash][]*storageTaskV2{
accountHash: {{
Next: hiHash,
Last: common.MaxHash,
}},
},
stateCompleted: make(map[common.Hash]struct{}),
}}
cb := bal.NewConstructionBlockAccessList()
cb.BalanceChange(0, addr, uint256.NewInt(500)) // account update must be skipped
cb.StorageWrite(0, addr, loRaw, common.HexToHash("0x11"))
cb.StorageWrite(0, addr, hiRaw, common.HexToHash("0x22"))
b := buildTestBAL(t, cb)
applyBAL(t, syncer, b)
// Account must not be written: it's still being filled.
if data := rawdb.ReadAccountSnapshot(db, accountHash); len(data) != 0 {
t.Errorf("account below Next should not be written, got %x", data)
}
// Slot below the frontier must be rolled forward.
wantLo, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(common.HexToHash("0x11").Bytes()))
if val := rawdb.ReadStorageSnapshot(db, accountHash, loHash); !bytes.Equal(val, wantLo) {
t.Errorf("fetched slot wrong: got %x, want %x", val, wantLo)
}
// Slot above the frontier must be skipped.
if val := rawdb.ReadStorageSnapshot(db, accountHash, hiHash); len(val) != 0 {
t.Errorf("unfetched slot should not be written, got %x", val)
}
}
func TestIsStorageFetched(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
var (
fetchedAcct = common.HexToHash("0x10") // below Next
fillingAcct = common.HexToHash("0x40") // == Next
beyondAcct = common.HexToHash("0x90") // above Last
)
prunedHiAcct := common.HexToHash("0x70") // in [Next, Last], still filling
completeAcct := common.HexToHash("0x71") // in [Next, Last], storage is complete
syncer.tasks = []*accountTaskV2{{
Next: fillingAcct,
Last: common.HexToHash("0x80"),
SubTasks: map[common.Hash][]*storageTaskV2{
fillingAcct: {{
Next: common.HexToHash("0x50"),
Last: common.MaxHash,
}},
prunedHiAcct: {{
Next: common.HexToHash("0x30"),
Last: common.HexToHash("0x60"),
}},
},
stateCompleted: map[common.Hash]struct{}{
completeAcct: {},
},
}}
noSubAcct := common.HexToHash("0x60") // in [Next,Last] but no subtasks yet
tests := []struct {
name string
account common.Hash
slot common.Hash
want bool
}{
{"account fully synced", fetchedAcct, common.HexToHash("0xff"), true},
{"storage fully synced", completeAcct, common.HexToHash("0xff"), true},
{"account before all tasks", common.HexToHash("0x01"), common.HexToHash("0xff"), true},
{"account beyond all tasks", beyondAcct, common.HexToHash("0xff"), true},
{"slot below storage frontier", fillingAcct, common.HexToHash("0x20"), true},
{"slot at storage frontier", fillingAcct, common.HexToHash("0x50"), false},
{"slot above storage frontier", fillingAcct, common.HexToHash("0x70"), false},
{"account filling, no subtasks", noSubAcct, common.HexToHash("0x01"), false},
{"slot in pruned low range", prunedHiAcct, common.HexToHash("0x10"), true},
{"slot at remaining frontier", prunedHiAcct, common.HexToHash("0x30"), false},
{"slot within remaining range", prunedHiAcct, common.HexToHash("0x50"), false},
{"slot in pruned high range", prunedHiAcct, common.HexToHash("0x90"), true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := syncer.isStorageFetched(tt.account, tt.slot); got != tt.want {
t.Errorf("isStorageFetched(%v, %v) = %v, want %v", tt.account, tt.slot, got, tt.want)
}
})
}
}
// TestAccessListApplicationSameTxCreateDestroy tests the edge case where an
// account is created and self-destructed in the same transaction during the
// pivot gap. Per EIP-7928, such accounts appear in the BAL with a balance
// change to zero but no nonce or code changes. Since the account didn't exist
// at the old pivot and doesn't exist at the new pivot (destroyed),
// applyAccessList should not leave a zero-balance account in the snapshot.
// Per EIP-161, empty accounts (zero balance, zero nonce, no code) must not exist
// in state.
func TestAccessListApplicationSameTxCreateDestroy(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
addr := common.HexToAddress("0x04")
accountHash := crypto.Keccak256Hash(addr[:])
// Verify account doesn't exist before apply
if data := rawdb.ReadAccountSnapshot(db, accountHash); len(data) > 0 {
t.Fatal("account should not exist yet")
}
// Build a BAL mimicking same-tx create+destroy: the account appears
// with a balance change to zero and nothing else.
cb := bal.NewConstructionBlockAccessList()
cb.BalanceChange(0, addr, uint256.NewInt(0))
b := buildTestBAL(t, cb)
applyBAL(t, syncer, b)
// Check if applyAccessList created an account.
data := rawdb.ReadAccountSnapshot(db, accountHash)
if len(data) > 0 {
// Account was created
account, err := types.FullAccount(data)
if err != nil {
t.Fatalf("failed to decode account: %v", err)
}
t.Errorf("account created for same-tx create+destroy: "+
"balance=%v, nonce=%d, codeHash=%x, root=%v",
account.Balance, account.Nonce, account.CodeHash, account.Root)
}
}
// TestAccessListApplicationDestroyExisting verifies that when a BAL reduces
// an existing flat-state account to nonce=0, balance=0, empty code (the
// pre-funded destruction pattern), applyAccessList deletes the entry rather
// than leaving it zereod.
func TestAccessListApplicationDestroyExisting(t *testing.T) {
t.Parallel()
db := rawdb.NewMemoryDatabase()
syncer := newSyncerV2(db, rawdb.HashScheme)
addr := common.HexToAddress("0x05")
accountHash := crypto.Keccak256Hash(addr[:])
// Pre-funded account: has balance, no nonce, no code.
original := types.StateAccount{
Nonce: 0,
Balance: uint256.NewInt(1000),
Root: types.EmptyRootHash,
CodeHash: types.EmptyCodeHash[:],
}
rawdb.WriteAccountSnapshot(db, accountHash, types.SlimAccountRLP(original))
// The BAL zeros the balance. Nonce and code were already empty, so
// the account ends up fully empty after applying.
cb := bal.NewConstructionBlockAccessList()
cb.BalanceChange(0, addr, uint256.NewInt(0))
b := buildTestBAL(t, cb)
applyBAL(t, syncer, b)
if data := rawdb.ReadAccountSnapshot(db, accountHash); len(data) != 0 {
account, _ := types.FullAccount(data)
t.Errorf("destroyed account should have been deleted from flat state, "+
"got balance=%v, nonce=%d, codeHash=%x",
account.Balance, account.Nonce, account.CodeHash)
}
}

View file

@ -79,10 +79,16 @@ type Backend interface {
Handle(peer *Peer, packet Packet) error
}
// MakeProtocols constructs the P2P protocol definitions for `snap`.
func MakeProtocols(backend Backend) []p2p.Protocol {
protocols := make([]p2p.Protocol, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// MakeProtocols constructs the P2P protocol definitions for `snap`. When snapV2
// is set, the snap/2 version is advertised in addition to the default versions;
// otherwise only the default (snap/1) versions are offered on the wire.
func MakeProtocols(backend Backend, snapV2 bool) []p2p.Protocol {
versions := ProtocolVersions
if snapV2 {
versions = append([]uint{SNAP2}, versions...)
}
protocols := make([]p2p.Protocol, len(versions))
for i, version := range versions {
protocols[i] = p2p.Protocol{
Name: ProtocolName,
Version: version,
@ -132,7 +138,6 @@ var snap1 = map[uint64]msgHandler{
TrieNodesMsg: handleTrieNodes,
}
// nolint:unused
var snap2 = map[uint64]msgHandler{
GetAccountRangeMsg: handleGetAccountRange,
AccountRangeMsg: handleAccountRange,
@ -141,7 +146,7 @@ var snap2 = map[uint64]msgHandler{
GetByteCodesMsg: handleGetByteCodes,
ByteCodesMsg: handleByteCodes,
GetAccessListsMsg: handleGetAccessLists,
// AccessListsMsg: TODO
AccessListsMsg: handleAccessLists,
}
// HandleMessage is invoked whenever an inbound message is received from a
@ -162,8 +167,8 @@ func HandleMessage(backend Backend, peer *Peer) error {
switch peer.version {
case SNAP1:
handlers = snap1
//case SNAP2:
// handlers = snap2
case SNAP2:
handlers = snap2
default:
return fmt.Errorf("unknown eth protocol version: %v", peer.version)
}

View file

@ -553,7 +553,6 @@ func handleTrieNodes(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &TrieNodesPacket{res.ID, nodes})
}
// nolint:unused
func handleGetAccessLists(backend Backend, msg Decoder, peer *Peer) error {
var req GetAccessListsPacket
if err := msg.Decode(&req); err != nil {
@ -598,3 +597,15 @@ func ServiceGetAccessListsQuery(chain *core.BlockChain, req *GetAccessListsPacke
}
return response
}
func handleAccessLists(backend Backend, msg Decoder, peer *Peer) error {
res := new(AccessListsPacket)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
tresp := tracker.Response{ID: res.ID, MsgCode: AccessListsMsg, Size: res.AccessLists.Len()}
if err := peer.tracker.Fulfil(tresp); err != nil {
return fmt.Errorf("BALs: %w", err)
}
return backend.Handle(peer, res)
}

View file

@ -155,7 +155,7 @@ func (p *Peer) RequestByteCodes(id uint64, hashes []common.Hash, bytes int) erro
}
// RequestTrieNodes fetches a batch of account or storage trie nodes rooted in
// a specific state trie. The `count` is the total count of paths being requested.
// a specific state trie, or off of a specific account.
func (p *Peer) RequestTrieNodes(id uint64, root common.Hash, count int, paths []TrieNodePathSet, bytes int) error {
p.logger.Trace("Fetching set of trie nodes", "reqid", id, "root", root, "pathsets", len(paths), "bytes", common.StorageSize(bytes))
@ -176,3 +176,22 @@ func (p *Peer) RequestTrieNodes(id uint64, root common.Hash, count int, paths []
Bytes: uint64(bytes),
})
}
// RequestAccessLists fetches a batch of BALs by block hash.
func (p *Peer) RequestAccessLists(id uint64, hashes []common.Hash, bytes int) error {
p.logger.Trace("Fetching set of BALs", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes))
err := p.tracker.Track(tracker.Request{
ReqCode: GetAccessListsMsg,
RespCode: AccessListsMsg,
ID: id,
Size: len(hashes),
})
if err != nil {
return err
}
return p2p.Send(p.rw, GetAccessListsMsg, &GetAccessListsPacket{
ID: id,
Hashes: hashes,
Bytes: uint64(bytes),
})
}

View file

@ -18,9 +18,12 @@ package snap
import (
"encoding/json"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
)
// Legacy sync progress definitions
@ -39,7 +42,7 @@ type legacyProgress struct {
Tasks []*legacyAccountTask // The suspended account tasks (contract tasks within)
}
func compareProgress(a legacyProgress, b SyncProgress) bool {
func compareProgress(a legacyProgress, b syncProgress) bool {
if len(a.Tasks) != len(b.Tasks) {
return false
}
@ -96,8 +99,8 @@ func makeLegacyProgress() legacyProgress {
}
}
func convertLegacy(legacy legacyProgress) SyncProgress {
var progress SyncProgress
func convertLegacy(legacy legacyProgress) syncProgress {
var progress syncProgress
for i, task := range legacy.Tasks {
subTasks := make(map[common.Hash][]*storageTask)
for owner, list := range task.SubTasks {
@ -130,7 +133,7 @@ func TestSyncProgressCompatibility(t *testing.T) {
if err != nil {
t.Fatalf("Failed to marshal progress %v", err)
}
var dec SyncProgress
var dec syncProgress
if err := json.Unmarshal(blob, &dec); err != nil {
t.Fatalf("Failed to unmarshal progress %v", err)
}
@ -152,3 +155,116 @@ func TestSyncProgressCompatibility(t *testing.T) {
t.Fatal("sync progress is not forward compatible")
}
}
// TestSyncProgressV1Discarded verifies that a persisted blob written in the
// old unversioned format (raw JSON, no version prefix) is detected and
// discarded on load, that the syncer falls through to a fresh start, and
// that any orphan flat-state entries from the prior format are wiped.
func TestSyncProgressV1Discarded(t *testing.T) {
db := rawdb.NewMemoryDatabase()
// Write a raw JSON blob (no version byte) to simulate progress persisted
// by a prior geth binary (snap/1 format).
legacy := map[string]any{
"Root": common.HexToHash("0xaaaa"),
"BlockNumber": uint64(42),
"Tasks": []any{},
}
blob, err := json.Marshal(legacy)
if err != nil {
t.Fatalf("marshal legacy: %v", err)
}
rawdb.WriteSnapshotSyncStatus(db, blob)
// Pre-write orphan flat-state entries that should be wiped on fresh start.
orphanAccountHash := common.HexToHash("0xdeadbeef")
rawdb.WriteAccountSnapshot(db, orphanAccountHash, []byte{0xde, 0xad})
orphanStorageAccount := common.HexToHash("0xfeedface")
orphanStorageSlot := common.HexToHash("0xabcd")
rawdb.WriteStorageSnapshot(db, orphanStorageAccount, orphanStorageSlot, []byte{0xff, 0xff})
syncer := newSyncerV2(db, rawdb.HashScheme)
syncer.loadSyncStatus()
if syncer.previousPivot != nil {
t.Fatalf("expected previousPivot nil after discarding old format, got %+v", syncer.previousPivot)
}
if len(syncer.tasks) != accountConcurrency {
t.Fatalf("expected fresh task split of %d, got %d", accountConcurrency, len(syncer.tasks))
}
if data := rawdb.ReadAccountSnapshot(db, orphanAccountHash); len(data) != 0 {
t.Errorf("orphan account snapshot should be wiped, got %x", data)
}
if val := rawdb.ReadStorageSnapshot(db, orphanStorageAccount, orphanStorageSlot); len(val) != 0 {
t.Errorf("orphan storage snapshot should be wiped, got %x", val)
}
}
// TestSyncProgressV2RoundTrip verifies that the persisted blob is framed
// with the expected version byte at offset 0, and that all six status
// counters survive the round-trip.
func TestSyncProgressV2RoundTrip(t *testing.T) {
db := rawdb.NewMemoryDatabase()
saver := newSyncerV2(db, rawdb.HashScheme)
saver.pivot = &types.Header{Number: new(big.Int).SetUint64(123), Difficulty: common.Big0}
saver.accountSynced = 1
saver.accountBytes = 2
saver.bytecodeSynced = 3
saver.bytecodeBytes = 4
saver.storageSynced = 5
saver.storageBytes = 6
saver.saveSyncStatus()
raw := rawdb.ReadSnapshotSyncStatus(db)
if len(raw) == 0 || raw[0] != syncProgressVersion {
t.Fatalf("expected version byte %d at offset 0, got blob %x", syncProgressVersion, raw)
}
loader := newSyncerV2(db, rawdb.HashScheme)
loader.loadSyncStatus()
for _, c := range []struct {
name string
got uint64
want uint64
}{
{"accountSynced", loader.accountSynced, 1},
{"accountBytes", uint64(loader.accountBytes), 2},
{"bytecodeSynced", loader.bytecodeSynced, 3},
{"bytecodeBytes", uint64(loader.bytecodeBytes), 4},
{"storageSynced", loader.storageSynced, 5},
{"storageBytes", uint64(loader.storageBytes), 6},
} {
if c.got != c.want {
t.Errorf("%s mismatch: got %d, want %d", c.name, c.got, c.want)
}
}
}
// TestSyncProgressCorruptPayload verifies that a persisted blob with the
// correct version byte but unparseable JSON body is discarded, triggers a
// fresh-start fall-through (not a panic or a stale-state load), and the
// orphan flat state is wiped along with the corrupt status.
func TestSyncProgressCorruptPayload(t *testing.T) {
db := rawdb.NewMemoryDatabase()
// Version byte followed by garbage that isn't valid JSON.
rawdb.WriteSnapshotSyncStatus(db, []byte{syncProgressVersion, 0x7b, 0x7b, 0x7b})
// Pre-write orphan flat-state entries that should be wiped on fresh start.
orphanAccountHash := common.HexToHash("0xdeadbeef")
rawdb.WriteAccountSnapshot(db, orphanAccountHash, []byte{0xde, 0xad})
syncer := newSyncerV2(db, rawdb.HashScheme)
syncer.loadSyncStatus()
if syncer.previousPivot != nil {
t.Fatalf("expected previousPivot nil after corrupt payload, got %+v", syncer.previousPivot)
}
if len(syncer.tasks) != accountConcurrency {
t.Fatalf("expected fresh task split of %d, got %d", accountConcurrency, len(syncer.tasks))
}
if data := rawdb.ReadAccountSnapshot(db, orphanAccountHash); len(data) != 0 {
t.Errorf("orphan account snapshot should be wiped, got %x", data)
}
}

View file

@ -28,20 +28,22 @@ import (
// Constants to match up protocol versions and messages
const (
SNAP1 = 1
//SNAP2 = 2
SNAP2 = 2
)
// ProtocolName is the official short name of the `snap` protocol used during
// devp2p capability negotiation.
const ProtocolName = "snap"
// ProtocolVersions are the supported versions of the `snap` protocol (first
// is primary).
// ProtocolVersions are the supported versions of the `snap` protocol advertised
// by default (first is primary). snap/2 is not safe to advertise unconditionally
// yet, so it is gated behind a feature flag and appended in MakeProtocols rather
// than listed here.
var ProtocolVersions = []uint{SNAP1}
// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ /*SNAP2: 10,*/ SNAP1: 8}
// protocolLengths are the number of implemented messages corresponding to
// different protocol versions. snap/2 adds GetAccessLists/AccessLists (0x08/0x09).
var protocolLengths = map[uint]uint64{SNAP2: 10, SNAP1: 8}
// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024

View file

@ -375,10 +375,10 @@ type healTask struct {
codeTasks map[common.Hash]struct{} // Set of byte code tasks currently queued for retrieval, indexed by code hash
}
// SyncProgress is a database entry to allow suspending and resuming a snapshot state
// syncProgress is a database entry to allow suspending and resuming a snapshot state
// sync. Opposed to full and fast sync, there is no way to restart a suspended
// snap sync without prior knowledge of the suspension point.
type SyncProgress struct {
type syncProgress struct {
Tasks []*accountTask // The suspended account tasks (contract tasks within)
// Status report during syncing phase
@ -396,9 +396,9 @@ type SyncProgress struct {
BytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk
}
// SyncPending is analogous to SyncProgress, but it's used to report on pending
// syncPending is analogous to syncProgress, but it's used to report on pending
// ephemeral sync progress that doesn't get persisted into the database.
type SyncPending struct {
type syncPending struct {
TrienodeHeal uint64 // Number of state trie nodes pending
BytecodeHeal uint64 // Number of bytecodes pending
}
@ -430,7 +430,7 @@ type SyncPeer interface {
Log() log.Logger
}
// Syncer is an Ethereum account and storage trie syncer based on snapshots and
// syncer is an Ethereum account and storage trie syncer based on snapshots and
// the snap protocol. It's purpose is to download all the accounts and storage
// slots from remote peers and reassemble chunks of the state trie, on top of
// which a state sync can be run to fix any gaps / overlaps.
@ -441,7 +441,7 @@ type SyncPeer interface {
// - The peer remains connected, but does not deliver a response in time
// - The peer delivers a stale response after a previous timeout
// - The peer delivers a refusal to serve the requested state
type Syncer struct {
type syncer struct {
db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)
scheme string // Node scheme used in node database
@ -473,7 +473,7 @@ type Syncer struct {
storageSynced uint64 // Number of storage slots downloaded
storageBytes common.StorageSize // Number of storage trie bytes persisted to disk
extProgress *SyncProgress // progress that can be exposed to external caller.
extProgress *syncProgress // progress that can be exposed to external caller.
// Request tracking during healing phase
trienodeHealIdlers map[string]struct{} // Peers that aren't serving trie node requests
@ -511,10 +511,10 @@ type Syncer struct {
lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root)
}
// NewSyncer creates a new snapshot syncer to download the Ethereum state over the
// snap protocol.
func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
return &Syncer{
// newSyncer creates the snap/1 state syncer. It is unexported; callers outside
// the package obtain a Syncer through NewV1Syncer.
func newSyncer(db ethdb.KeyValueStore, scheme string) *syncer {
return &syncer{
db: db,
scheme: scheme,
@ -540,12 +540,12 @@ func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
stateWriter: db.NewBatch(),
extProgress: new(SyncProgress),
extProgress: new(syncProgress),
}
}
// Register injects a new data source into the syncer's peerset.
func (s *Syncer) Register(peer SyncPeer) error {
func (s *syncer) Register(peer SyncPeer) error {
// Make sure the peer is not registered yet
id := peer.ID()
@ -573,7 +573,7 @@ func (s *Syncer) Register(peer SyncPeer) error {
}
// Unregister injects a new data source into the syncer's peerset.
func (s *Syncer) Unregister(id string) error {
func (s *syncer) Unregister(id string) error {
// Remove all traces of the peer from the registry
s.lock.Lock()
if _, ok := s.peers[id]; !ok {
@ -604,7 +604,7 @@ func (s *Syncer) Unregister(id string) error {
// with the given root and reconstruct the nodes based on the snapshot leaves.
// Previously downloaded segments will not be redownloaded of fixed, rather any
// errors will be healed after the leaves are fully accumulated.
func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
func (s *syncer) Sync(root common.Hash, cancel chan struct{}) error {
// Move the trie root from any previous value, revert stateless markers for
// any peers and initialize the syncer if it was not yet run
s.lock.Lock()
@ -719,7 +719,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
}
// Update sync progress
s.lock.Lock()
s.extProgress = &SyncProgress{
s.extProgress = &syncProgress{
AccountSynced: s.accountSynced,
AccountBytes: s.accountBytes,
BytecodeSynced: s.bytecodeSynced,
@ -772,8 +772,8 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
// loadSyncStatus retrieves a previously aborted sync status from the database,
// or generates a fresh one if none is available.
func (s *Syncer) loadSyncStatus() {
var progress SyncProgress
func (s *syncer) loadSyncStatus() {
var progress syncProgress
if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil {
if err := json.Unmarshal(status, &progress); err != nil {
@ -891,7 +891,7 @@ func (s *Syncer) loadSyncStatus() {
}
// saveSyncStatus marshals the remaining sync tasks into leveldb.
func (s *Syncer) saveSyncStatus() {
func (s *syncer) saveSyncStatus() {
// Serialize any partial progress to disk before spinning down
for _, task := range s.tasks {
// Claim the right boundary as incomplete before flushing the
@ -921,7 +921,7 @@ func (s *Syncer) saveSyncStatus() {
}
}
// Store the actual progress markers
progress := &SyncProgress{
progress := &syncProgress{
Tasks: s.tasks,
AccountSynced: s.accountSynced,
AccountBytes: s.accountBytes,
@ -942,10 +942,10 @@ func (s *Syncer) saveSyncStatus() {
}
// Progress returns the snap sync status statistics.
func (s *Syncer) Progress() (*SyncProgress, *SyncPending) {
func (s *syncer) Progress() (*syncProgress, *syncPending) {
s.lock.Lock()
defer s.lock.Unlock()
pending := new(SyncPending)
pending := new(syncPending)
if s.healer != nil {
pending.TrienodeHeal = uint64(len(s.healer.trieTasks))
pending.BytecodeHeal = uint64(len(s.healer.codeTasks))
@ -955,7 +955,7 @@ func (s *Syncer) Progress() (*SyncProgress, *SyncPending) {
// cleanAccountTasks removes account range retrieval tasks that have already been
// completed.
func (s *Syncer) cleanAccountTasks() {
func (s *syncer) cleanAccountTasks() {
// If the sync was already done before, don't even bother
if len(s.tasks) == 0 {
return
@ -980,7 +980,7 @@ func (s *Syncer) cleanAccountTasks() {
// cleanStorageTasks iterates over all the account tasks and storage sub-tasks
// within, cleaning any that have been completed.
func (s *Syncer) cleanStorageTasks() {
func (s *syncer) cleanStorageTasks() {
for _, task := range s.tasks {
for account, subtasks := range task.SubTasks {
// Remove storage range retrieval tasks that completed
@ -1017,7 +1017,7 @@ func (s *Syncer) cleanStorageTasks() {
// assignAccountTasks attempts to match idle peers to pending account range
// retrievals.
func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *accountRequest, cancel chan struct{}) {
func (s *syncer) assignAccountTasks(success chan *accountResponse, fail chan *accountRequest, cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
@ -1114,7 +1114,7 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
}
// assignBytecodeTasks attempts to match idle peers to pending code retrievals.
func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *bytecodeRequest, cancel chan struct{}) {
func (s *syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *bytecodeRequest, cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
@ -1217,7 +1217,7 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
// assignStorageTasks attempts to match idle peers to pending storage range
// retrievals.
func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *storageRequest, cancel chan struct{}) {
func (s *syncer) assignStorageTasks(success chan *storageResponse, fail chan *storageRequest, cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
@ -1374,7 +1374,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
// assignTrienodeHealTasks attempts to match idle peers to trie node requests to
// heal any trie errors caused by the snap sync's chunked retrieval model.
func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fail chan *trienodeHealRequest, cancel chan struct{}) {
func (s *syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fail chan *trienodeHealRequest, cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
@ -1502,7 +1502,7 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
// assignBytecodeHealTasks attempts to match idle peers to bytecode requests to
// heal any trie errors caused by the snap sync's chunked retrieval model.
func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fail chan *bytecodeHealRequest, cancel chan struct{}) {
func (s *syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fail chan *bytecodeHealRequest, cancel chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
@ -1618,7 +1618,7 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
// revertRequests locates all the currently pending requests from a particular
// peer and reverts them, rescheduling for others to fulfill.
func (s *Syncer) revertRequests(peer string) {
func (s *syncer) revertRequests(peer string) {
// Gather the requests first, revertals need the lock too
s.lock.Lock()
var accountReqs []*accountRequest
@ -1673,7 +1673,7 @@ func (s *Syncer) revertRequests(peer string) {
// scheduleRevertAccountRequest asks the event loop to clean up an account range
// request and return all failed retrieval tasks to the scheduler for reassignment.
func (s *Syncer) scheduleRevertAccountRequest(req *accountRequest) {
func (s *syncer) scheduleRevertAccountRequest(req *accountRequest) {
select {
case req.revert <- req:
// Sync event loop notified
@ -1689,7 +1689,7 @@ func (s *Syncer) scheduleRevertAccountRequest(req *accountRequest) {
//
// Note, this needs to run on the event runloop thread to reschedule to idle peers.
// On peer threads, use scheduleRevertAccountRequest.
func (s *Syncer) revertAccountRequest(req *accountRequest) {
func (s *syncer) revertAccountRequest(req *accountRequest) {
log.Debug("Reverting account request", "peer", req.peer, "reqid", req.id)
select {
case <-req.stale:
@ -1718,7 +1718,7 @@ func (s *Syncer) revertAccountRequest(req *accountRequest) {
// scheduleRevertBytecodeRequest asks the event loop to clean up a bytecode request
// and return all failed retrieval tasks to the scheduler for reassignment.
func (s *Syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) {
func (s *syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) {
select {
case req.revert <- req:
// Sync event loop notified
@ -1734,7 +1734,7 @@ func (s *Syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) {
//
// Note, this needs to run on the event runloop thread to reschedule to idle peers.
// On peer threads, use scheduleRevertBytecodeRequest.
func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) {
func (s *syncer) revertBytecodeRequest(req *bytecodeRequest) {
log.Debug("Reverting bytecode request", "peer", req.peer)
select {
case <-req.stale:
@ -1763,7 +1763,7 @@ func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) {
// scheduleRevertStorageRequest asks the event loop to clean up a storage range
// request and return all failed retrieval tasks to the scheduler for reassignment.
func (s *Syncer) scheduleRevertStorageRequest(req *storageRequest) {
func (s *syncer) scheduleRevertStorageRequest(req *storageRequest) {
select {
case req.revert <- req:
// Sync event loop notified
@ -1779,7 +1779,7 @@ func (s *Syncer) scheduleRevertStorageRequest(req *storageRequest) {
//
// Note, this needs to run on the event runloop thread to reschedule to idle peers.
// On peer threads, use scheduleRevertStorageRequest.
func (s *Syncer) revertStorageRequest(req *storageRequest) {
func (s *syncer) revertStorageRequest(req *storageRequest) {
log.Debug("Reverting storage request", "peer", req.peer)
select {
case <-req.stale:
@ -1812,7 +1812,7 @@ func (s *Syncer) revertStorageRequest(req *storageRequest) {
// scheduleRevertTrienodeHealRequest asks the event loop to clean up a trienode heal
// request and return all failed retrieval tasks to the scheduler for reassignment.
func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) {
func (s *syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) {
select {
case req.revert <- req:
// Sync event loop notified
@ -1828,7 +1828,7 @@ func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) {
//
// Note, this needs to run on the event runloop thread to reschedule to idle peers.
// On peer threads, use scheduleRevertTrienodeHealRequest.
func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
func (s *syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
log.Debug("Reverting trienode heal request", "peer", req.peer)
select {
case <-req.stale:
@ -1857,7 +1857,7 @@ func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
// scheduleRevertBytecodeHealRequest asks the event loop to clean up a bytecode heal
// request and return all failed retrieval tasks to the scheduler for reassignment.
func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) {
func (s *syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) {
select {
case req.revert <- req:
// Sync event loop notified
@ -1873,7 +1873,7 @@ func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) {
//
// Note, this needs to run on the event runloop thread to reschedule to idle peers.
// On peer threads, use scheduleRevertBytecodeHealRequest.
func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
func (s *syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
log.Debug("Reverting bytecode heal request", "peer", req.peer)
select {
case <-req.stale:
@ -1902,7 +1902,7 @@ func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
// processAccountResponse integrates an already validated account range response
// into the account tasks.
func (s *Syncer) processAccountResponse(res *accountResponse) {
func (s *syncer) processAccountResponse(res *accountResponse) {
// Switch the task from pending to filling
res.task.req = nil
res.task.res = res
@ -2024,7 +2024,7 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
// processBytecodeResponse integrates an already validated bytecode response
// into the account tasks.
func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) {
func (s *syncer) processBytecodeResponse(res *bytecodeResponse) {
batch := s.db.NewBatch()
var codes uint64
@ -2068,7 +2068,7 @@ func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) {
// processStorageResponse integrates an already validated storage response
// into the account tasks.
func (s *Syncer) processStorageResponse(res *storageResponse) {
func (s *syncer) processStorageResponse(res *storageResponse) {
// Switch the subtask from pending to idle
if res.subTask != nil {
res.subTask.req = nil
@ -2308,7 +2308,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// processTrienodeHealResponse integrates an already validated trienode response
// into the healer tasks.
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
func (s *syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
var (
start = time.Now()
fills int
@ -2385,7 +2385,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
}
}
func (s *Syncer) commitHealer(force bool) {
func (s *syncer) commitHealer(force bool) {
if !force && s.healer.scheduler.MemSize() < ethdb.IdealBatchSize {
return
}
@ -2401,7 +2401,7 @@ func (s *Syncer) commitHealer(force bool) {
// processBytecodeHealResponse integrates an already validated bytecode response
// into the healer tasks.
func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
func (s *syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
for i, hash := range res.hashes {
node := res.codes[i]
@ -2431,7 +2431,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
// forwardAccountTask takes a filled account task and persists anything available
// into the database, after which it forwards the next account marker so that the
// task's next chunk may be filled.
func (s *Syncer) forwardAccountTask(task *accountTask) {
func (s *syncer) forwardAccountTask(task *accountTask) {
// Remove any pending delivery
res := task.res
if res == nil {
@ -2521,7 +2521,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
// OnAccounts is a callback method to invoke when a range of accounts are
// received from a remote peer.
func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
func (s *syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
size := common.StorageSize(len(hashes) * common.HashLength)
for _, account := range accounts {
size += common.StorageSize(len(account))
@ -2621,7 +2621,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
// OnByteCodes is a callback method to invoke when a batch of contract
// bytes codes are received from a remote peer.
func (s *Syncer) OnByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
func (s *syncer) OnByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
s.lock.RLock()
syncing := !s.snapped
s.lock.RUnlock()
@ -2634,7 +2634,7 @@ func (s *Syncer) OnByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
// onByteCodes is a callback method to invoke when a batch of contract
// bytes codes are received from a remote peer in the syncing phase.
func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
func (s *syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
var size common.StorageSize
for _, code := range bytecodes {
size += common.StorageSize(len(code))
@ -2732,7 +2732,7 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
// OnStorage is a callback method to invoke when ranges of storage slots
// are received from a remote peer.
func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
func (s *syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
// Gather some trace stats to aid in debugging issues
var (
hashCount int
@ -2881,7 +2881,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// OnTrieNodes is a callback method to invoke when a batch of trie nodes
// are received from a remote peer.
func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error {
func (s *syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error {
var size common.StorageSize
for _, node := range trienodes {
size += common.StorageSize(len(node))
@ -2988,7 +2988,7 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
// onHealByteCodes is a callback method to invoke when a batch of contract
// bytes codes are received from a remote peer in the healing phase.
func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
func (s *syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
var size common.StorageSize
for _, code := range bytecodes {
size += common.StorageSize(len(code))
@ -3088,7 +3088,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
// or storage slot) is downloaded during the healing stage. The flat states
// can be persisted blindly and can be fixed later in the generation stage.
// Note it's not concurrent safe, please handle the concurrent issue outside.
func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
func (s *syncer) onHealState(paths [][]byte, value []byte) error {
if len(paths) == 1 {
var account types.StateAccount
if err := rlp.DecodeBytes(value, &account); err != nil {
@ -3115,7 +3115,7 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil)
// report calculates various status reports and provides it to the user.
func (s *Syncer) report(force bool) {
func (s *syncer) report(force bool) {
if len(s.tasks) > 0 {
s.reportSyncProgress(force)
return
@ -3124,7 +3124,7 @@ func (s *Syncer) report(force bool) {
}
// reportSyncProgress calculates various status reports and provides it to the user.
func (s *Syncer) reportSyncProgress(force bool) {
func (s *syncer) reportSyncProgress(force bool) {
// Don't report all the events, just occasionally
if !force && time.Since(s.logTime) < 8*time.Second {
return
@ -3170,7 +3170,7 @@ func (s *Syncer) reportSyncProgress(force bool) {
}
// reportHealProgress calculates various status reports and provides it to the user.
func (s *Syncer) reportHealProgress(force bool) {
func (s *syncer) reportHealProgress(force bool) {
// Don't report all the events, just occasionally
if !force && time.Since(s.logTime) < 8*time.Second {
return

View file

@ -129,7 +129,7 @@ type (
type testPeer struct {
id string
test *testing.T
remote *Syncer
remote *syncer
logger log.Logger
accountTrie *trie.Trie
accountValues []*kv
@ -623,9 +623,9 @@ func testSyncBloatedProof(t *testing.T, scheme string) {
}
}
func setupSyncer(scheme string, peers ...*testPeer) *Syncer {
func setupSyncer(scheme string, peers ...*testPeer) *syncer {
stateDb := rawdb.NewMemoryDatabase()
syncer := NewSyncer(stateDb, scheme)
syncer := newSyncer(stateDb, scheme)
for _, peer := range peers {
syncer.Register(peer)
peer.remote = syncer

View file

@ -0,0 +1,147 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snap
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
)
// Progress is the set of snap-syncer progress that eth/downloader surfaces in
// ethereum.SyncProgress. The two syncer versions report it via different types
// (syncProgress / syncProgressV2). The adapters normalize to this.
type Progress struct {
AccountSynced uint64
AccountBytes common.StorageSize
BytecodeSynced uint64
BytecodeBytes common.StorageSize
StorageSynced uint64
StorageBytes common.StorageSize
// Healing-phase status. Reported by snap/1 only.
TrienodeHealSynced uint64
TrienodeHealBytes common.StorageSize
BytecodeHealSynced uint64
BytecodeHealBytes common.StorageSize
HealingTrienodes uint64
HealingBytecode uint64
}
// Syncer is the uniform view over the snap/1 (*syncer) and snap/2 (*syncerV2)
// state syncers, consumed by eth/downloader. Peers are passed as SyncPeerV2,
// which is a superset of SyncPeer, so a single peer value works for both
// underlying syncers.
type Syncer interface {
Sync(pivot *types.Header, cancel chan struct{}) error
Progress() Progress
Register(peer SyncPeerV2) error
Unregister(id string) error
OnAccounts(peer SyncPeerV2, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error
OnStorage(peer SyncPeerV2, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error
OnByteCodes(peer SyncPeerV2, id uint64, bytecodes [][]byte) error
OnTrieNodes(peer SyncPeerV2, id uint64, trienodes [][]byte) error
OnAccessLists(peer SyncPeerV2, id uint64, lists rlp.RawList[rlp.RawValue]) error
// Version is the snap protocol version this syncer implements.
Version() uint
}
// NewV1Syncer returns a Syncer backed by the snap/1 state syncer.
func NewV1Syncer(db ethdb.Database, scheme string) Syncer {
return syncerV1Adapter{newSyncer(db, scheme)}
}
// NewV2Syncer returns a Syncer backed by the snap/2 state syncer.
func NewV2Syncer(db ethdb.Database, scheme string) Syncer {
return syncerV2Adapter{newSyncerV2(db, scheme)}
}
// syncerV1Adapter adapts the snap/1 *syncer to Syncer.
type syncerV1Adapter struct{ *syncer }
func (s syncerV1Adapter) Sync(pivot *types.Header, cancel chan struct{}) error {
return s.syncer.Sync(pivot.Root, cancel)
}
func (s syncerV1Adapter) Progress() Progress {
progress, pending := s.syncer.Progress()
return Progress{
AccountSynced: progress.AccountSynced,
AccountBytes: progress.AccountBytes,
BytecodeSynced: progress.BytecodeSynced,
BytecodeBytes: progress.BytecodeBytes,
StorageSynced: progress.StorageSynced,
StorageBytes: progress.StorageBytes,
TrienodeHealSynced: progress.TrienodeHealSynced,
TrienodeHealBytes: progress.TrienodeHealBytes,
BytecodeHealSynced: progress.BytecodeHealSynced,
BytecodeHealBytes: progress.BytecodeHealBytes,
HealingTrienodes: pending.TrienodeHeal,
HealingBytecode: pending.BytecodeHeal,
}
}
// The snap/1 syncer's methods take SyncPeer. SyncPeerV2 is a superset, so the
// incoming peer satisfies them directly. Explicit forwarders are needed because
// the parameter types differ.
func (s syncerV1Adapter) Register(peer SyncPeerV2) error { return s.syncer.Register(peer) }
func (s syncerV1Adapter) OnAccounts(peer SyncPeerV2, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
return s.syncer.OnAccounts(peer, id, hashes, accounts, proof)
}
func (s syncerV1Adapter) OnStorage(peer SyncPeerV2, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
return s.syncer.OnStorage(peer, id, hashes, slots, proof)
}
func (s syncerV1Adapter) OnByteCodes(peer SyncPeerV2, id uint64, bytecodes [][]byte) error {
return s.syncer.OnByteCodes(peer, id, bytecodes)
}
func (s syncerV1Adapter) OnTrieNodes(peer SyncPeerV2, id uint64, trienodes [][]byte) error {
return s.syncer.OnTrieNodes(peer, id, trienodes)
}
// OnAccessLists is a no-op for snap/1, which never requests BALs.
func (syncerV1Adapter) OnAccessLists(SyncPeerV2, uint64, rlp.RawList[rlp.RawValue]) error {
return nil
}
// Version is SNAP1
func (syncerV1Adapter) Version() uint { return SNAP1 }
// syncerV2Adapter adapts the snap/2 *syncerV2 to Syncer. Its peer-facing methods
// already take SyncPeerV2 and its Sync already takes a header, so only Progress
// (different return type) and OnTrieNodes (absent) need wrapping.
type syncerV2Adapter struct{ *syncerV2 }
func (s syncerV2Adapter) Progress() Progress {
progress := s.syncerV2.Progress()
return Progress{
AccountSynced: progress.AccountSynced,
AccountBytes: progress.AccountBytes,
BytecodeSynced: progress.BytecodeSynced,
BytecodeBytes: progress.BytecodeBytes,
StorageSynced: progress.StorageSynced,
StorageBytes: progress.StorageBytes,
}
}
// OnTrieNodes is a no-op for snap/2, which heals via BALs rather than trie nodes.
// Stale responses from snap/1 peers are silently ignored.
func (syncerV2Adapter) OnTrieNodes(SyncPeerV2, uint64, [][]byte) error { return nil }
// Version is SNAP2; snap/2 needs SNAP2 peers to serve the BAL requests it issues.
func (syncerV2Adapter) Version() uint { return SNAP2 }

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff