diff --git a/README.md b/README.md index b385a69..24d81ee 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,47 @@ Doesn't support queries (yet?). Moderately experimental. (This drives [Synaptograph](https://www.synaptograph.com)'s backend, and [@nornagon](https://github.com/nornagon) hasn't noticed any issues so far.) +## Requirements + +Due to the fix to resolve [high concurency issues](https://github.com/share/sharedb-postgres/issues/1) Postgres 9.5+ is now required. + +## Migrating older versions + +Older versions of this adaptor used the data type json. You will need to alter the data type prior to using if you are upgrading. + +```PLpgSQL +ALTER TABLE ops + ALTER COLUMN operation + SET DATA TYPE jsonb + USING operation::jsonb; + +ALTER TABLE snapshots + ALTER COLUMN data + SET DATA TYPE jsonb + USING data::jsonb; +``` + ## Usage `sharedb-postgres` wraps native [node-postgres](https://github.com/brianc/node-postgres), and it supports the same configuration options. To instantiate a sharedb-postgres wrapper, invoke the module and pass in your -PostgreSQL configuration as an argument. For example: +PostgreSQL configuration as an argument or use environmental arguments. + +For example using environmental arugments: + +```js +var db = require('sharedb-postgres')(); +var backend = require('sharedb')({db: db}) +``` + +Then executing via the command line + +``` +PGUSER=dbuser PGPASSWORD=secretpassword PGHOST=database.server.com PGDATABASE=mydb PGPORT=5433 npm start +``` + +Example using an object ```js var db = require('sharedb-postgres')({host: 'localhost', database: 'mydb'}); diff --git a/index.js b/index.js index 442bd84..da934f8 100644 --- a/index.js +++ b/index.js @@ -22,11 +22,6 @@ PostgresDB.prototype.close = function(callback) { if (callback) callback(); }; -function rollback(client, done) { - client.query('ROLLBACK', function(err) { - return done(err); - }) -} // Persists an op and snapshot if it is for the next version. Calls back with // callback(err, succeeded) @@ -41,84 +36,79 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca * } * snapshot: PostgresSnapshot */ - this.pool.connect(function(err, client, done) { - if (err) { - done(client); - callback(err); - return; - } - function commit() { - client.query('COMMIT', function(err) { - done(err); - if (err) { - callback(err); - } else { - callback(null, true); - } - }) + this.pool.connect((err, client, done) => { + if (err) { + done(client); + callback(err); + return; + } + + /* + * This query uses common table expression to upsert the snapshot table + * (iff the new version is exactly 1 more than the latest table or if + * the document id does not exists) + * + * It will then insert into the ops table if it is exactly 1 more than the + * latest table or it the first operation and iff the previous insert into + * the snapshot table is successful. + * + * This result of this query the version of the newly inserted operation + * If either the ops or the snapshot insert fails then 0 rows are returned + * + * If 0 zeros are return then the callback must return false + * + * Casting is required as postgres thinks that collection and doc_id are + * not varchar + */ + const query = { + name: 'sdb-commit-op-and-snap', + text: `WITH snapshot_id AS ( + INSERT INTO snapshots (collection, doc_id, doc_type, version, data) + SELECT $1::varchar collection, $2::varchar doc_id, $4 doc_type, $3 v, $5 d + WHERE $3 = ( + SELECT version+1 v + FROM snapshots + WHERE collection = $1 AND doc_id = $2 + FOR UPDATE + ) OR NOT EXISTS ( + SELECT 1 + FROM snapshots + WHERE collection = $1 AND doc_id = $2 + FOR UPDATE + ) + ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4 + RETURNING version +) +INSERT INTO ops (collection, doc_id, version, operation) +SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $6 operation +WHERE ( + $3 = ( + SELECT max(version)+1 + FROM ops + WHERE collection = $1 AND doc_id = $2 + ) OR NOT EXISTS ( + SELECT 1 + FROM ops + WHERE collection = $1 AND doc_id = $2 + ) +) AND EXISTS (SELECT 1 FROM snapshot_id) +RETURNING version`, + values: [collection,id,snapshot.v, snapshot.type, snapshot.data,op] } - client.query( - 'SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2', - [collection, id], - function(err, res) { - var max_version = res.rows[0].max_version; - if (max_version == null) - max_version = 0; - if (snapshot.v !== max_version + 1) { - return callback(null, false); - } - client.query('BEGIN', function(err) { - client.query( - 'INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)', - [collection, id, snapshot.v, op], - function(err, res) { - if (err) { - // TODO: if err is "constraint violation", callback(null, false) instead - rollback(client, done); - callback(err); - return; - } - if (snapshot.v === 1) { - client.query( - 'INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)', - [collection, id, snapshot.type, snapshot.v, snapshot.data], - function(err, res) { - // TODO: - // if the insert was successful and did insert, callback(null, true) - // if the insert was successful and did not insert, callback(null, false) - // if there was an error, rollback and callback(error) - if (err) { - rollback(client, done); - callback(err); - return; - } - commit(); - } - ) - } else { - client.query( - 'UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)', - [collection, id, snapshot.type, snapshot.v, snapshot.data], - function(err, res) { - // TODO: - // if any rows were updated, success - // if 0 rows were updated, rollback and not success - // if error, rollback and not success - if (err) { - rollback(client, done); - callback(err); - return; - } - commit(); - } - ) - } - } - ) - }) + client.query(query, (err, res) => { + if (err) { + callback(err) + } else if(res.rows.length === 0) { + done(client); + callback(null,false) + } + else { + done(client); + callback(null,true) } - ) - }) + }) + + }) }; // Get the named document from the database. The callback is called with (err, diff --git a/structure.sql b/structure.sql index 606b5fc..977988d 100644 --- a/structure.sql +++ b/structure.sql @@ -2,7 +2,7 @@ CREATE TABLE ops ( collection character varying(255) not null, doc_id character varying(255) not null, version integer not null, - operation json not null, -- {v:0, create:{...}} or {v:n, op:[...]} + operation jsonb not null, -- {v:0, create:{...}} or {v:n, op:[...]} PRIMARY KEY (collection, doc_id, version) ); @@ -11,6 +11,6 @@ CREATE TABLE snapshots ( doc_id character varying(255) not null, doc_type character varying(255) not null, version integer not null, - data json not null, + data jsonb not null, PRIMARY KEY (collection, doc_id) );