Skip to content

Commit 6308dea

Browse files
author
Yoseph Maguire
authored
Merge branch 'master' into master
2 parents 51c5c02 + be17dd7 commit 6308dea

File tree

11 files changed

+128
-60
lines changed

11 files changed

+128
-60
lines changed

.github/workflows/nodejs.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515

1616
strategy:
1717
matrix:
18-
node-version: [10.x, 12.x, 14.x]
18+
node-version: [12.x, 14.x]
1919
fail-fast: false
2020

2121
steps:

lib/connect/ws.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ function streamBuilder (client, opts) {
117117
let socket = createWebSocket(client, url, options)
118118
let webSocketStream = WS.createWebSocketStream(socket, options.wsOptions)
119119
webSocketStream.url = url
120+
socket.on('close', () => { webSocketStream.destroy() })
120121
return webSocketStream
121122
}
122123

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,11 @@
6565
"commist": "^1.0.0",
6666
"concat-stream": "^2.0.0",
6767
"debug": "^4.1.1",
68-
"help-me": "^1.0.1",
68+
"duplexify": "^4.1.1",
69+
"help-me": "^3.0.0",
6970
"inherits": "^2.0.3",
7071
"minimist": "^1.2.5",
71-
"mqtt-packet": "^6.6.0",
72+
"mqtt-packet": "^6.8.0",
7273
"pump": "^3.0.0",
7374
"readable-stream": "^3.6.0",
7475
"reinterval": "^1.1.0",

test/abstract_client.js

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ var should = require('chai').should
77
var sinon = require('sinon')
88
var mqtt = require('../')
99
var xtend = require('xtend')
10-
var MqttServer = require('./server').MqttServer
1110
var Store = require('./../lib/store')
1211
var assert = require('chai').assert
1312
var ports = require('./helpers/port_list')
13+
var serverBuilder = require('./server_helpers_for_client_tests').serverBuilder
1414

