-
Notifications
You must be signed in to change notification settings - Fork 914
feat: add status watcher to MCP server #18320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+929
−184
Merged
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
5cee4c4
Add queue util
code-asher 1b3b734
Make createAgentClient use token file and errors
code-asher 722475c
Use common flags for agent client in MCP server
code-asher 56c41c8
Add status watcher to MCP server
code-asher 2cd3b45
Preserve URI only if message was blank
code-asher 9ec6ded
Test summary and link
code-asher 3d56d18
Fix lying comment
code-asher c8dc0dd
Only report user messages
code-asher bf78f1a
Increase queue to 100
code-asher 32f6eb9
Push and return seems fine
code-asher f4e06c6
Configure LLM agent URL
code-asher 8dcada5
Add test for duplicate complete
code-asher 6d40d40
Check against last successfully submitted status
code-asher 866b721
Move update predicates to push phase
code-asher a79ee72
Rename LLM to AI
code-asher File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
package cliutil | ||
|
||
import ( | ||
"sync" | ||
|
||
"golang.org/x/xerrors" | ||
|
||
"github.com/coder/coder/v2/codersdk" | ||
) | ||
|
||
// Queue is a FIFO queue with a fixed size. If the size is exceeded, the first | ||
// item is dropped. | ||
type Queue[T any] struct { | ||
cond *sync.Cond | ||
items []T | ||
mu sync.Mutex | ||
size int | ||
closed bool | ||
pred func(x T) (T, bool) | ||
} | ||
|
||
// NewQueue creates a queue with the given size. | ||
func NewQueue[T any](size int) *Queue[T] { | ||
q := &Queue[T]{ | ||
items: make([]T, 0, size), | ||
size: size, | ||
} | ||
q.cond = sync.NewCond(&q.mu) | ||
return q | ||
} | ||
|
||
// WithPredicate adds the given predicate function, which can control what is | ||
// pushed to the queue. | ||
func (q *Queue[T]) WithPredicate(pred func(x T) (T, bool)) *Queue[T] { | ||
q.pred = pred | ||
return q | ||
} | ||
|
||
// Close aborts any pending pops and makes future pushes error. | ||
func (q *Queue[T]) Close() { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
q.closed = true | ||
q.cond.Broadcast() | ||
} | ||
|
||
// Push adds an item to the queue. If closed, returns an error. | ||
func (q *Queue[T]) Push(x T) error { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
if q.closed { | ||
return xerrors.New("queue has been closed") | ||
} | ||
// Potentially mutate or skip the push using the predicate. | ||
if q.pred != nil { | ||
var ok bool | ||
x, ok = q.pred(x) | ||
if !ok { | ||
return nil | ||
} | ||
} | ||
// Remove the first item from the queue if it has gotten too big. | ||
if len(q.items) >= q.size { | ||
q.items = q.items[1:] | ||
} | ||
q.items = append(q.items, x) | ||
q.cond.Broadcast() | ||
return nil | ||
} | ||
|
||
// Pop removes and returns the first item from the queue, waiting until there is | ||
// something to pop if necessary. If closed, returns false. | ||
func (q *Queue[T]) Pop() (T, bool) { | ||
var head T | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
for len(q.items) == 0 && !q.closed { | ||
q.cond.Wait() | ||
hugodutka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
if q.closed { | ||
return head, false | ||
} | ||
head, q.items = q.items[0], q.items[1:] | ||
return head, true | ||
} | ||
|
||
func (q *Queue[T]) Len() int { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
return len(q.items) | ||
} | ||
|
||
type reportTask struct { | ||
link string | ||
messageID int64 | ||
selfReported bool | ||
state codersdk.WorkspaceAppStatusState | ||
summary string | ||
} | ||
|
||
// statusQueue is a Queue that: | ||
// 1. Only pushes items that are not duplicates. | ||
// 2. Preserves the existing message and URI when one a message is not provided. | ||
// 3. Ignores "working" updates from the status watcher. | ||
type StatusQueue struct { | ||
Queue[reportTask] | ||
// lastMessageID is the ID of the last *user* message that we saw. A user | ||
// message only happens when interacting via the API (as opposed to | ||
// interacting with the terminal directly). | ||
lastMessageID int64 | ||
} | ||
|
||
func (q *StatusQueue) Push(report reportTask) error { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
if q.closed { | ||
return xerrors.New("queue has been closed") | ||
} | ||
var lastReport reportTask | ||
if len(q.items) > 0 { | ||
lastReport = q.items[len(q.items)-1] | ||
} | ||
// Use "working" status if this is a new user message. If this is not a new | ||
// user message, and the status is "working" and not self-reported (meaning it | ||
// came from the screen watcher), then it means one of two things: | ||
// 1. The LLM is still working, in which case our last status will already | ||
// have been "working", so there is nothing to do. | ||
// 2. The user has interacted with the terminal directly. For now, we are | ||
// ignoring these updates. This risks missing cases where the user | ||
// manually submits a new prompt and the LLM becomes active and does not | ||
// update itself, but it avoids spamming useless status updates as the user | ||
// is typing, so the tradeoff is worth it. In the future, if we can | ||
// reliably distinguish between user and LLM activity, we can change this. | ||
if report.messageID > q.lastMessageID { | ||
report.state = codersdk.WorkspaceAppStatusStateWorking | ||
} else if report.state == codersdk.WorkspaceAppStatusStateWorking && !report.selfReported { | ||
q.mu.Unlock() | ||
return nil | ||
} | ||
// Preserve previous message and URI if there was no message. | ||
if report.summary == "" { | ||
report.summary = lastReport.summary | ||
if report.link == "" { | ||
report.link = lastReport.link | ||
} | ||
} | ||
// Avoid queueing duplicate updates. | ||
if report.state == lastReport.state && | ||
report.link == lastReport.link && | ||
report.summary == lastReport.summary { | ||
return nil | ||
} | ||
// Drop the first item if the queue has gotten too big. | ||
if len(q.items) >= q.size { | ||
q.items = q.items[1:] | ||
} | ||
q.items = append(q.items, report) | ||
q.cond.Broadcast() | ||
return nil | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package cliutil_test | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/coder/coder/v2/cli/cliutil" | ||
) | ||
|
||
func TestQueue(t *testing.T) { | ||
t.Parallel() | ||
|
||
t.Run("DropsFirst", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
q := cliutil.NewQueue[int](10) | ||
require.Equal(t, 0, q.Len()) | ||
|
||
for i := 0; i < 20; i++ { | ||
err := q.Push(i) | ||
require.NoError(t, err) | ||
if i < 10 { | ||
require.Equal(t, i+1, q.Len()) | ||
} else { | ||
require.Equal(t, 10, q.Len()) | ||
} | ||
} | ||
|
||
val, ok := q.Pop() | ||
require.True(t, ok) | ||
require.Equal(t, 10, val) | ||
require.Equal(t, 9, q.Len()) | ||
}) | ||
|
||
t.Run("Pop", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
q := cliutil.NewQueue[int](10) | ||
for i := 0; i < 5; i++ { | ||
err := q.Push(i) | ||
require.NoError(t, err) | ||
} | ||
|
||
// No blocking, should pop immediately. | ||
for i := 0; i < 5; i++ { | ||
val, ok := q.Pop() | ||
require.True(t, ok) | ||
require.Equal(t, i, val) | ||
} | ||
|
||
// Pop should block until the next push. | ||
go func() { | ||
err := q.Push(55) | ||
assert.NoError(t, err) | ||
}() | ||
|
||
item, ok := q.Pop() | ||
require.True(t, ok) | ||
require.Equal(t, 55, item) | ||
}) | ||
|
||
t.Run("Close", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
q := cliutil.NewQueue[int](10) | ||
|
||
done := make(chan bool) | ||
go func() { | ||
_, ok := q.Pop() | ||
done <- ok | ||
}() | ||
|
||
q.Close() | ||
|
||
require.False(t, <-done) | ||
|
||
_, ok := q.Pop() | ||
require.False(t, ok) | ||
|
||
err := q.Push(10) | ||
require.Error(t, err) | ||
}) | ||
|
||
t.Run("WithPredicate", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
q := cliutil.NewQueue[int](10) | ||
q.WithPredicate(func(n int) (int, bool) { | ||
if n == 2 { | ||
return n, false | ||
} | ||
return n + 1, true | ||
}) | ||
|
||
for i := 0; i < 5; i++ { | ||
err := q.Push(i) | ||
require.NoError(t, err) | ||
} | ||
|
||
got := []int{} | ||
for i := 0; i < 4; i++ { | ||
val, ok := q.Pop() | ||
require.True(t, ok) | ||
got = append(got, val) | ||
} | ||
require.Equal(t, []int{1, 2, 4, 5}, got) | ||
}) | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.