Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit f507ca1

Browse files
authored
Merge pull request share#262 from share/snapshot-by-timestamp
Fetch snapshot by time
2 parents 494d1a5 + 8e46271 commit f507ca1

17 files changed

+1111
-78
lines changed

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,27 @@ Get a read-only snapshot of a document at the requested version.
269269
}
270270
```
271271

272+
`connection.fetchSnapshotByTimestamp(collection, id, timestamp, callback): void;`
273+
Get a read-only snapshot of a document at the requested version.
274+
275+
* `collection` _(String)_
276+
Collection name of the snapshot
277+
* `id` _(String)_
278+
ID of the snapshot
279+
* `timestamp` _(number) [optional]_
280+
The timestamp of the desired snapshot. The returned snapshot will be the latest snapshot before the provided timestamp
281+
* `callback` _(Function)_
282+
Called with `(error, snapshot)`, where `snapshot` takes the following form:
283+
284+
```javascript
285+
{
286+
id: string; // ID of the snapshot
287+
v: number; // version number of the snapshot
288+
type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
289+
data: any; // the snapshot
290+
}
291+
```
292+
272293
### Class: `ShareDB.Doc`
273294

274295
`doc.type` _(String_)
@@ -464,3 +485,5 @@ The `41xx` and `51xx` codes are reserved for use by ShareDB DB adapters, and the
464485
* 5018 - Required QueryEmitter listener not assigned
465486
* 5019 - getMilestoneSnapshot MilestoneDB method unimplemented
466487
* 5020 - saveMilestoneSnapshot MilestoneDB method unimplemented
488+
* 5021 - getMilestoneSnapshotAtOrBeforeTime MilestoneDB method unimplemented
489+
* 5022 - getMilestoneSnapshotAtOrAfterTime MilestoneDB method unimplemented

lib/agent.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,8 @@ Agent.prototype._handleMessage = function(request, callback) {
303303
return this._submit(request.c, request.d, op, callback);
304304
case 'nf':
305305
return this._fetchSnapshot(request.c, request.d, request.v, callback);
306+
case 'nt':
307+
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
306308
default:
307309
callback({code: 4000, message: 'Invalid or unknown message'});
308310
}
@@ -589,3 +591,7 @@ Agent.prototype._createOp = function(request) {
589591
Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
590592
this.backend.fetchSnapshot(this, collection, id, version, callback);
591593
};
594+
595+
Agent.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
596+
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
597+
};

lib/backend.js

Lines changed: 83 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ Backend.prototype.SNAPSHOT_TYPES = {
8181
// The current snapshot is being fetched (eg through backend.fetch)
8282
current: 'current',
8383
// A specific snapshot is being fetched by version (eg through backend.fetchSnapshot)
84-
byVersion: 'byVersion'
84+
byVersion: 'byVersion',
85+
// A specific snapshot is being fetch by timestamp (eg through backend.fetchSnapshotByTimestamp)
86+
byTimestamp: 'byTimestamp'
8587
};
8688

8789
Backend.prototype._shimDocAction = function() {
@@ -627,6 +629,8 @@ Backend.prototype.fetchSnapshot = function(agent, index, id, version, callback)
627629

628630
Backend.prototype._fetchSnapshot = function (collection, id, version, callback) {
629631
var db = this.db;
632+
var backend = this;
633+
630634
this.milestoneDb.getMilestoneSnapshot(collection, id, version, function (error, milestoneSnapshot) {
631635
if (error) return callback(error);
632636

@@ -637,49 +641,98 @@ Backend.prototype._fetchSnapshot = function (collection, id, version, callback)
637641
db.getOps(collection, id, from, version, null, function (error, ops) {
638642
if (error) return callback(error);
639643

640-
var type = null;
641-
var data;
642-
var fetchedVersion = 0;
643-
644-
if (milestoneSnapshot) {
645-
type = types.map[milestoneSnapshot.type];
646-
if (!type) return callback({ code: 4008, message: 'Unknown type' });
647-
data = milestoneSnapshot.data;
648-
fetchedVersion = milestoneSnapshot.v;
649-
}
644+
backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, function (error, snapshot) {
645+
if (error) return callback(error);
650646

651-
for (var index = 0; index < ops.length; index++) {
652-
var op = ops[index];
653-
fetchedVersion = op.v + 1;
654-
655-
if (op.create) {
656-
type = types.map[op.create.type];
657-
if (!type) return callback({ code: 4008, message: 'Unknown type' });
658-
data = type.create(op.create.data);
659-
} else if (op.del) {
660-
data = undefined;
661-
type = null;
662-
} else {
663-
data = type.apply(data, op.op);
647+
if (version > snapshot.v) {
648+
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
664649
}
665-
}
666650

667-
type = type ? type.uri : null;
651+
callback(null, snapshot);
652+
});
653+
});
654+
});
655+
};
668656

669-
if (version > fetchedVersion) {
670-
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
671-
}
657+
Backend.prototype.fetchSnapshotByTimestamp = function (agent, index, id, timestamp, callback) {
658+
var start = Date.now();
659+
var backend = this;
660+
var projection = this.projections[index];
661+
var collection = projection ? projection.target : index;
662+
var request = {
663+
agent: agent,
664+
index: index,
665+
collection: collection,
666+
id: id,
667+
timestamp: timestamp
668+
};
672669

673-
var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
670+
this._fetchSnapshotByTimestamp(collection, id, timestamp, function (error, snapshot) {
671+
if (error) return callback(error);
672+
var snapshotProjection = backend._getSnapshotProjection(backend.db, projection);
673+
var snapshots = [snapshot];
674+
var snapshotType = backend.SNAPSHOT_TYPES.byTimestamp;
675+
backend._sanitizeSnapshots(agent, snapshotProjection, collection, snapshots, snapshotType, function (error) {
676+
if (error) return callback(error);
677+
backend.emit('timing', 'fetchSnapshot', Date.now() - start, request);
674678
callback(null, snapshot);
675679
});
676680
});
677681
};
678682

683+
Backend.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
684+
var db = this.db;
685+
var milestoneDb = this.milestoneDb;
686+
var backend = this;
687+
688+
var milestoneSnapshot;
689+
var from = 0;
690+
var to = null;
691+
692+
milestoneDb.getMilestoneSnapshotAtOrBeforeTime(collection, id, timestamp, function (error, snapshot) {
693+
if (error) return callback(error);
694+
milestoneSnapshot = snapshot;
695+
if (snapshot) from = snapshot.v;
696+
697+
milestoneDb.getMilestoneSnapshotAtOrAfterTime(collection, id, timestamp, function (error, snapshot) {
698+
if (error) return callback(error);
699+
if (snapshot) to = snapshot.v;
700+
701+
var options = {metadata: true};
702+
db.getOps(collection, id, from, to, options, function (error, ops) {
703+
if (error) return callback(error);
704+
filterOpsInPlaceBeforeTimestamp(ops, timestamp);
705+
backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, callback);
706+
});
707+
});
708+
});
709+
};
710+
711+
Backend.prototype._buildSnapshotFromOps = function (id, startingSnapshot, ops, callback) {
712+
var snapshot = startingSnapshot || new Snapshot(id, 0, null, undefined, null);
713+
var error = ot.applyOps(snapshot, ops);
714+
callback(error, snapshot);
715+
};
716+
679717
function pluckIds(snapshots) {
680718
var ids = [];
681719
for (var i = 0; i < snapshots.length; i++) {
682720
ids.push(snapshots[i].id);
683721
}
684722
return ids;
685723
}
724+
725+
function filterOpsInPlaceBeforeTimestamp(ops, timestamp) {
726+
if (timestamp === null) {
727+
return;
728+
}
729+
730+
for (var i = 0; i < ops.length; i++) {
731+
var op = ops[i];
732+
var opTimestamp = op.m && op.m.ts;
733+
if (opTimestamp > timestamp) {
734+
ops.length = i;
735+
return;
736+
}
737+
}
738+
}

lib/client/connection.js

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
var Doc = require('./doc');
22
var Query = require('./query');
3-
var SnapshotRequest = require('./snapshot-request');
3+
var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-request');
4+
var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request');
45
var emitter = require('../emitter');
56
var ShareDBError = require('../error');
67
var types = require('../types');
@@ -233,6 +234,7 @@ Connection.prototype.handleMessage = function(message) {
233234
return this._handleBulkMessage(message, '_handleUnsubscribe');
234235

235236
case 'nf':
237+
case 'nt':
236238
return this._handleSnapshotFetch(err, message);
237239

238240
case 'f':
@@ -634,7 +636,35 @@ Connection.prototype.fetchSnapshot = function(collection, id, version, callback)
634636
}
635637

636638
var requestId = this.nextSnapshotRequestId++;
637-
var snapshotRequest = new SnapshotRequest(this, requestId, collection, id, version, callback);
639+
var snapshotRequest = new SnapshotVersionRequest(this, requestId, collection, id, version, callback);
640+
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
641+
snapshotRequest.send();
642+
};
643+
644+
/**
645+
* Fetch a read-only snapshot at a given timestamp
646+
*
647+
* @param collection - the collection name of the snapshot
648+
* @param id - the ID of the snapshot
649+
* @param timestamp (optional) - the timestamp to fetch
650+
* @param callback - (error, snapshot) => void, where snapshot takes the following schema:
651+
*
652+
* {
653+
* id: string; // ID of the snapshot
654+
* v: number; // version number of the snapshot
655+
* type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
656+
* data: any; // the snapshot
657+
* }
658+
*
659+
*/
660+
Connection.prototype.fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
661+
if (typeof timestamp === 'function') {
662+
callback = timestamp;
663+
timestamp = null;
664+
}
665+
666+
var requestId = this.nextSnapshotRequestId++;
667+
var snapshotRequest = new SnapshotTimestampRequest(this, requestId, collection, id, timestamp, callback);
638668
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
639669
snapshotRequest.send();
640670
};
Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,19 @@
1-
var Snapshot = require('../snapshot');
2-
var util = require('../util');
3-
var emitter = require('../emitter');
1+
var Snapshot = require('../../snapshot');
2+
var emitter = require('../../emitter');
43

54
module.exports = SnapshotRequest;
65

7-
function SnapshotRequest(connection, requestId, collection, id, version, callback) {
6+
function SnapshotRequest(connection, requestId, collection, id, callback) {
87
emitter.EventEmitter.call(this);
98

109
if (typeof callback !== 'function') {
1110
throw new Error('Callback is required for SnapshotRequest');
1211
}
1312

14-
if (!util.isValidVersion(version)) {
15-
throw new Error('Snapshot version must be a positive integer or null');
16-
}
17-
1813
this.requestId = requestId;
1914
this.connection = connection;
2015
this.id = id;
2116
this.collection = collection;
22-
this.version = version;
2317
this.callback = callback;
2418

2519
this.sent = false;
@@ -31,15 +25,7 @@ SnapshotRequest.prototype.send = function () {
3125
return;
3226
}
3327

34-
var message = {
35-
a: 'nf',
36-
id: this.requestId,
37-
c: this.collection,
38-
d: this.id,
39-
v: this.version,
40-
};
41-
42-
this.connection.send(message);
28+
this.connection.send(this._message());
4329
this.sent = true;
4430
};
4531

@@ -61,6 +47,8 @@ SnapshotRequest.prototype._handleResponse = function (error, message) {
6147
return this.callback(error);
6248
}
6349

64-
var snapshot = new Snapshot(this.id, message.v, message.type, message.data, null);
50+
var metadata = message.meta ? message.meta : null;
51+
var snapshot = new Snapshot(this.id, message.v, message.type, message.data, metadata);
52+
6553
this.callback(null, snapshot);
6654
};
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
var SnapshotRequest = require('./snapshot-request');
2+
var util = require('../../util');
3+
4+
module.exports = SnapshotTimestampRequest;
5+
6+
function SnapshotTimestampRequest(connection, requestId, collection, id, timestamp, callback) {
7+
SnapshotRequest.call(this, connection, requestId, collection, id, callback);
8+
9+
if (!util.isValidTimestamp(timestamp)) {
10+
throw new Error('Snapshot timestamp must be a positive integer or null');
11+
}
12+
13+
this.timestamp = timestamp;
14+
}
15+
16+
SnapshotTimestampRequest.prototype = Object.create(SnapshotRequest.prototype);
17+
18+
SnapshotTimestampRequest.prototype._message = function () {
19+
return {
20+
a: 'nt',
21+
id: this.requestId,
22+
c: this.collection,
23+
d: this.id,
24+
ts: this.timestamp,
25+
};
26+
};
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
var SnapshotRequest = require('./snapshot-request');
2+
var util = require('../../util');
3+
4+
module.exports = SnapshotVersionRequest;
5+
6+
function SnapshotVersionRequest (connection, requestId, collection, id, version, callback) {
7+
SnapshotRequest.call(this, connection, requestId, collection, id, callback);
8+
9+
if (!util.isValidVersion(version)) {
10+
throw new Error('Snapshot version must be a positive integer or null');
11+
}
12+
13+
this.version = version;
14+
}
15+
16+
SnapshotVersionRequest.prototype = Object.create(SnapshotRequest.prototype);
17+
18+
SnapshotVersionRequest.prototype._message = function () {
19+
return {
20+
a: 'nf',
21+
id: this.requestId,
22+
c: this.collection,
23+
d: this.id,
24+
v: this.version,
25+
};
26+
};

0 commit comments

Comments
 (0)