Skip to content

Commit 8f16c3b

Browse files
committed
Merge branch 'main' into dean/hang-detector
2 parents 0b9e78a + 7703bb7 commit 8f16c3b

File tree

113 files changed

+4927
-1703
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

113 files changed

+4927
-1703
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -153,7 +153,7 @@ jobs:
153153

154154
- name: Install sqlc
155155
run: |
156-
curl -sSL https://github.com/kyleconroy/sqlc/releases/download/v1.17.2/sqlc_1.17.2_linux_amd64.tar.gz | sudo tar -C /usr/bin -xz sqlc
156+
curl -sSL https://github.com/kyleconroy/sqlc/releases/download/v1.18.0/sqlc_1.18.0_linux_amd64.tar.gz | sudo tar -C /usr/bin -xz sqlc
157157
158158
- name: go install tools
159159
run: |

.github/workflows/security.yaml

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ jobs:
7575
7676
- name: Install sqlc
7777
run: |
78-
curl -sSL https://github.com/kyleconroy/sqlc/releases/download/v1.17.2/sqlc_1.17.2_linux_amd64.tar.gz | sudo tar -C /usr/bin -xz sqlc
78+
curl -sSL https://github.com/kyleconroy/sqlc/releases/download/v1.18.0/sqlc_1.18.0_linux_amd64.tar.gz | sudo tar -C /usr/bin -xz sqlc
7979
- name: Install yq
8080
run: go run github.com/mikefarah/yq/v4@v4.30.6
8181
- name: Install mockgen
@@ -125,6 +125,15 @@ jobs:
125125
pcc_pass: ${{ secrets.PRISMA_CLOUD_SECRET_KEY }}
126126
image_name: ${{ steps.build.outputs.image }}
127127

128+
- name: Scan image
129+
id: sysdig-scan
130+
uses: sysdiglabs/scan-action@v3
131+
with:
132+
image-tag: ${{ steps.build.outputs.image }}
133+
sysdig-secure-token: ${{ secrets.SYSDIG_API_TOKEN }}
134+
input-type: docker-daemon
135+
sysdig-secure-url: https://app.us4.sysdig.com
136+
128137
- name: Run Trivy vulnerability scanner
129138
uses: aquasecurity/trivy-action@41f05d9ecffa2ed3f1580af306000f734b733e54
130139
with:

