diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1d69694..da08b0f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,6 +17,7 @@ jobs: strategy: matrix: node-version: + - 14 - 20 steps: @@ -31,5 +32,11 @@ jobs: - name: Install dependencies run: npm ci + # the "override" keyword was added in typescript@4.5.0 + # else, users can go down to typescript@3.8.x ("import type") + - name: Install TypeScript 4.5 + run: npm i typescript@4.5 + if: ${{ matrix.node-version == '14' }} + - name: Run tests run: npm test diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e5447a..c06c36a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # History +- [2.5.4](#254-2024-02-22) (Feb 2024) - [2.5.3](#253-2024-02-21) (Feb 2024) - [2.5.2](#252-2023-01-12) (Jan 2023) - [2.5.1](#251-2023-01-06) (Jan 2023) @@ -20,6 +21,16 @@ # Release notes +## [2.5.4](https://github.com/socketio/socket.io-adapter/compare/2.5.3...2.5.4) (2024-02-22) + + +### Bug Fixes + +* ensure the order of the commands ([a13f35f](https://github.com/socketio/socket.io-adapter/commit/a13f35f0e6b85bbba07f99ee2440e914f1429d83)) +* **types:** ensure compatibility with TypeScript < 4.5 ([ca397f3](https://github.com/socketio/socket.io-adapter/commit/ca397f3afe06ed9390db52b70a506a9721e091d8)) + + + ## [2.5.3](https://github.com/socketio/socket.io-adapter/compare/2.5.2...2.5.3) (2024-02-21) Two abstract classes were imported from the [Redis adapter repository](https://github.com/socketio/socket.io-redis-adapter/blob/bd32763043a2eb79a21dffd8820f20e598348adf/lib/cluster-adapter.ts): diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts index 8858d0e..143bb6f 100644 --- a/lib/cluster-adapter.ts +++ b/lib/cluster-adapter.ts @@ -1,9 +1,9 @@ -import { - Adapter, - type BroadcastFlags, - type BroadcastOptions, - type Room, -} from "./index"; +import { Adapter } from "./in-memory-adapter"; +import type { + BroadcastFlags, + BroadcastOptions, + Room, +} from "./in-memory-adapter"; import { debug as debugModule } from "debug"; import { randomBytes } from "crypto"; @@ -503,55 +503,64 @@ export abstract class ClusterAdapter extends Adapter { super.broadcastWithAck(packet, opts, clientCountCallback, ack); } - override addSockets(opts: BroadcastOptions, rooms: Room[]) { - super.addSockets(opts, rooms); - + override async addSockets(opts: BroadcastOptions, rooms: Room[]) { const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.SOCKETS_JOIN, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } } - this.publish({ - type: MessageType.SOCKETS_JOIN, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); + super.addSockets(opts, rooms); } - override delSockets(opts: BroadcastOptions, rooms: Room[]) { - super.delSockets(opts, rooms); - + override async delSockets(opts: BroadcastOptions, rooms: Room[]) { const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.SOCKETS_LEAVE, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } } - this.publish({ - type: MessageType.SOCKETS_LEAVE, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); + super.delSockets(opts, rooms); } - override disconnectSockets(opts: BroadcastOptions, close: boolean) { - super.disconnectSockets(opts, close); - + override async disconnectSockets(opts: BroadcastOptions, close: boolean) { const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.DISCONNECT_SOCKETS, + data: { + opts: encodeOptions(opts), + close, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } } - this.publish({ - type: MessageType.DISCONNECT_SOCKETS, - data: { - opts: encodeOptions(opts), - close, - }, - }); + super.disconnectSockets(opts, close); } async fetchSockets(opts: BroadcastOptions): Promise { diff --git a/lib/in-memory-adapter.ts b/lib/in-memory-adapter.ts new file mode 100644 index 0000000..83d4d67 --- /dev/null +++ b/lib/in-memory-adapter.ts @@ -0,0 +1,507 @@ +import { EventEmitter } from "events"; +import { yeast } from "./contrib/yeast"; +import WebSocket = require("ws"); + +const canPreComputeFrame = typeof WebSocket?.Sender?.frame === "function"; + +/** + * A public ID, sent by the server at the beginning of the Socket.IO session and which can be used for private messaging + */ +export type SocketId = string; +/** + * A private ID, sent by the server at the beginning of the Socket.IO session and used for connection state recovery + * upon reconnection + */ +export type PrivateSessionId = string; + +// we could extend the Room type to "string | number", but that would be a breaking change +// related: https://github.com/socketio/socket.io-redis-adapter/issues/418 +export type Room = string; + +export interface BroadcastFlags { + volatile?: boolean; + compress?: boolean; + local?: boolean; + broadcast?: boolean; + binary?: boolean; + timeout?: number; +} + +export interface BroadcastOptions { + rooms: Set; + except?: Set; + flags?: BroadcastFlags; +} + +interface SessionToPersist { + sid: SocketId; + pid: PrivateSessionId; + rooms: Room[]; + data: unknown; +} + +export type Session = SessionToPersist & { missedPackets: unknown[][] }; + +export class Adapter extends EventEmitter { + public rooms: Map> = new Map(); + public sids: Map> = new Map(); + private readonly encoder; + + /** + * In-memory adapter constructor. + * + * @param {Namespace} nsp + */ + constructor(readonly nsp: any) { + super(); + this.encoder = nsp.server.encoder; + } + + /** + * To be overridden + */ + public init(): Promise | void {} + + /** + * To be overridden + */ + public close(): Promise | void {} + + /** + * Returns the number of Socket.IO servers in the cluster + * + * @public + */ + public serverCount(): Promise { + return Promise.resolve(1); + } + + /** + * Adds a socket to a list of room. + * + * @param {SocketId} id the socket id + * @param {Set} rooms a set of rooms + * @public + */ + public addAll(id: SocketId, rooms: Set): Promise | void { + if (!this.sids.has(id)) { + this.sids.set(id, new Set()); + } + + for (const room of rooms) { + this.sids.get(id).add(room); + + if (!this.rooms.has(room)) { + this.rooms.set(room, new Set()); + this.emit("create-room", room); + } + if (!this.rooms.get(room).has(id)) { + this.rooms.get(room).add(id); + this.emit("join-room", room, id); + } + } + } + + /** + * Removes a socket from a room. + * + * @param {SocketId} id the socket id + * @param {Room} room the room name + */ + public del(id: SocketId, room: Room): Promise | void { + if (this.sids.has(id)) { + this.sids.get(id).delete(room); + } + + this._del(room, id); + } + + private _del(room: Room, id: SocketId) { + const _room = this.rooms.get(room); + if (_room != null) { + const deleted = _room.delete(id); + if (deleted) { + this.emit("leave-room", room, id); + } + if (_room.size === 0 && this.rooms.delete(room)) { + this.emit("delete-room", room); + } + } + } + + /** + * Removes a socket from all rooms it's joined. + * + * @param {SocketId} id the socket id + */ + public delAll(id: SocketId): void { + if (!this.sids.has(id)) { + return; + } + + for (const room of this.sids.get(id)) { + this._del(room, id); + } + + this.sids.delete(id); + } + + /** + * Broadcasts a packet. + * + * Options: + * - `flags` {Object} flags for this packet + * - `except` {Array} sids that should be excluded + * - `rooms` {Array} list of rooms to broadcast to + * + * @param {Object} packet the packet object + * @param {Object} opts the options + * @public + */ + public broadcast(packet: any, opts: BroadcastOptions): void { + const flags = opts.flags || {}; + const packetOpts = { + preEncoded: true, + volatile: flags.volatile, + compress: flags.compress, + }; + + packet.nsp = this.nsp.name; + const encodedPackets = this._encode(packet, packetOpts); + + this.apply(opts, (socket) => { + if (typeof socket.notifyOutgoingListeners === "function") { + socket.notifyOutgoingListeners(packet); + } + + socket.client.writeToEngine(encodedPackets, packetOpts); + }); + } + + /** + * Broadcasts a packet and expects multiple acknowledgements. + * + * Options: + * - `flags` {Object} flags for this packet + * - `except` {Array} sids that should be excluded + * - `rooms` {Array} list of rooms to broadcast to + * + * @param {Object} packet the packet object + * @param {Object} opts the options + * @param clientCountCallback - the number of clients that received the packet + * @param ack - the callback that will be called for each client response + * + * @public + */ + public broadcastWithAck( + packet: any, + opts: BroadcastOptions, + clientCountCallback: (clientCount: number) => void, + ack: (...args: any[]) => void + ) { + const flags = opts.flags || {}; + const packetOpts = { + preEncoded: true, + volatile: flags.volatile, + compress: flags.compress, + }; + + packet.nsp = this.nsp.name; + // we can use the same id for each packet, since the _ids counter is common (no duplicate) + packet.id = this.nsp._ids++; + + const encodedPackets = this._encode(packet, packetOpts); + + let clientCount = 0; + + this.apply(opts, (socket) => { + // track the total number of acknowledgements that are expected + clientCount++; + // call the ack callback for each client response + socket.acks.set(packet.id, ack); + + if (typeof socket.notifyOutgoingListeners === "function") { + socket.notifyOutgoingListeners(packet); + } + + socket.client.writeToEngine(encodedPackets, packetOpts); + }); + + clientCountCallback(clientCount); + } + + private _encode(packet: unknown, packetOpts: Record) { + const encodedPackets = this.encoder.encode(packet); + + if ( + canPreComputeFrame && + encodedPackets.length === 1 && + typeof encodedPackets[0] === "string" + ) { + // "4" being the "message" packet type in the Engine.IO protocol + const data = Buffer.from("4" + encodedPackets[0]); + // see https://github.com/websockets/ws/issues/617#issuecomment-283002469 + packetOpts.wsPreEncodedFrame = WebSocket.Sender.frame(data, { + readOnly: false, + mask: false, + rsv1: false, + opcode: 1, + fin: true, + }); + } + + return encodedPackets; + } + + /** + * Gets a list of sockets by sid. + * + * @param {Set} rooms the explicit set of rooms to check. + */ + public sockets(rooms: Set): Promise> { + const sids = new Set(); + + this.apply({ rooms }, (socket) => { + sids.add(socket.id); + }); + + return Promise.resolve(sids); + } + + /** + * Gets the list of rooms a given socket has joined. + * + * @param {SocketId} id the socket id + */ + public socketRooms(id: SocketId): Set | undefined { + return this.sids.get(id); + } + + /** + * Returns the matching socket instances + * + * @param opts - the filters to apply + */ + public fetchSockets(opts: BroadcastOptions): Promise { + const sockets = []; + + this.apply(opts, (socket) => { + sockets.push(socket); + }); + + return Promise.resolve(sockets); + } + + /** + * Makes the matching socket instances join the specified rooms + * + * @param opts - the filters to apply + * @param rooms - the rooms to join + */ + public addSockets(opts: BroadcastOptions, rooms: Room[]): void { + this.apply(opts, (socket) => { + socket.join(rooms); + }); + } + + /** + * Makes the matching socket instances leave the specified rooms + * + * @param opts - the filters to apply + * @param rooms - the rooms to leave + */ + public delSockets(opts: BroadcastOptions, rooms: Room[]): void { + this.apply(opts, (socket) => { + rooms.forEach((room) => socket.leave(room)); + }); + } + + /** + * Makes the matching socket instances disconnect + * + * @param opts - the filters to apply + * @param close - whether to close the underlying connection + */ + public disconnectSockets(opts: BroadcastOptions, close: boolean): void { + this.apply(opts, (socket) => { + socket.disconnect(close); + }); + } + + private apply(opts: BroadcastOptions, callback: (socket) => void): void { + const rooms = opts.rooms; + const except = this.computeExceptSids(opts.except); + + if (rooms.size) { + const ids = new Set(); + for (const room of rooms) { + if (!this.rooms.has(room)) continue; + + for (const id of this.rooms.get(room)) { + if (ids.has(id) || except.has(id)) continue; + const socket = this.nsp.sockets.get(id); + if (socket) { + callback(socket); + ids.add(id); + } + } + } + } else { + for (const [id] of this.sids) { + if (except.has(id)) continue; + const socket = this.nsp.sockets.get(id); + if (socket) callback(socket); + } + } + } + + private computeExceptSids(exceptRooms?: Set) { + const exceptSids = new Set(); + if (exceptRooms && exceptRooms.size > 0) { + for (const room of exceptRooms) { + if (this.rooms.has(room)) { + this.rooms.get(room).forEach((sid) => exceptSids.add(sid)); + } + } + } + return exceptSids; + } + + /** + * Send a packet to the other Socket.IO servers in the cluster + * @param packet - an array of arguments, which may include an acknowledgement callback at the end + */ + public serverSideEmit(packet: any[]): void { + console.warn( + "this adapter does not support the serverSideEmit() functionality" + ); + } + + /** + * Save the client session in order to restore it upon reconnection. + */ + public persistSession(session: SessionToPersist) {} + + /** + * Restore the session and find the packets that were missed by the client. + * @param pid + * @param offset + */ + public restoreSession( + pid: PrivateSessionId, + offset: string + ): Promise { + return null; + } +} + +interface PersistedPacket { + id: string; + emittedAt: number; + data: unknown[]; + opts: BroadcastOptions; +} + +type SessionWithTimestamp = SessionToPersist & { disconnectedAt: number }; + +export class SessionAwareAdapter extends Adapter { + private readonly maxDisconnectionDuration: number; + + private sessions: Map = new Map(); + private packets: PersistedPacket[] = []; + + constructor(readonly nsp: any) { + super(nsp); + this.maxDisconnectionDuration = + nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration; + + const timer = setInterval(() => { + const threshold = Date.now() - this.maxDisconnectionDuration; + this.sessions.forEach((session, sessionId) => { + const hasExpired = session.disconnectedAt < threshold; + if (hasExpired) { + this.sessions.delete(sessionId); + } + }); + for (let i = this.packets.length - 1; i >= 0; i--) { + const hasExpired = this.packets[i].emittedAt < threshold; + if (hasExpired) { + this.packets.splice(0, i + 1); + break; + } + } + }, 60 * 1000); + // prevents the timer from keeping the process alive + timer.unref(); + } + + override persistSession(session: SessionToPersist) { + (session as SessionWithTimestamp).disconnectedAt = Date.now(); + this.sessions.set(session.pid, session as SessionWithTimestamp); + } + + override restoreSession( + pid: PrivateSessionId, + offset: string + ): Promise { + const session = this.sessions.get(pid); + if (!session) { + // the session may have expired + return null; + } + const hasExpired = + session.disconnectedAt + this.maxDisconnectionDuration < Date.now(); + if (hasExpired) { + // the session has expired + this.sessions.delete(pid); + return null; + } + const index = this.packets.findIndex((packet) => packet.id === offset); + if (index === -1) { + // the offset may be too old + return null; + } + const missedPackets = []; + for (let i = index + 1; i < this.packets.length; i++) { + const packet = this.packets[i]; + if (shouldIncludePacket(session.rooms, packet.opts)) { + missedPackets.push(packet.data); + } + } + return Promise.resolve({ + ...session, + missedPackets, + }); + } + + override broadcast(packet: any, opts: BroadcastOptions) { + const isEventPacket = packet.type === 2; + // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and + // restored on another server upon reconnection + const withoutAcknowledgement = packet.id === undefined; + const notVolatile = opts.flags?.volatile === undefined; + if (isEventPacket && withoutAcknowledgement && notVolatile) { + const id = yeast(); + // the offset is stored at the end of the data array, so the client knows the ID of the last packet it has + // processed (and the format is backward-compatible) + packet.data.push(id); + this.packets.push({ + id, + opts, + data: packet.data, + emittedAt: Date.now(), + }); + } + super.broadcast(packet, opts); + } +} + +function shouldIncludePacket( + sessionRooms: Room[], + opts: BroadcastOptions +): boolean { + const included = + opts.rooms.size === 0 || sessionRooms.some((room) => opts.rooms.has(room)); + const notExcluded = sessionRooms.every((room) => !opts.except.has(room)); + return included && notExcluded; +} diff --git a/lib/index.ts b/lib/index.ts index 7743723..2665139 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,516 +1,21 @@ -import { EventEmitter } from "events"; -import { yeast } from "./contrib/yeast"; -import WebSocket = require("ws"); - -const canPreComputeFrame = typeof WebSocket?.Sender?.frame === "function"; - -/** - * A public ID, sent by the server at the beginning of the Socket.IO session and which can be used for private messaging - */ -export type SocketId = string; -/** - * A private ID, sent by the server at the beginning of the Socket.IO session and used for connection state recovery - * upon reconnection - */ -export type PrivateSessionId = string; - -// we could extend the Room type to "string | number", but that would be a breaking change -// related: https://github.com/socketio/socket.io-redis-adapter/issues/418 -export type Room = string; - -export interface BroadcastFlags { - volatile?: boolean; - compress?: boolean; - local?: boolean; - broadcast?: boolean; - binary?: boolean; - timeout?: number; -} - -export interface BroadcastOptions { - rooms: Set; - except?: Set; - flags?: BroadcastFlags; -} - -interface SessionToPersist { - sid: SocketId; - pid: PrivateSessionId; - rooms: Room[]; - data: unknown; -} - -export type Session = SessionToPersist & { missedPackets: unknown[][] }; - -export class Adapter extends EventEmitter { - public rooms: Map> = new Map(); - public sids: Map> = new Map(); - private readonly encoder; - - /** - * In-memory adapter constructor. - * - * @param {Namespace} nsp - */ - constructor(readonly nsp: any) { - super(); - this.encoder = nsp.server.encoder; - } - - /** - * To be overridden - */ - public init(): Promise | void {} - - /** - * To be overridden - */ - public close(): Promise | void {} - - /** - * Returns the number of Socket.IO servers in the cluster - * - * @public - */ - public serverCount(): Promise { - return Promise.resolve(1); - } - - /** - * Adds a socket to a list of room. - * - * @param {SocketId} id the socket id - * @param {Set} rooms a set of rooms - * @public - */ - public addAll(id: SocketId, rooms: Set): Promise | void { - if (!this.sids.has(id)) { - this.sids.set(id, new Set()); - } - - for (const room of rooms) { - this.sids.get(id).add(room); - - if (!this.rooms.has(room)) { - this.rooms.set(room, new Set()); - this.emit("create-room", room); - } - if (!this.rooms.get(room).has(id)) { - this.rooms.get(room).add(id); - this.emit("join-room", room, id); - } - } - } - - /** - * Removes a socket from a room. - * - * @param {SocketId} id the socket id - * @param {Room} room the room name - */ - public del(id: SocketId, room: Room): Promise | void { - if (this.sids.has(id)) { - this.sids.get(id).delete(room); - } - - this._del(room, id); - } - - private _del(room: Room, id: SocketId) { - const _room = this.rooms.get(room); - if (_room != null) { - const deleted = _room.delete(id); - if (deleted) { - this.emit("leave-room", room, id); - } - if (_room.size === 0 && this.rooms.delete(room)) { - this.emit("delete-room", room); - } - } - } - - /** - * Removes a socket from all rooms it's joined. - * - * @param {SocketId} id the socket id - */ - public delAll(id: SocketId): void { - if (!this.sids.has(id)) { - return; - } - - for (const room of this.sids.get(id)) { - this._del(room, id); - } - - this.sids.delete(id); - } - - /** - * Broadcasts a packet. - * - * Options: - * - `flags` {Object} flags for this packet - * - `except` {Array} sids that should be excluded - * - `rooms` {Array} list of rooms to broadcast to - * - * @param {Object} packet the packet object - * @param {Object} opts the options - * @public - */ - public broadcast(packet: any, opts: BroadcastOptions): void { - const flags = opts.flags || {}; - const packetOpts = { - preEncoded: true, - volatile: flags.volatile, - compress: flags.compress, - }; - - packet.nsp = this.nsp.name; - const encodedPackets = this._encode(packet, packetOpts); - - this.apply(opts, (socket) => { - if (typeof socket.notifyOutgoingListeners === "function") { - socket.notifyOutgoingListeners(packet); - } - - socket.client.writeToEngine(encodedPackets, packetOpts); - }); - } - - /** - * Broadcasts a packet and expects multiple acknowledgements. - * - * Options: - * - `flags` {Object} flags for this packet - * - `except` {Array} sids that should be excluded - * - `rooms` {Array} list of rooms to broadcast to - * - * @param {Object} packet the packet object - * @param {Object} opts the options - * @param clientCountCallback - the number of clients that received the packet - * @param ack - the callback that will be called for each client response - * - * @public - */ - public broadcastWithAck( - packet: any, - opts: BroadcastOptions, - clientCountCallback: (clientCount: number) => void, - ack: (...args: any[]) => void - ) { - const flags = opts.flags || {}; - const packetOpts = { - preEncoded: true, - volatile: flags.volatile, - compress: flags.compress, - }; - - packet.nsp = this.nsp.name; - // we can use the same id for each packet, since the _ids counter is common (no duplicate) - packet.id = this.nsp._ids++; - - const encodedPackets = this._encode(packet, packetOpts); - - let clientCount = 0; - - this.apply(opts, (socket) => { - // track the total number of acknowledgements that are expected - clientCount++; - // call the ack callback for each client response - socket.acks.set(packet.id, ack); - - if (typeof socket.notifyOutgoingListeners === "function") { - socket.notifyOutgoingListeners(packet); - } - - socket.client.writeToEngine(encodedPackets, packetOpts); - }); - - clientCountCallback(clientCount); - } - - private _encode(packet: unknown, packetOpts: Record) { - const encodedPackets = this.encoder.encode(packet); - - if ( - canPreComputeFrame && - encodedPackets.length === 1 && - typeof encodedPackets[0] === "string" - ) { - // "4" being the "message" packet type in the Engine.IO protocol - const data = Buffer.from("4" + encodedPackets[0]); - // see https://github.com/websockets/ws/issues/617#issuecomment-283002469 - packetOpts.wsPreEncodedFrame = WebSocket.Sender.frame(data, { - readOnly: false, - mask: false, - rsv1: false, - opcode: 1, - fin: true, - }); - } - - return encodedPackets; - } - - /** - * Gets a list of sockets by sid. - * - * @param {Set} rooms the explicit set of rooms to check. - */ - public sockets(rooms: Set): Promise> { - const sids = new Set(); - - this.apply({ rooms }, (socket) => { - sids.add(socket.id); - }); - - return Promise.resolve(sids); - } - - /** - * Gets the list of rooms a given socket has joined. - * - * @param {SocketId} id the socket id - */ - public socketRooms(id: SocketId): Set | undefined { - return this.sids.get(id); - } - - /** - * Returns the matching socket instances - * - * @param opts - the filters to apply - */ - public fetchSockets(opts: BroadcastOptions): Promise { - const sockets = []; - - this.apply(opts, (socket) => { - sockets.push(socket); - }); - - return Promise.resolve(sockets); - } - - /** - * Makes the matching socket instances join the specified rooms - * - * @param opts - the filters to apply - * @param rooms - the rooms to join - */ - public addSockets(opts: BroadcastOptions, rooms: Room[]): void { - this.apply(opts, (socket) => { - socket.join(rooms); - }); - } - - /** - * Makes the matching socket instances leave the specified rooms - * - * @param opts - the filters to apply - * @param rooms - the rooms to leave - */ - public delSockets(opts: BroadcastOptions, rooms: Room[]): void { - this.apply(opts, (socket) => { - rooms.forEach((room) => socket.leave(room)); - }); - } - - /** - * Makes the matching socket instances disconnect - * - * @param opts - the filters to apply - * @param close - whether to close the underlying connection - */ - public disconnectSockets(opts: BroadcastOptions, close: boolean): void { - this.apply(opts, (socket) => { - socket.disconnect(close); - }); - } - - private apply(opts: BroadcastOptions, callback: (socket) => void): void { - const rooms = opts.rooms; - const except = this.computeExceptSids(opts.except); - - if (rooms.size) { - const ids = new Set(); - for (const room of rooms) { - if (!this.rooms.has(room)) continue; - - for (const id of this.rooms.get(room)) { - if (ids.has(id) || except.has(id)) continue; - const socket = this.nsp.sockets.get(id); - if (socket) { - callback(socket); - ids.add(id); - } - } - } - } else { - for (const [id] of this.sids) { - if (except.has(id)) continue; - const socket = this.nsp.sockets.get(id); - if (socket) callback(socket); - } - } - } - - private computeExceptSids(exceptRooms?: Set) { - const exceptSids = new Set(); - if (exceptRooms && exceptRooms.size > 0) { - for (const room of exceptRooms) { - if (this.rooms.has(room)) { - this.rooms.get(room).forEach((sid) => exceptSids.add(sid)); - } - } - } - return exceptSids; - } - - /** - * Send a packet to the other Socket.IO servers in the cluster - * @param packet - an array of arguments, which may include an acknowledgement callback at the end - */ - public serverSideEmit(packet: any[]): void { - console.warn( - "this adapter does not support the serverSideEmit() functionality" - ); - } - - /** - * Save the client session in order to restore it upon reconnection. - */ - public persistSession(session: SessionToPersist) {} - - /** - * Restore the session and find the packets that were missed by the client. - * @param pid - * @param offset - */ - public restoreSession( - pid: PrivateSessionId, - offset: string - ): Promise { - return null; - } -} - -interface PersistedPacket { - id: string; - emittedAt: number; - data: unknown[]; - opts: BroadcastOptions; -} - -type SessionWithTimestamp = SessionToPersist & { disconnectedAt: number }; - -export class SessionAwareAdapter extends Adapter { - private readonly maxDisconnectionDuration: number; - - private sessions: Map = new Map(); - private packets: PersistedPacket[] = []; - - constructor(readonly nsp: any) { - super(nsp); - this.maxDisconnectionDuration = - nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration; - - const timer = setInterval(() => { - const threshold = Date.now() - this.maxDisconnectionDuration; - this.sessions.forEach((session, sessionId) => { - const hasExpired = session.disconnectedAt < threshold; - if (hasExpired) { - this.sessions.delete(sessionId); - } - }); - for (let i = this.packets.length - 1; i >= 0; i--) { - const hasExpired = this.packets[i].emittedAt < threshold; - if (hasExpired) { - this.packets.splice(0, i + 1); - break; - } - } - }, 60 * 1000); - // prevents the timer from keeping the process alive - timer.unref(); - } - - override persistSession(session: SessionToPersist) { - (session as SessionWithTimestamp).disconnectedAt = Date.now(); - this.sessions.set(session.pid, session as SessionWithTimestamp); - } - - override restoreSession( - pid: PrivateSessionId, - offset: string - ): Promise { - const session = this.sessions.get(pid); - if (!session) { - // the session may have expired - return null; - } - const hasExpired = - session.disconnectedAt + this.maxDisconnectionDuration < Date.now(); - if (hasExpired) { - // the session has expired - this.sessions.delete(pid); - return null; - } - const index = this.packets.findIndex((packet) => packet.id === offset); - if (index === -1) { - // the offset may be too old - return null; - } - const missedPackets = []; - for (let i = index + 1; i < this.packets.length; i++) { - const packet = this.packets[i]; - if (shouldIncludePacket(session.rooms, packet.opts)) { - missedPackets.push(packet.data); - } - } - return Promise.resolve({ - ...session, - missedPackets, - }); - } - - override broadcast(packet: any, opts: BroadcastOptions) { - const isEventPacket = packet.type === 2; - // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and - // restored on another server upon reconnection - const withoutAcknowledgement = packet.id === undefined; - const notVolatile = opts.flags?.volatile === undefined; - if (isEventPacket && withoutAcknowledgement && notVolatile) { - const id = yeast(); - // the offset is stored at the end of the data array, so the client knows the ID of the last packet it has - // processed (and the format is backward-compatible) - packet.data.push(id); - this.packets.push({ - id, - opts, - data: packet.data, - emittedAt: Date.now(), - }); - } - super.broadcast(packet, opts); - } -} - -function shouldIncludePacket( - sessionRooms: Room[], - opts: BroadcastOptions -): boolean { - const included = - opts.rooms.size === 0 || sessionRooms.some((room) => opts.rooms.has(room)); - const notExcluded = sessionRooms.every((room) => !opts.except.has(room)); - return included && notExcluded; -} +export { + SocketId, + PrivateSessionId, + Room, + BroadcastFlags, + BroadcastOptions, + Session, + Adapter, + SessionAwareAdapter, +} from "./in-memory-adapter"; export { ClusterAdapter, ClusterAdapterWithHeartbeat, + ClusterAdapterOptions, ClusterMessage, ClusterResponse, + MessageType, ServerId, Offset, } from "./cluster-adapter"; diff --git a/package.json b/package.json index 081d9fb..bd165e8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "socket.io-adapter", - "version": "2.5.3", + "version": "2.5.4", "license": "MIT", "repository": { "type": "git", diff --git a/test/cluster-adapter.ts b/test/cluster-adapter.ts index d5d6fdb..350ecda 100644 --- a/test/cluster-adapter.ts +++ b/test/cluster-adapter.ts @@ -3,7 +3,7 @@ import { Server, Socket as ServerSocket } from "socket.io"; import { io as ioc, Socket as ClientSocket } from "socket.io-client"; import expect = require("expect.js"); import type { AddressInfo } from "net"; -import { times, shouldNotHappen } from "./util"; +import { times, shouldNotHappen, sleep } from "./util"; import { ClusterAdapterWithHeartbeat, type ClusterMessage, @@ -243,6 +243,8 @@ describe("cluster adapter", () => { it("makes all socket instances join the specified room", async () => { servers[0].socketsJoin("room1"); + await sleep(); + expect(serverSockets[0].rooms.has("room1")).to.be(true); expect(serverSockets[1].rooms.has("room1")).to.be(true); expect(serverSockets[2].rooms.has("room1")).to.be(true); @@ -254,6 +256,8 @@ describe("cluster adapter", () => { servers[0].in("room1").socketsJoin("room2"); + await sleep(); + expect(serverSockets[0].rooms.has("room2")).to.be(true); expect(serverSockets[1].rooms.has("room2")).to.be(false); expect(serverSockets[2].rooms.has("room2")).to.be(true); @@ -275,6 +279,8 @@ describe("cluster adapter", () => { servers[0].socketsLeave("room1"); + await sleep(); + expect(serverSockets[0].rooms.has("room1")).to.be(false); expect(serverSockets[1].rooms.has("room1")).to.be(false); expect(serverSockets[2].rooms.has("room1")).to.be(false); @@ -287,6 +293,8 @@ describe("cluster adapter", () => { servers[0].in("room1").socketsLeave("room2"); + await sleep(); + expect(serverSockets[0].rooms.has("room2")).to.be(false); expect(serverSockets[1].rooms.has("room2")).to.be(false); expect(serverSockets[2].rooms.has("room2")).to.be(true); @@ -318,6 +326,22 @@ describe("cluster adapter", () => { servers[0].disconnectSockets(true); }); + + it("sends a packet before all socket instances disconnect", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", shouldNotHappen(done)); + + clientSocket.on("bye", () => { + clientSocket.off("disconnect"); + clientSocket.on("disconnect", partialDone); + }); + }); + + servers[0].emit("bye"); + servers[0].disconnectSockets(true); + }); }); describe("fetchSockets", () => { diff --git a/test/util.ts b/test/util.ts index 8109ef1..ecdf6df 100644 --- a/test/util.ts +++ b/test/util.ts @@ -13,3 +13,7 @@ export function times(count: number, fn: () => void) { export function shouldNotHappen(done) { return () => done(new Error("should not happen")); } + +export function sleep() { + return new Promise((resolve) => process.nextTick(resolve)); +}