From 7f08a9c7a009fbbc86316f6a918d6104322b94e2 Mon Sep 17 00:00:00 2001 From: binchen Date: Fri, 10 Jul 2020 17:18:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E5=AD=98internal?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/BUILD.bazel | 48 ++++++++++++ internal/README.md | 9 +++ internal/balancer.go | 108 +++++++++++++++++++++++++++ internal/balancer_test.go | 99 +++++++++++++++++++++++++ internal/hook.go | 149 ++++++++++++++++++++++++++++++++++++++ internal/hook_test.go | 39 ++++++++++ internal/service.go | 104 ++++++++++++++++++++++++++ 7 files changed, 556 insertions(+) create mode 100644 internal/BUILD.bazel create mode 100644 internal/README.md create mode 100644 internal/balancer.go create mode 100644 internal/balancer_test.go create mode 100644 internal/hook.go create mode 100755 internal/hook_test.go create mode 100755 internal/service.go diff --git a/internal/BUILD.bazel b/internal/BUILD.bazel new file mode 100644 index 0000000..129fba7 --- /dev/null +++ b/internal/BUILD.bazel @@ -0,0 +1,48 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +package(default_visibility = ["//visibility:public"]) + +go_library( + name = "go_default_library", + srcs = glob( + ["*.go"], + exclude = ["*_test.go"], + ), + importpath = "github.com/binchencoder/ease-gateway/internal", + visibility = ["//visibility:public"], + deps = [ + "//httpoptions:go_default_library", + "//gateway/runtime:go_default_library", + "@com_github_binchencoder_gateway_proto//data:go_default_library", + "@com_github_binchencoder_letsgo//grpc:go_default_library", + "@com_github_binchencoder_letsgo//hashring:go_default_library", + "@com_github_binchencoder_skylb_api//proto:go_default_library", + "@com_github_golang_protobuf//proto:go_default_library", + "@com_github_pborman_uuid//:go_default_library", + "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//grpclog:go_default_library", + "@org_golang_x_net//context:go_default_library", + ], +) + +go_test( + name = "go_default_test", + size = "small", + srcs = [ + "balancer_test.go", + "hook_test.go", + ], + embed = [ + ":go_default_library", + ], + deps = [ + "//examples/proto:go_default_library", + "//httpoptions:go_default_library", + "//gateway/runtime:go_default_library", + "@com_github_binchencoder_letsgo//hashring:go_default_library", + "@com_github_binchencoder_skylb_api//proto:go_default_library", + "@com_github_golang_protobuf//proto:go_default_library", + "@org_golang_google_grpc//:go_default_library", + "@org_golang_x_net//context:go_default_library", + ], +) diff --git a/internal/README.md b/internal/README.md new file mode 100644 index 0000000..eb70ce3 --- /dev/null +++ b/internal/README.md @@ -0,0 +1,9 @@ +# Internal + +这个目录下的文件原来在//gateway/runtime 下, 为了让runtime 目录下的文件更加清晰, 想将balancer.go、hook.go、service.go 迁移到//gateway/runtime 下, 但是在迁移的过程中遇到了难题 + +//gateway/rume 和 //internal 互相依赖造成死循环 + +# NOTE + +这个目录目前没有用 diff --git a/internal/balancer.go b/internal/balancer.go new file mode 100644 index 0000000..2f470e2 --- /dev/null +++ b/internal/balancer.go @@ -0,0 +1,108 @@ +package internal + +import ( + "context" + "fmt" + "reflect" + "strings" + + "github.com/golang/protobuf/proto" + "github.com/pborman/uuid" + "google.golang.org/grpc/grpclog" + + options "github.com/binchencoder/ease-gateway/httpoptions" + "github.com/binchencoder/letsgo/grpc" + "github.com/binchencoder/letsgo/hashring" +) + +const ( + hashKeyUUID = "@uuid" + hashKeySession = "@session" +) + +// PreLoadBalance processes context to affect the load balancer. +func PreLoadBalance(ctx context.Context, balancer, hashHeyType string, req proto.Message) context.Context { + if balancer == "" || balancer == options.LoadBalancer_ROUND_ROBIN.String() { + return ctx + } + + if balancer == options.LoadBalancer_CONSISTENT.String() { + if hashHeyType == hashKeyUUID { + hashKey := uuid.New() + ctx = hashring.WithHashKey(ctx, hashKey) + // Also put to gRPC metadata. + ctx = grpc.ToMetadataOutgoing(ctx, "") + } else if hashHeyType == hashKeySession { + // TODO(chenbin): implement it. + return ctx + } else { + // Hash key is a proto field. + hashKey := fmt.Sprintf("%v", getProtoFiledValue(req, hashHeyType)) + ctx = hashring.WithHashKey(ctx, hashKey) + } + } + + return ctx +} + +func getProtoFiledValue(msg proto.Message, fieldPathStr string) reflect.Value { + fieldPath := strings.Split(fieldPathStr, ".") + v := reflect.ValueOf(msg).Elem() + for _, fieldName := range fieldPath { + f, _, err := fieldByProtoName(v, fieldName) + if err != nil { + grpclog.Printf("field not found in %T: %s, %v", msg, strings.Join(fieldPath, "."), err) + return reflect.Value{} + } + if !f.IsValid() { + grpclog.Printf("field not found in %T: %s", msg, strings.Join(fieldPath, ".")) + return reflect.Value{} + } + + switch f.Kind() { + case reflect.Bool, reflect.Float32, reflect.Float64, reflect.Int32, reflect.Int64, reflect.String, reflect.Uint32, reflect.Uint64: + v = f + case reflect.Ptr: + if f.IsNil() { + grpclog.Printf("field is nil in %T: %s", msg, strings.Join(fieldPath, ".")) + return reflect.Value{} + } + v = f.Elem() + continue + case reflect.Struct: + v = f + continue + default: + grpclog.Printf("unexpected type %s in %T", f.Type(), msg) + return reflect.Value{} + } + } + return v +} + +// fieldByProtoName looks up a field whose corresponding protobuf field name is "name". +// "m" must be a struct value. It returns zero reflect.Value if no such field found. +func fieldByProtoName(m reflect.Value, name string) (reflect.Value, *proto.Properties, error) { + props := proto.GetProperties(m.Type()) + + // look up field name in oneof map + if op, ok := props.OneofTypes[name]; ok { + v := reflect.New(op.Type.Elem()) + field := m.Field(op.Field) + if !field.IsNil() { + return reflect.Value{}, nil, fmt.Errorf("field already set for %s oneof", props.Prop[op.Field].OrigName) + } + field.Set(v) + return v.Elem().Field(0), op.Prop, nil + } + + for _, p := range props.Prop { + if p.OrigName == name { + return m.FieldByName(p.Name), p, nil + } + if p.JSONName == name { + return m.FieldByName(p.Name), p, nil + } + } + return reflect.Value{}, nil, nil +} diff --git a/internal/balancer_test.go b/internal/balancer_test.go new file mode 100644 index 0000000..905ea98 --- /dev/null +++ b/internal/balancer_test.go @@ -0,0 +1,99 @@ +package internal + +import ( + "context" + "testing" + + "github.com/golang/protobuf/proto" + + options "github.com/binchencoder/ease-gateway/httpoptions" + "github.com/binchencoder/letsgo/hashring" +) + +const DefaultHashKey = "8daad76a-dbb6-4f95-855d-7cfceb89afa1" + +type msgA struct { + StringValue string `protobuf:"bytes,1,opt,name=string_value" json:"string_value,omitempty"` +} + +func (ma *msgA) Reset() { *ma = msgA{} } +func (ma *msgA) String() string { return proto.CompactTextString(ma) } +func (*msgA) ProtoMessage() {} + +type msgB struct { + Nested *msgA `protobuf:"bytes,1,opt,name=nested" json:"nested,omitempty"` +} + +func (mb *msgB) Reset() { *mb = msgB{} } +func (mb *msgB) String() string { return proto.CompactTextString(mb) } +func (*msgB) ProtoMessage() {} + +func (mb *msgB) GetNested() *msgA { + if mb != nil { + return mb.Nested + } + return nil +} + +type msgC struct { + Nested *msgB `protobuf:"bytes,1,opt,name=nested" json:"nested,omitempty"` +} + +func (mc *msgC) Reset() { *mc = msgC{} } +func (mc *msgC) String() string { return proto.CompactTextString(mc) } +func (*msgC) ProtoMessage() {} + +func (mc *msgC) GetNested() *msgB { + if mc != nil { + return mc.Nested + } + return nil +} + +func TestGetProtoFiledValue(t *testing.T) { + a := msgA{ + StringValue: "foo", + } + v := getProtoFiledValue(&a, "string_value") + if v.String() != "foo" { + t.Errorf("Expect string %s but got %s", "foo", v.String()) + } + + b := msgB{ + Nested: &a, + } + v = getProtoFiledValue(&b, "nested.string_value") + if v.String() != "foo" { + t.Errorf("Expect string %s but got %s", "foo", v.String()) + } + + c := msgC{ + Nested: &b, + } + v = getProtoFiledValue(&c, "nested.nested.string_value") + if v.String() != "foo" { + t.Errorf("Expect string %s but got %s", "foo", v.String()) + } +} + +func TestPreLoadBalance(t *testing.T) { + // Generate UUID. + req := msgA{ + StringValue: DefaultHashKey, + } + ctx := PreLoadBalance(context.Background(), options.LoadBalancer_CONSISTENT.String(), hashKeyUUID, &req) + key := hashring.GetHashKeyOrEmpty(ctx) + if len(key) != 36 { + t.Errorf("Expect getting hash key with length 36s but got %s", key) + } + + // Proto field. + req = msgA{ + StringValue: DefaultHashKey, + } + ctx = PreLoadBalance(context.Background(), options.LoadBalancer_CONSISTENT.String(), "string_value", &req) + key = hashring.GetHashKeyOrEmpty(ctx) + if key != DefaultHashKey { + t.Errorf("Expect getting hash key %s but got %s", DefaultHashKey, key) + } +} diff --git a/internal/hook.go b/internal/hook.go new file mode 100644 index 0000000..2411878 --- /dev/null +++ b/internal/hook.go @@ -0,0 +1,149 @@ +package internal + +import ( + "net/http" + + "github.com/golang/protobuf/proto" + "golang.org/x/net/context" + + "github.com/binchencoder/ease-gateway/gateway/runtime" + pb "github.com/binchencoder/skylb-api/proto" +) + +// GatewayServiceHook collects the injection points with which gateway runtime +// calls back on different stages of request processing. It allows us to +// intercept the flow and inject our business logic, such as authentication, +// etc. +// +// At runtime we only need one instance of GatewayServiceHook since it's +// designed as goroutine-safe. +type GatewayServiceHook interface { + // Bootstrap is a callback function which will be called when + // SetGatewayServiceHook() is executed. The call is guaranteed to be + // after all gateway services are registered and before any request + // arrives. + // + // The passed-in service map contains all the service information known + // by the gateway at compile time. The interface implementation can + // safely hold this map because it will not be changed as soon as + // Bootstrap() is called. + // + // The typical jobs which can be done in Bootstrap() include + // initialization or starting service maintenance workers. + Bootstrap(svcs map[string]*ServiceGroup) error + + // RequestReceived is called after the request arrives at the gateway + // but before the routing decision is made. + // Parameters: + // w: the raw HTTP response writer of current request + // r: the raw HTTP request + // + // Returns: + // err: the error returned to HTTP handler; when it's non-nil, the + // request fails with an internal server error (500) + RequestReceived(w http.ResponseWriter, r *http.Request) (ctx context.Context, err error) + + // RequestAccepted is called when a request is accepted and routed at + // the gateway but before the protocol buffer is parsed. + // + // Parameters: + // svc: the service object to which the gateway routes + // m: the method object to which the gateway routes + // w: the raw HTTP response writer of current request + // r: the raw HTTP request + // Returns: + // ctxVals: the values to be put into context and passed along the + // chain + // err: the error returned to HTTP handler; when it's non-nil, + // the request fails with an internal server error (500) + RequestAccepted(ctx context.Context, svc *Service, m *Method, w http.ResponseWriter, r *http.Request) (ctxret context.Context, err error) + + // RequestParsed is called after the request payload is unmarshaled and + // before the gRPC call is invoked. + // + // Parameters: + // ctx: the context + // svc: the service object to which the gateway routes + // m: the method object to which the gateway routes + // reqProto: the request proto message + // meta: the server meta data + RequestParsed(ctx context.Context, svc *Service, m *Method, reqProto proto.Message, meta *runtime.ServerMetadata) error + + // RequestHandled is called after a request is completely handled, either + // succeeded or failed. + // + // Parameters: + // ctx: is the context. + // svc: is the service object under current context. + // m: the method object to which the gateway routes + // out: is the response message from grpc server. + // meta: the meta data. + // err: is the err which returned from grpc server + RequestHandled(ctx context.Context, svc *Service, m *Method, responseProto proto.Message, meta *runtime.ServerMetadata, err error) +} + +var ( + hook GatewayServiceHook + defaultContext = context.Background() +) + +// SetGatewayServiceHook sets a GatewayServiceHook. It should be called exactly +// once, after all init() functions are called (so that all gateway handlers +// are properly registered). That said, do not call it in function init(). +func SetGatewayServiceHook(h GatewayServiceHook) error { + hook = h + if err := hook.Bootstrap(availableServiceGroups); err != nil { + return err + } + return nil +} + +// RequestReceived will forward call to the hook if set; otherwise no-op. +func RequestReceived(w http.ResponseWriter, r *http.Request) (context.Context, error) { + if hook == nil { + return defaultContext, nil + } + + return hook.RequestReceived(w, r) +} + +// RequestAccepted will forward call to the hook if been set, otherwise no-op. +func RequestAccepted(ctx context.Context, spec *pb.ServiceSpec, name string, methodName string, w http.ResponseWriter, r *http.Request) (context.Context, error) { + if hook == nil { + return nil, nil + } + + sg := GetServiceGroup(spec) + s := sg.Services[name] + return hook.RequestAccepted(ctx, s, getMethod(s, methodName), w, r) +} + +// RequestParsed forwards the call to the RequestParsed method of +// GatewayServiceHook. +func RequestParsed(ctx context.Context, spec *pb.ServiceSpec, name string, methodName string, reqProto proto.Message, meta *runtime.ServerMetadata) error { + if hook == nil { + return nil + } + + sg := GetServiceGroup(spec) + s := sg.Services[name] + return hook.RequestParsed(ctx, s, getMethod(s, methodName), reqProto, meta) +} + +// RequestHandled will forward call to the hook if been set otherwise noop. +func RequestHandled(ctx context.Context, spec *pb.ServiceSpec, name string, methodName string, out proto.Message, meta *runtime.ServerMetadata, err error) { + if hook != nil { + sg := GetServiceGroup(spec) + s := sg.Services[name] + hook.RequestHandled(ctx, s, getMethod(s, methodName), out, meta, err) + } +} + +func getMethod(s *Service, methodName string) *Method { + for _, m := range s.Methods { + if m.Name == methodName { + return m + } + } + return nil +} diff --git a/internal/hook_test.go b/internal/hook_test.go new file mode 100755 index 0000000..cfd627e --- /dev/null +++ b/internal/hook_test.go @@ -0,0 +1,39 @@ +package internal + +import ( + "net/http" + "testing" + + "github.com/binchencoder/ease-gateway/gateway/runtime" + "github.com/golang/protobuf/proto" + "golang.org/x/net/context" +) + +type GatewayServiceHookFake struct { +} + +func (g *GatewayServiceHookFake) Bootstrap(sgs map[string]*ServiceGroup) error { + return nil +} +func (g *GatewayServiceHookFake) RequestReceived(w http.ResponseWriter, r *http.Request) (ctx context.Context, err error) { + return nil, nil +} + +func (g *GatewayServiceHookFake) RequestAccepted(ctx context.Context, svc *Service, m *Method, w http.ResponseWriter, r *http.Request) (ctxret context.Context, err error) { + return nil, nil +} + +func (g *GatewayServiceHookFake) RequestParsed(ctx context.Context, svc *Service, m *Method, in proto.Message, meta *runtime.ServerMetadata) error { + return nil +} + +func (g *GatewayServiceHookFake) RequestHandled(ctx context.Context, svc *Service, m *Method, out proto.Message, meta *runtime.ServerMetadata, err error) { +} + +func TestHook(t *testing.T) { + fakeHook := GatewayServiceHookFake{} + err := SetGatewayServiceHook(&fakeHook) + if err != nil { + t.Errorf("Failed to set hook err=%+v", err) + } +} diff --git a/internal/service.go b/internal/service.go new file mode 100755 index 0000000..c69f8f0 --- /dev/null +++ b/internal/service.go @@ -0,0 +1,104 @@ +package internal + +import ( + "google.golang.org/grpc" + + options "github.com/binchencoder/ease-gateway/httpoptions" + "github.com/binchencoder/ease-gateway/gateway/runtime" + vexpb "github.com/binchencoder/gateway-proto/data" + pb "github.com/binchencoder/skylb-api/proto" + skypb "github.com/binchencoder/skylb-api/proto" +) + +var ( + // CallerServiceId sets the gRPC caller service ID of the gateway. + // For ease-gateway, it's ServiceId_EASE_GATEWAY. + CallerServiceId = vexpb.ServiceId_EASE_GATEWAY +) + +// Method represents a gRPC service method. +type Method struct { + Name string + Path string + HttpMethod string + Enabled bool + LoginRequired bool + ClientSignRequired bool + IsThirdParty bool + SpecifiedSource options.SpecSourceType + ApiSource options.ApiSourceType + TokenType options.AuthTokenType + Timeout string +} + +// Service is the controller class for each grpc service handler. +type Service struct { + Spec pb.ServiceSpec + Name string + Methods []*Method + Register func(*runtime.ServeMux) error + Enable func(spec *skypb.ServiceSpec, conn *grpc.ClientConn) + Disable func() +} + +// ServiceGroup groups services with the same spec. +type ServiceGroup struct { + Spec pb.ServiceSpec + Enable func() + Disable func() + Services map[string]*Service +} + +var ( + availableServiceGroups = make(map[string]*ServiceGroup) +) + +// AddMethod adds an API method to the service object with the given spec. +func AddMethod(spec *pb.ServiceSpec, svcName, methodName, path, httpMethod string, loginRequired, clientSignRequired, isThirdParty bool, specSource, apiSource, tokenType, timeout string) { + sg := availableServiceGroups[spec.String()] + svc := sg.Services[svcName] + m := Method{ + Name: methodName, + Path: path, + HttpMethod: httpMethod, + LoginRequired: loginRequired, + ClientSignRequired: clientSignRequired, + IsThirdParty: isThirdParty, + SpecifiedSource: options.SpecSourceType(options.SpecSourceType_value[specSource]), + ApiSource: options.ApiSourceType(options.ApiSourceType_value[apiSource]), + TokenType: options.AuthTokenType(options.AuthTokenType_value[tokenType]), + Timeout: timeout, + } + svc.Methods = append(svc.Methods, &m) +} + +// AddService adds a service handler to the pool as available list. +// This will not automatically call Regsiter. +func AddService(s *Service, enabler, disabler func()) { + spec := s.Spec + sg, ok := availableServiceGroups[spec.String()] + if !ok { + sg = &ServiceGroup{ + Spec: spec, + Services: map[string]*Service{}, + } + availableServiceGroups[spec.String()] = sg + } + if enabler != nil { + sg.Enable = enabler + } + if disabler != nil { + sg.Disable = disabler + } + sg.Services[s.Name] = s +} + +// GetServicGroups returns the current available service groups. +func GetServicGroups() map[string]*ServiceGroup { + return availableServiceGroups +} + +// GetServiceGroup returns the ServiceGroup with the given spec. +func GetServiceGroup(spec *pb.ServiceSpec) *ServiceGroup { + return availableServiceGroups[spec.String()] +}