Skip to content

Commit 79b5582

Browse files
authored
possible sequencer implementation (mqttjs#1449)
1 parent c798c03 commit 79b5582

File tree

2 files changed

+329
-2
lines changed

2 files changed

+329
-2
lines changed

src/client.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { defaultClientId } from './util/defaultClientId.js';
2121
import { PublishPacket } from './interface/packets.js';
2222
import { NumberAllocator } from 'number-allocator';
2323
import { Logger } from 'pino';
24+
import * as sequencer from './sequencer.js';
2425

2526
function eosPromisified(stream: NodeJS.ReadableStream | NodeJS.WritableStream): Promise<void> {
2627
return new Promise<void>((resolve, reject) => {
@@ -42,8 +43,10 @@ export class MqttClient extends EventEmitter {
4243
* Use packet ID as key if there is one (e.g., SUBACK)
4344
* Use packet type as key if there is no packet ID (e.g., CONNACK)
4445
*/
46+
// TODO: This should be removed after we remove CONNECT into the sequencer
4547
_inflightPackets: Map<string | number, (err: Error | null, packet: IPacket) => void>;
4648
private _numberAllocator: NumberAllocator;
49+
private _packetSequencer = new sequencer.MqttPacketSequencer(this._sendPacketCallback.bind(this));
4750

4851
constructor(options: ConnectOptions) {
4952
super();
@@ -121,6 +124,15 @@ export class MqttClient extends EventEmitter {
121124
});
122125
}
123126

127+
private _sendPacketCallback(packetType: sequencer.PacketType, message: sequencer.Message): void {
128+
if (packetType == 'publish') {
129+
write(this, message as IPublishPacket);
130+
} else {
131+
logger.error(`Unexpected packet type: ${packetType}`);
132+
throw new Error(`Unexpected packet type: ${packetType}`);
133+
}
134+
}
135+
124136
async handleIncomingPacket(packet: Packet): Promise<void> {
125137
this._clientLogger.trace(`handleIncomingPacket packet.cmd=${packet.cmd}`);
126138
switch (packet.cmd) {
@@ -132,6 +144,15 @@ export class MqttClient extends EventEmitter {
132144
}
133145
break;
134146
}
147+
case 'puback': {
148+
// We should be sending almost every packet into the incoming packet sequencer including publish
149+
// When we add publish, we may need another callback function so the seqencer can tell us when a new publish packet comes in.
150+
// (We need the sequencer to do this because it has to send puback messages and it needs to do the whole QOS-2 thing when packets come in.)
151+
//
152+
// Also, another random thought, when we get suback back from the broker, it will include granted QOS values and we'll need to return those.
153+
this._packetSequencer.handleIncomingPacket('puback', (packet as unknown) as sequencer.Packet);
154+
break;
155+
}
135156
}
136157
}
137158

@@ -202,10 +223,11 @@ export class MqttClient extends EventEmitter {
202223
...defaultPublishPacket,
203224
...packet,
204225
};
205-
this._clientLogger.trace(`publishing packet ${JSON.stringify(publishPacket)}`);
206-
write(this, publishPacket);
226+
// TODO: remove this ugly cast
227+
await this._packetSequencer.runSequence('publish', (publishPacket as unknown) as sequencer.Message);
207228

208229
// deallocate the messageId used.
230+
// TODO: this should be in a finally block
209231
this._numberAllocator.free(messageId);
210232
return;
211233
}

src/sequencer.ts

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
import { logger } from './util/logger.js';
2+
3+
export declare type PacketType = 'publish' | 'puback' | 'pubrec' | 'pubrel' | 'pubcomp';
4+
5+
export declare type SequenceType = 'publish';
6+
7+
export interface Message {
8+
qos: 0 | 1 | 2;
9+
cmd: PacketType;
10+
messageId: number | undefined;
11+
}
12+
13+
export interface Packet {
14+
messageId: number | undefined;
15+
}
16+
17+
// It's called SendPacketFuncton, but it accepts a Message. Maybe we need to rethink this.
18+
// The more I think about this, the more I think it should accept a `Packet` object, and it
19+
// should use `Packet.cmd` instead of accepting a separate `PacketType` parameter.
20+
export type SendPacketFunction = (packetType: PacketType, message: Message) => void;
21+
22+
type InFlightMessageMap = { [key: number]: SequenceMachine };
23+
24+
// These should be documented so callers can change them. Do we want these to be specific for each packet type (pubRelInterval, maxPubRe, etc)?
25+
/* eslint prefer-const: 0 */
26+
let pubAckInterval = 2000;
27+
let maxPublishCount = 5;
28+
29+
type DoneFunction = (err?: Error) => void;
30+
31+
export class MqttPacketSequencer {
32+
sendPacketFunction: SendPacketFunction;
33+
inFlightMessages: InFlightMessageMap = {};
34+
35+
constructor(sendPacketFunction: SendPacketFunction) {
36+
this.sendPacketFunction = sendPacketFunction;
37+
}
38+
39+
_runSequence(sequenceType: SequenceType, message: Message, done: DoneFunction) {
40+
let sequenceMachine: SequenceMachine;
41+
42+
switch (sequenceType) {
43+
case 'publish':
44+
switch (message.qos) {
45+
case 0:
46+
sequenceMachine = new PublishQos0(message, this.sendPacketFunction, done);
47+
break;
48+
case 1:
49+
sequenceMachine = new PublishQos1(message, this.sendPacketFunction, done);
50+
break;
51+
case 2:
52+
sequenceMachine = new PublishQos2(message, this.sendPacketFunction, done);
53+
break;
54+
}
55+
this.inFlightMessages[message.messageId as number] = sequenceMachine;
56+
// SUBSCRIBE also goes into the inFlightMesages map. CONNECT maybe goes somewhere else?
57+
}
58+
59+
sequenceMachine.start();
60+
}
61+
62+
// TODO: Is there an easier way to Promisify this?
63+
runSequence(sequenceType: SequenceType, message: Message, done?: DoneFunction) {
64+
if (done) {
65+
return this._runSequence(sequenceType, message, done);
66+
} else {
67+
return new Promise<void>((resolve, reject) => {
68+
this._runSequence(sequenceType, message, (err: Error | void) => {
69+
if (err) {
70+
reject(err);
71+
} else {
72+
resolve();
73+
}
74+
});
75+
});
76+
}
77+
}
78+
79+
// `Packet` has an `cmd` value. Should we use this? Probably?
80+
handleIncomingPacket(packetType: PacketType, packet: Packet) {
81+
const sequenceMachine = this.inFlightMessages[packet.messageId as number];
82+
83+
if (sequenceMachine) {
84+
sequenceMachine.handleIncomingPacket(packetType, packet);
85+
} else {
86+
logger.info('blah');
87+
}
88+
}
89+
}
90+
91+
abstract class SequenceMachine {
92+
message: Message;
93+
sendPacketFunction: SendPacketFunction;
94+
done: DoneFunction;
95+
timeout: NodeJS.Timeout | number = 0;
96+
97+
constructor(message: Message, sendPacketFunction: SendPacketFunction, done: DoneFunction) {
98+
this.message = message;
99+
this.sendPacketFunction = sendPacketFunction;
100+
this.done = done;
101+
}
102+
103+
abstract start(): void;
104+
abstract handleIncomingPacket(packetType: PacketType, packet: Packet): void;
105+
abstract cancel(): void;
106+
107+
_setTimer(callback: () => void, interval: number): void {
108+
this._clearTimer();
109+
this.timeout = setTimeout(callback, interval);
110+
}
111+
112+
_clearTimer(): void {
113+
if (this.timeout) {
114+
clearTimeout(this.timeout as NodeJS.Timeout);
115+
this.timeout = 0;
116+
}
117+
}
118+
}
119+
120+
enum Qos0State {
121+
New,
122+
Done,
123+
Failed,
124+
}
125+
126+
class PublishQos0 extends SequenceMachine {
127+
state = Qos0State.New;
128+
129+
start(): void {
130+
this._sendPublish();
131+
}
132+
133+
_sendPublish() {
134+
try {
135+
this.sendPacketFunction('publish', this.message);
136+
this.state == Qos0State.Done;
137+
this.done();
138+
} catch (e: any) {
139+
this.state = Qos0State.Failed;
140+
this.done(e);
141+
}
142+
}
143+
144+
handleIncomingPacket(packetType: PacketType, packet: Packet): void {
145+
packetType;
146+
packet;
147+
logger.info('blah');
148+
}
149+
150+
cancel() {
151+
logger.info('blah');
152+
}
153+
}
154+
155+
// TODO: do we want a state for Cancelled?
156+
enum Qos1State {
157+
New,
158+
WaitingForPubAck,
159+
Done,
160+
Failed,
161+
}
162+
163+
class PublishQos1 extends SequenceMachine {
164+
state = Qos1State.New;
165+
sendPublishCount = 0;
166+
167+
start(): void {
168+
this._sendPublish();
169+
}
170+
171+
_sendPublish() {
172+
this.sendPublishCount++;
173+
if (this.sendPublishCount > maxPublishCount) {
174+
this._clearTimer();
175+
this.state = Qos1State.Failed;
176+
this.done(new Error());
177+
} else {
178+
// Set the state and start the timer before you send anything, just
179+
// in case the ack comes back before sendPacketFunction returns.
180+
this.state == Qos1State.WaitingForPubAck;
181+
try {
182+
this.sendPacketFunction('publish', this.message);
183+
this._setTimer(this._sendPublish.bind(this), pubAckInterval);
184+
} catch (e) {
185+
this.state = Qos1State.Failed;
186+
this.done(e as Error);
187+
}
188+
}
189+
}
190+
191+
handleIncomingPacket(packetType: PacketType, packet: Packet): void {
192+
packet;
193+
if (packetType == 'puback' && this.state == Qos1State.WaitingForPubAck) {
194+
this._clearTimer();
195+
this.state = Qos1State.Done;
196+
this.done();
197+
} else {
198+
logger.info('blah');
199+
}
200+
}
201+
202+
cancel() {
203+
this._clearTimer();
204+
if (this.state in [Qos1State.New, Qos1State.WaitingForPubAck]) {
205+
this.done(new Error());
206+
this.state = Qos1State.Failed;
207+
}
208+
}
209+
}
210+
211+
enum Qos2State {
212+
New,
213+
WaitingForPubRec,
214+
WaitingForPubComp,
215+
Done,
216+
Failed,
217+
}
218+
219+
class PublishQos2 extends SequenceMachine {
220+
state = Qos2State.New;
221+
sendPublishCount = 0;
222+
sendPubRelCount = 0;
223+
224+
start(): void {
225+
this._sendPublish();
226+
}
227+
228+
_sendPublish() {
229+
this._clearTimer();
230+
this.sendPublishCount++;
231+
if (this.sendPublishCount > maxPublishCount) {
232+
this.state = Qos2State.Failed;
233+
this.done(new Error());
234+
} else {
235+
// Set the state before you send anything, just
236+
// in case the ack comes back before sendPacketFunction returns.
237+
this.state == Qos2State.WaitingForPubRec;
238+
try {
239+
this.sendPacketFunction('publish', this.message);
240+
this._setTimer(this._sendPublish.bind(this), pubAckInterval);
241+
} catch (e) {
242+
this.state = Qos2State.Failed;
243+
this.done(e as Error);
244+
}
245+
}
246+
}
247+
248+
_sendPubRel() {
249+
this._clearTimer();
250+
this.sendPubRelCount++;
251+
if (this.sendPubRelCount > maxPublishCount) {
252+
this.state = Qos2State.Failed;
253+
this.done(new Error());
254+
} else {
255+
// Set the state before you send anything, just
256+
// in case the ack comes back before sendPacketFunction returns.
257+
this.state == Qos2State.WaitingForPubComp;
258+
try {
259+
this.sendPacketFunction('pubrel', this.message);
260+
this._setTimer(this._sendPubRel.bind(this), pubAckInterval);
261+
} catch (e) {
262+
this.state = Qos2State.Failed;
263+
this.done(e as Error);
264+
}
265+
}
266+
}
267+
268+
handleIncomingPacket(packetType: PacketType, packet: Packet) {
269+
packet;
270+
if (packetType == 'pubrec') {
271+
if (this.state in [Qos2State.WaitingForPubRec, Qos2State.WaitingForPubComp]) {
272+
// Set the state before you send anything, just
273+
// in case the ack comes back before sendPacketFunction returns.
274+
this.state = Qos2State.WaitingForPubComp;
275+
this._sendPubRel();
276+
}
277+
} else if (packetType == 'pubcomp') {
278+
if (this.state == Qos2State.WaitingForPubComp) {
279+
this._clearTimer();
280+
this.state = Qos2State.Done;
281+
this.done();
282+
} else {
283+
logger.info('blah');
284+
}
285+
} else {
286+
logger.info('blah');
287+
}
288+
}
289+
290+
// Note: after this has been proven, we need to flush out
291+
// the code so we handle any action in any state. Mostly we
292+
// just log that we're ignoring something, but we also want
293+
// to check state on the _send methods. If state == done or
294+
// failed, then we don't send anything -- this just means
295+
// we had a timer fire or an ack come back late. But if we
296+
// call _send in some other unexpected state, we might want
297+
// to except.
298+
299+
cancel() {
300+
if (this.state in [Qos2State.New, Qos2State.WaitingForPubRec, Qos2State.WaitingForPubComp]) {
301+
this.done(new Error());
302+
this.state = Qos2State.Failed;
303+
}
304+
}
305+
}

0 commit comments

Comments
 (0)