Skip to content

Commit cfd7483

Browse files
authored
Expose underlying gRPC client and server objects (dapr#311)
* Expose underlying gRPC client and server objects Fixes dapr#204 Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Fixed reported race conditions Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Support for Go 1.17 Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
1 parent f182441 commit cfd7483

File tree

2 files changed

+37
-11
lines changed

2 files changed

+37
-11
lines changed

client/client.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,23 +300,21 @@ type GRPCClient struct {
300300
ctxCancelFunc context.CancelFunc
301301
protoClient pb.DaprClient
302302
authToken string
303-
mux sync.Mutex
304303
}
305304

306305
// Close cleans up all resources created by the client.
307306
func (c *GRPCClient) Close() {
308307
c.ctxCancelFunc()
309308
if c.connection != nil {
310309
c.connection.Close()
310+
c.connection = nil
311311
}
312312
}
313313

314314
// WithAuthToken sets Dapr API token on the instantiated client.
315315
// Allows empty string to reset token on existing client.
316316
func (c *GRPCClient) WithAuthToken(token string) {
317-
c.mux.Lock()
318317
c.authToken = token
319-
c.mux.Unlock()
320318
}
321319

322320
// WithTraceID adds existing trace ID to the outgoing context.
@@ -349,3 +347,8 @@ func (c *GRPCClient) Shutdown(ctx context.Context) error {
349347
func (c *GRPCClient) GrpcClient() pb.DaprClient {
350348
return c.protoClient
351349
}
350+
351+
// GrpcClientConn returns the grpc.ClientConn object used by this client.
352+
func (c *GRPCClient) GrpcClientConn() *grpc.ClientConn {
353+
return c.connection
354+
}

service/grpc/service.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package grpc
1616
import (
1717
"net"
1818
"os"
19+
"sync/atomic"
1920

2021
"github.com/pkg/errors"
2122
"google.golang.org/grpc"
@@ -48,13 +49,20 @@ func NewServiceWithListener(lis net.Listener) common.Service {
4849
}
4950

5051
func newService(lis net.Listener) *Server {
51-
return &Server{
52+
s := &Server{
5253
listener: lis,
5354
invokeHandlers: make(map[string]common.ServiceInvocationHandler),
5455
topicRegistrar: make(internal.TopicRegistrar),
5556
bindingHandlers: make(map[string]common.BindingInvocationHandler),
5657
authToken: os.Getenv(common.AppAPITokenEnvVar),
5758
}
59+
60+
gs := grpc.NewServer()
61+
pb.RegisterAppCallbackServer(gs, s)
62+
pb.RegisterAppCallbackHealthCheckServer(gs, s)
63+
s.grpcServer = gs
64+
65+
return s
5866
}
5967

6068
// Server is the gRPC service implementation for Dapr.
@@ -68,6 +76,7 @@ type Server struct {
6876
healthCheckHandler common.HealthCheckHandler
6977
authToken string
7078
grpcServer *grpc.Server
79+
started uint32
7180
}
7281

7382
func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) {
@@ -76,19 +85,33 @@ func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option
7685

7786
// Start registers the server and starts it.
7887
func (s *Server) Start() error {
79-
gs := grpc.NewServer()
80-
pb.RegisterAppCallbackServer(gs, s)
81-
pb.RegisterAppCallbackHealthCheckServer(gs, s)
82-
s.grpcServer = gs
83-
return gs.Serve(s.listener)
88+
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
89+
return errors.New("a gRPC server can only be started once")
90+
}
91+
return s.grpcServer.Serve(s.listener)
8492
}
8593

86-
// Stop stops the previously started service.
94+
// Stop stops the previously-started service.
8795
func (s *Server) Stop() error {
88-
return s.listener.Close()
96+
if atomic.LoadUint32(&s.started) == 0 {
97+
return nil
98+
}
99+
s.grpcServer.Stop()
100+
s.grpcServer = nil
101+
return nil
89102
}
90103

104+
// GrecefulStop stops the previously-started service gracefully.
91105
func (s *Server) GracefulStop() error {
106+
if atomic.LoadUint32(&s.started) == 0 {
107+
return nil
108+
}
92109
s.grpcServer.GracefulStop()
110+
s.grpcServer = nil
93111
return nil
94112
}
113+
114+
// GrpcServer returns the grpc.Server object managed by the server.
115+
func (s *Server) GrpcServer() *grpc.Server {
116+
return s.grpcServer
117+
}

0 commit comments

Comments
 (0)