Skip to content

Commit 7e64cd9

Browse files
committed
Concurrent cursors
1 parent a12108a commit 7e64cd9

File tree

3 files changed

+30
-4
lines changed

3 files changed

+30
-4
lines changed

src/connection.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
8080
, incoming = Buffer.alloc(0)
8181
, needsTypes = options.fetch_types
8282
, backendParameters = {}
83+
, portalId = Math.random().toString(36).slice(2)
84+
, portalCount = 1
8385
, statements = {}
8486
, statementId = Math.random().toString(36).slice(2)
8587
, statementCount = 1
@@ -168,6 +170,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
168170
&& !q.describeFirst
169171
&& sent.length < max_pipeline
170172
&& (!q.options.onexecute || q.options.onexecute(connection))
173+
&& !q.portal
171174
} catch (error) {
172175
sent.length === 0 && write(Sync)
173176
errored(error)
@@ -199,9 +202,9 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
199202

200203
function prepared(q) {
201204
return Buffer.concat([
202-
Bind(q.parameters, q.statement.types, q.statement.name, q.cursorName),
203-
q.cursorFn
204-
? Execute('', q.cursorRows)
205+
Bind(q.parameters, q.statement.types, q.statement.name, q.portal),
206+
q.portal
207+
? Execute(q.portal, q.cursorRows)
205208
: ExecuteUnnamed
206209
])
207210
}
@@ -222,6 +225,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
222225

223226
!q.tagged && q.args.forEach(x => handleValue(x, parameters, types, options))
224227

228+
q.cursorRows && (q.portal = 'P' + portalId + portalCount++)
225229
q.prepare = options.prepare && ('prepare' in q.options ? q.options.prepare : true)
226230
q.string = string
227231
q.signature = q.prepare && types + string
@@ -794,7 +798,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
794798
rows = 0
795799
x === CLOSE
796800
? write(Close(query.portal))
797-
: (result = new Result(), write(Execute('', query.cursorRows)))
801+
: (result = new Result(), write(Execute(query.portal, query.cursorRows)))
798802
} catch (err) {
799803
write(Sync)
800804
query.reject(err)

src/query.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export class Query extends Promise {
3030
this.cancelled = null
3131
this.executed = false
3232
this.signature = ''
33+
this.portal = ''
3334

3435
this[originError] = this.handler.debug
3536
? new Error()

tests/index.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,27 @@ t('multiple listeners work after a reconnect', async() => {
698698
return ['1a2a1b2b', xs.join('')]
699699
})
700700

701+
t('concurrent cursors', async() => {
702+
const xs = []
703+
704+
await Promise.all([...Array(7)].map((x, i) => [
705+
sql`select ${ i }::int as a, generate_series(1, 2) as x`.cursor(([x]) => xs.push(x.a + x.x))
706+
]).flat())
707+
708+
return ['12233445566778', xs.join('')]
709+
})
710+
711+
t('concurrent cursors multiple connections', async() => {
712+
const sql = postgres({ ...options, max: 2 })
713+
const xs = []
714+
715+
await Promise.all([...Array(7)].map((x, i) => [
716+
sql`select ${ i }::int as a, generate_series(1, 2) as x`.cursor(([x]) => xs.push(x.a + x.x))
717+
]).flat())
718+
719+
return ['12233445566778', xs.sort().join('')]
720+
})
721+
701722
t('listen and notify with weird name', async() => {
702723
const sql = postgres(options)
703724
const channel = 'wat-;.ø.§'

0 commit comments

Comments
 (0)