Skip to content

Commit 8e675a3

Browse files
authored
* Copy to and from stdout and stdin - fixes porsager#170 * Should not discard last byte * Fix copy when queued * Handle backpressure for readable and writable * Fix abort on .writable()
1 parent 8b405b3 commit 8e675a3

File tree

7 files changed

+243
-26
lines changed

7 files changed

+243
-26
lines changed

lib/backend.js

+11-7
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ function Backend({
1515
parsers,
1616
onauth,
1717
onready,
18+
oncopy,
19+
ondata,
1820
transform,
1921
onnotice,
2022
onnotify
@@ -92,7 +94,9 @@ function Backend({
9294
}
9395

9496
/* c8 ignore next 3 */
95-
function CopyDone() { /* No handling needed */ }
97+
function CopyDone() {
98+
backend.query.readable.push(null)
99+
}
96100

97101
function DataRow(x) {
98102
let index = 7
@@ -127,21 +131,21 @@ function Backend({
127131
}
128132

129133
/* c8 ignore next 3 */
130-
function CopyData() { /* No handling needed until implemented */ }
134+
function CopyData(x) {
135+
ondata(x.slice(5))
136+
}
131137

132138
function ErrorResponse(x) {
133139
onerror(errors.postgres(parseError(x)))
134140
}
135141

136142
/* c8 ignore next 3 */
137143
function CopyInResponse() {
138-
backend.error = errors.notSupported('CopyInResponse')
144+
oncopy()
139145
}
140146

141147
/* c8 ignore next 3 */
142-
function CopyOutResponse() {
143-
backend.error = errors.notSupported('CopyOutResponse')
144-
}
148+
function CopyOutResponse() { /* No handling needed */ }
145149

146150
/* c8 ignore next 3 */
147151
function EmptyQueryResponse() { /* No handling needed */ }
@@ -227,7 +231,7 @@ function Backend({
227231

228232
/* c8 ignore next 3 */
229233
function CopyBothResponse() {
230-
backend.error = errors.notSupported('CopyBothResponse')
234+
oncopy()
231235
}
232236

233237
function ReadyForQuery() {

lib/bytes.js

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
const size = 256
22
let buffer = Buffer.allocUnsafe(size)
33

4-
const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S'].reduce((acc, x) => {
4+
const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S', 'd', 'c', 'f'].reduce((acc, x) => {
55
const v = x.charCodeAt(0)
66
acc[x] = () => {
77
buffer[0] = v
@@ -45,6 +45,11 @@ const b = Object.assign(messages, {
4545
b.i += x
4646
return b
4747
},
48+
raw(x) {
49+
buffer = Buffer.concat([buffer.slice(0, b.i), x])
50+
b.i = buffer.length
51+
return b
52+
},
4853
end(at = 1) {
4954
buffer.writeUInt32BE(b.i - at, at)
5055
const out = buffer.slice(0, b.i)

lib/connection.js

+27-5
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ function Connection(options = {}) {
3737
const queries = Queue()
3838
, id = count++
3939
, uid = Math.random().toString(36).slice(2)
40-
, connection = { send, end, destroy }
4140

4241
const socket = postgresSocket(options, {
4342
ready,
@@ -47,6 +46,8 @@ function Connection(options = {}) {
4746
cleanup
4847
})
4948

49+
const connection = { send, end, destroy, socket }
50+
5051
const backend = Backend({
5152
onparse,
5253
onparameter,
@@ -59,6 +60,8 @@ function Connection(options = {}) {
5960
onnotice,
6061
onready,
6162
onauth,
63+
oncopy,
64+
ondata,
6265
error
6366
})
6467

@@ -235,6 +238,21 @@ function Connection(options = {}) {
235238
ready && ended && ended()
236239
}
237240

241+
function oncopy() {
242+
backend.query.writable.push = ({ chunk, error, callback }) => {
243+
error
244+
? socket.write(frontend.CopyFail(error))
245+
: chunk === null
246+
? socket.write(frontend.CopyDone())
247+
: socket.write(frontend.CopyData(chunk), callback)
248+
}
249+
backend.query.writable.forEach(backend.query.writable.push)
250+
}
251+
252+
function ondata(x) {
253+
!backend.query.readable.push(x) && socket.pause()
254+
}
255+
238256
function multi() {
239257
if (next)
240258
return (next = false, true)
@@ -378,11 +396,15 @@ function postgresSocket(options, {
378396
succeeded = true
379397
i >= options.host.length && (i = 0)
380398
},
381-
write: x => {
399+
pause: () => socket.pause(),
400+
resume: () => socket.resume(),
401+
isPaused: () => socket.isPaused(),
402+
write: (x, callback) => {
382403
buffer = buffer ? Buffer.concat([buffer, x]) : Buffer.from(x)
383404
if (buffer.length >= 1024)
384-
return write()
405+
return write(callback)
385406
next === null && (next = setImmediate(write))
407+
callback && callback()
386408
},
387409
destroy: () => {
388410
socket && socket.destroy()
@@ -395,8 +417,8 @@ function postgresSocket(options, {
395417
connect
396418
}
397419

398-
function write() {
399-
socket.write(buffer)
420+
function write(callback) {
421+
socket.write(buffer, callback)
400422
next !== null && clearImmediate(next)
401423
buffer = next = null
402424
}

lib/frontend.js

+24-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ module.exports = {
4545
Parse,
4646
Query,
4747
Close,
48-
Execute
48+
Execute,
49+
CopyData,
50+
CopyDone,
51+
CopyFail
4952
}
5053

5154
function StartupMessage({ user, database, connection }) {
@@ -147,6 +150,26 @@ function Query(x) {
147150
.end()
148151
}
149152

153+
function CopyData(x) {
154+
return bytes
155+
.d()
156+
.raw(x)
157+
.end()
158+
}
159+
160+
function CopyDone() {
161+
return bytes
162+
.c()
163+
.end()
164+
}
165+
166+
function CopyFail(err) {
167+
return bytes
168+
.f()
169+
.str(String(err) + N)
170+
.end()
171+
}
172+
150173
function Bind(name, args, rows = 0) {
151174
let prev
152175

lib/index.js

+64-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const fs = require('fs')
22
const Url = require('url')
3+
const Stream = require('stream')
34
const Connection = require('./connection.js')
45
const Queue = require('./queue.js')
56
const { errors, PostgresError } = require('./errors.js')
@@ -146,7 +147,7 @@ function Postgres(a, b) {
146147
})
147148
.then(begin && (() => {
148149
connections.push(connection)
149-
next()
150+
next(connection)
150151
}))
151152

152153
function scoped(xs) {
@@ -158,7 +159,8 @@ function Postgres(a, b) {
158159
let c
159160
, x
160161

161-
while (queries.length && (c = getConnection(queries.peek().fn)) && (x = queries.shift())) {
162+
while ((x = queries.peek()) && (c = x.query && x.query.connection || getConnection(queries.peek().fn)) && queries.shift()) {
163+
x.query && x.query.connection && x.query.writable && (c.blocked = true)
162164
x.fn
163165
? transaction(x, c)
164166
: send(c, x.query, x.xs, x.args)
@@ -205,9 +207,12 @@ function Postgres(a, b) {
205207
}
206208

207209
function send(connection, query, xs, args) {
208-
connection
209-
? process.nextTick(connection.send, query, query.tagged ? parseTagged(query, xs, args) : parseUnsafe(query, xs, args))
210-
: queries.push({ query, xs, args })
210+
connection && (query.connection = connection)
211+
if (!connection || connection.blocked)
212+
return queries.push({ query, xs, args, connection })
213+
214+
connection.blocked = query.blocked
215+
process.nextTick(connection.send, query, query.tagged ? parseTagged(query, xs, args) : parseUnsafe(query, xs, args))
211216
}
212217

213218
function getConnection(reserve) {
@@ -325,9 +330,15 @@ function Postgres(a, b) {
325330
}
326331

327332
function addMethods(promise, query) {
333+
promise.readable = () => readable(promise, query)
334+
promise.writable = () => writable(promise, query)
328335
promise.raw = () => (query.raw = true, promise)
329336
promise.stream = (fn) => (query.stream = fn, promise)
330-
promise.cursor = (rows, fn) => {
337+
promise.cursor = cursor(promise, query)
338+
}
339+
340+
function cursor(promise, query) {
341+
return (rows, fn) => {
331342
if (typeof rows === 'function') {
332343
fn = rows
333344
rows = 1
@@ -339,6 +350,53 @@ function Postgres(a, b) {
339350
}
340351
}
341352

353+
function readable(promise, query) {
354+
query.connection
355+
? query.connection.blocked = true
356+
: query.blocked = true
357+
358+
const read = () => query.connection.socket.isPaused() && query.connection.socket.resume()
359+
promise.catch(err => query.readable.destroy(err)).then(() => {
360+
query.connection.blocked = false
361+
read()
362+
next()
363+
})
364+
return query.readable = new Stream.Readable({ read })
365+
}
366+
367+
function writable(promise, query) {
368+
query.connection
369+
? query.connection.blocked = true
370+
: query.blocked = true
371+
let error
372+
query.prepare = false
373+
query.simple = true
374+
query.writable = []
375+
promise.catch(err => error = err).then(() => {
376+
query.connection.blocked = false
377+
next()
378+
})
379+
return query.readable = new Stream.Duplex({
380+
read() { /* backpressure handling not possible */ },
381+
write(chunk, encoding, callback) {
382+
error
383+
? callback(error)
384+
: query.writable.push({ chunk, callback })
385+
},
386+
destroy(error, callback) {
387+
query.writable.push({ error })
388+
callback(error)
389+
},
390+
final(callback) {
391+
if (error)
392+
return callback(error)
393+
394+
query.writable.push({ chunk: null })
395+
promise.then(() => callback(), callback)
396+
}
397+
})
398+
}
399+
342400
function listen(channel, fn) {
343401
const listener = getListener()
344402

tests/copy.csv

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
1 2 3
2+
4 5 6

0 commit comments

Comments
 (0)