Skip to content

Commit 736705f

Browse files
committed
Add tests for streaming logs
1 parent 1bb700f commit 736705f

File tree

13 files changed

+147
-120
lines changed

13 files changed

+147
-120
lines changed

agent/agent.go

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type Client interface {
8888
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
8989
PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error
9090
PostStartup(ctx context.Context, req agentsdk.PostStartupRequest) error
91-
AppendStartupLogs(ctx context.Context, req []agentsdk.StartupLog) error
91+
PatchStartupLogs(ctx context.Context, req agentsdk.PatchStartupLogs) error
9292
}
9393

9494
func New(options Options) io.Closer {
@@ -202,19 +202,6 @@ func (a *agent) runLoop(ctx context.Context) {
202202
}
203203
}
204204

205-
func (a *agent) appendStartupLogsLoop(ctx context.Context, logs []agentsdk.StartupLog) {
206-
for r := retry.New(time.Second, 15*time.Second); r.Wait(ctx); {
207-
err := a.client.AppendStartupLogs(ctx, logs)
208-
if err == nil {
209-
return
210-
}
211-
if errors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
212-
return
213-
}
214-
a.logger.Error(ctx, "failed to append startup logs", slog.Error(err))
215-
}
216-
}
217-
218205
// reportLifecycleLoop reports the current lifecycle state once.
219206
// Only the latest state is reported, intermediate states may be
220207
// lost if the agent can't communicate with the API.
@@ -680,25 +667,20 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) error {
680667
return
681668
}
682669
logsSending = true
683-
toSend := make([]agentsdk.StartupLog, len(queuedLogs))
684-
copy(toSend, queuedLogs)
670+
logsToSend := queuedLogs
671+
queuedLogs = make([]agentsdk.StartupLog, 0)
685672
logMutex.Unlock()
686673
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
687-
err := a.client.AppendStartupLogs(ctx, toSend)
674+
err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
675+
Logs: logsToSend,
676+
})
688677
if err == nil {
689678
break
690679
}
691-
a.logger.Error(ctx, "upload startup logs", slog.Error(err))
692-
}
693-
if ctx.Err() != nil {
694-
logMutex.Lock()
695-
logsSending = false
696-
logMutex.Unlock()
697-
return
680+
a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
698681
}
699682
logMutex.Lock()
700683
logsSending = false
701-
queuedLogs = queuedLogs[len(toSend):]
702684
logMutex.Unlock()
703685
}
704686
queueLog := func(log agentsdk.StartupLog) {

agent/agent_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1576,10 +1576,10 @@ func (c *client) getStartupLogs() []agentsdk.StartupLog {
15761576
return c.logs
15771577
}
15781578

1579-
func (c *client) AppendStartupLogs(_ context.Context, logs []agentsdk.StartupLog) error {
1579+
func (c *client) PatchStartupLogs(_ context.Context, logs agentsdk.PatchStartupLogs) error {
15801580
c.mu.Lock()
15811581
defer c.mu.Unlock()
1582-
c.logs = append(c.logs, logs...)
1582+
c.logs = append(c.logs, logs.Logs...)
15831583
return nil
15841584
}
15851585

coderd/provisionerjobs_test.go

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -89,36 +89,4 @@ func TestProvisionerJobLogs(t *testing.T) {
8989
}
9090
}
9191
})
92-
93-
t.Run("List", func(t *testing.T) {
94-
t.Parallel()
95-
client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
96-
user := coderdtest.CreateFirstUser(t, client)
97-
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
98-
Parse: echo.ParseComplete,
99-
ProvisionApply: []*proto.Provision_Response{{
100-
Type: &proto.Provision_Response_Log{
101-
Log: &proto.Log{
102-
Level: proto.LogLevel_INFO,
103-
Output: "log-output",
104-
},
105-
},
106-
}, {
107-
Type: &proto.Provision_Response_Complete{
108-
Complete: &proto.Provision_Complete{},
109-
},
110-
}},
111-
})
112-
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
113-
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
114-
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
115-
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
116-
117-
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
118-
defer cancel()
119-
120-
logs, err := client.WorkspaceBuildLogsBefore(ctx, workspace.LatestBuild.ID, 0)
121-
require.NoError(t, err)
122-
require.Greater(t, len(logs), 1)
123-
})
12492
}

