Skip to content

feat: add agent log streaming and follow provisioner format #8170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 28, 2023
389 changes: 183 additions & 206 deletions cli/cliui/agent.go

Large diffs are not rendered by default.

638 changes: 317 additions & 321 deletions cli/cliui/agent_test.go

Large diffs are not rendered by default.

124 changes: 82 additions & 42 deletions cli/cliui/provisionerjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"os/signal"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -69,21 +70,20 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
jobMutex sync.Mutex
)

sw := &stageWriter{w: writer, verbose: opts.Verbose, silentLogs: opts.Silent}

printStage := func() {
_, _ = fmt.Fprintf(writer, "==> ⧗ %s\n", currentStage)
sw.Start(currentStage)
}

updateStage := func(stage string, startedAt time.Time) {
if currentStage != "" {
mark := "✔"
duration := startedAt.Sub(currentStageStartedAt)
if job.CompletedAt != nil && job.Status != codersdk.ProvisionerJobSucceeded {
mark = "✘"
}
dur := startedAt.Sub(currentStageStartedAt).Milliseconds()
if dur < 0 {
dur = 0
sw.Fail(currentStage, duration)
} else {
sw.Complete(currentStage, duration)
}
_, _ = fmt.Fprintf(writer, "=== %s %s [%dms]\n", mark, currentStage, dur)
}
if stage == "" {
return
Expand Down Expand Up @@ -147,30 +147,15 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
}
defer closer.Close()

var (
// logOutput is where log output is written
logOutput = writer
// logBuffer is where logs are buffered if opts.Silent is true
logBuffer = &bytes.Buffer{}
)
if opts.Silent {
logOutput = logBuffer
}
flushLogBuffer := func() {
if opts.Silent {
_, _ = io.Copy(writer, logBuffer)
}
}

ticker := time.NewTicker(opts.FetchInterval)
defer ticker.Stop()
for {
select {
case err = <-errChan:
flushLogBuffer()
sw.Fail(currentStage, time.Since(currentStageStartedAt))
return err
case <-ctx.Done():
flushLogBuffer()
sw.Fail(currentStage, time.Since(currentStageStartedAt))
return ctx.Err()
case <-ticker.C:
updateJob()
Expand All @@ -194,34 +179,89 @@ func ProvisionerJob(ctx context.Context, writer io.Writer, opts ProvisionerJobOp
Message: job.Error,
Code: job.ErrorCode,
}
sw.Fail(currentStage, time.Since(currentStageStartedAt))
jobMutex.Unlock()
flushLogBuffer()
return err
}

output := ""
switch log.Level {
case codersdk.LogLevelTrace, codersdk.LogLevelDebug:
if !opts.Verbose {
continue
}
output = DefaultStyles.Placeholder.Render(log.Output)
case codersdk.LogLevelError:
output = DefaultStyles.Error.Render(log.Output)
case codersdk.LogLevelWarn:
output = DefaultStyles.Warn.Render(log.Output)
case codersdk.LogLevelInfo:
output = log.Output
}

jobMutex.Lock()
if log.Stage != currentStage && log.Stage != "" {
updateStage(log.Stage, log.CreatedAt)
jobMutex.Unlock()
continue
}
_, _ = fmt.Fprintf(logOutput, "%s\n", output)
sw.Log(log.CreatedAt, log.Level, log.Output)
jobMutex.Unlock()
}
}
}

type stageWriter struct {
w io.Writer
verbose bool
silentLogs bool
logBuf bytes.Buffer
}

func (s *stageWriter) Start(stage string) {
_, _ = fmt.Fprintf(s.w, "==> ⧗ %s\n", stage)
}

func (s *stageWriter) Complete(stage string, duration time.Duration) {
s.end(stage, duration, true)
}

func (s *stageWriter) Fail(stage string, duration time.Duration) {
s.flushLogs()
s.end(stage, duration, false)
}

//nolint:revive
func (s *stageWriter) end(stage string, duration time.Duration, ok bool) {
s.logBuf.Reset()

mark := "✔"
if !ok {
mark = "✘"
}
if duration < 0 {
duration = 0
}
_, _ = fmt.Fprintf(s.w, "=== %s %s [%dms]\n", mark, stage, duration.Milliseconds())
}

func (s *stageWriter) Log(createdAt time.Time, level codersdk.LogLevel, line string) {
w := s.w
if s.silentLogs {
w = &s.logBuf
}

render := func(s ...string) string { return strings.Join(s, " ") }

var lines []string
if !createdAt.IsZero() {
lines = append(lines, createdAt.Local().Format("2006-01-02 15:04:05.000Z07:00"))
}
lines = append(lines, line)

switch level {
case codersdk.LogLevelTrace, codersdk.LogLevelDebug:
if !s.verbose {
return
}
render = DefaultStyles.Placeholder.Render
case codersdk.LogLevelError:
render = DefaultStyles.Error.Render
case codersdk.LogLevelWarn:
render = DefaultStyles.Warn.Render
case codersdk.LogLevelInfo:
}
_, _ = fmt.Fprintf(w, "%s\n", render(lines...))
}

func (s *stageWriter) flushLogs() {
if s.silentLogs {
_, _ = io.Copy(s.w, &s.logBuf)
}
s.logBuf.Reset()
}
1 change: 0 additions & 1 deletion cli/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func (r *RootCmd) portForward() *clibase.Cmd {
}

