Skip to content

Commit 8730a31

Browse files
committed
Merge remote branch 'upstream/master'
2 parents 59c5df6 + 91afa89 commit 8730a31

File tree

10 files changed

+161
-14
lines changed

10 files changed

+161
-14
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ Many thanks to the following:
118118
* [homme](https://github.com/homme)
119119
* [bdunavant](https://github.com/bdunavant)
120120
* [tokumine](https://github.com/tokumine)
121+
* [shtylman](https://github.com/shtylman)
122+
* [cricri](https://github.com/cricri)
121123

122124
## Documentation
123125

lib/client.js

+26
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ var Client = function(config) {
2121
this.queryQueue = [];
2222
this.password = config.password || defaults.password;
2323
this.encoding = 'utf8';
24+
this.processID = null;
25+
this.secretKey = null;
2426
var self = this;
2527
};
2628

@@ -59,6 +61,11 @@ p.connect = function(callback) {
5961
con.password(md5password);
6062
});
6163

64+
con.once('backendKeyData', function(msg) {
65+
self.processID = msg.processID;
66+
self.secretKey = msg.secretKey;
67+
});
68+
6269
//hook up query handling events to connection
6370
//after the connection initially becomes ready for queries
6471
con.once('readyForQuery', function() {
@@ -130,6 +137,25 @@ p.connect = function(callback) {
130137

131138
};
132139

140+
p.cancel = function(client, query) {
141+
if (client.activeQuery == query) {
142+
var con = this.connection;
143+
144+
if(this.host && this.host.indexOf('/') === 0) {
145+
con.connect(this.host + '/.s.PGSQL.' + this.port);
146+
} else {
147+
con.connect(this.port, this.host);
148+
}
149+
150+
//once connection is established send cancel message
151+
con.on('connect', function() {
152+
con.cancel(client.processID, client.secretKey);
153+
});
154+
}
155+
else if (client.queryQueue.indexOf(query) != -1)
156+
client.queryQueue.splice(client.queryQueue.indexOf(query), 1);
157+
};
158+
133159
p._pulseQueryQueue = function() {
134160
if(this.readyForQuery===true) {
135161
this.activeQuery = this.queryQueue.shift();

lib/connection.js

+17
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ p.startup = function(config) {
7373
this.stream.write(buffer);
7474
};
7575

76+
p.cancel = function(processID, secretKey) {
77+
var bodyBuffer = this.writer
78+
.addInt16(1234)
79+
.addInt16(5678)
80+
.addInt32(processID)
81+
.addInt32(secretKey)
82+
.addCString('').flush();
83+
84+
var length = bodyBuffer.length + 4;
85+
86+
var buffer = new Writer()
87+
.addInt32(length)
88+
.add(bodyBuffer)
89+
.join();
90+
this.stream.write(buffer);
91+
};
92+
7693
p.password = function(password) {
7794
//0x70 = 'p'
7895
this._send(0x70, this.writer.addCString(password));

lib/index.js

+10
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ PG.prototype.connect = function(config, callback) {
8181
return pool.acquire(cb);
8282
}
8383

84+
// cancel the query runned by the given client
85+
PG.prototype.cancel = function(config, client, query) {
86+
var c = config;
87+
//allow for no config to be passed
88+
if(typeof c === 'function')
89+
c = defaults;
90+
var cancellingClient = new this.Client(c);
91+
cancellingClient.cancel(client, query);
92+
}
93+
8494
module.exports = new PG(Client);
8595

8696
//lazy require native module...the native module may not have installed

lib/native/index.js

+11-2
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ p.query = function(config, values, callback) {
5656
return q;
5757
}
5858

59+
var nativeCancel = p.cancel;
60+
61+
p.cancel = function(client, query) {
62+
if (client._activeQuery == query)
63+
this.connect(nativeCancel.bind(client));
64+
else if (client._queryQueue.indexOf(query) != -1)
65+
client._queryQueue.splice(client._queryQueue.indexOf(query), 1);
66+
};
67+
5968
p._pulseQueryQueue = function(initialConnection) {
6069
if(!this._connected) {
6170
return;
@@ -94,8 +103,8 @@ p.pauseDrain = function() {
94103
};
95104

96105
p.resumeDrain = function() {
97-
if(this._drainPaused > 1) {
98-
this.emit('drain')
106+
if(this._drainPaused > 1) {
107+
this.emit('drain')
99108
};
100109
this._drainPaused = 0;
101110
};

lib/writer.js

+22-10
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//same buffer to avoid memcpy and limit memory allocations
44
var Writer = function(size) {
55
this.size = size || 1024;
6-
this.buffer = new Buffer(this.size + 5);
6+
this.buffer = Buffer(this.size + 5);
77
this.offset = 5;
88
this.headerPosition = 0;
99
};
@@ -15,7 +15,7 @@ p._ensure = function(size) {
1515
var remaining = this.buffer.length - this.offset;
1616
if(remaining < size) {
1717
var oldBuffer = this.buffer;
18-
this.buffer = Buffer(oldBuffer.length + size);
18+
this.buffer = new Buffer(oldBuffer.length + size);
1919
oldBuffer.copy(this.buffer);
2020
}
2121
}
@@ -36,24 +36,36 @@ p.addInt16 = function(num) {
3636
return this;
3737
}
3838

39+
//for versions of node requiring 'length' as 3rd argument to buffer.write
40+
var writeString = function(buffer, string, offset, len) {
41+
buffer.write(string, offset, len);
42+
}
43+
44+
//overwrite function for older versions of node
45+
if(Buffer.prototype.write.length === 3) {
46+
writeString = function(buffer, string, offset, len) {
47+
buffer.write(string, offset);
48+
}
49+
}
50+
3951
p.addCString = function(string) {
4052
//just write a 0 for empty or null strings
4153
if(!string) {
4254
this._ensure(1);
43-
this.buffer[this.offset++] = 0;
44-
return this;
55+
} else {
56+
var len = Buffer.byteLength(string);
57+
this._ensure(len + 1); //+1 for null terminator
58+
writeString(this.buffer, string, this.offset, len);
59+
this.offset += len;
4560
}
46-
var len = Buffer.byteLength(string) + 1;
47-
this._ensure(len);
48-
this.buffer.write(string, this.offset);
49-
this.offset += len;
50-
this.buffer[this.offset] = 0; //add null terminator
61+
62+
this.buffer[this.offset++] = 0; // null terminator
5163
return this;
5264
}
5365

5466
p.addChar = function(char) {
5567
this._ensure(1);
56-
this.buffer.write(char, this.offset);
68+
writeString(this.buffer, char, this.offset, 1);
5769
this.offset++;
5870
return this;
5971
}

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{ "name": "pg",
2-
"version": "0.6.3",
2+
"version": "0.6.6",
33
"description": "PostgreSQL client - pure javascript & libpq with the same API",
44
"keywords" : ["postgres", "pg", "libpq", "postgre", "database", "rdbms"],
55
"homepage": "http://github.com/brianc/node-postgres",

script/create-test-tables.js

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
var sys = require('utils');
21
var args = require(__dirname + '/../test/cli');
32
var pg = require(__dirname + '/../lib');
43

src/binding.cc

+26
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class Connection : public ObjectWrap {
6969
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryWithParams", SendQueryWithParams);
7070
NODE_SET_PROTOTYPE_METHOD(t, "_sendPrepare", SendPrepare);
7171
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryPrepared", SendQueryPrepared);
72+
NODE_SET_PROTOTYPE_METHOD(t, "cancel", Cancel);
7273
NODE_SET_PROTOTYPE_METHOD(t, "end", End);
7374

7475
target->Set(String::NewSymbol("Connection"), t->GetFunction());
@@ -104,6 +105,22 @@ class Connection : public ObjectWrap {
104105
return Undefined();
105106
}
106107

108+
//v8 entry point into Connection#cancel
109+
static Handle<Value>
110+
Cancel(const Arguments& args)
111+
{
112+
HandleScope scope;
113+
Connection *self = ObjectWrap::Unwrap<Connection>(args.This());
114+
115+
bool success = self->Cancel();
116+
if(!success) {
117+
self -> EmitLastError();
118+
self -> DestroyConnection();
119+
}
120+
121+
return Undefined();
122+
}
123+
107124
//v8 entry point into Connection#_sendQuery
108125
static Handle<Value>
109126
SendQuery(const Arguments& args)
@@ -267,6 +284,15 @@ class Connection : public ObjectWrap {
267284
return PQsendQueryPrepared(connection_, name, nParams, paramValues, NULL, NULL, 0);
268285
}
269286

287+
int Cancel()
288+
{
289+
PGcancel* pgCancel = PQgetCancel(connection_);
290+
char errbuf[256];
291+
int result = PQcancel(pgCancel, errbuf, 256);
292+
PQfreeCancel(pgCancel);
293+
return result;
294+
}
295+
270296
//flushes socket
271297
void Flush()
272298
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
var helper = require(__dirname+"/test-helper");
2+
3+
//before running this test make sure you run the script create-test-tables
4+
test("cancellation of a query", function() {
5+
6+
var client = helper.client();
7+
8+
var qry = client.query("select name from person order by name");
9+
10+
client.on('drain', client.end.bind(client));
11+
12+
var rows1 = 0, rows2 = 0, rows3 = 0, rows4 = 0;
13+
14+
var query1 = client.query(qry);
15+
query1.on('row', function(row) {
16+
rows1++;
17+
});
18+
var query2 = client.query(qry);
19+
query2.on('row', function(row) {
20+
rows2++;
21+
});
22+
var query3 = client.query(qry);
23+
query3.on('row', function(row) {
24+
rows3++;
25+
});
26+
var query4 = client.query(qry);
27+
query4.on('row', function(row) {
28+
rows4++;
29+
});
30+
31+
helper.pg.cancel(helper.connectionString, client, query1);
32+
helper.pg.cancel(helper.connectionString, client, query2);
33+
helper.pg.cancel(helper.connectionString, client, query4);
34+
35+
setTimeout(function() {
36+
assert.equal(rows1, 0);
37+
assert.equal(rows2, 0);
38+
assert.equal(rows4, 0);
39+
}, 2000);
40+
41+
assert.emits(query3, 'end', function() {
42+
test("returned right number of rows", function() {
43+
assert.equal(rows3, 26);
44+
});
45+
});
46+
});

0 commit comments

Comments
 (0)