Skip to content

Commit d3c39b6

Browse files
authored
feat: add agent log streaming and follow provisioner format (coder#8170)
1 parent c0a01ec commit d3c39b6

File tree

9 files changed

+708
-604
lines changed

9 files changed

+708
-604
lines changed

cli/cliui/agent.go

Lines changed: 174 additions & 209 deletions
Large diffs are not rendered by default.

cli/cliui/agent_test.go

Lines changed: 311 additions & 321 deletions
Large diffs are not rendered by default.

cli/cliui/provisionerjob.go

Lines changed: 82 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"os"
99
"os/signal"
10+
"strings"
1011
"sync"
1112
"time"
1213

@@ -69,21 +70,20 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
6970
jobMutex sync.Mutex
7071
)
7172

73+
sw := &stageWriter{w: writer, verbose: opts.Verbose, silentLogs: opts.Silent}
74+
7275
printStage := func() {
73-
_, _ = fmt.Fprintf(writer, "==> ⧗ %s\n", currentStage)
76+
sw.Start(currentStage)
7477
}
7578

7679
updateStage := func(stage string, startedAt time.Time) {
7780
if currentStage != "" {
78-
mark := "✔"
81+
duration := startedAt.Sub(currentStageStartedAt)
7982
if job.CompletedAt != nil && job.Status != codersdk.ProvisionerJobSucceeded {
80-
mark = "✘"
81-
}
82-
dur := startedAt.Sub(currentStageStartedAt).Milliseconds()
83-
if dur < 0 {
84-
dur = 0
83+
sw.Fail(currentStage, duration)
84+
} else {
85+
sw.Complete(currentStage, duration)
8586
}
86-
_, _ = fmt.Fprintf(writer, "=== %s %s [%dms]\n", mark, currentStage, dur)
8787
}
8888
if stage == "" {
8989
return
@@ -147,30 +147,15 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
147147
}
148148
defer closer.Close()
149149

150-
var (
151-
// logOutput is where log output is written
152-
logOutput = writer
153-
// logBuffer is where logs are buffered if opts.Silent is true
154-
logBuffer = &bytes.Buffer{}
155-
)
156-
if opts.Silent {
157-
logOutput = logBuffer
158-
}
159-
flushLogBuffer := func() {
160-
if opts.Silent {
161-
_, _ = io.Copy(writer, logBuffer)
162-
}
163-
}
164-
165150
ticker := time.NewTicker(opts.FetchInterval)
166151
defer ticker.Stop()
167152
for {
168153
select {
169154
case err = <-errChan:
170-
flushLogBuffer()
155+
sw.Fail(currentStage, time.Since(currentStageStartedAt))
171156
return err
172157
case <-ctx.Done():
173-
flushLogBuffer()
158+
sw.Fail(currentStage, time.Since(currentStageStartedAt))
174159
return ctx.Err()
175160
case <-ticker.C:
176161
updateJob()
@@ -194,34 +179,89 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
194179
Message: job.Error,
195180
Code: job.ErrorCode,
196181
}
182+
sw.Fail(currentStage, time.Since(currentStageStartedAt))
197183
jobMutex.Unlock()
198-
flushLogBuffer()
199184
return err
200185
}
201186

202-
output := ""
203-
switch log.Level {
204-
case codersdk.LogLevelTrace, codersdk.LogLevelDebug:
205-
if !opts.Verbose {
206-
continue
207-
}
208-
output = DefaultStyles.Placeholder.Render(log.Output)
209-
case codersdk.LogLevelError:
210-
output = DefaultStyles.Error.Render(log.Output)
211-
case codersdk.LogLevelWarn:
212-
output = DefaultStyles.Warn.Render(log.Output)
213-
case codersdk.LogLevelInfo:
214-
output = log.Output
215-
}
216-
217187
jobMutex.Lock()
218188
if log.Stage != currentStage && log.Stage != "" {
219189
updateStage(log.Stage, log.CreatedAt)
220190
jobMutex.Unlock()
221191
continue
222192
}
223-
_, _ = fmt.Fprintf(logOutput, "%s\n", output)
193+
sw.Log(log.CreatedAt, log.Level, log.Output)
224194
jobMutex.Unlock()
225195
}
226196
}
227197
}
198+
199+
type stageWriter struct {
200+
w io.Writer
201+
verbose bool
202+
silentLogs bool
203+
logBuf bytes.Buffer
204+
}
205+
206+
func (s *stageWriter) Start(stage string) {
207+
_, _ = fmt.Fprintf(s.w, "==> ⧗ %s\n", stage)
208+
}
209+
210+
func (s *stageWriter) Complete(stage string, duration time.Duration) {
211+
s.end(stage, duration, true)
212+
}
213+
214+
func (s *stageWriter) Fail(stage string, duration time.Duration) {
215+
s.flushLogs()
216+
s.end(stage, duration, false)
217+
}
218+
219+
//nolint:revive
220+
func (s *stageWriter) end(stage string, duration time.Duration, ok bool) {
221+
s.logBuf.Reset()
222+
223+
mark := "✔"
224+
if !ok {
225+
mark = "✘"
226+
}
227+
if duration < 0 {
228+
duration = 0
229+
}
230+
_, _ = fmt.Fprintf(s.w, "=== %s %s [%dms]\n", mark, stage, duration.Milliseconds())
231+
}
232+
233+
func (s *stageWriter) Log(createdAt time.Time, level codersdk.LogLevel, line string) {
234+
w := s.w
235+
if s.silentLogs {
236+
w = &s.logBuf
237+
}
238+
239+
render := func(s ...string) string { return strings.Join(s, " ") }
240+
241+
var lines []string
242+
if !createdAt.IsZero() {
243+
lines = append(lines, createdAt.Local().Format("2006-01-02 15:04:05.000Z07:00"))
244+
}
245+
lines = append(lines, line)
246+
247+
switch level {
248+
case codersdk.LogLevelTrace, codersdk.LogLevelDebug:
249+
if !s.verbose {
250+
return
251+
}
252+
render = DefaultStyles.Placeholder.Render
253+
case codersdk.LogLevelError:
254+
render = DefaultStyles.Error.Render
255+
case codersdk.LogLevelWarn:
256+
render = DefaultStyles.Warn.Render
257+
case codersdk.LogLevelInfo:
258+
}
259+
_, _ = fmt.Fprintf(w, "%s\n", render(lines...))
260+
}
261+
262+
func (s *stageWriter) flushLogs() {
263+
if s.silentLogs {
264+
_, _ = io.Copy(s.w, &s.logBuf)
265+
}
266+
s.logBuf.Reset()
267+
}

