Skip to content

Commit 4bd5609

Browse files
authored
feat: add status watcher to MCP server (#18320)
This is meant to complement the existing task reporter since the LLM does not call it reliably. It also includes refactoring to use the common agent flags/env vars.
1 parent 5bcde58 commit 4bd5609

File tree

12 files changed

+929
-184
lines changed

12 files changed

+929
-184
lines changed

cli/cliutil/queue.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package cliutil
2+
3+
import (
4+
"sync"
5+
6+
"golang.org/x/xerrors"
7+
8+
"github.com/coder/coder/v2/codersdk"
9+
)
10+
11+
// Queue is a FIFO queue with a fixed size. If the size is exceeded, the first
12+
// item is dropped.
13+
type Queue[T any] struct {
14+
cond *sync.Cond
15+
items []T
16+
mu sync.Mutex
17+
size int
18+
closed bool
19+
pred func(x T) (T, bool)
20+
}
21+
22+
// NewQueue creates a queue with the given size.
23+
func NewQueue[T any](size int) *Queue[T] {
24+
q := &Queue[T]{
25+
items: make([]T, 0, size),
26+
size: size,
27+
}
28+
q.cond = sync.NewCond(&q.mu)
29+
return q
30+
}
31+
32+
// WithPredicate adds the given predicate function, which can control what is
33+
// pushed to the queue.
34+
func (q *Queue[T]) WithPredicate(pred func(x T) (T, bool)) *Queue[T] {
35+
q.pred = pred
36+
return q
37+
}
38+
39+
// Close aborts any pending pops and makes future pushes error.
40+
func (q *Queue[T]) Close() {
41+
q.mu.Lock()
42+
defer q.mu.Unlock()
43+
q.closed = true
44+
q.cond.Broadcast()
45+
}
46+
47+
// Push adds an item to the queue. If closed, returns an error.
48+
func (q *Queue[T]) Push(x T) error {
49+
q.mu.Lock()
50+
defer q.mu.Unlock()
51+
if q.closed {
52+
return xerrors.New("queue has been closed")
53+
}
54+
// Potentially mutate or skip the push using the predicate.
55+
if q.pred != nil {
56+
var ok bool
57+
x, ok = q.pred(x)
58+
if !ok {
59+
return nil
60+
}
61+
}
62+
// Remove the first item from the queue if it has gotten too big.
63+
if len(q.items) >= q.size {
64+
q.items = q.items[1:]
65+
}
66+
q.items = append(q.items, x)
67+
q.cond.Broadcast()
68+
return nil
69+
}
70+
71+
// Pop removes and returns the first item from the queue, waiting until there is
72+
// something to pop if necessary. If closed, returns false.
73+
func (q *Queue[T]) Pop() (T, bool) {
74+
var head T
75+
q.mu.Lock()
76+
defer q.mu.Unlock()
77+
for len(q.items) == 0 && !q.closed {
78+
q.cond.Wait()
79+
}
80+
if q.closed {
81+
return head, false
82+
}
83+
head, q.items = q.items[0], q.items[1:]
84+
return head, true
85+
}
86+
87+
func (q *Queue[T]) Len() int {
88+
q.mu.Lock()
89+
defer q.mu.Unlock()
90+
return len(q.items)
91+
}
92+
93+
type reportTask struct {
94+
link string
95+
messageID int64
96+
selfReported bool
97+
state codersdk.WorkspaceAppStatusState
98+
summary string
99+
}
100+
101+
// statusQueue is a Queue that:
102+
// 1. Only pushes items that are not duplicates.
103+
// 2. Preserves the existing message and URI when one a message is not provided.
104+
// 3. Ignores "working" updates from the status watcher.
105+
type StatusQueue struct {
106+
Queue[reportTask]
107+
// lastMessageID is the ID of the last *user* message that we saw. A user
108+
// message only happens when interacting via the API (as opposed to
109+
// interacting with the terminal directly).
110+
lastMessageID int64
111+
}
112+
113+
func (q *StatusQueue) Push(report reportTask) error {
114+
q.mu.Lock()
115+
defer q.mu.Unlock()
116+
if q.closed {
117+
return xerrors.New("queue has been closed")
118+
}
119+
var lastReport reportTask
120+
if len(q.items) > 0 {
121+
lastReport = q.items[len(q.items)-1]
122+
}
123+
// Use "working" status if this is a new user message. If this is not a new
124+
// user message, and the status is "working" and not self-reported (meaning it
125+
// came from the screen watcher), then it means one of two things:
126+
// 1. The LLM is still working, in which case our last status will already
127+
// have been "working", so there is nothing to do.
128+
// 2. The user has interacted with the terminal directly. For now, we are
129+
// ignoring these updates. This risks missing cases where the user
130+
// manually submits a new prompt and the LLM becomes active and does not
131+
// update itself, but it avoids spamming useless status updates as the user
132+
// is typing, so the tradeoff is worth it. In the future, if we can
133+
// reliably distinguish between user and LLM activity, we can change this.
134+
if report.messageID > q.lastMessageID {
135+
report.state = codersdk.WorkspaceAppStatusStateWorking
136+
} else if report.state == codersdk.WorkspaceAppStatusStateWorking && !report.selfReported {
137+
q.mu.Unlock()
138+
return nil
139+
}
140+
// Preserve previous message and URI if there was no message.
141+
if report.summary == "" {
142+
report.summary = lastReport.summary
143+
if report.link == "" {
144+
report.link = lastReport.link
145+
}
146+
}
147+
// Avoid queueing duplicate updates.
148+
if report.state == lastReport.state &&
149+
report.link == lastReport.link &&
150+
report.summary == lastReport.summary {
151+
return nil
152+
}
153+
// Drop the first item if the queue has gotten too big.
154+
if len(q.items) >= q.size {
155+
q.items = q.items[1:]
156+
}
157+
q.items = append(q.items, report)
158+
q.cond.Broadcast()
159+
return nil
160+
}

cli/cliutil/queue_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package cliutil_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/coder/coder/v2/cli/cliutil"
10+
)
11+
12+
func TestQueue(t *testing.T) {
13+
t.Parallel()
14+
15+
t.Run("DropsFirst", func(t *testing.T) {
16+
t.Parallel()
17+
18+
q := cliutil.NewQueue[int](10)
19+
require.Equal(t, 0, q.Len())
20+
21+
for i := 0; i < 20; i++ {
22+
err := q.Push(i)
23+
require.NoError(t, err)
24+
if i < 10 {
25+
require.Equal(t, i+1, q.Len())
26+
} else {
27+
require.Equal(t, 10, q.Len())
28+
}
29+
}
30+
31+
val, ok := q.Pop()
32+
require.True(t, ok)
33+
require.Equal(t, 10, val)
34+
require.Equal(t, 9, q.Len())
35+
})
36+
37+
t.Run("Pop", func(t *testing.T) {
38+
t.Parallel()
39+
40+
q := cliutil.NewQueue[int](10)
41+
for i := 0; i < 5; i++ {
42+
err := q.Push(i)
43+
require.NoError(t, err)
44+
}
45+
46+
// No blocking, should pop immediately.
47+
for i := 0; i < 5; i++ {
48+
val, ok := q.Pop()
49+
require.True(t, ok)
50+
require.Equal(t, i, val)
51+
}
52+
53+
// Pop should block until the next push.
54+
go func() {
55+
err := q.Push(55)
56+
assert.NoError(t, err)
57+
}()
58+
59+
item, ok := q.Pop()
60+
require.True(t, ok)
61+
require.Equal(t, 55, item)
62+
})
63+
64+
t.Run("Close", func(t *testing.T) {
65+
t.Parallel()
66+
67+
q := cliutil.NewQueue[int](10)
68+
69+
done := make(chan bool)
70+
go func() {
71+
_, ok := q.Pop()
72+
done <- ok
73+
}()
74+
75+
q.Close()
76+
77+
require.False(t, <-done)
78+
79+
_, ok := q.Pop()
80+
require.False(t, ok)
81+
82+
err := q.Push(10)
83+
require.Error(t, err)
84+
})
85+
86+
t.Run("WithPredicate", func(t *testing.T) {
87+
t.Parallel()
88+
89+
q := cliutil.NewQueue[int](10)
90+
q.WithPredicate(func(n int) (int, bool) {
91+
if n == 2 {
92+
return n, false
93+
}
94+
return n + 1, true
95+
})
96+
97+
for i := 0; i < 5; i++ {
98+
err := q.Push(i)
99+
require.NoError(t, err)
100+
}
101+
102+
got := []int{}
103+
for i := 0; i < 4; i++ {
104+
val, ok := q.Pop()
105+
require.True(t, ok)
106+
got = append(got, val)
107+
}
108+
require.Equal(t, []int{1, 2, 4, 5}, got)
109+
})
110+
}

0 commit comments

Comments
 (0)