Skip to content

feat: add status watcher to MCP server #18320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions cli/cliutil/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package cliutil

import (
"sync"

"golang.org/x/xerrors"

"github.com/coder/coder/v2/codersdk"
)

// Queue is a FIFO queue with a fixed size. If the size is exceeded, the first
// item is dropped.
type Queue[T any] struct {
cond *sync.Cond
items []T
mu sync.Mutex
size int
closed bool
pred func(x T) (T, bool)
}

// NewQueue creates a queue with the given size.
func NewQueue[T any](size int) *Queue[T] {
q := &Queue[T]{
items: make([]T, 0, size),
size: size,
}
q.cond = sync.NewCond(&q.mu)
return q
}

// WithPredicate adds the given predicate function, which can control what is
// pushed to the queue.
func (q *Queue[T]) WithPredicate(pred func(x T) (T, bool)) *Queue[T] {
q.pred = pred
return q
}

// Close aborts any pending pops and makes future pushes error.
func (q *Queue[T]) Close() {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
q.cond.Broadcast()
}

// Push adds an item to the queue. If closed, returns an error.
func (q *Queue[T]) Push(x T) error {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return xerrors.New("queue has been closed")
}
// Potentially mutate or skip the push using the predicate.
if q.pred != nil {
var ok bool
x, ok = q.pred(x)
if !ok {
return nil
}
}
// Remove the first item from the queue if it has gotten too big.
if len(q.items) >= q.size {
q.items = q.items[1:]
}
q.items = append(q.items, x)
q.cond.Broadcast()
return nil
}

// Pop removes and returns the first item from the queue, waiting until there is
// something to pop if necessary. If closed, returns false.
func (q *Queue[T]) Pop() (T, bool) {
var head T
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 && !q.closed {
q.cond.Wait()
}
if q.closed {
return head, false
}
head, q.items = q.items[0], q.items[1:]
return head, true
}

func (q *Queue[T]) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.items)
}

type reportTask struct {
link string
messageID int64
selfReported bool
state codersdk.WorkspaceAppStatusState
summary string
}

// statusQueue is a Queue that:
// 1. Only pushes items that are not duplicates.
// 2. Preserves the existing message and URI when one a message is not provided.
// 3. Ignores "working" updates from the status watcher.
type StatusQueue struct {
Queue[reportTask]
// lastMessageID is the ID of the last *user* message that we saw. A user
// message only happens when interacting via the API (as opposed to
// interacting with the terminal directly).
lastMessageID int64
}

func (q *StatusQueue) Push(report reportTask) error {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return xerrors.New("queue has been closed")
}
var lastReport reportTask
if len(q.items) > 0 {
lastReport = q.items[len(q.items)-1]
}
// Use "working" status if this is a new user message. If this is not a new
// user message, and the status is "working" and not self-reported (meaning it
// came from the screen watcher), then it means one of two things:
// 1. The LLM is still working, in which case our last status will already
// have been "working", so there is nothing to do.
// 2. The user has interacted with the terminal directly. For now, we are
// ignoring these updates. This risks missing cases where the user
// manually submits a new prompt and the LLM becomes active and does not
// update itself, but it avoids spamming useless status updates as the user
// is typing, so the tradeoff is worth it. In the future, if we can
// reliably distinguish between user and LLM activity, we can change this.
if report.messageID > q.lastMessageID {
report.state = codersdk.WorkspaceAppStatusStateWorking
} else if report.state == codersdk.WorkspaceAppStatusStateWorking && !report.selfReported {
q.mu.Unlock()
return nil
}
// Preserve previous message and URI if there was no message.
if report.summary == "" {
report.summary = lastReport.summary
if report.link == "" {
report.link = lastReport.link
}
}
// Avoid queueing duplicate updates.
if report.state == lastReport.state &&
report.link == lastReport.link &&
report.summary == lastReport.summary {
return nil
}
// Drop the first item if the queue has gotten too big.
if len(q.items) >= q.size {
q.items = q.items[1:]
}
q.items = append(q.items, report)
q.cond.Broadcast()
return nil
}
110 changes: 110 additions & 0 deletions cli/cliutil/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package cliutil_test

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/coder/coder/v2/cli/cliutil"
)

func TestQueue(t *testing.T) {
t.Parallel()

t.Run("DropsFirst", func(t *testing.T) {
t.Parallel()

q := cliutil.NewQueue[int](10)
require.Equal(t, 0, q.Len())

for i := 0; i < 20; i++ {
err := q.Push(i)
require.NoError(t, err)
if i < 10 {
require.Equal(t, i+1, q.Len())
} else {
require.Equal(t, 10, q.Len())
}
}

val, ok := q.Pop()
require.True(t, ok)
require.Equal(t, 10, val)
require.Equal(t, 9, q.Len())
})

t.Run("Pop", func(t *testing.T) {
t.Parallel()

q := cliutil.NewQueue[int](10)
for i := 0; i < 5; i++ {
err := q.Push(i)
require.NoError(t, err)
}

// No blocking, should pop immediately.
for i := 0; i < 5; i++ {
val, ok := q.Pop()
require.True(t, ok)
require.Equal(t, i, val)
}

// Pop should block until the next push.
go func() {
err := q.Push(55)
assert.NoError(t, err)
}()

item, ok := q.Pop()
require.True(t, ok)
require.Equal(t, 55, item)
})

t.Run("Close", func(t *testing.T) {
t.Parallel()

q := cliutil.NewQueue[int](10)

done := make(chan bool)
go func() {
_, ok := q.Pop()
done <- ok
}()

q.Close()

require.False(t, <-done)

_, ok := q.Pop()
require.False(t, ok)

err := q.Push(10)
require.Error(t, err)
})

t.Run("WithPredicate", func(t *testing.T) {
t.Parallel()

q := cliutil.NewQueue[int](10)
q.WithPredicate(func(n int) (int, bool) {
if n == 2 {
return n, false
}
return n + 1, true
})

for i := 0; i < 5; i++ {
err := q.Push(i)
require.NoError(t, err)
}

got := []int{}
for i := 0; i < 4; i++ {
val, ok := q.Pop()
require.True(t, ok)
got = append(got, val)
}
require.Equal(t, []int{1, 2, 4, 5}, got)
})
}
Loading
Loading