Skip to content

Commit 0f87d5b

Browse files
committed
Build deno + cjs
1 parent 5413f0c commit 0f87d5b

File tree

8 files changed

+147
-30
lines changed

8 files changed

+147
-30
lines changed

cjs/src/connection.js

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
7474
, lifeTimer = timer(end, options.max_lifetime)
7575
, connectTimer = timer(connectTimedOut, options.connect_timeout)
7676

77-
let socket = createSocket()
77+
let socket = null
7878
, result = new Result()
7979
, incoming = Buffer.alloc(0)
8080
, needsTypes = options.fetch_types
@@ -122,15 +122,27 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
122122

123123
return connection
124124

125-
function createSocket() {
126-
const x = net.Socket()
125+
async function createSocket() {
126+
let x
127+
try {
128+
x = options.socket
129+
? (await Promise.resolve(options.socket(options)))
130+
: net.Socket()
131+
} catch (e) {
132+
error(e)
133+
return
134+
}
127135
x.on('error', error)
128136
x.on('close', closed)
129137
x.on('drain', drain)
130138
return x
131139
}
132140

133-
function cancel({ pid, secret }, resolve, reject) {
141+
async function cancel({ pid, secret }, resolve, reject) {
142+
socket || (socket = await createSocket())
143+
if (!socket)
144+
return
145+
134146
socket.removeAllListeners()
135147
socket = net.Socket()
136148
socket.on('connect', () => socket.write(b().i32(16).i32(80877102).i32(pid).i32(secret).end(16)))
@@ -324,10 +336,19 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
324336
}
325337
}
326338

327-
function connect() {
339+
async function connect() {
328340
terminated = false
329341
backendParameters = {}
342+
socket || (socket = await createSocket())
343+
344+
if (!socket)
345+
return
346+
330347
connectTimer.start()
348+
349+
if (options.socket)
350+
return ssl ? secure() : connected()
351+
331352
socket.on('connect', ssl ? secure : connected)
332353

333354
if (options.path)
@@ -349,7 +370,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
349370
statementCount = 1
350371
lifeTimer.start()
351372
socket.on('data', data)
352-
socket.setKeepAlive(true, 1000 * keep_alive)
373+
socket.setKeepAlive && socket.setKeepAlive(true, 1000 * keep_alive)
353374
const s = StartupMessage()
354375
write(s)
355376
} catch (err) {
@@ -397,13 +418,15 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
397418
error(Errors.connection('CONNECTION_DESTROYED', options))
398419

399420
clearImmediate(nextWriteTimer)
400-
socket.removeListener('data', data)
401-
socket.removeListener('connect', connected)
402-
socket.readyState !== 'closed' && socket.end(b().X().end())
421+
if (socket) {
422+
socket.removeListener('data', data)
423+
socket.removeListener('connect', connected)
424+
socket.readyState !== 'closed' && socket.end(b().X().end())
425+
}
403426
ended && (ended(), ending = ended = null)
404427
}
405428

406-
function closed(hadError) {
429+
async function closed(hadError) {
407430
incoming = Buffer.alloc(0)
408431
remaining = 0
409432
incomings = null
@@ -416,7 +439,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
416439

417440
if (socket.encrypted) {
418441
socket.removeAllListeners()
419-
socket = createSocket()
442+
socket = null
420443
}
421444

422445
if (initial)

cjs/src/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ function parseOptions(a, b) {
397397
connection : Object.assign({ application_name: 'postgres.js' }, o.connection),
398398
target_session_attrs: tsa(o, url, env),
399399
debug : o.debug,
400+
socket : o.socket,
400401
fetch_types : 'fetch_types' in o ? o.fetch_types : true,
401402
parameters : {},
402403
shared : { retries: 0, typeArrayMap: {} },

cjs/tests/index.js

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ t('listen reconnects after connection error', { timeout: 3 }, async() => {
691691
await delay(1000)
692692

693693
await sql.notify('test', 'b')
694-
await delay(50)
694+
await delay(200)
695695
sql.end()
696696

697697
return ['ab', xs.join('')]
@@ -1760,7 +1760,7 @@ t('Cancel running query works', async() => {
17601760
return ['57014', error.code]
17611761
})
17621762

1763-
t('Cancel piped query works', async() => {
1763+
t('Cancel piped query works', { timeout: 1 }, async() => {
17641764
await sql`select 1`
17651765
const last = sql`select pg_sleep(0.2)`.execute()
17661766
const query = sql`select pg_sleep(2) as dig`
@@ -1954,3 +1954,24 @@ t('Ensure reconnect after max_lifetime with transactions', { timeout: 5000 }, as
19541954

19551955
return [true, true]
19561956
})
1957+
1958+
t('Custom socket works', {}, async() => {
1959+
let result
1960+
const sql = postgres({
1961+
socket: () => new Promise((resolve, reject) => {
1962+
const socket = net.Socket()
1963+
socket.connect(5432)
1964+
socket.once('data', x => result = x[0])
1965+
socket.on('error', reject)
1966+
socket.on('connect', () => resolve(socket))
1967+
}),
1968+
idle_timeout
1969+
})
1970+
1971+
await sql`select 1`
1972+
1973+
return [
1974+
result,
1975+
82
1976+
]
1977+
})

deno/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ const sql = postgres('postgres://username:password@host:port/database', {
618618
onnotice : fn, // Defaults to console.log
619619
onparameter : fn, // (key, value) when server param change
620620
debug : fn, // Is called with (connection, query, params, types)
621+
socket : fn, // fn returning custom socket to use
621622
transform : {
622623
column : fn, // Transforms incoming column names
623624
value : fn, // Transforms incoming row values
@@ -764,6 +765,31 @@ const [custom] = sql`
764765

765766
```
766767
768+
### Custom socket
769+
770+
Easily do in-process ssh tunneling to your database by providing a custom socket for Postgres.js to use. The function (optionally async) must return a socket-like duplex stream.
771+
772+
Here's a sample using [ssh2](https://github.com/mscdex/ssh2)
773+
774+
```js
775+
import ssh2 from 'ssh2'
776+
777+
const sql = postgres({
778+
...options,
779+
socket: ({ hostname, port }) => new Promise((resolve, reject) => {
780+
const ssh = new ssh2.Client()
781+
ssh
782+
.on('error', reject)
783+
.on('ready', () =>
784+
ssh.forwardOut('127.0.0.1', 12345, hostname, port,
785+
(err, socket) => err ? reject(err) : resolve(socket)
786+
)
787+
)
788+
.connect(sshOptions)
789+
})
790+
})
791+
```
792+
767793
## Teardown / Cleanup
768794
769795
To ensure proper teardown and cleanup on server restarts use `await sql.end()` before `process.exit()`.

deno/polyfills.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ export const net = {
2828
const socket = {
2929
error,
3030
success,
31-
connect: (...xs) => {
31+
connect: (port, hostname) => {
3232
socket.closed = false
3333
socket.raw = null
34-
xs.length === 1
35-
? Deno.connect({ transport: 'unix', path: xs[0] }).then(success, error)
36-
: Deno.connect({ transport: 'tcp', port: socket.port = xs[0], hostname: socket.hostname = xs[1] }).then(success, error)
34+
typeof port === 'string'
35+
? Deno.connect({ transport: 'unix', path: socket.path = port }).then(success, error)
36+
: Deno.connect({ transport: 'tcp', port: socket.port = port, hostname: socket.hostname = hostname || 'localhost' }).then(success, error) // eslint-disable-line
37+
return socket
3738
},
3839
pause: () => {
3940
paused = new Promise(r => resume = r)

deno/src/connection.js

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
7777
, lifeTimer = timer(end, options.max_lifetime)
7878
, connectTimer = timer(connectTimedOut, options.connect_timeout)
7979

80-
let socket = createSocket()
80+
let socket = null
8181
, result = new Result()
8282
, incoming = Buffer.alloc(0)
8383
, needsTypes = options.fetch_types
@@ -125,15 +125,27 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
125125

126126
return connection
127127

128-
function createSocket() {
129-
const x = net.Socket()
128+
async function createSocket() {
129+
let x
130+
try {
131+
x = options.socket
132+
? (await Promise.resolve(options.socket(options)))
133+
: net.Socket()
134+
} catch (e) {
135+
error(e)
136+
return
137+
}
130138
x.on('error', error)
131139
x.on('close', closed)
132140
x.on('drain', drain)
133141
return x
134142
}
135143

136-
function cancel({ pid, secret }, resolve, reject) {
144+
async function cancel({ pid, secret }, resolve, reject) {
145+
socket || (socket = await createSocket())
146+
if (!socket)
147+
return
148+
137149
socket.removeAllListeners()
138150
socket = net.Socket()
139151
socket.on('connect', () => socket.write(b().i32(16).i32(80877102).i32(pid).i32(secret).end(16)))
@@ -327,10 +339,19 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
327339
}
328340
}
329341

330-
function connect() {
342+
async function connect() {
331343
terminated = false
332344
backendParameters = {}
345+
socket || (socket = await createSocket())
346+
347+
if (!socket)
348+
return
349+
333350
connectTimer.start()
351+
352+
if (options.socket)
353+
return ssl ? secure() : connected()
354+
334355
socket.on('connect', ssl ? secure : connected)
335356

336357
if (options.path)
@@ -352,7 +373,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
352373
statementCount = 1
353374
lifeTimer.start()
354375
socket.on('data', data)
355-
socket
376+
socket.setKeepAlive && socket
356377
const s = StartupMessage()
357378
write(s)
358379
} catch (err) {
@@ -400,13 +421,15 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
400421
error(Errors.connection('CONNECTION_DESTROYED', options))
401422

402423
clearImmediate(nextWriteTimer)
403-
socket.removeListener('data', data)
404-
socket.removeListener('connect', connected)
405-
socket.readyState !== 'closed' && socket.end(b().X().end())
424+
if (socket) {
425+
socket.removeListener('data', data)
426+
socket.removeListener('connect', connected)
427+
socket.readyState !== 'closed' && socket.end(b().X().end())
428+
}
406429
ended && (ended(), ending = ended = null)
407430
}
408431

409-
function closed(hadError) {
432+
async function closed(hadError) {
410433
incoming = Buffer.alloc(0)
411434
remaining = 0
412435
incomings = null
@@ -419,7 +442,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
419442

420443
if (socket.encrypted) {
421444
socket.removeAllListeners()
422-
socket = createSocket()
445+
socket = null
423446
}
424447

425448
if (initial)

deno/src/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ function parseOptions(a, b) {
398398
connection : Object.assign({ application_name: 'postgres.js' }, o.connection),
399399
target_session_attrs: tsa(o, url, env),
400400
debug : o.debug,
401+
socket : o.socket,
401402
fetch_types : 'fetch_types' in o ? o.fetch_types : true,
402403
parameters : {},
403404
shared : { retries: 0, typeArrayMap: {} },

deno/tests/index.js

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ t('listen reconnects after connection error', { timeout: 3 }, async() => {
692692
await delay(1000)
693693

694694
await sql.notify('test', 'b')
695-
await delay(50)
695+
await delay(200)
696696
sql.end()
697697

698698
return ['ab', xs.join('')]
@@ -1761,7 +1761,7 @@ t('Cancel running query works', async() => {
17611761
return ['57014', error.code]
17621762
})
17631763

1764-
t('Cancel piped query works', async() => {
1764+
t('Cancel piped query works', { timeout: 1 }, async() => {
17651765
await sql`select 1`
17661766
const last = sql`select pg_sleep(0.2)`.execute()
17671767
const query = sql`select pg_sleep(2) as dig`
@@ -1955,3 +1955,24 @@ t('Ensure reconnect after max_lifetime with transactions', { timeout: 5000 }, as
19551955

19561956
return [true, true]
19571957
})
1958+
1959+
t('Custom socket works', {}, async() => {
1960+
let result
1961+
const sql = postgres({
1962+
socket: () => new Promise((resolve, reject) => {
1963+
const socket = net.Socket()
1964+
socket.connect(5432)
1965+
socket.once('data', x => result = x[0])
1966+
socket.on('error', reject)
1967+
socket.on('connect', () => resolve(socket))
1968+
}),
1969+
idle_timeout
1970+
})
1971+
1972+
await sql`select 1`
1973+
1974+
return [
1975+
result,
1976+
82
1977+
]
1978+
})

0 commit comments

Comments
 (0)