mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-12 01:41:36 +00:00
core: fix memory leaking
This commit is contained in:
parent
aec9c18432
commit
533d2109d5
10 changed files with 89 additions and 39 deletions
|
|
@ -2113,10 +2113,12 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash,
|
|||
var (
|
||||
err error
|
||||
startTime = time.Now()
|
||||
statedb *state.StateDB
|
||||
interrupt atomic.Bool
|
||||
sdb = state.NewDatabase(bc.triedb, bc.codedb).WithSnapshot(bc.snaps)
|
||||
makeWitness bool
|
||||
|
||||
throwaway *state.StateDB // StateDB for speculative transaction pre-executor
|
||||
statedb *state.StateDB // StateDB for sequential transaction executor
|
||||
)
|
||||
if bc.chainConfig.IsByzantium(block.Number()) && (config.StatelessSelfValidation || config.MakeWitness) {
|
||||
makeWitness = true
|
||||
|
|
@ -2129,6 +2131,17 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash,
|
|||
// execution witness.
|
||||
if bc.chainConfig.IsByzantium(block.Number()) {
|
||||
sdb = sdb.EnablePrefetch(makeWitness)
|
||||
|
||||
// Explicitly terminate all the background prefetcher. This is essential
|
||||
// to prevent goroutine leaks.
|
||||
defer func() {
|
||||
if statedb != nil {
|
||||
statedb.StopPrefetcher()
|
||||
}
|
||||
if throwaway != nil {
|
||||
throwaway.StopPrefetcher()
|
||||
}
|
||||
}()
|
||||
}
|
||||
if bc.cfg.NoPrefetch {
|
||||
statedb, err = state.New(parentRoot, sdb)
|
||||
|
|
@ -2144,7 +2157,7 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
throwaway, err := state.NewWithReader(parentRoot, sdb, prefetch)
|
||||
throwaway, err = state.NewWithReader(parentRoot, sdb, prefetch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -424,9 +424,23 @@ func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
|
|||
return state.New(root, state.NewDatabase(bc.triedb, bc.codedb).WithSnapshot(bc.snaps))
|
||||
}
|
||||
|
||||
// StateWithPrefetching returns a new mutable state based on a particular point in time.
|
||||
func (bc *BlockChain) StateWithPrefetching(root common.Hash) (*state.StateDB, error) {
|
||||
return state.New(root, state.NewDatabase(bc.triedb, bc.codedb).WithSnapshot(bc.snaps).EnablePrefetch(false))
|
||||
// StateConfig specifies the configuration for initializating the stateDB.
|
||||
type StateConfig struct {
|
||||
Prefetch bool
|
||||
PrefetchRead bool
|
||||
WithSnapshot bool
|
||||
}
|
||||
|
||||
// StateWithConfig returns a new mutable state based on a particular point in time.
|
||||
func (bc *BlockChain) StateWithConfig(root common.Hash, config StateConfig) (*state.StateDB, error) {
|
||||
sdb := state.NewDatabase(bc.triedb, bc.codedb)
|
||||
if config.WithSnapshot {
|
||||
sdb = sdb.WithSnapshot(bc.snaps)
|
||||
}
|
||||
if config.Prefetch {
|
||||
sdb = sdb.EnablePrefetch(config.PrefetchRead)
|
||||
}
|
||||
return state.New(root, sdb)
|
||||
}
|
||||
|
||||
// HistoricState returns a historic state specified by the given root.
|
||||
|
|
|
|||
|
|
@ -88,6 +88,9 @@ type Prefetcher interface {
|
|||
|
||||
// PrefetchStorage schedules the storage slot for prefetching.
|
||||
PrefetchStorage(addr common.Address, keys []common.Hash, read bool)
|
||||
|
||||
// TermPrefetch terminates all the background prefetching activities.
|
||||
TermPrefetch()
|
||||
}
|
||||
|
||||
// WitnessCollector is an optional extension implemented by hashers that can
|
||||
|
|
@ -137,3 +140,4 @@ func (n *noopHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[common.
|
|||
return common.Hash{}, trienode.NewMergedNodeSet(), make(map[common.Address]Hashes), nil
|
||||
}
|
||||
func (n *noopHasher) Copy() Hasher { return &noopHasher{} }
|
||||
func (n *noopHasher) Close() {}
|
||||
|
|
|
|||
|
|
@ -238,11 +238,6 @@ func (h *binaryHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[commo
|
|||
return root, nodes, nil, nil
|
||||
}
|
||||
|
||||
// Close terminates all prefetcher goroutines. Safe to call multiple times.
|
||||
func (h *binaryHasher) Close() {
|
||||
h.trie.term()
|
||||
}
|
||||
|
||||
// Copy implements Hasher, returning a deep-copied hasher instance.
|
||||
func (h *binaryHasher) Copy() Hasher {
|
||||
return &binaryHasher{
|
||||
|
|
@ -287,3 +282,8 @@ func (h *binaryHasher) PrefetchStorage(addr common.Address, keys []common.Hash,
|
|||
}
|
||||
h.trie.prefetchStorage(addr, keys, read)
|
||||
}
|
||||
|
||||
// TermPrefetch terminates all prefetcher goroutines. Safe to call multiple times.
|
||||
func (h *binaryHasher) TermPrefetch() {
|
||||
h.trie.term()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ func newTestBinaryHasher(t *testing.T, db *triedb.Database, root common.Hash, cf
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() { h.Close() })
|
||||
t.Cleanup(func() { h.TermPrefetch() })
|
||||
return h
|
||||
}
|
||||
|
||||
|
|
@ -59,7 +59,7 @@ func commitAndReopenBinary(t *testing.T, h *binaryHasher, cfg hasherTestConfig)
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() { h2.Close() })
|
||||
t.Cleanup(func() { h2.TermPrefetch() })
|
||||
return h2
|
||||
}
|
||||
|
||||
|
|
@ -209,7 +209,7 @@ func TestBinaryHasherCopy(t *testing.T) {
|
|||
origRoot := h.Hash()
|
||||
|
||||
cpy := h.Copy()
|
||||
defer cpy.(*binaryHasher).Close()
|
||||
defer cpy.(*binaryHasher).TermPrefetch()
|
||||
|
||||
// Mutate the copy: delete slot3, add slot2 with new value.
|
||||
if err := cpy.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3, hasherSlot2}, []common.Hash{{}, hasherVal3}); err != nil {
|
||||
|
|
|
|||
|
|
@ -345,11 +345,6 @@ func (h *merkleHasher) Hash() common.Hash {
|
|||
// the resulting state root hash, along with the set of dirty trie nodes
|
||||
// generated by the updates.
|
||||
func (h *merkleHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[common.Address]Hashes, error) {
|
||||
// Explicitly terminate all resolved tries. Some of them may not be
|
||||
// terminated due to read-only prefetching. This is essential to
|
||||
// prevent goroutine leaks.
|
||||
h.Close()
|
||||
|
||||
var (
|
||||
eg errgroup.Group
|
||||
root common.Hash
|
||||
|
|
@ -408,17 +403,6 @@ func (h *merkleHasher) Copy() Hasher {
|
|||
return cpy
|
||||
}
|
||||
|
||||
// Close terminates all prefetcher goroutines. Safe to call multiple times.
|
||||
func (h *merkleHasher) Close() {
|
||||
h.acctTrie.term()
|
||||
for _, tr := range h.storageTries {
|
||||
tr.term()
|
||||
}
|
||||
for _, tr := range h.deletedTries {
|
||||
tr.term()
|
||||
}
|
||||
}
|
||||
|
||||
// ProveAccount implements Prover, constructing a proof for the given account.
|
||||
func (h *merkleHasher) ProveAccount(addr common.Address, proofDb ethdb.KeyValueWriter) error {
|
||||
return h.acctTrie.Prove(crypto.Keccak256(addr.Bytes()), proofDb)
|
||||
|
|
@ -461,9 +445,23 @@ func (h *merkleHasher) PrefetchStorage(addr common.Address, keys []common.Hash,
|
|||
if !h.prefetch {
|
||||
return
|
||||
}
|
||||
if !h.prefetchRead && read {
|
||||
return
|
||||
}
|
||||
tr, err := h.openStorageTrie(addr, true)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tr.prefetchStorage(addr, keys, read)
|
||||
}
|
||||
|
||||
// TermPrefetch terminates all prefetcher goroutines. Safe to call multiple times.
|
||||
func (h *merkleHasher) TermPrefetch() {
|
||||
h.acctTrie.term()
|
||||
for _, tr := range h.storageTries {
|
||||
tr.term()
|
||||
}
|
||||
for _, tr := range h.deletedTries {
|
||||
tr.term()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ func newTestHasher(t *testing.T, db *triedb.Database, root common.Hash, cfg hash
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() { h.Close() })
|
||||
t.Cleanup(func() { h.TermPrefetch() })
|
||||
return h
|
||||
}
|
||||
|
||||
|
|
@ -105,7 +105,7 @@ func commitAndReopen(t *testing.T, h *merkleHasher, cfg hasherTestConfig) *merkl
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() { h2.Close() })
|
||||
t.Cleanup(func() { h2.TermPrefetch() })
|
||||
return h2
|
||||
}
|
||||
|
||||
|
|
@ -524,7 +524,7 @@ func TestMerkleHasherCopy(t *testing.T) {
|
|||
origRoot := h.Hash()
|
||||
|
||||
cpy := h.Copy()
|
||||
defer cpy.(*merkleHasher).Close()
|
||||
defer cpy.(*merkleHasher).TermPrefetch()
|
||||
|
||||
// Mutate the copy: delete slot3, add slot2 with new value.
|
||||
if err := cpy.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3, hasherSlot2}, []common.Hash{{}, hasherVal3}); err != nil {
|
||||
|
|
@ -590,7 +590,7 @@ func TestMerkleHasherWitness(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer prover.Close()
|
||||
defer prover.TermPrefetch()
|
||||
|
||||
// Collect all expected proof nodes into a single set. The union of
|
||||
// account proofs (addr1, addr2) and storage proofs (addr1/slot1)
|
||||
|
|
|
|||
|
|
@ -1020,7 +1020,6 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool, noStorag
|
|||
}
|
||||
s.DatabaseCommits = time.Since(start)
|
||||
|
||||
// The reader/hasher update must be performed as the final step
|
||||
s.reader, _ = s.db.Reader(s.originalRoot)
|
||||
s.hasher, _ = s.db.Hasher(s.originalRoot)
|
||||
return ret, nil
|
||||
|
|
@ -1142,3 +1141,15 @@ func (s *StateDB) Witness() *stateless.Witness {
|
|||
func (s *StateDB) AccessEvents() *AccessEvents {
|
||||
return s.accessEvents
|
||||
}
|
||||
|
||||
// StopPrefetcher terminates all the background prefetching activities.
|
||||
func (s *StateDB) StopPrefetcher() {
|
||||
if s.hasher == nil {
|
||||
return
|
||||
}
|
||||
prefetch, ok := s.hasher.(Prefetcher)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
prefetch.TermPrefetch()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -304,10 +304,10 @@ func (sc *stateUpdate) encodeBinary() (map[common.Hash][]byte, map[common.Addres
|
|||
func (sc *stateUpdate) stateSet(isMerkle bool) (*triedb.StateSet, error) {
|
||||
var (
|
||||
err error
|
||||
accounts = make(map[common.Hash][]byte)
|
||||
storages = make(map[common.Hash]map[common.Hash][]byte)
|
||||
accountOrigin = make(map[common.Address][]byte)
|
||||
storageOrigin = make(map[common.Address]map[common.Hash][]byte)
|
||||
accounts map[common.Hash][]byte
|
||||
storages map[common.Hash]map[common.Hash][]byte
|
||||
accountOrigin map[common.Address][]byte
|
||||
storageOrigin map[common.Address]map[common.Hash][]byte
|
||||
)
|
||||
if isMerkle {
|
||||
accounts, accountOrigin, storages, storageOrigin, err = sc.encodeMerkle()
|
||||
|
|
|
|||
|
|
@ -80,6 +80,11 @@ func (env *environment) txFitsSize(tx *types.Transaction) bool {
|
|||
return env.size+tx.Size() < params.MaxBlockSize-maxBlockSizeBufferZone
|
||||
}
|
||||
|
||||
// discard terminates the background threads before discarding it.
|
||||
func (env *environment) discard() {
|
||||
env.state.StopPrefetcher()
|
||||
}
|
||||
|
||||
const (
|
||||
commitInterruptNone int32 = iota
|
||||
commitInterruptNewHead
|
||||
|
|
@ -142,6 +147,8 @@ func (miner *Miner) generateWork(ctx context.Context, genParam *generateParams,
|
|||
if err != nil {
|
||||
return &newPayloadResult{err: err}
|
||||
}
|
||||
defer work.discard()
|
||||
|
||||
// Check withdrawals fit max block size.
|
||||
// Due to the cap on withdrawal count, this can actually never happen, but we still need to
|
||||
// check to ensure the CL notices there's a problem if the withdrawal cap is ever lifted.
|
||||
|
|
@ -317,7 +324,10 @@ func (miner *Miner) prepareWork(ctx context.Context, genParams *generateParams,
|
|||
// makeEnv creates a new environment for the sealing block.
|
||||
func (miner *Miner) makeEnv(parent *types.Header, header *types.Header, coinbase common.Address, witness bool) (*environment, error) {
|
||||
// Retrieve the parent state to execute on top.
|
||||
state, err := miner.chain.StateAt(parent.Root)
|
||||
state, err := miner.chain.StateWithConfig(parent.Root, core.StateConfig{
|
||||
Prefetch: true,
|
||||
PrefetchRead: witness,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue