|
5 | 5 | */
|
6 | 6 | var EventEmitter = require('events').EventEmitter
|
7 | 7 | var Store = require('./store')
|
| 8 | +var TopicAliasRecv = require('./topic-alias-recv') |
| 9 | +var TopicAliasSend = require('./topic-alias-send') |
8 | 10 | var mqttPacket = require('mqtt-packet')
|
9 | 11 | var DefaultMessageIdProvider = require('./default-message-id-provider')
|
10 | 12 | var Writable = require('readable-stream').Writable
|
11 | 13 | var inherits = require('inherits')
|
12 | 14 | var reInterval = require('reinterval')
|
| 15 | +var clone = require('rfdc/default') |
13 | 16 | var validations = require('./validations')
|
14 | 17 | var xtend = require('xtend')
|
15 | 18 | var debug = require('debug')('mqttjs:client')
|
@@ -88,9 +91,85 @@ function defaultId () {
|
88 | 91 | return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
|
89 | 92 | }
|
90 | 93 |
|
| 94 | +function applyTopicAlias (client, packet) { |
| 95 | + if (client.options.protocolVersion === 5) { |
| 96 | + if (packet.cmd === 'publish') { |
| 97 | + var alias |
| 98 | + if (packet.properties) { |
| 99 | + alias = packet.properties.topicAlias |
| 100 | + } |
| 101 | + var topic = packet.topic.toString() |
| 102 | + if (client.topicAliasSend) { |
| 103 | + if (alias) { |
| 104 | + if (topic.length !== 0) { |
| 105 | + // register topic alias |
| 106 | + debug('applyTopicAlias :: register topic: %s - alias: %d', topic, alias) |
| 107 | + if (!client.topicAliasSend.put(topic, alias)) { |
| 108 | + debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias) |
| 109 | + return new Error('Sending Topic Alias out of range') |
| 110 | + } |
| 111 | + } |
| 112 | + } else { |
| 113 | + if (topic.length !== 0) { |
| 114 | + if (client.options.autoAssignTopicAlias) { |
| 115 | + alias = client.topicAliasSend.getAliasByTopic(topic) |
| 116 | + if (alias) { |
| 117 | + packet.topic = '' |
| 118 | + packet.properties = {...(packet.properties), topicAlias: alias} |
| 119 | + debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias) |
| 120 | + } else { |
| 121 | + alias = client.topicAliasSend.getLruAlias() |
| 122 | + client.topicAliasSend.put(topic, alias) |
| 123 | + packet.properties = {...(packet.properties), topicAlias: alias} |
| 124 | + debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias) |
| 125 | + } |
| 126 | + } else if (client.options.autoUseTopicAlias) { |
| 127 | + alias = client.topicAliasSend.getAliasByTopic(topic) |
| 128 | + if (alias) { |
| 129 | + packet.topic = '' |
| 130 | + packet.properties = {...(packet.properties), topicAlias: alias} |
| 131 | + debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias) |
| 132 | + } |
| 133 | + } |
| 134 | + } |
| 135 | + } |
| 136 | + } else if (alias) { |
| 137 | + debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias) |
| 138 | + return new Error('Sending Topic Alias out of range') |
| 139 | + } |
| 140 | + } |
| 141 | + } |
| 142 | +} |
| 143 | + |
| 144 | +function removeTopicAliasAndRecoverTopicName (client, packet) { |
| 145 | + var alias |
| 146 | + if (packet.properties) { |
| 147 | + alias = packet.properties.topicAlias |
| 148 | + } |
| 149 | + |
| 150 | + var topic = packet.topic.toString() |
| 151 | + if (topic.length === 0) { |
| 152 | + // restore topic from alias |
| 153 | + if (typeof alias === 'undefined') { |
| 154 | + return new Error('Unregistered Topic Alias') |
| 155 | + } else { |
| 156 | + topic = client.topicAliasSend.getTopicByAlias(alias) |
| 157 | + if (typeof topic === 'undefined') { |
| 158 | + return new Error('Unregistered Topic Alias') |
| 159 | + } else { |
| 160 | + packet.topic = topic |
| 161 | + } |
| 162 | + } |
| 163 | + } |
| 164 | + if (alias) { |
| 165 | + delete packet.properties.topicAlias |
| 166 | + } |
| 167 | +} |
| 168 | + |
91 | 169 | function sendPacket (client, packet, cb) {
|
92 | 170 | debug('sendPacket :: packet: %O', packet)
|
93 | 171 | debug('sendPacket :: emitting `packetsend`')
|
| 172 | + |
94 | 173 | client.emit('packetsend', packet)
|
95 | 174 |
|
96 | 175 | debug('sendPacket :: writing to stream')
|
@@ -131,7 +210,19 @@ function flushVolatile (queue) {
|
131 | 210 |
|
132 | 211 | function storeAndSend (client, packet, cb, cbStorePut) {
|
133 | 212 | debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd)
|
134 |
| - client.outgoingStore.put(packet, function storedPacket (err) { |
| 213 | + var storePacket = packet |
| 214 | + var err |
| 215 | + if (storePacket.cmd === 'publish') { |
| 216 | + // The original packet is for sending. |
| 217 | + // The cloned storePacket is for storing to resend on reconnect. |
| 218 | + // Topic Alias must not be used after disconnected. |
| 219 | + storePacket = clone(packet) |
| 220 | + err = removeTopicAliasAndRecoverTopicName(client, storePacket) |
| 221 | + if (err) { |
| 222 | + return cb && cb(err) |
| 223 | + } |
| 224 | + } |
| 225 | + client.outgoingStore.put(storePacket, function storedPacket (err) { |
135 | 226 | if (err) {
|
136 | 227 | return cb && cb(err)
|
137 | 228 | }
|
@@ -176,6 +267,7 @@ function MqttClient (streamBuilder, options) {
|
176 | 267 | debug('MqttClient :: options.keepalive', options.keepalive)
|
177 | 268 | debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod)
|
178 | 269 | debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized)
|
| 270 | + debug('MqttClient :: options.topicAliasMaximum', options.topicAliasMaximum) |
179 | 271 |
|
180 | 272 | this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
|
181 | 273 |
|
@@ -225,6 +317,14 @@ function MqttClient (streamBuilder, options) {
|
225 | 317 | // True if connection is first time.
|
226 | 318 | this._firstConnection = true
|
227 | 319 |
|
| 320 | + if (options.topicAliasMaximum > 0) { |
| 321 | + if (options.topicAliasMaximum > 0xffff) { |
| 322 | + debug('MqttClient :: options.topicAliasMaximum is out of range') |
| 323 | + } else { |
| 324 | + this.topicAliasRecv = new TopicAliasRecv(options.topicAliasMaximum) |
| 325 | + } |
| 326 | + } |
| 327 | + |
228 | 328 | // Send queued packets
|
229 | 329 | this.on('connect', function () {
|
230 | 330 | var queue = this.queue
|
@@ -282,6 +382,10 @@ function MqttClient (streamBuilder, options) {
|
282 | 382 | that.pingTimer = null
|
283 | 383 | }
|
284 | 384 |
|
| 385 | + if (this.topicAliasRecv) { |
| 386 | + this.topicAliasRecv.clear() |
| 387 | + } |
| 388 | + |
285 | 389 | debug('close :: calling _setupReconnect')
|
286 | 390 | this._setupReconnect()
|
287 | 391 | })
|
@@ -378,6 +482,14 @@ MqttClient.prototype._setupStream = function () {
|
378 | 482 | debug('_setupStream: sending packet `connect`')
|
379 | 483 | connectPacket = Object.create(this.options)
|
380 | 484 | connectPacket.cmd = 'connect'
|
| 485 | + if (this.topicAliasRecv) { |
| 486 | + if (!connectPacket.properties) { |
| 487 | + connectPacket.properties = {} |
| 488 | + } |
| 489 | + if (this.topicAliasRecv) { |
| 490 | + connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max |
| 491 | + } |
| 492 | + } |
381 | 493 | // avoid message queue
|
382 | 494 | sendPacket(this, connectPacket)
|
383 | 495 |
|
@@ -526,17 +638,6 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {
|
526 | 638 |
|
527 | 639 | if (options.protocolVersion === 5) {
|
528 | 640 | packet.properties = opts.properties
|
529 |
| - if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && |
530 |
| - ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || |
531 |
| - (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { |
532 |
| - /* |
533 |
| - if we are don`t setup topic alias or |
534 |
| - topic alias maximum less than topic alias or |
535 |
| - server don`t give topic alias maximum, |
536 |
| - we are removing topic alias from packet |
537 |
| - */ |
538 |
| - delete packet.properties.topicAlias |
539 |
| - } |
540 | 641 | }
|
541 | 642 |
|
542 | 643 | debug('publish :: qos', opts.qos)
|
@@ -1102,6 +1203,13 @@ MqttClient.prototype._cleanUp = function (forced, done) {
|
1102 | 1203 | MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
|
1103 | 1204 | debug('_sendPacket :: (%s) :: start', this.options.clientId)
|
1104 | 1205 | cbStorePut = cbStorePut || nop
|
| 1206 | + cb = cb || nop |
| 1207 | + |
| 1208 | + var err = applyTopicAlias(this, packet) |
| 1209 | + if (err) { |
| 1210 | + cb(err) |
| 1211 | + return |
| 1212 | + } |
1105 | 1213 |
|
1106 | 1214 | if (!this.connected) {
|
1107 | 1215 | debug('_sendPacket :: client not connected. Storing packet offline.')
|
@@ -1154,12 +1262,23 @@ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
|
1154 | 1262 | debug('_storePacket :: cb? %s', !!cb)
|
1155 | 1263 | cbStorePut = cbStorePut || nop
|
1156 | 1264 |
|
| 1265 | + var storePacket = packet |
| 1266 | + if (storePacket.cmd === 'publish') { |
| 1267 | + // The original packet is for sending. |
| 1268 | + // The cloned storePacket is for storing to resend on reconnect. |
| 1269 | + // Topic Alias must not be used after disconnected. |
| 1270 | + storePacket = clone(packet) |
| 1271 | + var err = removeTopicAliasAndRecoverTopicName(this, storePacket) |
| 1272 | + if (err) { |
| 1273 | + return cb && cb(err) |
| 1274 | + } |
| 1275 | + } |
1157 | 1276 | // check that the packet is not a qos of 0, or that the command is not a publish
|
1158 |
| - if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { |
1159 |
| - this.queue.push({ packet: packet, cb: cb }) |
1160 |
| - } else if (packet.qos > 0) { |
1161 |
| - cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null |
1162 |
| - this.outgoingStore.put(packet, function (err) { |
| 1277 | + if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') { |
| 1278 | + this.queue.push({ packet: storePacket, cb: cb }) |
| 1279 | + } else if (storePacket.qos > 0) { |
| 1280 | + cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null |
| 1281 | + this.outgoingStore.put(storePacket, function (err) { |
1163 | 1282 | if (err) {
|
1164 | 1283 | return cb && cb(err)
|
1165 | 1284 | }
|
@@ -1237,11 +1356,17 @@ MqttClient.prototype._handleConnack = function (packet) {
|
1237 | 1356 | var rc = version === 5 ? packet.reasonCode : packet.returnCode
|
1238 | 1357 |
|
1239 | 1358 | clearTimeout(this.connackTimer)
|
| 1359 | + delete this.topicAliasSend |
1240 | 1360 |
|
1241 | 1361 | if (packet.properties) {
|
1242 | 1362 | if (packet.properties.topicAliasMaximum) {
|
1243 |
| - if (!options.properties) { options.properties = {} } |
1244 |
| - options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum |
| 1363 | + if (packet.properties.topicAliasMaximum > 0xffff) { |
| 1364 | + this.emit('error', new Error('topicAliasMaximum from broker is out of range')) |
| 1365 | + return |
| 1366 | + } |
| 1367 | + if (packet.properties.topicAliasMaximum > 0) { |
| 1368 | + this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum) |
| 1369 | + } |
1245 | 1370 | }
|
1246 | 1371 | if (packet.properties.serverKeepAlive && options.keepalive) {
|
1247 | 1372 | options.keepalive = packet.properties.serverKeepAlive
|
@@ -1303,6 +1428,39 @@ MqttClient.prototype._handlePublish = function (packet, done) {
|
1303 | 1428 | var that = this
|
1304 | 1429 | var options = this.options
|
1305 | 1430 | var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
|
| 1431 | + if (this.options.protocolVersion === 5) { |
| 1432 | + var alias |
| 1433 | + if (packet.properties) { |
| 1434 | + alias = packet.properties.topicAlias |
| 1435 | + } |
| 1436 | + if (typeof alias !== 'undefined') { |
| 1437 | + if (topic.length === 0) { |
| 1438 | + if (alias > 0 && alias <= 0xffff) { |
| 1439 | + var gotTopic = this.topicAliasRecv.getTopicByAlias(alias) |
| 1440 | + if (gotTopic) { |
| 1441 | + topic = gotTopic |
| 1442 | + debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias) |
| 1443 | + } else { |
| 1444 | + debug('_handlePublish :: unregistered topic alias. alias: %d', alias) |
| 1445 | + this.emit('error', new Error('Received unregistered Topic Alias')) |
| 1446 | + return |
| 1447 | + } |
| 1448 | + } else { |
| 1449 | + debug('_handlePublish :: topic alias out of range. alias: %d', alias) |
| 1450 | + this.emit('error', new Error('Received Topic Alias is out of range')) |
| 1451 | + return |
| 1452 | + } |
| 1453 | + } else { |
| 1454 | + if (this.topicAliasRecv.put(topic, alias)) { |
| 1455 | + debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias) |
| 1456 | + } else { |
| 1457 | + debug('_handlePublish :: topic alias out of range. alias: %d', alias) |
| 1458 | + this.emit('error', new Error('Received Topic Alias is out of range')) |
| 1459 | + return |
| 1460 | + } |
| 1461 | + } |
| 1462 | + } |
| 1463 | + } |
1306 | 1464 | debug('_handlePublish: qos %d', qos)
|
1307 | 1465 | switch (qos) {
|
1308 | 1466 | case 2: {
|
|
0 commit comments