1515
module.exports = function (server, config) {
1616
var version = config.protocolVersion || 4
@@ -598,9 +598,10 @@ module.exports = function (server, config) {
598598
var incomingStore = new mqtt.Store({ clean: false })
599599
var outgoingStore = new mqtt.Store({ clean: false })
600600
var publishCount = 0
601-
var server2 = new MqttServer(function (serverClient) {
601+
var server2 = serverBuilder(config.protocol, function (serverClient) {
602602
serverClient.on('connect', function () {
603-
serverClient.connack({returnCode: 0})
603+
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
604+
serverClient.connack(connack)
604605
})
605606
serverClient.on('publish', function (packet) {
606607
if (packet.qos !== 0) {
@@ -626,7 +627,7 @@ module.exports = function (server, config) {
626627
})
627628

628629
server2.listen(ports.PORTAND50, function () {
629-
client = mqtt.connect({
630+
client = connect({
630631
port: ports.PORTAND50,
631632
host: 'localhost',
632633
clean: false,
@@ -1292,13 +1293,14 @@ module.exports = function (server, config) {
12921293
var client = {}
12931294
var incomingStore = new mqtt.Store({ clean: false })
12941295
var outgoingStore = new mqtt.Store({ clean: false })
1295-
var server2 = new MqttServer(function (serverClient) {
1296+
var server2 = serverBuilder(config.protocol, function (serverClient) {
12961297
// errors are not interesting for this test
12971298
// but they might happen on some platforms
12981299
serverClient.on('error', function () {})
12991300

13001301
serverClient.on('connect', function (packet) {
1301-
serverClient.connack({returnCode: 0})
1302+
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
1303+
serverClient.connack(connack)
13021304
})
13031305
serverClient.on('publish', function (packet) {
13041306
serverClient.puback({messageId: packet.messageId})
@@ -1321,7 +1323,7 @@ module.exports = function (server, config) {
13211323
})
13221324

13231325
server2.listen(ports.PORTAND50, function () {
1324-
client = mqtt.connect({
1326+
client = connect({
13251327
port: ports.PORTAND50,
13261328
host: 'localhost',
13271329
clean: false,
@@ -2655,9 +2657,10 @@ module.exports = function (server, config) {
26552657
it('should not resubscribe when reconnecting if suback is error', function (done) {
26562658
var tryReconnect = true
26572659
var reconnectEvent = false
2658-
var server2 = new MqttServer(function (serverClient) {
2660+
var server2 = serverBuilder(config.protocol, function (serverClient) {
26592661
serverClient.on('connect', function (packet) {
2660-
serverClient.connack({returnCode: 0})
2662+
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
2663+
serverClient.connack(connack)
26612664
})
26622665
serverClient.on('subscribe', function (packet) {
26632666
serverClient.suback({
@@ -2671,7 +2674,7 @@ module.exports = function (server, config) {
26712674
})
26722675

26732676
server2.listen(ports.PORTAND49, function () {
2674-
var client = mqtt.connect({
2677+
var client = connect({
26752678
port: ports.PORTAND49,
26762679
host: 'localhost',
26772680
reconnectPeriod: 100
@@ -2708,9 +2711,10 @@ module.exports = function (server, config) {
27082711
var client = {}
27092712
var incomingStore = new mqtt.Store({ clean: false })
27102713
var outgoingStore = new mqtt.Store({ clean: false })
2711-
var server2 = new MqttServer(function (serverClient) {
2714+
var server2 = serverBuilder(config.protocol, function (serverClient) {
27122715
serverClient.on('connect', function (packet) {
2713-
serverClient.connack({returnCode: 0})
2716+
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
2717+
serverClient.connack(connack)
27142718
if (reconnect) {
27152719
serverClient.pubrel({ messageId: 1 })
27162720
}
@@ -2741,7 +2745,7 @@ module.exports = function (server, config) {
27412745
})
27422746

27432747
server2.listen(ports.PORTAND50, function () {
2744-
client = mqtt.connect({
2748+
client = connect({
27452749
port: ports.PORTAND50,
27462750
host: 'localhost',
27472751
clean: false,
@@ -2768,9 +2772,10 @@ module.exports = function (server, config) {
27682772
it('should clear outgoing if close from server', function (done) {
27692773
var reconnect = false
27702774
var client = {}
2771-
var server2 = new MqttServer(function (serverClient) {
2775+
var server2 = serverBuilder(config.protocol, function (serverClient) {
27722776
serverClient.on('connect', function (packet) {
2773-
serverClient.connack({returnCode: 0})
2777+
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
2778+
serverClient.connack(connack)
27742779
})
27752780
serverClient.on('subscribe', function (packet) {
27762781
if (reconnect) {
@@ -2787,11 +2792,12 @@ module.exports = function (server, config) {
27872792
})
27882793

27892794
server2.listen(ports.PORTAND50, function () {
2790-
client = mqtt.connect({
2795+
client = connect({
27912796
port: ports.PORTAND50,
27922797
host: 'localhost',
27932798
clean: true,
27942799
clientId: 'cid1',
2800+
keepalive: 1,
27952801
reconnectPeriod: 0
27962802
})
27972803

@@ -2821,9 +2827,10 @@ module.exports = function (server, config) {
28212827
var client = {}
28222828
var incomingStore = new mqtt.Store({ clean: false })
28232829
var outgoingStore = new mqtt.Store({ clean: false })
2824-
var server2 = new MqttServer(function (serverClient) {
2830+
var server2 = serverBuilder(config.protocol, function (serverClient) {
28252831
serverClient.on('connect', function (packet) {
2826-
serverClient.connack({returnCode: 0})
2832+
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
2833+
serverClient.connack(connack)
28272834
})
28282835
serverClient.on('publish', function (packet) {
28292836
if (reconnect) {
@@ -2842,7 +2849,7 @@ module.exports = function (server, config) {
28422849
})
28432850

28442851
server2.listen(ports.PORTAND50, function () {
2845-
client = mqtt.connect({
2852+
client = connect({
28462853
port: ports.PORTAND50,
28472854
host: 'localhost',
28482855
clean: false,
@@ -2866,9 +2873,10 @@ module.exports = function (server, config) {
28662873
var client = {}
28672874
var incomingStore = new mqtt.Store({ clean: false })
28682875
var outgoingStore = new mqtt.Store({ clean: false })
2869-
var server2 = new MqttServer(function (serverClient) {
2876+
var server2 = serverBuilder(config.protocol, function (serverClient) {
28702877
serverClient.on('connect', function (packet) {
2871-
serverClient.connack({returnCode: 0})
2878+
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
2879+
serverClient.connack(connack)
28722880
})
28732881
serverClient.on('publish', function (packet) {
28742882
if (reconnect) {
@@ -2887,7 +2895,7 @@ module.exports = function (server, config) {
28872895
})
28882896

28892897
server2.listen(ports.PORTAND50, function () {
2890-
client = mqtt.connect({
2898+
client = connect({
28912899
port: ports.PORTAND50,
28922900
host: 'localhost',
28932901
clean: false,
@@ -2911,9 +2919,10 @@ module.exports = function (server, config) {
29112919
var client = {}
29122920
var incomingStore = new mqtt.Store({ clean: false })
29132921
var outgoingStore = new mqtt.Store({ clean: false })
2914-
var server2 = new MqttServer(function (serverClient) {
2922+
var server2 = serverBuilder(config.protocol, function (serverClient) {
29152923
serverClient.on('connect', function (packet) {
2916-
serverClient.connack({returnCode: 0})
2924+
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
2925+
serverClient.connack(connack)
29172926
})
29182927
serverClient.on('publish', function (packet) {
29192928
if (!reconnect) {
@@ -2937,7 +2946,7 @@ module.exports = function (server, config) {
29372946
})
29382947

29392948
server2.listen(ports.PORTAND50, function () {
2940-
client = mqtt.connect({
2949+
client = connect({
29412950
port: ports.PORTAND50,
29422951
host: 'localhost',
29432952
clean: false,
@@ -2963,13 +2972,14 @@ module.exports = function (server, config) {
29632972
var client = {}
29642973
var incomingStore = new mqtt.Store({ clean: false })
29652974
var outgoingStore = new mqtt.Store({ clean: false })
2966-
var server2 = new MqttServer(function (serverClient) {
2975+
var server2 = serverBuilder(config.protocol, function (serverClient) {
29672976
// errors are not interesting for this test
29682977
// but they might happen on some platforms
29692978
serverClient.on('error', function () {})
29702979

29712980
serverClient.on('connect', function (packet) {
2972-
serverClient.connack({returnCode: 0})
2981+
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
2982+
serverClient.connack(connack)
29732983
})
29742984
serverClient.on('publish', function (packet) {
29752985
serverClient.puback({messageId: packet.messageId})
@@ -3003,7 +3013,7 @@ module.exports = function (server, config) {
30033013
})
30043014

30053015
server2.listen(ports.PORTAND50, function () {
3006-
client = mqtt.connect({
3016+
client = connect({
30073017
port: ports.PORTAND50,
30083018
host: 'localhost',
30093019
clean: false,
@@ -3105,7 +3115,7 @@ module.exports = function (server, config) {
31053115
})
31063116

31073117
it('should resubscribe even if disconnect is before suback', function (done) {
3108-
var client = mqtt.connect(Object.assign({ reconnectPeriod: 100 }, config))
3118+
var client = connect(Object.assign({ reconnectPeriod: 100 }, config))
31093119
var subscribeCount = 0
31103120
var connectCount = 0
31113121

@@ -3136,7 +3146,7 @@ module.exports = function (server, config) {
31363146
})
31373147

31383148
it('should resubscribe exactly once', function (done) {
3139-
var client = mqtt.connect(Object.assign({ reconnectPeriod: 100 }, config))
3149+
var client = connect(Object.assign({ reconnectPeriod: 100 }, config))
31403150
var subscribeCount = 0
31413151

31423152
server.on('client', function (serverClient) {

test/client.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ var debug = require('debug')('TEST:client')
1818

1919
describe('MqttClient', function () {
2020
var client
21-
var server = serverBuilder()
21+
var server = serverBuilder('mqtt')
2222
var config = {protocol: 'mqtt', port: ports.PORT}
2323
server.listen(ports.PORT)
2424

@@ -277,7 +277,7 @@ describe('MqttClient', function () {
277277
it('should not keep requeueing the first message when offline', function (done) {
278278
this.timeout(2500)
279279

280-
var server2 = serverBuilder().listen(ports.PORTAND45)
280+
var server2 = serverBuilder('mqtt').listen(ports.PORTAND45)
281281
client = mqtt.connect({
282282
port: ports.PORTAND45,
283283
host: 'localhost',

test/client_mqtt5.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ var serverBuilder = require('./server_helpers_for_client_tests').serverBuilder
88
var ports = require('./helpers/port_list')
99

1010
describe('MQTT 5.0', function () {
11-
var server = serverBuilder().listen(ports.PORTAND115)
11+
var server = serverBuilder('mqtt').listen(ports.PORTAND115)
1212
var config = { protocol: 'mqtt', port: ports.PORTAND115, protocolVersion: 5, properties: { maximumPacketSize: 200 } }
1313

1414
abstractClientTests(server, config)

test/server_helpers_for_client_tests.js

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,26 @@
11
'use strict'
22

33
var MqttServer = require('./server').MqttServer
4-
var MqttServerNoWait = require('./server').MqttServerNoWait
4+
var MqttSecureServer = require('./server').MqttSecureServer
55
var debug = require('debug')('TEST:server_helpers')
66

7+
var path = require('path')
8+
var fs = require('fs')
9+
var KEY = path.join(__dirname, 'helpers', 'tls-key.pem')
10+
var CERT = path.join(__dirname, 'helpers', 'tls-cert.pem')
11+
12+
var http = require('http')
13+
var WebSocket = require('ws')
14+
var MQTTConnection = require('mqtt-connection')
15+
716
/**
817
* This will build the client for the server to use during testing, and set up the
918
* server side client based on mqtt-connection for handling MQTT messages.
10-
* @param {boolean} fastFlag
19+
* @param {String} protocol - 'mqtt', 'mqtts' or 'ws'
20+
* @param {Function} handler - event handler
1121
*/
12-
function serverBuilder (fastFlag) {
13-
var handler = function (serverClient) {
22+
function serverBuilder (protocol, handler) {
23+
var defaultHandler = function (serverClient) {
1424
serverClient.on('auth', function (packet) {
1525
var rc = 'reasonCode'
1626
var connack = {}
@@ -90,10 +100,39 @@ function serverBuilder (fastFlag) {
90100
debug('disconnected from server')
91101
})
92102
}
93-
if (fastFlag) {
94-
return new MqttServerNoWait(handler)
95-
} else {
96-
return new MqttServer(handler)
103+
104+
if (!handler) {
105+
handler = defaultHandler
106+
}
107+
108+
switch (protocol) {
109+
case 'mqtt':
110+
return new MqttServer(handler)
111+
case 'mqtts':
112+
return new MqttSecureServer({
113+
key: fs.readFileSync(KEY),
114+
cert: fs.readFileSync(CERT)
115+
},
116+
handler)
117+
case 'ws':
118+
var attachWebsocketServer = function (server) {
119+
var webSocketServer = new WebSocket.Server({server: server, perMessageDeflate: false})
120+
121+
webSocketServer.on('connection', function (ws) {
122+
var stream = WebSocket.createWebSocketStream(ws)
123+
var connection = new MQTTConnection(stream)
124+
connection.protocol = ws.protocol
125+
server.emit('client', connection)
126+
stream.on('error', function () {})
127+
connection.on('error', function () {})
128+
connection.on('close', function () {})
129+
})
130+
}
131+
132+
var httpServer = http.createServer()
133+
attachWebsocketServer(httpServer)
134+
httpServer.on('client', handler)
135+
return httpServer
97136
}
98137
}
99138

0 commit comments

Comments
 (0)