Skip to content

Commit adff840

Browse files
authored
Merge pull request mqttjs#864 from redboltz/fix_store_pub_after_proc
Added error handling to imcomingStore.put callback in _handlePublish.
2 parents 0c9cd0d + d7391a9 commit adff840

File tree

2 files changed

+124
-6
lines changed

2 files changed

+124
-6
lines changed

lib/client.js

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,7 @@ default:
954954
for now i just suppressed the warnings
955955
*/
956956
MqttClient.prototype._handlePublish = function (packet, done) {
957+
done = typeof done !== 'undefined' ? done : nop
957958
var topic = packet.topic.toString()
958959
var message = packet.payload
959960
var qos = packet.qos
@@ -962,7 +963,10 @@ MqttClient.prototype._handlePublish = function (packet, done) {
962963

963964
switch (qos) {
964965
case 2:
965-
this.incomingStore.put(packet, function () {
966+
this.incomingStore.put(packet, function (err) {
967+
if (err) {
968+
return done(err)
969+
}
966970
that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
967971
})
968972
break
@@ -971,7 +975,7 @@ MqttClient.prototype._handlePublish = function (packet, done) {
971975
this.emit('message', topic, message, packet)
972976
this.handleMessage(packet, function (err) {
973977
if (err) {
974-
return done && done(err)
978+
return done(err)
975979
}
976980
// send 'puback' if the above 'handleMessage' method executed
977981
// successfully.
@@ -1074,6 +1078,7 @@ MqttClient.prototype._handleAck = function (packet) {
10741078
* @api private
10751079
*/
10761080
MqttClient.prototype._handlePubrel = function (packet, callback) {
1081+
callback = typeof callback !== 'undefined' ? callback : nop
10771082
var mid = packet.messageId
10781083
var that = this
10791084

@@ -1082,12 +1087,16 @@ MqttClient.prototype._handlePubrel = function (packet, callback) {
10821087
that.incomingStore.get(packet, function (err, pub) {
10831088
if (!err && pub.cmd !== 'pubrel') {
10841089
that.emit('message', pub.topic, pub.payload, pub)
1085-
that.incomingStore.put(packet)
1086-
that.handleMessage(pub, function (err) {
1090+
that.incomingStore.put(packet, function (err) {
10871091
if (err) {
1088-
return callback && callback(err)
1092+
return callback(err)
10891093
}
1090-
that._sendPacket(comp, callback)
1094+
that.handleMessage(pub, function (err) {
1095+
if (err) {
1096+
return callback(err)
1097+
}
1098+
that._sendPacket(comp, callback)
1099+
})
10911100
})
10921101
} else {
10931102
that._sendPacket(comp, callback)

test/abstract_client.js

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -973,6 +973,115 @@ module.exports = function (server, config) {
973973
}
974974
})
975975

976+
it('should handle error with async incoming store in QoS 2 `handlePublish` method', function (done) {
977+
function AsyncStore () {
978+
if (!(this instanceof AsyncStore)) {
979+
return new AsyncStore()
980+
}
981+
}
982+
AsyncStore.prototype.put = function (packet, cb) {
983+
process.nextTick(function () {
984+
cb(new Error('Error'))
985+
})
986+
}
987+
var store = new AsyncStore()
988+
var client = connect({incomingStore: store})
989+
990+
client._handlePublish({
991+
messageId: 1,
992+
topic: 'test',
993+
payload: 'test',
994+
qos: 2
995+
}, function () {
996+
done()
997+
client.end()
998+
})
999+
})
1000+
1001+
it('should handle error with async incoming store in QoS 2 `handlePubrel` method', function (done) {
1002+
function AsyncStore () {
1003+
if (!(this instanceof AsyncStore)) {
1004+
return new AsyncStore()
1005+
}
1006+
}
1007+
AsyncStore.prototype.put = function (packet, cb) {
1008+
process.nextTick(function () {
1009+
cb(new Error('Error'))
1010+
})
1011+
}
1012+
AsyncStore.prototype.get = function (packet, cb) {
1013+
process.nextTick(function () {
1014+
cb(null, {cmd: 'publish'})
1015+
})
1016+
}
1017+
var store = new AsyncStore()
1018+
var client = connect({incomingStore: store})
1019+
1020+
client._handlePubrel({
1021+
messageId: 1,
1022+
qos: 2
1023+
}, function () {
1024+
done()
1025+
client.end()
1026+
})
1027+
})
1028+
1029+
it('should handle success with async incoming store in QoS 2 `handlePubrel` method', function (done) {
1030+
var putComplete = false
1031+
function AsyncStore () {
1032+
if (!(this instanceof AsyncStore)) {
1033+
return new AsyncStore()
1034+
}
1035+
}
1036+
AsyncStore.prototype.put = function (packet, cb) {
1037+
process.nextTick(function () {
1038+
putComplete = true
1039+
cb(null)
1040+
})
1041+
}
1042+
AsyncStore.prototype.get = function (packet, cb) {
1043+
process.nextTick(function () {
1044+
cb(null, {cmd: 'publish'})
1045+
})
1046+
}
1047+
var store = new AsyncStore()
1048+
var client = connect({incomingStore: store})
1049+
1050+
client._handlePubrel({
1051+
messageId: 1,
1052+
qos: 2
1053+
}, function () {
1054+
putComplete.should.equal(true)
1055+
done()
1056+
client.end()
1057+
})
1058+
})
1059+
1060+
it('should handle error with async incoming store in QoS 1 `handlePublish` method', function (done) {
1061+
function AsyncStore () {
1062+
if (!(this instanceof AsyncStore)) {
1063+
return new AsyncStore()
1064+
}
1065+
}
1066+
AsyncStore.prototype.put = function (packet, cb) {
1067+
process.nextTick(function () {
1068+
cb(null, 'Error')
1069+
})
1070+
}
1071+
var store = new AsyncStore()
1072+
var client = connect({incomingStore: store})
1073+
1074+
client._handlePublish({
1075+
messageId: 1,
1076+
topic: 'test',
1077+
payload: 'test',
1078+
qos: 1
1079+
}, function () {
1080+
done()
1081+
client.end()
1082+
})
1083+
})
1084+
9761085
it('should not send a `pubcomp` if the execution of `handleMessage` fails for messages with QoS `2`', function (done) {
9771086
var store = new Store()
9781087
var client = connect({incomingStore: store})

0 commit comments

Comments
 (0)