feat(p2p): swarm POC3 #17041 (#2059)

This commit is contained in:
Daniel Liu 2026-02-28 21:30:04 +08:00 committed by GitHub
parent ad0eea0f07
commit 5b8c6ef8fd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 49 additions and 39 deletions

View file

@ -79,7 +79,7 @@ func markDialError(err error) {
// meteredConn is a wrapper around a network TCP connection that meters both the
// inbound and outbound network traffic.
type meteredConn struct {
*net.TCPConn // Network connection to wrap with metering
net.Conn
}
// newMeteredConn creates a new metered connection, also bumping the ingress or
@ -90,13 +90,13 @@ func newMeteredConn(conn net.Conn) net.Conn {
if !metrics.Enabled() {
return conn
}
return &meteredConn{conn.(*net.TCPConn)}
return &meteredConn{Conn: conn}
}
// Read delegates a network read to the underlying connection, bumping the ingress
// traffic meter along the way.
func (c *meteredConn) Read(b []byte) (n int, err error) {
n, err = c.TCPConn.Read(b)
n, err = c.Conn.Read(b)
ingressTrafficMeter.Mark(int64(n))
return
}
@ -104,7 +104,7 @@ func (c *meteredConn) Read(b []byte) (n int, err error) {
// Write delegates a network write to the underlying connection, bumping the
// egress traffic meter along the way.
func (c *meteredConn) Write(b []byte) (n int, err error) {
n, err = c.TCPConn.Write(b)
n, err = c.Conn.Write(b)
egressTrafficMeter.Mark(int64(n))
return
}

View file

@ -33,6 +33,10 @@ import (
"github.com/XinFinOrg/XDPoSChain/rlp"
)
var (
ErrShuttingDown = errors.New("shutting down")
)
const (
baseProtocolVersion = 5
baseProtocolLength = uint64(16)
@ -408,7 +412,7 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) {
// as well but we don't want to rely on that.
rw.werr <- err
case <-rw.closed:
err = errors.New("shutting down")
err = ErrShuttingDown
}
return err
}

View file

@ -30,10 +30,12 @@ package protocols
import (
"context"
"fmt"
"io"
"reflect"
"sync"
"time"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/XinFinOrg/XDPoSChain/p2p"
)
@ -201,6 +203,11 @@ func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer {
func (p *Peer) Run(handler func(msg interface{}) error) error {
for {
if err := p.handleIncoming(handler); err != nil {
if err != io.EOF {
metrics.GetOrRegisterCounter("peer.handleincoming.error", nil).Inc(1)
log.Error("peer.handleIncoming", "err", err)
}
return err
}
}

View file

@ -32,7 +32,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/p2p/simulations/adapters"
)
var dialBanTimeout = 200 * time.Millisecond
var DialBanTimeout = 200 * time.Millisecond
// NetworkConfig defines configuration options for starting a Network
type NetworkConfig struct {
@ -79,41 +79,25 @@ func (net *Network) Events() *event.Feed {
return &net.events
}
// NewNode adds a new node to the network with a random ID
func (net *Network) NewNode() (*Node, error) {
conf := adapters.RandomNodeConfig()
conf.Lifecycles = []string{net.DefaultService}
return net.NewNodeWithConfig(conf)
}
// NewNodeWithConfig adds a new node to the network with the given config,
// returning an error if a node with the same ID or name already exists
func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
net.lock.Lock()
defer net.lock.Unlock()
// create a random ID and PrivateKey if not set
if conf.ID == (discover.NodeID{}) {
c := adapters.RandomNodeConfig()
conf.ID = c.ID
conf.PrivateKey = c.PrivateKey
}
id := conf.ID
if conf.Reachable == nil {
conf.Reachable = func(otherID discover.NodeID) bool {
_, err := net.InitConn(conf.ID, otherID)
return err == nil
if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 {
return false
}
return true
}
}
// assign a name to the node if not set
if conf.Name == "" {
conf.Name = fmt.Sprintf("node%02d", len(net.Nodes)+1)
}
// check the node doesn't already exist
if node := net.getNode(id); node != nil {
return nil, fmt.Errorf("node with ID %q already exists", id)
if node := net.getNode(conf.ID); node != nil {
return nil, fmt.Errorf("node with ID %q already exists", conf.ID)
}
if node := net.getNodeByName(conf.Name); node != nil {
return nil, fmt.Errorf("node with name %q already exists", conf.Name)
@ -133,8 +117,8 @@ func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
Node: adapterNode,
Config: conf,
}
log.Trace(fmt.Sprintf("node %v created", id))
net.nodeMap[id] = len(net.Nodes)
log.Trace(fmt.Sprintf("node %v created", conf.ID))
net.nodeMap[conf.ID] = len(net.Nodes)
net.Nodes = append(net.Nodes, node)
// emit a "control" event
@ -182,7 +166,9 @@ func (net *Network) Start(id discover.NodeID) error {
// startWithSnapshots starts the node with the given ID using the give
// snapshots
func (net *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error {
node := net.GetNode(id)
net.lock.Lock()
defer net.lock.Unlock()
node := net.getNode(id)
if node == nil {
return fmt.Errorf("node %v does not exist", id)
}
@ -221,9 +207,13 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve
// assume the node is now down
net.lock.Lock()
defer net.lock.Unlock()
node := net.getNode(id)
if node == nil {
log.Error("Can not find node for id", "id", id)
return
}
node.Up = false
net.lock.Unlock()
net.events.Send(NewEvent(node))
}()
for {
@ -258,7 +248,9 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve
// Stop stops the node with the given ID
func (net *Network) Stop(id discover.NodeID) error {
node := net.GetNode(id)
net.lock.Lock()
defer net.lock.Unlock()
node := net.getNode(id)
if node == nil {
return fmt.Errorf("node %v does not exist", id)
}
@ -311,7 +303,9 @@ func (net *Network) Disconnect(oneID, otherID discover.NodeID) error {
// DidConnect tracks the fact that the "one" node connected to the "other" node
func (net *Network) DidConnect(one, other discover.NodeID) error {
conn, err := net.GetOrCreateConn(one, other)
net.lock.Lock()
defer net.lock.Unlock()
conn, err := net.getOrCreateConn(one, other)
if err != nil {
return fmt.Errorf("connection between %v and %v does not exist", one, other)
}
@ -326,7 +320,9 @@ func (net *Network) DidConnect(one, other discover.NodeID) error {
// DidDisconnect tracks the fact that the "one" node disconnected from the
// "other" node
func (net *Network) DidDisconnect(one, other discover.NodeID) error {
conn := net.GetConn(one, other)
net.lock.Lock()
defer net.lock.Unlock()
conn := net.getConn(one, other)
if conn == nil {
return fmt.Errorf("connection between %v and %v does not exist", one, other)
}
@ -334,7 +330,7 @@ func (net *Network) DidDisconnect(one, other discover.NodeID) error {
return fmt.Errorf("%v and %v already disconnected", one, other)
}
conn.Up = false
conn.initiated = time.Now().Add(-dialBanTimeout)
conn.initiated = time.Now().Add(-DialBanTimeout)
net.events.Send(NewEvent(conn))
return nil
}
@ -475,16 +471,19 @@ func (net *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
if err != nil {
return nil, err
}
if time.Since(conn.initiated) < dialBanTimeout {
return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
}
if conn.Up {
return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
}
if time.Since(conn.initiated) < DialBanTimeout {
return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
}
err = conn.nodesUp()
if err != nil {
log.Trace(fmt.Sprintf("nodes not up: %v", err))
return nil, fmt.Errorf("nodes not up: %v", err)
}
log.Debug("InitConn - connection initiated")
conn.initiated = time.Now()
return conn, nil
}