Skip to content

Commit 2013d77

Browse files
authored
Parser speed improvements (brianc#2151)
* Change from transform stream * Yeah a thing * Make tests pass, add new code to travis * Update 'best' benchmarks and include tsc in pretest script * Need to add build early so we can create test tables * logging
1 parent 90c6d13 commit 2013d77

File tree

8 files changed

+114
-77
lines changed

8 files changed

+114
-77
lines changed

.travis.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ language: node_js
22
dist: bionic
33

44
before_script: |
5+
yarn build
56
node packages/pg/script/create-test-tables.js postgresql:///
67
78
env:
89
- CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres
10+
# test w/ new faster parsing code
11+
- CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres PG_FAST_CONNECTION=true
912

1013
node_js:
1114
- lts/dubnium
@@ -30,6 +33,7 @@ matrix:
3033
-e '/^host/ s/trust$/md5/' \
3134
/etc/postgresql/10/main/pg_hba.conf
3235
sudo -u postgres psql -c "ALTER ROLE postgres PASSWORD 'test-password'; SELECT pg_reload_conf()"
36+
yarn build
3337
node packages/pg/script/create-test-tables.js postgresql:///
3438
3539
- node_js: lts/carbon

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
],
1212
"scripts": {
1313
"test": "yarn lerna exec yarn test",
14+
"build": "yarn lerna exec --scope pg-packet-stream yarn build",
15+
"pretest": "yarn build",
1416
"lint": "yarn lerna exec --parallel yarn lint"
1517
},
1618
"devDependencies": {

packages/pg-packet-stream/package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
},
1717
"scripts": {
1818
"test": "mocha dist/**/*.test.js",
19-
"prepublish": "tsc",
20-
"pretest": "tsc"
19+
"build": "tsc",
20+
"build:watch": "tsc --watch",
21+
"prepublish": "yarn build",
22+
"pretest": "yarn build"
2123
}
2224
}

packages/pg-packet-stream/src/inbound-parser.test.ts

Lines changed: 25 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import buffers from './testing/test-buffers'
22
import BufferList from './testing/buffer-list'
3-
import { PgPacketStream } from './'
3+
import { parse } from './'
44
import assert from 'assert'
5-
import { Readable } from 'stream'
5+
import { PassThrough } from 'stream'
6+
import { BackendMessage } from './messages'
67

78
var authOkBuffer = buffers.authenticationOk()
89
var paramStatusBuffer = buffers.parameterStatus('client_encoding', 'UTF8')
@@ -137,25 +138,14 @@ var expectedTwoRowMessage = {
137138
}]
138139
}
139140

140-
const concat = (stream: Readable): Promise<any[]> => {
141-
return new Promise((resolve) => {
142-
const results: any[] = []
143-
stream.on('data', item => results.push(item))
144-
stream.on('end', () => resolve(results))
145-
})
146-
}
147-
148141
var testForMessage = function (buffer: Buffer, expectedMessage: any) {
149142
it('recieves and parses ' + expectedMessage.name, async () => {
150-
const parser = new PgPacketStream();
151-
parser.write(buffer);
152-
parser.end();
153-
const [lastMessage] = await concat(parser);
143+
const messages = await parseBuffers([buffer])
144+
const [lastMessage] = messages;
154145

155146
for (const key in expectedMessage) {
156-
assert.deepEqual(lastMessage[key], expectedMessage[key])
147+
assert.deepEqual((lastMessage as any)[key], expectedMessage[key])
157148
}
158-
159149
})
160150
}
161151

@@ -197,6 +187,19 @@ var expectedNotificationResponseMessage = {
197187
payload: 'boom'
198188
}
199189

