Skip to content

feat: Add workspace agent lifecycle state reporting #5785

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add tests, improve lifecycle reporting
  • Loading branch information
mafredri committed Jan 23, 2023
commit 534f9546e33b4827da337d06473a1f10c865bae2
74 changes: 51 additions & 23 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func New(options Options) io.Closer {
exchangeToken: options.ExchangeToken,
filesystem: options.Filesystem,
tempDir: options.TempDir,
lifecycleUpdate: make(chan struct{}, 1),
}
a.init(ctx)
return a
Expand All @@ -128,8 +129,9 @@ type agent struct {
sessionToken atomic.Pointer[string]
sshServer *ssh.Server

lifecycleMu sync.Mutex // Protects following.
lifecycleState codersdk.WorkspaceAgentLifecycle
lifecycleUpdate chan struct{}
lifecycleMu sync.Mutex // Protects following.
lifecycleState codersdk.WorkspaceAgentLifecycle

network *tailnet.Conn
}
Expand All @@ -139,6 +141,8 @@ type agent struct {
// may be happening, but regardless after the intermittent
// failure, you'll want the agent to reconnect.
func (a *agent) runLoop(ctx context.Context) {
go a.reportLifecycleLoop(ctx)

for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
a.logger.Info(ctx, "running loop")
err := a.run(ctx)
Expand All @@ -160,27 +164,51 @@ func (a *agent) runLoop(ctx context.Context) {
}
}

func (a *agent) setLifecycle(ctx context.Context, state codersdk.WorkspaceAgentLifecycle) {
a.lifecycleMu.Lock()
defer a.lifecycleMu.Unlock()
// reportLifecycleLoop reports the current lifecycle state each time an
// update is signaled, retrying until the report succeeds or the context
// is done. Only the latest state is reported; intermediate states may be
// lost if the agent can't communicate with the API.
func (a *agent) reportLifecycleLoop(ctx context.Context) {
var lastReported codersdk.WorkspaceAgentLifecycle
for {
select {
case <-a.lifecycleUpdate:
case <-ctx.Done():
return
}

a.lifecycleState = state
for r := retry.New(time.Second, 15*time.Second); r.Wait(ctx); {
a.lifecycleMu.Lock()
state := a.lifecycleState
a.lifecycleMu.Unlock()

var err error
for r := retry.New(time.Second, 30*time.Second); r.Wait(ctx); {
err = a.client.PostWorkspaceAgentLifecycle(ctx, codersdk.PostWorkspaceAgentLifecycleRequest{
State: state,
})
if err == nil {
return
if state == lastReported {
continue
}

err := a.client.PostWorkspaceAgentLifecycle(ctx, codersdk.PostWorkspaceAgentLifecycleRequest{
State: state,
})
if err == nil {
lastReported = state
break
}
if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
return
}
// If we fail to report the state we probably shouldn't exit, log only.
a.logger.Error(ctx, "post state", slog.Error(err))
}
}
if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) || a.isClosed() {
return
}
if err != nil {
// If we fail to report the state we probably shouldn't exit, log only.
a.logger.Error(ctx, "post state", slog.Error(err))
}

// setLifecycle stores state as the agent's current lifecycle state and
// signals lifecycleUpdate so the new state can be picked up by whoever
// receives from that channel. It never blocks on the channel.
func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
a.lifecycleMu.Lock()
defer a.lifecycleMu.Unlock()

a.lifecycleState = state
// Non-blocking send: if the signal cannot be delivered immediately (for
// example because one is already buffered), it is dropped. The state
// itself has already been stored above, so only the wake-up is coalesced.
select {
case a.lifecycleUpdate <- struct{}{}:
default:
}
}

Expand Down Expand Up @@ -224,14 +252,14 @@ func (a *agent) run(ctx context.Context) error {
timeout = t.C
}

a.setLifecycle(ctx, codersdk.WorkspaceAgentLifecycleStarting)
a.setLifecycle(codersdk.WorkspaceAgentLifecycleStarting)

var err error
select {
case err = <-scriptDone:
case <-timeout:
a.logger.Warn(ctx, "startup script timed out")
a.setLifecycle(ctx, codersdk.WorkspaceAgentLifecycleStartTimeout)
a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartTimeout)
err = <-scriptDone // The script can still complete after a timeout.
}
if errors.Is(err, context.Canceled) {
Expand All @@ -240,7 +268,7 @@ func (a *agent) run(ctx context.Context) error {
execTime := time.Since(scriptStart)
if err != nil {
a.logger.Warn(ctx, "startup script failed", slog.F("execution_time", execTime), slog.Error(err))
a.setLifecycle(ctx, codersdk.WorkspaceAgentLifecycleStartError)
a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartError)
return
}
a.logger.Info(ctx, "startup script completed", slog.F("execution_time", execTime))
Expand All @@ -255,7 +283,7 @@ func (a *agent) run(ctx context.Context) error {
}
}

a.setLifecycle(ctx, codersdk.WorkspaceAgentLifecycleReady)
a.setLifecycle(codersdk.WorkspaceAgentLifecycleReady)
}()
}

