Skip to content

Commit d28d01d

Browse files
johanneswuerbachbrianc
authored andcommitted
Prevent requeuing a broken client
If a client is not queryable, the pool should prevent requeuing instead of strictly enforcing errors to be propagated back to it.
1 parent 7aee261 commit d28d01d

File tree

2 files changed

+77
-18
lines changed

2 files changed

+77
-18
lines changed

packages/pg-pool/index.js

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,45 @@ const removeWhere = (list, predicate) => {
1212
}
1313

1414
class IdleItem {
15-
constructor (client, idleListener, timeoutId) {
15+
constructor(client, idleListener, timeoutId) {
1616
this.client = client
1717
this.idleListener = idleListener
1818
this.timeoutId = timeoutId
1919
}
2020
}
2121

2222
class PendingItem {
23-
constructor (callback) {
23+
constructor(callback) {
2424
this.callback = callback
2525
}
2626
}
2727

28-
function promisify (Promise, callback) {
28+
function throwOnRelease() {
29+
throw new Error('Release called on client which has already been released to the pool.')
30+
}
31+
32+
function release(client, err) {
33+
client.release = throwOnRelease
34+
if (err || this.ending || !client._queryable || client._ending) {
35+
this._remove(client)
36+
this._pulseQueue()
37+
return
38+
}
39+
40+
// idle timeout
41+
let tid
42+
if (this.options.idleTimeoutMillis) {
43+
tid = setTimeout(() => {
44+
this.log('remove idle client')
45+
this._remove(client)
46+
}, this.options.idleTimeoutMillis)
47+
}
48+
49+
this._idle.push(new IdleItem(client, tid))
50+
this._pulseQueue()
51+
}
52+
53+
function promisify(Promise, callback) {
2954
if (callback) {
3055
return { callback: callback, result: undefined }
3156
}
@@ -41,8 +66,8 @@ function promisify (Promise, callback) {
4166
return { callback: cb, result: result }
4267
}
4368

44-
function makeIdleListener (pool, client) {
45-
return function idleListener (err) {
69+
function makeIdleListener(pool, client) {
70+
return function idleListener(err) {
4671
err.client = client
4772

4873
client.removeListener('error', idleListener)
@@ -57,7 +82,7 @@ function makeIdleListener (pool, client) {
5782
}
5883

5984
class Pool extends EventEmitter {
60-
constructor (options, Client) {
85+
constructor(options, Client) {
6186
super()
6287
this.options = Object.assign({}, options)
6388

@@ -89,11 +114,11 @@ class Pool extends EventEmitter {
89114
this.ended = false
90115
}
91116

92-
_isFull () {
117+
_isFull() {
93118
return this._clients.length >= this.options.max
94119
}
95120

96-
_pulseQueue () {
121+
_pulseQueue() {
97122
this.log('pulse queue')
98123
if (this.ended) {
99124
this.log('pulse queue ended')
@@ -136,7 +161,7 @@ class Pool extends EventEmitter {
136161
throw new Error('unexpected condition')
137162
}
138163

139-
_remove (client) {
164+
_remove(client) {
140165
const removed = removeWhere(
141166
this._idle,
142167
item => item.client === client
@@ -151,7 +176,7 @@ class Pool extends EventEmitter {
151176
this.emit('remove', client)
152177
}
153178

154-
connect (cb) {
179+
connect(cb) {
155180
if (this.ending) {
156181
const err = new Error('Cannot use a pool after calling end on the pool')
157182
return cb ? cb(err) : this.Promise.reject(err)
@@ -197,7 +222,7 @@ class Pool extends EventEmitter {
197222
return result
198223
}
199224

200-
newClient (pendingItem) {
225+
newClient(pendingItem) {
201226
const client = new this.Client(this.options)
202227
this._clients.push(client)
203228
const idleListener = makeIdleListener(this, client)
@@ -245,7 +270,7 @@ class Pool extends EventEmitter {
245270
}
246271

247272
// acquire a client for a pending work item
248-
_acquireClient (client, pendingItem, idleListener, isNew) {
273+
_acquireClient(client, pendingItem, idleListener, isNew) {
249274
if (isNew) {
250275
this.emit('connect', client)
251276
}
@@ -289,7 +314,7 @@ class Pool extends EventEmitter {
289314

290315
// release a client back to the poll, include an error
291316
// to remove it from the pool
292-
_release (client, idleListener, err) {
317+
_release(client, idleListener, err) {
293318
client.on('error', idleListener)
294319

295320
if (err || this.ending) {
@@ -311,7 +336,7 @@ class Pool extends EventEmitter {
311336
this._pulseQueue()
312337
}
313338

314-
query (text, values, cb) {
339+
query(text, values, cb) {
315340
// guard clause against passing a function as the first parameter
316341
if (typeof text === 'function') {
317342
const response = promisify(this.Promise, text)
@@ -364,7 +389,7 @@ class Pool extends EventEmitter {
364389
return response.result
365390
}
366391

367-
end (cb) {
392+
end(cb) {
368393
this.log('ending')
369394
if (this.ending) {
370395
const err = new Error('Called end on pool more than once')
@@ -377,15 +402,15 @@ class Pool extends EventEmitter {
377402
return promised.result
378403
}
379404

380-
get waitingCount () {
405+
get waitingCount() {
381406
return this._pendingQueue.length
382407
}
383408

384-
get idleCount () {
409+
get idleCount() {
385410
return this._idle.length
386411
}
387412

388-
get totalCount () {
413+
get totalCount() {
389414
return this._clients.length
390415
}
391416
}

packages/pg-pool/test/error-handling.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,40 @@ describe('pool error handling', function () {
181181
})
182182
})
183183

184+
describe('releasing a not queryable client', () => {
185+
it('removes the client from the pool', (done) => {
186+
const pool = new Pool({ max: 1 })
187+
const connectionError = new Error('connection failed')
188+
189+
pool.once('error', () => {
190+
// Ignore error on pool
191+
})
192+
193+
pool.connect((err, client) => {
194+
expect(err).to.be(undefined)
195+
196+
client.once('error', (err) => {
197+
expect(err).to.eql(connectionError)
198+
199+
// Releasing the client should remove it from the pool,
200+
// whether called with an error or not
201+
client.release()
202+
203+
// Verify that the pool is still usuable and new client has been
204+
// created
205+
pool.query('SELECT $1::text as name', ['brianc'], (err, res) => {
206+
expect(err).to.be(undefined)
207+
expect(res.rows).to.eql([{ name: 'brianc' }])
208+
209+
pool.end(done)
210+
})
211+
})
212+
213+
client.emit('error', connectionError)
214+
})
215+
})
216+
})
217+
184218
describe('pool with lots of errors', () => {
185219
it('continues to work and provide new clients', co.wrap(function* () {
186220
const pool = new Pool({ max: 1 })

0 commit comments

Comments
 (0)