Skip to content

fix(agent): keep track of lastReportIndex between invocations of reportLifecycle() #13075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,11 @@ type agent struct {
sshServer *agentssh.Server
sshMaxTimeout time.Duration

lifecycleUpdate chan struct{}
lifecycleReported chan codersdk.WorkspaceAgentLifecycle
lifecycleMu sync.RWMutex // Protects following.
lifecycleStates []agentsdk.PostLifecycleRequest
lifecycleUpdate chan struct{}
lifecycleReported chan codersdk.WorkspaceAgentLifecycle
lifecycleMu sync.RWMutex // Protects following.
lifecycleStates []agentsdk.PostLifecycleRequest
lifecycleLastReportedIndex int // Keeps track of the last lifecycle state we successfully reported.

network *tailnet.Conn
addresses []netip.Prefix
Expand Down Expand Up @@ -625,7 +626,6 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
// changes are reported in order.
func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
aAPI := proto.NewDRPCAgentClient(conn)
lastReportedIndex := 0 // Start off with the created state without reporting it.
for {
select {
case <-a.lifecycleUpdate:
Expand All @@ -636,20 +636,20 @@ func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
for {
a.lifecycleMu.RLock()
lastIndex := len(a.lifecycleStates) - 1
report := a.lifecycleStates[lastReportedIndex]
if len(a.lifecycleStates) > lastReportedIndex+1 {
report = a.lifecycleStates[lastReportedIndex+1]
report := a.lifecycleStates[a.lifecycleLastReportedIndex]
if len(a.lifecycleStates) > a.lifecycleLastReportedIndex+1 {
report = a.lifecycleStates[a.lifecycleLastReportedIndex+1]
}
a.lifecycleMu.RUnlock()

if lastIndex == lastReportedIndex {
if lastIndex == a.lifecycleLastReportedIndex {
break
}
l, err := agentsdk.ProtoFromLifecycle(report)
if err != nil {
a.logger.Critical(ctx, "failed to convert lifecycle state", slog.F("report", report))
// Skip this report; there is no point retrying. Maybe we can successfully convert the next one?
lastReportedIndex++
a.lifecycleLastReportedIndex++
continue
}
payload := &proto.UpdateLifecycleRequest{Lifecycle: l}
Expand All @@ -662,13 +662,13 @@ func (a *agent) reportLifecycle(ctx context.Context, conn drpc.Conn) error {
}

logger.Debug(ctx, "successfully reported lifecycle state")
lastReportedIndex++
a.lifecycleLastReportedIndex++
select {
case a.lifecycleReported <- report.State:
case <-a.lifecycleReported:
a.lifecycleReported <- report.State
}
if lastReportedIndex < lastIndex {
if a.lifecycleLastReportedIndex < lastIndex {
// Keep reporting until we've sent all messages, we can't
// rely on the channel triggering us before the backlog is
// consumed.
Expand Down
Loading