Expand Down
143 changes: 124 additions & 19 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestAgent_Stats_SSH(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

conn, stats, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
conn, _, stats, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)

sshClient, err := conn.SSHClient(ctx)
require.NoError(t, err)
Expand All @@ -83,7 +83,7 @@ func TestAgent_Stats_ReconnectingPTY(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

conn, stats, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
conn, _, stats, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)

ptyConn, err := conn.ReconnectingPTY(ctx, uuid.New(), 128, 128, "/bin/bash")
require.NoError(t, err)
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestAgent_SFTP(t *testing.T) {
if runtime.GOOS == "windows" {
home = "/" + strings.ReplaceAll(home, "\\", "/")
}
conn, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
conn, _, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
sshClient, err := conn.SSHClient(ctx)
require.NoError(t, err)
defer sshClient.Close()
Expand Down Expand Up @@ -562,7 +562,7 @@ func TestAgent_SCP(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

conn, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
conn, _, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
sshClient, err := conn.SSHClient(ctx)
require.NoError(t, err)
defer sshClient.Close()
Expand Down Expand Up @@ -666,7 +666,7 @@ func TestAgent_StartupScript(t *testing.T) {
t.Skip("This test doesn't work on Windows for some reason...")
}
content := "output"
_, _, fs := setupAgent(t, codersdk.WorkspaceAgentMetadata{
_, _, _, fs := setupAgent(t, codersdk.WorkspaceAgentMetadata{
StartupScript: "echo " + content,
}, 0)
var gotContent string
Expand Down Expand Up @@ -694,6 +694,97 @@ func TestAgent_StartupScript(t *testing.T) {
require.Equal(t, content, strings.TrimSpace(gotContent))
}

func TestAgent_Lifecycle(t *testing.T) {
t.Parallel()

t.Run("Timeout", func(t *testing.T) {
t.Parallel()

_, client, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{
StartupScript: "sleep 10",
StartupScriptTimeout: time.Nanosecond,
}, 0)

want := []codersdk.WorkspaceAgentLifecycle{
codersdk.WorkspaceAgentLifecycleStarting,
codersdk.WorkspaceAgentLifecycleStartTimeout,
}

var got []codersdk.WorkspaceAgentLifecycle
assert.Eventually(t, func() bool {
got = client.getLifecycleStates()
return len(got) > 0 && got[len(got)-1] == want[len(want)-1]
}, testutil.WaitShort, testutil.IntervalMedium)
switch len(got) {
case 1:
// This can happen if lifecycle states are too
// fast, only the latest on is reported.
require.Equal(t, want[1:], got)
default:
// This is the expected case.
require.Equal(t, want, got)
}
})

t.Run("Error", func(t *testing.T) {
t.Parallel()

_, client, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{
StartupScript: "false",
StartupScriptTimeout: 30 * time.Second,
}, 0)

want := []codersdk.WorkspaceAgentLifecycle{
codersdk.WorkspaceAgentLifecycleStarting,
codersdk.WorkspaceAgentLifecycleStartError,
}

var got []codersdk.WorkspaceAgentLifecycle
assert.Eventually(t, func() bool {
got = client.getLifecycleStates()
return len(got) > 0 && got[len(got)-1] == want[len(want)-1]
}, testutil.WaitShort, testutil.IntervalMedium)
switch len(got) {
case 1:
// This can happen if lifecycle states are too
// fast, only the latest on is reported.
require.Equal(t, want[1:], got)
default:
// This is the expected case.
require.Equal(t, want, got)
}
})

t.Run("Ready", func(t *testing.T) {
t.Parallel()

_, client, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{
StartupScript: "true",
StartupScriptTimeout: 30 * time.Second,
}, 0)

want := []codersdk.WorkspaceAgentLifecycle{
codersdk.WorkspaceAgentLifecycleStarting,
codersdk.WorkspaceAgentLifecycleReady,
}

var got []codersdk.WorkspaceAgentLifecycle
assert.Eventually(t, func() bool {
got = client.getLifecycleStates()
return len(got) > 0 && got[len(got)-1] == want[len(want)-1]
}, testutil.WaitShort, testutil.IntervalMedium)
switch len(got) {
case 1:
// This can happen if lifecycle states are too
// fast, only the latest on is reported.
require.Equal(t, want[1:], got)
default:
// This is the expected case.
require.Equal(t, want, got)
}
})
}

func TestAgent_ReconnectingPTY(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
Expand All @@ -706,7 +797,7 @@ func TestAgent_ReconnectingPTY(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

conn, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
conn, _, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
id := uuid.New()
netConn, err := conn.ReconnectingPTY(ctx, id, 100, 100, "/bin/bash")
require.NoError(t, err)
Expand Down Expand Up @@ -807,7 +898,7 @@ func TestAgent_Dial(t *testing.T) {
}
}()

conn, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
conn, _, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
require.True(t, conn.AwaitReachable(context.Background()))
conn1, err := conn.DialContext(context.Background(), l.Addr().Network(), l.Addr().String())
require.NoError(t, err)
Expand All @@ -828,7 +919,7 @@ func TestAgent_Speedtest(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
derpMap := tailnettest.RunDERPAndSTUN(t)
conn, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{
conn, _, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{
DERPMap: derpMap,
}, 0)
defer conn.Close()
Expand Down Expand Up @@ -913,7 +1004,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
}

func setupSSHCommand(t *testing.T, beforeArgs []string, afterArgs []string) *exec.Cmd {
agentConn, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
agentConn, _, _, _ := setupAgent(t, codersdk.WorkspaceAgentMetadata{}, 0)
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
waitGroup := sync.WaitGroup{}
Expand Down Expand Up @@ -959,7 +1050,7 @@ func setupSSHCommand(t *testing.T, beforeArgs []string, afterArgs []string) *exe
func setupSSHSession(t *testing.T, options codersdk.WorkspaceAgentMetadata) *ssh.Session {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
conn, _, _ := setupAgent(t, options, 0)
conn, _, _, _ := setupAgent(t, options, 0)
sshClient, err := conn.SSHClient(ctx)
require.NoError(t, err)
t.Cleanup(func() {
Expand All @@ -981,6 +1072,7 @@ func (c closeFunc) Close() error {

func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeout time.Duration) (
*codersdk.AgentConn,
*client,
<-chan *codersdk.AgentStats,
afero.Fs,
) {
Expand All @@ -994,14 +1086,15 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
agentID := uuid.New()
statsCh := make(chan *codersdk.AgentStats, 50)
fs := afero.NewMemMapFs()
c := &client{
t: t,
agentID: agentID,
metadata: metadata,
statsChan: statsCh,
coordinator: coordinator,
}
closer := agent.New(agent.Options{
Client: &client{
t: t,
agentID: agentID,
metadata: metadata,
statsChan: statsCh,
coordinator: coordinator,
},
Client: c,
Filesystem: fs,
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
ReconnectingPTYTimeout: ptyTimeout,
Expand Down Expand Up @@ -1034,7 +1127,7 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
conn.SetNodeCallback(sendNode)
return &codersdk.AgentConn{
Conn: conn,
}, statsCh, fs
}, c, statsCh, fs
}

var dialTestPayload = []byte("dean-was-here123")
Expand Down Expand Up @@ -1075,6 +1168,9 @@ type client struct {
statsChan chan *codersdk.AgentStats
coordinator tailnet.Coordinator
lastWorkspaceAgent func()

mu sync.Mutex // Protects following.
lifecycleStates []codersdk.WorkspaceAgentLifecycle
}

func (c *client) WorkspaceAgentMetadata(_ context.Context) (codersdk.WorkspaceAgentMetadata, error) {
Expand Down Expand Up @@ -1130,7 +1226,16 @@ func (c *client) AgentReportStats(ctx context.Context, _ slog.Logger, stats func
}), nil
}

func (*client) PostWorkspaceAgentLifecycle(_ context.Context, _ codersdk.PostWorkspaceAgentLifecycleRequest) error {
// getLifecycleStates returns a snapshot of the lifecycle states recorded so
// far, in the order they were posted. The result is a copy, so callers can
// inspect or modify it without holding c.mu and without aliasing the slice
// that PostWorkspaceAgentLifecycle keeps appending to. It is nil when no
// state has been recorded yet.
func (c *client) getLifecycleStates() []codersdk.WorkspaceAgentLifecycle {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Appending to a nil slice yields nil for an empty source, matching the
	// zero-value result callers saw before any state was posted.
	return append([]codersdk.WorkspaceAgentLifecycle(nil), c.lifecycleStates...)
}

// PostWorkspaceAgentLifecycle records the state carried by req at the end of
// the client's list of observed lifecycle states. The context is unused and
// the call always succeeds.
func (c *client) PostWorkspaceAgentLifecycle(_ context.Context, req codersdk.PostWorkspaceAgentLifecycleRequest) error {
	reported := req.State

	// Keep the critical section limited to the slice update.
	c.mu.Lock()
	c.lifecycleStates = append(c.lifecycleStates, reported)
	c.mu.Unlock()

	return nil
}

Expand Down
Loading