feat: Add the invocation of the Post response plugins #113

Merged
merged 6 commits into from
May 4, 2025
59 changes: 44 additions & 15 deletions pkg/epp/handlers/server.go
@@ -37,6 +37,7 @@ import (
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -66,6 +67,7 @@ type StreamingServer struct {

type Scheduler interface {
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.Result, err error)
RunPostResponsePlugins(ctx context.Context, req *types.LLMRequest, targetPodName string) (*schedulingtypes.Result, error)
Collaborator

So far, all plugin invocations (pre, post, pick, ...) are done inside the pkg/epp/scheduling/Scheduler struct as private methods and are not defined as part of the interface itself.
Unless there's a good reason to deviate and make this an interface requirement, I would prefer that we don't introduce new interface methods.

Collaborator Author

The Schedule API is only used on the request side of things. PostResponse plugins are called on the response side of things.

If you want, I can add a parameter somewhere to indicate the "direction" in which the Schedule call is being made. Under the covers, my guess is that Schedule will then wrap two functions, one for request processing and the other for response processing.

I think this will complicate the P/D case where we have a special Scheduler that wraps two schedulers.

Collaborator

👍
Those are indeed two distinct flows.
There are several options (when you consider making this change upstream - seems reasonable to start a discussion on the issue to get a resolution in parallel with coding the change):

  • change the name of the RunPost... to be more generic
  • change both functions to make it clear that one handles the request and the other the response
  • introduce a new interface for doing post response processing and pass it from main, as is done for the Scheduler (the same object can implement both interfaces if needed)
  • etc.

}

// RequestContext stores context information during the life time of an HTTP request.
@@ -189,6 +191,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
case *extProcPb.ProcessingRequest_RequestTrailers:
// This is currently unused.
case *extProcPb.ProcessingRequest_ResponseHeaders:
responseHeaders := make(map[string]string)
for _, header := range v.ResponseHeaders.Headers.GetHeaders() {
value := string(header.RawValue)

@@ -199,27 +202,53 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
reqCtx.modelServerStreaming = true
loggerTrace.Info("model server is streaming response")
}
responseHeaders[header.Key] = value
}

reqCtx.RequestState = ResponseRecieved
reqCtx.respHeaderResp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
ResponseHeaders: &extProcPb.HeadersResponse{
Response: &extProcPb.CommonResponse{
HeaderMutation: &extProcPb.HeaderMutation{
SetHeaders: []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
// This is for debugging purpose only.
Key: "x-went-into-resp-headers",
RawValue: []byte("true"),
},
},
llmReq := &schedulingtypes.LLMRequest{
Model: reqCtx.Model,
Headers: responseHeaders,
Collaborator

q: Is there a need to differentiate between adding new headers and mutating existing ones, or is it all the same from Envoy's PoV?
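
To make the question concrete: a "set" mutation can either overwrite a header that is already present or only add it when it is absent. The sketch below models those two behaviours with hypothetical local types (`setMode`, `headerSet` and `applySets` are illustration only, not Envoy or go-control-plane names). How Envoy itself treats each `SetHeaders` entry is an open question here and should be checked against the ext_proc documentation.

```go
package main

import "fmt"

// setMode is a hypothetical enum describing how a header mutation is applied.
type setMode int

const (
	overwrite   setMode = iota // replace the value if the header already exists
	addIfAbsent                // keep an existing value, only add when missing
)

// headerSet is a local stand-in for a single header mutation.
type headerSet struct {
	Key, Value string
	Mode       setMode
}

// applySets applies the mutations in order to a copy of the response headers.
func applySets(headers map[string]string, sets []headerSet) map[string]string {
	out := make(map[string]string, len(headers))
	for k, v := range headers {
		out[k] = v
	}
	for _, s := range sets {
		if _, exists := out[s.Key]; exists && s.Mode == addIfAbsent {
			continue // header already present, leave it untouched
		}
		out[s.Key] = s.Value
	}
	return out
}

func main() {
	resp := map[string]string{"content-type": "application/json"}
	got := applySets(resp, []headerSet{
		{Key: "content-type", Value: "text/plain", Mode: addIfAbsent},
		{Key: "x-session-id", Value: "qwer-asdf-zxcv", Mode: overwrite},
	})
	fmt.Println(got["content-type"], got["x-session-id"])
}
```

If plugins only ever add new `x-` style headers, the distinction may not matter in practice. It starts to matter once a plugin writes a header the model server already returned.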

ResolvedTargetModel: reqCtx.ResolvedTargetModel,
}

var result *types.Result
result, err = s.scheduler.RunPostResponsePlugins(ctx, llmReq, reqCtx.TargetPod)
if err != nil {
logger.V(logutil.DEFAULT).Error(err, "Error handling response")
reqCtx.ResponseStatusCode = errutil.ModelServerError
} else {
headers := []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
// This is for debugging purpose only.
Key: "x-went-into-resp-headers",
RawValue: []byte("true"),
},
},
}

// Add headers added by PostResponse
for key, value := range result.MutatedHeaders {
headers = append(headers, &configPb.HeaderValueOption{
Header: &configPb.HeaderValue{
Key: key,
RawValue: []byte(value),
},
})
}

reqCtx.RequestState = ResponseRecieved
reqCtx.respHeaderResp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
ResponseHeaders: &extProcPb.HeadersResponse{
Response: &extProcPb.CommonResponse{
HeaderMutation: &extProcPb.HeaderMutation{
SetHeaders: headers,
},
},
},
},
},
}
}

case *extProcPb.ProcessingRequest_ResponseBody:
2 changes: 2 additions & 0 deletions pkg/epp/scheduling/config.go
@@ -26,6 +26,7 @@ type SchedulerConfig struct {
scorers map[plugins.Scorer]int // map from scorer to weight
picker plugins.Picker
postSchedulePlugins []plugins.PostSchedule
postResponsePlugins []plugins.PostResponse
}

var defPlugin = &defaultPlugin{}
@@ -40,4 +41,5 @@ var defaultConfig = &SchedulerConfig{
scorers: map[plugins.Scorer]int{},
picker: defPlugin,
postSchedulePlugins: []plugins.PostSchedule{},
postResponsePlugins: []plugins.PostResponse{},
}
4 changes: 4 additions & 0 deletions pkg/epp/scheduling/local_config.go
@@ -36,6 +36,10 @@ const (
loadAwareScorerWeightEnvVar = "LOAD_AWARE_SCORER_WEIGHT"
)

func init() {
setDefaultConfig()
}

func setDefaultConfig() {
// since the default config is a global variable, we add this function to minimize rebase conflicts.
// this configuration is a temporary state, it should be better streamlined.
35 changes: 34 additions & 1 deletion pkg/epp/scheduling/scheduler.go
@@ -69,7 +69,6 @@ var (
)

func NewScheduler(datastore Datastore) *Scheduler {
setDefaultConfig()
return NewSchedulerWithConfig(datastore, defaultConfig)
}

@@ -81,6 +80,7 @@ func NewSchedulerWithConfig(datastore Datastore, config *SchedulerConfig) *Sched
scorers: config.scorers,
picker: config.picker,
postSchedulePlugins: config.postSchedulePlugins,
postResponsePlugins: config.postResponsePlugins,
Collaborator

@nirrozenbaum nirrozenbaum May 4, 2025

I'm reading this and wondering if PostResponsePlugins should even be part of the scheduler config.
Other than the fact that a scorer (or other plugin) also implements PostResponse, is there any reason for this to be part of the scheduler? Is there any information PostResponse plugins need from the scheduler?
I mean, in main, where we register the plugins, we could do something like (pseudo code):

my_plugin = scorer.MyScorer // implements also PostSchedule
...
...
Scheduler := NewSchedulerWithConfig(... , config) // config contains my_plugin
// pass my_plugin (and other plugins that implement PostResponse) to runserver to call PostResponse. 

@elevran @shmuelk
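
A runnable Go version of the idea in the pseudo code might look like the sketch below. It assumes simplified stand-ins: `Plugin`, `PostResponse`, `myScorer`, `plainPicker` and `collectPostResponse` are hypothetical, and the `PostResponse` method here takes only a header map, not the `*types.SchedulingContext` and `types.Pod` used in this PR.

```go
package main

import "fmt"

// Plugin is a stand-in for the common plugin interface.
type Plugin interface {
	Name() string
}

// PostResponse is a simplified stand-in for plugins.PostResponse.
type PostResponse interface {
	Plugin
	PostResponse(headers map[string]string)
}

// myScorer stands for a scorer that also implements PostResponse.
type myScorer struct{}

func (myScorer) Name() string { return "my-scorer" }

func (myScorer) PostResponse(headers map[string]string) {
	headers["x-scored-by"] = "my-scorer"
}

// plainPicker is a plugin without any response-side behaviour.
type plainPicker struct{}

func (plainPicker) Name() string { return "plain-picker" }

// collectPostResponse returns the registered plugins that also implement
// PostResponse, using a type assertion on each one.
func collectPostResponse(registered []Plugin) []PostResponse {
	var out []PostResponse
	for _, p := range registered {
		if pr, ok := p.(PostResponse); ok {
			out = append(out, pr)
		}
	}
	return out
}

func main() {
	registered := []Plugin{myScorer{}, plainPicker{}}
	postResponse := collectPostResponse(registered) // would be passed to the server

	headers := map[string]string{}
	for _, pr := range postResponse {
		pr.PostResponse(headers)
	}
	fmt.Println(len(postResponse), headers["x-scored-by"])
}
```

The scheduler config would then carry only scheduling plugins, and the server would receive the collected slice directly from main.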

}
}

@@ -91,6 +91,7 @@ type Scheduler struct {
scorers map[plugins.Scorer]int // map from scorer to its weight
picker plugins.Picker
postSchedulePlugins []plugins.PostSchedule
postResponsePlugins []plugins.PostResponse
}

type Datastore interface {
@@ -211,6 +212,38 @@ func (s *Scheduler) runPostSchedulePlugins(ctx *types.SchedulingContext, res *ty
}
}

func (s *Scheduler) RunPostResponsePlugins(ctx context.Context, req *types.LLMRequest, targetPodName string) (*types.Result, error) {
logger := log.FromContext(ctx)

pool, err := s.datastore.PoolGet()
if err != nil {
return nil, errutil.Error{Code: errutil.Internal, Msg: "failed to find a target pod"} // pool not defined, no pods
}

// Snapshot pod metrics from the datastore to:
// 1. Reduce concurrent access to the datastore.
// 2. Ensure consistent data during the scheduling operation of a request.
pods := types.ToSchedulerPodMetrics(s.datastore.PodGetAll())
var targetPod types.Pod
for _, pod := range pods {
if pod.GetPod().NamespacedName.String() == targetPodName {
targetPod = pod
break
}
}

sCtx := types.NewSchedulingContext(ctx, req, pods, pool.Spec.TargetPortNumber)

for _, plugin := range s.postResponsePlugins {
logger.V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.Name())
before := time.Now()
plugin.PostResponse(sCtx, targetPod)
metrics.RecordSchedulerPluginProcessingLatency(plugins.PostResponsePluginType, plugin.Name(), time.Since(before))
}
Comment on lines +237 to +242
Collaborator

All other run* functions are considerably shorter (amounting to the highlighted code, more or less).
It would be good to understand the difference. It might be because, so far, scheduling was only called for a request, and this is the first processing called on the response?
If so, it might be good to highlight that in the comments. Also consider whether we want to create some symmetry (in the upstream) so that we call OnRequest/OnResponse instead of Schedule/RunPostResponse...


return &types.Result{TargetPod: nil, MutatedHeaders: sCtx.MutatedHeaders}, nil
}

type defaultPlugin struct {
picker.RandomPicker
}
67 changes: 67 additions & 0 deletions pkg/epp/scheduling/scheduler_test.go
@@ -483,6 +483,56 @@ func TestSchedulePlugins(t *testing.T) {
}
}

func TestPostResponse(t *testing.T) {
pr1 := &testPostResponse{
NameRes: "pr1",
ExtraHeaders: map[string]string{"x-session-id": "qwer-asdf-zxcv"},
ReceivedResponseHeaders: make(map[string]string),
}

tests := []struct {
name string
config SchedulerConfig
input []*backendmetrics.FakePodMetrics
responseHeaders map[string]string
wantMutatedHeaders map[string]string
}{
{
name: "Simple postResponse test",
config: SchedulerConfig{
postResponsePlugins: []plugins.PostResponse{pr1},
},
input: []*backendmetrics.FakePodMetrics{
{Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
},
responseHeaders: map[string]string{"Content-type": "application/json", "Content-Length": "1234"},
wantMutatedHeaders: map[string]string{"x-session-id": "qwer-asdf-zxcv"},
},
}

for _, test := range tests {
scheduler := NewSchedulerWithConfig(&fakeDataStore{pods: test.input}, &test.config)

req := &types.LLMRequest{
Model: "test-model",
Headers: test.responseHeaders,
}

result, err := scheduler.RunPostResponsePlugins(context.Background(), req, test.input[0].Pod.NamespacedName.String())
if err != nil {
t.Errorf("Received an error. Error: %s", err)
}

if diff := cmp.Diff(test.responseHeaders, pr1.ReceivedResponseHeaders); diff != "" {
t.Errorf("Unexpected output (-responseHeaders +ReceivedResponseHeaders): %v", diff)
}

if diff := cmp.Diff(test.wantMutatedHeaders, result.MutatedHeaders); diff != "" {
t.Errorf("Unexpected output (-wantedMutatedHeaders +MutatedHeaders): %v", diff)
}
}
}

type fakeDataStore struct {
pods []*backendmetrics.FakePodMetrics
}
@@ -571,6 +621,23 @@ func (tp *TestPlugin) reset() {
tp.NumOfPickerCandidates = 0
}

type testPostResponse struct {
NameRes string
ReceivedResponseHeaders map[string]string
ExtraHeaders map[string]string
}

func (pr *testPostResponse) Name() string { return pr.NameRes }

func (pr *testPostResponse) PostResponse(ctx *types.SchedulingContext, pod types.Pod) {
for key, value := range ctx.Req.Headers {
pr.ReceivedResponseHeaders[key] = value
}
for key, value := range pr.ExtraHeaders {
ctx.MutatedHeaders[key] = value
}
}

func findPods(ctx *types.SchedulingContext, names ...k8stypes.NamespacedName) []types.Pod {
res := []types.Pod{}
for _, pod := range ctx.PodsSnapshot {
24 changes: 14 additions & 10 deletions pkg/epp/scheduling/scorers_test.go
@@ -86,19 +86,23 @@ func TestScorers(t *testing.T) {
},
},
wantRes: &types.Result{
TargetPod: &types.PodMetrics{
Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
Metrics: &backendmetrics.Metrics{
WaitingQueueSize: 0,
KVCacheUsagePercent: 0.2,
MaxActiveModels: 2,
ActiveModels: map[string]int{
"foo": 1,
"bar": 1,
TargetPod: &types.ScoredPod{
Pod: &types.PodMetrics{
Pod: &backendmetrics.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
Metrics: &backendmetrics.Metrics{
WaitingQueueSize: 0,
KVCacheUsagePercent: 0.2,
MaxActiveModels: 2,
ActiveModels: map[string]int{
"foo": 1,
"bar": 1,
},
WaitingModels: map[string]int{},
},
WaitingModels: map[string]int{},
},
Score: 0.5,
},
MutatedHeaders: map[string]string{},
},
},
}