Skip to content

Commit aa5647d

Browse files
committed
Refined null callback handling.
Added unit test.
1 parent a1af37a commit aa5647d

File tree

2 files changed

+115
-6
lines changed

2 files changed

+115
-6
lines changed

lib/client.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -953,7 +953,7 @@ default:
953953
954954
for now i just suppressed the warnings
955955
*/
956-
MqttClient.prototype._handlePublish = function (packet, done) {
956+
MqttClient.prototype._handlePublish = function (packet, done = function () {}) {
957957
var topic = packet.topic.toString()
958958
var message = packet.payload
959959
var qos = packet.qos
@@ -964,7 +964,7 @@ MqttClient.prototype._handlePublish = function (packet, done) {
964964
case 2:
965965
this.incomingStore.put(packet, function (err) {
966966
if (err) {
967-
return done && done(err)
967+
return done(err)
968968
}
969969
that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
970970
})
@@ -974,7 +974,7 @@ MqttClient.prototype._handlePublish = function (packet, done) {
974974
this.emit('message', topic, message, packet)
975975
this.handleMessage(packet, function (err) {
976976
if (err) {
977-
return done && done(err)
977+
return done(err)
978978
}
979979
// send 'puback' if the above 'handleMessage' method executed
980980
// successfully.
@@ -1076,7 +1076,7 @@ MqttClient.prototype._handleAck = function (packet) {
10761076
* @param {Object} packet
10771077
* @api private
10781078
*/
1079-
MqttClient.prototype._handlePubrel = function (packet, callback) {
1079+
MqttClient.prototype._handlePubrel = function (packet, callback = function () {}) {
10801080
var mid = packet.messageId
10811081
var that = this
10821082

@@ -1087,11 +1087,11 @@ MqttClient.prototype._handlePubrel = function (packet, callback) {
10871087
that.emit('message', pub.topic, pub.payload, pub)
10881088
that.incomingStore.put(packet, function (err) {
10891089
if (err) {
1090-
return callback && callback(err)
1090+
return callback(err)
10911091
}
10921092
that.handleMessage(pub, function (err) {
10931093
if (err) {
1094-
return callback && callback(err)
1094+
return callback(err)
10951095
}
10961096
that._sendPacket(comp, callback)
10971097
})

test/abstract_client.js

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -973,6 +973,115 @@ module.exports = function (server, config) {
973973
}
974974
})
975975

976+
it('should handle error with async incoming store in QoS 2 `handlePublish` method', function (done) {
977+
function AsyncStore () {
978+
if (!(this instanceof AsyncStore)) {
979+
return new AsyncStore()
980+
}
981+
}
982+
AsyncStore.prototype.put = function (packet, cb) {
983+
setTimeout(function () {
984+
cb(new Error('Error'))
985+
}, 200)
986+
}
987+
var store = new AsyncStore()
988+
var client = connect({incomingStore: store})
989+
990+
client._handlePublish({
991+
messageId: 1,
992+
topic: 'test',
993+
payload: 'test',
994+
qos: 2
995+
}, function () {
996+
done()
997+
client.end()
998+
})
999+
})
1000+
1001+
it('should handle error with async incoming store in QoS 2 `handlePubrel` method', function (done) {
1002+
function AsyncStore () {
1003+
if (!(this instanceof AsyncStore)) {
1004+
return new AsyncStore()
1005+
}
1006+
}
1007+
AsyncStore.prototype.put = function (packet, cb) {
1008+
setTimeout(function () {
1009+
cb(new Error('Error'))
1010+
}, 200)
1011+
}
1012+
AsyncStore.prototype.get = function (packet, cb) {
1013+
setTimeout(function () {
1014+
cb(null, {cmd: 'publish'})
1015+
}, 200)
1016+
}
1017+
var store = new AsyncStore()
1018+
var client = connect({incomingStore: store})
1019+
1020+
client._handlePubrel({
1021+
messageId: 1,
1022+
qos: 2
1023+
}, function () {
1024+
done()
1025+
client.end()
1026+
})
1027+
})
1028+
1029+
it('should handle success with async incoming store in QoS 2 `handlePubrel` method', function (done) {
1030+
var putComplete = false
1031+
function AsyncStore () {
1032+
if (!(this instanceof AsyncStore)) {
1033+
return new AsyncStore()
1034+
}
1035+
}
1036+
AsyncStore.prototype.put = function (packet, cb) {
1037+
setTimeout(function () {
1038+
putComplete = true
1039+
cb(null)
1040+
}, 200)
1041+
}
1042+
AsyncStore.prototype.get = function (packet, cb) {
1043+
setTimeout(function () {
1044+
cb(null, {cmd: 'publish'})
1045+
}, 200)
1046+
}
1047+
var store = new AsyncStore()
1048+
var client = connect({incomingStore: store})
1049+
1050+
client._handlePubrel({
1051+
messageId: 1,
1052+
qos: 2
1053+
}, function () {
1054+
putComplete.should.equal(true)
1055+
done()
1056+
client.end()
1057+
})
1058+
})
1059+
1060+
it('should handle error with async incoming store in QoS 1 `handlePublish` method', function (done) {
1061+
function AsyncStore () {
1062+
if (!(this instanceof AsyncStore)) {
1063+
return new AsyncStore()
1064+
}
1065+
}
1066+
AsyncStore.prototype.put = function (packet, cb) {
1067+
setTimeout(function () {
1068+
cb(null, 'Error')
1069+
}, 200)
1070+
}
1071+
var store = new AsyncStore()
1072+
var client = connect({incomingStore: store})
1073+
1074+
client._handlePublish({
1075+
messageId: 1,
1076+
topic: 'test',
1077+
payload: 'test',
1078+
qos: 1
1079+
}, function () {
1080+
done()
1081+
client.end()
1082+
})
1083+
})
1084+
9761085
it('should not send a `pubcomp` if the execution of `handleMessage` fails for messages with QoS `2`', function (done) {
9771086
var store = new Store()
9781087
var client = connect({incomingStore: store})

0 commit comments

Comments
 (0)