Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 8fc6d38

Browse files
authored
fix(event processor): Stop accepting events after stop is called (optimizely#355)
Summary: Before this change, event processor would continue processing user events after its stop method was called. If the batch size limit was reached after stop was called, it would dispatch a log event as normal. Worse, the internal queue wasn't being emptied when stop was called, which could lead to the same user event being included in two different log events. With this change, the event processor's internal queue stops accepting events after the event processor is stopped. A warning message is logged, but the event is dropped. Test plan: Updated unit tests. Will manually test upon integration of a new version in optimizely-sdk.
1 parent f71646a commit 8fc6d38

File tree

4 files changed

+77
-0
lines changed

4 files changed

+77
-0
lines changed

packages/event-processor/CHANGELOG.MD

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
77
## [Unreleased]
88
Changes that have landed but are not yet released.
99

10+
### Fixed
11+
- `DefaultEventQueue` no longer enqueues additional events after being stopped. As a result, `AbstractEventProcessor` no longer processes events after being stopped.
12+
- `DefaultEventQueue` clears its buffer after being stopped. Event duplication, which was previously possible when additional events were enqueued after the stop, is no longer possible.
13+
1014
## [0.3.0] - August 13, 2019
1115

1216
### New Features

packages/event-processor/__tests__/eventQueue.spec.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,5 +266,25 @@ describe('eventQueue', () => {
266266
queue.stop()
267267

268268
})
269+
270+
it('should not enqueue additional events after stop() is called', () => {
271+
const sinkFn = jest.fn()
272+
const queue = new DefaultEventQueue<number>({
273+
flushInterval: 30000,
274+
maxQueueSize: 3,
275+
sink: sinkFn,
276+
batchComparator: () => true
277+
})
278+
queue.start()
279+
queue.enqueue(1)
280+
queue.stop()
281+
expect(sinkFn).toHaveBeenCalledTimes(1)
282+
expect(sinkFn).toHaveBeenCalledWith([1])
283+
sinkFn.mockClear()
284+
queue.enqueue(2)
285+
queue.enqueue(3)
286+
queue.enqueue(4)
287+
expect(sinkFn).toBeCalledTimes(0)
288+
})
269289
})
270290
})

packages/event-processor/__tests__/v1EventProcessor.spec.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ describe('LogTierV1EventProcessor', () => {
154154
flushInterval: 100,
155155
maxQueueSize: 100,
156156
})
157+
processor.start()
157158

158159
const impressionEvent = createImpressionEvent()
159160
processor.process(impressionEvent)
@@ -183,6 +184,7 @@ describe('LogTierV1EventProcessor', () => {
183184
flushInterval: 100,
184185
maxQueueSize: 100,
185186
})
187+
processor.start()
186188

187189
const impressionEvent = createImpressionEvent()
188190
processor.process(impressionEvent)
@@ -211,6 +213,7 @@ describe('LogTierV1EventProcessor', () => {
211213
flushInterval: 100,
212214
maxQueueSize: 100,
213215
})
216+
processor.start()
214217

215218
const impressionEvent1 = createImpressionEvent()
216219
const impressionEvent2 = createImpressionEvent()
@@ -223,6 +226,41 @@ describe('LogTierV1EventProcessor', () => {
223226
done()
224227
})
225228
})
229+
230+
it('should stop accepting events after stop is called', () => {
231+
const dispatcher = {
232+
dispatchEvent: jest.fn((event: EventV1Request, callback: EventDispatcherCallback) => {
233+
setTimeout(() => callback({ statusCode: 204 }), 0)
234+
})
235+
}
236+
const processor = new LogTierV1EventProcessor({
237+
dispatcher,
238+
flushInterval: 100,
239+
maxQueueSize: 3,
240+
})
241+
processor.start()
242+
243+
const impressionEvent1 = createImpressionEvent()
244+
processor.process(impressionEvent1)
245+
processor.stop()
246+
// calling stop should haver flushed the current batch of size 1
247+
expect(dispatcher.dispatchEvent).toBeCalledTimes(1)
248+
249+
dispatcher.dispatchEvent.mockClear();
250+
251+
// From now on, subsequent events should be ignored.
252+
// Process 3 more, which ordinarily would have triggered
253+
// a flush due to the batch size.
254+
const impressionEvent2 = createImpressionEvent()
255+
processor.process(impressionEvent2)
256+
const impressionEvent3 = createImpressionEvent()
257+
processor.process(impressionEvent3)
258+
const impressionEvent4 = createImpressionEvent()
259+
processor.process(impressionEvent4)
260+
// Since we already stopped the processor, the dispatcher should
261+
// not have been called again.
262+
expect(dispatcher.dispatchEvent).toBeCalledTimes(0)
263+
})
226264
})
227265

228266
describe('when maxQueueSize = 1', () => {

packages/event-processor/src/eventQueue.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,13 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
17+
import { getLogger } from '@optimizely/js-sdk-logging'
1618
// TODO change this to use Managed from js-sdk-models when available
1719
import { Managed } from './managed'
20+
21+
const logger = getLogger('EventProcessor')
22+
1823
export type EventQueueSink<K> = (buffer: K[]) => Promise<any>
1924

2025
export interface EventQueue<K> extends Managed {
@@ -85,6 +90,7 @@ export class DefaultEventQueue<K> implements EventQueue<K> {
8590
// batchComparator is called to determine whether two events can be included
8691
// together in the same batch
8792
private batchComparator: (eventA: K, eventB: K) => boolean
93+
private started: boolean
8894

8995
constructor({
9096
flushInterval,
@@ -105,19 +111,28 @@ export class DefaultEventQueue<K> implements EventQueue<K> {
105111
callback: this.flush.bind(this),
106112
timeout: flushInterval,
107113
})
114+
this.started = false
108115
}
109116

110117
start(): void {
118+
this.started = true
111119
// dont start the timer until the first event is enqueued
112120
}
113121

114122
stop(): Promise<any> {
123+
this.started = false
115124
const result = this.sink(this.buffer)
125+
this.buffer = []
116126
this.timer.stop()
117127
return result
118128
}
119129

120130
enqueue(event: K): void {
131+
if (!this.started) {
132+
logger.warn('Queue is stopped, not accepting event')
133+
return
134+
}
135+
121136
// If new event cannot be included into the current batch, flush so it can
122137
// be in its own new batch.
123138
const bufferedEvent: K | undefined = this.buffer[0]

0 commit comments

Comments
 (0)