Skip to content

Commit 389d5d8

Browse files
committed
Fix named portal being left open
When code was added to use a random named portal instead of the empty portal to support redshift we didn't update the close messages approprately. This could result in postgres keeping locks on tables for modification if streaming and table modification was both done within a transaction. This update fixes the issue by always issuing a close command on the named portal when query is finished. fixes brianc#56
1 parent 6d47026 commit 389d5d8

File tree

4 files changed

+49
-8
lines changed

4 files changed

+49
-8
lines changed

.eslintrc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
{
22
"extends": ["eslint:recommended"],
3+
"parserOptions": {
4+
"ecmaVersion": 2017
5+
},
36
"plugins": ["prettier"],
47
"rules": {
58
"prettier/prettier": "error",

.travis.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ language: node_js
22
dist: trusty
33
sudo: false
44
node_js:
5-
- "8"
6-
- "10"
7-
- "12"
5+
- '8'
6+
- '10'
7+
- '12'
88
env:
99
- PGUSER=postgres
1010
services:
1111
- postgresql
1212
addons:
13-
postgresql: "9.6"
13+
postgresql: '9.6'
1414
before_script:
1515
- psql -c 'create database travis;' -U postgres | true

index.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ Cursor.prototype._shiftQueue = function() {
7575
}
7676
}
7777

78+
Cursor.prototype._closePortal = function() {
79+
// because we opened a named portal to stream results
80+
// we need to close the same named portal. Leaving a named portal
81+
// open can lock tables for modification if inside a transaction.
82+
// see https://github.com/brianc/node-pg-cursor/issues/56
83+
this.connection.close({ type: 'P', name: this._portal })
84+
this.connection.sync()
85+
}
86+
7887
Cursor.prototype.handleRowDescription = function(msg) {
7988
this._result.addFields(msg.fields)
8089
this.state = 'idle'
@@ -105,7 +114,7 @@ Cursor.prototype._sendRows = function() {
105114

106115
Cursor.prototype.handleCommandComplete = function(msg) {
107116
this._result.addCommandComplete(msg)
108-
this.connection.sync()
117+
this._closePortal()
109118
}
110119

111120
Cursor.prototype.handlePortalSuspended = function() {
@@ -114,8 +123,8 @@ Cursor.prototype.handlePortalSuspended = function() {
114123

115124
Cursor.prototype.handleReadyForQuery = function() {
116125
this._sendRows()
117-
this.emit('end', this._result)
118126
this.state = 'done'
127+
this.emit('end', this._result)
119128
}
120129

121130
Cursor.prototype.handleEmptyQuery = function() {
@@ -166,8 +175,7 @@ Cursor.prototype.close = function(cb) {
166175
if (this.state === 'done') {
167176
return setImmediate(cb)
168177
}
169-
this.connection.close({ type: 'P' })
170-
this.connection.sync()
178+
this._closePortal()
171179
this.state = 'done'
172180
if (cb) {
173181
this.connection.once('closeComplete', function() {

test/transactions.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
const assert = require('assert')
2+
const Cursor = require('../')
3+
const pg = require('pg')
4+
5+
describe('transactions', () => {
6+
it('can execute multiple statements in a transaction', async () => {
7+
const client = new pg.Client()
8+
await client.connect()
9+
await client.query('begin')
10+
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)')
11+
const cursor = client.query(new Cursor('SELECT * FROM foobar'))
12+
const rows = await new Promise((resolve, reject) => {
13+
cursor.read(10, (err, rows) => (err ? reject(err) : resolve(rows)))
14+
})
15+
assert.equal(rows.length, 0)
16+
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
17+
await client.end()
18+
})
19+
20+
it('can execute multiple statements in a transaction if ending cursor early', async () => {
21+
const client = new pg.Client()
22+
await client.connect()
23+
await client.query('begin')
24+
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)')
25+
const cursor = client.query(new Cursor('SELECT * FROM foobar'))
26+
await new Promise(resolve => cursor.close(resolve))
27+
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
28+
await client.end()
29+
})
30+
})

0 commit comments

Comments
 (0)