Merge pull request #726 from gzliudan/fix-sa1019-node

upgrade package node
This commit is contained in:
Daniel Liu 2024-11-04 13:26:34 +08:00 committed by GitHub
commit dd892c0b82
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 154 additions and 125 deletions

View file

@ -147,7 +147,9 @@ var (
utils.RPCGlobalGasCapFlag,
utils.RPCListenAddrFlag,
utils.RPCPortFlag,
utils.RPCHttpReadTimeoutFlag,
utils.RPCHttpWriteTimeoutFlag,
utils.RPCHttpIdleTimeoutFlag,
utils.RPCApiFlag,
utils.WSEnabledFlag,
utils.WSListenAddrFlag,

View file

@ -147,7 +147,9 @@ var AppHelpFlagGroups = []flagGroup{
utils.RPCGlobalGasCapFlag,
utils.RPCListenAddrFlag,
utils.RPCPortFlag,
utils.RPCHttpReadTimeoutFlag,
utils.RPCHttpWriteTimeoutFlag,
utils.RPCHttpIdleTimeoutFlag,
utils.RPCApiFlag,
utils.WSEnabledFlag,
utils.WSListenAddrFlag,

View file

@ -435,10 +435,20 @@ var (
Usage: "HTTP-RPC server listening port",
Value: node.DefaultHTTPPort,
}
RPCHttpReadTimeoutFlag = cli.DurationFlag{
Name: "rpcreadtimeout",
Usage: "HTTP-RPC server read timeout",
Value: rpc.DefaultHTTPTimeouts.ReadTimeout,
}
RPCHttpWriteTimeoutFlag = cli.DurationFlag{
Name: "rpcwritetimeout",
Usage: "HTTP-RPC server write timeout (default = 10s)",
Value: node.DefaultHTTPWriteTimeOut,
Usage: "HTTP-RPC server write timeout",
Value: rpc.DefaultHTTPTimeouts.WriteTimeout,
}
RPCHttpIdleTimeoutFlag = cli.DurationFlag{
Name: "rpcidletimeout",
Usage: "HTTP-RPC server idle timeout",
Value: rpc.DefaultHTTPTimeouts.IdleTimeout,
}
RPCCORSDomainFlag = cli.StringFlag{
Name: "rpccorsdomain",
@ -779,8 +789,14 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(RPCPortFlag.Name) {
cfg.HTTPPort = ctx.GlobalInt(RPCPortFlag.Name)
}
if ctx.GlobalIsSet(RPCHttpReadTimeoutFlag.Name) {
cfg.HTTPTimeouts.ReadTimeout = ctx.GlobalDuration(RPCHttpReadTimeoutFlag.Name)
}
if ctx.GlobalIsSet(RPCHttpWriteTimeoutFlag.Name) {
cfg.HTTPWriteTimeout = ctx.GlobalDuration(RPCHttpWriteTimeoutFlag.Name)
cfg.HTTPTimeouts.WriteTimeout = ctx.GlobalDuration(RPCHttpWriteTimeoutFlag.Name)
}
if ctx.GlobalIsSet(RPCHttpIdleTimeoutFlag.Name) {
cfg.HTTPTimeouts.IdleTimeout = ctx.GlobalDuration(RPCHttpIdleTimeoutFlag.Name)
}
if ctx.GlobalIsSet(RPCCORSDomainFlag.Name) {
cfg.HTTPCors = splitAndTrim(ctx.GlobalString(RPCCORSDomainFlag.Name))

View file

@ -158,7 +158,7 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis
}
}
if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts); err != nil {
if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts, api.node.config.HTTPTimeouts); err != nil {
return false, err
}
return true, nil

View file

@ -23,7 +23,6 @@ import (
"path/filepath"
"runtime"
"strings"
"time"
"github.com/XinFinOrg/XDPoSChain/accounts"
"github.com/XinFinOrg/XDPoSChain/accounts/keystore"
@ -33,6 +32,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/p2p/discover"
"github.com/XinFinOrg/XDPoSChain/rpc"
)
const (
@ -100,9 +100,6 @@ type Config struct {
// for ephemeral nodes).
HTTPPort int `toml:",omitempty"`
// HTTPWriteTimeout is the write timeout for the HTTP RPC server.
HTTPWriteTimeout time.Duration `toml:",omitempty"`
// HTTPCors is the Cross-Origin Resource Sharing header to send to requesting
// clients. Please be aware that CORS is a browser enforced security, it's fully
// useless for custom HTTP clients.
@ -122,6 +119,10 @@ type Config struct {
// exposed.
HTTPModules []string `toml:",omitempty"`
// HTTPTimeouts allows for customization of the timeout values used by the HTTP RPC
// interface.
HTTPTimeouts rpc.HTTPTimeouts
// WSHost is the host interface on which to start the websocket RPC server. If
// this field is empty, no websocket API endpoint will be started.
WSHost string `toml:",omitempty"`

View file

@ -21,27 +21,26 @@ import (
"os/user"
"path/filepath"
"runtime"
"time"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/p2p/nat"
"github.com/XinFinOrg/XDPoSChain/rpc"
)
const (
DefaultHTTPHost = "localhost" // Default host interface for the HTTP RPC server
DefaultHTTPPort = 8545 // Default TCP port for the HTTP RPC server
DefaultHTTPWriteTimeOut = 10 * time.Second // Default write timeout for the HTTP RPC server
DefaultWSHost = "localhost" // Default host interface for the websocket RPC server
DefaultWSPort = 8546 // Default TCP port for the websocket RPC server
DefaultHTTPHost = "localhost" // Default host interface for the HTTP RPC server
DefaultHTTPPort = 8545 // Default TCP port for the HTTP RPC server
DefaultWSHost = "localhost" // Default host interface for the websocket RPC server
DefaultWSPort = 8546 // Default TCP port for the websocket RPC server
)
// DefaultConfig contains reasonable default settings.
var DefaultConfig = Config{
DataDir: DefaultDataDir(),
HTTPPort: DefaultHTTPPort,
HTTPWriteTimeout: DefaultHTTPWriteTimeOut,
HTTPModules: []string{"net", "web3"},
HTTPVirtualHosts: []string{"localhost"},
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
P2P: p2p.Config{

View file

@ -140,7 +140,7 @@ func (n *Node) Register(constructor ServiceConstructor) error {
return nil
}
// Start create a live P2P node and starts running it.
// Start creates a live P2P node and starts running it.
func (n *Node) Start() error {
n.lock.Lock()
defer n.lock.Unlock()
@ -203,7 +203,7 @@ func (n *Node) Start() error {
return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}
var started []reflect.Type
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil {
@ -217,7 +217,7 @@ func (n *Node) Start() error {
// Mark the service started for potential cleanup
started = append(started, kind)
}
// Lastly start the configured RPC interfaces
// Lastly, start the configured RPC interfaces
if err := n.startRPC(services); err != nil {
for _, service := range services {
service.Stop()
@ -252,7 +252,7 @@ func (n *Node) openDataDir() error {
return nil
}
// startRPC is a helper method to start all the various RPC endpoint during node
// startRPC is a helper method to start all the various RPC endpoints during node
// startup. It's not meant to be called at any time afterwards as it makes certain
// assumptions about the state of the node.
func (n *Node) startRPC(services map[reflect.Type]Service) error {
@ -269,7 +269,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
n.stopInProc()
return err
}
if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil {
if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts, n.config.HTTPTimeouts); err != nil {
n.stopIPC()
n.stopInProc()
return err
@ -293,7 +293,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace)
n.log.Debug("InProc registered", "namespace", api.Namespace)
}
n.inprocHandler = handler
return nil
@ -320,51 +320,16 @@ func (n *Node) RegisterAPIs(apis []rpc.API) {
// startIPC initializes and starts the IPC RPC endpoint.
func (n *Node) startIPC(apis []rpc.API) error {
// Short circuit if the IPC endpoint isn't being exposed
if n.ipcEndpoint == "" {
return nil
return nil // IPC disabled.
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("IPC registered", "service", api.Service, "namespace", api.Namespace)
}
// All APIs registered, start the IPC listener
var (
listener net.Listener
err error
)
if listener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil {
listener, handler, err := rpc.StartIPCEndpoint(n.ipcEndpoint, apis)
if err != nil {
return err
}
go func() {
n.log.Info("IPC endpoint opened", "url", n.ipcEndpoint)
for {
conn, err := listener.Accept()
if err != nil {
// Terminate if the listener was closed
n.lock.RLock()
closed := n.ipcListener == nil
n.lock.RUnlock()
if closed {
return
}
// Not closed, just some error; report and continue
n.log.Error("IPC accept failed", "err", err)
continue
}
log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr())
go handler.ServeCodec(rpc.NewCodec(conn), 0)
}
}()
// All listeners booted successfully
n.ipcListener = listener
n.ipcHandler = handler
log.Info("IPC endpoint opened", "url", n.ipcEndpoint)
return nil
}
@ -374,7 +339,7 @@ func (n *Node) stopIPC() {
n.ipcListener.Close()
n.ipcListener = nil
n.log.Info("IPC endpoint closed", "endpoint", n.ipcEndpoint)
n.log.Info("IPC endpoint closed", "url", n.ipcEndpoint)
}
if n.ipcHandler != nil {
n.ipcHandler.Stop()
@ -383,36 +348,18 @@ func (n *Node) stopIPC() {
}
// startHTTP initializes and starts the HTTP RPC endpoint.
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string) error {
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) error {
// Short circuit if the HTTP endpoint isn't being exposed
if endpoint == "" {
return nil
}
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("HTTP registered", "service", api.Service, "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, timeouts)
if err != nil {
return err
}
go rpc.NewHTTPServer(cors, vhosts, handler, n.config.HTTPWriteTimeout).Serve(listener)
n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ","))
n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", listener.Addr()),
"cors", strings.Join(cors, ","),
"vhosts", strings.Join(vhosts, ","))
// All listeners booted successfully
n.httpEndpoint = endpoint
n.httpListener = listener
@ -424,10 +371,10 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
// stopHTTP terminates the HTTP RPC endpoint.
func (n *Node) stopHTTP() {
if n.httpListener != nil {
url := fmt.Sprintf("http://%v/", n.httpListener.Addr())
n.httpListener.Close()
n.httpListener = nil
n.log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", n.httpEndpoint))
n.log.Info("HTTP endpoint closed", "url", url)
}
if n.httpHandler != nil {
n.httpHandler.Stop()
@ -441,32 +388,11 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
if endpoint == "" {
return nil
}
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll)
if err != nil {
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr()))
// All listeners booted successfully
n.wsEndpoint = endpoint
n.wsListener = listener
@ -645,11 +571,23 @@ func (n *Node) IPCEndpoint() string {
// HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack.
func (n *Node) HTTPEndpoint() string {
n.lock.Lock()
defer n.lock.Unlock()
if n.httpListener != nil {
return n.httpListener.Addr().String()
}
return n.httpEndpoint
}
// WSEndpoint retrieves the current WS endpoint used by the protocol stack.
func (n *Node) WSEndpoint() string {
n.lock.Lock()
defer n.lock.Unlock()
if n.wsListener != nil {
return n.wsListener.Addr().String()
}
return n.wsEndpoint
}

View file

@ -23,6 +23,64 @@ import (
"github.com/XinFinOrg/XDPoSChain/log"
)
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules
func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string, timeouts HTTPTimeouts) (net.Listener, *Server, error) {
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := NewServer()
for _, api := range apis {
if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("HTTP registered", "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, nil, err
}
go NewHTTPServer(cors, vhosts, timeouts, handler).Serve(listener)
return listener, handler, err
}
// StartWSEndpoint starts a websocket endpoint
func StartWSEndpoint(endpoint string, apis []API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *Server, error) {
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := NewServer()
for _, api := range apis {
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, nil, err
}
go NewWSServer(wsOrigins, handler).Serve(listener)
return listener, handler, err
}
// StartIPCEndpoint starts an IPC endpoint.
func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) {
// Register all the APIs exposed by the services.
@ -36,6 +94,7 @@ func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, er
log.Info("IPC registration failed", "namespace", api.Namespace, "error", err)
return nil, nil, err
}
log.Debug("IPC registered", "service", api.Service, "namespace", api.Namespace)
if _, ok := regMap[api.Namespace]; !ok {
registered = append(registered, api.Namespace)
regMap[api.Namespace] = struct{}{}

View file

@ -240,17 +240,35 @@ func (t *httpServerConn) SetWriteDeadline(time.Time) error { return nil }
// NewHTTPServer creates a new HTTP RPC server around an API provider.
//
// Deprecated: Server implements http.Handler
func NewHTTPServer(cors []string, vhosts []string, srv *Server, writeTimeout time.Duration) *http.Server {
func NewHTTPServer(cors []string, vhosts []string, timeouts HTTPTimeouts, srv *Server) *http.Server {
// Wrap the CORS-handler within a host-handler
handler := newCorsHandler(srv, cors)
handler = newVHostHandler(vhosts, handler)
handler = http.TimeoutHandler(handler, writeTimeout, `{"error":"http server timeout"}`)
log.Info("NewHTTPServer", "writeTimeout", writeTimeout)
// Make sure timeout values are meaningful
if timeouts.ReadTimeout < time.Second {
log.Warn("Sanitizing invalid HTTP read timeout", "provided", timeouts.ReadTimeout, "updated", DefaultHTTPTimeouts.ReadTimeout)
timeouts.ReadTimeout = DefaultHTTPTimeouts.ReadTimeout
}
if timeouts.WriteTimeout < time.Second {
log.Warn("Sanitizing invalid HTTP write timeout", "provided", timeouts.WriteTimeout, "updated", DefaultHTTPTimeouts.WriteTimeout)
timeouts.WriteTimeout = DefaultHTTPTimeouts.WriteTimeout
}
if timeouts.IdleTimeout < time.Second {
log.Warn("Sanitizing invalid HTTP idle timeout", "provided", timeouts.IdleTimeout, "updated", DefaultHTTPTimeouts.IdleTimeout)
timeouts.IdleTimeout = DefaultHTTPTimeouts.IdleTimeout
}
// PR #469: return http code 503 and error message to client when timeout
handler = http.TimeoutHandler(handler, timeouts.WriteTimeout, `{"error":"http server timeout"}`)
log.Info("NewHTTPServer", "writeTimeout", timeouts.WriteTimeout)
// Bundle and start the HTTP server
return &http.Server{
Handler: handler,
ReadTimeout: 5 * time.Second,
WriteTimeout: writeTimeout + time.Second,
IdleTimeout: 120 * time.Second,
ReadTimeout: timeouts.ReadTimeout,
WriteTimeout: timeouts.WriteTimeout + time.Second,
IdleTimeout: timeouts.IdleTimeout,
}
}

View file

@ -24,23 +24,17 @@ import (
"github.com/XinFinOrg/XDPoSChain/p2p/netutil"
)
// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on
// Windows this is a named pipe
func CreateIPCListener(endpoint string) (net.Listener, error) {
return ipcListen(endpoint)
}
// ServeListener accepts connections on l, serving JSON-RPC on them.
// ServeListener accepts connections on l, serving IPC-RPC on them.
func (s *Server) ServeListener(l net.Listener) error {
for {
conn, err := l.Accept()
if netutil.IsTemporaryError(err) {
log.Warn("RPC accept error", "err", err)
log.Warn("IPC accept error", "err", err)
continue
} else if err != nil {
return err
}
log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr())
log.Trace("IPC accepted connection")
go s.ServeCodec(NewCodec(conn), 0)
}
}