Skip to content

Multihost support for High Availability setups #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ You can use either a `postgres://` url connection string or the options to defin

```js
const sql = postgres('postgres://username:password@host:port/database', {
host : '', // Postgres ip address or domain name
port : 5432, // Postgres server port
host : '', // Postgres ip address[s] or domain name[s]
port : 5432, // Postgres server port[s]
path : '', // unix socket path (usually '/tmp')
database : '', // Name of database to connect to
username : '', // Username of database user
Expand All @@ -68,10 +68,15 @@ const sql = postgres('postgres://username:password@host:port/database', {
connection : {
application_name : 'postgres.js', // Default application_name
... // Other connection parameters
}
},
target_session_attrs : null // Use 'read-write' with multiple hosts to
// ensure only connecting to primary
})
```

### SSL
More info for the `ssl` option can be found in the [Node.js docs for tls connect options](https://nodejs.org/dist/latest-v10.x/docs/api/tls.html#tls_new_tls_tlssocket_socket_options).

Although it is [vulnerable to MITM attacks](https://security.stackexchange.com/a/229297/174913), a common configuration for the `ssl` option for some cloud providers like Heroku is to set `rejectUnauthorized` to `false` (if `NODE_ENV` is `production`):

```js
Expand All @@ -83,23 +88,31 @@ const sql =
: postgres();
```

More info for the `ssl` option can be found in the [Node.js docs for tls connect options](https://nodejs.org/dist/latest-v10.x/docs/api/tls.html#tls_new_tls_tlssocket_socket_options).
### Multi host connections - High Availability (HA)

Connection uri strings with multiple hosts works like in [`psql multiple host uris`](https://www.postgresql.org/docs/13/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS)

Connecting to the specified hosts/ports will be tried in order, and on a successfull connection retries will be reset. This ensures that hosts can come up and down seamless to your application.

If you specify `target_session_attrs: 'read-write'` or `PGTARGETSESSIONATTRS=read-write` Postgres.js will only connect to a writeable host allowing for zero down time failovers.

### Environment Variables for Options

It is also possible to connect to the database without a connection string or options, which will read the options from the environment variables in the table below:
It is also possible to connect to the database without a connection string or any options. Postgres.js will fall back to the common environment variables used by `psql` as in the table below:

```js
const sql = postgres()
```

| Option | Environment Variables |
| ---------- | ------------------------ |
| `host` | `PGHOST` |
| `port` | `PGPORT` |
| `database` | `PGDATABASE` |
| `username` | `PGUSERNAME` or `PGUSER` |
| `password` | `PGPASSWORD` |
| Option | Environment Variables |
| ----------------- | ------------------------ |
| `host` | `PGHOST` |
| `port` | `PGPORT` |
| `database` | `PGDATABASE` |
| `username` | `PGUSERNAME` or `PGUSER` |
| `password` | `PGPASSWORD` |
| `idle_timeout` | `PGIDLE_TIMEOUT` |
' `connect_timeout` | `PGCONNECT_TIMEOUT` |

## Query ```sql` ` -> Promise```

Expand Down
81 changes: 65 additions & 16 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ function Connection(options = {}) {
let ended
let open = false
let ready = false
let write = false
let next = false
let statements = {}
let connect_timer

Expand All @@ -41,7 +43,8 @@ function Connection(options = {}) {
ready,
data,
error,
close
close,
cleanup
})

const backend = Backend({
Expand Down Expand Up @@ -102,7 +105,7 @@ function Connection(options = {}) {
}

function destroy() {
error(errors.connection('CONNECTION_DESTROYED', options))
error(errors.connection('CONNECTION_DESTROYED', options, socket))
socket.destroy()
}

Expand Down Expand Up @@ -150,7 +153,7 @@ function Connection(options = {}) {
}

function connectTimedOut() {
error(errors.connection('CONNECT_TIMEOUT', options))
error(errors.connection('CONNECT_TIMEOUT', options, socket))
socket.destroy()
}

Expand Down Expand Up @@ -210,6 +213,9 @@ function Connection(options = {}) {
idle()

if (!open) {
if (multi())
return

messages.forEach(socket.write)
messages = []
open = true
Expand All @@ -220,6 +226,25 @@ function Connection(options = {}) {
ready && ended && ended()
}

function multi() {
if (next)
return (next = false, true)

if (!write && options.target_session_attrs === 'read-write') {
backend.query = {
origin: '',
result: [],
statement: {},
resolve: ([{ transaction_read_only }]) => transaction_read_only === 'on'
? (next = true, socket.destroy())
: (write = true, socket.success()),
reject: error
}
socket.write(frontend.Query('show transaction_read_only'))
return true
}
}

function data(x) {
buffer = buffer.length === 0
? x
Expand All @@ -237,54 +262,74 @@ function Connection(options = {}) {

function close() {
clearTimeout(connect_timer)
error(errors.connection('CONNECTION_CLOSED', options))
statements = {}
error(errors.connection('CONNECTION_CLOSED', options, socket))
messages = []
open = ready = false
onclose && onclose()
}

function cleanup() {
statements = {}
open = ready = write = false
}

/* c8 ignore next */
return connection
}

function postgresSocket(options, {
error,
close,
cleanup,
data
}) {
let socket
let closed = true
let succeeded = false
let next = null
let buffer
let i = 0

function onclose(err) {
oncleanup()
!succeeded && i < options.host.length
? connect()
: err instanceof Error
? error(err)
: close()
i >= options.host.length && (i = 0)
}

function onclose() {
function oncleanup() {
socket.removeListener('data', data)
socket.removeListener('error', error)
socket.removeListener('close', onclose)
socket.removeListener('error', onclose)
socket.removeListener('connect', ready)
socket.removeListener('secureConnect', ready)
closed = true
close()
cleanup()
}

function connect() {
if (!closed)
return

closed = false
closed = succeeded = false

socket = options.path
? net.connect(options.path)
: net.connect(options.port, options.host)
: net.connect(
x.port = options.port[i],
x.host = options.host[i++]
)

if (!options.ssl)
return attach(socket)

socket.once('connect', () => socket.write(Buffer.from('0000000804d2162f', 'hex')))
socket.once('error', error)
socket.once('connect', () => socket.write(frontend.SSLRequest))
socket.once('error', onclose)
socket.once('close', onclose)
socket.once('data', x => {
socket.removeListener('error', error)
socket.removeListener('error', onclose)
socket.removeListener('close', onclose)
x.toString() === 'S'
? attach(tls.connect(Object.assign({ socket }, ssl(options.ssl))))
Expand All @@ -303,22 +348,26 @@ function postgresSocket(options, {
function attach(x) {
socket = x
socket.on('data', data)
socket.once('error', error)
socket.once('error', onclose)
socket.once('connect', ready)
socket.once('secureConnect', ready)
socket.once('close', onclose)
}

function ready() {
try {
socket.write(frontend.connect(options))
socket.write(frontend.StartupMessage(options))
} catch (e) {
error(e)
socket.end()
}
}

const x = {
success: () => {
succeeded = true
i >= options.host.length && (i = 0)
},
write: x => {
buffer = buffer ? Buffer.concat([buffer, x]) : Buffer.from(x)
if (buffer.length >= 1024)
Expand Down
8 changes: 4 additions & 4 deletions lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ module.exports.errors = {
notSupported
}

function connection(x, options) {
function connection(x, options, socket) {
const error = Object.assign(
new Error(('write ' + x + ' ' + (options.path || (options.host + ':' + options.port)))),
new Error(('write ' + x + ' ' + (options.path || (socket.host + ':' + socket.port)))),
{
code: x,
errno: x,
address: options.path || options.host
}, options.path ? {} : { port: options.port }
address: options.path || socket.host
}, options.path ? {} : { port: socket.port }
)
Error.captureStackTrace(error, connection)
return error
Expand Down
8 changes: 5 additions & 3 deletions lib/frontend.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const execute = Buffer.concat([
bytes.S().end()
])

const SSLRequest = bytes.i32(8).i32(80877103).end(8)

const authNames = {
2 : 'KerberosV5',
3 : 'CleartextPassword',
Expand All @@ -33,9 +35,9 @@ const auths = {
12: SASLFinal
}


module.exports = {
connect,
StartupMessage,
SSLRequest,
auth,
Bind,
Parse,
Expand All @@ -44,7 +46,7 @@ module.exports = {
Execute
}

function connect({ user, database, connection }) {
function StartupMessage({ user, database, connection }) {
return bytes
.inc(4)
.i16(3)
Expand Down
32 changes: 24 additions & 8 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ function Postgres(a, b) {
query.resolve = resolve
query.reject = reject
ended !== null
? reject(errors.connection('CONNECTION_ENDED', options))
? reject(errors.connection('CONNECTION_ENDED', options, options))
: ready
? send(connection, query, xs, args)
: fetchArrayTypes(connection).then(() => send(connection, query, xs, args)).catch(reject)
Expand Down Expand Up @@ -386,8 +386,8 @@ function Postgres(a, b) {
},
options
))
listener = { conn, result: {} };
all.push(conn);
listener = { conn, result: {} }
all.push(conn)
return listener
}

Expand Down Expand Up @@ -539,36 +539,52 @@ function Postgres(a, b) {

function parseOptions(a, b) {
const env = process.env // eslint-disable-line
, url = typeof a === 'string' ? Url.parse(a, true) : { query: {}, pathname: '' }
, o = (typeof a === 'string' ? b : a) || {}
, { url, multihost } = parseUrl(a, env)
, auth = (url.auth || '').split(':')
, host = o.hostname || o.host || url.hostname || env.PGHOST || 'localhost'
, host = o.hostname || o.host || multihost || url.hostname || env.PGHOST || 'localhost'
, port = o.port || url.port || env.PGPORT || 5432
, user = o.user || o.username || auth[0] || env.PGUSERNAME || env.PGUSER || osUsername()

return Object.assign({
host,
port,
host : host.split(',').map(x => x.split(':')[0]),
port : host.split(',').map(x => x.split(':')[1] || port),
path : o.path || host.indexOf('/') > -1 && host + '/.s.PGSQL.' + port,
database : o.database || o.db || (url.pathname || '').slice(1) || env.PGDATABASE || user,
user : user,
pass : o.pass || o.password || auth[1] || env.PGPASSWORD || '',
max : o.max || url.query.max || 10,
types : o.types || {},
ssl : o.ssl || url.sslmode || url.ssl || false,
ssl : o.ssl || url.query.sslmode || url.query.ssl || false,
idle_timeout : o.idle_timeout || url.query.idle_timeout || env.PGIDLE_TIMEOUT || warn(o.timeout),
connect_timeout : o.connect_timeout || url.query.connect_timeout || env.PGCONNECT_TIMEOUT || 30,
no_prepare : o.no_prepare,
onnotice : o.onnotice,
onparameter : o.onparameter,
transform : Object.assign({}, o.transform),
connection : Object.assign({ application_name: 'postgres.js' }, o.connection),
target_session_attrs: o.target_session_attrs || url.query.target_session_attrs || env.PGTARGETSESSIONATTRS,
debug : o.debug
},
mergeUserTypes(o.types)
)
}

function parseUrl(url) {
if (typeof url !== 'string')
return { url: { query: {} } }

let host = url
host = host.slice(host.indexOf('://') + 3)
host = host.split(/[?/]/)[0]
host = host.slice(host.indexOf('@') + 1)

return {
url: Url.parse(url.replace(host, host.split(',')[0]), true),
multihost: host.indexOf(',') > -1 && host
}
}

function warn(x) {
typeof x !== 'undefined' && console.log('The timeout option is deprecated, use idle_timeout instead') // eslint-disable-line
return x
Expand Down
Loading