Skip to content

Commit 1c49916

Browse files
committed
fix: Use WebSockets to stream workspace build logs
This was using a streaming HTTP request before, which didn't work on my version of Chrome. This method seemed less reliable and standard than a WebSocket, so figured switching would be best.
1 parent 7f77831 commit 1c49916

File tree

10 files changed

+119
-73
lines changed

10 files changed

+119
-73
lines changed

coderd/provisionerjobs.go

+21-16
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/google/uuid"
14+
"nhooyr.io/websocket"
1415

1516
"cdr.dev/slog"
1617

@@ -98,12 +99,28 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
9899
return
99100
}
100101

102+
api.websocketWaitMutex.Lock()
103+
api.websocketWaitGroup.Add(1)
104+
api.websocketWaitMutex.Unlock()
105+
defer api.websocketWaitGroup.Done()
106+
conn, err := websocket.Accept(rw, r, nil)
107+
if err != nil {
108+
httpapi.Write(rw, http.StatusBadRequest, httpapi.Response{
109+
Message: "Failed to accept websocket.",
110+
Detail: err.Error(),
111+
})
112+
return
113+
}
114+
115+
ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageText)
116+
defer wsNetConn.Close() // Also closes conn.
117+
101118
bufferedLogs := make(chan database.ProvisionerJobLog, 128)
102119
closeSubscribe, err := api.Pubsub.Subscribe(provisionerJobLogsChannel(job.ID), func(ctx context.Context, message []byte) {
103120
var logs []database.ProvisionerJobLog
104121
err := json.Unmarshal(message, &logs)
105122
if err != nil {
106-
api.Logger.Warn(r.Context(), fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
123+
api.Logger.Warn(ctx, fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
107124
return
108125
}
109126

@@ -113,7 +130,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
113130
default:
114131
// If this overflows users could miss logs streaming. This can happen
115132
// if a database request takes a long amount of time, and we get a lot of logs.
116-
api.Logger.Warn(r.Context(), "provisioner job log overflowing channel")
133+
api.Logger.Warn(ctx, "provisioner job log overflowing channel")
117134
}
118135
}
119136
})
@@ -126,7 +143,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
126143
}
127144
defer closeSubscribe()
128145

