Skip to content

Commit 6ef71c8

Browse files
authored
feat: implements unlisten functionality (porsager#155)
* feat: implements unlisten functionality Relates to porsager#133 * fix: return listening result from subscecuent `listen` calls So, the client can unlisten any of its subscription * fix: makes sure unlisten can be called even after there is no events for the bound topic * fix: makes sure unlisten clears the right subscription
1 parent 995bc7e commit 6ef71c8

File tree

3 files changed

+87
-6
lines changed

3 files changed

+87
-6
lines changed

lib/index.js

+31-4
Original file line numberDiff line numberDiff line change
@@ -337,31 +337,58 @@ function Postgres(a, b) {
337337
}
338338

339339
function listen(channel, fn) {
340+
const listener = getListener();
341+
340342
if (channel in listeners) {
341343
listeners[channel].push(fn)
342-
return Promise.resolve(channel)
344+
return Promise.resolve(Object.assign({}, listener.result, {
345+
unlisten,
346+
}))
343347
}
344348

345349
listeners[channel] = [fn]
346-
return query({ raw: true }, getListener(), 'listen ' + escape(channel))
350+
return query({ raw: true }, listener.conn, 'listen ' + escape(channel))
351+
.then((result) => {
352+
listener.result = result
353+
return Object.assign({}, listener.result, {
354+
unlisten,
355+
})
356+
})
357+
358+
function unlisten() {
359+
if (!listeners[channel]) {
360+
return Promise.resolve()
361+
}
362+
363+
listeners[channel] = listeners[channel].filter(handler => handler !== fn)
364+
365+
if (!listeners[channel].length) {
366+
delete listeners[channel]
367+
return query({ raw: true }, getListener().conn, 'unlisten ' + escape(channel))
368+
.then(() => {})
369+
}
370+
return Promise.resolve()
371+
}
347372
}
348373

349374
function getListener() {
350375
if (listener)
351376
return listener
352377

353-
listener = Connection(Object.assign({
378+
const conn = Connection(Object.assign({
354379
onnotify: (c, x) => c in listeners && listeners[c].forEach(fn => fn(x)),
355380
onclose: () => {
356381
Object.entries(listeners).forEach(([channel, fns]) => {
357382
delete listeners[channel]
358383
Promise.all(fns.map(fn => listen(channel, fn).catch(() => { /* noop */ })))
359384
})
385+
listener = null;
360386
}
361387
},
362388
options
363389
))
364-
all.push(listener)
390+
listener = { conn, result: null };
391+
all.push(conn);
365392
return listener
366393
}
367394

tests/index.js

+49
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,55 @@ t('listen reconnects', async() => {
580580
return ['ab', xs.join('')]
581581
})
582582

583+
t('unlisten removes subscription', async() => {
584+
const listener = postgres(options)
585+
, xs = []
586+
587+
const { unlisten } = await listener.listen('test', x => xs.push(x))
588+
await listener.notify('test', 'a')
589+
await delay(50)
590+
await unlisten()
591+
await listener.notify('test', 'b')
592+
await delay(50)
593+
listener.end()
594+
595+
return ['a', xs.join('')]
596+
})
597+
598+
t('listen after unlisten', async() => {
599+
const listener = postgres(options)
600+
, xs = []
601+
602+
const { unlisten } = await listener.listen('test', x => xs.push(x))
603+
await listener.notify('test', 'a')
604+
await delay(50)
605+
await unlisten()
606+
await listener.notify('test', 'b')
607+
await delay(50)
608+
await listener.listen('test', x => xs.push(x))
609+
await listener.notify('test', 'c')
610+
await delay(50)
611+
listener.end();
612+
613+
return ['ac', xs.join('')]
614+
})
615+
616+
t('multiple listeners and unlisten one', async() => {
617+
const listener = postgres(options)
618+
, xs = []
619+
620+
await listener.listen('test', x => xs.push('1', x))
621+
const s2 = await listener.listen('test', x => xs.push('2', x))
622+
await listener.notify('test', 'a')
623+
await delay(50)
624+
await s2.unlisten()
625+
await listener.notify('test', 'b')
626+
await delay(50)
627+
listener.end();
628+
629+
return ['1a2a1b', xs.join('')]
630+
})
631+
583632
t('responds with server parameters (application_name)', async() =>
584633
['postgres.js', await new Promise((resolve, reject) => postgres({
585634
...options,

types/index.d.ts

+7-2
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,11 @@ declare namespace postgres {
295295

296296
interface PendingRequest extends Promise<[] & ResultMeta<null>> { }
297297

298+
interface ListenRequest extends Promise<ListenMeta> { }
299+
interface ListenMeta extends ResultMeta<null> {
300+
unlisten(): Promise<void>
301+
}
302+
298303
interface Helper<T, U extends any[] = T[]> {
299304
first: T;
300305
rest: U;
@@ -336,7 +341,7 @@ declare namespace postgres {
336341
file<T extends any[] = Row[]>(path: string, options?: { cache?: boolean }): PendingQuery<AsRowList<T>>;
337342
file<T extends any[] = Row[]>(path: string, args: SerializableParameter[], options?: { cache?: boolean }): PendingQuery<AsRowList<T>>;
338343
json(value: any): Parameter;
339-
listen(channel: string, cb: (value?: string) => void): PendingRequest;
344+
listen(channel: string, cb: (value?: string) => void): ListenRequest;
340345
notify(channel: string, payload: string): PendingRequest;
341346
options: ParsedOptions<TTypes>;
342347
parameters: ConnectionParameters;
@@ -355,4 +360,4 @@ declare namespace postgres {
355360

356361
}
357362

358-
export = postgres;
363+
export = postgres;

0 commit comments

Comments
 (0)