diff --git a/cmd/skytest/client.go b/cmd/skytest/client.go index e0516f4..9fbe595 100644 --- a/cmd/skytest/client.go +++ b/cmd/skytest/client.go @@ -38,16 +38,19 @@ var ( func startSkylb(sid vexpb.ServiceId) (skylb.ServiceCli, pb.SkytestClient, hpb.HealthClient) { skycli := skylb.NewServiceCli(vexpb.ServiceId_SHARED_TEST_CLIENT_SERVICE) + options := []grpc.DialOption{} - // options = append(options, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin": {}}]}`)) + options = append(options, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin": {}}]}`)) skycli.Resolve(skylb.NewServiceSpec(skylb.DefaultNameSpace, sid, skylb.DefaultPortName), options...) skycli.EnableHistogram() + var cli pb.SkytestClient var healthCli hpb.HealthClient skycli.Start(func(spec *skypb.ServiceSpec, conn *grpc.ClientConn) { cli = pb.NewSkytestClient(conn) healthCli = hpb.NewHealthClient(conn) }) + return skycli, cli, healthCli } diff --git a/cmd/skytest/server.go b/cmd/skytest/server.go index 471e739..e9f2db2 100644 --- a/cmd/skytest/server.go +++ b/cmd/skytest/server.go @@ -3,11 +3,14 @@ package main import ( "flag" "fmt" + "log" "math/rand" + "net/http" "os" "time" "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" "google.golang.org/grpc" @@ -56,11 +59,11 @@ func main() { // } // go registerFakeServices() - // go registerPrometheus() + go registerPrometheus() myServiceId = vexpb.ServiceId_SHARED_TEST_SERVER_SERVICE skylb.Register(myServiceId, cli.DefaultPortName, *port) - // skylb.EnableHistogram() + skylb.EnableHistogram() addr := fmt.Sprintf("%s:%d", *host, *port) glog.Infof("Starting gRPC service at %s\n", addr) @@ -80,12 +83,12 @@ func main() { // } // } -// func registerPrometheus() { -// http.Handle("/_/metrics", prometheus.UninstrumentedHandler()) -// if err := http.ListenAndServe(*scrapeAddr, nil); err != nil { -// log.Fatal("ListenServerError:", err) -// } -// } +func registerPrometheus() { + http.Handle("/_/metrics", prometheus.UninstrumentedHandler()) + if err := http.ListenAndServe(*scrapeAddr, nil); err != nil { + log.Fatal("ListenServerError:", err) + } +} func randString(n int) string { b := make([]rune, n) diff --git a/internal/skylb/client.go b/internal/skylb/client.go index 2454e43..8eec2bf 100644 --- a/internal/skylb/client.go +++ b/internal/skylb/client.go @@ -5,6 +5,7 @@ import ( "net" "strconv" "strings" + "sync" "time" "github.com/golang/glog" @@ -26,6 +27,8 @@ import ( var ( withPrometheusHistogram = false + + once sync.Once ) // serviceClient implements interface skylb-api/client/ServiceClient. @@ -44,7 +47,6 @@ type serviceClient struct { debugSvcEndpoints map[string]string resolveFullEps bool - started bool } // Resolve resolves a service spec and returns a load balancer handle. @@ -111,85 +113,80 @@ func (sc *serviceClient) AddUnaryInterceptor(incept grpc.UnaryClientInterceptor) // // Start can only be called once in the whole lifecycle of an application. func (sc *serviceClient) Start(callback func(spec *pb.ServiceSpec, conn *grpc.ClientConn)) { - csId := sc.clientServiceId - csName, err := naming.ServiceIdToName(csId) - - // Only be called once - if sc.started { - glog.Warningf("Service client[%s] has started", csName) - return - } + // Start only once. + once.Do(func() { + csId := sc.clientServiceId + csName, err := naming.ServiceIdToName(csId) - glog.Infof("Starting service client[%s] with %d service specs to resolve.", - csName, sc.skylbResolveCount) - - if nil != err { - glog.V(1).Infof("Invalid caller service id %d\n", csId) - csName = fmt.Sprintf("!%d", csId) - } + glog.Infof("Starting service client[%s] with %d service specs to resolve.", + csName, sc.skylbResolveCount) - if sc.skylbResolveCount > 0 { - // Registers the skylb scheme to the resolver. - resolver.RegisterSkylbResolverBuilder(sc.keeper) + if nil != err { + glog.V(1).Infof("Invalid caller service id %d\n", csId) + csName = fmt.Sprintf("!%d", csId) + } - go sc.keeper.Start(csId, csName, sc.resolveFullEps) - } + if sc.skylbResolveCount > 0 { + // Registers the skylb scheme to the resolver. + resolver.RegisterSkylbResolverBuilder(sc.keeper) - for _, spec := range sc.specs { - specCopy := &pb.ServiceSpec{ - Namespace: spec.Namespace, - ServiceName: spec.ServiceName, - PortName: spec.PortName, + go sc.keeper.Start(csId, csName, sc.resolveFullEps) } - options := sc.buildDialOptions(specCopy) + for _, spec := range sc.specs { + specCopy := &pb.ServiceSpec{ + Namespace: spec.Namespace, + ServiceName: spec.ServiceName, + PortName: spec.PortName, + } - var conn *grpc.ClientConn - var err error - for { - func() { - defer func() { - if p := recover(); p != nil { - err = fmt.Errorf("%v", p) + options := sc.buildDialOptions(specCopy) + + var conn *grpc.ClientConn + var err error + for { + func() { + defer func() { + if p := recover(); p != nil { + err = fmt.Errorf("%v", p) + } + }() + + var target string + if addrs, ok := sc.debugSvcEndpoints[spec.ServiceName]; ok { + target = skyrs.DirectTarget(addrs) + } else { + target = skyrs.SkyLBTarget(spec) } + conn, err = grpc.Dial(target, options...) }() - var target string - if addrs, ok := sc.debugSvcEndpoints[spec.ServiceName]; ok { - target = skyrs.DirectTarget(addrs) - } else { - target = skyrs.SkyLBTarget(spec) + if err == nil { + break } - conn, err = grpc.Dial(target, options...) - }() - if err == nil { - break + glog.Warningf("Failed to dial service %q, %v.", spec.ServiceName, err) + time.Sleep(*flags.SkylbRetryInterval) } - glog.Warningf("Failed to dial service %q, %v.", spec.ServiceName, err) - time.Sleep(*flags.SkylbRetryInterval) + sc.conns = append(sc.conns, conn) + callback(spec, conn) + // if *cflags.EnableHealthCheck { + // closer := health.StartHealthCheck(conn, balancer, spec.ServiceName) + // if closer != nil { + // sc.healthCheckClosers = append(sc.healthCheckClosers, closer) + // } + // } } - sc.conns = append(sc.conns, conn) - callback(spec, conn) - // if *cflags.EnableHealthCheck { - // closer := health.StartHealthCheck(conn, balancer, spec.ServiceName) - // if closer != nil { - // sc.healthCheckClosers = append(sc.healthCheckClosers, closer) - // } - // } - } - - if withPrometheusHistogram { - metrics.EnableClientHandlingTimeHistogram() - } - - if !sc.failFast && sc.skylbResolveCount > 0 { - sc.keeper.WaitUntilReady() - } + if withPrometheusHistogram { + metrics.EnableClientHandlingTimeHistogram() + } - sc.started = true + if !sc.failFast && sc.skylbResolveCount > 0 { + sc.keeper.WaitUntilReady() + } + }) } func (sc *serviceClient) buildDialOptions(calledSpec *pb.ServiceSpec) []grpc.DialOption { diff --git a/internal/skylb/keeper.go b/internal/skylb/keeper.go index 7f04e06..49711ff 100644 --- a/internal/skylb/keeper.go +++ b/internal/skylb/keeper.go @@ -262,7 +262,7 @@ func (sk *skyLbKeeper) start(ctx context.Context, req *pb.ResolveRequest) error if cliConn, ok := sk.resolverCliConns[key]; ok { if err := cliConn.UpdateState(resolver.State{ - Addresses: localEpsMap[svcEps.Spec.String()], + Addresses: updates, }); err != nil { cliConn.ReportError(err) } diff --git a/repositories.bzl b/repositories.bzl index 5f04938..e5cc495 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -4,8 +4,8 @@ def go_repositories(): # go_repository( # name = "com_github_binchencoder_ease_gateway", # importpath = "github.com/binchencoder/ease-gateway", - # sum = "h1:hRpJwksSTfcSQeKWW7CpjH/Gq2BteNwhI8hzI0nn/z4=", - # version = "v0.0.4", + # sum = "h1:efGeRw9OajE8VSNs8zdFP3GeONCgZYnDsZ5BHNkGPNw=", + # version = "v1.0.4", # ) go_repository( name = "com_github_binchencoder_gateway_proto",