Skip to content

Commit 64c78b0

Browse files
committed
fix: major performance issues with bytea performance brianc#2240
1 parent 5e0d684 commit 64c78b0

File tree

3 files changed

+55
-95
lines changed

3 files changed

+55
-95
lines changed

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"packages/*"
1111
],
1212
"scripts": {
13-
"test": "yarn lint && yarn lerna exec yarn test",
13+
"test": "export PGDATABASE=data && export PGUSER=user && export PGPASSWORD=pass && yarn lint && yarn lerna exec yarn test",
1414
"build": "yarn lerna exec --scope pg-protocol yarn build",
1515
"pretest": "yarn build",
1616
"lint": "if [ -x ./node_modules/.bin/prettier ]; then eslint '*/**/*.{js,ts,tsx}'; fi;"

packages/pg-protocol/src/parser.ts

+37-91
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,10 @@ const enum MessageCodes {
7373

7474
export type MessageCallback = (msg: BackendMessage) => void
7575

76-
interface CombinedBuffer {
77-
combinedBuffer: Buffer
78-
combinedBufferOffset: number
79-
combinedBufferLength: number
80-
combinedBufferFullLength: number
81-
reuseRemainingBuffer: boolean
82-
}
83-
8476
export class Parser {
85-
private remainingBuffer: Buffer = emptyBuffer
86-
private remainingBufferLength: number = 0
87-
private remainingBufferOffset: number = 0
77+
private buffer: Buffer = emptyBuffer
78+
private bufferLength: number = 0
79+
private bufferOffset: number = 0
8880
private reader = new BufferReader()
8981
private mode: Mode
9082

@@ -96,111 +88,65 @@ export class Parser {
9688
}
9789

9890
public parse(buffer: Buffer, callback: MessageCallback) {
99-
const {
100-
combinedBuffer,
101-
combinedBufferOffset,
102-
combinedBufferLength,
103-
reuseRemainingBuffer,
104-
combinedBufferFullLength,
105-
} = this.mergeBuffer(buffer)
106-
let offset = combinedBufferOffset
107-
while (offset + HEADER_LENGTH <= combinedBufferFullLength) {
91+
this.mergeBuffer(buffer)
92+
const bufferFullLength = this.bufferOffset + this.bufferLength
93+
let offset = this.bufferOffset
94+
while (offset + HEADER_LENGTH <= bufferFullLength) {
10895
// code is 1 byte long - it identifies the message type
109-
const code = combinedBuffer[offset]
110-
96+
const code = this.buffer[offset]
11197
// length is 1 Uint32BE - it is the length of the message EXCLUDING the code
112-
const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH)
113-
98+
const length = this.buffer.readUInt32BE(offset + CODE_LENGTH)
11499
const fullMessageLength = CODE_LENGTH + length
115-
116-
if (fullMessageLength + offset <= combinedBufferFullLength) {
117-
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer)
100+
if (fullMessageLength + offset <= bufferFullLength) {
101+
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer)
118102
callback(message)
119103
offset += fullMessageLength
120104
} else {
121105
break
122106
}
123107
}
124-
this.consumeBuffer({
125-
combinedBuffer,
126-
combinedBufferOffset: offset,
127-
combinedBufferLength,
128-
reuseRemainingBuffer,
129-
combinedBufferFullLength,
130-
})
108+
if (offset === bufferFullLength) {
109+
// No more use for the buffer
110+
this.buffer = emptyBuffer
111+
this.bufferLength = 0
112+
this.bufferOffset = 0
113+
} else {
114+
// Adjust the cursors of remainingBuffer
115+
this.bufferLength = bufferFullLength - offset
116+
this.bufferOffset = offset
117+
}
131118
}
132119

133-
private mergeBuffer(buffer: Buffer): CombinedBuffer {
134-
let combinedBuffer = buffer
135-
let combinedBufferLength = buffer.byteLength
136-
let combinedBufferOffset = 0
137-
let reuseRemainingBuffer = this.remainingBufferLength > 0
138-
if (reuseRemainingBuffer) {
139-
const newLength = this.remainingBufferLength + combinedBufferLength
140-
const newFullLength = newLength + this.remainingBufferOffset
141-
if (newFullLength > this.remainingBuffer.byteLength) {
120+
private mergeBuffer(buffer: Buffer): void {
121+
if (this.bufferLength > 0) {
122+
const newLength = this.bufferLength + buffer.byteLength
123+
const newFullLength = newLength + this.bufferOffset
124+
if (newFullLength > this.buffer.byteLength) {
142125
// We can't concat the new buffer with the remaining one
143126
let newBuffer: Buffer
144-
if (newLength <= this.remainingBuffer.byteLength && this.remainingBufferOffset >= this.remainingBufferLength) {
127+
if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) {
145128
// We can move the relevant part to the beginning of the buffer instead of allocating a new buffer
146-
newBuffer = this.remainingBuffer
129+
newBuffer = this.buffer
147130
} else {
148131
// Allocate a new larger buffer
149-
let newBufferLength = this.remainingBuffer.byteLength * 2
132+
let newBufferLength = this.buffer.byteLength * 2
150133
while (newLength >= newBufferLength) {
151134
newBufferLength *= 2
152135
}
153136
newBuffer = Buffer.allocUnsafe(newBufferLength)
154137
}
155138
// Move the remaining buffer to the new one
156-
this.remainingBuffer.copy(
157-
newBuffer,
158-
0,
159-
this.remainingBufferOffset,
160-
this.remainingBufferOffset + this.remainingBufferLength
161-
)
162-
this.remainingBuffer = newBuffer
163-
this.remainingBufferOffset = 0
139+
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength)
140+
this.buffer = newBuffer
141+
this.bufferOffset = 0
164142
}
165143
// Concat the new buffer with the remaining one
166-
buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength)
167-
combinedBuffer = this.remainingBuffer
168-
combinedBufferLength = this.remainingBufferLength = newLength
169-
combinedBufferOffset = this.remainingBufferOffset
170-
}
171-
const combinedBufferFullLength = combinedBufferOffset + combinedBufferLength
172-
return {
173-
combinedBuffer,
174-
combinedBufferOffset,
175-
combinedBufferLength,
176-
reuseRemainingBuffer,
177-
combinedBufferFullLength,
178-
}
179-
}
180-
181-
private consumeBuffer({
182-
combinedBufferOffset,
183-
combinedBufferFullLength,
184-
reuseRemainingBuffer,
185-
combinedBuffer,
186-
combinedBufferLength,
187-
}: CombinedBuffer) {
188-
if (combinedBufferOffset === combinedBufferFullLength) {
189-
// No more use for the buffer
190-
this.remainingBuffer = emptyBuffer
191-
this.remainingBufferLength = 0
192-
this.remainingBufferOffset = 0
144+
buffer.copy(this.buffer, this.bufferOffset + this.bufferLength)
145+
this.bufferLength = newLength
193146
} else {
194-
this.remainingBufferLength = combinedBufferFullLength - combinedBufferOffset
195-
if (reuseRemainingBuffer) {
196-
// Adjust the cursors of remainingBuffer
197-
this.remainingBufferOffset = combinedBufferOffset
198-
} else {
199-
// To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer
200-
this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2)
201-
combinedBuffer.copy(this.remainingBuffer, 0, combinedBufferOffset)
202-
this.remainingBufferOffset = 0
203-
}
147+
this.buffer = buffer
148+
this.bufferOffset = 0
149+
this.bufferLength = buffer.byteLength
204150
}
205151
}
206152

packages/pg/bench.js

+17-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
const pg = require('./lib')
2-
const pool = new pg.Pool()
32

43
const params = {
54
text:
@@ -17,7 +16,7 @@ const seq = {
1716
}
1817

1918
const exec = async (client, q) => {
20-
const result = await client.query({
19+
await client.query({
2120
text: q.text,
2221
values: q.values,
2322
rowMode: 'array',
@@ -40,6 +39,7 @@ const run = async () => {
4039
const client = new pg.Client()
4140
await client.connect()
4241
await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)')
42+
await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)')
4343
await bench(client, params, 1000)
4444
console.log('warmup done')
4545
const seconds = 5
@@ -61,7 +61,21 @@ const run = async () => {
6161
console.log('insert queries:', queries)
6262
console.log('qps', queries / seconds)
6363
console.log('on my laptop best so far seen 5799 qps')
64-
console.log()
64+
65+
console.log('')
66+
console.log('Warming up bytea test')
67+
await client.query({
68+
text: 'INSERT INTO buf(name, data) VALUES ($1, $2)',
69+
values: ['test', Buffer.allocUnsafe(104857600)],
70+
})
71+
console.log('bytea warmup done')
72+
const start = Date.now()
73+
const results = await client.query('SELECT * FROM buf')
74+
const time = Date.now() - start
75+
console.log('bytea time:', time, 'ms')
76+
console.log('bytea length:', results.rows[0].data.byteLength, 'bytes')
77+
console.log('on my laptop best so far seen 1107ms and 104857600 bytes')
78+
6579
await client.end()
6680
await client.end()
6781
}

0 commit comments

Comments
 (0)