add a pair connections with each peer

This commit is contained in:
parmarrushabh 2018-11-07 12:23:51 +05:30
parent 9414ae6f2a
commit 9fb0674907
10 changed files with 152 additions and 65 deletions

View file

@ -271,38 +271,40 @@ func (pm *ProtocolManager) handle(p *peer) error {
rw.Init(p.version)
}
// Register the peer locally
if err := pm.peers.Register(p); err != nil {
err := pm.peers.Register(p)
if err != nil && err != p2p.ErrAddPairPeer {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
if err != p2p.ErrAddPairPeer {
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
return err
}
}()
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil
}
}()
}
}
// main loop. handle incoming messages.
for {

View file

@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"gopkg.in/fatih/set.v0"
@ -65,6 +66,7 @@ type peer struct {
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
pairRw p2p.MsgReadWriter
}
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
@ -156,7 +158,13 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
// SendNewBlock propagates an entire block to a remote peer.
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
p.knownBlocks.Add(block.Hash())
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
if p.pairRw != nil {
log.Trace("p2p SendNewBlock with pairRw", "p", p, "number", block.NumberU64())
return p2p.Send(p.pairRw, NewBlockMsg, []interface{}{block, td})
} else {
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}
}
// SendBlockHeaders sends a batch of block headers to the remote peer.
@ -321,8 +329,14 @@ func (ps *peerSet) Register(p *peer) error {
if ps.closed {
return errClosed
}
if _, ok := ps.peers[p.id]; ok {
return errAlreadyRegistered
if exitPeer, ok := ps.peers[p.id]; ok {
if exitPeer.pairRw != nil {
return errAlreadyRegistered
}
exitPeer.PairPeer = p.Peer
exitPeer.pairRw = p.rw
p.PairPeer = exitPeer.Peer
return p2p.ErrAddPairPeer
}
ps.peers[p.id] = p
return nil
@ -414,4 +428,4 @@ func (ps *peerSet) Close() {
p.Disconnect(p2p.DiscQuitting)
}
ps.closed = true
}
}

View file

@ -266,7 +266,10 @@ func (s *dialstate) checkDial(n *discover.Node, peers map[discover.NodeID]*Peer)
case dialing:
return errAlreadyDialing
case peers[n.ID] != nil:
return errAlreadyConnected
exitPeer := peers[n.ID]
if exitPeer.PairPeer != nil {
return errAlreadyConnected
}
case s.ntab != nil && n.ID == s.ntab.Self().ID:
return errSelf
case s.netrestrict != nil && !s.netrestrict.Contains(n.IP):
@ -300,10 +303,26 @@ func (t *dialTask) Do(srv *Server) {
// Try resolving the ID of static nodes if dialing failed.
if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
t.dial(srv, t.dest)
err = t.dial(srv, t.dest)
}
}
}
if err == nil {
err = t.dial(srv, t.dest)
if err != nil {
// Try resolving the ID of static nodes if dialing failed.
if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
err = t.dial(srv, t.dest)
}
}
}
if err == nil {
log.Trace("Dial pair connection sucess", "task", t.dest)
} else {
log.Trace("Dial pair connection error", "task", t.dest, "err", err)
}
}
}
// resolve attempts to find the current endpoint for the destination
@ -431,4 +450,4 @@ func (h *dialHistory) Pop() interface{} {
x := old[n-1]
*h = old[0 : n-1]
return x
}
}

View file

