File tree Expand file tree Collapse file tree 2 files changed +46
-5
lines changed Expand file tree Collapse file tree 2 files changed +46
-5
lines changed Original file line number Diff line number Diff line change @@ -136,10 +136,28 @@ function MqttClient (streamBuilder, options) {
136
136
}
137
137
138
138
this . connected = true
139
- var outStore = null
140
- outStore = this . outgoingStore . createStream ( )
139
+ var outStore = this . outgoingStore . createStream ( )
140
+
141
+ this . once ( 'close' , remove )
142
+ outStore . on ( 'end' , function ( ) {
143
+ that . removeListener ( 'close' , remove )
144
+ } )
145
+ outStore . on ( 'error' , function ( err ) {
146
+ that . removeListener ( 'close' , remove )
147
+ that . emit ( 'error' , err )
148
+ } )
149
+
150
+ function remove ( ) {
151
+ outStore . destroy ( )
152
+ outStore = null
153
+ }
141
154
142
155
function storeDeliver ( ) {
156
+ // edge case, we wrapped this twice
157
+ if ( ! outStore ) {
158
+ return
159
+ }
160
+
143
161
var packet = outStore . read ( 1 )
144
162
var cb
145
163
@@ -166,9 +184,6 @@ function MqttClient (streamBuilder, options) {
166
184
}
167
185
}
168
186
169
- // Control of stored messages
170
- outStore . on ( 'error' , this . emit . bind ( this , 'error' ) )
171
-
172
187
// start flowing
173
188
storeDeliver ( )
174
189
} )
Original file line number Diff line number Diff line change @@ -130,4 +130,30 @@ module.exports = function abstractStoreTest (build) {
130
130
} )
131
131
} )
132
132
} )
133
+
134
+ it ( 'should replace a packet when doing put with the same messageId' , function ( done ) {
135
+ var packet1 = {
136
+ cmd : 'publish' , // added
137
+ topic : 'hello' ,
138
+ payload : 'world' ,
139
+ qos : 2 ,
140
+ messageId : 42
141
+ }
142
+ var packet2 = {
143
+ cmd : 'pubrel' , // added
144
+ qos : 2 ,
145
+ messageId : 42
146
+ }
147
+
148
+ store . put ( packet1 , function ( ) {
149
+ store . put ( packet2 , function ( ) {
150
+ store
151
+ . createStream ( )
152
+ . on ( 'data' , function ( data ) {
153
+ data . should . eql ( packet2 )
154
+ done ( )
155
+ } )
156
+ } )
157
+ } )
158
+ } )
133
159
}
You can’t perform that action at this time.
0 commit comments