diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 92ec7033..970d2771 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,8 +8,8 @@ jobs: strategy: fail-fast: false matrix: - node: ['12', '14', '16', '18', '20'] - postgres: ['12', '13', '14', '15'] + node: ['12', '14', '16', '18', '20', '21', '22', '23', '24'] + postgres: ['12', '13', '14', '15', '16', '17'] runs-on: ubuntu-latest services: postgres: @@ -25,10 +25,10 @@ jobs: --health-timeout 5s --health-retries 5 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - run: | date - sudo apt purge postgresql-14 + sudo apt purge postgresql-16 sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add - sudo apt-get update @@ -48,7 +48,7 @@ jobs: - uses: denoland/setup-deno@v1 with: deno-version: v1.x - - uses: actions/setup-node@v3 + - uses: actions/setup-node@v4 with: node-version: ${{ matrix.node }} - run: npm test diff --git a/README.md b/README.md index b0e64a75..b04ac21c 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,14 @@ async function insertUser({ name, age }) { } ``` +#### ESM dynamic imports + +The library can be used with ESM dynamic imports as well as shown here. + +```js +const { default: postgres } = await import('postgres') +``` + ## Table of Contents * [Connection](#connection) @@ -79,6 +87,7 @@ async function insertUser({ name, age }) { * [Teardown / Cleanup](#teardown--cleanup) * [Error handling](#error-handling) * [TypeScript support](#typescript-support) +* [Reserving connections](#reserving-connections) * [Changelog](./CHANGELOG.md) @@ -157,7 +166,7 @@ const users = await sql` ```js const columns = ['name', 'age'] -sql` +await sql` select ${ sql(columns) } from users @@ -175,7 +184,7 @@ const user = { age: 68 } -sql` +await sql` insert into users ${ sql(user, 'name', 'age') } @@ -183,6 +192,15 @@ sql` // Which results in: insert into users ("name", "age") values ($1, $2) + +// The columns can also be given with an array +const columns = ['name', 'age'] + +await sql` + insert into users ${ + sql(user, columns) + } +` ``` **You can omit column names and simply execute `sql(user)` to get all the fields from the object as columns**. Be careful not to allow users to supply columns that you do not want to be inserted. @@ -201,13 +219,13 @@ const users = [{ age: 80 }] -sql`insert into users ${ sql(users, 'name', 'age') }` +await sql`insert into users ${ sql(users, 'name', 'age') }` // Is translated to: insert into users ("name", "age") values ($1, $2), ($3, $4) // Here you can also omit column names which will use object keys as columns -sql`insert into users ${ sql(users) }` +await sql`insert into users ${ sql(users) }` // Which results in: insert into users ("name", "age") values ($1, $2), ($3, $4) @@ -222,7 +240,7 @@ const user = { age: 68 } -sql` +await sql` update users set ${ sql(user, 'name', 'age') } @@ -231,18 +249,28 @@ sql` // Which results in: update users set "name" = $1, "age" = $2 where user_id = $3 + +// The columns can also be given with an array +const columns = ['name', 'age'] + +await sql` + update users set ${ + sql(user, columns) + } + where user_id = ${ user.id } +` ``` ### Multiple updates in one query -It's possible to create multiple udpates in a single query. 
It's necessary to use arrays intead of objects to ensure the order of the items so that these correspond with the column names. +To create multiple updates in a single query, it is necessary to use arrays instead of objects to ensure that the order of the items correspond with the column names. ```js const users = [ [1, 'John', 34], [2, 'Jane', 27], ] -sql` - update users set name = update_data.name, (age = update_data.age)::int +await sql` + update users set name = update_data.name, age = (update_data.age)::int from (values ${sql(users)}) as update_data (id, name, age) where users.id = (update_data.id)::int returning users.id, users.name, users.age @@ -262,7 +290,7 @@ const users = await sql` or ```js -const [{ a, b, c }] => await sql` +const [{ a, b, c }] = await sql` select * from (values ${ sql(['a', 'b', 'c']) }) as x(a, b, c) @@ -280,7 +308,7 @@ const olderThan = x => sql`and age > ${ x }` const filterAge = true -sql` +await sql` select * from users @@ -298,7 +326,7 @@ select * from users where name is not null and age > 50 ### Dynamic filters ```js -sql` +await sql` select * from users ${ @@ -314,12 +342,33 @@ select * from users select * from users where user_id = $1 ``` +### Dynamic ordering + +```js +const id = 1 +const order = { + username: 'asc' + created_at: 'desc' +} +await sql` + select + * + from ticket + where account = ${ id } + order by ${ + Object.entries(order).flatMap(([column, order], i) => + [i ? sql`,` : sql``, sql`${ sql(column) } ${ order === 'desc' ? sql`desc` : sql`asc` }`] + ) + } +` +``` + ### SQL functions Using keywords or calling functions dynamically is also possible by using ``` sql`` ``` fragments. ```js const date = null -sql` +await sql` update users set updated_at = ${ date || sql`now()` } ` @@ -333,7 +382,7 @@ Dynamic identifiers like table names and column names is also supported like so: const table = 'users' , column = 'id' -sql` +await sql` select ${ sql(column) } from ${ sql(table) } ` @@ -347,10 +396,10 @@ Here's a quick oversight over all the ways to do interpolation in a query templa | Interpolation syntax | Usage | Example | | ------------- | ------------- | ------------- | -| `${ sql`` }` | for keywords or sql fragments | ``sql`SELECT * FROM users ${sql`order by age desc` }` `` | -| `${ sql(string) }` | for identifiers | ``sql`SELECT * FROM ${sql('table_name')` `` | -| `${ sql([] or {}, ...) }` | for helpers | ``sql`INSERT INTO users ${sql({ name: 'Peter'})}` `` | -| `${ 'somevalue' }` | for values | ``sql`SELECT * FROM users WHERE age = ${42}` `` | +| `${ sql`` }` | for keywords or sql fragments | ``await sql`SELECT * FROM users ${sql`order by age desc` }` `` | +| `${ sql(string) }` | for identifiers | ``await sql`SELECT * FROM ${sql('table_name')` `` | +| `${ sql([] or {}, ...) }` | for helpers | ``await sql`INSERT INTO users ${sql({ name: 'Peter'})}` `` | +| `${ 'somevalue' }` | for values | ``await sql`SELECT * FROM users WHERE age = ${42}` `` | ## Advanced query methods @@ -430,7 +479,7 @@ await sql` Rather than executing a given query, `.describe` will return information utilized in the query process. This information can include the query identifier, column types, etc. This is useful for debugging and analyzing your Postgres queries. 
Furthermore, **`.describe` will give you access to the final generated query string that would be executed.** - + ### Rows as Array of Values #### ```sql``.values()``` @@ -457,7 +506,7 @@ const result = await sql.file('query.sql', ['Murray', 68]) ### Multiple statements in one query #### ```await sql``.simple()``` -The postgres wire protocol supports ["simple"](https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.6.7.4) and ["extended"](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) queries. "simple" queries supports multiple statements, but does not support any dynamic parameters. "extended" queries support parameters but only one statement. To use "simple" queries you can use +The postgres wire protocol supports ["simple"](https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.6.7.4) and ["extended"](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) queries. "simple" queries supports multiple statements, but does not support any dynamic parameters. "extended" queries support parameters but only one statement. To use "simple" queries you can use ```sql``.simple()```. That will create it as a simple query. ```js @@ -499,8 +548,8 @@ await pipeline(readableStream, createWriteStream('output.tsv')) ```js const readableStream = await sql` copy ( - select name, age - from users + select name, age + from users where age = 68 ) to stdout `.readable() @@ -509,7 +558,7 @@ for await (const chunk of readableStream) { } ``` -> **NOTE** This is a low-level API which does not provide any type safety. To make this work, you must match your [`copy query` parameters](https://www.postgresql.org/docs/14/sql-copy.html) correctly to your [Node.js stream read or write](https://nodejs.org/api/stream.html) code. Ensure [Node.js stream backpressure](https://nodejs.org/en/docs/guides/backpressuring-in-streams/) is handled correctly to avoid memory exhaustion. +> **NOTE** This is a low-level API which does not provide any type safety. To make this work, you must match your [`copy query` parameters](https://www.postgresql.org/docs/14/sql-copy.html) correctly to your [Node.js stream read or write](https://nodejs.org/api/stream.html) code. Ensure [Node.js stream backpressure](https://nodejs.org/en/learn/modules/backpressuring-in-streams) is handled correctly to avoid memory exhaustion. ### Canceling Queries in Progress @@ -539,7 +588,9 @@ If you know what you're doing, you can use `unsafe` to pass any string you'd lik ```js sql.unsafe('select ' + danger + ' from users where id = ' + dragons) ``` - + +By default, `sql.unsafe` assumes the `query` string is sufficiently dynamic that prepared statements do not make sense, and so defaults them to off. If you'd like to re-enable prepared statements, you can pass `{ prepare: true }`. + You can also nest `sql.unsafe` within a safe `sql` expression. This is useful if only part of your fraction has unsafe elements. ```js @@ -579,6 +630,7 @@ const [user, account] = await sql.begin(async sql => { ) values ( 'Murray' ) + returning * ` const [account] = await sql` @@ -587,12 +639,15 @@ const [user, account] = await sql.begin(async sql => { ) values ( ${ user.user_id } ) + returning * ` return [user, account] }) ``` +Do note that you can often achieve the same result using [`WITH` queries (Common Table Expressions)](https://www.postgresql.org/docs/current/queries-with.html) instead of using transactions. 
+ It's also possible to pipeline the requests in a transaction if needed by returning an array with queries from the callback function like this: ```js @@ -638,9 +693,9 @@ sql.begin('read write', async sql => { ``` -#### PREPARE `await sql.prepare([name]) -> fn()` +#### PREPARE TRANSACTION `await sql.prepare([name]) -> fn()` -Indicates that the transactions should be prepared using the `PREPARED TRANASCTION [NAME]` statement +Indicates that the transactions should be prepared using the [`PREPARE TRANSACTION [NAME]`](https://www.postgresql.org/docs/current/sql-prepare-transaction.html) statement instead of being committed. ```js @@ -652,13 +707,11 @@ sql.begin('read write', async sql => { 'Murray' ) ` - + await sql.prepare('tx1') }) ``` -Do note that you can often achieve the same result using [`WITH` queries (Common Table Expressions)](https://www.postgresql.org/docs/current/queries-with.html) instead of using transactions. - ## Data Transformation Postgres.js allows for transformation of the data passed to or returned from a query by using the `transform` option. @@ -714,7 +767,7 @@ console.log(data) // [ { a_test: 1 } ] ### Transform `undefined` Values -By default, Postgres.js will throw the error `UNDEFINED_VALUE: Undefined values are not allowed` when undefined values are passed +By default, Postgres.js will throw the error `UNDEFINED_VALUE: Undefined values are not allowed` when undefined values are passed ```js // Transform the column names to and from camel case @@ -795,7 +848,7 @@ The optional `onlisten` method is great to use for a very simply queue mechanism ```js await sql.listen( - 'jobs', + 'jobs', (x) => run(JSON.parse(x)), ( ) => sql`select unfinished_jobs()`.forEach(run) ) @@ -828,7 +881,7 @@ CREATE PUBLICATION alltables FOR ALL TABLES const sql = postgres({ publications: 'alltables' }) const { unsubscribe } = await sql.subscribe( - 'insert:events', + 'insert:events', (row, { command, relation, key, old }) => { // Callback function for each row change // tell about new event row over eg. websockets or do something else @@ -887,7 +940,7 @@ The `Result` Array returned from queries is a custom array allowing for easy des ### .count -The `count` property is the number of affected rows returned by the database. This is usefull for insert, update and delete operations to know the number of rows since .length will be 0 in these cases if not using `RETURNING ...`. +The `count` property is the number of affected rows returned by the database. This is useful for insert, update and delete operations to know the number of rows since .length will be 0 in these cases if not using `RETURNING ...`. ### .command @@ -941,7 +994,7 @@ const sql = postgres('postgres://username:password@host:port/database', { connect_timeout : 30, // Connect timeout in seconds prepare : true, // Automatic creation of prepared statements types : [], // Array of custom types, see more below - onnotice : fn, // Defaults to console.log + onnotice : fn, // Default console.log, set false to silence NOTICE onparameter : fn, // (key, value) when server param change debug : fn, // Is called with (connection, query, params, types) socket : fn, // fn returning custom socket to use @@ -953,7 +1006,7 @@ const sql = postgres('postgres://username:password@host:port/database', { }, connection : { application_name : 'postgres.js', // Default application_name - ... // Other connection parameters + ... 
// Other connection parameters, see https://www.postgresql.org/docs/current/runtime-config-client.html }, target_session_attrs : null, // Use 'read-write' with multiple hosts to // ensure only connecting to primary @@ -962,7 +1015,20 @@ const sql = postgres('postgres://username:password@host:port/database', { }) ``` -Note that `max_lifetime = 60 * (30 + Math.random() * 30)` by default. This resolves to an interval between 45 and 90 minutes to optimize for the benefits of prepared statements **and** working nicely with Linux's OOM killer. +Note that `max_lifetime = 60 * (30 + Math.random() * 30)` by default. This resolves to an interval between 30 and 60 minutes to optimize for the benefits of prepared statements **and** working nicely with Linux's OOM killer. + +### Dynamic passwords + +When clients need to use alternative authentication schemes such as access tokens or connections to databases with rotating passwords, provide either a synchronous or asynchronous function that will resolve the dynamic password value at connection time. + +```js +const sql = postgres(url, { + // Other connection config + ... + // Password function for the database user + password : async () => await signer.getAuthToken(), +}) +``` ### SSL @@ -1038,6 +1104,34 @@ const sql = postgres({ }) ``` +### Cloudflare Workers support + +Postgres.js has built-in support for the [TCP socket API](https://developers.cloudflare.com/workers/runtime-apis/tcp-sockets/) in Cloudflare Workers, which is [on-track](https://github.com/wintercg/proposal-sockets-api) to be standardized and adopted in Node.js and other JavaScript runtimes, such as Deno. + +You can use Postgres.js directly in a Worker, or to benefit from connection pooling and query caching, via the [Hyperdrive](https://developers.cloudflare.com/hyperdrive/learning/connect-to-postgres/#driver-examples) service available to Workers by passing the Hyperdrive `connectionString` when creating a new `postgres` client as follows: + +```ts +// Requires Postgres.js 3.4.0 or later +import postgres from 'postgres' + +interface Env { + HYPERDRIVE: Hyperdrive; +} + +export default async fetch(req: Request, env: Env, ctx: ExecutionContext) { + // The Postgres.js library accepts a connection string directly + const sql = postgres(env.HYPERDRIVE.connectionString) + const results = await sql`SELECT * FROM users LIMIT 10` + return Response.json(results) +} +``` + +In `wrangler.toml` you will need to enable the `nodejs_compat` compatibility flag to allow Postgres.js to operate in the Workers environment: + +```toml +compatibility_flags = ["nodejs_compat"] +``` + ### Auto fetching of array types Postgres.js will automatically fetch table/array-type information when it first connects to a database. 
@@ -1054,20 +1148,25 @@ It is also possible to connect to the database without a connection string or an const sql = postgres() ``` -| Option | Environment Variables | -| ----------------- | ------------------------ | -| `host` | `PGHOST` | -| `port` | `PGPORT` | -| `database` | `PGDATABASE` | -| `username` | `PGUSERNAME` or `PGUSER` | -| `password` | `PGPASSWORD` | -| `idle_timeout` | `PGIDLE_TIMEOUT` | -| `connect_timeout` | `PGCONNECT_TIMEOUT` | +| Option | Environment Variables | +| ------------------ | ------------------------ | +| `host` | `PGHOST` | +| `port` | `PGPORT` | +| `database` | `PGDATABASE` | +| `username` | `PGUSERNAME` or `PGUSER` | +| `password` | `PGPASSWORD` | +| `application_name` | `PGAPPNAME` | +| `idle_timeout` | `PGIDLE_TIMEOUT` | +| `connect_timeout` | `PGCONNECT_TIMEOUT` | ### Prepared statements Prepared statements will automatically be created for any queries where it can be inferred that the query is static. This can be disabled by using the `prepare: false` option. For instance — this is useful when [using PGBouncer in `transaction mode`](https://github.com/porsager/postgres/issues/93#issuecomment-656290493). +**update**: [since 1.21.0](https://www.pgbouncer.org/2023/10/pgbouncer-1-21-0) +PGBouncer supports protocol-level named prepared statements when [configured +properly](https://www.pgbouncer.org/config.html#max_prepared_statements) + ## Custom Types You can add ergonomic support for custom types, or simply use `sql.typed(value, type)` inline, where type is the PostgreSQL `oid` for the type and the correctly serialized string. _(`oid` values for types can be found in the `pg_catalog.pg_type` table.)_ @@ -1094,7 +1193,7 @@ const sql = postgres({ }) // Now you can use sql.typed.rect() as specified above -const [custom] = sql` +const [custom] = await sql` insert into rectangles ( name, rect @@ -1124,8 +1223,8 @@ const sql = postgres({ const ssh = new ssh2.Client() ssh .on('error', reject) - .on('ready', () => - ssh.forwardOut('127.0.0.1', 12345, host, port, + .on('ready', () => + ssh.forwardOut('127.0.0.1', 12345, host, port, (err, socket) => err ? reject(err) : resolve(socket) ) ) @@ -1151,6 +1250,22 @@ prexit(async () => { }) ``` +## Reserving connections + +### `await sql.reserve()` + +The `reserve` method pulls out a connection from the pool, and returns a client that wraps the single connection. This can be used for running queries on an isolated connection. + +```ts +const reserved = await sql.reserve() +await reserved`select * from users` +await reserved.release() +``` + +### `reserved.release()` + +Once you have finished with the reserved connection, call `release` to add it back to the pool. + ## Error handling Errors are all thrown to related queries and never globally. Errors coming from database itself are always in the [native Postgres format](https://www.postgresql.org/docs/current/errcodes-appendix.html), and the same goes for any [Node.js errors](https://nodejs.org/api/errors.html#errors_common_system_errors) eg. coming from the underlying connection. @@ -1211,8 +1326,8 @@ This error is thrown if the user has called [`sql.end()`](#teardown--cleanup) an This error is thrown for any queries that were pending when the timeout to [`sql.end({ timeout: X })`](#teardown--cleanup) was reached. 
-##### CONNECTION_CONNECT_TIMEOUT -> write CONNECTION_CONNECT_TIMEOUT host:port +##### CONNECT_TIMEOUT +> write CONNECT_TIMEOUT host:port This error is thrown if the startup phase of the connection (tcp, protocol negotiation, and auth) took more than the default 30 seconds or what was specified using `connect_timeout` or `PGCONNECT_TIMEOUT`. diff --git a/cf/polyfills.js b/cf/polyfills.js index f7809003..53c5203d 100644 --- a/cf/polyfills.js +++ b/cf/polyfills.js @@ -47,12 +47,25 @@ export const crypto = { ), createHash: type => ({ update: x => ({ - digest: () => { - if (type !== 'sha256') - throw Error('createHash only supports sha256 in this environment.') - if (!(x instanceof Uint8Array)) + digest: encoding => { + if (!(x instanceof Uint8Array)) { x = textEncoder.encode(x) - return Crypto.subtle.digest('SHA-256', x) + } + let prom + if (type === 'sha256') { + prom = Crypto.subtle.digest('SHA-256', x) + } else if (type === 'md5') { + prom = Crypto.subtle.digest('md5', x) + } else { + throw Error('createHash only supports sha256 or md5 in this environment, not ${type}.') + } + if (encoding === 'hex') { + return prom.then((arrayBuf) => Buffer.from(arrayBuf).toString('hex')) + } else if (encoding) { + throw Error(`createHash only supports hex encoding or unencoded in this environment, not ${encoding}`) + } else { + return prom + } } }) }), diff --git a/cf/src/connection.js b/cf/src/connection.js index 3803c8eb..203af80d 100644 --- a/cf/src/connection.js +++ b/cf/src/connection.js @@ -131,7 +131,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose try { x = options.socket ? (await Promise.resolve(options.socket(options))) - : net.Socket() + : new net.Socket() } catch (e) { error(e) return @@ -295,7 +295,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose if (incomings) { incomings.push(x) remaining -= x.length - if (remaining >= 0) + if (remaining > 0) return } @@ -387,13 +387,20 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } function queryError(query, err) { - query.reject(Object.create(err, { + if (query.reserve) + return query.reject(err) + + if (!err || typeof err !== 'object') + err = new Error(err) + + 'query' in err || 'parameters' in err || Object.defineProperties(err, { stack: { value: err.stack + query.origin.replace(/.*\n/, '\n'), enumerable: options.debug }, query: { value: query.string, enumerable: options.debug }, parameters: { value: query.parameters, enumerable: options.debug }, args: { value: query.args, enumerable: options.debug }, types: { value: query.statement && query.statement.types, enumerable: options.debug } - })) + }) + query.reject(err) } function end() { @@ -430,10 +437,8 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose lifeTimer.cancel() connectTimer.cancel() - if (socket.encrypted) { - socket.removeAllListeners() - socket = null - } + socket.removeAllListeners() + socket = null if (initial) return reconnect() @@ -442,7 +447,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose closedDate = performance.now() hadError && options.shared.retries++ delay = (typeof backoff === 'function' ? 
backoff(options.shared.retries) : backoff) * 1000 - onclose(connection) + onclose(connection, Errors.connection('CONNECTION_CLOSED', options, socket)) } /* Handlers */ @@ -534,11 +539,14 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose return terminate() } - if (needsTypes) + if (needsTypes) { + initial.reserve && (initial = null) return fetchArrayTypes() + } - execute(initial) - options.shared.retries = retries = initial = 0 + initial && !initial.reserve && execute(initial) + options.shared.retries = retries = 0 + initial = null return } @@ -658,27 +666,30 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose /* c8 ignore next 5 */ async function AuthenticationCleartextPassword() { + const payload = await Pass() write( - b().p().str(await Pass()).z(1).end() + b().p().str(payload).z(1).end() ) } async function AuthenticationMD5Password(x) { - write( - b().p().str( - 'md5' + - (await md5(Buffer.concat([ + const payload = 'md5' + ( + await md5( + Buffer.concat([ Buffer.from(await md5((await Pass()) + user)), x.subarray(9) - ]))) - ).z(1).end() + ]) + ) + ) + write( + b().p().str(payload).z(1).end() ) } async function SASL() { + nonce = (await crypto.randomBytes(18)).toString('base64') b().p().str('SCRAM-SHA-256' + b.N) const i = b.i - nonce = (await crypto.randomBytes(18)).toString('base64') write(b.inc(4).str('n,,n=*,r=' + nonce).i32(b.i - i - 4, i).end()) } @@ -700,12 +711,12 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose serverSignature = (await hmac(await hmac(saltedPassword, 'Server Key'), auth)).toString('base64') + const payload = 'c=biws,r=' + res.r + ',p=' + xor( + clientKey, Buffer.from(await hmac(await sha256(clientKey), auth)) + ).toString('base64') + write( - b().p().str( - 'c=biws,r=' + res.r + ',p=' + xor( - clientKey, Buffer.from(await hmac(await sha256(clientKey), auth)) - ).toString('base64') - ).end() + b().p().str(payload).end() ) } @@ -785,7 +796,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose const error = Errors.postgres(parseError(x)) query && query.retried ? errored(query.retried) - : query && retryRoutines.has(error.routine) + : query && query.prepared && retryRoutines.has(error.routine) ? retry(query, error) : errored(error) } diff --git a/cf/src/index.js b/cf/src/index.js index da4df290..3ffb7e65 100644 --- a/cf/src/index.js +++ b/cf/src/index.js @@ -202,17 +202,18 @@ function Postgres(a, b) { } async function reserve() { - const q = Queue() + const queue = Queue() const c = open.length ? open.shift() - : await new Promise(r => { - queries.push({ reserve: r }) - closed.length && connect(closed.shift()) + : await new Promise((resolve, reject) => { + const query = { reserve: resolve, reject } + queries.push(query) + closed.length && connect(closed.shift(), query) }) move(c, reserved) - c.reserved = () => q.length - ? c.execute(q.shift()) + c.reserved = () => queue.length + ? c.execute(queue.shift()) : move(c, reserved) c.reserved.release = true @@ -226,7 +227,7 @@ function Postgres(a, b) { function handler(q) { c.queue === full - ? q.push(q) + ? 
queue.push(q) : c.execute(q) || move(c, full) } } @@ -240,7 +241,10 @@ function Postgres(a, b) { try { await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute() - return await scope(connection, fn) + return await Promise.race([ + scope(connection, fn), + new Promise((_, reject) => connection.onclose = reject) + ]) } catch (error) { throw error } @@ -415,9 +419,10 @@ function Postgres(a, b) { : move(c, full) } - function onclose(c) { + function onclose(c, e) { move(c, closed) c.reserved = null + c.onclose && (c.onclose(e), c.onclose = null) options.onclose && options.onclose(c.id) queries.length && connect(c, queries.shift()) } @@ -428,7 +433,7 @@ function parseOptions(a, b) { return a const env = process.env // eslint-disable-line - , o = (typeof a === 'string' ? b : a) || {} + , o = (!a || typeof a === 'string' ? b : a) || {} , { url, multihost } = parseUrl(a) , query = [...url.searchParams].reduce((a, [b, c]) => (a[b] = c, a), {}) , host = o.hostname || o.host || multihost || url.hostname || env.PGHOST || 'localhost' @@ -438,6 +443,7 @@ function parseOptions(a, b) { o.no_prepare && (o.prepare = false) query.sslmode && (query.ssl = query.sslmode, delete query.sslmode) 'timeout' in o && (console.log('The timeout option is deprecated, use idle_timeout instead'), o.idle_timeout = o.timeout) // eslint-disable-line + query.sslrootcert === 'system' && (query.ssl = 'verify-full') const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive'] const defaults = { @@ -476,7 +482,7 @@ function parseOptions(a, b) { {} ), connection : { - application_name: 'postgres.js', + application_name: env.PGAPPNAME || 'postgres.js', ...o.connection, ...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {}) }, @@ -529,7 +535,7 @@ function parseTransform(x) { } function parseUrl(url) { - if (typeof url !== 'string') + if (!url || typeof url !== 'string') return { url: { searchParams: new Map() } } let host = url diff --git a/cf/src/query.js b/cf/src/query.js index 848f3b88..0d44a15c 100644 --- a/cf/src/query.js +++ b/cf/src/query.js @@ -37,13 +37,12 @@ export class Query extends Promise { } get origin() { - return this.handler.debug + return (this.handler.debug ? this[originError].stack - : this.tagged - ? originStackCache.has(this.strings) - ? originStackCache.get(this.strings) - : originStackCache.set(this.strings, this[originError].stack).get(this.strings) - : '' + : this.tagged && originStackCache.has(this.strings) + ? 
originStackCache.get(this.strings) + : originStackCache.set(this.strings, this[originError].stack).get(this.strings) + ) || '' } static get [Symbol.species]() { diff --git a/cf/src/subscribe.js b/cf/src/subscribe.js index 1ab8b0be..8716100e 100644 --- a/cf/src/subscribe.js +++ b/cf/src/subscribe.js @@ -48,7 +48,7 @@ export default function Subscribe(postgres, options) { return subscribe - async function subscribe(event, fn, onsubscribe = noop) { + async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { event = parseEvent(event) if (!connection) @@ -67,6 +67,7 @@ export default function Subscribe(postgres, options) { return connection.then(x => { connected(x) onsubscribe() + stream && stream.on('error', onerror) return { unsubscribe, state, sql } }) } @@ -104,14 +105,16 @@ export default function Subscribe(postgres, options) { return { stream, state: xs.state } function error(e) { - console.error('Unexpected error during logical streaming - reconnecting', e) + console.error('Unexpected error during logical streaming - reconnecting', e) // eslint-disable-line } function data(x) { - if (x[0] === 0x77) + if (x[0] === 0x77) { parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) - else if (x[0] === 0x6b && x[17]) + } else if (x[0] === 0x6b && x[17]) { + state.lsn = x.subarray(1, 9) pong() + } } function handle(a, b) { diff --git a/cjs/src/connection.js b/cjs/src/connection.js index fc97a19b..589d3638 100644 --- a/cjs/src/connection.js +++ b/cjs/src/connection.js @@ -129,7 +129,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose try { x = options.socket ? (await Promise.resolve(options.socket(options))) - : net.Socket() + : new net.Socket() } catch (e) { error(e) return @@ -293,7 +293,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose if (incomings) { incomings.push(x) remaining -= x.length - if (remaining >= 0) + if (remaining > 0) return } @@ -385,13 +385,20 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } function queryError(query, err) { - query.reject(Object.create(err, { + if (query.reserve) + return query.reject(err) + + if (!err || typeof err !== 'object') + err = new Error(err) + + 'query' in err || 'parameters' in err || Object.defineProperties(err, { stack: { value: err.stack + query.origin.replace(/.*\n/, '\n'), enumerable: options.debug }, query: { value: query.string, enumerable: options.debug }, parameters: { value: query.parameters, enumerable: options.debug }, args: { value: query.args, enumerable: options.debug }, types: { value: query.statement && query.statement.types, enumerable: options.debug } - })) + }) + query.reject(err) } function end() { @@ -428,10 +435,8 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose lifeTimer.cancel() connectTimer.cancel() - if (socket.encrypted) { - socket.removeAllListeners() - socket = null - } + socket.removeAllListeners() + socket = null if (initial) return reconnect() @@ -440,7 +445,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose closedDate = performance.now() hadError && options.shared.retries++ delay = (typeof backoff === 'function' ? 
backoff(options.shared.retries) : backoff) * 1000 - onclose(connection) + onclose(connection, Errors.connection('CONNECTION_CLOSED', options, socket)) } /* Handlers */ @@ -532,11 +537,14 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose return terminate() } - if (needsTypes) + if (needsTypes) { + initial.reserve && (initial = null) return fetchArrayTypes() + } - execute(initial) - options.shared.retries = retries = initial = 0 + initial && !initial.reserve && execute(initial) + options.shared.retries = retries = 0 + initial = null return } @@ -656,27 +664,30 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose /* c8 ignore next 5 */ async function AuthenticationCleartextPassword() { + const payload = await Pass() write( - b().p().str(await Pass()).z(1).end() + b().p().str(payload).z(1).end() ) } async function AuthenticationMD5Password(x) { - write( - b().p().str( - 'md5' + - (await md5(Buffer.concat([ + const payload = 'md5' + ( + await md5( + Buffer.concat([ Buffer.from(await md5((await Pass()) + user)), x.subarray(9) - ]))) - ).z(1).end() + ]) + ) + ) + write( + b().p().str(payload).z(1).end() ) } async function SASL() { + nonce = (await crypto.randomBytes(18)).toString('base64') b().p().str('SCRAM-SHA-256' + b.N) const i = b.i - nonce = (await crypto.randomBytes(18)).toString('base64') write(b.inc(4).str('n,,n=*,r=' + nonce).i32(b.i - i - 4, i).end()) } @@ -698,12 +709,12 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose serverSignature = (await hmac(await hmac(saltedPassword, 'Server Key'), auth)).toString('base64') + const payload = 'c=biws,r=' + res.r + ',p=' + xor( + clientKey, Buffer.from(await hmac(await sha256(clientKey), auth)) + ).toString('base64') + write( - b().p().str( - 'c=biws,r=' + res.r + ',p=' + xor( - clientKey, Buffer.from(await hmac(await sha256(clientKey), auth)) - ).toString('base64') - ).end() + b().p().str(payload).end() ) } @@ -783,7 +794,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose const error = Errors.postgres(parseError(x)) query && query.retried ? errored(query.retried) - : query && retryRoutines.has(error.routine) + : query && query.prepared && retryRoutines.has(error.routine) ? retry(query, error) : errored(error) } diff --git a/cjs/src/index.js b/cjs/src/index.js index d022b976..baf7e60a 100644 --- a/cjs/src/index.js +++ b/cjs/src/index.js @@ -201,17 +201,18 @@ function Postgres(a, b) { } async function reserve() { - const q = Queue() + const queue = Queue() const c = open.length ? open.shift() - : await new Promise(r => { - queries.push({ reserve: r }) - closed.length && connect(closed.shift()) + : await new Promise((resolve, reject) => { + const query = { reserve: resolve, reject } + queries.push(query) + closed.length && connect(closed.shift(), query) }) move(c, reserved) - c.reserved = () => q.length - ? c.execute(q.shift()) + c.reserved = () => queue.length + ? c.execute(queue.shift()) : move(c, reserved) c.reserved.release = true @@ -225,7 +226,7 @@ function Postgres(a, b) { function handler(q) { c.queue === full - ? q.push(q) + ? 
queue.push(q) : c.execute(q) || move(c, full) } } @@ -239,7 +240,10 @@ function Postgres(a, b) { try { await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute() - return await scope(connection, fn) + return await Promise.race([ + scope(connection, fn), + new Promise((_, reject) => connection.onclose = reject) + ]) } catch (error) { throw error } @@ -414,9 +418,10 @@ function Postgres(a, b) { : move(c, full) } - function onclose(c) { + function onclose(c, e) { move(c, closed) c.reserved = null + c.onclose && (c.onclose(e), c.onclose = null) options.onclose && options.onclose(c.id) queries.length && connect(c, queries.shift()) } @@ -427,7 +432,7 @@ function parseOptions(a, b) { return a const env = process.env // eslint-disable-line - , o = (typeof a === 'string' ? b : a) || {} + , o = (!a || typeof a === 'string' ? b : a) || {} , { url, multihost } = parseUrl(a) , query = [...url.searchParams].reduce((a, [b, c]) => (a[b] = c, a), {}) , host = o.hostname || o.host || multihost || url.hostname || env.PGHOST || 'localhost' @@ -437,6 +442,7 @@ function parseOptions(a, b) { o.no_prepare && (o.prepare = false) query.sslmode && (query.ssl = query.sslmode, delete query.sslmode) 'timeout' in o && (console.log('The timeout option is deprecated, use idle_timeout instead'), o.idle_timeout = o.timeout) // eslint-disable-line + query.sslrootcert === 'system' && (query.ssl = 'verify-full') const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive'] const defaults = { @@ -475,7 +481,7 @@ function parseOptions(a, b) { {} ), connection : { - application_name: 'postgres.js', + application_name: env.PGAPPNAME || 'postgres.js', ...o.connection, ...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {}) }, @@ -528,7 +534,7 @@ function parseTransform(x) { } function parseUrl(url) { - if (typeof url !== 'string') + if (!url || typeof url !== 'string') return { url: { searchParams: new Map() } } let host = url diff --git a/cjs/src/query.js b/cjs/src/query.js index 7246c5f3..45327f2f 100644 --- a/cjs/src/query.js +++ b/cjs/src/query.js @@ -37,13 +37,12 @@ const Query = module.exports.Query = class Query extends Promise { } get origin() { - return this.handler.debug + return (this.handler.debug ? this[originError].stack - : this.tagged - ? originStackCache.has(this.strings) - ? originStackCache.get(this.strings) - : originStackCache.set(this.strings, this[originError].stack).get(this.strings) - : '' + : this.tagged && originStackCache.has(this.strings) + ? 
originStackCache.get(this.strings) + : originStackCache.set(this.strings, this[originError].stack).get(this.strings) + ) || '' } static get [Symbol.species]() { diff --git a/cjs/src/subscribe.js b/cjs/src/subscribe.js index 34d99e9f..6aaa8962 100644 --- a/cjs/src/subscribe.js +++ b/cjs/src/subscribe.js @@ -47,7 +47,7 @@ module.exports = Subscribe;function Subscribe(postgres, options) { return subscribe - async function subscribe(event, fn, onsubscribe = noop) { + async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { event = parseEvent(event) if (!connection) @@ -66,6 +66,7 @@ module.exports = Subscribe;function Subscribe(postgres, options) { return connection.then(x => { connected(x) onsubscribe() + stream && stream.on('error', onerror) return { unsubscribe, state, sql } }) } @@ -103,14 +104,16 @@ module.exports = Subscribe;function Subscribe(postgres, options) { return { stream, state: xs.state } function error(e) { - console.error('Unexpected error during logical streaming - reconnecting', e) + console.error('Unexpected error during logical streaming - reconnecting', e) // eslint-disable-line } function data(x) { - if (x[0] === 0x77) + if (x[0] === 0x77) { parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) - else if (x[0] === 0x6b && x[17]) + } else if (x[0] === 0x6b && x[17]) { + state.lsn = x.subarray(1, 9) pong() + } } function handle(a, b) { diff --git a/cjs/tests/bootstrap.js b/cjs/tests/bootstrap.js index 0ff56fbb..2106f0f8 100644 --- a/cjs/tests/bootstrap.js +++ b/cjs/tests/bootstrap.js @@ -1,15 +1,19 @@ const { spawnSync } = require('child_process') +exec('dropdb', ['postgres_js_test']) + exec('psql', ['-c', 'alter system set ssl=on']) +exec('psql', ['-c', 'drop user postgres_js_test']) exec('psql', ['-c', 'create user postgres_js_test']) exec('psql', ['-c', 'alter system set password_encryption=md5']) exec('psql', ['-c', 'select pg_reload_conf()']) +exec('psql', ['-c', 'drop user if exists postgres_js_test_md5']) exec('psql', ['-c', 'create user postgres_js_test_md5 with password \'postgres_js_test_md5\'']) exec('psql', ['-c', 'alter system set password_encryption=\'scram-sha-256\'']) exec('psql', ['-c', 'select pg_reload_conf()']) +exec('psql', ['-c', 'drop user if exists postgres_js_test_scram']) exec('psql', ['-c', 'create user postgres_js_test_scram with password \'postgres_js_test_scram\'']) -exec('dropdb', ['postgres_js_test']) exec('createdb', ['postgres_js_test']) exec('psql', ['-c', 'grant all on database postgres_js_test to postgres_js_test']) exec('psql', ['-c', 'alter database postgres_js_test owner to postgres_js_test']) diff --git a/cjs/tests/index.js b/cjs/tests/index.js index a8828d55..ec5222f7 100644 --- a/cjs/tests/index.js +++ b/cjs/tests/index.js @@ -429,6 +429,30 @@ t('Reconnect using SSL', { timeout: 2 }, async() => { return [1, (await sql`select 1 as x`)[0].x] }) +t('Proper handling of non object Errors', async() => { + const sql = postgres({ socket: () => { throw 'wat' } }) // eslint-disable-line + + return [ + 'wat', await sql`select 1 as x`.catch(e => e.message) + ] +}) + +t('Proper handling of null Errors', async() => { + const sql = postgres({ socket: () => { throw null } }) // eslint-disable-line + + return [ + 'null', await sql`select 1 as x`.catch(e => e.message) + ] +}) + +t('Ensure reserve on connection throws proper error', async() => { + const sql = postgres({ socket: () => { throw 'wat' }, idle_timeout }) // eslint-disable-line + + return [ + 'wat', await sql.reserve().catch(e => e) + ] +}) + 
t('Login without password', async() => { return [true, (await postgres({ ...options, ...login })`select true as x`)[0].x] }) @@ -1789,6 +1813,32 @@ t('Recreate prepared statements on RevalidateCachedQuery error', async() => { ] }) +t('Properly throws routine error on not prepared statements', async() => { + await sql`create table x (x text[])` + const { routine } = await sql.unsafe(` + insert into x(x) values (('a', 'b')) + `).catch(e => e) + + return ['transformAssignedExpr', routine, await sql`drop table x`] +}) + +t('Properly throws routine error on not prepared statements in transaction', async() => { + const { routine } = await sql.begin(sql => [ + sql`create table x (x text[])`, + sql`insert into x(x) values (('a', 'b'))` + ]).catch(e => e) + + return ['transformAssignedExpr', routine] +}) + +t('Properly throws routine error on not prepared statements using file', async() => { + const { routine } = await sql.unsafe(` + create table x (x text[]); + insert into x(x) values (('a', 'b')); + `, { prepare: true }).catch(e => e) + + return ['transformAssignedExpr', routine] +}) t('Catches connection config errors', async() => { const sql = postgres({ ...options, user: { toString: () => { throw new Error('wat') } }, database: 'prut' }) @@ -2134,7 +2184,7 @@ t('Execute', async() => { t('Cancel running query', async() => { const query = sql`select pg_sleep(2)` - setTimeout(() => query.cancel(), 200) + setTimeout(() => query.cancel(), 500) const error = await query.catch(x => x) return ['57014', error.code] }) @@ -2348,11 +2398,22 @@ t('Ensure reconnect after max_lifetime with transactions', { timeout: 5 }, async return [true, true] }) + +t('Ensure transactions throw if connection is closed dwhile there is no query', async() => { + const sql = postgres(options) + const x = await sql.begin(async() => { + setTimeout(() => sql.end({ timeout: 0 }), 10) + await new Promise(r => setTimeout(r, 200)) + return sql`select 1` + }).catch(x => x) + return ['CONNECTION_CLOSED', x.code] +}) + t('Custom socket', {}, async() => { let result const sql = postgres({ socket: () => new Promise((resolve, reject) => { - const socket = net.Socket() + const socket = new net.Socket() socket.connect(5432) socket.once('data', x => result = x[0]) socket.on('error', reject) @@ -2532,3 +2593,24 @@ t('reserve connection', async() => { xs.map(x => x.x).join('') ] }) + +t('arrays in reserved connection', async() => { + const reserved = await sql.reserve() + const [{ x }] = await reserved`select array[1, 2, 3] as x` + reserved.release() + + return [ + '123', + x.join('') + ] +}) + +t('Ensure reserve on query throws proper error', async() => { + const sql = postgres({ idle_timeout }) // eslint-disable-line + const reserved = await sql.reserve() + const [{ x }] = await reserved`select 'wat' as x` + + return [ + 'wat', x, reserved.release() + ] +}) diff --git a/deno/README.md b/deno/README.md index f599a18f..b6ec85b7 100644 --- a/deno/README.md +++ b/deno/README.md @@ -58,6 +58,14 @@ async function insertUser({ name, age }) { } ``` +#### ESM dynamic imports + +The library can be used with ESM dynamic imports as well as shown here. 
+ +```js +const { default: postgres } = await import('postgres') +``` + ## Table of Contents * [Connection](#connection) @@ -75,6 +83,7 @@ async function insertUser({ name, age }) { * [Teardown / Cleanup](#teardown--cleanup) * [Error handling](#error-handling) * [TypeScript support](#typescript-support) +* [Reserving connections](#reserving-connections) * [Changelog](./CHANGELOG.md) @@ -153,7 +162,7 @@ const users = await sql` ```js const columns = ['name', 'age'] -sql` +await sql` select ${ sql(columns) } from users @@ -171,7 +180,7 @@ const user = { age: 68 } -sql` +await sql` insert into users ${ sql(user, 'name', 'age') } @@ -179,6 +188,15 @@ sql` // Which results in: insert into users ("name", "age") values ($1, $2) + +// The columns can also be given with an array +const columns = ['name', 'age'] + +await sql` + insert into users ${ + sql(user, columns) + } +` ``` **You can omit column names and simply execute `sql(user)` to get all the fields from the object as columns**. Be careful not to allow users to supply columns that you do not want to be inserted. @@ -197,13 +215,13 @@ const users = [{ age: 80 }] -sql`insert into users ${ sql(users, 'name', 'age') }` +await sql`insert into users ${ sql(users, 'name', 'age') }` // Is translated to: insert into users ("name", "age") values ($1, $2), ($3, $4) // Here you can also omit column names which will use object keys as columns -sql`insert into users ${ sql(users) }` +await sql`insert into users ${ sql(users) }` // Which results in: insert into users ("name", "age") values ($1, $2), ($3, $4) @@ -218,7 +236,7 @@ const user = { age: 68 } -sql` +await sql` update users set ${ sql(user, 'name', 'age') } @@ -227,18 +245,28 @@ sql` // Which results in: update users set "name" = $1, "age" = $2 where user_id = $3 + +// The columns can also be given with an array +const columns = ['name', 'age'] + +await sql` + update users set ${ + sql(user, columns) + } + where user_id = ${ user.id } +` ``` ### Multiple updates in one query -It's possible to create multiple udpates in a single query. It's necessary to use arrays intead of objects to ensure the order of the items so that these correspond with the column names. +To create multiple updates in a single query, it is necessary to use arrays instead of objects to ensure that the order of the items correspond with the column names. ```js const users = [ [1, 'John', 34], [2, 'Jane', 27], ] -sql` - update users set name = update_data.name, (age = update_data.age)::int +await sql` + update users set name = update_data.name, age = (update_data.age)::int from (values ${sql(users)}) as update_data (id, name, age) where users.id = (update_data.id)::int returning users.id, users.name, users.age @@ -258,7 +286,7 @@ const users = await sql` or ```js -const [{ a, b, c }] => await sql` +const [{ a, b, c }] = await sql` select * from (values ${ sql(['a', 'b', 'c']) }) as x(a, b, c) @@ -276,7 +304,7 @@ const olderThan = x => sql`and age > ${ x }` const filterAge = true -sql` +await sql` select * from users @@ -294,7 +322,7 @@ select * from users where name is not null and age > 50 ### Dynamic filters ```js -sql` +await sql` select * from users ${ @@ -310,12 +338,33 @@ select * from users select * from users where user_id = $1 ``` +### Dynamic ordering + +```js +const id = 1 +const order = { + username: 'asc' + created_at: 'desc' +} +await sql` + select + * + from ticket + where account = ${ id } + order by ${ + Object.entries(order).flatMap(([column, order], i) => + [i ? 
sql`,` : sql``, sql`${ sql(column) } ${ order === 'desc' ? sql`desc` : sql`asc` }`] + ) + } +` +``` + ### SQL functions Using keywords or calling functions dynamically is also possible by using ``` sql`` ``` fragments. ```js const date = null -sql` +await sql` update users set updated_at = ${ date || sql`now()` } ` @@ -329,7 +378,7 @@ Dynamic identifiers like table names and column names is also supported like so: const table = 'users' , column = 'id' -sql` +await sql` select ${ sql(column) } from ${ sql(table) } ` @@ -343,10 +392,10 @@ Here's a quick oversight over all the ways to do interpolation in a query templa | Interpolation syntax | Usage | Example | | ------------- | ------------- | ------------- | -| `${ sql`` }` | for keywords or sql fragments | ``sql`SELECT * FROM users ${sql`order by age desc` }` `` | -| `${ sql(string) }` | for identifiers | ``sql`SELECT * FROM ${sql('table_name')` `` | -| `${ sql([] or {}, ...) }` | for helpers | ``sql`INSERT INTO users ${sql({ name: 'Peter'})}` `` | -| `${ 'somevalue' }` | for values | ``sql`SELECT * FROM users WHERE age = ${42}` `` | +| `${ sql`` }` | for keywords or sql fragments | ``await sql`SELECT * FROM users ${sql`order by age desc` }` `` | +| `${ sql(string) }` | for identifiers | ``await sql`SELECT * FROM ${sql('table_name')` `` | +| `${ sql([] or {}, ...) }` | for helpers | ``await sql`INSERT INTO users ${sql({ name: 'Peter'})}` `` | +| `${ 'somevalue' }` | for values | ``await sql`SELECT * FROM users WHERE age = ${42}` `` | ## Advanced query methods @@ -426,7 +475,7 @@ await sql` Rather than executing a given query, `.describe` will return information utilized in the query process. This information can include the query identifier, column types, etc. This is useful for debugging and analyzing your Postgres queries. Furthermore, **`.describe` will give you access to the final generated query string that would be executed.** - + ### Rows as Array of Values #### ```sql``.values()``` @@ -453,7 +502,7 @@ const result = await sql.file('query.sql', ['Murray', 68]) ### Multiple statements in one query #### ```await sql``.simple()``` -The postgres wire protocol supports ["simple"](https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.6.7.4) and ["extended"](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) queries. "simple" queries supports multiple statements, but does not support any dynamic parameters. "extended" queries support parameters but only one statement. To use "simple" queries you can use +The postgres wire protocol supports ["simple"](https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.6.7.4) and ["extended"](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) queries. "simple" queries supports multiple statements, but does not support any dynamic parameters. "extended" queries support parameters but only one statement. To use "simple" queries you can use ```sql``.simple()```. That will create it as a simple query. ```js @@ -495,8 +544,8 @@ await pipeline(readableStream, createWriteStream('output.tsv')) ```js const readableStream = await sql` copy ( - select name, age - from users + select name, age + from users where age = 68 ) to stdout `.readable() @@ -505,7 +554,7 @@ for await (const chunk of readableStream) { } ``` -> **NOTE** This is a low-level API which does not provide any type safety. 
To make this work, you must match your [`copy query` parameters](https://www.postgresql.org/docs/14/sql-copy.html) correctly to your [Node.js stream read or write](https://nodejs.org/api/stream.html) code. Ensure [Node.js stream backpressure](https://nodejs.org/en/docs/guides/backpressuring-in-streams/) is handled correctly to avoid memory exhaustion. +> **NOTE** This is a low-level API which does not provide any type safety. To make this work, you must match your [`copy query` parameters](https://www.postgresql.org/docs/14/sql-copy.html) correctly to your [Node.js stream read or write](https://nodejs.org/api/stream.html) code. Ensure [Node.js stream backpressure](https://nodejs.org/en/learn/modules/backpressuring-in-streams) is handled correctly to avoid memory exhaustion. ### Canceling Queries in Progress @@ -535,7 +584,9 @@ If you know what you're doing, you can use `unsafe` to pass any string you'd lik ```js sql.unsafe('select ' + danger + ' from users where id = ' + dragons) ``` - + +By default, `sql.unsafe` assumes the `query` string is sufficiently dynamic that prepared statements do not make sense, and so defaults them to off. If you'd like to re-enable prepared statements, you can pass `{ prepare: true }`. + You can also nest `sql.unsafe` within a safe `sql` expression. This is useful if only part of your fraction has unsafe elements. ```js @@ -575,6 +626,7 @@ const [user, account] = await sql.begin(async sql => { ) values ( 'Murray' ) + returning * ` const [account] = await sql` @@ -583,12 +635,15 @@ const [user, account] = await sql.begin(async sql => { ) values ( ${ user.user_id } ) + returning * ` return [user, account] }) ``` +Do note that you can often achieve the same result using [`WITH` queries (Common Table Expressions)](https://www.postgresql.org/docs/current/queries-with.html) instead of using transactions. + It's also possible to pipeline the requests in a transaction if needed by returning an array with queries from the callback function like this: ```js @@ -634,9 +689,9 @@ sql.begin('read write', async sql => { ``` -#### PREPARE `await sql.prepare([name]) -> fn()` +#### PREPARE TRANSACTION `await sql.prepare([name]) -> fn()` -Indicates that the transactions should be prepared using the `PREPARED TRANASCTION [NAME]` statement +Indicates that the transactions should be prepared using the [`PREPARE TRANSACTION [NAME]`](https://www.postgresql.org/docs/current/sql-prepare-transaction.html) statement instead of being committed. ```js @@ -648,13 +703,11 @@ sql.begin('read write', async sql => { 'Murray' ) ` - + await sql.prepare('tx1') }) ``` -Do note that you can often achieve the same result using [`WITH` queries (Common Table Expressions)](https://www.postgresql.org/docs/current/queries-with.html) instead of using transactions. - ## Data Transformation Postgres.js allows for transformation of the data passed to or returned from a query by using the `transform` option. 
@@ -710,7 +763,7 @@ console.log(data) // [ { a_test: 1 } ] ### Transform `undefined` Values -By default, Postgres.js will throw the error `UNDEFINED_VALUE: Undefined values are not allowed` when undefined values are passed +By default, Postgres.js will throw the error `UNDEFINED_VALUE: Undefined values are not allowed` when undefined values are passed ```js // Transform the column names to and from camel case @@ -791,7 +844,7 @@ The optional `onlisten` method is great to use for a very simply queue mechanism ```js await sql.listen( - 'jobs', + 'jobs', (x) => run(JSON.parse(x)), ( ) => sql`select unfinished_jobs()`.forEach(run) ) @@ -824,7 +877,7 @@ CREATE PUBLICATION alltables FOR ALL TABLES const sql = postgres({ publications: 'alltables' }) const { unsubscribe } = await sql.subscribe( - 'insert:events', + 'insert:events', (row, { command, relation, key, old }) => { // Callback function for each row change // tell about new event row over eg. websockets or do something else @@ -883,7 +936,7 @@ The `Result` Array returned from queries is a custom array allowing for easy des ### .count -The `count` property is the number of affected rows returned by the database. This is usefull for insert, update and delete operations to know the number of rows since .length will be 0 in these cases if not using `RETURNING ...`. +The `count` property is the number of affected rows returned by the database. This is useful for insert, update and delete operations to know the number of rows since .length will be 0 in these cases if not using `RETURNING ...`. ### .command @@ -937,7 +990,7 @@ const sql = postgres('postgres://username:password@host:port/database', { connect_timeout : 30, // Connect timeout in seconds prepare : true, // Automatic creation of prepared statements types : [], // Array of custom types, see more below - onnotice : fn, // Defaults to console.log + onnotice : fn, // Default console.log, set false to silence NOTICE onparameter : fn, // (key, value) when server param change debug : fn, // Is called with (connection, query, params, types) socket : fn, // fn returning custom socket to use @@ -949,7 +1002,7 @@ const sql = postgres('postgres://username:password@host:port/database', { }, connection : { application_name : 'postgres.js', // Default application_name - ... // Other connection parameters + ... // Other connection parameters, see https://www.postgresql.org/docs/current/runtime-config-client.html }, target_session_attrs : null, // Use 'read-write' with multiple hosts to // ensure only connecting to primary @@ -958,7 +1011,20 @@ const sql = postgres('postgres://username:password@host:port/database', { }) ``` -Note that `max_lifetime = 60 * (30 + Math.random() * 30)` by default. This resolves to an interval between 45 and 90 minutes to optimize for the benefits of prepared statements **and** working nicely with Linux's OOM killer. +Note that `max_lifetime = 60 * (30 + Math.random() * 30)` by default. This resolves to an interval between 30 and 60 minutes to optimize for the benefits of prepared statements **and** working nicely with Linux's OOM killer. + +### Dynamic passwords + +When clients need to use alternative authentication schemes such as access tokens or connections to databases with rotating passwords, provide either a synchronous or asynchronous function that will resolve the dynamic password value at connection time. + +```js +const sql = postgres(url, { + // Other connection config + ... 
+ // Password function for the database user + password : async () => await signer.getAuthToken(), +}) +``` ### SSL @@ -1034,6 +1100,34 @@ const sql = postgres({ }) ``` +### Cloudflare Workers support + +Postgres.js has built-in support for the [TCP socket API](https://developers.cloudflare.com/workers/runtime-apis/tcp-sockets/) in Cloudflare Workers, which is [on-track](https://github.com/wintercg/proposal-sockets-api) to be standardized and adopted in Node.js and other JavaScript runtimes, such as Deno. + +You can use Postgres.js directly in a Worker, or to benefit from connection pooling and query caching, via the [Hyperdrive](https://developers.cloudflare.com/hyperdrive/learning/connect-to-postgres/#driver-examples) service available to Workers by passing the Hyperdrive `connectionString` when creating a new `postgres` client as follows: + +```ts +// Requires Postgres.js 3.4.0 or later +import postgres from 'postgres' + +interface Env { + HYPERDRIVE: Hyperdrive; +} + +export default async fetch(req: Request, env: Env, ctx: ExecutionContext) { + // The Postgres.js library accepts a connection string directly + const sql = postgres(env.HYPERDRIVE.connectionString) + const results = await sql`SELECT * FROM users LIMIT 10` + return Response.json(results) +} +``` + +In `wrangler.toml` you will need to enable the `nodejs_compat` compatibility flag to allow Postgres.js to operate in the Workers environment: + +```toml +compatibility_flags = ["nodejs_compat"] +``` + ### Auto fetching of array types Postgres.js will automatically fetch table/array-type information when it first connects to a database. @@ -1050,20 +1144,25 @@ It is also possible to connect to the database without a connection string or an const sql = postgres() ``` -| Option | Environment Variables | -| ----------------- | ------------------------ | -| `host` | `PGHOST` | -| `port` | `PGPORT` | -| `database` | `PGDATABASE` | -| `username` | `PGUSERNAME` or `PGUSER` | -| `password` | `PGPASSWORD` | -| `idle_timeout` | `PGIDLE_TIMEOUT` | -| `connect_timeout` | `PGCONNECT_TIMEOUT` | +| Option | Environment Variables | +| ------------------ | ------------------------ | +| `host` | `PGHOST` | +| `port` | `PGPORT` | +| `database` | `PGDATABASE` | +| `username` | `PGUSERNAME` or `PGUSER` | +| `password` | `PGPASSWORD` | +| `application_name` | `PGAPPNAME` | +| `idle_timeout` | `PGIDLE_TIMEOUT` | +| `connect_timeout` | `PGCONNECT_TIMEOUT` | ### Prepared statements Prepared statements will automatically be created for any queries where it can be inferred that the query is static. This can be disabled by using the `prepare: false` option. For instance — this is useful when [using PGBouncer in `transaction mode`](https://github.com/porsager/postgres/issues/93#issuecomment-656290493). +**update**: [since 1.21.0](https://www.pgbouncer.org/2023/10/pgbouncer-1-21-0) +PGBouncer supports protocol-level named prepared statements when [configured +properly](https://www.pgbouncer.org/config.html#max_prepared_statements) + ## Custom Types You can add ergonomic support for custom types, or simply use `sql.typed(value, type)` inline, where type is the PostgreSQL `oid` for the type and the correctly serialized string. 
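For instance, a minimal inline sketch (not part of the diff), where oid `25` is PostgreSQL's built-in `text` type and the result column name `x` is arbitrary:

```js
// Sketch: send a parameter with an explicit type oid using sql.typed
const [{ x }] = await sql`select ${ sql.typed('hello', 25) } as x`
console.log(x) // 'hello'
```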
_(`oid` values for types can be found in the `pg_catalog.pg_type` table.)_ @@ -1090,7 +1189,7 @@ const sql = postgres({ }) // Now you can use sql.typed.rect() as specified above -const [custom] = sql` +const [custom] = await sql` insert into rectangles ( name, rect @@ -1120,8 +1219,8 @@ const sql = postgres({ const ssh = new ssh2.Client() ssh .on('error', reject) - .on('ready', () => - ssh.forwardOut('127.0.0.1', 12345, host, port, + .on('ready', () => + ssh.forwardOut('127.0.0.1', 12345, host, port, (err, socket) => err ? reject(err) : resolve(socket) ) ) @@ -1147,6 +1246,22 @@ prexit(async () => { }) ``` +## Reserving connections + +### `await sql.reserve()` + +The `reserve` method pulls out a connection from the pool, and returns a client that wraps the single connection. This can be used for running queries on an isolated connection. + +```ts +const reserved = await sql.reserve() +await reserved`select * from users` +await reserved.release() +``` + +### `reserved.release()` + +Once you have finished with the reserved connection, call `release` to add it back to the pool. + ## Error handling Errors are all thrown to related queries and never globally. Errors coming from database itself are always in the [native Postgres format](https://www.postgresql.org/docs/current/errcodes-appendix.html), and the same goes for any [Node.js errors](https://nodejs.org/api/errors.html#errors_common_system_errors) eg. coming from the underlying connection. @@ -1207,8 +1322,8 @@ This error is thrown if the user has called [`sql.end()`](#teardown--cleanup) an This error is thrown for any queries that were pending when the timeout to [`sql.end({ timeout: X })`](#teardown--cleanup) was reached. -##### CONNECTION_CONNECT_TIMEOUT -> write CONNECTION_CONNECT_TIMEOUT host:port +##### CONNECT_TIMEOUT +> write CONNECT_TIMEOUT host:port This error is thrown if the startup phase of the connection (tcp, protocol negotiation, and auth) took more than the default 30 seconds or what was specified using `connect_timeout` or `PGCONNECT_TIMEOUT`. diff --git a/deno/polyfills.js b/deno/polyfills.js index 81da6c4c..71ee694d 100644 --- a/deno/polyfills.js +++ b/deno/polyfills.js @@ -5,6 +5,140 @@ import { isIP } from 'https://deno.land/std@0.132.0/node/net.ts' const events = () => ({ data: [], error: [], drain: [], connect: [], secureConnect: [], close: [] }) +class Socket { + constructor() { + return createSocket() + } +} + +function createSocket() { + let paused + , resume + , keepAlive + + const socket = { + error, + success, + readyState: 'open', + setKeepAlive: x => { + keepAlive = x + socket.raw && socket.raw.setKeepAlive && socket.raw.setKeepAlive(x) + }, + connect: (port, hostname) => { + socket.raw = null + socket.readyState = 'connecting' + typeof port === 'string' + ? 
Deno.connect({ transport: 'unix', path: socket.path = port }).then(success, error) + : Deno.connect({ transport: 'tcp', port: socket.port = port, hostname: socket.hostname = hostname || 'localhost' }).then(success, error) // eslint-disable-line + return socket + }, + pause: () => { + paused = new Promise(r => resume = r) + }, + resume: () => { + resume && resume() + paused = null + }, + isPaused: () => !!paused, + removeAllListeners: () => socket.events = events(), + events: events(), + raw: null, + on: (x, fn) => socket.events[x].push(fn), + once: (x, fn) => { + if (x === 'data') + socket.break = true + const e = socket.events[x] + e.push(once) + once.once = fn + function once(...args) { + fn(...args) + e.indexOf(once) > -1 && e.splice(e.indexOf(once), 1) + } + }, + removeListener: (x, fn) => { + socket.events[x] = socket.events[x].filter(x => x !== fn && x.once !== fn) + }, + write: (x, cb) => { + socket.raw.write(x).then(l => { + l < x.length + ? socket.write(x.slice(l), cb) + : (cb && cb(null)) + }).catch(err => { + cb && cb() + call(socket.events.error, err) + }) + return false + }, + destroy: () => close(), + end: (x) => { + x && socket.write(x) + close() + } + } + + return socket + + async function success(raw) { + if (socket.readyState !== 'connecting') + return raw.close() + + const encrypted = socket.encrypted + socket.raw = raw + keepAlive != null && raw.setKeepAlive && raw.setKeepAlive(keepAlive) + socket.readyState = 'open' + socket.encrypted + ? call(socket.events.secureConnect) + : call(socket.events.connect) + + const b = new Uint8Array(1024) + let result + + try { + while ((result = socket.readyState === 'open' && await raw.read(b))) { + call(socket.events.data, Buffer.from(b.subarray(0, result))) + if (!encrypted && socket.break && (socket.break = false, b[0] === 83)) + return socket.break = false + paused && await paused + } + } catch (e) { + if (e instanceof Deno.errors.BadResource === false) + error(e) + } + + if (!socket.encrypted || encrypted) + closed() + } + + function close() { + try { + socket.raw && socket.raw.close() + } catch (e) { + if (e instanceof Deno.errors.BadResource === false) + call(socket.events.error, e) + } + } + + function closed() { + if (socket.readyState === 'closed') + return + + socket.break = socket.encrypted = false + socket.readyState = 'closed' + call(socket.events.close) + } + + function error(err) { + call(socket.events.error, err) + socket.raw + ? close() + : closed() + } + + function call(xs, x) { + xs.slice().forEach(fn => fn(x)) + } +} + export const net = { isIP, createServer() { @@ -23,133 +157,7 @@ export const net = { } return server }, - Socket() { - let paused - , resume - , keepAlive - - const socket = { - error, - success, - readyState: 'open', - setKeepAlive: x => { - keepAlive = x - socket.raw && socket.raw.setKeepAlive && socket.raw.setKeepAlive(x) - }, - connect: (port, hostname) => { - socket.raw = null - socket.readyState = 'connecting' - typeof port === 'string' - ? 
Deno.connect({ transport: 'unix', path: socket.path = port }).then(success, error) - : Deno.connect({ transport: 'tcp', port: socket.port = port, hostname: socket.hostname = hostname || 'localhost' }).then(success, error) // eslint-disable-line - return socket - }, - pause: () => { - paused = new Promise(r => resume = r) - }, - resume: () => { - resume && resume() - paused = null - }, - isPaused: () => !!paused, - removeAllListeners: () => socket.events = events(), - events: events(), - raw: null, - on: (x, fn) => socket.events[x].push(fn), - once: (x, fn) => { - if (x === 'data') - socket.break = true - const e = socket.events[x] - e.push(once) - once.once = fn - function once(...args) { - fn(...args) - e.indexOf(once) > -1 && e.splice(e.indexOf(once), 1) - } - }, - removeListener: (x, fn) => { - socket.events[x] = socket.events[x].filter(x => x !== fn && x.once !== fn) - }, - write: (x, cb) => { - socket.raw.write(x).then(l => { - l < x.length - ? socket.write(x.slice(l), cb) - : (cb && cb(null)) - }).catch(err => { - cb && cb() - call(socket.events.error, err) - }) - return false - }, - destroy: () => close(), - end: (x) => { - x && socket.write(x) - close() - } - } - - return socket - - async function success(raw) { - if (socket.readyState !== 'connecting') - return raw.close() - - const encrypted = socket.encrypted - socket.raw = raw - keepAlive != null && raw.setKeepAlive && raw.setKeepAlive(keepAlive) - socket.readyState = 'open' - socket.encrypted - ? call(socket.events.secureConnect) - : call(socket.events.connect) - - const b = new Uint8Array(1024) - let result - - try { - while ((result = socket.readyState === 'open' && await raw.read(b))) { - call(socket.events.data, Buffer.from(b.subarray(0, result))) - if (!encrypted && socket.break && (socket.break = false, b[0] === 83)) - return socket.break = false - paused && await paused - } - } catch (e) { - if (e instanceof Deno.errors.BadResource === false) - error(e) - } - - if (!socket.encrypted || encrypted) - closed() - } - - function close() { - try { - socket.raw && socket.raw.close() - } catch (e) { - if (e instanceof Deno.errors.BadResource === false) - call(socket.events.error, e) - } - } - - function closed() { - if (socket.readyState === 'closed') - return - - socket.break = socket.encrypted = false - socket.readyState = 'closed' - call(socket.events.close) - } - - function error(err) { - call(socket.events.error, err) - socket.raw - ? close() - : closed() - } - - function call(xs, x) { - xs.slice().forEach(fn => fn(x)) - } - } + Socket } export const tls = { diff --git a/deno/src/connection.js b/deno/src/connection.js index 80382577..a3f43c48 100644 --- a/deno/src/connection.js +++ b/deno/src/connection.js @@ -132,7 +132,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose try { x = options.socket ? 
(await Promise.resolve(options.socket(options))) - : net.Socket() + : new net.Socket() } catch (e) { error(e) return @@ -296,7 +296,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose if (incomings) { incomings.push(x) remaining -= x.length - if (remaining >= 0) + if (remaining > 0) return } @@ -388,13 +388,20 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } function queryError(query, err) { - query.reject(Object.create(err, { + if (query.reserve) + return query.reject(err) + + if (!err || typeof err !== 'object') + err = new Error(err) + + 'query' in err || 'parameters' in err || Object.defineProperties(err, { stack: { value: err.stack + query.origin.replace(/.*\n/, '\n'), enumerable: options.debug }, query: { value: query.string, enumerable: options.debug }, parameters: { value: query.parameters, enumerable: options.debug }, args: { value: query.args, enumerable: options.debug }, types: { value: query.statement && query.statement.types, enumerable: options.debug } - })) + }) + query.reject(err) } function end() { @@ -431,10 +438,8 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose lifeTimer.cancel() connectTimer.cancel() - if (socket.encrypted) { - socket.removeAllListeners() - socket = null - } + socket.removeAllListeners() + socket = null if (initial) return reconnect() @@ -443,7 +448,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose closedDate = performance.now() hadError && options.shared.retries++ delay = (typeof backoff === 'function' ? backoff(options.shared.retries) : backoff) * 1000 - onclose(connection) + onclose(connection, Errors.connection('CONNECTION_CLOSED', options, socket)) } /* Handlers */ @@ -535,11 +540,14 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose return terminate() } - if (needsTypes) + if (needsTypes) { + initial.reserve && (initial = null) return fetchArrayTypes() + } - execute(initial) - options.shared.retries = retries = initial = 0 + initial && !initial.reserve && execute(initial) + options.shared.retries = retries = 0 + initial = null return } @@ -659,27 +667,30 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose /* c8 ignore next 5 */ async function AuthenticationCleartextPassword() { + const payload = await Pass() write( - b().p().str(await Pass()).z(1).end() + b().p().str(payload).z(1).end() ) } async function AuthenticationMD5Password(x) { - write( - b().p().str( - 'md5' + - (await md5(Buffer.concat([ + const payload = 'md5' + ( + await md5( + Buffer.concat([ Buffer.from(await md5((await Pass()) + user)), x.subarray(9) - ]))) - ).z(1).end() + ]) + ) + ) + write( + b().p().str(payload).z(1).end() ) } async function SASL() { + nonce = (await crypto.randomBytes(18)).toString('base64') b().p().str('SCRAM-SHA-256' + b.N) const i = b.i - nonce = (await crypto.randomBytes(18)).toString('base64') write(b.inc(4).str('n,,n=*,r=' + nonce).i32(b.i - i - 4, i).end()) } @@ -701,12 +712,12 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose serverSignature = (await hmac(await hmac(saltedPassword, 'Server Key'), auth)).toString('base64') + const payload = 'c=biws,r=' + res.r + ',p=' + xor( + clientKey, Buffer.from(await hmac(await sha256(clientKey), auth)) + ).toString('base64') + write( - b().p().str( - 'c=biws,r=' + res.r + ',p=' + xor( - clientKey, Buffer.from(await hmac(await sha256(clientKey), auth)) - ).toString('base64') - 
).end() + b().p().str(payload).end() ) } @@ -786,7 +797,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose const error = Errors.postgres(parseError(x)) query && query.retried ? errored(query.retried) - : query && retryRoutines.has(error.routine) + : query && query.prepared && retryRoutines.has(error.routine) ? retry(query, error) : errored(error) } diff --git a/deno/src/index.js b/deno/src/index.js index a871e0f1..aa7a920f 100644 --- a/deno/src/index.js +++ b/deno/src/index.js @@ -202,17 +202,18 @@ function Postgres(a, b) { } async function reserve() { - const q = Queue() + const queue = Queue() const c = open.length ? open.shift() - : await new Promise(r => { - queries.push({ reserve: r }) - closed.length && connect(closed.shift()) + : await new Promise((resolve, reject) => { + const query = { reserve: resolve, reject } + queries.push(query) + closed.length && connect(closed.shift(), query) }) move(c, reserved) - c.reserved = () => q.length - ? c.execute(q.shift()) + c.reserved = () => queue.length + ? c.execute(queue.shift()) : move(c, reserved) c.reserved.release = true @@ -226,7 +227,7 @@ function Postgres(a, b) { function handler(q) { c.queue === full - ? q.push(q) + ? queue.push(q) : c.execute(q) || move(c, full) } } @@ -240,7 +241,10 @@ function Postgres(a, b) { try { await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute() - return await scope(connection, fn) + return await Promise.race([ + scope(connection, fn), + new Promise((_, reject) => connection.onclose = reject) + ]) } catch (error) { throw error } @@ -415,9 +419,10 @@ function Postgres(a, b) { : move(c, full) } - function onclose(c) { + function onclose(c, e) { move(c, closed) c.reserved = null + c.onclose && (c.onclose(e), c.onclose = null) options.onclose && options.onclose(c.id) queries.length && connect(c, queries.shift()) } @@ -428,7 +433,7 @@ function parseOptions(a, b) { return a const env = process.env // eslint-disable-line - , o = (typeof a === 'string' ? b : a) || {} + , o = (!a || typeof a === 'string' ? b : a) || {} , { url, multihost } = parseUrl(a) , query = [...url.searchParams].reduce((a, [b, c]) => (a[b] = c, a), {}) , host = o.hostname || o.host || multihost || url.hostname || env.PGHOST || 'localhost' @@ -438,6 +443,7 @@ function parseOptions(a, b) { o.no_prepare && (o.prepare = false) query.sslmode && (query.ssl = query.sslmode, delete query.sslmode) 'timeout' in o && (console.log('The timeout option is deprecated, use idle_timeout instead'), o.idle_timeout = o.timeout) // eslint-disable-line + query.sslrootcert === 'system' && (query.ssl = 'verify-full') const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive'] const defaults = { @@ -476,8 +482,8 @@ function parseOptions(a, b) { {} ), connection : { - application_name: 'postgres.js', ...o.connection, + application_name: o.connection?.application_name ?? env.PGAPPNAME ?? 
'postgres.js', ...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {}) }, types : o.types || {}, @@ -529,7 +535,7 @@ function parseTransform(x) { } function parseUrl(url) { - if (typeof url !== 'string') + if (!url || typeof url !== 'string') return { url: { searchParams: new Map() } } let host = url diff --git a/deno/src/query.js b/deno/src/query.js index 848f3b88..0d44a15c 100644 --- a/deno/src/query.js +++ b/deno/src/query.js @@ -37,13 +37,12 @@ export class Query extends Promise { } get origin() { - return this.handler.debug + return (this.handler.debug ? this[originError].stack - : this.tagged - ? originStackCache.has(this.strings) - ? originStackCache.get(this.strings) - : originStackCache.set(this.strings, this[originError].stack).get(this.strings) - : '' + : this.tagged && originStackCache.has(this.strings) + ? originStackCache.get(this.strings) + : originStackCache.set(this.strings, this[originError].stack).get(this.strings) + ) || '' } static get [Symbol.species]() { diff --git a/deno/src/subscribe.js b/deno/src/subscribe.js index dbb9b971..b20efb96 100644 --- a/deno/src/subscribe.js +++ b/deno/src/subscribe.js @@ -48,7 +48,7 @@ export default function Subscribe(postgres, options) { return subscribe - async function subscribe(event, fn, onsubscribe = noop) { + async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { event = parseEvent(event) if (!connection) @@ -67,6 +67,7 @@ export default function Subscribe(postgres, options) { return connection.then(x => { connected(x) onsubscribe() + stream && stream.on('error', onerror) return { unsubscribe, state, sql } }) } @@ -104,14 +105,16 @@ export default function Subscribe(postgres, options) { return { stream, state: xs.state } function error(e) { - console.error('Unexpected error during logical streaming - reconnecting', e) + console.error('Unexpected error during logical streaming - reconnecting', e) // eslint-disable-line } function data(x) { - if (x[0] === 0x77) + if (x[0] === 0x77) { parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) - else if (x[0] === 0x6b && x[17]) + } else if (x[0] === 0x6b && x[17]) { + state.lsn = x.subarray(1, 9) pong() + } } function handle(a, b) { diff --git a/deno/tests/bootstrap.js b/deno/tests/bootstrap.js index 699b54bf..da416896 100644 --- a/deno/tests/bootstrap.js +++ b/deno/tests/bootstrap.js @@ -1,15 +1,19 @@ import { spawn } from 'https://deno.land/std@0.132.0/node/child_process.ts' +await exec('dropdb', ['postgres_js_test']) + await exec('psql', ['-c', 'alter system set ssl=on']) +await exec('psql', ['-c', 'drop user postgres_js_test']) await exec('psql', ['-c', 'create user postgres_js_test']) await exec('psql', ['-c', 'alter system set password_encryption=md5']) await exec('psql', ['-c', 'select pg_reload_conf()']) +await exec('psql', ['-c', 'drop user if exists postgres_js_test_md5']) await exec('psql', ['-c', 'create user postgres_js_test_md5 with password \'postgres_js_test_md5\'']) await exec('psql', ['-c', 'alter system set password_encryption=\'scram-sha-256\'']) await exec('psql', ['-c', 'select pg_reload_conf()']) +await exec('psql', ['-c', 'drop user if exists postgres_js_test_scram']) await exec('psql', ['-c', 'create user postgres_js_test_scram with password \'postgres_js_test_scram\'']) -await exec('dropdb', ['postgres_js_test']) await exec('createdb', ['postgres_js_test']) await exec('psql', ['-c', 'grant all on database postgres_js_test to postgres_js_test']) await exec('psql', ['-c', 'alter database 
postgres_js_test owner to postgres_js_test']) diff --git a/deno/tests/index.js b/deno/tests/index.js index 210a9f9b..adedf1e0 100644 --- a/deno/tests/index.js +++ b/deno/tests/index.js @@ -431,6 +431,30 @@ t('Reconnect using SSL', { timeout: 2 }, async() => { return [1, (await sql`select 1 as x`)[0].x] }) +t('Proper handling of non object Errors', async() => { + const sql = postgres({ socket: () => { throw 'wat' } }) // eslint-disable-line + + return [ + 'wat', await sql`select 1 as x`.catch(e => e.message) + ] +}) + +t('Proper handling of null Errors', async() => { + const sql = postgres({ socket: () => { throw null } }) // eslint-disable-line + + return [ + 'null', await sql`select 1 as x`.catch(e => e.message) + ] +}) + +t('Ensure reserve on connection throws proper error', async() => { + const sql = postgres({ socket: () => { throw 'wat' }, idle_timeout }) // eslint-disable-line + + return [ + 'wat', await sql.reserve().catch(e => e) + ] +}) + t('Login without password', async() => { return [true, (await postgres({ ...options, ...login })`select true as x`)[0].x] }) @@ -1791,6 +1815,32 @@ t('Recreate prepared statements on RevalidateCachedQuery error', async() => { ] }) +t('Properly throws routine error on not prepared statements', async() => { + await sql`create table x (x text[])` + const { routine } = await sql.unsafe(` + insert into x(x) values (('a', 'b')) + `).catch(e => e) + + return ['transformAssignedExpr', routine, await sql`drop table x`] +}) + +t('Properly throws routine error on not prepared statements in transaction', async() => { + const { routine } = await sql.begin(sql => [ + sql`create table x (x text[])`, + sql`insert into x(x) values (('a', 'b'))` + ]).catch(e => e) + + return ['transformAssignedExpr', routine] +}) + +t('Properly throws routine error on not prepared statements using file', async() => { + const { routine } = await sql.unsafe(` + create table x (x text[]); + insert into x(x) values (('a', 'b')); + `, { prepare: true }).catch(e => e) + + return ['transformAssignedExpr', routine] +}) t('Catches connection config errors', async() => { const sql = postgres({ ...options, user: { toString: () => { throw new Error('wat') } }, database: 'prut' }) @@ -2136,7 +2186,7 @@ t('Execute', async() => { t('Cancel running query', async() => { const query = sql`select pg_sleep(2)` - setTimeout(() => query.cancel(), 200) + setTimeout(() => query.cancel(), 500) const error = await query.catch(x => x) return ['57014', error.code] }) @@ -2350,11 +2400,22 @@ t('Ensure reconnect after max_lifetime with transactions', { timeout: 5 }, async return [true, true] }) + +t('Ensure transactions throw if connection is closed dwhile there is no query', async() => { + const sql = postgres(options) + const x = await sql.begin(async() => { + setTimeout(() => sql.end({ timeout: 0 }), 10) + await new Promise(r => setTimeout(r, 200)) + return sql`select 1` + }).catch(x => x) + return ['CONNECTION_CLOSED', x.code] +}) + t('Custom socket', {}, async() => { let result const sql = postgres({ socket: () => new Promise((resolve, reject) => { - const socket = net.Socket() + const socket = new net.Socket() socket.connect(5432) socket.once('data', x => result = x[0]) socket.on('error', reject) @@ -2535,4 +2596,25 @@ t('reserve connection', async() => { ] }) -;window.addEventListener("unload", () => Deno.exit(process.exitCode)) \ No newline at end of file +t('arrays in reserved connection', async() => { + const reserved = await sql.reserve() + const [{ x }] = await reserved`select array[1, 2, 3] as x` 
+ reserved.release() + + return [ + '123', + x.join('') + ] +}) + +t('Ensure reserve on query throws proper error', async() => { + const sql = postgres({ idle_timeout }) // eslint-disable-line + const reserved = await sql.reserve() + const [{ x }] = await reserved`select 'wat' as x` + + return [ + 'wat', x, reserved.release() + ] +}) + +;globalThis.addEventListener("unload", () => Deno.exit(process.exitCode)) \ No newline at end of file diff --git a/deno/types/index.d.ts b/deno/types/index.d.ts index 64a00a4c..44a07af0 100644 --- a/deno/types/index.d.ts +++ b/deno/types/index.d.ts @@ -179,9 +179,17 @@ type Rest = T extends TemplateStringsArray ? never : // force fallback to the tagged template function overload T extends string ? readonly string[] : T extends readonly any[][] ? readonly [] : - T extends readonly (object & infer R)[] ? readonly (Keys & keyof R)[] : + T extends readonly (object & infer R)[] ? ( + readonly (Keys & keyof R)[] // sql(data, "prop", "prop2") syntax + | + [readonly (Keys & keyof R)[]] // sql(data, ["prop", "prop2"]) syntax + ) : T extends readonly any[] ? readonly [] : - T extends object ? readonly (Keys & keyof T)[] : + T extends object ? ( + readonly (Keys & keyof T)[] // sql(data, "prop", "prop2") syntax + | + [readonly (Keys & keyof T)[]] // sql(data, ["prop", "prop2"]) syntax + ) : any type Return = @@ -323,8 +331,18 @@ declare namespace postgres { * @default 'postgres.js' */ application_name: string; + default_transaction_isolation: 'read uncommitted' | 'read committed' | 'repeatable read' | 'serializable', + default_transaction_read_only: boolean, + default_transaction_deferrable: boolean, + statement_timeout: number, + lock_timeout: number, + idle_in_transaction_session_timeout: number, + idle_session_timeout: number, + DateStyle: string, + IntervalStyle: string, + TimeZone: string, /** Other connection parameters */ - [name: string]: string; + [name: string]: string | number | boolean; } interface Options> extends Partial> { @@ -440,7 +458,8 @@ declare namespace postgres { | 'NOT_TAGGED_CALL' | 'UNDEFINED_VALUE' | 'MAX_PARAMETERS_EXCEEDED' - | 'SASL_SIGNATURE_MISMATCH'; + | 'SASL_SIGNATURE_MISMATCH' + | 'UNSAFE_TRANSACTION'; message: string; } @@ -583,6 +602,7 @@ declare namespace postgres { type RowList = T & Iterable> & ResultQueryMeta; interface PendingQueryModifiers { + simple(): this; readable(): Promise; writable(): Promise; @@ -674,7 +694,7 @@ declare namespace postgres { listen(channel: string, onnotify: (value: string) => void, onlisten?: (() => void) | undefined): ListenRequest; notify(channel: string, payload: string): PendingRequest; - subscribe(event: string, cb: (row: Row | null, info: ReplicationEvent) => void, onsubscribe?: (() => void) | undefined): Promise; + subscribe(event: string, cb: (row: Row | null, info: ReplicationEvent) => void, onsubscribe?: (() => void), onerror?: (() => any)): Promise; largeObject(oid?: number | undefined, /** @default 0x00020000 | 0x00040000 */ mode?: number | undefined): Promise; @@ -685,6 +705,8 @@ declare namespace postgres { file(path: string | Buffer | URL | number, options?: { cache?: boolean | undefined } | undefined): PendingQuery; file(path: string | Buffer | URL | number, args: (ParameterOrJSON)[], options?: { cache?: boolean | undefined } | undefined): PendingQuery; json(value: JSONValue): Parameter; + + reserve(): Promise> } interface UnsafeQueryOptions { @@ -701,6 +723,10 @@ declare namespace postgres { prepare(name: string): Promise>; } + + interface ReservedSql = {}> extends Sql { + release(): 
void; + } } export = postgres; diff --git a/package.json b/package.json index c9d00db5..65157609 100644 --- a/package.json +++ b/package.json @@ -1,18 +1,22 @@ { "name": "postgres", - "version": "3.3.5", + "version": "3.4.7", "description": "Fastest full featured PostgreSQL client for Node.js", "type": "module", "module": "src/index.js", "main": "cjs/src/index.js", "exports": { - "worker": "./cf/src/index.js", "types": "./types/index.d.ts", + "bun": "./src/index.js", + "workerd": "./cf/src/index.js", "import": "./src/index.js", "default": "./cjs/src/index.js" }, "types": "types/index.d.ts", "typings": "types/index.d.ts", + "engines": { + "node": ">=12" + }, "scripts": { "build": "npm run build:cjs && npm run build:deno && npm run build:cf", "build:cjs": "node transpile.cjs", @@ -21,7 +25,7 @@ "test": "npm run test:esm && npm run test:cjs && npm run test:deno", "test:esm": "node tests/index.js", "test:cjs": "npm run build:cjs && cd cjs/tests && node index.js && cd ../../", - "test:deno": "npm run build:deno && cd deno/tests && deno run --unstable --allow-all --unsafely-ignore-certificate-errors index.js && cd ../../", + "test:deno": "npm run build:deno && cd deno/tests && deno run --no-lock --allow-all --unsafely-ignore-certificate-errors index.js && cd ../../", "lint": "eslint src && eslint tests", "prepare": "npm run build", "prepublishOnly": "npm run lint" diff --git a/src/connection.js b/src/connection.js index b4d0f6f1..c3f554aa 100644 --- a/src/connection.js +++ b/src/connection.js @@ -129,7 +129,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose try { x = options.socket ? (await Promise.resolve(options.socket(options))) - : net.Socket() + : new net.Socket() } catch (e) { error(e) return @@ -293,7 +293,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose if (incomings) { incomings.push(x) remaining -= x.length - if (remaining >= 0) + if (remaining > 0) return } @@ -385,13 +385,20 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } function queryError(query, err) { - query.reject(Object.create(err, { + if (query.reserve) + return query.reject(err) + + if (!err || typeof err !== 'object') + err = new Error(err) + + 'query' in err || 'parameters' in err || Object.defineProperties(err, { stack: { value: err.stack + query.origin.replace(/.*\n/, '\n'), enumerable: options.debug }, query: { value: query.string, enumerable: options.debug }, parameters: { value: query.parameters, enumerable: options.debug }, args: { value: query.args, enumerable: options.debug }, types: { value: query.statement && query.statement.types, enumerable: options.debug } - })) + }) + query.reject(err) } function end() { @@ -428,10 +435,8 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose lifeTimer.cancel() connectTimer.cancel() - if (socket.encrypted) { - socket.removeAllListeners() - socket = null - } + socket.removeAllListeners() + socket = null if (initial) return reconnect() @@ -440,7 +445,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose closedDate = performance.now() hadError && options.shared.retries++ delay = (typeof backoff === 'function' ? 
backoff(options.shared.retries) : backoff) * 1000 - onclose(connection) + onclose(connection, Errors.connection('CONNECTION_CLOSED', options, socket)) } /* Handlers */ @@ -532,11 +537,14 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose return terminate() } - if (needsTypes) + if (needsTypes) { + initial.reserve && (initial = null) return fetchArrayTypes() + } - execute(initial) - options.shared.retries = retries = initial = 0 + initial && !initial.reserve && execute(initial) + options.shared.retries = retries = 0 + initial = null return } @@ -656,27 +664,30 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose /* c8 ignore next 5 */ async function AuthenticationCleartextPassword() { + const payload = await Pass() write( - b().p().str(await Pass()).z(1).end() + b().p().str(payload).z(1).end() ) } async function AuthenticationMD5Password(x) { - write( - b().p().str( - 'md5' + - (await md5(Buffer.concat([ + const payload = 'md5' + ( + await md5( + Buffer.concat([ Buffer.from(await md5((await Pass()) + user)), x.subarray(9) - ]))) - ).z(1).end() + ]) + ) + ) + write( + b().p().str(payload).z(1).end() ) } async function SASL() { + nonce = (await crypto.randomBytes(18)).toString('base64') b().p().str('SCRAM-SHA-256' + b.N) const i = b.i - nonce = (await crypto.randomBytes(18)).toString('base64') write(b.inc(4).str('n,,n=*,r=' + nonce).i32(b.i - i - 4, i).end()) } @@ -698,12 +709,12 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose serverSignature = (await hmac(await hmac(saltedPassword, 'Server Key'), auth)).toString('base64') + const payload = 'c=biws,r=' + res.r + ',p=' + xor( + clientKey, Buffer.from(await hmac(await sha256(clientKey), auth)) + ).toString('base64') + write( - b().p().str( - 'c=biws,r=' + res.r + ',p=' + xor( - clientKey, Buffer.from(await hmac(await sha256(clientKey), auth)) - ).toString('base64') - ).end() + b().p().str(payload).end() ) } @@ -783,7 +794,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose const error = Errors.postgres(parseError(x)) query && query.retried ? errored(query.retried) - : query && retryRoutines.has(error.routine) + : query && query.prepared && retryRoutines.has(error.routine) ? retry(query, error) : errored(error) } diff --git a/src/index.js b/src/index.js index 936be5cc..944d50cf 100644 --- a/src/index.js +++ b/src/index.js @@ -201,17 +201,18 @@ function Postgres(a, b) { } async function reserve() { - const q = Queue() + const queue = Queue() const c = open.length ? open.shift() - : await new Promise(r => { - queries.push({ reserve: r }) - closed.length && connect(closed.shift()) + : await new Promise((resolve, reject) => { + const query = { reserve: resolve, reject } + queries.push(query) + closed.length && connect(closed.shift(), query) }) move(c, reserved) - c.reserved = () => q.length - ? c.execute(q.shift()) + c.reserved = () => queue.length + ? c.execute(queue.shift()) : move(c, reserved) c.reserved.release = true @@ -225,7 +226,7 @@ function Postgres(a, b) { function handler(q) { c.queue === full - ? q.push(q) + ? 
queue.push(q) : c.execute(q) || move(c, full) } } @@ -239,7 +240,10 @@ function Postgres(a, b) { try { await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute() - return await scope(connection, fn) + return await Promise.race([ + scope(connection, fn), + new Promise((_, reject) => connection.onclose = reject) + ]) } catch (error) { throw error } @@ -414,9 +418,10 @@ function Postgres(a, b) { : move(c, full) } - function onclose(c) { + function onclose(c, e) { move(c, closed) c.reserved = null + c.onclose && (c.onclose(e), c.onclose = null) options.onclose && options.onclose(c.id) queries.length && connect(c, queries.shift()) } @@ -427,7 +432,7 @@ function parseOptions(a, b) { return a const env = process.env // eslint-disable-line - , o = (typeof a === 'string' ? b : a) || {} + , o = (!a || typeof a === 'string' ? b : a) || {} , { url, multihost } = parseUrl(a) , query = [...url.searchParams].reduce((a, [b, c]) => (a[b] = c, a), {}) , host = o.hostname || o.host || multihost || url.hostname || env.PGHOST || 'localhost' @@ -437,6 +442,7 @@ function parseOptions(a, b) { o.no_prepare && (o.prepare = false) query.sslmode && (query.ssl = query.sslmode, delete query.sslmode) 'timeout' in o && (console.log('The timeout option is deprecated, use idle_timeout instead'), o.idle_timeout = o.timeout) // eslint-disable-line + query.sslrootcert === 'system' && (query.ssl = 'verify-full') const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive'] const defaults = { @@ -475,7 +481,7 @@ function parseOptions(a, b) { {} ), connection : { - application_name: 'postgres.js', + application_name: env.PGAPPNAME || 'postgres.js', ...o.connection, ...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {}) }, @@ -528,7 +534,7 @@ function parseTransform(x) { } function parseUrl(url) { - if (typeof url !== 'string') + if (!url || typeof url !== 'string') return { url: { searchParams: new Map() } } let host = url diff --git a/src/query.js b/src/query.js index 848f3b88..0d44a15c 100644 --- a/src/query.js +++ b/src/query.js @@ -37,13 +37,12 @@ export class Query extends Promise { } get origin() { - return this.handler.debug + return (this.handler.debug ? this[originError].stack - : this.tagged - ? originStackCache.has(this.strings) - ? originStackCache.get(this.strings) - : originStackCache.set(this.strings, this[originError].stack).get(this.strings) - : '' + : this.tagged && originStackCache.has(this.strings) + ? 
originStackCache.get(this.strings) + : originStackCache.set(this.strings, this[originError].stack).get(this.strings) + ) || '' } static get [Symbol.species]() { diff --git a/src/subscribe.js b/src/subscribe.js index 7a70842e..4f8934cc 100644 --- a/src/subscribe.js +++ b/src/subscribe.js @@ -47,7 +47,7 @@ export default function Subscribe(postgres, options) { return subscribe - async function subscribe(event, fn, onsubscribe = noop) { + async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { event = parseEvent(event) if (!connection) @@ -66,6 +66,7 @@ export default function Subscribe(postgres, options) { return connection.then(x => { connected(x) onsubscribe() + stream && stream.on('error', onerror) return { unsubscribe, state, sql } }) } @@ -103,14 +104,16 @@ export default function Subscribe(postgres, options) { return { stream, state: xs.state } function error(e) { - console.error('Unexpected error during logical streaming - reconnecting', e) + console.error('Unexpected error during logical streaming - reconnecting', e) // eslint-disable-line } function data(x) { - if (x[0] === 0x77) + if (x[0] === 0x77) { parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) - else if (x[0] === 0x6b && x[17]) + } else if (x[0] === 0x6b && x[17]) { + state.lsn = x.subarray(1, 9) pong() + } } function handle(a, b) { diff --git a/tests/bootstrap.js b/tests/bootstrap.js index 0070c7b7..f877543a 100644 --- a/tests/bootstrap.js +++ b/tests/bootstrap.js @@ -1,15 +1,19 @@ import { spawnSync } from 'child_process' +exec('dropdb', ['postgres_js_test']) + exec('psql', ['-c', 'alter system set ssl=on']) +exec('psql', ['-c', 'drop user postgres_js_test']) exec('psql', ['-c', 'create user postgres_js_test']) exec('psql', ['-c', 'alter system set password_encryption=md5']) exec('psql', ['-c', 'select pg_reload_conf()']) +exec('psql', ['-c', 'drop user if exists postgres_js_test_md5']) exec('psql', ['-c', 'create user postgres_js_test_md5 with password \'postgres_js_test_md5\'']) exec('psql', ['-c', 'alter system set password_encryption=\'scram-sha-256\'']) exec('psql', ['-c', 'select pg_reload_conf()']) +exec('psql', ['-c', 'drop user if exists postgres_js_test_scram']) exec('psql', ['-c', 'create user postgres_js_test_scram with password \'postgres_js_test_scram\'']) -exec('dropdb', ['postgres_js_test']) exec('createdb', ['postgres_js_test']) exec('psql', ['-c', 'grant all on database postgres_js_test to postgres_js_test']) exec('psql', ['-c', 'alter database postgres_js_test owner to postgres_js_test']) diff --git a/tests/index.js b/tests/index.js index d1d72b53..07ff98ed 100644 --- a/tests/index.js +++ b/tests/index.js @@ -429,6 +429,30 @@ t('Reconnect using SSL', { timeout: 2 }, async() => { return [1, (await sql`select 1 as x`)[0].x] }) +t('Proper handling of non object Errors', async() => { + const sql = postgres({ socket: () => { throw 'wat' } }) // eslint-disable-line + + return [ + 'wat', await sql`select 1 as x`.catch(e => e.message) + ] +}) + +t('Proper handling of null Errors', async() => { + const sql = postgres({ socket: () => { throw null } }) // eslint-disable-line + + return [ + 'null', await sql`select 1 as x`.catch(e => e.message) + ] +}) + +t('Ensure reserve on connection throws proper error', async() => { + const sql = postgres({ socket: () => { throw 'wat' }, idle_timeout }) // eslint-disable-line + + return [ + 'wat', await sql.reserve().catch(e => e) + ] +}) + t('Login without password', async() => { return [true, (await postgres({ ...options, ...login 
})`select true as x`)[0].x] }) @@ -1789,6 +1813,32 @@ t('Recreate prepared statements on RevalidateCachedQuery error', async() => { ] }) +t('Properly throws routine error on not prepared statements', async() => { + await sql`create table x (x text[])` + const { routine } = await sql.unsafe(` + insert into x(x) values (('a', 'b')) + `).catch(e => e) + + return ['transformAssignedExpr', routine, await sql`drop table x`] +}) + +t('Properly throws routine error on not prepared statements in transaction', async() => { + const { routine } = await sql.begin(sql => [ + sql`create table x (x text[])`, + sql`insert into x(x) values (('a', 'b'))` + ]).catch(e => e) + + return ['transformAssignedExpr', routine] +}) + +t('Properly throws routine error on not prepared statements using file', async() => { + const { routine } = await sql.unsafe(` + create table x (x text[]); + insert into x(x) values (('a', 'b')); + `, { prepare: true }).catch(e => e) + + return ['transformAssignedExpr', routine] +}) t('Catches connection config errors', async() => { const sql = postgres({ ...options, user: { toString: () => { throw new Error('wat') } }, database: 'prut' }) @@ -2134,7 +2184,7 @@ t('Execute', async() => { t('Cancel running query', async() => { const query = sql`select pg_sleep(2)` - setTimeout(() => query.cancel(), 200) + setTimeout(() => query.cancel(), 500) const error = await query.catch(x => x) return ['57014', error.code] }) @@ -2348,11 +2398,22 @@ t('Ensure reconnect after max_lifetime with transactions', { timeout: 5 }, async return [true, true] }) + +t('Ensure transactions throw if connection is closed dwhile there is no query', async() => { + const sql = postgres(options) + const x = await sql.begin(async() => { + setTimeout(() => sql.end({ timeout: 0 }), 10) + await new Promise(r => setTimeout(r, 200)) + return sql`select 1` + }).catch(x => x) + return ['CONNECTION_CLOSED', x.code] +}) + t('Custom socket', {}, async() => { let result const sql = postgres({ socket: () => new Promise((resolve, reject) => { - const socket = net.Socket() + const socket = new net.Socket() socket.connect(5432) socket.once('data', x => result = x[0]) socket.on('error', reject) @@ -2532,3 +2593,24 @@ t('reserve connection', async() => { xs.map(x => x.x).join('') ] }) + +t('arrays in reserved connection', async() => { + const reserved = await sql.reserve() + const [{ x }] = await reserved`select array[1, 2, 3] as x` + reserved.release() + + return [ + '123', + x.join('') + ] +}) + +t('Ensure reserve on query throws proper error', async() => { + const sql = postgres({ idle_timeout }) // eslint-disable-line + const reserved = await sql.reserve() + const [{ x }] = await reserved`select 'wat' as x` + + return [ + 'wat', x, reserved.release() + ] +}) diff --git a/transpile.deno.js b/transpile.deno.js index 923ac9af..f077677b 100644 --- a/transpile.deno.js +++ b/transpile.deno.js @@ -55,7 +55,7 @@ function transpile(x, name, folder) { .replace('{ spawnSync }', '{ spawn }') } if (name === 'index.js') - x += '\n;window.addEventListener("unload", () => Deno.exit(process.exitCode))' + x += '\n;globalThis.addEventListener("unload", () => Deno.exit(process.exitCode))' } const buffer = x.includes('Buffer') diff --git a/types/index.d.ts b/types/index.d.ts index ab797ee4..eb604918 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -177,9 +177,17 @@ type Rest = T extends TemplateStringsArray ? never : // force fallback to the tagged template function overload T extends string ? readonly string[] : T extends readonly any[][] ? 
readonly [] : - T extends readonly (object & infer R)[] ? readonly (Keys & keyof R)[] : + T extends readonly (object & infer R)[] ? ( + readonly (Keys & keyof R)[] // sql(data, "prop", "prop2") syntax + | + [readonly (Keys & keyof R)[]] // sql(data, ["prop", "prop2"]) syntax + ) : T extends readonly any[] ? readonly [] : - T extends object ? readonly (Keys & keyof T)[] : + T extends object ? ( + readonly (Keys & keyof T)[] // sql(data, "prop", "prop2") syntax + | + [readonly (Keys & keyof T)[]] // sql(data, ["prop", "prop2"]) syntax + ) : any type Return = @@ -321,8 +329,18 @@ declare namespace postgres { * @default 'postgres.js' */ application_name: string; + default_transaction_isolation: 'read uncommitted' | 'read committed' | 'repeatable read' | 'serializable', + default_transaction_read_only: boolean, + default_transaction_deferrable: boolean, + statement_timeout: number, + lock_timeout: number, + idle_in_transaction_session_timeout: number, + idle_session_timeout: number, + DateStyle: string, + IntervalStyle: string, + TimeZone: string, /** Other connection parameters */ - [name: string]: string; + [name: string]: string | number | boolean; } interface Options> extends Partial> { @@ -438,7 +456,8 @@ declare namespace postgres { | 'NOT_TAGGED_CALL' | 'UNDEFINED_VALUE' | 'MAX_PARAMETERS_EXCEEDED' - | 'SASL_SIGNATURE_MISMATCH'; + | 'SASL_SIGNATURE_MISMATCH' + | 'UNSAFE_TRANSACTION'; message: string; } @@ -581,6 +600,7 @@ declare namespace postgres { type RowList = T & Iterable> & ResultQueryMeta; interface PendingQueryModifiers { + simple(): this; readable(): Promise; writable(): Promise; @@ -672,7 +692,7 @@ declare namespace postgres { listen(channel: string, onnotify: (value: string) => void, onlisten?: (() => void) | undefined): ListenRequest; notify(channel: string, payload: string): PendingRequest; - subscribe(event: string, cb: (row: Row | null, info: ReplicationEvent) => void, onsubscribe?: (() => void) | undefined): Promise; + subscribe(event: string, cb: (row: Row | null, info: ReplicationEvent) => void, onsubscribe?: (() => void), onerror?: (() => any)): Promise; largeObject(oid?: number | undefined, /** @default 0x00020000 | 0x00040000 */ mode?: number | undefined): Promise; @@ -683,6 +703,8 @@ declare namespace postgres { file(path: string | Buffer | URL | number, options?: { cache?: boolean | undefined } | undefined): PendingQuery; file(path: string | Buffer | URL | number, args: (ParameterOrJSON)[], options?: { cache?: boolean | undefined } | undefined): PendingQuery; json(value: JSONValue): Parameter; + + reserve(): Promise> } interface UnsafeQueryOptions { @@ -699,6 +721,10 @@ declare namespace postgres { prepare(name: string): Promise>; } + + interface ReservedSql = {}> extends Sql { + release(): void; + } } export = postgres;
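As a closing note on the widened `connection` typing above, here is a minimal usage sketch (not part of the diff). The option names come straight from the new type additions; the concrete values and the `my-service` name are placeholders:

```js
// Entries under `connection` are sent to the server as startup parameters,
// so the widened typing allows plain numbers where the server expects them.
const sql = postgres({
  connection: {
    application_name: 'my-service',
    statement_timeout: 10000,                    // milliseconds, enforced server-side
    idle_in_transaction_session_timeout: 60000,  // milliseconds
    TimeZone: 'UTC',
    default_transaction_isolation: 'serializable'
  }
})
```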