From e23b30c6de130dcc63410d29b01b2b3ba67accd0 Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Tue, 27 Aug 2019 16:09:24 -0700 Subject: [PATCH 1/2] In event processor, stop accepting events after stop is called --- packages/event-processor/CHANGELOG.MD | 3 ++ .../__tests__/eventQueue.spec.ts | 20 ++++++++++ .../__tests__/v1EventProcessor.spec.ts | 38 +++++++++++++++++++ packages/event-processor/src/eventQueue.ts | 15 ++++++++ 4 files changed, 76 insertions(+) diff --git a/packages/event-processor/CHANGELOG.MD b/packages/event-processor/CHANGELOG.MD index dc55f3f0e..147412b17 100644 --- a/packages/event-processor/CHANGELOG.MD +++ b/packages/event-processor/CHANGELOG.MD @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] Changes that have landed but are not yet released. +### Changed +- Introduce an explict`DefaultEventQueue` no longer enqueues additional events after being stopped + ## [0.3.0] - August 13, 2019 ### New Features diff --git a/packages/event-processor/__tests__/eventQueue.spec.ts b/packages/event-processor/__tests__/eventQueue.spec.ts index 7722410ad..71aaa415f 100644 --- a/packages/event-processor/__tests__/eventQueue.spec.ts +++ b/packages/event-processor/__tests__/eventQueue.spec.ts @@ -266,5 +266,25 @@ describe('eventQueue', () => { queue.stop() }) + + it('should not enqueue additional events after stop() is called', () => { + const sinkFn = jest.fn() + const queue = new DefaultEventQueue({ + flushInterval: 30000, + maxQueueSize: 3, + sink: sinkFn, + batchComparator: () => true + }) + queue.start() + queue.enqueue(1) + queue.stop() + expect(sinkFn).toHaveBeenCalledTimes(1) + expect(sinkFn).toHaveBeenCalledWith([1]) + sinkFn.mockClear() + queue.enqueue(2) + queue.enqueue(3) + queue.enqueue(4) + expect(sinkFn).toBeCalledTimes(0) + }) }) }) diff --git a/packages/event-processor/__tests__/v1EventProcessor.spec.ts b/packages/event-processor/__tests__/v1EventProcessor.spec.ts index 5b372c361..ce6484244 100644 --- a/packages/event-processor/__tests__/v1EventProcessor.spec.ts +++ b/packages/event-processor/__tests__/v1EventProcessor.spec.ts @@ -154,6 +154,7 @@ describe('LogTierV1EventProcessor', () => { flushInterval: 100, maxQueueSize: 100, }) + processor.start() const impressionEvent = createImpressionEvent() processor.process(impressionEvent) @@ -183,6 +184,7 @@ describe('LogTierV1EventProcessor', () => { flushInterval: 100, maxQueueSize: 100, }) + processor.start() const impressionEvent = createImpressionEvent() processor.process(impressionEvent) @@ -211,6 +213,7 @@ describe('LogTierV1EventProcessor', () => { flushInterval: 100, maxQueueSize: 100, }) + processor.start() const impressionEvent1 = createImpressionEvent() const impressionEvent2 = createImpressionEvent() @@ -223,6 +226,41 @@ describe('LogTierV1EventProcessor', () => { done() }) }) + + it('should stop accepting events after stop is called', () => { + const dispatcher = { + dispatchEvent: jest.fn((event: EventV1Request, callback: EventDispatcherCallback) => { + setTimeout(() => callback({ statusCode: 204 }), 0) + }) + } + const processor = new LogTierV1EventProcessor({ + dispatcher, + flushInterval: 100, + maxQueueSize: 3, + }) + processor.start() + + const impressionEvent1 = createImpressionEvent() + processor.process(impressionEvent1) + processor.stop() + // calling stop should haver flushed the current batch of size 1 + expect(dispatcher.dispatchEvent).toBeCalledTimes(1) + + dispatcher.dispatchEvent.mockClear(); + + // From now on, subsequent events should be ignored. + // Process 3 more, which ordinarily would have triggered + // a flush due to the batch size. + const impressionEvent2 = createImpressionEvent() + processor.process(impressionEvent2) + const impressionEvent3 = createImpressionEvent() + processor.process(impressionEvent3) + const impressionEvent4 = createImpressionEvent() + processor.process(impressionEvent4) + // Since we already stopped the processor, the dispatcher should + // not have been called again. + expect(dispatcher.dispatchEvent).toBeCalledTimes(0) + }) }) describe('when maxQueueSize = 1', () => { diff --git a/packages/event-processor/src/eventQueue.ts b/packages/event-processor/src/eventQueue.ts index 870970a3d..8a9438a83 100644 --- a/packages/event-processor/src/eventQueue.ts +++ b/packages/event-processor/src/eventQueue.ts @@ -13,8 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +import { getLogger } from '@optimizely/js-sdk-logging' // TODO change this to use Managed from js-sdk-models when available import { Managed } from './managed' + +const logger = getLogger('EventProcessor') + export type EventQueueSink = (buffer: K[]) => Promise export interface EventQueue extends Managed { @@ -85,6 +90,7 @@ export class DefaultEventQueue implements EventQueue { // batchComparator is called to determine whether two events can be included // together in the same batch private batchComparator: (eventA: K, eventB: K) => boolean + private started: boolean constructor({ flushInterval, @@ -105,19 +111,28 @@ export class DefaultEventQueue implements EventQueue { callback: this.flush.bind(this), timeout: flushInterval, }) + this.started = false } start(): void { + this.started = true // dont start the timer until the first event is enqueued } stop(): Promise { + this.started = false const result = this.sink(this.buffer) + this.buffer = [] this.timer.stop() return result } enqueue(event: K): void { + if (!this.started) { + logger.warn('Queue is stopped, not accepting event') + return + } + // If new event cannot be included into the current batch, flush so it can // be in its own new batch. const bufferedEvent: K | undefined = this.buffer[0] From 8eea114c61a388382daf58fa0da8ec86927a34a9 Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Thu, 29 Aug 2019 09:50:27 -0700 Subject: [PATCH 2/2] Fix CHANGELOG.md --- packages/event-processor/CHANGELOG.MD | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/event-processor/CHANGELOG.MD b/packages/event-processor/CHANGELOG.MD index 147412b17..a6d7802a5 100644 --- a/packages/event-processor/CHANGELOG.MD +++ b/packages/event-processor/CHANGELOG.MD @@ -7,8 +7,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] Changes that have landed but are not yet released. -### Changed -- Introduce an explict`DefaultEventQueue` no longer enqueues additional events after being stopped +### Fixed +- `DefaultEventQueue` no longer enqueues additional events after being stopped. As a result, `AbstractEventProcessor` no longer processes events after being stopped. +- `DefaultEventQueue` clears its buffer after being stopped. Event duplication, which was previously possible when additional events were enqueued after the stop, is no longer possible. ## [0.3.0] - August 13, 2019