Skip to content

Commit 7c3dbbb

Browse files
authored
fix: stream provisioner logs (#7712)
* stream provisioner logs Signed-off-by: Spike Curtis <spike@coder.com> * Fix imports Signed-off-by: Spike Curtis <spike@coder.com> * Better logging, naming, arg order Signed-off-by: Spike Curtis <spike@coder.com> --------- Signed-off-by: Spike Curtis <spike@coder.com>
1 parent 583b777 commit 7c3dbbb

File tree

6 files changed

+536
-221
lines changed

6 files changed

+536
-221
lines changed

coderd/provisionerdserver/provisionerdserver.go

+6-19
Original file line numberDiff line numberDiff line change
@@ -537,13 +537,13 @@ func (server *Server) UpdateJob(ctx context.Context, request *proto.UpdateJobReq
537537
// everything from that point.
538538
lowestID := logs[0].ID
539539
server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID))
540-
data, err := json.Marshal(ProvisionerJobLogsNotifyMessage{
540+
data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{
541541
CreatedAfter: lowestID - 1,
542542
})
543543
if err != nil {
544544
return nil, xerrors.Errorf("marshal: %w", err)
545545
}
546-
err = server.Pubsub.Publish(ProvisionerJobLogsNotifyChannel(parsedID), data)
546+
err = server.Pubsub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(parsedID), data)
547547
if err != nil {
548548
server.Logger.Error(ctx, "failed to publish job logs", slog.F("job_id", parsedID), slog.Error(err))
549549
return nil, xerrors.Errorf("publish job log: %w", err)
@@ -846,11 +846,11 @@ func (server *Server) FailJob(ctx context.Context, failJob *proto.FailedJob) (*p
846846
}
847847
}
848848

849-
data, err := json.Marshal(ProvisionerJobLogsNotifyMessage{EndOfLogs: true})
849+
data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{EndOfLogs: true})
850850
if err != nil {
851851
return nil, xerrors.Errorf("marshal job log: %w", err)
852852
}
853-
err = server.Pubsub.Publish(ProvisionerJobLogsNotifyChannel(jobID), data)
853+
err = server.Pubsub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(jobID), data)
854854
if err != nil {
855855
server.Logger.Error(ctx, "failed to publish end of job logs", slog.F("job_id", jobID), slog.Error(err))
856856
return nil, xerrors.Errorf("publish end of job logs: %w", err)
@@ -1236,11 +1236,11 @@ func (server *Server) CompleteJob(ctx context.Context, completed *proto.Complete
12361236
reflect.TypeOf(completed.Type).String())
12371237
}
12381238

1239-
data, err := json.Marshal(ProvisionerJobLogsNotifyMessage{EndOfLogs: true})
1239+
data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{EndOfLogs: true})
12401240
if err != nil {
12411241
return nil, xerrors.Errorf("marshal job log: %w", err)
12421242
}
1243-
err = server.Pubsub.Publish(ProvisionerJobLogsNotifyChannel(jobID), data)
1243+
err = server.Pubsub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(jobID), data)
12441244
if err != nil {
12451245
server.Logger.Error(ctx, "failed to publish end of job logs", slog.F("job_id", jobID), slog.Error(err))
12461246
return nil, xerrors.Errorf("publish end of job logs: %w", err)
@@ -1704,19 +1704,6 @@ type TemplateVersionDryRunJob struct {
17041704
RichParameterValues []database.WorkspaceBuildParameter `json:"rich_parameter_values"`
17051705
}
17061706

1707-
// ProvisionerJobLogsNotifyMessage is the payload published on
1708-
// the provisioner job logs notify channel.
1709-
type ProvisionerJobLogsNotifyMessage struct {
1710-
CreatedAfter int64 `json:"created_after"`
1711-
EndOfLogs bool `json:"end_of_logs,omitempty"`
1712-
}
1713-
1714-
// ProvisionerJobLogsNotifyChannel is the PostgreSQL NOTIFY channel
1715-
// to publish updates to job logs on.
1716-
func ProvisionerJobLogsNotifyChannel(jobID uuid.UUID) string {
1717-
return fmt.Sprintf("provisioner-log-logs:%s", jobID)
1718-
}
1719-
17201707
func asVariableValues(templateVariables []database.TemplateVersionVariable) []*sdkproto.VariableValue {
17211708
var apiVariableValues []*sdkproto.VariableValue
17221709
for _, v := range templateVariables {

coderd/provisionerdserver/provisionerdserver_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/coder/coder/coderd/telemetry"
2828
"github.com/coder/coder/codersdk"
2929
"github.com/coder/coder/provisionerd/proto"
30+
"github.com/coder/coder/provisionersdk"
3031
sdkproto "github.com/coder/coder/provisionersdk/proto"
3132
"github.com/coder/coder/testutil"
3233
)
@@ -528,7 +529,7 @@ func TestUpdateJob(t *testing.T) {
528529

529530
published := make(chan struct{})
530531

531-
closeListener, err := srv.Pubsub.Subscribe(provisionerdserver.ProvisionerJobLogsNotifyChannel(job), func(_ context.Context, _ []byte) {
532+
closeListener, err := srv.Pubsub.Subscribe(provisionersdk.ProvisionerJobLogsNotifyChannel(job), func(_ context.Context, _ []byte) {
532533
close(published)
533534
})
534535
require.NoError(t, err)
@@ -776,7 +777,7 @@ func TestFailJob(t *testing.T) {
776777
require.NoError(t, err)
777778
defer closeWorkspaceSubscribe()
778779
publishedLogs := make(chan struct{})
779-
closeLogsSubscribe, err := srv.Pubsub.Subscribe(provisionerdserver.ProvisionerJobLogsNotifyChannel(job.ID), func(_ context.Context, _ []byte) {
780+
closeLogsSubscribe, err := srv.Pubsub.Subscribe(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), func(_ context.Context, _ []byte) {
780781
close(publishedLogs)
781782
})
782783
require.NoError(t, err)
@@ -1082,7 +1083,7 @@ func TestCompleteJob(t *testing.T) {
10821083
require.NoError(t, err)
10831084
defer closeWorkspaceSubscribe()
10841085
publishedLogs := make(chan struct{})
1085-
closeLogsSubscribe, err := srv.Pubsub.Subscribe(provisionerdserver.ProvisionerJobLogsNotifyChannel(job.ID), func(_ context.Context, _ []byte) {
1086+
closeLogsSubscribe, err := srv.Pubsub.Subscribe(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), func(_ context.Context, _ []byte) {
10861087
close(publishedLogs)
10871088
})
10881089
require.NoError(t, err)

0 commit comments

Comments
 (0)