Skip to content

Commit 4fde8b7

Browse files
authored
Fix double readyForQuery (brianc#2420)
This is fixing a double readyForQuery message being sent from the backend (because we were calling sync after an error, which I already fixed in the main driver). Also closes brianc#2333
1 parent c6aa29a commit 4fde8b7

File tree

2 files changed

+90
-6
lines changed

2 files changed

+90
-6
lines changed

packages/pg-cursor/index.js

+21-6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ Cursor.prototype._rowDescription = function () {
3737
}
3838

3939
Cursor.prototype.submit = function (connection) {
40+
this.state = 'submitted'
4041
this.connection = connection
4142
this._portal = 'C_' + nextUniqueID++
4243

@@ -87,7 +88,12 @@ Cursor.prototype._closePortal = function () {
8788
// open can lock tables for modification if inside a transaction.
8889
// see https://github.com/brianc/node-pg-cursor/issues/56
8990
this.connection.close({ type: 'P', name: this._portal })
90-
this.connection.sync()
91+
92+
// If we've received an error we already sent a sync message.
93+
// do not send another sync as it triggers another readyForQuery message.
94+
if (this.state !== 'error') {
95+
this.connection.sync()
96+
}
9197
}
9298

9399
Cursor.prototype.handleRowDescription = function (msg) {
@@ -138,8 +144,18 @@ Cursor.prototype.handleEmptyQuery = function () {
138144
}
139145

140146
Cursor.prototype.handleError = function (msg) {
141-
this.connection.removeListener('noData', this._ifNoData)
142-
this.connection.removeListener('rowDescription', this._rowDescription)
147+
// If we're in an initialized state we've never been submitted
148+
// and don't have a connection instance reference yet.
149+
// This can happen if you queue a stream and close the client before
150+
// the client has submitted the stream. In this scenario we don't have
151+
// a connection so there's nothing to unsubscribe from.
152+
if (this.state !== 'initialized') {
153+
this.connection.removeListener('noData', this._ifNoData)
154+
this.connection.removeListener('rowDescription', this._rowDescription)
155+
// call sync to trigger a readyForQuery
156+
this.connection.sync()
157+
}
158+
143159
this.state = 'error'
144160
this._error = msg
145161
// satisfy any waiting callback
@@ -155,8 +171,6 @@ Cursor.prototype.handleError = function (msg) {
155171
// only dispatch error events if we have a listener
156172
this.emit('error', msg)
157173
}
158-
// call sync to keep this connection from hanging
159-
this.connection.sync()
160174
}
161175

162176
Cursor.prototype._getRows = function (rows, cb) {
@@ -189,6 +203,7 @@ Cursor.prototype.close = function (cb) {
189203
return
190204
}
191205
}
206+
192207
this._closePortal()
193208
this.state = 'done'
194209
if (cb) {
@@ -199,7 +214,7 @@ Cursor.prototype.close = function (cb) {
199214
}
200215

201216
Cursor.prototype.read = function (rows, cb) {
202-
if (this.state === 'idle') {
217+
if (this.state === 'idle' || this.state === 'submitted') {
203218
return this._getRows(rows, cb)
204219
}
205220
if (this.state === 'busy' || this.state === 'initialized') {

packages/pg-query-stream/test/error.ts

+69
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import assert from 'assert'
22
import helper from './helper'
33
import QueryStream from '../src'
4+
import { Pool, Client } from 'pg'
45

56
helper('error', function (client) {
67
it('receives error on stream', function (done) {
@@ -21,3 +22,71 @@ helper('error', function (client) {
2122
client.query('SELECT NOW()', done)
2223
})
2324
})
25+
26+
describe('error recovery', () => {
27+
// created from https://github.com/chrisdickinson/pg-test-case
28+
it('recovers from a streaming error in a transaction', async () => {
29+
const pool = new Pool()
30+
const client = await pool.connect()
31+
await client.query(`CREATE TEMP TABLE frobnicators (
32+
id serial primary key,
33+
updated timestamp
34+
)`)
35+
await client.query(`BEGIN;`)
36+
const query = new QueryStream(`INSERT INTO frobnicators ("updated") VALUES ($1) RETURNING "id"`, [Date.now()])
37+
let error: Error | undefined = undefined
38+
query.on('data', console.log).on('error', (e) => {
39+
error = e
40+
})
41+
client.query(query) // useless callback necessitated by an older version of honeycomb-beeline
42+
43+
await client.query(`ROLLBACK`)
44+
assert(error, 'Error should not be undefined')
45+
const { rows } = await client.query('SELECT NOW()')
46+
assert.strictEqual(rows.length, 1)
47+
client.release()
48+
const client2 = await pool.connect()
49+
await client2.query(`BEGIN`)
50+
client2.release()
51+
pool.end()
52+
})
53+
54+
// created from https://github.com/brianc/node-postgres/pull/2333
55+
it('handles an error on a stream after a plain text non-stream error', async () => {
56+
const client = new Client()
57+
const stmt = 'SELECT * FROM goose;'
58+
await client.connect()
59+
return new Promise((resolve, reject) => {
60+
client.query(stmt).catch((e) => {
61+
assert(e, 'Query should have rejected with an error')
62+
const stream = new QueryStream('SELECT * FROM duck')
63+
client.query(stream)
64+
stream.on('data', () => {})
65+
stream.on('error', () => {
66+
client.end((err) => {
67+
err ? reject(err) : resolve()
68+
})
69+
})
70+
})
71+
})
72+
})
73+
74+
it('does not crash when closing a connection with a queued stream', async () => {
75+
const client = new Client()
76+
const stmt = 'SELECT * FROM goose;'
77+
await client.connect()
78+
return new Promise(async (resolve) => {
79+
let queryError: Error | undefined
80+
client.query(stmt).catch((e) => {
81+
queryError = e
82+
})
83+
const stream = client.query(new QueryStream(stmt))
84+
stream.on('data', () => {})
85+
stream.on('error', () => {
86+
assert(queryError, 'query should have errored due to client ending')
87+
resolve()
88+
})
89+
await client.end()
90+
})
91+
})
92+
})

0 commit comments

Comments
 (0)