From 89d931299f5e5f83753f3c8034109421bcf1f338 Mon Sep 17 00:00:00 2001 From: MestryOmkar Date: Fri, 9 Nov 2018 14:46:03 +0530 Subject: [PATCH] correct order - dv before importing --- eth/backend.go | 32 ++++++++++------------- eth/fetcher/fetcher.go | 24 ++++++++++------- eth/fetcher/fetcher_test.go | 52 +++++++++++++++++++++++++++++-------- 3 files changed, 69 insertions(+), 39 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 114b61ed51..cdfdaf7369 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -24,6 +24,7 @@ import ( "runtime" "sync" "sync/atomic" + "time" "bytes" "github.com/ethereum/go-ethereum/accounts" @@ -52,11 +53,8 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" - "time" ) -const NumOfMasternodes = 99 - type LesServer interface { Start(srvr *p2p.Server) Stop() @@ -192,28 +190,25 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { c := eth.engine.(*XDPoS.XDPoS) // Hook sends tx sign to smartcontract after inserting block to chain. - importedHook := func(block *types.Block) { + importedHook := func(block *types.Block) error { snap, err := c.GetSnapshot(eth.blockchain, block.Header()) if err != nil { if err == consensus.ErrUnknownAncestor { log.Warn("Block chain forked.", "error", err) - } else { - log.Error("Fail to get snapshot for sign tx validator.", "error", err) } - return + return fmt.Errorf("Fail to get snapshot for sign tx validator: %v", err) } if _, authorized := snap.Signers[eth.etherbase]; authorized { // double validation m2, err := getM2(snap, eth, block) if err != nil { - log.Error("Fail to validate M2 condition for imported block", "error", err) - return + return fmt.Errorf("Fail to validate M2 condition for importing block: %v", err) } if eth.etherbase != m2 { // firstly, look into pending txPool pendingMap, err := eth.txPool.Pending() if err != nil { - log.Error("Fail to get txPool pending", "err", err) + log.Warn("Fail to get txPool pending", "err", err, "Continue with empty txPool pending.") //reset pendingMap pendingMap = map[common.Address]types.Transactions{} } @@ -222,10 +217,9 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { for _, tx := range txsSentFromM2 { if tx.To().String() == common.BlockSigners { if err := contracts.CreateTransactionSign(chainConfig, eth.txPool, eth.accountManager, block, chainDb); err != nil { - log.Error("Fail to create tx sign for imported block", "error", err) - return + return fmt.Errorf("Fail to create tx sign for importing block: %v", err) } - return + return nil } } } @@ -238,10 +232,9 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { from, err := eth.txPool.GetSender(event.Tx) if (err == nil) && (event.Tx.To().String() == common.BlockSigners) && (from == m2) { if err := contracts.CreateTransactionSign(chainConfig, eth.txPool, eth.accountManager, block, chainDb); err != nil { - log.Error("Fail to create tx sign for imported block", "error", err) - return + return fmt.Errorf("Fail to create tx sign for importing block: %v", err) } - return + return nil } //timeout 10s case <-time.After(time.Duration(10) * time.Second): @@ -249,11 +242,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } subEvent.Unsubscribe() } else if err := contracts.CreateTransactionSign(chainConfig, eth.txPool, eth.accountManager, block, chainDb); err != nil { - log.Error("Fail to create tx sign for imported block", "error", err) - return + return fmt.Errorf("Fail to create tx sign for importing block: %v", err) } // end of double validation } + return nil } eth.protocolManager.fetcher.SetImportedHook(importedHook) @@ -383,6 +376,9 @@ func getM2(snap *XDPoS.Snapshot, eth *Ethereum, block *types.Block) (common.Addr if no%epoch != 0 { cpNo = no - (no % epoch) } + if cpNo == 0 { + return eth.etherbase, nil + } cpBlk := eth.blockchain.GetBlockByNumber(cpNo) m, err := contracts.GetM1M2FromCheckpointBlock(cpBlk) if err != nil { diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 7043a256e3..d7f0aa553d 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -138,11 +138,11 @@ type Fetcher struct { dropPeer peerDropFn // Drops a peer for misbehaving // Testing hooks - announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list - queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue - fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch - completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) - importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62) + announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list + queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue + fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch + completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) + importedHook func(*types.Block) error // Method to call upon successful block import (both eth/61 and eth/62) } // New creates a block fetcher to retrieve blocks based on hash announcements. @@ -665,6 +665,14 @@ func (f *Fetcher) insert(peer string, block *types.Block) { f.dropPeer(peer) return } + // Invoke the imported hook to run double validation layer + if f.importedHook != nil { + if err := f.importedHook(block); err != nil { + log.Error("Double validation failed", "err", err, "Discard this block!") + return + } + } + // Run the actual import and log any issues if _, err := f.insertChain(types.Blocks{block}); err != nil { log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) @@ -674,10 +682,6 @@ func (f *Fetcher) insert(peer string, block *types.Block) { propAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, false) - // Invoke the imported hook if needed - if f.importedHook != nil { - f.importedHook(block) - } }() } @@ -736,6 +740,6 @@ func (f *Fetcher) forgetBlock(hash common.Hash) { } // Bind import hook when block imported into chain. -func (f *Fetcher) SetImportedHook(importedHook func(*types.Block)) { +func (f *Fetcher) SetImportedHook(importedHook func(*types.Block) error) { f.importedHook = importedHook } \ No newline at end of file diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go index 9d53b98b60..ec84ae03f5 100644 --- a/eth/fetcher/fetcher_test.go +++ b/eth/fetcher/fetcher_test.go @@ -288,7 +288,10 @@ func testSequentialAnnouncements(t *testing.T, protocol int) { // Iteratively announce blocks until all are imported imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } for i := len(hashes) - 2; i >= 0; i-- { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) @@ -326,7 +329,10 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) { } // Iteratively announce blocks until all are imported imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } for i := len(hashes) - 2; i >= 0; i-- { tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher) @@ -363,7 +369,10 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) { for i := 0; i < overlap; i++ { imported <- nil } - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } for i := len(hashes) - 2; i >= 0; i-- { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) @@ -437,7 +446,10 @@ func testRandomArrivalImport(t *testing.T, protocol int) { // Iteratively announce blocks, skipping one entry imported := make(chan *types.Block, len(hashes)-1) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } for i := len(hashes) - 1; i >= 0; i-- { if i != skip { @@ -468,7 +480,10 @@ func testQueueGapFill(t *testing.T, protocol int) { // Iteratively announce blocks, skipping one entry imported := make(chan *types.Block, len(hashes)-1) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } for i := len(hashes) - 1; i >= 0; i-- { if i != skip { @@ -505,7 +520,10 @@ func testImportDeduplication(t *testing.T, protocol int) { fetching := make(chan []common.Hash) imported := make(chan *types.Block, len(hashes)-1) tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes } - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } // Announce the duplicating block, wait for retrieval, and also propagate directly tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) @@ -614,7 +632,10 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) { badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0) imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } // Announce a block with a bad number, check for immediate drop tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher) @@ -666,7 +687,10 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) { tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes } imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } // Iteratively announce blocks until all are imported for i := len(hashes) - 2; i >= 0; i-- { @@ -696,7 +720,10 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) { tester := newTester() imported, announces := make(chan *types.Block), int32(0) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) { if added { atomic.AddInt32(&announces, 1) @@ -743,7 +770,10 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) { tester := newTester() imported, enqueued := make(chan *types.Block), int32(0) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(block *types.Block) error { + imported <- block + return nil + } tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) { if added { atomic.AddInt32(&enqueued, 1) @@ -787,4 +817,4 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) { verifyImportEvent(t, imported, true) } verifyImportDone(t, imported) -} +} \ No newline at end of file