Skip to content

Commit 51d033c

Browse files
authored
Subscribe to changes using logical replication (porsager#220)
* Got logical replication subscription working * Add docs for subscribe()
1 parent ce80061 commit 51d033c

File tree

5 files changed

+305
-6
lines changed

5 files changed

+305
-6
lines changed

.eslintrc.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
{
22
"root": true,
33
"env": {
4-
"node": true,
5-
"es6": true
4+
"es2020": true,
5+
"node": true
66
},
77
"parserOptions": {
88
"ecmaVersion": 9,

README.md

+47
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,52 @@ sql.file(path.join(__dirname, 'query.sql'), [], {
463463

464464
```
465465

466+
## Subscribe / Realtime
467+
468+
Postgres.js implements the logical replication protocol of PostgreSQL to support subscription to realtime updates of `insert`, `update` and `delete` operations.
469+
470+
> **NOTE** To make this work you must [create the proper publications in your database](https://www.postgresql.org/docs/current/sql-createpublication.html), enable logical replication by setting `wal_level = logical` in `postgresql.conf`, and connect using a role that has the `REPLICATION` attribute (or a superuser).
471+
472+
### Quick start
473+
474+
#### Create a publication (eg. in migration)
475+
```sql
476+
CREATE PUBLICATION alltables FOR ALL TABLES
477+
```
478+
479+
#### Subscribe to updates
480+
```js
481+
const sql = postgres({ publications: 'alltables' })
482+
483+
const { unsubscribe } = await sql.subscribe('insert:events', row =>
484+
// tell about new event row over eg. websockets or do something else
485+
)
486+
```
487+
488+
### Subscribe pattern
489+
490+
You can subscribe to specific operations, tables or even rows with primary keys.
491+
492+
### `operation` `:` `schema` `.` `table` `=` `primary_key`
493+
494+
**`operation`** is one of ``` * | insert | update | delete ``` and defaults to `*`
495+
496+
**`schema`** defaults to `public`
497+
498+
**`table`** is a specific table name and defaults to `*`
499+
500+
**`primary_key`** can be used to only subscribe to specific rows
501+
502+
#### Examples
503+
504+
```js
505+
sql.subscribe('*', () => /* everything */ )
506+
sql.subscribe('insert', () => /* all inserts */ )
507+
sql.subscribe('*:users', () => /* all operations on the public.users table */ )
508+
sql.subscribe('delete:users', () => /* all deletes on the public.users table */ )
509+
sql.subscribe('update:users=1', () => /* all updates on the users row with a primary key = 1 */ )
510+
```
511+
466512
## Transactions
467513

468514

@@ -533,6 +579,7 @@ sql.begin(async sql => {
533579

534580
Do note that you can often achieve the same result using [`WITH` queries (Common Table Expressions)](https://www.postgresql.org/docs/current/queries-with.html) instead of using transactions.
535581

582+
536583
## Custom Types
537584

538585
You can add ergonomic support for custom types, or simply pass an object with a `{ type, value }` signature that contains the Postgres `oid` for the type and the correctly serialized value. _(`oid` values for types can be found in the `pg_catalog.pg_types` table.)_

lib/index.js

+13-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const Url = require('url')
33
const Stream = require('stream')
44
const Connection = require('./connection.js')
55
const Queue = require('./queue.js')
6+
const Subscribe = require('./subscribe.js')
67
const { errors, PostgresError } = require('./errors.js')
78
const {
89
mergeUserTypes,
@@ -59,6 +60,7 @@ function Postgres(a, b) {
5960
const options = parseOptions(a, b)
6061

6162
const max = Math.max(1, options.max)
63+
, subscribe = Subscribe(Postgres, a, b)
6264
, transform = options.transform
6365
, connections = Queue()
6466
, all = []
@@ -82,6 +84,7 @@ function Postgres(a, b) {
8284
Object.assign(postgres, {
8385
options: Object.assign({}, options, { pass: null }),
8486
parameters: {},
87+
subscribe,
8588
listen,
8689
begin,
8790
end
@@ -259,7 +262,7 @@ function Postgres(a, b) {
259262
function fetchArrayTypes(connection) {
260263
return arrayTypesPromise || (arrayTypesPromise =
261264
new Promise((resolve, reject) => {
262-
send(connection, { resolve, reject, tagged: false, prepare: false, origin: new Error().stack }, `
265+
send(connection, { resolve, reject, simple: true, tagged: false, prepare: false, origin: new Error().stack }, `
263266
select b.oid, b.typarray
264267
from pg_catalog.pg_type a
265268
left join pg_catalog.pg_type b on b.oid = a.typelem
@@ -397,8 +400,8 @@ function Postgres(a, b) {
397400
: query.writable.push({ chunk, callback })
398401
},
399402
destroy(error, callback) {
400-
query.writable.push({ error })
401403
callback(error)
404+
query.writable.push({ error })
402405
},
403406
final(callback) {
404407
if (error)
@@ -472,10 +475,16 @@ function Postgres(a, b) {
472475
let destroy
473476

474477
return ended = Promise.race([
475-
Promise.resolve(arrayTypesPromise).then(() => Promise.all(all.map(c => c.end())))
478+
Promise.resolve(arrayTypesPromise).then(() => Promise.all(
479+
(subscribe.sql ? [subscribe.sql.end({ timeout: 0 })] : []).concat(all.map(c => c.end()))
480+
))
476481
].concat(
477482
timeout === 0 || timeout > 0
478-
? new Promise(r => destroy = setTimeout(() => (all.map(c => c.destroy()), r()), timeout * 1000))
483+
? new Promise(r => destroy = setTimeout(() => (
484+
subscribe.sql && subscribe.sql.end({ timeout }),
485+
all.map(c => c.destroy()),
486+
r()
487+
), timeout * 1000))
479488
: []
480489
))
481490
.then(() => clearTimeout(destroy))

lib/subscribe.js

+210
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
module.exports = function(postgres, a, b) {
2+
const listeners = new Map()
3+
4+
let connection
5+
6+
return async function subscribe(event, fn) {
7+
event = parseEvent(event)
8+
9+
const options = typeof a === 'string' ? b : a || {}
10+
options.max = 1
11+
options.connection = {
12+
...options.connection,
13+
replication: 'database'
14+
}
15+
16+
const sql = postgres(a, b)
17+
18+
!connection && (subscribe.sql = sql, connection = init(sql, options.publications))
19+
20+
const fns = listeners.has(event)
21+
? listeners.get(event).add(fn)
22+
: listeners.set(event, new Set([fn]))
23+
24+
const unsubscribe = () => {
25+
fns.delete(fn)
26+
fns.size === 0 && listeners.delete(event)
27+
}
28+
29+
return connection.then(() => ({ unsubscribe }))
30+
}
31+
32+
async function init(sql, publications = 'alltables') {
33+
if (!publications)
34+
throw new Error('Missing publication names')
35+
36+
const slot = 'postgresjs_' + Math.random().toString(36).slice(2)
37+
const [x] = await sql.unsafe(
38+
`CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT`
39+
)
40+
41+
const stream = sql.unsafe(
42+
`START_REPLICATION SLOT ${ slot } LOGICAL ${
43+
x.consistent_point
44+
} (proto_version '1', publication_names '${ publications }')`
45+
).writable()
46+
47+
const state = {
48+
lsn: Buffer.concat(x.consistent_point.split('/').map(x => Buffer.from(('00000000' + x).slice(-8), 'hex')))
49+
}
50+
51+
stream.on('data', data)
52+
53+
function data(x) {
54+
if (x[0] === 0x77)
55+
parse(x.slice(25), state, sql.options.parsers, handle)
56+
else if (x[0] === 0x6b && x[17])
57+
pong()
58+
}
59+
60+
function handle(a, b) {
61+
const path = b.relation.schema + '.' + b.relation.table
62+
call('*', a, b)
63+
call('*:' + path, a, b)
64+
b.relation.keys.length && call('*:' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b)
65+
call(b.command, a, b)
66+
call(b.command + ':' + path, a, b)
67+
b.relation.keys.length && call(b.command + ':' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b)
68+
}
69+
70+
function pong() {
71+
const x = Buffer.alloc(34)
72+
x[0] = 'r'.charCodeAt(0)
73+
x.fill(state.lsn, 1)
74+
x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25)
75+
stream.write(x)
76+
}
77+
}
78+
79+
function call(x, a, b) {
80+
listeners.has(x) && listeners.get(x).forEach(fn => fn(a, b, x))
81+
}
82+
}
83+
84+
// Converts a Postgres timestamp — microseconds since 2000-01-01 UTC,
// supplied as a BigInt — into a JavaScript Date (millisecond precision).
function Time(x) {
  const postgresEpochMs = Date.UTC(2000, 0, 1)
  const elapsedMs = Number(x / BigInt(1000))
  return new Date(postgresEpochMs + elapsedMs)
}
87+
88+
// Decodes a single pgoutput logical replication message and dispatches it.
//
// x       - Buffer holding one message; the first byte is the message tag
// state   - shared decoder state: relation cache keyed by relation oid,
//           plus `lsn` and `date` updated on Begin
// parsers - map of type oid -> parse function for column text values
// handle  - callback(row, { command, relation, ... }) invoked for
//           insert / update / delete messages
//
// An unknown tag has no entry in the dispatch table, so the final call
// throws (undefined is not a function).
function parse(x, state, parsers, handle) {
  // Re-keys the handler table from tag characters to their char codes.
  const char = (acc, [k, v]) => (acc[k.charCodeAt(0)] = v, acc)

  Object.entries({
    R: x => { // Relation — describe a table and cache it under its oid
      let i = 1
      const r = state[x.readInt32BE(i)] = {
        // an empty namespace string denotes pg_catalog
        schema: String(x.slice(i += 4, i = x.indexOf(0, i))) || 'pg_catalog',
        table: String(x.slice(i + 1, i = x.indexOf(0, i + 1))),
        columns: Array(x.readInt16BE(i += 2)),
        keys: []
      }
      i += 2

      let columnIndex = 0
        , column

      // Per column: flags byte, NUL-terminated name, type oid, atttypmod.
      while (i < x.length) {
        column = r.columns[columnIndex++] = {
          key: x[i++],
          name: String(x.slice(i, i = x.indexOf(0, i))),
          type: x.readInt32BE(i += 1),
          parser: parsers[x.readInt32BE(i)],
          atttypmod: x.readInt32BE(i += 4)
        }

        // non-zero flags byte → column belongs to the replica identity (key)
        column.key && r.keys.push(column)
        i += 4
      }
    },
    Y: () => { /* noop */ }, // Type
    O: () => { /* noop */ }, // Origin
    B: x => { // Begin — remember transaction LSN and commit timestamp
      state.date = Time(x.readBigInt64BE(9))
      state.lsn = x.slice(1, 9)
    },
    I: x => { // Insert
      let i = 1
      const relation = state[x.readInt32BE(i)]
      const row = {}
      // skip relation oid (4) + tuple tag byte (1) + Int16 column count (2)
      tuples(x, row, relation.columns, i += 7)

      handle(row, {
        command: 'insert',
        relation
      })
    },
    D: x => { // Delete
      let i = 1
      const relation = state[x.readInt32BE(i)]
      i += 4
      const key = x[i] === 75 // 'K' — old tuple holds key columns only
      const row = key || x[i] === 79 // 'O' — full old tuple present
        ? {}
        : null

      tuples(x, row, key ? relation.keys : relation.columns, i += 3)

      handle(row, {
        command: 'delete',
        relation,
        key
      })
    },
    U: x => { // Update
      let i = 1
      const relation = state[x.readInt32BE(i)]
      i += 4
      const key = x[i] === 75 // 'K' — old tuple holds key columns only
      const old = key || x[i] === 79 // 'O' — old tuple present
        ? {}
        : null

      old && (i = tuples(x, old, key ? relation.keys : relation.columns, ++i))

      const row = {}
      // skip the new-tuple tag byte and Int16 column count
      i = tuples(x, row, relation.columns, i += 3)

      handle(row, {
        command: 'update',
        relation,
        key,
        old
      })
    },
    T: () => { /* noop */ }, // Truncate,
    C: () => { /* noop */ } // Commit
  }).reduce(char, {})[x[0]](x)
}
177+
178+
// Reads one tuple's worth of column values from buffer `x` starting at
// offset `xi`, assigning each decoded value onto `row` keyed by column name.
// Returns the offset just past the tuple data.
function tuples(x, row, columns, xi) {
  for (const column of columns) {
    const kind = x[xi++]
    if (kind === 110) { // 'n' — SQL NULL, no length/data follows
      row[column.name] = null
    } else if (kind === 117) { // 'u' — value not sent; mapped to undefined
      row[column.name] = undefined
    } else {
      // text value: Int32 byte length followed by the utf8 payload
      const start = xi + 4
      xi = start + x.readInt32BE(xi)
      row[column.name] = column.parser === undefined
        ? x.toString('utf8', start, xi)
        : column.parser.array === true
          ? column.parser(x.toString('utf8', start + 1, xi)) // array parser skips the leading brace
          : column.parser(x.toString('utf8', start, xi))
    }
  }

  return xi
}
198+
199+
// Canonicalizes a subscribe pattern like 'insert:users=1' into its
// 'command:schema.table=key' form. The command defaults to '*', the schema
// to 'public'; table and key are included only when present.
// Throws on input the pattern regex cannot match at all.
function parseEvent(x) {
  const xs = x.match(/^(\*|insert|update|delete)?:?([^.]+?\.?[^=]+)?=?(.+)?/i)

  // The previous `|| []` fallback made this guard unreachable ([] is
  // truthy); a failed match must throw instead of silently yielding '*'.
  if (!xs)
    throw new Error('Malformed subscribe pattern: ' + x)

  const [, command, path, key] = xs

  return (command || '*')
    + (path ? ':' + (path.indexOf('.') === -1 ? 'public.' + path : path) : '')
    + (key ? '=' + key : '')
}

tests/index.js

+33
Original file line numberDiff line numberDiff line change
@@ -1494,3 +1494,36 @@ t('multiple queries before connect', async() => {
14941494
xs.map(x => x[0].x).join()
14951495
]
14961496
})
1497+
1498+
t('subscribe', async() => {
  // End-to-end check of the logical replication subscription: requires the
  // test server to run with wal_level = logical and a role allowed to
  // create publications and replication slots.
  const sql = postgres({
    database: 'postgres_js_test',
    publications: 'alltables'
  })

  await sql.unsafe('create publication alltables for all tables')

  const result = []

  // Listen to every operation on every table; each callback records the
  // command plus either the row's name or its id (the delete payload only
  // carries the key columns, hence row.id).
  await sql.subscribe('*', (row, info) =>
    result.push(info.command, row.name || row.id)
  )

  await sql`
    create table test (
      id serial primary key,
      name text
    )
  `
  await sql`insert into test (name) values ('Murray')`
  await sql`update test set name = 'Rothbard'`
  await sql`delete from test`
  // give the replication stream time to deliver the three events
  await delay(100)
  return [
    'insert,Murray,update,Rothbard,delete,1',
    result.join(','),
    await sql`drop table test`,
    await sql`drop publication alltables`,
    await sql.end()
  ]
})

0 commit comments

Comments
 (0)