@ -116,9 +116,9 @@ func TestDialStateDynDial(t *testing.T) {
}},
},
new: []task{
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(2)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}},
},
},
// Some of the dials complete but no new ones are launched yet because
@ -164,9 +164,7 @@ func TestDialStateDynDial(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(4)}},
{rw: &conn{flags: dynDialedConn, id: uintID(5)}},
},
new: []task{
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(6)}},
},
new: []task{},
},
// More peers (3,4) drop off and dial for ID 6 completes.
// The last query result from the discovery lookup is reused
@ -181,8 +179,8 @@ func TestDialStateDynDial(t *testing.T) {
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(6)}},
},
new: []task{
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(7)}},
&discoverTask{},
},
},
// Peer 7 is connected, but there still aren't enough dynamic peers
@ -212,7 +210,7 @@ func TestDialStateDynDial(t *testing.T) {
&discoverTask{},
},
new: []task{
&discoverTask{},
&waitExpireTask{Duration: 14 * time.Second},
},
},
},
@ -302,6 +300,9 @@ func TestDialStateDynDialBootnode(t *testing.T) {
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}},
},
new: []task{
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}},
},
},
},
})
@ -351,10 +352,11 @@ func TestDialStateDynDialFromTable(t *testing.T) {
}},
},
new: []task{
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(1)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(2)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(10)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(11)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(12)}},
&discoverTask{},
},
},
// Dialing nodes 3,4,5 fails. The dials from the lookup succeed.
@ -374,6 +376,9 @@ func TestDialStateDynDialFromTable(t *testing.T) {
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(11)}},
&dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(12)}},
},
new: []task{
&discoverTask{},
},
},
// Waiting for expiry. No waitExpireTask is launched because the
// discovery query is still running.
@ -453,6 +458,8 @@ func TestDialStateStaticDial(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(2)}},
},
new: []task{
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}},
@ -466,6 +473,9 @@ func TestDialStateStaticDial(t *testing.T) {
{rw: &conn{flags: dynDialedConn, id: uintID(2)}},
{rw: &conn{flags: staticDialedConn, id: uintID(3)}},
},
new: []task{
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}},
},
done: []task{
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}},
},
@ -485,7 +495,8 @@ func TestDialStateStaticDial(t *testing.T) {
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}},
},
new: []task{
&waitExpireTask{Duration: 14 * time.Second},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}},
},
},
// Wait a round for dial history to expire, no new tasks should spawn.
@ -506,10 +517,7 @@ func TestDialStateStaticDial(t *testing.T) {
{rw: &conn{flags: staticDialedConn, id: uintID(3)}},
{rw: &conn{flags: staticDialedConn, id: uintID(5)}},
},
new: []task{
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}},
},
new: []task{},
},
},
})
@ -542,7 +550,8 @@ func TestDialStaticAfterReset(t *testing.T) {
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}},
},
new: []task{
&waitExpireTask{Duration: 30 * time.Second},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}},
},
},
}
@ -554,7 +563,9 @@ func TestDialStaticAfterReset(t *testing.T) {
for _, n := range wantStatic {
dTest.init.removeStatic(n)
dTest.init.addStatic(n)
delete(dTest.init.dialing, n.ID)
}
// without removing peers they will be considered recently dialed
runDialTest(t, dTest)
}
@ -591,6 +602,10 @@ func TestDialStateCache(t *testing.T) {
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}},
},
new: []task{
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}},
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}},
},
},
// A salvage task is launched to wait for node 3's history
// entry to expire.
@ -602,9 +617,6 @@ func TestDialStateCache(t *testing.T) {
done: []task{
&dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}},
},
new: []task{
&waitExpireTask{Duration: 14 * time.Second},
},
},
// Still waiting for node 3's entry to expire in the cache.
{
@ -693,4 +705,4 @@ func (t *resolveMock) Self() *discover.Node { return new(dis
func (t *resolveMock) Close() {}
func (t *resolveMock) Bootstrap([]*discover.Node) {}
func (t *resolveMock) Lookup(discover.NodeID) []*discover.Node { return nil }
func (t *resolveMock) ReadRandomNodes(buf []*discover.Node) int { return 0 }
func (t *resolveMock) ReadRandomNodes(buf []*discover.Node) int { return 0 }

View file

@ -108,7 +108,8 @@ type Peer struct {
disc chan DiscReason
// events receives message send / receive events if set
events *event.Feed
events *event.Feed
PairPeer *Peer
}
// NewPeer returns a peer for testing purposes.
@ -157,7 +158,7 @@ func (p *Peer) Disconnect(reason DiscReason) {
// String implements fmt.Stringer.
func (p *Peer) String() string {
return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr())
return fmt.Sprintf("Peer %x %v ", p.rw.id[:8], p.RemoteAddr())
}
// Inbound returns true if the peer is an inbound connection
@ -225,10 +226,12 @@ loop:
break loop
}
}
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
if p.PairPeer != nil {
go func() { p.PairPeer.Disconnect(DiscPairPeerStop) }()
}
return remoteRequested, err
}
@ -345,6 +348,7 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error)
rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
}
p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
go func() {
err := proto.Run(p, rw)
if err == nil {
@ -459,4 +463,4 @@ func (p *Peer) Info() *PeerInfo {
info.Protocols[proto.Name] = protoInfo
}
return info
}
}

View file

