Skip to content

Fix for high concurrency issue #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,47 @@ Doesn't support queries (yet?).
Moderately experimental. (This drives [Synaptograph](https://www.synaptograph.com)'s backend, and [@nornagon](https://github.com/nornagon) hasn't noticed any issues so far.)


## Requirements

Due to the fix to resolve [high concurency issues](https://github.com/share/sharedb-postgres/issues/1) Postgres 9.5+ is now required.

## Migrating older versions

Older versions of this adaptor used the data type json. You will need to alter the data type prior to using if you are upgrading.

```PLpgSQL
ALTER TABLE ops
ALTER COLUMN operation
SET DATA TYPE jsonb
USING operation::jsonb;

ALTER TABLE snapshots
ALTER COLUMN data
SET DATA TYPE jsonb
USING data::jsonb;
```

## Usage

`sharedb-postgres` wraps native [node-postgres](https://github.com/brianc/node-postgres), and it supports the same configuration options.

To instantiate a sharedb-postgres wrapper, invoke the module and pass in your
PostgreSQL configuration as an argument. For example:
PostgreSQL configuration as an argument or use environmental arguments.

For example using environmental arugments:

```js
var db = require('sharedb-postgres')();
var backend = require('sharedb')({db: db})
```

Then executing via the command line

```
PGUSER=dbuser PGPASSWORD=secretpassword PGHOST=database.server.com PGDATABASE=mydb PGPORT=5433 npm start
```

Example using an object

```js
var db = require('sharedb-postgres')({host: 'localhost', database: 'mydb'});
Expand Down
152 changes: 71 additions & 81 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ PostgresDB.prototype.close = function(callback) {
if (callback) callback();
};

function rollback(client, done) {
client.query('ROLLBACK', function(err) {
return done(err);
})
}

// Persists an op and snapshot if it is for the next version. Calls back with
// callback(err, succeeded)
Expand All @@ -41,84 +36,79 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
* }
* snapshot: PostgresSnapshot
*/
this.pool.connect(function(err, client, done) {
if (err) {
done(client);
callback(err);
return;
}
function commit() {
client.query('COMMIT', function(err) {
done(err);
if (err) {
callback(err);
} else {
callback(null, true);
}
})
this.pool.connect((err, client, done) => {
if (err) {
done(client);
callback(err);
return;
}

/*
* This query uses common table expression to upsert the snapshot table
* (iff the new version is exactly 1 more than the latest table or if
* the document id does not exists)
*
* It will then insert into the ops table if it is exactly 1 more than the
* latest table or it the first operation and iff the previous insert into
* the snapshot table is successful.
*
* This result of this query the version of the newly inserted operation
* If either the ops or the snapshot insert fails then 0 rows are returned
*
* If 0 zeros are return then the callback must return false
*
* Casting is required as postgres thinks that collection and doc_id are
* not varchar
*/
const query = {
name: 'sdb-commit-op-and-snap',
text: `WITH snapshot_id AS (
INSERT INTO snapshots (collection, doc_id, doc_type, version, data)
SELECT $1::varchar collection, $2::varchar doc_id, $4 doc_type, $3 v, $5 d
WHERE $3 = (
SELECT version+1 v
FROM snapshots
WHERE collection = $1 AND doc_id = $2
FOR UPDATE
) OR NOT EXISTS (
SELECT 1
FROM snapshots
WHERE collection = $1 AND doc_id = $2
FOR UPDATE
)
ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4
RETURNING version
)
INSERT INTO ops (collection, doc_id, version, operation)
SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $6 operation
WHERE (
$3 = (
SELECT max(version)+1
FROM ops
WHERE collection = $1 AND doc_id = $2
) OR NOT EXISTS (
SELECT 1
FROM ops
WHERE collection = $1 AND doc_id = $2
)
) AND EXISTS (SELECT 1 FROM snapshot_id)
RETURNING version`,
values: [collection,id,snapshot.v, snapshot.type, snapshot.data,op]
}
client.query(
'SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2',
[collection, id],
function(err, res) {
var max_version = res.rows[0].max_version;
if (max_version == null)
max_version = 0;
if (snapshot.v !== max_version + 1) {
return callback(null, false);
}
client.query('BEGIN', function(err) {
client.query(
'INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)',
[collection, id, snapshot.v, op],
function(err, res) {
if (err) {
// TODO: if err is "constraint violation", callback(null, false) instead
rollback(client, done);
callback(err);
return;
}
if (snapshot.v === 1) {
client.query(
'INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)',
[collection, id, snapshot.type, snapshot.v, snapshot.data],
function(err, res) {
// TODO:
// if the insert was successful and did insert, callback(null, true)
// if the insert was successful and did not insert, callback(null, false)
// if there was an error, rollback and callback(error)
if (err) {
rollback(client, done);
callback(err);
return;
}
commit();
}
)
} else {
client.query(
'UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)',
[collection, id, snapshot.type, snapshot.v, snapshot.data],
function(err, res) {
// TODO:
// if any rows were updated, success
// if 0 rows were updated, rollback and not success
// if error, rollback and not success
if (err) {
rollback(client, done);
callback(err);
return;
}
commit();
}
)
}
}
)
})
client.query(query, (err, res) => {
if (err) {
callback(err)
} else if(res.rows.length === 0) {
done(client);
callback(null,false)
}
else {
done(client);
callback(null,true)
}
)
})
})

})
};

// Get the named document from the database. The callback is called with (err,
Expand Down
4 changes: 2 additions & 2 deletions structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CREATE TABLE ops (
collection character varying(255) not null,
doc_id character varying(255) not null,
version integer not null,
operation json not null, -- {v:0, create:{...}} or {v:n, op:[...]}
operation jsonb not null, -- {v:0, create:{...}} or {v:n, op:[...]}
PRIMARY KEY (collection, doc_id, version)
);

Expand All @@ -11,6 +11,6 @@ CREATE TABLE snapshots (
doc_id character varying(255) not null,
doc_type character varying(255) not null,
version integer not null,
data json not null,
data jsonb not null,
PRIMARY KEY (collection, doc_id)
);