1
1
import { IConnackPacket , IConnectPacket , Packet , parser as mqttParser , Parser as MqttParser , writeToStream } from 'mqtt-packet'
2
2
import { ConnectOptions } from './interfaces/connectOptions.js'
3
- import { Duplex , Readable } from 'stream'
3
+ import { Duplex } from 'stream'
4
4
import { EventEmitter } from 'node:events'
5
5
import { connectionFactory } from './connectionFactory/index.js'
6
6
import eos from 'end-of-stream'
7
7
import { defaultConnectOptions } from './utils/constants.js'
8
- import { write } from './write.js'
9
8
import { ReasonCodeErrors } from './errors.js'
10
- import { Store } from './store.js'
11
- import { nextTick } from 'process'
12
9
import { logger } from './utils/logger.js'
13
10
import { defaultClientId } from './utils/defaultClientId.js'
14
11
15
12
// const eventEmitter = require('events')
16
13
// const mqttErrors = require('errors')
17
14
15
+ function eosPromisified ( stream : NodeJS . ReadableStream | NodeJS . WritableStream ) : Promise < void > {
16
+ return new Promise < void > ( ( resolve , reject ) => {
17
+ eos ( stream , err => err instanceof Error ? reject ( err ) : resolve ( ) ) ;
18
+ } )
19
+ }
20
+
21
+ // call close(done) after the stream has closed () => void
22
+ // call await close()
18
23
19
24
export class MqttClient extends EventEmitter {
20
25
_incomingPacketParser : MqttParser
21
26
_options : ConnectOptions
22
27
connacked : boolean = false
23
28
disconnected : boolean = true
24
- incomingStore : Store
25
- outgoingStore : Store
26
29
disconnecting : any
27
30
pingTimer : any
28
31
queueQoSZero : boolean = false
@@ -42,11 +45,11 @@ export class MqttClient extends EventEmitter {
42
45
connected : boolean
43
46
errored : boolean
44
47
id : null
48
+ _eos : Promise < void > | undefined
45
49
clean : boolean
46
50
version : null
47
51
conn : Duplex
48
52
_disconnected : boolean
49
- _eos : ( ) => void
50
53
_parsingBatch : number = 0
51
54
pingResp : boolean | null
52
55
inflightMessagesThatNeedToBeCleanedUpIfTheConnCloses : { [ x : string ] : any }
@@ -58,7 +61,6 @@ export class MqttClient extends EventEmitter {
58
61
outgoing : { [ x :string ] : any }
59
62
60
63
61
-
62
64
constructor ( options : ConnectOptions ) {
63
65
super ( )
64
66
// assume this the options have been validated before instantiating the client.
@@ -91,11 +93,8 @@ export class MqttClient extends EventEmitter {
91
93
// Using this method to clean up the constructor to do options handling
92
94
this . _options = { ...defaultConnectOptions , ...options }
93
95
94
- this . conn = this . _options . customStreamFactory ? this . _options . customStreamFactory ( this . _options ) : connectionFactory ( this . _options )
95
-
96
- this . outgoingStore = options . outgoingStore || new Store ( )
97
- this . incomingStore = options . incomingStore || new Store ( )
98
-
96
+ this . conn = this . _options . customStreamFactory ? this . _options . customStreamFactory ( this . _options ) : connectionFactory ( this . _options ) ;
97
+
99
98
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent
100
99
this . conn . setMaxListeners ( 1000 )
101
100
@@ -108,8 +107,10 @@ export class MqttClient extends EventEmitter {
108
107
// readable stream of the conn stream.
109
108
this . _incomingPacketParser . on ( 'packet' , this . handleIncomingPacket )
110
109
111
- // Echo connection errors
112
- this . _incomingPacketParser . on ( 'error' , this . emit . bind ( this , 'error' ) )
110
+ // Echo connection errors this.emit('clientError')
111
+ // We could look at maybe pushing errors in different directions depending on how we should
112
+ // respond to the different errors.
113
+ this . _incomingPacketParser . on ( 'error' , ( ) => this . emit ( 'clientError' ) ) ;
113
114
114
115
this . once ( 'connected' , ( ) => { } )
115
116
this . on ( 'close' , ( ) => {
@@ -125,12 +126,14 @@ export class MqttClient extends EventEmitter {
125
126
}
126
127
} )
127
128
128
- this . on ( 'error ' , this . onError )
129
- this . conn . on ( 'error' , this . emit . bind ( this , 'error ' ) )
129
+ this . on ( 'clientError ' , this . onError )
130
+ this . conn . on ( 'error' , this . emit . bind ( this , 'clientError ' ) )
130
131
131
- this . conn . on ( 'end' , this . close . bind ( this ) )
132
- this . _eos = eos ( this . conn , this . close . bind ( this ) )
133
-
132
+ this . conn . on ( 'end' , ( ) => { this . close ( ) } ) ;
133
+ this . _eos = eosPromisified ( this . conn ) ;
134
+ this . _eos . catch ( ( err : any ) => {
135
+ this . emit ( 'error' , err ) ;
136
+ } )
134
137
}
135
138
136
139
mergeDefaultOptions ( options : ConnectOptions ) : ConnectOptions {
@@ -224,44 +227,35 @@ export class MqttClient extends EventEmitter {
224
227
} else if ( rc > 0 ) {
225
228
const err :any = new Error ( 'Connection refused: ' + ReasonCodeErrors [ rc as keyof typeof ReasonCodeErrors ] )
226
229
err . code = rc
227
- this . emit ( 'error ' , err )
230
+ this . emit ( 'clientError ' , err )
228
231
throw err
229
232
}
230
-
231
- let outStore : Readable | null = await this . outgoingStore . createStream ( )
232
- const clearStoreProcessing = ( ) => {
233
- }
234
-
235
- this . once ( 'close' , ( ) => {
236
- if ( outStore ) {
237
- outStore . destroy ( )
238
- clearStoreProcessing ( )
239
- }
240
- } )
241
- outStore . on ( 'error' , ( err ) => {
242
- clearStoreProcessing ( )
243
- this . removeListener ( 'close' , remove )
244
- this . emit ( 'error' , err )
245
- } )
246
-
247
- const remove = ( ) => {
248
- if ( outStore ) {
249
- outStore . destroy ( )
250
- outStore = null
251
- clearStoreProcessing ( )
252
- }
253
- }
254
233
}
255
234
235
+ /* THIS NEEDS TO BE THOUGHT THROUGH MORE */
256
236
private async _sendConnect ( ) : Promise < void > {
257
- const connectPacket : IConnectPacket = createConnectPacket ( this . _options ) ;
258
- await write ( this , connectPacket )
237
+ const result : boolean | undefined = writeToStream ( createConnectPacket ( this . _options ) , this . conn ) as boolean | undefined ;
238
+ return new Promise < void > ( ( resolve , reject ) => {
239
+ if ( this . errored ) {
240
+ reject ( ) ;
241
+ } else if ( ! result ) { // conn is full, wait to drain before resolving...
242
+ this . conn . once ( 'drain' , resolve )
243
+ } else { // no error, no
244
+ resolve ( ) ;
245
+ }
246
+ } ) ;
259
247
}
260
248
261
- close ( _done : any ) {
249
+ async close ( _error ?: Error | null | undefined ) : Promise < void > {
250
+ // empty right now
262
251
}
263
252
264
- onError ( _err : any ) {
253
+ onError ( err ?: Error | null | undefined ) {
254
+ this . emit ( 'error' , err ) ;
255
+ this . errored = true ;
256
+ this . conn . removeAllListeners ( 'error' ) ;
257
+ this . conn . on ( 'error' , ( ) => { } ) ;
258
+ this . close ( )
265
259
}
266
260
267
261
sendPacket ( packet : Packet ) {
@@ -270,22 +264,12 @@ export class MqttClient extends EventEmitter {
270
264
}
271
265
272
266
async end ( force ?: boolean , opts ?: any ) {
273
- const closeStores = async ( ) => {
274
- this . disconnected = true
275
- try {
276
- await this . incomingStore . close ( ) ;
277
- await this . outgoingStore . close ( ) ;
278
- } catch ( e ) {
279
- }
280
- this . emit ( 'end' )
281
- }
282
267
283
268
const finish = async ( ) => {
284
269
// defer closesStores of an I/O cycle,
285
270
// just to make sure things are
286
271
// ok for websockets
287
272
await this . _cleanUp ( force , opts ) ;
288
- nextTick ( closeStores . bind ( this ) )
289
273
}
290
274
291
275
if ( this . disconnecting ) {
0 commit comments