diff --git a/index.js b/index.js index 442bd84..0e50263 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,5 @@ -var DB = require('sharedb').DB; -var pg = require('pg'); +var DB = require("sharedb").DB; +var pg = require("pg"); // Postgres-backed ShareDB database @@ -10,27 +10,34 @@ function PostgresDB(options) { this.closed = false; this.pool = new pg.Pool(options); -}; +} module.exports = PostgresDB; PostgresDB.prototype = Object.create(DB.prototype); -PostgresDB.prototype.close = function(callback) { +PostgresDB.prototype.close = function (callback) { this.closed = true; this.pool.end(); - + if (callback) callback(); }; function rollback(client, done) { - client.query('ROLLBACK', function(err) { + client.query("ROLLBACK", function (err) { return done(err); - }) + }); } // Persists an op and snapshot if it is for the next version. Calls back with // callback(err, succeeded) -PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, callback) { +PostgresDB.prototype.commit = function ( + collection, + id, + op, + snapshot, + options, + callback, +) { /* * op: CreateOp { * src: '24545654654646', @@ -41,37 +48,36 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca * } * snapshot: PostgresSnapshot */ - this.pool.connect(function(err, client, done) { + this.pool.connect(function (err, client, done) { if (err) { done(client); callback(err); return; } function commit() { - client.query('COMMIT', function(err) { + client.query("COMMIT", function (err) { done(err); if (err) { callback(err); } else { callback(null, true); } - }) + }); } client.query( - 'SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2', + "SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2", [collection, id], - function(err, res) { + function (err, res) { var max_version = res.rows[0].max_version; - if (max_version == null) - max_version = 0; + if (max_version == null) max_version = 0; if (snapshot.v !== max_version + 1) { return callback(null, false); } - client.query('BEGIN', function(err) { + client.query("BEGIN", function (err) { client.query( - 'INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)', + "INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)", [collection, id, snapshot.v, op], - function(err, res) { + function (err, res) { if (err) { // TODO: if err is "constraint violation", callback(null, false) instead rollback(client, done); @@ -80,9 +86,9 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca } if (snapshot.v === 1) { client.query( - 'INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)', + "INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)", [collection, id, snapshot.type, snapshot.v, snapshot.data], - function(err, res) { + function (err, res) { // TODO: // if the insert was successful and did insert, callback(null, true) // if the insert was successful and did not insert, callback(null, false) @@ -93,13 +99,13 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca return; } commit(); - } - ) + }, + ); } else { client.query( - 'UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)', + "UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)", [collection, id, snapshot.type, snapshot.v, snapshot.data], - function(err, res) { + function (err, res) { // TODO: // if any rows were updated, success // if 0 rows were updated, rollback and not success @@ -110,45 +116,51 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca return; } commit(); - } - ) + }, + ); } - } - ) - }) - } - ) - }) + }, + ); + }); + }, + ); + }); }; // Get the named document from the database. The callback is called with (err, // snapshot). A snapshot with a version of zero is returned if the docuemnt // has never been created in the database. -PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, callback) { - this.pool.connect(function(err, client, done) { +PostgresDB.prototype.getSnapshot = function ( + collection, + id, + fields, + options, + callback, +) { + this.pool.connect(function (err, client, done) { if (err) { done(client); callback(err); return; } client.query( - 'SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1', + "SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1", [collection, id], - function(err, res) { + function (err, res) { done(); if (err) { callback(err); return; } if (res.rows.length) { - var row = res.rows[0] + var row = res.rows[0]; var snapshot = new PostgresSnapshot( id, row.version, row.doc_type, row.data, - undefined // TODO: metadata - ) + undefined, // TODO: metadata + ); callback(null, snapshot); } else { var snapshot = new PostgresSnapshot( @@ -156,13 +168,13 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal 0, null, undefined, - undefined - ) + undefined, + ); callback(null, snapshot); } - } - ) - }) + }, + ); + }); }; // Get operations between [from, to) noninclusively. (Ie, the range should @@ -174,28 +186,86 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal // The version will be inferred from the parameters if it is missing. // // Callback should be called as callback(error, [list of ops]); -PostgresDB.prototype.getOps = function(collection, id, from, to, options, callback) { - this.pool.connect(function(err, client, done) { +PostgresDB.prototype.getOps = function ( + collection, + id, + from, + to, + options, + callback, +) { + this.pool.connect(function (err, client, done) { if (err) { done(client); callback(err); return; } client.query( - 'SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version >= $3 AND version < $4', + "SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version >= $3 AND version < $4", [collection, id, from, to], - function(err, res) { + function (err, res) { + done(); + if (err) { + callback(err); + return; + } + callback( + null, + res.rows.map(function (row) { + return row.operation; + }), + ); + }, + ); + }); +}; + +PostgresDB.prototype.query = function ( + collectionName, + inputQuery, + fields, + options, + callback, +) { + this.pool.connect(function (err, client, done) { + if (err) { + done(client); + callback(err); + return; + } + // TODO: more consistent parse + const limit = inputQuery["$limit"]; + const sort = inputQuery["$sort"]; + const sortKey = Object.keys(sort)[0]; + const sortDirection = sort[sortKey] === -1 ? "DESC" : "ASC"; + + client.query( + `SELECT version, data, doc_type FROM snapshots WHERE collection = $1 ORDER BY data->>$3 ${sortDirection} LIMIT $2`, + [collectionName, limit, sortKey], + function (err, res) { done(); if (err) { callback(err); return; } - callback(null, res.rows.map(function(row) { - return row.operation; - })); - } - ) - }) + if (res.rows.length) { + var snapshots = []; + for (var i = 0; i < res.rows.length; i++) { + const row = res.rows[i]; + var snapshot = new PostgresSnapshot( + row.data.documentId, + row.version, + row.doc_type, + row.data, + undefined, // TODO: metadata + ); + snapshots.push(snapshot); + } + callback(null, snapshots); + } + }, + ); + }); }; function PostgresSnapshot(id, version, type, data, meta) {