Skip to content

Commit c92b877

Browse files
authored
fix(client): Refined Topic Alias support. (Implement mqttjs#1300) (mqttjs#1301)
* Refined Topic Alias support. (Implement mqttjs#1300) Add automatic topic alias management functionality. - On PUBLISH sending, the client can automatic using/assin Topic Alias (optional). - On PUBLISH receiving, the topic parameter of on message handler is automatically complemented but the packet.topic preserves the original topic. Fix invalid tests. * Fix typo. * Fix comment. * Rename the function name from `removeTopicAlias` to `removeTopicAliasAndRecoverTopicName`. Add comments for caller side of `removeTopicAliasAndRecoverTopicName`. * Captalize label.
1 parent 8aa2f8d commit c92b877

File tree

7 files changed

+897
-42
lines changed

7 files changed

+897
-42
lines changed

README.md

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,41 @@ the final connection when it drops.
210210
The default value is 1000 ms which means it will try to reconnect 1 second
211211
after losing the connection.
212212

213+
<a name="topicalias"></a>
214+
## About Topic Alias Management
213215

216+
### Enabling automatic Topic Alias using
217+
If the client sets the option `autoUseTopicAlias:true` then MQTT.js uses existing topic alias automatically.
218+
219+
example scenario:
220+
```
221+
1. PUBLISH topic:'t1', ta:1 (register)
222+
2. PUBLISH topic:'t1' -> topic:'', ta:1 (auto use existing map entry)
223+
3. PUBLISH topic:'t2', ta:1 (register overwrite)
224+
4. PUBLISH topic:'t2' -> topic:'', ta:1 (auto use existing map entry based on the receent map)
225+
5. PUBLISH topic:'t1' (t1 is no longer mapped to ta:1)
226+
```
227+
228+
User doesn't need to manage which topic is mapped to which topic alias.
229+
If the user want to register topic alias, then publish topic with topic alias.
230+
If the user want to use topic alias, then publish topic without topic alias. If there is a mapped topic alias then added it as a property and update the topic to empty string.
231+
232+
### Enabling automatic Topic Alias assign
233+
234+
If the client sets the option `autoAssignTopicAlias:true` then MQTT.js uses existing topic alias automatically.
235+
If no topic alias exists, then assign a new vacant topic alias automatically. If topic alias is fully used, then LRU(Least Recently Used) topic-alias entry is overwritten.
236+
237+
example scenario:
238+
```
239+
The broker returns CONNACK (TopicAliasMaximum:3)
240+
1. PUBLISH topic:'t1' -> 't1', ta:1 (auto assign t1:1 and register)
241+
2. PUBLISH topic:'t1' -> '' , ta:1 (auto use existing map entry)
242+
3. PUBLISH topic:'t2' -> 't2', ta:2 (auto assign t1:2 and register. 2 was vacant)
243+
4. PUBLISH topic:'t3' -> 't3', ta:3 (auto assign t1:3 and register. 3 was vacant)
244+
5. PUBLISH topic:'t4' -> 't4', ta:1 (LRU entry is overwritten)
245+
```
246+
247+
Also user can manually register topic-alias pair using PUBLISH topic:'some', ta:X. It works well with automatic topic alias assign.
214248

215249
<a name="api"></a>
216250
## API
@@ -295,6 +329,8 @@ the `connect` event. Typically a `net.Socket`.
295329
```js
296330
customHandleAcks: function(topic, message, packet, done) {/*some logic wit colling done(error, reasonCode)*/}
297331
```
332+
* `autoUseTopicAlias`: enabling automatic Topic Alias using functionality
333+
* `autoAssignTopicAlias`: enabling automatic Topic Alias assign functionality
298334
* `properties`: properties MQTT 5.0.
299335
`object` that supports the following properties:
300336
* `sessionExpiryInterval`: representing the Session Expiry Interval in seconds `number`,
@@ -665,7 +701,7 @@ npm install browserify
665701
npm install tinyify
666702
cd node_modules/mqtt/
667703
npm install .
668-
npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag
704+
npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag
669705
# show size for compressed browser transfer
670706
gzip <browserMqtt.js | wc -c
671707
```

lib/client.js

Lines changed: 177 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
*/
66
var EventEmitter = require('events').EventEmitter
77
var Store = require('./store')
8+
var TopicAliasRecv = require('./topic-alias-recv')
9+
var TopicAliasSend = require('./topic-alias-send')
810
var mqttPacket = require('mqtt-packet')
911
var DefaultMessageIdProvider = require('./default-message-id-provider')
1012
var Writable = require('readable-stream').Writable
1113
var inherits = require('inherits')
1214
var reInterval = require('reinterval')
15+
var clone = require('rfdc/default')
1316
var validations = require('./validations')
1417
var xtend = require('xtend')
1518
var debug = require('debug')('mqttjs:client')
@@ -88,9 +91,85 @@ function defaultId () {
8891
return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
8992
}
9093

94+
function applyTopicAlias (client, packet) {
95+
if (client.options.protocolVersion === 5) {
96+
if (packet.cmd === 'publish') {
97+
var alias
98+
if (packet.properties) {
99+
alias = packet.properties.topicAlias
100+
}
101+
var topic = packet.topic.toString()
102+
if (client.topicAliasSend) {
103+
if (alias) {
104+
if (topic.length !== 0) {
105+
// register topic alias
106+
debug('applyTopicAlias :: register topic: %s - alias: %d', topic, alias)
107+
if (!client.topicAliasSend.put(topic, alias)) {
108+
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
109+
return new Error('Sending Topic Alias out of range')
110+
}
111+
}
112+
} else {
113+
if (topic.length !== 0) {
114+
if (client.options.autoAssignTopicAlias) {
115+
alias = client.topicAliasSend.getAliasByTopic(topic)
116+
if (alias) {
117+
packet.topic = ''
118+
packet.properties = {...(packet.properties), topicAlias: alias}
119+
debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias)
120+
} else {
121+
alias = client.topicAliasSend.getLruAlias()
122+
client.topicAliasSend.put(topic, alias)
123+
packet.properties = {...(packet.properties), topicAlias: alias}
124+
debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias)
125+
}
126+
} else if (client.options.autoUseTopicAlias) {
127+
alias = client.topicAliasSend.getAliasByTopic(topic)
128+
if (alias) {
129+
packet.topic = ''
130+
packet.properties = {...(packet.properties), topicAlias: alias}
131+
debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias)
132+
}
133+
}
134+
}
135+
}
136+
} else if (alias) {
137+
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
138+
return new Error('Sending Topic Alias out of range')
139+
}
140+
}
141+
}
142+
}
143+
144+
function removeTopicAliasAndRecoverTopicName (client, packet) {
145+
var alias
146+
if (packet.properties) {
147+
alias = packet.properties.topicAlias
148+
}
149+
150+
var topic = packet.topic.toString()
151+
if (topic.length === 0) {
152+
// restore topic from alias
153+
if (typeof alias === 'undefined') {
154+
return new Error('Unregistered Topic Alias')
155+
} else {
156+
topic = client.topicAliasSend.getTopicByAlias(alias)
157+
if (typeof topic === 'undefined') {
158+
return new Error('Unregistered Topic Alias')
159+
} else {
160+
packet.topic = topic
161+
}
162+
}
163+
}
164+
if (alias) {
165+
delete packet.properties.topicAlias
166+
}
167+
}
168+
91169
function sendPacket (client, packet, cb) {
92170
debug('sendPacket :: packet: %O', packet)
93171
debug('sendPacket :: emitting `packetsend`')
172+
94173
client.emit('packetsend', packet)
95174

96175
debug('sendPacket :: writing to stream')
@@ -131,7 +210,19 @@ function flushVolatile (queue) {
131210

132211
function storeAndSend (client, packet, cb, cbStorePut) {
133212
debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd)
134-
client.outgoingStore.put(packet, function storedPacket (err) {
213+
var storePacket = packet
214+
var err
215+
if (storePacket.cmd === 'publish') {
216+
// The original packet is for sending.
217+
// The cloned storePacket is for storing to resend on reconnect.
218+
// Topic Alias must not be used after disconnected.
219+
storePacket = clone(packet)
220+
err = removeTopicAliasAndRecoverTopicName(client, storePacket)
221+
if (err) {
222+
return cb && cb(err)
223+
}
224+
}
225+
client.outgoingStore.put(storePacket, function storedPacket (err) {
135226
if (err) {
136227
return cb && cb(err)
137228
}
@@ -176,6 +267,7 @@ function MqttClient (streamBuilder, options) {
176267
debug('MqttClient :: options.keepalive', options.keepalive)
177268
debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod)
178269
debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized)
270+
debug('MqttClient :: options.topicAliasMaximum', options.topicAliasMaximum)
179271

180272
this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
181273

@@ -225,6 +317,14 @@ function MqttClient (streamBuilder, options) {
225317
// True if connection is first time.
226318
this._firstConnection = true
227319

320+
if (options.topicAliasMaximum > 0) {
321+
if (options.topicAliasMaximum > 0xffff) {
322+
debug('MqttClient :: options.topicAliasMaximum is out of range')
323+
} else {
324+
this.topicAliasRecv = new TopicAliasRecv(options.topicAliasMaximum)
325+
}
326+
}
327+
228328
// Send queued packets
229329
this.on('connect', function () {
230330
var queue = this.queue
@@ -282,6 +382,10 @@ function MqttClient (streamBuilder, options) {
282382
that.pingTimer = null
283383
}
284384

385+
if (this.topicAliasRecv) {
386+
this.topicAliasRecv.clear()
387+
}
388+
285389
debug('close :: calling _setupReconnect')
286390
this._setupReconnect()
287391
})
@@ -378,6 +482,14 @@ MqttClient.prototype._setupStream = function () {
378482
debug('_setupStream: sending packet `connect`')
379483
connectPacket = Object.create(this.options)
380484
connectPacket.cmd = 'connect'
485+
if (this.topicAliasRecv) {
486+
if (!connectPacket.properties) {
487+
connectPacket.properties = {}
488+
}
489+
if (this.topicAliasRecv) {
490+
connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max
491+
}
492+
}
381493
// avoid message queue
382494
sendPacket(this, connectPacket)
383495

@@ -526,17 +638,6 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {
526638

527639
if (options.protocolVersion === 5) {
528640
packet.properties = opts.properties
529-
if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
530-
((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
531-
(!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
532-
/*
533-
if we are don`t setup topic alias or
534-
topic alias maximum less than topic alias or
535-
server don`t give topic alias maximum,
536-
we are removing topic alias from packet
537-
*/
538-
delete packet.properties.topicAlias
539-
}
540641
}
541642

542643
debug('publish :: qos', opts.qos)
@@ -1102,6 +1203,13 @@ MqttClient.prototype._cleanUp = function (forced, done) {
11021203
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
11031204
debug('_sendPacket :: (%s) :: start', this.options.clientId)
11041205
cbStorePut = cbStorePut || nop
1206+
cb = cb || nop
1207+
1208+
var err = applyTopicAlias(this, packet)
1209+
if (err) {
1210+
cb(err)
1211+
return
1212+
}
11051213

11061214
if (!this.connected) {
11071215
debug('_sendPacket :: client not connected. Storing packet offline.')
@@ -1154,12 +1262,23 @@ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
11541262
debug('_storePacket :: cb? %s', !!cb)
11551263
cbStorePut = cbStorePut || nop
11561264

1265+
var storePacket = packet
1266+
if (storePacket.cmd === 'publish') {
1267+
// The original packet is for sending.
1268+
// The cloned storePacket is for storing to resend on reconnect.
1269+
// Topic Alias must not be used after disconnected.
1270+
storePacket = clone(packet)
1271+
var err = removeTopicAliasAndRecoverTopicName(this, storePacket)
1272+
if (err) {
1273+
return cb && cb(err)
1274+
}
1275+
}
11571276
// check that the packet is not a qos of 0, or that the command is not a publish
1158-
if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
1159-
this.queue.push({ packet: packet, cb: cb })
1160-
} else if (packet.qos > 0) {
1161-
cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
1162-
this.outgoingStore.put(packet, function (err) {
1277+
if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') {
1278+
this.queue.push({ packet: storePacket, cb: cb })
1279+
} else if (storePacket.qos > 0) {
1280+
cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null
1281+
this.outgoingStore.put(storePacket, function (err) {
11631282
if (err) {
11641283
return cb && cb(err)
11651284
}
@@ -1237,11 +1356,17 @@ MqttClient.prototype._handleConnack = function (packet) {
12371356
var rc = version === 5 ? packet.reasonCode : packet.returnCode
12381357

12391358
clearTimeout(this.connackTimer)
1359+
delete this.topicAliasSend
12401360

12411361
if (packet.properties) {
12421362
if (packet.properties.topicAliasMaximum) {
1243-
if (!options.properties) { options.properties = {} }
1244-
options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum
1363+
if (packet.properties.topicAliasMaximum > 0xffff) {
1364+
this.emit('error', new Error('topicAliasMaximum from broker is out of range'))
1365+
return
1366+
}
1367+
if (packet.properties.topicAliasMaximum > 0) {
1368+
this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum)
1369+
}
12451370
}
12461371
if (packet.properties.serverKeepAlive && options.keepalive) {
12471372
options.keepalive = packet.properties.serverKeepAlive
@@ -1303,6 +1428,39 @@ MqttClient.prototype._handlePublish = function (packet, done) {
13031428
var that = this
13041429
var options = this.options
13051430
var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
1431+
if (this.options.protocolVersion === 5) {
1432+
var alias
1433+
if (packet.properties) {
1434+
alias = packet.properties.topicAlias
1435+
}
1436+
if (typeof alias !== 'undefined') {
1437+
if (topic.length === 0) {
1438+
if (alias > 0 && alias <= 0xffff) {
1439+
var gotTopic = this.topicAliasRecv.getTopicByAlias(alias)
1440+
if (gotTopic) {
1441+
topic = gotTopic
1442+
debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias)
1443+
} else {
1444+
debug('_handlePublish :: unregistered topic alias. alias: %d', alias)
1445+
this.emit('error', new Error('Received unregistered Topic Alias'))
1446+
return
1447+
}
1448+
} else {
1449+
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
1450+
this.emit('error', new Error('Received Topic Alias is out of range'))
1451+
return
1452+
}
1453+
} else {
1454+
if (this.topicAliasRecv.put(topic, alias)) {
1455+
debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias)
1456+
} else {
1457+
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
1458+
this.emit('error', new Error('Received Topic Alias is out of range'))
1459+
return
1460+
}
1461+
}
1462+
}
1463+
}
13061464
debug('_handlePublish: qos %d', qos)
13071465
switch (qos) {
13081466
case 2: {

lib/topic-alias-recv.js

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
'use strict'
2+
3+
/**
4+
* Topic Alias receiving manager
5+
* This holds alias to topic map
6+
* @param {Number} [max] - topic alias maximum entries
7+
*/
8+
function TopicAliasRecv (max) {
9+
if (!(this instanceof TopicAliasRecv)) {
10+
return new TopicAliasRecv(max)
11+
}
12+
this.aliasToTopic = {}
13+
this.max = max
14+
}
15+
16+
/**
17+
* Insert or update topic - alias entry.
18+
* @param {String} [topic] - topic
19+
* @param {Number} [alias] - topic alias
20+
* @returns {Boolean} - if success return true otherwise false
21+
*/
22+
TopicAliasRecv.prototype.put = function (topic, alias) {
23+
if (alias === 0 || alias > this.max) {
24+
return false
25+
}
26+
this.aliasToTopic[alias] = topic
27+
this.length = Object.keys(this.aliasToTopic).length
28+
return true
29+
}
30+
31+
/**
32+
* Get topic by alias
33+
* @param {String} [topic] - topic
34+
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined
35+
*/
36+
TopicAliasRecv.prototype.getTopicByAlias = function (alias) {
37+
return this.aliasToTopic[alias]
38+
}
39+
40+
/**
41+
* Clear all entries
42+
*/
43+
TopicAliasRecv.prototype.clear = function () {
44+
this.aliasToTopic = {}
45+
}
46+
47+
module.exports = TopicAliasRecv

0 commit comments

Comments
 (0)