@@ -7,15 +7,16 @@ import {
7
7
import _debug from '@codesandbox/common/lib/utils/debug' ;
8
8
import { camelizeKeys } from 'humps' ;
9
9
import { TextOperation } from 'ot' ;
10
- import { Socket , Channel } from 'phoenix' ;
10
+ import { Channel , Socket } from 'phoenix' ;
11
11
import uuid from 'uuid' ;
12
12
13
- import clientsFactory from './clients' ;
14
13
import { OPTIMISTIC_ID_PREFIX } from '../utils' ;
14
+ import clientsFactory from './clients' ;
15
15
16
16
type Options = {
17
17
onApplyOperation ( args : { moduleShortid : string ; operation : any } ) : void ;
18
18
provideJwtToken ( ) : string ;
19
+ getConnectionsCount ( ) : number ;
19
20
} ;
20
21
21
22
type JoinChannelResponse = {
@@ -29,27 +30,22 @@ declare global {
29
30
}
30
31
}
31
32
32
- const identifier = uuid . v4 ( ) ;
33
- const sentMessages = new Map ( ) ;
34
- const debug = _debug ( 'cs:socket' ) ;
33
+ class Live {
34
+ private identifier = uuid . v4 ( ) ;
35
+ private sentMessages = new Map ( ) ;
36
+ private debug = _debug ( 'cs:socket' ) ;
37
+ private channel : Channel | null ;
38
+ private messageIndex = 0 ;
39
+ private clients : ReturnType < typeof clientsFactory > ;
40
+ private _socket : Socket ;
35
41
36
- let channel : Channel | null ;
37
- let messageIndex = 0 ;
38
- let clients : ReturnType < typeof clientsFactory > ;
39
- let _socket : Socket ;
40
- let provideJwtToken : ( ) => string ;
42
+ private provideJwtToken : ( ) => string ;
43
+ private getConnectionsCount : ( ) => number ;
41
44
42
- export default new ( class Live {
43
45
initialize ( options : Options ) {
44
- const live = this ;
45
-
46
- clients = clientsFactory (
46
+ this . clients = clientsFactory (
47
47
( moduleShortid , revision , operation ) => {
48
- live . send ( 'operation' , {
49
- moduleShortid,
50
- operation,
51
- revision,
52
- } ) ;
48
+ this . sendOperation ( moduleShortid , revision , operation ) ;
53
49
} ,
54
50
( moduleShortid , operation ) => {
55
51
options . onApplyOperation ( {
@@ -58,48 +54,49 @@ export default new (class Live {
58
54
} ) ;
59
55
}
60
56
) ;
61
- provideJwtToken = options . provideJwtToken ;
57
+ this . provideJwtToken = options . provideJwtToken ;
58
+ this . getConnectionsCount = options . getConnectionsCount ;
62
59
}
63
60
64
61
getSocket ( ) {
65
- return _socket || this . connect ( ) ;
62
+ return this . _socket || this . connect ( ) ;
66
63
}
67
64
68
65
connect ( ) : Socket {
69
- if ( ! _socket ) {
66
+ if ( ! this . _socket ) {
70
67
const protocol = process . env . LOCAL_SERVER ? 'ws' : 'wss' ;
71
- _socket = new Socket ( `${ protocol } ://${ location . host } /socket` , {
68
+ this . _socket = new Socket ( `${ protocol } ://${ location . host } /socket` , {
72
69
params : {
73
- guardian_token : provideJwtToken ( ) ,
70
+ guardian_token : this . provideJwtToken ( ) ,
74
71
} ,
75
72
} ) ;
76
73
77
- _socket . connect ( ) ;
78
- window . socket = _socket ;
79
- debug ( 'Connecting to socket' , _socket ) ;
74
+ this . _socket . connect ( ) ;
75
+ window . socket = this . _socket ;
76
+ this . debug ( 'Connecting to socket' , this . _socket ) ;
80
77
}
81
78
82
- return _socket ;
79
+ return this . _socket ;
83
80
}
84
81
85
82
disconnect ( ) {
86
83
return new Promise ( ( resolve , reject ) => {
87
- if ( ! channel ) {
84
+ if ( ! this . channel ) {
88
85
resolve ( { } ) ;
89
86
return ;
90
87
}
91
88
92
- channel
89
+ this . channel
93
90
. leave ( )
94
91
. receive ( 'ok' , resp => {
95
- if ( ! channel ) {
92
+ if ( ! this . channel ) {
96
93
return resolve ( { } ) ;
97
94
}
98
95
99
- channel . onMessage = d => d ;
100
- channel = null ;
101
- sentMessages . clear ( ) ;
102
- messageIndex = 0 ;
96
+ this . channel . onMessage = d => d ;
97
+ this . channel = null ;
98
+ this . sentMessages . clear ( ) ;
99
+ this . messageIndex = 0 ;
103
100
104
101
return resolve ( resp ) ;
105
102
} )
@@ -110,9 +107,9 @@ export default new (class Live {
110
107
111
108
joinChannel ( roomId : string ) : Promise < JoinChannelResponse > {
112
109
return new Promise ( ( resolve , reject ) => {
113
- channel = this . getSocket ( ) . channel ( `live:${ roomId } ` , { version : 2 } ) ;
110
+ this . channel = this . getSocket ( ) . channel ( `live:${ roomId } ` , { version : 2 } ) ;
114
111
115
- channel
112
+ this . channel
116
113
. join ( )
117
114
. receive ( 'ok' , resp => {
118
115
const result = camelizeKeys ( resp ) as JoinChannelResponse ;
@@ -130,18 +127,18 @@ export default new (class Live {
130
127
data : object ;
131
128
} ) => { }
132
129
) {
133
- if ( ! channel ) {
130
+ if ( ! this . channel ) {
134
131
return ;
135
132
}
136
133
137
- channel . onMessage = ( event : any , data : any ) => {
134
+ this . channel . onMessage = ( event : any , data : any ) => {
138
135
const disconnected =
139
136
( data == null || Object . keys ( data ) . length === 0 ) &&
140
137
event === 'phx_error' ;
141
138
const alteredEvent = disconnected ? 'connection-loss' : event ;
142
139
143
140
const _isOwnMessage = Boolean (
144
- data && data . _messageId && sentMessages . delete ( data . _messageId )
141
+ data && data . _messageId && this . sentMessages . delete ( data . _messageId )
145
142
) ;
146
143
147
144
action ( {
@@ -155,14 +152,18 @@ export default new (class Live {
155
152
}
156
153
157
154
send ( event : string , payload : { _messageId ?: string ; [ key : string ] : any } ) {
158
- const _messageId = identifier + messageIndex ++ ;
155
+ if ( this . getConnectionsCount ( ) < 2 ) {
156
+ return Promise . resolve ( ) ;
157
+ }
158
+
159
+ const _messageId = this . identifier + this . messageIndex ++ ;
159
160
// eslint-disable-next-line
160
161
payload . _messageId = _messageId ;
161
- sentMessages . set ( _messageId , payload ) ;
162
+ this . sentMessages . set ( _messageId , payload ) ;
162
163
163
164
return new Promise ( ( resolve , reject ) => {
164
- if ( channel ) {
165
- channel
165
+ if ( this . channel ) {
166
+ this . channel
166
167
. push ( event , payload )
167
168
. receive ( 'ok' , resolve )
168
169
. receive ( 'error' , reject ) ;
@@ -202,7 +203,7 @@ export default new (class Live {
202
203
}
203
204
204
205
try {
205
- clients . get ( moduleShortid ) . applyClient ( operation ) ;
206
+ this . clients . get ( moduleShortid ) . applyClient ( operation ) ;
206
207
} catch ( e ) {
207
208
// Something went wrong, probably a sync mismatch. Request new version
208
209
this . send ( 'live:module_state' , { } ) ;
@@ -309,31 +310,59 @@ export default new (class Live {
309
310
} ) ;
310
311
}
311
312
313
+ getClient ( moduleShortid : string ) {
314
+ return this . clients . get ( moduleShortid ) ;
315
+ }
316
+
312
317
getAllClients ( ) {
313
- return clients . getAll ( ) ;
318
+ return this . clients . getAll ( ) ;
314
319
}
315
320
316
321
applyClient ( moduleShortid : string , operation : any ) {
317
- return clients
322
+ return this . clients
318
323
. get ( moduleShortid )
319
324
. applyClient ( TextOperation . fromJSON ( operation ) ) ;
320
325
}
321
326
322
327
applyServer ( moduleShortid : string , operation : any ) {
323
- return clients
328
+ return this . clients
324
329
. get ( moduleShortid )
325
330
. applyServer ( TextOperation . fromJSON ( operation ) ) ;
326
331
}
327
332
328
333
serverAck ( moduleShortid : string ) {
329
- return clients . get ( moduleShortid ) . serverAck ( ) ;
334
+ return this . clients . get ( moduleShortid ) . serverAck ( ) ;
330
335
}
331
336
332
337
createClient ( moduleShortid : string , revision : number ) {
333
- return clients . create ( moduleShortid , revision ) ;
338
+ return this . clients . create ( moduleShortid , revision ) ;
334
339
}
335
340
336
341
resetClients ( ) {
337
- clients . clear ( ) ;
342
+ this . clients . clear ( ) ;
343
+ }
344
+
345
+ private operationToElixir ( ot ) {
346
+ return ot . map ( op => {
347
+ if ( typeof op === 'number' ) {
348
+ if ( op < 0 ) {
349
+ return { d : - op } ;
350
+ }
351
+
352
+ return op ;
353
+ }
354
+
355
+ return { i : op } ;
356
+ } ) ;
338
357
}
339
- } ) ( ) ;
358
+
359
+ private sendOperation ( moduleShortid , revision , operation ) {
360
+ this . send ( 'operation' , {
361
+ moduleShortid,
362
+ operation : this . operationToElixir ( operation . toJSON ( ) ) ,
363
+ revision : Number ( revision ) ,
364
+ } ) ;
365
+ }
366
+ }
367
+
368
+ export default new Live ( ) ;
0 commit comments