From df0bd8960cc14527fe0c08bc1b008ad221482d4b Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Mon, 15 Sep 2025 15:34:57 +0200 Subject: [PATCH] core/txpool/blobpool: migrate billy to new slot size (#31966) Implements a migration path for the blobpool slotter --------- Co-authored-by: lightclient Co-authored-by: lightclient <14004106+lightclient@users.noreply.github.com> Co-authored-by: Gary Rong --- core/txpool/blobpool/blobpool.go | 21 ++++- core/txpool/blobpool/blobpool_test.go | 109 ++++++++++++++++++++++++++ core/txpool/blobpool/limbo.go | 16 +++- core/txpool/blobpool/slotter.go | 84 +++++++++++++++++++- core/txpool/blobpool/slotter_test.go | 45 ++++++++++- crypto/kzg4844/kzg4844.go | 4 +- go.mod | 2 +- go.sum | 4 +- 8 files changed, 274 insertions(+), 11 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 722c176bb1..55d24c7a93 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -55,6 +55,12 @@ const ( // tiny overflows causing all txs to move a shelf higher, wasting disk space. txAvgSize = 4 * 1024 + // txBlobOverhead is an approximation of the overhead that an additional blob + // has on transaction size. This is added to the slotter to avoid tiny + // overflows causing all txs to move a shelf higher, wasting disk space. A + // small buffer is added to the proof overhead. + txBlobOverhead = uint32(kzg4844.CellProofsPerBlob*len(kzg4844.Proof{}) + 64) + // txMaxSize is the maximum size a single transaction can have, outside // the included blobs. Since blob transactions are pulled instead of pushed, // and only a small metadata is kept in ram, the rest is on disk, there is @@ -83,6 +89,10 @@ const ( // limboedTransactionStore is the subfolder containing the currently included // but not yet finalized transaction blobs. limboedTransactionStore = "limbo" + + // storeVersion is the current slotter layout used for the billy.Database + // store. + storeVersion = 1 ) // blobTxMeta is the minimal subset of types.BlobTx necessary to validate and @@ -392,6 +402,14 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser } p.head, p.state = head, state + // Create new slotter for pre-Osaka blob configuration. + slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config())) + + // See if we need to migrate the queue blob store after fusaka + slotter, err = tryMigrate(p.chain.Config(), slotter, queuedir) + if err != nil { + return err + } // Index all transactions on disk and delete anything unprocessable var fails []uint64 index := func(id uint64, size uint32, blob []byte) { @@ -399,7 +417,6 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser fails = append(fails, id) } } - slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(p.chain.Config())) store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, slotter, index) if err != nil { return err @@ -433,7 +450,7 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser // Pool initialized, attach the blob limbo to it to track blobs included // recently but not yet finalized - p.limbo, err = newLimbo(limbodir, eip4844.LatestMaxBlobsPerBlock(p.chain.Config())) + p.limbo, err = newLimbo(p.chain.Config(), limbodir) if err != nil { p.Close() return err diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index c9609e1259..e46529a241 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -1165,6 +1165,115 @@ func TestChangingSlotterSize(t *testing.T) { } } +// TestBillyMigration tests the billy migration from the default slotter to +// the PeerDAS slotter. This tests both the migration of the slotter +// as well as increasing the slotter size of the new slotter. +func TestBillyMigration(t *testing.T) { + //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true))) + + // Create a temporary folder for the persistent backend + storage := t.TempDir() + + os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700) + os.MkdirAll(filepath.Join(storage, limboedTransactionStore), 0700) + // Create the billy with the old slotter + oldSlotter := newSlotterEIP7594(6) + store, _ := billy.Open(billy.Options{Path: filepath.Join(storage, pendingTransactionStore)}, oldSlotter, nil) + + // Create transactions from a few accounts. + var ( + key1, _ = crypto.GenerateKey() + key2, _ = crypto.GenerateKey() + key3, _ = crypto.GenerateKey() + + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = crypto.PubkeyToAddress(key2.PublicKey) + addr3 = crypto.PubkeyToAddress(key3.PublicKey) + + tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0) + tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 0, key2, types.BlobSidecarVersion0) + tx3 = makeMultiBlobTx(0, 1, 800, 110, 24, 0, key3, types.BlobSidecarVersion0) + + blob1, _ = rlp.EncodeToBytes(tx1) + blob2, _ = rlp.EncodeToBytes(tx2) + ) + + // Write the two safely sized txs to store. note: although the store is + // configured for a blob count of 6, it can also support around ~1mb of call + // data - all this to say that we aren't using the the absolute largest shelf + // available. + store.Put(blob1) + store.Put(blob2) + store.Close() + + // Mimic a blobpool with max blob count of 6 upgrading to a max blob count of 24. + for _, maxBlobs := range []int{6, 24} { + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) + statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) + statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) + statedb.AddBalance(addr3, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) + statedb.Commit(0, true, false) + + // Make custom chain config where the max blob count changes based on the loop variable. + zero := uint64(0) + config := ¶ms.ChainConfig{ + ChainID: big.NewInt(1), + LondonBlock: big.NewInt(0), + BerlinBlock: big.NewInt(0), + CancunTime: &zero, + OsakaTime: &zero, + BlobScheduleConfig: ¶ms.BlobScheduleConfig{ + Cancun: ¶ms.BlobConfig{ + Target: maxBlobs / 2, + Max: maxBlobs, + UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction, + }, + Osaka: ¶ms.BlobConfig{ + Target: maxBlobs / 2, + Max: maxBlobs, + UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction, + }, + }, + } + chain := &testBlockChain{ + config: config, + basefee: uint256.NewInt(1050), + blobfee: uint256.NewInt(105), + statedb: statedb, + } + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { + t.Fatalf("failed to create blob pool: %v", err) + } + + // Try to add the big blob tx. In the initial iteration it should overflow + // the pool. On the subsequent iteration it should be accepted. + errs := pool.Add([]*types.Transaction{tx3}, true) + if _, ok := pool.index[addr3]; ok && maxBlobs == 6 { + t.Errorf("expected insert of oversized blob tx to fail: blobs=24, maxBlobs=%d, err=%v", maxBlobs, errs[0]) + } else if !ok && maxBlobs == 10 { + t.Errorf("expected insert of oversized blob tx to succeed: blobs=24, maxBlobs=%d, err=%v", maxBlobs, errs[0]) + } + + // Verify the regular two txs are always available. + if got := pool.Get(tx1.Hash()); got == nil { + t.Errorf("expected tx %s from %s in pool", tx1.Hash(), addr1) + } + if got := pool.Get(tx2.Hash()); got == nil { + t.Errorf("expected tx %s from %s in pool", tx2.Hash(), addr2) + } + + // Verify all the calculated pool internals. Interestingly, this is **not** + // a duplication of the above checks, this actually validates the verifier + // using the above already hard coded checks. + // + // Do not remove this, nor alter the above to be generic. + verifyPoolInternals(t, pool) + + pool.Close() + } +} + // TestBlobCountLimit tests the blobpool enforced limits on the max blob count. func TestBlobCountLimit(t *testing.T) { var ( diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 99d1b4ad6b..50c40c9d83 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -20,8 +20,10 @@ import ( "errors" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/holiman/billy" ) @@ -48,11 +50,21 @@ type limbo struct { } // newLimbo opens and indexes a set of limboed blob transactions. -func newLimbo(datadir string, maxBlobsPerTransaction int) (*limbo, error) { +func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) { l := &limbo{ index: make(map[common.Hash]uint64), groups: make(map[uint64]map[uint64]common.Hash), } + + // Create new slotter for pre-Osaka blob configuration. + slotter := newSlotter(eip4844.LatestMaxBlobsPerBlock(config)) + + // See if we need to migrate the limbo after fusaka. + slotter, err := tryMigrate(config, slotter, datadir) + if err != nil { + return nil, err + } + // Index all limboed blobs on disk and delete anything unprocessable var fails []uint64 index := func(id uint64, size uint32, data []byte) { @@ -60,7 +72,7 @@ func newLimbo(datadir string, maxBlobsPerTransaction int) (*limbo, error) { fails = append(fails, id) } } - store, err := billy.Open(billy.Options{Path: datadir, Repair: true}, newSlotter(maxBlobsPerTransaction), index) + store, err := billy.Open(billy.Options{Path: datadir, Repair: true}, slotter, index) if err != nil { return nil, err } diff --git a/core/txpool/blobpool/slotter.go b/core/txpool/blobpool/slotter.go index 84ccc0f27b..9b793e366c 100644 --- a/core/txpool/blobpool/slotter.go +++ b/core/txpool/blobpool/slotter.go @@ -16,6 +16,49 @@ package blobpool +import ( + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" + "github.com/ethereum/go-ethereum/params" + "github.com/holiman/billy" +) + +// tryMigrate checks if the billy needs to be migrated and migrates if needed. +// Returns a slotter that can be used for the database. +func tryMigrate(config *params.ChainConfig, slotter billy.SlotSizeFn, datadir string) (billy.SlotSizeFn, error) { + // Check if we need to migrate our blob db to the new slotter. + if config.OsakaTime != nil { + // Open the store using the version slotter to see if any version has been + // written. + var version int + index := func(_ uint64, _ uint32, blob []byte) { + version = max(version, parseSlotterVersion(blob)) + } + store, err := billy.Open(billy.Options{Path: datadir}, newVersionSlotter(), index) + if err != nil { + return nil, err + } + store.Close() + + // If the version found is less than the currently configured store version, + // perform a migration then write the updated version of the store. + if version < storeVersion { + newSlotter := newSlotterEIP7594(eip4844.LatestMaxBlobsPerBlock(config)) + if err := billy.Migrate(billy.Options{Path: datadir, Repair: true}, slotter, newSlotter); err != nil { + return nil, err + } + store, err = billy.Open(billy.Options{Path: datadir}, newVersionSlotter(), nil) + if err != nil { + return nil, err + } + writeSlotterVersion(store, storeVersion) + store.Close() + } + // Set the slotter to the format now that the Osaka is active. + slotter = newSlotterEIP7594(eip4844.LatestMaxBlobsPerBlock(config)) + } + return slotter, nil +} + // newSlotter creates a helper method for the Billy datastore that returns the // individual shelf sizes used to store transactions in. // @@ -25,7 +68,7 @@ package blobpool // The slotter also creates a shelf for 0-blob transactions. Whilst those are not // allowed in the current protocol, having an empty shelf is not a relevant use // of resources, but it makes stress testing with junk transactions simpler. -func newSlotter(maxBlobsPerTransaction int) func() (uint32, bool) { +func newSlotter(maxBlobsPerTransaction int) billy.SlotSizeFn { slotsize := uint32(txAvgSize) slotsize -= uint32(blobSize) // underflows, it's ok, will overflow back in the first return @@ -36,3 +79,42 @@ func newSlotter(maxBlobsPerTransaction int) func() (uint32, bool) { return slotsize, finished } } + +// newSlotterEIP7594 creates a different slotter for EIP-7594 transactions. +// EIP-7594 (PeerDAS) changes the average transaction size which means the current +// static 4KB average size is not enough anymore. +// This slotter adds a dynamic overhead component to the slotter, which also +// captures the notion that blob transactions with more blobs are also more likely to +// to have more calldata. +func newSlotterEIP7594(maxBlobsPerTransaction int) billy.SlotSizeFn { + slotsize := uint32(txAvgSize) + slotsize -= uint32(blobSize) + txBlobOverhead // underflows, it's ok, will overflow back in the first return + + return func() (size uint32, done bool) { + slotsize += blobSize + txBlobOverhead + finished := slotsize > uint32(maxBlobsPerTransaction)*(blobSize+txBlobOverhead)+txMaxSize + + return slotsize, finished + } +} + +// newVersionSlotter creates a slotter with a single 8 byte shelf to store +// version metadata in. +func newVersionSlotter() billy.SlotSizeFn { + return func() (size uint32, done bool) { + return 8, true + } +} + +// parseSlotterVersion will parse the slotter's version from a given data blob. +func parseSlotterVersion(blob []byte) int { + if len(blob) > 0 { + return int(blob[0]) + } + return 0 +} + +// writeSlotterVersion writes the current slotter version into the store. +func writeSlotterVersion(store billy.Database, version int) { + store.Put([]byte{byte(version)}) +} diff --git a/core/txpool/blobpool/slotter_test.go b/core/txpool/blobpool/slotter_test.go index 8d46f47d2c..e4cf232f4e 100644 --- a/core/txpool/blobpool/slotter_test.go +++ b/core/txpool/blobpool/slotter_test.go @@ -16,7 +16,9 @@ package blobpool -import "testing" +import ( + "testing" +) // Tests that the slotter creates the expected database shelves. func TestNewSlotter(t *testing.T) { @@ -58,3 +60,44 @@ func TestNewSlotter(t *testing.T) { } } } + +// Tests that the slotter creates the expected database shelves. +func TestNewSlotterEIP7594(t *testing.T) { + // Generate the database shelve sizes + slotter := newSlotterEIP7594(6) + + var shelves []uint32 + for { + shelf, done := slotter() + shelves = append(shelves, shelf) + if done { + break + } + } + // Compare the database shelves to the expected ones + want := []uint32{ + 0*blobSize + 0*txBlobOverhead + txAvgSize, // 0 blob + some expected tx infos + 1*blobSize + 1*txBlobOverhead + txAvgSize, // 1 blob + some expected tx infos + 2*blobSize + 2*txBlobOverhead + txAvgSize, // 2 blob + some expected tx infos (could be fewer blobs and more tx data) + 3*blobSize + 3*txBlobOverhead + txAvgSize, // 3 blob + some expected tx infos (could be fewer blobs and more tx data) + 4*blobSize + 4*txBlobOverhead + txAvgSize, // 4 blob + some expected tx infos (could be fewer blobs and more tx data) + 5*blobSize + 5*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size + 6*blobSize + 6*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size + 7*blobSize + 7*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size + 8*blobSize + 8*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size + 9*blobSize + 9*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size + 10*blobSize + 10*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size + 11*blobSize + 11*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size + 12*blobSize + 12*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size + 13*blobSize + 13*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos < 4 blobs + max tx metadata size + 14*blobSize + 14*txBlobOverhead + txAvgSize, // 1-6 blobs + unexpectedly large tx infos >= 4 blobs + max tx metadata size + } + if len(shelves) != len(want) { + t.Errorf("shelves count mismatch: have %d, want %d", len(shelves), len(want)) + } + for i := 0; i < len(shelves) && i < len(want); i++ { + if shelves[i] != want[i] { + t.Errorf("shelf %d mismatch: have %d, want %d", i, shelves[i], want[i]) + } + } +} diff --git a/crypto/kzg4844/kzg4844.go b/crypto/kzg4844/kzg4844.go index 9da2386368..3ccc204838 100644 --- a/crypto/kzg4844/kzg4844.go +++ b/crypto/kzg4844/kzg4844.go @@ -34,10 +34,10 @@ var ( blobT = reflect.TypeFor[Blob]() commitmentT = reflect.TypeFor[Commitment]() proofT = reflect.TypeFor[Proof]() - - CellProofsPerBlob = 128 ) +const CellProofsPerBlob = 128 + // Blob represents a 4844 data blob. type Blob [131072]byte diff --git a/go.mod b/go.mod index 28c9af9259..058fe3bd8e 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/graph-gophers/graphql-go v1.3.0 github.com/hashicorp/go-bexpr v0.1.10 - github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 + github.com/holiman/billy v0.0.0-20250707135307-f2f9b9aae7db github.com/holiman/bloomfilter/v2 v2.0.3 github.com/holiman/uint256 v1.3.2 github.com/huin/goupnp v1.3.0 diff --git a/go.sum b/go.sum index 53913262ae..16518fdf43 100644 --- a/go.sum +++ b/go.sum @@ -188,8 +188,8 @@ github.com/graph-gophers/graphql-go v1.3.0 h1:Eb9x/q6MFpCLz7jBCiP/WTxjSDrYLR1QY4 github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= -github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4= -github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc= +github.com/holiman/billy v0.0.0-20250707135307-f2f9b9aae7db h1:IZUYC/xb3giYwBLMnr8d0TGTzPKFGNTCGgGLoyeX330= +github.com/holiman/billy v0.0.0-20250707135307-f2f9b9aae7db/go.mod h1:xTEYN9KCHxuYHs+NmrmzFcnvHMzLLNiGFafCb1n3Mfg= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=