@@ -117,6 +117,10 @@ function MqttClient (streamBuilder, options) {
117
117
118
118
// Mark connected on connect
119
119
this . on ( 'connect' , function ( ) {
120
+ if ( this . disconnected ) {
121
+ return ;
122
+ }
123
+
120
124
this . connected = true ;
121
125
var outStore = null ;
122
126
outStore = this . outgoingStore . createStream ( ) ;
@@ -512,19 +516,25 @@ MqttClient.prototype.end = function (force, cb) {
512
516
}
513
517
514
518
function closeStores ( ) {
519
+ that . disconnected = true ;
515
520
that . incomingStore . close ( function ( ) {
516
521
that . outgoingStore . close ( cb ) ;
517
522
} ) ;
518
523
}
519
524
520
525
function finish ( ) {
521
- that . _cleanUp ( force , closeStores ) ;
526
+ // defer closesStores of an I/O cycle,
527
+ // just to make sure things are
528
+ // ok for websockets
529
+ that . _cleanUp ( force , setImmediate . bind ( null , closeStores ) ) ;
522
530
}
523
531
524
532
if ( this . disconnecting ) {
525
533
return true ;
526
534
}
527
535
536
+ this . _clearReconnect ( ) ;
537
+
528
538
this . disconnecting = true ;
529
539
530
540
if ( ! force && 0 < Object . keys ( this . outgoing ) . length ) {
@@ -569,7 +579,7 @@ MqttClient.prototype._setupReconnect = function () {
569
579
MqttClient . prototype . _clearReconnect = function ( ) {
570
580
if ( this . reconnectTimer ) {
571
581
clearInterval ( this . reconnectTimer ) ;
572
- this . reconnectTimer = false ;
582
+ this . reconnectTimer = null ;
573
583
}
574
584
} ;
575
585
@@ -596,7 +606,7 @@ MqttClient.prototype._cleanUp = function (forced, done) {
596
606
) ;
597
607
}
598
608
599
- if ( this . reconnectTimer ) {
609
+ if ( ! this . disconnecting ) {
600
610
this . _clearReconnect ( ) ;
601
611
this . _setupReconnect ( ) ;
602
612
}
0 commit comments