Skip to content

Commit e3bf80c

Browse files
authored
Ensure operations are processed synchronously on OT server (#106)
ensure operatoins are processed synchrnously
1 parent 8dfd733 commit e3bf80c

File tree

3 files changed

+130
-37
lines changed

3 files changed

+130
-37
lines changed

packages/ot-server/src/DocumentManager.spec.ts

Lines changed: 87 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ function createOperation<T extends OperationType>(
2525
}
2626

2727
describe('DocumentManager', () => {
28-
it('should process consequential operations', () => {
28+
it('should process consequential operations', async () => {
2929
const manager = new DocumentManager('document');
3030

31-
manager.process(
31+
await manager.process(
3232
createOperation(
3333
OperationType.Insert,
3434
'doc@0:block@0',
@@ -48,7 +48,7 @@ describe('DocumentManager', () => {
4848
0
4949
)
5050
);
51-
manager.process(
51+
await manager.process(
5252
createOperation(
5353
OperationType.Insert,
5454
'doc@0:block@0:data@text:[0,0]',
@@ -57,7 +57,7 @@ describe('DocumentManager', () => {
5757
1
5858
)
5959
);
60-
manager.process(
60+
await manager.process(
6161
createOperation(
6262
OperationType.Insert,
6363
'doc@0:block@0:data@text:[0,0]',
@@ -66,7 +66,7 @@ describe('DocumentManager', () => {
6666
2
6767
)
6868
);
69-
manager.process(
69+
await manager.process(
7070
createOperation(
7171
OperationType.Insert,
7272
'doc@0:block@0:data@text:[0,0]',
@@ -95,10 +95,10 @@ describe('DocumentManager', () => {
9595
});
9696
});
9797

98-
it('should process concurrent operations', () => {
98+
it('should process concurrent operations', async () => {
9999
const manager = new DocumentManager('document');
100100

101-
manager.process(
101+
await manager.process(
102102
createOperation(
103103
OperationType.Insert,
104104
'doc@0:block@0',
@@ -118,7 +118,7 @@ describe('DocumentManager', () => {
118118
0
119119
)
120120
);
121-
manager.process(
121+
await manager.process(
122122
createOperation(
123123
OperationType.Insert,
124124
'doc@0:block@0:data@text:[0,0]',
@@ -127,7 +127,7 @@ describe('DocumentManager', () => {
127127
1
128128
)
129129
);
130-
manager.process(
130+
await manager.process(
131131
createOperation(
132132
OperationType.Insert,
133133
'doc@0:block@0:data@text:[0,0]',
@@ -156,10 +156,10 @@ describe('DocumentManager', () => {
156156
});
157157
});
158158

159-
it('should process older operations', () => {
159+
it('should process older operations', async () => {
160160
const manager = new DocumentManager('document');
161161

162-
manager.process(
162+
await manager.process(
163163
createOperation(
164164
OperationType.Insert,
165165
'doc@0:block@0',
@@ -179,7 +179,7 @@ describe('DocumentManager', () => {
179179
0
180180
)
181181
);
182-
manager.process(
182+
await manager.process(
183183
createOperation(
184184
OperationType.Insert,
185185
'doc@0:block@0:data@text:[0,0]',
@@ -188,7 +188,7 @@ describe('DocumentManager', () => {
188188
1
189189
)
190190
);
191-
manager.process(
191+
await manager.process(
192192
createOperation(
193193
OperationType.Insert,
194194
'doc@0:block@0:data@text:[0,0]',
@@ -197,7 +197,7 @@ describe('DocumentManager', () => {
197197
2
198198
)
199199
);
200-
manager.process(
200+
await manager.process(
201201
createOperation(
202202
OperationType.Insert,
203203
'doc@0:block@0:data@text:[0,0]',
@@ -225,4 +225,77 @@ describe('DocumentManager', () => {
225225
properties: {},
226226
});
227227
});
228+
229+
it('should correctly process async operations', async () => {
230+
const manager = new DocumentManager('document');
231+
232+
void manager.process(
233+
createOperation(
234+
OperationType.Insert,
235+
'doc@0:block@0',
236+
{
237+
payload: [{
238+
name: 'paragraph',
239+
data: {
240+
text: {
241+
$t: 't',
242+
value: '',
243+
fragments: [],
244+
},
245+
},
246+
}],
247+
},
248+
'user',
249+
0
250+
)
251+
);
252+
void manager.process(
253+
createOperation(
254+
OperationType.Insert,
255+
'doc@0:block@0:data@text:[0,0]',
256+
{ payload: 'A' },
257+
'user',
258+
1
259+
)
260+
);
261+
void manager.process(
262+
createOperation(
263+
OperationType.Insert,
264+
'doc@0:block@0:data@text:[0,0]',
265+
{ payload: 'A' },
266+
'user',
267+
2
268+
)
269+
);
270+
/**
271+
* Waiting for the last operation so expect is executed after it is processed
272+
*/
273+
await manager.process(
274+
createOperation(
275+
OperationType.Insert,
276+
'doc@0:block@0:data@text:[0,0]',
277+
{ payload: 'A' },
278+
'user',
279+
3
280+
)
281+
);
282+
283+
expect(manager.currentModelState()).toEqual({
284+
identifier: 'document',
285+
blocks: [
286+
{
287+
name: 'paragraph',
288+
tunes: {},
289+
data: {
290+
text: {
291+
$t: 't',
292+
value: 'AAA',
293+
fragments: [],
294+
},
295+
},
296+
},
297+
],
298+
properties: {},
299+
});
300+
});
228301
});

packages/ot-server/src/DocumentManager.ts

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ export class DocumentManager {
2020
*/
2121
#model: EditorJSModel;
2222

23+
/**
24+
* Promise resolving with currently processed operation
25+
*/
26+
#operationInProcessing: Promise<Operation | null> | null = null;
27+
2328
/**
2429
* DocumentManager constructor function
2530
* @param identifier - identifier of the document to manage
@@ -37,38 +42,53 @@ export class DocumentManager {
3742

3843
/**
3944
* Process new operation
45+
* - awaits previous operation to finish processing
46+
* - processes the new one
47+
* @param operation - operation from the client to process
48+
*/
49+
public async process(operation: Operation): Promise<Operation | null> {
50+
await this.#operationInProcessing;
51+
52+
return this.#processNextOperation(operation);
53+
}
54+
55+
/**
56+
* Return serialised current state of the document
57+
*/
58+
public currentModelState(): EditorDocumentSerialized {
59+
return this.#model.serialized;
60+
}
61+
62+
/**
63+
* Process next operation
4064
* - Transform relative to operations in stack if needed
4165
* - Puts operation to the operations array
4266
* - Updates models state
43-
* @todo ensure the operations are processed consequently
44-
* @param operation - operation from the client to process
67+
* @param operation - operation to process
4568
*/
46-
public process(operation: Operation): Operation | null {
47-
if (operation.rev! > this.#currentRev) {
48-
console.error('Operation rejected due to incorrect revision %o', operation);
69+
#processNextOperation(operation: Operation): Promise<Operation | null> {
70+
this.#operationInProcessing = new Promise((resolve) => {
71+
if (operation.rev! > this.#currentRev) {
72+
console.error('Operation rejected due to incorrect revision %o', operation);
4973

50-
return null;
51-
}
74+
return resolve(null);
75+
}
5276

53-
const conflictingOps = this.#operations.filter(op => op.rev! >= operation.rev!);
54-
const transformedOp = conflictingOps.reduce((result, op) => result.transform(op), operation);
77+
const conflictingOps = this.#operations.filter(op => op.rev! >= operation.rev!);
78+
const transformedOp = conflictingOps.reduce((result, op) => result.transform(op), operation);
5579

56-
transformedOp.rev = this.#currentRev;
80+
transformedOp.rev = this.#currentRev;
5781

58-
this.#currentRev += 1;
82+
this.#currentRev += 1;
5983

60-
this.#operations.push(transformedOp);
84+
this.#operations.push(transformedOp);
6185

62-
this.#applyOperationToModel(transformedOp);
86+
this.#applyOperationToModel(transformedOp);
6387

64-
return transformedOp;
65-
}
88+
resolve(transformedOp);
89+
});
6690

67-
/**
68-
* Return serialised current state of the document
69-
*/
70-
public currentModelState(): EditorDocumentSerialized {
71-
return this.#model.serialized;
91+
return this.#operationInProcessing;
7292
}
7393

7494
/**

packages/ot-server/src/OTServer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export class OTServer {
6161

6262
return;
6363
case MessageType.Operation:
64-
this.#onOperation(ws, message.payload as SerializedOperation);
64+
void this.#onOperation(ws, message.payload as SerializedOperation);
6565

6666
return;
6767
}
@@ -104,7 +104,7 @@ export class OTServer {
104104
* @param ws - client websocket
105105
* @param payload - operation payload
106106
*/
107-
#onOperation(ws: WebSocket, payload: SerializedOperation): void {
107+
async #onOperation(ws: WebSocket, payload: SerializedOperation): Promise<void> {
108108
const operation = Operation.from(payload);
109109
const documentId = operation.index.documentId;
110110

@@ -121,7 +121,7 @@ export class OTServer {
121121
const manager = this.#managers.get(documentId)!;
122122
const clients = this.#clients.get(documentId)!;
123123

124-
const processedOperation = manager.process(operation);
124+
const processedOperation = await manager.process(operation);
125125

126126
if (processedOperation === null) {
127127
ws.close(BAD_REQUEST_CODE, 'Operation couldn\'t be processed');

0 commit comments

Comments
 (0)