diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 675f6ba..1d69694 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,14 +17,14 @@ jobs: strategy: matrix: node-version: - - 18 + - 20 steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v3 + uses: actions/setup-node@v4 with: node-version: ${{ matrix.node-version }} diff --git a/CHANGELOG.md b/CHANGELOG.md index b299f40..7e5447a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # History +- [2.5.3](#253-2024-02-21) (Feb 2024) - [2.5.2](#252-2023-01-12) (Jan 2023) - [2.5.1](#251-2023-01-06) (Jan 2023) - [2.5.0](#250-2023-01-06) (Jan 2023) @@ -19,6 +20,49 @@ # Release notes +## [2.5.3](https://github.com/socketio/socket.io-adapter/compare/2.5.2...2.5.3) (2024-02-21) + +Two abstract classes were imported from the [Redis adapter repository](https://github.com/socketio/socket.io-redis-adapter/blob/bd32763043a2eb79a21dffd8820f20e598348adf/lib/cluster-adapter.ts): + +- the `ClusterAdapter` class, which manages the messages sent between the server instances of the cluster +- the `ClusterAdapterWithHeartbeat` class, which extends the `ClusterAdapter` and adds a heartbeat mechanism in order to check the healthiness of the other instances + +Other adapters can then just extend those classes and only have to implement the pub/sub mechanism (and not the internal chit-chat protocol): + +```js +class MyAdapter extends ClusterAdapterWithHeartbeat { + constructor(nsp, pubSub, opts) { + super(nsp, opts); + this.pubSub = pubSub; + pubSub.subscribe("main-channel", (message) => this.onMessage(message)); + pubSub.subscribe("specific-channel#" + this.uid, (response) => this.onResponse(response)); + } + + doPublish(message) { + return this.pubSub.publish("main-channel", message); + } + + doPublishResponse(requesterUid, response) { + return this.pubSub.publish("specific-channel#" + requesterUid, response); + } +} +``` + +Besides, the number of "timeout reached: only x responses received out of y" errors (which can happen when a server instance leaves the cluster) should be greatly reduced by [this commit](https://github.com/socketio/socket.io-adapter/commit/0e23ff0cc671e3186510f7cfb8a4c1147457296f). + + +### Bug Fixes + +* **cluster:** fix count in fetchSockets() method ([80af4e9](https://github.com/socketio/socket.io-adapter/commit/80af4e939c9caf89b0234ba1e676a3887c8d0ce6)) +* **cluster:** notify the other nodes when closing ([0e23ff0](https://github.com/socketio/socket.io-adapter/commit/0e23ff0cc671e3186510f7cfb8a4c1147457296f)) + + +### Performance Improvements + +* **cluster:** use timer.refresh() ([d99a71b](https://github.com/socketio/socket.io-adapter/commit/d99a71b5588f53f0b181eee989ab2ac939f965db)) + + + ## [2.5.2](https://github.com/socketio/socket.io-adapter/compare/2.5.1...2.5.2) (2023-01-12) The `ws` dependency was moved from `peerDependencies` to `dependencies`, in order to prevent issues like [this](https://github.com/socketio/socket.io-redis-adapter/issues/478). diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts new file mode 100644 index 0000000..8858d0e --- /dev/null +++ b/lib/cluster-adapter.ts @@ -0,0 +1,1001 @@ +import { + Adapter, + type BroadcastFlags, + type BroadcastOptions, + type Room, +} from "./index"; +import { debug as debugModule } from "debug"; +import { randomBytes } from "crypto"; + +const debug = debugModule("socket.io-adapter"); +const EMITTER_UID = "emitter"; +const DEFAULT_TIMEOUT = 5000; + +function randomId() { + return randomBytes(8).toString("hex"); +} + +type DistributiveOmit = T extends any + ? Omit + : never; + +/** + * The unique ID of a server + */ +export type ServerId = string; + +/** + * The unique ID of a message (for the connection state recovery feature) + */ +export type Offset = string; + +export interface ClusterAdapterOptions { + /** + * The number of ms between two heartbeats. + * @default 5_000 + */ + heartbeatInterval?: number; + /** + * The number of ms without heartbeat before we consider a node down. + * @default 10_000 + */ + heartbeatTimeout?: number; +} + +export enum MessageType { + INITIAL_HEARTBEAT = 1, + HEARTBEAT, + BROADCAST, + SOCKETS_JOIN, + SOCKETS_LEAVE, + DISCONNECT_SOCKETS, + FETCH_SOCKETS, + FETCH_SOCKETS_RESPONSE, + SERVER_SIDE_EMIT, + SERVER_SIDE_EMIT_RESPONSE, + BROADCAST_CLIENT_COUNT, + BROADCAST_ACK, + ADAPTER_CLOSE, +} + +export type ClusterMessage = { + uid: ServerId; + nsp: string; +} & ( + | { + type: + | MessageType.INITIAL_HEARTBEAT + | MessageType.HEARTBEAT + | MessageType.ADAPTER_CLOSE; + } + | { + type: MessageType.BROADCAST; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + packet: unknown; + requestId?: string; + }; + } + | { + type: MessageType.SOCKETS_JOIN | MessageType.SOCKETS_LEAVE; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + rooms: string[]; + }; + } + | { + type: MessageType.DISCONNECT_SOCKETS; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + close?: boolean; + }; + } + | { + type: MessageType.FETCH_SOCKETS; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + requestId: string; + }; + } + | { + type: MessageType.SERVER_SIDE_EMIT; + data: { + requestId?: string; + packet: any[]; + }; + } +); + +interface ClusterRequest { + type: MessageType; + resolve: Function; + timeout: NodeJS.Timeout; + expected: number; + current: number; + responses: any[]; +} + +export type ClusterResponse = { + uid: ServerId; + nsp: string; +} & ( + | { + type: MessageType.FETCH_SOCKETS_RESPONSE; + data: { + requestId: string; + sockets: unknown[]; + }; + } + | { + type: MessageType.SERVER_SIDE_EMIT_RESPONSE; + data: { + requestId: string; + packet: unknown; + }; + } + | { + type: MessageType.BROADCAST_CLIENT_COUNT; + data: { + requestId: string; + clientCount: number; + }; + } + | { + type: MessageType.BROADCAST_ACK; + data: { + requestId: string; + packet: unknown; + }; + } +); + +interface ClusterAckRequest { + clientCountCallback: (clientCount: number) => void; + ack: (...args: any[]) => void; +} + +function encodeOptions(opts: BroadcastOptions) { + return { + rooms: [...opts.rooms], + except: [...opts.except], + flags: opts.flags, + }; +} + +function decodeOptions(opts): BroadcastOptions { + return { + rooms: new Set(opts.rooms), + except: new Set(opts.except), + flags: opts.flags, + }; +} + +/** + * A cluster-ready adapter. Any extending class must: + * + * - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse} + * - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse} + */ +export abstract class ClusterAdapter extends Adapter { + protected readonly uid: ServerId; + + private requests: Map = new Map(); + private ackRequests: Map = new Map(); + + protected constructor(nsp) { + super(nsp); + this.uid = randomId(); + } + + /** + * Called when receiving a message from another member of the cluster. + * + * @param message + * @param offset + * @protected + */ + protected onMessage(message: ClusterMessage, offset?: string) { + if (message.uid === this.uid) { + return debug("[%s] ignore message from self", this.uid); + } + + debug( + "[%s] new event of type %d from %s", + this.uid, + message.type, + message.uid + ); + + switch (message.type) { + case MessageType.BROADCAST: { + const withAck = message.data.requestId !== undefined; + if (withAck) { + super.broadcastWithAck( + message.data.packet, + decodeOptions(message.data.opts), + (clientCount) => { + debug( + "[%s] waiting for %d client acknowledgements", + this.uid, + clientCount + ); + this.publishResponse(message.uid, { + type: MessageType.BROADCAST_CLIENT_COUNT, + data: { + requestId: message.data.requestId, + clientCount, + }, + }); + }, + (arg) => { + debug( + "[%s] received acknowledgement with value %j", + this.uid, + arg + ); + this.publishResponse(message.uid, { + type: MessageType.BROADCAST_ACK, + data: { + requestId: message.data.requestId, + packet: arg, + }, + }); + } + ); + } else { + const packet = message.data.packet; + const opts = decodeOptions(message.data.opts); + + this.addOffsetIfNecessary(packet, opts, offset); + + super.broadcast(packet, opts); + } + break; + } + + case MessageType.SOCKETS_JOIN: + super.addSockets(decodeOptions(message.data.opts), message.data.rooms); + break; + + case MessageType.SOCKETS_LEAVE: + super.delSockets(decodeOptions(message.data.opts), message.data.rooms); + break; + + case MessageType.DISCONNECT_SOCKETS: + super.disconnectSockets( + decodeOptions(message.data.opts), + message.data.close + ); + break; + + case MessageType.FETCH_SOCKETS: { + debug( + "[%s] calling fetchSockets with opts %j", + this.uid, + message.data.opts + ); + super + .fetchSockets(decodeOptions(message.data.opts)) + .then((localSockets) => { + this.publishResponse(message.uid, { + type: MessageType.FETCH_SOCKETS_RESPONSE, + data: { + requestId: message.data.requestId, + sockets: localSockets.map((socket) => { + // remove sessionStore from handshake, as it may contain circular references + const { sessionStore, ...handshake } = socket.handshake; + return { + id: socket.id, + handshake, + rooms: [...socket.rooms], + data: socket.data, + }; + }), + }, + }); + }); + break; + } + + case MessageType.SERVER_SIDE_EMIT: { + const packet = message.data.packet; + const withAck = message.data.requestId !== undefined; + if (!withAck) { + this.nsp._onServerSideEmit(packet); + return; + } + let called = false; + const callback = (arg: any) => { + // only one argument is expected + if (called) { + return; + } + called = true; + debug("[%s] calling acknowledgement with %j", this.uid, arg); + this.publishResponse(message.uid, { + type: MessageType.SERVER_SIDE_EMIT_RESPONSE, + data: { + requestId: message.data.requestId, + packet: arg, + }, + }); + }; + + this.nsp._onServerSideEmit([...packet, callback]); + break; + } + + // @ts-ignore + case MessageType.BROADCAST_CLIENT_COUNT: + // @ts-ignore + case MessageType.BROADCAST_ACK: + // @ts-ignore + case MessageType.FETCH_SOCKETS_RESPONSE: + // @ts-ignore + case MessageType.SERVER_SIDE_EMIT_RESPONSE: + // extending classes may not make a distinction between a ClusterMessage and a ClusterResponse payload and may + // always call the onMessage() method + this.onResponse(message); + break; + + default: + debug("[%s] unknown message type: %s", this.uid, message.type); + } + } + + /** + * Called when receiving a response from another member of the cluster. + * + * @param response + * @protected + */ + protected onResponse(response: ClusterResponse) { + const requestId = response.data.requestId; + + debug( + "[%s] received response %s to request %s", + this.uid, + response.type, + requestId + ); + + switch (response.type) { + case MessageType.BROADCAST_CLIENT_COUNT: { + this.ackRequests + .get(requestId) + ?.clientCountCallback(response.data.clientCount); + break; + } + + case MessageType.BROADCAST_ACK: { + this.ackRequests.get(requestId)?.ack(response.data.packet); + break; + } + + case MessageType.FETCH_SOCKETS_RESPONSE: { + const request = this.requests.get(requestId); + + if (!request) { + return; + } + + request.current++; + response.data.sockets.forEach((socket) => + request.responses.push(socket) + ); + + if (request.current === request.expected) { + clearTimeout(request.timeout); + request.resolve(request.responses); + this.requests.delete(requestId); + } + break; + } + + case MessageType.SERVER_SIDE_EMIT_RESPONSE: { + const request = this.requests.get(requestId); + + if (!request) { + return; + } + + request.current++; + request.responses.push(response.data.packet); + + if (request.current === request.expected) { + clearTimeout(request.timeout); + request.resolve(null, request.responses); + this.requests.delete(requestId); + } + break; + } + + default: + // @ts-ignore + debug("[%s] unknown response type: %s", this.uid, response.type); + } + } + + override async broadcast(packet: any, opts: BroadcastOptions) { + const onlyLocal = opts.flags?.local; + + if (!onlyLocal) { + try { + const offset = await this.publishAndReturnOffset({ + type: MessageType.BROADCAST, + data: { + packet, + opts: encodeOptions(opts), + }, + }); + this.addOffsetIfNecessary(packet, opts, offset); + } catch (e) { + return debug( + "[%s] error while broadcasting message: %s", + this.uid, + e.message + ); + } + } + + super.broadcast(packet, opts); + } + + /** + * Adds an offset at the end of the data array in order to allow the client to receive any missed packets when it + * reconnects after a temporary disconnection. + * + * @param packet + * @param opts + * @param offset + * @private + */ + private addOffsetIfNecessary( + packet: any, + opts: BroadcastOptions, + offset: Offset + ) { + if (!this.nsp.server.opts.connectionStateRecovery) { + return; + } + const isEventPacket = packet.type === 2; + // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and + // restored on another server upon reconnection + const withoutAcknowledgement = packet.id === undefined; + const notVolatile = opts.flags?.volatile === undefined; + + if (isEventPacket && withoutAcknowledgement && notVolatile) { + packet.data.push(offset); + } + } + + override broadcastWithAck( + packet: any, + opts: BroadcastOptions, + clientCountCallback: (clientCount: number) => void, + ack: (...args: any[]) => void + ) { + const onlyLocal = opts?.flags?.local; + if (!onlyLocal) { + const requestId = randomId(); + + this.ackRequests.set(requestId, { + clientCountCallback, + ack, + }); + + this.publish({ + type: MessageType.BROADCAST, + data: { + packet, + requestId, + opts: encodeOptions(opts), + }, + }); + + // we have no way to know at this level whether the server has received an acknowledgement from each client, so we + // will simply clean up the ackRequests map after the given delay + setTimeout(() => { + this.ackRequests.delete(requestId); + }, opts.flags!.timeout); + } + + super.broadcastWithAck(packet, opts, clientCountCallback, ack); + } + + override addSockets(opts: BroadcastOptions, rooms: Room[]) { + super.addSockets(opts, rooms); + + const onlyLocal = opts.flags?.local; + if (onlyLocal) { + return; + } + + this.publish({ + type: MessageType.SOCKETS_JOIN, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } + + override delSockets(opts: BroadcastOptions, rooms: Room[]) { + super.delSockets(opts, rooms); + + const onlyLocal = opts.flags?.local; + if (onlyLocal) { + return; + } + + this.publish({ + type: MessageType.SOCKETS_LEAVE, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } + + override disconnectSockets(opts: BroadcastOptions, close: boolean) { + super.disconnectSockets(opts, close); + + const onlyLocal = opts.flags?.local; + if (onlyLocal) { + return; + } + + this.publish({ + type: MessageType.DISCONNECT_SOCKETS, + data: { + opts: encodeOptions(opts), + close, + }, + }); + } + + async fetchSockets(opts: BroadcastOptions): Promise { + const [localSockets, serverCount] = await Promise.all([ + super.fetchSockets(opts), + this.serverCount(), + ]); + const expectedResponseCount = serverCount - 1; + + if (opts.flags?.local || expectedResponseCount <= 0) { + return localSockets; + } + + const requestId = randomId(); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + const storedRequest = this.requests.get(requestId); + if (storedRequest) { + reject( + new Error( + `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` + ) + ); + this.requests.delete(requestId); + } + }, opts.flags.timeout || DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.FETCH_SOCKETS, + resolve, + timeout, + current: 0, + expected: expectedResponseCount, + responses: localSockets, + }; + this.requests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.FETCH_SOCKETS, + data: { + opts: encodeOptions(opts), + requestId, + }, + }); + }); + } + + override async serverSideEmit(packet: any[]) { + const withAck = typeof packet[packet.length - 1] === "function"; + + if (!withAck) { + return this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + packet, + }, + }); + } + + const ack = packet.pop(); + const expectedResponseCount = (await this.serverCount()) - 1; + + debug( + '[%s] waiting for %d responses to "serverSideEmit" request', + this.uid, + expectedResponseCount + ); + + if (expectedResponseCount <= 0) { + return ack(null, []); + } + + const requestId = randomId(); + + const timeout = setTimeout(() => { + const storedRequest = this.requests.get(requestId); + if (storedRequest) { + ack( + new Error( + `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` + ), + storedRequest.responses + ); + this.requests.delete(requestId); + } + }, DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.SERVER_SIDE_EMIT, + resolve: ack, + timeout, + current: 0, + expected: expectedResponseCount, + responses: [], + }; + this.requests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + requestId, // the presence of this attribute defines whether an acknowledgement is needed + packet, + }, + }); + } + + protected publish( + message: DistributiveOmit + ): void { + this.publishAndReturnOffset(message).catch((err) => { + debug("[%s] error while publishing message: %s", this.uid, err); + }); + } + + protected publishAndReturnOffset( + message: DistributiveOmit + ) { + (message as ClusterMessage).uid = this.uid; + (message as ClusterMessage).nsp = this.nsp.name; + return this.doPublish(message as ClusterMessage); + } + + /** + * Send a message to the other members of the cluster. + * + * @param message + * @protected + * @return an offset, if applicable + */ + protected abstract doPublish(message: ClusterMessage): Promise; + + protected publishResponse( + requesterUid: ServerId, + response: Omit + ) { + (response as ClusterResponse).uid = this.uid; + (response as ClusterResponse).nsp = this.nsp.name; + this.doPublishResponse(requesterUid, response as ClusterResponse).catch( + (err) => { + debug("[%s] error while publishing response: %s", this.uid, err); + } + ); + } + + /** + * Send a response to the given member of the cluster. + * + * @param requesterUid + * @param response + * @protected + */ + protected abstract doPublishResponse( + requesterUid: ServerId, + response: ClusterResponse + ): Promise; +} + +interface CustomClusterRequest { + type: MessageType; + resolve: Function; + timeout: NodeJS.Timeout; + missingUids: Set; + responses: any[]; +} + +export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { + private readonly _opts: Required; + + private heartbeatTimer: NodeJS.Timeout; + private nodesMap: Map = new Map(); // uid => timestamp of last message + private readonly cleanupTimer: NodeJS.Timeout | undefined; + private customRequests: Map = new Map(); + + protected constructor(nsp, opts: ClusterAdapterOptions) { + super(nsp); + this._opts = Object.assign( + { + heartbeatInterval: 5_000, + heartbeatTimeout: 10_000, + }, + opts + ); + this.cleanupTimer = setInterval(() => { + const now = Date.now(); + this.nodesMap.forEach((lastSeen, uid) => { + const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout; + if (nodeSeemsDown) { + debug("[%s] node %s seems down", this.uid, uid); + this.removeNode(uid); + } + }); + }, 1_000); + } + + override init() { + this.publish({ + type: MessageType.INITIAL_HEARTBEAT, + }); + } + + private scheduleHeartbeat() { + if (this.heartbeatTimer) { + this.heartbeatTimer.refresh(); + } else { + this.heartbeatTimer = setTimeout(() => { + this.publish({ + type: MessageType.HEARTBEAT, + }); + }, this._opts.heartbeatInterval); + } + } + + override close() { + this.publish({ + type: MessageType.ADAPTER_CLOSE, + }); + clearTimeout(this.heartbeatTimer); + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + } + } + + override onMessage(message: ClusterMessage, offset?: string) { + if (message.uid === this.uid) { + return debug("[%s] ignore message from self", this.uid); + } + + if (message.uid && message.uid !== EMITTER_UID) { + // we track the UID of each sender, in order to know how many servers there are in the cluster + this.nodesMap.set(message.uid, Date.now()); + } + + debug( + "[%s] new event of type %d from %s", + this.uid, + message.type, + message.uid + ); + + switch (message.type) { + case MessageType.INITIAL_HEARTBEAT: + this.publish({ + type: MessageType.HEARTBEAT, + }); + break; + case MessageType.HEARTBEAT: + // nothing to do + break; + case MessageType.ADAPTER_CLOSE: + this.removeNode(message.uid); + break; + default: + super.onMessage(message, offset); + } + } + + override serverCount(): Promise { + return Promise.resolve(1 + this.nodesMap.size); + } + + override publish(message: DistributiveOmit) { + this.scheduleHeartbeat(); + + return super.publish(message); + } + + override async serverSideEmit(packet: any[]) { + const withAck = typeof packet[packet.length - 1] === "function"; + + if (!withAck) { + return this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + packet, + }, + }); + } + + const ack = packet.pop(); + const expectedResponseCount = this.nodesMap.size; + + debug( + '[%s] waiting for %d responses to "serverSideEmit" request', + this.uid, + expectedResponseCount + ); + + if (expectedResponseCount <= 0) { + return ack(null, []); + } + + const requestId = randomId(); + + const timeout = setTimeout(() => { + const storedRequest = this.customRequests.get(requestId); + if (storedRequest) { + ack( + new Error( + `timeout reached: missing ${storedRequest.missingUids.size} responses` + ), + storedRequest.responses + ); + this.customRequests.delete(requestId); + } + }, DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.SERVER_SIDE_EMIT, + resolve: ack, + timeout, + missingUids: new Set([...this.nodesMap.keys()]), + responses: [], + }; + this.customRequests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + requestId, // the presence of this attribute defines whether an acknowledgement is needed + packet, + }, + }); + } + + override async fetchSockets(opts: BroadcastOptions): Promise { + const [localSockets, serverCount] = await Promise.all([ + super.fetchSockets({ + rooms: opts.rooms, + except: opts.except, + flags: { + local: true, + }, + }), + this.serverCount(), + ]); + const expectedResponseCount = serverCount - 1; + + if (opts.flags?.local || expectedResponseCount <= 0) { + return localSockets as any[]; + } + + const requestId = randomId(); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + const storedRequest = this.customRequests.get(requestId); + if (storedRequest) { + reject( + new Error( + `timeout reached: missing ${storedRequest.missingUids.size} responses` + ) + ); + this.customRequests.delete(requestId); + } + }, opts.flags.timeout || DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.FETCH_SOCKETS, + resolve, + timeout, + missingUids: new Set([...this.nodesMap.keys()]), + responses: localSockets as any[], + }; + this.customRequests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.FETCH_SOCKETS, + data: { + opts: encodeOptions(opts), + requestId, + }, + }); + }); + } + + override onResponse(response: ClusterResponse) { + const requestId = response.data.requestId; + + debug( + "[%s] received response %s to request %s", + this.uid, + response.type, + requestId + ); + + switch (response.type) { + case MessageType.FETCH_SOCKETS_RESPONSE: { + const request = this.customRequests.get(requestId); + + if (!request) { + return; + } + + (response.data.sockets as any[]).forEach((socket) => + request.responses.push(socket) + ); + + request.missingUids.delete(response.uid); + if (request.missingUids.size === 0) { + clearTimeout(request.timeout); + request.resolve(request.responses); + this.customRequests.delete(requestId); + } + break; + } + + case MessageType.SERVER_SIDE_EMIT_RESPONSE: { + const request = this.customRequests.get(requestId); + + if (!request) { + return; + } + + request.responses.push(response.data.packet); + + request.missingUids.delete(response.uid); + if (request.missingUids.size === 0) { + clearTimeout(request.timeout); + request.resolve(null, request.responses); + this.customRequests.delete(requestId); + } + break; + } + + default: + super.onResponse(response); + } + } + + private removeNode(uid: ServerId) { + this.customRequests.forEach((request, requestId) => { + request.missingUids.delete(uid); + if (request.missingUids.size === 0) { + clearTimeout(request.timeout); + if (request.type === MessageType.FETCH_SOCKETS) { + request.resolve(request.responses); + } else if (request.type === MessageType.SERVER_SIDE_EMIT) { + request.resolve(null, request.responses); + } + this.customRequests.delete(requestId); + } + }); + + this.nodesMap.delete(uid); + } +} diff --git a/lib/index.ts b/lib/index.ts index 83d4d67..7743723 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -505,3 +505,12 @@ function shouldIncludePacket( const notExcluded = sessionRooms.every((room) => !opts.except.has(room)); return included && notExcluded; } + +export { + ClusterAdapter, + ClusterAdapterWithHeartbeat, + ClusterMessage, + ClusterResponse, + ServerId, + Offset, +} from "./cluster-adapter"; diff --git a/package-lock.json b/package-lock.json index e4dbfec..9a49af0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,23 +1,28 @@ { "name": "socket.io-adapter", - "version": "2.5.1", + "version": "2.5.2", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "socket.io-adapter", - "version": "2.5.1", + "version": "2.5.2", "license": "MIT", "dependencies": { + "debug": "~4.3.4", "ws": "~8.11.0" }, "devDependencies": { + "@types/debug": "^4.1.12", + "@types/expect.js": "^0.3.32", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", "expect.js": "^0.3.1", "mocha": "^10.2.0", "nyc": "^15.1.0", "prettier": "^2.8.1", + "socket.io": "^4.7.4", + "socket.io-client": "^4.7.4", "ts-node": "^10.9.1", "typescript": "^4.9.4" } @@ -373,6 +378,12 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", + "integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg==", + "dev": true + }, "node_modules/@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -403,18 +414,67 @@ "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==", "dev": true }, + "node_modules/@types/cookie": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz", + "integrity": "sha512-XW/Aa8APYr6jSVVA1y/DEIZX0/GMKLEVekNG727R8cs56ahETkRAy/3DR7+fJyh7oUgGwNQaRfXCun0+KbWY7Q==", + "dev": true + }, + "node_modules/@types/cors": { + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, + "node_modules/@types/debug": { + "version": "4.1.12", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", + "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", + "dev": true, + "dependencies": { + "@types/ms": "*" + } + }, + "node_modules/@types/expect.js": { + "version": "0.3.32", + "resolved": "https://registry.npmjs.org/@types/expect.js/-/expect.js-0.3.32.tgz", + "integrity": "sha512-vUK0KSPtQTeANmOfiqsNNA/8hJ0xz8gOyB0ZhYRtoYOZBtZYir7ujNGr6GKw2hJAjltW0ocCNIGn9YxIXTT99Q==", + "dev": true + }, "node_modules/@types/mocha": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.1.tgz", "integrity": "sha512-/fvYntiO1GeICvqbQ3doGDIP97vWmvFt83GKguJ6prmQM2iXZfFcq6YE8KteFyRtX2/h5Hf91BYvPodJKFYv5Q==", "dev": true }, + "node_modules/@types/ms": { + "version": "0.7.34", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", + "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", + "dev": true + }, "node_modules/@types/node": { "version": "14.11.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.11.2.tgz", "integrity": "sha512-jiE3QIxJ8JLNcb1Ps6rDbysDhN4xa8DJJvuC9prr6w+1tIh+QAbYyNF3tyiZNLDBIuBCf4KEcV2UvQm/V60xfA==", "dev": true }, + "node_modules/accepts": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz", + "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==", + "dev": true, + "dependencies": { + "mime-types": "~2.1.34", + "negotiator": "0.6.3" + }, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/acorn": { "version": "8.8.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz", @@ -522,6 +582,15 @@ "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", "dev": true }, + "node_modules/base64id": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "dev": true, + "engines": { + "node": "^4.5.0 || >= 5.9" + } + }, "node_modules/binary-extensions": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", @@ -742,6 +811,28 @@ "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", "dev": true }, + "node_modules/cookie": { + "version": "0.4.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.2.tgz", + "integrity": "sha512-aSWTXFzaKWkvHO1Ny/s+ePFpvKsPnjc551iI41v3ny/ow6tBG5Vd+FuqGNhh1LxOmVzOlGUriIlOaokOvhaStA==", + "dev": true, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/cors": { + "version": "2.8.5", + "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", + "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", + "dev": true, + "dependencies": { + "object-assign": "^4", + "vary": "^1" + }, + "engines": { + "node": ">= 0.10" + } + }, "node_modules/create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", @@ -766,7 +857,6 @@ "version": "4.3.4", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dev": true, "dependencies": { "ms": "2.1.2" }, @@ -815,6 +905,49 @@ "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==", "dev": true }, + "node_modules/engine.io": { + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.4.tgz", + "integrity": "sha512-KdVSDKhVKyOi+r5uEabrDLZw2qXStVvCsEB/LN3mw4WFi6Gx50jTyuxYVCwAAC0U46FdnzP/ScKRBTXb/NiEOg==", + "dev": true, + "dependencies": { + "@types/cookie": "^0.4.1", + "@types/cors": "^2.8.12", + "@types/node": ">=10.0.0", + "accepts": "~1.3.4", + "base64id": "2.0.0", + "cookie": "~0.4.1", + "cors": "~2.8.5", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.11.0" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/engine.io-client": { + "version": "6.5.3", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.3.tgz", + "integrity": "sha512-9Z0qLB0NIisTRt1DZ/8U2k12RJn8yls/nXMZLn+/N8hANT3TcYjKFKcwbw5zFQiN4NTde3TSY9zb79e1ij6j9Q==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.11.0", + "xmlhttprequest-ssl": "~2.0.0" + } + }, + "node_modules/engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/es6-error": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/es6-error/-/es6-error-4.1.1.tgz", @@ -855,7 +988,7 @@ "node_modules/expect.js": { "version": "0.3.1", "resolved": "https://registry.npmjs.org/expect.js/-/expect.js-0.3.1.tgz", - "integrity": "sha1-sKWaDS7/VDdUTr8M6qYBWEHQm1s=", + "integrity": "sha512-okDF/FAPEul1ZFLae4hrgpIqAeapoo5TRdcg/lD0iN9S3GWrBFIJwNezGH1DMtIz+RxU4RrFmMq7WUUvDg3J6A==", "dev": true }, "node_modules/fill-range": { @@ -1462,6 +1595,27 @@ "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", "dev": true }, + "node_modules/mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "dev": true, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "dev": true, + "dependencies": { + "mime-db": "1.52.0" + }, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/minimatch": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", @@ -1607,8 +1761,7 @@ "node_modules/ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "node_modules/nanoid": { "version": "3.3.3", @@ -1622,6 +1775,15 @@ "node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1" } }, + "node_modules/negotiator": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", + "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==", + "dev": true, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/node-preload": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz", @@ -1830,6 +1992,15 @@ "node": ">=6" } }, + "node_modules/object-assign": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", + "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", + "dev": true, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -2196,6 +2367,61 @@ "integrity": "sha512-VUJ49FC8U1OxwZLxIbTTrDvLnf/6TDgxZcK8wxR8zs13xpx7xbG60ndBlhNrFi2EMuFRoeDoJO7wthSLq42EjA==", "dev": true }, + "node_modules/socket.io": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", + "dev": true, + "dependencies": { + "accepts": "~1.3.4", + "base64id": "~2.0.0", + "cors": "~2.8.5", + "debug": "~4.3.2", + "engine.io": "~6.5.2", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/socket.io-adapter": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", + "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "dev": true, + "dependencies": { + "ws": "~8.11.0" + } + }, + "node_modules/socket.io-client": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.7.4.tgz", + "integrity": "sha512-wh+OkeF0rAVCrABWQBaEjLfb7DVPotMbu0cgWgyR0v6eA4EoVnAwcIeIbcdTE3GT/H3kbdLl7OoH2+asoDRIIg==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "dev": true, + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + }, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/source-map": { "version": "0.5.7", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", @@ -2421,6 +2647,15 @@ "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", "dev": true }, + "node_modules/vary": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", + "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==", + "dev": true, + "engines": { + "node": ">= 0.8" + } + }, "node_modules/which": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz", @@ -2536,6 +2771,15 @@ } } }, + "node_modules/xmlhttprequest-ssl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz", + "integrity": "sha512-QKxVRxiRACQcVuQEYFsI1hhkrMlrXHPegbbd1yn9UHOmRxY+si12nQYzri3vbzt8VdTTRviqcKxcyllFas5z2A==", + "dev": true, + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/y18n": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.3.tgz", @@ -2940,6 +3184,12 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "@socket.io/component-emitter": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", + "integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg==", + "dev": true + }, "@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -2970,18 +3220,64 @@ "integrity": "sha512-rr+OQyAjxze7GgWrSaJwydHStIhHq2lvY3BOC2Mj7KnzI7XK0Uw1TOOdI9lDoajEbSWLiYgoo4f1R51erQfhPQ==", "dev": true }, + "@types/cookie": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz", + "integrity": "sha512-XW/Aa8APYr6jSVVA1y/DEIZX0/GMKLEVekNG727R8cs56ahETkRAy/3DR7+fJyh7oUgGwNQaRfXCun0+KbWY7Q==", + "dev": true + }, + "@types/cors": { + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", + "dev": true, + "requires": { + "@types/node": "*" + } + }, + "@types/debug": { + "version": "4.1.12", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", + "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", + "dev": true, + "requires": { + "@types/ms": "*" + } + }, + "@types/expect.js": { + "version": "0.3.32", + "resolved": "https://registry.npmjs.org/@types/expect.js/-/expect.js-0.3.32.tgz", + "integrity": "sha512-vUK0KSPtQTeANmOfiqsNNA/8hJ0xz8gOyB0ZhYRtoYOZBtZYir7ujNGr6GKw2hJAjltW0ocCNIGn9YxIXTT99Q==", + "dev": true + }, "@types/mocha": { "version": "10.0.1", "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.1.tgz", "integrity": "sha512-/fvYntiO1GeICvqbQ3doGDIP97vWmvFt83GKguJ6prmQM2iXZfFcq6YE8KteFyRtX2/h5Hf91BYvPodJKFYv5Q==", "dev": true }, + "@types/ms": { + "version": "0.7.34", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", + "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", + "dev": true + }, "@types/node": { "version": "14.11.2", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.11.2.tgz", "integrity": "sha512-jiE3QIxJ8JLNcb1Ps6rDbysDhN4xa8DJJvuC9prr6w+1tIh+QAbYyNF3tyiZNLDBIuBCf4KEcV2UvQm/V60xfA==", "dev": true }, + "accepts": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz", + "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==", + "dev": true, + "requires": { + "mime-types": "~2.1.34", + "negotiator": "0.6.3" + } + }, "acorn": { "version": "8.8.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz", @@ -3065,6 +3361,12 @@ "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", "dev": true }, + "base64id": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "dev": true + }, "binary-extensions": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", @@ -3242,6 +3544,22 @@ } } }, + "cookie": { + "version": "0.4.2", + "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.2.tgz", + "integrity": "sha512-aSWTXFzaKWkvHO1Ny/s+ePFpvKsPnjc551iI41v3ny/ow6tBG5Vd+FuqGNhh1LxOmVzOlGUriIlOaokOvhaStA==", + "dev": true + }, + "cors": { + "version": "2.8.5", + "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", + "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", + "dev": true, + "requires": { + "object-assign": "^4", + "vary": "^1" + } + }, "create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", @@ -3263,7 +3581,6 @@ "version": "4.3.4", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", - "dev": true, "requires": { "ms": "2.1.2" } @@ -3295,6 +3612,43 @@ "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==", "dev": true }, + "engine.io": { + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.4.tgz", + "integrity": "sha512-KdVSDKhVKyOi+r5uEabrDLZw2qXStVvCsEB/LN3mw4WFi6Gx50jTyuxYVCwAAC0U46FdnzP/ScKRBTXb/NiEOg==", + "dev": true, + "requires": { + "@types/cookie": "^0.4.1", + "@types/cors": "^2.8.12", + "@types/node": ">=10.0.0", + "accepts": "~1.3.4", + "base64id": "2.0.0", + "cookie": "~0.4.1", + "cors": "~2.8.5", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.11.0" + } + }, + "engine.io-client": { + "version": "6.5.3", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.5.3.tgz", + "integrity": "sha512-9Z0qLB0NIisTRt1DZ/8U2k12RJn8yls/nXMZLn+/N8hANT3TcYjKFKcwbw5zFQiN4NTde3TSY9zb79e1ij6j9Q==", + "dev": true, + "requires": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.11.0", + "xmlhttprequest-ssl": "~2.0.0" + } + }, + "engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true + }, "es6-error": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/es6-error/-/es6-error-4.1.1.tgz", @@ -3322,7 +3676,7 @@ "expect.js": { "version": "0.3.1", "resolved": "https://registry.npmjs.org/expect.js/-/expect.js-0.3.1.tgz", - "integrity": "sha1-sKWaDS7/VDdUTr8M6qYBWEHQm1s=", + "integrity": "sha512-okDF/FAPEul1ZFLae4hrgpIqAeapoo5TRdcg/lD0iN9S3GWrBFIJwNezGH1DMtIz+RxU4RrFmMq7WUUvDg3J6A==", "dev": true }, "fill-range": { @@ -3760,6 +4114,21 @@ "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", "dev": true }, + "mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "dev": true + }, + "mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "dev": true, + "requires": { + "mime-db": "1.52.0" + } + }, "minimatch": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", @@ -3869,8 +4238,7 @@ "ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, "nanoid": { "version": "3.3.3", @@ -3878,6 +4246,12 @@ "integrity": "sha512-p1sjXuopFs0xg+fPASzQ28agW1oHD7xDsd9Xkf3T15H3c/cifrFHVwrh74PdoklAPi+i7MdRsE47vm2r6JoB+w==", "dev": true }, + "negotiator": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", + "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==", + "dev": true + }, "node-preload": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz", @@ -4043,6 +4417,12 @@ } } }, + "object-assign": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", + "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", + "dev": true + }, "once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -4301,6 +4681,52 @@ "integrity": "sha512-VUJ49FC8U1OxwZLxIbTTrDvLnf/6TDgxZcK8wxR8zs13xpx7xbG60ndBlhNrFi2EMuFRoeDoJO7wthSLq42EjA==", "dev": true }, + "socket.io": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", + "dev": true, + "requires": { + "accepts": "~1.3.4", + "base64id": "~2.0.0", + "cors": "~2.8.5", + "debug": "~4.3.2", + "engine.io": "~6.5.2", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + } + }, + "socket.io-adapter": { + "version": "2.5.2", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", + "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "dev": true, + "requires": { + "ws": "~8.11.0" + } + }, + "socket.io-client": { + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.7.4.tgz", + "integrity": "sha512-wh+OkeF0rAVCrABWQBaEjLfb7DVPotMbu0cgWgyR0v6eA4EoVnAwcIeIbcdTE3GT/H3kbdLl7OoH2+asoDRIIg==", + "dev": true, + "requires": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.5.2", + "socket.io-parser": "~4.2.4" + } + }, + "socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "dev": true, + "requires": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + } + }, "source-map": { "version": "0.5.7", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", @@ -4456,6 +4882,12 @@ "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", "dev": true }, + "vary": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", + "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==", + "dev": true + }, "which": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz", @@ -4538,6 +4970,12 @@ "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", "requires": {} }, + "xmlhttprequest-ssl": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.0.0.tgz", + "integrity": "sha512-QKxVRxiRACQcVuQEYFsI1hhkrMlrXHPegbbd1yn9UHOmRxY+si12nQYzri3vbzt8VdTTRviqcKxcyllFas5z2A==", + "dev": true + }, "y18n": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.3.tgz", diff --git a/package.json b/package.json index 62e9071..081d9fb 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "socket.io-adapter", - "version": "2.5.2", + "version": "2.5.3", "license": "MIT", "repository": { "type": "git", @@ -13,20 +13,25 @@ "types": "./dist/index.d.ts", "description": "default socket.io in-memory adapter", "dependencies": { + "debug": "~4.3.4", "ws": "~8.11.0" }, "devDependencies": { + "@types/debug": "^4.1.12", + "@types/expect.js": "^0.3.32", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", "expect.js": "^0.3.1", "mocha": "^10.2.0", "nyc": "^15.1.0", "prettier": "^2.8.1", + "socket.io": "^4.7.4", + "socket.io-client": "^4.7.4", "ts-node": "^10.9.1", "typescript": "^4.9.4" }, "scripts": { - "test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts", + "test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/*.ts", "format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'", "format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'", "prepack": "tsc" diff --git a/test/cluster-adapter.ts b/test/cluster-adapter.ts new file mode 100644 index 0000000..d5d6fdb --- /dev/null +++ b/test/cluster-adapter.ts @@ -0,0 +1,416 @@ +import { createServer } from "http"; +import { Server, Socket as ServerSocket } from "socket.io"; +import { io as ioc, Socket as ClientSocket } from "socket.io-client"; +import expect = require("expect.js"); +import type { AddressInfo } from "net"; +import { times, shouldNotHappen } from "./util"; +import { + ClusterAdapterWithHeartbeat, + type ClusterMessage, + type ClusterResponse, +} from "../lib"; +import { EventEmitter } from "events"; + +const NODES_COUNT = 3; + +class EventEmitterAdapter extends ClusterAdapterWithHeartbeat { + private offset = 1; + + constructor(nsp, readonly eventBus) { + super(nsp, {}); + this.eventBus.on("message", (message) => { + this.onMessage(message as ClusterMessage); + }); + } + + protected doPublish(message: ClusterMessage): Promise { + this.eventBus.emit("message", message); + return Promise.resolve(String(++this.offset)); + } + + protected doPublishResponse( + requesterUid: string, + response: ClusterResponse + ): Promise { + this.eventBus.emit("message", response); + return Promise.resolve(); + } +} + +describe("cluster adapter", () => { + let servers: Server[], + serverSockets: ServerSocket[], + clientSockets: ClientSocket[]; + + beforeEach((done) => { + servers = []; + serverSockets = []; + clientSockets = []; + + const eventBus = new EventEmitter(); + for (let i = 1; i <= NODES_COUNT; i++) { + const httpServer = createServer(); + const io = new Server(httpServer); + // @ts-ignore + io.adapter(function (nsp) { + return new EventEmitterAdapter(nsp, eventBus); + }); + httpServer.listen(() => { + const port = (httpServer.address() as AddressInfo).port; + const clientSocket = ioc(`http://localhost:${port}`); + + io.on("connection", async (socket) => { + clientSockets.push(clientSocket); + serverSockets.push(socket); + servers.push(io); + if (servers.length === NODES_COUNT) { + // ensure all nodes know each other + servers[0].emit("ping"); + servers[1].emit("ping"); + servers[2].emit("ping"); + + done(); + } + }); + }); + } + }); + + afterEach(() => { + servers.forEach((server) => { + // @ts-ignore + server.httpServer.close(); + server.of("/").adapter.close(); + }); + clientSockets.forEach((socket) => { + socket.disconnect(); + }); + }); + + describe("broadcast", function () { + it("broadcasts to all clients", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("test", (arg1, arg2, arg3) => { + expect(arg1).to.eql(1); + expect(arg2).to.eql("2"); + expect(Buffer.isBuffer(arg3)).to.be(true); + partialDone(); + }); + }); + + servers[0].emit("test", 1, "2", Buffer.from([3, 4])); + }); + + it("broadcasts to all clients in a namespace", (done) => { + const partialDone = times(3, () => { + servers.forEach((server) => server.of("/custom").adapter.close()); + done(); + }); + + servers.forEach((server) => server.of("/custom")); + + const onConnect = times(3, async () => { + servers[0].of("/custom").emit("test"); + }); + + clientSockets.forEach((clientSocket) => { + const socket = clientSocket.io.socket("/custom"); + socket.on("connect", onConnect); + socket.on("test", () => { + socket.disconnect(); + partialDone(); + }); + }); + }); + + it("broadcasts to all clients in a room", (done) => { + serverSockets[1].join("room1"); + + clientSockets[0].on("test", shouldNotHappen(done)); + clientSockets[1].on("test", () => done()); + clientSockets[2].on("test", shouldNotHappen(done)); + + servers[0].to("room1").emit("test"); + }); + + it("broadcasts to all clients except in room", (done) => { + const partialDone = times(2, done); + serverSockets[1].join("room1"); + + clientSockets[0].on("test", () => partialDone()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", () => partialDone()); + + servers[0].of("/").except("room1").emit("test"); + }); + + it("broadcasts to local clients only", (done) => { + clientSockets[0].on("test", () => done()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", shouldNotHappen(done)); + + servers[0].local.emit("test"); + }); + + it("broadcasts with multiple acknowledgements", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (cb) => cb(3)); + + servers[0].timeout(50).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.contain(1); + expect(responses).to.contain(2); + expect(responses).to.contain(3); + + setTimeout(() => { + // @ts-ignore + expect(servers[0].of("/").adapter.ackRequests.size).to.eql(0); + + done(); + }, 50); + }); + }); + + it("broadcasts with multiple acknowledgements (binary content)", (done) => { + clientSockets[0].on("test", (cb) => cb(Buffer.from([1]))); + clientSockets[1].on("test", (cb) => cb(Buffer.from([2]))); + clientSockets[2].on("test", (cb) => cb(Buffer.from([3]))); + + servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + responses.forEach((response) => { + expect(Buffer.isBuffer(response)).to.be(true); + }); + + done(); + }); + }); + + it("broadcasts with multiple acknowledgements (no client)", (done) => { + servers[0] + .to("abc") + .timeout(500) + .emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.eql([]); + + done(); + }); + }); + + it("broadcasts with multiple acknowledgements (timeout)", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (_cb) => { + // do nothing + }); + + servers[0].timeout(50).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be.an(Error); + expect(responses).to.contain(1); + expect(responses).to.contain(2); + + done(); + }); + }); + + it("broadcasts with a single acknowledgement (local)", async () => { + clientSockets[0].on("test", () => expect().fail()); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", () => expect().fail()); + + const response = await serverSockets[1].emitWithAck("test"); + expect(response).to.eql(2); + }); + + it("broadcasts with a single acknowledgement (remote)", async () => { + clientSockets[0].on("test", () => expect().fail()); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", () => expect().fail()); + + const sockets = await servers[0].in(serverSockets[1].id).fetchSockets(); + expect(sockets.length).to.eql(1); + + const response = await sockets[0].timeout(500).emitWithAck("test"); + expect(response).to.eql(2); + }); + }); + + describe("socketsJoin", () => { + it("makes all socket instances join the specified room", async () => { + servers[0].socketsJoin("room1"); + + expect(serverSockets[0].rooms.has("room1")).to.be(true); + expect(serverSockets[1].rooms.has("room1")).to.be(true); + expect(serverSockets[2].rooms.has("room1")).to.be(true); + }); + + it("makes the matching socket instances join the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); + + servers[0].in("room1").socketsJoin("room2"); + + expect(serverSockets[0].rooms.has("room2")).to.be(true); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); + + it("makes the given socket instance join the specified room", async () => { + servers[0].in(serverSockets[1].id).socketsJoin("room3"); + + expect(serverSockets[0].rooms.has("room3")).to.be(false); + expect(serverSockets[1].rooms.has("room3")).to.be(true); + expect(serverSockets[2].rooms.has("room3")).to.be(false); + }); + }); + + describe("socketsLeave", () => { + it("makes all socket instances leave the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); + + servers[0].socketsLeave("room1"); + + expect(serverSockets[0].rooms.has("room1")).to.be(false); + expect(serverSockets[1].rooms.has("room1")).to.be(false); + expect(serverSockets[2].rooms.has("room1")).to.be(false); + }); + + it("makes the matching socket instances leave the specified room", async () => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join(["room1", "room2"]); + serverSockets[2].join(["room2"]); + + servers[0].in("room1").socketsLeave("room2"); + + expect(serverSockets[0].rooms.has("room2")).to.be(false); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); + + it("makes the given socket instance leave the specified room", async () => { + serverSockets[0].join("room3"); + serverSockets[1].join("room3"); + serverSockets[2].join("room3"); + + servers[0].in(serverSockets[1].id).socketsLeave("room3"); + + expect(serverSockets[0].rooms.has("room3")).to.be(true); + expect(serverSockets[1].rooms.has("room3")).to.be(false); + expect(serverSockets[2].rooms.has("room3")).to.be(true); + }); + }); + + describe("disconnectSockets", () => { + it("makes all socket instances disconnect", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", (reason) => { + expect(reason).to.eql("io server disconnect"); + partialDone(); + }); + }); + + servers[0].disconnectSockets(true); + }); + }); + + describe("fetchSockets", () => { + it("returns all socket instances", async () => { + const sockets = await servers[0].fetchSockets(); + + expect(sockets).to.be.an(Array); + expect(sockets).to.have.length(3); + // @ts-ignore + expect(servers[0].of("/").adapter.requests.size).to.eql(0); // clean up + }); + + it("returns a single socket instance", async () => { + serverSockets[1].data = "test" as any; + + const [remoteSocket] = await servers[0] + .in(serverSockets[1].id) + .fetchSockets(); + + expect(remoteSocket.handshake).to.eql(serverSockets[1].handshake); + expect(remoteSocket.data).to.eql("test"); + expect(remoteSocket.rooms.size).to.eql(1); + }); + + it("returns only local socket instances", async () => { + const sockets = await servers[0].local.fetchSockets(); + + expect(sockets).to.have.length(1); + }); + }); + + describe("serverSideEmit", () => { + it("sends an event to other server instances", (done) => { + const partialDone = times(2, done); + + servers[0].on("hello", shouldNotHappen(done)); + + servers[1].on("hello", (arg1, arg2, arg3) => { + expect(arg1).to.eql("world"); + expect(arg2).to.eql(1); + expect(arg3).to.eql("2"); + partialDone(); + }); + + servers[2].of("/").on("hello", () => partialDone()); + + servers[0].serverSideEmit("hello", "world", 1, "2"); + }); + + it("sends an event and receives a response from the other server instances", (done) => { + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", (cb) => cb("3")); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + expect(response).to.contain("3"); + done(); + }); + }); + + it("sends an event but timeout if one server does not respond", function (done) { + this.timeout(6000); + + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", () => { + // do nothing + }); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err.message).to.be("timeout reached: missing 1 responses"); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + }); + + it("succeeds even if an instance leaves the cluster", (done) => { + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", () => { + servers[2].of("/").adapter.close(); + }); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + }); + }); +}); diff --git a/test/util.ts b/test/util.ts new file mode 100644 index 0000000..8109ef1 --- /dev/null +++ b/test/util.ts @@ -0,0 +1,15 @@ +export function times(count: number, fn: () => void) { + let i = 0; + return () => { + i++; + if (i === count) { + fn(); + } else if (i > count) { + throw new Error(`too many calls: ${i} instead of ${count}`); + } + }; +} + +export function shouldNotHappen(done) { + return () => done(new Error("should not happen")); +}