129-
provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(r.Context(), database.GetProvisionerLogsByIDBetweenParams{
146+
provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
130147
JobID: job.ID,
131148
CreatedAfter: after,
132149
CreatedBefore: before,
@@ -142,17 +159,8 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
142159
return
143160
}
144161

145-
// "follow" uses the ndjson format to stream data.
146-
// See: https://canjs.com/doc/can-ndjson-stream.html
147-
rw.Header().Set("Content-Type", "application/stream+json")
148-
rw.WriteHeader(http.StatusOK)
149-
if flusher, ok := rw.(http.Flusher); ok {
150-
flusher.Flush()
151-
}
152-
153162
// The Go stdlib JSON encoder appends a newline character after message write.
154-
encoder := json.NewEncoder(rw)
155-
163+
encoder := json.NewEncoder(wsNetConn)
156164
for _, provisionerJobLog := range provisionerJobLogs {
157165
err = encoder.Encode(convertProvisionerJobLog(provisionerJobLog))
158166
if err != nil {
@@ -171,9 +179,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
171179
if err != nil {
172180
return
173181
}
174-
if flusher, ok := rw.(http.Flusher); ok {
175-
flusher.Flush()
176-
}
177182
case <-ticker.C:
178183
job, err := api.Database.GetProvisionerJobByID(r.Context(), job.ID)
179184
if err != nil {

codersdk/provisionerdaemons.go

+23-5
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,16 @@ import (
66
"encoding/json"
77
"fmt"
88
"net/http"
9+
"net/http/cookiejar"
910
"net/url"
1011
"strconv"
1112
"time"
1213

1314
"github.com/google/uuid"
15+
"golang.org/x/xerrors"
16+
"nhooyr.io/websocket"
17+
18+
"github.com/coder/coder/coderd/httpmw"
1419
)
1520

1621
type LogSource string
@@ -106,17 +111,30 @@ func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after
106111
if !after.IsZero() {
107112
afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli())
108113
}
109-
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("%s?follow%s", path, afterQuery), nil)
114+
followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery))
110115
if err != nil {
111116
return nil, err
112117
}
113-
if res.StatusCode != http.StatusOK {
114-
defer res.Body.Close()
118+
jar, err := cookiejar.New(nil)
119+
if err != nil {
120+
return nil, xerrors.Errorf("create cookie jar: %w", err)
121+
}
122+
jar.SetCookies(followURL, []*http.Cookie{{
123+
Name: httpmw.SessionTokenKey,
124+
Value: c.SessionToken,
125+
}})
126+
httpClient := &http.Client{
127+
Jar: jar,
128+
}
129+
conn, res, err := websocket.Dial(ctx, followURL.String(), &websocket.DialOptions{
130+
HTTPClient: httpClient,
131+
CompressionMode: websocket.CompressionDisabled,
132+
})
133+
if err != nil {
115134
return nil, readBodyAsError(res)
116135
}
117-
118136
logs := make(chan ProvisionerJobLog)
119-
decoder := json.NewDecoder(res.Body)
137+
decoder := json.NewDecoder(websocket.NetConn(ctx, conn, websocket.MessageText))
120138
go func() {
121139
defer close(logs)
122140
var log ProvisionerJobLog

scripts/build_go_matrix.sh

+3-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ for spec in "${specs[@]}"; do
193193
--os "$spec_os" \
194194
--arch "$spec_arch" \
195195
--output "$spec_output_binary" \
196-
"${build_args[@]}"
196+
"${build_args[@]}" &
197197
log
198198
log
199199

@@ -227,3 +227,5 @@ for spec in "${specs[@]}"; do
227227
log
228228
fi
229229
done
230+
231+
wait

scripts/develop.sh

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ echo '== Without these binaries, workspaces will fail to start!'
1919
cd "${PROJECT_ROOT}"
2020

2121
trap 'kill 0' SIGINT
22-
CODERV2_HOST=http://127.0.0.1:3000 INSPECT_XSTATE=true yarn --cwd=./site dev &
22+
CODER_HOST=http://127.0.0.1:3000 INSPECT_XSTATE=true yarn --cwd=./site dev &
2323
go run -tags embed cmd/coder/main.go server --in-memory --tunnel &
2424

2525
# Just a minor sleep to ensure the first user was created to make the member.
26-
sleep 2
26+
sleep 5
2727

2828
# create the first user, the admin
2929
go run cmd/coder/main.go login http://127.0.0.1:3000 --username=admin --email=admin@coder.com --password=password || true
3030

3131
# || yes to always exit code 0. If this fails, whelp.
32-
go run cmd/coder/main.go users create --email=member@coder.com --username=member --password="${CODER_DEV_ADMIN_PASSWORD}" || true
32+
go run cmd/coder/main.go users create --email=member@coder.com --username=member --password=password || true
3333
wait
3434
)

site/src/api/api.ts

+2-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import axios, { AxiosRequestHeaders } from "axios"
2-
import ndjsonStream from "can-ndjson-stream"
32
import * as Types from "./types"
43
import { WorkspaceBuildTransition } from "./types"
54
import * as TypesGen from "./typesGenerated"
@@ -280,25 +279,11 @@ export const getWorkspaceBuildByNumber = async (
280279
return response.data
281280
}
282281

283-
export const getWorkspaceBuildLogs = async (buildname: string): Promise<TypesGen.ProvisionerJobLog[]> => {
284-
const response = await axios.get<TypesGen.ProvisionerJobLog[]>(`/api/v2/workspacebuilds/${buildname}/logs`)
282+
export const getWorkspaceBuildLogs = async (buildname: string, before: Date): Promise<TypesGen.ProvisionerJobLog[]> => {
283+
const response = await axios.get<TypesGen.ProvisionerJobLog[]>(`/api/v2/workspacebuilds/${buildname}/logs?before=`+before.getTime())
285284
return response.data
286285
}
287286

288-
export const streamWorkspaceBuildLogs = async (
289-
buildname: string,
290-
): Promise<ReadableStreamDefaultReader<TypesGen.ProvisionerJobLog>> => {
291-
// Axios does not support HTTP stream in the browser
292-
// https://github.com/axios/axios/issues/1474
293-
// So we are going to use window.fetch and return a "stream" reader
294-
const reader = await window
295-
.fetch(`/api/v2/workspacebuilds/${buildname}/logs?follow=true`)
296-
.then((res) => ndjsonStream<TypesGen.ProvisionerJobLog>(res.body))
297-
.then((stream) => stream.getReader())
298-
299-
return reader
300-
}
301-
302287
export const putWorkspaceExtension = async (
303288
workspaceId: string,
304289
extendWorkspaceRequest: TypesGen.PutExtendWorkspaceRequest,
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,31 @@
11
import { screen } from "@testing-library/react"
2-
import * as API from "../../api/api"
3-
import { MockWorkspace, MockWorkspaceBuild, renderWithAuth } from "../../testHelpers/renderHelpers"
2+
import WS from "jest-websocket-mock"
3+
import {
4+
MockWorkspace,
5+
MockWorkspaceBuild,
6+
renderWithAuth,
7+
} from "../../testHelpers/renderHelpers"
48
import { WorkspaceBuildPage } from "./WorkspaceBuildPage"
59

610
describe("WorkspaceBuildPage", () => {
711
it("renders the stats and logs", async () => {
8-
jest.spyOn(API, "streamWorkspaceBuildLogs").mockResolvedValueOnce({
9-
read() {
10-
return Promise.resolve({
11-
value: undefined,
12-
done: true,
13-
})
14-
},
15-
releaseLock: jest.fn(),
16-
closed: Promise.resolve(undefined),
17-
cancel: jest.fn(),
18-
})
12+
const server = new WS("ws://localhost/api/v2/workspacebuilds/" + MockWorkspaceBuild.id + "/logs")
1913
renderWithAuth(<WorkspaceBuildPage />, {
2014
route: `/@${MockWorkspace.owner_name}/${MockWorkspace.name}/builds/${MockWorkspace.latest_build.build_number}`,
2115
path: "/@:username/:workspace/builds/:buildNumber",
2216
})
23-
17+
await server.connected
18+
const log = {
19+
id: "70459334-4878-4bda-a546-98eee166c4c6",
20+
created_at: "2022-05-19T16:46:02.283Z",
21+
log_source: "provisioner_daemon",
22+
log_level: "info",
23+
stage: "Another stage",
24+
output: "",
25+
}
26+
server.send(JSON.stringify(log))
2427
await screen.findByText(MockWorkspaceBuild.workspace_name)
28+
await screen.findByText(log.stage)
29+
server.close()
2530
})
2631
})

site/src/pages/WorkspaceBuildPage/WorkspaceBuildPage.tsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { WorkspaceBuildPageView } from "./WorkspaceBuildPageView"
88

99
export const WorkspaceBuildPage: FC = () => {
1010
const { username, workspace: workspaceName, buildNumber } = useParams()
11-
const [buildState] = useMachine(workspaceBuildMachine, { context: { username, workspaceName, buildNumber } })
11+
const [buildState] = useMachine(workspaceBuildMachine, { context: { username, workspaceName, buildNumber, timeCursor: new Date() } })
1212
const { logs, build } = buildState.context
1313
const isWaitingForLogs = !buildState.matches("logs.loaded")
1414

site/src/pages/WorkspaceBuildPage/WorkspaceBuildPageView.tsx

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export interface WorkspaceBuildPageViewProps {
1717
isWaitingForLogs: boolean
1818
}
1919

20-
export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build, isWaitingForLogs }) => {
20+
export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build }) => {
2121
return (
2222
<Margins>
2323
<PageHeader>
@@ -27,7 +27,7 @@ export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs,
2727
<Stack>
2828
{build && <WorkspaceBuildStats build={build} />}
2929
{!logs && <Loader />}
30-
{logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} isWaitingForLogs={isWaitingForLogs} />}
30+
{logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} isWaitingForLogs={false} />}
3131
</Stack>
3232
</Margins>
3333
)

