Skip to content

Commit 0b4e079

Browse files
committed
chore: add WS endpoints
1 parent b3bb076 commit 0b4e079

File tree

3 files changed

+329
-0
lines changed

3 files changed

+329
-0
lines changed

coderd/coderd.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,6 +1229,7 @@ func New(options *Options) *API {
12291229
)
12301230
r.Get("/", api.workspaceAgent)
12311231
r.Get("/watch-metadata", api.watchWorkspaceAgentMetadata)
1232+
r.Get("/watch-metadata-ws", api.watchWorkspaceAgentMetadataWs)
12321233
r.Get("/startup-logs", api.workspaceAgentLogsDeprecated)
12331234
r.Get("/logs", api.workspaceAgentLogs)
12341235
r.Get("/listening-ports", api.workspaceAgentListeningPorts)
@@ -1261,6 +1262,7 @@ func New(options *Options) *API {
12611262
r.Put("/", api.putWorkspaceTTL)
12621263
})
12631264
r.Get("/watch", api.watchWorkspace)
1265+
r.Get("/watch-ws", api.watchWorkspaceWs)
12641266
r.Put("/extend", api.putExtendWorkspace)
12651267
r.Post("/usage", api.postWorkspaceUsage)
12661268
r.Put("/dormant", api.putWorkspaceDormant)

coderd/workspaceagents.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,6 +1094,7 @@ func convertScripts(dbScripts []database.WorkspaceAgentScript) []codersdk.Worksp
10941094
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
10951095
// @Router /workspaceagents/{workspaceagent}/watch-metadata [get]
10961096
// @x-apidocgen {"skip": true}
1097+
// @Deprecated Use /workspaceagents/{workspaceagent}/watch-metadata-ws instead
10971098
func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Request) {
10981099
// Allow us to interrupt watch via cancel.
10991100
ctx, cancel := context.WithCancel(r.Context())
@@ -1273,6 +1274,193 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
12731274
}
12741275
}
12751276

