Skip to content

Commit 143ed29

Browse files
committed
More sync messages
1 parent 577e644 commit 143ed29

File tree

2 files changed

+94
-44
lines changed

2 files changed

+94
-44
lines changed

packages/pg/bench.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
const pg = require("./lib");
2+
const pool = new pg.Pool()
3+
4+
const q = {
5+
text:
6+
"select typname, typnamespace, typowner, typlen, typbyval, typcategory, typispreferred, typisdefined, typdelim, typrelid, typelem, typarray from pg_type where typtypmod = $1 and typisdefined = $2",
7+
values: [-1, true]
8+
};
9+
10+
const exec = async client => {
11+
const result = await client.query({
12+
text: q.text,
13+
values: q.values,
14+
rowMode: "array"
15+
});
16+
};
17+
18+
const bench = async (client, time) => {
19+
let start = Date.now();
20+
let count = 0;
21+
while (true) {
22+
await exec(client);
23+
count++;
24+
if (Date.now() - start > time) {
25+
return count;
26+
}
27+
}
28+
};
29+
30+
const run = async () => {
31+
const client = new pg.Client();
32+
await client.connect();
33+
await bench(client, 1000);
34+
console.log("warmup done");
35+
const seconds = 5;
36+
const queries = await bench(client, seconds * 1000);
37+
console.log("queries:", queries);
38+
console.log("qps", queries / seconds);
39+
console.log("on my laptop best so far seen 713 qps")
40+
await client.end();
41+
};
42+
43+
run().catch(e => console.error(e) || process.exit(-1));

packages/pg/lib/connection.js

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ var Reader = require('packet-reader')
1616

