diff --git a/rpc/client.go b/rpc/client.go index dc1866dfc3..e0d4fa4218 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -78,7 +78,7 @@ type Client struct { isHTTP bool services *serviceRegistry - idCounter uint32 + idCounter atomic.Uint32 // This function, if non-nil, is called when the connection is lost. reconnectFunc reconnectFunc @@ -235,7 +235,7 @@ func (c *Client) RegisterName(name string, receiver interface{}) error { } func (c *Client) nextID() json.RawMessage { - id := atomic.AddUint32(&c.idCounter, 1) + id := c.idCounter.Add(1) return strconv.AppendUint(nil, uint64(id), 10) } diff --git a/rpc/server.go b/rpc/server.go index a49132423b..ab4d8ac83b 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -48,7 +48,7 @@ type Server struct { mutex sync.Mutex codecs map[ServerCodec]struct{} - run int32 + run atomic.Bool } // NewServer creates a new server instance with no registered handlers. @@ -56,8 +56,8 @@ func NewServer() *Server { server := &Server{ idgen: randomIDGenerator(), codecs: make(map[ServerCodec]struct{}), - run: 1, } + server.run.Store(true) // Register the default service providing meta information about the RPC service such // as the services and methods it offers. rpcService := &RPCService{server} @@ -95,7 +95,7 @@ func (s *Server) trackCodec(codec ServerCodec) bool { s.mutex.Lock() defer s.mutex.Unlock() - if atomic.LoadInt32(&s.run) == 0 { + if !s.run.Load() { return false // Don't serve if server is stopped. } s.codecs[codec] = struct{}{} @@ -114,7 +114,7 @@ func (s *Server) untrackCodec(codec ServerCodec) { // this mode. func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { // Don't serve if server is stopped. - if atomic.LoadInt32(&s.run) == 0 { + if !s.run.Load() { return } @@ -144,7 +144,7 @@ func (s *Server) Stop() { s.mutex.Lock() defer s.mutex.Unlock() - if atomic.CompareAndSwapInt32(&s.run, 1, 0) { + if s.run.CompareAndSwap(true, false) { log.Debug("RPC server shutting down") for codec := range s.codecs { codec.close()