Skip to content

Commit 5413f0c

Browse files
committed
Add custom socket option - fixes porsager#284
1 parent 632d0ad commit 5413f0c

File tree

4 files changed

+82
-11
lines changed

4 files changed

+82
-11
lines changed

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,7 @@ const sql = postgres('postgres://username:password@host:port/database', {
622622
onnotice : fn, // Defaults to console.log
623623
onparameter : fn, // (key, value) when server param change
624624
debug : fn, // Is called with (connection, query, params, types)
625+
socket : fn, // fn returning custom socket to use
625626
transform : {
626627
column : fn, // Transforms incoming column names
627628
value : fn, // Transforms incoming row values
@@ -768,6 +769,31 @@ const [custom] = sql`
768769

769770
```
770771
772+
### Custom socket
773+
774+
Easily do in-process ssh tunneling to your database by providing a custom socket for Postgres.js to use. The function (optionally async) must return a socket-like duplex stream.
775+
776+
Here's a sample using [ssh2](https://github.com/mscdex/ssh2)
777+
778+
```js
779+
import ssh2 from 'ssh2'
780+
781+
const sql = postgres({
782+
...options,
783+
socket: ({ hostname, port }) => new Promise((resolve, reject) => {
784+
const ssh = new ssh2.Client()
785+
ssh
786+
.on('error', reject)
787+
.on('ready', () =>
788+
ssh.forwardOut('127.0.0.1', 12345, hostname, port,
789+
(err, socket) => err ? reject(err) : resolve(socket)
790+
)
791+
)
792+
.connect(sshOptions)
793+
})
794+
})
795+
```
796+
771797
## Teardown / Cleanup
772798
773799
To ensure proper teardown and cleanup on server restarts use `await sql.end()` before `process.exit()`.

src/connection.js

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
7474
, lifeTimer = timer(end, options.max_lifetime)
7575
, connectTimer = timer(connectTimedOut, options.connect_timeout)
7676

77-
let socket = createSocket()
77+
let socket = null
7878
, result = new Result()
7979
, incoming = Buffer.alloc(0)
8080
, needsTypes = options.fetch_types
@@ -122,15 +122,27 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
122122

123123
return connection
124124

125-
function createSocket() {
126-
const x = net.Socket()
125+
async function createSocket() {
126+
let x
127+
try {
128+
x = options.socket
129+
? (await Promise.resolve(options.socket(options)))
130+
: net.Socket()
131+
} catch (e) {
132+
error(e)
133+
return
134+
}
127135
x.on('error', error)
128136
x.on('close', closed)
129137
x.on('drain', drain)
130138
return x
131139
}
132140

133-
function cancel({ pid, secret }, resolve, reject) {
141+
async function cancel({ pid, secret }, resolve, reject) {
142+
socket || (socket = await createSocket())
143+
if (!socket)
144+
return
145+
134146
socket.removeAllListeners()
135147
socket = net.Socket()
136148
socket.on('connect', () => socket.write(b().i32(16).i32(80877102).i32(pid).i32(secret).end(16)))
@@ -324,10 +336,19 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
324336
}
325337
}
326338

327-
function connect() {
339+
async function connect() {
328340
terminated = false
329341
backendParameters = {}
342+
socket || (socket = await createSocket())
343+
344+
if (!socket)
345+
return
346+
330347
connectTimer.start()
348+
349+
if (options.socket)
350+
return ssl ? secure() : connected()
351+
331352
socket.on('connect', ssl ? secure : connected)
332353

333354
if (options.path)
@@ -349,7 +370,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
349370
statementCount = 1
350371
lifeTimer.start()
351372
socket.on('data', data)
352-
socket.setKeepAlive(true, 1000 * keep_alive)
373+
socket.setKeepAlive && socket.setKeepAlive(true, 1000 * keep_alive)
353374
const s = StartupMessage()
354375
write(s)
355376
} catch (err) {
@@ -397,13 +418,15 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
397418
error(Errors.connection('CONNECTION_DESTROYED', options))
398419

399420
clearImmediate(nextWriteTimer)
400-
socket.removeListener('data', data)
401-
socket.removeListener('connect', connected)
402-
socket.readyState !== 'closed' && socket.end(b().X().end())
421+
if (socket) {
422+
socket.removeListener('data', data)
423+
socket.removeListener('connect', connected)
424+
socket.readyState !== 'closed' && socket.end(b().X().end())
425+
}
403426
ended && (ended(), ending = ended = null)
404427
}
405428

406-
function closed(hadError) {
429+
async function closed(hadError) {
407430
incoming = Buffer.alloc(0)
408431
remaining = 0
409432
incomings = null
@@ -416,7 +439,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
416439

417440
if (socket.encrypted) {
418441
socket.removeAllListeners()
419-
socket = createSocket()
442+
socket = null
420443
}
421444

422445
if (initial)

src/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ function parseOptions(a, b) {
397397
connection : Object.assign({ application_name: 'postgres.js' }, o.connection),
398398
target_session_attrs: tsa(o, url, env),
399399
debug : o.debug,
400+
socket : o.socket,
400401
fetch_types : 'fetch_types' in o ? o.fetch_types : true,
401402
parameters : {},
402403
shared : { retries: 0, typeArrayMap: {} },

tests/index.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1954,3 +1954,24 @@ t('Ensure reconnect after max_lifetime with transactions', { timeout: 5000 }, as
19541954

19551955
return [true, true]
19561956
})
1957+
1958+
t('Custom socket works', {}, async() => {
1959+
let result
1960+
const sql = postgres({
1961+
socket: () => new Promise((resolve, reject) => {
1962+
const socket = net.Socket()
1963+
socket.connect(5432)
1964+
socket.once('data', x => result = x[0])
1965+
socket.on('error', reject)
1966+
socket.on('connect', () => resolve(socket))
1967+
}),
1968+
idle_timeout
1969+
})
1970+
1971+
await sql`select 1`
1972+
1973+
return [
1974+
result,
1975+
82
1976+
]
1977+
})

0 commit comments

Comments
 (0)