Skip to content

fix: Use WebSockets to stream workspace build logs #2569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions coderd/provisionerjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/google/uuid"
"nhooyr.io/websocket"

"cdr.dev/slog"

Expand Down Expand Up @@ -98,12 +99,28 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
return
}

api.websocketWaitMutex.Lock()
api.websocketWaitGroup.Add(1)
api.websocketWaitMutex.Unlock()
defer api.websocketWaitGroup.Done()
conn, err := websocket.Accept(rw, r, nil)
if err != nil {
httpapi.Write(rw, http.StatusBadRequest, httpapi.Response{
Message: "Failed to accept websocket.",
Detail: err.Error(),
})
return
}

ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageText)
defer wsNetConn.Close() // Also closes conn.

bufferedLogs := make(chan database.ProvisionerJobLog, 128)
closeSubscribe, err := api.Pubsub.Subscribe(provisionerJobLogsChannel(job.ID), func(ctx context.Context, message []byte) {
var logs []database.ProvisionerJobLog
err := json.Unmarshal(message, &logs)
if err != nil {
api.Logger.Warn(r.Context(), fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
api.Logger.Warn(ctx, fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
return
}

Expand All @@ -113,7 +130,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
default:
// If this overflows users could miss logs streaming. This can happen
// if a database request takes a long amount of time, and we get a lot of logs.
api.Logger.Warn(r.Context(), "provisioner job log overflowing channel")
api.Logger.Warn(ctx, "provisioner job log overflowing channel")
}
}
})
Expand All @@ -126,7 +143,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
}
defer closeSubscribe()

provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(r.Context(), database.GetProvisionerLogsByIDBetweenParams{
provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
JobID: job.ID,
CreatedAfter: after,
CreatedBefore: before,
Expand All @@ -142,17 +159,8 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
return
}

// "follow" uses the ndjson format to stream data.
// See: https://canjs.com/doc/can-ndjson-stream.html
rw.Header().Set("Content-Type", "application/stream+json")
rw.WriteHeader(http.StatusOK)
if flusher, ok := rw.(http.Flusher); ok {
flusher.Flush()
}

// The Go stdlib JSON encoder appends a newline character after message write.
encoder := json.NewEncoder(rw)

encoder := json.NewEncoder(wsNetConn)
for _, provisionerJobLog := range provisionerJobLogs {
err = encoder.Encode(convertProvisionerJobLog(provisionerJobLog))
if err != nil {
Expand All @@ -171,9 +179,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
if err != nil {
return
}
if flusher, ok := rw.(http.Flusher); ok {
flusher.Flush()
}
case <-ticker.C:
job, err := api.Database.GetProvisionerJobByID(r.Context(), job.ID)
if err != nil {
Expand Down
28 changes: 23 additions & 5 deletions codersdk/provisionerdaemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/http/cookiejar"
"net/url"
"strconv"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"
"nhooyr.io/websocket"

"github.com/coder/coder/coderd/httpmw"
)

type LogSource string
Expand Down Expand Up @@ -106,17 +111,30 @@ func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after
if !after.IsZero() {
afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli())
}
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("%s?follow%s", path, afterQuery), nil)
followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery))
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
defer res.Body.Close()
jar, err := cookiejar.New(nil)
if err != nil {
return nil, xerrors.Errorf("create cookie jar: %w", err)
}
jar.SetCookies(followURL, []*http.Cookie{{
Name: httpmw.SessionTokenKey,
Value: c.SessionToken,
}})
httpClient := &http.Client{
Jar: jar,
}
conn, res, err := websocket.Dial(ctx, followURL.String(), &websocket.DialOptions{
HTTPClient: httpClient,
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
return nil, readBodyAsError(res)
}

logs := make(chan ProvisionerJobLog)
decoder := json.NewDecoder(res.Body)
decoder := json.NewDecoder(websocket.NetConn(ctx, conn, websocket.MessageText))
go func() {
defer close(logs)
var log ProvisionerJobLog
Expand Down
4 changes: 3 additions & 1 deletion scripts/build_go_matrix.sh
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ for spec in "${specs[@]}"; do
--os "$spec_os" \
--arch "$spec_arch" \
--output "$spec_output_binary" \
"${build_args[@]}"
"${build_args[@]}" &
log
log

Expand Down Expand Up @@ -227,3 +227,5 @@ for spec in "${specs[@]}"; do
log
fi
done

wait
21 changes: 4 additions & 17 deletions site/src/api/api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import axios, { AxiosRequestHeaders } from "axios"
import ndjsonStream from "can-ndjson-stream"
import * as Types from "./types"
import { WorkspaceBuildTransition } from "./types"
import * as TypesGen from "./typesGenerated"
Expand Down Expand Up @@ -280,25 +279,13 @@ export const getWorkspaceBuildByNumber = async (
return response.data
}

export const getWorkspaceBuildLogs = async (buildname: string): Promise<TypesGen.ProvisionerJobLog[]> => {
const response = await axios.get<TypesGen.ProvisionerJobLog[]>(`/api/v2/workspacebuilds/${buildname}/logs`)
export const getWorkspaceBuildLogs = async (buildname: string, before: Date): Promise<TypesGen.ProvisionerJobLog[]> => {
const response = await axios.get<TypesGen.ProvisionerJobLog[]>(
`/api/v2/workspacebuilds/${buildname}/logs?before=${before.getTime()}`,
)
return response.data
}

export const streamWorkspaceBuildLogs = async (
buildname: string,
): Promise<ReadableStreamDefaultReader<TypesGen.ProvisionerJobLog>> => {
// Axios does not support HTTP stream in the browser
// https://github.com/axios/axios/issues/1474
// So we are going to use window.fetch and return a "stream" reader
const reader = await window
.fetch(`/api/v2/workspacebuilds/${buildname}/logs?follow=true`)
.then((res) => ndjsonStream<TypesGen.ProvisionerJobLog>(res.body))
.then((stream) => stream.getReader())

return reader
}

export const putWorkspaceExtension = async (
workspaceId: string,
extendWorkspaceRequest: TypesGen.PutExtendWorkspaceRequest,
Expand Down
12 changes: 6 additions & 6 deletions site/src/api/typesGenerated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ export interface ParameterSchema {
readonly validation_contains?: string[]
}

// From codersdk/provisionerdaemons.go:33:6
// From codersdk/provisionerdaemons.go:38:6
export interface ProvisionerDaemon {
readonly id: string
readonly created_at: string
Expand All @@ -210,7 +210,7 @@ export interface ProvisionerDaemon {
readonly provisioners: ProvisionerType[]
}

// From codersdk/provisionerdaemons.go:62:6
// From codersdk/provisionerdaemons.go:67:6
export interface ProvisionerJob {
readonly id: string
readonly created_at: string
Expand All @@ -222,7 +222,7 @@ export interface ProvisionerJob {
readonly storage_source: string
}

// From codersdk/provisionerdaemons.go:73:6
// From codersdk/provisionerdaemons.go:78:6
export interface ProvisionerJobLog {
readonly id: string
readonly created_at: string
Expand Down Expand Up @@ -485,10 +485,10 @@ export interface WorkspaceResource {
// From codersdk/workspacebuilds.go:22:6
export type BuildReason = "autostart" | "autostop" | "initiator"

// From codersdk/provisionerdaemons.go:23:6
// From codersdk/provisionerdaemons.go:28:6
export type LogLevel = "debug" | "error" | "info" | "trace" | "warn"

// From codersdk/provisionerdaemons.go:16:6
// From codersdk/provisionerdaemons.go:21:6
export type LogSource = "provisioner" | "provisioner_daemon"

// From codersdk/parameters.go:29:6
Expand All @@ -503,7 +503,7 @@ export type ParameterSourceScheme = "data" | "none"
// From codersdk/parameters.go:37:6
export type ParameterTypeSystem = "hcl" | "none"

// From codersdk/provisionerdaemons.go:42:6
// From codersdk/provisionerdaemons.go:47:6
export type ProvisionerJobStatus = "canceled" | "canceling" | "failed" | "pending" | "running" | "succeeded"

// From codersdk/organizations.go:14:6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,3 @@ export const Example = Template.bind({})
Example.args = {
logs: MockWorkspaceBuildLogs,
}

export const Loading = Template.bind({})
Loading.args = {
logs: MockWorkspaceBuildLogs,
isWaitingForLogs: true,
}
15 changes: 3 additions & 12 deletions site/src/components/WorkspaceBuildLogs/WorkspaceBuildLogs.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import CircularProgress from "@material-ui/core/CircularProgress"
import { makeStyles } from "@material-ui/core/styles"
import dayjs from "dayjs"
import { FC } from "react"
Expand Down Expand Up @@ -40,33 +39,29 @@ const getStageDurationInSeconds = (logs: ProvisionerJobLog[]) => {

export interface WorkspaceBuildLogsProps {
logs: ProvisionerJobLog[]
isWaitingForLogs: boolean
}

export const WorkspaceBuildLogs: FC<WorkspaceBuildLogsProps> = ({ logs, isWaitingForLogs }) => {
export const WorkspaceBuildLogs: FC<WorkspaceBuildLogsProps> = ({ logs }) => {
const groupedLogsByStage = groupLogsByStage(logs)
const stages = Object.keys(groupedLogsByStage)
const styles = useStyles()

return (
<div className={styles.logs}>
{stages.map((stage, stageIndex) => {
{stages.map((stage) => {
const logs = groupedLogsByStage[stage]
const isEmpty = logs.every((log) => log.output === "")
const lines = logs.map((log) => ({
time: log.created_at,
output: log.output,
}))
const duration = getStageDurationInSeconds(logs)
const isLastStage = stageIndex === stages.length - 1
const shouldDisplaySpinner = isWaitingForLogs && isLastStage
const shouldDisplayDuration = !isWaitingForLogs && duration
const shouldDisplayDuration = duration !== undefined

return (
<div key={stage}>
<div className={styles.header}>
<div>{stage}</div>
{shouldDisplaySpinner && <CircularProgress size={14} className={styles.spinner} />}
{shouldDisplayDuration && (
<div className={styles.duration}>
{duration} {Language.seconds}
Expand Down Expand Up @@ -109,8 +104,4 @@ const useStyles = makeStyles((theme) => ({
padding: theme.spacing(2),
paddingLeft: theme.spacing(4),
},

spinner: {
marginLeft: "auto",
},
}))
27 changes: 14 additions & 13 deletions site/src/pages/WorkspaceBuildPage/WorkspaceBuildPage.test.tsx
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
import { screen } from "@testing-library/react"
import * as API from "../../api/api"
import WS from "jest-websocket-mock"
import { MockWorkspace, MockWorkspaceBuild, renderWithAuth } from "../../testHelpers/renderHelpers"
import { WorkspaceBuildPage } from "./WorkspaceBuildPage"

describe("WorkspaceBuildPage", () => {
it("renders the stats and logs", async () => {
jest.spyOn(API, "streamWorkspaceBuildLogs").mockResolvedValueOnce({
read() {
return Promise.resolve({
value: undefined,
done: true,
})
},
releaseLock: jest.fn(),
closed: Promise.resolve(undefined),
cancel: jest.fn(),
})
const server = new WS(`ws://localhost/api/v2/workspacebuilds/${MockWorkspaceBuild.id}/logs`)
renderWithAuth(<WorkspaceBuildPage />, {
route: `/@${MockWorkspace.owner_name}/${MockWorkspace.name}/builds/${MockWorkspace.latest_build.build_number}`,
path: "/@:username/:workspace/builds/:buildNumber",
})

await server.connected
const log = {
id: "70459334-4878-4bda-a546-98eee166c4c6",
created_at: "2022-05-19T16:46:02.283Z",
log_source: "provisioner_daemon",
log_level: "info",
stage: "Another stage",
output: "",
}
server.send(JSON.stringify(log))
await screen.findByText(MockWorkspaceBuild.workspace_name)
await screen.findByText(log.stage)
server.close()
})
})
7 changes: 4 additions & 3 deletions site/src/pages/WorkspaceBuildPage/WorkspaceBuildPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ import { WorkspaceBuildPageView } from "./WorkspaceBuildPageView"

export const WorkspaceBuildPage: FC = () => {
const { username, workspace: workspaceName, buildNumber } = useParams()
const [buildState] = useMachine(workspaceBuildMachine, { context: { username, workspaceName, buildNumber } })
const [buildState] = useMachine(workspaceBuildMachine, {
context: { username, workspaceName, buildNumber, timeCursor: new Date() },
})
const { logs, build } = buildState.context
const isWaitingForLogs = !buildState.matches("logs.loaded")

return (
<>
<Helmet>
<title>{build ? pageTitle(`Build #${build.build_number} · ${build.workspace_name}`) : ""}</title>
</Helmet>

<WorkspaceBuildPageView logs={logs} build={build} isWaitingForLogs={isWaitingForLogs} />
<WorkspaceBuildPageView logs={logs} build={build} />
</>
)
}
5 changes: 2 additions & 3 deletions site/src/pages/WorkspaceBuildPage/WorkspaceBuildPageView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ const sortLogsByCreatedAt = (logs: ProvisionerJobLog[]) => {
export interface WorkspaceBuildPageViewProps {
logs: ProvisionerJobLog[] | undefined
build: WorkspaceBuild | undefined
isWaitingForLogs: boolean
}

export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build, isWaitingForLogs }) => {
export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build }) => {
return (
<Margins>
<PageHeader>
Expand All @@ -27,7 +26,7 @@ export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs,
<Stack>
{build && <WorkspaceBuildStats build={build} />}
{!logs && <Loader />}
{logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} isWaitingForLogs={isWaitingForLogs} />}
{logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} />}
</Stack>
</Margins>
)
Expand Down
3 changes: 3 additions & 0 deletions site/src/testHelpers/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,7 @@ export const handlers = [
rest.patch("/api/v2/workspacebuilds/:workspaceBuildId/cancel", (req, res, ctx) => {
return res(ctx.status(200), ctx.json(M.MockCancellationMessage))
}),
rest.get("/api/v2/workspacebuilds/:workspaceBuildId/logs", (req, res, ctx) => {
return res(ctx.status(200), ctx.json(M.MockWorkspaceBuildLogs))
}),
]
Loading