Skip to content

Commit 139a5ec

Browse files
Yoseph Maguirevishnureddy17
andauthored
more pruning and updates (mqttjs#1409)
* yoseph and vishnu feb 1 * Co-authored-by: Vishnu Reddy <vishnureddy17@users.noreply.github.com> Co-authored-by: Vishnu Reddy <vishnureddy@microsoft.com>
1 parent 04394a8 commit 139a5ec

File tree

5 files changed

+61
-90
lines changed

5 files changed

+61
-90
lines changed

package-lock.json

Lines changed: 7 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
],
2626
"types": "dist",
2727
"scripts": {
28-
"build": "tsc"
28+
"build": "tsc",
29+
"test": "npm run build && ava"
2930
},
3031
"dependencies": {
3132
"duplexify": "^4.1.2",
@@ -38,8 +39,7 @@
3839
"devDependencies": {
3940
"@types/duplexify": "^3.6.1",
4041
"@types/end-of-stream": "^1.4.1",
41-
"@types/node": "^14.14.31",
42-
"@types/pino": "^7.0.5",
42+
"@types/node": "^14.18.10",
4343
"@types/ws": "^8.2.2",
4444
"ava": "^4.0.1",
4545
"pino-pretty": "^7.5.0",

src/client.ts

Lines changed: 43 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,31 @@
11
import { IConnackPacket, IConnectPacket, Packet, parser as mqttParser, Parser as MqttParser, writeToStream } from 'mqtt-packet'
22
import { ConnectOptions } from './interfaces/connectOptions.js'
3-
import { Duplex, Readable } from 'stream'
3+
import { Duplex } from 'stream'
44
import { EventEmitter } from 'node:events'
55
import { connectionFactory } from './connectionFactory/index.js'
66
import eos from 'end-of-stream'
77
import { defaultConnectOptions } from './utils/constants.js'
8-
import { write } from './write.js'
98
import { ReasonCodeErrors } from './errors.js'
10-
import { Store } from './store.js'
11-
import { nextTick } from 'process'
129
import { logger } from './utils/logger.js'
1310
import { defaultClientId } from './utils/defaultClientId.js'
1411

1512
// const eventEmitter = require('events')
1613
// const mqttErrors = require('errors')
1714

15+
function eosPromisified(stream: NodeJS.ReadableStream | NodeJS.WritableStream): Promise<void> {
16+
return new Promise<void>((resolve, reject) => {
17+
eos(stream, err => err instanceof Error ? reject(err) : resolve());
18+
})
19+
}
20+
21+
// call close(done) after the stream has closed () => void
22+
// call await close()
1823

1924
export class MqttClient extends EventEmitter {
2025
_incomingPacketParser: MqttParser
2126
_options: ConnectOptions
2227
connacked: boolean = false
2328
disconnected: boolean = true
24-
incomingStore: Store
25-
outgoingStore: Store
2629
disconnecting: any
2730
pingTimer: any
2831
queueQoSZero: boolean = false
@@ -42,11 +45,11 @@ export class MqttClient extends EventEmitter {
4245
connected: boolean
4346
errored: boolean
4447
id: null
48+
_eos: Promise<void> | undefined
4549
clean: boolean
4650
version: null
4751
conn: Duplex
4852
_disconnected: boolean
49-
_eos: () => void
5053
_parsingBatch: number = 0
5154
pingResp: boolean | null
5255
inflightMessagesThatNeedToBeCleanedUpIfTheConnCloses: {[x: string]: any}
@@ -58,7 +61,6 @@ export class MqttClient extends EventEmitter {
5861
outgoing: {[x:string]: any}
5962

6063

61-
6264
constructor (options: ConnectOptions) {
6365
super()
6466
// assume this the options have been validated before instantiating the client.
@@ -91,11 +93,8 @@ export class MqttClient extends EventEmitter {
9193
// Using this method to clean up the constructor to do options handling
9294
this._options = {...defaultConnectOptions, ...options}
9395

94-
this.conn = this._options.customStreamFactory? this._options.customStreamFactory(this._options) : connectionFactory(this._options)
95-
96-
this.outgoingStore = options.outgoingStore || new Store()
97-
this.incomingStore = options.incomingStore || new Store()
98-
96+
this.conn = this._options.customStreamFactory? this._options.customStreamFactory(this._options) : connectionFactory(this._options);
97+
9998
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent
10099
this.conn.setMaxListeners(1000)
101100

@@ -108,8 +107,10 @@ export class MqttClient extends EventEmitter {
108107
// readable stream of the conn stream.
109108
this._incomingPacketParser.on('packet', this.handleIncomingPacket)
110109

111-
// Echo connection errors
112-
this._incomingPacketParser.on('error', this.emit.bind(this, 'error'))
110+
// Echo connection errors this.emit('clientError')
111+
// We could look at maybe pushing errors in different directions depending on how we should
112+
// respond to the different errors.
113+
this._incomingPacketParser.on('error', () => this.emit('clientError'));
113114

114115
this.once('connected', () => {})
115116
this.on('close', () => {
@@ -125,12 +126,14 @@ export class MqttClient extends EventEmitter {
125126
}
126127
})
127128

128-
this.on('error', this.onError)
129-
this.conn.on('error', this.emit.bind(this, 'error'))
129+
this.on('clientError', this.onError)
130+
this.conn.on('error', this.emit.bind(this, 'clientError'))
130131

131-
this.conn.on('end', this.close.bind(this))
132-
this._eos = eos(this.conn, this.close.bind(this))
133-
132+
this.conn.on('end', () => { this.close() });
133+
this._eos = eosPromisified(this.conn);
134+
this._eos.catch((err: any) => {
135+
this.emit('error', err);
136+
})
134137
}
135138

136139
mergeDefaultOptions(options: ConnectOptions): ConnectOptions {
@@ -224,44 +227,35 @@ export class MqttClient extends EventEmitter {
224227
} else if (rc > 0) {
225228
const err:any = new Error('Connection refused: ' + ReasonCodeErrors[rc as keyof typeof ReasonCodeErrors])
226229
err.code = rc
227-
this.emit('error', err)
230+
this.emit('clientError', err)
228231
throw err
229232
}
230-
231-
let outStore: Readable | null = await this.outgoingStore.createStream()
232-
const clearStoreProcessing = () => {
233-
}
234-
235-
this.once('close', () => {
236-
if (outStore) {
237-
outStore.destroy()
238-
clearStoreProcessing()
239-
}
240-
})
241-
outStore.on('error', (err) => {
242-
clearStoreProcessing()
243-
this.removeListener('close', remove)
244-
this.emit('error', err)
245-
})
246-
247-
const remove = () => {
248-
if (outStore) {
249-
outStore.destroy()
250-
outStore = null
251-
clearStoreProcessing()
252-
}
253-
}
254233
}
255234

235+
/* THIS NEEDS TO BE THOUGHT THROUGH MORE */
256236
private async _sendConnect(): Promise<void> {
257-
const connectPacket: IConnectPacket = createConnectPacket(this._options);
258-
await write(this, connectPacket)
237+
const result: boolean | undefined = writeToStream(createConnectPacket(this._options), this.conn) as boolean | undefined;
238+
return new Promise<void>((resolve, reject) => {
239+
if (this.errored) {
240+
reject();
241+
} else if (!result) { // conn is full, wait to drain before resolving...
242+
this.conn.once('drain', resolve)
243+
} else { // no error, no
244+
resolve();
245+
}
246+
});
259247
}
260248

261-
close (_done: any) {
249+
async close(_error?: Error | null | undefined): Promise<void> {
250+
// empty right now
262251
}
263252

264-
onError (_err: any) {
253+
onError (err?: Error | null | undefined) {
254+
this.emit('error', err);
255+
this.errored = true;
256+
this.conn.removeAllListeners('error');
257+
this.conn.on('error', () => {});
258+
this.close()
265259
}
266260

267261
sendPacket(packet: Packet) {
@@ -270,22 +264,12 @@ export class MqttClient extends EventEmitter {
270264
}
271265

272266
async end (force?: boolean, opts?: any) {
273-
const closeStores = async () => {
274-
this.disconnected = true
275-
try {
276-
await this.incomingStore.close();
277-
await this.outgoingStore.close();
278-
} catch (e) {
279-
}
280-
this.emit('end')
281-
}
282267

283268
const finish = async () => {
284269
// defer closesStores of an I/O cycle,
285270
// just to make sure things are
286271
// ok for websockets
287272
await this._cleanUp(force, opts);
288-
nextTick(closeStores.bind(this))
289273
}
290274

291275
if (this.disconnecting) {

src/write.js

Whitespace-only changes.

test/test_connect.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,11 @@ test('should send a CONNECT packet to the broker and receive a CONNACK', async t
1414
const client = await connect({
1515
brokerUrl: 'mqtt://localhost',
1616
});
17-
});
17+
});
18+
19+
/**
20+
* 1) Send a Connect packet
21+
* 2) writeToStream returns false and emits an error on this.conn('error')
22+
*
23+
* We shouldn't be throwing away the whole client because 1 packet failed.
24+
*/

0 commit comments

Comments
 (0)