Skip to content

Commit 3ff91ea

Browse files
authored
Decouple serializing messages w/ writing them to socket (brianc#2155)
* Move message writing to typescript lib * Write more tests, cleanup code to some extent * Rename package to something more representing its name * Remove unused code * Small tweaks based on microbenchmarks * Rename w/o underscore
1 parent 2013d77 commit 3ff91ea

17 files changed

+685
-188
lines changed

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
],
1212
"scripts": {
1313
"test": "yarn lerna exec yarn test",
14-
"build": "yarn lerna exec --scope pg-packet-stream yarn build",
14+
"build": "yarn lerna exec --scope pg-protocol yarn build",
1515
"pretest": "yarn build",
1616
"lint": "yarn lerna exec --parallel yarn lint"
1717
},

packages/pg-packet-stream/package.json renamed to packages/pg-protocol/package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
2-
"name": "pg-packet-stream",
2+
"name": "pg-protocol",
33
"version": "1.1.0",
4+
"description": "The postgres client/server binary protocol, implemented in TypeScript",
45
"main": "dist/index.js",
56
"types": "dist/index.d.ts",
67
"license": "MIT",

packages/pg-protocol/src/b.ts

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// file for microbenchmarking
2+
3+
import { Writer } from './buffer-writer'
4+
import { serialize } from './index'
5+
6+
const LOOPS = 1000
7+
let count = 0
8+
let start = Date.now()
9+
const writer = new Writer()
10+
11+
const run = () => {
12+
if (count > LOOPS) {
13+
console.log(Date.now() - start)
14+
return;
15+
}
16+
count++
17+
for(let i = 0; i < LOOPS; i++) {
18+
serialize.describe({ type: 'P'})
19+
serialize.describe({ type: 'S'})
20+
}
21+
setImmediate(run)
22+
}
23+
24+
run()

packages/pg-packet-stream/src/BufferReader.ts renamed to packages/pg-protocol/src/buffer-reader.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ const emptyBuffer = Buffer.allocUnsafe(0);
22

33
export class BufferReader {
44
private buffer: Buffer = emptyBuffer;
5-
// TODO(bmc): support non-utf8 encoding
5+
6+
// TODO(bmc): support non-utf8 encoding?
67
private encoding: string = 'utf-8';
8+
79
constructor(private offset: number = 0) {
810
}
911
public setBuffer(offset: number, buffer: Buffer): void {
+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
//binary data writer tuned for encoding binary specific to the postgres binary protocol
2+
3+
export class Writer {
4+
private buffer: Buffer;
5+
private offset: number = 5;
6+
private headerPosition: number = 0;
7+
constructor(private size = 256) {
8+
this.buffer = Buffer.alloc(size)
9+
}
10+
11+
private ensure(size: number): void {
12+
var remaining = this.buffer.length - this.offset;
13+
if (remaining < size) {
14+
var oldBuffer = this.buffer;
15+
// exponential growth factor of around ~ 1.5
16+
// https://stackoverflow.com/questions/2269063/buffer-growth-strategy
17+
var newSize = oldBuffer.length + (oldBuffer.length >> 1) + size;
18+
this.buffer = Buffer.alloc(newSize);
19+
oldBuffer.copy(this.buffer);
20+
}
21+
}
22+
23+
public addInt32(num: number): Writer {
24+
this.ensure(4);
25+
this.buffer[this.offset++] = (num >>> 24 & 0xFF);
26+
this.buffer[this.offset++] = (num >>> 16 & 0xFF);
27+
this.buffer[this.offset++] = (num >>> 8 & 0xFF);
28+
this.buffer[this.offset++] = (num >>> 0 & 0xFF);
29+
return this;
30+
}
31+
32+
public addInt16(num: number): Writer {
33+
this.ensure(2);
34+
this.buffer[this.offset++] = (num >>> 8 & 0xFF);
35+
this.buffer[this.offset++] = (num >>> 0 & 0xFF);
36+
return this;
37+
}
38+
39+
40+
public addCString(string: string): Writer {
41+
if (!string) {
42+
this.ensure(1);
43+
} else {
44+
var len = Buffer.byteLength(string);
45+
this.ensure(len + 1); // +1 for null terminator
46+
this.buffer.write(string, this.offset, 'utf-8')
47+
this.offset += len;
48+
}
49+
50+
this.buffer[this.offset++] = 0; // null terminator
51+
return this;
52+
}
53+
54+
public addString(string: string = ""): Writer {
55+
var len = Buffer.byteLength(string);
56+
this.ensure(len);
57+
this.buffer.write(string, this.offset);
58+
this.offset += len;
59+
return this;
60+
}
61+
62+
public add(otherBuffer: Buffer): Writer {
63+
this.ensure(otherBuffer.length);
64+
otherBuffer.copy(this.buffer, this.offset);
65+
this.offset += otherBuffer.length;
66+
return this;
67+
}
68+
69+
private join(code?: number): Buffer {
70+
if (code) {
71+
this.buffer[this.headerPosition] = code;
72+
//length is everything in this packet minus the code
73+
const length = this.offset - (this.headerPosition + 1)
74+
this.buffer.writeInt32BE(length, this.headerPosition + 1)
75+
}
76+
return this.buffer.slice(code ? 0 : 5, this.offset);
77+
}
78+
79+
public flush(code?: number): Buffer {
80+
var result = this.join(code);
81+
this.offset = 5;
82+
this.headerPosition = 0;
83+
this.buffer = Buffer.allocUnsafe(this.size)
84+
return result;
85+
}
86+
}
87+

packages/pg-packet-stream/src/inbound-parser.test.ts renamed to packages/pg-protocol/src/inbound-parser.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import buffers from './testing/test-buffers'
22
import BufferList from './testing/buffer-list'
3-
import { parse } from './'
3+
import { parse } from '.'
44
import assert from 'assert'
55
import { PassThrough } from 'stream'
66
import { BackendMessage } from './messages'

packages/pg-protocol/src/index.ts

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { BackendMessage } from './messages';
2+
import { serialize } from './serializer';
3+
import { Parser, MessageCallback } from './parser'
4+
5+
export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise<void> {
6+
const parser = new Parser()
7+
stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback))
8+
return new Promise((resolve) => stream.on('end', () => resolve()))
9+
}
10+
11+
export { serialize };

0 commit comments

Comments
 (0)