Skip to content

Commit dcf3a56

Browse files
committed
feat: unified tracing between coderd<->provisionerd
1 parent e6931d6 commit dcf3a56

31 files changed

+592
-332
lines changed

cli/server.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -1185,7 +1185,10 @@ func newProvisionerDaemon(
11851185
return nil, xerrors.Errorf("mkdir %q: %w", cacheDir, err)
11861186
}
11871187

1188-
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
1188+
tracer := coderAPI.TracerProvider.Tracer(tracing.TracerName)
1189+
terraformClient, terraformServer := provisionersdk.MemTransportPipe(&provisionersdk.MemTransportOptions{
1190+
Tracer: tracer,
1191+
})
11891192
wg.Add(1)
11901193
go func() {
11911194
defer wg.Done()
@@ -1204,6 +1207,7 @@ func newProvisionerDaemon(
12041207
},
12051208
CachePath: cacheDir,
12061209
Logger: logger,
1210+
Tracer: tracer,
12071211
})
12081212
if err != nil && !xerrors.Is(err, context.Canceled) {
12091213
select {
@@ -1223,7 +1227,7 @@ func newProvisionerDaemon(
12231227
}
12241228
// include echo provisioner when in dev mode
12251229
if dev {
1226-
echoClient, echoServer := provisionersdk.MemTransportPipe()
1230+
echoClient, echoServer := provisionersdk.MemTransportPipe(nil)
12271231
wg.Add(1)
12281232
go func() {
12291233
defer wg.Done()

coderd/coderd.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -898,7 +898,10 @@ func compressHandler(h http.Handler) http.Handler {
898898
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
899899
// Useful when starting coderd and provisionerd in the same process.
900900
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
901-
clientSession, serverSession := provisionersdk.MemTransportPipe()
901+
tracer := api.TracerProvider.Tracer(tracing.TracerName)
902+
clientSession, serverSession := provisionersdk.MemTransportPipe(&provisionersdk.MemTransportOptions{
903+
Tracer: tracer,
904+
})
902905
defer func() {
903906
if err != nil {
904907
_ = clientSession.Close()
@@ -947,14 +950,16 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
947950
if err != nil {
948951
return nil, err
949952
}
950-
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
951-
Log: func(err error) {
952-
if xerrors.Is(err, io.EOF) {
953-
return
954-
}
955-
api.Logger.Debug(ctx, "drpc server error", slog.Error(err))
953+
server := drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux, Tracer: tracer},
954+
drpcserver.Options{
955+
Log: func(err error) {
956+
if xerrors.Is(err, io.EOF) {
957+
return
958+
}
959+
api.Logger.Debug(ctx, "drpc server error", slog.Error(err))
960+
},
956961
},
957-
})
962+
)
958963
go func() {
959964
err := server.Serve(ctx, serverSession)
960965
if err != nil && !xerrors.Is(err, io.EOF) {

coderd/coderdtest/coderdtest.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ func NewWithAPI(t *testing.T, options *Options) (*codersdk.Client, io.Closer, *c
385385
// well with coderd testing. It registers the "echo" provisioner for
386386
// quick testing.
387387
func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
388-
echoClient, echoServer := provisionersdk.MemTransportPipe()
388+
echoClient, echoServer := provisionersdk.MemTransportPipe(nil)
389389
ctx, cancelFunc := context.WithCancel(context.Background())
390390
t.Cleanup(func() {
391391
_ = echoClient.Close()
@@ -420,7 +420,7 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
420420
}
421421

422422
func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uuid.UUID, tags map[string]string) io.Closer {
423-
echoClient, echoServer := provisionersdk.MemTransportPipe()
423+
echoClient, echoServer := provisionersdk.MemTransportPipe(nil)
424424
ctx, cancelFunc := context.WithCancel(context.Background())
425425
serveDone := make(chan struct{})
426426
t.Cleanup(func() {

coderd/database/dump.sql

+2-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE provisioner_jobs DROP COLUMN metadata;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE provisioner_jobs ADD COLUMN metadata jsonb;

coderd/database/models.go

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

+14-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/provisionerjobs.sql

+3-2
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,11 @@ INSERT INTO
6363
file_id,
6464
"type",
6565
"input",
66-
tags
66+
tags,
67+
metadata
6768
)
6869
VALUES
69-
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *;
70+
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *;
7071

7172
-- name: UpdateProvisionerJobByID :exec
7273
UPDATE

coderd/provisionerdserver/provisionerdserver.go

+10
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,22 @@ func (server *Server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.Ac
129129
return nil, failJob(fmt.Sprintf("get user: %s", err))
130130
}
131131

132+
jobMetadata := map[string]string{}
133+
if job.Metadata.Valid {
134+
err := json.Unmarshal(job.Metadata.RawMessage, &jobMetadata)
135+
if err != nil {
136+
return nil, failJob(fmt.Sprintf("unmarshal metadata: %s", err))
137+
}
138+
}
139+
132140
protoJob := &proto.AcquiredJob{
133141
JobId: job.ID.String(),
134142
CreatedAt: job.CreatedAt.UnixMilli(),
135143
Provisioner: string(job.Provisioner),
136144
UserName: user.Username,
145+
Metadata: jobMetadata,
137146
}
147+
138148
switch job.Type {
139149
case database.ProvisionerJobTypeWorkspaceBuild:
140150
var input WorkspaceProvisionJob

coderd/templateversions.go

+23
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/go-chi/chi/v5"
1414
"github.com/google/uuid"
1515
"github.com/moby/moby/pkg/namesgenerator"
16+
"github.com/tabbed/pqtype"
1617
"golang.org/x/xerrors"
1718

1819
"cdr.dev/slog"
@@ -25,6 +26,7 @@ import (
2526
"github.com/coder/coder/coderd/parameter"
2627
"github.com/coder/coder/coderd/provisionerdserver"
2728
"github.com/coder/coder/coderd/rbac"
29+
"github.com/coder/coder/coderd/tracing"
2830
"github.com/coder/coder/codersdk"
2931
"github.com/coder/coder/examples"
3032
sdkproto "github.com/coder/coder/provisionersdk/proto"
@@ -574,6 +576,15 @@ func (api *API) postTemplateVersionDryRun(rw http.ResponseWriter, r *http.Reques
574576
return
575577
}
576578

579+
metadataRaw, err := json.Marshal(tracing.MetadataFromContext(ctx))
580+
if err != nil {
581+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
582+
Message: "Internal error unmarshalling metadata.",
583+
Detail: err.Error(),
584+
})
585+
return
586+
}
587+
577588
// Create a dry-run job
578589
jobID := uuid.New()
579590
provisionerJob, err := api.Database.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
@@ -589,6 +600,10 @@ func (api *API) postTemplateVersionDryRun(rw http.ResponseWriter, r *http.Reques
589600
Input: input,
590601
// Copy tags from the previous run.
591602
Tags: job.Tags,
603+
Metadata: pqtype.NullRawMessage{
604+
Valid: true,
605+
RawMessage: metadataRaw,
606+
},
592607
})
593608
if err != nil {
594609
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
@@ -1408,6 +1423,10 @@ func (api *API) postTemplateVersionsByOrganization(rw http.ResponseWriter, r *ht
14081423
if err != nil {
14091424
return xerrors.Errorf("marshal job input: %w", err)
14101425
}
1426+
metadataRaw, err := json.Marshal(tracing.MetadataFromContext(ctx))
1427+
if err != nil {
1428+
return xerrors.Errorf("marshal job metadata: %w", err)
1429+
}
14111430

14121431
provisionerJob, err = tx.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
14131432
ID: jobID,
@@ -1421,6 +1440,10 @@ func (api *API) postTemplateVersionsByOrganization(rw http.ResponseWriter, r *ht
14211440
Type: database.ProvisionerJobTypeTemplateVersionImport,
14221441
Input: jobInput,
14231442
Tags: tags,
1443+
Metadata: pqtype.NullRawMessage{
1444+
Valid: true,
1445+
RawMessage: metadataRaw,
1446+
},
14241447
})
14251448
if err != nil {
14261449
return xerrors.Errorf("insert provisioner job: %w", err)

coderd/tracing/drpc.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package tracing
2+
3+
import (
4+
"context"
5+
6+
"go.opentelemetry.io/otel"
7+
"go.opentelemetry.io/otel/propagation"
8+
"go.opentelemetry.io/otel/trace"
9+
"storj.io/drpc"
10+
"storj.io/drpc/drpcmetadata"
11+
)
12+
13+
type DRPCHandler struct {
14+
Handler drpc.Handler
15+
Tracer trace.Tracer
16+
}
17+
18+
func (t *DRPCHandler) HandleRPC(stream drpc.Stream, rpc string) error {
19+
metadata, ok := drpcmetadata.Get(stream.Context())
20+
if ok {
21+
ctx := otel.GetTextMapPropagator().Extract(stream.Context(), propagation.MapCarrier(metadata))
22+
ctx, span := t.Tracer.Start(ctx, "drpc.HandleRPC")
23+
defer span.End()
24+
stream = &drpcStreamWrapper{Stream: stream, ctx: ctx}
25+
}
26+
27+
return t.Handler.HandleRPC(stream, rpc)
28+
}
29+
30+
type drpcStreamWrapper struct {
31+
drpc.Stream
32+
33+
ctx context.Context
34+
}
35+
36+
func (s *drpcStreamWrapper) Context() context.Context { return s.ctx }
37+
38+
type DRPCConn struct {
39+
drpc.Conn
40+
Tracer trace.Tracer
41+
}
42+
43+
// Invoke implements drpc.Conn's Invoke method with tracing information injected into the context.
44+
func (c *DRPCConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in drpc.Message, out drpc.Message) (err error) {
45+
ctx, span := c.Tracer.Start(ctx, rpc)
46+
defer span.End()
47+
48+
return c.Conn.Invoke(c.addMetadata(ctx), rpc, enc, in, out)
49+
}
50+
51+
// NewStream implements drpc.Conn's NewStream method with tracing information injected into the context.
52+
func (c *DRPCConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ drpc.Stream, err error) {
53+
ctx, span := c.Tracer.Start(ctx, rpc)
54+
defer span.End()
55+
56+
return c.Conn.NewStream(c.addMetadata(ctx), rpc, enc)
57+
}
58+
59+
// addMetadata propagates the headers into a map that we inject into drpc metadata so they are
60+
// sent across the wire for the server to get.
61+
func (*DRPCConn) addMetadata(ctx context.Context) context.Context {
62+
metadata := make(map[string]string)
63+
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(metadata))
64+
return drpcmetadata.AddPairs(ctx, metadata)
65+
}

coderd/tracing/util.go

+12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"runtime"
66
"strings"
77

8+
"go.opentelemetry.io/otel"
9+
"go.opentelemetry.io/otel/propagation"
810
"go.opentelemetry.io/otel/trace"
911
)
1012

@@ -41,3 +43,13 @@ func RunWithoutSpan(ctx context.Context, fn func(ctx context.Context)) {
4143
ctx = trace.ContextWithSpan(ctx, NoopSpan)
4244
fn(ctx)
4345
}
46+
47+
func MetadataFromContext(ctx context.Context) map[string]string {
48+
metadata := make(map[string]string)
49+
otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(metadata))
50+
return metadata
51+
}
52+
53+
func MetadataToContext(ctx context.Context, metadata map[string]string) context.Context {
54+
return otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(metadata))
55+
}

0 commit comments

Comments
 (0)