@@ -2,6 +2,7 @@ import buffers from './testing/test-buffers'
2
2
import BufferList from './testing/buffer-list'
3
3
import { PgPacketStream } from './'
4
4
import assert from 'assert'
5
+ import { Readable } from 'stream'
5
6
6
7
var authOkBuffer = buffers . authenticationOk ( )
7
8
var paramStatusBuffer = buffers . parameterStatus ( 'client_encoding' , 'UTF8' )
@@ -136,23 +137,25 @@ var expectedTwoRowMessage = {
136
137
} ]
137
138
}
138
139
140
+ const concat = ( stream : Readable ) : Promise < any [ ] > => {
141
+ return new Promise ( ( resolve ) => {
142
+ const results : any [ ] = [ ]
143
+ stream . on ( 'data' , item => results . push ( item ) )
144
+ stream . on ( 'end' , ( ) => resolve ( results ) )
145
+ } )
146
+ }
147
+
139
148
var testForMessage = function ( buffer : Buffer , expectedMessage : any ) {
140
149
it ( 'recieves and parses ' + expectedMessage . name , async ( ) => {
141
150
const parser = new PgPacketStream ( ) ;
151
+ parser . write ( buffer ) ;
152
+ parser . end ( ) ;
153
+ const [ lastMessage ] = await concat ( parser ) ;
142
154
143
- await new Promise ( ( resolve ) => {
144
- let lastMessage : any = { }
145
- parser . on ( 'message' , function ( msg ) {
146
- lastMessage = msg
147
- } )
148
-
149
- parser . write ( buffer ) ;
155
+ for ( const key in expectedMessage ) {
156
+ assert . deepEqual ( lastMessage [ key ] , expectedMessage [ key ] )
157
+ }
150
158
151
- for ( const key in expectedMessage ) {
152
- assert . deepEqual ( lastMessage [ key ] , expectedMessage [ key ] )
153
- }
154
- resolve ( ) ;
155
- } )
156
159
} )
157
160
}
158
161
@@ -388,17 +391,14 @@ describe('PgPacketStream', function () {
388
391
describe ( 'split buffer, single message parsing' , function ( ) {
389
392
var fullBuffer = buffers . dataRow ( [ null , 'bang' , 'zug zug' , null , '!' ] )
390
393
391
- const parse = ( buffers : Buffer [ ] ) : Promise < any > => {
392
- return new Promise ( ( resolve ) => {
393
- const parser = new PgPacketStream ( ) ;
394
- parser . once ( 'message' , ( msg ) => {
395
- resolve ( msg )
396
- } )
397
- for ( const buffer of buffers ) {
398
- parser . write ( buffer ) ;
399
- }
400
- parser . end ( )
401
- } )
394
+ const parse = async ( buffers : Buffer [ ] ) : Promise < any > => {
395
+ const parser = new PgPacketStream ( ) ;
396
+ for ( const buffer of buffers ) {
397
+ parser . write ( buffer ) ;
398
+ }
399
+ parser . end ( )
400
+ const [ msg ] = await concat ( parser )
401
+ return msg ;
402
402
}
403
403
404
404
it ( 'parses when full buffer comes in' , async function ( ) {
@@ -448,20 +448,12 @@ describe('PgPacketStream', function () {
448
448
readyForQueryBuffer . copy ( fullBuffer , dataRowBuffer . length , 0 )
449
449
450
450
const parse = ( buffers : Buffer [ ] ) : Promise < any [ ] > => {
451
- return new Promise ( ( resolve ) => {
452
- const parser = new PgPacketStream ( ) ;
453
- const results : any [ ] = [ ]
454
- parser . on ( 'message' , ( msg ) => {
455
- results . push ( msg )
456
- if ( results . length === 2 ) {
457
- resolve ( results )
458
- }
459
- } )
460
- for ( const buffer of buffers ) {
461
- parser . write ( buffer ) ;
462
- }
463
- parser . end ( )
464
- } )
451
+ const parser = new PgPacketStream ( ) ;
452
+ for ( const buffer of buffers ) {
453
+ parser . write ( buffer ) ;
454
+ }
455
+ parser . end ( )
456
+ return concat ( parser )
465
457
}
466
458
467
459
var verifyMessages = function ( messages : any [ ] ) {
0 commit comments