-
Notifications
You must be signed in to change notification settings - Fork 5.2k
/
Copy pathlivedata_server.js
1749 lines (1522 loc) · 58.5 KB
/
livedata_server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import isEmpty from 'lodash.isempty';
import isObject from 'lodash.isobject';
import isString from 'lodash.isstring';
import { SessionCollectionView } from './session_collection_view';
import { SessionDocumentView } from './session_document_view';
// Namespace object that the rest of this file attaches its server-side DDP
// pieces to. It is assigned without a declaration, so it is not file-local.
DDPServer = {};
// Publication strategies decide, per collection, how data coming from
// published cursors is handled. They let an application:
// - trade client-server bandwidth against server memory usage
// - implement special (non-mongo) collections such as volatile message queues
const publicationStrategies = (() => {
  // Builds one strategy record. The argument order is the property order of
  // the resulting object.
  const strategy = (
    useDummyDocumentView,
    useCollectionView,
    doAccountingForCollection
  ) => ({
    useDummyDocumentView,
    useCollectionView,
    doAccountingForCollection,
  });

  return {
    // SERVER_MERGE is the default strategy.
    // The server keeps a copy of all data a connection is subscribed to,
    // which allows it to send only deltas across multiple publications.
    SERVER_MERGE: strategy(false, true, true),

    // NO_MERGE_NO_HISTORY sends all publication data directly to the client.
    // The server does not remember what it has previously sent, so it will
    // not trigger removed messages when a subscription is stopped.
    // Only choose this for special use cases like send-and-forget queues.
    NO_MERGE_NO_HISTORY: strategy(false, false, false),

    // NO_MERGE is similar to NO_MERGE_NO_HISTORY, but the server remembers
    // the IDs it has sent to the client so it can remove them when a
    // subscription is stopped. Usable when a collection is only used in a
    // single publication.
    NO_MERGE: strategy(false, false, true),

    // NO_MERGE_MULTI is similar to `NO_MERGE`, but it does track whether a
    // document is used by multiple publications. This has some memory
    // overhead, but it still does not do diffing, so it's faster and slimmer
    // than SERVER_MERGE.
    NO_MERGE_MULTI: strategy(true, true, true),
  };
})();
// Expose the strategy table on the DDPServer namespace.
DDPServer.publicationStrategies = publicationStrategies;
// This file contains classes:
// * Session - The server's connection to a single DDP client
// * Subscription - A single subscription for a single client
// * Server - An entire server that may talk to > 1 client. A DDP endpoint.
//
// Session and Subscription are file scope. For now, until we freeze
// the interface, Server is package scope (in the future it should be
// exported).
// Expose the imported SessionDocumentView class on the DDPServer namespace.
DDPServer._SessionDocumentView = SessionDocumentView;
// Returns the write fence for the current context: the value of
// `this._CurrentWriteFence` when one is set, otherwise the `fence` of the
// current method invocation, otherwise `undefined`.
DDPServer._getCurrentFence = function () {
  const activeFence = this._CurrentWriteFence.get();
  if (activeFence) {
    return activeFence;
  }

  const invocation = DDP._CurrentMethodInvocation.get();
  if (!invocation) {
    return undefined;
  }
  return invocation.fence;
};
// Expose the imported SessionCollectionView class on the DDPServer namespace.
DDPServer._SessionCollectionView = SessionCollectionView;
/******************************************************************************/
/* Session */
/******************************************************************************/
// Session: the server's connection to a single DDP client.
//
// Constructing a session immediately sends the 'connected' message, starts
// the universal subscriptions and, unless the protocol version is 'pre1' or
// options.heartbeatInterval is 0, starts a heartbeat.
//
// server:  the owning server object
// version: negotiated DDP version string
// socket:  the client socket (its url, headers, send() etc. are used)
// options: { respondToPings, heartbeatInterval, heartbeatTimeout }
var Session = function (server, version, socket, options) {
  var session = this;

  session.id = Random.id();
  session.server = server;
  session.version = version;
  session.initialized = false;
  session.socket = socket;

  // Queue of incoming messages. It is set to null when the session is
  // destroyed; several places use that to tell whether the session is alive.
  session.inQueue = new Meteor._DoubleEndedQueue();
  session.blocked = false;
  session.workerRunning = false;
  session.cachedUnblock = null;

  // Sub objects for active subscriptions.
  session._namedSubs = new Map();
  session._universalSubs = [];

  session.userId = null;
  session.collectionViews = new Map();

  // False means "do not send messages when collectionViews change". Used
  // while rerunning subs in _setUserId, where the messages are computed from
  // a diff instead.
  session._isSending = true;

  // True means a newly-created universal publisher must not be started on
  // this session; the session starts it itself when appropriate.
  session._dontStartNewUniversalSubs = false;

  // Ready messages buffered while subscriptions are being rerun.
  session._pendingReady = [];

  // Callbacks to call when this connection is closed.
  session._closeCallbacks = [];

  // XXX HACK: If a sockjs connection, save off the URL. This is
  // temporary and will go away in the near future.
  session._socketUrl = socket.url;

  // Allow tests to disable responding to pings.
  session._respondToPings = options.respondToPings;

  // Public interface to the session. The public API calls it the
  // `connection` object; internally it is the `connectionHandle`.
  session.connectionHandle = {
    id: session.id,
    close: function () {
      session.close();
    },
    onClose: function (fn) {
      var bound = Meteor.bindEnvironment(fn, "connection onClose callback");
      if (!session.inQueue) {
        // Already closed: just schedule the callback.
        Meteor.defer(bound);
        return;
      }
      session._closeCallbacks.push(bound);
    },
    clientAddress: session._clientAddress(),
    httpHeaders: session.socket.headers
  };

  session.send({ msg: 'connected', session: session.id });

  // On initial connect, spin up all the universal publishers.
  session.startUniversalSubs();

  var heartbeatDisabled =
    version === 'pre1' || options.heartbeatInterval === 0;
  if (!heartbeatDisabled) {
    // Heartbeats replace the low level socket timeout.
    socket.setWebsocketTimeout(0);

    session.heartbeat = new DDPCommon.Heartbeat({
      heartbeatInterval: options.heartbeatInterval,
      heartbeatTimeout: options.heartbeatTimeout,
      onTimeout: function () {
        session.close();
      },
      sendPing: function () {
        session.send({msg: 'ping'});
      }
    });
    session.heartbeat.start();
  }

  if (Package['facts-base']) {
    Package['facts-base'].Facts.incrementServerFact(
      "livedata", "sessions", 1);
  }
};
Object.assign(Session.prototype, {
sendReady: function (subscriptionIds) {
var self = this;
if (self._isSending) {
self.send({msg: "ready", subs: subscriptionIds});
} else {
subscriptionIds.forEach(function (subscriptionId) {
self._pendingReady.push(subscriptionId);
});
}
},
_canSend(collectionName) {
return this._isSending || !this.server.getPublicationStrategy(collectionName).useCollectionView;
},
sendAdded(collectionName, id, fields) {
if (this._canSend(collectionName)) {
this.send({ msg: 'added', collection: collectionName, id, fields });
}
},
sendChanged(collectionName, id, fields) {
if (isEmpty(fields))
return;
if (this._canSend(collectionName)) {
this.send({
msg: "changed",
collection: collectionName,
id,
fields
});
}
},
sendRemoved(collectionName, id) {
if (this._canSend(collectionName)) {
this.send({msg: "removed", collection: collectionName, id});
}
},
getSendCallbacks: function () {
var self = this;
return {
added: self.sendAdded.bind(self),
changed: self.sendChanged.bind(self),
removed: self.sendRemoved.bind(self)
};
},
getCollectionView: function (collectionName) {
var self = this;
var ret = self.collectionViews.get(collectionName);
if (!ret) {
ret = new SessionCollectionView(collectionName,
self.getSendCallbacks());
self.collectionViews.set(collectionName, ret);
}
return ret;
},
added(subscriptionHandle, collectionName, id, fields) {
if (this.server.getPublicationStrategy(collectionName).useCollectionView) {
const view = this.getCollectionView(collectionName);
view.added(subscriptionHandle, id, fields);
} else {
this.sendAdded(collectionName, id, fields);
}
},
removed(subscriptionHandle, collectionName, id) {
if (this.server.getPublicationStrategy(collectionName).useCollectionView) {
const view = this.getCollectionView(collectionName);
view.removed(subscriptionHandle, id);
if (view.isEmpty()) {
this.collectionViews.delete(collectionName);
}
} else {
this.sendRemoved(collectionName, id);
}
},
changed(subscriptionHandle, collectionName, id, fields) {
if (this.server.getPublicationStrategy(collectionName).useCollectionView) {
const view = this.getCollectionView(collectionName);
view.changed(subscriptionHandle, id, fields);
} else {
this.sendChanged(collectionName, id, fields);
}
},
startUniversalSubs: function () {
var self = this;
// Make a shallow copy of the set of universal handlers and start them. If
// additional universal publishers start while we're running them (due to
// yielding), they will run separately as part of Server.publish.
var handlers = [...self.server.universal_publish_handlers];
handlers.forEach(function (handler) {
self._startSubscription(handler);
});
},
// Destroy this session and unregister it at the server.
//
// Idempotent: a second call returns immediately because `inQueue` is already
// null. The synchronous part drops state, stops the heartbeat, closes the
// socket and unregisters from the server; deactivating subscriptions and
// running the onClose callbacks happens later via Meteor.defer.
close: function () {
var self = this;
// Destroy this session, even if it's not registered at the
// server. Stop all processing and tear everything down. If a socket
// was attached, close it.
// Already destroyed.
if (! self.inQueue)
return;
// Drop the merge box data immediately.
// (Nulling inQueue is also what marks the session as destroyed.)
self.inQueue = null;
self.collectionViews = new Map();
if (self.heartbeat) {
self.heartbeat.stop();
self.heartbeat = null;
}
if (self.socket) {
self.socket.close();
// Detach the session from the socket. Note that self.socket itself is
// left in place.
self.socket._meteorSession = null;
}
// Decrement the "sessions" fact when the optional facts-base package is present.
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"livedata", "sessions", -1);
Meteor.defer(function () {
// Stop callbacks can yield, so we defer this on close.
// sub._isDeactivated() detects that we set inQueue to null and
// treats it as semi-deactivated (it will ignore incoming callbacks, etc).
self._deactivateAllSubscriptions();
// Defer calling the close callbacks, so that the caller closing
// the session isn't waiting for all the callbacks to complete.
self._closeCallbacks.forEach(function (callback) {
callback();
});
});
// Unregister the session.
self.server._removeSession(self);
},
// Send a message (doing nothing if no socket is connected right now).
// It should be a JSON object (it will be stringified).
send: function (msg) {
const self = this;
if (self.socket) {
if (Meteor._printSentDDP)
Meteor._debug("Sent DDP", DDPCommon.stringifyDDP(msg));
self.socket.send(DDPCommon.stringifyDDP(msg));
}
},
// Send a connection error.
sendError: function (reason, offendingMessage) {
var self = this;
var msg = {msg: 'error', reason: reason};
if (offendingMessage)
msg.offendingMessage = offendingMessage;
self.send(msg);
},
// Process 'msg' as an incoming message. As a guard against
// race conditions during reconnection, ignore the message if
// 'socket' is not the currently connected socket.
//
// We run the messages from the client one at a time, in the order
// given by the client. The message handler is passed an idempotent
// function 'unblock' which it may call to allow other messages to
// begin running in parallel in another fiber (for example, a method
// that wants to yield). Otherwise, it is automatically unblocked
// when it returns.
//
// Actually, we don't have to 'totally order' the messages in this
// way, but it's the easiest thing that's correct. (unsub needs to
// be ordered against sub, methods need to be ordered against each
// other).
processMessage: function (msg_in) {
var self = this;
if (!self.inQueue) // we have been destroyed.
return;
// Respond to ping and pong messages immediately without queuing.
// If the negotiated DDP version is "pre1" which didn't support
// pings, preserve the "pre1" behavior of responding with a "bad
// request" for the unknown messages.
//
// Fibers are needed because heartbeats use Meteor.setTimeout, which
// needs a Fiber. We could actually use regular setTimeout and avoid
// these new fibers, but it is easier to just make everything use
// Meteor.setTimeout and not think too hard.
//
// Any message counts as receiving a pong, as it demonstrates that
// the client is still alive.
if (self.heartbeat) {
self.heartbeat.messageReceived();
};
if (self.version !== 'pre1' && msg_in.msg === 'ping') {
if (self._respondToPings)
self.send({msg: "pong", id: msg_in.id});
return;
}
if (self.version !== 'pre1' && msg_in.msg === 'pong') {
// Since everything is a pong, there is nothing to do
return;
}
self.inQueue.push(msg_in);
if (self.workerRunning)
return;
self.workerRunning = true;
var processNext = function () {
var msg = self.inQueue && self.inQueue.shift();
if (!msg) {
self.workerRunning = false;
return;
}
function runHandlers() {
var blocked = true;
var unblock = function () {
if (!blocked)
return; // idempotent
blocked = false;
setImmediate(processNext);
};
self.server.onMessageHook.each(function (callback) {
callback(msg, self);
return true;
});
if (msg.msg in self.protocol_handlers) {
const result = self.protocol_handlers[msg.msg].call(
self,
msg,
unblock
);
if (Meteor._isPromise(result)) {
result.finally(() => unblock());
} else {
unblock();
}
} else {
self.sendError('Bad request', msg);
unblock(); // in case the handler didn't already do it
}
}
runHandlers();
};
processNext();
},
protocol_handlers: {
sub: async function (msg, unblock) {
var self = this;
// cacheUnblock temporarly, so we can capture it later
// we will use unblock in current eventLoop, so this is safe
self.cachedUnblock = unblock;
// reject malformed messages
if (typeof (msg.id) !== "string" ||
typeof (msg.name) !== "string" ||
('params' in msg && !(msg.params instanceof Array))) {
self.sendError("Malformed subscription", msg);
return;
}
if (!self.server.publish_handlers[msg.name]) {
self.send({
msg: 'nosub', id: msg.id,
error: new Meteor.Error(404, `Subscription '${msg.name}' not found`)});
return;
}
if (self._namedSubs.has(msg.id))
// subs are idempotent, or rather, they are ignored if a sub
// with that id already exists. this is important during
// reconnect.
return;
// XXX It'd be much better if we had generic hooks where any package can
// hook into subscription handling, but in the mean while we special case
// ddp-rate-limiter package. This is also done for weak requirements to
// add the ddp-rate-limiter package in case we don't have Accounts. A
// user trying to use the ddp-rate-limiter must explicitly require it.
if (Package['ddp-rate-limiter']) {
var DDPRateLimiter = Package['ddp-rate-limiter'].DDPRateLimiter;
var rateLimiterInput = {
userId: self.userId,
clientAddress: self.connectionHandle.clientAddress,
type: "subscription",
name: msg.name,
connectionId: self.id
};
DDPRateLimiter._increment(rateLimiterInput);
var rateLimitResult = DDPRateLimiter._check(rateLimiterInput);
if (!rateLimitResult.allowed) {
self.send({
msg: 'nosub', id: msg.id,
error: new Meteor.Error(
'too-many-requests',
DDPRateLimiter.getErrorMessage(rateLimitResult),
{timeToReset: rateLimitResult.timeToReset})
});
return;
}
}
var handler = self.server.publish_handlers[msg.name];
await self._startSubscription(handler, msg.id, msg.params, msg.name);
// cleaning cached unblock
self.cachedUnblock = null;
},
unsub: function (msg) {
var self = this;
self._stopSubscription(msg.id);
},
// Handle a 'method' message: validate it, run the method handler inside a
// fresh write fence and reply with a 'result' message. The fence's
// onAllCommitted callback sends the 'updated' message.
//
// msg:     { id: string, method: string, params?: Array, randomSeed?: string }
// unblock: idempotent function that lets the next queued message run; it is
//          also handed to the MethodInvocation.
//
// Returns a promise that settles after the 'result' message has been sent
// (or undefined-valued early when the message is malformed or the method is
// unknown).
method: async function (msg, unblock) {
var self = this;
// Reject malformed messages.
// For now, we silently ignore unknown attributes,
// for forwards compatibility.
if (typeof (msg.id) !== "string" ||
typeof (msg.method) !== "string" ||
('params' in msg && !(msg.params instanceof Array)) ||
(('randomSeed' in msg) && (typeof msg.randomSeed !== "string"))) {
self.sendError("Malformed method invocation", msg);
return;
}
var randomSeed = msg.randomSeed || null;
// Set up to mark the method as satisfied once all observers
// (and subscriptions) have reacted to any writes that were
// done.
var fence = new DDPServer._WriteFence;
fence.onAllCommitted(function () {
// Retire the fence so that future writes are allowed.
// This means that callbacks like timers are free to use
// the fence, and if they fire before it's armed (for
// example, because the method waits for them) their
// writes will be included in the fence.
fence.retire();
self.send({msg: 'updated', methods: [msg.id]});
});
// Find the handler
// NOTE(review): this is a plain property lookup keyed by a client-supplied
// name; whether inherited keys (e.g. "constructor") can match depends on how
// method_handlers is created, which is not visible here — confirm.
var handler = self.server.method_handlers[msg.method];
if (!handler) {
self.send({
msg: 'result', id: msg.id,
error: new Meteor.Error(404, `Method '${msg.method}' not found`)});
// Arm the fence even though nothing ran, so the 'updated' message is
// still sent through onAllCommitted.
await fence.arm();
return;
}
var invocation = new DDPCommon.MethodInvocation({
name: msg.method,
isSimulation: false,
userId: self.userId,
setUserId(userId) {
return self._setUserId(userId);
},
unblock: unblock,
connection: self.connectionHandle,
randomSeed: randomSeed,
fence,
});
// The Promise executor turns both a rate-limit rejection and a synchronous
// throw from the handler into a rejected promise, so the two outcomes are
// handled uniformly below.
const promise = new Promise((resolve, reject) => {
// XXX It'd be better if we could hook into method handlers better but
// for now, we need to check if the ddp-rate-limiter exists since we
// have a weak requirement for the ddp-rate-limiter package to be added
// to our application.
if (Package['ddp-rate-limiter']) {
var DDPRateLimiter = Package['ddp-rate-limiter'].DDPRateLimiter;
var rateLimiterInput = {
userId: self.userId,
clientAddress: self.connectionHandle.clientAddress,
type: "method",
name: msg.method,
connectionId: self.id
};
DDPRateLimiter._increment(rateLimiterInput);
var rateLimitResult = DDPRateLimiter._check(rateLimiterInput)
if (!rateLimitResult.allowed) {
reject(new Meteor.Error(
"too-many-requests",
DDPRateLimiter.getErrorMessage(rateLimitResult),
{timeToReset: rateLimitResult.timeToReset}
));
return;
}
}
// Run the handler with both the write fence and the method invocation set
// as the current values of their environment variables.
resolve(DDPServer._CurrentWriteFence.withValue(
fence,
() => DDP._CurrentMethodInvocation.withValue(
invocation,
() => maybeAuditArgumentChecks(
handler, invocation, msg.params,
"call to '" + msg.method + "'"
)
)
));
});
// Arm the fence, then let the next queued message run.
async function finish() {
await fence.arm();
unblock();
}
const payload = {
msg: "result",
id: msg.id
};
// In both outcomes finish() runs before the 'result' message is sent.
return promise.then(async result => {
await finish();
// `result` is only included when the handler returned something.
if (result !== undefined) {
payload.result = result;
}
self.send(payload);
}, async (exception) => {
await finish();
payload.error = wrapInternalException(
exception,
`while invoking method '${msg.method}'`
);
self.send(payload);
});
}
},
_eachSub: function (f) {
var self = this;
self._namedSubs.forEach(f);
self._universalSubs.forEach(f);
},
_diffCollectionViews: function (beforeCVs) {
var self = this;
DiffSequence.diffMaps(beforeCVs, self.collectionViews, {
both: function (collectionName, leftValue, rightValue) {
rightValue.diff(leftValue);
},
rightOnly: function (collectionName, rightValue) {
rightValue.documents.forEach(function (docView, id) {
self.sendAdded(collectionName, id, docView.getFields());
});
},
leftOnly: function (collectionName, leftValue) {
leftValue.documents.forEach(function (doc, id) {
self.sendRemoved(collectionName, id);
});
}
});
},
// Sets the current user id in all appropriate contexts and reruns
// all subscriptions
//
// userId: a string, or null for "no user"; anything else throws an Error.
//
// Sequence: deactivate every current sub, pause sending, swap in empty
// collection views, recreate and rerun the named subs, restart the universal
// subs, then resume sending with a diff against the saved views plus any
// buffered 'ready' ids.
async _setUserId(userId) {
var self = this;
if (userId !== null && typeof userId !== "string")
throw new Error("setUserId must be called on string or null, not " +
typeof userId);
// Prevent newly-created universal subscriptions from being added to our
// session. They will be found below when we call startUniversalSubs.
//
// (We don't have to worry about named subscriptions, because we only add
// them when we process a 'sub' message. We are currently processing a
// 'method' message, and the method did not unblock, because it is illegal
// to call setUserId after unblock. Thus we cannot be concurrently adding a
// new named subscription).
self._dontStartNewUniversalSubs = true;
// Prevent current subs from updating our collectionViews and call their
// stop callbacks. This may yield.
self._eachSub(function (sub) {
sub._deactivate();
});
// All subs should now be deactivated. Stop sending messages to the client,
// save the state of the published collections, reset to an empty view, and
// update the userId.
self._isSending = false;
var beforeCVs = self.collectionViews;
self.collectionViews = new Map();
self.userId = userId;
// _setUserId is normally called from a Meteor method with
// DDP._CurrentMethodInvocation set. But DDP._CurrentMethodInvocation is not
// expected to be set inside a publish function, so we temporary unset it.
// Inside a publish function DDP._CurrentPublicationInvocation is set.
await DDP._CurrentMethodInvocation.withValue(undefined, async function () {
// Save the old named subs, and reset to having no subscriptions.
var oldNamedSubs = self._namedSubs;
self._namedSubs = new Map();
self._universalSubs = [];
// The recreated subs are all registered and started in one pass; their
// handlers are awaited together.
await Promise.all([...oldNamedSubs].map(async ([subscriptionId, sub]) => {
const newSub = sub._recreate();
self._namedSubs.set(subscriptionId, newSub);
// nb: if the handler throws or calls this.error(), it will in fact
// immediately send its 'nosub'. This is OK, though.
await newSub._runHandler();
}));
// Allow newly-created universal subs to be started on our connection in
// parallel with the ones we're spinning up here, and spin up universal
// subs.
self._dontStartNewUniversalSubs = false;
// NOTE(review): startUniversalSubs() returns nothing, so the promises from
// the universal subs' handlers are not awaited here — confirm the diff
// below cannot run before they have finished publishing.
self.startUniversalSubs();
}, { name: '_setUserId' });
// Start sending messages again, beginning with the diff from the previous
// state of the world to the current state. No yields are allowed during
// this diff, so that other changes cannot interleave.
Meteor._noYieldsAllowed(function () {
self._isSending = true;
self._diffCollectionViews(beforeCVs);
// Flush the 'ready' ids that sendReady buffered while sending was paused.
if (!isEmpty(self._pendingReady)) {
self.sendReady(self._pendingReady);
self._pendingReady = [];
}
});
},
_startSubscription: function (handler, subId, params, name) {
var self = this;
var sub = new Subscription(
self, handler, subId, params, name);
let unblockHander = self.cachedUnblock;
// _startSubscription may call from a lot places
// so cachedUnblock might be null in somecases
// assign the cachedUnblock
sub.unblock = unblockHander || (() => {});
if (subId)
self._namedSubs.set(subId, sub);
else
self._universalSubs.push(sub);
return sub._runHandler();
},
// Tear down the specified subscription and send 'nosub' for it. The 'nosub'
// message is sent even when no sub with that id exists. When `error` is
// given, it is wrapped and attached to the message.
_stopSubscription: function (subId, error) {
  const session = this;

  const sub = subId ? session._namedSubs.get(subId) : undefined;
  let subName = null;
  if (sub) {
    subName = sub._name;
    sub._removeAllDocuments();
    sub._deactivate();
    session._namedSubs.delete(subId);
  }

  const response = {msg: 'nosub', id: subId};

  if (error) {
    const context = subName
      ? ("from sub " + subName + " id " + subId)
      : ("from sub id " + subId);
    response.error = wrapInternalException(error, context);
  }

  session.send(response);
},
// Tear down all subscriptions. Note that this does NOT send removed or nosub
// messages, since we assume the client is gone.
_deactivateAllSubscriptions: function () {
var self = this;
self._namedSubs.forEach(function (sub, id) {
sub._deactivate();
});
self._namedSubs = new Map();
self._universalSubs.forEach(function (sub) {
sub._deactivate();
});
self._universalSubs = [];
},
// Determine the remote client's IP address, based on the
// HTTP_FORWARDED_COUNT environment variable representing how many
// proxies the server is behind.
_clientAddress: function () {
var self = this;
// For the reported client address for a connection to be correct,
// the developer must set the HTTP_FORWARDED_COUNT environment
// variable to an integer representing the number of hops they
// expect in the `x-forwarded-for` header. E.g., set to "1" if the
// server is behind one proxy.
//
// This could be computed once at startup instead of every time.
var httpForwardedCount = parseInt(process.env['HTTP_FORWARDED_COUNT']) || 0;
if (httpForwardedCount === 0)
return self.socket.remoteAddress;
var forwardedFor = self.socket.headers["x-forwarded-for"];
if (!isString(forwardedFor))
return null;
forwardedFor = forwardedFor.trim().split(/\s*,\s*/);
// Typically the first value in the `x-forwarded-for` header is
// the original IP address of the client connecting to the first
// proxy. However, the end user can easily spoof the header, in
// which case the first value(s) will be the fake IP address from
// the user pretending to be a proxy reporting the original IP
// address value. By counting HTTP_FORWARDED_COUNT back from the
// end of the list, we ensure that we get the IP address being
// reported by *our* first proxy.
if (httpForwardedCount < 0 || httpForwardedCount > forwardedFor.length)
return null;
return forwardedFor[forwardedFor.length - httpForwardedCount];
}
});
/******************************************************************************/
/* Subscription */
/******************************************************************************/
// Ctor for a sub handle: the input to each publish function
// Instance name is this because it's usually referred to as this inside a
// publish
/**
* @summary The server's side of a subscription
* @class Subscription
* @instanceName this
* @showInstanceName true
*/
var Subscription = function (
    session, handler, subscriptionId, params, name) {
  var sub = this;

  // The owning Session.
  sub._session = session;

  /**
   * @summary Access inside the publish function. The incoming [connection](#meteor_onconnection) for this subscription.
   * @locus Server
   * @name  connection
   * @memberOf Subscription
   * @instance
   */
  sub.connection = session.connectionHandle; // public API object

  sub._handler = handler;

  // Client-generated subscription ID; undefined for universal subs.
  sub._subscriptionId = subscriptionId;
  // Publication name; undefined for universal subs.
  sub._name = name;

  sub._params = params || [];

  // Only named subscriptions have IDs, but some string is needed internally
  // to track every subscription inside SessionDocumentViews. Named subs use
  // 'N' + id; universal subs get 'U' + a fresh random id.
  sub._subscriptionHandle = sub._subscriptionId
    ? 'N' + sub._subscriptionId
    : 'U' + Random.id();

  // Has _deactivate been called?
  sub._deactivated = false;

  // Stop callbacks to g/c this sub. Called w/ zero arguments.
  sub._stopCallbacks = [];

  // The set of (collection, documentid) that this subscription has
  // an opinion about.
  sub._documents = new Map();

  // Remember if we are ready.
  sub._ready = false;

  // Part of the public API: the user of this sub.

  /**
   * @summary Access inside the publish function. The id of the logged-in user, or `null` if no user is logged in.
   * @locus Server
   * @memberOf Subscription
   * @name  userId
   * @instance
   */
  sub.userId = session.userId;

  // For now, the id filter defaults to the to/from DDP methods on MongoID,
  // to specifically deal with mongo/minimongo ObjectIds.
  //
  // Later, you will be able to make this be "raw" if you want to publish a
  // collection that you know just has strings for keys and no funny
  // business, to a DDP consumer that isn't minimongo.
  sub._idFilter = {
    idStringify: MongoID.idStringify,
    idParse: MongoID.idParse
  };

  // Count this subscription when the optional facts-base package is present.
  if (Package['facts-base']) {
    Package['facts-base'].Facts.incrementServerFact(
      "livedata", "subscriptions", 1);
  }
};
Object.assign(Subscription.prototype, {
// Run this subscription's publish handler and publish whatever it returns.
//
// The handler is called with `this` subscription as the current publication
// invocation and a clone of the stored params. A synchronous throw is
// reported through self.error(). If the handler returns a thenable it is
// awaited first; the (resolved) result is passed to _publishHandlerResult.
// Nothing is published if the handler already deactivated the subscription.
_runHandler: async function() {
// XXX should we unblock() here? Either before running the publish
// function, or before running _publishCursor.
//
// Right now, each publish function blocks all future publishes and
// methods waiting on data from Mongo (or whatever else the function
// blocks on). This probably slows page load in common cases.
// Guarantee the handler can always call this.unblock(), even when no
// unblock function was assigned to this subscription.
if (!this.unblock) {
this.unblock = () => {};
}
const self = this;
let resultOrThenable = null;
try {
resultOrThenable = DDP._CurrentPublicationInvocation.withValue(
self,
() =>
maybeAuditArgumentChecks(
self._handler,
self,
EJSON.clone(self._params),
// It's OK that this would look weird for universal subscriptions,
// because they have no arguments so there can never be an
// audit-argument-checks failure.
"publisher '" + self._name + "'"
),
{ name: self._name }
);
} catch (e) {
self.error(e);
return;
}
// Did the handler call this.error or this.stop?
if (self._isDeactivated()) return;
// Both conventional and async publish handler functions are supported.
// If an object is returned with a then() function, it is either a promise
// or thenable and will be resolved asynchronously.
const isThenable =
resultOrThenable && typeof resultOrThenable.then === 'function';
if (isThenable) {
// Errors from awaiting the thenable or from publishing its result are
// reported through self.error().
try {
await self._publishHandlerResult(await resultOrThenable);
} catch(e) {
self.error(e)
}
} else {
// NOTE(review): unlike the thenable branch, a failure here is not routed
// to self.error(); it rejects the promise returned by _runHandler instead
// — confirm this asymmetry is intended.
await self._publishHandlerResult(resultOrThenable);
}
},
async _publishHandlerResult (res) {
// SPECIAL CASE: Instead of writing their own callbacks that invoke
// this.added/changed/ready/etc, the user can just return a collection
// cursor or array of cursors from the publish function; we call their
// _publishCursor method which starts observing the cursor and publishes the
// results. Note that _publishCursor does NOT call ready().
//
// XXX This uses an undocumented interface which only the Mongo cursor
// interface publishes. Should we make this interface public and encourage
// users to implement it themselves? Arguably, it's unnecessary; users can
// already write their own functions like
// var publishMyReactiveThingy = function (name, handler) {
// Meteor.publish(name, function () {
// var reactiveThingy = handler();
// reactiveThingy.publishMe();
// });
// };
var self = this;
var isCursor = function (c) {
return c && c._publishCursor;
};
if (isCursor(res)) {
try {