mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-07-03 19:51:15 +00:00
correct order - dv before importing
This commit is contained in:
parent
360584a498
commit
89d931299f
3 changed files with 69 additions and 39 deletions
|
|
@ -24,6 +24,7 @@ import (
|
|||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"bytes"
|
||||
"github.com/ethereum/go-ethereum/accounts"
|
||||
|
|
@ -52,11 +53,8 @@ import (
|
|||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"time"
|
||||
)
|
||||
|
||||
const NumOfMasternodes = 99
|
||||
|
||||
type LesServer interface {
|
||||
Start(srvr *p2p.Server)
|
||||
Stop()
|
||||
|
|
@ -192,28 +190,25 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
|
|||
c := eth.engine.(*XDPoS.XDPoS)
|
||||
|
||||
// Hook sends tx sign to smartcontract after inserting block to chain.
|
||||
importedHook := func(block *types.Block) {
|
||||
importedHook := func(block *types.Block) error {
|
||||
snap, err := c.GetSnapshot(eth.blockchain, block.Header())
|
||||
if err != nil {
|
||||
if err == consensus.ErrUnknownAncestor {
|
||||
log.Warn("Block chain forked.", "error", err)
|
||||
} else {
|
||||
log.Error("Fail to get snapshot for sign tx validator.", "error", err)
|
||||
}
|
||||
return
|
||||
return fmt.Errorf("Fail to get snapshot for sign tx validator: %v", err)
|
||||
}
|
||||
if _, authorized := snap.Signers[eth.etherbase]; authorized {
|
||||
// double validation
|
||||
m2, err := getM2(snap, eth, block)
|
||||
if err != nil {
|
||||
log.Error("Fail to validate M2 condition for imported block", "error", err)
|
||||
return
|
||||
return fmt.Errorf("Fail to validate M2 condition for importing block: %v", err)
|
||||
}
|
||||
if eth.etherbase != m2 {
|
||||
// firstly, look into pending txPool
|
||||
pendingMap, err := eth.txPool.Pending()
|
||||
if err != nil {
|
||||
log.Error("Fail to get txPool pending", "err", err)
|
||||
log.Warn("Fail to get txPool pending", "err", err, "Continue with empty txPool pending.")
|
||||
//reset pendingMap
|
||||
pendingMap = map[common.Address]types.Transactions{}
|
||||
}
|
||||
|
|
@ -222,10 +217,9 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
|
|||
for _, tx := range txsSentFromM2 {
|
||||
if tx.To().String() == common.BlockSigners {
|
||||
if err := contracts.CreateTransactionSign(chainConfig, eth.txPool, eth.accountManager, block, chainDb); err != nil {
|
||||
log.Error("Fail to create tx sign for imported block", "error", err)
|
||||
return
|
||||
return fmt.Errorf("Fail to create tx sign for importing block: %v", err)
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -238,10 +232,9 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
|
|||
from, err := eth.txPool.GetSender(event.Tx)
|
||||
if (err == nil) && (event.Tx.To().String() == common.BlockSigners) && (from == m2) {
|
||||
if err := contracts.CreateTransactionSign(chainConfig, eth.txPool, eth.accountManager, block, chainDb); err != nil {
|
||||
log.Error("Fail to create tx sign for imported block", "error", err)
|
||||
return
|
||||
return fmt.Errorf("Fail to create tx sign for importing block: %v", err)
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
//timeout 10s
|
||||
case <-time.After(time.Duration(10) * time.Second):
|
||||
|
|
@ -249,11 +242,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
|
|||
}
|
||||
subEvent.Unsubscribe()
|
||||
} else if err := contracts.CreateTransactionSign(chainConfig, eth.txPool, eth.accountManager, block, chainDb); err != nil {
|
||||
log.Error("Fail to create tx sign for imported block", "error", err)
|
||||
return
|
||||
return fmt.Errorf("Fail to create tx sign for importing block: %v", err)
|
||||
}
|
||||
// end of double validation
|
||||
}
|
||||
return nil
|
||||
}
|
||||
eth.protocolManager.fetcher.SetImportedHook(importedHook)
|
||||
|
||||
|
|
@ -383,6 +376,9 @@ func getM2(snap *XDPoS.Snapshot, eth *Ethereum, block *types.Block) (common.Addr
|
|||
if no%epoch != 0 {
|
||||
cpNo = no - (no % epoch)
|
||||
}
|
||||
if cpNo == 0 {
|
||||
return eth.etherbase, nil
|
||||
}
|
||||
cpBlk := eth.blockchain.GetBlockByNumber(cpNo)
|
||||
m, err := contracts.GetM1M2FromCheckpointBlock(cpBlk)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -138,11 +138,11 @@ type Fetcher struct {
|
|||
dropPeer peerDropFn // Drops a peer for misbehaving
|
||||
|
||||
// Testing hooks
|
||||
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
|
||||
queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
|
||||
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
|
||||
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
|
||||
importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
|
||||
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
|
||||
queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
|
||||
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
|
||||
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
|
||||
importedHook func(*types.Block) error // Method to call upon successful block import (both eth/61 and eth/62)
|
||||
}
|
||||
|
||||
// New creates a block fetcher to retrieve blocks based on hash announcements.
|
||||
|
|
@ -665,6 +665,14 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
|
|||
f.dropPeer(peer)
|
||||
return
|
||||
}
|
||||
// Invoke the imported hook to run double validation layer
|
||||
if f.importedHook != nil {
|
||||
if err := f.importedHook(block); err != nil {
|
||||
log.Error("Double validation failed", "err", err, "Discard this block!")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Run the actual import and log any issues
|
||||
if _, err := f.insertChain(types.Blocks{block}); err != nil {
|
||||
log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
|
||||
|
|
@ -674,10 +682,6 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
|
|||
propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
|
||||
go f.broadcastBlock(block, false)
|
||||
|
||||
// Invoke the imported hook if needed
|
||||
if f.importedHook != nil {
|
||||
f.importedHook(block)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
|
|
@ -736,6 +740,6 @@ func (f *Fetcher) forgetBlock(hash common.Hash) {
|
|||
}
|
||||
|
||||
// Bind import hook when block imported into chain.
|
||||
func (f *Fetcher) SetImportedHook(importedHook func(*types.Block)) {
|
||||
func (f *Fetcher) SetImportedHook(importedHook func(*types.Block) error) {
|
||||
f.importedHook = importedHook
|
||||
}
|
||||
|
|
@ -288,7 +288,10 @@ func testSequentialAnnouncements(t *testing.T, protocol int) {
|
|||
|
||||
// Iteratively announce blocks until all are imported
|
||||
imported := make(chan *types.Block)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := len(hashes) - 2; i >= 0; i-- {
|
||||
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
|
||||
|
|
@ -326,7 +329,10 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) {
|
|||
}
|
||||
// Iteratively announce blocks until all are imported
|
||||
imported := make(chan *types.Block)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := len(hashes) - 2; i >= 0; i-- {
|
||||
tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
|
||||
|
|
@ -363,7 +369,10 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) {
|
|||
for i := 0; i < overlap; i++ {
|
||||
imported <- nil
|
||||
}
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := len(hashes) - 2; i >= 0; i-- {
|
||||
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
|
||||
|
|
@ -437,7 +446,10 @@ func testRandomArrivalImport(t *testing.T, protocol int) {
|
|||
|
||||
// Iteratively announce blocks, skipping one entry
|
||||
imported := make(chan *types.Block, len(hashes)-1)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := len(hashes) - 1; i >= 0; i-- {
|
||||
if i != skip {
|
||||
|
|
@ -468,7 +480,10 @@ func testQueueGapFill(t *testing.T, protocol int) {
|
|||
|
||||
// Iteratively announce blocks, skipping one entry
|
||||
imported := make(chan *types.Block, len(hashes)-1)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := len(hashes) - 1; i >= 0; i-- {
|
||||
if i != skip {
|
||||
|
|
@ -505,7 +520,10 @@ func testImportDeduplication(t *testing.T, protocol int) {
|
|||
fetching := make(chan []common.Hash)
|
||||
imported := make(chan *types.Block, len(hashes)-1)
|
||||
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
||||
// Announce the duplicating block, wait for retrieval, and also propagate directly
|
||||
tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
|
||||
|
|
@ -614,7 +632,10 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
|
|||
badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
|
||||
|
||||
imported := make(chan *types.Block)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
||||
// Announce a block with a bad number, check for immediate drop
|
||||
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
|
||||
|
|
@ -666,7 +687,10 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
|
|||
tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
|
||||
|
||||
imported := make(chan *types.Block)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
||||
// Iteratively announce blocks until all are imported
|
||||
for i := len(hashes) - 2; i >= 0; i-- {
|
||||
|
|
@ -696,7 +720,10 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
|
|||
tester := newTester()
|
||||
|
||||
imported, announces := make(chan *types.Block), int32(0)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
|
||||
if added {
|
||||
atomic.AddInt32(&announces, 1)
|
||||
|
|
@ -743,7 +770,10 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
|
|||
tester := newTester()
|
||||
|
||||
imported, enqueued := make(chan *types.Block), int32(0)
|
||||
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
|
||||
if added {
|
||||
atomic.AddInt32(&enqueued, 1)
|
||||
|
|
@ -787,4 +817,4 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
|
|||
verifyImportEvent(t, imported, true)
|
||||
}
|
||||
verifyImportDone(t, imported)
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue