Skip to content

Commit 28bb0b3

Browse files
committed
Fix transaction execution timing
1 parent 1dc2fd2 commit 28bb0b3

File tree

7 files changed

+82
-41
lines changed

7 files changed

+82
-41
lines changed

cjs/src/index.js

+13-9
Original file line numberDiff line numberDiff line change
@@ -139,34 +139,38 @@ function Postgres(a, b) {
139139
}
140140
}
141141

142-
async function listen(name, fn) {
142+
async function listen(name, fn, onlisten) {
143+
const listener = { fn, onlisten }
144+
143145
const sql = listen.sql || (listen.sql = Postgres({
144146
...options,
145147
max: 1,
146148
idle_timeout: null,
147149
max_lifetime: null,
148150
fetch_types: false,
149151
onclose() {
150-
Object.entries(listen.channels).forEach(([channel, { listeners }]) => {
151-
delete listen.channels[channel]
152-
Promise.all(listeners.map(fn => listen(channel, fn).catch(() => { /* noop */ })))
152+
Object.entries(listen.channels).forEach(([name, { listeners }]) => {
153+
delete listen.channels[name]
154+
Promise.all(listeners.map(l => listen(name, l.fn, l.onlisten).catch(() => { /* noop */ })))
153155
})
154156
},
155157
onnotify(c, x) {
156-
c in listen.channels && listen.channels[c].listeners.forEach(fn => fn(x))
158+
c in listen.channels && listen.channels[c].listeners.forEach(l => l.fn(x))
157159
}
158160
}))
159161

160162
const channels = listen.channels || (listen.channels = {})
161163
, exists = name in channels
162-
, channel = exists ? channels[name] : (channels[name] = { listeners: [fn] })
164+
, channel = exists ? channels[name] : (channels[name] = { listeners: [listener] })
163165

164166
if (exists) {
165-
channel.listeners.push(fn)
167+
channel.listeners.push(listener)
168+
listener.onlisten && listener.onlisten()
166169
return Promise.resolve({ ...channel.result, unlisten })
167170
}
168171

169172
channel.result = await sql`listen ${ sql(name) }`
173+
listener.onlisten && listener.onlisten()
170174
channel.result.unlisten = unlisten
171175

172176
return channel.result
@@ -175,7 +179,7 @@ function Postgres(a, b) {
175179
if (name in channels === false)
176180
return
177181

178-
channel.listeners = channel.listeners.filter(x => x !== fn)
182+
channel.listeners = channel.listeners.filter(x => x !== listener)
179183
if (channels[name].listeners.length)
180184
return
181185

@@ -195,7 +199,7 @@ function Postgres(a, b) {
195199
, connection
196200

197201
try {
198-
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute })
202+
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute()
199203
return await scope(connection, fn)
200204
} catch (error) {
201205
throw error

cjs/tests/index.js

+17-9
Original file line numberDiff line numberDiff line change
@@ -672,15 +672,21 @@ t('listen reconnects', { timeout: 2 }, async() => {
672672
, a = new Promise(r => resolvers.a = r)
673673
, b = new Promise(r => resolvers.b = r)
674674

675-
const { state: { pid } } = await sql.listen('test', x => x in resolvers && resolvers[x]())
675+
let connects = 0
676+
677+
const { state: { pid } } = await sql.listen(
678+
'test',
679+
x => x in resolvers && resolvers[x](),
680+
() => connects++
681+
)
676682
await sql.notify('test', 'a')
677683
await a
678684
await sql`select pg_terminate_backend(${ pid })`
679685
await delay(100)
680686
await sql.notify('test', 'b')
681687
await b
682688
sql.end()
683-
return [true, true]
689+
return [connects, 2]
684690
})
685691

686692
t('listen result reports correct connection state after reconnection', async() => {
@@ -1545,12 +1551,12 @@ t('Multiple hosts', {
15451551
const x1 = await sql`select 1`
15461552
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
15471553
await s1`select pg_terminate_backend(${ x1.state.pid }::int)`
1548-
await delay(10)
1554+
await delay(50)
15491555

15501556
const x2 = await sql`select 1`
15511557
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
15521558
await s2`select pg_terminate_backend(${ x2.state.pid }::int)`
1553-
await delay(10)
1559+
await delay(50)
15541560

15551561
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
15561562

@@ -1764,20 +1770,22 @@ t('Cancel running query', async() => {
17641770
return ['57014', error.code]
17651771
})
17661772

1767-
t('Cancel piped query', { timeout: 1 }, async() => {
1773+
t('Cancel piped query', async() => {
17681774
await sql`select 1`
1769-
const last = sql`select pg_sleep(0.1)`.execute()
1775+
const last = sql`select pg_sleep(0.05)`.execute()
17701776
const query = sql`select pg_sleep(2) as dig`
1771-
setTimeout(() => query.cancel(), 50)
1777+
setTimeout(() => query.cancel(), 10)
17721778
const error = await query.catch(x => x)
17731779
await last
17741780
return ['57014', error.code]
17751781
})
17761782

17771783
t('Cancel queued query', async() => {
1778-
const tx = sql.begin(sql => sql`select pg_sleep(0.2) as hej, 'hejsa'`)
17791784
const query = sql`select pg_sleep(2) as nej`
1780-
setTimeout(() => query.cancel(), 100)
1785+
const tx = sql.begin(sql => (
1786+
query.cancel(),
1787+
sql`select pg_sleep(0.1) as hej, 'hejsa'`
1788+
))
17811789
const error = await query.catch(x => x)
17821790
await tx
17831791
return ['57014', error.code]

deno/README.md

+17-2
Original file line numberDiff line numberDiff line change
@@ -515,9 +515,10 @@ Do note that you can often achieve the same result using [`WITH` queries (Common
515515
516516
## Listen & notify
517517
518-
When you call `.listen`, a dedicated connection will be created to ensure that you receive notifications in real-time. This connection will be used for any further calls to `.listen`.
518+
When you call `.listen`, a dedicated connection will be created to ensure that you receive notifications instantly. This connection will be used for any further calls to `.listen`. The connection will automatically reconnect according to a backoff reconnection pattern to not overload the database server.
519519
520-
`.listen` returns a promise which resolves once the `LISTEN` query to Postgres completes, or if there is already a listener active.
520+
### Listen `await sql.listen(channel, onnotify, [onlisten]) -> { state }`
521+
`.listen` takes the channel name, a function to handle each notify, and an optional function to run every time listen is registered and ready (happens on initial connect and reconnects). It returns a promise which resolves once the `LISTEN` query to Postgres completes, or if there is already a listener active.
521522
522523
```js
523524
await sql.listen('news', payload => {
@@ -526,6 +527,20 @@ await sql.listen('news', payload => {
526527
})
527528
```
528529
530+
The optional `onlisten` method is great to use for a very simply queue mechanism:
531+
532+
```js
533+
await sql.listen(
534+
'jobs',
535+
(x) => run(JSON.parse(x)),
536+
( ) => sql`select unfinished_jobs()`.forEach(run)
537+
)
538+
539+
function run(job) {
540+
// And here you do the work you please
541+
}
542+
```
543+
### Notify `await sql.notify(channel, payload) -> Result[]`
529544
Notify can be done as usual in SQL, or by using the `sql.notify` method.
530545
```js
531546
sql.notify('news', JSON.stringify({ no: 'this', is: 'news' }))

deno/src/index.js

+13-9
Original file line numberDiff line numberDiff line change
@@ -140,34 +140,38 @@ function Postgres(a, b) {
140140
}
141141
}
142142

143-
async function listen(name, fn) {
143+
async function listen(name, fn, onlisten) {
144+
const listener = { fn, onlisten }
145+
144146
const sql = listen.sql || (listen.sql = Postgres({
145147
...options,
146148
max: 1,
147149
idle_timeout: null,
148150
max_lifetime: null,
149151
fetch_types: false,
150152
onclose() {
151-
Object.entries(listen.channels).forEach(([channel, { listeners }]) => {
152-
delete listen.channels[channel]
153-
Promise.all(listeners.map(fn => listen(channel, fn).catch(() => { /* noop */ })))
153+
Object.entries(listen.channels).forEach(([name, { listeners }]) => {
154+
delete listen.channels[name]
155+
Promise.all(listeners.map(l => listen(name, l.fn, l.onlisten).catch(() => { /* noop */ })))
154156
})
155157
},
156158
onnotify(c, x) {
157-
c in listen.channels && listen.channels[c].listeners.forEach(fn => fn(x))
159+
c in listen.channels && listen.channels[c].listeners.forEach(l => l.fn(x))
158160
}
159161
}))
160162

161163
const channels = listen.channels || (listen.channels = {})
162164
, exists = name in channels
163-
, channel = exists ? channels[name] : (channels[name] = { listeners: [fn] })
165+
, channel = exists ? channels[name] : (channels[name] = { listeners: [listener] })
164166

165167
if (exists) {
166-
channel.listeners.push(fn)
168+
channel.listeners.push(listener)
169+
listener.onlisten && listener.onlisten()
167170
return Promise.resolve({ ...channel.result, unlisten })
168171
}
169172

170173
channel.result = await sql`listen ${ sql(name) }`
174+
listener.onlisten && listener.onlisten()
171175
channel.result.unlisten = unlisten
172176

173177
return channel.result
@@ -176,7 +180,7 @@ function Postgres(a, b) {
176180
if (name in channels === false)
177181
return
178182

179-
channel.listeners = channel.listeners.filter(x => x !== fn)
183+
channel.listeners = channel.listeners.filter(x => x !== listener)
180184
if (channels[name].listeners.length)
181185
return
182186

@@ -196,7 +200,7 @@ function Postgres(a, b) {
196200
, connection
197201

198202
try {
199-
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute })
203+
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute()
200204
return await scope(connection, fn)
201205
} catch (error) {
202206
throw error

deno/tests/index.js

+17-9
Original file line numberDiff line numberDiff line change
@@ -674,15 +674,21 @@ t('listen reconnects', { timeout: 2 }, async() => {
674674
, a = new Promise(r => resolvers.a = r)
675675
, b = new Promise(r => resolvers.b = r)
676676

677-
const { state: { pid } } = await sql.listen('test', x => x in resolvers && resolvers[x]())
677+
let connects = 0
678+
679+
const { state: { pid } } = await sql.listen(
680+
'test',
681+
x => x in resolvers && resolvers[x](),
682+
() => connects++
683+
)
678684
await sql.notify('test', 'a')
679685
await a
680686
await sql`select pg_terminate_backend(${ pid })`
681687
await delay(100)
682688
await sql.notify('test', 'b')
683689
await b
684690
sql.end()
685-
return [true, true]
691+
return [connects, 2]
686692
})
687693

688694
t('listen result reports correct connection state after reconnection', async() => {
@@ -1547,12 +1553,12 @@ t('Multiple hosts', {
15471553
const x1 = await sql`select 1`
15481554
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
15491555
await s1`select pg_terminate_backend(${ x1.state.pid }::int)`
1550-
await delay(10)
1556+
await delay(50)
15511557

15521558
const x2 = await sql`select 1`
15531559
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
15541560
await s2`select pg_terminate_backend(${ x2.state.pid }::int)`
1555-
await delay(10)
1561+
await delay(50)
15561562

15571563
result.push((await sql`select system_identifier as x from pg_control_system()`)[0].x)
15581564

@@ -1766,20 +1772,22 @@ t('Cancel running query', async() => {
17661772
return ['57014', error.code]
17671773
})
17681774

1769-
t('Cancel piped query', { timeout: 1 }, async() => {
1775+
t('Cancel piped query', async() => {
17701776
await sql`select 1`
1771-
const last = sql`select pg_sleep(0.1)`.execute()
1777+
const last = sql`select pg_sleep(0.05)`.execute()
17721778
const query = sql`select pg_sleep(2) as dig`
1773-
setTimeout(() => query.cancel(), 50)
1779+
setTimeout(() => query.cancel(), 10)
17741780
const error = await query.catch(x => x)
17751781
await last
17761782
return ['57014', error.code]
17771783
})
17781784

17791785
t('Cancel queued query', async() => {
1780-
const tx = sql.begin(sql => sql`select pg_sleep(0.2) as hej, 'hejsa'`)
17811786
const query = sql`select pg_sleep(2) as nej`
1782-
setTimeout(() => query.cancel(), 100)
1787+
const tx = sql.begin(sql => (
1788+
query.cancel(),
1789+
sql`select pg_sleep(0.1) as hej, 'hejsa'`
1790+
))
17831791
const error = await query.catch(x => x)
17841792
await tx
17851793
return ['57014', error.code]

src/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ function Postgres(a, b) {
199199
, connection
200200

201201
try {
202-
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute })
202+
await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute()
203203
return await scope(connection, fn)
204204
} catch (error) {
205205
throw error

tests/index.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -1781,9 +1781,11 @@ t('Cancel piped query', async() => {
17811781
})
17821782

17831783
t('Cancel queued query', async() => {
1784-
const tx = sql.begin(sql => sql`select pg_sleep(0.2) as hej, 'hejsa'`)
17851784
const query = sql`select pg_sleep(2) as nej`
1786-
setTimeout(() => query.cancel(), 100)
1785+
const tx = sql.begin(sql => (
1786+
query.cancel(),
1787+
sql`select pg_sleep(0.1) as hej, 'hejsa'`
1788+
))
17871789
const error = await query.catch(x => x)
17881790
await tx
17891791
return ['57014', error.code]

0 commit comments

Comments
 (0)