Skip to content

Commit 5097345

Browse files
committed
Fix subscribe reconnect and add onsubscribe method - fixes porsager#315
1 parent 94fea8f commit 5097345

File tree

4 files changed

+120
-43
lines changed

4 files changed

+120
-43
lines changed

README.md

+9-2
Original file line numberDiff line numberDiff line change
@@ -607,8 +607,15 @@ CREATE PUBLICATION alltables FOR ALL TABLES
607607
```js
608608
const sql = postgres({ publications: 'alltables' })
609609

610-
const { unsubscribe } = await sql.subscribe('insert:events', (row, { command, relation, key, old }) =>
611-
// tell about new event row over eg. websockets or do something else
610+
const { unsubscribe } = await sql.subscribe(
611+
'insert:events',
612+
function(row, { command, relation, key, old }) => {
613+
// Callback function for each row change
614+
// tell about new event row over eg. websockets or do something else
615+
},
616+
function onsubscribe() => {
617+
// Callback on initial connect and potential reconnects
618+
}
612619
)
613620
```
614621

src/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ export default Postgres
4242

4343
function Postgres(a, b) {
4444
const options = parseOptions(a, b)
45-
, subscribe = Subscribe(Postgres, { ...options })
45+
, subscribe = options.no_subscribe || Subscribe(Postgres, { ...options })
4646

4747
let ending = false
4848

src/subscribe.js

+67-38
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,90 @@
1+
const noop = () => { /* noop */ }
2+
13
export default function Subscribe(postgres, options) {
2-
const listeners = new Map()
4+
const subscribers = new Map()
5+
, slot = 'postgresjs_' + Math.random().toString(36).slice(2)
6+
, state = {}
37

48
let connection
5-
6-
return async function subscribe(event, fn) {
7-
event = parseEvent(event)
8-
9-
options.max = 1
10-
options.onclose = onclose
11-
options.fetch_types = false
12-
options.connection = {
9+
, stream
10+
, ended = false
11+
12+
const sql = subscribe.sql = postgres({
13+
...options,
14+
max: 1,
15+
fetch_types: false,
16+
idle_timeout: null,
17+
max_lifetime: null,
18+
connection: {
1319
...options.connection,
1420
replication: 'database'
15-
}
21+
},
22+
onclose: async function() {
23+
if (ended)
24+
return
25+
stream = null
26+
state.pid = state.secret = undefined
27+
!ended && connected(await init(sql, slot, options.publications))
28+
subscribers.forEach(event => event.forEach(({ onsubscribe }) => onsubscribe()))
29+
},
30+
no_subscribe: true
31+
})
1632

17-
let stream
18-
, ended = false
33+
const end = sql.end
34+
, close = sql.close
1935

20-
const sql = postgres(options)
21-
, slot = 'postgresjs_' + Math.random().toString(36).slice(2)
22-
, end = sql.end
36+
sql.end = async() => {
37+
ended = true
38+
stream && (await new Promise(r => (stream.once('end', r), stream.end())))
39+
return end()
40+
}
2341

24-
sql.end = async() => {
25-
ended = true
26-
stream && (await new Promise(r => (stream.once('end', r), stream.end())))
27-
return end()
28-
}
42+
sql.close = async() => {
43+
stream && (await new Promise(r => (stream.once('end', r), stream.end())))
44+
return close()
45+
}
46+
47+
return subscribe
2948

30-
!connection && (subscribe.sql = sql, connection = init(sql, slot, options.publications))
49+
async function subscribe(event, fn, onsubscribe = noop) {
50+
event = parseEvent(event)
3151

32-
const fns = listeners.has(event)
33-
? listeners.get(event).add(fn)
34-
: listeners.set(event, new Set([fn])).get(event)
52+
if (!connection)
53+
connection = init(sql, slot, options.publications)
54+
55+
const subscriber = { fn, onsubscribe }
56+
const fns = subscribers.has(event)
57+
? subscribers.get(event).add(subscriber)
58+
: subscribers.set(event, new Set([subscriber])).get(event)
3559

3660
const unsubscribe = () => {
37-
fns.delete(fn)
38-
fns.size === 0 && listeners.delete(event)
61+
fns.delete(subscriber)
62+
fns.size === 0 && subscribers.delete(event)
3963
}
4064

41-
return connection.then(x => (stream = x, { unsubscribe }))
65+
return connection.then(x => {
66+
connected(x)
67+
onsubscribe()
68+
return { unsubscribe, state, sql }
69+
})
70+
}
4271

43-
async function onclose() {
44-
stream = null
45-
!ended && (stream = await init(sql, slot, options.publications))
46-
}
72+
function connected(x) {
73+
stream = x.stream
74+
state.pid = x.state.pid
75+
state.secret = x.state.secret
4776
}
4877

4978
async function init(sql, slot, publications) {
5079
if (!publications)
5180
throw new Error('Missing publication names')
5281

53-
const [x] = await sql.unsafe(
82+
const xs = await sql.unsafe(
5483
`CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT`
5584
)
5685

86+
const [x] = xs
87+
5788
const stream = await sql.unsafe(
5889
`START_REPLICATION SLOT ${ slot } LOGICAL ${
5990
x.consistent_point
@@ -65,12 +96,10 @@ export default function Subscribe(postgres, options) {
6596
}
6697

6798
stream.on('data', data)
68-
stream.on('error', (error) => {
69-
console.error('Logical Replication Error - Reconnecting', error) // eslint-disable-line
70-
sql.end()
71-
})
99+
stream.on('error', sql.close)
100+
stream.on('close', sql.close)
72101

73-
return stream
102+
return { stream, state: xs.state }
74103

75104
function data(x) {
76105
if (x[0] === 0x77)
@@ -99,7 +128,7 @@ export default function Subscribe(postgres, options) {
99128
}
100129

101130
function call(x, a, b) {
102-
listeners.has(x) && listeners.get(x).forEach(fn => fn(a, b, x))
131+
subscribers.has(x) && subscribers.get(x).forEach(({ fn }) => fn(a, b, x))
103132
}
104133
}
105134

tests/index.js

+43-2
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,12 @@ t('Undefined values throws', async() => {
320320
})
321321

322322
t('Transform undefined', async() => {
323-
const sql = postgres({ transform: { undefined: null } })
323+
const sql = postgres({ ...options, transform: { undefined: null } })
324324
return [null, (await sql`select ${ undefined } as x`)[0].x]
325325
})
326326

327327
t('Transform undefined in array', async() => {
328-
const sql = postgres({ transform: { undefined: null } })
328+
const sql = postgres({ ...options, transform: { undefined: null } })
329329
return [null, (await sql`select * from (values ${ sql([undefined, undefined]) }) as x(x, y)`)[0].y]
330330
})
331331

@@ -1777,6 +1777,47 @@ t('subscribe', { timeout: 2 }, async() => {
17771777
]
17781778
})
17791779

1780+
t('subscribe reconnects and calls onsubscribe', { timeout: 4 }, async() => {
1781+
const sql = postgres({
1782+
database: 'postgres_js_test',
1783+
publications: 'alltables',
1784+
fetch_types: false
1785+
})
1786+
1787+
await sql.unsafe('create publication alltables for all tables')
1788+
1789+
const result = []
1790+
let onsubscribes = 0
1791+
1792+
const { unsubscribe, sql: subscribeSql } = await sql.subscribe(
1793+
'*',
1794+
(row, { command, old }) => result.push(command, row.name || row.id, old && old.name),
1795+
() => onsubscribes++
1796+
)
1797+
1798+
await sql`
1799+
create table test (
1800+
id serial primary key,
1801+
name text
1802+
)
1803+
`
1804+
1805+
await sql`insert into test (name) values ('Murray')`
1806+
await delay(10)
1807+
await subscribeSql.close()
1808+
await delay(500)
1809+
await sql`delete from test`
1810+
await delay(10)
1811+
await unsubscribe()
1812+
return [
1813+
'2insert,Murray,,delete,1,',
1814+
onsubscribes + result.join(','),
1815+
await sql`drop table test`,
1816+
await sql`drop publication alltables`,
1817+
await sql.end()
1818+
]
1819+
})
1820+
17801821
t('Execute', async() => {
17811822
const result = await new Promise((resolve) => {
17821823
const sql = postgres({ ...options, fetch_types: false, debug:(id, query) => resolve(query) })

0 commit comments

Comments
 (0)