Skip to content

Commit d9d2af9

Browse files
committed
Build
1 parent cee1a57 commit d9d2af9

File tree

9 files changed

+229
-243
lines changed

9 files changed

+229
-243
lines changed

cjs/src/connection.js

+8-12
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ const errorFields = {
4848
82 : 'routine' // R
4949
}
5050

51-
function Connection(options, { onopen = noop, onend = noop, ondrain = noop, onclose = noop } = {}) {
51+
function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose = noop } = {}) {
5252
const {
5353
ssl,
5454
max,
@@ -80,7 +80,6 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
8080
, needsTypes = options.fetch_types
8181
, backendParameters = {}
8282
, statements = {}
83-
, state = 'closed'
8483
, statementId = Math.random().toString(36).slice(2)
8584
, statementCount = 1
8685
, closedDate = 0
@@ -105,13 +104,8 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
105104
, final = null
106105

107106
const connection = {
108-
get state() { return state },
109-
set state(x) {
110-
state = x
111-
state === 'open'
112-
? idleTimer.start()
113-
: idleTimer.cancel()
114-
},
107+
queue: queues.closed,
108+
idleTimer,
115109
connect(query) {
116110
initial = query
117111
reconnect()
@@ -124,6 +118,8 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
124118
id
125119
}
126120

121+
queues.closed && queues.closed.push(connection)
122+
127123
return connection
128124

129125
function createSocket() {
@@ -291,7 +287,7 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
291287

292288
/* c8 ignore next 3 */
293289
function drain() {
294-
ondrain(connection)
290+
onopen(connection)
295291
}
296292

297293
function data(x) {
@@ -362,7 +358,7 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
362358
}
363359

364360
function error(err) {
365-
if (connection.state === 'connecting' && options.host[retries + 1])
361+
if (connection.queue === queues.connecting && options.host[retries + 1])
366362
return
367363

368364
errored(err)
@@ -529,7 +525,7 @@ function Connection(options, { onopen = noop, onend = noop, ondrain = noop, oncl
529525
}
530526

531527
while (sent.length && (query = sent.shift()) && (query.active = true) && query.cancelled)
532-
Connection(options, {}).cancel(query.state, query.cancelled.resolve, query.cancelled.reject)
528+
Connection(options).cancel(query.state, query.cancelled.resolve, query.cancelled.reject)
533529

534530
if (query)
535531
return // Consider opening if able and sent.length < 50

cjs/src/index.js

+35-108
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
const os = require('os')
22
const fs = require('fs')
3-
const Stream = require('stream')
43

54
const {
65
mergeUserTypes,
@@ -21,6 +20,7 @@ const { Query, CLOSE } = require('./query.js')
2120
const Queue = require('./queue.js')
2221
const { Errors, PostgresError } = require('./errors.js')
2322
const Subscribe = require('./subscribe.js')
23+
const largeObject = require('./large.js')
2424

2525
Object.assign(Postgres, {
2626
PostgresError,
@@ -42,21 +42,22 @@ function Postgres(a, b) {
4242
let ending = false
4343

4444
const queries = Queue()
45-
, connections = [...Array(options.max)].map(() => Connection(options, { onopen, onend, ondrain, onclose }))
46-
, closed = Queue(connections)
45+
, connecting = Queue()
4746
, reserved = Queue()
47+
, closed = Queue()
48+
, ended = Queue()
4849
, open = Queue()
4950
, busy = Queue()
5051
, full = Queue()
51-
, ended = Queue()
52-
, connecting = Queue()
53-
, queues = { closed, ended, connecting, reserved, open, busy, full }
52+
, queues = { connecting, reserved, closed, ended, open, busy, full }
53+
54+
const connections = [...Array(options.max)].map(() => Connection(options, queues, { onopen, onend, onclose }))
5455

5556
const sql = Sql(handler)
5657

5758
Object.assign(sql, {
5859
get parameters() { return options.parameters },
59-
largeObject,
60+
largeObject: largeObject.bind(null, sql),
6061
subscribe,
6162
CLOSE,
6263
END: CLOSE,
@@ -229,90 +230,28 @@ function Postgres(a, b) {
229230

230231
function handler(q) {
231232
q.catch(e => uncaughtError || (uncaughtError = e))
232-
c.state === 'full'
233+
c.queue === full
233234
? queries.push(q)
234-
: c.execute(q) || (c.state = 'full', full.push(c))
235+
: c.execute(q) || move(c, full)
235236
}
236237
}
237238

238239
function onexecute(c) {
239-
queues[c.state].remove(c)
240-
c.state = 'reserved'
240+
connection = c
241+
move(c, reserved)
241242
c.reserved = () => queries.length
242243
? c.execute(queries.shift())
243-
: c.state = 'reserved'
244-
reserved.push(c)
245-
connection = c
244+
: move(c, reserved)
246245
}
247246
}
248247

249-
function largeObject(oid, mode = 0x00020000 | 0x00040000) {
250-
return new Promise(async(resolve, reject) => {
251-
await sql.begin(async sql => {
252-
let finish
253-
!oid && ([{ oid }] = await sql`select lo_creat(-1) as oid`)
254-
const [{ fd }] = await sql`select lo_open(${ oid }, ${ mode }) as fd`
255-
256-
const lo = {
257-
writable,
258-
readable,
259-
close : () => sql`select lo_close(${ fd })`.then(finish),
260-
tell : () => sql`select lo_tell64(${ fd })`,
261-
read : (x) => sql`select loread(${ fd }, ${ x }) as data`,
262-
write : (x) => sql`select lowrite(${ fd }, ${ x })`,
263-
truncate : (x) => sql`select lo_truncate64(${ fd }, ${ x })`,
264-
seek : (x, whence = 0) => sql`select lo_lseek64(${ fd }, ${ x }, ${ whence })`,
265-
size : () => sql`
266-
select
267-
lo_lseek64(${ fd }, location, 0) as position,
268-
seek.size
269-
from (
270-
select
271-
lo_lseek64($1, 0, 2) as size,
272-
tell.location
273-
from (select lo_tell64($1) as location) tell
274-
) seek
275-
`
276-
}
277-
278-
resolve(lo)
279-
280-
return new Promise(async r => finish = r)
281-
282-
async function readable({
283-
highWaterMark = 2048 * 8,
284-
start = 0,
285-
end = Infinity
286-
} = {}) {
287-
let max = end - start
288-
start && await lo.seek(start)
289-
return new Stream.Readable({
290-
highWaterMark,
291-
async read(size) {
292-
const l = size > max ? size - max : size
293-
max -= size
294-
const [{ data }] = await lo.read(l)
295-
this.push(data)
296-
if (data.length < size)
297-
this.push(null)
298-
}
299-
})
300-
}
301-
302-
async function writable({
303-
highWaterMark = 2048 * 8,
304-
start = 0
305-
} = {}) {
306-
start && await lo.seek(start)
307-
return new Stream.Writable({
308-
highWaterMark,
309-
write(chunk, encoding, callback) {
310-
lo.write(chunk).then(() => callback(), callback)
311-
}
312-
})
313-
}
314-
}).catch(reject)
315-
})
248+
function move(c, queue) {
249+
c.queue.remove(c)
250+
queue.push(c)
251+
c.queue = queue
252+
queue === open
253+
? c.idleTimer.start()
254+
: c.idleTimer.cancel()
316255
}
317256

318257
function json(x) {
@@ -331,28 +270,27 @@ function Postgres(a, b) {
331270
return query.reject(Errors.connection('CONNECTION_ENDED', options, options))
332271

333272
if (open.length)
334-
return go(open, query)
273+
return go(open.shift(), query)
335274

336275
if (closed.length)
337276
return connect(closed.shift(), query)
338277

339278
busy.length
340-
? go(busy, query)
279+
? go(busy.shift(), query)
341280
: queries.push(query)
342281
}
343282

344-
function go(xs, query) {
345-
const c = xs.shift()
283+
function go(c, query) {
346284
return c.execute(query)
347-
? (c.state = 'busy', busy.push(c))
348-
: (c.state = 'full', full.push(c))
285+
? move(c, busy)
286+
: move(c, full)
349287
}
350288

351289
function cancel(query) {
352290
return new Promise((resolve, reject) => {
353291
query.state
354292
? query.active
355-
? Connection(options, {}).cancel(query.state, resolve, reject)
293+
? Connection(options).cancel(query.state, resolve, reject)
356294
: query.cancelled = { resolve, reject }
357295
: (
358296
queries.remove(query),
@@ -386,21 +324,17 @@ function Postgres(a, b) {
386324
}
387325

388326
function connect(c, query) {
389-
c.state = 'connecting'
390-
connecting.push(c)
327+
move(c, connecting)
391328
c.connect(query)
392329
}
393330

394331
function onend(c) {
395-
queues[c.state].remove(c)
396-
c.state = 'ended'
397-
ended.push(c)
332+
move(c, ended)
398333
}
399334

400335
function onopen(c) {
401-
queues[c.state].remove(c)
402336
if (queries.length === 0)
403-
return (c.state = 'open', open.push(c))
337+
return move(c, open)
404338

405339
let max = Math.ceil(queries.length / (connecting.length + 1))
406340
, ready = true
@@ -409,23 +343,15 @@ function Postgres(a, b) {
409343
ready = c.execute(queries.shift())
410344

411345
ready
412-
? (c.state = 'busy', busy.push(c))
413-
: (c.state = 'full', full.push(c))
414-
}
415-
416-
function ondrain(c) {
417-
full.remove(c)
418-
onopen(c)
346+
? move(c, busy)
347+
: move(c, full)
419348
}
420349

421350
function onclose(c) {
422-
queues[c.state].remove(c)
423-
c.state = 'closed'
351+
move(c, closed)
424352
c.reserved = null
425353
options.onclose && options.onclose(c.id)
426-
queries.length
427-
? connect(c, queries.shift())
428-
: queues.closed.push(c)
354+
queries.length && connect(c, queries.shift())
429355
}
430356
}
431357

@@ -468,7 +394,8 @@ function parseOptions(a, b) {
468394
debug : o.debug,
469395
fetch_types : 'fetch_types' in o ? o.fetch_types : true,
470396
parameters : {},
471-
shared : { retries: 0, typeArrayMap: {} }
397+
shared : { retries: 0, typeArrayMap: {} },
398+
publications : o.publications || query.get('publications') || 'alltables'
472399
},
473400
mergeUserTypes(o.types)
474401
)

cjs/src/large.js

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
const Stream = require('stream')
2+
3+
module.exports = largeObject;function largeObject(sql, oid, mode = 0x00020000 | 0x00040000) {
4+
return new Promise(async(resolve, reject) => {
5+
await sql.begin(async sql => {
6+
let finish
7+
!oid && ([{ oid }] = await sql`select lo_creat(-1) as oid`)
8+
const [{ fd }] = await sql`select lo_open(${ oid }, ${ mode }) as fd`
9+
10+
const lo = {
11+
writable,
12+
readable,
13+
close : () => sql`select lo_close(${ fd })`.then(finish),
14+
tell : () => sql`select lo_tell64(${ fd })`,
15+
read : (x) => sql`select loread(${ fd }, ${ x }) as data`,
16+
write : (x) => sql`select lowrite(${ fd }, ${ x })`,
17+
truncate : (x) => sql`select lo_truncate64(${ fd }, ${ x })`,
18+
seek : (x, whence = 0) => sql`select lo_lseek64(${ fd }, ${ x }, ${ whence })`,
19+
size : () => sql`
20+
select
21+
lo_lseek64(${ fd }, location, 0) as position,
22+
seek.size
23+
from (
24+
select
25+
lo_lseek64($1, 0, 2) as size,
26+
tell.location
27+
from (select lo_tell64($1) as location) tell
28+
) seek
29+
`
30+
}
31+
32+
resolve(lo)
33+
34+
return new Promise(async r => finish = r)
35+
36+
async function readable({
37+
highWaterMark = 2048 * 8,
38+
start = 0,
39+
end = Infinity
40+
} = {}) {
41+
let max = end - start
42+
start && await lo.seek(start)
43+
return new Stream.Readable({
44+
highWaterMark,
45+
async read(size) {
46+
const l = size > max ? size - max : size
47+
max -= size
48+
const [{ data }] = await lo.read(l)
49+
this.push(data)
50+
if (data.length < size)
51+
this.push(null)
52+
}
53+
})
54+
}
55+
56+
async function writable({
57+
highWaterMark = 2048 * 8,
58+
start = 0
59+
} = {}) {
60+
start && await lo.seek(start)
61+
return new Stream.Writable({
62+
highWaterMark,
63+
write(chunk, encoding, callback) {
64+
lo.write(chunk).then(() => callback(), callback)
65+
}
66+
})
67+
}
68+
}).catch(reject)
69+
})
70+
}

cjs/src/subscribe.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ module.exports = Subscribe;function Subscribe(postgres, options) {
4646
}
4747
}
4848

49-
async function init(sql, slot, publications = 'alltables') {
49+
async function init(sql, slot, publications) {
5050
if (!publications)
5151
throw new Error('Missing publication names')
5252

0 commit comments

Comments
 (0)