Skip to content

Commit cecdaf3

Browse files
committed
build
1 parent 7e64cd9 commit cecdaf3

File tree

6 files changed

+60
-8
lines changed

6 files changed

+60
-8
lines changed

cjs/src/connection.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
8080
, incoming = Buffer.alloc(0)
8181
, needsTypes = options.fetch_types
8282
, backendParameters = {}
83+
, portalId = Math.random().toString(36).slice(2)
84+
, portalCount = 1
8385
, statements = {}
8486
, statementId = Math.random().toString(36).slice(2)
8587
, statementCount = 1
@@ -168,6 +170,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
168170
&& !q.describeFirst
169171
&& sent.length < max_pipeline
170172
&& (!q.options.onexecute || q.options.onexecute(connection))
173+
&& !q.portal
171174
} catch (error) {
172175
sent.length === 0 && write(Sync)
173176
errored(error)
@@ -199,9 +202,9 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
199202

200203
function prepared(q) {
201204
return Buffer.concat([
202-
Bind(q.parameters, q.statement.types, q.statement.name, q.cursorName),
203-
q.cursorFn
204-
? Execute('', q.cursorRows)
205+
Bind(q.parameters, q.statement.types, q.statement.name, q.portal),
206+
q.portal
207+
? Execute(q.portal, q.cursorRows)
205208
: ExecuteUnnamed
206209
])
207210
}
@@ -222,6 +225,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
222225

223226
!q.tagged && q.args.forEach(x => handleValue(x, parameters, types, options))
224227

228+
q.cursorRows && (q.portal = 'P' + portalId + portalCount++)
225229
q.prepare = options.prepare && ('prepare' in q.options ? q.options.prepare : true)
226230
q.string = string
227231
q.signature = q.prepare && types + string
@@ -794,7 +798,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
794798
rows = 0
795799
x === CLOSE
796800
? write(Close(query.portal))
797-
: (result = new Result(), write(Execute('', query.cursorRows)))
801+
: (result = new Result(), write(Execute(query.portal, query.cursorRows)))
798802
} catch (err) {
799803
write(Sync)
800804
query.reject(err)

cjs/src/query.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const Query = module.exports.Query = class Query extends Promise {
3030
this.cancelled = null
3131
this.executed = false
3232
this.signature = ''
33+
this.portal = ''
3334

3435
this[originError] = this.handler.debug
3536
? new Error()

cjs/tests/index.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,27 @@ t('multiple listeners work after a reconnect', async() => {
698698
return ['1a2a1b2b', xs.join('')]
699699
})
700700

701+
t('concurrent cursors', async() => {
702+
const xs = []
703+
704+
await Promise.all([...Array(7)].map((x, i) => [
705+
sql`select ${ i }::int as a, generate_series(1, 2) as x`.cursor(([x]) => xs.push(x.a + x.x))
706+
]).flat())
707+
708+
return ['12233445566778', xs.join('')]
709+
})
710+
711+
t('concurrent cursors multiple connections', async() => {
712+
const sql = postgres({ ...options, max: 2 })
713+
const xs = []
714+
715+
await Promise.all([...Array(7)].map((x, i) => [
716+
sql`select ${ i }::int as a, generate_series(1, 2) as x`.cursor(([x]) => xs.push(x.a + x.x))
717+
]).flat())
718+
719+
return ['12233445566778', xs.sort().join('')]
720+
})
721+
701722
t('listen and notify with weird name', async() => {
702723
const sql = postgres(options)
703724
const channel = 'wat-;ø§'

deno/src/connection.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
8484
, incoming = Buffer.alloc(0)
8585
, needsTypes = options.fetch_types
8686
, backendParameters = {}
87+
, portalId = Math.random().toString(36).slice(2)
88+
, portalCount = 1
8789
, statements = {}
8890
, statementId = Math.random().toString(36).slice(2)
8991
, statementCount = 1
@@ -172,6 +174,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
172174
&& !q.describeFirst
173175
&& sent.length < max_pipeline
174176
&& (!q.options.onexecute || q.options.onexecute(connection))
177+
&& !q.portal
175178
} catch (error) {
176179
sent.length === 0 && write(Sync)
177180
errored(error)
@@ -203,9 +206,9 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
203206

204207
function prepared(q) {
205208
return Buffer.concat([
206-
Bind(q.parameters, q.statement.types, q.statement.name, q.cursorName),
207-
q.cursorFn
208-
? Execute('', q.cursorRows)
209+
Bind(q.parameters, q.statement.types, q.statement.name, q.portal),
210+
q.portal
211+
? Execute(q.portal, q.cursorRows)
209212
: ExecuteUnnamed
210213
])
211214
}
@@ -226,6 +229,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
226229

227230
!q.tagged && q.args.forEach(x => handleValue(x, parameters, types, options))
228231

232+
q.cursorRows && (q.portal = 'P' + portalId + portalCount++)
229233
q.prepare = options.prepare && ('prepare' in q.options ? q.options.prepare : true)
230234
q.string = string
231235
q.signature = q.prepare && types + string
@@ -798,7 +802,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
798802
rows = 0
799803
x === CLOSE
800804
? write(Close(query.portal))
801-
: (result = new Result(), write(Execute('', query.cursorRows)))
805+
: (result = new Result(), write(Execute(query.portal, query.cursorRows)))
802806
} catch (err) {
803807
write(Sync)
804808
query.reject(err)

deno/src/query.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export class Query extends Promise {
3030
this.cancelled = null
3131
this.executed = false
3232
this.signature = ''
33+
this.portal = ''
3334

3435
this[originError] = this.handler.debug
3536
? new Error()

deno/tests/index.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,27 @@ t('multiple listeners work after a reconnect', async() => {
700700
return ['1a2a1b2b', xs.join('')]
701701
})
702702

703+
t('concurrent cursors', async() => {
704+
const xs = []
705+
706+
await Promise.all([...Array(7)].map((x, i) => [
707+
sql`select ${ i }::int as a, generate_series(1, 2) as x`.cursor(([x]) => xs.push(x.a + x.x))
708+
]).flat())
709+
710+
return ['12233445566778', xs.join('')]
711+
})
712+
713+
t('concurrent cursors multiple connections', async() => {
714+
const sql = postgres({ ...options, max: 2 })
715+
const xs = []
716+
717+
await Promise.all([...Array(7)].map((x, i) => [
718+
sql`select ${ i }::int as a, generate_series(1, 2) as x`.cursor(([x]) => xs.push(x.a + x.x))
719+
]).flat())
720+
721+
return ['12233445566778', xs.sort().join('')]
722+
})
723+
703724
t('listen and notify with weird name', async() => {
704725
const sql = postgres(options)
705726
const channel = 'wat-;ø§'

0 commit comments

Comments
 (0)