Skip to content

Commit 20bec59

Browse files
author
Adam Rudd
committed
Queue packets if the client is disconnected
1 parent 0da49b1 commit 20bec59

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

lib/client.js

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ function MqttClient(stream, options) {
6060
this.pingTimer = null;
6161
// Is the client connected?
6262
this.connected = false;
63+
// Packet queue
64+
this.queue = [];
6365

6466
// Inflight messages
6567
this.inflight = {
@@ -88,9 +90,21 @@ function MqttClient(stream, options) {
8890
// Setup ping timer
8991
this.on('connect', this._setupPingTimer);
9092

91-
// Mark the client as connected
93+
// Send queued packets
9294
this.on('connect', function() {
9395
that.connected = true;
96+
97+
var queue = that.queue
98+
, length = queue.length;
99+
100+
for (var i = 0; i < length; i += 1) {
101+
that._sendPacket(
102+
queue[i].type,
103+
queue[i].packet,
104+
queue[i].cb
105+
);
106+
}
107+
that.queue = [];
94108
});
95109

96110
// Handle incoming publish
@@ -298,12 +312,20 @@ MqttClient.prototype.end = function() {
298312
}
299313
};
300314

315+
MqttClient.prototype._sendPacket = function(type, packet, cb) {
316+
if (this.connected) {
317+
this.conn[type](packet);
318+
if (cb) cb.call(this);
319+
} else {
320+
this.queue.push({type: type, packet: packet, cb: cb});
321+
}
322+
};
323+
301324
/**
302325
* _setupPingTimer
303326
*
304327
* @api private
305328
*/
306-
307329
MqttClient.prototype._setupPingTimer = function() {
308330
// No ping
309331
if (this.options.keepalive === 0) {

test/client.js

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,23 @@ describe('MqttClient', function () {
211211
});
212212

213213
describe('publishing', function() {
214-
it('should publish a message', function (done) {
214+
it('should queue message until connected', function(done) {
215+
var client = createClient(port);
216+
217+
client.publish('test', 'test');
218+
client.queue.length.should.equal(1);
219+
220+
client.once('connect', function() {
221+
client.queue.length.should.equal(0);
222+
done();
223+
});
224+
225+
this.server.once('client', function(client) {
226+
client.once('connect', function(packet) {
227+
client.connack({returnCode: 0});
228+
});
229+
});
230+
});
215231
var client = createClient(port)
216232
, payload = 'test'
217233
, topic = 'test';

0 commit comments

Comments
 (0)