Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 4156c3f

Browse files
committedFeb 3, 2020
Add cursor support
1 parent 295ac4f commit 4156c3f

File tree

8 files changed

+185
-57
lines changed

8 files changed

+185
-57
lines changed
 

‎README.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,52 @@ await sql`
121121

122122
```
123123

124+
## Cursor ```sql` `.cursor([rows = 1], fn) -> Promise```
125+
126+
Use cursors if you need to throttle the amount of rows being returned from a query. New results won't be requested until the promise / async callack function has resolved.
127+
128+
```js
129+
130+
await sql.cursor`
131+
select * from generate_series(1,4) as x
132+
`.cursor(row => {
133+
// row = { x: 1 }
134+
http.request('https://example.com/wat', { row })
135+
})
136+
137+
// No more rows
138+
139+
```
140+
141+
A single row will be returned by default, but you can also request batches by setting the number of rows desired in each batch as the first argument. That is usefull if you can do work with the rows in parallel like in this example:
142+
143+
```js
144+
145+
await sql.cursor`
146+
select * from generate_series(1,1000) as x
147+
`.cursor(10, rows => {
148+
// rows = [{ x: 1 }, { x: 2 }, ... ]
149+
await Promise.all(rows.map(row =>
150+
http.request('https://example.com/wat', { row })
151+
))
152+
})
153+
154+
```
155+
156+
If an error is thrown inside the callback function no more rows will be requested and the promise will reject with the thrown error.
157+
158+
You can also stop receiving any more rows early by returning an end token `sql.END` from the callback function.
159+
160+
```js
161+
162+
await sql.cursor`
163+
select * from generate_series(1,1000) as x
164+
`.cursor(row => {
165+
return Math.random() > 0.9 && sql.END
166+
})
167+
168+
```
169+
124170
## Listen and notify
125171

126172
When you call listen, a dedicated connection will automatically be made to ensure that you receive notifications in real time. This connection will be used for any further calls to listen. Listen returns a promise which resolves once the `LISTEN` query to Postgres completes, or if there is already a listener active.

