@@ -10,16 +10,16 @@ import {
10
10
} from 'mqtt-packet' ;
11
11
import { write } from './write.js' ;
12
12
import { ConnectOptions } from './interface/connectOptions.js' ;
13
- import { Duplex } from 'stream' ;
13
+ import { Duplex } from 'node:stream' ;
14
+ import { Socket } from 'node:net' ;
14
15
import { EventEmitter } from 'node:events' ;
15
16
import { connectionFactory } from './connectionFactory/index.js' ;
16
17
import eos from 'end-of-stream' ;
17
18
import { defaultConnectOptions } from './util/constants.js' ;
18
- import { ReasonCodeErrors } from './util/errors .js' ;
19
+ import { ReasonCodeErrors } from './util/reasonCodes .js' ;
19
20
import { logger } from './util/logger.js' ;
20
21
import { defaultClientId } from './util/defaultClientId.js' ;
21
22
import { PublishPacket } from './interface/packets.js' ;
22
- import { NumberAllocator } from 'number-allocator' ;
23
23
import { Logger } from 'pino' ;
24
24
import * as sequencer from './sequencer.js' ;
25
25
@@ -37,16 +37,15 @@ export class MqttClient extends EventEmitter {
37
37
connected : boolean ;
38
38
errored : boolean ;
39
39
_eos : Promise < void > | undefined ;
40
- conn : Duplex ;
40
+ conn : Duplex | Socket ;
41
41
_clientLogger : Logger ;
42
42
/**
43
43
* Use packet ID as key if there is one (e.g., SUBACK)
44
44
* Use packet type as key if there is no packet ID (e.g., CONNACK)
45
45
*/
46
46
// TODO: This should be removed after we remove CONNECT into the sequencer
47
47
_inflightPackets : Map < string | number , ( err : Error | null , packet : IPacket ) => void > ;
48
- private _numberAllocator : NumberAllocator ;
49
- private _packetSequencer = new sequencer . MqttPacketSequencer ( this . _sendPacketCallback . bind ( this ) ) ;
48
+ private _packetSequencer = new sequencer . MqttPacketSequencer ( write . bind ( null , this ) ) ;
50
49
51
50
constructor ( options : ConnectOptions ) {
52
51
super ( ) ;
@@ -67,8 +66,6 @@ export class MqttClient extends EventEmitter {
67
66
68
67
this . _clientLogger = logger . child ( { id : this . _options . clientId } ) ;
69
68
70
- this . _numberAllocator = new NumberAllocator ( 1 , 65535 ) ;
71
-
72
69
this . conn = this . _options . customStreamFactory
73
70
? this . _options . customStreamFactory ( this . _options )
74
71
: connectionFactory ( this . _options ) ;
@@ -124,15 +121,6 @@ export class MqttClient extends EventEmitter {
124
121
} ) ;
125
122
}
126
123
127
- private _sendPacketCallback ( packetType : sequencer . PacketType , message : sequencer . Message ) : void {
128
- if ( packetType == 'publish' ) {
129
- write ( this , message as IPublishPacket ) ;
130
- } else {
131
- logger . error ( `Unexpected packet type: ${ packetType } ` ) ;
132
- throw new Error ( `Unexpected packet type: ${ packetType } ` ) ;
133
- }
134
- }
135
-
136
124
async handleIncomingPacket ( packet : Packet ) : Promise < void > {
137
125
this . _clientLogger . trace ( `handleIncomingPacket packet.cmd=${ packet . cmd } ` ) ;
138
126
switch ( packet . cmd ) {
@@ -150,7 +138,7 @@ export class MqttClient extends EventEmitter {
150
138
// (We need the sequencer to do this because it has to send puback messages and it needs to do the whole QOS-2 thing when packets come in.)
151
139
//
152
140
// Also, another random thought, when we get suback back from the broker, it will include granted QOS values and we'll need to return those.
153
- this . _packetSequencer . handleIncomingPacket ( 'puback' , ( packet as unknown ) as sequencer . Packet ) ;
141
+ this . _packetSequencer . handleIncomingPacket ( ( packet as unknown ) as sequencer . Packet ) ;
154
142
break ;
155
143
}
156
144
}
@@ -165,7 +153,7 @@ export class MqttClient extends EventEmitter {
165
153
public async connect ( ) : Promise < IConnackPacket > {
166
154
logger . trace ( 'sending connect...' ) ;
167
155
this . connecting = true ;
168
-
156
+
169
157
const connackPromise = this . _awaitConnack ( ) ;
170
158
const packet : IConnectPacket = {
171
159
cmd : 'connect' ,
@@ -179,7 +167,8 @@ export class MqttClient extends EventEmitter {
179
167
will : this . _options . will ,
180
168
properties : this . _options . properties ,
181
169
} ;
182
- logger . trace ( `writing connect...` ) ;
170
+ this . _packetSequencer . runSequence ( packet )
171
+ logger . trace ( `running connect sequence...` ) ;
183
172
await write ( this , packet ) ;
184
173
logger . trace ( 'waiting for connack...' ) ;
185
174
const connack = await connackPromise ;
@@ -319,11 +308,27 @@ export class MqttClient extends EventEmitter {
319
308
}
320
309
}
321
310
311
+ // TODO: follow up on Aedes to see if there is a better way than breaking the Node Streams contract and accessing _writableState
312
+ // to make sure that the write callback is cleaned up in case of error.
322
313
onError ( err ?: Error | null | undefined ) {
323
314
this . emit ( 'error' , err ) ;
324
315
this . errored = true ;
325
316
this . conn . removeAllListeners ( 'error' ) ;
326
317
this . conn . on ( 'error' , ( ) => { } ) ;
318
+ // hack to clean up the write callbacks in case of error
319
+ this . hackyCleanupWriteCallback ( ) ;
327
320
this . _destroyClient ( true ) ;
328
321
}
322
+
323
+ hackyCleanupWriteCallback ( ) {
324
+ // _writableState is not part of the public API for Duplex or Socket, so we have to do some typecasting here to work with it as the stream state.
325
+ // See https://github.com/nodejs/node/issues/445 for information on this.
326
+ const state = ( this . conn as any ) . _writableState ;
327
+ if ( typeof state . getBuffer !== 'function' ) {
328
+ // See https://github.com/nodejs/node/pull/31165
329
+ throw new Error ( '_writableState.buffer is EOL. _writableState should have getBuffer() as a function.' ) ;
330
+ }
331
+ const list : any [ ] = state . getBuffer ( ) ;
332
+ list . forEach ( ( req ) => { req . callback ( ) } ) ;
333
+ }
329
334
}
0 commit comments