feat: add provisioner job hang detector #7927

Merged · 13 commits · Jun 25, 2023
Changes from 1 commit
Merge branch 'main' into dean/hang-detector
deansheather committed Jun 21, 2023
commit 8f16c3bde342f486f8812804f7f24b205dca8bf7
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -153,7 +153,7 @@ jobs:

       - name: Install sqlc
         run: |
-          curl -sSL https://github.com/kyleconroy/sqlc/releases/download/v1.17.2/sqlc_1.17.2_linux_amd64.tar.gz | sudo tar -C /usr/bin -xz sqlc
+          curl -sSL https://github.com/kyleconroy/sqlc/releases/download/v1.18.0/sqlc_1.18.0_linux_amd64.tar.gz | sudo tar -C /usr/bin -xz sqlc
 
       - name: go install tools
         run: |
11 changes: 10 additions & 1 deletion .github/workflows/security.yaml
@@ -75,7 +75,7 @@ jobs:

       - name: Install sqlc
         run: |
-          curl -sSL https://github.com/kyleconroy/sqlc/releases/download/v1.17.2/sqlc_1.17.2_linux_amd64.tar.gz | sudo tar -C /usr/bin -xz sqlc
+          curl -sSL https://github.com/kyleconroy/sqlc/releases/download/v1.18.0/sqlc_1.18.0_linux_amd64.tar.gz | sudo tar -C /usr/bin -xz sqlc
       - name: Install yq
         run: go run github.com/mikefarah/yq/v4@v4.30.6
       - name: Install mockgen
@@ -125,6 +125,15 @@ jobs:
           pcc_pass: ${{ secrets.PRISMA_CLOUD_SECRET_KEY }}
           image_name: ${{ steps.build.outputs.image }}
 
+      - name: Scan image
+        id: sysdig-scan
+        uses: sysdiglabs/scan-action@v3
+        with:
+          image-tag: ${{ steps.build.outputs.image }}
+          sysdig-secure-token: ${{ secrets.SYSDIG_API_TOKEN }}
+          input-type: docker-daemon
+          sysdig-secure-url: https://app.us4.sysdig.com
+
       - name: Run Trivy vulnerability scanner
         uses: aquasecurity/trivy-action@41f05d9ecffa2ed3f1580af306000f734b733e54
         with:
2 changes: 1 addition & 1 deletion Makefile
@@ -486,7 +486,7 @@ coderd/database/dump.sql: coderd/database/gen/dump/main.go $(wildcard coderd/dat
 	go run ./coderd/database/gen/dump/main.go
 
 # Generates Go code for querying the database.
-coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $(wildcard coderd/database/queries/*.sql) coderd/database/gen/enum/main.go coderd/database/gen/fake/main.go
+coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $(wildcard coderd/database/queries/*.sql)
 	./coderd/database/generate.sh
 
 
227 changes: 123 additions & 104 deletions agent/agent.go
@@ -332,7 +332,7 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
 			lastCollectedAts[mr.key] = mr.result.CollectedAt
 			err := a.client.PostMetadata(ctx, mr.key, *mr.result)
 			if err != nil {
-				a.logger.Error(ctx, "report metadata", slog.Error(err))
+				a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
 			}
 		case <-baseTicker.C:
 		}
@@ -462,7 +462,7 @@ func (a *agent) reportLifecycleLoop(ctx context.Context) {
 				return
 			}
 			// If we fail to report the state we probably shouldn't exit, log only.
-			a.logger.Error(ctx, "post state", slog.Error(err))
+			a.logger.Error(ctx, "agent failed to report the lifecycle state", slog.Error(err))
 		}
 	}
 }
@@ -911,122 +911,141 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
 	return nil
 }
 
-func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
-	// Initialize variables for log management
-	queuedLogs := make([]agentsdk.StartupLog, 0)
-	var flushLogsTimer *time.Timer
-	var logMutex sync.Mutex
-	logsFlushed := sync.NewCond(&sync.Mutex{})
-	var logsSending bool
-	defer func() {
-		logMutex.Lock()
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		logMutex.Unlock()
-	}()
-
-	// sendLogs function uploads the queued logs to the server
-	sendLogs := func() {
-		// Lock logMutex and check if logs are already being sent
-		logMutex.Lock()
-		if logsSending {
-			logMutex.Unlock()
-			return
-		}
-		if flushLogsTimer != nil {
-			flushLogsTimer.Stop()
-		}
-		if len(queuedLogs) == 0 {
-			logMutex.Unlock()
-			return
-		}
-		// Move the current queued logs to logsToSend and clear the queue
-		logsToSend := queuedLogs
-		logsSending = true
-		queuedLogs = make([]agentsdk.StartupLog, 0)
-		logMutex.Unlock()
-
-		// Retry uploading logs until successful or a specific error occurs
-		for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
-			err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
-				Logs: logsToSend,
-			})
-			if err == nil {
-				break
-			}
-			var sdkErr *codersdk.Error
-			if errors.As(err, &sdkErr) {
-				if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
-					a.logger.Warn(ctx, "startup logs too large, dropping logs")
-					break
-				}
-			}
-			a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
-		}
-		// Reset logsSending flag
-		logMutex.Lock()
-		logsSending = false
-		flushLogsTimer.Reset(100 * time.Millisecond)
-		logMutex.Unlock()
-		logsFlushed.Broadcast()
-	}
-	// queueLog function appends a log to the queue and triggers sendLogs if necessary
-	queueLog := func(log agentsdk.StartupLog) {
-		logMutex.Lock()
-		defer logMutex.Unlock()
-
-		// Append log to the queue
-		queuedLogs = append(queuedLogs, log)
-
-		// If there are more than 100 logs, send them immediately
-		if len(queuedLogs) > 100 {
-			// Don't early return after this, because we still want
-			// to reset the timer just in case logs come in while
-			// we're sending.
-			go sendLogs()
-		}
-		// Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
-		if flushLogsTimer != nil {
-			flushLogsTimer.Reset(100 * time.Millisecond)
-			return
-		}
-		flushLogsTimer = time.AfterFunc(100*time.Millisecond, sendLogs)
-	}
-
-	// It's important that we either flush or drop all logs before returning
-	// because the startup state is reported after flush.
-	//
-	// It'd be weird for the startup state to be ready, but logs are still
-	// coming in.
-	logsFinished := make(chan struct{})
-	err := a.trackConnGoroutine(func() {
-		scanner := bufio.NewScanner(reader)
-		for scanner.Scan() {
-			queueLog(agentsdk.StartupLog{
-				CreatedAt: database.Now(),
-				Output:    scanner.Text(),
-			})
-		}
-		if err := scanner.Err(); err != nil {
-			a.logger.Error(ctx, "scan startup logs", slog.Error(err))
-		}
-		defer close(logsFinished)
-		logsFlushed.L.Lock()
-		for {
-			logMutex.Lock()
-			if len(queuedLogs) == 0 {
-				logMutex.Unlock()
-				break
-			}
-			logMutex.Unlock()
-			logsFlushed.Wait()
-		}
-	})
-	if err != nil {
-		return nil, xerrors.Errorf("track conn goroutine: %w", err)
-	}
-	return logsFinished, nil
-}
+func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan struct{}, error) {
+	// Synchronous sender, there can only be one outbound send at a time.
+	//
+	// It's important that we either flush or drop all logs before returning
+	// because the startup state is reported after flush.
+	sendDone := make(chan struct{})
+	send := make(chan []agentsdk.StartupLog, 1)
+	go func() {
+		// Set flushTimeout and backlogLimit so that logs are uploaded
+		// once every 250ms or when 100 logs have been added to the
+		// backlog, whichever comes first.
+		flushTimeout := 250 * time.Millisecond
+		backlogLimit := 100
+
+		flush := time.NewTicker(flushTimeout)
+
+		var backlog []agentsdk.StartupLog
+		defer func() {
+			flush.Stop()
+			_ = reader.Close() // Ensure read routine is closed.
+			if len(backlog) > 0 {
+				a.logger.Debug(ctx, "track script logs sender exiting, discarding logs", slog.F("discarded_logs_count", len(backlog)))
+			}
+			a.logger.Debug(ctx, "track script logs sender exited")
+			close(sendDone)
+		}()
+
+		done := false
+		for {
+			flushed := false
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			// Close (!ok) can be triggered by the reader closing due to
+			// EOF or due to agent closing, when this happens we attempt
+			// a final flush. If the context is canceled this will be a
+			// no-op.
+			case logs, ok := <-send:
+				done = !ok
+				if ok {
+					backlog = append(backlog, logs...)
+					flushed = len(backlog) >= backlogLimit
+				}
+			case <-flush.C:
+				flushed = true
+			}
+
+			if (done || flushed) && len(backlog) > 0 {
+				flush.Stop() // Lower the chance of a double flush.
+
+				// Retry uploading logs until successful or a specific
+				// error occurs.
+				for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
+					err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
+						Logs: backlog,
+					})
+					if err == nil {
+						break
+					}
+
+					if errors.Is(err, context.Canceled) {
+						return
+					}
+					var sdkErr *codersdk.Error
+					if errors.As(err, &sdkErr) {
+						if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
+							a.logger.Warn(ctx, "startup logs too large, dropping logs")
+							break
+						}
+					}
+					a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog))
+				}
+				if ctx.Err() != nil {
+					return
+				}
+				backlog = nil
+
+				// Anchor flush to the last log upload.
+				flush.Reset(flushTimeout)
+			}
+			if done {
+				return
+			}
+		}
+	}()
+
+	// Forward read lines to the sender or queue them for when the
+	// sender is ready to process them.
+	//
+	// We only need to track this goroutine since it will ensure that
+	// the sender has closed before returning.
+	logsDone := make(chan struct{})
+	err := a.trackConnGoroutine(func() {
+		defer func() {
+			close(send)
+			<-sendDone
+			a.logger.Debug(ctx, "track script logs reader exited")
+			close(logsDone)
+		}()
+
+		var queue []agentsdk.StartupLog
+
+		s := bufio.NewScanner(reader)
+		for s.Scan() {
+			select {
+			case <-ctx.Done():
+				return
+			case <-a.closed:
+				return
+			case queue = <-send:
+				// Not captured by sender yet, re-use.
+			default:
+			}
+
+			queue = append(queue, agentsdk.StartupLog{
+				CreatedAt: database.Now(),
+				Output:    s.Text(),
+			})
+			send <- queue
+			queue = nil
+		}
+		if err := s.Err(); err != nil {
+			a.logger.Warn(ctx, "scan startup logs ended unexpectedly", slog.Error(err))
+		}
+	})
+	if err != nil {
+		close(send)
+		<-sendDone
+		close(logsDone)
+		return logsDone, err
+	}
+
+	return logsDone, nil
+}
 
 func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {
@@ -1346,7 +1365,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
 		)
 	})
 	if err != nil {
-		a.logger.Error(ctx, "report stats", slog.Error(err))
+		a.logger.Error(ctx, "agent failed to report stats", slog.Error(err))
 	} else {
 		if err = a.trackConnGoroutine(func() {
 			// This is OK because the agent never re-creates the tailnet
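A note for reviewers on the rewritten trackScriptLogs above: it pairs a reader goroutine with a single sender goroutine over a one-slot channel. The reader hands its queued lines to the sender; if the sender has not collected the previous batch yet, the reader takes the batch back out of the channel and appends to it instead of blocking per line, and the sender flushes every 250ms or once 100 logs accumulate, whichever comes first. Below is a self-contained sketch of that pattern under simplifying assumptions: a fmt.Printf stands in for a.client.PatchStartupLogs, and the retry, context-cancellation, and agent-close paths are omitted.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
	"time"
)

func main() {
	send := make(chan []string, 1)
	sendDone := make(chan struct{})

	// Sender: batch incoming lines and flush on a ticker or when the
	// backlog grows past backlogLimit, whichever comes first.
	go func() {
		defer close(sendDone)
		const backlogLimit = 100
		flushTimeout := 250 * time.Millisecond
		flush := time.NewTicker(flushTimeout)
		defer flush.Stop()

		var backlog []string
		done := false
		for {
			flushed := false
			select {
			case lines, ok := <-send:
				done = !ok // channel closed => final flush, then exit
				if ok {
					backlog = append(backlog, lines...)
					flushed = len(backlog) >= backlogLimit
				}
			case <-flush.C:
				flushed = true
			}
			if (done || flushed) && len(backlog) > 0 {
				// Stand-in for the real upload (PatchStartupLogs with retries).
				fmt.Printf("flushing %d line(s): %v\n", len(backlog), backlog)
				backlog = nil
				flush.Reset(flushTimeout) // anchor the next flush to this upload
			}
			if done {
				return
			}
		}
	}()

	// Reader: hand each line to the sender; if the previous batch is
	// still sitting in the one-slot channel, take it back and append
	// to it instead of blocking.
	var queue []string
	s := bufio.NewScanner(strings.NewReader("one\ntwo\nthree\n"))
	for s.Scan() {
		select {
		case queue = <-send: // not collected by the sender yet, re-use
		default:
		}
		queue = append(queue, s.Text())
		send <- queue
		queue = nil
	}
	close(send) // signals the sender to do a final flush and exit
	<-sendDone
}
```

The one-slot channel is what keeps the reader responsive without an unbounded queue: at most one batch is ever in flight, and everything produced while the sender is busy coalesces into it.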
You are viewing a condensed version of this merge commit.
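One more note on the condensed hunk above: the sender's upload path retries PatchStartupLogs with backoff until it succeeds, bails out if the context is canceled, and gives up on HTTP 413 (entity too large) by dropping the batch rather than retrying forever. A sketch of that decision structure under stated assumptions: statusError and doUpload are hypothetical stand-ins for codersdk.Error and the SDK call, and the fixed one-second backoff replaces the coder/retry backoff used in the real code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"
)

// statusError is a hypothetical stand-in for codersdk.Error, which
// exposes the HTTP status code of a failed API call.
type statusError struct{ code int }

func (e *statusError) Error() string { return fmt.Sprintf("status %d", e.code) }

// uploadWithRetry retries doUpload until it succeeds, the context is
// canceled, or the server says the payload is too large, in which case
// the batch is dropped rather than retried forever (mirroring the PR).
func uploadWithRetry(ctx context.Context, doUpload func(context.Context) error) error {
	for {
		err := doUpload(ctx)
		if err == nil {
			return nil
		}
		if errors.Is(err, context.Canceled) {
			return err
		}
		var sdkErr *statusError
		if errors.As(err, &sdkErr) && sdkErr.code == http.StatusRequestEntityTooLarge {
			fmt.Println("logs too large, dropping batch")
			return nil // drop, don't retry
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second): // the real code uses coder/retry backoff
		}
	}
}

func main() {
	attempts := 0
	err := uploadWithRetry(context.Background(), func(context.Context) error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("transient: %w", &statusError{code: http.StatusBadGateway})
		}
		return nil
	})
	fmt.Println("done:", err, "attempts:", attempts)
}
```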