From dcd16aa3c0596dc5c270db533c185f1638377ba0 Mon Sep 17 00:00:00 2001 From: chenbin1314 Date: Sun, 27 Mar 2022 18:54:25 +0800 Subject: [PATCH] keeper.go: print added or deleted endpoints. --- internal/skylb/keeper.go | 74 +++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/internal/skylb/keeper.go b/internal/skylb/keeper.go index f9c1f2c..7f04e06 100644 --- a/internal/skylb/keeper.go +++ b/internal/skylb/keeper.go @@ -160,7 +160,7 @@ func (sk *skyLbKeeper) start(ctx context.Context, req *pb.ResolveRequest) error } glog.Infoln("Established resolve stream to SkyLB.") - // localEpsMap := make(map[string]map[string]struct{}) + localEpsMap := make(map[string]map[string]struct{}) readyMap := map[string]struct{}{} for { @@ -178,49 +178,60 @@ func (sk *skyLbKeeper) start(ctx context.Context, req *pb.ResolveRequest) error } timer.Reset(*skylbAliveTimeout) - var updates []resolver.Address if svcEps := resp.GetSvcEndpoints(); svcEps != nil { lenEps := len(svcEps.InstEndpoints) svcName := svcEps.Spec.ServiceName glog.V(2).Infof("Received %d endpoint(s) for service %s", lenEps, svcName) metrics.RecordEndpointCount(svcName, lenEps) - // localEps, ok := localEpsMap[svcEps.Spec.String()] - // if !ok { - // localEps = make(map[string]struct{}) - // localEpsMap[svcEps.Spec.String()] = localEps - // } + localEps, ok := localEpsMap[svcEps.Spec.String()] + if !ok { + localEps = make(map[string]struct{}) + localEpsMap[svcEps.Spec.String()] = localEps + } + + // The response holds full endpoints, we need to calculate the deltas. + eps := make(map[string]struct{}) - // The response holds full endpoints, we need to calculate - // the deltas. - // eps := make(map[string]struct{}) + var updates []resolver.Address + // Newly added service endpoints. + var addedEps []resolver.Address for _, ep := range svcEps.InstEndpoints { addr := fmt.Sprintf("%s:%d", ep.Host, ep.Port) - // eps[addr] = struct{}{} - // if _, ok := localEps[addr]; !ok { - // up := resolver.Address{ - // Addr: addr, - // } - // if ep.Weight != 0 { - // up.Metadata = ep.Weight - // } - // updates = append(updates, up) - // localEps[addr] = struct{}{} - // } + eps[addr] = struct{}{} + + // Added service endpoints. + if _, ok := localEps[addr]; !ok { + up := resolver.Address{ + Addr: addr, + } + if ep.Weight != 0 { + up.Metadata = ep.Weight + } + addedEps = append(addedEps, up) + localEps[addr] = struct{}{} + } updates = append(updates, resolver.Address{ Addr: addr, }) } - // for addr := range localEps { - // if _, ok := eps[addr]; !ok { - // up := resolver.Address{ - // Addr: addr, - // } - // updates = append(updates, up) - // delete(localEps, addr) - // } - // } + + // Removed service endpoints. + var deletedEps []resolver.Address + for addr := range localEps { + if _, ok := eps[addr]; !ok { + up := resolver.Address{ + Addr: addr, + } + deletedEps = append(deletedEps, up) + delete(localEps, addr) + } + } + if len(deletedEps) > 0 || len(addedEps) > 0 { + glog.V(3).Infof("Received resolve resp for[%s], added endpoint(s): %+v, deleted endpoints(s): %+v \n", + svcName, addedEps, deletedEps) + } if len(updates) == 0 { svcKeeperRecvStreamGauge.Dec() @@ -250,9 +261,8 @@ func (sk *skyLbKeeper) start(ctx context.Context, req *pb.ResolveRequest) error } if cliConn, ok := sk.resolverCliConns[key]; ok { - // glog.V(1).Infof("resolver.ClientConn#UpdateState, service:[%s], update: %+v \n", key, updates) if err := cliConn.UpdateState(resolver.State{ - Addresses: updates, + Addresses: localEpsMap[svcEps.Spec.String()], }); err != nil { cliConn.ReportError(err) }