-
Notifications
You must be signed in to change notification settings - Fork 8
feat: Add the invocation of the Post response plugins #113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
466e773
49b6afa
3e8284c
32e43b1
4655be4
6fffe9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,7 @@ import ( | |
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" | ||
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" | ||
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" | ||
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" | ||
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" | ||
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" | ||
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" | ||
|
@@ -66,6 +67,7 @@ type StreamingServer struct { | |
|
||
type Scheduler interface { | ||
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.Result, err error) | ||
RunPostResponsePlugins(ctx context.Context, req *types.LLMRequest, tragetPodName string) (*schedulingtypes.Result, error) | ||
} | ||
|
||
// RequestContext stores context information during the life time of an HTTP request. | ||
|
@@ -189,6 +191,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) | |
case *extProcPb.ProcessingRequest_RequestTrailers: | ||
// This is currently unused. | ||
case *extProcPb.ProcessingRequest_ResponseHeaders: | ||
responseHeaders := make(map[string]string) | ||
elevran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for _, header := range v.ResponseHeaders.Headers.GetHeaders() { | ||
value := string(header.RawValue) | ||
|
||
|
@@ -199,27 +202,53 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) | |
reqCtx.modelServerStreaming = true | ||
loggerTrace.Info("model server is streaming response") | ||
} | ||
responseHeaders[header.Key] = value | ||
} | ||
|
||
reqCtx.RequestState = ResponseRecieved | ||
reqCtx.respHeaderResp = &extProcPb.ProcessingResponse{ | ||
Response: &extProcPb.ProcessingResponse_ResponseHeaders{ | ||
ResponseHeaders: &extProcPb.HeadersResponse{ | ||
Response: &extProcPb.CommonResponse{ | ||
HeaderMutation: &extProcPb.HeaderMutation{ | ||
SetHeaders: []*configPb.HeaderValueOption{ | ||
{ | ||
Header: &configPb.HeaderValue{ | ||
// This is for debugging purpose only. | ||
Key: "x-went-into-resp-headers", | ||
RawValue: []byte("true"), | ||
}, | ||
}, | ||
llmReq := &schedulingtypes.LLMRequest{ | ||
Model: reqCtx.Model, | ||
Headers: responseHeaders, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. q: Is there a need to differentiate between adding new headers and mutating existing ones, or is it all the same from Envoy's PoV? |
||
ResolvedTargetModel: reqCtx.ResolvedTargetModel, | ||
} | ||
|
||
var result *types.Result | ||
result, err = s.scheduler.RunPostResponsePlugins(ctx, llmReq, reqCtx.TargetPod) | ||
if err != nil { | ||
logger.V(logutil.DEFAULT).Error(err, "Error handling response") | ||
reqCtx.ResponseStatusCode = errutil.ModelServerError | ||
} else { | ||
headers := []*configPb.HeaderValueOption{ | ||
{ | ||
Header: &configPb.HeaderValue{ | ||
// This is for debugging purpose only. | ||
shmuelk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Key: "x-went-into-resp-headers", | ||
RawValue: []byte("true"), | ||
}, | ||
}, | ||
} | ||
|
||
// Add headers added by PostResponse | ||
for key, value := range result.MutatedHeaders { | ||
headers = append(headers, &configPb.HeaderValueOption{ | ||
Header: &configPb.HeaderValue{ | ||
Key: key, | ||
RawValue: []byte(value), | ||
}, | ||
}) | ||
} | ||
|
||
reqCtx.RequestState = ResponseRecieved | ||
reqCtx.respHeaderResp = &extProcPb.ProcessingResponse{ | ||
Response: &extProcPb.ProcessingResponse_ResponseHeaders{ | ||
ResponseHeaders: &extProcPb.HeadersResponse{ | ||
Response: &extProcPb.CommonResponse{ | ||
HeaderMutation: &extProcPb.HeaderMutation{ | ||
SetHeaders: headers, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
case *extProcPb.ProcessingRequest_ResponseBody: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,7 +69,6 @@ var ( | |
) | ||
|
||
func NewScheduler(datastore Datastore) *Scheduler { | ||
setDefaultConfig() | ||
return NewSchedulerWithConfig(datastore, defaultConfig) | ||
} | ||
|
||
|
@@ -81,6 +80,7 @@ func NewSchedulerWithConfig(datastore Datastore, config *SchedulerConfig) *Sched | |
scorers: config.scorers, | ||
picker: config.picker, | ||
postSchedulePlugins: config.postSchedulePlugins, | ||
postResponsePlugins: config.postResponsePlugins, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm reading this and wondering if PostResponsePlugins should even be part of the scheduler config.
|
||
} | ||
} | ||
|
||
|
@@ -91,6 +91,7 @@ type Scheduler struct { | |
scorers map[plugins.Scorer]int // map from scorer to its weight | ||
picker plugins.Picker | ||
postSchedulePlugins []plugins.PostSchedule | ||
postResponsePlugins []plugins.PostResponse | ||
} | ||
|
||
type Datastore interface { | ||
|
@@ -211,6 +212,38 @@ func (s *Scheduler) runPostSchedulePlugins(ctx *types.SchedulingContext, res *ty | |
} | ||
} | ||
|
||
func (s *Scheduler) RunPostResponsePlugins(ctx context.Context, req *types.LLMRequest, targetPodName string) (*types.Result, error) { | ||
elevran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logger := log.FromContext(ctx) | ||
|
||
pool, err := s.datastore.PoolGet() | ||
if err != nil { | ||
return nil, errutil.Error{Code: errutil.Internal, Msg: "failed to find a target pod"} // pool not defined, no pods | ||
} | ||
|
||
// Snapshot pod metrics from the datastore to: | ||
elevran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// 1. Reduce concurrent access to the datastore. | ||
// 2. Ensure consistent data during the scheduling operation of a request. | ||
pods := types.ToSchedulerPodMetrics(s.datastore.PodGetAll()) | ||
var targetPod types.Pod | ||
for _, pod := range pods { | ||
if pod.GetPod().NamespacedName.String() == targetPodName { | ||
targetPod = pod | ||
break | ||
} | ||
} | ||
|
||
sCtx := types.NewSchedulingContext(ctx, req, pods, pool.Spec.TargetPortNumber) | ||
|
||
for _, plugin := range s.postResponsePlugins { | ||
logger.V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.Name()) | ||
before := time.Now() | ||
plugin.PostResponse(sCtx, targetPod) | ||
metrics.RecordSchedulerPluginProcessingLatency(plugins.PostResponsePluginType, plugin.Name(), time.Since(before)) | ||
} | ||
Comment on lines
+237
to
+242
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. all other |
||
|
||
return &types.Result{TargetPod: nil, MutatedHeaders: sCtx.MutatedHeaders}, nil | ||
} | ||
|
||
type defaultPlugin struct { | ||
picker.RandomPicker | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so far, all plugin invocations (pre, post, pick, ...) are done inside the
pkg/epp/scheduling/Scheduler
struct as private struct methods and are not defined as part of the interface itself.Unless there's a good reason to deviate and make this an interface requirement, I would prefer we don't introduce new interface methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Schedule API is only used on the request side of things. PostResponse plugins are called in the response side of things
I can if you want add a parameter somewhere to indicate which "direction" the Schedule call is being made. Under the covers, My guess is that Schedule wil then wrap two functions, one foe request processing and the other for response processing.
I think this will complicate the P/D case where we have a special Scheduler that wraps two schedulers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Those are indeed two distinct flows.
There are several options (when you consider making this change upstream - seems reasonable to start a discussion on the issue to get a resolution in parallel with coding the change):