Skip to content

Commit 7f6e0cc

Browse files
committed
Add sql.reserve method
1 parent ba498fd commit 7f6e0cc

File tree

9 files changed

+184
-10
lines changed

9 files changed

+184
-10
lines changed

cjs/src/connection.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
545545
return // Consider opening if able and sent.length < 50
546546

547547
connection.reserved
548-
? x[5] === 73 // I
548+
? !connection.reserved.release && x[5] === 73 // I
549549
? ending
550550
? terminate()
551551
: (connection.reserved = null, onopen(connection))

cjs/src/index.js

+40-2
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ function Postgres(a, b) {
7474
END: CLOSE,
7575
PostgresError,
7676
options,
77+
reserve,
7778
listen,
7879
begin,
7980
close,
@@ -199,6 +200,36 @@ function Postgres(a, b) {
199200
return await sql`select pg_notify(${ channel }, ${ '' + payload })`
200201
}
201202

203+
async function reserve() {
204+
const q = Queue()
205+
const c = open.length
206+
? open.shift()
207+
: await new Promise(r => {
208+
queries.push({ reserve: r })
209+
closed.length && connect(closed.shift())
210+
})
211+
212+
move(c, reserved)
213+
c.reserved = () => q.length
214+
? c.execute(q.shift())
215+
: move(c, reserved)
216+
c.reserved.release = true
217+
218+
const sql = Sql(handler)
219+
sql.release = () => {
220+
c.reserved = null
221+
onopen(c)
222+
}
223+
224+
return sql
225+
226+
function handler(q) {
227+
c.queue === full
228+
? q.push(q)
229+
: c.execute(q) || move(c, full)
230+
}
231+
}
232+
202233
async function begin(options, fn) {
203234
!fn && (fn = options, options = '')
204235
const queries = Queue()
@@ -270,6 +301,7 @@ function Postgres(a, b) {
270301
queue === open
271302
? c.idleTimer.start()
272303
: c.idleTimer.cancel()
304+
return c
273305
}
274306

275307
function json(x) {
@@ -348,6 +380,7 @@ function Postgres(a, b) {
348380
function connect(c, query) {
349381
move(c, connecting)
350382
c.connect(query)
383+
return c
351384
}
352385

353386
function onend(c) {
@@ -361,8 +394,13 @@ function Postgres(a, b) {
361394
let max = Math.ceil(queries.length / (connecting.length + 1))
362395
, ready = true
363396

364-
while (ready && queries.length && max-- > 0)
365-
ready = c.execute(queries.shift())
397+
while (ready && queries.length && max-- > 0) {
398+
const query = queries.shift()
399+
if (query.reserve)
400+
return query.reserve(c)
401+
402+
ready = c.execute(query)
403+
}
366404

367405
ready
368406
? move(c, busy)

cjs/tests/index.js

+20
Original file line numberDiff line numberDiff line change
@@ -2499,3 +2499,23 @@ t('concurrent cursors multiple connections', async() => {
24992499

25002500
return ['12233445566778', xs.sort().join('')]
25012501
})
2502+
2503+
t('reserve connection', async() => {
2504+
const reserved = await sql.reserve()
2505+
2506+
setTimeout(() => reserved.release(), 500)
2507+
2508+
const xs = await Promise.all([
2509+
reserved`select 1 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
2510+
sql`select 2 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
2511+
reserved`select 3 as x`.then(([{ x }]) => ({ time: Date.now(), x }))
2512+
])
2513+
2514+
if (xs[1].time - xs[2].time < 500)
2515+
throw new Error('Wrong time')
2516+
2517+
return [
2518+
'123',
2519+
xs.map(x => x.x).join('')
2520+
]
2521+
})

deno/src/connection.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
549549
return // Consider opening if able and sent.length < 50
550550

551551
connection.reserved
552-
? x[5] === 73 // I
552+
? !connection.reserved.release && x[5] === 73 // I
553553
? ending
554554
? terminate()
555555
: (connection.reserved = null, onopen(connection))

deno/src/index.js

+40-2
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ function Postgres(a, b) {
7575
END: CLOSE,
7676
PostgresError,
7777
options,
78+
reserve,
7879
listen,
7980
begin,
8081
close,
@@ -200,6 +201,36 @@ function Postgres(a, b) {
200201
return await sql`select pg_notify(${ channel }, ${ '' + payload })`
201202
}
202203

204+
async function reserve() {
205+
const q = Queue()
206+
const c = open.length
207+
? open.shift()
208+
: await new Promise(r => {
209+
queries.push({ reserve: r })
210+
closed.length && connect(closed.shift())
211+
})
212+
213+
move(c, reserved)
214+
c.reserved = () => q.length
215+
? c.execute(q.shift())
216+
: move(c, reserved)
217+
c.reserved.release = true
218+
219+
const sql = Sql(handler)
220+
sql.release = () => {
221+
c.reserved = null
222+
onopen(c)
223+
}
224+
225+
return sql
226+
227+
function handler(q) {
228+
c.queue === full
229+
? q.push(q)
230+
: c.execute(q) || move(c, full)
231+
}
232+
}
233+
203234
async function begin(options, fn) {
204235
!fn && (fn = options, options = '')
205236
const queries = Queue()
@@ -271,6 +302,7 @@ function Postgres(a, b) {
271302
queue === open
272303
? c.idleTimer.start()
273304
: c.idleTimer.cancel()
305+
return c
274306
}
275307

276308
function json(x) {
@@ -349,6 +381,7 @@ function Postgres(a, b) {
349381
function connect(c, query) {
350382
move(c, connecting)
351383
c.connect(query)
384+
return c
352385
}
353386

354387
function onend(c) {
@@ -362,8 +395,13 @@ function Postgres(a, b) {
362395
let max = Math.ceil(queries.length / (connecting.length + 1))
363396
, ready = true
364397

365-
while (ready && queries.length && max-- > 0)
366-
ready = c.execute(queries.shift())
398+
while (ready && queries.length && max-- > 0) {
399+
const query = queries.shift()
400+
if (query.reserve)
401+
return query.reserve(c)
402+
403+
ready = c.execute(query)
404+
}
367405

368406
ready
369407
? move(c, busy)

deno/tests/index.js

+20
Original file line numberDiff line numberDiff line change
@@ -2502,4 +2502,24 @@ t('concurrent cursors multiple connections', async() => {
25022502
return ['12233445566778', xs.sort().join('')]
25032503
})
25042504

2505+
t('reserve connection', async() => {
2506+
const reserved = await sql.reserve()
2507+
2508+
setTimeout(() => reserved.release(), 500)
2509+
2510+
const xs = await Promise.all([
2511+
reserved`select 1 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
2512+
sql`select 2 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
2513+
reserved`select 3 as x`.then(([{ x }]) => ({ time: Date.now(), x }))
2514+
])
2515+
2516+
if (xs[1].time - xs[2].time < 500)
2517+
throw new Error('Wrong time')
2518+
2519+
return [
2520+
'123',
2521+
xs.map(x => x.x).join('')
2522+
]
2523+
})
2524+
25052525
;window.addEventListener("unload", () => Deno.exit(process.exitCode))

src/connection.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
545545
return // Consider opening if able and sent.length < 50
546546

547547
connection.reserved
548-
? x[5] === 73 // I
548+
? !connection.reserved.release && x[5] === 73 // I
549549
? ending
550550
? terminate()
551551
: (connection.reserved = null, onopen(connection))
@@ -571,7 +571,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
571571
final && (final(), final = null)
572572

573573
if (result.command === 'BEGIN' && max !== 1 && !connection.reserved)
574-
return errored(Errors.generic('UNSAFE_TRANSACTION', 'Only use sql.begin or max: 1'))
574+
return errored(Errors.generic('UNSAFE_TRANSACTION', 'Only use sql.begin, sql.reserved or max: 1'))
575575

576576
if (query.options.simple)
577577
return BindComplete()

src/index.js

+40-2
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ function Postgres(a, b) {
7474
END: CLOSE,
7575
PostgresError,
7676
options,
77+
reserve,
7778
listen,
7879
begin,
7980
close,
@@ -199,6 +200,36 @@ function Postgres(a, b) {
199200
return await sql`select pg_notify(${ channel }, ${ '' + payload })`
200201
}
201202

203+
async function reserve() {
204+
const q = Queue()
205+
const c = open.length
206+
? open.shift()
207+
: await new Promise(r => {
208+
queries.push({ reserve: r })
209+
closed.length && connect(closed.shift())
210+
})
211+
212+
move(c, reserved)
213+
c.reserved = () => q.length
214+
? c.execute(q.shift())
215+
: move(c, reserved)
216+
c.reserved.release = true
217+
218+
const sql = Sql(handler)
219+
sql.release = () => {
220+
c.reserved = null
221+
onopen(c)
222+
}
223+
224+
return sql
225+
226+
function handler(q) {
227+
c.queue === full
228+
? q.push(q)
229+
: c.execute(q) || move(c, full)
230+
}
231+
}
232+
202233
async function begin(options, fn) {
203234
!fn && (fn = options, options = '')
204235
const queries = Queue()
@@ -270,6 +301,7 @@ function Postgres(a, b) {
270301
queue === open
271302
? c.idleTimer.start()
272303
: c.idleTimer.cancel()
304+
return c
273305
}
274306

275307
function json(x) {
@@ -348,6 +380,7 @@ function Postgres(a, b) {
348380
function connect(c, query) {
349381
move(c, connecting)
350382
c.connect(query)
383+
return c
351384
}
352385

353386
function onend(c) {
@@ -361,8 +394,13 @@ function Postgres(a, b) {
361394
let max = Math.ceil(queries.length / (connecting.length + 1))
362395
, ready = true
363396

364-
while (ready && queries.length && max-- > 0)
365-
ready = c.execute(queries.shift())
397+
while (ready && queries.length && max-- > 0) {
398+
const query = queries.shift()
399+
if (query.reserve)
400+
return query.reserve(c)
401+
402+
ready = c.execute(query)
403+
}
366404

367405
ready
368406
? move(c, busy)

tests/index.js

+20
Original file line numberDiff line numberDiff line change
@@ -2499,3 +2499,23 @@ t('concurrent cursors multiple connections', async() => {
24992499

25002500
return ['12233445566778', xs.sort().join('')]
25012501
})
2502+
2503+
t('reserve connection', async() => {
2504+
const reserved = await sql.reserve()
2505+
2506+
setTimeout(() => reserved.release(), 510)
2507+
2508+
const xs = await Promise.all([
2509+
reserved`select 1 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
2510+
sql`select 2 as x`.then(([{ x }]) => ({ time: Date.now(), x })),
2511+
reserved`select 3 as x`.then(([{ x }]) => ({ time: Date.now(), x }))
2512+
])
2513+
2514+
if (xs[1].time - xs[2].time < 500)
2515+
throw new Error('Wrong time')
2516+
2517+
return [
2518+
'123',
2519+
xs.map(x => x.x).join('')
2520+
]
2521+
})

0 commit comments

Comments
 (0)