engines/engine_v2: use errgroup to handle goroutine error (#1758)

This commit is contained in:
wit liu 2025-11-14 22:29:43 +08:00 committed by GitHub
parent 5b873ea522
commit c82830d228
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 55 additions and 52 deletions

View file

@ -1,17 +1,19 @@
package engine_v2
import (
"context"
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"golang.org/x/sync/errgroup"
)
// Verify syncInfo and trigger process QC or TC if successful
@ -127,31 +129,30 @@ func (x *XDPoS_v2) processSyncInfoPool(chain consensus.ChainReader) {
}
func (x *XDPoS_v2) verifySignatures(messageHash common.Hash, signatures []types.Signature, candidates []common.Address) error {
var wg sync.WaitGroup
wg.Add(len(signatures))
var haveError error
eg, ctx := errgroup.WithContext(context.Background())
eg.SetLimit(runtime.NumCPU())
for _, signature := range signatures {
go func(sig types.Signature) {
defer wg.Done()
verified, _, err := x.verifyMsgSignature(messageHash, sig, candidates)
if err != nil {
log.Error("[verifySignatures] Error while verfying QC message signatures", "error", err)
haveError = errors.New("error while verfying QC message signatures")
return
for _, sig := range signatures {
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
verified, _, err := x.verifyMsgSignature(messageHash, sig, candidates)
if err != nil {
log.Error("[verifySignatures] Error while verifying message signatures", "error", err)
return errors.New("error while verifying QC message signatures")
}
if !verified {
log.Error("[verifySignatures] Signature not verified during signature verification")
return errors.New("fail to verify QC due to signature mismatch")
}
return nil
}
if !verified {
log.Error("[verifySignatures] Signature not verified doing signature verification")
haveError = errors.New("fail to verify QC due to signature mismatch")
return
}
}(signature)
})
}
wg.Wait()
if haveError != nil {
return haveError
}
return nil
return eg.Wait()
}
func (x *XDPoS_v2) hygieneSyncInfoPool() {

View file

@ -1,11 +1,12 @@
package engine_v2
import (
"context"
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/XinFinOrg/XDPoSChain/common"
@ -13,6 +14,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"golang.org/x/sync/errgroup"
)
func (x *XDPoS_v2) VerifyTimeoutMessage(chain consensus.ChainReader, timeoutMsg *types.Timeout) (bool, error) {
@ -184,42 +186,42 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time
return utils.ErrInvalidTCSignatures
}
var wg sync.WaitGroup
wg.Add(len(signatures))
var mutex sync.Mutex
var haveError error
signedTimeoutObj := types.TimeoutSigHash(&types.TimeoutForSign{
Round: timeoutCert.Round,
GapNumber: timeoutCert.GapNumber,
})
for _, signature := range signatures {
go func(sig types.Signature) {
defer wg.Done()
verified, _, err := x.verifyMsgSignature(signedTimeoutObj, sig, snap.NextEpochCandidates)
if err != nil || !verified {
log.Error("[verifyTC] Error or verification failure", "signature", sig, "error", err)
mutex.Lock() // Lock before accessing haveError
if haveError == nil {
if err != nil {
log.Error("[verifyTC] Error while verfying TC message signatures", "tcRound", timeoutCert.Round, "tcGapNumber", timeoutCert.GapNumber, "tcSignLen", len(signatures), "error", err)
haveError = fmt.Errorf("error while verifying TC message signatures, %s", err)
} else {
log.Warn("[verifyTC] Signature not verified doing TC verification", "tcRound", timeoutCert.Round, "tcGapNumber", timeoutCert.GapNumber, "tcSignLen", len(signatures))
haveError = errors.New("fail to verify TC due to signature mis-match")
}
eg, ctx := errgroup.WithContext(context.Background())
eg.SetLimit(runtime.NumCPU())
for _, sig := range signatures {
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
verified, _, err := x.verifyMsgSignature(signedTimeoutObj, sig, snap.NextEpochCandidates)
if err != nil {
log.Error("[verifyTC] Error while verifying TC message signatures",
"tcRound", timeoutCert.Round,
"tcGapNumber", timeoutCert.GapNumber,
"tcSignLen", len(signatures),
"error", err)
return fmt.Errorf("error while verifying TC message signatures: %w", err)
}
mutex.Unlock() // Unlock after modifying haveError
if !verified {
log.Warn("[verifyTC] Signature not verified during TC verification",
"tcRound", timeoutCert.Round,
"tcGapNumber", timeoutCert.GapNumber,
"tcSignLen", len(signatures))
return errors.New("fail to verify TC due to signature mis-match")
}
return nil
}
}(signature)
})
}
wg.Wait()
if haveError != nil {
return haveError
}
return nil
return eg.Wait()
}
/*