Skip to content

Commit fd1b20f

Browse files
committed
Add status watcher to MCP server
Since we can now get status updates from two places, they are placed in a queue so we can handle them one at a time.
1 parent d46ea56 commit fd1b20f

File tree

6 files changed

+456
-35
lines changed

6 files changed

+456
-35
lines changed

cli/exp_mcp.go

Lines changed: 187 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/json"
77
"errors"
8+
"net/url"
89
"os"
910
"path/filepath"
1011
"slices"
@@ -15,8 +16,10 @@ import (
1516
"github.com/spf13/afero"
1617
"golang.org/x/xerrors"
1718

19+
agentapi "github.com/coder/agentapi-sdk-go"
1820
"github.com/coder/coder/v2/buildinfo"
1921
"github.com/coder/coder/v2/cli/cliui"
22+
"github.com/coder/coder/v2/cli/cliutil"
2023
"github.com/coder/coder/v2/codersdk"
2124
"github.com/coder/coder/v2/codersdk/agentsdk"
2225
"github.com/coder/coder/v2/codersdk/toolsdk"
@@ -25,6 +28,7 @@ import (
2528

2629
const (
2730
envAppStatusSlug = "CODER_MCP_APP_STATUS_SLUG"
31+
envLLMAgentURL = "CODER_MCP_LLM_AGENT_URL"
2832
)
2933

3034
func (r *RootCmd) mcpCommand() *serpent.Command {
@@ -347,10 +351,20 @@ func (*RootCmd) mcpConfigureCursor() *serpent.Command {
347351
return cmd
348352
}
349353

354+
type reportTask struct {
355+
link string
356+
messageID int64
357+
selfReported bool
358+
state codersdk.WorkspaceAppStatusState
359+
summary string
360+
}
361+
350362
type mcpServer struct {
351363
agentClient *agentsdk.Client
352364
appStatusSlug string
353365
client *codersdk.Client
366+
llmClient *agentapi.Client
367+
queue *cliutil.Queue[reportTask]
354368
}
355369

356370
func (r *RootCmd) mcpServer() *serpent.Command {
@@ -359,12 +373,14 @@ func (r *RootCmd) mcpServer() *serpent.Command {
359373
instructions string
360374
allowedTools []string
361375
appStatusSlug string
376+
llmAgentURL url.URL
362377
)
363378
return &serpent.Command{
364379
Use: "server",
365380
Handler: func(inv *serpent.Invocation) error {
366381
srv := &mcpServer{
367382
appStatusSlug: appStatusSlug,
383+
queue: cliutil.NewQueue[reportTask](10),
368384
}
369385

370386
// Display client URL separately from authentication status.
@@ -408,8 +424,36 @@ func (r *RootCmd) mcpServer() *serpent.Command {
408424
cliui.Infof(inv.Stderr, "Task reporter : Enabled")
409425
}
410426

411-
// Start the server.
412-
return srv.start(inv, instructions, allowedTools)
427+
// Try to create a client for the LLM agent API, which is used to get the
428+
// screen status to make the status reporting more robust. No auth
429+
// needed, so no validation.
430+
if llmAgentURL.String() == "" {
431+
cliui.Infof(inv.Stderr, "LLM agent URL : Not configured")
432+
} else {
433+
cliui.Infof(inv.Stderr, "LLM agent URL : %s", llmAgentURL.String())
434+
llmClient, err := agentapi.NewClient(llmAgentURL.String())
435+
if err != nil {
436+
cliui.Infof(inv.Stderr, "Screen events : Disabled")
437+
cliui.Warnf(inv.Stderr, "%s must be set", envLLMAgentURL)
438+
} else {
439+
cliui.Infof(inv.Stderr, "Screen events : Enabled")
440+
srv.llmClient = llmClient
441+
}
442+
}
443+
444+
ctx, cancel := context.WithCancel(inv.Context())
445+
defer cancel()
446+
defer srv.queue.Close()
447+
448+
cliui.Infof(inv.Stderr, "Failed to watch screen events")
449+
// Start the reporter, watcher, and server.
450+
if srv.agentClient != nil && appStatusSlug != "" {
451+
srv.startReporter(ctx, inv)
452+
if srv.llmClient != nil {
453+
srv.startWatcher(ctx, inv)
454+
}
455+
}
456+
return srv.startServer(ctx, inv, instructions, allowedTools)
413457
},
414458
Short: "Start the Coder MCP server.",
415459
Middleware: serpent.Chain(
@@ -438,14 +482,142 @@ func (r *RootCmd) mcpServer() *serpent.Command {
438482
Value: serpent.StringOf(&appStatusSlug),
439483
Default: "",
440484
},
485+
{
486+
Flag: "llm-agent-url",
487+
Description: "The URL of the LLM agent API, used to listen for status updates.",
488+
Env: envLLMAgentURL,
489+
Value: serpent.URLOf(&llmAgentURL),
490+
},
441491
},
442492
}
443493
}
444494

445-
func (s *mcpServer) start(inv *serpent.Invocation, instructions string, allowedTools []string) error {
446-
ctx, cancel := context.WithCancel(inv.Context())
447-
defer cancel()
495+
func (s *mcpServer) startReporter(ctx context.Context, inv *serpent.Invocation) {
496+
var lastMessageID int64
497+
shouldUpdate := func(item reportTask) codersdk.WorkspaceAppStatusState {
498+
// Always send self-reported updates.
499+
if item.selfReported {
500+
return item.state
501+
}
502+
// Always send completed states.
503+
switch item.state {
504+
case codersdk.WorkspaceAppStatusStateComplete,
505+
codersdk.WorkspaceAppStatusStateFailure:
506+
return item.state
507+
}
508+
// Always send "working" when there is a new message, since this means the
509+
// user submitted a message through the API and we know the LLM will begin
510+
// work soon if it has not already.
511+
if item.messageID > lastMessageID {
512+
return codersdk.WorkspaceAppStatusStateWorking
513+
}
514+
// Otherwise, if the state is "working" and there have been no new messages,
515+
// it means either that the LLM is still working or it means the user has
516+
// interacted with the terminal directly. For now, we are ignoring these
517+
// updates. This risks missing cases where the user manually submits a new
518+
// prompt and the LLM becomes active and does not update itself, but it
519+
// avoids spamming useless status updates.
520+
return ""
521+
}
522+
var lastPayload agentsdk.PatchAppStatus
523+
go func() {
524+
for {
525+
// TODO: Even with the queue, there is still the potential that a message
526+
// from the screen watcher and a message from the LLM could arrive out of
527+
// order if the timing is just right. We might want to wait a bit, then
528+
// check if the status has changed before committing.
529+
item, ok := s.queue.Pop()
530+
if !ok {
531+
return
532+
}
533+
534+
state := shouldUpdate(item)
535+
if state == "" {
536+
continue
537+
}
538+
539+
if item.messageID != 0 {
540+
lastMessageID = item.messageID
541+
}
542+
543+
payload := agentsdk.PatchAppStatus{
544+
AppSlug: s.appStatusSlug,
545+
Message: item.summary,
546+
URI: item.link,
547+
State: state,
548+
}
549+
550+
// Preserve previous message and URI.
551+
if payload.Message == "" {
552+
payload.Message = lastPayload.Message
553+
}
554+
if payload.URI == "" {
555+
payload.URI = lastPayload.URI
556+
}
557+
558+
// Avoid sending duplicate updates.
559+
if lastPayload.State == payload.State &&
560+
lastPayload.URI == payload.URI &&
561+
lastPayload.Message == payload.Message {
562+
continue
563+
}
564+
565+
err := s.agentClient.PatchAppStatus(ctx, payload)
566+
if err != nil && !errors.Is(err, context.Canceled) {
567+
cliui.Warnf(inv.Stderr, "Failed to report task status: %s", err)
568+
}
569+
570+
lastPayload = payload
571+
}
572+
}()
573+
}
574+
575+
func (s *mcpServer) startWatcher(ctx context.Context, inv *serpent.Invocation) {
576+
eventsCh, errCh, err := s.llmClient.SubscribeEvents(ctx)
577+
if err != nil {
578+
cliui.Warnf(inv.Stderr, "Failed to watch screen events: %s", err)
579+
return
580+
}
581+
go func() {
582+
for {
583+
select {
584+
case <-ctx.Done():
585+
return
586+
case event := <-eventsCh:
587+
switch ev := event.(type) {
588+
case agentapi.EventStatusChange:
589+
// If the screen is stable, assume complete.
590+
state := codersdk.WorkspaceAppStatusStateWorking
591+
if ev.Status == agentapi.StatusStable {
592+
state = codersdk.WorkspaceAppStatusStateComplete
593+
}
594+
err := s.queue.Push(reportTask{
595+
state: state,
596+
})
597+
if err != nil {
598+
cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err)
599+
return
600+
}
601+
case agentapi.EventMessageUpdate:
602+
err := s.queue.Push(reportTask{
603+
messageID: ev.Id,
604+
})
605+
if err != nil {
606+
cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err)
607+
return
608+
}
609+
}
610+
case err := <-errCh:
611+
if !errors.Is(err, context.Canceled) {
612+
cliui.Warnf(inv.Stderr, "Received error from screen event watcher: %s", err)
613+
}
614+
return
615+
}
616+
}
617+
}()
618+
}
448619

620+
func (s *mcpServer) startServer(ctx context.Context, inv *serpent.Invocation, instructions string, allowedTools []string) error {
449621
cliui.Infof(inv.Stderr, "Starting MCP server")
450622

451623
cliui.Infof(inv.Stderr, "Instructions : %q", instructions)
@@ -476,8 +648,16 @@ func (s *mcpServer) start(inv *serpent.Invocation, instructions string, allowedT
476648

477649
// Add tool dependencies.
478650
toolOpts := []func(*toolsdk.Deps){
479-
toolsdk.WithAgentClient(s.agentClient),
480-
toolsdk.WithAppStatusSlug(s.appStatusSlug),
651+
toolsdk.WithTaskReporter(func(args toolsdk.ReportTaskArgs) error {
652+
// TODO: Is it OK to just push and return or should we wait for it to
653+
// actually get disatched to return any request errors?
654+
return s.queue.Push(reportTask{
655+
link: args.Link,
656+
selfReported: true,
657+
state: codersdk.WorkspaceAppStatusState(args.State),
658+
summary: args.Summary,
659+
})
660+
}),
481661
}
482662

483663
toolDeps, err := toolsdk.NewDeps(s.client, toolOpts...)

0 commit comments

Comments
 (0)