@@ -16,6 +16,7 @@ package grpc
16
16
import (
17
17
"net"
18
18
"os"
19
+ "sync/atomic"
19
20
20
21
"github.com/pkg/errors"
21
22
"google.golang.org/grpc"
@@ -48,13 +49,20 @@ func NewServiceWithListener(lis net.Listener) common.Service {
48
49
}
49
50
50
51
func newService (lis net.Listener ) * Server {
51
- return & Server {
52
+ s := & Server {
52
53
listener : lis ,
53
54
invokeHandlers : make (map [string ]common.ServiceInvocationHandler ),
54
55
topicRegistrar : make (internal.TopicRegistrar ),
55
56
bindingHandlers : make (map [string ]common.BindingInvocationHandler ),
56
57
authToken : os .Getenv (common .AppAPITokenEnvVar ),
57
58
}
59
+
60
+ gs := grpc .NewServer ()
61
+ pb .RegisterAppCallbackServer (gs , s )
62
+ pb .RegisterAppCallbackHealthCheckServer (gs , s )
63
+ s .grpcServer = gs
64
+
65
+ return s
58
66
}
59
67
60
68
// Server is the gRPC service implementation for Dapr.
@@ -68,6 +76,7 @@ type Server struct {
68
76
healthCheckHandler common.HealthCheckHandler
69
77
authToken string
70
78
grpcServer * grpc.Server
79
+ started uint32
71
80
}
72
81
73
82
func (s * Server ) RegisterActorImplFactory (f actor.Factory , opts ... config.Option ) {
@@ -76,19 +85,33 @@ func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option
76
85
77
86
// Start registers the server and starts it.
78
87
func (s * Server ) Start () error {
79
- gs := grpc .NewServer ()
80
- pb .RegisterAppCallbackServer (gs , s )
81
- pb .RegisterAppCallbackHealthCheckServer (gs , s )
82
- s .grpcServer = gs
83
- return gs .Serve (s .listener )
88
+ if ! atomic .CompareAndSwapUint32 (& s .started , 0 , 1 ) {
89
+ return errors .New ("a gRPC server can only be started once" )
90
+ }
91
+ return s .grpcServer .Serve (s .listener )
84
92
}
85
93
86
- // Stop stops the previously started service.
94
+ // Stop stops the previously- started service.
87
95
func (s * Server ) Stop () error {
88
- return s .listener .Close ()
96
+ if atomic .LoadUint32 (& s .started ) == 0 {
97
+ return nil
98
+ }
99
+ s .grpcServer .Stop ()
100
+ s .grpcServer = nil
101
+ return nil
89
102
}
90
103
104
+ // GrecefulStop stops the previously-started service gracefully.
91
105
func (s * Server ) GracefulStop () error {
106
+ if atomic .LoadUint32 (& s .started ) == 0 {
107
+ return nil
108
+ }
92
109
s .grpcServer .GracefulStop ()
110
+ s .grpcServer = nil
93
111
return nil
94
112
}
113
+
114
+ // GrpcServer returns the grpc.Server object managed by the server.
115
+ func (s * Server ) GrpcServer () * grpc.Server {
116
+ return s .grpcServer
117
+ }
0 commit comments