@@ -1174,7 +1174,7 @@ func compressHandler(h http.Handler) http.Handler {
1174
1174
1175
1175
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd.
1176
1176
// Useful when starting coderd and provisionerd in the same process.
1177
- func (api * API ) CreateInMemoryProvisionerDaemon (ctx context.Context , name string ) (client proto.DRPCProvisionerDaemonClient , err error ) {
1177
+ func (api * API ) CreateInMemoryProvisionerDaemon (dialCtx context.Context , name string ) (client proto.DRPCProvisionerDaemonClient , err error ) {
1178
1178
tracer := api .TracerProvider .Tracer (tracing .TracerName )
1179
1179
clientSession , serverSession := drpc .MemTransportPipe ()
1180
1180
defer func () {
@@ -1185,7 +1185,7 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string
1185
1185
}()
1186
1186
1187
1187
//nolint:gocritic // in-memory provisioners are owned by system
1188
- daemon , err := api .Database .UpsertProvisionerDaemon (dbauthz .AsSystemRestricted (ctx ), database.UpsertProvisionerDaemonParams {
1188
+ daemon , err := api .Database .UpsertProvisionerDaemon (dbauthz .AsSystemRestricted (dialCtx ), database.UpsertProvisionerDaemonParams {
1189
1189
Name : name ,
1190
1190
CreatedAt : dbtime .Now (),
1191
1191
Provisioners : []database.ProvisionerType {
@@ -1201,7 +1201,7 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string
1201
1201
}
1202
1202
1203
1203
mux := drpcmux .New ()
1204
- api .Logger .Info (ctx , "starting in-memory provisioner daemon" , slog .F ("name" , name ))
1204
+ api .Logger .Info (dialCtx , "starting in-memory provisioner daemon" , slog .F ("name" , name ))
1205
1205
logger := api .Logger .Named (fmt .Sprintf ("inmem-provisionerd-%s" , name ))
1206
1206
srv , err := provisionerdserver .NewServer (
1207
1207
api .ctx , // use the same ctx as the API
@@ -1238,13 +1238,25 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, name string
1238
1238
if xerrors .Is (err , io .EOF ) {
1239
1239
return
1240
1240
}
1241
- logger .Debug (ctx , "drpc server error" , slog .Error (err ))
1241
+ logger .Debug (dialCtx , "drpc server error" , slog .Error (err ))
1242
1242
},
1243
1243
},
1244
1244
)
1245
+ // in-mem pipes aren't technically "websockets" but they have the same properties as far as the
1246
+ // API is concerned: they are long-lived connections that we need to close before completing
1247
+ // shutdown of the API.
1248
+ api .WebsocketWaitMutex .Lock ()
1249
+ api .WebsocketWaitGroup .Add (1 )
1250
+ api .WebsocketWaitMutex .Unlock ()
1245
1251
go func () {
1246
- err := server .Serve (ctx , serverSession )
1247
- logger .Info (ctx , "provisioner daemon disconnected" , slog .Error (err ))
1252
+ defer api .WebsocketWaitGroup .Done ()
1253
+ // here we pass the background context, since we want the server to keep serving until the
1254
+ // client hangs up. If we, say, pass the API context, then when it is canceled, we could
1255
+ // drop a job that we locked in the database but never passed to the provisionerd. The
1256
+ // provisionerd is local, in-mem, so there isn't a danger of losing contact with it and
1257
+ // having a dead connection we don't know the status of.
1258
+ err := server .Serve (context .Background (), serverSession )
1259
+ logger .Info (dialCtx , "provisioner daemon disconnected" , slog .Error (err ))
1248
1260
// close the sessions, so we don't leak goroutines serving them.
1249
1261
_ = clientSession .Close ()
1250
1262
_ = serverSession .Close ()
0 commit comments