err = cliui.Agent(ctx, inv.Stderr, cliui.AgentOptions{
WorkspaceName: workspace.Name,
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
},
Expand Down
5 changes: 2 additions & 3 deletions cli/speedtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,18 @@ func (r *RootCmd) speedtest() *clibase.Cmd {
ctx, cancel := context.WithCancel(inv.Context())
defer cancel()

workspace, workspaceAgent, err := getWorkspaceAndAgent(ctx, inv, client, codersdk.Me, inv.Args[0])
_, workspaceAgent, err := getWorkspaceAndAgent(ctx, inv, client, codersdk.Me, inv.Args[0])
if err != nil {
return err
}

err = cliui.Agent(ctx, inv.Stderr, cliui.AgentOptions{
WorkspaceName: workspace.Name,
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
},
Wait: false,
})
if err != nil && !xerrors.Is(err, cliui.AgentStartError) {
if err != nil {
return xerrors.Errorf("await agent: %w", err)
}

Expand Down
11 changes: 2 additions & 9 deletions cli/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,16 @@ func (r *RootCmd) ssh() *clibase.Cmd {
// OpenSSH passes stderr directly to the calling TTY.
// This is required in "stdio" mode so a connecting indicator can be displayed.
err = cliui.Agent(ctx, inv.Stderr, cliui.AgentOptions{
WorkspaceName: workspace.Name,
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
},
Wait: wait,
FetchLogs: client.WorkspaceAgentStartupLogsAfter,
Wait: wait,
})
if err != nil {
if xerrors.Is(err, context.Canceled) {
return cliui.Canceled
}
if !xerrors.Is(err, cliui.AgentStartError) {
return xerrors.Errorf("await agent: %w", err)
}

// We don't want to fail on a startup script error because it's
// natural that the user will want to fix the script and try again.
// We don't print the error because cliui.Agent does that for us.
}

if r.disableDirect {
Expand Down
96 changes: 85 additions & 11 deletions cmd/cliui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/url"
"os"
"strings"
"sync/atomic"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"

"github.com/coder/coder/cli/clibase"
Expand Down Expand Up @@ -164,25 +166,91 @@ func main() {
root.Children = append(root.Children, &clibase.Cmd{
Use: "agent",
Handler: func(inv *clibase.Invocation) error {
agent := codersdk.WorkspaceAgent{
Status: codersdk.WorkspaceAgentDisconnected,
LifecycleState: codersdk.WorkspaceAgentLifecycleReady,
var agent codersdk.WorkspaceAgent
var logs []codersdk.WorkspaceAgentStartupLog

fetchSteps := []func(){
func() {
createdAt := time.Now().Add(-time.Minute)
agent = codersdk.WorkspaceAgent{
CreatedAt: createdAt,
Status: codersdk.WorkspaceAgentConnecting,
LifecycleState: codersdk.WorkspaceAgentLifecycleCreated,
}
},
func() {
time.Sleep(time.Second)
agent.Status = codersdk.WorkspaceAgentTimeout
},
func() {
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleStarting
startingAt := time.Now()
agent.StartedAt = &startingAt
for i := 0; i < 10; i++ {
level := codersdk.LogLevelInfo
if rand.Float64() > 0.75 { //nolint:gosec
level = codersdk.LogLevelError
}
logs = append(logs, codersdk.WorkspaceAgentStartupLog{
CreatedAt: time.Now().Add(-time.Duration(10-i) * 144 * time.Millisecond),
Output: fmt.Sprintf("Some log %d", i),
Level: level,
})
}
},
func() {
time.Sleep(time.Second)
firstConnectedAt := time.Now()
agent.FirstConnectedAt = &firstConnectedAt
lastConnectedAt := firstConnectedAt.Add(0)
agent.LastConnectedAt = &lastConnectedAt
agent.Status = codersdk.WorkspaceAgentConnected
},
func() {},
func() {
time.Sleep(5 * time.Second)
agent.Status = codersdk.WorkspaceAgentConnected
lastConnectedAt := time.Now()
agent.LastConnectedAt = &lastConnectedAt
},
}
go func() {
time.Sleep(3 * time.Second)
agent.Status = codersdk.WorkspaceAgentConnected
}()
err := cliui.Agent(inv.Context(), inv.Stdout, cliui.AgentOptions{
WorkspaceName: "dev",
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
FetchInterval: 100 * time.Millisecond,
Wait: true,
Fetch: func(_ context.Context) (codersdk.WorkspaceAgent, error) {
if len(fetchSteps) == 0 {
return agent, nil
}
step := fetchSteps[0]
fetchSteps = fetchSteps[1:]
step()
return agent, nil
},
WarnInterval: 2 * time.Second,
FetchLogs: func(_ context.Context, _ uuid.UUID, _ int64, follow bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
logsC := make(chan []codersdk.WorkspaceAgentStartupLog, len(logs))
if follow {
go func() {
defer close(logsC)
for _, log := range logs {
logsC <- []codersdk.WorkspaceAgentStartupLog{log}
time.Sleep(144 * time.Millisecond)
}
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady
readyAt := database.Now()
agent.ReadyAt = &readyAt
}()
} else {
logsC <- logs
close(logsC)
}
return logsC, closeFunc(func() error {
return nil
}), nil
},
})
if err != nil {
return err
}
_, _ = fmt.Printf("Completed!\n")
return nil
},
})
Expand Down Expand Up @@ -278,3 +346,9 @@ func main() {
os.Exit(1)
}
}

type closeFunc func() error

func (f closeFunc) Close() error {
return f()
}
4 changes: 2 additions & 2 deletions coderd/workspaceagents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
})
require.NoError(t, err)

logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0, true)
require.NoError(t, err)
defer func() {
_ = closer.Close()
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(authToken)

logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0, true)
require.NoError(t, err)
defer func() {
_ = closer.Close()
Expand Down
Loading