@ -54,6 +54,8 @@ func (self *peerError) Error() string {
var errProtocolReturned = errors.New("protocol returned")
var ErrAddPairPeer = errors.New("add a pair peer")
type DiscReason uint
const (
@ -69,6 +71,7 @@ const (
DiscUnexpectedIdentity
DiscSelf
DiscReadTimeout
DiscPairPeerStop
DiscSubprotocolError = 0x10
)
@ -85,6 +88,7 @@ var discReasonToString = [...]string{
DiscUnexpectedIdentity: "unexpected identity",
DiscSelf: "connected to self",
DiscReadTimeout: "read timeout",
DiscPairPeerStop: "pair peer connection stop",
DiscSubprotocolError: "subprotocol error",
}
@ -116,4 +120,4 @@ func discReasonForError(err error) DiscReason {
}
}
return DiscSubprotocolError
}
}

View file

@ -122,6 +122,7 @@ func (t *rlpx) close(err error) {
}
func (t *rlpx) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
// Writing our handshake happens concurrently, we prefer
// returning the handshake read error. If the remote side
// disconnects us early with a valid reason, we should return it

View file

@ -286,6 +286,7 @@ func (srv *Server) PeerCount() int {
// server is shut down. If the connection fails for any reason, the server will
// attempt to reconnect the peer.
func (srv *Server) AddPeer(node *discover.Node) {
select {
case srv.addstatic <- node:
case <-srv.quit:
@ -642,9 +643,15 @@ running:
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
go srv.runPeer(p)
peers[c.id] = p
if peers[c.id] != nil {
peers[c.id].PairPeer = p
srv.log.Debug("Adding p2p pair peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
} else {
peers[c.id] = p
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
}
if p.Inbound() {
inboundCount++
}
@ -708,7 +715,11 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, inboundCo
case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
return DiscTooManyPeers
case peers[c.id] != nil:
return DiscAlreadyConnected
exitPeer := peers[c.id]
if exitPeer.PairPeer != nil {
return DiscAlreadyConnected
}
return nil
case c.id == srv.Self().ID:
return DiscSelf
default:

View file

@ -153,7 +153,6 @@ func TestServerDial(t *testing.T) {
select {
case conn := <-accepted:
defer conn.Close()
select {
case peer := <-connected:
if peer.ID() != remid {
@ -174,6 +173,21 @@ func TestServerDial(t *testing.T) {
t.Error("server did not launch peer within one second")
}
select {
case peer := <-connected:
if peer.ID() != remid {
t.Errorf("peer has wrong id")
}
if peer.Name() != "test" {
t.Errorf("peer has wrong name")
}
if peer.RemoteAddr().String() != conn.LocalAddr().String() {
t.Errorf("peer started with wrong conn: got %v, want %v",
peer.RemoteAddr(), conn.LocalAddr())
}
case <-time.After(1 * time.Second):
t.Error("server did not launch peer within one second")
}
case <-time.After(1 * time.Second):
t.Error("server did not connect within one second")
}
@ -501,4 +515,4 @@ func randomID() (id discover.NodeID) {
id[i] = byte(rand.Intn(255))
}
return id
}
}

View file

@ -91,11 +91,12 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
}
simNode := &SimNode{
ID: id,
config: config,
node: n,
adapter: s,
running: make(map[string]node.Service),
ID: id,
config: config,
node: n,
adapter: s,
running: make(map[string]node.Service),
connected: make(map[discover.NodeID]bool),
}
s.nodes[id] = simNode
return simNode, nil
@ -108,12 +109,16 @@ func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) {
if !ok {
return nil, fmt.Errorf("unknown node: %s", dest.ID)
}
if node.connected[dest.ID] {
return nil, fmt.Errorf("dialed node: %s", dest.ID)
}
srv := node.Server()
if srv == nil {
return nil, fmt.Errorf("node not running: %s", dest.ID)
}
pipe1, pipe2 := net.Pipe()
go srv.SetupConn(pipe1, 0, nil)
node.connected[dest.ID] = true
return pipe2, nil
}
@ -151,6 +156,7 @@ type SimNode struct {
running map[string]node.Service
client *rpc.Client
registerOnce sync.Once
connected map[discover.NodeID]bool
}
// Addr returns the node's discovery address
@ -313,4 +319,4 @@ func (self *SimNode) NodeInfo() *p2p.NodeInfo {
}
}
return server.NodeInfo()
}
}