Skip to content

feat: use v2 API for agent lifecycle updates #12278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 35 additions & 32 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ type Options struct {

type Client interface {
ConnectRPC(ctx context.Context) (drpc.Conn, error)
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
RewriteDERPMap(derpMap *tailcfg.DERPMap)
}
Expand Down Expand Up @@ -299,7 +298,6 @@ func (a *agent) init() {
// may be happening, but regardless after the intermittent
// failure, you'll want the agent to reconnect.
func (a *agent) runLoop() {
go a.reportLifecycleUntilClose()
go a.reportMetadataUntilGracefulShutdown()
go a.manageProcessPriorityUntilGracefulShutdown()

Expand Down Expand Up @@ -618,21 +616,19 @@ func (a *agent) reportMetadataUntilGracefulShutdown() {
}
}

// reportLifecycleUntilClose reports the current lifecycle state once. All state
// reportLifecycle reports the current lifecycle state once. All state
// changes are reported in order.
func (a *agent) reportLifecycleUntilClose() {
// part of graceful shut down is reporting the final lifecycle states, e.g "ShuttingDown" so the
// lifecycle reporting has to be via the "hard" context.
ctx := a.hardCtx
func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
aAPI := proto.NewDRPCAgentClient(conn)
lastReportedIndex := 0 // Start off with the created state without reporting it.
for {
select {
case <-a.lifecycleUpdate:
case <-ctx.Done():
return
return ctx.Err()
}

for r := retry.New(time.Second, 15*time.Second); r.Wait(ctx); {
for {
a.lifecycleMu.RLock()
lastIndex := len(a.lifecycleStates) - 1
report := a.lifecycleStates[lastReportedIndex]
Expand All @@ -644,33 +640,36 @@ func (a *agent) reportLifecycleUntilClose() {
if lastIndex == lastReportedIndex {
break
}
l, err := agentsdk.ProtoFromLifecycle(report)
if err != nil {
a.logger.Critical(ctx, "failed to convert lifecycle state", slog.F("report", report))
// Skip this report; there is no point retrying. Maybe we can successfully convert the next one?
lastReportedIndex++
continue
}
payload := &proto.UpdateLifecycleRequest{Lifecycle: l}
logger := a.logger.With(slog.F("payload", payload))
logger.Debug(ctx, "reporting lifecycle state")

a.logger.Debug(ctx, "reporting lifecycle state", slog.F("payload", report))
_, err = aAPI.UpdateLifecycle(ctx, payload)
if err != nil {
return xerrors.Errorf("failed to update lifecycle: %w", err)
}

err := a.client.PostLifecycle(ctx, report)
if err == nil {
a.logger.Debug(ctx, "successfully reported lifecycle state", slog.F("payload", report))
r.Reset() // don't back off when we are successful
lastReportedIndex++
select {
case a.lifecycleReported <- report.State:
case <-a.lifecycleReported:
a.lifecycleReported <- report.State
}
if lastReportedIndex < lastIndex {
// Keep reporting until we've sent all messages, we can't
// rely on the channel triggering us before the backlog is
// consumed.
continue
}
break
logger.Debug(ctx, "successfully reported lifecycle state")
lastReportedIndex++
select {
case a.lifecycleReported <- report.State:
case <-a.lifecycleReported:
a.lifecycleReported <- report.State
}
if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
a.logger.Debug(ctx, "canceled reporting lifecycle state", slog.F("payload", report))
return
if lastReportedIndex < lastIndex {
// Keep reporting until we've sent all messages, we can't
// rely on the channel triggering us before the backlog is
// consumed.
continue
}
// If we fail to report the state we probably shouldn't exit, log only.
a.logger.Error(ctx, "agent failed to report the lifecycle state", slog.Error(err))
break
}
}
}
Expand Down Expand Up @@ -780,6 +779,10 @@ func (a *agent) run() (retErr error) {
return err
})

// part of graceful shut down is reporting the final lifecycle states, e.g "ShuttingDown" so the
// lifecycle reporting has to be via gracefulShutdownBehaviorRemain
connMan.start("report lifecycle", gracefulShutdownBehaviorRemain, a.reportLifecycle)

// channels to sync goroutines below
// handle manifest
// |
Expand Down
51 changes: 27 additions & 24 deletions agent/agenttest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"
"storj.io/drpc"
Expand Down Expand Up @@ -86,11 +88,10 @@ type Client struct {
fakeAgentAPI *FakeAgentAPI
LastWorkspaceAgent func()

mu sync.Mutex // Protects following.
lifecycleStates []codersdk.WorkspaceAgentLifecycle
logs []agentsdk.Log
derpMapUpdates chan *tailcfg.DERPMap
derpMapOnce sync.Once
mu sync.Mutex // Protects following.
logs []agentsdk.Log
derpMapUpdates chan *tailcfg.DERPMap
derpMapOnce sync.Once
}

func (*Client) RewriteDERPMap(*tailcfg.DERPMap) {}
Expand Down Expand Up @@ -122,17 +123,7 @@ func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
}

func (c *Client) GetLifecycleStates() []codersdk.WorkspaceAgentLifecycle {
c.mu.Lock()
defer c.mu.Unlock()
return c.lifecycleStates
}

func (c *Client) PostLifecycle(ctx context.Context, req agentsdk.PostLifecycleRequest) error {
c.mu.Lock()
defer c.mu.Unlock()
c.lifecycleStates = append(c.lifecycleStates, req.State)
c.logger.Debug(ctx, "post lifecycle", slog.F("req", req))
return nil
return c.fakeAgentAPI.GetLifecycleStates()
}

func (c *Client) GetStartup() <-chan *agentproto.Startup {
Expand Down Expand Up @@ -189,11 +180,12 @@ type FakeAgentAPI struct {
t testing.TB
logger slog.Logger

manifest *agentproto.Manifest
startupCh chan *agentproto.Startup
statsCh chan *agentproto.Stats
appHealthCh chan *agentproto.BatchUpdateAppHealthRequest
logsCh chan<- *agentproto.BatchCreateLogsRequest
manifest *agentproto.Manifest
startupCh chan *agentproto.Startup
statsCh chan *agentproto.Stats
appHealthCh chan *agentproto.BatchUpdateAppHealthRequest
logsCh chan<- *agentproto.BatchCreateLogsRequest
lifecycleStates []codersdk.WorkspaceAgentLifecycle

getServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
}
Expand Down Expand Up @@ -231,9 +223,20 @@ func (f *FakeAgentAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateSt
return &agentproto.UpdateStatsResponse{ReportInterval: durationpb.New(statsInterval)}, nil
}

func (*FakeAgentAPI) UpdateLifecycle(context.Context, *agentproto.UpdateLifecycleRequest) (*agentproto.Lifecycle, error) {
// TODO implement me
panic("implement me")
func (f *FakeAgentAPI) GetLifecycleStates() []codersdk.WorkspaceAgentLifecycle {
f.Lock()
defer f.Unlock()
return slices.Clone(f.lifecycleStates)
}

func (f *FakeAgentAPI) UpdateLifecycle(_ context.Context, req *agentproto.UpdateLifecycleRequest) (*agentproto.Lifecycle, error) {
f.Lock()
defer f.Unlock()
s, err := agentsdk.LifecycleStateFromProto(req.GetLifecycle().GetState())
if assert.NoError(f.t, err) {
f.lifecycleStates = append(f.lifecycleStates, s)
}
return req.GetLifecycle(), nil
}

func (f *FakeAgentAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) {
Expand Down
3 changes: 3 additions & 0 deletions codersdk/agentsdk/agentsdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ type PostLifecycleRequest struct {
ChangedAt time.Time `json:"changed_at"`
}

// PostLifecycle posts the agent's lifecycle to the Coder server.
//
// Deprecated: Use UpdateLifecycle on the dRPC API instead
func (c *Client) PostLifecycle(ctx context.Context, req PostLifecycleRequest) error {
res, err := c.SDK.Request(ctx, http.MethodPost, "/api/v2/workspaceagents/me/report-lifecycle", req)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions codersdk/agentsdk/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,22 @@ func ProtoFromLog(log Log) (*proto.Log, error) {
Level: proto.Log_Level(lvl),
}, nil
}

func ProtoFromLifecycle(req PostLifecycleRequest) (*proto.Lifecycle, error) {
s, ok := proto.Lifecycle_State_value[strings.ToUpper(string(req.State))]
if !ok {
return nil, xerrors.Errorf("unknown lifecycle state: %s", req.State)
}
return &proto.Lifecycle{
State: proto.Lifecycle_State(s),
ChangedAt: timestamppb.New(req.ChangedAt),
}, nil
}

func LifecycleStateFromProto(s proto.Lifecycle_State) (codersdk.WorkspaceAgentLifecycle, error) {
caps, ok := proto.Lifecycle_State_name[int32(s)]
if !ok {
return "", xerrors.Errorf("unknown lifecycle state: %d", s)
}
return codersdk.WorkspaceAgentLifecycle(strings.ToLower(caps)), nil
}
15 changes: 15 additions & 0 deletions codersdk/agentsdk/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"tailscale.com/tailcfg"

"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/tailnet"
Expand Down Expand Up @@ -161,3 +162,17 @@ func TestSubsystems(t *testing.T) {
proto.Startup_EXECTRACE,
})
}

func TestProtoFromLifecycle(t *testing.T) {
t.Parallel()
now := dbtime.Now()
for _, s := range codersdk.WorkspaceAgentLifecycleOrder {
sr := agentsdk.PostLifecycleRequest{State: s, ChangedAt: now}
pr, err := agentsdk.ProtoFromLifecycle(sr)
require.NoError(t, err)
require.Equal(t, now, pr.ChangedAt.AsTime())
state, err := agentsdk.LifecycleStateFromProto(pr.State)
require.NoError(t, err)
require.Equal(t, s, state)
}
}