diff --git a/consensus/XDPoS/XDPoS.go b/consensus/XDPoS/XDPoS.go index b9a56e36eb..c86a4d4957 100644 --- a/consensus/XDPoS/XDPoS.go +++ b/consensus/XDPoS/XDPoS.go @@ -24,6 +24,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/engines/engine_v1" "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/engines/engine_v2" "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" + "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/consensus/clique" "github.com/XinFinOrg/XDPoSChain/core/state" @@ -65,6 +66,11 @@ type XDPoS struct { EngineV2 *engine_v2.XDPoS_v2 } +// Subscribe to consensus engines forensics events. Currently only exist for engine v2 +func (x *XDPoS) SubscribeForensicsEvent(ch chan<- types.ForensicsEvent) event.Subscription { + return x.EngineV2.ForensicsProcessor.SubscribeForensicsEvent(ch) +} + // New creates a XDPoS delegated-proof-of-stake consensus engine with the initial // signers set to the ones provided by the user. func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS { diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index 8b46776652..0bcc0d70b7 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -58,7 +58,7 @@ type XDPoS_v2 struct { HookReward func(chain consensus.ChainReader, state *state.StateDB, parentState *state.StateDB, header *types.Header) (map[string]interface{}, error) HookPenalty func(chain consensus.ChainReader, number *big.Int, parentHash common.Hash, candidates []common.Address) ([]common.Address, error) - forensics *Forensics + ForensicsProcessor *Forensics } func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *XDPoS_v2 { @@ -107,7 +107,7 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) * }, highestVotedRound: types.Round(0), highestCommitBlock: nil, - forensics: NewForensics(), + ForensicsProcessor: NewForensics(), } // Add callback to the timer timeoutTimer.OnTimeoutFn = engine.OnCountdownTimeout @@ -925,7 +925,7 @@ func (x *XDPoS_v2) commitBlocks(blockChainReader consensus.ChainReader, proposed // Perform forensics related operation var headerQcToBeCommitted []types.Header headerQcToBeCommitted = append(headerQcToBeCommitted, *parentBlock, *proposedBlockHeader) - go x.forensics.ForensicsMonitoring(blockChainReader, x, headerQcToBeCommitted, *incomingQc) + go x.ForensicsProcessor.ForensicsMonitoring(blockChainReader, x, headerQcToBeCommitted, *incomingQc) return true, nil } // Everything else, fail to commit diff --git a/consensus/XDPoS/engines/engine_v2/forensics.go b/consensus/XDPoS/engines/engine_v2/forensics.go index 4f0750a7d3..da841ca4aa 100644 --- a/consensus/XDPoS/engines/engine_v2/forensics.go +++ b/consensus/XDPoS/engines/engine_v2/forensics.go @@ -10,6 +10,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/crypto" + "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/log" ) @@ -17,22 +18,11 @@ const ( NUM_OF_FORENSICS_QC = 3 ) -type ForensicsInfo struct { - HashPath []string // HashesTillSmallerRoundQc or HashesTillLargerRoundQc - QuorumCert types.QuorumCert - SignerAddresses []string -} - -type ForensicProof struct { - SmallerRoundInfo *ForensicsInfo - LargerRoundInfo *ForensicsInfo - DivergingHash common.Hash - AcrossEpochs bool -} - // Forensics instance. Placeholder for future properties to be added type Forensics struct { HighestCommittedQCs []types.QuorumCert + forensicsFeed event.Feed + scope event.SubscriptionScope } // Initiate a forensics process @@ -40,6 +30,12 @@ func NewForensics() *Forensics { return &Forensics{} } +// SubscribeForensicsEvent registers a subscription of ForensicsEvent and +// starts sending event to the given channel. +func (f *Forensics) SubscribeForensicsEvent(ch chan<- types.ForensicsEvent) event.Subscription { + return f.scope.Track(f.forensicsFeed.Subscribe(ch)) +} + func (f *Forensics) ForensicsMonitoring(chain consensus.ChainReader, engine *XDPoS_v2, headerQcToBeCommitted []types.Header, incomingQC types.QuorumCert) error { f.ProcessForensics(chain, engine, incomingQC) return f.SetCommittedQCs(headerQcToBeCommitted, incomingQC) @@ -132,7 +128,7 @@ func (f *Forensics) SendForensicProof(chain consensus.ChainReader, engine *XDPoS lowerRoundQC := firstQc higherRoundQC := secondQc - if (secondQc.ProposedBlockInfo.Round - firstQc.ProposedBlockInfo.Round) < 0 { + if secondQc.ProposedBlockInfo.Round < firstQc.ProposedBlockInfo.Round { lowerRoundQC = secondQc higherRoundQC = firstQc } @@ -146,28 +142,36 @@ func (f *Forensics) SendForensicProof(chain consensus.ChainReader, engine *XDPoS // Check if two QCs are across epoch, this is used as a indicator for the "prone to attack" scenario lowerRoundQcEpochSwitchInfo, err := engine.getEpochSwitchInfo(chain, nil, lowerRoundQC.ProposedBlockInfo.Hash) + if err != nil { + log.Error("[SendForensicProof] Errir while trying to find lowerRoundQcEpochSwitchInfo", "lowerRoundQC.ProposedBlockInfo.Hash", lowerRoundQC.ProposedBlockInfo.Hash, "err", err) + return err + } higherRoundQcEpochSwitchInfo, err := engine.getEpochSwitchInfo(chain, nil, higherRoundQC.ProposedBlockInfo.Hash) + if err != nil { + log.Error("[SendForensicProof] Errir while trying to find higherRoundQcEpochSwitchInfo", "higherRoundQC.ProposedBlockInfo.Hash", higherRoundQC.ProposedBlockInfo.Hash, "err", err) + return err + } accrossEpoches := false if lowerRoundQcEpochSwitchInfo.EpochSwitchBlockInfo.Hash != higherRoundQcEpochSwitchInfo.EpochSwitchBlockInfo.Hash { accrossEpoches = true } - forensicsProof := &ForensicProof{ + forensicsProof := &types.ForensicProof{ DivergingHash: ancestorHash, AcrossEpochs: accrossEpoches, - SmallerRoundInfo: &ForensicsInfo{ + SmallerRoundInfo: &types.ForensicsInfo{ HashPath: ancestorToLowerRoundPath, QuorumCert: lowerRoundQC, SignerAddresses: f.getQcSignerAddresses(lowerRoundQC), }, - LargerRoundInfo: &ForensicsInfo{ + LargerRoundInfo: &types.ForensicsInfo{ HashPath: ancestorToHigherRoundPath, QuorumCert: higherRoundQC, SignerAddresses: f.getQcSignerAddresses(higherRoundQC), }, } - // TODO: send to dedicated channel which will redirect to stats server log.Info("Forensics proof report generated, sending to the stats server", forensicsProof) + go f.forensicsFeed.Send(types.ForensicsEvent{ForensicsProof: forensicsProof}) return nil } diff --git a/consensus/XDPoS/engines/engine_v2/testing_utils.go b/consensus/XDPoS/engines/engine_v2/testing_utils.go index 033cd7fb31..030080b62a 100644 --- a/consensus/XDPoS/engines/engine_v2/testing_utils.go +++ b/consensus/XDPoS/engines/engine_v2/testing_utils.go @@ -84,5 +84,5 @@ func (x *XDPoS_v2) AuthorizeFaker(signer common.Address) { } func (x *XDPoS_v2) GetForensicsFaker() *Forensics { - return x.forensics + return x.ForensicsProcessor } diff --git a/consensus/tests/engine_v2_tests/forensics_test.go b/consensus/tests/engine_v2_tests/forensics_test.go index cc5c2ae9dd..b89874d111 100644 --- a/consensus/tests/engine_v2_tests/forensics_test.go +++ b/consensus/tests/engine_v2_tests/forensics_test.go @@ -156,9 +156,33 @@ func TestForensicsMonitoringNotOnSameChainButHaveSameRoundQC(t *testing.T) { parentOfForkedHeader := blockchain.GetBlockByHash(currentForkBlock.ParentHash()).Header() grandParentOfForkedHeader := blockchain.GetBlockByHash(parentOfForkedHeader.ParentHash).Header() forkedHeaders = append(forkedHeaders, *grandParentOfForkedHeader, *parentOfForkedHeader) + + // Set up forensics events trigger + forensicsEventCh := make(chan types.ForensicsEvent) + forensics.SubscribeForensicsEvent(forensicsEventCh) + err = forensics.ForensicsMonitoring(blockchain, blockchain.Engine().(*XDPoS.XDPoS).EngineV2, forkedHeaders, *incomingQC) assert.Nil(t, err) - // TODO: Check SendForensicProof triggered + + // Check SendForensicProof triggered + for { + select { + case forensics := <-forensicsEventCh: + assert.NotNil(t, forensics.ForensicsProof) + assert.False(t, forensics.ForensicsProof.AcrossEpochs) + assert.Equal(t, types.Round(13), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Round) + assert.Equal(t, uint64(913), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64()) + assert.Equal(t, 9, len(forensics.ForensicsProof.SmallerRoundInfo.HashPath)) + assert.Equal(t, 4, len(forensics.ForensicsProof.SmallerRoundInfo.SignerAddresses)) + assert.Equal(t, types.Round(13), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Round) + assert.Equal(t, uint64(912), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64()) + assert.Equal(t, 8, len(forensics.ForensicsProof.LargerRoundInfo.HashPath)) + assert.Equal(t, 4, len(forensics.ForensicsProof.LargerRoundInfo.SignerAddresses)) + return + case <-time.After(5 * time.Second): + t.FailNow() + } + } } func TestForensicsMonitoringNotOnSameChainDoNotHaveSameRoundQC(t *testing.T) { @@ -190,7 +214,86 @@ func TestForensicsMonitoringNotOnSameChainDoNotHaveSameRoundQC(t *testing.T) { grandParentOfForkedHeader := blockchain.GetBlockByHash(parentOfForkedHeader.ParentHash).Header() forkedHeaders = append(forkedHeaders, *grandParentOfForkedHeader, *parentOfForkedHeader) + // Set up forensics events trigger + forensicsEventCh := make(chan types.ForensicsEvent) + forensics.SubscribeForensicsEvent(forensicsEventCh) + err = forensics.ForensicsMonitoring(blockchain, blockchain.Engine().(*XDPoS.XDPoS).EngineV2, forkedHeaders, *incomingQC) assert.Nil(t, err) - // TODO: Check SendForensicProof triggered + // Check SendForensicProof triggered + for { + select { + case forensics := <-forensicsEventCh: + assert.NotNil(t, forensics.ForensicsProof) + assert.False(t, forensics.ForensicsProof.AcrossEpochs) + assert.Equal(t, types.Round(14), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Round) + assert.Equal(t, uint64(914), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64()) + assert.Equal(t, 10, len(forensics.ForensicsProof.SmallerRoundInfo.HashPath)) + assert.Equal(t, 4, len(forensics.ForensicsProof.SmallerRoundInfo.SignerAddresses)) + assert.Equal(t, types.Round(16), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Round) + assert.Equal(t, uint64(906), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64()) + assert.Equal(t, 2, len(forensics.ForensicsProof.LargerRoundInfo.HashPath)) + assert.Equal(t, 2, len(forensics.ForensicsProof.LargerRoundInfo.SignerAddresses)) + return + case <-time.After(5 * time.Second): + t.FailNow() + } + } +} + +// "prone to attack" test where the "across epoch" field is true +func TestForensicsAcrossEpoch(t *testing.T) { + var numOfForks = new(int) + *numOfForks = 10 + var forkRoundDifference = new(int) + *forkRoundDifference = 10 + var forkedChainSignersKey []*ecdsa.PrivateKey + forkedChainSignersKey = append(forkedChainSignersKey, acc1Key) + blockchain, _, _, _, _, currentForkBlock := PrepareXDCTestBlockChainForV2Engine(t, 1801, params.TestXDPoSMockChainConfig, &ForkedBlockOptions{numOfForkedBlocks: numOfForks, forkedRoundDifference: forkRoundDifference, signersKey: forkedChainSignersKey}) + forensics := blockchain.Engine().(*XDPoS.XDPoS).EngineV2.GetForensicsFaker() + + // Now, let's try set committed blocks, where the highestedCommitted blocks are 1799, 1800 and 1801 + var headers []types.Header + var decodedBlock1801ExtraField types.ExtraFields_v2 + err := utils.DecodeBytesExtraFields(blockchain.GetHeaderByNumber(1801).Extra, &decodedBlock1801ExtraField) + assert.Nil(t, err) + err = forensics.SetCommittedQCs(append(headers, *blockchain.GetHeaderByNumber(1799), *blockchain.GetHeaderByNumber(1800)), *decodedBlock1801ExtraField.QuorumCert) + assert.Nil(t, err) + + var decodedExtraField types.ExtraFields_v2 + // Decode the QC from forking chain + err = utils.DecodeBytesExtraFields(currentForkBlock.Header().Extra, &decodedExtraField) + assert.Nil(t, err) + + incomingQC := decodedExtraField.QuorumCert + var forkedHeaders []types.Header + parentOfForkedHeader := blockchain.GetBlockByHash(currentForkBlock.ParentHash()).Header() + grandParentOfForkedHeader := blockchain.GetBlockByHash(parentOfForkedHeader.ParentHash).Header() + forkedHeaders = append(forkedHeaders, *grandParentOfForkedHeader, *parentOfForkedHeader) + + // Set up forensics events trigger + forensicsEventCh := make(chan types.ForensicsEvent) + forensics.SubscribeForensicsEvent(forensicsEventCh) + + err = forensics.ForensicsMonitoring(blockchain, blockchain.Engine().(*XDPoS.XDPoS).EngineV2, forkedHeaders, *incomingQC) + assert.Nil(t, err) + // Check SendForensicProof triggered + for { + select { + case forensics := <-forensicsEventCh: + assert.NotNil(t, forensics.ForensicsProof) + assert.True(t, forensics.ForensicsProof.AcrossEpochs) + assert.Equal(t, types.Round(900), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Round) + assert.Equal(t, uint64(1800), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64()) + assert.Equal(t, 10, len(forensics.ForensicsProof.SmallerRoundInfo.HashPath)) + assert.Equal(t, 4, len(forensics.ForensicsProof.SmallerRoundInfo.SignerAddresses)) + assert.Equal(t, types.Round(902), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Round) + assert.Equal(t, uint64(1792), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64()) + assert.Equal(t, 2, len(forensics.ForensicsProof.LargerRoundInfo.HashPath)) + assert.Equal(t, 2, len(forensics.ForensicsProof.LargerRoundInfo.SignerAddresses)) + return + case <-time.After(5 * time.Second): + t.FailNow() + } + } } diff --git a/core/types/forensics.go b/core/types/forensics.go new file mode 100644 index 0000000000..0d0ac54f2f --- /dev/null +++ b/core/types/forensics.go @@ -0,0 +1,20 @@ +package types + +import "github.com/XinFinOrg/XDPoSChain/common" + +type ForensicsInfo struct { + HashPath []string // HashesTillSmallerRoundQc or HashesTillLargerRoundQc + QuorumCert QuorumCert + SignerAddresses []string +} + +type ForensicProof struct { + SmallerRoundInfo *ForensicsInfo + LargerRoundInfo *ForensicsInfo + DivergingHash common.Hash + AcrossEpochs bool +} + +type ForensicsEvent struct { + ForensicsProof *ForensicProof +} diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index dc8003d936..4a380dec56 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -33,6 +33,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/common/mclock" "github.com/XinFinOrg/XDPoSChain/consensus" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS" "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/eth" @@ -56,6 +57,12 @@ const ( chainHeadChanSize = 10 ) +type consensusEngine interface { + // SubscribeForensicsEvent should return an event subscription of + // ForensicsEvent and send events to the given channel. + SubscribeForensicsEvent(chan<- types.ForensicsEvent) event.Subscription +} + type txPool interface { // SubscribeTxPreEvent should return an event subscription of // TxPreEvent and send events to the given channel. @@ -140,9 +147,11 @@ func (s *Service) loop() { // Subscribe to chain events to execute updates on var blockchain blockChain var txpool txPool + var engine consensusEngine if s.eth != nil { blockchain = s.eth.BlockChain() txpool = s.eth.TxPool() + engine = s.eth.Engine().(*XDPoS.XDPoS) } else { blockchain = s.les.BlockChain() txpool = s.les.TxPool() @@ -156,11 +165,19 @@ func (s *Service) loop() { txSub := txpool.SubscribeTxPreEvent(txEventCh) defer txSub.Unsubscribe() + // Forensics events + forensicsEventCh := make(chan types.ForensicsEvent) + if engine != nil { + forensicsSub := engine.SubscribeForensicsEvent(forensicsEventCh) + defer forensicsSub.Unsubscribe() + } + // Start a goroutine that exhausts the subsciptions to avoid events piling up var ( - quitCh = make(chan struct{}) - headCh = make(chan *types.Block, 1) - txCh = make(chan struct{}, 1) + quitCh = make(chan struct{}) + headCh = make(chan *types.Block, 1) + txCh = make(chan struct{}, 1) + forensicsCh = make(chan *types.ForensicProof, 1) ) go func() { var lastTx mclock.AbsTime @@ -168,6 +185,11 @@ func (s *Service) loop() { HandleLoop: for { select { + case forensics := <-forensicsEventCh: + select { + case forensicsCh <- forensics.ForensicsProof: + default: + } // Notify of chain head events, but drop if too frequent case head := <-chainHeadCh: select { @@ -268,6 +290,10 @@ func (s *Service) loop() { if err = s.reportPending(conn); err != nil { log.Warn("Transaction stats report failed", "err", err) } + case forensicsReport := <-forensicsCh: + if err = s.reportForensics(conn, forensicsReport); err != nil { + log.Error("Forensics proof stats report failed", "err", err) + } } } // Make sure the connection is closed @@ -519,6 +545,24 @@ func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error { return websocket.JSON.Send(conn, report) } +// reportForensics forward the forensics repors it to the stats server. +func (s *Service) reportForensics(conn *websocket.Conn, forensicsProof *types.ForensicProof) error { + log.Info( + "Sending Forensics report to ethstats", + "SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Hash", forensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Hash, + "LargerRoundInfo.QuorumCert.ProposedBlockInfo.Hash", forensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Hash, + ) + + stats := map[string]interface{}{ + "id": s.node, + "forensicsProof": forensicsProof, + } + report := map[string][]interface{}{ + "emit": {"forensics", stats}, + } + return websocket.JSON.Send(conn, report) +} + // assembleBlockStats retrieves any required metadata to report a single block // and assembles the block stats. If block is nil, the current head is processed. func (s *Service) assembleBlockStats(block *types.Block) *blockStats {