diff --git a/eth/handler.go b/eth/handler.go index e33bf28175..46fd8db368 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -271,38 +271,40 @@ func (pm *ProtocolManager) handle(p *peer) error { rw.Init(p.version) } // Register the peer locally - if err := pm.peers.Register(p); err != nil { + err := pm.peers.Register(p) + if err != nil && err != p2p.ErrAddPairPeer { p.Log().Error("Ethereum peer registration failed", "err", err) return err } defer pm.removePeer(p.id) - - // Register the peer in the downloader. If the downloader considers it banned, we disconnect - if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { - return err - } - // Propagate existing transactions. new transactions appearing - // after this will be sent via broadcasts. - pm.syncTransactions(p) - - // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork - if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil { - // Request the peer's DAO fork header for extra-data validation - if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil { + if err != p2p.ErrAddPairPeer { + // Register the peer in the downloader. If the downloader considers it banned, we disconnect + if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { return err } - // Start a timer to disconnect if the peer doesn't reply in time - p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() { - p.Log().Debug("Timed out DAO fork-check, dropping") - pm.removePeer(p.id) - }) - // Make sure it's cleaned up if the peer dies off - defer func() { - if p.forkDrop != nil { - p.forkDrop.Stop() - p.forkDrop = nil + // Propagate existing transactions. new transactions appearing + // after this will be sent via broadcasts. + pm.syncTransactions(p) + + // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork + if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil { + // Request the peer's DAO fork header for extra-data validation + if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil { + return err } - }() + // Start a timer to disconnect if the peer doesn't reply in time + p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() { + p.Log().Debug("Timed out DAO fork-check, dropping") + pm.removePeer(p.id) + }) + // Make sure it's cleaned up if the peer dies off + defer func() { + if p.forkDrop != nil { + p.forkDrop.Stop() + p.forkDrop = nil + } + }() + } } // main loop. handle incoming messages. for { diff --git a/eth/peer.go b/eth/peer.go index 42ead53965..8fda1b13cd 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "gopkg.in/fatih/set.v0" @@ -65,6 +66,7 @@ type peer struct { knownTxs *set.Set // Set of transaction hashes known to be known by this peer knownBlocks *set.Set // Set of block hashes known to be known by this peer + pairRw p2p.MsgReadWriter } func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { @@ -156,7 +158,13 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error // SendNewBlock propagates an entire block to a remote peer. func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { p.knownBlocks.Add(block.Hash()) - return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) + if p.pairRw != nil { + log.Trace("p2p SendNewBlock with pairRw", "p", p, "number", block.NumberU64()) + return p2p.Send(p.pairRw, NewBlockMsg, []interface{}{block, td}) + } else { + return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) + } + } // SendBlockHeaders sends a batch of block headers to the remote peer. @@ -321,8 +329,14 @@ func (ps *peerSet) Register(p *peer) error { if ps.closed { return errClosed } - if _, ok := ps.peers[p.id]; ok { - return errAlreadyRegistered + if exitPeer, ok := ps.peers[p.id]; ok { + if exitPeer.pairRw != nil { + return errAlreadyRegistered + } + exitPeer.PairPeer = p.Peer + exitPeer.pairRw = p.rw + p.PairPeer = exitPeer.Peer + return p2p.ErrAddPairPeer } ps.peers[p.id] = p return nil @@ -414,4 +428,4 @@ func (ps *peerSet) Close() { p.Disconnect(p2p.DiscQuitting) } ps.closed = true -} +} \ No newline at end of file diff --git a/p2p/dial.go b/p2p/dial.go index d8feceb9f3..f65935273c 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -266,7 +266,10 @@ func (s *dialstate) checkDial(n *discover.Node, peers map[discover.NodeID]*Peer) case dialing: return errAlreadyDialing case peers[n.ID] != nil: - return errAlreadyConnected + exitPeer := peers[n.ID] + if exitPeer.PairPeer != nil { + return errAlreadyConnected + } case s.ntab != nil && n.ID == s.ntab.Self().ID: return errSelf case s.netrestrict != nil && !s.netrestrict.Contains(n.IP): @@ -300,10 +303,26 @@ func (t *dialTask) Do(srv *Server) { // Try resolving the ID of static nodes if dialing failed. if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 { if t.resolve(srv) { - t.dial(srv, t.dest) + err = t.dial(srv, t.dest) } } } + if err == nil { + err = t.dial(srv, t.dest) + if err != nil { + // Try resolving the ID of static nodes if dialing failed. + if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 { + if t.resolve(srv) { + err = t.dial(srv, t.dest) + } + } + } + if err == nil { + log.Trace("Dial pair connection sucess", "task", t.dest) + } else { + log.Trace("Dial pair connection error", "task", t.dest, "err", err) + } + } } // resolve attempts to find the current endpoint for the destination @@ -431,4 +450,4 @@ func (h *dialHistory) Pop() interface{} { x := old[n-1] *h = old[0 : n-1] return x -} +} \ No newline at end of file diff --git a/p2p/dial_test.go b/p2p/dial_test.go index 2a7941fc65..da578400f3 100644 --- a/p2p/dial_test.go +++ b/p2p/dial_test.go @@ -116,9 +116,9 @@ func TestDialStateDynDial(t *testing.T) { }}, }, new: []task{ + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(2)}}, &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(3)}}, &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}}, - &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}}, }, }, // Some of the dials complete but no new ones are launched yet because @@ -164,9 +164,7 @@ func TestDialStateDynDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(4)}}, {rw: &conn{flags: dynDialedConn, id: uintID(5)}}, }, - new: []task{ - &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(6)}}, - }, + new: []task{}, }, // More peers (3,4) drop off and dial for ID 6 completes. // The last query result from the discovery lookup is reused @@ -181,8 +179,8 @@ func TestDialStateDynDial(t *testing.T) { &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(6)}}, }, new: []task{ + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}}, &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(7)}}, - &discoverTask{}, }, }, // Peer 7 is connected, but there still aren't enough dynamic peers @@ -212,7 +210,7 @@ func TestDialStateDynDial(t *testing.T) { &discoverTask{}, }, new: []task{ - &discoverTask{}, + &waitExpireTask{Duration: 14 * time.Second}, }, }, }, @@ -302,6 +300,9 @@ func TestDialStateDynDialBootnode(t *testing.T) { &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}}, &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(5)}}, }, + new: []task{ + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(4)}}, + }, }, }, }) @@ -351,10 +352,11 @@ func TestDialStateDynDialFromTable(t *testing.T) { }}, }, new: []task{ + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(1)}}, + &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(2)}}, &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(10)}}, &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(11)}}, &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(12)}}, - &discoverTask{}, }, }, // Dialing nodes 3,4,5 fails. The dials from the lookup succeed. @@ -374,6 +376,9 @@ func TestDialStateDynDialFromTable(t *testing.T) { &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(11)}}, &dialTask{flags: dynDialedConn, dest: &discover.Node{ID: uintID(12)}}, }, + new: []task{ + &discoverTask{}, + }, }, // Waiting for expiry. No waitExpireTask is launched because the // discovery query is still running. @@ -453,6 +458,8 @@ func TestDialStateStaticDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(2)}}, }, new: []task{ + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}}, &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}}, &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}}, &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}}, @@ -466,6 +473,9 @@ func TestDialStateStaticDial(t *testing.T) { {rw: &conn{flags: dynDialedConn, id: uintID(2)}}, {rw: &conn{flags: staticDialedConn, id: uintID(3)}}, }, + new: []task{ + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}}, + }, done: []task{ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}}, }, @@ -485,7 +495,8 @@ func TestDialStateStaticDial(t *testing.T) { &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}}, }, new: []task{ - &waitExpireTask{Duration: 14 * time.Second}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(5)}}, }, }, // Wait a round for dial history to expire, no new tasks should spawn. @@ -506,10 +517,7 @@ func TestDialStateStaticDial(t *testing.T) { {rw: &conn{flags: staticDialedConn, id: uintID(3)}}, {rw: &conn{flags: staticDialedConn, id: uintID(5)}}, }, - new: []task{ - &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}}, - &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(4)}}, - }, + new: []task{}, }, }, }) @@ -542,7 +550,8 @@ func TestDialStaticAfterReset(t *testing.T) { &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}}, }, new: []task{ - &waitExpireTask{Duration: 30 * time.Second}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}}, }, }, } @@ -554,7 +563,9 @@ func TestDialStaticAfterReset(t *testing.T) { for _, n := range wantStatic { dTest.init.removeStatic(n) dTest.init.addStatic(n) + delete(dTest.init.dialing, n.ID) } + // without removing peers they will be considered recently dialed runDialTest(t, dTest) } @@ -591,6 +602,10 @@ func TestDialStateCache(t *testing.T) { &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}}, &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}}, }, + new: []task{ + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(1)}}, + &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(2)}}, + }, }, // A salvage task is launched to wait for node 3's history // entry to expire. @@ -602,9 +617,6 @@ func TestDialStateCache(t *testing.T) { done: []task{ &dialTask{flags: staticDialedConn, dest: &discover.Node{ID: uintID(3)}}, }, - new: []task{ - &waitExpireTask{Duration: 14 * time.Second}, - }, }, // Still waiting for node 3's entry to expire in the cache. { @@ -693,4 +705,4 @@ func (t *resolveMock) Self() *discover.Node { return new(dis func (t *resolveMock) Close() {} func (t *resolveMock) Bootstrap([]*discover.Node) {} func (t *resolveMock) Lookup(discover.NodeID) []*discover.Node { return nil } -func (t *resolveMock) ReadRandomNodes(buf []*discover.Node) int { return 0 } +func (t *resolveMock) ReadRandomNodes(buf []*discover.Node) int { return 0 } \ No newline at end of file diff --git a/p2p/peer.go b/p2p/peer.go index 477d8c2190..b6508bb695 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -108,7 +108,8 @@ type Peer struct { disc chan DiscReason // events receives message send / receive events if set - events *event.Feed + events *event.Feed + PairPeer *Peer } // NewPeer returns a peer for testing purposes. @@ -157,7 +158,7 @@ func (p *Peer) Disconnect(reason DiscReason) { // String implements fmt.Stringer. func (p *Peer) String() string { - return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr()) + return fmt.Sprintf("Peer %x %v ", p.rw.id[:8], p.RemoteAddr()) } // Inbound returns true if the peer is an inbound connection @@ -225,10 +226,12 @@ loop: break loop } } - close(p.closed) p.rw.close(reason) p.wg.Wait() + if p.PairPeer != nil { + go func() { p.PairPeer.Disconnect(DiscPairPeerStop) }() + } return remoteRequested, err } @@ -345,6 +348,7 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) rw = newMsgEventer(rw, p.events, p.ID(), proto.Name) } p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version)) + go func() { err := proto.Run(p, rw) if err == nil { @@ -459,4 +463,4 @@ func (p *Peer) Info() *PeerInfo { info.Protocols[proto.Name] = protoInfo } return info -} +} \ No newline at end of file diff --git a/p2p/peer_error.go b/p2p/peer_error.go index a1cddb707b..1f7869574e 100644 --- a/p2p/peer_error.go +++ b/p2p/peer_error.go @@ -54,6 +54,8 @@ func (self *peerError) Error() string { var errProtocolReturned = errors.New("protocol returned") +var ErrAddPairPeer = errors.New("add a pair peer") + type DiscReason uint const ( @@ -69,6 +71,7 @@ const ( DiscUnexpectedIdentity DiscSelf DiscReadTimeout + DiscPairPeerStop DiscSubprotocolError = 0x10 ) @@ -85,6 +88,7 @@ var discReasonToString = [...]string{ DiscUnexpectedIdentity: "unexpected identity", DiscSelf: "connected to self", DiscReadTimeout: "read timeout", + DiscPairPeerStop: "pair peer connection stop", DiscSubprotocolError: "subprotocol error", } @@ -116,4 +120,4 @@ func discReasonForError(err error) DiscReason { } } return DiscSubprotocolError -} +} \ No newline at end of file diff --git a/p2p/rlpx.go b/p2p/rlpx.go index a320e81e7c..037f45d89a 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -122,6 +122,7 @@ func (t *rlpx) close(err error) { } func (t *rlpx) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) { + // Writing our handshake happens concurrently, we prefer // returning the handshake read error. If the remote side // disconnects us early with a valid reason, we should return it diff --git a/p2p/server.go b/p2p/server.go index db80a5a659..4b1640b75f 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -286,6 +286,7 @@ func (srv *Server) PeerCount() int { // server is shut down. If the connection fails for any reason, the server will // attempt to reconnect the peer. func (srv *Server) AddPeer(node *discover.Node) { + select { case srv.addstatic <- node: case <-srv.quit: @@ -642,9 +643,15 @@ running: p.events = &srv.peerFeed } name := truncateName(c.name) - srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) + go srv.runPeer(p) - peers[c.id] = p + if peers[c.id] != nil { + peers[c.id].PairPeer = p + srv.log.Debug("Adding p2p pair peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) + } else { + peers[c.id] = p + srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) + } if p.Inbound() { inboundCount++ } @@ -708,7 +715,11 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, inboundCo case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): return DiscTooManyPeers case peers[c.id] != nil: - return DiscAlreadyConnected + exitPeer := peers[c.id] + if exitPeer.PairPeer != nil { + return DiscAlreadyConnected + } + return nil case c.id == srv.Self().ID: return DiscSelf default: diff --git a/p2p/server_test.go b/p2p/server_test.go index 10c36528eb..359c2b5a7d 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -153,7 +153,6 @@ func TestServerDial(t *testing.T) { select { case conn := <-accepted: defer conn.Close() - select { case peer := <-connected: if peer.ID() != remid { @@ -174,6 +173,21 @@ func TestServerDial(t *testing.T) { t.Error("server did not launch peer within one second") } + select { + case peer := <-connected: + if peer.ID() != remid { + t.Errorf("peer has wrong id") + } + if peer.Name() != "test" { + t.Errorf("peer has wrong name") + } + if peer.RemoteAddr().String() != conn.LocalAddr().String() { + t.Errorf("peer started with wrong conn: got %v, want %v", + peer.RemoteAddr(), conn.LocalAddr()) + } + case <-time.After(1 * time.Second): + t.Error("server did not launch peer within one second") + } case <-time.After(1 * time.Second): t.Error("server did not connect within one second") } @@ -501,4 +515,4 @@ func randomID() (id discover.NodeID) { id[i] = byte(rand.Intn(255)) } return id -} +} \ No newline at end of file diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index 48d7c17301..2e831d241b 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -91,11 +91,12 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { } simNode := &SimNode{ - ID: id, - config: config, - node: n, - adapter: s, - running: make(map[string]node.Service), + ID: id, + config: config, + node: n, + adapter: s, + running: make(map[string]node.Service), + connected: make(map[discover.NodeID]bool), } s.nodes[id] = simNode return simNode, nil @@ -108,12 +109,16 @@ func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) { if !ok { return nil, fmt.Errorf("unknown node: %s", dest.ID) } + if node.connected[dest.ID] { + return nil, fmt.Errorf("dialed node: %s", dest.ID) + } srv := node.Server() if srv == nil { return nil, fmt.Errorf("node not running: %s", dest.ID) } pipe1, pipe2 := net.Pipe() go srv.SetupConn(pipe1, 0, nil) + node.connected[dest.ID] = true return pipe2, nil } @@ -151,6 +156,7 @@ type SimNode struct { running map[string]node.Service client *rpc.Client registerOnce sync.Once + connected map[discover.NodeID]bool } // Addr returns the node's discovery address @@ -313,4 +319,4 @@ func (self *SimNode) NodeInfo() *p2p.NodeInfo { } } return server.NodeInfo() -} +} \ No newline at end of file