From 93e28198ec85e595444b8fcb522d930bca67c5aa Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 17 May 2023 14:36:15 +0200 Subject: [PATCH 01/11] docs: fix usage example Reference: https://github.com/redis/node-redis/blob/master/docs/client-configuration.md Related: https://github.com/socketio/socket.io-redis-streams-adapter/issues/5 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2a9cedb..5e94008 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ import { createClient } from "redis"; import { Server } from "socket.io"; import { createAdapter } from "@socket.io/redis-streams-adapter"; -const redisClient = createClient({ host: "localhost", port: 6379 }); +const redisClient = createClient({ url: "redis://localhost:6379" }); await redisClient.connect(); From 65d0539eb7eb9649845ee744a4de4e0646fa194e Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Tue, 13 Feb 2024 15:06:34 +0100 Subject: [PATCH 02/11] test: run tests against a Redis cluster Related: - https://github.com/socketio/socket.io-redis-streams-adapter/issues/3 - https://github.com/socketio/socket.io-redis-streams-adapter/issues/10 --- .github/workflows/ci.yml | 10 +++- compose.yaml | 10 ++++ docker-compose.yml | 5 -- package-lock.json | 98 ++++++++++++++++++++-------------------- package.json | 4 +- test/util.ts | 49 ++++++++++++++++++-- 6 files changed, 116 insertions(+), 60 deletions(-) create mode 100644 compose.yaml delete mode 100644 docker-compose.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index eab5ee5..566a07a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,15 @@ jobs: --health-retries 5 ports: - 6379:6379 - + redis-cluster: + image: grokzen/redis-cluster:7.0.10 + options: >- + --health-cmd "redis-cli -p 7005 ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - "7000-7005:7000-7005" steps: - name: Checkout repository uses: actions/checkout@v3 diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..e0b0417 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,10 @@ +services: + redis: + image: redis:5 + ports: + - "6379:6379" + + redis-cluster: + image: grokzen/redis-cluster:7.0.10 + ports: + - "7000-7005:7000-7005" diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index fdf5d8e..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,5 +0,0 @@ -services: - redis: - image: redis:5 - ports: - - "6379:6379" diff --git a/package-lock.json b/package-lock.json index b0cd291..d32e21c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@socket.io/redis-streams-adapter", - "version": "0.0.1", + "version": "0.1.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@socket.io/redis-streams-adapter", - "version": "0.0.1", + "version": "0.1.0", "license": "MIT", "dependencies": { "@msgpack/msgpack": "~2.8.0", @@ -27,7 +27,7 @@ "typescript": "^4.9.5" }, "engines": { - "node": ">=12.0.0" + "node": ">=14.0.0" }, "peerDependencies": { "socket.io-adapter": "^2.5.2" @@ -346,9 +346,9 @@ } }, "node_modules/@redis/client": { - "version": "1.5.6", - "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.6.tgz", - "integrity": "sha512-dFD1S6je+A47Lj22jN/upVU2fj4huR7S9APd7/ziUXsIXDL+11GPYti4Suv5y8FuXaN+0ZG4JF+y1houEJ7ToA==", + "version": "1.5.14", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.14.tgz", + "integrity": "sha512-YGn0GqsRBFUQxklhY7v562VMOP0DcmlrHHs3IV1mFE3cbxe31IITUkqhBcIhVSI/2JqtWAJXg5mjV4aU+zD0HA==", "dev": true, "dependencies": { "cluster-key-slot": "1.1.2", @@ -360,36 +360,36 @@ } }, "node_modules/@redis/graph": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz", - "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==", + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.1.tgz", + "integrity": "sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==", "dev": true, "peerDependencies": { "@redis/client": "^1.0.0" } }, "node_modules/@redis/json": { - "version": "1.0.4", - "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.4.tgz", - "integrity": "sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==", + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.6.tgz", + "integrity": "sha512-rcZO3bfQbm2zPRpqo82XbW8zg4G/w4W3tI7X8Mqleq9goQjAGLL7q/1n1ZX4dXEAmORVZ4s1+uKLaUOg7LrUhw==", "dev": true, "peerDependencies": { "@redis/client": "^1.0.0" } }, "node_modules/@redis/search": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.2.tgz", - "integrity": "sha512-/cMfstG/fOh/SsE+4/BQGeuH/JJloeWuH+qJzM8dbxuWvdWibWAOAHHCZTMPhV3xIlH4/cUEIA8OV5QnYpaVoA==", + "version": "1.1.6", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.6.tgz", + "integrity": "sha512-mZXCxbTYKBQ3M2lZnEddwEAks0Kc7nauire8q20oA0oA/LoA+E/b5Y5KZn232ztPb1FkIGqo12vh3Lf+Vw5iTw==", "dev": true, "peerDependencies": { "@redis/client": "^1.0.0" } }, "node_modules/@redis/time-series": { - "version": "1.0.4", - "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.4.tgz", - "integrity": "sha512-ThUIgo2U/g7cCuZavucQTQzA9g9JbDDY2f64u3AbAoz/8vE2lt2U37LamDUVChhaDA3IRT9R6VvJwqnUfTJzng==", + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz", + "integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==", "dev": true, "peerDependencies": { "@redis/client": "^1.0.0" @@ -2201,17 +2201,17 @@ } }, "node_modules/redis": { - "version": "4.6.5", - "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.5.tgz", - "integrity": "sha512-O0OWA36gDQbswOdUuAhRL6mTZpHFN525HlgZgDaVNgCJIAZR3ya06NTESb0R+TUZ+BFaDpz6NnnVvoMx9meUFg==", + "version": "4.6.13", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.13.tgz", + "integrity": "sha512-MHgkS4B+sPjCXpf+HfdetBwbRz6vCtsceTmw1pHNYJAsYxrfpOP6dz+piJWGos8wqG7qb3vj/Rrc5qOlmInUuA==", "dev": true, "dependencies": { "@redis/bloom": "1.2.0", - "@redis/client": "1.5.6", - "@redis/graph": "1.1.0", - "@redis/json": "1.0.4", - "@redis/search": "1.1.2", - "@redis/time-series": "1.0.4" + "@redis/client": "1.5.14", + "@redis/graph": "1.1.1", + "@redis/json": "1.0.6", + "@redis/search": "1.1.6", + "@redis/time-series": "1.0.5" } }, "node_modules/release-zalgo": { @@ -3137,9 +3137,9 @@ "requires": {} }, "@redis/client": { - "version": "1.5.6", - "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.6.tgz", - "integrity": "sha512-dFD1S6je+A47Lj22jN/upVU2fj4huR7S9APd7/ziUXsIXDL+11GPYti4Suv5y8FuXaN+0ZG4JF+y1houEJ7ToA==", + "version": "1.5.14", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.14.tgz", + "integrity": "sha512-YGn0GqsRBFUQxklhY7v562VMOP0DcmlrHHs3IV1mFE3cbxe31IITUkqhBcIhVSI/2JqtWAJXg5mjV4aU+zD0HA==", "dev": true, "requires": { "cluster-key-slot": "1.1.2", @@ -3148,30 +3148,30 @@ } }, "@redis/graph": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz", - "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==", + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.1.tgz", + "integrity": "sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==", "dev": true, "requires": {} }, "@redis/json": { - "version": "1.0.4", - "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.4.tgz", - "integrity": "sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==", + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.6.tgz", + "integrity": "sha512-rcZO3bfQbm2zPRpqo82XbW8zg4G/w4W3tI7X8Mqleq9goQjAGLL7q/1n1ZX4dXEAmORVZ4s1+uKLaUOg7LrUhw==", "dev": true, "requires": {} }, "@redis/search": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.2.tgz", - "integrity": "sha512-/cMfstG/fOh/SsE+4/BQGeuH/JJloeWuH+qJzM8dbxuWvdWibWAOAHHCZTMPhV3xIlH4/cUEIA8OV5QnYpaVoA==", + "version": "1.1.6", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.6.tgz", + "integrity": "sha512-mZXCxbTYKBQ3M2lZnEddwEAks0Kc7nauire8q20oA0oA/LoA+E/b5Y5KZn232ztPb1FkIGqo12vh3Lf+Vw5iTw==", "dev": true, "requires": {} }, "@redis/time-series": { - "version": "1.0.4", - "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.4.tgz", - "integrity": "sha512-ThUIgo2U/g7cCuZavucQTQzA9g9JbDDY2f64u3AbAoz/8vE2lt2U37LamDUVChhaDA3IRT9R6VvJwqnUfTJzng==", + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz", + "integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==", "dev": true, "requires": {} }, @@ -4528,17 +4528,17 @@ } }, "redis": { - "version": "4.6.5", - "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.5.tgz", - "integrity": "sha512-O0OWA36gDQbswOdUuAhRL6mTZpHFN525HlgZgDaVNgCJIAZR3ya06NTESb0R+TUZ+BFaDpz6NnnVvoMx9meUFg==", + "version": "4.6.13", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.13.tgz", + "integrity": "sha512-MHgkS4B+sPjCXpf+HfdetBwbRz6vCtsceTmw1pHNYJAsYxrfpOP6dz+piJWGos8wqG7qb3vj/Rrc5qOlmInUuA==", "dev": true, "requires": { "@redis/bloom": "1.2.0", - "@redis/client": "1.5.6", - "@redis/graph": "1.1.0", - "@redis/json": "1.0.4", - "@redis/search": "1.1.2", - "@redis/time-series": "1.0.4" + "@redis/client": "1.5.14", + "@redis/graph": "1.1.1", + "@redis/json": "1.0.6", + "@redis/search": "1.1.6", + "@redis/time-series": "1.0.5" } }, "release-zalgo": { diff --git a/package.json b/package.json index 7bcf204..dbca8b4 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,9 @@ "format:check": "prettier --parser typescript --check lib/**/*.ts test/**/*.ts", "format:fix": "prettier --parser typescript --write lib/**/*.ts test/**/*.ts", "prepack": "npm run compile", - "test": "npm run format:check && npm run compile && nyc mocha --bail --require ts-node/register test/**/*.ts" + "test": "npm run format:check && npm run compile && npm run test:redis-standalone && npm run test:redis-cluster", + "test:redis-standalone": "nyc mocha --require ts-node/register test/**/*.ts", + "test:redis-cluster": "REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts" }, "dependencies": { "@msgpack/msgpack": "~2.8.0", diff --git a/test/util.ts b/test/util.ts index 1cdcef0..82f018a 100644 --- a/test/util.ts +++ b/test/util.ts @@ -1,7 +1,7 @@ import { Server } from "socket.io"; import { Socket as ServerSocket } from "socket.io/dist/socket"; import { io as ioc, Socket as ClientSocket } from "socket.io-client"; -import { createClient } from "redis"; +import { createClient, createCluster } from "redis"; import { createServer } from "http"; import { createAdapter } from "../lib"; import { AddressInfo } from "net"; @@ -31,6 +31,49 @@ interface TestContext { cleanup: () => void; } +if (process.env.REDIS_CLUSTER === "1") { + console.log("[INFO] testing in cluster mode"); +} else { + console.log("[INFO] testing in standalone mode"); +} + +async function initRedisClient() { + if (process.env.REDIS_CLUSTER === "1") { + const redisClient = createCluster({ + rootNodes: [ + { + url: "redis://localhost:7000", + }, + { + url: "redis://localhost:7001", + }, + { + url: "redis://localhost:7002", + }, + { + url: "redis://localhost:7003", + }, + { + url: "redis://localhost:7004", + }, + { + url: "redis://localhost:7005", + }, + ], + }); + + await redisClient.connect(); + + return redisClient; + } else { + const redisClient = createClient(); + + await redisClient.connect(); + + return redisClient; + } +} + export function setup() { const servers = []; const serverSockets = []; @@ -39,9 +82,7 @@ export function setup() { return new Promise(async (resolve) => { for (let i = 1; i <= NODES_COUNT; i++) { - const redisClient = createClient(); - - await redisClient.connect(); + const redisClient = await initRedisClient(); const httpServer = createServer(); const io = new Server(httpServer, { From 58faa1d9c8cf87282b8643ed0c1aa79322b81852 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Tue, 13 Feb 2024 17:36:01 +0100 Subject: [PATCH 03/11] feat: add support for the ioredis package Related: https://github.com/socketio/socket.io-redis-streams-adapter/issues/2 --- lib/adapter.ts | 42 ++++++-------- lib/util.ts | 91 ++++++++++++++++++++++++++++++ package-lock.json | 141 ++++++++++++++++++++++++++++++++++++++++++++++ package.json | 7 ++- test/util.ts | 73 +++++++++++++++++------- 5 files changed, 306 insertions(+), 48 deletions(-) diff --git a/lib/adapter.ts b/lib/adapter.ts index 6b32476..70a2bb7 100644 --- a/lib/adapter.ts +++ b/lib/adapter.ts @@ -1,6 +1,5 @@ import type { PrivateSessionId, Session } from "socket.io-adapter"; import { decode, encode } from "@msgpack/msgpack"; -import { commandOptions } from "redis"; import { ClusterAdapter, ClusterAdapterOptions, @@ -8,7 +7,7 @@ import { MessageType, } from "./cluster-adapter"; import debugModule from "debug"; -import { hasBinary } from "./util"; +import { hasBinary, XADD, XREAD } from "./util"; const debug = debugModule("socket.io-redis-streams-adapter"); @@ -61,23 +60,15 @@ export function createAdapter( ); let offset = "$"; let polling = false; + let shouldClose = false; async function poll() { try { - let response = await redisClient.xRead( - commandOptions({ - isolated: true, - }), - [ - { - key: options.streamName, - id: offset, - }, - ], - { - COUNT: options.readCount, - BLOCK: 5000, - } + let response = await XREAD( + redisClient, + options.streamName, + offset, + options.readCount ); if (response) { @@ -98,7 +89,7 @@ export function createAdapter( debug("something went wrong while consuming the stream: %s", e.message); } - if (namespaceToAdapters.size > 0 && redisClient.isOpen) { + if (namespaceToAdapters.size > 0 && !shouldClose) { poll(); } else { polling = false; @@ -111,6 +102,7 @@ export function createAdapter( if (!polling) { polling = true; + shouldClose = false; poll(); } @@ -119,6 +111,10 @@ export function createAdapter( adapter.close = () => { namespaceToAdapters.delete(nsp.name); + if (namespaceToAdapters.size === 0) { + shouldClose = true; + } + defaultClose.call(adapter); }; @@ -141,17 +137,11 @@ class RedisStreamsAdapter extends ClusterAdapter { override doPublish(message: ClusterMessage) { debug("publishing %o", message); - return this.#redisClient.xAdd( + return XADD( + this.#redisClient, this.#opts.streamName, - "*", RedisStreamsAdapter.encode(message), - { - TRIM: { - strategy: "MAXLEN", - strategyModifier: "~", - threshold: this.#opts.maxLen, - }, - } + this.#opts.maxLen ); } diff --git a/lib/util.ts b/lib/util.ts index 03c97e0..a86fcbb 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -1,4 +1,5 @@ import { randomBytes } from "crypto"; +import { commandOptions } from "redis"; export function hasBinary(obj: any, toJSON?: boolean): boolean { if (!obj || typeof obj !== "object") { @@ -34,3 +35,93 @@ export function hasBinary(obj: any, toJSON?: boolean): boolean { export function randomId() { return randomBytes(8).toString("hex"); } + +/** + * Whether the client comes from the `redis` package + * + * @param redisClient + * + * @see https://github.com/redis/node-redis + */ +function isRedisV4Client(redisClient: any) { + return typeof redisClient.sSubscribe === "function"; +} + +/** + * @see https://redis.io/commands/xread/ + */ +export function XREAD( + redisClient: any, + streamName: string, + offset: string, + readCount: number +) { + if (isRedisV4Client(redisClient)) { + return redisClient.xRead( + commandOptions({ + isolated: true, + }), + [ + { + key: streamName, + id: offset, + }, + ], + { + COUNT: readCount, + BLOCK: 5000, + } + ); + } else { + return redisClient + .xread("BLOCK", 100, "COUNT", readCount, "STREAMS", streamName, offset) + .then((results) => { + if (results === null) { + return null; + } + return [ + { + messages: results[0][1].map((result) => { + const id = result[0]; + const inlineValues = result[1]; + const message = {}; + for (let i = 0; i < inlineValues.length; i += 2) { + message[inlineValues[i]] = inlineValues[i + 1]; + } + return { + id, + message, + }; + }), + }, + ]; + }); + } +} + +/** + * @see https://redis.io/commands/xadd/ + */ +export function XADD( + redisClient: any, + streamName: string, + payload: any, + maxLenThreshold: number +) { + if (isRedisV4Client(redisClient)) { + return redisClient.xAdd(streamName, "*", payload, { + TRIM: { + strategy: "MAXLEN", + strategyModifier: "~", + threshold: maxLenThreshold, + }, + }); + } else { + const args = [streamName, "MAXLEN", "~", maxLenThreshold, "*"]; + Object.keys(payload).forEach((k) => { + args.push(k, payload[k]); + }); + + return redisClient.xadd.call(redisClient, args); + } +} diff --git a/package-lock.json b/package-lock.json index d32e21c..3bbe63a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,6 +17,7 @@ "@types/mocha": "^8.2.1", "@types/node": "^18.15.11", "expect.js": "0.3.1", + "ioredis": "^5.3.2", "mocha": "^10.1.0", "nyc": "^15.1.0", "prettier": "^2.8.7", @@ -278,6 +279,12 @@ "node": ">=12" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "dev": true + }, "node_modules/@istanbuljs/load-nyc-config": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz", @@ -863,6 +870,15 @@ "node": ">=8" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "dev": true, + "engines": { + "node": ">=0.10" + } + }, "node_modules/diff": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/diff/-/diff-5.0.0.tgz", @@ -1243,6 +1259,30 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "dev": true }, + "node_modules/ioredis": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", + "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", + "dev": true, + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/is-binary-path": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/is-binary-path/-/is-binary-path-2.1.0.tgz", @@ -1534,12 +1574,24 @@ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", "dev": true }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "dev": true + }, "node_modules/lodash.flattendeep": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/lodash.flattendeep/-/lodash.flattendeep-4.4.0.tgz", "integrity": "sha1-+wMJF/hqMTTlvJvsDWngAT3f7bI=", "dev": true }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "dev": true + }, "node_modules/log-symbols": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-4.1.0.tgz", @@ -2214,6 +2266,27 @@ "@redis/time-series": "1.0.5" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "dev": true, + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dev": true, + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/release-zalgo": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/release-zalgo/-/release-zalgo-1.0.0.tgz", @@ -2420,6 +2493,12 @@ "integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=", "dev": true }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "dev": true + }, "node_modules/string-width": { "version": "4.2.0", "resolved": "https://registry.npmjs.org/string-width/-/string-width-4.2.0.tgz", @@ -3083,6 +3162,12 @@ "@jridgewell/trace-mapping": "0.3.9" } }, + "@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "dev": true + }, "@istanbuljs/load-nyc-config": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz", @@ -3551,6 +3636,12 @@ "strip-bom": "^4.0.0" } }, + "denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "dev": true + }, "diff": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/diff/-/diff-5.0.0.tgz", @@ -3825,6 +3916,23 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "dev": true }, + "ioredis": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", + "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", + "dev": true, + "requires": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + } + }, "is-binary-path": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/is-binary-path/-/is-binary-path-2.1.0.tgz", @@ -4042,12 +4150,24 @@ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", "dev": true }, + "lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "dev": true + }, "lodash.flattendeep": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/lodash.flattendeep/-/lodash.flattendeep-4.4.0.tgz", "integrity": "sha1-+wMJF/hqMTTlvJvsDWngAT3f7bI=", "dev": true }, + "lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "dev": true + }, "log-symbols": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-4.1.0.tgz", @@ -4541,6 +4661,21 @@ "@redis/time-series": "1.0.5" } }, + "redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "dev": true + }, + "redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dev": true, + "requires": { + "redis-errors": "^1.0.0" + } + }, "release-zalgo": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/release-zalgo/-/release-zalgo-1.0.0.tgz", @@ -4705,6 +4840,12 @@ "integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=", "dev": true }, + "standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "dev": true + }, "string-width": { "version": "4.2.0", "resolved": "https://registry.npmjs.org/string-width/-/string-width-4.2.0.tgz", diff --git a/package.json b/package.json index dbca8b4..6e98066 100644 --- a/package.json +++ b/package.json @@ -17,9 +17,11 @@ "format:check": "prettier --parser typescript --check lib/**/*.ts test/**/*.ts", "format:fix": "prettier --parser typescript --write lib/**/*.ts test/**/*.ts", "prepack": "npm run compile", - "test": "npm run format:check && npm run compile && npm run test:redis-standalone && npm run test:redis-cluster", + "test": "npm run format:check && npm run compile && npm run test:redis-standalone && npm run test:redis-cluster && npm run test:ioredis-standalone && npm run test:ioredis-cluster", "test:redis-standalone": "nyc mocha --require ts-node/register test/**/*.ts", - "test:redis-cluster": "REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts" + "test:redis-cluster": "REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts", + "test:ioredis-standalone": "REDIS_LIB=ioredis mocha --require ts-node/register test/**/*.ts", + "test:ioredis-cluster": "REDIS_LIB=ioredis REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts" }, "dependencies": { "@msgpack/msgpack": "~2.8.0", @@ -33,6 +35,7 @@ "@types/mocha": "^8.2.1", "@types/node": "^18.15.11", "expect.js": "0.3.1", + "ioredis": "^5.3.2", "mocha": "^10.1.0", "nyc": "^15.1.0", "prettier": "^2.8.7", diff --git a/test/util.ts b/test/util.ts index 82f018a..7b4dd73 100644 --- a/test/util.ts +++ b/test/util.ts @@ -2,6 +2,7 @@ import { Server } from "socket.io"; import { Socket as ServerSocket } from "socket.io/dist/socket"; import { io as ioc, Socket as ClientSocket } from "socket.io-client"; import { createClient, createCluster } from "redis"; +import { Redis, Cluster } from "ioredis"; import { createServer } from "http"; import { createAdapter } from "../lib"; import { AddressInfo } from "net"; @@ -31,46 +32,78 @@ interface TestContext { cleanup: () => void; } -if (process.env.REDIS_CLUSTER === "1") { - console.log("[INFO] testing in cluster mode"); -} else { - console.log("[INFO] testing in standalone mode"); -} +const mode = process.env.REDIS_CLUSTER === "1" ? "cluster" : "standalone"; +const lib = process.env.REDIS_LIB || "redis"; + +console.log(`[INFO] testing in ${mode} mode with ${lib}`); async function initRedisClient() { if (process.env.REDIS_CLUSTER === "1") { - const redisClient = createCluster({ - rootNodes: [ + if (process.env.REDIS_LIB === "ioredis") { + return new Cluster([ { - url: "redis://localhost:7000", + host: "localhost", + port: 7000, }, { - url: "redis://localhost:7001", + host: "localhost", + port: 7001, }, { - url: "redis://localhost:7002", + host: "localhost", + port: 7002, }, { - url: "redis://localhost:7003", + host: "localhost", + port: 7003, }, { - url: "redis://localhost:7004", + host: "localhost", + port: 7004, }, { - url: "redis://localhost:7005", + host: "localhost", + port: 7005, }, - ], - }); + ]); + } else { + const redisClient = createCluster({ + rootNodes: [ + { + url: "redis://localhost:7000", + }, + { + url: "redis://localhost:7001", + }, + { + url: "redis://localhost:7002", + }, + { + url: "redis://localhost:7003", + }, + { + url: "redis://localhost:7004", + }, + { + url: "redis://localhost:7005", + }, + ], + }); - await redisClient.connect(); + await redisClient.connect(); - return redisClient; + return redisClient; + } } else { - const redisClient = createClient(); + if (process.env.REDIS_LIB === "ioredis") { + return new Redis(); + } else { + const redisClient = createClient(); - await redisClient.connect(); + await redisClient.connect(); - return redisClient; + return redisClient; + } } } From 3b163d2964e3f3ffa33fb12b0addbeee15f05429 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Tue, 13 Feb 2024 17:45:02 +0100 Subject: [PATCH 04/11] ci: add Node.js 20 in the test matrix Reference: https://github.com/nodejs/Release --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 566a07a..bceb766 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: matrix: node-version: - 14 - - 18 + - 20 services: redis: From 727876db0033c23006d835bad3f2e051b8e341ac Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Tue, 13 Feb 2024 17:45:55 +0100 Subject: [PATCH 05/11] ci: upgrade to actions/checkout@4 and actions/setup-node@4 Reference: https://github.blog/changelog/2023-09-22-github-actions-transitioning-from-node-16-to-node-20/ --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bceb766..b88b958 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -40,10 +40,10 @@ jobs: - "7000-7005:7000-7005" steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v3 + uses: actions/setup-node@v4 with: node-version: ${{ matrix.node-version }} From e75c8c4383bfec634684ba107a88b3c3248b0c41 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Tue, 13 Feb 2024 18:03:20 +0100 Subject: [PATCH 06/11] chore: make tests work on Windows Related: https://github.com/socketio/socket.io-redis-streams-adapter/issues/10 --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 6e98066..f45e3e0 100644 --- a/package.json +++ b/package.json @@ -14,8 +14,8 @@ "types": "./dist/index.d.ts", "scripts": { "compile": "tsc", - "format:check": "prettier --parser typescript --check lib/**/*.ts test/**/*.ts", - "format:fix": "prettier --parser typescript --write lib/**/*.ts test/**/*.ts", + "format:check": "prettier --parser typescript --check \"lib/**/*.ts\" \"test/**/*.ts\"", + "format:fix": "prettier --parser typescript --write \"lib/**/*.ts\" \"test/**/*.ts\"", "prepack": "npm run compile", "test": "npm run format:check && npm run compile && npm run test:redis-standalone && npm run test:redis-cluster && npm run test:ioredis-standalone && npm run test:ioredis-cluster", "test:redis-standalone": "nyc mocha --require ts-node/register test/**/*.ts", From 310026e8551010844ae87aadb93ac7f8d19b191b Mon Sep 17 00:00:00 2001 From: Giulio Pulina Date: Fri, 16 Feb 2024 10:43:10 +0100 Subject: [PATCH 07/11] chore: fix configuration of Prettier and test env variables (#12) Related: https://github.com/socketio/socket.io-redis-streams-adapter/issues/10 --- package-lock.json | 28 ++++++++++++++++++++++++++++ package.json | 10 +++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3bbe63a..3e0c7c1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "@types/expect.js": "^0.3.29", "@types/mocha": "^8.2.1", "@types/node": "^18.15.11", + "cross-env": "7.0.3", "expect.js": "0.3.1", "ioredis": "^5.3.2", "mocha": "^10.1.0", @@ -819,6 +820,24 @@ "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "dev": true }, + "node_modules/cross-env": { + "version": "7.0.3", + "resolved": "https://registry.npmjs.org/cross-env/-/cross-env-7.0.3.tgz", + "integrity": "sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==", + "dev": true, + "dependencies": { + "cross-spawn": "^7.0.1" + }, + "bin": { + "cross-env": "src/bin/cross-env.js", + "cross-env-shell": "src/bin/cross-env-shell.js" + }, + "engines": { + "node": ">=10.14", + "npm": ">=6", + "yarn": ">=1" + } + }, "node_modules/cross-spawn": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", @@ -3602,6 +3621,15 @@ "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "dev": true }, + "cross-env": { + "version": "7.0.3", + "resolved": "https://registry.npmjs.org/cross-env/-/cross-env-7.0.3.tgz", + "integrity": "sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==", + "dev": true, + "requires": { + "cross-spawn": "^7.0.1" + } + }, "cross-spawn": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", diff --git a/package.json b/package.json index f45e3e0..8ad6de1 100644 --- a/package.json +++ b/package.json @@ -19,9 +19,12 @@ "prepack": "npm run compile", "test": "npm run format:check && npm run compile && npm run test:redis-standalone && npm run test:redis-cluster && npm run test:ioredis-standalone && npm run test:ioredis-cluster", "test:redis-standalone": "nyc mocha --require ts-node/register test/**/*.ts", - "test:redis-cluster": "REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts", - "test:ioredis-standalone": "REDIS_LIB=ioredis mocha --require ts-node/register test/**/*.ts", - "test:ioredis-cluster": "REDIS_LIB=ioredis REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts" + "test:redis-cluster": "cross-env REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts", + "test:ioredis-standalone": "cross-env REDIS_LIB=ioredis mocha --require ts-node/register test/**/*.ts", + "test:ioredis-cluster": "cross-env REDIS_LIB=ioredis REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts" + }, + "prettier": { + "endOfLine": "auto" }, "dependencies": { "@msgpack/msgpack": "~2.8.0", @@ -34,6 +37,7 @@ "@types/expect.js": "^0.3.29", "@types/mocha": "^8.2.1", "@types/node": "^18.15.11", + "cross-env": "7.0.3", "expect.js": "0.3.1", "ioredis": "^5.3.2", "mocha": "^10.1.0", From 44e10aec084113fc7066196e32bd5d5c95eea2d0 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 21 Feb 2024 14:42:51 +0100 Subject: [PATCH 08/11] test: increase timeout The tests were randomly failing in the CI. It might be interesting to have some kind of acknowledgement for the socketsJoin() method, something like: ```js const count = await io.socketsJoin("room123"); ``` --- test/socketsJoin.ts | 6 +++--- test/socketsLeave.ts | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/socketsJoin.ts b/test/socketsJoin.ts index 60b22a7..2248d1e 100644 --- a/test/socketsJoin.ts +++ b/test/socketsJoin.ts @@ -17,7 +17,7 @@ describe("socketsJoin()", () => { it("makes all socket instances join the specified room", async () => { servers[0].socketsJoin("room1"); - await sleep(200); + await sleep(300); expect(serverSockets[0].rooms.has("room1")).to.be(true); expect(serverSockets[1].rooms.has("room1")).to.be(true); @@ -30,7 +30,7 @@ describe("socketsJoin()", () => { servers[0].in("room1").socketsJoin("room2"); - await sleep(200); + await sleep(300); expect(serverSockets[0].rooms.has("room2")).to.be(true); expect(serverSockets[1].rooms.has("room2")).to.be(false); @@ -40,7 +40,7 @@ describe("socketsJoin()", () => { it("makes the given socket instance join the specified room", async () => { servers[0].in(serverSockets[1].id).socketsJoin("room3"); - await sleep(200); + await sleep(300); expect(serverSockets[0].rooms.has("room3")).to.be(false); expect(serverSockets[1].rooms.has("room3")).to.be(true); diff --git a/test/socketsLeave.ts b/test/socketsLeave.ts index a8904ca..13be4fd 100644 --- a/test/socketsLeave.ts +++ b/test/socketsLeave.ts @@ -20,7 +20,7 @@ describe("socketsLeave()", () => { servers[0].socketsLeave("room1"); - await sleep(200); + await sleep(300); expect(serverSockets[0].rooms.has("room1")).to.be(false); expect(serverSockets[1].rooms.has("room1")).to.be(false); @@ -34,7 +34,7 @@ describe("socketsLeave()", () => { servers[0].in("room1").socketsLeave("room2"); - await sleep(200); + await sleep(300); expect(serverSockets[0].rooms.has("room2")).to.be(false); expect(serverSockets[1].rooms.has("room2")).to.be(false); @@ -48,7 +48,7 @@ describe("socketsLeave()", () => { servers[0].in(serverSockets[1].id).socketsLeave("room3"); - await sleep(200); + await sleep(300); expect(serverSockets[0].rooms.has("room3")).to.be(true); expect(serverSockets[1].rooms.has("room3")).to.be(false); From 8ff0413a1858e6a76675be35be28b616be3f5504 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 21 Feb 2024 14:30:46 +0100 Subject: [PATCH 09/11] refactor: use the ClusterAdapter class from socket.io-adapter package The ClusterAdapter class has been moved to [1], so that this adapter only needs to implement to pub/sub mechanism. Also, [2] should reduce the number of "timeout reached: only x responses received out of y" errors, since the fetchSockets() requests will now succeed even if a server leaves the cluster. [1]: https://github.com/socketio/socket.io-adapter [2]: https://github.com/socketio/socket.io-adapter/commit/0e23ff0cc671e3186510f7cfb8a4c1147457296f Related: https://github.com/socketio/socket.io-redis-streams-adapter/issues/6 --- lib/adapter.ts | 69 +++-- lib/cluster-adapter.ts | 556 ----------------------------------------- package-lock.json | 97 ++++--- package.json | 2 +- test/serverSideEmit.ts | 25 +- 5 files changed, 134 insertions(+), 615 deletions(-) delete mode 100644 lib/cluster-adapter.ts diff --git a/lib/adapter.ts b/lib/adapter.ts index 70a2bb7..b6b74ac 100644 --- a/lib/adapter.ts +++ b/lib/adapter.ts @@ -1,11 +1,12 @@ -import type { PrivateSessionId, Session } from "socket.io-adapter"; -import { decode, encode } from "@msgpack/msgpack"; import { - ClusterAdapter, - ClusterAdapterOptions, - ClusterMessage, - MessageType, -} from "./cluster-adapter"; + ClusterAdapterWithHeartbeat, + type ClusterMessage, + type PrivateSessionId, + type Session, + type ServerId, + type ClusterResponse, +} from "socket.io-adapter"; +import { decode, encode } from "@msgpack/msgpack"; import debugModule from "debug"; import { hasBinary, XADD, XREAD } from "./util"; @@ -13,7 +14,21 @@ const debug = debugModule("socket.io-redis-streams-adapter"); const RESTORE_SESSION_MAX_XRANGE_CALLS = 100; -export interface RedisStreamsAdapterOptions extends ClusterAdapterOptions { +// TODO ClusterAdapterOptions should be exported by the socket.io-adapter package +interface ClusterAdapterOptions { + /** + * The number of ms between two heartbeats. + * @default 5_000 + */ + heartbeatInterval?: number; + /** + * The number of ms without heartbeat before we consider a node down. + * @default 10_000 + */ + heartbeatTimeout?: number; +} + +export interface RedisStreamsAdapterOptions { /** * The name of the Redis stream. */ @@ -45,7 +60,7 @@ interface RawClusterMessage { */ export function createAdapter( redisClient: any, - opts?: RedisStreamsAdapterOptions + opts?: RedisStreamsAdapterOptions & ClusterAdapterOptions ) { const namespaceToAdapters = new Map(); const options = Object.assign( @@ -122,16 +137,20 @@ export function createAdapter( }; } -class RedisStreamsAdapter extends ClusterAdapter { +class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat { readonly #redisClient: any; readonly #opts: Required; - constructor(nsp, redisClient, opts: Required) { + constructor( + nsp, + redisClient, + opts: Required & ClusterAdapterOptions + ) { super(nsp, opts); this.#redisClient = redisClient; this.#opts = opts; - this.initHeartbeat(); + this.init(); } override doPublish(message: ClusterMessage) { @@ -145,6 +164,14 @@ class RedisStreamsAdapter extends ClusterAdapter { ); } + protected doPublishResponse( + requesterUid: ServerId, + response: ClusterResponse + ): Promise { + // @ts-ignore + return this.doPublish(response); + } + static encode(message: ClusterMessage): RawClusterMessage { const rawMessage: RawClusterMessage = { uid: message.uid, @@ -152,18 +179,23 @@ class RedisStreamsAdapter extends ClusterAdapter { type: message.type.toString(), }; + // @ts-ignore if (message.data) { + // TODO MessageType should be exported by the socket.io-adapter package const mayContainBinary = [ - MessageType.BROADCAST, - MessageType.BROADCAST_ACK, - MessageType.FETCH_SOCKETS_RESPONSE, - MessageType.SERVER_SIDE_EMIT, - MessageType.SERVER_SIDE_EMIT_RESPONSE, + 3, // MessageType.BROADCAST, + 8, // MessageType.FETCH_SOCKETS_RESPONSE, + 9, // MessageType.SERVER_SIDE_EMIT, + 10, // MessageType.SERVER_SIDE_EMIT_RESPONSE, + 12, // MessageType.BROADCAST_ACK, ].includes(message.type); + // @ts-ignore if (mayContainBinary && hasBinary(message.data)) { + // @ts-ignore rawMessage.data = Buffer.from(encode(message.data)).toString("base64"); } else { + // @ts-ignore rawMessage.data = JSON.stringify(message.data); } } @@ -191,8 +223,10 @@ class RedisStreamsAdapter extends ClusterAdapter { if (rawMessage.data) { if (rawMessage.data.startsWith("{")) { + // @ts-ignore message.data = JSON.parse(rawMessage.data); } else { + // @ts-ignore message.data = decode(Buffer.from(rawMessage.data, "base64")) as Record< string, unknown @@ -261,6 +295,7 @@ class RedisStreamsAdapter extends ClusterAdapter { if (entry.message.nsp === this.nsp.name && entry.message.type === "3") { const message = RedisStreamsAdapter.decode(entry.message); + // @ts-ignore if (shouldIncludePacket(session.rooms, message.data.opts)) { // @ts-ignore session.missedPackets.push(message.data.packet.data); diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts deleted file mode 100644 index 6d49b1c..0000000 --- a/lib/cluster-adapter.ts +++ /dev/null @@ -1,556 +0,0 @@ -import { Adapter, BroadcastOptions, Room } from "socket.io-adapter"; -import debugModule from "debug"; -import { randomId } from "./util"; - -const debug = debugModule("socket.io-adapter"); -const EMITTER_UID = "emitter"; -const DEFAULT_TIMEOUT = 5000; - -export interface ClusterAdapterOptions { - /** - * The number of ms between two heartbeats. - * @default 5_000 - */ - heartbeatInterval?: number; - /** - * The number of ms without heartbeat before we consider a node down. - * @default 10_000 - */ - heartbeatTimeout?: number; -} - -export enum MessageType { - INITIAL_HEARTBEAT = 1, - HEARTBEAT, - BROADCAST, - SOCKETS_JOIN, - SOCKETS_LEAVE, - DISCONNECT_SOCKETS, - FETCH_SOCKETS, - FETCH_SOCKETS_RESPONSE, - SERVER_SIDE_EMIT, - SERVER_SIDE_EMIT_RESPONSE, - BROADCAST_CLIENT_COUNT, - BROADCAST_ACK, -} - -export interface ClusterMessage { - uid: string; - nsp: string; - type: MessageType; - data?: Record; -} - -interface ClusterRequest { - type: MessageType; - resolve: Function; - timeout: NodeJS.Timeout; - expected: number; - current: number; - responses: any[]; -} - -interface ClusterAckRequest { - clientCountCallback: (clientCount: number) => void; - ack: (...args: any[]) => void; -} - -function encodeOptions(opts: BroadcastOptions) { - return { - rooms: [...opts.rooms], - except: [...opts.except], - flags: opts.flags, - }; -} - -function decodeOptions(opts): BroadcastOptions { - return { - rooms: new Set(opts.rooms), - except: new Set(opts.except), - flags: opts.flags, - }; -} - -export abstract class ClusterAdapter extends Adapter { - readonly #opts: Required; - readonly #uid: string; - - #heartbeatTimer: NodeJS.Timeout; - #nodesMap: Map = new Map(); // uid => timestamp of last message - #requests: Map = new Map(); - #ackRequests: Map = new Map(); - - protected constructor(nsp, opts: Required) { - super(nsp); - this.#opts = opts; - this.#uid = randomId(); - } - - protected initHeartbeat() { - this.#publish({ - type: MessageType.INITIAL_HEARTBEAT, - }); - } - - #scheduleHeartbeat() { - if (this.#heartbeatTimer) { - clearTimeout(this.#heartbeatTimer); - } - this.#heartbeatTimer = setTimeout(() => { - this.#publish({ - type: MessageType.HEARTBEAT, - }); - }, this.#opts.heartbeatInterval); - } - - override close(): Promise | void { - clearTimeout(this.#heartbeatTimer); - } - - public async onMessage(message: ClusterMessage, offset: string) { - if (message.uid === this.#uid) { - return debug("ignore message from self"); - } - - if (message.uid && message.uid !== EMITTER_UID) { - this.#nodesMap.set(message.uid, Date.now()); - } - - debug("new event of type %d from %s", message.type, message.uid); - - switch (message.type) { - case MessageType.INITIAL_HEARTBEAT: - this.#publish({ - type: MessageType.HEARTBEAT, - }); - break; - - case MessageType.BROADCAST: { - const withAck = message.data.requestId !== undefined; - if (withAck) { - super.broadcastWithAck( - message.data.packet, - decodeOptions(message.data.opts), - (clientCount) => { - debug("waiting for %d client acknowledgements", clientCount); - this.#publish({ - type: MessageType.BROADCAST_CLIENT_COUNT, - data: { - requestId: message.data.requestId, - clientCount, - }, - }); - }, - (arg) => { - debug("received acknowledgement with value %j", arg); - this.#publish({ - type: MessageType.BROADCAST_ACK, - data: { - requestId: message.data.requestId, - packet: arg, - }, - }); - } - ); - } else { - const packet = message.data.packet; - const opts = decodeOptions(message.data.opts); - - this.#addOffsetIfNecessary(packet, opts, offset); - - super.broadcast(packet, opts); - } - break; - } - - case MessageType.BROADCAST_CLIENT_COUNT: { - const request = this.#ackRequests.get(message.data.requestId as string); - request?.clientCountCallback(message.data.clientCount as number); - break; - } - - case MessageType.BROADCAST_ACK: { - const request = this.#ackRequests.get(message.data.requestId as string); - request?.ack(message.data.packet); - break; - } - - case MessageType.SOCKETS_JOIN: - super.addSockets( - decodeOptions(message.data.opts), - message.data.rooms as string[] - ); - break; - - case MessageType.SOCKETS_LEAVE: - super.delSockets( - decodeOptions(message.data.opts), - message.data.rooms as string[] - ); - break; - - case MessageType.DISCONNECT_SOCKETS: - super.disconnectSockets( - decodeOptions(message.data.opts), - message.data.close as boolean - ); - break; - - case MessageType.FETCH_SOCKETS: { - debug("calling fetchSockets with opts %j", message.data.opts); - const localSockets = await super.fetchSockets( - decodeOptions(message.data.opts) - ); - - this.#publish({ - type: MessageType.FETCH_SOCKETS_RESPONSE, - data: { - requestId: message.data.requestId, - sockets: localSockets.map((socket) => { - // remove sessionStore from handshake, as it may contain circular references - const { sessionStore, ...handshake } = socket.handshake; - return { - id: socket.id, - handshake, - rooms: [...socket.rooms], - data: socket.data, - }; - }), - }, - }); - break; - } - - case MessageType.FETCH_SOCKETS_RESPONSE: { - const requestId = message.data.requestId as string; - const request = this.#requests.get(requestId); - - if (!request) { - return; - } - - request.current++; - (message.data.sockets as any[]).forEach((socket) => - request.responses.push(socket) - ); - - if (request.current === request.expected) { - clearTimeout(request.timeout); - request.resolve(request.responses); - this.#requests.delete(requestId); - } - break; - } - - case MessageType.SERVER_SIDE_EMIT: { - const packet = message.data.packet as unknown[]; - const withAck = message.data.requestId !== undefined; - if (!withAck) { - this.nsp._onServerSideEmit(packet); - return; - } - let called = false; - const callback = (arg: any) => { - // only one argument is expected - if (called) { - return; - } - called = true; - debug("calling acknowledgement with %j", arg); - this.#publish({ - type: MessageType.SERVER_SIDE_EMIT_RESPONSE, - data: { - requestId: message.data.requestId, - packet: arg, - }, - }); - }; - - packet.push(callback); - this.nsp._onServerSideEmit(packet); - break; - } - - case MessageType.SERVER_SIDE_EMIT_RESPONSE: { - const requestId = message.data.requestId as string; - const request = this.#requests.get(requestId); - - if (!request) { - return; - } - - request.current++; - request.responses.push(message.data.packet); - - if (request.current === request.expected) { - clearTimeout(request.timeout); - request.resolve(null, request.responses); - this.#requests.delete(requestId); - } - } - } - } - - override async broadcast(packet: any, opts: BroadcastOptions) { - const onlyLocal = opts.flags?.local; - - if (!onlyLocal) { - try { - const offset = await this.#publish({ - type: MessageType.BROADCAST, - data: { - packet, - opts: encodeOptions(opts), - }, - }); - this.#addOffsetIfNecessary(packet, opts, offset); - } catch (e) { - return debug("error while broadcasting message: %s", e.message); - } - } - - super.broadcast(packet, opts); - } - - /** - * Adds an offset at the end of the data array in order to allow the client to receive any missed packets when it - * reconnects after a temporary disconnection. - * - * @param packet - * @param opts - * @param offset - * @private - */ - #addOffsetIfNecessary(packet: any, opts: BroadcastOptions, offset: string) { - if (!this.nsp.server.opts.connectionStateRecovery) { - return; - } - const isEventPacket = packet.type === 2; - // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and - // restored on another server upon reconnection - const withoutAcknowledgement = packet.id === undefined; - const notVolatile = opts.flags?.volatile === undefined; - - if (isEventPacket && withoutAcknowledgement && notVolatile) { - packet.data.push(offset); - } - } - - override broadcastWithAck( - packet: any, - opts: BroadcastOptions, - clientCountCallback: (clientCount: number) => void, - ack: (...args: any[]) => void - ) { - const onlyLocal = opts?.flags?.local; - if (!onlyLocal) { - const requestId = randomId(); - - this.#publish({ - type: MessageType.BROADCAST, - data: { - packet, - requestId, - opts: encodeOptions(opts), - }, - }); - - this.#ackRequests.set(requestId, { - clientCountCallback, - ack, - }); - - // we have no way to know at this level whether the server has received an acknowledgement from each client, so we - // will simply clean up the ackRequests map after the given delay - setTimeout(() => { - this.#ackRequests.delete(requestId); - }, opts.flags!.timeout); - } - - super.broadcastWithAck(packet, opts, clientCountCallback, ack); - } - - override serverCount(): Promise { - return Promise.resolve(1 + this.#nodesMap.size); - } - - /** - * - * @param opts - * @param rooms - */ - override addSockets(opts: BroadcastOptions, rooms: Room[]) { - super.addSockets(opts, rooms); - - const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; - } - - this.#publish({ - type: MessageType.SOCKETS_JOIN, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); - } - - override delSockets(opts: BroadcastOptions, rooms: Room[]) { - super.delSockets(opts, rooms); - - const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; - } - - this.#publish({ - type: MessageType.SOCKETS_LEAVE, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); - } - - override disconnectSockets(opts: BroadcastOptions, close: boolean) { - super.disconnectSockets(opts, close); - - const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; - } - - this.#publish({ - type: MessageType.DISCONNECT_SOCKETS, - data: { - opts: encodeOptions(opts), - close, - }, - }); - } - - #getExpectedResponseCount() { - this.#nodesMap.forEach((lastSeen, uid) => { - const nodeSeemsDown = Date.now() - lastSeen > this.#opts.heartbeatTimeout; - if (nodeSeemsDown) { - debug("node %s seems down", uid); - this.#nodesMap.delete(uid); - } - }); - return this.#nodesMap.size; - } - - async fetchSockets(opts: BroadcastOptions): Promise { - const localSockets = await super.fetchSockets(opts); - const expectedResponseCount = this.#getExpectedResponseCount(); - - if (opts.flags?.local || expectedResponseCount === 0) { - return localSockets; - } - - const requestId = randomId(); - - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - const storedRequest = this.#requests.get(requestId); - if (storedRequest) { - reject( - new Error( - `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` - ) - ); - this.#requests.delete(requestId); - } - }, opts.flags.timeout || DEFAULT_TIMEOUT); - - const storedRequest = { - type: MessageType.FETCH_SOCKETS, - resolve, - timeout, - current: 0, - expected: expectedResponseCount, - responses: localSockets, - }; - this.#requests.set(requestId, storedRequest); - - this.#publish({ - type: MessageType.FETCH_SOCKETS, - data: { - opts: encodeOptions(opts), - requestId, - }, - }); - }); - } - - override serverSideEmit(packet: any[]) { - const withAck = typeof packet[packet.length - 1] === "function"; - - if (!withAck) { - return this.#publish({ - type: MessageType.SERVER_SIDE_EMIT, - data: { - packet, - }, - }); - } - - const ack = packet.pop(); - const expectedResponseCount = this.#getExpectedResponseCount(); - - debug( - 'waiting for %d responses to "serverSideEmit" request', - expectedResponseCount - ); - - if (expectedResponseCount <= 0) { - return ack(null, []); - } - - const requestId = randomId(); - - const timeout = setTimeout(() => { - const storedRequest = this.#requests.get(requestId); - if (storedRequest) { - ack( - new Error( - `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` - ), - storedRequest.responses - ); - this.#requests.delete(requestId); - } - }, DEFAULT_TIMEOUT); - - const storedRequest = { - type: MessageType.SERVER_SIDE_EMIT, - resolve: ack, - timeout, - current: 0, - expected: expectedResponseCount, - responses: [], - }; - this.#requests.set(requestId, storedRequest); - - this.#publish({ - type: MessageType.SERVER_SIDE_EMIT, - data: { - requestId, // the presence of this attribute defines whether an acknowledgement is needed - packet, - }, - }); - } - - #publish(message: Omit) { - this.#scheduleHeartbeat(); - - return this.doPublish({ - uid: this.#uid, - nsp: this.nsp.name, - ...message, - }); - } - - abstract doPublish(message: ClusterMessage): Promise; -} diff --git a/package-lock.json b/package-lock.json index 3e0c7c1..4d5d2ab 100644 --- a/package-lock.json +++ b/package-lock.json @@ -440,9 +440,9 @@ "dev": true }, "node_modules/@types/cors": { - "version": "2.8.13", - "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.13.tgz", - "integrity": "sha512-RG8AStHlUiV5ysZQKq97copd2UmVYw3/pRMLefISZ3S1hK104Cwm7iLQ3fTKx+lsUH2CE8FlLaYeEA2LSeqYUA==", + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", "dev": true, "dependencies": { "@types/node": "*" @@ -914,9 +914,9 @@ "dev": true }, "node_modules/engine.io": { - "version": "6.4.1", - "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.4.1.tgz", - "integrity": "sha512-JFYQurD/nbsA5BSPmbaOSLa3tSVj8L6o4srSwXXY3NqE+gGUNmmPTbhn8tjzcCtSqhFgIeqef81ngny8JM25hw==", + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.4.tgz", + "integrity": "sha512-KdVSDKhVKyOi+r5uEabrDLZw2qXStVvCsEB/LN3mw4WFi6Gx50jTyuxYVCwAAC0U46FdnzP/ScKRBTXb/NiEOg==", "dev": true, "dependencies": { "@types/cookie": "^0.4.1", @@ -927,11 +927,11 @@ "cookie": "~0.4.1", "cors": "~2.8.5", "debug": "~4.3.1", - "engine.io-parser": "~5.0.3", + "engine.io-parser": "~5.2.1", "ws": "~8.11.0" }, "engines": { - "node": ">=10.0.0" + "node": ">=10.2.0" } }, "node_modules/engine.io-client": { @@ -956,6 +956,15 @@ "node": ">=10.0.0" } }, + "node_modules/engine.io/node_modules/engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/es6-error": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/es6-error/-/es6-error-4.1.1.tgz", @@ -2428,27 +2437,29 @@ "dev": true }, "node_modules/socket.io": { - "version": "4.6.1", - "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.6.1.tgz", - "integrity": "sha512-KMcaAi4l/8+xEjkRICl6ak8ySoxsYG+gG6/XfRCPJPQ/haCRIJBTL4wIl8YCsmtaBovcAXGLOShyVWQ/FG8GZA==", + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", "dev": true, "dependencies": { "accepts": "~1.3.4", "base64id": "~2.0.0", + "cors": "~2.8.5", "debug": "~4.3.2", - "engine.io": "~6.4.1", + "engine.io": "~6.5.2", "socket.io-adapter": "~2.5.2", - "socket.io-parser": "~4.2.1" + "socket.io-parser": "~4.2.4" }, "engines": { - "node": ">=10.0.0" + "node": ">=10.2.0" } }, "node_modules/socket.io-adapter": { - "version": "2.5.2", - "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", - "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "version": "2.5.3", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.3.tgz", + "integrity": "sha512-OtkQtynXUM0JSEwmI6YlEJ5hU9kpDUVjda0hx8QVffKhqum53xhynH8eTCyjHSfI8FiJnyfK8I3Dlc88Jr81Dg==", "dependencies": { + "debug": "~4.3.4", "ws": "~8.11.0" } }, @@ -2468,9 +2479,9 @@ } }, "node_modules/socket.io-parser": { - "version": "4.2.2", - "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.2.tgz", - "integrity": "sha512-DJtziuKypFkMMHCm2uIshOYC7QaylbtzQwiMYDuCKy3OPkjLzu4B2vAhTlqipRHHzrI0NJeBAizTK7X+6m1jVw==", + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", "dev": true, "dependencies": { "@socket.io/component-emitter": "~3.1.0", @@ -3316,9 +3327,9 @@ "dev": true }, "@types/cors": { - "version": "2.8.13", - "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.13.tgz", - "integrity": "sha512-RG8AStHlUiV5ysZQKq97copd2UmVYw3/pRMLefISZ3S1hK104Cwm7iLQ3fTKx+lsUH2CE8FlLaYeEA2LSeqYUA==", + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", "dev": true, "requires": { "@types/node": "*" @@ -3683,9 +3694,9 @@ "dev": true }, "engine.io": { - "version": "6.4.1", - "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.4.1.tgz", - "integrity": "sha512-JFYQurD/nbsA5BSPmbaOSLa3tSVj8L6o4srSwXXY3NqE+gGUNmmPTbhn8tjzcCtSqhFgIeqef81ngny8JM25hw==", + "version": "6.5.4", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.5.4.tgz", + "integrity": "sha512-KdVSDKhVKyOi+r5uEabrDLZw2qXStVvCsEB/LN3mw4WFi6Gx50jTyuxYVCwAAC0U46FdnzP/ScKRBTXb/NiEOg==", "dev": true, "requires": { "@types/cookie": "^0.4.1", @@ -3696,8 +3707,16 @@ "cookie": "~0.4.1", "cors": "~2.8.5", "debug": "~4.3.1", - "engine.io-parser": "~5.0.3", + "engine.io-parser": "~5.2.1", "ws": "~8.11.0" + }, + "dependencies": { + "engine.io-parser": { + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.2.tgz", + "integrity": "sha512-RcyUFKA93/CXH20l4SoVvzZfrSDMOTUS3bWVpTt2FuFP+XYrL8i8oonHP7WInRyVHXh0n/ORtoeiE1os+8qkSw==", + "dev": true + } } }, "engine.io-client": { @@ -4799,24 +4818,26 @@ "dev": true }, "socket.io": { - "version": "4.6.1", - "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.6.1.tgz", - "integrity": "sha512-KMcaAi4l/8+xEjkRICl6ak8ySoxsYG+gG6/XfRCPJPQ/haCRIJBTL4wIl8YCsmtaBovcAXGLOShyVWQ/FG8GZA==", + "version": "4.7.4", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", + "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", "dev": true, "requires": { "accepts": "~1.3.4", "base64id": "~2.0.0", + "cors": "~2.8.5", "debug": "~4.3.2", - "engine.io": "~6.4.1", + "engine.io": "~6.5.2", "socket.io-adapter": "~2.5.2", - "socket.io-parser": "~4.2.1" + "socket.io-parser": "~4.2.4" } }, "socket.io-adapter": { - "version": "2.5.2", - "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", - "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "version": "2.5.3", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.3.tgz", + "integrity": "sha512-OtkQtynXUM0JSEwmI6YlEJ5hU9kpDUVjda0hx8QVffKhqum53xhynH8eTCyjHSfI8FiJnyfK8I3Dlc88Jr81Dg==", "requires": { + "debug": "~4.3.4", "ws": "~8.11.0" } }, @@ -4833,9 +4854,9 @@ } }, "socket.io-parser": { - "version": "4.2.2", - "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.2.tgz", - "integrity": "sha512-DJtziuKypFkMMHCm2uIshOYC7QaylbtzQwiMYDuCKy3OPkjLzu4B2vAhTlqipRHHzrI0NJeBAizTK7X+6m1jVw==", + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", "dev": true, "requires": { "@socket.io/component-emitter": "~3.1.0", diff --git a/package.json b/package.json index 8ad6de1..0b4673c 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "debug": "~4.3.1" }, "peerDependencies": { - "socket.io-adapter": "^2.5.2" + "socket.io-adapter": "^2.5.3" }, "devDependencies": { "@types/expect.js": "^0.3.29", diff --git a/test/serverSideEmit.ts b/test/serverSideEmit.ts index ae51486..e7667bc 100644 --- a/test/serverSideEmit.ts +++ b/test/serverSideEmit.ts @@ -61,9 +61,7 @@ describe("serverSideEmit()", function () { this.timeout(6000); servers[0].serverSideEmit("hello", (err: Error, response: any) => { - expect(err.message).to.be( - "timeout reached: only 1 responses received out of 2" - ); + expect(err.message).to.be("timeout reached: missing 1 responses"); expect(response).to.be.an(Array); expect(response).to.contain(2); done(); @@ -81,4 +79,25 @@ describe("serverSideEmit()", function () { // do nothing }); }); + + it("succeeds even if an instance leaves the cluster", (done) => { + servers[0].on("hello", () => { + done(new Error("should not happen")); + }); + + servers[1].on("hello", (cb) => { + cb(2); + }); + + servers[2].on("hello", (cb) => { + servers[2].of("/").adapter.close(); + }); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + }); }); From 18985863d8eca842c59fa57e396a72c01123e61d Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 21 Feb 2024 15:03:45 +0100 Subject: [PATCH 10/11] feat: allow to modify the Redis key for the session Related: https://github.com/socketio/socket.io-redis-streams-adapter/issues/9 --- README.md | 15 ++++++++------- lib/adapter.ts | 12 ++++++++++-- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 5e94008..239e962 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,14 @@ io.listen(3000); ## Options -| Name | Description | Default value | -|---------------------|--------------------------------------------------------------------|---------------| -| `streamName` | The name of the Redis stream. | `socket.io` | -| `maxLen` | The maximum size of the stream. Almost exact trimming (~) is used. | `10_000` | -| `readCount` | The number of elements to fetch per XREAD call. | `100` | -| `heartbeatInterval` | The number of ms between two heartbeats. | `5_000` | -| `heartbeatTimeout` | The number of ms without heartbeat before we consider a node down. | `10_000` | +| Name | Description | Default value | +|---------------------|-------------------------------------------------------------------------------------------------------------------|----------------| +| `streamName` | The name of the Redis stream. | `socket.io` | +| `maxLen` | The maximum size of the stream. Almost exact trimming (~) is used. | `10_000` | +| `readCount` | The number of elements to fetch per XREAD call. | `100` | +| `sessionKeyPrefix` | The prefix of the key used to store the Socket.IO session, when the connection state recovery feature is enabled. | `sio:session:` | +| `heartbeatInterval` | The number of ms between two heartbeats. | `5_000` | +| `heartbeatTimeout` | The number of ms without heartbeat before we consider a node down. | `10_000` | ## How it works diff --git a/lib/adapter.ts b/lib/adapter.ts index b6b74ac..e28cda0 100644 --- a/lib/adapter.ts +++ b/lib/adapter.ts @@ -31,6 +31,7 @@ interface ClusterAdapterOptions { export interface RedisStreamsAdapterOptions { /** * The name of the Redis stream. + * @default "socket.io" */ streamName?: string; /** @@ -43,6 +44,11 @@ export interface RedisStreamsAdapterOptions { * @default 100 */ readCount?: number; + /** + * The prefix of the key used to store the Socket.IO session, when the connection state recovery feature is enabled. + * @default "sio:session:" + */ + sessionKeyPrefix?: string; } interface RawClusterMessage { @@ -68,6 +74,7 @@ export function createAdapter( streamName: "socket.io", maxLen: 10_000, readCount: 100, + sessionKeyPrefix: "sio:session:", heartbeatInterval: 5_000, heartbeatTimeout: 10_000, }, @@ -239,9 +246,10 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat { override persistSession(session) { debug("persisting session %o", session); + const sessionKey = this.#opts.sessionKeyPrefix + session.pid; const encodedSession = Buffer.from(encode(session)).toString("base64"); - this.#redisClient.set(`sio:session:${session.pid}`, encodedSession, { + this.#redisClient.set(sessionKey, encodedSession, { PX: this.nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration, }); } @@ -256,7 +264,7 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat { return Promise.reject("invalid offset"); } - const sessionKey = `sio:session:${pid}`; + const sessionKey = this.#opts.sessionKeyPrefix + pid; const [rawSession, offsetExists] = await this.#redisClient .multi() From 88fb39ff3efad47258c3ff5ee30abe213f74eedb Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 21 Feb 2024 15:32:37 +0100 Subject: [PATCH 11/11] chore(release): 0.2.0 Diff: https://github.com/socketio/socket.io-redis-streams-adapter/compare/0.1.0...0.2.0 --- CHANGELOG.md | 10 ++++++++++ package.json | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39eb360..c145f38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,19 @@ # History +- [0.2.0](#020-2024-02-21) (February 2024) - [0.1.0](#010-2023-04-06) (April 2023) # Release notes +## [0.2.0](https://github.com/socketio/socket.io-redis-streams-adapter/compare/0.1.0...0.2.0) (2024-02-21) + + +### Features + +* add support for the ioredis package ([58faa1d](https://github.com/socketio/socket.io-redis-streams-adapter/commit/58faa1d9c8cf87282b8643ed0c1aa79322b81852)) +* allow to modify the Redis key for the session ([1898586](https://github.com/socketio/socket.io-redis-streams-adapter/commit/18985863d8eca842c59fa57e396a72c01123e61d)) + + ## 0.1.0 (2023-04-06) First release! diff --git a/package.json b/package.json index 0b4673c..083fb62 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@socket.io/redis-streams-adapter", - "version": "0.1.0", + "version": "0.2.0", "description": "The Socket.IO adapter based on Redis Streams, allowing to broadcast events between several Socket.IO servers", "license": "MIT", "repository": {