1277+
// @Summary Watch for workspace agent metadata updates
1278+
// @ID watch-for-workspace-agent-metadata-updates
1279+
// @Security CoderSessionToken
1280+
// @Tags Agents
1281+
// @Success 200 "Success"
1282+
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
1283+
// @Router /workspaceagents/{workspaceagent}/watch-metadata [get]
1284+
// @x-apidocgen {"skip": true}
1285+
func (api *API) watchWorkspaceAgentMetadataWs(rw http.ResponseWriter, r *http.Request) {
1286+
// Allow us to interrupt watch via cancel.
1287+
ctx, cancel := context.WithCancel(r.Context())
1288+
defer cancel()
1289+
r = r.WithContext(ctx) // Rewire context for SSE cancellation.
1290+
1291+
workspaceAgent := httpmw.WorkspaceAgentParam(r)
1292+
log := api.Logger.Named("workspace_metadata_watcher").With(
1293+
slog.F("workspace_agent_id", workspaceAgent.ID),
1294+
)
1295+
1296+
// Send metadata on updates, we must ensure subscription before sending
1297+
// initial metadata to guarantee that events in-between are not missed.
1298+
update := make(chan agentapi.WorkspaceAgentMetadataChannelPayload, 1)
1299+
cancelSub, err := api.Pubsub.Subscribe(agentapi.WatchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
1300+
if ctx.Err() != nil {
1301+
return
1302+
}
1303+
1304+
var payload agentapi.WorkspaceAgentMetadataChannelPayload
1305+
err := json.Unmarshal(byt, &payload)
1306+
if err != nil {
1307+
log.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
1308+
return
1309+
}
1310+
1311+
log.Debug(ctx, "received metadata update", "payload", payload)
1312+
1313+
select {
1314+
case prev := <-update:
1315+
payload.Keys = appendUnique(prev.Keys, payload.Keys)
1316+
default:
1317+
}
1318+
// This can never block since we pop and merge beforehand.
1319+
update <- payload
1320+
})
1321+
if err != nil {
1322+
httpapi.InternalServerError(rw, err)
1323+
return
1324+
}
1325+
defer cancelSub()
1326+
1327+
// We always use the original Request context because it contains
1328+
// the RBAC actor.
1329+
initialMD, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
1330+
WorkspaceAgentID: workspaceAgent.ID,
1331+
Keys: nil,
1332+
})
1333+
if err != nil {
1334+
// If we can't successfully pull the initial metadata, pubsub
1335+
// updates will be no-op so we may as well terminate the
1336+
// connection early.
1337+
httpapi.InternalServerError(rw, err)
1338+
return
1339+
}
1340+
1341+
log.Debug(ctx, "got initial metadata", "num", len(initialMD))
1342+
1343+
metadataMap := make(map[string]database.WorkspaceAgentMetadatum, len(initialMD))
1344+
for _, datum := range initialMD {
1345+
metadataMap[datum.Key] = datum
1346+
}
1347+
//nolint:ineffassign // Release memory.
1348+
initialMD = nil
1349+
1350+
send, closed, err := httpapi.OneWayWebSocket[codersdk.ServerSentEvent](rw, r)
1351+
if err != nil {
1352+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
1353+
Message: "Internal error setting up server-sent events.",
1354+
Detail: err.Error(),
1355+
})
1356+
return
1357+
}
1358+
// Prevent handler from returning until the sender is closed.
1359+
defer func() {
1360+
cancel()
1361+
<-closed
1362+
}()
1363+
// Synchronize cancellation from SSE -> context, this lets us simplify the
1364+
// cancellation logic.
1365+
go func() {
1366+
select {
1367+
case <-ctx.Done():
1368+
case <-closed:
1369+
cancel()
1370+
}
1371+
}()
1372+
1373+
var lastSend time.Time
1374+
sendMetadata := func() {
1375+
lastSend = time.Now()
1376+
values := maps.Values(metadataMap)
1377+
1378+
log.Debug(ctx, "sending metadata", "num", len(values))
1379+
1380+
_ = send(codersdk.ServerSentEvent{
1381+
Type: codersdk.ServerSentEventTypeData,
1382+
Data: convertWorkspaceAgentMetadata(values),
1383+
})
1384+
}
1385+
1386+
// We send updates exactly every second.
1387+
const sendInterval = time.Second * 1
1388+
sendTicker := time.NewTicker(sendInterval)
1389+
defer sendTicker.Stop()
1390+
1391+
// Send initial metadata.
1392+
sendMetadata()
1393+
1394+
// Fetch updated metadata keys as they come in.
1395+
fetchedMetadata := make(chan []database.WorkspaceAgentMetadatum)
1396+
go func() {
1397+
defer close(fetchedMetadata)
1398+
defer cancel()
1399+
1400+
for {
1401+
select {
1402+
case <-ctx.Done():
1403+
return
1404+
case payload := <-update:
1405+
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
1406+
WorkspaceAgentID: workspaceAgent.ID,
1407+
Keys: payload.Keys,
1408+
})
1409+
if err != nil {
1410+
if !database.IsQueryCanceledError(err) {
1411+
log.Error(ctx, "failed to get metadata", slog.Error(err))
1412+
_ = send(codersdk.ServerSentEvent{
1413+
Type: codersdk.ServerSentEventTypeError,
1414+
Data: codersdk.Response{
1415+
Message: "Failed to get metadata.",
1416+
Detail: err.Error(),
1417+
},
1418+
})
1419+
}
1420+
return
1421+
}
1422+
select {
1423+
case <-ctx.Done():
1424+
return
1425+
// We want to block here to avoid constantly pinging the
1426+
// database when the metadata isn't being processed.
1427+
case fetchedMetadata <- md:
1428+
log.Debug(ctx, "fetched metadata update for keys", "keys", payload.Keys, "num", len(md))
1429+
}
1430+
}
1431+
}
1432+
}()
1433+
defer func() {
1434+
<-fetchedMetadata
1435+
}()
1436+
1437+
pendingChanges := true
1438+
for {
1439+
select {
1440+
case <-ctx.Done():
1441+
return
1442+
case md, ok := <-fetchedMetadata:
1443+
if !ok {
1444+
return
1445+
}
1446+
for _, datum := range md {
1447+
metadataMap[datum.Key] = datum
1448+
}
1449+
pendingChanges = true
1450+
continue
1451+
case <-sendTicker.C:
1452+
// We send an update even if there's no change every 5 seconds
1453+
// to ensure that the frontend always has an accurate "Result.Age".
1454+
if !pendingChanges && time.Since(lastSend) < 5*time.Second {
1455+
continue
1456+
}
1457+
pendingChanges = false
1458+
}
1459+
1460+
sendMetadata()
1461+
}
1462+
}
1463+
12761464
// appendUnique is like append and adds elements from src to dst,
12771465
// skipping any elements that already exist in dst.
12781466
func appendUnique[T comparable](dst, src []T) []T {

coderd/workspaces.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1718,6 +1718,7 @@ func (api *API) resolveAutostart(rw http.ResponseWriter, r *http.Request) {
17181718
// @Param workspace path string true "Workspace ID" format(uuid)
17191719
// @Success 200 {object} codersdk.Response
17201720
// @Router /workspaces/{workspace}/watch [get]
1721+
// @Deprecated Use /workspaces/{workspace}/watch-ws instead
17211722
func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
17221723
ctx := r.Context()
17231724
workspace := httpmw.WorkspaceParam(r)
@@ -1848,6 +1849,144 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
18481849
}
18491850
}
18501851

