Skip to content

Commit cbb7266

Browse files
committed
add basic query handler
1 parent 317ad9e commit cbb7266

File tree

1 file changed

+124
-54
lines changed

1 file changed

+124
-54
lines changed

index.js

Lines changed: 124 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
var DB = require('sharedb').DB;
2-
var pg = require('pg');
1+
var DB = require("sharedb").DB;
2+
var pg = require("pg");
33

44
// Postgres-backed ShareDB database
55

@@ -10,27 +10,34 @@ function PostgresDB(options) {
1010
this.closed = false;
1111

1212
this.pool = new pg.Pool(options);
13-
};
13+
}
1414
module.exports = PostgresDB;
1515

1616
PostgresDB.prototype = Object.create(DB.prototype);
1717

18-
PostgresDB.prototype.close = function(callback) {
18+
PostgresDB.prototype.close = function (callback) {
1919
this.closed = true;
2020
this.pool.end();
21-
21+
2222
if (callback) callback();
2323
};
2424

2525
function rollback(client, done) {
26-
client.query('ROLLBACK', function(err) {
26+
client.query("ROLLBACK", function (err) {
2727
return done(err);
28-
})
28+
});
2929
}
3030

3131
// Persists an op and snapshot if it is for the next version. Calls back with
3232
// callback(err, succeeded)
33-
PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, callback) {
33+
PostgresDB.prototype.commit = function (
34+
collection,
35+
id,
36+
op,
37+
snapshot,
38+
options,
39+
callback,
40+
) {
3441
/*
3542
* op: CreateOp {
3643
* src: '24545654654646',
@@ -41,37 +48,36 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
4148
* }
4249
* snapshot: PostgresSnapshot
4350
*/
44-
this.pool.connect(function(err, client, done) {
51+
this.pool.connect(function (err, client, done) {
4552
if (err) {
4653
done(client);
4754
callback(err);
4855
return;
4956
}
5057
function commit() {
51-
client.query('COMMIT', function(err) {
58+
client.query("COMMIT", function (err) {
5259
done(err);
5360
if (err) {
5461
callback(err);
5562
} else {
5663
callback(null, true);
5764
}
58-
})
65+
});
5966
}
6067
client.query(
61-
'SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2',
68+
"SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2",
6269
[collection, id],
63-
function(err, res) {
70+
function (err, res) {
6471
var max_version = res.rows[0].max_version;
65-
if (max_version == null)
66-
max_version = 0;
72+
if (max_version == null) max_version = 0;
6773
if (snapshot.v !== max_version + 1) {
6874
return callback(null, false);
6975
}
70-
client.query('BEGIN', function(err) {
76+
client.query("BEGIN", function (err) {
7177
client.query(
72-
'INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)',
78+
"INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)",
7379
[collection, id, snapshot.v, op],
74-
function(err, res) {
80+
function (err, res) {
7581
if (err) {
7682
// TODO: if err is "constraint violation", callback(null, false) instead
7783
rollback(client, done);
@@ -80,9 +86,9 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
8086
}
8187
if (snapshot.v === 1) {
8288
client.query(
83-
'INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)',
89+
"INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)",
8490
[collection, id, snapshot.type, snapshot.v, snapshot.data],
85-
function(err, res) {
91+
function (err, res) {
8692
// TODO:
8793
// if the insert was successful and did insert, callback(null, true)
8894
// if the insert was successful and did not insert, callback(null, false)
@@ -93,13 +99,13 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
9399
return;
94100
}
95101
commit();
96-
}
97-
)
102+
},
103+
);
98104
} else {
99105
client.query(
100-
'UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)',
106+
"UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)",
101107
[collection, id, snapshot.type, snapshot.v, snapshot.data],
102-
function(err, res) {
108+
function (err, res) {
103109
// TODO:
104110
// if any rows were updated, success
105111
// if 0 rows were updated, rollback and not success
@@ -110,59 +116,65 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
110116
return;
111117
}
112118
commit();
113-
}
114-
)
119+
},
120+
);
115121
}
116-
}
117-
)
118-
})
119-
}
120-
)
121-
})
122+
},
123+
);
124+
});
125+
},
126+
);
127+
});
122128
};
123129

