1
1
const os = require ( 'os' )
2
2
const fs = require ( 'fs' )
3
- const Stream = require ( 'stream' )
4
3
5
4
const {
6
5
mergeUserTypes,
@@ -21,6 +20,7 @@ const { Query, CLOSE } = require('./query.js')
21
20
const Queue = require ( './queue.js' )
22
21
const { Errors, PostgresError } = require ( './errors.js' )
23
22
const Subscribe = require ( './subscribe.js' )
23
+ const largeObject = require ( './large.js' )
24
24
25
25
Object . assign ( Postgres , {
26
26
PostgresError,
@@ -42,21 +42,22 @@ function Postgres(a, b) {
42
42
let ending = false
43
43
44
44
const queries = Queue ( )
45
- , connections = [ ...Array ( options . max ) ] . map ( ( ) => Connection ( options , { onopen, onend, ondrain, onclose } ) )
46
- , closed = Queue ( connections )
45
+ , connecting = Queue ( )
47
46
, reserved = Queue ( )
47
+ , closed = Queue ( )
48
+ , ended = Queue ( )
48
49
, open = Queue ( )
49
50
, busy = Queue ( )
50
51
, full = Queue ( )
51
- , ended = Queue ( )
52
- , connecting = Queue ( )
53
- , queues = { closed , ended , connecting , reserved , open , busy , full }
52
+ , queues = { connecting , reserved , closed , ended , open , busy , full }
53
+
54
+ const connections = [ ... Array ( options . max ) ] . map ( ( ) => Connection ( options , queues , { onopen , onend , onclose } ) )
54
55
55
56
const sql = Sql ( handler )
56
57
57
58
Object . assign ( sql , {
58
59
get parameters ( ) { return options . parameters } ,
59
- largeObject,
60
+ largeObject : largeObject . bind ( null , sql ) ,
60
61
subscribe,
61
62
CLOSE ,
62
63
END : CLOSE ,
@@ -229,90 +230,28 @@ function Postgres(a, b) {
229
230
230
231
function handler ( q ) {
231
232
q . catch ( e => uncaughtError || ( uncaughtError = e ) )
232
- c . state === ' full'
233
+ c . queue === full
233
234
? queries . push ( q )
234
- : c . execute ( q ) || ( c . state = 'full' , full . push ( c ) )
235
+ : c . execute ( q ) || move ( c , full )
235
236
}
236
237
}
237
238
238
239
function onexecute ( c ) {
239
- queues [ c . state ] . remove ( c )
240
- c . state = ' reserved'
240
+ connection = c
241
+ move ( c , reserved )
241
242
c . reserved = ( ) => queries . length
242
243
? c . execute ( queries . shift ( ) )
243
- : c . state = 'reserved'
244
- reserved . push ( c )
245
- connection = c
244
+ : move ( c , reserved )
246
245
}
247
246
}
248
247
249
- function largeObject ( oid , mode = 0x00020000 | 0x00040000 ) {
250
- return new Promise ( async ( resolve , reject ) => {
251
- await sql . begin ( async sql => {
252
- let finish
253
- ! oid && ( [ { oid } ] = await sql `select lo_creat(-1) as oid` )
254
- const [ { fd } ] = await sql `select lo_open(${ oid } , ${ mode } ) as fd`
255
-
256
- const lo = {
257
- writable,
258
- readable,
259
- close : ( ) => sql `select lo_close(${ fd } )` . then ( finish ) ,
260
- tell : ( ) => sql `select lo_tell64(${ fd } )` ,
261
- read : ( x ) => sql `select loread(${ fd } , ${ x } ) as data` ,
262
- write : ( x ) => sql `select lowrite(${ fd } , ${ x } )` ,
263
- truncate : ( x ) => sql `select lo_truncate64(${ fd } , ${ x } )` ,
264
- seek : ( x , whence = 0 ) => sql `select lo_lseek64(${ fd } , ${ x } , ${ whence } )` ,
265
- size : ( ) => sql `
266
- select
267
- lo_lseek64(${ fd } , location, 0) as position,
268
- seek.size
269
- from (
270
- select
271
- lo_lseek64($1, 0, 2) as size,
272
- tell.location
273
- from (select lo_tell64($1) as location) tell
274
- ) seek
275
- `
276
- }
277
-
278
- resolve ( lo )
279
-
280
- return new Promise ( async r => finish = r )
281
-
282
- async function readable ( {
283
- highWaterMark = 2048 * 8 ,
284
- start = 0 ,
285
- end = Infinity
286
- } = { } ) {
287
- let max = end - start
288
- start && await lo . seek ( start )
289
- return new Stream . Readable ( {
290
- highWaterMark,
291
- async read ( size ) {
292
- const l = size > max ? size - max : size
293
- max -= size
294
- const [ { data } ] = await lo . read ( l )
295
- this . push ( data )
296
- if ( data . length < size )
297
- this . push ( null )
298
- }
299
- } )
300
- }
301
-
302
- async function writable ( {
303
- highWaterMark = 2048 * 8 ,
304
- start = 0
305
- } = { } ) {
306
- start && await lo . seek ( start )
307
- return new Stream . Writable ( {
308
- highWaterMark,
309
- write ( chunk , encoding , callback ) {
310
- lo . write ( chunk ) . then ( ( ) => callback ( ) , callback )
311
- }
312
- } )
313
- }
314
- } ) . catch ( reject )
315
- } )
248
+ function move ( c , queue ) {
249
+ c . queue . remove ( c )
250
+ queue . push ( c )
251
+ c . queue = queue
252
+ queue === open
253
+ ? c . idleTimer . start ( )
254
+ : c . idleTimer . cancel ( )
316
255
}
317
256
318
257
function json ( x ) {
@@ -331,28 +270,27 @@ function Postgres(a, b) {
331
270
return query . reject ( Errors . connection ( 'CONNECTION_ENDED' , options , options ) )
332
271
333
272
if ( open . length )
334
- return go ( open , query )
273
+ return go ( open . shift ( ) , query )
335
274
336
275
if ( closed . length )
337
276
return connect ( closed . shift ( ) , query )
338
277
339
278
busy . length
340
- ? go ( busy , query )
279
+ ? go ( busy . shift ( ) , query )
341
280
: queries . push ( query )
342
281
}
343
282
344
- function go ( xs , query ) {
345
- const c = xs . shift ( )
283
+ function go ( c , query ) {
346
284
return c . execute ( query )
347
- ? ( c . state = 'busy' , busy . push ( c ) )
348
- : ( c . state = 'full' , full . push ( c ) )
285
+ ? move ( c , busy )
286
+ : move ( c , full )
349
287
}
350
288
351
289
function cancel ( query ) {
352
290
return new Promise ( ( resolve , reject ) => {
353
291
query . state
354
292
? query . active
355
- ? Connection ( options , { } ) . cancel ( query . state , resolve , reject )
293
+ ? Connection ( options ) . cancel ( query . state , resolve , reject )
356
294
: query . cancelled = { resolve, reject }
357
295
: (
358
296
queries . remove ( query ) ,
@@ -386,21 +324,17 @@ function Postgres(a, b) {
386
324
}
387
325
388
326
function connect ( c , query ) {
389
- c . state = 'connecting'
390
- connecting . push ( c )
327
+ move ( c , connecting )
391
328
c . connect ( query )
392
329
}
393
330
394
331
function onend ( c ) {
395
- queues [ c . state ] . remove ( c )
396
- c . state = 'ended'
397
- ended . push ( c )
332
+ move ( c , ended )
398
333
}
399
334
400
335
function onopen ( c ) {
401
- queues [ c . state ] . remove ( c )
402
336
if ( queries . length === 0 )
403
- return ( c . state = 'open' , open . push ( c ) )
337
+ return move ( c , open )
404
338
405
339
let max = Math . ceil ( queries . length / ( connecting . length + 1 ) )
406
340
, ready = true
@@ -409,23 +343,15 @@ function Postgres(a, b) {
409
343
ready = c . execute ( queries . shift ( ) )
410
344
411
345
ready
412
- ? ( c . state = 'busy' , busy . push ( c ) )
413
- : ( c . state = 'full' , full . push ( c ) )
414
- }
415
-
416
- function ondrain ( c ) {
417
- full . remove ( c )
418
- onopen ( c )
346
+ ? move ( c , busy )
347
+ : move ( c , full )
419
348
}
420
349
421
350
function onclose ( c ) {
422
- queues [ c . state ] . remove ( c )
423
- c . state = 'closed'
351
+ move ( c , closed )
424
352
c . reserved = null
425
353
options . onclose && options . onclose ( c . id )
426
- queries . length
427
- ? connect ( c , queries . shift ( ) )
428
- : queues . closed . push ( c )
354
+ queries . length && connect ( c , queries . shift ( ) )
429
355
}
430
356
}
431
357
@@ -468,7 +394,8 @@ function parseOptions(a, b) {
468
394
debug : o . debug ,
469
395
fetch_types : 'fetch_types' in o ? o . fetch_types : true ,
470
396
parameters : { } ,
471
- shared : { retries : 0 , typeArrayMap : { } }
397
+ shared : { retries : 0 , typeArrayMap : { } } ,
398
+ publications : o . publications || query . get ( 'publications' ) || 'alltables'
472
399
} ,
473
400
mergeUserTypes ( o . types )
474
401
)
0 commit comments