From 1ed2925c10710a653a471ba6d41ff8f1554e01a6 Mon Sep 17 00:00:00 2001 From: William Claydon Date: Sat, 10 Mar 2018 12:28:19 +1100 Subject: [PATCH 1/2] JSONB, PG library, and check max update --- README.md | 47 +++++++++++++++++-- index.js | 124 +++++++++++++++++--------------------------------- package.json | 2 +- structure.sql | 10 ++++ 4 files changed, 96 insertions(+), 87 deletions(-) diff --git a/README.md b/README.md index b006df0..794ed3b 100644 --- a/README.md +++ b/README.md @@ -8,15 +8,56 @@ Doesn't support queries (yet?). Moderately experimental. (This drives [Synaptograph](https://www.synaptograph.com)'s backend, and [@nornagon](https://github.com/nornagon) hasn't noticed any issues so far.) +## Requirements + +Due to the fix to resolve [high concurency issues](https://github.com/share/sharedb-postgres/issues/1) Postgres 9.5+ is now required. + +## Migrating older versions + +Older versions of this adaptor used the data type json. You will need to alter the data type prior to using if you are upgrading. + +```PLpgSQL +ALTER TABLE ops + ALTER COLUMN operation + SET DATA TYPE jsonb + USING operation::jsonb; + +ALTER TABLE snapshots + ALTER COLUMN data + SET DATA TYPE jsonb + USING data::jsonb; +``` + ## Usage -`sharedb-postgres` wraps native [node-postgres](https://github.com/brianc/node-postgres), and it supports the same configuration options. +`sharedb-postgres-jsonb` wraps native [node-postgres](https://github.com/brianc/node-postgres), and it supports the same configuration options. To instantiate a sharedb-postgres wrapper, invoke the module and pass in your -PostgreSQL configuration as an argument. For example: +PostgreSQL configuration as an argument or use environmental arguments. + +For example using environmental arugments: + +```js +var db = require('sharedb-postgres')(); +var backend = require('sharedb')({db: db}) +``` + +Then executing via the command line + +``` +PGUSER=dbuser PGPASSWORD=secretpassword PGHOST=database.server.com PGDATABASE=mydb PGPORT=5433 npm start +``` + +Example using an object ```js -var db = require('sharedb-postgres')('postgres://localhost/mydb'); +var db = require('sharedb-postgres')({ + user: 'dbuser', + host: 'database.server.com', + database: 'mydb', + password: 'secretpassword', + port: 5433, +}); var backend = require('sharedb')({db: db}) ``` diff --git a/index.js b/index.js index 3f7dc57..9d8359e 100644 --- a/index.js +++ b/index.js @@ -10,6 +10,7 @@ function PostgresDB(options) { this.closed = false; this.pg_config = options; + this.pool = new pg.Pool(this.pg_config) }; module.exports = PostgresDB; @@ -20,11 +21,6 @@ PostgresDB.prototype.close = function(callback) { if (callback) callback(); }; -function rollback(client, done) { - client.query('ROLLBACK', function(err) { - return done(err); - }) -} // Persists an op and snapshot if it is for the next version. Calls back with // callback(err, succeeded) @@ -39,91 +35,53 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca * } * snapshot: PostgresSnapshot */ - pg.connect(this.pg_config, function(err, client, done) { - if (err) { - done(client); - callback(err); - return; - } - function commit() { - client.query('COMMIT', function(err) { - done(err); - if (err) { - callback(err); - } else { - callback(null, true); - } - }) + this.pool.connect((err, client, done) => { + if (err) { + done(client); + callback(err); + return; + } + /*const*/ var query = { + name: 'sdb-commit-op-and-snap', + text: `With snaps as ( + Insert into snapshots (collection,doc_id,doc_type, version,data) + Select n.* From ( select $1 c, $2 d, $4 t, $3::integer v, $5::jsonb daa) + n + where v = (select version+1 v from snapshots where collection = $1 and doc_id = $2 for update) or not exists (select 1 from snapshots where collection = $1 and doc_id = $2 for update) + On conflict(collection, doc_id) do update set version = $3, data = $5 , doc_type = $4 + Returning version + ) + Insert into ops (collection,doc_id, version,operation) + Select n.* From ( select $1 c, $2 t, $3::integer v, $6::jsonb daa) + n + where (v = (select max(version)+1 v from ops where collection = $1 and doc_id = $2) or not exists (select 1 from ops where collection = $1 and doc_id = $2 for update)) and exists (select 1 from snaps) + Returning version`, + values: [collection,id,snapshot.v, snapshot.type, snapshot.data,op] } - client.query( - 'SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2', - [collection, id], - function(err, res) { - var max_version = res.rows[0].max_version; - if (max_version == null) - max_version = 0; - if (snapshot.v !== max_version + 1) { - return callback(null, false); - } - client.query('BEGIN', function(err) { - client.query( - 'INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)', - [collection, id, snapshot.v, op], - function(err, res) { - if (err) { - // TODO: if err is "constraint violation", callback(null, false) instead - rollback(client, done); - callback(err); - return; - } - if (snapshot.v === 1) { - client.query( - 'INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)', - [collection, id, snapshot.type, snapshot.v, snapshot.data], - function(err, res) { - // TODO: - // if the insert was successful and did insert, callback(null, true) - // if the insert was successful and did not insert, callback(null, false) - // if there was an error, rollback and callback(error) - if (err) { - rollback(client, done); - callback(err); - return; - } - commit(); - } - ) - } else { - client.query( - 'UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)', - [collection, id, snapshot.type, snapshot.v, snapshot.data], - function(err, res) { - // TODO: - // if any rows were updated, success - // if 0 rows were updated, rollback and not success - // if error, rollback and not success - if (err) { - rollback(client, done); - callback(err); - return; - } - commit(); - } - ) - } - } - ) - }) + client.query(query, (err, res) => { + if (err) { + console.log(err.stack) + callback(err) + } else if(res.rows.length === 0) { + done(client); + console.log("Unable to commit, not the latest version") + callback(null,false) + } + else { + done(client); + console.log(res.rows[0]) + callback(null,true) } - ) - }) + }) + + }) }; // Get the named document from the database. The callback is called with (err, // snapshot). A snapshot with a version of zero is returned if the docuemnt // has never been created in the database. PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, callback) { - pg.connect(this.pg_config, function(err, client, done) { + this.pool.connect(function(err, client, done) { if (err) { done(client); callback(err); @@ -173,7 +131,7 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal // // Callback should be called as callback(error, [list of ops]); PostgresDB.prototype.getOps = function(collection, id, from, to, options, callback) { - pg.connect(this.pg_config, function(err, client, done) { + this.pool.connect(function(err, client, done) { if (err) { done(client); callback(err); diff --git a/package.json b/package.json index 3b1f446..f8863d3 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "keywords": ["sharedb", "sharejs", "share", "postgres"], "repository": "share/sharedb-postgres", "dependencies": { - "pg": "^4.5.1", + "pg": "^7.4.1", "sharedb": "^1.0.0-beta.7" } } diff --git a/structure.sql b/structure.sql index 606b5fc..fa9ec40 100644 --- a/structure.sql +++ b/structure.sql @@ -14,3 +14,13 @@ CREATE TABLE snapshots ( data json not null, PRIMARY KEY (collection, doc_id) ); + +ALTER TABLE ops + ALTER COLUMN operation + SET DATA TYPE jsonb + USING operation::jsonb; + +ALTER TABLE snapshots + ALTER COLUMN data + SET DATA TYPE jsonb + USING data::jsonb; \ No newline at end of file From 698bb625fc38dda547289585b57cb42c391baf86 Mon Sep 17 00:00:00 2001 From: William Claydon Date: Sat, 17 Mar 2018 18:06:53 +1100 Subject: [PATCH 2/2] typo --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index 0293110..9f2ef25 100644 --- a/index.js +++ b/index.js @@ -50,7 +50,7 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca * (iff the new version is exactly 1 more than the latest table or if * the document id does not exists) * - * It will than Insert into the ops table if it is exactly 1 more than the + * It will then insert into the ops table if it is exactly 1 more than the * latest table or it the first operation and iff the previous insert into * the snapshot table is successful. *