Skip to content

Commit 5586ecf

Browse files
committed
feat: use v2 API for agent lifecycle updates
1 parent 4cc132c commit 5586ecf

File tree

5 files changed

+98
-56
lines changed

5 files changed

+98
-56
lines changed

agent/agent.go

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ type Options struct {
9090

9191
type Client interface {
9292
ConnectRPC(ctx context.Context) (drpc.Conn, error)
93-
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
9493
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
9594
RewriteDERPMap(derpMap *tailcfg.DERPMap)
9695
}
@@ -299,7 +298,6 @@ func (a *agent) init() {
299298
// may be happening, but regardless after the intermittent
300299
// failure, you'll want the agent to reconnect.
301300
func (a *agent) runLoop() {
302-
go a.reportLifecycleUntilClose()
303301
go a.reportMetadataUntilGracefulShutdown()
304302
go a.manageProcessPriorityUntilGracefulShutdown()
305303

@@ -618,21 +616,19 @@ func (a *agent) reportMetadataUntilGracefulShutdown() {
618616
}
619617
}
620618

621-
// reportLifecycleUntilClose reports the current lifecycle state once. All state
619+
// reportLifecycle reports the current lifecycle state once. All state
622620
// changes are reported in order.
623-
func (a *agent) reportLifecycleUntilClose() {
624-
// part of graceful shut down is reporting the final lifecycle states, e.g "ShuttingDown" so the
625-
// lifecycle reporting has to be via the "hard" context.
626-
ctx := a.hardCtx
621+
func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
622+
aAPI := proto.NewDRPCAgentClient(conn)
627623
lastReportedIndex := 0 // Start off with the created state without reporting it.
628624
for {
629625
select {
630626
case <-a.lifecycleUpdate:
631627
case <-ctx.Done():
632-
return
628+
return ctx.Err()
633629
}
634630

635-
for r := retry.New(time.Second, 15*time.Second); r.Wait(ctx); {
631+
for {
636632
a.lifecycleMu.RLock()
637633
lastIndex := len(a.lifecycleStates) - 1
638634
report := a.lifecycleStates[lastReportedIndex]
@@ -644,33 +640,35 @@ func (a *agent) reportLifecycleUntilClose() {
644640
if lastIndex == lastReportedIndex {
645641
break
646642
}
643+
l, err := agentsdk.ProtoFromLifecycle(report)
644+
if err != nil {
645+
a.logger.Critical(ctx, "failed to convert lifecycle state", slog.F("report", report))
646+
// Skip this report; there is no point retrying. Maybe we can successfully convert the next one?
647+
lastReportedIndex++
648+
continue
649+
}
650+
logger := a.logger.With(slog.F("payload", l))
651+
logger.Debug(ctx, "reporting lifecycle state")
647652

648-
a.logger.Debug(ctx, "reporting lifecycle state", slog.F("payload", report))
653+
_, err = aAPI.UpdateLifecycle(ctx, &proto.UpdateLifecycleRequest{Lifecycle: l})
654+
if err != nil {
655+
return xerrors.Errorf("failed to update lifecycle: %w", err)
656+
}
649657

650-
err := a.client.PostLifecycle(ctx, report)
651-
if err == nil {
652-
a.logger.Debug(ctx, "successfully reported lifecycle state", slog.F("payload", report))
653-
r.Reset() // don't back off when we are successful
654-
lastReportedIndex++
655-
select {
656-
case a.lifecycleReported <- report.State:
657-
case <-a.lifecycleReported:
658-
a.lifecycleReported <- report.State
659-
}
660-
if lastReportedIndex < lastIndex {
661-
// Keep reporting until we've sent all messages, we can't
662-
// rely on the channel triggering us before the backlog is
663-
// consumed.
664-
continue
665-
}
666-
break
658+
logger.Debug(ctx, "successfully reported lifecycle state")
659+
lastReportedIndex++
660+
select {
661+
case a.lifecycleReported <- report.State:
662+
case <-a.lifecycleReported:
663+
a.lifecycleReported <- report.State
667664
}
668-
if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
669-
a.logger.Debug(ctx, "canceled reporting lifecycle state", slog.F("payload", report))
670-
return
665+
if lastReportedIndex < lastIndex {
666+
// Keep reporting until we've sent all messages, we can't
667+
// rely on the channel triggering us before the backlog is
668+
// consumed.
669+
continue
671670
}
672-
// If we fail to report the state we probably shouldn't exit, log only.
673-
a.logger.Error(ctx, "agent failed to report the lifecycle state", slog.Error(err))
671+
break
674672
}
675673
}
676674
}
@@ -780,6 +778,10 @@ func (a *agent) run() (retErr error) {
780778
return err
781779
})
782780

781+
// Part of graceful shutdown is reporting the final lifecycle states, e.g. "ShuttingDown", so the
782+
// lifecycle reporting has to use gracefulShutdownBehaviorRemain.
783+
connMan.start("report lifecycle", gracefulShutdownBehaviorRemain, a.reportLifecycle)
784+
783785
// channels to sync goroutines below
784786
// handle manifest
785787
// |

agent/agenttest/client.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99
"time"
1010

1111
"github.com/google/uuid"
12+
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/require"
1314
"golang.org/x/exp/maps"
15+
"golang.org/x/exp/slices"
1416
"golang.org/x/xerrors"
1517
"google.golang.org/protobuf/types/known/durationpb"
1618
"storj.io/drpc"
@@ -86,11 +88,10 @@ type Client struct {
8688
fakeAgentAPI *FakeAgentAPI
8789
LastWorkspaceAgent func()
8890

89-
mu sync.Mutex // Protects following.
90-
lifecycleStates []codersdk.WorkspaceAgentLifecycle
91-
logs []agentsdk.Log
92-
derpMapUpdates chan *tailcfg.DERPMap
93-
derpMapOnce sync.Once
91+
mu sync.Mutex // Protects following.
92+
logs []agentsdk.Log
93+
derpMapUpdates chan *tailcfg.DERPMap
94+
derpMapOnce sync.Once
9495
}
9596

9697
func (*Client) RewriteDERPMap(*tailcfg.DERPMap) {}
@@ -122,17 +123,7 @@ func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
122123
}
123124

124125
func (c *Client) GetLifecycleStates() []codersdk.WorkspaceAgentLifecycle {
125-
c.mu.Lock()
126-
defer c.mu.Unlock()
127-
return c.lifecycleStates
128-
}
129-
130-
func (c *Client) PostLifecycle(ctx context.Context, req agentsdk.PostLifecycleRequest) error {
131-
c.mu.Lock()
132-
defer c.mu.Unlock()
133-
c.lifecycleStates = append(c.lifecycleStates, req.State)
134-
c.logger.Debug(ctx, "post lifecycle", slog.F("req", req))
135-
return nil
126+
return c.fakeAgentAPI.GetLifecycleStates()
136127
}
137128

138129
func (c *Client) GetStartup() <-chan *agentproto.Startup {
@@ -189,11 +180,12 @@ type FakeAgentAPI struct {
189180
t testing.TB
190181
logger slog.Logger
191182

192-
manifest *agentproto.Manifest
193-
startupCh chan *agentproto.Startup
194-
statsCh chan *agentproto.Stats
195-
appHealthCh chan *agentproto.BatchUpdateAppHealthRequest
196-
logsCh chan<- *agentproto.BatchCreateLogsRequest
183+
manifest *agentproto.Manifest
184+
startupCh chan *agentproto.Startup
185+
statsCh chan *agentproto.Stats
186+
appHealthCh chan *agentproto.BatchUpdateAppHealthRequest
187+
logsCh chan<- *agentproto.BatchCreateLogsRequest
188+
lifecycleStates []codersdk.WorkspaceAgentLifecycle
197189

198190
getServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
199191
}
@@ -231,9 +223,20 @@ func (f *FakeAgentAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateSt
231223
return &agentproto.UpdateStatsResponse{ReportInterval: durationpb.New(statsInterval)}, nil
232224
}
233225

234-
func (*FakeAgentAPI) UpdateLifecycle(context.Context, *agentproto.UpdateLifecycleRequest) (*agentproto.Lifecycle, error) {
235-
// TODO implement me
236-
panic("implement me")
226+
func (f *FakeAgentAPI) GetLifecycleStates() []codersdk.WorkspaceAgentLifecycle {
227+
f.Lock()
228+
defer f.Unlock()
229+
return slices.Clone(f.lifecycleStates)
230+
}
231+
232+
func (f *FakeAgentAPI) UpdateLifecycle(_ context.Context, req *agentproto.UpdateLifecycleRequest) (*agentproto.Lifecycle, error) {
233+
f.Lock()
234+
defer f.Unlock()
235+
s, err := agentsdk.LifecycleStateFromProto(req.GetLifecycle().GetState())
236+
if assert.NoError(f.t, err) {
237+
f.lifecycleStates = append(f.lifecycleStates, s)
238+
}
239+
return req.GetLifecycle(), nil
237240
}
238241

239242
func (f *FakeAgentAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) {

codersdk/agentsdk/agentsdk.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,9 @@ type PostLifecycleRequest struct {
485485
ChangedAt time.Time `json:"changed_at"`
486486
}
487487

488+
// PostLifecycle posts the agent's lifecycle to the Coder server.
489+
//
490+
// Deprecated: Use UpdateLifecycle on the dRPC API instead.
488491
func (c *Client) PostLifecycle(ctx context.Context, req PostLifecycleRequest) error {
489492
res, err := c.SDK.Request(ctx, http.MethodPost, "/api/v2/workspaceagents/me/report-lifecycle", req)
490493
if err != nil {

codersdk/agentsdk/convert.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,3 +311,22 @@ func ProtoFromLog(log Log) (*proto.Log, error) {
311311
Level: proto.Log_Level(lvl),
312312
}, nil
313313
}
314+
315+
func ProtoFromLifecycle(req PostLifecycleRequest) (*proto.Lifecycle, error) {
316+
s, ok := proto.Lifecycle_State_value[strings.ToUpper(string(req.State))]
317+
if !ok {
318+
return nil, xerrors.Errorf("unknown lifecycle state: %s", req.State)
319+
}
320+
return &proto.Lifecycle{
321+
State: proto.Lifecycle_State(s),
322+
ChangedAt: timestamppb.New(req.ChangedAt),
323+
}, nil
324+
}
325+
326+
func LifecycleStateFromProto(s proto.Lifecycle_State) (codersdk.WorkspaceAgentLifecycle, error) {
327+
caps, ok := proto.Lifecycle_State_name[int32(s)]
328+
if !ok {
329+
return "", xerrors.Errorf("unknown lifecycle state: %d", s)
330+
}
331+
return codersdk.WorkspaceAgentLifecycle(strings.ToLower(caps)), nil
332+
}

codersdk/agentsdk/convert_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"tailscale.com/tailcfg"
1010

1111
"github.com/coder/coder/v2/agent/proto"
12+
"github.com/coder/coder/v2/coderd/database/dbtime"
1213
"github.com/coder/coder/v2/codersdk"
1314
"github.com/coder/coder/v2/codersdk/agentsdk"
1415
"github.com/coder/coder/v2/tailnet"
@@ -161,3 +162,17 @@ func TestSubsystems(t *testing.T) {
161162
proto.Startup_EXECTRACE,
162163
})
163164
}
165+
166+
func TestProtoFromLifecycle(t *testing.T) {
167+
t.Parallel()
168+
now := dbtime.Now()
169+
for _, s := range codersdk.WorkspaceAgentLifecycleOrder {
170+
sr := agentsdk.PostLifecycleRequest{State: s, ChangedAt: now}
171+
pr, err := agentsdk.ProtoFromLifecycle(sr)
172+
require.NoError(t, err)
173+
require.Equal(t, now, pr.ChangedAt.AsTime())
174+
state, err := agentsdk.LifecycleStateFromProto(pr.State)
175+
require.NoError(t, err)
176+
require.Equal(t, s, state)
177+
}
178+
}

0 commit comments

Comments
 (0)