190+
191+
192+
const parseBuffers = async (buffers: Buffer[]): Promise<BackendMessage[]> => {
193+
const stream = new PassThrough();
194+
for (const buffer of buffers) {
195+
stream.write(buffer);
196+
}
197+
stream.end()
198+
const msgs: BackendMessage[] = []
199+
await parse(stream, (msg) => msgs.push(msg))
200+
return msgs
201+
}
202+
200203
describe('PgPacketStream', function () {
201204
testForMessage(authOkBuffer, expectedAuthenticationOkayMessage)
202205
testForMessage(plainPasswordBuffer, expectedPlainPasswordMessage)
@@ -391,18 +394,9 @@ describe('PgPacketStream', function () {
391394
describe('split buffer, single message parsing', function () {
392395
var fullBuffer = buffers.dataRow([null, 'bang', 'zug zug', null, '!'])
393396

394-
const parse = async (buffers: Buffer[]): Promise<any> => {
395-
const parser = new PgPacketStream();
396-
for (const buffer of buffers) {
397-
parser.write(buffer);
398-
}
399-
parser.end()
400-
const [msg] = await concat(parser)
401-
return msg;
402-
}
403-
404397
it('parses when full buffer comes in', async function () {
405-
const message = await parse([fullBuffer]);
398+
const messages = await parseBuffers([fullBuffer]);
399+
const message = messages[0] as any
406400
assert.equal(message.fields.length, 5)
407401
assert.equal(message.fields[0], null)
408402
assert.equal(message.fields[1], 'bang')
@@ -416,7 +410,8 @@ describe('PgPacketStream', function () {
416410
var secondBuffer = Buffer.alloc(fullBuffer.length - firstBuffer.length)
417411
fullBuffer.copy(firstBuffer, 0, 0)
418412
fullBuffer.copy(secondBuffer, 0, firstBuffer.length)
419-
const message = await parse([firstBuffer, secondBuffer]);
413+
const messages = await parseBuffers([fullBuffer]);
414+
const message = messages[0] as any
420415
assert.equal(message.fields.length, 5)
421416
assert.equal(message.fields[0], null)
422417
assert.equal(message.fields[1], 'bang')
@@ -447,15 +442,6 @@ describe('PgPacketStream', function () {
447442
dataRowBuffer.copy(fullBuffer, 0, 0)
448443
readyForQueryBuffer.copy(fullBuffer, dataRowBuffer.length, 0)
449444

450-
const parse = (buffers: Buffer[]): Promise<any[]> => {
451-
const parser = new PgPacketStream();
452-
for (const buffer of buffers) {
453-
parser.write(buffer);
454-
}
455-
parser.end()
456-
return concat(parser)
457-
}
458-
459445
var verifyMessages = function (messages: any[]) {
460446
assert.strictEqual(messages.length, 2)
461447
assert.deepEqual(messages[0], {
@@ -473,7 +459,7 @@ describe('PgPacketStream', function () {
473459
}
474460
// sanity check
475461
it('recieves both messages when packet is not split', async function () {
476-
const messages = await parse([fullBuffer])
462+
const messages = await parseBuffers([fullBuffer])
477463
verifyMessages(messages)
478464
})
479465

@@ -482,7 +468,7 @@ describe('PgPacketStream', function () {
482468
var secondBuffer = Buffer.alloc(fullBuffer.length - firstBuffer.length)
483469
fullBuffer.copy(firstBuffer, 0, 0)
484470
fullBuffer.copy(secondBuffer, 0, firstBuffer.length)
485-
const messages = await parse([firstBuffer, secondBuffer])
471+
const messages = await parseBuffers([firstBuffer, secondBuffer])
486472
verifyMessages(messages)
487473
}
488474

packages/pg-packet-stream/src/index.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { Transform, TransformCallback, TransformOptions } from 'stream';
2-
import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password } from './messages';
1+
import { TransformOptions } from 'stream';
2+
import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password, NoticeMessage } from './messages';
33
import { BufferReader } from './BufferReader';
44
import assert from 'assert'
55

@@ -46,23 +46,27 @@ const enum MessageCodes {
4646
CopyData = 0x64, // d
4747
}
4848

49-
export class PgPacketStream extends Transform {
49+
type MessageCallback = (msg: BackendMessage) => void;
50+
51+
export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise<void> {
52+
const parser = new PgPacketParser()
53+
stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback))
54+
return new Promise((resolve) => stream.on('end', () => resolve()))
55+
}
56+
57+
class PgPacketParser {
5058
private remainingBuffer: Buffer = emptyBuffer;
5159
private reader = new BufferReader();
5260
private mode: Mode;
5361

5462
constructor(opts?: StreamOptions) {
55-
super({
56-
...opts,
57-
readableObjectMode: true
58-
})
5963
if (opts?.mode === 'binary') {
6064
throw new Error('Binary mode not supported yet')
6165
}
6266
this.mode = opts?.mode || 'text';
6367
}
6468

65-
public _transform(buffer: Buffer, encoding: string, callback: TransformCallback) {
69+
public parse(buffer: Buffer, callback: MessageCallback) {
6670
let combinedBuffer = buffer;
6771
if (this.remainingBuffer.byteLength) {
6872
combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength);
@@ -81,7 +85,7 @@ export class PgPacketStream extends Transform {
8185

8286
if (fullMessageLength + offset <= combinedBuffer.byteLength) {
8387
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer);
84-
this.push(message)
88+
callback(message)
8589
offset += fullMessageLength;
8690
} else {
8791
break;
@@ -94,7 +98,6 @@ export class PgPacketStream extends Transform {
9498
this.remainingBuffer = combinedBuffer.slice(offset)
9599
}
96100

97-
callback(null);
98101
}
99102

100103
private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage {
@@ -146,10 +149,6 @@ export class PgPacketStream extends Transform {
146149
}
147150
}
148151

149-
public _flush(callback: TransformCallback) {
150-
this._transform(Buffer.alloc(0), 'utf-8', callback)
151-
}
152-
153152
private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) {
154153
this.reader.setBuffer(offset, bytes);
155154
const status = this.reader.string(1);
@@ -304,8 +303,9 @@ export class PgPacketStream extends Transform {
304303
fieldType = this.reader.string(1)
305304
}
306305

307-
// the msg is an Error instance
308-
var message = new DatabaseError(fields.M, length, name)
306+
const messageValue = fields.M
307+
308+
const message = name === MessageName.notice ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)
309309

310310
message.severity = fields.S
311311
message.code = fields.C

packages/pg-packet-stream/src/messages.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,27 @@ export const copyDone: BackendMessage = {
7474
length: 4,
7575
}
7676

77-
export class DatabaseError extends Error {
77+
interface NoticeOrError {
78+
message: string | undefined;
79+
severity: string | undefined;
80+
code: string | undefined;
81+
detail: string | undefined;
82+
hint: string | undefined;
83+
position: string | undefined;
84+
internalPosition: string | undefined;
85+
internalQuery: string | undefined;
86+
where: string | undefined;
87+
schema: string | undefined;
88+
table: string | undefined;
89+
column: string | undefined;
90+
dataType: string | undefined;
91+
constraint: string | undefined;
92+
file: string | undefined;
93+
line: string | undefined;
94+
routine: string | undefined;
95+
}
96+
97+
export class DatabaseError extends Error implements NoticeOrError {
7898
public severity: string | undefined;
7999
public code: string | undefined;
80100
public detail: string | undefined;
@@ -167,3 +187,24 @@ export class DataRowMessage {
167187
this.fieldCount = fields.length;
168188
}
169189
}
190+
191+
export class NoticeMessage implements BackendMessage, NoticeOrError {
192+
constructor(public readonly length: number, public readonly message: string | undefined) {}
193+
public readonly name = MessageName.notice;
194+
public severity: string | undefined;
195+
public code: string | undefined;
196+
public detail: string | undefined;
197+
public hint: string | undefined;
198+
public position: string | undefined;
199+
public internalPosition: string | undefined;
200+
public internalQuery: string | undefined;
201+
public where: string | undefined;
202+
public schema: string | undefined;
203+
public table: string | undefined;
204+
public column: string | undefined;
205+
public dataType: string | undefined;
206+
public constraint: string | undefined;
207+
public file: string | undefined;
208+
public line: string | undefined;
209+
public routine: string | undefined;
210+
}

packages/pg/bench.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,14 @@ const run = async () => {
5454
queries = await bench(client, seq, seconds * 1000);
5555
console.log("sequence queries:", queries);
5656
console.log("qps", queries / seconds);
57-
console.log("on my laptop best so far seen 1192 qps")
57+
console.log("on my laptop best so far seen 1209 qps")
5858

5959
console.log('')
6060
queries = await bench(client, insert, seconds * 1000);
6161
console.log("insert queries:", queries);
6262
console.log("qps", queries / seconds);
63-
console.log("on my laptop best so far seen 5600 qps")
63+
console.log("on my laptop best so far seen 5799 qps")
64+
console.log()
6465
await client.end();
6566
await client.end();
6667
};

packages/pg/lib/connection-fast.js

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ var util = require('util')
1313

1414
var Writer = require('buffer-writer')
1515
// eslint-disable-next-line
16-
var PacketStream = require('pg-packet-stream')
16+
const { parse } = require('pg-packet-stream')
1717

1818
var TEXT_MODE = 0
1919

2020
// TODO(bmc) support binary mode here
2121
// var BINARY_MODE = 1
22-
console.log('using faster connection')
22+
console.log('***using faster connection***')
2323
var Connection = function (config) {
2424
EventEmitter.call(this)
2525
config = config || {}
@@ -84,12 +84,13 @@ Connection.prototype.connect = function (port, host) {
8484
this.stream.once('data', function (buffer) {
8585
var responseCode = buffer.toString('utf8')
8686
switch (responseCode) {
87-
case 'N': // Server does not support SSL connections
88-
return self.emit('error', new Error('The server does not support SSL connections'))
8987
case 'S': // Server supports SSL connections, continue with a secure connection
9088
break
91-
default:
92-
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
89+
case 'N': // Server does not support SSL connections
90+
self.stream.end()
91+
return self.emit('error', new Error('The server does not support SSL connections'))
92+
default: // Any other response byte, including 'E' (ErrorResponse) indicating a server error
93+
self.stream.end()
9394
return self.emit('error', new Error('There was an error establishing an SSL connection'))
9495
}
9596
var tls = require('tls')
@@ -108,19 +109,15 @@ Connection.prototype.connect = function (port, host) {
108109
}
109110

110111
Connection.prototype.attachListeners = function (stream) {
111-
var self = this
112-
const mode = this._mode === TEXT_MODE ? 'text' : 'binary'
113-
const packetStream = new PacketStream.PgPacketStream({ mode })
114-
this.stream.pipe(packetStream)
115-
packetStream.on('data', (msg) => {
112+
stream.on('end', () => {
113+
this.emit('end')
114+
})
115+
parse(stream, (msg) => {
116116
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
117-
if (self._emitMessage) {
118-
self.emit('message', msg)
117+
if (this._emitMessage) {
118+
this.emit('message', msg)
119119
}
120-
self.emit(eventName, msg)
121-
})
122-
stream.on('end', function () {
123-
self.emit('end')
120+
this.emit(eventName, msg)
124121
})
125122
}
126123

@@ -331,6 +328,10 @@ Connection.prototype.end = function () {
331328
// 0x58 = 'X'
332329
this.writer.clear()
333330
this._ending = true
331+
if (!this.stream.writable) {
332+
this.stream.end()
333+
return
334+
}
334335
return this.stream.write(END_BUFFER, () => {
335336
this.stream.end()
336337
})

0 commit comments

Comments
 (0)