@@ -3,6 +3,8 @@ var crypto = require('crypto');
3
3
var EventEmitter = require ( 'events' ) . EventEmitter ;
4
4
var util = require ( 'util' ) ;
5
5
6
+ var Readable = require ( 'stream' ) . Readable ;
7
+
6
8
var utils = require ( __dirname + '/utils' ) ;
7
9
var Writer = require ( 'buffer-writer' ) ;
8
10
@@ -37,15 +39,6 @@ Connection.prototype.connect = function(port, host) {
37
39
self . emit ( 'connect' ) ;
38
40
} ) ;
39
41
40
- this . stream . on ( 'error' , function ( error ) {
41
- //don't raise ECONNRESET errors - they can & should be ignored
42
- //during disconnect
43
- if ( self . _ending && error . code == 'ECONNRESET' ) {
44
- return ;
45
- }
46
- self . emit ( 'error' , error ) ;
47
- } ) ;
48
-
49
42
this . stream . on ( 'end' , function ( ) {
50
43
self . emit ( 'end' ) ;
51
44
} ) ;
@@ -89,15 +82,27 @@ Connection.prototype.connect = function(port, host) {
89
82
90
83
Connection . prototype . attachListeners = function ( stream ) {
91
84
var self = this ;
92
- stream . on ( 'data' , function ( buffer ) {
93
- self . setBuffer ( buffer ) ;
85
+ var reader = new Readable ( ) ;
86
+ reader . wrap ( stream ) ;
87
+ reader . on ( 'readable' , function ( ) {
88
+ self . setBuffer ( reader . read ( ) ) ;
94
89
var msg = self . parseMessage ( ) ;
95
90
while ( msg ) {
96
91
self . emit ( 'message' , msg ) ;
97
92
self . emit ( msg . name , msg ) ;
98
93
msg = self . parseMessage ( ) ;
99
94
}
100
95
} ) ;
96
+ reader . on ( 'error' , function ( error ) {
97
+ //don't raise ECONNRESET errors - they can & should be ignored
98
+ //during disconnect
99
+ if ( self . _ending && error . code == 'ECONNRESET' ) {
100
+ return ;
101
+ }
102
+ self . emit ( 'error' , error ) ;
103
+ } ) ;
104
+
105
+ reader . read ( 0 ) ;
101
106
} ;
102
107
103
108
Connection . prototype . requestSsl = function ( config ) {
0 commit comments