5
5
"net"
6
6
"strconv"
7
7
"strings"
8
+ "sync"
8
9
"time"
9
10
10
11
"github.com/golang/glog"
@@ -26,6 +27,8 @@ import (
26
27
27
28
var (
28
29
withPrometheusHistogram = false
30
+
31
+ once sync.Once
29
32
)
30
33
31
34
// serviceClient implements interface skylb-api/client/ServiceClient.
@@ -44,7 +47,6 @@ type serviceClient struct {
44
47
debugSvcEndpoints map [string ]string
45
48
46
49
resolveFullEps bool
47
- started bool
48
50
}
49
51
50
52
// Resolve resolves a service spec and returns a load balancer handle.
@@ -111,85 +113,80 @@ func (sc *serviceClient) AddUnaryInterceptor(incept grpc.UnaryClientInterceptor)
111
113
//
112
114
// Start can only be called once in the whole lifecycle of an application.
113
115
func (sc * serviceClient ) Start (callback func (spec * pb.ServiceSpec , conn * grpc.ClientConn )) {
114
- csId := sc .clientServiceId
115
- csName , err := naming .ServiceIdToName (csId )
116
-
117
- // Only be called once
118
- if sc .started {
119
- glog .Warningf ("Service client[%s] has started" , csName )
120
- return
121
- }
116
+ // Start only once.
117
+ once .Do (func () {
118
+ csId := sc .clientServiceId
119
+ csName , err := naming .ServiceIdToName (csId )
122
120
123
- glog .Infof ("Starting service client[%s] with %d service specs to resolve." ,
124
- csName , sc .skylbResolveCount )
125
-
126
- if nil != err {
127
- glog .V (1 ).Infof ("Invalid caller service id %d\n " , csId )
128
- csName = fmt .Sprintf ("!%d" , csId )
129
- }
121
+ glog .Infof ("Starting service client[%s] with %d service specs to resolve." ,
122
+ csName , sc .skylbResolveCount )
130
123
131
- if sc .skylbResolveCount > 0 {
132
- // Registers the skylb scheme to the resolver.
133
- resolver .RegisterSkylbResolverBuilder (sc .keeper )
124
+ if nil != err {
125
+ glog .V (1 ).Infof ("Invalid caller service id %d\n " , csId )
126
+ csName = fmt .Sprintf ("!%d" , csId )
127
+ }
134
128
135
- go sc .keeper .Start (csId , csName , sc .resolveFullEps )
136
- }
129
+ if sc .skylbResolveCount > 0 {
130
+ // Registers the skylb scheme to the resolver.
131
+ resolver .RegisterSkylbResolverBuilder (sc .keeper )
137
132
138
- for _ , spec := range sc .specs {
139
- specCopy := & pb.ServiceSpec {
140
- Namespace : spec .Namespace ,
141
- ServiceName : spec .ServiceName ,
142
- PortName : spec .PortName ,
133
+ go sc .keeper .Start (csId , csName , sc .resolveFullEps )
143
134
}
144
135
145
- options := sc .buildDialOptions (specCopy )
136
+ for _ , spec := range sc .specs {
137
+ specCopy := & pb.ServiceSpec {
138
+ Namespace : spec .Namespace ,
139
+ ServiceName : spec .ServiceName ,
140
+ PortName : spec .PortName ,
141
+ }
146
142
147
- var conn * grpc.ClientConn
148
- var err error
149
- for {
150
- func () {
151
- defer func () {
152
- if p := recover (); p != nil {
153
- err = fmt .Errorf ("%v" , p )
143
+ options := sc .buildDialOptions (specCopy )
144
+
145
+ var conn * grpc.ClientConn
146
+ var err error
147
+ for {
148
+ func () {
149
+ defer func () {
150
+ if p := recover (); p != nil {
151
+ err = fmt .Errorf ("%v" , p )
152
+ }
153
+ }()
154
+
155
+ var target string
156
+ if addrs , ok := sc .debugSvcEndpoints [spec .ServiceName ]; ok {
157
+ target = skyrs .DirectTarget (addrs )
158
+ } else {
159
+ target = skyrs .SkyLBTarget (spec )
154
160
}
161
+ conn , err = grpc .Dial (target , options ... )
155
162
}()
156
163
157
- var target string
158
- if addrs , ok := sc .debugSvcEndpoints [spec .ServiceName ]; ok {
159
- target = skyrs .DirectTarget (addrs )
160
- } else {
161
- target = skyrs .SkyLBTarget (spec )
164
+ if err == nil {
165
+ break
162
166
}
163
- conn , err = grpc .Dial (target , options ... )
164
- }()
165
167
166
- if err == nil {
167
- break
168
+ glog . Warningf ( "Failed to dial service %q, %v." , spec . ServiceName , err )
169
+ time . Sleep ( * flags . SkylbRetryInterval )
168
170
}
169
171
170
- glog .Warningf ("Failed to dial service %q, %v." , spec .ServiceName , err )
171
- time .Sleep (* flags .SkylbRetryInterval )
172
+ sc .conns = append (sc .conns , conn )
173
+ callback (spec , conn )
174
+ // if *cflags.EnableHealthCheck {
175
+ // closer := health.StartHealthCheck(conn, balancer, spec.ServiceName)
176
+ // if closer != nil {
177
+ // sc.healthCheckClosers = append(sc.healthCheckClosers, closer)
178
+ // }
179
+ // }
172
180
}
173
181
174
- sc .conns = append (sc .conns , conn )
175
- callback (spec , conn )
176
- // if *cflags.EnableHealthCheck {
177
- // closer := health.StartHealthCheck(conn, balancer, spec.ServiceName)
178
- // if closer != nil {
179
- // sc.healthCheckClosers = append(sc.healthCheckClosers, closer)
180
- // }
181
- // }
182
- }
183
-
184
- if withPrometheusHistogram {
185
- metrics .EnableClientHandlingTimeHistogram ()
186
- }
187
-
188
- if ! sc .failFast && sc .skylbResolveCount > 0 {
189
- sc .keeper .WaitUntilReady ()
190
- }
182
+ if withPrometheusHistogram {
183
+ metrics .EnableClientHandlingTimeHistogram ()
184
+ }
191
185
192
- sc .started = true
186
+ if ! sc .failFast && sc .skylbResolveCount > 0 {
187
+ sc .keeper .WaitUntilReady ()
188
+ }
189
+ })
193
190
}
194
191
195
192
func (sc * serviceClient ) buildDialOptions (calledSpec * pb.ServiceSpec ) []grpc.DialOption {
0 commit comments