diff --git a/accounts/keystore/keystore_test.go b/accounts/keystore/keystore_test.go index 8905714521..43bb61ddf2 100644 --- a/accounts/keystore/keystore_test.go +++ b/accounts/keystore/keystore_test.go @@ -406,14 +406,12 @@ func TestImportRace(t *testing.T) { _, ks2 := tmpKeyStore(t) var atom atomic.Uint32 var wg sync.WaitGroup - wg.Add(2) for i := 0; i < 2; i++ { - go func() { - defer wg.Done() + wg.Go(func() { if _, err := ks2.Import(json, "new", "new"); err != nil { atom.Add(1) } - }() + }) } wg.Wait() if atom.Load() != 1 { diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go index 3030cea640..ce3300adcc 100644 --- a/bmt/bmt_test.go +++ b/bmt/bmt_test.go @@ -263,23 +263,21 @@ func TestHasherConcurrency(t *testing.T) { defer pool.Drain(0) wg := sync.WaitGroup{} cycles := 100 - wg.Add(maxproccnt * cycles) errc := make(chan error) for p := 0; p < maxproccnt; p++ { for i := 0; i < cycles; i++ { - go func() { + wg.Go(func() { bmt := New(pool) n := rand.Intn(4096) tdata := testDataReader(n) data := make([]byte, n) tdata.Read(data) err := testHasherCorrectness(bmt, hasher, data, n, 128) - wg.Done() if err != nil { errc <- err } - }() + }) } } go func() { @@ -386,18 +384,16 @@ func benchmarkBMTBaseline(n int, t *testing.B) { for i := 0; i < t.N; i++ { count := int32((n-1)/hasher().Size() + 1) wg := sync.WaitGroup{} - wg.Add(maxproccnt) var i int32 for j := 0; j < maxproccnt; j++ { - go func() { - defer wg.Done() + wg.Go(func() { h := hasher() for atomic.AddInt32(&i, 1) < count { h.Reset() h.Write(data) h.Sum(nil) } - }() + }) } wg.Wait() } @@ -437,15 +433,13 @@ func benchmarkHasherReuse(poolsize, n int, t *testing.B) { t.ResetTimer() for i := 0; i < t.N; i++ { wg := sync.WaitGroup{} - wg.Add(cycles) for j := 0; j < cycles; j++ { - bmt := New(pool) - go func() { - defer wg.Done() + wg.Go(func() { + bmt := New(pool) bmt.Reset() bmt.Write(data) bmt.Sum(nil) - }() + }) } wg.Wait() } diff --git a/common/prque/lazyqueue_test.go b/common/prque/lazyqueue_test.go index 090d7120e3..a9c493d537 100644 --- a/common/prque/lazyqueue_test.go +++ b/common/prque/lazyqueue_test.go @@ -79,9 +79,7 @@ func TestLazyQueue(t *testing.T) { stopCh = make(chan chan struct{}) ) defer wg.Wait() - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { for { select { case <-clock.After(testQueueRefresh): @@ -92,7 +90,7 @@ func TestLazyQueue(t *testing.T) { return } } - }() + }) for c := 0; c < testSteps; c++ { i := rand.Intn(testItems) diff --git a/consensus/XDPoS/engines/engine_v2/vote.go b/consensus/XDPoS/engines/engine_v2/vote.go index c7a8cdec82..8f9aa95874 100644 --- a/consensus/XDPoS/engines/engine_v2/vote.go +++ b/consensus/XDPoS/engines/engine_v2/vote.go @@ -164,10 +164,9 @@ func (x *XDPoS_v2) verifyVotes(chain consensus.ChainReader, votes map[common.Has emptySigner := common.Address{} // Filter out non-Master nodes signatures var wg sync.WaitGroup - wg.Add(len(votes)) - for h, vote := range votes { - go func(hash common.Hash, v *types.Vote) { - defer wg.Done() + for _, vote := range votes { + wg.Go(func() { + v := vote.(*types.Vote) signerAddress := v.GetSigner() if signerAddress != emptySigner { // verify that signer belongs to the final masternodes, we have not do so in previous steps @@ -199,7 +198,7 @@ func (x *XDPoS_v2) verifyVotes(chain consensus.ChainReader, votes map[common.Has return } v.SetSigner(masterNode) - }(h, vote.(*types.Vote)) + }) } wg.Wait() elapsed := time.Since(start) diff --git a/console/console.go b/console/console.go index b6dc490aa5..81cf828db7 100644 --- a/console/console.go +++ b/console/console.go @@ -119,8 +119,7 @@ func New(config Config) (*Console, error) { return nil, err } - console.wg.Add(1) - go console.interruptHandler() + console.wg.Go(console.interruptHandler) return console, nil } @@ -341,8 +340,6 @@ func (c *Console) Evaluate(statement string) { // interruptHandler runs in its own goroutine and waits for signals. // When a signal is received, it interrupts the JS interpreter. func (c *Console) interruptHandler() { - defer c.wg.Done() - // During Interactive, liner inhibits the signal while it is prompting for // input. However, the signal will be received while evaluating JS. // diff --git a/core/blockchain.go b/core/blockchain.go index 16ed45e526..122253d1df 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -334,8 +334,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } // Start future block processor. - bc.wg.Add(1) - go bc.futureBlocksLoop() + bc.wg.Go(bc.futureBlocksLoop) return bc, nil } @@ -2580,8 +2579,6 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { // futureBlocksLoop processes the 'future block' queue. func (bc *BlockChain) futureBlocksLoop() { - defer bc.wg.Done() - futureTimer := time.NewTicker(100 * time.Millisecond) defer futureTimer.Stop() diff --git a/core/bloombits/scheduler.go b/core/bloombits/scheduler.go index 6449c7465a..2dc14ed049 100644 --- a/core/bloombits/scheduler.go +++ b/core/bloombits/scheduler.go @@ -62,9 +62,8 @@ func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []by pend := make(chan uint64, cap(dist)) // Start the pipeline schedulers to forward between user -> distributor -> user - wg.Add(2) - go s.scheduleRequests(sections, dist, pend, quit, wg) - go s.scheduleDeliveries(pend, done, quit, wg) + wg.Go(func() { s.scheduleRequests(sections, dist, pend, quit) }) + wg.Go(func() { s.scheduleDeliveries(pend, done, quit) }) } // reset cleans up any leftovers from previous runs. This is required before a @@ -84,9 +83,8 @@ func (s *scheduler) reset() { // scheduleRequests reads section retrieval requests from the input channel, // deduplicates the stream and pushes unique retrieval tasks into the distribution // channel for a database or network layer to honour. -func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) { +func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}) { // Clean up the goroutine and pipeline when done - defer wg.Done() defer close(pend) // Keep reading and scheduling section requests @@ -131,9 +129,8 @@ func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend // scheduleDeliveries reads section acceptance notifications and waits for them // to be delivered, pushing them into the output data buffer. -func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) { +func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}) { // Clean up the goroutine and pipeline when done - defer wg.Done() defer close(done) // Keep reading notifications and scheduling deliveries diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 24854dd7ee..c28854afaa 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -182,16 +182,10 @@ func TestCopy(t *testing.T) { } // Finalise the changes on all concurrently - finalise := func(wg *sync.WaitGroup, db *StateDB) { - defer wg.Done() - db.Finalise(true) - } - var wg sync.WaitGroup - wg.Add(3) - go finalise(&wg, orig) - go finalise(&wg, copy) - go finalise(&wg, ccopy) + wg.Go(func() { orig.Finalise(true) }) + wg.Go(func() { copy.Finalise(true) }) + wg.Go(func() { ccopy.Finalise(true) }) wg.Wait() // Verify that the three states have been updated independently diff --git a/core/state_processor.go b/core/state_processor.go index 7b6892231b..2a8d027686 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -585,19 +585,17 @@ func InitSignerInTransactions(config *params.ChainConfig, header *types.Header, chunkSize++ } wg := sync.WaitGroup{} - wg.Add(nWorker) for i := 0; i < nWorker; i++ { from := i * chunkSize to := from + chunkSize if to > txs.Len() { to = txs.Len() } - go func(from int, to int) { + wg.Go(func() { for j := from; j < to; j++ { types.CacheSigner(signer, txs[j]) } - wg.Done() - }(from, to) + }) } wg.Wait() } diff --git a/core/txpool/lending_pool.go b/core/txpool/lending_pool.go index 6c1019b7c2..3e46a095d6 100644 --- a/core/txpool/lending_pool.go +++ b/core/txpool/lending_pool.go @@ -182,8 +182,7 @@ func NewLendingPool(chainconfig *params.ChainConfig, chain blockChainLending) *L pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) // Start the event loop and return - pool.wg.Add(1) - go pool.loop() + pool.wg.Go(pool.loop) return pool } @@ -192,8 +191,6 @@ func NewLendingPool(chainconfig *params.ChainConfig, chain blockChainLending) *L // outside blockchain events as well as for various reporting and transaction // eviction events. func (pool *LendingPool) loop() { - defer pool.wg.Done() - // Start the stats reporting and transaction eviction tickers var prevPending, prevQueued int diff --git a/core/txpool/order_pool.go b/core/txpool/order_pool.go index 44331ecafe..44ff861505 100644 --- a/core/txpool/order_pool.go +++ b/core/txpool/order_pool.go @@ -191,8 +191,7 @@ func NewOrderPool(chainconfig *params.ChainConfig, chain blockChainXDCx) *OrderP pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) // Start the event loop and return - pool.wg.Add(1) - go pool.loop() + pool.wg.Go(pool.loop) return pool } @@ -201,8 +200,6 @@ func NewOrderPool(chainconfig *params.ChainConfig, chain blockChainXDCx) *OrderP // outside blockchain events as well as for various reporting and transaction // eviction events. func (pool *OrderPool) loop() { - defer pool.wg.Done() - // Start the stats reporting and transaction eviction tickers report := time.NewTicker(statsReportInterval) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index c2e2aa3f4c..71f3e08d21 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -334,8 +334,7 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain) pool.reset(nil, chain.CurrentBlock().Header()) // Start the reorg loop early so it can handle requests generated during journal loading. - pool.wg.Add(1) - go pool.scheduleReorgLoop() + pool.wg.Go(pool.scheduleReorgLoop) // If local transactions and journaling is enabled, load from disk if !config.NoLocals && config.Journal != "" { @@ -351,8 +350,7 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain) // Subscribe events from blockchain and start the main event loop. pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) - pool.wg.Add(1) - go pool.loop() + pool.wg.Go(pool.loop) return pool } @@ -361,8 +359,6 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain) // outside blockchain events as well as for various reporting and transaction // eviction events. func (pool *TxPool) loop() { - defer pool.wg.Done() - var ( prevPending, prevQueued, prevStales int // Start the stats reporting and transaction eviction tickers @@ -1224,8 +1220,6 @@ func (pool *TxPool) queueTxEvent(tx *types.Transaction) { // call those methods directly, but request them being run using requestReset and // requestPromoteExecutables instead. func (pool *TxPool) scheduleReorgLoop() { - defer pool.wg.Done() - var ( curDone chan struct{} // non-nil while runReorg is active nextDone = make(chan struct{}) diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index e52bcdb49c..738a413ef6 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -248,10 +248,8 @@ func XTestDelivery(t *testing.T) { q := newQueue(10, 10) var wg sync.WaitGroup q.Prepare(1, FastSync) - wg.Add(1) - go func() { + wg.Go(func() { // deliver headers - defer wg.Done() c := 1 for { //fmt.Printf("getting headers from %d\n", c) @@ -262,11 +260,9 @@ func XTestDelivery(t *testing.T) { q.Schedule(hdrs, uint64(c)) c += l } - }() - wg.Add(1) - go func() { + }) + wg.Go(func() { // collect results - defer wg.Done() tot := 0 for { res := q.Results(true) @@ -276,10 +272,8 @@ func XTestDelivery(t *testing.T) { world.forget(res[len(res)-1].Header.Number.Uint64()) } - }() - wg.Add(1) - go func() { - defer wg.Done() + }) + wg.Go(func() { // reserve body fetch i := 4 for { @@ -304,10 +298,8 @@ func XTestDelivery(t *testing.T) { time.Sleep(200 * time.Millisecond) } } - }() - wg.Add(1) - go func() { - defer wg.Done() + }) + wg.Go(func() { // reserve receiptfetch peer := dummyPeer("peer-3") for { @@ -326,10 +318,8 @@ func XTestDelivery(t *testing.T) { time.Sleep(200 * time.Millisecond) } } - }() - wg.Add(1) - go func() { - defer wg.Done() + }) + wg.Go(func() { for i := 0; i < 50; i++ { time.Sleep(300 * time.Millisecond) //world.tick() @@ -340,17 +330,15 @@ func XTestDelivery(t *testing.T) { time.Sleep(2990 * time.Millisecond) } - }() - wg.Add(1) - go func() { - defer wg.Done() + }) + wg.Go(func() { for { time.Sleep(990 * time.Millisecond) fmt.Printf("world block tip is %d\n", world.chain[len(world.chain)-1].Header().Number.Uint64()) fmt.Println(q.Stats()) } - }() + }) wg.Wait() } diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index a117c09e66..b1a0c403d5 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -47,10 +47,9 @@ var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain func init() { var forkLen = int(MaxForkAncestry + 50) var wg sync.WaitGroup - wg.Add(3) - go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }() - go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }() - go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }() + wg.Go(func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1) }) + wg.Go(func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2) }) + wg.Go(func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3) }) wg.Wait() } diff --git a/eth/protocol_test.go b/eth/protocol_test.go index b398f90650..bb8c09e636 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -140,7 +140,6 @@ func testSendTransactions(t *testing.T, protocol int) { // Connect several peers. They should all receive the pending transactions. var wg sync.WaitGroup checktxs := func(p *testPeer) { - defer wg.Done() defer p.close() seen := make(map[common.Hash]bool) for _, tx := range alltxs { @@ -172,9 +171,10 @@ func testSendTransactions(t *testing.T, protocol int) { } } for i := 0; i < 3; i++ { - p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), protocol, pm, true) - wg.Add(1) - go checktxs(p) + wg.Go(func() { + p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), protocol, pm, true) + checktxs(p) + }) } wg.Wait() } diff --git a/event/event_test.go b/event/event_test.go index 5a06545638..7edeb696c2 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -190,11 +190,9 @@ func BenchmarkPostConcurrent(b *testing.B) { for i := 0; i < b.N; i++ { mux.Post(testEvent(0)) } - wg.Done() } - wg.Add(5) for i := 0; i < 5; i++ { - go poster() + wg.Go(poster) } wg.Wait() } diff --git a/event/example_scope_test.go b/event/example_scope_test.go index 9db1c9425a..0408303c25 100644 --- a/event/example_scope_test.go +++ b/event/example_scope_test.go @@ -93,9 +93,7 @@ func ExampleSubscriptionScope() { // Run a subscriber in the background. divsub := app.SubscribeResults('/', divs) mulsub := app.SubscribeResults('*', muls) - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { defer fmt.Println("subscriber exited") defer divsub.Unsubscribe() defer mulsub.Unsubscribe() @@ -111,7 +109,7 @@ func ExampleSubscriptionScope() { return } } - }() + }) // Interact with the app. app.Calc('/', 22, 11) diff --git a/event/feed_test.go b/event/feed_test.go index 74e8587a87..800f8af954 100644 --- a/event/feed_test.go +++ b/event/feed_test.go @@ -180,12 +180,10 @@ func TestFeedSubscribeBlockedPost(t *testing.T) { defer wg.Wait() feed.Subscribe(ch1) - wg.Add(nsends) for i := 0; i < nsends; i++ { - go func() { + wg.Go(func() { feed.Send(99) - wg.Done() - }() + }) } sub2 := feed.Subscribe(ch2) @@ -217,12 +215,10 @@ func TestFeedUnsubscribeBlockedPost(t *testing.T) { } // Queue up some Sends. None of these can make progress while bchan isn't read. - wg.Add(nsends) for i := 0; i < nsends; i++ { - go func() { + wg.Go(func() { feed.Send(99) - wg.Done() - }() + }) } // Subscribe the other channels. for i, ch := range chans { @@ -250,11 +246,9 @@ func TestFeedUnsubscribeSentChan(t *testing.T) { ) defer sub2.Unsubscribe() - wg.Add(1) - go func() { + wg.Go(func() { feed.Send(0) - wg.Done() - }() + }) // Wait for the value on ch1. <-ch1 @@ -267,11 +261,9 @@ func TestFeedUnsubscribeSentChan(t *testing.T) { // Send again. This should send to ch2 only, so the wait group will unblock // as soon as a value is received on ch2. - wg.Add(1) - go func() { + wg.Go(func() { feed.Send(0) - wg.Done() - }() + }) <-ch2 wg.Wait() } diff --git a/event/feedof_test.go b/event/feedof_test.go index 846afc9ee1..ff6a6f2f04 100644 --- a/event/feedof_test.go +++ b/event/feedof_test.go @@ -124,12 +124,10 @@ func TestFeedOfSubscribeBlockedPost(t *testing.T) { defer wg.Wait() feed.Subscribe(ch1) - wg.Add(nsends) for i := 0; i < nsends; i++ { - go func() { + wg.Go(func() { feed.Send(99) - wg.Done() - }() + }) } sub2 := feed.Subscribe(ch2) @@ -161,12 +159,10 @@ func TestFeedOfUnsubscribeBlockedPost(t *testing.T) { } // Queue up some Sends. None of these can make progress while bchan isn't read. - wg.Add(nsends) for i := 0; i < nsends; i++ { - go func() { + wg.Go(func() { feed.Send(99) - wg.Done() - }() + }) } // Subscribe the other channels. for i, ch := range chans { @@ -194,11 +190,9 @@ func TestFeedOfUnsubscribeSentChan(t *testing.T) { ) defer sub2.Unsubscribe() - wg.Add(1) - go func() { + wg.Go(func() { feed.Send(0) - wg.Done() - }() + }) // Wait for the value on ch1. <-ch1 @@ -211,11 +205,9 @@ func TestFeedOfUnsubscribeSentChan(t *testing.T) { // Send again. This should send to ch2 only, so the wait group will unblock // as soon as a value is received on ch2. - wg.Add(1) - go func() { + wg.Go(func() { feed.Send(0) - wg.Done() - }() + }) <-ch2 wg.Wait() } diff --git a/metrics/counter_float_64_test.go b/metrics/counter_float_64_test.go index 618cbbbc2b..147d9a91e0 100644 --- a/metrics/counter_float_64_test.go +++ b/metrics/counter_float_64_test.go @@ -18,13 +18,11 @@ func BenchmarkCounterFloat64Parallel(b *testing.B) { b.ResetTimer() var wg sync.WaitGroup for i := 0; i < 10; i++ { - wg.Add(1) - go func() { + wg.Go(func() { for i := 0; i < b.N; i++ { c.Inc(1.0) } - wg.Done() - }() + }) } wg.Wait() if have, want := c.Snapshot().Count(), 10.0*float64(b.N); have != want { diff --git a/metrics/gauge_float64_test.go b/metrics/gauge_float64_test.go index 194a18821f..7e34e7e3c8 100644 --- a/metrics/gauge_float64_test.go +++ b/metrics/gauge_float64_test.go @@ -17,13 +17,11 @@ func BenchmarkGaugeFloat64Parallel(b *testing.B) { c := NewGaugeFloat64() var wg sync.WaitGroup for i := 0; i < 10; i++ { - wg.Add(1) - go func() { + wg.Go(func() { for i := 0; i < b.N; i++ { c.Update(float64(i)) } - wg.Done() - }() + }) } wg.Wait() if have, want := c.Snapshot().Value(), float64(b.N-1); have != want { diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index dc144f2425..fbd36ac99a 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -27,10 +27,8 @@ func BenchmarkMetrics(b *testing.B) { RegisterDebugGCStats(r) b.ResetTimer() var wg sync.WaitGroup - wg.Add(128) for i := 0; i < 128; i++ { - go func() { - defer wg.Done() + wg.Go(func() { for i := 0; i < b.N; i++ { c.Inc(1) cf.Inc(1.0) @@ -40,7 +38,7 @@ func BenchmarkMetrics(b *testing.B) { m.Mark(1) t.Update(1) } - }() + }) } wg.Wait() } diff --git a/metrics/registry_test.go b/metrics/registry_test.go index 6af0796da9..f0f829fb9a 100644 --- a/metrics/registry_test.go +++ b/metrics/registry_test.go @@ -27,13 +27,11 @@ func benchmarkRegistryGetOrRegisterParallel(b *testing.B, amount int) { b.ResetTimer() var wg sync.WaitGroup for i := 0; i < amount; i++ { - wg.Add(1) - go func() { + wg.Go(func() { for i := 0; i < b.N; i++ { GetOrRegisterMeter("foo", r) } - wg.Done() - }() + }) } wg.Wait() } diff --git a/p2p/peer.go b/p2p/peer.go index 6cabe89342..4119bac3dd 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -195,9 +195,8 @@ func (p *Peer) run() (remoteRequested bool, err error) { readErr = make(chan error, 1) reason DiscReason // sent to the peer ) - p.wg.Add(2) - go p.readLoop(readErr) - go p.pingLoop() + p.wg.Go(func() { p.readLoop(readErr) }) + p.wg.Go(p.pingLoop) // Start all protocol handlers. writeStart <- struct{}{} @@ -240,8 +239,6 @@ loop: } func (p *Peer) pingLoop() { - defer p.wg.Done() - ping := time.NewTimer(pingInterval) defer ping.Stop() @@ -264,7 +261,6 @@ func (p *Peer) pingLoop() { } func (p *Peer) readLoop(errc chan<- error) { - defer p.wg.Done() for { msg, err := p.rw.ReadMsg() if err != nil { @@ -353,7 +349,6 @@ outer: } func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) { - p.wg.Add(len(p.running)) for _, proto := range p.running { proto.closed = p.closed proto.wstart = writeStart @@ -363,8 +358,7 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) rw = newMsgEventer(rw, p.events, p.ID(), proto.Name) } p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version)) - - go func() { + p.wg.Go(func() { err := proto.Run(p, rw) if err == nil { p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) @@ -373,8 +367,7 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err) } p.protoErr <- err - p.wg.Done() - }() + }) } } diff --git a/p2p/rlpx_test.go b/p2p/rlpx_test.go index 83eb97ad03..dcfa18b205 100644 --- a/p2p/rlpx_test.go +++ b/p2p/rlpx_test.go @@ -163,9 +163,7 @@ func TestProtocolHandshake(t *testing.T) { t.Fatal(err) } - wg.Add(2) - go func() { - defer wg.Done() + wg.Go(func() { defer fd0.Close() rlpx := newRLPX(fd0) remid, err := rlpx.doEncHandshake(prv0, node1) @@ -189,9 +187,8 @@ func TestProtocolHandshake(t *testing.T) { return } rlpx.close(DiscQuitting) - }() - go func() { - defer wg.Done() + }) + wg.Go(func() { defer fd1.Close() rlpx := newRLPX(fd1) remid, err := rlpx.doEncHandshake(prv1, nil) @@ -218,7 +215,7 @@ func TestProtocolHandshake(t *testing.T) { if err := ExpectMsg(rlpx, discMsg, []DiscReason{DiscQuitting}); err != nil { t.Errorf("error receiving disconnect: %v", err) } - }() + }) wg.Wait() } diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 6ef2e5b78b..22c6b89f9d 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -293,16 +293,14 @@ func (n *ExecNode) ServeRPC(clientConn *websocket.Conn) error { return err } var wg sync.WaitGroup - wg.Add(2) - go wsCopy(&wg, conn, clientConn) - go wsCopy(&wg, clientConn, conn) + wg.Go(func() { wsCopy(conn, clientConn) }) + wg.Go(func() { wsCopy(clientConn, conn) }) wg.Wait() conn.Close() return nil } -func wsCopy(wg *sync.WaitGroup, src, dst *websocket.Conn) { - defer wg.Done() +func wsCopy(src, dst *websocket.Conn) { for { msgType, r, err := src.NextReader() if err != nil { diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index ae93eac796..d2737e33bb 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -131,8 +131,6 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) { lowid = rand1 highid = rand2 } - var steps = highid - lowid - wg.Add(steps) for i := lowid; i < highid; i++ { select { case <-quit: @@ -144,17 +142,16 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) { err := net.Stop(nodes[i]) if err != nil { log.Error(fmt.Sprintf("Error stopping node %s", nodes[i])) - wg.Done() continue } - go func(id discover.NodeID) { + wg.Go(func() { + id := nodes[i] time.Sleep(randWait) err := net.Start(id) if err != nil { log.Error(fmt.Sprintf("Error starting node %s", id)) } - wg.Done() - }(nodes[i]) + }) } wg.Wait() } diff --git a/p2p/simulations/mocker_test.go b/p2p/simulations/mocker_test.go index 8cc84f8e83..db19f6ab14 100644 --- a/p2p/simulations/mocker_test.go +++ b/p2p/simulations/mocker_test.go @@ -86,10 +86,7 @@ func TestMocker(t *testing.T) { nodemap := make(map[discover.NodeID]bool) nodesComplete := false connCount := 0 - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() { for connCount < (nodeCount-1)*2 { select { case event := <-events: @@ -111,7 +108,7 @@ func TestMocker(t *testing.T) { return } } - }() + }) //take the last element of the mockerlist as the default mocker-type to ensure one is enabled mockertype := mockerlist[len(mockerlist)-1] diff --git a/rlp/encode_test.go b/rlp/encode_test.go index 61bc2d3b35..4daf4de12a 100644 --- a/rlp/encode_test.go +++ b/rlp/encode_test.go @@ -491,8 +491,7 @@ func TestEncodeToReaderReturnToPool(t *testing.T) { buf := make([]byte, 50) wg := new(sync.WaitGroup) for i := 0; i < 5; i++ { - wg.Add(1) - go func() { + wg.Go(func() { for i := 0; i < 1000; i++ { _, r, _ := EncodeToReader("foo") io.ReadAll(r) @@ -501,8 +500,7 @@ func TestEncodeToReaderReturnToPool(t *testing.T) { r.Read(buf) r.Read(buf) } - wg.Done() - }() + }) } wg.Wait() } @@ -572,10 +570,7 @@ func BenchmarkEncodeConcurrentInterface(b *testing.B) { var wg sync.WaitGroup for cpu := 0; cpu < runtime.NumCPU(); cpu++ { - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() { var buffer bytes.Buffer for i := 0; i < b.N; i++ { buffer.Reset() @@ -584,7 +579,7 @@ func BenchmarkEncodeConcurrentInterface(b *testing.B) { panic(err) } } - }() + }) } wg.Wait() } diff --git a/rpc/client_test.go b/rpc/client_test.go index 09ff3902d1..e980d0601a 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -372,7 +372,6 @@ func testClientCancel(transport string, t *testing.T) { ncallers = 10 ) caller := func(index int) { - defer wg.Done() for i := 0; i < nreqs; i++ { var ( ctx context.Context @@ -404,9 +403,8 @@ func testClientCancel(transport string, t *testing.T) { cancel() } } - wg.Add(ncallers) for i := 0; i < ncallers; i++ { - go caller(i) + wg.Go(func() { caller(i) }) } wg.Wait() } diff --git a/rpc/websocket.go b/rpc/websocket.go index 96681e3b14..46e48fa1cc 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -318,8 +318,7 @@ func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readL } return nil }) - wc.wg.Add(1) - go wc.pingLoop() + wc.wg.Go(wc.pingLoop) return wc } @@ -347,7 +346,6 @@ func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError // pingLoop sends periodic ping frames when the connection is idle. func (wc *websocketCodec) pingLoop() { var pingTimer = time.NewTimer(wsPingInterval) - defer wc.wg.Done() defer pingTimer.Stop() for { diff --git a/trie/hasher.go b/trie/hasher.go index f81913df9d..255f944628 100644 --- a/trie/hasher.go +++ b/trie/hasher.go @@ -113,9 +113,8 @@ func (h *hasher) hashFullNodeChildren(n *fullNode) (collapsed *fullNode, cached collapsed = n.copy() if h.parallel { var wg sync.WaitGroup - wg.Add(16) for i := 0; i < 16; i++ { - go func(i int) { + wg.Go(func() { hasher := newHasher(false) if child := n.Children[i]; child != nil { collapsed.Children[i], cached.Children[i] = hasher.hash(child, false) @@ -123,8 +122,7 @@ func (h *hasher) hashFullNodeChildren(n *fullNode) (collapsed *fullNode, cached collapsed.Children[i] = nilValueNode } returnHasherToPool(hasher) - wg.Done() - }(i) + }) } wg.Wait() } else {