coderd/workspaceagents.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,23 +226,23 @@ func (api *API) postWorkspaceAgentStartup(rw http.ResponseWriter, r *http.Reques
226226
// @Accept json
227227
// @Produce json
228228
// @Tags Agents
229-
// @Param request body []agentsdk.StartupLog true "Startup logs"
229+
// @Param request body agentsdk.PatchStartupLogs true "Startup logs"
230230
// @Success 200
231231
// @Router /workspaceagents/me/startup-logs [patch]
232232
// @x-apidocgen {"skip": true}
233233
func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Request) {
234234
ctx := r.Context()
235235
workspaceAgent := httpmw.WorkspaceAgent(r)
236236

237-
var req []agentsdk.StartupLog
237+
var req agentsdk.PatchStartupLogs
238238
if !httpapi.Read(ctx, rw, r, &req) {
239239
return
240240
}
241241

242242
createdAt := make([]time.Time, 0)
243243
output := make([]string, 0)
244244
outputLength := 0
245-
for _, log := range req {
245+
for _, log := range req.Logs {
246246
createdAt = append(createdAt, log.CreatedAt)
247247
output = append(output, log.Output)
248248
outputLength += len(log.Output)
@@ -311,7 +311,7 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
311311
// This mostly copies how provisioner job logs are streamed!
312312
var (
313313
ctx = r.Context()
314-
agent = httpmw.WorkspaceAgent(r)
314+
agent = httpmw.WorkspaceAgentParam(r)
315315
logger = api.Logger.With(slog.F("workspace_agent_id", agent.ID))
316316
follow = r.URL.Query().Has("follow")
317317
afterRaw = r.URL.Query().Get("after")

coderd/workspaceagents_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,65 @@ func TestWorkspaceAgent(t *testing.T) {
175175
})
176176
}
177177

178+
func TestWorkspaceAgentStartup(t *testing.T) {
179+
t.Parallel()
180+
ctx, cancelFunc := testutil.Context(t)
181+
defer cancelFunc()
182+
client := coderdtest.New(t, &coderdtest.Options{
183+
IncludeProvisionerDaemon: true,
184+
})
185+
user := coderdtest.CreateFirstUser(t, client)
186+
authToken := uuid.NewString()
187+
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
188+
Parse: echo.ParseComplete,
189+
ProvisionPlan: echo.ProvisionComplete,
190+
ProvisionApply: []*proto.Provision_Response{{
191+
Type: &proto.Provision_Response_Complete{
192+
Complete: &proto.Provision_Complete{
193+
Resources: []*proto.Resource{{
194+
Name: "example",
195+
Type: "aws_instance",
196+
Agents: []*proto.Agent{{
197+
Id: uuid.NewString(),
198+
Auth: &proto.Agent_Token{
199+
Token: authToken,
200+
},
201+
}},
202+
}},
203+
},
204+
},
205+
}},
206+
})
207+
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
208+
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
209+
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
210+
build := coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
211+
212+
agentClient := agentsdk.New(client.URL)
213+
agentClient.SetSessionToken(authToken)
214+
err := agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
215+
Logs: []agentsdk.StartupLog{{
216+
CreatedAt: database.Now(),
217+
Output: "testing",
218+
}},
219+
})
220+
require.NoError(t, err)
221+
222+
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
223+
require.NoError(t, err)
224+
defer func() {
225+
_ = closer.Close()
226+
}()
227+
var log codersdk.WorkspaceAgentStartupLog
228+
select {
229+
case <-ctx.Done():
230+
case log = <-logs:
231+
}
232+
require.NoError(t, ctx.Err())
233+
require.Equal(t, "testing", log.Output)
234+
cancelFunc()
235+
}
236+
178237
func TestWorkspaceAgentListen(t *testing.T) {
179238
t.Parallel()
180239

coderd/wsconncache/wsconncache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,6 @@ func (*client) PostStartup(_ context.Context, _ agentsdk.PostStartupRequest) err
250250
return nil
251251
}
252252

