Skip to content

feat: Add buffering to provisioner job logs #4918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Add bufferring to provisioner job logs
This should improve overall build performance, and especially under load.

It removes the old `id` column on the `provisioner_job_logs` table
and replaces it with an auto-incrementing big integer to preserve order.

Funny enough, we never had to care about order before because inserts
would at minimum be 1ms different. Now they aren't, so the order needs
to be preserved.
  • Loading branch information
kylecarbs committed Nov 6, 2022
commit d7d4a339de98c1131f6547023fe77e9a3088a910
7 changes: 6 additions & 1 deletion coderd/database/databasefake/databasefake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2212,10 +2212,15 @@ func (q *fakeQuerier) InsertProvisionerJobLogs(_ context.Context, arg database.I
defer q.mutex.Unlock()

logs := make([]database.ProvisionerJobLog, 0)
id := int64(1)
if len(q.provisionerJobLogs) > 0 {
id = q.provisionerJobLogs[len(q.provisionerJobLogs)-1].ID
}
for index, output := range arg.Output {
id++
logs = append(logs, database.ProvisionerJobLog{
ID: id,
JobID: arg.JobID,
ID: arg.ID[index],
CreatedAt: arg.CreatedAt[index],
Source: arg.Source[index],
Level: arg.Level[index],
Expand Down
15 changes: 13 additions & 2 deletions coderd/database/dump.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE provisioner_job_logs DROP COLUMN id;

ALTER TABLE provisioner_job_logs ADD COLUMN id BIGSERIAL PRIMARY KEY;
2 changes: 1 addition & 1 deletion coderd/database/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 10 additions & 15 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions coderd/database/queries/provisionerjoblogs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ WHERE
AND (
created_at >= @created_after
OR created_at <= @created_before
)
ORDER BY
created_at DESC;
) ORDER BY id;

-- name: InsertProvisionerJobLogs :many
INSERT INTO
provisioner_job_logs
SELECT
unnest(@id :: uuid [ ]) AS id,
@job_id :: uuid AS job_id,
unnest(@created_at :: timestamptz [ ]) AS created_at,
unnest(@source :: log_source [ ]) AS source,
Expand Down
9 changes: 2 additions & 7 deletions coderd/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
if err != nil {
return nil, xerrors.Errorf("convert log source: %w", err)
}
insertParams.ID = append(insertParams.ID, uuid.New())
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
insertParams.Level = append(insertParams.Level, logLevel)
insertParams.Stage = append(insertParams.Stage, log.Stage)
Expand All @@ -379,17 +378,13 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
slog.F("stage", log.Stage),
slog.F("output", log.Output))
}
logs, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
_, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
if err != nil {
server.Logger.Error(ctx, "failed to insert job logs", slog.F("job_id", parsedID), slog.Error(err))
return nil, xerrors.Errorf("insert job logs: %w", err)
}
server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID))
data, err := json.Marshal(provisionerJobLogsMessage{Logs: logs})
if err != nil {
return nil, xerrors.Errorf("marshal job log: %w", err)
}
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data)
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), []byte("{}"))
if err != nil {
server.Logger.Error(ctx, "failed to publish job logs", slog.F("job_id", parsedID), slog.Error(err))
return nil, xerrors.Errorf("publish job log: %w", err)
Expand Down
18 changes: 13 additions & 5 deletions coderd/provisionerjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText)
defer wsNetConn.Close() // Also closes conn.

logIdsDone := make(map[uuid.UUID]bool)
logIdsDone := make(map[int64]bool)

// The Go stdlib JSON encoder appends a newline character after message write.
encoder := json.NewEncoder(wsNetConn)
Expand Down Expand Up @@ -370,8 +370,7 @@ func provisionerJobLogsChannel(jobID uuid.UUID) string {

// provisionerJobLogsMessage is the message type published on the provisionerJobLogsChannel() channel
type provisionerJobLogsMessage struct {
EndOfLogs bool `json:"end_of_logs,omitempty"`
Logs []database.ProvisionerJobLog `json:"logs,omitempty"`
EndOfLogs bool `json:"end_of_logs,omitempty"`
}

func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog, func(), error) {
Expand All @@ -380,6 +379,7 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
var (
closed = make(chan struct{})
bufferedLogs = make(chan database.ProvisionerJobLog, 128)
since = database.Now()
)
closeSubscribe, err := api.Pubsub.Subscribe(
provisionerJobLogsChannel(jobID),
Expand All @@ -389,15 +389,23 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
return
default:
}

jlMsg := provisionerJobLogsMessage{}
err := json.Unmarshal(message, &jlMsg)
if err != nil {
logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err))
return
}
logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
JobID: jobID,
CreatedAfter: since,
})
if err != nil {
logger.Warn(ctx, "get provisioner logs", slog.Error(err))
return
}
since = database.Now()

for _, log := range jlMsg.Logs {
for _, log := range logs {
select {
case bufferedLogs <- log:
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
Expand Down
Loading