1852+
// @Summary Watch workspace by ID
1853+
// @ID watch-workspace-by-id
1854+
// @Security CoderSessionToken
1855+
// @Produce text/event-stream
1856+
// @Tags Workspaces
1857+
// @Param workspace path string true "Workspace ID" format(uuid)
1858+
// @Success 200 {object} codersdk.Response
1859+
// @Router /workspaces/{workspace}/watch [get]
1860+
func (api *API) watchWorkspaceWs(rw http.ResponseWriter, r *http.Request) {
1861+
ctx := r.Context()
1862+
workspace := httpmw.WorkspaceParam(r)
1863+
apiKey := httpmw.APIKey(r)
1864+
1865+
send, closed, err := httpapi.OneWayWebSocket[codersdk.ServerSentEvent](rw, r)
1866+
if err != nil {
1867+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
1868+
Message: "Internal error setting up server-sent events.",
1869+
Detail: err.Error(),
1870+
})
1871+
return
1872+
}
1873+
// Prevent handler from returning until the sender is closed.
1874+
defer func() {
1875+
<-closed
1876+
}()
1877+
1878+
sendUpdate := func(_ context.Context, _ []byte) {
1879+
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
1880+
if err != nil {
1881+
_ = send(codersdk.ServerSentEvent{
1882+
Type: codersdk.ServerSentEventTypeError,
1883+
Data: codersdk.Response{
1884+
Message: "Internal error fetching workspace.",
1885+
Detail: err.Error(),
1886+
},
1887+
})
1888+
return
1889+
}
1890+
1891+
data, err := api.workspaceData(ctx, []database.Workspace{workspace})
1892+
if err != nil {
1893+
_ = send(codersdk.ServerSentEvent{
1894+
Type: codersdk.ServerSentEventTypeError,
1895+
Data: codersdk.Response{
1896+
Message: "Internal error fetching workspace data.",
1897+
Detail: err.Error(),
1898+
},
1899+
})
1900+
return
1901+
}
1902+
if len(data.templates) == 0 {
1903+
_ = send(codersdk.ServerSentEvent{
1904+
Type: codersdk.ServerSentEventTypeError,
1905+
Data: codersdk.Response{
1906+
Message: "Forbidden reading template of selected workspace.",
1907+
},
1908+
})
1909+
return
1910+
}
1911+
1912+
w, err := convertWorkspace(
1913+
apiKey.UserID,
1914+
workspace,
1915+
data.builds[0],
1916+
data.templates[0],
1917+
api.Options.AllowWorkspaceRenames,
1918+
)
1919+
if err != nil {
1920+
_ = send(codersdk.ServerSentEvent{
1921+
Type: codersdk.ServerSentEventTypeError,
1922+
Data: codersdk.Response{
1923+
Message: "Internal error converting workspace.",
1924+
Detail: err.Error(),
1925+
},
1926+
})
1927+
}
1928+
_ = send(codersdk.ServerSentEvent{
1929+
Type: codersdk.ServerSentEventTypeData,
1930+
Data: w,
1931+
})
1932+
}
1933+
1934+
cancelWorkspaceSubscribe, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
1935+
wspubsub.HandleWorkspaceEvent(
1936+
func(ctx context.Context, payload wspubsub.WorkspaceEvent, err error) {
1937+
if err != nil {
1938+
return
1939+
}
1940+
if payload.WorkspaceID != workspace.ID {
1941+
return
1942+
}
1943+
sendUpdate(ctx, nil)
1944+
}))
1945+
if err != nil {
1946+
_ = send(codersdk.ServerSentEvent{
1947+
Type: codersdk.ServerSentEventTypeError,
1948+
Data: codersdk.Response{
1949+
Message: "Internal error subscribing to workspace events.",
1950+
Detail: err.Error(),
1951+
},
1952+
})
1953+
return
1954+
}
1955+
defer cancelWorkspaceSubscribe()
1956+
1957+
// This is required to show whether the workspace is up-to-date.
1958+
cancelTemplateSubscribe, err := api.Pubsub.Subscribe(watchTemplateChannel(workspace.TemplateID), sendUpdate)
1959+
if err != nil {
1960+
_ = send(codersdk.ServerSentEvent{
1961+
Type: codersdk.ServerSentEventTypeError,
1962+
Data: codersdk.Response{
1963+
Message: "Internal error subscribing to template events.",
1964+
Detail: err.Error(),
1965+
},
1966+
})
1967+
return
1968+
}
1969+
defer cancelTemplateSubscribe()
1970+
1971+
// An initial ping signals to the request that the server is now ready
1972+
// and the client can begin servicing a channel with data.
1973+
_ = send(codersdk.ServerSentEvent{
1974+
Type: codersdk.ServerSentEventTypePing,
1975+
})
1976+
// Send updated workspace info after connection is established. This avoids
1977+
// missing updates if the client connects after an update.
1978+
sendUpdate(ctx, nil)
1979+
1980+
for {
1981+
select {
1982+
case <-ctx.Done():
1983+
return
1984+
case <-closed:
1985+
return
1986+
}
1987+
}
1988+
}
1989+
18511990
// @Summary Get workspace timings by ID
18521991
// @ID get-workspace-timings-by-id
18531992
// @Security CoderSessionToken

0 commit comments

Comments
 (0)