site/src/testHelpers/handlers.ts

+3
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,7 @@ export const handlers = [
124124
rest.patch("/api/v2/workspacebuilds/:workspaceBuildId/cancel", (req, res, ctx) => {
125125
return res(ctx.status(200), ctx.json(M.MockCancellationMessage))
126126
}),
127+
rest.get("/api/v2/workspacebuilds/:workspaceBuildId/logs", (req, res, ctx) => {
128+
return res(ctx.status(200), ctx.json(M.MockWorkspaceBuildLogs))
129+
}),
127130
]

site/src/xServices/workspaceBuild/workspaceBuildXService.ts

+42-14
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ type LogsContext = {
88
workspaceName: string
99
buildNumber: string
1010
buildId: string
11+
// Used to reference logs before + after.
12+
timeCursor: Date
1113
build?: WorkspaceBuild
1214
getBuildError?: Error | unknown
1315
// Logs
@@ -33,6 +35,9 @@ export const workspaceBuildMachine = createMachine(
3335
getWorkspaceBuild: {
3436
data: WorkspaceBuild
3537
}
38+
getLogs: {
39+
data: ProvisionerJobLog[]
40+
}
3641
},
3742
},
3843
tsTypes: {} as import("./workspaceBuildXService.typegen").Typegen0,
@@ -54,8 +59,18 @@ export const workspaceBuildMachine = createMachine(
5459
},
5560
idle: {},
5661
logs: {
57-
initial: "watchingLogs",
62+
initial: "gettingExistentLogs",
5863
states: {
64+
gettingExistentLogs: {
65+
invoke: {
66+
id: "getLogs",
67+
src: "getLogs",
68+
onDone: {
69+
actions: ["assignLogs"],
70+
target: "watchingLogs",
71+
},
72+
},
73+
},
5974
watchingLogs: {
6075
id: "watchingLogs",
6176
invoke: {
@@ -94,6 +109,10 @@ export const workspaceBuildMachine = createMachine(
94109
clearGetBuildError: assign({
95110
getBuildError: (_) => undefined,
96111
}),
112+
// Logs
113+
assignLogs: assign({
114+
logs: (_, event) => event.data,
115+
}),
97116
addLog: assign({
98117
logs: (context, event) => {
99118
const previousLogs = context.logs ?? []
@@ -103,21 +122,30 @@ export const workspaceBuildMachine = createMachine(
103122
},
104123
services: {
105124
getWorkspaceBuild: (ctx) => API.getWorkspaceBuildByNumber(ctx.username, ctx.workspaceName, ctx.buildNumber),
125+
getLogs: async (ctx) => API.getWorkspaceBuildLogs(ctx.buildId, ctx.timeCursor),
106126
streamWorkspaceBuildLogs: (ctx) => async (callback) => {
107-
const reader = await API.streamWorkspaceBuildLogs(ctx.buildId)
108-
109-
// Watching for the stream
110-
// eslint-disable-next-line no-constant-condition, @typescript-eslint/no-unnecessary-condition
111-
while (true) {
112-
const { value, done } = await reader.read()
113-
114-
if (done) {
127+
return new Promise<void>((resolve, reject) => {
128+
const proto = location.protocol === "https:" ? "wss:" : "ws:"
129+
const socket = new WebSocket(
130+
`${proto}//${location.host}/api/v2/workspacebuilds/${ctx.buildId}/logs?follow=true&after=` +
131+
ctx.timeCursor.getTime(),
132+
)
133+
socket.binaryType = "blob"
134+
socket.addEventListener("message", (event) => {
135+
callback({ type: "ADD_LOG", log: JSON.parse(event.data) })
136+
})
137+
socket.addEventListener("error", () => {
138+
reject(new Error("socket errored"))
139+
})
140+
socket.addEventListener("open", () => {
141+
resolve()
142+
})
143+
socket.addEventListener("close", () => {
144+
// When the socket closes, logs have finished streaming!
115145
callback("NO_MORE_LOGS")
116-
break
117-
}
118-
119-
callback({ type: "ADD_LOG", log: value })
120-
}
146+
resolve()
147+
})
148+
})
121149
},
122150
},
123151
},

0 commit comments

Comments
 (0)