Skip to content

fix(event processor): Stop accepting events after stop is called #355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/event-processor/CHANGELOG.MD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
Changes that have landed but are not yet released.

### Fixed
- `DefaultEventQueue` no longer enqueues additional events after being stopped. As a result, `AbstractEventProcessor` no longer processes events after being stopped.
- `DefaultEventQueue` clears its buffer after being stopped. Event duplication, which was previously possible when additional events were enqueued after the stop, is no longer possible.

## [0.3.0] - August 13, 2019

### New Features
Expand Down
20 changes: 20 additions & 0 deletions packages/event-processor/__tests__/eventQueue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,5 +266,25 @@ describe('eventQueue', () => {
queue.stop()

})

it('should not enqueue additional events after stop() is called', () => {
  const sinkFn = jest.fn()
  const queue = new DefaultEventQueue<number>({
    flushInterval: 30000,
    maxQueueSize: 3,
    sink: sinkFn,
    batchComparator: () => true,
  })
  queue.start()
  queue.enqueue(1)

  // stop() should flush the single buffered event as one batch
  queue.stop()
  expect(sinkFn).toHaveBeenCalledTimes(1)
  expect(sinkFn).toHaveBeenCalledWith([1])

  sinkFn.mockClear()

  // After stop(), enqueue() must be a no-op: even reaching
  // maxQueueSize (3 events) must not trigger a flush into the sink.
  queue.enqueue(2)
  queue.enqueue(3)
  queue.enqueue(4)
  expect(sinkFn).toHaveBeenCalledTimes(0)
})
})
})
38 changes: 38 additions & 0 deletions packages/event-processor/__tests__/v1EventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ describe('LogTierV1EventProcessor', () => {
flushInterval: 100,
maxQueueSize: 100,
})
processor.start()

const impressionEvent = createImpressionEvent()
processor.process(impressionEvent)
Expand Down Expand Up @@ -183,6 +184,7 @@ describe('LogTierV1EventProcessor', () => {
flushInterval: 100,
maxQueueSize: 100,
})
processor.start()

const impressionEvent = createImpressionEvent()
processor.process(impressionEvent)
Expand Down Expand Up @@ -211,6 +213,7 @@ describe('LogTierV1EventProcessor', () => {
flushInterval: 100,
maxQueueSize: 100,
})
processor.start()

const impressionEvent1 = createImpressionEvent()
const impressionEvent2 = createImpressionEvent()
Expand All @@ -223,6 +226,41 @@ describe('LogTierV1EventProcessor', () => {
done()
})
})

it('should stop accepting events after stop is called', () => {
  const dispatcher = {
    dispatchEvent: jest.fn((event: EventV1Request, callback: EventDispatcherCallback) => {
      setTimeout(() => callback({ statusCode: 204 }), 0)
    }),
  }
  const processor = new LogTierV1EventProcessor({
    dispatcher,
    flushInterval: 100,
    maxQueueSize: 3,
  })
  processor.start()

  const impressionEvent1 = createImpressionEvent()
  processor.process(impressionEvent1)
  processor.stop()
  // calling stop should have flushed the current batch of size 1
  expect(dispatcher.dispatchEvent).toHaveBeenCalledTimes(1)

  dispatcher.dispatchEvent.mockClear()

  // From now on, subsequent events should be ignored.
  // Process 3 more, which ordinarily would have triggered
  // a flush due to the batch size.
  const impressionEvent2 = createImpressionEvent()
  processor.process(impressionEvent2)
  const impressionEvent3 = createImpressionEvent()
  processor.process(impressionEvent3)
  const impressionEvent4 = createImpressionEvent()
  processor.process(impressionEvent4)
  // Since we already stopped the processor, the dispatcher should
  // not have been called again.
  expect(dispatcher.dispatchEvent).toHaveBeenCalledTimes(0)
})
})

describe('when maxQueueSize = 1', () => {
Expand Down
15 changes: 15 additions & 0 deletions packages/event-processor/src/eventQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { getLogger } from '@optimizely/js-sdk-logging'
// TODO change this to use Managed from js-sdk-models when available
import { Managed } from './managed'

const logger = getLogger('EventProcessor')

export type EventQueueSink<K> = (buffer: K[]) => Promise<any>

export interface EventQueue<K> extends Managed {
Expand Down Expand Up @@ -85,6 +90,7 @@ export class DefaultEventQueue<K> implements EventQueue<K> {
// batchComparator is called to determine whether two events can be included
// together in the same batch
private batchComparator: (eventA: K, eventB: K) => boolean
private started: boolean

constructor({
flushInterval,
Expand All @@ -105,19 +111,28 @@ export class DefaultEventQueue<K> implements EventQueue<K> {
callback: this.flush.bind(this),
timeout: flushInterval,
})
this.started = false
}

/**
 * Marks the queue as started so that enqueue() will accept events.
 * Note: the flush timer is NOT started here — it is started lazily
 * when the first event is enqueued.
 */
start(): void {
  this.started = true
  // don't start the timer until the first event is enqueued
}

/**
 * Stops the queue: rejects any future enqueues, flushes the buffered
 * events to the sink, clears the buffer, and cancels the flush timer.
 *
 * @returns the sink's promise for the final flush, or an immediately
 *          resolved promise when there was nothing buffered to flush
 */
stop(): Promise<any> {
  // Reject events enqueued from this point on (checked in enqueue()).
  this.started = false
  // Only invoke the sink when there is something to flush: this avoids
  // emitting an empty batch, and makes a repeated stop() call a
  // harmless no-op instead of a second sink invocation with [].
  const result = this.buffer.length > 0 ? this.sink(this.buffer) : Promise.resolve()
  this.buffer = []
  this.timer.stop()
  return result
}

enqueue(event: K): void {
if (!this.started) {
logger.warn('Queue is stopped, not accepting event')
return
}

// If new event cannot be included into the current batch, flush so it can
// be in its own new batch.
const bufferedEvent: K | undefined = this.buffer[0]
Expand Down