Skip to content

Commit 59fd16e

Browse files
committed
destroy the ongoing stream in case of a disconnect
1 parent f41ea10 commit 59fd16e

File tree

2 files changed

+46
-5
lines changed

2 files changed

+46
-5
lines changed

lib/client.js

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,28 @@ function MqttClient (streamBuilder, options) {
136136
}
137137

138138
this.connected = true
139-
var outStore = null
140-
outStore = this.outgoingStore.createStream()
139+
var outStore = this.outgoingStore.createStream()
140+
141+
this.once('close', remove)
142+
outStore.on('end', function () {
143+
that.removeListener('close', remove)
144+
})
145+
outStore.on('error', function (err) {
146+
that.removeListener('close', remove)
147+
that.emit('error', err)
148+
})
149+
150+
function remove () {
151+
outStore.destroy()
152+
outStore = null
153+
}
141154

142155
function storeDeliver () {
156+
// edge case, we wrapped this twice
157+
if (!outStore) {
158+
return
159+
}
160+
143161
var packet = outStore.read(1)
144162
var cb
145163

@@ -166,9 +184,6 @@ function MqttClient (streamBuilder, options) {
166184
}
167185
}
168186

169-
// Control of stored messages
170-
outStore.on('error', this.emit.bind(this, 'error'))
171-
172187
// start flowing
173188
storeDeliver()
174189
})

test/abstract_store.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,30 @@ module.exports = function abstractStoreTest (build) {
130130
})
131131
})
132132
})
133+
134+
it('should replace a packet when doing put with the same messageId', function (done) {
135+
var packet1 = {
136+
cmd: 'publish', // added
137+
topic: 'hello',
138+
payload: 'world',
139+
qos: 2,
140+
messageId: 42
141+
}
142+
var packet2 = {
143+
cmd: 'pubrel', // added
144+
qos: 2,
145+
messageId: 42
146+
}
147+
148+
store.put(packet1, function () {
149+
store.put(packet2, function () {
150+
store
151+
.createStream()
152+
.on('data', function (data) {
153+
data.should.eql(packet2)
154+
done()
155+
})
156+
})
157+
})
158+
})
133159
}

0 commit comments

Comments
 (0)