Skip to content

Commit 00c3f26

Browse files
committed
initial port to streams2
1 parent 4e822a1 commit 00c3f26

File tree

1 file changed

+16
-11
lines changed

1 file changed

+16
-11
lines changed

lib/connection.js

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ var crypto = require('crypto');
33
var EventEmitter = require('events').EventEmitter;
44
var util = require('util');
55

6+
var Readable = require('stream').Readable;
7+
68
var utils = require(__dirname + '/utils');
79
var Writer = require('buffer-writer');
810

@@ -37,15 +39,6 @@ Connection.prototype.connect = function(port, host) {
3739
self.emit('connect');
3840
});
3941

40-
this.stream.on('error', function(error) {
41-
//don't raise ECONNRESET errors - they can & should be ignored
42-
//during disconnect
43-
if(self._ending && error.code == 'ECONNRESET') {
44-
return;
45-
}
46-
self.emit('error', error);
47-
});
48-
4942
this.stream.on('end', function() {
5043
self.emit('end');
5144
});
@@ -89,15 +82,27 @@ Connection.prototype.connect = function(port, host) {
8982

9083
Connection.prototype.attachListeners = function(stream) {
9184
var self = this;
92-
stream.on('data', function(buffer) {
93-
self.setBuffer(buffer);
85+
var reader = new Readable();
86+
reader.wrap(stream);
87+
reader.on('readable', function() {
88+
self.setBuffer(reader.read());
9489
var msg = self.parseMessage();
9590
while(msg) {
9691
self.emit('message', msg);
9792
self.emit(msg.name, msg);
9893
msg = self.parseMessage();
9994
}
10095
});
96+
reader.on('error', function(error) {
97+
//don't raise ECONNRESET errors - they can & should be ignored
98+
//during disconnect
99+
if(self._ending && error.code == 'ECONNRESET') {
100+
return;
101+
}
102+
self.emit('error', error);
103+
});
104+
105+
reader.read(0);
101106
};
102107

103108
Connection.prototype.requestSsl = function(config) {

0 commit comments

Comments
 (0)