cli/portforward.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ func (r *RootCmd) portForward() *clibase.Cmd {
9191
}
9292

9393
err = cliui.Agent(ctx, inv.Stderr, cliui.AgentOptions{
94-
WorkspaceName: workspace.Name,
9594
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
9695
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
9796
},

cli/speedtest.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,18 @@ func (r *RootCmd) speedtest() *clibase.Cmd {
3535
ctx, cancel := context.WithCancel(inv.Context())
3636
defer cancel()
3737

38-
workspace, workspaceAgent, err := getWorkspaceAndAgent(ctx, inv, client, codersdk.Me, inv.Args[0])
38+
_, workspaceAgent, err := getWorkspaceAndAgent(ctx, inv, client, codersdk.Me, inv.Args[0])
3939
if err != nil {
4040
return err
4141
}
4242

4343
err = cliui.Agent(ctx, inv.Stderr, cliui.AgentOptions{
44-
WorkspaceName: workspace.Name,
4544
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
4645
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
4746
},
4847
Wait: false,
4948
})
50-
if err != nil && !xerrors.Is(err, cliui.AgentStartError) {
49+
if err != nil {
5150
return xerrors.Errorf("await agent: %w", err)
5251
}
5352

cli/ssh.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,23 +176,16 @@ func (r *RootCmd) ssh() *clibase.Cmd {
176176
// OpenSSH passes stderr directly to the calling TTY.
177177
// This is required in "stdio" mode so a connecting indicator can be displayed.
178178
err = cliui.Agent(ctx, inv.Stderr, cliui.AgentOptions{
179-
WorkspaceName: workspace.Name,
180179
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
181180
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
182181
},
183-
Wait: wait,
182+
FetchLogs: client.WorkspaceAgentStartupLogsAfter,
183+
Wait: wait,
184184
})
185185
if err != nil {
186186
if xerrors.Is(err, context.Canceled) {
187187
return cliui.Canceled
188188
}
189-
if !xerrors.Is(err, cliui.AgentStartError) {
190-
return xerrors.Errorf("await agent: %w", err)
191-
}
192-
193-
// We don't want to fail on a startup script error because it's
194-
// natural that the user will want to fix the script and try again.
195-
// We don't print the error because cliui.Agent does that for us.
196189
}
197190

198191
if r.disableDirect {

cmd/cliui/main.go

Lines changed: 85 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"math/rand"
89
"net/url"
910
"os"
1011
"strings"
1112
"sync/atomic"
1213
"time"
1314

15+
"github.com/google/uuid"
1416
"golang.org/x/xerrors"
1517

1618
"github.com/coder/coder/cli/clibase"
@@ -164,25 +166,91 @@ func main() {
164166
root.Children = append(root.Children, &clibase.Cmd{
165167
Use: "agent",
166168
Handler: func(inv *clibase.Invocation) error {
167-
agent := codersdk.WorkspaceAgent{
168-
Status: codersdk.WorkspaceAgentDisconnected,
169-
LifecycleState: codersdk.WorkspaceAgentLifecycleReady,
169+
var agent codersdk.WorkspaceAgent
170+
var logs []codersdk.WorkspaceAgentStartupLog
171+
172+
fetchSteps := []func(){
173+
func() {
174+
createdAt := time.Now().Add(-time.Minute)
175+
agent = codersdk.WorkspaceAgent{
176+
CreatedAt: createdAt,
177+
Status: codersdk.WorkspaceAgentConnecting,
178+
LifecycleState: codersdk.WorkspaceAgentLifecycleCreated,
179+
}
180+
},
181+
func() {
182+
time.Sleep(time.Second)
183+
agent.Status = codersdk.WorkspaceAgentTimeout
184+
},
185+
func() {
186+
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleStarting
187+
startingAt := time.Now()
188+
agent.StartedAt = &startingAt
189+
for i := 0; i < 10; i++ {
190+
level := codersdk.LogLevelInfo
191+
if rand.Float64() > 0.75 { //nolint:gosec
192+
level = codersdk.LogLevelError
193+
}
194+
logs = append(logs, codersdk.WorkspaceAgentStartupLog{
195+
CreatedAt: time.Now().Add(-time.Duration(10-i) * 144 * time.Millisecond),
196+
Output: fmt.Sprintf("Some log %d", i),
197+
Level: level,
198+
})
199+
}
200+
},
201+
func() {
202+
time.Sleep(time.Second)
203+
firstConnectedAt := time.Now()
204+
agent.FirstConnectedAt = &firstConnectedAt
205+
lastConnectedAt := firstConnectedAt.Add(0)
206+
agent.LastConnectedAt = &lastConnectedAt
207+
agent.Status = codersdk.WorkspaceAgentConnected
208+
},
209+
func() {},
210+
func() {
211+
time.Sleep(5 * time.Second)
212+
agent.Status = codersdk.WorkspaceAgentConnected
213+
lastConnectedAt := time.Now()
214+
agent.LastConnectedAt = &lastConnectedAt
215+
},
170216
}
171-
go func() {
172-
time.Sleep(3 * time.Second)
173-
agent.Status = codersdk.WorkspaceAgentConnected
174-
}()
175217
err := cliui.Agent(inv.Context(), inv.Stdout, cliui.AgentOptions{
176-
WorkspaceName: "dev",
177-
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
218+
FetchInterval: 100 * time.Millisecond,
219+
Wait: true,
220+
Fetch: func(_ context.Context) (codersdk.WorkspaceAgent, error) {
221+
if len(fetchSteps) == 0 {
222+
return agent, nil
223+
}
224+
step := fetchSteps[0]
225+
fetchSteps = fetchSteps[1:]
226+
step()
178227
return agent, nil
179228
},
180-
WarnInterval: 2 * time.Second,
229+
FetchLogs: func(_ context.Context, _ uuid.UUID, _ int64, follow bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
230+
logsC := make(chan []codersdk.WorkspaceAgentStartupLog, len(logs))
231+
if follow {
232+
go func() {
233+
defer close(logsC)
234+
for _, log := range logs {
235+
logsC <- []codersdk.WorkspaceAgentStartupLog{log}
236+
time.Sleep(144 * time.Millisecond)
237+
}
238+
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady
239+
readyAt := database.Now()
240+
agent.ReadyAt = &readyAt
241+
}()
242+
} else {
243+
logsC <- logs
244+
close(logsC)
245+
}
246+
return logsC, closeFunc(func() error {
247+
return nil
248+
}), nil
249+
},
181250
})
182251
if err != nil {
183252
return err
184253
}
185-
_, _ = fmt.Printf("Completed!\n")
186254
return nil
187255
},
188256
})
@@ -278,3 +346,9 @@ func main() {
278346
os.Exit(1)
279347
}
280348
}
349+
350+
type closeFunc func() error
351+
352+
func (f closeFunc) Close() error {
353+
return f()
354+
}

coderd/workspaceagents_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
225225
})
226226
require.NoError(t, err)
227227

228-
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
228+
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0, true)
229229
require.NoError(t, err)
230230
defer func() {
231231
_ = closer.Close()
@@ -338,7 +338,7 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
338338
agentClient := agentsdk.New(client.URL)
339339
agentClient.SetSessionToken(authToken)
340340

341-
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
341+
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0, true)
342342
require.NoError(t, err)
343343
defer func() {
344344
_ = closer.Close()

0 commit comments

Comments
 (0)