Skip to content

Commit 27c2970

Browse files
authored
refactor: extract out bindState and writeState (#531)
1 parent 39b3acc commit 27c2970

File tree

2 files changed

+105
-104
lines changed

2 files changed

+105
-104
lines changed

apps/yjs/src/yjs-server.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,8 @@ import http from "http";
55

66
import jwt from "jsonwebtoken";
77

8-
import { ApolloServer } from "apollo-server-express";
9-
10-
import { gql } from "apollo-server";
11-
import { ApolloServerPluginLandingPageLocalDefault } from "apollo-server-core";
12-
13-
import { getYDoc, setupWSConnection } from "./yjs-setupWS";
8+
import { createSetupWSConnection } from "./yjs-setupWS";
9+
import { bindState, writeState } from "./yjs-blob";
1410

1511
import prisma from "@codepod/prisma";
1612
interface TokenInterface {
@@ -58,7 +54,9 @@ export async function startWsServer({ jwtSecret, port }) {
5854
const http_server = http.createServer(expapp);
5955
const wss = new WebSocketServer({ noServer: true });
6056

61-
wss.on("connection", setupWSConnection);
57+
wss.on("connection", (...args) =>
58+
createSetupWSConnection(bindState, writeState)(...args)
59+
);
6260

6361
http_server.on("upgrade", async (request, socket, head) => {
6462
// You may check auth of request here..

apps/yjs/src/yjs-setupWS.ts

Lines changed: 100 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,8 @@ import * as syncProtocol from "y-protocols/sync";
66

77
import { encoding, decoding, map } from "lib0";
88

9-
// To test: use yjs-plain to create a legacy document and save to DB
10-
// Then, use yjs-blob to read back and test if the migration wroks.
11-
12-
import { bindState, writeState } from "./yjs-blob";
13-
// import { bindState, writeState } from "./yjs-plain";
9+
let writeState = () => {};
10+
let bindState = async (doc: Y.Doc, repoId: string) => {};
1411

1512
const wsReadyStateConnecting = 0;
1613
const wsReadyStateOpen = 1;
@@ -274,105 +271,111 @@ const pingTimeout = 30000;
274271
* @param {any} req
275272
* @param {any} opts
276273
*/
277-
export const setupWSConnection = async (
278-
conn,
279-
req,
280-
{
281-
docName = req.url.slice(1).split("?")[0],
282-
gc = true,
283-
readOnly = false,
284-
role = undefined,
285-
} = {}
286-
) => {
287-
conn.binaryType = "arraybuffer";
288-
if (role) conn.role = role;
289-
console.log(`setupWSConnection ${docName}, read-only=${readOnly}`);
290-
// get doc, initialize if it does not exist yet
291-
const { doc, docLoadedPromise } = getYDoc(docName, gc);
292-
doc.conns.set(conn, new Set());
293-
if (scheduledDelete.has(doc.name)) {
294-
console.log("=== cancel previous scheduled destroy ydoc", doc.name);
295-
clearTimeout(scheduledDelete.get(doc.name));
296-
scheduledDelete.delete(doc.name);
297-
}
298-
299-
// It might take some time to load the doc, but before then we still need to
300-
// listen for websocket events, Ref:
301-
// https://github.com/yjs/y-websocket/issues/81#issuecomment-1453185788
302-
let isDocLoaded = docLoadedPromise ? false : true;
303-
let queuedMessages: Uint8Array[] = [];
304-
let isConnectionAlive = true;
305-
306-
// listen and reply to events
307-
conn.on(
308-
"message",
309-
/** @param {ArrayBuffer} message */ (message) => {
310-
if (isDocLoaded)
311-
messageListener(conn, doc, new Uint8Array(message), readOnly);
312-
else queuedMessages.push(new Uint8Array(message));
274+
export const createSetupWSConnection = (bindState, writeState) => {
275+
// set the writeState and bindState functions
276+
writeState = writeState;
277+
bindState = bindState;
278+
// return the setupWSConnection function
279+
return async (
280+
conn,
281+
req,
282+
{
283+
docName = req.url.slice(1).split("?")[0],
284+
gc = true,
285+
readOnly = false,
286+
role = undefined,
287+
} = {}
288+
) => {
289+
conn.binaryType = "arraybuffer";
290+
if (role) conn.role = role;
291+
console.log(`setupWSConnection ${docName}, read-only=${readOnly}`);
292+
// get doc, initialize if it does not exist yet
293+
const { doc, docLoadedPromise } = getYDoc(docName, gc);
294+
doc.conns.set(conn, new Set());
295+
if (scheduledDelete.has(doc.name)) {
296+
console.log("=== cancel previous scheduled destroy ydoc", doc.name);
297+
clearTimeout(scheduledDelete.get(doc.name));
298+
scheduledDelete.delete(doc.name);
313299
}
314-
);
315300

316-
// Check if connection is still alive
317-
let pongReceived = true;
318-
const pingInterval = setInterval(() => {
319-
if (!pongReceived) {
320-
if (doc.conns.has(conn)) {
321-
closeConn(doc, conn);
322-
isConnectionAlive = false;
301+
// It might take some time to load the doc, but before then we still need to
302+
// listen for websocket events, Ref:
303+
// https://github.com/yjs/y-websocket/issues/81#issuecomment-1453185788
304+
let isDocLoaded = docLoadedPromise ? false : true;
305+
let queuedMessages: Uint8Array[] = [];
306+
let isConnectionAlive = true;
307+
308+
// listen and reply to events
309+
conn.on(
310+
"message",
311+
/** @param {ArrayBuffer} message */ (message) => {
312+
if (isDocLoaded)
313+
messageListener(conn, doc, new Uint8Array(message), readOnly);
314+
else queuedMessages.push(new Uint8Array(message));
323315
}
324-
clearInterval(pingInterval);
325-
} else if (doc.conns.has(conn)) {
326-
pongReceived = false;
327-
try {
328-
conn.ping();
329-
} catch (e) {
330-
closeConn(doc, conn);
331-
isConnectionAlive = false;
316+
);
317+
318+
// Check if connection is still alive
319+
let pongReceived = true;
320+
const pingInterval = setInterval(() => {
321+
if (!pongReceived) {
322+
if (doc.conns.has(conn)) {
323+
closeConn(doc, conn);
324+
isConnectionAlive = false;
325+
}
332326
clearInterval(pingInterval);
327+
} else if (doc.conns.has(conn)) {
328+
pongReceived = false;
329+
try {
330+
conn.ping();
331+
} catch (e) {
332+
closeConn(doc, conn);
333+
isConnectionAlive = false;
334+
clearInterval(pingInterval);
335+
}
333336
}
334-
}
335-
}, pingTimeout);
336-
conn.on("close", () => {
337-
closeConn(doc, conn);
338-
isConnectionAlive = false;
339-
clearInterval(pingInterval);
340-
});
341-
conn.on("pong", () => {
342-
pongReceived = true;
343-
});
344-
// put the following in a variables in a block so the interval handlers don't keep in in
345-
// scope
346-
const sendSyncStep1 = () => {
347-
// send sync step 1
348-
const encoder = encoding.createEncoder();
349-
encoding.writeVarUint(encoder, messageSync);
350-
syncProtocol.writeSyncStep1(encoder, doc);
351-
send(doc, conn, encoding.toUint8Array(encoder));
352-
const awarenessStates = doc.awareness.getStates();
353-
if (awarenessStates.size > 0) {
337+
}, pingTimeout);
338+
conn.on("close", () => {
339+
closeConn(doc, conn);
340+
isConnectionAlive = false;
341+
clearInterval(pingInterval);
342+
});
343+
conn.on("pong", () => {
344+
pongReceived = true;
345+
});
346+
// put the following in a variables in a block so the interval handlers don't keep in in
347+
// scope
348+
const sendSyncStep1 = () => {
349+
// send sync step 1
354350
const encoder = encoding.createEncoder();
355-
encoding.writeVarUint(encoder, messageAwareness);
356-
encoding.writeVarUint8Array(
357-
encoder,
358-
awarenessProtocol.encodeAwarenessUpdate(
359-
doc.awareness,
360-
Array.from(awarenessStates.keys())
361-
)
362-
);
351+
encoding.writeVarUint(encoder, messageSync);
352+
syncProtocol.writeSyncStep1(encoder, doc);
363353
send(doc, conn, encoding.toUint8Array(encoder));
354+
const awarenessStates = doc.awareness.getStates();
355+
if (awarenessStates.size > 0) {
356+
const encoder = encoding.createEncoder();
357+
encoding.writeVarUint(encoder, messageAwareness);
358+
encoding.writeVarUint8Array(
359+
encoder,
360+
awarenessProtocol.encodeAwarenessUpdate(
361+
doc.awareness,
362+
Array.from(awarenessStates.keys())
363+
)
364+
);
365+
send(doc, conn, encoding.toUint8Array(encoder));
366+
}
367+
};
368+
if (docLoadedPromise) {
369+
docLoadedPromise.then(() => {
370+
if (!isConnectionAlive) return;
371+
372+
isDocLoaded = true;
373+
queuedMessages.forEach((message) =>
374+
messageListener(conn, doc, message, readOnly)
375+
);
376+
queuedMessages = [];
377+
sendSyncStep1();
378+
});
364379
}
365380
};
366-
if (docLoadedPromise) {
367-
docLoadedPromise.then(() => {
368-
if (!isConnectionAlive) return;
369-
370-
isDocLoaded = true;
371-
queuedMessages.forEach((message) =>
372-
messageListener(conn, doc, message, readOnly)
373-
);
374-
queuedMessages = [];
375-
sendSyncStep1();
376-
});
377-
}
378381
};

0 commit comments

Comments
 (0)