1
- import { IConnackPacket , IConnectPacket , IDisconnectPacket , IPublishPacket , Packet , parser as mqttParser , Parser as MqttParser } from 'mqtt-packet'
1
+ import { IPacket , IConnackPacket , IConnectPacket , IDisconnectPacket , IPublishPacket , Packet , parser as mqttParser , Parser as MqttParser } from 'mqtt-packet'
2
2
import { write } from './write.js'
3
3
import { ConnectOptions } from './interface/connectOptions.js'
4
4
import { Duplex } from 'stream'
@@ -29,6 +29,11 @@ export class MqttClient extends EventEmitter {
29
29
_eos : Promise < void > | undefined
30
30
conn : Duplex
31
31
_clientLogger : Logger
32
+ /**
33
+ * Use packet ID as key if there is one (e.g., SUBACK)
34
+ * Use packet type as key if there is no packet ID (e.g., CONNACK)
35
+ */
36
+ _inflightPackets : Map < string | number , ( err : Error | null , packet : IPacket ) => void >
32
37
private _numberAllocator : NumberAllocator
33
38
34
39
@@ -39,6 +44,7 @@ export class MqttClient extends EventEmitter {
39
44
this . connected = false
40
45
this . errored = false
41
46
this . disconnecting = false
47
+ this . _inflightPackets = new Map ( )
42
48
43
49
// Using this method to clean up the constructor to do options handling
44
50
logger . trace ( `populating internal client options object...` ) ;
@@ -107,7 +113,11 @@ export class MqttClient extends EventEmitter {
107
113
this . _clientLogger . trace ( `handleIncomingPacket packet.cmd=${ packet . cmd } ` ) ;
108
114
switch ( packet . cmd ) {
109
115
case 'connack' :
110
- this . emit ( 'connack' , packet )
116
+ const connackCallback = this . _inflightPackets . get ( 'connack' )
117
+ if ( connackCallback ) {
118
+ this . _inflightPackets . delete ( 'connack' )
119
+ connackCallback ( null , packet as IConnackPacket )
120
+ }
111
121
break ;
112
122
}
113
123
}
@@ -234,18 +244,20 @@ export class MqttClient extends EventEmitter {
234
244
}
235
245
236
246
private async _awaitConnack ( ) : Promise < IConnackPacket > {
237
- this . _clientLogger . trace ( `in awaitConnect. setting connackTimeout.` ) ;
238
247
return new Promise ( ( resolve , reject ) => {
239
- const connackTimeout = setTimeout (
240
- ( ) => { reject ( new Error ( 'Connection timed out' ) ) } ,
241
- this . _options . connectTimeout
242
- ) ;
243
- this . _clientLogger . trace ( `listening for 'connack'` ) ;
244
- this . once ( 'connack' , ( connackPacket : IConnackPacket ) => {
245
- this . _clientLogger . trace ( `connack received. clearing connackTimeout...` ) ;
246
- clearTimeout ( connackTimeout ) ;
247
- resolve ( connackPacket )
248
+ if ( this . _inflightPackets . has ( 'connack' ) ) {
249
+ reject ( new Error ( 'connack packet callback already exists' ) )
250
+ return ;
251
+ }
252
+ this . _inflightPackets . set ( 'connack' , ( err , packet ) => {
253
+ err ? reject ( err ) : resolve ( packet as IConnackPacket )
248
254
} )
255
+ let connackTimeout : NodeJS . Timeout | null = setTimeout ( ( ) => {
256
+ this . _inflightPackets . delete ( 'connack' )
257
+ clearTimeout ( connackTimeout as NodeJS . Timeout )
258
+ connackTimeout = null
259
+ reject ( new Error ( 'connack packet timeout' ) )
260
+ } , this . _options . connectTimeout )
249
261
} )
250
262
}
251
263
0 commit comments