1
+ const noop = ( ) => { /* noop */ }
2
+
1
3
export default function Subscribe ( postgres , options ) {
2
- const listeners = new Map ( )
4
+ const subscribers = new Map ( )
5
+ , slot = 'postgresjs_' + Math . random ( ) . toString ( 36 ) . slice ( 2 )
6
+ , state = { }
3
7
4
8
let connection
5
-
6
- return async function subscribe ( event , fn ) {
7
- event = parseEvent ( event )
8
-
9
- options . max = 1
10
- options . onclose = onclose
11
- options . fetch_types = false
12
- options . connection = {
9
+ , stream
10
+ , ended = false
11
+
12
+ const sql = subscribe . sql = postgres ( {
13
+ ...options ,
14
+ max : 1 ,
15
+ fetch_types : false ,
16
+ idle_timeout : null ,
17
+ max_lifetime : null ,
18
+ connection : {
13
19
...options . connection ,
14
20
replication : 'database'
15
- }
21
+ } ,
22
+ onclose : async function ( ) {
23
+ if ( ended )
24
+ return
25
+ stream = null
26
+ state . pid = state . secret = undefined
27
+ ! ended && connected ( await init ( sql , slot , options . publications ) )
28
+ subscribers . forEach ( event => event . forEach ( ( { onsubscribe } ) => onsubscribe ( ) ) )
29
+ } ,
30
+ no_subscribe : true
31
+ } )
16
32
17
- let stream
18
- , ended = false
33
+ const end = sql . end
34
+ , close = sql . close
19
35
20
- const sql = postgres ( options )
21
- , slot = 'postgresjs_' + Math . random ( ) . toString ( 36 ) . slice ( 2 )
22
- , end = sql . end
36
+ sql . end = async ( ) => {
37
+ ended = true
38
+ stream && ( await new Promise ( r => ( stream . once ( 'end' , r ) , stream . end ( ) ) ) )
39
+ return end ( )
40
+ }
23
41
24
- sql . end = async ( ) => {
25
- ended = true
26
- stream && ( await new Promise ( r => ( stream . once ( 'end' , r ) , stream . end ( ) ) ) )
27
- return end ( )
28
- }
42
+ sql . close = async ( ) => {
43
+ stream && ( await new Promise ( r => ( stream . once ( 'end' , r ) , stream . end ( ) ) ) )
44
+ return close ( )
45
+ }
46
+
47
+ return subscribe
29
48
30
- ! connection && ( subscribe . sql = sql , connection = init ( sql , slot , options . publications ) )
49
+ async function subscribe ( event , fn , onsubscribe = noop ) {
50
+ event = parseEvent ( event )
31
51
32
- const fns = listeners . has ( event )
33
- ? listeners . get ( event ) . add ( fn )
34
- : listeners . set ( event , new Set ( [ fn ] ) ) . get ( event )
52
+ if ( ! connection )
53
+ connection = init ( sql , slot , options . publications )
54
+
55
+ const subscriber = { fn, onsubscribe }
56
+ const fns = subscribers . has ( event )
57
+ ? subscribers . get ( event ) . add ( subscriber )
58
+ : subscribers . set ( event , new Set ( [ subscriber ] ) ) . get ( event )
35
59
36
60
const unsubscribe = ( ) => {
37
- fns . delete ( fn )
38
- fns . size === 0 && listeners . delete ( event )
61
+ fns . delete ( subscriber )
62
+ fns . size === 0 && subscribers . delete ( event )
39
63
}
40
64
41
- return connection . then ( x => ( stream = x , { unsubscribe } ) )
65
+ return connection . then ( x => {
66
+ connected ( x )
67
+ onsubscribe ( )
68
+ return { unsubscribe, state, sql }
69
+ } )
70
+ }
42
71
43
- async function onclose ( ) {
44
- stream = null
45
- ! ended && ( stream = await init ( sql , slot , options . publications ) )
46
- }
72
+ function connected ( x ) {
73
+ stream = x . stream
74
+ state . pid = x . state . pid
75
+ state . secret = x . state . secret
47
76
}
48
77
49
78
async function init ( sql , slot , publications ) {
50
79
if ( ! publications )
51
80
throw new Error ( 'Missing publication names' )
52
81
53
- const [ x ] = await sql . unsafe (
82
+ const xs = await sql . unsafe (
54
83
`CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT`
55
84
)
56
85
86
+ const [ x ] = xs
87
+
57
88
const stream = await sql . unsafe (
58
89
`START_REPLICATION SLOT ${ slot } LOGICAL ${
59
90
x . consistent_point
@@ -65,12 +96,10 @@ export default function Subscribe(postgres, options) {
65
96
}
66
97
67
98
stream . on ( 'data' , data )
68
- stream . on ( 'error' , ( error ) => {
69
- console . error ( 'Logical Replication Error - Reconnecting' , error ) // eslint-disable-line
70
- sql . end ( )
71
- } )
99
+ stream . on ( 'error' , sql . close )
100
+ stream . on ( 'close' , sql . close )
72
101
73
- return stream
102
+ return { stream, state : xs . state }
74
103
75
104
function data ( x ) {
76
105
if ( x [ 0 ] === 0x77 )
@@ -99,7 +128,7 @@ export default function Subscribe(postgres, options) {
99
128
}
100
129
101
130
function call ( x , a , b ) {
102
- listeners . has ( x ) && listeners . get ( x ) . forEach ( fn => fn ( a , b , x ) )
131
+ subscribers . has ( x ) && subscribers . get ( x ) . forEach ( ( { fn } ) => fn ( a , b , x ) )
103
132
}
104
133
}
105
134
0 commit comments