Skip to content

Commit 1dc2fd2

Browse files
committed
Add optional onlisten function to listen
1 parent 2ad65c4 commit 1dc2fd2

File tree

3 files changed

+42
-17
lines changed

3 files changed

+42
-17
lines changed

README.md

+17-2
Original file line numberDiff line numberDiff line change
@@ -519,9 +519,10 @@ Do note that you can often achieve the same result using [`WITH` queries (Common
519519
520520
## Listen & notify
521521
522-
When you call `.listen`, a dedicated connection will be created to ensure that you receive notifications in real-time. This connection will be used for any further calls to `.listen`.
522+
When you call `.listen`, a dedicated connection will be created to ensure that you receive notifications instantly. This connection will be used for any further calls to `.listen`. The connection will automatically reconnect according to a backoff reconnection pattern to not overload the database server.
523523
524-
`.listen` returns a promise which resolves once the `LISTEN` query to Postgres completes, or if there is already a listener active.
524+
### Listen `await sql.listen(channel, onnotify, [onlisten]) -> { state }`
525+
`.listen` takes the channel name, a function to handle each notify, and an optional function to run every time listen is registered and ready (happens on initial connect and reconnects). It returns a promise which resolves once the `LISTEN` query to Postgres completes, or if there is already a listener active.
525526
526527
```js
527528
await sql.listen('news', payload => {
@@ -530,6 +531,20 @@ await sql.listen('news', payload => {
530531
})
531532
```
532533
534+
The optional `onlisten` method is great to use for a very simply queue mechanism:
535+
536+
```js
537+
await sql.listen(
538+
'jobs',
539+
(x) => run(JSON.parse(x)),
540+
( ) => sql`select unfinished_jobs()`.forEach(run)
541+
)
542+
543+
function run(job) {
544+
// And here you do the work you please
545+
}
546+
```
547+
### Notify `await sql.notify(channel, payload) -> Result[]`
533548
Notify can be done as usual in SQL, or by using the `sql.notify` method.
534549
```js
535550
sql.notify('news', JSON.stringify({ no: 'this', is: 'news' }))

src/index.js

+12-8
Original file line numberDiff line numberDiff line change
@@ -139,34 +139,38 @@ function Postgres(a, b) {
139139
}
140140
}
141141

142-
async function listen(name, fn) {
142+
async function listen(name, fn, onlisten) {
143+
const listener = { fn, onlisten }
144+
143145
const sql = listen.sql || (listen.sql = Postgres({
144146
...options,
145147
max: 1,
146148
idle_timeout: null,
147149
max_lifetime: null,
148150
fetch_types: false,
149151
onclose() {
150-
Object.entries(listen.channels).forEach(([channel, { listeners }]) => {
151-
delete listen.channels[channel]
152-
Promise.all(listeners.map(fn => listen(channel, fn).catch(() => { /* noop */ })))
152+
Object.entries(listen.channels).forEach(([name, { listeners }]) => {
153+
delete listen.channels[name]
154+
Promise.all(listeners.map(l => listen(name, l.fn, l.onlisten).catch(() => { /* noop */ })))
153155
})
154156
},
155157
onnotify(c, x) {
156-
c in listen.channels && listen.channels[c].listeners.forEach(fn => fn(x))
158+
c in listen.channels && listen.channels[c].listeners.forEach(l => l.fn(x))
157159
}
158160
}))
159161

160162
const channels = listen.channels || (listen.channels = {})
161163
, exists = name in channels
162-
, channel = exists ? channels[name] : (channels[name] = { listeners: [fn] })
164+
, channel = exists ? channels[name] : (channels[name] = { listeners: [listener] })
163165

164166
if (exists) {
165-
channel.listeners.push(fn)
167+
channel.listeners.push(listener)
168+
listener.onlisten && listener.onlisten()
166169
return Promise.resolve({ ...channel.result, unlisten })
167170
}
168171

169172
channel.result = await sql`listen ${ sql(name) }`
173+
listener.onlisten && listener.onlisten()
170174
channel.result.unlisten = unlisten
171175

172176
return channel.result
@@ -175,7 +179,7 @@ function Postgres(a, b) {
175179
if (name in channels === false)
176180
return
177181

178-
channel.listeners = channel.listeners.filter(x => x !== fn)
182+
channel.listeners = channel.listeners.filter(x => x !== listener)
179183
if (channels[name].listeners.length)
180184
return
181185

tests/index.js

+13-7
Original file line numberDiff line numberDiff line change
@@ -672,15 +672,21 @@ t('listen reconnects', { timeout: 2 }, async() => {
672672
, a = new Promise(r => resolvers.a = r)
673673
, b = new Promise(r => resolvers.b = r)
674674

675-
const { state: { pid } } = await sql.listen('test', x => x in resolvers && resolvers[x]())
675+
let connects = 0
676+
677+
const { state: { pid } } = await sql.listen(
678+
'test',
679+
x => x in resolvers && resolvers[x](),
680+
() => connects++
681+
)
676682
await sql.notify('test', 'a')
677683
await a
678684
await sql`select pg_terminate_backend(${ pid })`
679685
await delay(100)
680686
await sql.notify('test', 'b')
681687
await b
682688
sql.end()
683-
return [true, true]
689+
return [connects, 2]
684690
})
685691

686692
t('listen result reports correct connection state after reconnection', async() => {
@@ -1545,12 +1551,12 @@ t('Multiple hosts', {
15451551
const x1 = await sql`select 1`
15461552
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
15471553
await s1`select pg_terminate_backend(${ x1.state.pid }::int)`
1548-
await delay(10)
1554+
await delay(50)
15491555

15501556
const x2 = await sql`select 1`
15511557
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
15521558
await s2`select pg_terminate_backend(${ x2.state.pid }::int)`
1553-
await delay(10)
1559+
await delay(50)
15541560

15551561
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
15561562

@@ -1764,11 +1770,11 @@ t('Cancel running query', async() => {
17641770
return ['57014', error.code]
17651771
})
17661772

1767-
t('Cancel piped query', { timeout: 1 }, async() => {
1773+
t('Cancel piped query', async() => {
17681774
await sql`select 1`
1769-
const last = sql`select pg_sleep(0.1)`.execute()
1775+
const last = sql`select pg_sleep(0.05)`.execute()
17701776
const query = sql`select pg_sleep(2) as dig`
1771-
setTimeout(() => query.cancel(), 50)
1777+
setTimeout(() => query.cancel(), 10)
17721778
const error = await query.catch(x => x)
17731779
await last
17741780
return ['57014', error.code]

0 commit comments

Comments
 (0)