diff --git a/README.md b/README.md index 550aacb7..940e3883 100644 --- a/README.md +++ b/README.md @@ -45,8 +45,8 @@ You can use either a `postgres://` url connection string or the options to defin ```js const sql = postgres('postgres://username:password@host:port/database', { - host : '', // Postgres ip address or domain name - port : 5432, // Postgres server port + host : '', // Postgres ip address[s] or domain name[s] + port : 5432, // Postgres server port[s] path : '', // unix socket path (usually '/tmp') database : '', // Name of database to connect to username : '', // Username of database user @@ -68,10 +68,15 @@ const sql = postgres('postgres://username:password@host:port/database', { connection : { application_name : 'postgres.js', // Default application_name ... // Other connection parameters - } + }, + target_session_attrs : null // Use 'read-write' with multiple hosts to + // ensure only connecting to primary }) ``` +### SSL +More info for the `ssl` option can be found in the [Node.js docs for tls connect options](https://nodejs.org/dist/latest-v10.x/docs/api/tls.html#tls_new_tls_tlssocket_socket_options). + Although it is [vulnerable to MITM attacks](https://security.stackexchange.com/a/229297/174913), a common configuration for the `ssl` option for some cloud providers like Heroku is to set `rejectUnauthorized` to `false` (if `NODE_ENV` is `production`): ```js @@ -83,23 +88,31 @@ const sql = : postgres(); ``` -More info for the `ssl` option can be found in the [Node.js docs for tls connect options](https://nodejs.org/dist/latest-v10.x/docs/api/tls.html#tls_new_tls_tlssocket_socket_options). +### Multi host connections - High Availability (HA) + +Connection uri strings with multiple hosts works like in [`psql multiple host uris`](https://www.postgresql.org/docs/13/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) + +Connecting to the specified hosts/ports will be tried in order, and on a successfull connection retries will be reset. This ensures that hosts can come up and down seamless to your application. + +If you specify `target_session_attrs: 'read-write'` or `PGTARGETSESSIONATTRS=read-write` Postgres.js will only connect to a writeable host allowing for zero down time failovers. ### Environment Variables for Options -It is also possible to connect to the database without a connection string or options, which will read the options from the environment variables in the table below: +It is also possible to connect to the database without a connection string or any options. Postgres.js will fall back to the common environment variables used by `psql` as in the table below: ```js const sql = postgres() ``` -| Option | Environment Variables | -| ---------- | ------------------------ | -| `host` | `PGHOST` | -| `port` | `PGPORT` | -| `database` | `PGDATABASE` | -| `username` | `PGUSERNAME` or `PGUSER` | -| `password` | `PGPASSWORD` | +| Option | Environment Variables | +| ----------------- | ------------------------ | +| `host` | `PGHOST` | +| `port` | `PGPORT` | +| `database` | `PGDATABASE` | +| `username` | `PGUSERNAME` or `PGUSER` | +| `password` | `PGPASSWORD` | +| `idle_timeout` | `PGIDLE_TIMEOUT` | +' `connect_timeout` | `PGCONNECT_TIMEOUT` | ## Query ```sql` ` -> Promise``` diff --git a/lib/connection.js b/lib/connection.js index 22edf483..6b8ea28a 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -29,6 +29,8 @@ function Connection(options = {}) { let ended let open = false let ready = false + let write = false + let next = false let statements = {} let connect_timer @@ -41,7 +43,8 @@ function Connection(options = {}) { ready, data, error, - close + close, + cleanup }) const backend = Backend({ @@ -102,7 +105,7 @@ function Connection(options = {}) { } function destroy() { - error(errors.connection('CONNECTION_DESTROYED', options)) + error(errors.connection('CONNECTION_DESTROYED', options, socket)) socket.destroy() } @@ -150,7 +153,7 @@ function Connection(options = {}) { } function connectTimedOut() { - error(errors.connection('CONNECT_TIMEOUT', options)) + error(errors.connection('CONNECT_TIMEOUT', options, socket)) socket.destroy() } @@ -210,6 +213,9 @@ function Connection(options = {}) { idle() if (!open) { + if (multi()) + return + messages.forEach(socket.write) messages = [] open = true @@ -220,6 +226,25 @@ function Connection(options = {}) { ready && ended && ended() } + function multi() { + if (next) + return (next = false, true) + + if (!write && options.target_session_attrs === 'read-write') { + backend.query = { + origin: '', + result: [], + statement: {}, + resolve: ([{ transaction_read_only }]) => transaction_read_only === 'on' + ? (next = true, socket.destroy()) + : (write = true, socket.success()), + reject: error + } + socket.write(frontend.Query('show transaction_read_only')) + return true + } + } + function data(x) { buffer = buffer.length === 0 ? x @@ -237,13 +262,16 @@ function Connection(options = {}) { function close() { clearTimeout(connect_timer) - error(errors.connection('CONNECTION_CLOSED', options)) - statements = {} + error(errors.connection('CONNECTION_CLOSED', options, socket)) messages = [] - open = ready = false onclose && onclose() } + function cleanup() { + statements = {} + open = ready = write = false + } + /* c8 ignore next */ return connection } @@ -251,40 +279,57 @@ function Connection(options = {}) { function postgresSocket(options, { error, close, + cleanup, data }) { let socket let closed = true + let succeeded = false let next = null let buffer + let i = 0 + + function onclose(err) { + oncleanup() + !succeeded && i < options.host.length + ? connect() + : err instanceof Error + ? error(err) + : close() + i >= options.host.length && (i = 0) + } - function onclose() { + function oncleanup() { socket.removeListener('data', data) - socket.removeListener('error', error) + socket.removeListener('close', onclose) + socket.removeListener('error', onclose) socket.removeListener('connect', ready) socket.removeListener('secureConnect', ready) closed = true - close() + cleanup() } function connect() { if (!closed) return - closed = false + closed = succeeded = false socket = options.path ? net.connect(options.path) - : net.connect(options.port, options.host) + : net.connect( + x.port = options.port[i], + x.host = options.host[i++] + ) if (!options.ssl) return attach(socket) - socket.once('connect', () => socket.write(Buffer.from('0000000804d2162f', 'hex'))) - socket.once('error', error) + socket.once('connect', () => socket.write(frontend.SSLRequest)) + socket.once('error', onclose) socket.once('close', onclose) socket.once('data', x => { - socket.removeListener('error', error) + socket.removeListener('error', onclose) socket.removeListener('close', onclose) x.toString() === 'S' ? attach(tls.connect(Object.assign({ socket }, ssl(options.ssl)))) @@ -303,7 +348,7 @@ function postgresSocket(options, { function attach(x) { socket = x socket.on('data', data) - socket.once('error', error) + socket.once('error', onclose) socket.once('connect', ready) socket.once('secureConnect', ready) socket.once('close', onclose) @@ -311,7 +356,7 @@ function postgresSocket(options, { function ready() { try { - socket.write(frontend.connect(options)) + socket.write(frontend.StartupMessage(options)) } catch (e) { error(e) socket.end() @@ -319,6 +364,10 @@ function postgresSocket(options, { } const x = { + success: () => { + succeeded = true + i >= options.host.length && (i = 0) + }, write: x => { buffer = buffer ? Buffer.concat([buffer, x]) : Buffer.from(x) if (buffer.length >= 1024) diff --git a/lib/errors.js b/lib/errors.js index 58b65be4..16732d44 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -15,14 +15,14 @@ module.exports.errors = { notSupported } -function connection(x, options) { +function connection(x, options, socket) { const error = Object.assign( - new Error(('write ' + x + ' ' + (options.path || (options.host + ':' + options.port)))), + new Error(('write ' + x + ' ' + (options.path || (socket.host + ':' + socket.port)))), { code: x, errno: x, - address: options.path || options.host - }, options.path ? {} : { port: options.port } + address: options.path || socket.host + }, options.path ? {} : { port: socket.port } ) Error.captureStackTrace(error, connection) return error diff --git a/lib/frontend.js b/lib/frontend.js index fe8bd79d..e3cd4b1d 100644 --- a/lib/frontend.js +++ b/lib/frontend.js @@ -12,6 +12,8 @@ const execute = Buffer.concat([ bytes.S().end() ]) +const SSLRequest = bytes.i32(8).i32(80877103).end(8) + const authNames = { 2 : 'KerberosV5', 3 : 'CleartextPassword', @@ -33,9 +35,9 @@ const auths = { 12: SASLFinal } - module.exports = { - connect, + StartupMessage, + SSLRequest, auth, Bind, Parse, @@ -44,7 +46,7 @@ module.exports = { Execute } -function connect({ user, database, connection }) { +function StartupMessage({ user, database, connection }) { return bytes .inc(4) .i16(3) diff --git a/lib/index.js b/lib/index.js index 995d60fa..e420e8e4 100644 --- a/lib/index.js +++ b/lib/index.js @@ -175,7 +175,7 @@ function Postgres(a, b) { query.resolve = resolve query.reject = reject ended !== null - ? reject(errors.connection('CONNECTION_ENDED', options)) + ? reject(errors.connection('CONNECTION_ENDED', options, options)) : ready ? send(connection, query, xs, args) : fetchArrayTypes(connection).then(() => send(connection, query, xs, args)).catch(reject) @@ -386,8 +386,8 @@ function Postgres(a, b) { }, options )) - listener = { conn, result: {} }; - all.push(conn); + listener = { conn, result: {} } + all.push(conn) return listener } @@ -539,23 +539,23 @@ function Postgres(a, b) { function parseOptions(a, b) { const env = process.env // eslint-disable-line - , url = typeof a === 'string' ? Url.parse(a, true) : { query: {}, pathname: '' } , o = (typeof a === 'string' ? b : a) || {} + , { url, multihost } = parseUrl(a, env) , auth = (url.auth || '').split(':') - , host = o.hostname || o.host || url.hostname || env.PGHOST || 'localhost' + , host = o.hostname || o.host || multihost || url.hostname || env.PGHOST || 'localhost' , port = o.port || url.port || env.PGPORT || 5432 , user = o.user || o.username || auth[0] || env.PGUSERNAME || env.PGUSER || osUsername() return Object.assign({ - host, - port, + host : host.split(',').map(x => x.split(':')[0]), + port : host.split(',').map(x => x.split(':')[1] || port), path : o.path || host.indexOf('/') > -1 && host + '/.s.PGSQL.' + port, database : o.database || o.db || (url.pathname || '').slice(1) || env.PGDATABASE || user, user : user, pass : o.pass || o.password || auth[1] || env.PGPASSWORD || '', max : o.max || url.query.max || 10, types : o.types || {}, - ssl : o.ssl || url.sslmode || url.ssl || false, + ssl : o.ssl || url.query.sslmode || url.query.ssl || false, idle_timeout : o.idle_timeout || url.query.idle_timeout || env.PGIDLE_TIMEOUT || warn(o.timeout), connect_timeout : o.connect_timeout || url.query.connect_timeout || env.PGCONNECT_TIMEOUT || 30, no_prepare : o.no_prepare, @@ -563,12 +563,28 @@ function parseOptions(a, b) { onparameter : o.onparameter, transform : Object.assign({}, o.transform), connection : Object.assign({ application_name: 'postgres.js' }, o.connection), + target_session_attrs: o.target_session_attrs || url.query.target_session_attrs || env.PGTARGETSESSIONATTRS, debug : o.debug }, mergeUserTypes(o.types) ) } +function parseUrl(url) { + if (typeof url !== 'string') + return { url: { query: {} } } + + let host = url + host = host.slice(host.indexOf('://') + 3) + host = host.split(/[?/]/)[0] + host = host.slice(host.indexOf('@') + 1) + + return { + url: Url.parse(url.replace(host, host.split(',')[0]), true), + multihost: host.indexOf(',') > -1 && host + } +} + function warn(x) { typeof x !== 'undefined' && console.log('The timeout option is deprecated, use idle_timeout instead') // eslint-disable-line return x diff --git a/tests/index.js b/tests/index.js index a3d77b47..9c24ec3a 100644 --- a/tests/index.js +++ b/tests/index.js @@ -1190,3 +1190,24 @@ t('Catches query format errors', async() => [ 'wat', await sql.unsafe({ toString: () => { throw new Error('wat') } }).catch((e) => e.message) ]) + +t('Multiple hosts', { + timeout: 10000 +}, async() => { + const sql = postgres('postgres://localhost:5432,localhost:5433') + , result = [] + + const a = (await sql`show data_directory`)[0].data_directory + result.push((await sql`select setting as x from pg_settings where name = 'port'`)[0].x) + cp.execSync('pg_ctl stop -D "' + a + '"') + + const b = (await sql`show data_directory`)[0].data_directory + result.push((await sql`select setting as x from pg_settings where name = 'port'`)[0].x) + cp.execSync('pg_ctl start -D "' + a + '" -w -l "' + a + '/postgresql.log"') + cp.execSync('pg_ctl stop -D "' + b + '"') + + result.push((await sql`select setting as x from pg_settings where name = 'port'`)[0].x) + cp.execSync('pg_ctl start -o "-p 5433" -D "' + b + '" -w -l "' + b + '/postgresql.log"') + + return ['5432,5433,5432', result.join(',')] +}) diff --git a/types/index.d.ts b/types/index.d.ts index 8232dd9c..8e3e5d15 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -17,15 +17,15 @@ declare function postgres(url: string, options?: */ interface BaseOptions { /** Postgres ip address or domain name */ - host: string; + host: string | string[]; /** Postgres server port */ - port: number; + port: number | number[]; /** Name of database to connect to */ database: string; /** Username of database user */ - username: string; + user: string; /** True; or options for tls.connect */ - ssl: boolean | object; + ssl: 'require' | 'prefer' | boolean | object; /** Max number of connections */ max: number; /** Idle connection timeout in seconds */ @@ -34,6 +34,8 @@ interface BaseOptions { connect_timeout: number; /** Array of custom types; see more below */ types: PostgresTypeList; + /** Disable prepared mode */ + no_prepare: boolean; /** Defaults to console.log */ onnotice: (notice: postgres.Notice) => void; /** (key; value) when server param change */ @@ -117,7 +119,7 @@ declare namespace postgres { */ function toKebab(str: string): string; - const BigInt: PostgresType<(number: BigInt) => string>; + const BigInt: PostgresType<(number: bigint) => string>; interface ConnectionParameters { /** Default application_name */ @@ -127,6 +129,10 @@ declare namespace postgres { } interface Options extends Partial> { + /** @inheritdoc */ + host?: string; + /** @inheritdoc */ + port?: number; /** unix socket path (usually '/tmp') */ path?: string | (() => string); /** Password of database user (an alias for `password`) */ @@ -136,12 +142,16 @@ declare namespace postgres { /** Name of database to connect to (an alias for `database`) */ db?: Options['database']; /** Username of database user (an alias for `username`) */ - user?: Options['username']; + username?: Options['user']; /** Postgres ip address or domain name (an alias for `host`) */ hostname?: Options['host']; } interface ParsedOptions extends BaseOptions { + /** @inheritdoc */ + host: string[]; + /** @inheritdoc */ + port: number[]; /** @inheritdoc */ pass: null; serializers: { [oid: number]: T[keyof T] }; @@ -233,7 +243,7 @@ declare namespace postgres { | number | string | Date - | Buffer; + | Uint8Array; type SerializableParameter = Serializable | Helper @@ -243,6 +253,14 @@ declare namespace postgres { type HelperSerializable = { [index: string]: SerializableParameter } | { [index: string]: SerializableParameter }[]; + type SerializableKeys = (keyof T) extends infer R + ? R extends keyof T + ? T[R] extends SerializableParameter + ? R + : never + : keyof T + : keyof T; + interface Row { [column: string]: any; } @@ -257,7 +275,7 @@ declare namespace postgres { ? { '?column?': T; } : T; - type AsRowList = { [k in keyof T]: TransformRow }; + type AsRowList = { [k in keyof T]: TransformRow }; interface Column { name: T; @@ -283,14 +301,14 @@ declare namespace postgres { columns: ColumnList; } - type ExecutionResult = [] & ResultQueryMeta; + type ExecutionResult = [] & ResultQueryMeta>; type RowList = T & Iterable> & ResultQueryMeta; interface PendingQuery extends Promise> { - stream(cb: (row: NonNullable, result: ExecutionResult>) => void): Promise>>; - cursor(cb: (row: NonNullable) => void): Promise>>; - cursor(size: 1, cb: (row: NonNullable) => void): Promise>>; - cursor(size: number, cb: (rows: NonNullable[]) => void): Promise>>; + stream(cb: (row: NonNullable, result: ExecutionResult) => void): Promise>; + cursor(cb: (row: NonNullable) => void): Promise>; + cursor(size: 1, cb: (row: NonNullable) => void): Promise>; + cursor(size: number, cb: (rows: NonNullable[]) => void): Promise>; } interface PendingRequest extends Promise<[] & ResultMeta> { } @@ -329,7 +347,7 @@ declare namespace postgres { * @param keys Keys to extract from the object or from objets inside the array * @returns A formated representation of the parameter */ - (objOrArray: T, ...keys: U): Helper; + >(objOrArray: T, ...keys: U[]): Helper; END: {}; // FIXME unique symbol ? PostgresError: typeof PostgresError;