124130
// Get the named document from the database. The callback is called with (err,
125131
// snapshot). A snapshot with a version of zero is returned if the docuemnt
126132
// has never been created in the database.
127-
PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, callback) {
128-
this.pool.connect(function(err, client, done) {
133+
PostgresDB.prototype.getSnapshot = function (
134+
collection,
135+
id,
136+
fields,
137+
options,
138+
callback,
139+
) {
140+
this.pool.connect(function (err, client, done) {
129141
if (err) {
130142
done(client);
131143
callback(err);
132144
return;
133145
}
134146
client.query(
135-
'SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1',
147+
"SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1",
136148
[collection, id],
137-
function(err, res) {
149+
function (err, res) {
138150
done();
139151
if (err) {
140152
callback(err);
141153
return;
142154
}
143155
if (res.rows.length) {
144-
var row = res.rows[0]
156+
var row = res.rows[0];
145157
var snapshot = new PostgresSnapshot(
146158
id,
147159
row.version,
148160
row.doc_type,
149161
row.data,
150-
undefined // TODO: metadata
151-
)
162+
undefined, // TODO: metadata
163+
);
152164
callback(null, snapshot);
153165
} else {
154166
var snapshot = new PostgresSnapshot(
155167
id,
156168
0,
157169
null,
158170
undefined,
159-
undefined
160-
)
171+
undefined,
172+
);
161173
callback(null, snapshot);
162174
}
163-
}
164-
)
165-
})
175+
},
176+
);
177+
});
166178
};
167179

168180
// Get operations between [from, to) noninclusively. (Ie, the range should
@@ -174,28 +186,86 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
174186
// The version will be inferred from the parameters if it is missing.
175187
//
176188
// Callback should be called as callback(error, [list of ops]);
177-
PostgresDB.prototype.getOps = function(collection, id, from, to, options, callback) {
178-
this.pool.connect(function(err, client, done) {
189+
PostgresDB.prototype.getOps = function (
190+
collection,
191+
id,
192+
from,
193+
to,
194+
options,
195+
callback,
196+
) {
197+
this.pool.connect(function (err, client, done) {
179198
if (err) {
180199
done(client);
181200
callback(err);
182201
return;
183202
}
184203
client.query(
185-
'SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version >= $3 AND version < $4',
204+
"SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version >= $3 AND version < $4",
186205
[collection, id, from, to],
187-
function(err, res) {
206+
function (err, res) {
207+
done();
208+
if (err) {
209+
callback(err);
210+
return;
211+
}
212+
callback(
213+
null,
214+
res.rows.map(function (row) {
215+
return row.operation;
216+
}),
217+
);
218+
},
219+
);
220+
});
221+
};
222+
223+
PostgresDB.prototype.query = function (
224+
collectionName,
225+
inputQuery,
226+
fields,
227+
options,
228+
callback,
229+
) {
230+
this.pool.connect(function (err, client, done) {
231+
if (err) {
232+
done(client);
233+
callback(err);
234+
return;
235+
}
236+
// TODO: more consistent parse
237+
const limit = inputQuery["$limit"];
238+
const sort = inputQuery["$sort"];
239+
const sortKey = Object.keys(sort)[0];
240+
const sortDirection = sort[sortKey] === -1 ? "DESC" : "ASC";
241+
242+
client.query(
243+
`SELECT version, data, doc_type FROM snapshots WHERE collection = $1 ORDER BY data->>$3 ${sortDirection} LIMIT $2`,
244+
[collectionName, limit, sortKey],
245+
function (err, res) {
188246
done();
189247
if (err) {
190248
callback(err);
191249
return;
192250
}
193-
callback(null, res.rows.map(function(row) {
194-
return row.operation;
195-
}));
196-
}
197-
)
198-
})
251+
if (res.rows.length) {
252+
var snapshots = [];
253+
for (var i = 0; i < res.rows.length; i++) {
254+
const row = res.rows[i];
255+
var snapshot = new PostgresSnapshot(
256+
row.data.documentId,
257+
row.version,
258+
row.doc_type,
259+
row.data,
260+
undefined, // TODO: metadata
261+
);
262+
snapshots.push(snapshot);
263+
}
264+
callback(null, snapshots);
265+
}
266+
},
267+
);
268+
});
199269
};
200270

201271
function PostgresSnapshot(id, version, type, data, meta) {

0 commit comments

Comments
 (0)