Skip to content

Commit 58fb8df

Browse files
authored
Fix(publish): call callback when messageId available (mqttjs#1393)
Process is enqueued if messageId allocation is failure.
1 parent ee75c32 commit 58fb8df

File tree

1 file changed

+3
-9
lines changed

1 file changed

+3
-9
lines changed

lib/client.js

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -660,16 +660,14 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {
660660
return true
661661
}
662662

663-
if (this._storeProcessing || this._storeProcessingQueue.length > 0) {
663+
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()) {
664664
this._storeProcessingQueue.push(
665665
{
666666
invoke: publishProc,
667667
cbStorePut: opts.cbStorePut,
668668
callback: callback
669669
}
670670
)
671-
} else {
672-
publishProc()
673671
}
674672
return this
675673
}
@@ -842,15 +840,13 @@ MqttClient.prototype.subscribe = function () {
842840
return true
843841
}
844842

845-
if (this._storeProcessing || this._storeProcessingQueue.length > 0) {
843+
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()) {
846844
this._storeProcessingQueue.push(
847845
{
848846
invoke: subscribeProc,
849847
callback: callback
850848
}
851849
)
852-
} else {
853-
subscribeProc()
854850
}
855851

856852
return this
@@ -935,15 +931,13 @@ MqttClient.prototype.unsubscribe = function () {
935931
return true
936932
}
937933

938-
if (this._storeProcessing || this._storeProcessingQueue.length > 0) {
934+
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) {
939935
this._storeProcessingQueue.push(
940936
{
941937
invoke: unsubscribeProc,
942938
callback: callback
943939
}
944940
)
945-
} else {
946-
unsubscribeProc()
947941
}
948942

949943
return this

0 commit comments

Comments
 (0)