Skip to content

Commit 396c125

Browse files
Sequencer Tweaks (mqttjs#1467)
Co-authored-by: Yoseph Maguire <yoseph.maguire@gmail.com>
1 parent f6fc93b commit 396c125

File tree

8 files changed

+368
-332
lines changed

8 files changed

+368
-332
lines changed

src/client.ts

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,16 @@ import {
1010
} from 'mqtt-packet';
1111
import { write } from './write.js';
1212
import { ConnectOptions } from './interface/connectOptions.js';
13-
import { Duplex } from 'stream';
13+
import { Duplex } from 'node:stream';
14+
import { Socket } from 'node:net';
1415
import { EventEmitter } from 'node:events';
1516
import { connectionFactory } from './connectionFactory/index.js';
1617
import eos from 'end-of-stream';
1718
import { defaultConnectOptions } from './util/constants.js';
18-
import { ReasonCodeErrors } from './util/errors.js';
19+
import { ReasonCodeErrors } from './util/reasonCodes.js';
1920
import { logger } from './util/logger.js';
2021
import { defaultClientId } from './util/defaultClientId.js';
2122
import { PublishPacket } from './interface/packets.js';
22-
import { NumberAllocator } from 'number-allocator';
2323
import { Logger } from 'pino';
2424
import * as sequencer from './sequencer.js';
2525

@@ -37,16 +37,15 @@ export class MqttClient extends EventEmitter {
3737
connected: boolean;
3838
errored: boolean;
3939
_eos: Promise<void> | undefined;
40-
conn: Duplex;
40+
conn: Duplex | Socket;
4141
_clientLogger: Logger;
4242
/**
4343
* Use packet ID as key if there is one (e.g., SUBACK)
4444
* Use packet type as key if there is no packet ID (e.g., CONNACK)
4545
*/
4646
// TODO: This should be removed after we remove CONNECT into the sequencer
4747
_inflightPackets: Map<string | number, (err: Error | null, packet: IPacket) => void>;
48-
private _numberAllocator: NumberAllocator;
49-
private _packetSequencer = new sequencer.MqttPacketSequencer(this._sendPacketCallback.bind(this));
48+
private _packetSequencer = new sequencer.MqttPacketSequencer(write.bind(null, this));
5049

5150
constructor(options: ConnectOptions) {
5251
super();
@@ -67,8 +66,6 @@ export class MqttClient extends EventEmitter {
6766

6867
this._clientLogger = logger.child({ id: this._options.clientId });
6968

70-
this._numberAllocator = new NumberAllocator(1, 65535);
71-
7269
this.conn = this._options.customStreamFactory
7370
? this._options.customStreamFactory(this._options)
7471
: connectionFactory(this._options);
@@ -124,15 +121,6 @@ export class MqttClient extends EventEmitter {
124121
});
125122
}
126123

127-
private _sendPacketCallback(packetType: sequencer.PacketType, message: sequencer.Message): void {
128-
if (packetType == 'publish') {
129-
write(this, message as IPublishPacket);
130-
} else {
131-
logger.error(`Unexpected packet type: ${packetType}`);
132-
throw new Error(`Unexpected packet type: ${packetType}`);
133-
}
134-
}
135-
136124
async handleIncomingPacket(packet: Packet): Promise<void> {
137125
this._clientLogger.trace(`handleIncomingPacket packet.cmd=${packet.cmd}`);
138126
switch (packet.cmd) {
@@ -150,7 +138,7 @@ export class MqttClient extends EventEmitter {
150138
// (We need the sequencer to do this because it has to send puback messages and it needs to do the whole QOS-2 thing when packets come in.)
151139
//
152140
// Also, another random thought, when we get suback back from the broker, it will include granted QOS values and we'll need to return those.
153-
this._packetSequencer.handleIncomingPacket('puback', (packet as unknown) as sequencer.Packet);
141+
this._packetSequencer.handleIncomingPacket((packet as unknown) as sequencer.Packet);
154142
break;
155143
}
156144
}
@@ -165,7 +153,7 @@ export class MqttClient extends EventEmitter {
165153
public async connect(): Promise<IConnackPacket> {
166154
logger.trace('sending connect...');
167155
this.connecting = true;
168-
156+
169157
const connackPromise = this._awaitConnack();
170158
const packet: IConnectPacket = {
171159
cmd: 'connect',
@@ -179,7 +167,8 @@ export class MqttClient extends EventEmitter {
179167
will: this._options.will,
180168
properties: this._options.properties,
181169
};
182-
logger.trace(`writing connect...`);
170+
this._packetSequencer.runSequence(packet)
171+
logger.trace(`running connect sequence...`);
183172
await write(this, packet);
184173
logger.trace('waiting for connack...');
185174
const connack = await connackPromise;
@@ -319,11 +308,27 @@ export class MqttClient extends EventEmitter {
319308
}
320309
}
321310

311+
// TODO: follow up on Aedes to see if there is a better way than breaking the Node Streams contract and accessing _writableState
312+
// to make sure that the write callback is cleaned up in case of error.
322313
onError(err?: Error | null | undefined) {
323314
this.emit('error', err);
324315
this.errored = true;
325316
this.conn.removeAllListeners('error');
326317
this.conn.on('error', () => {});
318+
// hack to clean up the write callbacks in case of error
319+
this.hackyCleanupWriteCallback();
327320
this._destroyClient(true);
328321
}
322+
323+
hackyCleanupWriteCallback() {
324+
// _writableState is not part of the public API for Duplex or Socket, so we have to do some typecasting here to work with it as the stream state.
325+
// See https://github.com/nodejs/node/issues/445 for information on this.
326+
const state = (this.conn as any)._writableState;
327+
if (typeof state.getBuffer !== 'function') {
328+
// See https://github.com/nodejs/node/pull/31165
329+
throw new Error('_writableState.buffer is EOL. _writableState should have getBuffer() as a function.');
330+
}
331+
const list: any[] = state.getBuffer();
332+
list.forEach((req) => {req.callback()});
333+
}
329334
}

src/interface/clientOptions.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export interface ClientOptions {
2+
/**
3+
* MQTT protocol version to use. Use 4 for vMQTT 3.1.1, and 5 for MQTT v5.0
4+
* Default: 5
5+
*/
6+
protocolVersion?: 4 | 5;
7+
}

src/interface/packets.ts

Lines changed: 0 additions & 21 deletions
This file was deleted.

0 commit comments

Comments
 (0)