diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..88d12aa --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,55 @@ +name: Test + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + test: + name: Node.js ${{ matrix.node }} + PostgreSQL ${{ matrix.postgres }} + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + node: + - 16 + - 18 + - 20 + postgres: + - 13 + - 14 + - 15 + - 16 + services: + postgres: + image: postgres:${{ matrix.postgres }} + env: + POSTGRES_HOST_AUTH_METHOD: trust + POSTGRES_DB: postgres + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + timeout-minutes: 10 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-node@v4 + with: + node-version: ${{ matrix.node }} + - name: Install + run: npm install + - name: Test + run: npm test + env: + PGUSER: postgres + PGPASSWORD: postgres + PGDATABASE: postgres diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5513963 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +node_modules +package-lock.json + +# VS Code +.vscode/ diff --git a/.mocharc.js b/.mocharc.js new file mode 100644 index 0000000..510722e --- /dev/null +++ b/.mocharc.js @@ -0,0 +1,5 @@ +module.exports = { + timeout: 5_000, + file: './test/setup.js', + spec: '**/*.spec.js', +}; diff --git a/CHANGELOG.md b/CHANGELOG.md index 66db502..76f189d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +## 4.0.1 + + - upgrade sharedb + + +## 4.0.0 + + - upgrade `pg` from 7.4.1 to 8.5.1 to prevent silent failure for node v14 or later, reported in [this issue](https://github.com/brianc/node-postgres/issues/2317). + - fix submit ops failed due to version mismatched - https://github.com/share/sharedb-postgres/issues/8 + + +# Change log in original repo + ## 3.0.0 Thanks to @billwashere, we upgraded to a more modern version of `pg` (4.5.1 -> diff --git a/README.md b/README.md index b385a69..7b0a6f2 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,61 @@ # sharedb-postgres -PostgreSQL database adapter for [sharedb](https://github.com/share/sharedb). This -driver can be used both as a snapshot store and oplog. +PostgreSQL database adapter for [sharedb](https://github.com/share/sharedb). This driver can be used both as a snapshot store and oplog. Doesn't support queries (yet?). Moderately experimental. (This drives [Synaptograph](https://www.synaptograph.com)'s backend, and [@nornagon](https://github.com/nornagon) hasn't noticed any issues so far.) + +## Installation + +```cmd +npm i sharedb-postgres +``` + + +## Requirements + +Due to the fix to resolve [high concurency issues](https://github.com/share/sharedb-postgres/issues/1) Postgres 9.5+ is now required. + +## Migrating older versions + +Older versions of this adaptor used the data type json. You will need to alter the data type prior to using if you are upgrading. + +```PLpgSQL +ALTER TABLE ops + ALTER COLUMN operation + SET DATA TYPE jsonb + USING operation::jsonb; + +ALTER TABLE snapshots + ALTER COLUMN data + SET DATA TYPE jsonb + USING data::jsonb; +``` + ## Usage -`sharedb-postgres` wraps native [node-postgres](https://github.com/brianc/node-postgres), and it supports the same configuration options. +`sharedb-postgres-jsonb` wraps native [node-postgres](https://github.com/brianc/node-postgres), and it supports the same configuration options. To instantiate a sharedb-postgres wrapper, invoke the module and pass in your -PostgreSQL configuration as an argument. For example: +PostgreSQL configuration as an argument or use environmental arguments. + +For example using environmental arugments: + +```js +var db = require('sharedb-postgres')(); +var backend = require('sharedb')({db: db}) +``` + +Then executing via the command line + +``` +PGUSER=dbuser PGPASSWORD=secretpassword PGHOST=database.server.com PGDATABASE=mydb PGPORT=5433 npm start +``` + +Example using an object ```js var db = require('sharedb-postgres')({host: 'localhost', database: 'mydb'}); diff --git a/index.js b/index.js index 442bd84..0a093b1 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,8 @@ var DB = require('sharedb').DB; var pg = require('pg'); +const PG_UNIQUE_VIOLATION = '23505'; + // Postgres-backed ShareDB database function PostgresDB(options) { @@ -9,28 +11,32 @@ function PostgresDB(options) { this.closed = false; - this.pool = new pg.Pool(options); + this._pool = new pg.Pool(options); }; module.exports = PostgresDB; PostgresDB.prototype = Object.create(DB.prototype); -PostgresDB.prototype.close = function(callback) { - this.closed = true; - this.pool.end(); - - if (callback) callback(); +PostgresDB.prototype.close = async function(callback) { + let error; + try { + if (!this.closed) { + this.closed = true; + await this._pool.end(); + } + } catch (err) { + error = err; + } + + // FIXME: Don't swallow errors. Emit 'error' event? + if (callback) callback(error); }; -function rollback(client, done) { - client.query('ROLLBACK', function(err) { - return done(err); - }) -} // Persists an op and snapshot if it is for the next version. Calls back with // callback(err, succeeded) -PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, callback) { +PostgresDB.prototype.commit = async function(collection, id, op, snapshot, options, callback) { + try { /* * op: CreateOp { * src: '24545654654646', @@ -41,128 +47,97 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca * } * snapshot: PostgresSnapshot */ - this.pool.connect(function(err, client, done) { - if (err) { - done(client); - callback(err); - return; + /* + * This query uses common table expression to upsert the snapshot table + * (iff the new version is exactly 1 more than the latest table or if + * the document id does not exists) + * + * It will then insert into the ops table if it is exactly 1 more than the + * latest table or it the first operation and iff the previous insert into + * the snapshot table is successful. + * + * This result of this query the version of the newly inserted operation + * If either the ops or the snapshot insert fails then 0 rows are returned + * + * If 0 zeros are return then the callback must return false + * + * Casting is required as postgres thinks that collection and doc_id are + * not varchar + */ + const query = { + name: 'sdb-commit-op-and-snap', + text: `WITH snapshot_id AS ( + INSERT INTO snapshots (collection, doc_id, version, doc_type, data, metadata) + SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $4 doc_type, $5 d, $6 m + WHERE $3 = ( + SELECT version+1 v + FROM snapshots + WHERE collection = $1 AND doc_id = $2 + FOR UPDATE + ) OR NOT EXISTS ( + SELECT 1 + FROM snapshots + WHERE collection = $1 AND doc_id = $2 + FOR UPDATE + ) + ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4, metadata = $6 + RETURNING version +) +INSERT INTO ops (collection, doc_id, version, operation) +SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $7 operation +WHERE ( + $3 = ( + SELECT max(version)+1 + FROM ops + WHERE collection = $1 AND doc_id = $2 + ) OR NOT EXISTS ( + SELECT 1 + FROM ops + WHERE collection = $1 AND doc_id = $2 + ) +) AND EXISTS (SELECT 1 FROM snapshot_id) +RETURNING version`, + values: [collection, id, snapshot.v, snapshot.type, JSON.stringify(snapshot.data), JSON.stringify(snapshot.m), JSON.stringify(op)] } - function commit() { - client.query('COMMIT', function(err) { - done(err); - if (err) { - callback(err); - } else { - callback(null, true); - } - }) - } - client.query( - 'SELECT max(version) AS max_version FROM ops WHERE collection = $1 AND doc_id = $2', - [collection, id], - function(err, res) { - var max_version = res.rows[0].max_version; - if (max_version == null) - max_version = 0; - if (snapshot.v !== max_version + 1) { - return callback(null, false); - } - client.query('BEGIN', function(err) { - client.query( - 'INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)', - [collection, id, snapshot.v, op], - function(err, res) { - if (err) { - // TODO: if err is "constraint violation", callback(null, false) instead - rollback(client, done); - callback(err); - return; - } - if (snapshot.v === 1) { - client.query( - 'INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)', - [collection, id, snapshot.type, snapshot.v, snapshot.data], - function(err, res) { - // TODO: - // if the insert was successful and did insert, callback(null, true) - // if the insert was successful and did not insert, callback(null, false) - // if there was an error, rollback and callback(error) - if (err) { - rollback(client, done); - callback(err); - return; - } - commit(); - } - ) - } else { - client.query( - 'UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)', - [collection, id, snapshot.type, snapshot.v, snapshot.data], - function(err, res) { - // TODO: - // if any rows were updated, success - // if 0 rows were updated, rollback and not success - // if error, rollback and not success - if (err) { - rollback(client, done); - callback(err); - return; - } - commit(); - } - ) - } - } - ) - }) - } - ) - }) + const result = await this._pool.query(query); + const success = result.rowCount > 0; + callback(null, success); + } catch (error) { + // Return non-success instead of duplicate key error, since this is + // expected to occur during simultaneous creates on the same id + if (error.code === PG_UNIQUE_VIOLATION) callback(null, false); + else callback(error); + } }; // Get the named document from the database. The callback is called with (err, // snapshot). A snapshot with a version of zero is returned if the docuemnt // has never been created in the database. -PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, callback) { - this.pool.connect(function(err, client, done) { - if (err) { - done(client); - callback(err); - return; - } - client.query( - 'SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1', +PostgresDB.prototype.getSnapshot = async function(collection, id, fields, options, callback) { + fields ||= {}; + options ||= {}; + const wantsMetadata = fields.$submit || options.metadata; + try { + const result = await this._pool.query( + 'SELECT version, data, doc_type, metadata FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1', [collection, id], - function(err, res) { - done(); - if (err) { - callback(err); - return; - } - if (res.rows.length) { - var row = res.rows[0] - var snapshot = new PostgresSnapshot( - id, - row.version, - row.doc_type, - row.data, - undefined // TODO: metadata - ) - callback(null, snapshot); - } else { - var snapshot = new PostgresSnapshot( - id, - 0, - null, - undefined, - undefined - ) - callback(null, snapshot); - } - } - ) - }) + ); + + var row = result.rows[0] + const snapshot = { + id, + v: row?.version || 0, + type: row?.doc_type || null, + data: row?.data || undefined, + m: wantsMetadata ? + // Postgres returns null but ShareDB expects undefined + (row?.metadata || undefined) : + null, + }; + callback(null, snapshot); + } catch (error) { + callback(error); + } }; // Get operations between [from, to) noninclusively. (Ie, the range should @@ -174,34 +149,21 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal // The version will be inferred from the parameters if it is missing. // // Callback should be called as callback(error, [list of ops]); -PostgresDB.prototype.getOps = function(collection, id, from, to, options, callback) { - this.pool.connect(function(err, client, done) { - if (err) { - done(client); - callback(err); - return; - } - client.query( - 'SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version >= $3 AND version < $4', - [collection, id, from, to], - function(err, res) { - done(); - if (err) { - callback(err); - return; - } - callback(null, res.rows.map(function(row) { - return row.operation; - })); - } - ) - }) +PostgresDB.prototype.getOps = async function(collection, id, from, to, options, callback) { + from ||= 0; + options ||= {}; + const wantsMetadata = options.metadata; + try { + var cmd = 'SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version > $3 '; + var params = [collection, id, from]; + if(to || to == 0) { cmd += ' AND version <= $4'; params.push(to)} + cmd += ' order by version'; + const result = await this._pool.query(cmd, params); + callback(null, result.rows.map(({operation}) => { + if (!wantsMetadata) delete operation.m; + return operation; + })); + } catch (error) { + callback(error); + } }; - -function PostgresSnapshot(id, version, type, data, meta) { - this.id = id; - this.v = version; - this.type = type; - this.data = data; - this.m = meta; -} diff --git a/index.spec.js b/index.spec.js new file mode 100644 index 0000000..b21888c --- /dev/null +++ b/index.spec.js @@ -0,0 +1,37 @@ +const PostgresDB = require('.'); +const {Pool} = require('pg'); +const fs = require('node:fs'); + +const DB_NAME = 'sharedbtest'; + +function create(callback) { + var db = new PostgresDB({database: DB_NAME}); + callback(null, db); +}; + +describe('PostgresDB', function() { + let pool; + let client; + + beforeEach(async () => { + pool = new Pool({database: 'postgres'}); + client = await pool.connect(); + await client.query(`DROP DATABASE IF EXISTS ${DB_NAME}`); + await client.query(`CREATE DATABASE ${DB_NAME}`); + + const testPool = new Pool({database: DB_NAME}); + const testClient = await testPool.connect(); + const structure = fs.readFileSync('./structure.sql', 'utf8'); + await testClient.query(structure); + await testClient.release(true); + await testPool.end(); + }); + + afterEach(async function() { + await client.query(`DROP DATABASE IF EXISTS ${DB_NAME}`); + await client.release(true); + await pool.end(); + }); + + require('sharedb/test/db')({create: create}); +}); diff --git a/package.json b/package.json index a3d57cb..f83872a 100644 --- a/package.json +++ b/package.json @@ -1,17 +1,32 @@ { "name": "sharedb-postgres", - "version": "3.0.0", - "description": "PostgreSQL adapter for ShareDB", + "version": "5.0.1", + "description": "PostgreSQL adapter for ShareDB. forked from share/sharedb-postgres", "main": "index.js", "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" + "test": "mocha" }, "author": "Jeremy Apthorp ", "license": "MIT", - "keywords": ["sharedb", "sharejs", "share", "postgres"], - "repository": "share/sharedb-postgres", + "keywords": [ + "sharedb", + "sharejs", + "share", + "postgres" + ], + "repository": { + "type": "git", + "url": "https://github.com/share/sharedb-postgres" + }, "dependencies": { - "pg": "^8.5.1", - "sharedb": "^1.0.0-beta.7" + "pg": "^8.12.0", + "sharedb": "^1.6.0 || ^2.0.0 || ^3.0.0 || ^4.0.0 || ^5.0.0" + }, + "devDependencies": { + "chai": "^4.4.1", + "mocha": "^10.4.0", + "ot-json1": "^1.0.2", + "rich-text": "^4.1.0", + "sinon": "^18.0.0" } } diff --git a/sample-cmd b/sample-cmd new file mode 100644 index 0000000..086def8 --- /dev/null +++ b/sample-cmd @@ -0,0 +1,16 @@ +/*var query = { + name: "sdb-commit-op-and-snap", + text: ` + with snaps as ( + insert into snapshots (collection, doc_id, doc_type, version, data) + values ($1, $2, $3, $4, $5) + on conflict (collection, doc_id, version) do update set doc_type = $3, data = $5 + returning version + ) + insert into ops ( collection, doc_id, version, operation ) + values ($1, $2, $4, $6) on conflict (collection, doc_id, version) do update set operation = $6 + returning version + `, + values: [collection, id, snapshot.type, snapshot.v, snapshot.data, op] +}*/ + diff --git a/structure.sql b/structure.sql index 606b5fc..fe1c932 100644 --- a/structure.sql +++ b/structure.sql @@ -1,16 +1,42 @@ -CREATE TABLE ops ( +CREATE TABLE IF NOT EXISTS ops ( collection character varying(255) not null, doc_id character varying(255) not null, version integer not null, - operation json not null, -- {v:0, create:{...}} or {v:n, op:[...]} + operation jsonb not null, -- {v:0, create:{...}} or {v:n, op:[...]} PRIMARY KEY (collection, doc_id, version) ); -CREATE TABLE snapshots ( +CREATE TABLE IF NOT EXISTS snapshots ( collection character varying(255) not null, doc_id character varying(255) not null, doc_type character varying(255) not null, version integer not null, - data json not null, + data jsonb not null, PRIMARY KEY (collection, doc_id) ); + +CREATE INDEX IF NOT EXISTS snapshots_version ON snapshots (collection, doc_id); + +ALTER TABLE ops + ALTER COLUMN operation + SET DATA TYPE jsonb + USING operation::jsonb; + +ALTER TABLE snapshots + ALTER COLUMN data + SET DATA TYPE jsonb + USING data::jsonb; + + +-- v5.0.0 -- + +ALTER TABLE snapshots + ALTER column doc_type + DROP NOT NULL; + +ALTER TABLE snapshots + ALTER column data + DROP NOT NULL; + +ALTER TABLE snapshots + ADD metadata jsonb; diff --git a/test/setup.js b/test/setup.js new file mode 100644 index 0000000..0dfcb5a --- /dev/null +++ b/test/setup.js @@ -0,0 +1,10 @@ +var logger = require('sharedb/lib/logger'); + +if (process.env.LOGGING !== 'true') { + // Silence the logger for tests by setting all its methods to no-ops + logger.setMethods({ + info: function() {}, + warn: function() {}, + error: function() {} + }); +}