all: use WaigGroup.Go() to simplify code (#1699)

This commit is contained in:
Daniel Liu 2025-11-29 19:47:08 +08:00 committed by GitHub
parent e6b1fe9595
commit ec08863ba0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 100 additions and 210 deletions

View file

@ -406,14 +406,12 @@ func TestImportRace(t *testing.T) {
_, ks2 := tmpKeyStore(t)
var atom atomic.Uint32
var wg sync.WaitGroup
wg.Add(2)
for i := 0; i < 2; i++ {
go func() {
defer wg.Done()
wg.Go(func() {
if _, err := ks2.Import(json, "new", "new"); err != nil {
atom.Add(1)
}
}()
})
}
wg.Wait()
if atom.Load() != 1 {

View file

@ -263,23 +263,21 @@ func TestHasherConcurrency(t *testing.T) {
defer pool.Drain(0)
wg := sync.WaitGroup{}
cycles := 100
wg.Add(maxproccnt * cycles)
errc := make(chan error)
for p := 0; p < maxproccnt; p++ {
for i := 0; i < cycles; i++ {
go func() {
wg.Go(func() {
bmt := New(pool)
n := rand.Intn(4096)
tdata := testDataReader(n)
data := make([]byte, n)
tdata.Read(data)
err := testHasherCorrectness(bmt, hasher, data, n, 128)
wg.Done()
if err != nil {
errc <- err
}
}()
})
}
}
go func() {
@ -386,18 +384,16 @@ func benchmarkBMTBaseline(n int, t *testing.B) {
for i := 0; i < t.N; i++ {
count := int32((n-1)/hasher().Size() + 1)
wg := sync.WaitGroup{}
wg.Add(maxproccnt)
var i int32
for j := 0; j < maxproccnt; j++ {
go func() {
defer wg.Done()
wg.Go(func() {
h := hasher()
for atomic.AddInt32(&i, 1) < count {
h.Reset()
h.Write(data)
h.Sum(nil)
}
}()
})
}
wg.Wait()
}
@ -437,15 +433,13 @@ func benchmarkHasherReuse(poolsize, n int, t *testing.B) {
t.ResetTimer()
for i := 0; i < t.N; i++ {
wg := sync.WaitGroup{}
wg.Add(cycles)
for j := 0; j < cycles; j++ {
bmt := New(pool)
go func() {
defer wg.Done()
wg.Go(func() {
bmt := New(pool)
bmt.Reset()
bmt.Write(data)
bmt.Sum(nil)
}()
})
}
wg.Wait()
}

View file

@ -79,9 +79,7 @@ func TestLazyQueue(t *testing.T) {
stopCh = make(chan chan struct{})
)
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for {
select {
case <-clock.After(testQueueRefresh):
@ -92,7 +90,7 @@ func TestLazyQueue(t *testing.T) {
return
}
}
}()
})
for c := 0; c < testSteps; c++ {
i := rand.Intn(testItems)

View file

@ -164,10 +164,9 @@ func (x *XDPoS_v2) verifyVotes(chain consensus.ChainReader, votes map[common.Has
emptySigner := common.Address{}
// Filter out non-Master nodes signatures
var wg sync.WaitGroup
wg.Add(len(votes))
for h, vote := range votes {
go func(hash common.Hash, v *types.Vote) {
defer wg.Done()
for _, vote := range votes {
wg.Go(func() {
v := vote.(*types.Vote)
signerAddress := v.GetSigner()
if signerAddress != emptySigner {
// verify that signer belongs to the final masternodes, we have not do so in previous steps
@ -199,7 +198,7 @@ func (x *XDPoS_v2) verifyVotes(chain consensus.ChainReader, votes map[common.Has
return
}
v.SetSigner(masterNode)
}(h, vote.(*types.Vote))
})
}
wg.Wait()
elapsed := time.Since(start)

View file

@ -119,8 +119,7 @@ func New(config Config) (*Console, error) {
return nil, err
}
console.wg.Add(1)
go console.interruptHandler()
console.wg.Go(console.interruptHandler)
return console, nil
}
@ -341,8 +340,6 @@ func (c *Console) Evaluate(statement string) {
// interruptHandler runs in its own goroutine and waits for signals.
// When a signal is received, it interrupts the JS interpreter.
func (c *Console) interruptHandler() {
defer c.wg.Done()
// During Interactive, liner inhibits the signal while it is prompting for
// input. However, the signal will be received while evaluating JS.
//

View file

@ -334,8 +334,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
// Start future block processor.
bc.wg.Add(1)
go bc.futureBlocksLoop()
bc.wg.Go(bc.futureBlocksLoop)
return bc, nil
}
@ -2580,8 +2579,6 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
// futureBlocksLoop processes the 'future block' queue.
func (bc *BlockChain) futureBlocksLoop() {
defer bc.wg.Done()
futureTimer := time.NewTicker(100 * time.Millisecond)
defer futureTimer.Stop()

View file

@ -62,9 +62,8 @@ func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []by
pend := make(chan uint64, cap(dist))
// Start the pipeline schedulers to forward between user -> distributor -> user
wg.Add(2)
go s.scheduleRequests(sections, dist, pend, quit, wg)
go s.scheduleDeliveries(pend, done, quit, wg)
wg.Go(func() { s.scheduleRequests(sections, dist, pend, quit) })
wg.Go(func() { s.scheduleDeliveries(pend, done, quit) })
}
// reset cleans up any leftovers from previous runs. This is required before a
@ -84,9 +83,8 @@ func (s *scheduler) reset() {
// scheduleRequests reads section retrieval requests from the input channel,
// deduplicates the stream and pushes unique retrieval tasks into the distribution
// channel for a database or network layer to honour.
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(pend)
// Keep reading and scheduling section requests
@ -131,9 +129,8 @@ func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend
// scheduleDeliveries reads section acceptance notifications and waits for them
// to be delivered, pushing them into the output data buffer.
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(done)
// Keep reading notifications and scheduling deliveries

View file

@ -182,16 +182,10 @@ func TestCopy(t *testing.T) {
}
// Finalise the changes on all concurrently
finalise := func(wg *sync.WaitGroup, db *StateDB) {
defer wg.Done()
db.Finalise(true)
}
var wg sync.WaitGroup
wg.Add(3)
go finalise(&wg, orig)
go finalise(&wg, copy)
go finalise(&wg, ccopy)
wg.Go(func() { orig.Finalise(true) })
wg.Go(func() { copy.Finalise(true) })
wg.Go(func() { ccopy.Finalise(true) })
wg.Wait()
// Verify that the three states have been updated independently

View file

@ -585,19 +585,17 @@ func InitSignerInTransactions(config *params.ChainConfig, header *types.Header,
chunkSize++
}
wg := sync.WaitGroup{}
wg.Add(nWorker)
for i := 0; i < nWorker; i++ {
from := i * chunkSize
to := from + chunkSize
if to > txs.Len() {
to = txs.Len()
}
go func(from int, to int) {
wg.Go(func() {
for j := from; j < to; j++ {
types.CacheSigner(signer, txs[j])
}
wg.Done()
}(from, to)
})
}
wg.Wait()
}

View file

@ -182,8 +182,7 @@ func NewLendingPool(chainconfig *params.ChainConfig, chain blockChainLending) *L
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
// Start the event loop and return
pool.wg.Add(1)
go pool.loop()
pool.wg.Go(pool.loop)
return pool
}
@ -192,8 +191,6 @@ func NewLendingPool(chainconfig *params.ChainConfig, chain blockChainLending) *L
// outside blockchain events as well as for various reporting and transaction
// eviction events.
func (pool *LendingPool) loop() {
defer pool.wg.Done()
// Start the stats reporting and transaction eviction tickers
var prevPending, prevQueued int

View file

@ -191,8 +191,7 @@ func NewOrderPool(chainconfig *params.ChainConfig, chain blockChainXDCx) *OrderP
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
// Start the event loop and return
pool.wg.Add(1)
go pool.loop()
pool.wg.Go(pool.loop)
return pool
}
@ -201,8 +200,6 @@ func NewOrderPool(chainconfig *params.ChainConfig, chain blockChainXDCx) *OrderP
// outside blockchain events as well as for various reporting and transaction
// eviction events.
func (pool *OrderPool) loop() {
defer pool.wg.Done()
// Start the stats reporting and transaction eviction tickers
report := time.NewTicker(statsReportInterval)

View file

@ -334,8 +334,7 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
pool.reset(nil, chain.CurrentBlock().Header())
// Start the reorg loop early so it can handle requests generated during journal loading.
pool.wg.Add(1)
go pool.scheduleReorgLoop()
pool.wg.Go(pool.scheduleReorgLoop)
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
@ -351,8 +350,7 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
// Subscribe events from blockchain and start the main event loop.
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.wg.Add(1)
go pool.loop()
pool.wg.Go(pool.loop)
return pool
}
@ -361,8 +359,6 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
// outside blockchain events as well as for various reporting and transaction
// eviction events.
func (pool *TxPool) loop() {
defer pool.wg.Done()
var (
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
@ -1224,8 +1220,6 @@ func (pool *TxPool) queueTxEvent(tx *types.Transaction) {
// call those methods directly, but request them being run using requestReset and
// requestPromoteExecutables instead.
func (pool *TxPool) scheduleReorgLoop() {
defer pool.wg.Done()
var (
curDone chan struct{} // non-nil while runReorg is active
nextDone = make(chan struct{})

View file

@ -248,10 +248,8 @@ func XTestDelivery(t *testing.T) {
q := newQueue(10, 10)
var wg sync.WaitGroup
q.Prepare(1, FastSync)
wg.Add(1)
go func() {
wg.Go(func() {
// deliver headers
defer wg.Done()
c := 1
for {
//fmt.Printf("getting headers from %d\n", c)
@ -262,11 +260,9 @@ func XTestDelivery(t *testing.T) {
q.Schedule(hdrs, uint64(c))
c += l
}
}()
wg.Add(1)
go func() {
})
wg.Go(func() {
// collect results
defer wg.Done()
tot := 0
for {
res := q.Results(true)
@ -276,10 +272,8 @@ func XTestDelivery(t *testing.T) {
world.forget(res[len(res)-1].Header.Number.Uint64())
}
}()
wg.Add(1)
go func() {
defer wg.Done()
})
wg.Go(func() {
// reserve body fetch
i := 4
for {
@ -304,10 +298,8 @@ func XTestDelivery(t *testing.T) {
time.Sleep(200 * time.Millisecond)
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
})
wg.Go(func() {
// reserve receiptfetch
peer := dummyPeer("peer-3")
for {
@ -326,10 +318,8 @@ func XTestDelivery(t *testing.T) {
time.Sleep(200 * time.Millisecond)
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
})
wg.Go(func() {
for i := 0; i < 50; i++ {
time.Sleep(300 * time.Millisecond)
//world.tick()
@ -340,17 +330,15 @@ func XTestDelivery(t *testing.T) {
time.Sleep(2990 * time.Millisecond)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
})
wg.Go(func() {
for {
time.Sleep(990 * time.Millisecond)
fmt.Printf("world block tip is %d\n",
world.chain[len(world.chain)-1].Header().Number.Uint64())
fmt.Println(q.Stats())
}
}()
})
wg.Wait()
}

View file

@ -47,10 +47,9 @@ var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain
func init() {
var forkLen = int(MaxForkAncestry + 50)
var wg sync.WaitGroup
wg.Add(3)
go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }()
go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }()
go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }()
wg.Go(func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1) })
wg.Go(func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2) })
wg.Go(func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3) })
wg.Wait()
}

View file

@ -140,7 +140,6 @@ func testSendTransactions(t *testing.T, protocol int) {
// Connect several peers. They should all receive the pending transactions.
var wg sync.WaitGroup
checktxs := func(p *testPeer) {
defer wg.Done()
defer p.close()
seen := make(map[common.Hash]bool)
for _, tx := range alltxs {
@ -172,9 +171,10 @@ func testSendTransactions(t *testing.T, protocol int) {
}
}
for i := 0; i < 3; i++ {
p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), protocol, pm, true)
wg.Add(1)
go checktxs(p)
wg.Go(func() {
p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), protocol, pm, true)
checktxs(p)
})
}
wg.Wait()
}

View file

@ -190,11 +190,9 @@ func BenchmarkPostConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
mux.Post(testEvent(0))
}
wg.Done()
}
wg.Add(5)
for i := 0; i < 5; i++ {
go poster()
wg.Go(poster)
}
wg.Wait()
}

View file

@ -93,9 +93,7 @@ func ExampleSubscriptionScope() {
// Run a subscriber in the background.
divsub := app.SubscribeResults('/', divs)
mulsub := app.SubscribeResults('*', muls)
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
defer fmt.Println("subscriber exited")
defer divsub.Unsubscribe()
defer mulsub.Unsubscribe()
@ -111,7 +109,7 @@ func ExampleSubscriptionScope() {
return
}
}
}()
})
// Interact with the app.
app.Calc('/', 22, 11)

View file

@ -180,12 +180,10 @@ func TestFeedSubscribeBlockedPost(t *testing.T) {
defer wg.Wait()
feed.Subscribe(ch1)
wg.Add(nsends)
for i := 0; i < nsends; i++ {
go func() {
wg.Go(func() {
feed.Send(99)
wg.Done()
}()
})
}
sub2 := feed.Subscribe(ch2)
@ -217,12 +215,10 @@ func TestFeedUnsubscribeBlockedPost(t *testing.T) {
}
// Queue up some Sends. None of these can make progress while bchan isn't read.
wg.Add(nsends)
for i := 0; i < nsends; i++ {
go func() {
wg.Go(func() {
feed.Send(99)
wg.Done()
}()
})
}
// Subscribe the other channels.
for i, ch := range chans {
@ -250,11 +246,9 @@ func TestFeedUnsubscribeSentChan(t *testing.T) {
)
defer sub2.Unsubscribe()
wg.Add(1)
go func() {
wg.Go(func() {
feed.Send(0)
wg.Done()
}()
})
// Wait for the value on ch1.
<-ch1
@ -267,11 +261,9 @@ func TestFeedUnsubscribeSentChan(t *testing.T) {
// Send again. This should send to ch2 only, so the wait group will unblock
// as soon as a value is received on ch2.
wg.Add(1)
go func() {
wg.Go(func() {
feed.Send(0)
wg.Done()
}()
})
<-ch2
wg.Wait()
}

View file

@ -124,12 +124,10 @@ func TestFeedOfSubscribeBlockedPost(t *testing.T) {
defer wg.Wait()
feed.Subscribe(ch1)
wg.Add(nsends)
for i := 0; i < nsends; i++ {
go func() {
wg.Go(func() {
feed.Send(99)
wg.Done()
}()
})
}
sub2 := feed.Subscribe(ch2)
@ -161,12 +159,10 @@ func TestFeedOfUnsubscribeBlockedPost(t *testing.T) {
}
// Queue up some Sends. None of these can make progress while bchan isn't read.
wg.Add(nsends)
for i := 0; i < nsends; i++ {
go func() {
wg.Go(func() {
feed.Send(99)
wg.Done()
}()
})
}
// Subscribe the other channels.
for i, ch := range chans {
@ -194,11 +190,9 @@ func TestFeedOfUnsubscribeSentChan(t *testing.T) {
)
defer sub2.Unsubscribe()
wg.Add(1)
go func() {
wg.Go(func() {
feed.Send(0)
wg.Done()
}()
})
// Wait for the value on ch1.
<-ch1
@ -211,11 +205,9 @@ func TestFeedOfUnsubscribeSentChan(t *testing.T) {
// Send again. This should send to ch2 only, so the wait group will unblock
// as soon as a value is received on ch2.
wg.Add(1)
go func() {
wg.Go(func() {
feed.Send(0)
wg.Done()
}()
})
<-ch2
wg.Wait()
}

View file

@ -18,13 +18,11 @@ func BenchmarkCounterFloat64Parallel(b *testing.B) {
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
wg.Go(func() {
for i := 0; i < b.N; i++ {
c.Inc(1.0)
}
wg.Done()
}()
})
}
wg.Wait()
if have, want := c.Snapshot().Count(), 10.0*float64(b.N); have != want {

View file

@ -17,13 +17,11 @@ func BenchmarkGaugeFloat64Parallel(b *testing.B) {
c := NewGaugeFloat64()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
wg.Go(func() {
for i := 0; i < b.N; i++ {
c.Update(float64(i))
}
wg.Done()
}()
})
}
wg.Wait()
if have, want := c.Snapshot().Value(), float64(b.N-1); have != want {

View file

@ -27,10 +27,8 @@ func BenchmarkMetrics(b *testing.B) {
RegisterDebugGCStats(r)
b.ResetTimer()
var wg sync.WaitGroup
wg.Add(128)
for i := 0; i < 128; i++ {
go func() {
defer wg.Done()
wg.Go(func() {
for i := 0; i < b.N; i++ {
c.Inc(1)
cf.Inc(1.0)
@ -40,7 +38,7 @@ func BenchmarkMetrics(b *testing.B) {
m.Mark(1)
t.Update(1)
}
}()
})
}
wg.Wait()
}

View file

@ -27,13 +27,11 @@ func benchmarkRegistryGetOrRegisterParallel(b *testing.B, amount int) {
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < amount; i++ {
wg.Add(1)
go func() {
wg.Go(func() {
for i := 0; i < b.N; i++ {
GetOrRegisterMeter("foo", r)
}
wg.Done()
}()
})
}
wg.Wait()
}

View file

@ -195,9 +195,8 @@ func (p *Peer) run() (remoteRequested bool, err error) {
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
p.wg.Add(2)
go p.readLoop(readErr)
go p.pingLoop()
p.wg.Go(func() { p.readLoop(readErr) })
p.wg.Go(p.pingLoop)
// Start all protocol handlers.
writeStart <- struct{}{}
@ -240,8 +239,6 @@ loop:
}
func (p *Peer) pingLoop() {
defer p.wg.Done()
ping := time.NewTimer(pingInterval)
defer ping.Stop()
@ -264,7 +261,6 @@ func (p *Peer) pingLoop() {
}
func (p *Peer) readLoop(errc chan<- error) {
defer p.wg.Done()
for {
msg, err := p.rw.ReadMsg()
if err != nil {
@ -353,7 +349,6 @@ outer:
}
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
p.wg.Add(len(p.running))
for _, proto := range p.running {
proto.closed = p.closed
proto.wstart = writeStart
@ -363,8 +358,7 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error)
rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
}
p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
go func() {
p.wg.Go(func() {
err := proto.Run(p, rw)
if err == nil {
p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
@ -373,8 +367,7 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error)
p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
}
p.protoErr <- err
p.wg.Done()
}()
})
}
}

View file

@ -163,9 +163,7 @@ func TestProtocolHandshake(t *testing.T) {
t.Fatal(err)
}
wg.Add(2)
go func() {
defer wg.Done()
wg.Go(func() {
defer fd0.Close()
rlpx := newRLPX(fd0)
remid, err := rlpx.doEncHandshake(prv0, node1)
@ -189,9 +187,8 @@ func TestProtocolHandshake(t *testing.T) {
return
}
rlpx.close(DiscQuitting)
}()
go func() {
defer wg.Done()
})
wg.Go(func() {
defer fd1.Close()
rlpx := newRLPX(fd1)
remid, err := rlpx.doEncHandshake(prv1, nil)
@ -218,7 +215,7 @@ func TestProtocolHandshake(t *testing.T) {
if err := ExpectMsg(rlpx, discMsg, []DiscReason{DiscQuitting}); err != nil {
t.Errorf("error receiving disconnect: %v", err)
}
}()
})
wg.Wait()
}

View file

@ -293,16 +293,14 @@ func (n *ExecNode) ServeRPC(clientConn *websocket.Conn) error {
return err
}
var wg sync.WaitGroup
wg.Add(2)
go wsCopy(&wg, conn, clientConn)
go wsCopy(&wg, clientConn, conn)
wg.Go(func() { wsCopy(conn, clientConn) })
wg.Go(func() { wsCopy(clientConn, conn) })
wg.Wait()
conn.Close()
return nil
}
func wsCopy(wg *sync.WaitGroup, src, dst *websocket.Conn) {
defer wg.Done()
func wsCopy(src, dst *websocket.Conn) {
for {
msgType, r, err := src.NextReader()
if err != nil {

View file

@ -131,8 +131,6 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) {
lowid = rand1
highid = rand2
}
var steps = highid - lowid
wg.Add(steps)
for i := lowid; i < highid; i++ {
select {
case <-quit:
@ -144,17 +142,16 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) {
err := net.Stop(nodes[i])
if err != nil {
log.Error(fmt.Sprintf("Error stopping node %s", nodes[i]))
wg.Done()
continue
}
go func(id discover.NodeID) {
wg.Go(func() {
id := nodes[i]
time.Sleep(randWait)
err := net.Start(id)
if err != nil {
log.Error(fmt.Sprintf("Error starting node %s", id))
}
wg.Done()
}(nodes[i])
})
}
wg.Wait()
}

View file

@ -86,10 +86,7 @@ func TestMocker(t *testing.T) {
nodemap := make(map[discover.NodeID]bool)
nodesComplete := false
connCount := 0
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for connCount < (nodeCount-1)*2 {
select {
case event := <-events:
@ -111,7 +108,7 @@ func TestMocker(t *testing.T) {
return
}
}
}()
})
//take the last element of the mockerlist as the default mocker-type to ensure one is enabled
mockertype := mockerlist[len(mockerlist)-1]

View file

@ -491,8 +491,7 @@ func TestEncodeToReaderReturnToPool(t *testing.T) {
buf := make([]byte, 50)
wg := new(sync.WaitGroup)
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
wg.Go(func() {
for i := 0; i < 1000; i++ {
_, r, _ := EncodeToReader("foo")
io.ReadAll(r)
@ -501,8 +500,7 @@ func TestEncodeToReaderReturnToPool(t *testing.T) {
r.Read(buf)
r.Read(buf)
}
wg.Done()
}()
})
}
wg.Wait()
}
@ -572,10 +570,7 @@ func BenchmarkEncodeConcurrentInterface(b *testing.B) {
var wg sync.WaitGroup
for cpu := 0; cpu < runtime.NumCPU(); cpu++ {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
var buffer bytes.Buffer
for i := 0; i < b.N; i++ {
buffer.Reset()
@ -584,7 +579,7 @@ func BenchmarkEncodeConcurrentInterface(b *testing.B) {
panic(err)
}
}
}()
})
}
wg.Wait()
}

View file

@ -372,7 +372,6 @@ func testClientCancel(transport string, t *testing.T) {
ncallers = 10
)
caller := func(index int) {
defer wg.Done()
for i := 0; i < nreqs; i++ {
var (
ctx context.Context
@ -404,9 +403,8 @@ func testClientCancel(transport string, t *testing.T) {
cancel()
}
}
wg.Add(ncallers)
for i := 0; i < ncallers; i++ {
go caller(i)
wg.Go(func() { caller(i) })
}
wg.Wait()
}

View file

@ -318,8 +318,7 @@ func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readL
}
return nil
})
wc.wg.Add(1)
go wc.pingLoop()
wc.wg.Go(wc.pingLoop)
return wc
}
@ -347,7 +346,6 @@ func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError
// pingLoop sends periodic ping frames when the connection is idle.
func (wc *websocketCodec) pingLoop() {
var pingTimer = time.NewTimer(wsPingInterval)
defer wc.wg.Done()
defer pingTimer.Stop()
for {

View file

@ -113,9 +113,8 @@ func (h *hasher) hashFullNodeChildren(n *fullNode) (collapsed *fullNode, cached
collapsed = n.copy()
if h.parallel {
var wg sync.WaitGroup
wg.Add(16)
for i := 0; i < 16; i++ {
go func(i int) {
wg.Go(func() {
hasher := newHasher(false)
if child := n.Children[i]; child != nil {
collapsed.Children[i], cached.Children[i] = hasher.hash(child, false)
@ -123,8 +122,7 @@ func (h *hasher) hashFullNodeChildren(n *fullNode) (collapsed *fullNode, cached
collapsed.Children[i] = nilValueNode
}
returnHasherToPool(hasher)
wg.Done()
}(i)
})
}
wg.Wait()
} else {