Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix(agent): keep track of lastReportIndex between invocations of reportLifecycle()
  • Loading branch information
johnstcn committed Apr 25, 2024
commit 9c33880b22d330772db071a3e242bac1ef0e1ba8
24 changes: 12 additions & 12 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,11 @@ type agent struct {
sshServer *agentssh.Server
sshMaxTimeout time.Duration

lifecycleUpdate chan struct{}
lifecycleReported chan codersdk.WorkspaceAgentLifecycle
lifecycleMu sync.RWMutex // Protects following.
lifecycleStates []agentsdk.PostLifecycleRequest
lifecycleUpdate chan struct{}
lifecycleReported chan codersdk.WorkspaceAgentLifecycle
lifecycleMu sync.RWMutex // Protects following.
lifecycleStates []agentsdk.PostLifecycleRequest
lifecycleLastReportedIndex int // Keeps track of the last lifecycle state we successfully reported.

network *tailnet.Conn
addresses []netip.Prefix
Expand Down Expand Up @@ -625,7 +626,6 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
// changes are reported in order.
func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
aAPI := proto.NewDRPCAgentClient(conn)
lastReportedIndex := 0 // Start off with the created state without reporting it.
for {
select {
case <-a.lifecycleUpdate:
Expand All @@ -636,20 +636,20 @@ func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
for {
a.lifecycleMu.RLock()
lastIndex := len(a.lifecycleStates) - 1
report := a.lifecycleStates[lastReportedIndex]
if len(a.lifecycleStates) > lastReportedIndex+1 {
report = a.lifecycleStates[lastReportedIndex+1]
report := a.lifecycleStates[a.lifecycleLastReportedIndex]
if len(a.lifecycleStates) > a.lifecycleLastReportedIndex+1 {
report = a.lifecycleStates[a.lifecycleLastReportedIndex+1]
}
a.lifecycleMu.RUnlock()

if lastIndex == lastReportedIndex {
if lastIndex == a.lifecycleLastReportedIndex {
break
}
l, err := agentsdk.ProtoFromLifecycle(report)
if err != nil {
a.logger.Critical(ctx, "failed to convert lifecycle state", slog.F("report", report))
// Skip this report; there is no point retrying. Maybe we can successfully convert the next one?
lastReportedIndex++
a.lifecycleLastReportedIndex++
continue
}
payload := &proto.UpdateLifecycleRequest{Lifecycle: l}
Expand All @@ -662,13 +662,13 @@ func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
}

logger.Debug(ctx, "successfully reported lifecycle state")
lastReportedIndex++
a.lifecycleLastReportedIndex++
select {
case a.lifecycleReported <- report.State:
case <-a.lifecycleReported:
a.lifecycleReported <- report.State
}
if lastReportedIndex < lastIndex {
if a.lifecycleLastReportedIndex < lastIndex {
// Keep reporting until we've sent all messages, we can't
// rely on the channel triggering us before the backlog is
// consumed.
Expand Down