diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 675f6ba..da08b0f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,19 +17,26 @@ jobs: strategy: matrix: node-version: - - 18 + - 14 + - 20 steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v3 + uses: actions/setup-node@v4 with: node-version: ${{ matrix.node-version }} - name: Install dependencies run: npm ci + # the "override" keyword was added in typescript@4.5.0 + # else, users can go down to typescript@3.8.x ("import type") + - name: Install TypeScript 4.5 + run: npm i typescript@4.5 + if: ${{ matrix.node-version == '14' }} + - name: Run tests run: npm test diff --git a/CHANGELOG.md b/CHANGELOG.md index b299f40..4475b99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # History +- [2.5.5](#255-2024-06-18) (Jun 2024) +- [2.5.4](#254-2024-02-22) (Feb 2024) +- [2.5.3](#253-2024-02-21) (Feb 2024) - [2.5.2](#252-2023-01-12) (Jan 2023) - [2.5.1](#251-2023-01-06) (Jan 2023) - [2.5.0](#250-2023-01-06) (Jan 2023) @@ -19,6 +22,67 @@ # Release notes +## [2.5.5](https://github.com/socketio/socket.io-adapter/compare/2.5.4...2.5.5) (2024-06-18) + +This release contains a bump of the `ws` dependency, which includes an important [security fix](https://github.com/websockets/ws/commit/e55e5106f10fcbaac37cfa89759e4cc0d073a52c). + +Advisory: https://github.com/advisories/GHSA-3h5v-q93c-6h6q + + + +## [2.5.4](https://github.com/socketio/socket.io-adapter/compare/2.5.3...2.5.4) (2024-02-22) + + +### Bug Fixes + +* ensure the order of the commands ([a13f35f](https://github.com/socketio/socket.io-adapter/commit/a13f35f0e6b85bbba07f99ee2440e914f1429d83)) +* **types:** ensure compatibility with TypeScript < 4.5 ([ca397f3](https://github.com/socketio/socket.io-adapter/commit/ca397f3afe06ed9390db52b70a506a9721e091d8)) + + + +## [2.5.3](https://github.com/socketio/socket.io-adapter/compare/2.5.2...2.5.3) (2024-02-21) + +Two abstract classes were imported from the [Redis adapter repository](https://github.com/socketio/socket.io-redis-adapter/blob/bd32763043a2eb79a21dffd8820f20e598348adf/lib/cluster-adapter.ts): + +- the `ClusterAdapter` class, which manages the messages sent between the server instances of the cluster +- the `ClusterAdapterWithHeartbeat` class, which extends the `ClusterAdapter` and adds a heartbeat mechanism in order to check the healthiness of the other instances + +Other adapters can then just extend those classes and only have to implement the pub/sub mechanism (and not the internal chit-chat protocol): + +```js +class MyAdapter extends ClusterAdapterWithHeartbeat { + constructor(nsp, pubSub, opts) { + super(nsp, opts); + this.pubSub = pubSub; + pubSub.subscribe("main-channel", (message) => this.onMessage(message)); + pubSub.subscribe("specific-channel#" + this.uid, (response) => this.onResponse(response)); + } + + doPublish(message) { + return this.pubSub.publish("main-channel", message); + } + + doPublishResponse(requesterUid, response) { + return this.pubSub.publish("specific-channel#" + requesterUid, response); + } +} +``` + +Besides, the number of "timeout reached: only x responses received out of y" errors (which can happen when a server instance leaves the cluster) should be greatly reduced by [this commit](https://github.com/socketio/socket.io-adapter/commit/0e23ff0cc671e3186510f7cfb8a4c1147457296f). + + +### Bug Fixes + +* **cluster:** fix count in fetchSockets() method ([80af4e9](https://github.com/socketio/socket.io-adapter/commit/80af4e939c9caf89b0234ba1e676a3887c8d0ce6)) +* **cluster:** notify the other nodes when closing ([0e23ff0](https://github.com/socketio/socket.io-adapter/commit/0e23ff0cc671e3186510f7cfb8a4c1147457296f)) + + +### Performance Improvements + +* **cluster:** use timer.refresh() ([d99a71b](https://github.com/socketio/socket.io-adapter/commit/d99a71b5588f53f0b181eee989ab2ac939f965db)) + + + ## [2.5.2](https://github.com/socketio/socket.io-adapter/compare/2.5.1...2.5.2) (2023-01-12) The `ws` dependency was moved from `peerDependencies` to `dependencies`, in order to prevent issues like [this](https://github.com/socketio/socket.io-redis-adapter/issues/478). diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts new file mode 100644 index 0000000..143bb6f --- /dev/null +++ b/lib/cluster-adapter.ts @@ -0,0 +1,1010 @@ +import { Adapter } from "./in-memory-adapter"; +import type { + BroadcastFlags, + BroadcastOptions, + Room, +} from "./in-memory-adapter"; +import { debug as debugModule } from "debug"; +import { randomBytes } from "crypto"; + +const debug = debugModule("socket.io-adapter"); +const EMITTER_UID = "emitter"; +const DEFAULT_TIMEOUT = 5000; + +function randomId() { + return randomBytes(8).toString("hex"); +} + +type DistributiveOmit = T extends any + ? Omit + : never; + +/** + * The unique ID of a server + */ +export type ServerId = string; + +/** + * The unique ID of a message (for the connection state recovery feature) + */ +export type Offset = string; + +export interface ClusterAdapterOptions { + /** + * The number of ms between two heartbeats. + * @default 5_000 + */ + heartbeatInterval?: number; + /** + * The number of ms without heartbeat before we consider a node down. + * @default 10_000 + */ + heartbeatTimeout?: number; +} + +export enum MessageType { + INITIAL_HEARTBEAT = 1, + HEARTBEAT, + BROADCAST, + SOCKETS_JOIN, + SOCKETS_LEAVE, + DISCONNECT_SOCKETS, + FETCH_SOCKETS, + FETCH_SOCKETS_RESPONSE, + SERVER_SIDE_EMIT, + SERVER_SIDE_EMIT_RESPONSE, + BROADCAST_CLIENT_COUNT, + BROADCAST_ACK, + ADAPTER_CLOSE, +} + +export type ClusterMessage = { + uid: ServerId; + nsp: string; +} & ( + | { + type: + | MessageType.INITIAL_HEARTBEAT + | MessageType.HEARTBEAT + | MessageType.ADAPTER_CLOSE; + } + | { + type: MessageType.BROADCAST; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + packet: unknown; + requestId?: string; + }; + } + | { + type: MessageType.SOCKETS_JOIN | MessageType.SOCKETS_LEAVE; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + rooms: string[]; + }; + } + | { + type: MessageType.DISCONNECT_SOCKETS; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + close?: boolean; + }; + } + | { + type: MessageType.FETCH_SOCKETS; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + requestId: string; + }; + } + | { + type: MessageType.SERVER_SIDE_EMIT; + data: { + requestId?: string; + packet: any[]; + }; + } +); + +interface ClusterRequest { + type: MessageType; + resolve: Function; + timeout: NodeJS.Timeout; + expected: number; + current: number; + responses: any[]; +} + +export type ClusterResponse = { + uid: ServerId; + nsp: string; +} & ( + | { + type: MessageType.FETCH_SOCKETS_RESPONSE; + data: { + requestId: string; + sockets: unknown[]; + }; + } + | { + type: MessageType.SERVER_SIDE_EMIT_RESPONSE; + data: { + requestId: string; + packet: unknown; + }; + } + | { + type: MessageType.BROADCAST_CLIENT_COUNT; + data: { + requestId: string; + clientCount: number; + }; + } + | { + type: MessageType.BROADCAST_ACK; + data: { + requestId: string; + packet: unknown; + }; + } +); + +interface ClusterAckRequest { + clientCountCallback: (clientCount: number) => void; + ack: (...args: any[]) => void; +} + +function encodeOptions(opts: BroadcastOptions) { + return { + rooms: [...opts.rooms], + except: [...opts.except], + flags: opts.flags, + }; +} + +function decodeOptions(opts): BroadcastOptions { + return { + rooms: new Set(opts.rooms), + except: new Set(opts.except), + flags: opts.flags, + }; +} + +/** + * A cluster-ready adapter. Any extending class must: + * + * - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse} + * - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse} + */ +export abstract class ClusterAdapter extends Adapter { + protected readonly uid: ServerId; + + private requests: Map = new Map(); + private ackRequests: Map = new Map(); + + protected constructor(nsp) { + super(nsp); + this.uid = randomId(); + } + + /** + * Called when receiving a message from another member of the cluster. + * + * @param message + * @param offset + * @protected + */ + protected onMessage(message: ClusterMessage, offset?: string) { + if (message.uid === this.uid) { + return debug("[%s] ignore message from self", this.uid); + } + + debug( + "[%s] new event of type %d from %s", + this.uid, + message.type, + message.uid + ); + + switch (message.type) { + case MessageType.BROADCAST: { + const withAck = message.data.requestId !== undefined; + if (withAck) { + super.broadcastWithAck( + message.data.packet, + decodeOptions(message.data.opts), + (clientCount) => { + debug( + "[%s] waiting for %d client acknowledgements", + this.uid, + clientCount + ); + this.publishResponse(message.uid, { + type: MessageType.BROADCAST_CLIENT_COUNT, + data: { + requestId: message.data.requestId, + clientCount, + }, + }); + }, + (arg) => { + debug( + "[%s] received acknowledgement with value %j", + this.uid, + arg + ); + this.publishResponse(message.uid, { + type: MessageType.BROADCAST_ACK, + data: { + requestId: message.data.requestId, + packet: arg, + }, + }); + } + ); + } else { + const packet = message.data.packet; + const opts = decodeOptions(message.data.opts); + + this.addOffsetIfNecessary(packet, opts, offset); + + super.broadcast(packet, opts); + } + break; + } + + case MessageType.SOCKETS_JOIN: + super.addSockets(decodeOptions(message.data.opts), message.data.rooms); + break; + + case MessageType.SOCKETS_LEAVE: + super.delSockets(decodeOptions(message.data.opts), message.data.rooms); + break; + + case MessageType.DISCONNECT_SOCKETS: + super.disconnectSockets( + decodeOptions(message.data.opts), + message.data.close + ); + break; + + case MessageType.FETCH_SOCKETS: { + debug( + "[%s] calling fetchSockets with opts %j", + this.uid, + message.data.opts + ); + super + .fetchSockets(decodeOptions(message.data.opts)) + .then((localSockets) => { + this.publishResponse(message.uid, { + type: MessageType.FETCH_SOCKETS_RESPONSE, + data: { + requestId: message.data.requestId, + sockets: localSockets.map((socket) => { + // remove sessionStore from handshake, as it may contain circular references + const { sessionStore, ...handshake } = socket.handshake; + return { + id: socket.id, + handshake, + rooms: [...socket.rooms], + data: socket.data, + }; + }), + }, + }); + }); + break; + } + + case MessageType.SERVER_SIDE_EMIT: { + const packet = message.data.packet; + const withAck = message.data.requestId !== undefined; + if (!withAck) { + this.nsp._onServerSideEmit(packet); + return; + } + let called = false; + const callback = (arg: any) => { + // only one argument is expected + if (called) { + return; + } + called = true; + debug("[%s] calling acknowledgement with %j", this.uid, arg); + this.publishResponse(message.uid, { + type: MessageType.SERVER_SIDE_EMIT_RESPONSE, + data: { + requestId: message.data.requestId, + packet: arg, + }, + }); + }; + + this.nsp._onServerSideEmit([...packet, callback]); + break; + } + + // @ts-ignore + case MessageType.BROADCAST_CLIENT_COUNT: + // @ts-ignore + case MessageType.BROADCAST_ACK: + // @ts-ignore + case MessageType.FETCH_SOCKETS_RESPONSE: + // @ts-ignore + case MessageType.SERVER_SIDE_EMIT_RESPONSE: + // extending classes may not make a distinction between a ClusterMessage and a ClusterResponse payload and may + // always call the onMessage() method + this.onResponse(message); + break; + + default: + debug("[%s] unknown message type: %s", this.uid, message.type); + } + } + + /** + * Called when receiving a response from another member of the cluster. + * + * @param response + * @protected + */ + protected onResponse(response: ClusterResponse) { + const requestId = response.data.requestId; + + debug( + "[%s] received response %s to request %s", + this.uid, + response.type, + requestId + ); + + switch (response.type) { + case MessageType.BROADCAST_CLIENT_COUNT: { + this.ackRequests + .get(requestId) + ?.clientCountCallback(response.data.clientCount); + break; + } + + case MessageType.BROADCAST_ACK: { + this.ackRequests.get(requestId)?.ack(response.data.packet); + break; + } + + case MessageType.FETCH_SOCKETS_RESPONSE: { + const request = this.requests.get(requestId); + + if (!request) { + return; + } + + request.current++; + response.data.sockets.forEach((socket) => + request.responses.push(socket) + ); + + if (request.current === request.expected) { + clearTimeout(request.timeout); + request.resolve(request.responses); + this.requests.delete(requestId); + } + break; + } + + case MessageType.SERVER_SIDE_EMIT_RESPONSE: { + const request = this.requests.get(requestId); + + if (!request) { + return; + } + + request.current++; + request.responses.push(response.data.packet); + + if (request.current === request.expected) { + clearTimeout(request.timeout); + request.resolve(null, request.responses); + this.requests.delete(requestId); + } + break; + } + + default: + // @ts-ignore + debug("[%s] unknown response type: %s", this.uid, response.type); + } + } + + override async broadcast(packet: any, opts: BroadcastOptions) { + const onlyLocal = opts.flags?.local; + + if (!onlyLocal) { + try { + const offset = await this.publishAndReturnOffset({ + type: MessageType.BROADCAST, + data: { + packet, + opts: encodeOptions(opts), + }, + }); + this.addOffsetIfNecessary(packet, opts, offset); + } catch (e) { + return debug( + "[%s] error while broadcasting message: %s", + this.uid, + e.message + ); + } + } + + super.broadcast(packet, opts); + } + + /** + * Adds an offset at the end of the data array in order to allow the client to receive any missed packets when it + * reconnects after a temporary disconnection. + * + * @param packet + * @param opts + * @param offset + * @private + */ + private addOffsetIfNecessary( + packet: any, + opts: BroadcastOptions, + offset: Offset + ) { + if (!this.nsp.server.opts.connectionStateRecovery) { + return; + } + const isEventPacket = packet.type === 2; + // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and + // restored on another server upon reconnection + const withoutAcknowledgement = packet.id === undefined; + const notVolatile = opts.flags?.volatile === undefined; + + if (isEventPacket && withoutAcknowledgement && notVolatile) { + packet.data.push(offset); + } + } + + override broadcastWithAck( + packet: any, + opts: BroadcastOptions, + clientCountCallback: (clientCount: number) => void, + ack: (...args: any[]) => void + ) { + const onlyLocal = opts?.flags?.local; + if (!onlyLocal) { + const requestId = randomId(); + + this.ackRequests.set(requestId, { + clientCountCallback, + ack, + }); + + this.publish({ + type: MessageType.BROADCAST, + data: { + packet, + requestId, + opts: encodeOptions(opts), + }, + }); + + // we have no way to know at this level whether the server has received an acknowledgement from each client, so we + // will simply clean up the ackRequests map after the given delay + setTimeout(() => { + this.ackRequests.delete(requestId); + }, opts.flags!.timeout); + } + + super.broadcastWithAck(packet, opts, clientCountCallback, ack); + } + + override async addSockets(opts: BroadcastOptions, rooms: Room[]) { + const onlyLocal = opts.flags?.local; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.SOCKETS_JOIN, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } + } + + super.addSockets(opts, rooms); + } + + override async delSockets(opts: BroadcastOptions, rooms: Room[]) { + const onlyLocal = opts.flags?.local; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.SOCKETS_LEAVE, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } + } + + super.delSockets(opts, rooms); + } + + override async disconnectSockets(opts: BroadcastOptions, close: boolean) { + const onlyLocal = opts.flags?.local; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.DISCONNECT_SOCKETS, + data: { + opts: encodeOptions(opts), + close, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } + } + + super.disconnectSockets(opts, close); + } + + async fetchSockets(opts: BroadcastOptions): Promise { + const [localSockets, serverCount] = await Promise.all([ + super.fetchSockets(opts), + this.serverCount(), + ]); + const expectedResponseCount = serverCount - 1; + + if (opts.flags?.local || expectedResponseCount <= 0) { + return localSockets; + } + + const requestId = randomId(); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + const storedRequest = this.requests.get(requestId); + if (storedRequest) { + reject( + new Error( + `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` + ) + ); + this.requests.delete(requestId); + } + }, opts.flags.timeout || DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.FETCH_SOCKETS, + resolve, + timeout, + current: 0, + expected: expectedResponseCount, + responses: localSockets, + }; + this.requests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.FETCH_SOCKETS, + data: { + opts: encodeOptions(opts), + requestId, + }, + }); + }); + } + + override async serverSideEmit(packet: any[]) { + const withAck = typeof packet[packet.length - 1] === "function"; + + if (!withAck) { + return this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + packet, + }, + }); + } + + const ack = packet.pop(); + const expectedResponseCount = (await this.serverCount()) - 1; + + debug( + '[%s] waiting for %d responses to "serverSideEmit" request', + this.uid, + expectedResponseCount + ); + + if (expectedResponseCount <= 0) { + return ack(null, []); + } + + const requestId = randomId(); + + const timeout = setTimeout(() => { + const storedRequest = this.requests.get(requestId); + if (storedRequest) { + ack( + new Error( + `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` + ), + storedRequest.responses + ); + this.requests.delete(requestId); + } + }, DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.SERVER_SIDE_EMIT, + resolve: ack, + timeout, + current: 0, + expected: expectedResponseCount, + responses: [], + }; + this.requests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + requestId, // the presence of this attribute defines whether an acknowledgement is needed + packet, + }, + }); + } + + protected publish( + message: DistributiveOmit + ): void { + this.publishAndReturnOffset(message).catch((err) => { + debug("[%s] error while publishing message: %s", this.uid, err); + }); + } + + protected publishAndReturnOffset( + message: DistributiveOmit + ) { + (message as ClusterMessage).uid = this.uid; + (message as ClusterMessage).nsp = this.nsp.name; + return this.doPublish(message as ClusterMessage); + } + + /** + * Send a message to the other members of the cluster. + * + * @param message + * @protected + * @return an offset, if applicable + */ + protected abstract doPublish(message: ClusterMessage): Promise; + + protected publishResponse( + requesterUid: ServerId, + response: Omit + ) { + (response as ClusterResponse).uid = this.uid; + (response as ClusterResponse).nsp = this.nsp.name; + this.doPublishResponse(requesterUid, response as ClusterResponse).catch( + (err) => { + debug("[%s] error while publishing response: %s", this.uid, err); + } + ); + } + + /** + * Send a response to the given member of the cluster. + * + * @param requesterUid + * @param response + * @protected + */ + protected abstract doPublishResponse( + requesterUid: ServerId, + response: ClusterResponse + ): Promise; +} + +interface CustomClusterRequest { + type: MessageType; + resolve: Function; + timeout: NodeJS.Timeout; + missingUids: Set; + responses: any[]; +} + +export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { + private readonly _opts: Required; + + private heartbeatTimer: NodeJS.Timeout; + private nodesMap: Map = new Map(); // uid => timestamp of last message + private readonly cleanupTimer: NodeJS.Timeout | undefined; + private customRequests: Map = new Map(); + + protected constructor(nsp, opts: ClusterAdapterOptions) { + super(nsp); + this._opts = Object.assign( + { + heartbeatInterval: 5_000, + heartbeatTimeout: 10_000, + }, + opts + ); + this.cleanupTimer = setInterval(() => { + const now = Date.now(); + this.nodesMap.forEach((lastSeen, uid) => { + const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout; + if (nodeSeemsDown) { + debug("[%s] node %s seems down", this.uid, uid); + this.removeNode(uid); + } + }); + }, 1_000); + } + + override init() { + this.publish({ + type: MessageType.INITIAL_HEARTBEAT, + }); + } + + private scheduleHeartbeat() { + if (this.heartbeatTimer) { + this.heartbeatTimer.refresh(); + } else { + this.heartbeatTimer = setTimeout(() => { + this.publish({ + type: MessageType.HEARTBEAT, + }); + }, this._opts.heartbeatInterval); + } + } + + override close() { + this.publish({ + type: MessageType.ADAPTER_CLOSE, + }); + clearTimeout(this.heartbeatTimer); + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + } + } + + override onMessage(message: ClusterMessage, offset?: string) { + if (message.uid === this.uid) { + return debug("[%s] ignore message from self", this.uid); + } + + if (message.uid && message.uid !== EMITTER_UID) { + // we track the UID of each sender, in order to know how many servers there are in the cluster + this.nodesMap.set(message.uid, Date.now()); + } + + debug( + "[%s] new event of type %d from %s", + this.uid, + message.type, + message.uid + ); + + switch (message.type) { + case MessageType.INITIAL_HEARTBEAT: + this.publish({ + type: MessageType.HEARTBEAT, + }); + break; + case MessageType.HEARTBEAT: + // nothing to do + break; + case MessageType.ADAPTER_CLOSE: + this.removeNode(message.uid); + break; + default: + super.onMessage(message, offset); + } + } + + override serverCount(): Promise { + return Promise.resolve(1 + this.nodesMap.size); + } + + override publish(message: DistributiveOmit) { + this.scheduleHeartbeat(); + + return super.publish(message); + } + + override async serverSideEmit(packet: any[]) { + const withAck = typeof packet[packet.length - 1] === "function"; + + if (!withAck) { + return this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + packet, + }, + }); + } + + const ack = packet.pop(); + const expectedResponseCount = this.nodesMap.size; + + debug( + '[%s] waiting for %d responses to "serverSideEmit" request', + this.uid, + expectedResponseCount + ); + + if (expectedResponseCount <= 0) { + return ack(null, []); + } + + const requestId = randomId(); + + const timeout = setTimeout(() => { + const storedRequest = this.customRequests.get(requestId); + if (storedRequest) { + ack( + new Error( + `timeout reached: missing ${storedRequest.missingUids.size} responses` + ), + storedRequest.responses + ); + this.customRequests.delete(requestId); + } + }, DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.SERVER_SIDE_EMIT, + resolve: ack, + timeout, + missingUids: new Set([...this.nodesMap.keys()]), + responses: [], + }; + this.customRequests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + requestId, // the presence of this attribute defines whether an acknowledgement is needed + packet, + }, + }); + } + + override async fetchSockets(opts: BroadcastOptions): Promise { + const [localSockets, serverCount] = await Promise.all([ + super.fetchSockets({ + rooms: opts.rooms, + except: opts.except, + flags: { + local: true, + }, + }), + this.serverCount(), + ]); + const expectedResponseCount = serverCount - 1; + + if (opts.flags?.local || expectedResponseCount <= 0) { + return localSockets as any[]; + } + + const requestId = randomId(); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + const storedRequest = this.customRequests.get(requestId); + if (storedRequest) { + reject( + new Error( + `timeout reached: missing ${storedRequest.missingUids.size} responses` + ) + ); + this.customRequests.delete(requestId); + } + }, opts.flags.timeout || DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.FETCH_SOCKETS, + resolve, + timeout, + missingUids: new Set([...this.nodesMap.keys()]), + responses: localSockets as any[], + }; + this.customRequests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.FETCH_SOCKETS, + data: { + opts: encodeOptions(opts), + requestId, + }, + }); + }); + } + + override onResponse(response: ClusterResponse) { + const requestId = response.data.requestId; + + debug( + "[%s] received response %s to request %s", + this.uid, + response.type, + requestId + ); + + switch (response.type) { + case MessageType.FETCH_SOCKETS_RESPONSE: { + const request = this.customRequests.get(requestId); + + if (!request) { + return; + } + + (response.data.sockets as any[]).forEach((socket) => + request.responses.push(socket) + ); + + request.missingUids.delete(response.uid); + if (request.missingUids.size === 0) { + clearTimeout(request.timeout); + request.resolve(request.responses); + this.customRequests.delete(requestId); + } + break; + } + + case MessageType.SERVER_SIDE_EMIT_RESPONSE: { + const request = this.customRequests.get(requestId); + + if (!request) { + return; + } + + request.responses.push(response.data.packet); + + request.missingUids.delete(response.uid); + if (request.missingUids.size === 0) { + clearTimeout(request.timeout); + request.resolve(null, request.responses); + this.customRequests.delete(requestId); + } + break; + } + + default: + super.onResponse(response); + } + } + + private removeNode(uid: ServerId) { + this.customRequests.forEach((request, requestId) => { + request.missingUids.delete(uid); + if (request.missingUids.size === 0) { + clearTimeout(request.timeout); + if (request.type === MessageType.FETCH_SOCKETS) { + request.resolve(request.responses); + } else if (request.type === MessageType.SERVER_SIDE_EMIT) { + request.resolve(null, request.responses); + } + this.customRequests.delete(requestId); + } + }); + + this.nodesMap.delete(uid); + } +} diff --git a/lib/in-memory-adapter.ts b/lib/in-memory-adapter.ts new file mode 100644 index 0000000..83d4d67 --- /dev/null +++ b/lib/in-memory-adapter.ts @@ -0,0 +1,507 @@ +import { EventEmitter } from "events"; +import { yeast } from "./contrib/yeast"; +import WebSocket = require("ws"); + +const canPreComputeFrame = typeof WebSocket?.Sender?.frame === "function"; + +/** + * A public ID, sent by the server at the beginning of the Socket.IO session and which can be used for private messaging + */ +export type SocketId = string; +/** + * A private ID, sent by the server at the beginning of the Socket.IO session and used for connection state recovery + * upon reconnection + */ +export type PrivateSessionId = string; + +// we could extend the Room type to "string | number", but that would be a breaking change +// related: https://github.com/socketio/socket.io-redis-adapter/issues/418 +export type Room = string; + +export interface BroadcastFlags { + volatile?: boolean; + compress?: boolean; + local?: boolean; + broadcast?: boolean; + binary?: boolean; + timeout?: number; +} + +export interface BroadcastOptions { + rooms: Set; + except?: Set; + flags?: BroadcastFlags; +} + +interface SessionToPersist { + sid: SocketId; + pid: PrivateSessionId; + rooms: Room[]; + data: unknown; +} + +export type Session = SessionToPersist & { missedPackets: unknown[][] }; + +export class Adapter extends EventEmitter { + public rooms: Map> = new Map(); + public sids: Map> = new Map(); + private readonly encoder; + + /** + * In-memory adapter constructor. + * + * @param {Namespace} nsp + */ + constructor(readonly nsp: any) { + super(); + this.encoder = nsp.server.encoder; + } + + /** + * To be overridden + */ + public init(): Promise | void {} + + /** + * To be overridden + */ + public close(): Promise | void {} + + /** + * Returns the number of Socket.IO servers in the cluster + * + * @public + */ + public serverCount(): Promise { + return Promise.resolve(1); + } + + /** + * Adds a socket to a list of room. + * + * @param {SocketId} id the socket id + * @param {Set} rooms a set of rooms + * @public + */ + public addAll(id: SocketId, rooms: Set): Promise | void { + if (!this.sids.has(id)) { + this.sids.set(id, new Set()); + } + + for (const room of rooms) { + this.sids.get(id).add(room); + + if (!this.rooms.has(room)) { + this.rooms.set(room, new Set()); + this.emit("create-room", room); + } + if (!this.rooms.get(room).has(id)) { + this.rooms.get(room).add(id); + this.emit("join-room", room, id); + } + } + } + + /** + * Removes a socket from a room. + * + * @param {SocketId} id the socket id + * @param {Room} room the room name + */ + public del(id: SocketId, room: Room): Promise | void { + if (this.sids.has(id)) { + this.sids.get(id).delete(room); + } + + this._del(room, id); + } + + private _del(room: Room, id: SocketId) { + const _room = this.rooms.get(room); + if (_room != null) { + const deleted = _room.delete(id); + if (deleted) { + this.emit("leave-room", room, id); + } + if (_room.size === 0 && this.rooms.delete(room)) { + this.emit("delete-room", room); + } + } + } + + /** + * Removes a socket from all rooms it's joined. + * + * @param {SocketId} id the socket id + */ + public delAll(id: SocketId): void { + if (!this.sids.has(id)) { + return; + } + + for (const room of this.sids.get(id)) { + this._del(room, id); + } + + this.sids.delete(id); + } + + /** + * Broadcasts a packet. + * + * Options: + * - `flags` {Object} flags for this packet + * - `except` {Array} sids that should be excluded + * - `rooms` {Array} list of rooms to broadcast to + * + * @param {Object} packet the packet object + * @param {Object} opts the options + * @public + */ + public broadcast(packet: any, opts: BroadcastOptions): void { + const flags = opts.flags || {}; + const packetOpts = { + preEncoded: true, + volatile: flags.volatile, + compress: flags.compress, + }; + + packet.nsp = this.nsp.name; + const encodedPackets = this._encode(packet, packetOpts); + + this.apply(opts, (socket) => { + if (typeof socket.notifyOutgoingListeners === "function") { + socket.notifyOutgoingListeners(packet); + } + + socket.client.writeToEngine(encodedPackets, packetOpts); + }); + } + + /** + * Broadcasts a packet and expects multiple acknowledgements. + * + * Options: + * - `flags` {Object} flags for this packet + * - `except` {Array} sids that should be excluded + * - `rooms` {Array} list of rooms to broadcast to + * + * @param {Object} packet the packet object + * @param {Object} opts the options + * @param clientCountCallback - the number of clients that received the packet + * @param ack - the callback that will be called for each client response + * + * @public + */ + public broadcastWithAck( + packet: any, + opts: BroadcastOptions, + clientCountCallback: (clientCount: number) => void, + ack: (...args: any[]) => void + ) { + const flags = opts.flags || {}; + const packetOpts = { + preEncoded: true, + volatile: flags.volatile, + compress: flags.compress, + }; + + packet.nsp = this.nsp.name; + // we can use the same id for each packet, since the _ids counter is common (no duplicate) + packet.id = this.nsp._ids++; + + const encodedPackets = this._encode(packet, packetOpts); + + let clientCount = 0; + + this.apply(opts, (socket) => { + // track the total number of acknowledgements that are expected + clientCount++; + // call the ack callback for each client response + socket.acks.set(packet.id, ack); + + if (typeof socket.notifyOutgoingListeners === "function") { + socket.notifyOutgoingListeners(packet); + } + + socket.client.writeToEngine(encodedPackets, packetOpts); + }); + + clientCountCallback(clientCount); + } + + private _encode(packet: unknown, packetOpts: Record) { + const encodedPackets = this.encoder.encode(packet); + + if ( + canPreComputeFrame && + encodedPackets.length === 1 && + typeof encodedPackets[0] === "string" + ) { + // "4" being the "message" packet type in the Engine.IO protocol + const data = Buffer.from("4" + encodedPackets[0]); + // see https://github.com/websockets/ws/issues/617#issuecomment-283002469 + packetOpts.wsPreEncodedFrame = WebSocket.Sender.frame(data, { + readOnly: false, + mask: false, + rsv1: false, + opcode: 1, + fin: true, + }); + } + + return encodedPackets; + } + + /** + * Gets a list of sockets by sid. + * + * @param {Set} rooms the explicit set of rooms to check. + */ + public sockets(rooms: Set): Promise> { + const sids = new Set(); + + this.apply({ rooms }, (socket) => { + sids.add(socket.id); + }); + + return Promise.resolve(sids); + } + + /** + * Gets the list of rooms a given socket has joined. + * + * @param {SocketId} id the socket id + */ + public socketRooms(id: SocketId): Set | undefined { + return this.sids.get(id); + } + + /** + * Returns the matching socket instances + * + * @param opts - the filters to apply + */ + public fetchSockets(opts: BroadcastOptions): Promise { + const sockets = []; + + this.apply(opts, (socket) => { + sockets.push(socket); + }); + + return Promise.resolve(sockets); + } + + /** + * Makes the matching socket instances join the specified rooms + * + * @param opts - the filters to apply + * @param rooms - the rooms to join + */ + public addSockets(opts: BroadcastOptions, rooms: Room[]): void { + this.apply(opts, (socket) => { + socket.join(rooms); + }); + } + + /** + * Makes the matching socket instances leave the specified rooms + * + * @param opts - the filters to apply + * @param rooms - the rooms to leave + */ + public delSockets(opts: BroadcastOptions, rooms: Room[]): void { + this.apply(opts, (socket) => { + rooms.forEach((room) => socket.leave(room)); + }); + } + + /** + * Makes the matching socket instances disconnect + * + * @param opts - the filters to apply + * @param close - whether to close the underlying connection + */ + public disconnectSockets(opts: BroadcastOptions, close: boolean): void { + this.apply(opts, (socket) => { + socket.disconnect(close); + }); + } + + private apply(opts: BroadcastOptions, callback: (socket) => void): void { + const rooms = opts.rooms; + const except = this.computeExceptSids(opts.except); + + if (rooms.size) { + const ids = new Set(); + for (const room of rooms) { + if (!this.rooms.has(room)) continue; + + for (const id of this.rooms.get(room)) { + if (ids.has(id) || except.has(id)) continue; + const socket = this.nsp.sockets.get(id); + if (socket) { + callback(socket); + ids.add(id); + } + } + } + } else { + for (const [id] of this.sids) { + if (except.has(id)) continue; + const socket = this.nsp.sockets.get(id); + if (socket) callback(socket); + } + } + } + + private computeExceptSids(exceptRooms?: Set) { + const exceptSids = new Set(); + if (exceptRooms && exceptRooms.size > 0) { + for (const room of exceptRooms) { + if (this.rooms.has(room)) { + this.rooms.get(room).forEach((sid) => exceptSids.add(sid)); + } + } + } + return exceptSids; + } + + /** + * Send a packet to the other Socket.IO servers in the cluster + * @param packet - an array of arguments, which may include an acknowledgement callback at the end + */ + public serverSideEmit(packet: any[]): void { + console.warn( + "this adapter does not support the serverSideEmit() functionality" + ); + } + + /** + * Save the client session in order to restore it upon reconnection. + */ + public persistSession(session: SessionToPersist) {} + + /** + * Restore the session and find the packets that were missed by the client. + * @param pid + * @param offset + */ + public restoreSession( + pid: PrivateSessionId, + offset: string + ): Promise { + return null; + } +} + +interface PersistedPacket { + id: string; + emittedAt: number; + data: unknown[]; + opts: BroadcastOptions; +} + +type SessionWithTimestamp = SessionToPersist & { disconnectedAt: number }; + +export class SessionAwareAdapter extends Adapter { + private readonly maxDisconnectionDuration: number; + + private sessions: Map = new Map(); + private packets: PersistedPacket[] = []; + + constructor(readonly nsp: any) { + super(nsp); + this.maxDisconnectionDuration = + nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration; + + const timer = setInterval(() => { + const threshold = Date.now() - this.maxDisconnectionDuration; + this.sessions.forEach((session, sessionId) => { + const hasExpired = session.disconnectedAt < threshold; + if (hasExpired) { + this.sessions.delete(sessionId); + } + }); + for (let i = this.packets.length - 1; i >= 0; i--) { + const hasExpired = this.packets[i].emittedAt < threshold; + if (hasExpired) { + this.packets.splice(0, i + 1); + break; + } + } + }, 60 * 1000); + // prevents the timer from keeping the process alive + timer.unref(); + } + + override persistSession(session: SessionToPersist) { + (session as SessionWithTimestamp).disconnectedAt = Date.now(); + this.sessions.set(session.pid, session as SessionWithTimestamp); + } + + override restoreSession( + pid: PrivateSessionId, + offset: string + ): Promise { + const session = this.sessions.get(pid); + if (!session) { + // the session may have expired + return null; + } + const hasExpired = + session.disconnectedAt + this.maxDisconnectionDuration < Date.now(); + if (hasExpired) { + // the session has expired + this.sessions.delete(pid); + return null; + } + const index = this.packets.findIndex((packet) => packet.id === offset); + if (index === -1) { + // the offset may be too old + return null; + } + const missedPackets = []; + for (let i = index + 1; i < this.packets.length; i++) { + const packet = this.packets[i]; + if (shouldIncludePacket(session.rooms, packet.opts)) { + missedPackets.push(packet.data); + } + } + return Promise.resolve({ + ...session, + missedPackets, + }); + } + + override broadcast(packet: any, opts: BroadcastOptions) { + const isEventPacket = packet.type === 2; + // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and + // restored on another server upon reconnection + const withoutAcknowledgement = packet.id === undefined; + const notVolatile = opts.flags?.volatile === undefined; + if (isEventPacket && withoutAcknowledgement && notVolatile) { + const id = yeast(); + // the offset is stored at the end of the data array, so the client knows the ID of the last packet it has + // processed (and the format is backward-compatible) + packet.data.push(id); + this.packets.push({ + id, + opts, + data: packet.data, + emittedAt: Date.now(), + }); + } + super.broadcast(packet, opts); + } +} + +function shouldIncludePacket( + sessionRooms: Room[], + opts: BroadcastOptions +): boolean { + const included = + opts.rooms.size === 0 || sessionRooms.some((room) => opts.rooms.has(room)); + const notExcluded = sessionRooms.every((room) => !opts.except.has(room)); + return included && notExcluded; +} diff --git a/lib/index.ts b/lib/index.ts index 83d4d67..2665139 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,507 +1,21 @@ -import { EventEmitter } from "events"; -import { yeast } from "./contrib/yeast"; -import WebSocket = require("ws"); - -const canPreComputeFrame = typeof WebSocket?.Sender?.frame === "function"; - -/** - * A public ID, sent by the server at the beginning of the Socket.IO session and which can be used for private messaging - */ -export type SocketId = string; -/** - * A private ID, sent by the server at the beginning of the Socket.IO session and used for connection state recovery - * upon reconnection - */ -export type PrivateSessionId = string; - -// we could extend the Room type to "string | number", but that would be a breaking change -// related: https://github.com/socketio/socket.io-redis-adapter/issues/418 -export type Room = string; - -export interface BroadcastFlags { - volatile?: boolean; - compress?: boolean; - local?: boolean; - broadcast?: boolean; - binary?: boolean; - timeout?: number; -} - -export interface BroadcastOptions { - rooms: Set; - except?: Set; - flags?: BroadcastFlags; -} - -interface SessionToPersist { - sid: SocketId; - pid: PrivateSessionId; - rooms: Room[]; - data: unknown; -} - -export type Session = SessionToPersist & { missedPackets: unknown[][] }; - -export class Adapter extends EventEmitter { - public rooms: Map> = new Map(); - public sids: Map> = new Map(); - private readonly encoder; - - /** - * In-memory adapter constructor. - * - * @param {Namespace} nsp - */ - constructor(readonly nsp: any) { - super(); - this.encoder = nsp.server.encoder; - } - - /** - * To be overridden - */ - public init(): Promise | void {} - - /** - * To be overridden - */ - public close(): Promise | void {} - - /** - * Returns the number of Socket.IO servers in the cluster - * - * @public - */ - public serverCount(): Promise { - return Promise.resolve(1); - } - - /** - * Adds a socket to a list of room. - * - * @param {SocketId} id the socket id - * @param {Set} rooms a set of rooms - * @public - */ - public addAll(id: SocketId, rooms: Set): Promise | void { - if (!this.sids.has(id)) { - this.sids.set(id, new Set()); - } - - for (const room of rooms) { - this.sids.get(id).add(room); - - if (!this.rooms.has(room)) { - this.rooms.set(room, new Set()); - this.emit("create-room", room); - } - if (!this.rooms.get(room).has(id)) { - this.rooms.get(room).add(id); - this.emit("join-room", room, id); - } - } - } - - /** - * Removes a socket from a room. - * - * @param {SocketId} id the socket id - * @param {Room} room the room name - */ - public del(id: SocketId, room: Room): Promise | void { - if (this.sids.has(id)) { - this.sids.get(id).delete(room); - } - - this._del(room, id); - } - - private _del(room: Room, id: SocketId) { - const _room = this.rooms.get(room); - if (_room != null) { - const deleted = _room.delete(id); - if (deleted) { - this.emit("leave-room", room, id); - } - if (_room.size === 0 && this.rooms.delete(room)) { - this.emit("delete-room", room); - } - } - } - - /** - * Removes a socket from all rooms it's joined. - * - * @param {SocketId} id the socket id - */ - public delAll(id: SocketId): void { - if (!this.sids.has(id)) { - return; - } - - for (const room of this.sids.get(id)) { - this._del(room, id); - } - - this.sids.delete(id); - } - - /** - * Broadcasts a packet. - * - * Options: - * - `flags` {Object} flags for this packet - * - `except` {Array} sids that should be excluded - * - `rooms` {Array} list of rooms to broadcast to - * - * @param {Object} packet the packet object - * @param {Object} opts the options - * @public - */ - public broadcast(packet: any, opts: BroadcastOptions): void { - const flags = opts.flags || {}; - const packetOpts = { - preEncoded: true, - volatile: flags.volatile, - compress: flags.compress, - }; - - packet.nsp = this.nsp.name; - const encodedPackets = this._encode(packet, packetOpts); - - this.apply(opts, (socket) => { - if (typeof socket.notifyOutgoingListeners === "function") { - socket.notifyOutgoingListeners(packet); - } - - socket.client.writeToEngine(encodedPackets, packetOpts); - }); - } - - /** - * Broadcasts a packet and expects multiple acknowledgements. - * - * Options: - * - `flags` {Object} flags for this packet - * - `except` {Array} sids that should be excluded - * - `rooms` {Array} list of rooms to broadcast to - * - * @param {Object} packet the packet object - * @param {Object} opts the options - * @param clientCountCallback - the number of clients that received the packet - * @param ack - the callback that will be called for each client response - * - * @public - */ - public broadcastWithAck( - packet: any, - opts: BroadcastOptions, - clientCountCallback: (clientCount: number) => void, - ack: (...args: any[]) => void - ) { - const flags = opts.flags || {}; - const packetOpts = { - preEncoded: true, - volatile: flags.volatile, - compress: flags.compress, - }; - - packet.nsp = this.nsp.name; - // we can use the same id for each packet, since the _ids counter is common (no duplicate) - packet.id = this.nsp._ids++; - - const encodedPackets = this._encode(packet, packetOpts); - - let clientCount = 0; - - this.apply(opts, (socket) => { - // track the total number of acknowledgements that are expected - clientCount++; - // call the ack callback for each client response - socket.acks.set(packet.id, ack); - - if (typeof socket.notifyOutgoingListeners === "function") { - socket.notifyOutgoingListeners(packet); - } - - socket.client.writeToEngine(encodedPackets, packetOpts); - }); - - clientCountCallback(clientCount); - } - - private _encode(packet: unknown, packetOpts: Record) { - const encodedPackets = this.encoder.encode(packet); - - if ( - canPreComputeFrame && - encodedPackets.length === 1 && - typeof encodedPackets[0] === "string" - ) { - // "4" being the "message" packet type in the Engine.IO protocol - const data = Buffer.from("4" + encodedPackets[0]); - // see https://github.com/websockets/ws/issues/617#issuecomment-283002469 - packetOpts.wsPreEncodedFrame = WebSocket.Sender.frame(data, { - readOnly: false, - mask: false, - rsv1: false, - opcode: 1, - fin: true, - }); - } - - return encodedPackets; - } - - /** - * Gets a list of sockets by sid. - * - * @param {Set} rooms the explicit set of rooms to check. - */ - public sockets(rooms: Set): Promise> { - const sids = new Set(); - - this.apply({ rooms }, (socket) => { - sids.add(socket.id); - }); - - return Promise.resolve(sids); - } - - /** - * Gets the list of rooms a given socket has joined. - * - * @param {SocketId} id the socket id - */ - public socketRooms(id: SocketId): Set | undefined { - return this.sids.get(id); - } - - /** - * Returns the matching socket instances - * - * @param opts - the filters to apply - */ - public fetchSockets(opts: BroadcastOptions): Promise { - const sockets = []; - - this.apply(opts, (socket) => { - sockets.push(socket); - }); - - return Promise.resolve(sockets); - } - - /** - * Makes the matching socket instances join the specified rooms - * - * @param opts - the filters to apply - * @param rooms - the rooms to join - */ - public addSockets(opts: BroadcastOptions, rooms: Room[]): void { - this.apply(opts, (socket) => { - socket.join(rooms); - }); - } - - /** - * Makes the matching socket instances leave the specified rooms - * - * @param opts - the filters to apply - * @param rooms - the rooms to leave - */ - public delSockets(opts: BroadcastOptions, rooms: Room[]): void { - this.apply(opts, (socket) => { - rooms.forEach((room) => socket.leave(room)); - }); - } - - /** - * Makes the matching socket instances disconnect - * - * @param opts - the filters to apply - * @param close - whether to close the underlying connection - */ - public disconnectSockets(opts: BroadcastOptions, close: boolean): void { - this.apply(opts, (socket) => { - socket.disconnect(close); - }); - } - - private apply(opts: BroadcastOptions, callback: (socket) => void): void { - const rooms = opts.rooms; - const except = this.computeExceptSids(opts.except); - - if (rooms.size) { - const ids = new Set(); - for (const room of rooms) { - if (!this.rooms.has(room)) continue; - - for (const id of this.rooms.get(room)) { - if (ids.has(id) || except.has(id)) continue; - const socket = this.nsp.sockets.get(id); - if (socket) { - callback(socket); - ids.add(id); - } - } - } - } else { - for (const [id] of this.sids) { - if (except.has(id)) continue; - const socket = this.nsp.sockets.get(id); - if (socket) callback(socket); - } - } - } - - private computeExceptSids(exceptRooms?: Set) { - const exceptSids = new Set(); - if (exceptRooms && exceptRooms.size > 0) { - for (const room of exceptRooms) { - if (this.rooms.has(room)) { - this.rooms.get(room).forEach((sid) => exceptSids.add(sid)); - } - } - } - return exceptSids; - } - - /** - * Send a packet to the other Socket.IO servers in the cluster - * @param packet - an array of arguments, which may include an acknowledgement callback at the end - */ - public serverSideEmit(packet: any[]): void { - console.warn( - "this adapter does not support the serverSideEmit() functionality" - ); - } - - /** - * Save the client session in order to restore it upon reconnection. - */ - public persistSession(session: SessionToPersist) {} - - /** - * Restore the session and find the packets that were missed by the client. - * @param pid - * @param offset - */ - public restoreSession( - pid: PrivateSessionId, - offset: string - ): Promise { - return null; - } -} - -interface PersistedPacket { - id: string; - emittedAt: number; - data: unknown[]; - opts: BroadcastOptions; -} - -type SessionWithTimestamp = SessionToPersist & { disconnectedAt: number }; - -export class SessionAwareAdapter extends Adapter { - private readonly maxDisconnectionDuration: number; - - private sessions: Map = new Map(); - private packets: PersistedPacket[] = []; - - constructor(readonly nsp: any) { - super(nsp); - this.maxDisconnectionDuration = - nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration; - - const timer = setInterval(() => { - const threshold = Date.now() - this.maxDisconnectionDuration; - this.sessions.forEach((session, sessionId) => { - const hasExpired = session.disconnectedAt < threshold; - if (hasExpired) { - this.sessions.delete(sessionId); - } - }); - for (let i = this.packets.length - 1; i >= 0; i--) { - const hasExpired = this.packets[i].emittedAt < threshold; - if (hasExpired) { - this.packets.splice(0, i + 1); - break; - } - } - }, 60 * 1000); - // prevents the timer from keeping the process alive - timer.unref(); - } - - override persistSession(session: SessionToPersist) { - (session as SessionWithTimestamp).disconnectedAt = Date.now(); - this.sessions.set(session.pid, session as SessionWithTimestamp); - } - - override restoreSession( - pid: PrivateSessionId, - offset: string - ): Promise { - const session = this.sessions.get(pid); - if (!session) { - // the session may have expired - return null; - } - const hasExpired = - session.disconnectedAt + this.maxDisconnectionDuration < Date.now(); - if (hasExpired) { - // the session has expired - this.sessions.delete(pid); - return null; - } - const index = this.packets.findIndex((packet) => packet.id === offset); - if (index === -1) { - // the offset may be too old - return null; - } - const missedPackets = []; - for (let i = index + 1; i < this.packets.length; i++) { - const packet = this.packets[i]; - if (shouldIncludePacket(session.rooms, packet.opts)) { - missedPackets.push(packet.data); - } - } - return Promise.resolve({ - ...session, - missedPackets, - }); - } - - override broadcast(packet: any, opts: BroadcastOptions) { - const isEventPacket = packet.type === 2; - // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and - // restored on another server upon reconnection - const withoutAcknowledgement = packet.id === undefined; - const notVolatile = opts.flags?.volatile === undefined; - if (isEventPacket && withoutAcknowledgement && notVolatile) { - const id = yeast(); - // the offset is stored at the end of the data array, so the client knows the ID of the last packet it has - // processed (and the format is backward-compatible) - packet.data.push(id); - this.packets.push({ - id, - opts, - data: packet.data, - emittedAt: Date.now(), - }); - } - super.broadcast(packet, opts); - } -} - -function shouldIncludePacket( - sessionRooms: Room[], - opts: BroadcastOptions -): boolean { - const included = - opts.rooms.size === 0 || sessionRooms.some((room) => opts.rooms.has(room)); - const notExcluded = sessionRooms.every((room) => !opts.except.has(room)); - return included && notExcluded; -} +export { + SocketId, + PrivateSessionId, + Room, + BroadcastFlags, + BroadcastOptions, + Session, + Adapter, + SessionAwareAdapter, +} from "./in-memory-adapter"; + +export { + ClusterAdapter, + ClusterAdapterWithHeartbeat, + ClusterAdapterOptions, + ClusterMessage, + ClusterResponse, + MessageType, + ServerId, + Offset, +} from "./cluster-adapter"; diff --git a/package-lock.json b/package-lock.json index e4dbfec..95070a2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,23 +1,28 @@ { "name": "socket.io-adapter", - "version": "2.5.1", + "version": "2.5.4", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "socket.io-adapter", - "version": "2.5.1", + "version": "2.5.4", "license": "MIT", "dependencies": { - "ws": "~8.11.0" + "debug": "~4.3.4", + "ws": "~8.17.1" }, "devDependencies": { + "@types/debug": "^4.1.12", + "@types/expect.js": "^0.3.32", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", "expect.js": "^0.3.1", "mocha": "^10.2.0", "nyc": "^15.1.0", "prettier": "^2.8.1", + "socket.io": "^4.7.4", + "socket.io-client": "^4.7.4", "ts-node": "^10.9.1", "typescript": "^4.9.4" } @@ -373,6 +378,12 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", + "integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg==", + "dev": true + }, "node_modules/@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -403,18 +414,67 @@ "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==", "dev": true }, + "node_modules/@types/cookie": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz", + "integrity": "sha512-XW/Aa8APYr6jSVVA1y/DEIZX0/GMKLEVekNG727R8cs56ahETkRAy/3DR7+fJyh7oUgGwNQaRfXCun0+KbWY7Q==", + "dev": true + }, + "node_modules/@types/cors": { + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, + "node_modules/@types/debug": { + "version": "4.1.12", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", + "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", + "dev": true, + "dependencies": { + "@types/ms": "*" + } + }, + "node_modules/@types/expect.js": { + "version": "0.3.32", + "resolved": "https://registry.npmjs.org/@types/expect.js/-/expect.js-0.3.32.tgz", + "integrity": "sha512-vUK0KSPtQTeANmOfiqsNNA/8hJ0xz8gOyB0ZhYRtoYOZBtZYir7ujNGr6GKw2hJAjltW0ocCNIGn9YxIXTT99Q==", + "dev": true + }, "node_modules/@types/mocha": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.1.tgz", "integrity": "sha512-/fvYntiO1GeICvqbQ3doGDIP97vWmvFt83GKguJ6prmQM2iXZfFcq6YE8KteFyRtX2/h5Hf91BYvPodJKFYv5Q==", "dev": true }, + "node_modules/@types/ms": { + "version": "0.7.34", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", + "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", + "dev": true + }, "node_modules/@types/node": { "version": "14.11.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.11.2.tgz", "integrity": "sha512-jiE3QIxJ8JLNcb1Ps6rDbysDhN4xa8DJJvuC9prr6w+1tIh+QAbYyNF3tyiZNLDBIuBCf4KEcV2UvQm/V60xfA==", "dev": true }, + "node_modules/accepts": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz", + "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==", + "dev": true, + "dependencies": { + "mime-types": "~2.1.34", + "negotiator": "0.6.3" + }, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/acorn": { "version": "8.8.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz", @@ -522,6 +582,15 @@ "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", "dev": true }, + "node_modules/base64id": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "dev": true, + "engines": { + "node": "^4.5.0 || >= 5.9" + } + }, "node_modules/binary-extensions": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", @@ -742,6 +811,28 @@ "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", "dev": true }, + "node_modules/cookie": { + "version": "0.4.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.2.tgz", + "integrity": "sha512-aSWTXFzaKWkvHO1Ny/s+ePFpvKsPnjc551iI41v3ny/ow6tBG5Vd+FuqGNhh1LxOmVzOlGUriIlOaokOvhaStA==", + "dev": true, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/cors": { + "version": "2.8.5", + "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", + "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", + "dev": true, + "dependencies": { + "object-assign": "^4", + "vary": "^1" + }, + "engines": { + "node": ">= 0.10" + } + }, "node_modules/create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", @@ -766,7 +857,6 @@ "version": "4.3.4", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dev": true, "dependencies": { "ms": "2.1.2" }, @@ -815,6 +905,49 @@ "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==", "dev": true }, + "node_modules/engine.io": { + "version": "6.5.5", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.5.tgz", + "integrity": "sha512-C5Pn8Wk+1vKBoHghJODM63yk8MvrO9EWZUfkAt5HAqIgPE4/8FF0PEGHXtEd40l223+cE5ABWuPzm38PHFXfMA==", + "dev": true, + "dependencies": { + "@types/cookie": "^0.4.1", + "@types/cors": "^2.8.12", + "@types/node": ">=10.0.0", + "accepts": "~1.3.4", + "base64id": "2.0.0", + "cookie": "~0.4.1", + "cors": "~2.8.5", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/engine.io-client": { + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.4.tgz", + "integrity": "sha512-GeZeeRjpD2qf49cZQ0Wvh/8NJNfeXkXXcoGh+F77oEAgo9gUHwT1fCRxSNU+YEEaysOJTnsFHmM5oAcPy4ntvQ==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1", + "xmlhttprequest-ssl": "~2.0.0" + } + }, + "node_modules/engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/es6-error": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/es6-error/-/es6-error-4.1.1.tgz", @@ -855,7 +988,7 @@ "node_modules/expect.js": { "version": "0.3.1", "resolved": "https://registry.npmjs.org/expect.js/-/expect.js-0.3.1.tgz", - "integrity": "sha1-sKWaDS7/VDdUTr8M6qYBWEHQm1s=", + "integrity": "sha512-okDF/FAPEul1ZFLae4hrgpIqAeapoo5TRdcg/lD0iN9S3GWrBFIJwNezGH1DMtIz+RxU4RrFmMq7WUUvDg3J6A==", "dev": true }, "node_modules/fill-range": { @@ -1462,6 +1595,27 @@ "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", "dev": true }, + "node_modules/mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "dev": true, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "dev": true, + "dependencies": { + "mime-db": "1.52.0" + }, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/minimatch": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", @@ -1607,8 +1761,7 @@ "node_modules/ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "node_modules/nanoid": { "version": "3.3.3", @@ -1622,6 +1775,15 @@ "node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1" } }, + "node_modules/negotiator": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", + "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==", + "dev": true, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/node-preload": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz", @@ -1830,6 +1992,15 @@ "node": ">=6" } }, + "node_modules/object-assign": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", + "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", + "dev": true, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -2196,6 +2367,82 @@ "integrity": "sha512-VUJ49FC8U1OxwZLxIbTTrDvLnf/6TDgxZcK8wxR8zs13xpx7xbG60ndBlhNrFi2EMuFRoeDoJO7wthSLq42EjA==", "dev": true }, + "node_modules/socket.io": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", + "dev": true, + "dependencies": { + "accepts": "~1.3.4", + "base64id": "~2.0.0", + "cors": "~2.8.5", + "debug": "~4.3.2", + "engine.io": "~6.5.2", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/socket.io-adapter": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", + "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "dev": true, + "dependencies": { + "ws": "~8.11.0" + } + }, + "node_modules/socket.io-adapter/node_modules/ws": { + "version": "8.11.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", + "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", + "dev": true, + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, + "node_modules/socket.io-client": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.7.4.tgz", + "integrity": "sha512-wh+OkeF0rAVCrABWQBaEjLfb7DVPotMbu0cgWgyR0v6eA4EoVnAwcIeIbcdTE3GT/H3kbdLl7OoH2+asoDRIIg==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + }, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/source-map": { "version": "0.5.7", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", @@ -2421,6 +2668,15 @@ "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", "dev": true }, + "node_modules/vary": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", + "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==", + "dev": true, + "engines": { + "node": ">= 0.8" + } + }, "node_modules/which": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz", @@ -2517,15 +2773,15 @@ } }, "node_modules/ws": { - "version": "8.11.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", - "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", "engines": { "node": ">=10.0.0" }, "peerDependencies": { "bufferutil": "^4.0.1", - "utf-8-validate": "^5.0.2" + "utf-8-validate": ">=5.0.2" }, "peerDependenciesMeta": { "bufferutil": { @@ -2536,6 +2792,15 @@ } } }, + "node_modules/xmlhttprequest-ssl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz", + "integrity": "sha512-QKxVRxiRACQcVuQEYFsI1hhkrMlrXHPegbbd1yn9UHOmRxY+si12nQYzri3vbzt8VdTTRviqcKxcyllFas5z2A==", + "dev": true, + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/y18n": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.3.tgz", @@ -2940,6 +3205,12 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "@socket.io/component-emitter": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", + "integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg==", + "dev": true + }, "@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -2970,18 +3241,64 @@ "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==", "dev": true }, + "@types/cookie": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz", + "integrity": "sha512-XW/Aa8APYr6jSVVA1y/DEIZX0/GMKLEVekNG727R8cs56ahETkRAy/3DR7+fJyh7oUgGwNQaRfXCun0+KbWY7Q==", + "dev": true + }, + "@types/cors": { + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", + "dev": true, + "requires": { + "@types/node": "*" + } + }, + "@types/debug": { + "version": "4.1.12", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", + "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", + "dev": true, + "requires": { + "@types/ms": "*" + } + }, + "@types/expect.js": { + "version": "0.3.32", + "resolved": "https://registry.npmjs.org/@types/expect.js/-/expect.js-0.3.32.tgz", + "integrity": "sha512-vUK0KSPtQTeANmOfiqsNNA/8hJ0xz8gOyB0ZhYRtoYOZBtZYir7ujNGr6GKw2hJAjltW0ocCNIGn9YxIXTT99Q==", + "dev": true + }, "@types/mocha": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.1.tgz", "integrity": "sha512-/fvYntiO1GeICvqbQ3doGDIP97vWmvFt83GKguJ6prmQM2iXZfFcq6YE8KteFyRtX2/h5Hf91BYvPodJKFYv5Q==", "dev": true }, + "@types/ms": { + "version": "0.7.34", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", + "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", + "dev": true + }, "@types/node": { "version": "14.11.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.11.2.tgz", "integrity": "sha512-jiE3QIxJ8JLNcb1Ps6rDbysDhN4xa8DJJvuC9prr6w+1tIh+QAbYyNF3tyiZNLDBIuBCf4KEcV2UvQm/V60xfA==", "dev": true }, + "accepts": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz", + "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==", + "dev": true, + "requires": { + "mime-types": "~2.1.34", + "negotiator": "0.6.3" + } + }, "acorn": { "version": "8.8.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz", @@ -3065,6 +3382,12 @@ "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", "dev": true }, + "base64id": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "dev": true + }, "binary-extensions": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", @@ -3242,6 +3565,22 @@ } } }, + "cookie": { + "version": "0.4.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.2.tgz", + "integrity": "sha512-aSWTXFzaKWkvHO1Ny/s+ePFpvKsPnjc551iI41v3ny/ow6tBG5Vd+FuqGNhh1LxOmVzOlGUriIlOaokOvhaStA==", + "dev": true + }, + "cors": { + "version": "2.8.5", + "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", + "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", + "dev": true, + "requires": { + "object-assign": "^4", + "vary": "^1" + } + }, "create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", @@ -3263,7 +3602,6 @@ "version": "4.3.4", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dev": true, "requires": { "ms": "2.1.2" } @@ -3295,6 +3633,43 @@ "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==", "dev": true }, + "engine.io": { + "version": "6.5.5", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.5.tgz", + "integrity": "sha512-C5Pn8Wk+1vKBoHghJODM63yk8MvrO9EWZUfkAt5HAqIgPE4/8FF0PEGHXtEd40l223+cE5ABWuPzm38PHFXfMA==", + "dev": true, + "requires": { + "@types/cookie": "^0.4.1", + "@types/cors": "^2.8.12", + "@types/node": ">=10.0.0", + "accepts": "~1.3.4", + "base64id": "2.0.0", + "cookie": "~0.4.1", + "cors": "~2.8.5", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1" + } + }, + "engine.io-client": { + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.4.tgz", + "integrity": "sha512-GeZeeRjpD2qf49cZQ0Wvh/8NJNfeXkXXcoGh+F77oEAgo9gUHwT1fCRxSNU+YEEaysOJTnsFHmM5oAcPy4ntvQ==", + "dev": true, + "requires": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1", + "xmlhttprequest-ssl": "~2.0.0" + } + }, + "engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true + }, "es6-error": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/es6-error/-/es6-error-4.1.1.tgz", @@ -3322,7 +3697,7 @@ "expect.js": { "version": "0.3.1", "resolved": "https://registry.npmjs.org/expect.js/-/expect.js-0.3.1.tgz", - "integrity": "sha1-sKWaDS7/VDdUTr8M6qYBWEHQm1s=", + "integrity": "sha512-okDF/FAPEul1ZFLae4hrgpIqAeapoo5TRdcg/lD0iN9S3GWrBFIJwNezGH1DMtIz+RxU4RrFmMq7WUUvDg3J6A==", "dev": true }, "fill-range": { @@ -3760,6 +4135,21 @@ "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", "dev": true }, + "mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "dev": true + }, + "mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "dev": true, + "requires": { + "mime-db": "1.52.0" + } + }, "minimatch": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", @@ -3869,8 +4259,7 @@ "ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "nanoid": { "version": "3.3.3", @@ -3878,6 +4267,12 @@ "integrity": "sha512-p1sjXuopFs0xg+fPASzQ28agW1oHD7xDsd9Xkf3T15H3c/cifrFHVwrh74PdoklAPi+i7MdRsE47vm2r6JoB+w==", "dev": true }, + "negotiator": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", + "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==", + "dev": true + }, "node-preload": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz", @@ -4043,6 +4438,12 @@ } } }, + "object-assign": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", + "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", + "dev": true + }, "once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -4301,6 +4702,61 @@ "integrity": "sha512-VUJ49FC8U1OxwZLxIbTTrDvLnf/6TDgxZcK8wxR8zs13xpx7xbG60ndBlhNrFi2EMuFRoeDoJO7wthSLq42EjA==", "dev": true }, + "socket.io": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", + "dev": true, + "requires": { + "accepts": "~1.3.4", + "base64id": "~2.0.0", + "cors": "~2.8.5", + "debug": "~4.3.2", + "engine.io": "~6.5.2", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + } + }, + "socket.io-adapter": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", + "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "dev": true, + "requires": { + "ws": "~8.11.0" + }, + "dependencies": { + "ws": { + "version": "8.11.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", + "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", + "dev": true, + "requires": {} + } + } + }, + "socket.io-client": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.7.4.tgz", + "integrity": "sha512-wh+OkeF0rAVCrABWQBaEjLfb7DVPotMbu0cgWgyR0v6eA4EoVnAwcIeIbcdTE3GT/H3kbdLl7OoH2+asoDRIIg==", + "dev": true, + "requires": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.5.2", + "socket.io-parser": "~4.2.4" + } + }, + "socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "dev": true, + "requires": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + } + }, "source-map": { "version": "0.5.7", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", @@ -4456,6 +4912,12 @@ "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", "dev": true }, + "vary": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", + "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==", + "dev": true + }, "which": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz", @@ -4533,11 +4995,17 @@ } }, "ws": { - "version": "8.11.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", - "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", "requires": {} }, + "xmlhttprequest-ssl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz", + "integrity": "sha512-QKxVRxiRACQcVuQEYFsI1hhkrMlrXHPegbbd1yn9UHOmRxY+si12nQYzri3vbzt8VdTTRviqcKxcyllFas5z2A==", + "dev": true + }, "y18n": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.3.tgz", diff --git a/package.json b/package.json index 62e9071..dbb055e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "socket.io-adapter", - "version": "2.5.2", + "version": "2.5.5", "license": "MIT", "repository": { "type": "git", @@ -13,20 +13,25 @@ "types": "./dist/index.d.ts", "description": "default socket.io in-memory adapter", "dependencies": { - "ws": "~8.11.0" + "debug": "~4.3.4", + "ws": "~8.17.1" }, "devDependencies": { + "@types/debug": "^4.1.12", + "@types/expect.js": "^0.3.32", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", "expect.js": "^0.3.1", "mocha": "^10.2.0", "nyc": "^15.1.0", "prettier": "^2.8.1", + "socket.io": "^4.7.4", + "socket.io-client": "^4.7.4", "ts-node": "^10.9.1", "typescript": "^4.9.4" }, "scripts": { - "test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts", + "test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/*.ts", "format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'", "format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'", "prepack": "tsc" diff --git a/test/cluster-adapter.ts b/test/cluster-adapter.ts new file mode 100644 index 0000000..350ecda --- /dev/null +++ b/test/cluster-adapter.ts @@ -0,0 +1,440 @@ +import { createServer } from "http"; +import { Server, Socket as ServerSocket } from "socket.io"; +import { io as ioc, Socket as ClientSocket } from "socket.io-client"; +import expect = require("expect.js"); +import type { AddressInfo } from "net"; +import { times, shouldNotHappen, sleep } from "./util"; +import { + ClusterAdapterWithHeartbeat, + type ClusterMessage, + type ClusterResponse, +} from "../lib"; +import { EventEmitter } from "events"; + +const NODES_COUNT = 3; + +class EventEmitterAdapter extends ClusterAdapterWithHeartbeat { + private offset = 1; + + constructor(nsp, readonly eventBus) { + super(nsp, {}); + this.eventBus.on("message", (message) => { + this.onMessage(message as ClusterMessage); + }); + } + + protected doPublish(message: ClusterMessage): Promise { + this.eventBus.emit("message", message); + return Promise.resolve(String(++this.offset)); + } + + protected doPublishResponse( + requesterUid: string, + response: ClusterResponse + ): Promise { + this.eventBus.emit("message", response); + return Promise.resolve(); + } +} + +describe("cluster adapter", () => { + let servers: Server[], + serverSockets: ServerSocket[], + clientSockets: ClientSocket[]; + + beforeEach((done) => { + servers = []; + serverSockets = []; + clientSockets = []; + + const eventBus = new EventEmitter(); + for (let i = 1; i <= NODES_COUNT; i++) { + const httpServer = createServer(); + const io = new Server(httpServer); + // @ts-ignore + io.adapter(function (nsp) { + return new EventEmitterAdapter(nsp, eventBus); + }); + httpServer.listen(() => { + const port = (httpServer.address() as AddressInfo).port; + const clientSocket = ioc(`http://localhost:${port}`); + + io.on("connection", async (socket) => { + clientSockets.push(clientSocket); + serverSockets.push(socket); + servers.push(io); + if (servers.length === NODES_COUNT) { + // ensure all nodes know each other + servers[0].emit("ping"); + servers[1].emit("ping"); + servers[2].emit("ping"); + + done(); + } + }); + }); + } + }); + + afterEach(() => { + servers.forEach((server) => { + // @ts-ignore + server.httpServer.close(); + server.of("/").adapter.close(); + }); + clientSockets.forEach((socket) => { + socket.disconnect(); + }); + }); + + describe("broadcast", function () { + it("broadcasts to all clients", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("test", (arg1, arg2, arg3) => { + expect(arg1).to.eql(1); + expect(arg2).to.eql("2"); + expect(Buffer.isBuffer(arg3)).to.be(true); + partialDone(); + }); + }); + + servers[0].emit("test", 1, "2", Buffer.from([3, 4])); + }); + + it("broadcasts to all clients in a namespace", (done) => { + const partialDone = times(3, () => { + servers.forEach((server) => server.of("/custom").adapter.close()); + done(); + }); + + servers.forEach((server) => server.of("/custom")); + + const onConnect = times(3, async () => { + servers[0].of("/custom").emit("test"); + }); + + clientSockets.forEach((clientSocket) => { + const socket = clientSocket.io.socket("/custom"); + socket.on("connect", onConnect); + socket.on("test", () => { + socket.disconnect(); + partialDone(); + }); + }); + }); + + it("broadcasts to all clients in a room", (done) => { + serverSockets[1].join("room1"); + + clientSockets[0].on("test", shouldNotHappen(done)); + clientSockets[1].on("test", () => done()); + clientSockets[2].on("test", shouldNotHappen(done)); + + servers[0].to("room1").emit("test"); + }); + + it("broadcasts to all clients except in room", (done) => { + const partialDone = times(2, done); + serverSockets[1].join("room1"); + + clientSockets[0].on("test", () => partialDone()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", () => partialDone()); + + servers[0].of("/").except("room1").emit("test"); + }); + + it("broadcasts to local clients only", (done) => { + clientSockets[0].on("test", () => done()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", shouldNotHappen(done)); + + servers[0].local.emit("test"); + }); + + it("broadcasts with multiple acknowledgements", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (cb) => cb(3)); + + servers[0].timeout(50).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.contain(1); + expect(responses).to.contain(2); + expect(responses).to.contain(3); + + setTimeout(() => { + // @ts-ignore + expect(servers[0].of("/").adapter.ackRequests.size).to.eql(0); + + done(); + }, 50); + }); + }); + + it("broadcasts with multiple acknowledgements (binary content)", (done) => { + clientSockets[0].on("test", (cb) => cb(Buffer.from([1]))); + clientSockets[1].on("test", (cb) => cb(Buffer.from([2]))); + clientSockets[2].on("test", (cb) => cb(Buffer.from([3]))); + + servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + responses.forEach((response) => { + expect(Buffer.isBuffer(response)).to.be(true); + }); + + done(); + }); + }); + + it("broadcasts with multiple acknowledgements (no client)", (done) => { + servers[0] + .to("abc") + .timeout(500) + .emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.eql([]); + + done(); + }); + }); + + it("broadcasts with multiple acknowledgements (timeout)", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (_cb) => { + // do nothing + }); + + servers[0].timeout(50).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be.an(Error); + expect(responses).to.contain(1); + expect(responses).to.contain(2); + + done(); + }); + }); + + it("broadcasts with a single acknowledgement (local)", async () => { + clientSockets[0].on("test", () => expect().fail()); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", () => expect().fail()); + + const response = await serverSockets[1].emitWithAck("test"); + expect(response).to.eql(2); + }); + + it("broadcasts with a single acknowledgement (remote)", async () => { + clientSockets[0].on("test", () => expect().fail()); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", () => expect().fail()); + + const sockets = await servers[0].in(serverSockets[1].id).fetchSockets(); + expect(sockets.length).to.eql(1); + + const response = await sockets[0].timeout(500).emitWithAck("test"); + expect(response).to.eql(2); + }); + }); + + describe("socketsJoin", () => { + it("makes all socket instances join the specified room", async () => { + servers[0].socketsJoin("room1"); + + await sleep(); + + expect(serverSockets[0].rooms.has("room1")).to.be(true); + expect(serverSockets[1].rooms.has("room1")).to.be(true); + expect(serverSockets[2].rooms.has("room1")).to.be(true); + }); + + it("makes the matching socket instances join the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); + + servers[0].in("room1").socketsJoin("room2"); + + await sleep(); + + expect(serverSockets[0].rooms.has("room2")).to.be(true); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); + + it("makes the given socket instance join the specified room", async () => { + servers[0].in(serverSockets[1].id).socketsJoin("room3"); + + expect(serverSockets[0].rooms.has("room3")).to.be(false); + expect(serverSockets[1].rooms.has("room3")).to.be(true); + expect(serverSockets[2].rooms.has("room3")).to.be(false); + }); + }); + + describe("socketsLeave", () => { + it("makes all socket instances leave the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); + + servers[0].socketsLeave("room1"); + + await sleep(); + + expect(serverSockets[0].rooms.has("room1")).to.be(false); + expect(serverSockets[1].rooms.has("room1")).to.be(false); + expect(serverSockets[2].rooms.has("room1")).to.be(false); + }); + + it("makes the matching socket instances leave the specified room", async () => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join(["room1", "room2"]); + serverSockets[2].join(["room2"]); + + servers[0].in("room1").socketsLeave("room2"); + + await sleep(); + + expect(serverSockets[0].rooms.has("room2")).to.be(false); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); + + it("makes the given socket instance leave the specified room", async () => { + serverSockets[0].join("room3"); + serverSockets[1].join("room3"); + serverSockets[2].join("room3"); + + servers[0].in(serverSockets[1].id).socketsLeave("room3"); + + expect(serverSockets[0].rooms.has("room3")).to.be(true); + expect(serverSockets[1].rooms.has("room3")).to.be(false); + expect(serverSockets[2].rooms.has("room3")).to.be(true); + }); + }); + + describe("disconnectSockets", () => { + it("makes all socket instances disconnect", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", (reason) => { + expect(reason).to.eql("io server disconnect"); + partialDone(); + }); + }); + + servers[0].disconnectSockets(true); + }); + + it("sends a packet before all socket instances disconnect", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", shouldNotHappen(done)); + + clientSocket.on("bye", () => { + clientSocket.off("disconnect"); + clientSocket.on("disconnect", partialDone); + }); + }); + + servers[0].emit("bye"); + servers[0].disconnectSockets(true); + }); + }); + + describe("fetchSockets", () => { + it("returns all socket instances", async () => { + const sockets = await servers[0].fetchSockets(); + + expect(sockets).to.be.an(Array); + expect(sockets).to.have.length(3); + // @ts-ignore + expect(servers[0].of("/").adapter.requests.size).to.eql(0); // clean up + }); + + it("returns a single socket instance", async () => { + serverSockets[1].data = "test" as any; + + const [remoteSocket] = await servers[0] + .in(serverSockets[1].id) + .fetchSockets(); + + expect(remoteSocket.handshake).to.eql(serverSockets[1].handshake); + expect(remoteSocket.data).to.eql("test"); + expect(remoteSocket.rooms.size).to.eql(1); + }); + + it("returns only local socket instances", async () => { + const sockets = await servers[0].local.fetchSockets(); + + expect(sockets).to.have.length(1); + }); + }); + + describe("serverSideEmit", () => { + it("sends an event to other server instances", (done) => { + const partialDone = times(2, done); + + servers[0].on("hello", shouldNotHappen(done)); + + servers[1].on("hello", (arg1, arg2, arg3) => { + expect(arg1).to.eql("world"); + expect(arg2).to.eql(1); + expect(arg3).to.eql("2"); + partialDone(); + }); + + servers[2].of("/").on("hello", () => partialDone()); + + servers[0].serverSideEmit("hello", "world", 1, "2"); + }); + + it("sends an event and receives a response from the other server instances", (done) => { + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", (cb) => cb("3")); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + expect(response).to.contain("3"); + done(); + }); + }); + + it("sends an event but timeout if one server does not respond", function (done) { + this.timeout(6000); + + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", () => { + // do nothing + }); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err.message).to.be("timeout reached: missing 1 responses"); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + }); + + it("succeeds even if an instance leaves the cluster", (done) => { + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", () => { + servers[2].of("/").adapter.close(); + }); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + }); + }); +}); diff --git a/test/util.ts b/test/util.ts new file mode 100644 index 0000000..ecdf6df --- /dev/null +++ b/test/util.ts @@ -0,0 +1,19 @@ +export function times(count: number, fn: () => void) { + let i = 0; + return () => { + i++; + if (i === count) { + fn(); + } else if (i > count) { + throw new Error(`too many calls: ${i} instead of ${count}`); + } + }; +} + +export function shouldNotHappen(done) { + return () => done(new Error("should not happen")); +} + +export function sleep() { + return new Promise((resolve) => process.nextTick(resolve)); +}