From c82830d22861e68a4a6f84ea75e142692c5807f6 Mon Sep 17 00:00:00 2001 From: wit liu <765765346@qq.com> Date: Fri, 14 Nov 2025 22:29:43 +0800 Subject: [PATCH] engines/engine_v2: use errgroup to handle goroutine error (#1758) --- consensus/XDPoS/engines/engine_v2/syncInfo.go | 47 ++++++++------- consensus/XDPoS/engines/engine_v2/timeout.go | 60 ++++++++++--------- 2 files changed, 55 insertions(+), 52 deletions(-) diff --git a/consensus/XDPoS/engines/engine_v2/syncInfo.go b/consensus/XDPoS/engines/engine_v2/syncInfo.go index b007379376..7c6b34d001 100644 --- a/consensus/XDPoS/engines/engine_v2/syncInfo.go +++ b/consensus/XDPoS/engines/engine_v2/syncInfo.go @@ -1,17 +1,19 @@ package engine_v2 import ( + "context" "errors" "fmt" + "runtime" "strconv" "strings" - "sync" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus" "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/log" + "golang.org/x/sync/errgroup" ) // Verify syncInfo and trigger process QC or TC if successful @@ -127,31 +129,30 @@ func (x *XDPoS_v2) processSyncInfoPool(chain consensus.ChainReader) { } func (x *XDPoS_v2) verifySignatures(messageHash common.Hash, signatures []types.Signature, candidates []common.Address) error { - var wg sync.WaitGroup - wg.Add(len(signatures)) - var haveError error + eg, ctx := errgroup.WithContext(context.Background()) + eg.SetLimit(runtime.NumCPU()) - for _, signature := range signatures { - go func(sig types.Signature) { - defer wg.Done() - verified, _, err := x.verifyMsgSignature(messageHash, sig, candidates) - if err != nil { - log.Error("[verifySignatures] Error while verfying QC message signatures", "error", err) - haveError = errors.New("error while verfying QC message signatures") - return + for _, sig := range signatures { + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + verified, _, err := x.verifyMsgSignature(messageHash, sig, candidates) + if err != nil { + log.Error("[verifySignatures] Error while verifying message signatures", "error", err) + return errors.New("error while verifying QC message signatures") + } + if !verified { + log.Error("[verifySignatures] Signature not verified during signature verification") + return errors.New("fail to verify QC due to signature mismatch") + } + return nil } - if !verified { - log.Error("[verifySignatures] Signature not verified doing signature verification") - haveError = errors.New("fail to verify QC due to signature mismatch") - return - } - }(signature) + }) } - wg.Wait() - if haveError != nil { - return haveError - } - return nil + + return eg.Wait() } func (x *XDPoS_v2) hygieneSyncInfoPool() { diff --git a/consensus/XDPoS/engines/engine_v2/timeout.go b/consensus/XDPoS/engines/engine_v2/timeout.go index 729d8f982d..61f8e52ca0 100644 --- a/consensus/XDPoS/engines/engine_v2/timeout.go +++ b/consensus/XDPoS/engines/engine_v2/timeout.go @@ -1,11 +1,12 @@ package engine_v2 import ( + "context" "errors" "fmt" + "runtime" "strconv" "strings" - "sync" "time" "github.com/XinFinOrg/XDPoSChain/common" @@ -13,6 +14,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/log" + "golang.org/x/sync/errgroup" ) func (x *XDPoS_v2) VerifyTimeoutMessage(chain consensus.ChainReader, timeoutMsg *types.Timeout) (bool, error) { @@ -184,42 +186,42 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time return utils.ErrInvalidTCSignatures } - var wg sync.WaitGroup - wg.Add(len(signatures)) - - var mutex sync.Mutex - var haveError error - signedTimeoutObj := types.TimeoutSigHash(&types.TimeoutForSign{ Round: timeoutCert.Round, GapNumber: timeoutCert.GapNumber, }) - for _, signature := range signatures { - go func(sig types.Signature) { - defer wg.Done() - verified, _, err := x.verifyMsgSignature(signedTimeoutObj, sig, snap.NextEpochCandidates) - if err != nil || !verified { - log.Error("[verifyTC] Error or verification failure", "signature", sig, "error", err) - mutex.Lock() // Lock before accessing haveError - if haveError == nil { - if err != nil { - log.Error("[verifyTC] Error while verfying TC message signatures", "tcRound", timeoutCert.Round, "tcGapNumber", timeoutCert.GapNumber, "tcSignLen", len(signatures), "error", err) - haveError = fmt.Errorf("error while verifying TC message signatures, %s", err) - } else { - log.Warn("[verifyTC] Signature not verified doing TC verification", "tcRound", timeoutCert.Round, "tcGapNumber", timeoutCert.GapNumber, "tcSignLen", len(signatures)) - haveError = errors.New("fail to verify TC due to signature mis-match") - } + eg, ctx := errgroup.WithContext(context.Background()) + eg.SetLimit(runtime.NumCPU()) + + for _, sig := range signatures { + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + verified, _, err := x.verifyMsgSignature(signedTimeoutObj, sig, snap.NextEpochCandidates) + if err != nil { + log.Error("[verifyTC] Error while verifying TC message signatures", + "tcRound", timeoutCert.Round, + "tcGapNumber", timeoutCert.GapNumber, + "tcSignLen", len(signatures), + "error", err) + return fmt.Errorf("error while verifying TC message signatures: %w", err) } - mutex.Unlock() // Unlock after modifying haveError + if !verified { + log.Warn("[verifyTC] Signature not verified during TC verification", + "tcRound", timeoutCert.Round, + "tcGapNumber", timeoutCert.GapNumber, + "tcSignLen", len(signatures)) + return errors.New("fail to verify TC due to signature mis-match") + } + return nil } - }(signature) + }) } - wg.Wait() - if haveError != nil { - return haveError - } - return nil + + return eg.Wait() } /*