diff --git a/core/blockchain.go b/core/blockchain.go index b5d658c079..1a8d6a9368 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2113,10 +2113,12 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, var ( err error startTime = time.Now() - statedb *state.StateDB interrupt atomic.Bool sdb = state.NewDatabase(bc.triedb, bc.codedb).WithSnapshot(bc.snaps) makeWitness bool + + throwaway *state.StateDB // StateDB for speculative transaction pre-executor + statedb *state.StateDB // StateDB for sequential transaction executor ) if bc.chainConfig.IsByzantium(block.Number()) && (config.StatelessSelfValidation || config.MakeWitness) { makeWitness = true @@ -2129,6 +2131,17 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, // execution witness. if bc.chainConfig.IsByzantium(block.Number()) { sdb = sdb.EnablePrefetch(makeWitness) + + // Explicitly terminate all the background prefetcher. This is essential + // to prevent goroutine leaks. + defer func() { + if statedb != nil { + statedb.StopPrefetcher() + } + if throwaway != nil { + throwaway.StopPrefetcher() + } + }() } if bc.cfg.NoPrefetch { statedb, err = state.New(parentRoot, sdb) @@ -2144,7 +2157,7 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, if err != nil { return nil, err } - throwaway, err := state.NewWithReader(parentRoot, sdb, prefetch) + throwaway, err = state.NewWithReader(parentRoot, sdb, prefetch) if err != nil { return nil, err } diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 82e9305bdc..9fda0e2f0a 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -424,9 +424,23 @@ func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) { return state.New(root, state.NewDatabase(bc.triedb, bc.codedb).WithSnapshot(bc.snaps)) } -// StateWithPrefetching returns a new mutable state based on a particular point in time. -func (bc *BlockChain) StateWithPrefetching(root common.Hash) (*state.StateDB, error) { - return state.New(root, state.NewDatabase(bc.triedb, bc.codedb).WithSnapshot(bc.snaps).EnablePrefetch(false)) +// StateConfig specifies the configuration for initializating the stateDB. +type StateConfig struct { + Prefetch bool + PrefetchRead bool + WithSnapshot bool +} + +// StateWithConfig returns a new mutable state based on a particular point in time. +func (bc *BlockChain) StateWithConfig(root common.Hash, config StateConfig) (*state.StateDB, error) { + sdb := state.NewDatabase(bc.triedb, bc.codedb) + if config.WithSnapshot { + sdb = sdb.WithSnapshot(bc.snaps) + } + if config.Prefetch { + sdb = sdb.EnablePrefetch(config.PrefetchRead) + } + return state.New(root, sdb) } // HistoricState returns a historic state specified by the given root. diff --git a/core/state/database_hasher.go b/core/state/database_hasher.go index 5936f24dbf..7fcf3edea3 100644 --- a/core/state/database_hasher.go +++ b/core/state/database_hasher.go @@ -88,6 +88,9 @@ type Prefetcher interface { // PrefetchStorage schedules the storage slot for prefetching. PrefetchStorage(addr common.Address, keys []common.Hash, read bool) + + // TermPrefetch terminates all the background prefetching activities. + TermPrefetch() } // WitnessCollector is an optional extension implemented by hashers that can @@ -137,3 +140,4 @@ func (n *noopHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[common. return common.Hash{}, trienode.NewMergedNodeSet(), make(map[common.Address]Hashes), nil } func (n *noopHasher) Copy() Hasher { return &noopHasher{} } +func (n *noopHasher) Close() {} diff --git a/core/state/database_hasher_binary.go b/core/state/database_hasher_binary.go index d1b3823fe1..86f33cfc5c 100644 --- a/core/state/database_hasher_binary.go +++ b/core/state/database_hasher_binary.go @@ -238,11 +238,6 @@ func (h *binaryHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[commo return root, nodes, nil, nil } -// Close terminates all prefetcher goroutines. Safe to call multiple times. -func (h *binaryHasher) Close() { - h.trie.term() -} - // Copy implements Hasher, returning a deep-copied hasher instance. func (h *binaryHasher) Copy() Hasher { return &binaryHasher{ @@ -287,3 +282,8 @@ func (h *binaryHasher) PrefetchStorage(addr common.Address, keys []common.Hash, } h.trie.prefetchStorage(addr, keys, read) } + +// TermPrefetch terminates all prefetcher goroutines. Safe to call multiple times. +func (h *binaryHasher) TermPrefetch() { + h.trie.term() +} diff --git a/core/state/database_hasher_binary_test.go b/core/state/database_hasher_binary_test.go index 59f61c1b02..13fe28fc75 100644 --- a/core/state/database_hasher_binary_test.go +++ b/core/state/database_hasher_binary_test.go @@ -34,7 +34,7 @@ func newTestBinaryHasher(t *testing.T, db *triedb.Database, root common.Hash, cf if err != nil { t.Fatal(err) } - t.Cleanup(func() { h.Close() }) + t.Cleanup(func() { h.TermPrefetch() }) return h } @@ -59,7 +59,7 @@ func commitAndReopenBinary(t *testing.T, h *binaryHasher, cfg hasherTestConfig) if err != nil { t.Fatal(err) } - t.Cleanup(func() { h2.Close() }) + t.Cleanup(func() { h2.TermPrefetch() }) return h2 } @@ -209,7 +209,7 @@ func TestBinaryHasherCopy(t *testing.T) { origRoot := h.Hash() cpy := h.Copy() - defer cpy.(*binaryHasher).Close() + defer cpy.(*binaryHasher).TermPrefetch() // Mutate the copy: delete slot3, add slot2 with new value. if err := cpy.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3, hasherSlot2}, []common.Hash{{}, hasherVal3}); err != nil { diff --git a/core/state/database_hasher_merkle.go b/core/state/database_hasher_merkle.go index 1d131ef4af..cd99a4cc9a 100644 --- a/core/state/database_hasher_merkle.go +++ b/core/state/database_hasher_merkle.go @@ -345,11 +345,6 @@ func (h *merkleHasher) Hash() common.Hash { // the resulting state root hash, along with the set of dirty trie nodes // generated by the updates. func (h *merkleHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[common.Address]Hashes, error) { - // Explicitly terminate all resolved tries. Some of them may not be - // terminated due to read-only prefetching. This is essential to - // prevent goroutine leaks. - h.Close() - var ( eg errgroup.Group root common.Hash @@ -408,17 +403,6 @@ func (h *merkleHasher) Copy() Hasher { return cpy } -// Close terminates all prefetcher goroutines. Safe to call multiple times. -func (h *merkleHasher) Close() { - h.acctTrie.term() - for _, tr := range h.storageTries { - tr.term() - } - for _, tr := range h.deletedTries { - tr.term() - } -} - // ProveAccount implements Prover, constructing a proof for the given account. func (h *merkleHasher) ProveAccount(addr common.Address, proofDb ethdb.KeyValueWriter) error { return h.acctTrie.Prove(crypto.Keccak256(addr.Bytes()), proofDb) @@ -461,9 +445,23 @@ func (h *merkleHasher) PrefetchStorage(addr common.Address, keys []common.Hash, if !h.prefetch { return } + if !h.prefetchRead && read { + return + } tr, err := h.openStorageTrie(addr, true) if err != nil { return } tr.prefetchStorage(addr, keys, read) } + +// TermPrefetch terminates all prefetcher goroutines. Safe to call multiple times. +func (h *merkleHasher) TermPrefetch() { + h.acctTrie.term() + for _, tr := range h.storageTries { + tr.term() + } + for _, tr := range h.deletedTries { + tr.term() + } +} diff --git a/core/state/database_hasher_merkle_test.go b/core/state/database_hasher_merkle_test.go index e60fd18bbb..8e15d734bc 100644 --- a/core/state/database_hasher_merkle_test.go +++ b/core/state/database_hasher_merkle_test.go @@ -80,7 +80,7 @@ func newTestHasher(t *testing.T, db *triedb.Database, root common.Hash, cfg hash if err != nil { t.Fatal(err) } - t.Cleanup(func() { h.Close() }) + t.Cleanup(func() { h.TermPrefetch() }) return h } @@ -105,7 +105,7 @@ func commitAndReopen(t *testing.T, h *merkleHasher, cfg hasherTestConfig) *merkl if err != nil { t.Fatal(err) } - t.Cleanup(func() { h2.Close() }) + t.Cleanup(func() { h2.TermPrefetch() }) return h2 } @@ -524,7 +524,7 @@ func TestMerkleHasherCopy(t *testing.T) { origRoot := h.Hash() cpy := h.Copy() - defer cpy.(*merkleHasher).Close() + defer cpy.(*merkleHasher).TermPrefetch() // Mutate the copy: delete slot3, add slot2 with new value. if err := cpy.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3, hasherSlot2}, []common.Hash{{}, hasherVal3}); err != nil { @@ -590,7 +590,7 @@ func TestMerkleHasherWitness(t *testing.T) { if err != nil { t.Fatal(err) } - defer prover.Close() + defer prover.TermPrefetch() // Collect all expected proof nodes into a single set. The union of // account proofs (addr1, addr2) and storage proofs (addr1/slot1) diff --git a/core/state/statedb.go b/core/state/statedb.go index 01fd13fb59..716f262bf5 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1020,7 +1020,6 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool, noStorag } s.DatabaseCommits = time.Since(start) - // The reader/hasher update must be performed as the final step s.reader, _ = s.db.Reader(s.originalRoot) s.hasher, _ = s.db.Hasher(s.originalRoot) return ret, nil @@ -1142,3 +1141,15 @@ func (s *StateDB) Witness() *stateless.Witness { func (s *StateDB) AccessEvents() *AccessEvents { return s.accessEvents } + +// StopPrefetcher terminates all the background prefetching activities. +func (s *StateDB) StopPrefetcher() { + if s.hasher == nil { + return + } + prefetch, ok := s.hasher.(Prefetcher) + if !ok { + return + } + prefetch.TermPrefetch() +} diff --git a/core/state/stateupdate.go b/core/state/stateupdate.go index ca6d5382da..7659e9ff18 100644 --- a/core/state/stateupdate.go +++ b/core/state/stateupdate.go @@ -304,10 +304,10 @@ func (sc *stateUpdate) encodeBinary() (map[common.Hash][]byte, map[common.Addres func (sc *stateUpdate) stateSet(isMerkle bool) (*triedb.StateSet, error) { var ( err error - accounts = make(map[common.Hash][]byte) - storages = make(map[common.Hash]map[common.Hash][]byte) - accountOrigin = make(map[common.Address][]byte) - storageOrigin = make(map[common.Address]map[common.Hash][]byte) + accounts map[common.Hash][]byte + storages map[common.Hash]map[common.Hash][]byte + accountOrigin map[common.Address][]byte + storageOrigin map[common.Address]map[common.Hash][]byte ) if isMerkle { accounts, accountOrigin, storages, storageOrigin, err = sc.encodeMerkle() diff --git a/miner/worker.go b/miner/worker.go index b75241c20a..7890e8e8e2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -80,6 +80,11 @@ func (env *environment) txFitsSize(tx *types.Transaction) bool { return env.size+tx.Size() < params.MaxBlockSize-maxBlockSizeBufferZone } +// discard terminates the background threads before discarding it. +func (env *environment) discard() { + env.state.StopPrefetcher() +} + const ( commitInterruptNone int32 = iota commitInterruptNewHead @@ -142,6 +147,8 @@ func (miner *Miner) generateWork(ctx context.Context, genParam *generateParams, if err != nil { return &newPayloadResult{err: err} } + defer work.discard() + // Check withdrawals fit max block size. // Due to the cap on withdrawal count, this can actually never happen, but we still need to // check to ensure the CL notices there's a problem if the withdrawal cap is ever lifted. @@ -317,7 +324,10 @@ func (miner *Miner) prepareWork(ctx context.Context, genParams *generateParams, // makeEnv creates a new environment for the sealing block. func (miner *Miner) makeEnv(parent *types.Header, header *types.Header, coinbase common.Address, witness bool) (*environment, error) { // Retrieve the parent state to execute on top. - state, err := miner.chain.StateAt(parent.Root) + state, err := miner.chain.StateWithConfig(parent.Root, core.StateConfig{ + Prefetch: true, + PrefetchRead: witness, + }) if err != nil { return nil, err }