diff --git a/site/package.json b/site/package.json index 7f45637237cf7..51ec024ae2fa1 100644 --- a/site/package.json +++ b/site/package.json @@ -166,7 +166,6 @@ "@vitejs/plugin-react": "4.3.4", "autoprefixer": "10.4.20", "chromatic": "11.25.2", - "eventsourcemock": "2.0.0", "express": "4.21.2", "jest": "29.7.0", "jest-canvas-mock": "2.5.2", diff --git a/site/pnpm-lock.yaml b/site/pnpm-lock.yaml index d08ab3c523083..fc5dbb43876f6 100644 --- a/site/pnpm-lock.yaml +++ b/site/pnpm-lock.yaml @@ -403,9 +403,6 @@ importers: chromatic: specifier: 11.25.2 version: 11.25.2 - eventsourcemock: - specifier: 2.0.0 - version: 2.0.0 express: specifier: 4.21.2 version: 4.21.2 @@ -3796,9 +3793,6 @@ packages: eventemitter3@4.0.7: resolution: {integrity: sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==, tarball: https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz} - eventsourcemock@2.0.0: - resolution: {integrity: sha512-tSmJnuE+h6A8/hLRg0usf1yL+Q8w01RQtmg0Uzgoxk/HIPZrIUeAr/A4es/8h1wNsoG8RdiESNQLTKiNwbSC3Q==, tarball: https://registry.npmjs.org/eventsourcemock/-/eventsourcemock-2.0.0.tgz} - execa@5.1.1: resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==, tarball: https://registry.npmjs.org/execa/-/execa-5.1.1.tgz} engines: {node: '>=10'} @@ -10017,8 +10011,6 @@ snapshots: eventemitter3@4.0.7: {} - eventsourcemock@2.0.0: {} - execa@5.1.1: dependencies: cross-spawn: 7.0.6 diff --git a/site/src/@types/eventsourcemock.d.ts b/site/src/@types/eventsourcemock.d.ts deleted file mode 100644 index 296c4f19c33ce..0000000000000 --- a/site/src/@types/eventsourcemock.d.ts +++ /dev/null @@ -1 +0,0 @@ -declare module "eventsourcemock"; diff --git a/site/src/api/api.ts b/site/src/api/api.ts index 85953bbce736f..3a43772a02657 100644 --- a/site/src/api/api.ts +++ b/site/src/api/api.ts @@ -22,9 +22,10 @@ import globalAxios, { type AxiosInstance, isAxiosError } from "axios"; import type dayjs from "dayjs"; import userAgentParser from "ua-parser-js"; +import { OneWayWebSocket } from "utils/OneWayWebSocket"; import { delay } from "../utils/delay"; -import * as TypesGen from "./typesGenerated"; import type { PostWorkspaceUsageRequest } from "./typesGenerated"; +import * as TypesGen from "./typesGenerated"; const getMissingParameters = ( oldBuildParameters: TypesGen.WorkspaceBuildParameter[], @@ -101,61 +102,40 @@ const getMissingParameters = ( }; /** - * * @param agentId - * @returns An EventSource that emits agent metadata event objects - * (ServerSentEvent) + * @returns {OneWayWebSocket} A OneWayWebSocket that emits Server-Sent Events. */ -export const watchAgentMetadata = (agentId: string): EventSource => { - return new EventSource( - `${location.protocol}//${location.host}/api/v2/workspaceagents/${agentId}/watch-metadata`, - { withCredentials: true }, - ); +export const watchAgentMetadata = ( + agentId: string, +): OneWayWebSocket => { + return new OneWayWebSocket({ + apiRoute: `/api/v2/workspaceagents/${agentId}/watch-metadata-ws`, + }); }; /** - * @returns {EventSource} An EventSource that emits workspace event objects - * (ServerSentEvent) + * @returns {OneWayWebSocket} A OneWayWebSocket that emits Server-Sent Events. */ -export const watchWorkspace = (workspaceId: string): EventSource => { - return new EventSource( - `${location.protocol}//${location.host}/api/v2/workspaces/${workspaceId}/watch`, - { withCredentials: true }, - ); +export const watchWorkspace = ( + workspaceId: string, +): OneWayWebSocket => { + return new OneWayWebSocket({ + apiRoute: `/api/v2/workspaces/${workspaceId}/watch-ws`, + }); }; -type WatchInboxNotificationsParams = { +type WatchInboxNotificationsParams = Readonly<{ read_status?: "read" | "unread" | "all"; -}; +}>; -export const watchInboxNotifications = ( - onNewNotification: (res: TypesGen.GetInboxNotificationResponse) => void, +export function watchInboxNotifications( params?: WatchInboxNotificationsParams, -) => { - const searchParams = new URLSearchParams(params); - const socket = createWebSocket( - "/api/v2/notifications/inbox/watch", - searchParams, - ); - - socket.addEventListener("message", (event) => { - try { - const res = JSON.parse( - event.data, - ) as TypesGen.GetInboxNotificationResponse; - onNewNotification(res); - } catch (error) { - console.warn("Error parsing inbox notification: ", error); - } - }); - - socket.addEventListener("error", (event) => { - console.warn("Watch inbox notifications error: ", event); - socket.close(); +): OneWayWebSocket { + return new OneWayWebSocket({ + apiRoute: "/api/v2/notifications/inbox/watch", + searchParams: params, }); - - return socket; -}; +} export const getURLWithSearchParams = ( basePath: string, @@ -1125,7 +1105,7 @@ class ApiMethods { }; getWorkspaceByOwnerAndName = async ( - username = "me", + username: string, workspaceName: string, params?: TypesGen.WorkspaceOptions, ): Promise => { @@ -1138,7 +1118,7 @@ class ApiMethods { }; getWorkspaceBuildByNumber = async ( - username = "me", + username: string, workspaceName: string, buildNumber: number, ): Promise => { @@ -1324,7 +1304,7 @@ class ApiMethods { }; createWorkspace = async ( - userId = "me", + userId: string, workspace: TypesGen.CreateWorkspaceRequest, ): Promise => { const response = await this.axios.post( @@ -2542,7 +2522,7 @@ function createWebSocket( ) { const protocol = location.protocol === "https:" ? "wss:" : "ws:"; const socket = new WebSocket( - `${protocol}//${location.host}${path}?${params.toString()}`, + `${protocol}//${location.host}${path}?${params}`, ); socket.binaryType = "blob"; return socket; diff --git a/site/src/modules/notifications/NotificationsInbox/NotificationsInbox.tsx b/site/src/modules/notifications/NotificationsInbox/NotificationsInbox.tsx index 656d87fbe31d3..cdbf0941b7fdb 100644 --- a/site/src/modules/notifications/NotificationsInbox/NotificationsInbox.tsx +++ b/site/src/modules/notifications/NotificationsInbox/NotificationsInbox.tsx @@ -61,21 +61,31 @@ export const NotificationsInbox: FC = ({ ); useEffect(() => { - const socket = watchInboxNotifications( - (res) => { - updateNotificationsCache((prev) => { - return { - unread_count: res.unread_count, - notifications: [res.notification, ...prev.notifications], - }; - }); - }, - { read_status: "unread" }, - ); + const socket = watchInboxNotifications({ read_status: "unread" }); - return () => { + socket.addEventListener("message", (e) => { + if (e.parseError) { + console.warn("Error parsing inbox notification: ", e.parseError); + return; + } + + const msg = e.parsedMessage; + updateNotificationsCache((current) => { + return { + unread_count: msg.unread_count, + notifications: [msg.notification, ...current.notifications], + }; + }); + }); + + socket.addEventListener("error", () => { + displayError( + "Unable to retrieve latest inbox notifications. Please try refreshing the browser.", + ); socket.close(); - }; + }); + + return () => socket.close(); }, [updateNotificationsCache]); const { diff --git a/site/src/modules/resources/AgentMetadata.tsx b/site/src/modules/resources/AgentMetadata.tsx index 81b5a14994e81..5e5501809ee49 100644 --- a/site/src/modules/resources/AgentMetadata.tsx +++ b/site/src/modules/resources/AgentMetadata.tsx @@ -3,9 +3,11 @@ import Skeleton from "@mui/material/Skeleton"; import Tooltip from "@mui/material/Tooltip"; import { watchAgentMetadata } from "api/api"; import type { + ServerSentEvent, WorkspaceAgent, WorkspaceAgentMetadata, } from "api/typesGenerated"; +import { displayError } from "components/GlobalSnackbar/utils"; import { Stack } from "components/Stack/Stack"; import dayjs from "dayjs"; import { @@ -17,6 +19,7 @@ import { useState, } from "react"; import { MONOSPACE_FONT_FAMILY } from "theme/constants"; +import type { OneWayWebSocket } from "utils/OneWayWebSocket"; type ItemStatus = "stale" | "valid" | "loading"; @@ -42,50 +45,82 @@ interface AgentMetadataProps { storybookMetadata?: WorkspaceAgentMetadata[]; } +const maxSocketErrorRetryCount = 3; + export const AgentMetadata: FC = ({ agent, storybookMetadata, }) => { - const [metadata, setMetadata] = useState< - WorkspaceAgentMetadata[] | undefined - >(undefined); - + const [activeMetadata, setActiveMetadata] = useState(storybookMetadata); useEffect(() => { + // This is an unfortunate pitfall with this component's testing setup, + // but even though we use the value of storybookMetadata as the initial + // value of the activeMetadata, we cannot put activeMetadata itself into + // the dependency array. If we did, we would destroy and rebuild each + // connection every single time a new message comes in from the socket, + // because the socket has to be wired up to the state setter if (storybookMetadata !== undefined) { - setMetadata(storybookMetadata); return; } - let timeout: ReturnType | undefined = undefined; - - const connect = (): (() => void) => { - const source = watchAgentMetadata(agent.id); + let timeoutId: number | undefined = undefined; + let activeSocket: OneWayWebSocket | null = null; + let retries = 0; + + const createNewConnection = () => { + const socket = watchAgentMetadata(agent.id); + activeSocket = socket; + + socket.addEventListener("error", () => { + setActiveMetadata(undefined); + window.clearTimeout(timeoutId); + + // The error event is supposed to fire when an error happens + // with the connection itself, which implies that the connection + // would auto-close. Couldn't find a definitive answer on MDN, + // though, so closing it manually just to be safe + socket.close(); + activeSocket = null; + + retries++; + if (retries >= maxSocketErrorRetryCount) { + displayError( + "Unexpected disconnect while watching Metadata changes. Please try refreshing the page.", + ); + return; + } - source.onerror = (e) => { - console.error("received error in watch stream", e); - setMetadata(undefined); - source.close(); + displayError( + "Unexpected disconnect while watching Metadata changes. Creating new connection...", + ); + timeoutId = window.setTimeout(() => { + createNewConnection(); + }, 3_000); + }); - timeout = setTimeout(() => { - connect(); - }, 3000); - }; + socket.addEventListener("message", (e) => { + if (e.parseError) { + displayError( + "Unable to process newest response from server. Please try refreshing the page.", + ); + return; + } - source.addEventListener("data", (e) => { - const data = JSON.parse(e.data); - setMetadata(data); - }); - return () => { - if (timeout !== undefined) { - clearTimeout(timeout); + const msg = e.parsedMessage; + if (msg.type === "data") { + setActiveMetadata(msg.data as WorkspaceAgentMetadata[]); } - source.close(); - }; + }); + }; + + createNewConnection(); + return () => { + window.clearTimeout(timeoutId); + activeSocket?.close(); }; - return connect(); }, [agent.id, storybookMetadata]); - if (metadata === undefined) { + if (activeMetadata === undefined) { return (
@@ -93,7 +128,7 @@ export const AgentMetadata: FC = ({ ); } - return ; + return ; }; export const AgentMetadataSkeleton: FC = () => { diff --git a/site/src/modules/templates/useWatchVersionLogs.ts b/site/src/modules/templates/useWatchVersionLogs.ts index 5574e083a9849..1e77b0eb1b073 100644 --- a/site/src/modules/templates/useWatchVersionLogs.ts +++ b/site/src/modules/templates/useWatchVersionLogs.ts @@ -1,46 +1,38 @@ import { watchBuildLogsByTemplateVersionId } from "api/api"; import type { ProvisionerJobLog, TemplateVersion } from "api/typesGenerated"; +import { useEffectEvent } from "hooks/hookPolyfills"; import { useEffect, useState } from "react"; export const useWatchVersionLogs = ( templateVersion: TemplateVersion | undefined, options?: { onDone: () => Promise }, ) => { - const [logs, setLogs] = useState(); + const [logs, setLogs] = useState(); const templateVersionId = templateVersion?.id; - const templateVersionStatus = templateVersion?.job.status; + const [cachedVersionId, setCachedVersionId] = useState(templateVersionId); + if (cachedVersionId !== templateVersionId) { + setCachedVersionId(templateVersionId); + setLogs([]); + } - // biome-ignore lint/correctness/useExhaustiveDependencies: consider refactoring + const stableOnDone = useEffectEvent(() => options?.onDone()); + const status = templateVersion?.job.status; + const canWatch = status === "running" || status === "pending"; useEffect(() => { - setLogs(undefined); - }, [templateVersionId]); - - useEffect(() => { - if (!templateVersionId || !templateVersionStatus) { - return; - } - - if ( - templateVersionStatus !== "running" && - templateVersionStatus !== "pending" - ) { + if (!templateVersionId || !canWatch) { return; } const socket = watchBuildLogsByTemplateVersionId(templateVersionId, { - onMessage: (log) => { - setLogs((logs) => (logs ? [...logs, log] : [log])); - }, - onDone: options?.onDone, - onError: (error) => { - console.error(error); + onError: (error) => console.error(error), + onDone: stableOnDone, + onMessage: (newLog) => { + setLogs((current) => [...(current ?? []), newLog]); }, }); - return () => { - socket.close(); - }; - }, [options?.onDone, templateVersionId, templateVersionStatus]); + return () => socket.close(); + }, [stableOnDone, canWatch, templateVersionId]); return logs; }; diff --git a/site/src/pages/WorkspacePage/WorkspacePage.test.tsx b/site/src/pages/WorkspacePage/WorkspacePage.test.tsx index 50f47a4721320..d120ad5546c17 100644 --- a/site/src/pages/WorkspacePage/WorkspacePage.test.tsx +++ b/site/src/pages/WorkspacePage/WorkspacePage.test.tsx @@ -2,7 +2,7 @@ import { screen, waitFor, within } from "@testing-library/react"; import userEvent from "@testing-library/user-event"; import * as apiModule from "api/api"; import type { TemplateVersionParameter, Workspace } from "api/typesGenerated"; -import EventSourceMock from "eventsourcemock"; +import MockServerSocket from "jest-websocket-mock"; import { DashboardContext, type DashboardProvider, @@ -84,23 +84,11 @@ const testButton = async ( const user = userEvent.setup(); await user.click(button); - expect(actionMock).toBeCalled(); + expect(actionMock).toHaveBeenCalled(); }; -let originalEventSource: typeof window.EventSource; - -beforeAll(() => { - originalEventSource = window.EventSource; - // mocking out EventSource for SSE - window.EventSource = EventSourceMock; -}); - -beforeEach(() => { - jest.resetAllMocks(); -}); - -afterAll(() => { - window.EventSource = originalEventSource; +afterEach(() => { + MockServerSocket.clean(); }); describe("WorkspacePage", () => { diff --git a/site/src/pages/WorkspacePage/WorkspacePage.tsx b/site/src/pages/WorkspacePage/WorkspacePage.tsx index cd2b5f48cb6d3..a55971abfb576 100644 --- a/site/src/pages/WorkspacePage/WorkspacePage.tsx +++ b/site/src/pages/WorkspacePage/WorkspacePage.tsx @@ -5,6 +5,7 @@ import { workspaceBuildsKey } from "api/queries/workspaceBuilds"; import { workspaceByOwnerAndName } from "api/queries/workspaces"; import type { Workspace } from "api/typesGenerated"; import { ErrorAlert } from "components/Alert/ErrorAlert"; +import { displayError } from "components/GlobalSnackbar/utils"; import { Loader } from "components/Loader/Loader"; import { Margins } from "components/Margins/Margins"; import { useEffectEvent } from "hooks/hookPolyfills"; @@ -82,20 +83,26 @@ export const WorkspacePage: FC = () => { return; } - const eventSource = watchWorkspace(workspaceId); + const socket = watchWorkspace(workspaceId); + socket.addEventListener("message", (event) => { + if (event.parseError) { + displayError( + "Unable to process latest data from the server. Please try refreshing the page.", + ); + return; + } - eventSource.addEventListener("data", async (event) => { - const newWorkspaceData = JSON.parse(event.data) as Workspace; - await updateWorkspaceData(newWorkspaceData); + if (event.parsedMessage.type === "data") { + updateWorkspaceData(event.parsedMessage.data as Workspace); + } }); - - eventSource.addEventListener("error", (event) => { - console.error("Error on getting workspace changes.", event); + socket.addEventListener("error", () => { + displayError( + "Unable to get workspace changes. Connection has been closed.", + ); }); - return () => { - eventSource.close(); - }; + return () => socket.close(); }, [updateWorkspaceData, workspaceId]); // Page statuses diff --git a/site/src/utils/OneWayWebSocket.test.ts b/site/src/utils/OneWayWebSocket.test.ts new file mode 100644 index 0000000000000..c6b00b593111f --- /dev/null +++ b/site/src/utils/OneWayWebSocket.test.ts @@ -0,0 +1,492 @@ +/** + * @file Sets up unit tests for OneWayWebSocket. + * + * 2025-03-18 - Really wanted to define these as integration tests with MSW, but + * getting it set up correctly for Jest and JSDOM got a little screwy. That can + * be revisited in the future, but in the meantime, we're assuming that the base + * WebSocket class doesn't have any bugs, and can safely be mocked out. + */ + +import { + type OneWayMessageEvent, + OneWayWebSocket, + type WebSocketEventType, +} from "./OneWayWebSocket"; + +type MockPublisher = Readonly<{ + publishMessage: (event: MessageEvent) => void; + publishError: (event: ErrorEvent) => void; + publishClose: (event: CloseEvent) => void; + publishOpen: (event: Event) => void; +}>; + +function createMockWebSocket( + url: string, + protocols?: string | string[], +): readonly [WebSocket, MockPublisher] { + type EventMap = { + message: MessageEvent; + error: ErrorEvent; + close: CloseEvent; + open: Event; + }; + type CallbackStore = { + [K in keyof EventMap]: ((event: EventMap[K]) => void)[]; + }; + + let activeProtocol: string; + if (Array.isArray(protocols)) { + activeProtocol = protocols[0] ?? ""; + } else if (typeof protocols === "string") { + activeProtocol = protocols; + } else { + activeProtocol = ""; + } + + let closed = false; + const store: CallbackStore = { + message: [], + error: [], + close: [], + open: [], + }; + + const mockSocket: WebSocket = { + CONNECTING: 0, + OPEN: 1, + CLOSING: 2, + CLOSED: 3, + + url, + protocol: activeProtocol, + readyState: 1, + binaryType: "blob", + bufferedAmount: 0, + extensions: "", + onclose: null, + onerror: null, + onmessage: null, + onopen: null, + send: jest.fn(), + dispatchEvent: jest.fn(), + + addEventListener: ( + eventType: E, + callback: WebSocketEventMap[E], + ) => { + if (closed) { + return; + } + + const subscribers = store[eventType]; + const cb = callback as unknown as CallbackStore[E][0]; + if (!subscribers.includes(cb)) { + subscribers.push(cb); + } + }, + + removeEventListener: ( + eventType: E, + callback: WebSocketEventMap[E], + ) => { + if (closed) { + return; + } + + const subscribers = store[eventType]; + const cb = callback as unknown as CallbackStore[E][0]; + if (subscribers.includes(cb)) { + const updated = store[eventType].filter((c) => c !== cb); + store[eventType] = updated as unknown as CallbackStore[E]; + } + }, + + close: () => { + closed = true; + }, + }; + + const publisher: MockPublisher = { + publishOpen: (event) => { + if (closed) { + return; + } + for (const sub of store.open) { + sub(event); + } + }, + + publishError: (event) => { + if (closed) { + return; + } + for (const sub of store.error) { + sub(event); + } + }, + + publishMessage: (event) => { + if (closed) { + return; + } + for (const sub of store.message) { + sub(event); + } + }, + + publishClose: (event) => { + if (closed) { + return; + } + for (const sub of store.close) { + sub(event); + } + }, + }; + + return [mockSocket, publisher] as const; +} + +describe(OneWayWebSocket.name, () => { + const dummyRoute = "/api/v2/blah"; + + it("Errors out if API route does not start with '/api/v2/'", () => { + const testRoutes: string[] = ["blah", "", "/", "/api", "/api/v225"]; + + for (const r of testRoutes) { + expect(() => { + new OneWayWebSocket({ + apiRoute: r, + websocketInit: (url, protocols) => { + const [socket] = createMockWebSocket(url, protocols); + return socket; + }, + }); + }).toThrow(Error); + } + }); + + it("Lets a consumer add an event listener of each type", () => { + let publisher!: MockPublisher; + const oneWay = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket, pub] = createMockWebSocket(url, protocols); + publisher = pub; + return socket; + }, + }); + + const onOpen = jest.fn(); + const onClose = jest.fn(); + const onError = jest.fn(); + const onMessage = jest.fn(); + + oneWay.addEventListener("open", onOpen); + oneWay.addEventListener("close", onClose); + oneWay.addEventListener("error", onError); + oneWay.addEventListener("message", onMessage); + + publisher.publishOpen(new Event("open")); + publisher.publishClose(new CloseEvent("close")); + publisher.publishError( + new ErrorEvent("error", { + error: new Error("Whoops - connection broke"), + }), + ); + publisher.publishMessage( + new MessageEvent("message", { + data: "null", + }), + ); + + expect(onOpen).toHaveBeenCalledTimes(1); + expect(onClose).toHaveBeenCalledTimes(1); + expect(onError).toHaveBeenCalledTimes(1); + expect(onMessage).toHaveBeenCalledTimes(1); + }); + + it("Lets a consumer remove an event listener of each type", () => { + let publisher!: MockPublisher; + const oneWay = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket, pub] = createMockWebSocket(url, protocols); + publisher = pub; + return socket; + }, + }); + + const onOpen = jest.fn(); + const onClose = jest.fn(); + const onError = jest.fn(); + const onMessage = jest.fn(); + + oneWay.addEventListener("open", onOpen); + oneWay.addEventListener("close", onClose); + oneWay.addEventListener("error", onError); + oneWay.addEventListener("message", onMessage); + + oneWay.removeEventListener("open", onOpen); + oneWay.removeEventListener("close", onClose); + oneWay.removeEventListener("error", onError); + oneWay.removeEventListener("message", onMessage); + + publisher.publishOpen(new Event("open")); + publisher.publishClose(new CloseEvent("close")); + publisher.publishError( + new ErrorEvent("error", { + error: new Error("Whoops - connection broke"), + }), + ); + publisher.publishMessage( + new MessageEvent("message", { + data: "null", + }), + ); + + expect(onOpen).toHaveBeenCalledTimes(0); + expect(onClose).toHaveBeenCalledTimes(0); + expect(onError).toHaveBeenCalledTimes(0); + expect(onMessage).toHaveBeenCalledTimes(0); + }); + + it("Only calls each callback once if callback is added multiple times", () => { + let publisher!: MockPublisher; + const oneWay = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket, pub] = createMockWebSocket(url, protocols); + publisher = pub; + return socket; + }, + }); + + const onOpen = jest.fn(); + const onClose = jest.fn(); + const onError = jest.fn(); + const onMessage = jest.fn(); + + for (let i = 0; i < 10; i++) { + oneWay.addEventListener("open", onOpen); + oneWay.addEventListener("close", onClose); + oneWay.addEventListener("error", onError); + oneWay.addEventListener("message", onMessage); + } + + publisher.publishOpen(new Event("open")); + publisher.publishClose(new CloseEvent("close")); + publisher.publishError( + new ErrorEvent("error", { + error: new Error("Whoops - connection broke"), + }), + ); + publisher.publishMessage( + new MessageEvent("message", { + data: "null", + }), + ); + + expect(onOpen).toHaveBeenCalledTimes(1); + expect(onClose).toHaveBeenCalledTimes(1); + expect(onError).toHaveBeenCalledTimes(1); + expect(onMessage).toHaveBeenCalledTimes(1); + }); + + it("Lets consumers register multiple callbacks for each event type", () => { + let publisher!: MockPublisher; + const oneWay = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket, pub] = createMockWebSocket(url, protocols); + publisher = pub; + return socket; + }, + }); + + const onOpen1 = jest.fn(); + const onClose1 = jest.fn(); + const onError1 = jest.fn(); + const onMessage1 = jest.fn(); + oneWay.addEventListener("open", onOpen1); + oneWay.addEventListener("close", onClose1); + oneWay.addEventListener("error", onError1); + oneWay.addEventListener("message", onMessage1); + + const onOpen2 = jest.fn(); + const onClose2 = jest.fn(); + const onError2 = jest.fn(); + const onMessage2 = jest.fn(); + oneWay.addEventListener("open", onOpen2); + oneWay.addEventListener("close", onClose2); + oneWay.addEventListener("error", onError2); + oneWay.addEventListener("message", onMessage2); + + publisher.publishOpen(new Event("open")); + publisher.publishClose(new CloseEvent("close")); + publisher.publishError( + new ErrorEvent("error", { + error: new Error("Whoops - connection broke"), + }), + ); + publisher.publishMessage( + new MessageEvent("message", { + data: "null", + }), + ); + + expect(onOpen1).toHaveBeenCalledTimes(1); + expect(onClose1).toHaveBeenCalledTimes(1); + expect(onError1).toHaveBeenCalledTimes(1); + expect(onMessage1).toHaveBeenCalledTimes(1); + + expect(onOpen2).toHaveBeenCalledTimes(1); + expect(onClose2).toHaveBeenCalledTimes(1); + expect(onError2).toHaveBeenCalledTimes(1); + expect(onMessage2).toHaveBeenCalledTimes(1); + }); + + it("Computes the socket protocol based on the browser location protocol", () => { + const oneWay1 = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket] = createMockWebSocket(url, protocols); + return socket; + }, + location: { + protocol: "https:", + host: "www.cool.com", + }, + }); + const oneWay2 = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket] = createMockWebSocket(url, protocols); + return socket; + }, + location: { + protocol: "http:", + host: "www.cool.com", + }, + }); + + expect(oneWay1.url).toMatch(/^wss:\/\//); + expect(oneWay2.url).toMatch(/^ws:\/\//); + }); + + it("Gives consumers pre-parsed versions of message events", () => { + let publisher!: MockPublisher; + const oneWay = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket, pub] = createMockWebSocket(url, protocols); + publisher = pub; + return socket; + }, + }); + + const onMessage = jest.fn(); + oneWay.addEventListener("message", onMessage); + + const payload = { + value: 5, + cool: "yes", + }; + const event = new MessageEvent("message", { + data: JSON.stringify(payload), + }); + + publisher.publishMessage(event); + expect(onMessage).toHaveBeenCalledWith({ + sourceEvent: event, + parsedMessage: payload, + parseError: undefined, + }); + }); + + it("Exposes parsing error if message payload could not be parsed as JSON", () => { + let publisher!: MockPublisher; + const oneWay = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket, pub] = createMockWebSocket(url, protocols); + publisher = pub; + return socket; + }, + }); + + const onMessage = jest.fn(); + oneWay.addEventListener("message", onMessage); + + const payload = "definitely not valid JSON"; + const event = new MessageEvent("message", { + data: payload, + }); + publisher.publishMessage(event); + + const arg: OneWayMessageEvent = onMessage.mock.lastCall[0]; + expect(arg.sourceEvent).toEqual(event); + expect(arg.parsedMessage).toEqual(undefined); + expect(arg.parseError).toBeInstanceOf(Error); + }); + + it("Passes all search param values through Websocket URL", () => { + const input1: Record = { + cool: "yeah", + yeah: "cool", + blah: "5", + }; + const oneWay1 = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket] = createMockWebSocket(url, protocols); + return socket; + }, + searchParams: input1, + location: { + protocol: "https:", + host: "www.blah.com", + }, + }); + let [base, params] = oneWay1.url.split("?"); + expect(base).toBe("wss://www.blah.com/api/v2/blah"); + for (const [key, value] of Object.entries(input1)) { + expect(params).toContain(`${key}=${value}`); + } + + const input2 = new URLSearchParams(input1); + const oneWay2 = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket] = createMockWebSocket(url, protocols); + return socket; + }, + searchParams: input2, + location: { + protocol: "https:", + host: "www.blah.com", + }, + }); + [base, params] = oneWay2.url.split("?"); + expect(base).toBe("wss://www.blah.com/api/v2/blah"); + for (const [key, value] of Object.entries(input2)) { + expect(params).toContain(`${key}=${value}`); + } + + const oneWay3 = new OneWayWebSocket({ + apiRoute: dummyRoute, + websocketInit: (url, protocols) => { + const [socket] = createMockWebSocket(url, protocols); + return socket; + }, + searchParams: undefined, + location: { + protocol: "https:", + host: "www.blah.com", + }, + }); + [base, params] = oneWay3.url.split("?"); + expect(base).toBe("wss://www.blah.com/api/v2/blah"); + expect(params).toBe(undefined); + }); +}); diff --git a/site/src/utils/OneWayWebSocket.ts b/site/src/utils/OneWayWebSocket.ts new file mode 100644 index 0000000000000..94ed1f1efc868 --- /dev/null +++ b/site/src/utils/OneWayWebSocket.ts @@ -0,0 +1,198 @@ +/** + * @file A wrapper over WebSockets that (1) enforces one-way communication, and + * (2) supports automatically parsing JSON messages as they come in. + * + * This should ALWAYS be favored in favor of using Server-Sent Events and the + * built-in EventSource class for doing one-way communication. SSEs have a hard + * limitation on HTTP/1.1 and below where there is a maximum number of 6 ports + * that can ever be used for a domain (sometimes less depending on the browser). + * Not only is this limit shared with short-lived REST requests, but it also + * applies across tabs and windows. So if a user opens Coder in multiple tabs, + * there is a very real possibility that parts of the app will start to lock up + * without it being clear why. + * + * WebSockets do not have this limitation, even on HTTP/1.1 – all modern + * browsers implement at least some degree of multiplexing for them. + */ + +// Not bothering with trying to borrow methods from the base WebSocket type +// because it's already a mess of inheritance and generics, and we're going to +// have to add a few more +export type WebSocketEventType = "close" | "error" | "message" | "open"; + +export type OneWayMessageEvent = Readonly< + | { + sourceEvent: MessageEvent; + parsedMessage: TData; + parseError: undefined; + } + | { + sourceEvent: MessageEvent; + parsedMessage: undefined; + parseError: Error; + } +>; + +type OneWayEventPayloadMap = { + close: CloseEvent; + error: Event; + message: OneWayMessageEvent; + open: Event; +}; + +type WebSocketMessageCallback = (payload: MessageEvent) => void; + +type OneWayEventCallback = ( + payload: OneWayEventPayloadMap[TEvent], +) => void; + +interface OneWayWebSocketApi { + get url(): string; + + addEventListener: ( + eventType: TEvent, + callback: OneWayEventCallback, + ) => void; + + removeEventListener: ( + eventType: TEvent, + callback: OneWayEventCallback, + ) => void; + + close: (closeCode?: number, reason?: string) => void; +} + +type OneWayWebSocketInit = Readonly<{ + apiRoute: string; + serverProtocols?: string | string[]; + searchParams?: Record | URLSearchParams; + binaryType?: BinaryType; + websocketInit?: (url: string, protocols?: string | string[]) => WebSocket; + location?: Readonly<{ + protocol: string; + host: string; + }>; +}>; + +function defaultInit(url: string, protocols?: string | string[]): WebSocket { + return new WebSocket(url, protocols); +} + +export class OneWayWebSocket + implements OneWayWebSocketApi +{ + readonly #socket: WebSocket; + readonly #messageCallbackWrappers = new Map< + OneWayEventCallback, + WebSocketMessageCallback + >(); + + constructor(init: OneWayWebSocketInit) { + const { + apiRoute, + searchParams, + serverProtocols, + binaryType = "blob", + location = window.location, + websocketInit = defaultInit, + } = init; + + if (!apiRoute.startsWith("/api/v2/")) { + throw new Error(`API route '${apiRoute}' does not begin with a slash`); + } + + const formattedParams = + searchParams instanceof URLSearchParams + ? searchParams + : new URLSearchParams(searchParams); + const paramsString = formattedParams.toString(); + const paramsSuffix = paramsString ? `?${paramsString}` : ""; + const wsProtocol = location.protocol === "https:" ? "wss:" : "ws:"; + const url = `${wsProtocol}//${location.host}${apiRoute}${paramsSuffix}`; + + this.#socket = websocketInit(url, serverProtocols); + this.#socket.binaryType = binaryType; + } + + get url(): string { + return this.#socket.url; + } + + addEventListener( + event: TEvent, + callback: OneWayEventCallback, + ): void { + // Not happy about all the type assertions, but there are some nasty + // type contravariance issues if you try to resolve the function types + // properly. This is actually the lesser of two evils + const looseCallback = callback as OneWayEventCallback< + TData, + WebSocketEventType + >; + + if (this.#messageCallbackWrappers.has(looseCallback)) { + return; + } + if (event !== "message") { + this.#socket.addEventListener(event, looseCallback); + return; + } + + const wrapped = (event: MessageEvent): void => { + const messageCallback = looseCallback as OneWayEventCallback< + TData, + "message" + >; + + try { + const message = JSON.parse(event.data) as TData; + messageCallback({ + sourceEvent: event, + parseError: undefined, + parsedMessage: message, + }); + } catch (err) { + messageCallback({ + sourceEvent: event, + parseError: err as Error, + parsedMessage: undefined, + }); + } + }; + + this.#socket.addEventListener(event as "message", wrapped); + this.#messageCallbackWrappers.set(looseCallback, wrapped); + } + + removeEventListener( + event: TEvent, + callback: OneWayEventCallback, + ): void { + const looseCallback = callback as OneWayEventCallback< + TData, + WebSocketEventType + >; + + if (event !== "message") { + this.#socket.removeEventListener(event, looseCallback); + return; + } + if (!this.#messageCallbackWrappers.has(looseCallback)) { + return; + } + + const wrapper = this.#messageCallbackWrappers.get(looseCallback); + if (wrapper === undefined) { + throw new Error( + `Cannot unregister callback for event ${event}. This is likely an issue with the browser itself.`, + ); + } + + this.#socket.removeEventListener(event as "message", wrapper); + this.#messageCallbackWrappers.delete(looseCallback); + } + + close(closeCode?: number, reason?: string): void { + this.#socket.close(closeCode, reason); + } +}