Skip to content

Commit 9a3ff89

Browse files
authored
Merge pull request mqttjs#859 from mqttjs/fix-use-of-readable
Proper use of `'readable'` event.
2 parents 797f9ea + 59fd16e commit 9a3ff89

File tree

3 files changed

+95
-26
lines changed

3 files changed

+95
-26
lines changed

lib/client.js

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -136,38 +136,56 @@ function MqttClient (streamBuilder, options) {
136136
}
137137

138138
this.connected = true
139-
var outStore = null
140-
outStore = this.outgoingStore.createStream()
139+
var outStore = this.outgoingStore.createStream()
141140

142-
// Control of stored messages
143-
outStore.once('readable', function () {
144-
function storeDeliver () {
145-
var packet = outStore.read(1)
146-
var cb
141+
this.once('close', remove)
142+
outStore.on('end', function () {
143+
that.removeListener('close', remove)
144+
})
145+
outStore.on('error', function (err) {
146+
that.removeListener('close', remove)
147+
that.emit('error', err)
148+
})
147149

148-
if (!packet) {
149-
return
150-
}
150+
function remove () {
151+
outStore.destroy()
152+
outStore = null
153+
}
154+
155+
function storeDeliver () {
156+
// edge case, we wrapped this twice
157+
if (!outStore) {
158+
return
159+
}
151160

152-
// Avoid unnecessary stream read operations when disconnected
153-
if (!that.disconnecting && !that.reconnectTimer) {
154-
outStore.read(0)
155-
cb = that.outgoing[packet.messageId]
156-
that.outgoing[packet.messageId] = function (err, status) {
157-
// Ensure that the original callback passed in to publish gets invoked
158-
if (cb) {
159-
cb(err, status)
160-
}
161-
162-
storeDeliver()
161+
var packet = outStore.read(1)
162+
var cb
163+
164+
if (!packet) {
165+
// read when data is available in the future
166+
outStore.once('readable', storeDeliver)
167+
return
168+
}
169+
170+
// Avoid unnecessary stream read operations when disconnected
171+
if (!that.disconnecting && !that.reconnectTimer) {
172+
cb = that.outgoing[packet.messageId]
173+
that.outgoing[packet.messageId] = function (err, status) {
174+
// Ensure that the original callback passed in to publish gets invoked
175+
if (cb) {
176+
cb(err, status)
163177
}
164-
that._sendPacket(packet)
165-
} else if (outStore.destroy) {
166-
outStore.destroy()
178+
179+
storeDeliver()
167180
}
181+
that._sendPacket(packet)
182+
} else if (outStore.destroy) {
183+
outStore.destroy()
168184
}
169-
storeDeliver()
170-
}).on('error', this.emit.bind(this, 'error'))
185+
}
186+
187+
// start flowing
188+
storeDeliver()
171189
})
172190

173191
// Mark disconnected on stream close

test/abstract_client.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1911,6 +1911,31 @@ module.exports = function (server, config) {
19111911
}
19121912
})
19131913

1914+
it('should not resend in-flight publish messages if disconnecting', function (done) {
1915+
var client = connect({reconnectPeriod: 200})
1916+
var serverPublished = false
1917+
var clientCalledBack = false
1918+
server.once('client', function (serverClient) {
1919+
serverClient.on('connect', function () {
1920+
setImmediate(function () {
1921+
serverClient.stream.destroy()
1922+
client.end()
1923+
serverPublished.should.be.false()
1924+
clientCalledBack.should.be.false()
1925+
done()
1926+
})
1927+
})
1928+
server.once('client', function (serverClientNew) {
1929+
serverClientNew.on('publish', function () {
1930+
serverPublished = true
1931+
})
1932+
})
1933+
})
1934+
client.publish('hello', 'world', { qos: 1 }, function () {
1935+
clientCalledBack = true
1936+
})
1937+
})
1938+
19141939
it('should resend in-flight QoS 2 publish messages from the client', function (done) {
19151940
var client = connect({reconnectPeriod: 200})
19161941
var serverPublished = false

test/abstract_store.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,30 @@ module.exports = function abstractStoreTest (build) {
130130
})
131131
})
132132
})
133+
134+
it('should replace a packet when doing put with the same messageId', function (done) {
135+
var packet1 = {
136+
cmd: 'publish', // added
137+
topic: 'hello',
138+
payload: 'world',
139+
qos: 2,
140+
messageId: 42
141+
}
142+
var packet2 = {
143+
cmd: 'pubrel', // added
144+
qos: 2,
145+
messageId: 42
146+
}
147+
148+
store.put(packet1, function () {
149+
store.put(packet2, function () {
150+
store
151+
.createStream()
152+
.on('data', function (data) {
153+
data.should.eql(packet2)
154+
done()
155+
})
156+
})
157+
})
158+
})
133159
}

0 commit comments

Comments
 (0)