253-
func (*client) AppendStartupLogs(_ context.Context, _ []agentsdk.StartupLog) error {
253+
func (*client) PatchStartupLogs(_ context.Context, _ agentsdk.PatchStartupLogs) error {
254254
return nil
255255
}

codersdk/agentsdk/agentsdk.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,9 +521,13 @@ type StartupLog struct {
521521
Output string `json:"output"`
522522
}
523523

524-
// AppendStartupLogs writes log messages to the agent startup script.
524+
type PatchStartupLogs struct {
525+
Logs []StartupLog `json:"logs"`
526+
}
527+
528+
// PatchStartupLogs writes log messages to the agent startup script.
525529
// Log messages are limited to 1MB in total.
526-
func (c *Client) AppendStartupLogs(ctx context.Context, req []StartupLog) error {
530+
func (c *Client) PatchStartupLogs(ctx context.Context, req PatchStartupLogs) error {
527531
res, err := c.SDK.Request(ctx, http.MethodPatch, "/api/v2/workspaceagents/me/startup-logs", req)
528532
if err != nil {
529533
return err

codersdk/provisionerdaemons.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"net"
1010
"net/http"
1111
"net/http/cookiejar"
12-
"net/url"
13-
"strconv"
1412
"time"
1513

1614
"github.com/google/uuid"
@@ -100,27 +98,6 @@ type ProvisionerJobLog struct {
10098
Output string `json:"output"`
10199
}
102100

103-
// provisionerJobLogsBefore provides log output that occurred before a time.
104-
// This is abstracted from a specific job type to provide consistency between
105-
// APIs. Logs is the only shared route between jobs.
106-
func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, before int64) ([]ProvisionerJobLog, error) {
107-
values := url.Values{}
108-
if before != 0 {
109-
values["before"] = []string{strconv.FormatInt(before, 10)}
110-
}
111-
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("%s?%s", path, values.Encode()), nil)
112-
if err != nil {
113-
return nil, err
114-
}
115-
if res.StatusCode != http.StatusOK {
116-
defer res.Body.Close()
117-
return nil, ReadBodyAsError(res)
118-
}
119-
120-
var logs []ProvisionerJobLog
121-
return logs, json.NewDecoder(res.Body).Decode(&logs)
122-
}
123-
124101
// provisionerJobLogsAfter streams logs that occurred after a specific time.
125102
func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after int64) (<-chan ProvisionerJobLog, io.Closer, error) {
126103
afterQuery := ""

codersdk/templateversions.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,6 @@ func (c *Client) TemplateVersionVariables(ctx context.Context, version uuid.UUID
186186
return variables, json.NewDecoder(res.Body).Decode(&variables)
187187
}
188188

189-
// TemplateVersionLogsBefore returns logs that occurred before a specific log ID.
190-
func (c *Client) TemplateVersionLogsBefore(ctx context.Context, version uuid.UUID, before int64) ([]ProvisionerJobLog, error) {
191-
return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/templateversions/%s/logs", version), before)
192-
}
193-
194189
// TemplateVersionLogsAfter streams logs for a template version that occurred after a specific log ID.
195190
func (c *Client) TemplateVersionLogsAfter(ctx context.Context, version uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) {
196191
return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/templateversions/%s/logs", version), after)
@@ -253,12 +248,6 @@ func (c *Client) TemplateVersionDryRunResources(ctx context.Context, version, jo
253248
return resources, json.NewDecoder(res.Body).Decode(&resources)
254249
}
255250

256-
// TemplateVersionDryRunLogsBefore returns logs for a template version dry-run
257-
// that occurred before a specific log ID.
258-
func (c *Client) TemplateVersionDryRunLogsBefore(ctx context.Context, version, job uuid.UUID, before int64) ([]ProvisionerJobLog, error) {
259-
return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/templateversions/%s/dry-run/%s/logs", version, job), before)
260-
}
261-
262251
// TemplateVersionDryRunLogsAfter streams logs for a template version dry-run
263252
// that occurred after a specific log ID.
264253
func (c *Client) TemplateVersionDryRunLogsAfter(ctx context.Context, version, job uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) {

codersdk/workspaceagents.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"io"
89
"net"
910
"net/http"
1011
"net/http/cookiejar"
@@ -314,6 +315,65 @@ func (c *Client) WorkspaceAgentListeningPorts(ctx context.Context, agentID uuid.
314315
return listeningPorts, json.NewDecoder(res.Body).Decode(&listeningPorts)
315316
}
316317

318+
func (c *Client) WorkspaceAgentStartupLogsAfter(ctx context.Context, agentID uuid.UUID, after int64) (<-chan WorkspaceAgentStartupLog, io.Closer, error) {
319+
afterQuery := ""
320+
if after != 0 {
321+
afterQuery = fmt.Sprintf("&after=%d", after)
322+
}
323+
followURL, err := c.URL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/startup-logs?follow%s", agentID, afterQuery))
324+
if err != nil {
325+
return nil, nil, err
326+
}
327+
jar, err := cookiejar.New(nil)
328+
if err != nil {
329+
return nil, nil, xerrors.Errorf("create cookie jar: %w", err)
330+
}
331+
jar.SetCookies(followURL, []*http.Cookie{{
332+
Name: SessionTokenCookie,
333+
Value: c.SessionToken(),
334+
}})
335+
httpClient := &http.Client{
336+
Jar: jar,
337+
Transport: c.HTTPClient.Transport,
338+
}
339+
conn, res, err := websocket.Dial(ctx, followURL.String(), &websocket.DialOptions{
340+
HTTPClient: httpClient,
341+
CompressionMode: websocket.CompressionDisabled,
342+
})
343+
if err != nil {
344+
if res == nil {
345+
return nil, nil, err
346+
}
347+
return nil, nil, ReadBodyAsError(res)
348+
}
349+
logs := make(chan WorkspaceAgentStartupLog)
350+
closed := make(chan struct{})
351+
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText)
352+
decoder := json.NewDecoder(wsNetConn)
353+
go func() {
354+
defer close(closed)
355+
defer close(logs)
356+
defer conn.Close(websocket.StatusGoingAway, "")
357+
var log WorkspaceAgentStartupLog
358+
for {
359+
err = decoder.Decode(&log)
360+
if err != nil {
361+
return
362+
}
363+
select {
364+
case <-ctx.Done():
365+
return
366+
case logs <- log:
367+
}
368+
}
369+
}()
370+
return logs, closeFunc(func() error {
371+
_ = wsNetConn.Close()
372+
<-closed
373+
return nil
374+
}), nil
375+
}
376+
317377
// GitProvider is a constant that represents the
318378
// type of providers that are supported within Coder.
319379
type GitProvider string

0 commit comments

Comments
 (0)