Makefile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -486,7 +486,7 @@ coderd/database/dump.sql: coderd/database/gen/dump/main.go $(wildcard coderd/dat
486486
go run ./coderd/database/gen/dump/main.go
487487

488488
# Generates Go code for querying the database.
489-
coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $(wildcard coderd/database/queries/*.sql) coderd/database/gen/enum/main.go coderd/database/gen/fake/main.go
489+
coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $(wildcard coderd/database/queries/*.sql)
490490
./coderd/database/generate.sh
491491

492492

agent/agent.go

Lines changed: 123 additions & 104 deletions
Original file line number | Diff line number | Diff line change
@@ -332,7 +332,7 @@ func (a *agent) reportMetadataLoop(ctx context.Context) {
332332
lastCollectedAts[mr.key] = mr.result.CollectedAt
333333
err := a.client.PostMetadata(ctx, mr.key, *mr.result)
334334
if err != nil {
335-
a.logger.Error(ctx, "report metadata", slog.Error(err))
335+
a.logger.Error(ctx, "agent failed to report metadata", slog.Error(err))
336336
}
337337
case <-baseTicker.C:
338338
}
@@ -462,7 +462,7 @@ func (a *agent) reportLifecycleLoop(ctx context.Context) {
462462
return
463463
}
464464
// If we fail to report the state we probably shouldn't exit, log only.
465-
a.logger.Error(ctx, "post state", slog.Error(err))
465+
a.logger.Error(ctx, "agent failed to report the lifecycle state", slog.Error(err))
466466
}
467467
}
468468
}
@@ -911,122 +911,141 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
911911
return nil
912912
}
913913

914-
func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan struct{}, error) {
915-
// Initialize variables for log management
916-
queuedLogs := make([]agentsdk.StartupLog, 0)
917-
var flushLogsTimer *time.Timer
918-
var logMutex sync.Mutex
919-
logsFlushed := sync.NewCond(&sync.Mutex{})
920-
var logsSending bool
921-
defer func() {
922-
logMutex.Lock()
923-
if flushLogsTimer != nil {
924-
flushLogsTimer.Stop()
925-
}
926-
logMutex.Unlock()
927-
}()
914+
func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan struct{}, error) {
915+
// Synchronous sender, there can only be one outbound send at a time.
916+
//
917+
// It's important that we either flush or drop all logs before returning
918+
// because the startup state is reported after flush.
919+
sendDone := make(chan struct{})
920+
send := make(chan []agentsdk.StartupLog, 1)
921+
go func() {
922+
// Set flushTimeout and backlogLimit so that logs are uploaded
923+
// once every 250ms or when 100 logs have been added to the
924+
// backlog, whichever comes first.
925+
flushTimeout := 250 * time.Millisecond
926+
backlogLimit := 100
928927

929-
// sendLogs function uploads the queued logs to the server
930-
sendLogs := func() {
931-
// Lock logMutex and check if logs are already being sent
932-
logMutex.Lock()
933-
if logsSending {
934-
logMutex.Unlock()
935-
return
936-
}
937-
if flushLogsTimer != nil {
938-
flushLogsTimer.Stop()
939-
}
940-
if len(queuedLogs) == 0 {
941-
logMutex.Unlock()
942-
return
943-
}
944-
// Move the current queued logs to logsToSend and clear the queue
945-
logsToSend := queuedLogs
946-
logsSending = true
947-
queuedLogs = make([]agentsdk.StartupLog, 0)
948-
logMutex.Unlock()
949-
950-
// Retry uploading logs until successful or a specific error occurs
951-
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
952-
err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
953-
Logs: logsToSend,
954-
})
955-
if err == nil {
956-
break
928+
flush := time.NewTicker(flushTimeout)
929+
930+
var backlog []agentsdk.StartupLog
931+
defer func() {
932+
flush.Stop()
933+
_ = reader.Close() // Ensure read routine is closed.
934+
if len(backlog) > 0 {
935+
a.logger.Debug(ctx, "track script logs sender exiting, discarding logs", slog.F("discarded_logs_count", len(backlog)))
957936
}
958-
var sdkErr *codersdk.Error
959-
if errors.As(err, &sdkErr) {
960-
if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
961-
a.logger.Warn(ctx, "startup logs too large, dropping logs")
962-
break
937+
a.logger.Debug(ctx, "track script logs sender exited")
938+
close(sendDone)
939+
}()
940+
941+
done := false
942+
for {
943+
flushed := false
944+
select {
945+
case <-ctx.Done():
946+
return
947+
case <-a.closed:
948+
return
949+
// Close (!ok) can be triggered by the reader closing due to
950+
// EOF or due to agent closing, when this happens we attempt
951+
// a final flush. If the context is canceled this will be a
952+
// no-op.
953+
case logs, ok := <-send:
954+
done = !ok
955+
if ok {
956+
backlog = append(backlog, logs...)
957+
flushed = len(backlog) >= backlogLimit
963958
}
959+
case <-flush.C:
960+
flushed = true
961+
}
962+
963+
if (done || flushed) && len(backlog) > 0 {
964+
flush.Stop() // Lower the chance of a double flush.
965+
966+
// Retry uploading logs until successful or a specific
967+
// error occurs.
968+
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
969+
err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
970+
Logs: backlog,
971+
})
972+
if err == nil {
973+
break
974+
}
975+
976+
if errors.Is(err, context.Canceled) {
977+
return
978+
}
979+
var sdkErr *codersdk.Error
980+
if errors.As(err, &sdkErr) {
981+
if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
982+
a.logger.Warn(ctx, "startup logs too large, dropping logs")
983+
break
984+
}
985+
}
986+
a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog))
987+
}
988+
if ctx.Err() != nil {
989+
return
990+
}
991+
backlog = nil
992+
993+
// Anchor flush to the last log upload.
994+
flush.Reset(flushTimeout)
995+
}
996+
if done {
997+
return
964998
}
965-
a.logger.Error(ctx, "upload startup logs", slog.Error(err), slog.F("to_send", logsToSend))
966-
}
967-
// Reset logsSending flag
968-
logMutex.Lock()
969-
logsSending = false
970-
flushLogsTimer.Reset(100 * time.Millisecond)
971-
logMutex.Unlock()
972-
logsFlushed.Broadcast()
973-
}
974-
// queueLog function appends a log to the queue and triggers sendLogs if necessary
975-
queueLog := func(log agentsdk.StartupLog) {
976-
logMutex.Lock()
977-
defer logMutex.Unlock()
978-
979-
// Append log to the queue
980-
queuedLogs = append(queuedLogs, log)
981-
982-
// If there are more than 100 logs, send them immediately
983-
if len(queuedLogs) > 100 {
984-
// Don't early return after this, because we still want
985-
// to reset the timer just in case logs come in while
986-
// we're sending.
987-
go sendLogs()
988-
}
989-
// Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
990-
if flushLogsTimer != nil {
991-
flushLogsTimer.Reset(100 * time.Millisecond)
992-
return
993999
}
994-
flushLogsTimer = time.AfterFunc(100*time.Millisecond, sendLogs)
995-
}
1000+
}()
9961001

997-
// It's important that we either flush or drop all logs before returning
998-
// because the startup state is reported after flush.
1002+
// Forward read lines to the sender or queue them for when the
1003+
// sender is ready to process them.
9991004
//
1000-
// It'd be weird for the startup state to be ready, but logs are still
1001-
// coming in.
1002-
logsFinished := make(chan struct{})
1005+
// We only need to track this goroutine since it will ensure that
1006+
// the sender has closed before returning.
1007+
logsDone := make(chan struct{})
10031008
err := a.trackConnGoroutine(func() {
1004-
scanner := bufio.NewScanner(reader)
1005-
for scanner.Scan() {
1006-
queueLog(agentsdk.StartupLog{
1009+
defer func() {
1010+
close(send)
1011+
<-sendDone
1012+
a.logger.Debug(ctx, "track script logs reader exited")
1013+
close(logsDone)
1014+
}()
1015+
1016+
var queue []agentsdk.StartupLog
1017+
1018+
s := bufio.NewScanner(reader)
1019+
for s.Scan() {
1020+
select {
1021+
case <-ctx.Done():
1022+
return
1023+
case <-a.closed:
1024+
return
1025+
case queue = <-send:
1026+
// Not captured by sender yet, re-use.
1027+
default:
1028+
}
1029+
1030+
queue = append(queue, agentsdk.StartupLog{
10071031
CreatedAt: database.Now(),
1008-
Output: scanner.Text(),
1032+
Output: s.Text(),
10091033
})
1034+
send <- queue
1035+
queue = nil
10101036
}
1011-
if err := scanner.Err(); err != nil {
1012-
a.logger.Error(ctx, "scan startup logs", slog.Error(err))
1013-
}
1014-
defer close(logsFinished)
1015-
logsFlushed.L.Lock()
1016-
for {
1017-
logMutex.Lock()
1018-
if len(queuedLogs) == 0 {
1019-
logMutex.Unlock()
1020-
break
1021-
}
1022-
logMutex.Unlock()
1023-
logsFlushed.Wait()
1037+
if err := s.Err(); err != nil {
1038+
a.logger.Warn(ctx, "scan startup logs ended unexpectedly", slog.Error(err))
10241039
}
10251040
})
10261041
if err != nil {
1027-
return nil, xerrors.Errorf("track conn goroutine: %w", err)
1042+
close(send)
1043+
<-sendDone
1044+
close(logsDone)
1045+
return logsDone, err
10281046
}
1029-
return logsFinished, nil
1047+
1048+
return logsDone, nil
10301049
}
10311050

10321051
func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {
@@ -1346,7 +1365,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
13461365
)
13471366
})
13481367
if err != nil {
1349-
a.logger.Error(ctx, "report stats", slog.Error(err))
1368+
a.logger.Error(ctx, "agent failed to report stats", slog.Error(err))
13501369
} else {
13511370
if err = a.trackConnGoroutine(func() {
13521371
// This is OK because the agent never re-creates the tailnet

0 commit comments

Comments
 (0)