Skip to content

Commit d6acefb

Browse files
committed
Update
1 parent dcd16aa commit d6acefb

File tree

2 files changed

+61
-64
lines changed

2 files changed

+61
-64
lines changed

internal/skylb/client.go

Lines changed: 60 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net"
66
"strconv"
77
"strings"
8+
"sync"
89
"time"
910

1011
"github.com/golang/glog"
@@ -26,6 +27,8 @@ import (
2627

2728
var (
2829
withPrometheusHistogram = false
30+
31+
once sync.Once
2932
)
3033

3134
// serviceClient implements interface skylb-api/client/ServiceClient.
@@ -44,7 +47,6 @@ type serviceClient struct {
4447
debugSvcEndpoints map[string]string
4548

4649
resolveFullEps bool
47-
started bool
4850
}
4951

5052
// Resolve resolves a service spec and returns a load balancer handle.
@@ -111,85 +113,80 @@ func (sc *serviceClient) AddUnaryInterceptor(incept grpc.UnaryClientInterceptor)
111113
//
112114
// Start can only be called once in the whole lifecycle of an application.
113115
func (sc *serviceClient) Start(callback func(spec *pb.ServiceSpec, conn *grpc.ClientConn)) {
114-
csId := sc.clientServiceId
115-
csName, err := naming.ServiceIdToName(csId)
116-
117-
// Only be called once
118-
if sc.started {
119-
glog.Warningf("Service client[%s] has started", csName)
120-
return
121-
}
116+
// Start only once.
117+
once.Do(func() {
118+
csId := sc.clientServiceId
119+
csName, err := naming.ServiceIdToName(csId)
122120

123-
glog.Infof("Starting service client[%s] with %d service specs to resolve.",
124-
csName, sc.skylbResolveCount)
125-
126-
if nil != err {
127-
glog.V(1).Infof("Invalid caller service id %d\n", csId)
128-
csName = fmt.Sprintf("!%d", csId)
129-
}
121+
glog.Infof("Starting service client[%s] with %d service specs to resolve.",
122+
csName, sc.skylbResolveCount)
130123

131-
if sc.skylbResolveCount > 0 {
132-
// Registers the skylb scheme to the resolver.
133-
resolver.RegisterSkylbResolverBuilder(sc.keeper)
124+
if nil != err {
125+
glog.V(1).Infof("Invalid caller service id %d\n", csId)
126+
csName = fmt.Sprintf("!%d", csId)
127+
}
134128

135-
go sc.keeper.Start(csId, csName, sc.resolveFullEps)
136-
}
129+
if sc.skylbResolveCount > 0 {
130+
// Registers the skylb scheme to the resolver.
131+
resolver.RegisterSkylbResolverBuilder(sc.keeper)
137132

138-
for _, spec := range sc.specs {
139-
specCopy := &pb.ServiceSpec{
140-
Namespace: spec.Namespace,
141-
ServiceName: spec.ServiceName,
142-
PortName: spec.PortName,
133+
go sc.keeper.Start(csId, csName, sc.resolveFullEps)
143134
}
144135

145-
options := sc.buildDialOptions(specCopy)
136+
for _, spec := range sc.specs {
137+
specCopy := &pb.ServiceSpec{
138+
Namespace: spec.Namespace,
139+
ServiceName: spec.ServiceName,
140+
PortName: spec.PortName,
141+
}
146142

147-
var conn *grpc.ClientConn
148-
var err error
149-
for {
150-
func() {
151-
defer func() {
152-
if p := recover(); p != nil {
153-
err = fmt.Errorf("%v", p)
143+
options := sc.buildDialOptions(specCopy)
144+
145+
var conn *grpc.ClientConn
146+
var err error
147+
for {
148+
func() {
149+
defer func() {
150+
if p := recover(); p != nil {
151+
err = fmt.Errorf("%v", p)
152+
}
153+
}()
154+
155+
var target string
156+
if addrs, ok := sc.debugSvcEndpoints[spec.ServiceName]; ok {
157+
target = skyrs.DirectTarget(addrs)
158+
} else {
159+
target = skyrs.SkyLBTarget(spec)
154160
}
161+
conn, err = grpc.Dial(target, options...)
155162
}()
156163

157-
var target string
158-
if addrs, ok := sc.debugSvcEndpoints[spec.ServiceName]; ok {
159-
target = skyrs.DirectTarget(addrs)
160-
} else {
161-
target = skyrs.SkyLBTarget(spec)
164+
if err == nil {
165+
break
162166
}
163-
conn, err = grpc.Dial(target, options...)
164-
}()
165167

166-
if err == nil {
167-
break
168+
glog.Warningf("Failed to dial service %q, %v.", spec.ServiceName, err)
169+
time.Sleep(*flags.SkylbRetryInterval)
168170
}
169171

170-
glog.Warningf("Failed to dial service %q, %v.", spec.ServiceName, err)
171-
time.Sleep(*flags.SkylbRetryInterval)
172+
sc.conns = append(sc.conns, conn)
173+
callback(spec, conn)
174+
// if *cflags.EnableHealthCheck {
175+
// closer := health.StartHealthCheck(conn, balancer, spec.ServiceName)
176+
// if closer != nil {
177+
// sc.healthCheckClosers = append(sc.healthCheckClosers, closer)
178+
// }
179+
// }
172180
}
173181

174-
sc.conns = append(sc.conns, conn)
175-
callback(spec, conn)
176-
// if *cflags.EnableHealthCheck {
177-
// closer := health.StartHealthCheck(conn, balancer, spec.ServiceName)
178-
// if closer != nil {
179-
// sc.healthCheckClosers = append(sc.healthCheckClosers, closer)
180-
// }
181-
// }
182-
}
183-
184-
if withPrometheusHistogram {
185-
metrics.EnableClientHandlingTimeHistogram()
186-
}
187-
188-
if !sc.failFast && sc.skylbResolveCount > 0 {
189-
sc.keeper.WaitUntilReady()
190-
}
182+
if withPrometheusHistogram {
183+
metrics.EnableClientHandlingTimeHistogram()
184+
}
191185

192-
sc.started = true
186+
if !sc.failFast && sc.skylbResolveCount > 0 {
187+
sc.keeper.WaitUntilReady()
188+
}
189+
})
193190
}
194191

195192
func (sc *serviceClient) buildDialOptions(calledSpec *pb.ServiceSpec) []grpc.DialOption {

internal/skylb/keeper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func (sk *skyLbKeeper) start(ctx context.Context, req *pb.ResolveRequest) error
262262

263263
if cliConn, ok := sk.resolverCliConns[key]; ok {
264264
if err := cliConn.UpdateState(resolver.State{
265-
Addresses: localEpsMap[svcEps.Spec.String()],
265+
Addresses: updates,
266266
}); err != nil {
267267
cliConn.ReportError(err)
268268
}

0 commit comments

Comments
 (0)