Skip to content

Commit 8a51187

Browse files
use map for tracking inflight packets being waited on (mqttjs#1433)
* use map for tracking inflight packets being waited on * tweak _awaitConnack
1 parent 3ab6a0b commit 8a51187

File tree

1 file changed

+24
-12
lines changed

1 file changed

+24
-12
lines changed

src/client.ts

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { IConnackPacket, IConnectPacket, IDisconnectPacket, IPublishPacket, Packet, parser as mqttParser, Parser as MqttParser } from 'mqtt-packet'
1+
import { IPacket, IConnackPacket, IConnectPacket, IDisconnectPacket, IPublishPacket, Packet, parser as mqttParser, Parser as MqttParser } from 'mqtt-packet'
22
import { write } from './write.js'
33
import { ConnectOptions } from './interface/connectOptions.js'
44
import { Duplex } from 'stream'
@@ -29,6 +29,11 @@ export class MqttClient extends EventEmitter {
2929
_eos: Promise<void> | undefined
3030
conn: Duplex
3131
_clientLogger: Logger
32+
/**
33+
* Use packet ID as key if there is one (e.g., SUBACK)
34+
* Use packet type as key if there is no packet ID (e.g., CONNACK)
35+
*/
36+
_inflightPackets: Map<string | number, (err: Error | null, packet: IPacket) => void>
3237
private _numberAllocator: NumberAllocator
3338

3439

@@ -39,6 +44,7 @@ export class MqttClient extends EventEmitter {
3944
this.connected = false
4045
this.errored = false
4146
this.disconnecting = false
47+
this._inflightPackets = new Map()
4248

4349
// Using this method to clean up the constructor to do options handling
4450
logger.trace(`populating internal client options object...`);
@@ -107,7 +113,11 @@ export class MqttClient extends EventEmitter {
107113
this._clientLogger.trace(`handleIncomingPacket packet.cmd=${packet.cmd}`);
108114
switch (packet.cmd) {
109115
case 'connack':
110-
this.emit('connack', packet)
116+
const connackCallback = this._inflightPackets.get('connack')
117+
if (connackCallback) {
118+
this._inflightPackets.delete('connack')
119+
connackCallback(null, packet as IConnackPacket)
120+
}
111121
break;
112122
}
113123
}
@@ -234,18 +244,20 @@ export class MqttClient extends EventEmitter {
234244
}
235245

236246
private async _awaitConnack(): Promise<IConnackPacket> {
237-
this._clientLogger.trace(`in awaitConnect. setting connackTimeout.`);
238247
return new Promise((resolve, reject) => {
239-
const connackTimeout = setTimeout(
240-
() => { reject(new Error('Connection timed out')) },
241-
this._options.connectTimeout
242-
);
243-
this._clientLogger.trace(`listening for 'connack'`);
244-
this.once('connack', (connackPacket: IConnackPacket) => {
245-
this._clientLogger.trace(`connack received. clearing connackTimeout...`);
246-
clearTimeout(connackTimeout);
247-
resolve(connackPacket)
248+
if (this._inflightPackets.has('connack')) {
249+
reject(new Error('connack packet callback already exists'))
250+
return;
251+
}
252+
this._inflightPackets.set('connack', (err, packet) => {
253+
err ? reject(err) : resolve(packet as IConnackPacket)
248254
})
255+
let connackTimeout: NodeJS.Timeout | null = setTimeout(() => {
256+
this._inflightPackets.delete('connack')
257+
clearTimeout(connackTimeout as NodeJS.Timeout)
258+
connackTimeout = null
259+
reject(new Error('connack packet timeout'))
260+
}, this._options.connectTimeout)
249261
})
250262
}
251263

0 commit comments

Comments
 (0)