Skip to content

Commit d7d4a33

Browse files
committed
feat: Add bufferring to provisioner job logs
This should improve overall build performance, and especially under load. It removes the old `id` column on the `provisioner_job_logs` table and replaces it with an auto-incrementing big integer to preserve order. Funny enough, we never had to care about order before because inserts would at minimum be 1ms different. Now they aren't, so the order needs to be preserved.
1 parent 267b81a commit d7d4a33

15 files changed

+187
-310
lines changed

coderd/database/databasefake/databasefake.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2212,10 +2212,15 @@ func (q *fakeQuerier) InsertProvisionerJobLogs(_ context.Context, arg database.I
22122212
defer q.mutex.Unlock()
22132213

22142214
logs := make([]database.ProvisionerJobLog, 0)
2215+
id := int64(1)
2216+
if len(q.provisionerJobLogs) > 0 {
2217+
id = q.provisionerJobLogs[len(q.provisionerJobLogs)-1].ID
2218+
}
22152219
for index, output := range arg.Output {
2220+
id++
22162221
logs = append(logs, database.ProvisionerJobLog{
2222+
ID: id,
22172223
JobID: arg.JobID,
2218-
ID: arg.ID[index],
22192224
CreatedAt: arg.CreatedAt[index],
22202225
Source: arg.Source[index],
22212226
Level: arg.Level[index],

coderd/database/dump.sql

Lines changed: 13 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/migrations/000070_provisioner_log_lines.down.sql

Whitespace-only changes.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE provisioner_job_logs DROP COLUMN id;
2+
3+
ALTER TABLE provisioner_job_logs ADD COLUMN id BIGSERIAL PRIMARY KEY;

coderd/database/models.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 10 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/provisionerjoblogs.sql

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,12 @@ WHERE
88
AND (
99
created_at >= @created_after
1010
OR created_at <= @created_before
11-
)
12-
ORDER BY
13-
created_at DESC;
11+
) ORDER BY id;
1412

1513
-- name: InsertProvisionerJobLogs :many
1614
INSERT INTO
1715
provisioner_job_logs
1816
SELECT
19-
unnest(@id :: uuid [ ]) AS id,
2017
@job_id :: uuid AS job_id,
2118
unnest(@created_at :: timestamptz [ ]) AS created_at,
2219
unnest(@source :: log_source [ ]) AS source,

coderd/provisionerdaemons.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,6 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
368368
if err != nil {
369369
return nil, xerrors.Errorf("convert log source: %w", err)
370370
}
371-
insertParams.ID = append(insertParams.ID, uuid.New())
372371
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
373372
insertParams.Level = append(insertParams.Level, logLevel)
374373
insertParams.Stage = append(insertParams.Stage, log.Stage)
@@ -379,17 +378,13 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
379378
slog.F("stage", log.Stage),
380379
slog.F("output", log.Output))
381380
}
382-
logs, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
381+
_, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
383382
if err != nil {
384383
server.Logger.Error(ctx, "failed to insert job logs", slog.F("job_id", parsedID), slog.Error(err))
385384
return nil, xerrors.Errorf("insert job logs: %w", err)
386385
}
387386
server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID))
388-
data, err := json.Marshal(provisionerJobLogsMessage{Logs: logs})
389-
if err != nil {
390-
return nil, xerrors.Errorf("marshal job log: %w", err)
391-
}
392-
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data)
387+
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), []byte("{}"))
393388
if err != nil {
394389
server.Logger.Error(ctx, "failed to publish job logs", slog.F("job_id", parsedID), slog.Error(err))
395390
return nil, xerrors.Errorf("publish job log: %w", err)

coderd/provisionerjobs.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
156156
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText)
157157
defer wsNetConn.Close() // Also closes conn.
158158

159-
logIdsDone := make(map[uuid.UUID]bool)
159+
logIdsDone := make(map[int64]bool)
160160

161161
// The Go stdlib JSON encoder appends a newline character after message write.
162162
encoder := json.NewEncoder(wsNetConn)
@@ -370,8 +370,7 @@ func provisionerJobLogsChannel(jobID uuid.UUID) string {
370370

371371
// provisionerJobLogsMessage is the message type published on the provisionerJobLogsChannel() channel
372372
type provisionerJobLogsMessage struct {
373-
EndOfLogs bool `json:"end_of_logs,omitempty"`
374-
Logs []database.ProvisionerJobLog `json:"logs,omitempty"`
373+
EndOfLogs bool `json:"end_of_logs,omitempty"`
375374
}
376375

377376
func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog, func(), error) {
@@ -380,6 +379,7 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
380379
var (
381380
closed = make(chan struct{})
382381
bufferedLogs = make(chan database.ProvisionerJobLog, 128)
382+
since = database.Now()
383383
)
384384
closeSubscribe, err := api.Pubsub.Subscribe(
385385
provisionerJobLogsChannel(jobID),
@@ -389,15 +389,23 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
389389
return
390390
default:
391391
}
392-
393392
jlMsg := provisionerJobLogsMessage{}
394393
err := json.Unmarshal(message, &jlMsg)
395394
if err != nil {
396395
logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err))
397396
return
398397
}
398+
logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
399+
JobID: jobID,
400+
CreatedAfter: since,
401+
})
402+
if err != nil {
403+
logger.Warn(ctx, "get provisioner logs", slog.Error(err))
404+
return
405+
}
406+
since = database.Now()
399407

400-
for _, log := range jlMsg.Logs {
408+
for _, log := range logs {
401409
select {
402410
case bufferedLogs <- log:
403411
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))

0 commit comments

Comments
 (0)