‎lib/backend.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ module.exports = Backend
88
function Backend({
99
onparse,
1010
onparameter,
11+
onsuspended,
12+
oncomplete,
1113
parsers,
1214
onauth,
1315
onready,
@@ -83,6 +85,8 @@ function Backend({
8385
break
8486
}
8587
}
88+
89+
oncomplete()
8690
}
8791

8892
/* c8 ignore next 3 */
@@ -161,9 +165,10 @@ function Backend({
161165
onparameter(k, v)
162166
}
163167

164-
/* c8 ignore next 3 */
165168
function PortalSuspended() {
166-
backend.error = errors.notSupported('PortalSuspended')
169+
onsuspended(backend.query.result)
170+
backend.query.result = []
171+
rows = 0
167172
}
168173

169174
/* c8 ignore next 3 */

‎lib/bytes.js

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,18 @@
11
const size = 256
22
let buffer = Buffer.allocUnsafe(size)
33

4-
const messages = {
5-
B: 'B'.charCodeAt(0),
6-
Q: 'Q'.charCodeAt(0),
7-
P: 'P'.charCodeAt(0),
8-
p: 'p'.charCodeAt(0)
9-
}
10-
11-
const b = {
12-
i: 0,
13-
B() {
14-
buffer[0] = messages.B
15-
b.i = 5
16-
return b
17-
},
18-
Q() {
19-
buffer[0] = messages.Q
20-
b.i = 5
21-
return b
22-
},
23-
P() {
24-
buffer[0] = messages.P
4+
const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S'].reduce((acc, x) => {
5+
const v = x.charCodeAt(0)
6+
acc[x] = () => {
7+
buffer[0] = v
258
b.i = 5
269
return b
27-
},
28-
p() {
29-
buffer[0] = messages.p
30-
b.i = 5
31-
return b
32-
},
10+
}
11+
return acc
12+
}, {})
13+
14+
const b = Object.assign(messages, {
15+
i: 0,
3316
inc(x) {
3417
b.i += x
3518
return b
@@ -69,7 +52,7 @@ const b = {
6952
buffer = Buffer.allocUnsafe(size)
7053
return out
7154
}
72-
}
55+
})
7356

7457
module.exports = b
7558

‎lib/connection.js

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ const tls = require('tls')
33
const frontend = require('./frontend.js')
44
const Backend = require('./backend.js')
55
const Queue = require('./queue.js')
6-
const { errors } = require('./types.js')
6+
const { errors, END } = require('./types.js')
77

88
module.exports = Connection
99

@@ -42,6 +42,8 @@ function Connection(options = {}) {
4242
const backend = Backend({
4343
onparse,
4444
onparameter,
45+
onsuspended,
46+
oncomplete,
4547
transform,
4648
parsers,
4749
onnotify,
@@ -51,6 +53,23 @@ function Connection(options = {}) {
5153
error
5254
})
5355

56+
function onsuspended(x) {
57+
new Promise(r => r(backend.query.cursor(
58+
backend.query.cursor.rows === 1 ? x[0] : x
59+
))).then(x => {
60+
x === END
61+
? socket.write(frontend.Close())
62+
: socket.write(frontend.Execute(backend.query.cursor.rows))
63+
}).catch(err => {
64+
socket.write(frontend.Close())
65+
backend.query.reject(err)
66+
})
67+
}
68+
69+
function oncomplete() {
70+
backend.query.cursor && socket.write(frontend.Close())
71+
}
72+
5473
function onparse() {
5574
if (backend.query && backend.query.statement.sig)
5675
statements[backend.query.statement.sig] = backend.query.statement
@@ -72,8 +91,7 @@ function Connection(options = {}) {
7291
ended = () => resolve(socket.end())
7392
})
7493

75-
if (!backend.query && queries.length === 0)
76-
ended()
94+
process.nextTick(() => ready && ended())
7795

7896
return promise
7997
}
@@ -117,17 +135,23 @@ function Connection(options = {}) {
117135

118136
function prepared(statement, args, query) {
119137
query.statement = statement
120-
return frontend.Bind(statement.name, args)
138+
return bind(query, args)
121139
}
122140

123141
function prepare(sig, str, args, query) {
124142
query.statement = { name: sig ? 'p' + statement_id++ : '', sig }
125143
return Buffer.concat([
126144
frontend.Parse(query.statement.name, str, args),
127-
frontend.Bind(query.statement.name, args)
145+
bind(query, args)
128146
])
129147
}
130148

149+
function bind(query, args) {
150+
return query.cursor
151+
? frontend.Bind(query.statement.name, args, query.cursor.rows)
152+
: frontend.Bind(query.statement.name, args)
153+
}
154+
131155
function idle() {
132156
clearTimeout(timer)
133157
timer = setTimeout(socket.end, timeout * 1000)
@@ -141,9 +165,6 @@ function Connection(options = {}) {
141165
backend.query = backend.error = null
142166
timeout && queries.length === 0 && idle()
143167

144-
if (queries.length === 0 && ended)
145-
return ended()
146-
147168
if (!open) {
148169
messages.forEach(socket.write)
149170
messages = []
@@ -152,6 +173,7 @@ function Connection(options = {}) {
152173

153174
backend.query = queries.shift()
154175
ready = !backend.query
176+
ready && ended && ended()
155177
}
156178

157179
function data(x) {
@@ -246,7 +268,7 @@ function postgresSocket(options, {
246268
return Promise.resolve()
247269
},
248270
end: () => {
249-
return new Promise(r => socket && socket.end(r))
271+
return new Promise(r => socket ? socket.end(r) : r())
250272
},
251273
connect
252274
}

‎lib/frontend.js

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ const bytes = require('./bytes.js')
33
const { errors, entries } = require('./types.js')
44

55
const N = String.fromCharCode(0)
6-
const execute = bytes
7-
.inc(5)
8-
.str('D').i32(6).str('P').str(N)
9-
.str('E').i32(9).z(5)
10-
.str('H').i32(4)
11-
.str('S').i32(4)
12-
.end().slice(5)
6+
const execute = Buffer.concat([
7+
bytes.D().str('P').str(N).end(),
8+
bytes.E().str(N).i32(0).end(),
9+
bytes.H().end(),
10+
bytes.S().end()
11+
])
1312

1413
const authNames = {
1514
2 : 'KerberosV5',
@@ -38,7 +37,9 @@ module.exports = {
3837
auth,
3938
Bind,
4039
Parse,
41-
Query
40+
Query,
41+
Close,
42+
Execute
4243
}
4344

4445
function connect({ user, database, connection }) {
@@ -140,7 +141,7 @@ function Query(x) {
140141
.end()
141142
}
142143

143-
function Bind(name, args) {
144+
function Bind(name, args, rows = 0) {
144145
let prev
145146

146147
bytes
@@ -165,7 +166,13 @@ function Bind(name, args) {
165166

166167
return Buffer.concat([
167168
bytes.end(),
168-
execute
169+
rows
170+
? Buffer.concat([
171+
bytes.D().str('P').str(N).end(),
172+
bytes.E().str(N).i32(rows).end(),
173+
bytes.H().end()
174+
])
175+
: execute
169176
])
170177
}
171178

@@ -181,6 +188,20 @@ function Parse(name, str, args) {
181188
return bytes.end()
182189
}
183190

191+
function Execute(rows) {
192+
return Buffer.concat([
193+
bytes.E().str(N).i32(rows).end(),
194+
bytes.H().end()
195+
])
196+
}
197+
198+
function Close() {
199+
return Buffer.concat([
200+
bytes.C().str('P').str(N).end(),
201+
bytes.S().end()
202+
])
203+
}
204+
184205
function md5(x) {
185206
return crypto.createHash('md5').update(x).digest('hex')
186207
}

‎lib/index.js

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ const {
1414
toKebab,
1515
errors,
1616
escape,
17-
types
17+
types,
18+
END
1819
} = require('./types.js')
1920

2021
const notPromise = {
@@ -171,7 +172,7 @@ function Postgres(a, b) {
171172
: fetchArrayTypes(connection).then(() => send(connection, query, xs, args)).catch(reject)
172173
})
173174

174-
promise.stream = (fn) => (query.stream = fn, promise)
175+
addMethods(promise, query)
175176

176177
return promise
177178
}
@@ -185,7 +186,7 @@ function Postgres(a, b) {
185186

186187
function send(connection, query, xs, args) {
187188
connection
188-
? connection.send(query, query.raw ? parseRaw(xs, args) : parse(query, xs, args))
189+
? process.nextTick(connection.send, query, query.raw ? parseRaw(xs, args) : parse(query, xs, args))
189190
: queries.push({ query, xs, args })
190191
}
191192

@@ -246,6 +247,7 @@ function Postgres(a, b) {
246247

247248
function addTypes(sql, connection) {
248249
Object.assign(sql, {
250+
END,
249251
types: {},
250252
notify,
251253
unsafe,
@@ -287,7 +289,7 @@ function Postgres(a, b) {
287289
})
288290
}))).then(str => query(q, connection || getConnection(), str, args || []))
289291

290-
promise.stream = fn => (q.stream = fn, promise)
292+
addMethods(promise, q)
291293

292294
return promise
293295
}
@@ -297,6 +299,19 @@ function Postgres(a, b) {
297299
})
298300
}
299301

302+
function addMethods(promise, query) {
303+
promise.stream = (fn) => (query.stream = fn, promise)
304+
promise.cursor = (rows, fn) => {
305+
if (typeof rows === 'function') {
306+
fn = rows
307+
rows = 1
308+
}
309+
fn.rows = rows
310+
query.cursor = fn
311+
return promise
312+
}
313+
}
314+
300315
function listen(channel, fn) {
301316
if (channel in listeners) {
302317
listeners[channel].push(fn)
@@ -329,7 +344,7 @@ function Postgres(a, b) {
329344
return ended = Promise.all(all.map(c => c.destroy())).then(() => undefined)
330345

331346
return ended = Promise.race([
332-
Promise.all(all.map(c => c.end()))
347+
Promise.resolve(arrayTypesPromise).then(() => Promise.all(all.map(c => c.end())))
333348
].concat(
334349
timeout > 0
335350
? new Promise(r => destroy = setTimeout(() => (all.map(c => c.destroy()), r()), timeout * 1000))
There was a problem loading the remainder of the diff.

0 commit comments

Comments
 (0)
Failed to load comments.