Skip to content

Commit 5058a57

Browse files
committed
Implement automatic reconnect for listen - fixes porsager#35
commit 003bc24f1f98af585a4276d787eeab8eafda980a Author: Rasmus Porsager <rasmus@porsager.com> Date: Wed May 20 08:51:19 2020 +0200 Clean up reconnect commit 585c19491e0a8d3696833bfaf2bab24e24fc1f6c Author: Rasmus Porsager <rasmus@porsager.com> Date: Mon Mar 9 20:19:32 2020 +0100 Clean up tests commit dc80681f4b91de6fdaab0b7576a55b106c8f463b Author: Rasmus Porsager <rasmus@porsager.com> Date: Fri Feb 14 21:54:43 2020 +0100 Reconnect listener connection on close
1 parent b5c52c8 commit 5058a57

File tree

4 files changed

+34
-9
lines changed

4 files changed

+34
-9
lines changed

lib/backend.js

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ function Backend({
8383
backend.query.result.count = +x.toString('utf8', i + 1, x.length - 1)
8484
if (x[i - 1] >= 65) {
8585
backend.query.result.command = x.toString('utf8', 5, i)
86+
backend.query.result.state = state
8687
break
8788
}
8889
}

lib/connection.js

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ function Connection(options = {}) {
1818
connect_timeout,
1919
onnotify,
2020
onnotice,
21+
onclose,
2122
parsers
2223
} = options
2324
let buffer = Buffer.alloc(0)
@@ -230,6 +231,7 @@ function Connection(options = {}) {
230231
statements = {}
231232
messages = []
232233
open = ready = false
234+
onclose && onclose()
233235
}
234236

235237
/* c8 ignore next */

lib/index.js

+11-4
Original file line numberDiff line numberDiff line change
@@ -340,16 +340,23 @@ function Postgres(a, b) {
340340
}
341341

342342
listeners[channel] = [fn]
343-
return query({ raw: true }, getListener(), 'listen ' + escape(channel)).then(() => channel)
343+
return query({ raw: true }, getListener(), 'listen ' + escape(channel))
344344
}
345345

346346
function getListener() {
347347
if (listener)
348348
return listener
349349

350-
listener = Connection(Object.assign({},
351-
options,
352-
{ onnotify: (c, x) => c in listeners && listeners[c].forEach(fn => fn(x)) }
350+
listener = Connection(Object.assign({
351+
onnotify: (c, x) => c in listeners && listeners[c].forEach(fn => fn(x)),
352+
onclose: () => {
353+
Object.entries(listeners).forEach(([channel, fns]) => {
354+
delete listeners[channel]
355+
Promise.allSettled(fns.map(fn => listen(channel, fn)))
356+
})
357+
}
358+
},
359+
options
353360
))
354361
all.push(listener)
355362
return listener

tests/index.js

+20-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const cp = require('child_process')
55
const path = require('path')
66

77
const postgres = require('../lib')
8+
const delay = ms => new Promise(r => setTimeout(r, ms))
89

910
const login = {
1011
user: 'postgres_js_test'
@@ -30,7 +31,7 @@ const options = {
3031
user: login.user,
3132
pass: login.pass,
3233
idle_timeout: 0.2,
33-
debug: true,
34+
debug: false,
3435
max: 1
3536
}
3637

@@ -385,7 +386,7 @@ t('sql file throws', async() =>
385386

386387
t('sql file cached', async() => {
387388
await sql.file(path.join(__dirname, 'select.sql'))
388-
await new Promise(r => setTimeout(r, 20))
389+
await delay(20)
389390

390391
return [1, (await sql.file(path.join(__dirname, 'select.sql')))[0].x]
391392
})
@@ -416,7 +417,6 @@ t('Connection ended timeout', async() => {
416417

417418
t('Connection ended error', async() => {
418419
const sql = postgres(options)
419-
420420
sql.end()
421421
return ['CONNECTION_ENDED', (await sql``.catch(x => x.code))]
422422
})
@@ -549,6 +549,21 @@ t('listen and notify with weird name', async() => {
549549
)]
550550
})
551551

552+
t('listen reconnects', async() => {
553+
const listener = postgres(options)
554+
, xs = []
555+
556+
const { state: { pid } } = await listener.listen('test', x => xs.push(x))
557+
await sql.notify('test', 'a')
558+
await sql`select pg_terminate_backend(${ pid }::int)`
559+
await delay(50)
560+
await sql.notify('test', 'b')
561+
await delay(50)
562+
listener.end()
563+
564+
return ['ab', xs.join('')]
565+
})
566+
552567
t('responds with server parameters (application_name)', async() =>
553568
['postgres.js', await new Promise((resolve, reject) => postgres({
554569
...options,
@@ -788,7 +803,7 @@ t('Cursor works', async() => {
788803
const order = []
789804
await sql`select 1 as x union select 2 as x`.cursor(async(x) => {
790805
order.push(x.x + 'a')
791-
await new Promise(r => setTimeout(r, 100))
806+
await delay(100)
792807
order.push(x.x + 'b')
793808
})
794809
return ['1a1b2a2b', order.join('')]
@@ -815,7 +830,7 @@ t('Cursor throw works', async() => {
815830
const order = []
816831
await sql`select 1 as x union select 2 as x`.cursor(async(x) => {
817832
order.push(x.x + 'a')
818-
await new Promise(r => setTimeout(r, 100))
833+
await delay(100)
819834
throw new Error('watty')
820835
}).catch(() => order.push('err'))
821836
return ['1aerr', order.join('')]

0 commit comments

Comments
 (0)