1717
var TEXT_MODE = 0
1818
var BINARY_MODE = 1
19+
1920
var Connection = function (config) {
2021
EventEmitter.call(this)
2122
config = config || {}
2223
this.stream = config.stream || new net.Socket()
24+
this.stream.setNoDelay(true)
2325
this._keepAlive = config.keepAlive
2426
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
2527
this.lastBuffer = false
@@ -87,7 +89,8 @@ Connection.prototype.connect = function (port, host) {
8789
return self.emit('error', new Error('The server does not support SSL connections'))
8890
case 'S': // Server supports SSL connections, continue with a secure connection
8991
break
90-
default: // Any other response byte, including 'E' (ErrorResponse) indicating a server error
92+
default:
93+
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
9194
return self.emit('error', new Error('There was an error establishing an SSL connection'))
9295
}
9396
var tls = require('tls')
@@ -136,8 +139,9 @@ Connection.prototype.attachListeners = function (stream) {
136139

137140
Connection.prototype.requestSsl = function () {
138141
var bodyBuffer = this.writer
139-
.addInt16(0x04D2)
140-
.addInt16(0x162F).flush()
142+
.addInt16(0x04d2)
143+
.addInt16(0x162f)
144+
.flush()
141145

142146
var length = bodyBuffer.length + 4
143147

@@ -149,9 +153,7 @@ Connection.prototype.requestSsl = function () {
149153
}
150154

151155
Connection.prototype.startup = function (config) {
152-
var writer = this.writer
153-
.addInt16(3)
154-
.addInt16(0)
156+
var writer = this.writer.addInt16(3).addInt16(0)
155157

156158
Object.keys(config).forEach(function (key) {
157159
var val = config[key]
@@ -206,8 +208,7 @@ Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initi
206208

207209
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
208210
// 0x70 = 'p'
209-
this.writer
210-
.addString(additionalData)
211+
this.writer.addString(additionalData)
211212

212213
this._send(0x70)
213214
}
@@ -216,11 +217,7 @@ Connection.prototype._send = function (code, more) {
216217
if (!this.stream.writable) {
217218
return false
218219
}
219-
if (more === true) {
220-
this.writer.addHeader(code)
221-
} else {
222-
return this.stream.write(this.writer.flush(code))
223-
}
220+
return this.stream.write(this.writer.flush(code))
224221
}
225222

226223
Connection.prototype.query = function (text) {
@@ -229,8 +226,7 @@ Connection.prototype.query = function (text) {
229226
}
230227

231228
// send parse message
232-
// "more" === true to buffer the message until flush() is called
233-
Connection.prototype.parse = function (query, more) {
229+
Connection.prototype.parse = function (query) {
234230
// expect something like this:
235231
// { name: 'queryName',
236232
// text: 'select * from blah',
@@ -257,12 +253,13 @@ Connection.prototype.parse = function (query, more) {
257253
}
258254

259255
var code = 0x50
260-
this._send(code, more)
256+
this._send(code)
257+
this.flush()
261258
}
262259

263260
// send bind message
264261
// "more" === true to buffer the message until flush() is called
265-
Connection.prototype.bind = function (config, more) {
262+
Connection.prototype.bind = function (config) {
266263
// normalize config
267264
config = config || {}
268265
config.portal = config.portal || ''
@@ -271,13 +268,17 @@ Connection.prototype.bind = function (config, more) {
271268
var values = config.values || []
272269
var len = values.length
273270
var useBinary = false
274-
for (var j = 0; j < len; j++) { useBinary |= values[j] instanceof Buffer }
275-
var buffer = this.writer
276-
.addCString(config.portal)
277-
.addCString(config.statement)
278-
if (!useBinary) { buffer.addInt16(0) } else {
271+
for (var j = 0; j < len; j++) {
272+
useBinary |= values[j] instanceof Buffer
273+
}
274+
var buffer = this.writer.addCString(config.portal).addCString(config.statement)
275+
if (!useBinary) {
276+
buffer.addInt16(0)
277+
} else {
279278
buffer.addInt16(len)
280-
for (j = 0; j < len; j++) { buffer.addInt16(values[j] instanceof Buffer) }
279+
for (j = 0; j < len; j++) {
280+
buffer.addInt16(values[j] instanceof Buffer)
281+
}
281282
}
282283
buffer.addInt16(len)
283284
for (var i = 0; i < len; i++) {
@@ -300,59 +301,63 @@ Connection.prototype.bind = function (config, more) {
300301
buffer.addInt16(0) // format codes to use text
301302
}
302303
// 0x42 = 'B'
303-
this._send(0x42, more)
304+
this._send(0x42)
305+
this.flush()
304306
}
305307

306308
// send execute message
307309
// "more" === true to buffer the message until flush() is called
308-
Connection.prototype.execute = function (config, more) {
310+
Connection.prototype.execute = function (config) {
309311
config = config || {}
310312
config.portal = config.portal || ''
311313
config.rows = config.rows || ''
312-
this.writer
313-
.addCString(config.portal)
314-
.addInt32(config.rows)
314+
this.writer.addCString(config.portal).addInt32(config.rows)
315315

316316
// 0x45 = 'E'
317-
this._send(0x45, more)
317+
this._send(0x45)
318+
this.flush()
318319
}
319320

320321
var emptyBuffer = Buffer.alloc(0)
321322

323+
const flushBuffer = Buffer.from([0x48, 0x00, 0x00, 0x00, 0x04])
322324
Connection.prototype.flush = function () {
323-
// 0x48 = 'H'
324-
this.writer.add(emptyBuffer)
325-
this._send(0x48)
325+
if (this.stream.writable) {
326+
this.stream.write(flushBuffer)
327+
}
326328
}
327329

330+
const syncBuffer = Buffer.from([0x53, 0x00, 0x00, 0x00, 0x04])
328331
Connection.prototype.sync = function () {
329-
// clear out any pending data in the writer
330-
this.writer.flush(0)
331-
332-
this.writer.add(emptyBuffer)
333332
this._ending = true
334-
this._send(0x53)
333+
// clear out any pending data in the writer
334+
this.writer.clear()
335+
if (this.stream.writable) {
336+
this.stream.write(syncBuffer)
337+
this.stream.write(flushBuffer)
338+
}
335339
}
336340

337341
const END_BUFFER = Buffer.from([0x58, 0x00, 0x00, 0x00, 0x04])
338342

339343
Connection.prototype.end = function () {
340344
// 0x58 = 'X'
341-
this.writer.add(emptyBuffer)
345+
this.writer.clear()
342346
this._ending = true
343347
return this.stream.write(END_BUFFER, () => {
344348
this.stream.end()
345349
})
346350
}
347351

348-
Connection.prototype.close = function (msg, more) {
352+
Connection.prototype.close = function (msg) {
349353
this.writer.addCString(msg.type + (msg.name || ''))
350-
this._send(0x43, more)
354+
this._send(0x43)
351355
}
352356

353-
Connection.prototype.describe = function (msg, more) {
357+
Connection.prototype.describe = function (msg) {
354358
this.writer.addCString(msg.type + (msg.name || ''))
355-
this._send(0x44, more)
359+
this._send(0x44)
360+
this.flush()
356361
}
357362

358363
Connection.prototype.sendCopyFromChunk = function (chunk) {
@@ -376,8 +381,9 @@ var Message = function (name, length) {
376381

377382
Connection.prototype.parseMessage = function (buffer) {
378383
this.offset = 0
379-
var length = buffer.length + 4
380-
switch (this._reader.header) {
384+
const length = buffer.length + 4;
385+
const code = this._reader.header;
386+
switch (code) {
381387
case 0x52: // R
382388
return this.parseR(buffer, length)
383389

@@ -441,6 +447,7 @@ Connection.prototype.parseMessage = function (buffer) {
441447
case 0x64: // d
442448
return this.parsed(buffer, length)
443449
}
450+
console.log('could not parse', packet)
444451
}
445452

446453
Connection.prototype.parseR = function (buffer, length) {

0 commit comments

Comments
 (0)