diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ddef8d..7477a68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,20 @@ # History -- [0.2.1](#021-2024-03-11) (Mar 2024) -- [0.2.0](#020-2024-02-21) (Feb 2024) -- [0.1.0](#010-2023-04-06) (Apr 2023) +| Version | Date | +|--------------------------|---------------| +| [0.2.2](#022-2024-05-08) | May 2024 | +| [0.2.1](#021-2024-03-11) | March 2024 | +| [0.2.0](#020-2024-02-21) | February 2024 | +| [0.1.0](#010-2023-04-06) | April 2023 | # Release notes +## [0.2.2](https://github.com/socketio/socket.io-redis-streams-adapter/compare/0.2.1...0.2.2) (2024-05-08) + +The `redis` package is no longer required if you use the `ioredis` package to create a Redis client. + + + ## [0.2.1](https://github.com/socketio/socket.io-redis-streams-adapter/compare/0.2.0...0.2.1) (2024-03-11) diff --git a/README.md b/README.md index 243451b..515d6e5 100644 --- a/README.md +++ b/README.md @@ -2,28 +2,9 @@ The `@socket.io/redis-streams-adapter` package allows broadcasting packets between multiple Socket.IO servers. -Supported features: - -- [broadcasting](https://socket.io/docs/v4/broadcasting-events/) -- [utility methods](https://socket.io/docs/v4/server-instance/#Utility-methods) - - [`socketsJoin`](https://socket.io/docs/v4/server-instance/#socketsJoin) - - [`socketsLeave`](https://socket.io/docs/v4/server-instance/#socketsLeave) - - [`disconnectSockets`](https://socket.io/docs/v4/server-instance/#disconnectSockets) - - [`fetchSockets`](https://socket.io/docs/v4/server-instance/#fetchSockets) - - [`serverSideEmit`](https://socket.io/docs/v4/server-instance/#serverSideEmit) -- [connection state recovery](https://socket.io/docs/v4/connection-state-recovery) - -Related packages: - -- Redis adapter: https://github.com/socketio/socket.io-redis-adapter/ -- Redis emitter: https://github.com/socketio/socket.io-redis-emitter/ -- MongoDB adapter: https://github.com/socketio/socket.io-mongo-adapter/ -- MongoDB emitter: https://github.com/socketio/socket.io-mongo-emitter/ -- Postgres adapter: https://github.com/socketio/socket.io-postgres-adapter/ -- Postgres emitter: https://github.com/socketio/socket.io-postgres-emitter/ - **Table of contents** +- [Supported features](#supported-features) - [Installation](#installation) - [Usage](#usage) - [With the `redis` package](#with-the-redis-package) @@ -34,6 +15,15 @@ Related packages: - [How it works](#how-it-works) - [License](#license) +## Supported features + +| Feature | `socket.io` version | Support | +|---------------------------------|---------------------|------------------------------------------------| +| Socket management | `4.0.0` | :white_check_mark: YES (since version `0.1.0`) | +| Inter-server communication | `4.1.0` | :white_check_mark: YES (since version `0.1.0`) | +| Broadcast with acknowledgements | `4.5.0` | :white_check_mark: YES (since version `0.1.0`) | +| Connection state recovery | `4.6.0` | :white_check_mark: YES (since version `0.1.0`) | + ## Installation ``` diff --git a/lib/util.ts b/lib/util.ts index c311ce4..590fa2e 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -1,6 +1,3 @@ -import { randomBytes } from "crypto"; -import { commandOptions } from "redis"; - export function hasBinary(obj: any, toJSON?: boolean): boolean { if (!obj || typeof obj !== "object") { return false; @@ -32,10 +29,6 @@ export function hasBinary(obj: any, toJSON?: boolean): boolean { return false; } -export function randomId() { - return randomBytes(8).toString("hex"); -} - /** * Whether the client comes from the `redis` package * @@ -74,21 +67,23 @@ export function XREAD( readCount: number ) { if (isRedisV4Client(redisClient)) { - return redisClient.xRead( - commandOptions({ - isolated: true, - }), - [ + return import("redis").then((redisPackage) => { + return redisClient.xRead( + redisPackage.commandOptions({ + isolated: true, + }), + [ + { + key: streamName, + id: offset, + }, + ], { - key: streamName, - id: offset, - }, - ], - { - COUNT: readCount, - BLOCK: 5000, - } - ); + COUNT: readCount, + BLOCK: 5000, + } + ); + }); } else { return redisClient .xread("BLOCK", 100, "COUNT", readCount, "STREAMS", streamName, offset) diff --git a/package-lock.json b/package-lock.json index c3d1eb1..3977893 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@socket.io/redis-streams-adapter", - "version": "0.2.0", + "version": "0.2.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@socket.io/redis-streams-adapter", - "version": "0.2.0", + "version": "0.2.1", "license": "MIT", "dependencies": { "@msgpack/msgpack": "~2.8.0", @@ -2709,9 +2709,9 @@ "dev": true }, "node_modules/socket.io": { - "version": "4.7.4", - "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", - "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", + "version": "4.7.5", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.5.tgz", + "integrity": "sha512-DmeAkF6cwM9jSfmp6Dr/5/mfMwb5Z5qRrSXLpo3Fq5SqyU8CMF15jIN4ZhfSwu35ksM1qmHZDQ/DK5XTccSTvA==", "dev": true, "dependencies": { "accepts": "~1.3.4", @@ -5365,9 +5365,9 @@ "dev": true }, "socket.io": { - "version": "4.7.4", - "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.4.tgz", - "integrity": "sha512-DcotgfP1Zg9iP/dH9zvAQcWrE0TtbMVwXmlV4T4mqsvY+gw+LqUGPfx2AoVyRk0FLME+GQhufDMyacFmw7ksqw==", + "version": "4.7.5", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.7.5.tgz", + "integrity": "sha512-DmeAkF6cwM9jSfmp6Dr/5/mfMwb5Z5qRrSXLpo3Fq5SqyU8CMF15jIN4ZhfSwu35ksM1qmHZDQ/DK5XTccSTvA==", "dev": true, "requires": { "accepts": "~1.3.4", diff --git a/package.json b/package.json index 76d3be9..71e832f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@socket.io/redis-streams-adapter", - "version": "0.2.1", + "version": "0.2.2", "description": "The Socket.IO adapter based on Redis Streams, allowing to broadcast events between several Socket.IO servers", "license": "MIT", "repository": { diff --git a/test/broadcast.ts b/test/broadcast.ts deleted file mode 100644 index e6e45ab..0000000 --- a/test/broadcast.ts +++ /dev/null @@ -1,193 +0,0 @@ -import type { Server, Socket as ServerSocket } from "socket.io"; -import type { Socket as ClientSocket } from "socket.io-client"; -import { setup, sleep, times } from "./util"; -import expect = require("expect.js"); - -describe("broadcast()", function () { - let servers: Server[], - serverSockets: ServerSocket[], - clientSockets: ClientSocket[], - cleanup; - - beforeEach(async () => { - const testContext = await setup(); - servers = testContext.servers; - serverSockets = testContext.serverSockets; - clientSockets = testContext.clientSockets; - cleanup = testContext.cleanup; - }); - - afterEach(() => cleanup()); - - it("broadcasts to all clients", (done) => { - const partialDone = times(3, done); - - clientSockets.forEach((clientSocket, index) => { - clientSocket.on("test", (arg1, arg2, arg3) => { - expect(arg1).to.eql(1); - expect(arg2).to.eql("2"); - expect(Buffer.isBuffer(arg3)).to.be(true); - partialDone(); - }); - }); - - servers[0].emit("test", 1, "2", Buffer.from([3, 4])); - }); - - it("broadcasts to all clients in a namespace", (done) => { - const partialDone = times(3, () => { - servers.forEach((server) => server.of("/custom").adapter.close()); - done(); - }); - - servers.forEach((server) => server.of("/custom")); - - const onConnect = times(3, async () => { - await sleep(200); - - servers[0].of("/custom").emit("test"); - }); - - clientSockets.forEach((clientSocket) => { - const socket = clientSocket.io.socket("/custom"); - socket.on("connect", onConnect); - socket.on("test", () => { - socket.disconnect(); - partialDone(); - }); - }); - }); - - it("broadcasts to all clients in a room", (done) => { - serverSockets[1].join("room1"); - - clientSockets[0].on("test", () => { - done(new Error("should not happen")); - }); - - clientSockets[1].on("test", () => { - done(); - }); - - clientSockets[2].on("test", () => { - done(new Error("should not happen")); - }); - - servers[0].to("room1").emit("test"); - }); - - it("broadcasts to all clients except in room", (done) => { - const partialDone = times(2, done); - serverSockets[1].join("room1"); - - clientSockets[0].on("test", () => { - partialDone(); - }); - - clientSockets[1].on("test", () => { - done(new Error("should not happen")); - }); - - clientSockets[2].on("test", () => { - partialDone(); - }); - - servers[0].of("/").except("room1").emit("test"); - }); - - it("broadcasts to local clients only", (done) => { - clientSockets[0].on("test", () => { - done(); - }); - - clientSockets[1].on("test", () => { - done(new Error("should not happen")); - }); - - clientSockets[2].on("test", () => { - done(new Error("should not happen")); - }); - - servers[0].local.emit("test"); - }); - - it("broadcasts with multiple acknowledgements", (done) => { - clientSockets[0].on("test", (cb) => { - cb(1); - }); - - clientSockets[1].on("test", (cb) => { - cb(2); - }); - - clientSockets[2].on("test", (cb) => { - cb(3); - }); - - servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { - expect(err).to.be(null); - expect(responses).to.contain(1); - expect(responses).to.contain(2); - expect(responses).to.contain(3); - - done(); - }); - }); - - it("broadcasts with multiple acknowledgements (binary content)", (done) => { - clientSockets[0].on("test", (cb) => { - cb(Buffer.from([1])); - }); - - clientSockets[1].on("test", (cb) => { - cb(Buffer.from([2])); - }); - - clientSockets[2].on("test", (cb) => { - cb(Buffer.from([3])); - }); - - servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { - expect(err).to.be(null); - responses.forEach((response) => { - expect(response instanceof Uint8Array).to.be(true); - }); - - done(); - }); - }); - - it("broadcasts with multiple acknowledgements (no client)", (done) => { - servers[0] - .to("abc") - .timeout(500) - .emit("test", (err: Error, responses: any[]) => { - expect(err).to.be(null); - expect(responses).to.eql([]); - - done(); - }); - }); - - it("broadcasts with multiple acknowledgements (timeout)", (done) => { - clientSockets[0].on("test", (cb) => { - cb(1); - }); - - clientSockets[1].on("test", (cb) => { - cb(2); - }); - - clientSockets[2].on("test", () => { - // do nothing - }); - - servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { - expect(err).to.be.an(Error); - expect(responses).to.contain(1); - expect(responses).to.contain(2); - - done(); - }); - }); -}); diff --git a/test/connection-state-recovery.ts b/test/connection-state-recovery.ts index 12beab6..ee9eecc 100644 --- a/test/connection-state-recovery.ts +++ b/test/connection-state-recovery.ts @@ -21,7 +21,6 @@ describe("connection state recovery", () => { }); afterEach(() => { - servers[0].of("/foo").adapter.close(); cleanup(); }); diff --git a/test/disconnectSockets.ts b/test/disconnectSockets.ts deleted file mode 100644 index aabdd8a..0000000 --- a/test/disconnectSockets.ts +++ /dev/null @@ -1,34 +0,0 @@ -import type { Server, Socket as ServerSocket } from "socket.io"; -import type { Socket as ClientSocket } from "socket.io-client"; -import { setup, times } from "./util"; -import expect = require("expect.js"); - -describe("disconnectSockets()", function () { - let servers: Server[], - serverSockets: ServerSocket[], - clientSockets: ClientSocket[], - cleanup; - - beforeEach(async () => { - const testContext = await setup(); - servers = testContext.servers; - serverSockets = testContext.serverSockets; - clientSockets = testContext.clientSockets; - cleanup = testContext.cleanup; - }); - - afterEach(() => cleanup()); - - it("makes all socket instances disconnect", (done) => { - const partialDone = times(3, done); - - clientSockets.forEach((clientSocket) => { - clientSocket.on("disconnect", (reason) => { - expect(reason).to.eql("io server disconnect"); - partialDone(); - }); - }); - - servers[0].disconnectSockets(); - }); -}); diff --git a/test/fetchSockets.ts b/test/fetchSockets.ts deleted file mode 100644 index 5c91bd9..0000000 --- a/test/fetchSockets.ts +++ /dev/null @@ -1,41 +0,0 @@ -import type { Server, Socket as ServerSocket } from "socket.io"; -import { setup } from "./util"; -import expect = require("expect.js"); - -describe("fetchSockets()", function () { - let servers: Server[], serverSockets: ServerSocket[], cleanup; - - beforeEach(async () => { - const testContext = await setup(); - servers = testContext.servers; - serverSockets = testContext.serverSockets; - cleanup = testContext.cleanup; - }); - - afterEach(() => cleanup()); - - it("returns all socket instances", async () => { - const sockets = await servers[0].fetchSockets(); - - expect(sockets).to.be.an(Array); - expect(sockets).to.have.length(3); - }); - - it("returns a single socket instance", async () => { - serverSockets[1].data = "test" as any; - - const [remoteSocket] = await servers[0] - .in(serverSockets[1].id) - .fetchSockets(); - - expect(remoteSocket.handshake).to.eql(serverSockets[1].handshake); - expect(remoteSocket.data).to.eql("test"); - expect(remoteSocket.rooms.size).to.eql(1); - }); - - it("returns only local socket instances", async () => { - const sockets = await servers[0].local.fetchSockets(); - - expect(sockets).to.have.length(1); - }); -}); diff --git a/test/index.ts b/test/index.ts new file mode 100644 index 0000000..7b95c16 --- /dev/null +++ b/test/index.ts @@ -0,0 +1,375 @@ +import { Server, Socket as ServerSocket } from "socket.io"; +import { Socket as ClientSocket } from "socket.io-client"; +import expect = require("expect.js"); +import { times, sleep, setup, shouldNotHappen } from "./util"; + +describe("@socket.io/redis-streams-adapter", () => { + let servers: Server[], + serverSockets: ServerSocket[], + clientSockets: ClientSocket[], + cleanup; + + beforeEach(async () => { + const testContext = await setup(); + servers = testContext.servers; + serverSockets = testContext.serverSockets; + clientSockets = testContext.clientSockets; + cleanup = testContext.cleanup; + }); + + afterEach(() => cleanup()); + + describe("broadcast", function () { + it("broadcasts to all clients", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("test", (arg1, arg2, arg3) => { + expect(arg1).to.eql(1); + expect(arg2).to.eql("2"); + expect(Buffer.isBuffer(arg3)).to.be(true); + partialDone(); + }); + }); + + servers[0].emit("test", 1, "2", Buffer.from([3, 4])); + }); + + it("broadcasts to all clients in a namespace", (done) => { + const partialDone = times(3, done); + + servers.forEach((server) => server.of("/custom")); + + const onConnect = times(3, async () => { + await sleep(200); + + servers[0].of("/custom").emit("test"); + }); + + clientSockets.forEach((clientSocket) => { + const socket = clientSocket.io.socket("/custom"); + socket.on("connect", onConnect); + socket.on("test", () => { + socket.disconnect(); + partialDone(); + }); + }); + }); + + it("broadcasts to all clients in a room", (done) => { + serverSockets[1].join("room1"); + + clientSockets[0].on("test", shouldNotHappen(done)); + clientSockets[1].on("test", () => done()); + clientSockets[2].on("test", shouldNotHappen(done)); + + servers[0].to("room1").emit("test"); + }); + + it("broadcasts to all clients except in room", (done) => { + const partialDone = times(2, done); + serverSockets[1].join("room1"); + + clientSockets[0].on("test", () => partialDone()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", () => partialDone()); + + servers[0].of("/").except("room1").emit("test"); + }); + + it("broadcasts to local clients only", (done) => { + clientSockets[0].on("test", () => done()); + clientSockets[1].on("test", shouldNotHappen(done)); + clientSockets[2].on("test", shouldNotHappen(done)); + + servers[0].local.emit("test"); + }); + + it("broadcasts with multiple acknowledgements", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (cb) => cb(3)); + + servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.contain(1); + expect(responses).to.contain(2); + expect(responses).to.contain(3); + + setTimeout(() => { + // @ts-ignore + expect(servers[0].of("/").adapter.ackRequests.size).to.eql(0); + + done(); + }, 500); + }); + }); + + it("broadcasts with multiple acknowledgements (binary content)", (done) => { + clientSockets[0].on("test", (cb) => cb(Buffer.from([1]))); + clientSockets[1].on("test", (cb) => cb(Buffer.from([2]))); + clientSockets[2].on("test", (cb) => cb(Buffer.from([3]))); + + servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + responses.forEach((response) => { + expect(Buffer.isBuffer(response)).to.be(true); + }); + + done(); + }); + }); + + it("broadcasts with multiple acknowledgements (no client)", (done) => { + servers[0] + .to("abc") + .timeout(500) + .emit("test", (err: Error, responses: any[]) => { + expect(err).to.be(null); + expect(responses).to.eql([]); + + done(); + }); + }); + + it("broadcasts with multiple acknowledgements (timeout)", (done) => { + clientSockets[0].on("test", (cb) => cb(1)); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", (_cb) => { + // do nothing + }); + + servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => { + expect(err).to.be.an(Error); + expect(responses).to.contain(1); + expect(responses).to.contain(2); + + done(); + }); + }); + + it("broadcasts with a single acknowledgement (local)", async () => { + clientSockets[0].on("test", () => expect().fail()); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", () => expect().fail()); + + const response = await serverSockets[1].emitWithAck("test"); + expect(response).to.eql(2); + }); + + it("broadcasts with a single acknowledgement (remote)", async () => { + clientSockets[0].on("test", () => expect().fail()); + clientSockets[1].on("test", (cb) => cb(2)); + clientSockets[2].on("test", () => expect().fail()); + + const sockets = await servers[0].in(serverSockets[1].id).fetchSockets(); + expect(sockets.length).to.eql(1); + + const response = await sockets[0].timeout(500).emitWithAck("test"); + expect(response).to.eql(2); + }); + }); + + describe("socketsJoin", () => { + it("makes all socket instances join the specified room", async () => { + servers[0].socketsJoin("room1"); + + await sleep(300); + + expect(serverSockets[0].rooms.has("room1")).to.be(true); + expect(serverSockets[1].rooms.has("room1")).to.be(true); + expect(serverSockets[2].rooms.has("room1")).to.be(true); + }); + + it("makes the matching socket instances join the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); + + servers[0].in("room1").socketsJoin("room2"); + + await sleep(300); + + expect(serverSockets[0].rooms.has("room2")).to.be(true); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); + + it("makes the given socket instance join the specified room", async () => { + servers[0].in(serverSockets[1].id).socketsJoin("room3"); + + await sleep(300); + + expect(serverSockets[0].rooms.has("room3")).to.be(false); + expect(serverSockets[1].rooms.has("room3")).to.be(true); + expect(serverSockets[2].rooms.has("room3")).to.be(false); + }); + }); + + describe("socketsLeave", () => { + it("makes all socket instances leave the specified room", async () => { + serverSockets[0].join("room1"); + serverSockets[2].join("room1"); + + servers[0].socketsLeave("room1"); + + await sleep(300); + + expect(serverSockets[0].rooms.has("room1")).to.be(false); + expect(serverSockets[1].rooms.has("room1")).to.be(false); + expect(serverSockets[2].rooms.has("room1")).to.be(false); + }); + + it("makes the matching socket instances leave the specified room", async () => { + serverSockets[0].join(["room1", "room2"]); + serverSockets[1].join(["room1", "room2"]); + serverSockets[2].join(["room2"]); + + servers[0].in("room1").socketsLeave("room2"); + + await sleep(300); + + expect(serverSockets[0].rooms.has("room2")).to.be(false); + expect(serverSockets[1].rooms.has("room2")).to.be(false); + expect(serverSockets[2].rooms.has("room2")).to.be(true); + }); + + it("makes the given socket instance leave the specified room", async () => { + serverSockets[0].join("room3"); + serverSockets[1].join("room3"); + serverSockets[2].join("room3"); + + servers[0].in(serverSockets[1].id).socketsLeave("room3"); + + await sleep(300); + + expect(serverSockets[0].rooms.has("room3")).to.be(true); + expect(serverSockets[1].rooms.has("room3")).to.be(false); + expect(serverSockets[2].rooms.has("room3")).to.be(true); + }); + }); + + describe("disconnectSockets", () => { + it("makes all socket instances disconnect", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", (reason) => { + expect(reason).to.eql("io server disconnect"); + partialDone(); + }); + }); + + servers[0].disconnectSockets(); + }); + + it("sends a packet before all socket instances disconnect", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", shouldNotHappen(done)); + + clientSocket.on("bye", () => { + clientSocket.off("disconnect"); + clientSocket.on("disconnect", partialDone); + }); + }); + + servers[0].emit("bye"); + servers[0].disconnectSockets(true); + }); + }); + + describe("fetchSockets", () => { + it("returns all socket instances", async () => { + const sockets = await servers[0].fetchSockets(); + + expect(sockets).to.be.an(Array); + expect(sockets).to.have.length(3); + // @ts-ignore + expect(servers[0].of("/").adapter.requests.size).to.eql(0); // clean up + }); + + it("returns a single socket instance", async () => { + serverSockets[1].data = "test" as any; + + const [remoteSocket] = await servers[0] + .in(serverSockets[1].id) + .fetchSockets(); + + expect(remoteSocket.handshake).to.eql(serverSockets[1].handshake); + expect(remoteSocket.data).to.eql("test"); + expect(remoteSocket.rooms.size).to.eql(1); + }); + + it("returns only local socket instances", async () => { + const sockets = await servers[0].local.fetchSockets(); + + expect(sockets).to.have.length(1); + }); + }); + + describe("serverSideEmit", () => { + it("sends an event to other server instances", (done) => { + const partialDone = times(2, done); + + servers[0].serverSideEmit("hello", "world", 1, "2"); + + servers[0].on("hello", shouldNotHappen(done)); + + servers[1].on("hello", (arg1, arg2, arg3) => { + expect(arg1).to.eql("world"); + expect(arg2).to.eql(1); + expect(arg3).to.eql("2"); + partialDone(); + }); + + servers[2].of("/").on("hello", () => partialDone()); + }); + + it("sends an event and receives a response from the other server instances", (done) => { + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + expect(response).to.contain("3"); + done(); + }); + + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", (cb) => cb("3")); + }); + + it("sends an event but timeout if one server does not respond", function (done) { + this.timeout(6000); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err.message).to.be("timeout reached: missing 1 responses"); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", () => { + // do nothing + }); + }); + + it("succeeds even if an instance leaves the cluster", (done) => { + servers[0].on("hello", shouldNotHappen(done)); + servers[1].on("hello", (cb) => cb(2)); + servers[2].on("hello", () => { + servers[2].of("/").adapter.close(); + }); + + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + done(); + }); + }); + }); +}); diff --git a/test/serverSideEmit.ts b/test/serverSideEmit.ts deleted file mode 100644 index e7667bc..0000000 --- a/test/serverSideEmit.ts +++ /dev/null @@ -1,103 +0,0 @@ -import type { Server } from "socket.io"; -import { setup, times } from "./util"; -import expect = require("expect.js"); - -describe("serverSideEmit()", function () { - let servers: Server[], cleanup; - - beforeEach(async () => { - const testContext = await setup(); - servers = testContext.servers; - cleanup = testContext.cleanup; - }); - - afterEach(() => cleanup()); - - it("sends an event to other server instances", (done) => { - const partialDone = times(2, done); - - servers[0].serverSideEmit("hello", "world", 1, "2"); - - servers[0].on("hello", () => { - done(new Error("should not happen")); - }); - - servers[1].on("hello", (arg1, arg2, arg3) => { - expect(arg1).to.eql("world"); - expect(arg2).to.eql(1); - expect(arg3).to.eql("2"); - partialDone(); - }); - - servers[2].of("/").on("hello", () => { - partialDone(); - }); - }); - - it("sends an event and receives a response from the other server instances", (done) => { - servers[0].serverSideEmit("hello", (err: Error, response: any) => { - expect(err).to.be(null); - expect(response).to.be.an(Array); - expect(response).to.contain(2); - expect(response).to.contain("3"); - done(); - }); - - servers[0].on("hello", () => { - done(new Error("should not happen")); - }); - - servers[1].on("hello", (cb) => { - cb(2); - }); - - servers[2].on("hello", (cb) => { - cb("3"); - }); - }); - - it("sends an event but timeout if one server does not respond", function (done) { - // TODO the serverSideEmit() method currently ignores the timeout() flag - this.timeout(6000); - - servers[0].serverSideEmit("hello", (err: Error, response: any) => { - expect(err.message).to.be("timeout reached: missing 1 responses"); - expect(response).to.be.an(Array); - expect(response).to.contain(2); - done(); - }); - - servers[0].on("hello", () => { - done(new Error("should not happen")); - }); - - servers[1].on("hello", (cb) => { - cb(2); - }); - - servers[2].on("hello", () => { - // do nothing - }); - }); - - it("succeeds even if an instance leaves the cluster", (done) => { - servers[0].on("hello", () => { - done(new Error("should not happen")); - }); - - servers[1].on("hello", (cb) => { - cb(2); - }); - - servers[2].on("hello", (cb) => { - servers[2].of("/").adapter.close(); - }); - - servers[0].serverSideEmit("hello", (err: Error, response: any) => { - expect(err).to.be(null); - expect(response).to.be.an(Array); - expect(response).to.contain(2); - done(); - }); - }); -}); diff --git a/test/socketsJoin.ts b/test/socketsJoin.ts deleted file mode 100644 index 2248d1e..0000000 --- a/test/socketsJoin.ts +++ /dev/null @@ -1,49 +0,0 @@ -import type { Server, Socket as ServerSocket } from "socket.io"; -import { setup, sleep } from "./util"; -import expect = require("expect.js"); - -describe("socketsJoin()", () => { - let servers: Server[], serverSockets: ServerSocket[], cleanup; - - beforeEach(async () => { - const testContext = await setup(); - servers = testContext.servers; - serverSockets = testContext.serverSockets; - cleanup = testContext.cleanup; - }); - - afterEach(() => cleanup()); - - it("makes all socket instances join the specified room", async () => { - servers[0].socketsJoin("room1"); - - await sleep(300); - - expect(serverSockets[0].rooms.has("room1")).to.be(true); - expect(serverSockets[1].rooms.has("room1")).to.be(true); - expect(serverSockets[2].rooms.has("room1")).to.be(true); - }); - - it("makes the matching socket instances join the specified room", async () => { - serverSockets[0].join("room1"); - serverSockets[2].join("room1"); - - servers[0].in("room1").socketsJoin("room2"); - - await sleep(300); - - expect(serverSockets[0].rooms.has("room2")).to.be(true); - expect(serverSockets[1].rooms.has("room2")).to.be(false); - expect(serverSockets[2].rooms.has("room2")).to.be(true); - }); - - it("makes the given socket instance join the specified room", async () => { - servers[0].in(serverSockets[1].id).socketsJoin("room3"); - - await sleep(300); - - expect(serverSockets[0].rooms.has("room3")).to.be(false); - expect(serverSockets[1].rooms.has("room3")).to.be(true); - expect(serverSockets[2].rooms.has("room3")).to.be(false); - }); -}); diff --git a/test/socketsLeave.ts b/test/socketsLeave.ts deleted file mode 100644 index 13be4fd..0000000 --- a/test/socketsLeave.ts +++ /dev/null @@ -1,57 +0,0 @@ -import type { Server, Socket as ServerSocket } from "socket.io"; -import { setup, sleep } from "./util"; -import expect = require("expect.js"); - -describe("socketsLeave()", () => { - let servers: Server[], serverSockets: ServerSocket[], cleanup; - - beforeEach(async () => { - const testContext = await setup(); - servers = testContext.servers; - serverSockets = testContext.serverSockets; - cleanup = testContext.cleanup; - }); - - afterEach(() => cleanup()); - - it("makes all socket instances leave the specified room", async () => { - serverSockets[0].join("room1"); - serverSockets[2].join("room1"); - - servers[0].socketsLeave("room1"); - - await sleep(300); - - expect(serverSockets[0].rooms.has("room1")).to.be(false); - expect(serverSockets[1].rooms.has("room1")).to.be(false); - expect(serverSockets[2].rooms.has("room1")).to.be(false); - }); - - it("makes the matching socket instances leave the specified room", async () => { - serverSockets[0].join(["room1", "room2"]); - serverSockets[1].join(["room1", "room2"]); - serverSockets[2].join(["room2"]); - - servers[0].in("room1").socketsLeave("room2"); - - await sleep(300); - - expect(serverSockets[0].rooms.has("room2")).to.be(false); - expect(serverSockets[1].rooms.has("room2")).to.be(false); - expect(serverSockets[2].rooms.has("room2")).to.be(true); - }); - - it("makes the given socket instance leave the specified room", async () => { - serverSockets[0].join("room3"); - serverSockets[1].join("room3"); - serverSockets[2].join("room3"); - - servers[0].in(serverSockets[1].id).socketsLeave("room3"); - - await sleep(300); - - expect(serverSockets[0].rooms.has("room3")).to.be(true); - expect(serverSockets[1].rooms.has("room3")).to.be(false); - expect(serverSockets[2].rooms.has("room3")).to.be(true); - }); -}); diff --git a/test/util.ts b/test/util.ts index 4cc7da6..c552596 100644 --- a/test/util.ts +++ b/test/util.ts @@ -23,6 +23,10 @@ export function sleep(duration: number) { return new Promise((resolve) => setTimeout(resolve, duration)); } +export function shouldNotHappen(done) { + return () => done(new Error("should not happen")); +} + const NODES_COUNT = 3; interface TestContext { @@ -141,6 +145,11 @@ export function setup({ redisClients.push(redisClient); ports.push(port); if (servers.length === nodeCount) { + // ensure all nodes know each other + servers[0].emit("ping"); + servers[1].emit("ping"); + servers[2].emit("ping"); + await sleep(200); resolve({ @@ -149,17 +158,9 @@ export function setup({ clientSockets, ports, cleanup: () => { - servers.forEach((server) => { - // @ts-ignore - server.httpServer.close(); - server.of("/").adapter.close(); - }); - clientSockets.forEach((socket) => { - socket.disconnect(); - }); - redisClients.forEach((redisClient) => { - redisClient.quit(); - }); + servers.forEach((server) => server.close()); + clientSockets.forEach((socket) => socket.disconnect()); + redisClients.forEach((redisClient) => redisClient.quit()); }, }); }