Skip to content

fix(event processor): Flush current batch when event with different context received #341

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Aug 13, 2019
1 change: 1 addition & 0 deletions packages/event-processor/CHANGELOG.MD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Changes that have landed but are not yet released.

### Changed
- Removed transformers, interceptors, and callbacks from `AbstractEventProcessor`
- Removed grouping events by context and dispatching one event per group at flush time. Instead, only maintain one group and flush immediately when an incompatible event is processed.

## [0.2.1] - June 6, 2019

Expand Down
32 changes: 32 additions & 0 deletions packages/event-processor/__tests__/eventQueue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ describe('eventQueue', () => {
flushInterval: 100,
maxQueueSize: -1,
sink: sinkFn,
batchComparator: () => true
})

queue.start()
Expand All @@ -76,6 +77,7 @@ describe('eventQueue', () => {
flushInterval: 100,
maxQueueSize: 0,
sink: sinkFn,
batchComparator: () => true
})

queue.start()
Expand All @@ -96,6 +98,7 @@ describe('eventQueue', () => {
flushInterval: 100,
maxQueueSize: 3,
sink: sinkFn,
batchComparator: () => true
})

queue.start()
Expand Down Expand Up @@ -123,6 +126,7 @@ describe('eventQueue', () => {
flushInterval: 100,
maxQueueSize: 100,
sink: sinkFn,
batchComparator: () => true
})

queue.start()
Expand All @@ -145,12 +149,37 @@ describe('eventQueue', () => {
queue.stop()
})

it('should invoke the sink function when an item incompatable with the current batch (according to batchComparator) is received', () => {
const sinkFn = jest.fn()
const queue = new DefaultEventQueue<string>({
flushInterval: 100,
maxQueueSize: 100,
sink: sinkFn,
// This batchComparator returns true when the argument strings start with the same letter
batchComparator: (s1, s2) => s1[0] === s2[0]
})

queue.start()

queue.enqueue('a1')
queue.enqueue('a2')
// After enqueuing these strings, both starting with 'a', the sinkFn should not yet be called. Thus far all the items enqueued are
// compatible according to the batchComparator.
expect(sinkFn).not.toHaveBeenCalled()

// Enqueuing a string starting with 'b' should cause the sinkFn to be called
queue.enqueue('b1')
expect(sinkFn).toHaveBeenCalledTimes(1)
expect(sinkFn).toHaveBeenCalledWith(['a1', 'a2'])
})

it('stop() should flush the existing queue and call timer.stop()', () => {
const sinkFn = jest.fn()
const queue = new DefaultEventQueue<number>({
flushInterval: 100,
maxQueueSize: 100,
sink: sinkFn,
batchComparator: () => true
})

jest.spyOn(queue.timer, 'stop')
Expand All @@ -174,6 +203,7 @@ describe('eventQueue', () => {
flushInterval: 100,
maxQueueSize: 100,
sink: sinkFn,
batchComparator: () => true
})

jest.spyOn(queue.timer, 'refresh')
Expand All @@ -196,6 +226,7 @@ describe('eventQueue', () => {
flushInterval: 100,
maxQueueSize: 100,
sink: sinkFn,
batchComparator: () => true
})

expect(queue.stop()).toBe(promise)
Expand All @@ -207,6 +238,7 @@ describe('eventQueue', () => {
flushInterval: 100,
maxQueueSize: 100,
sink: sinkFn,
batchComparator: () => true
})

queue.start()
Expand Down
47 changes: 43 additions & 4 deletions packages/event-processor/__tests__/v1EventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,27 +292,66 @@ describe('LogTierV1EventProcessor', () => {
})
})

it("should flush two batches if the event context isn't the same", () => {
it('should flush the current batch when it receives an event with a different context revision than the current batch', async () => {
const impressionEvent1 = createImpressionEvent()
const impressionEvent2 = createImpressionEvent()
const conversionEvent = createConversionEvent()
const impressionEvent2 = createImpressionEvent()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. Can you add a note that createImpressionEvent and createConversionEvent create events with revision '1' and so line 300 makes sense with that context.

Or leave a comment for line 300 i.e. setting different revision to be able to test flush.


// createImpressionEvent and createConversionEvent create events with revision '1'
// We modify this one's revision to '2' in order to test that the queue is flushed
// when an event with a different revision is processed.
impressionEvent2.context.revision = '2'

processor.process(impressionEvent1)
processor.process(impressionEvent2)
processor.process(conversionEvent)

expect(dispatchStub).toHaveBeenCalledTimes(0)

processor.process(conversionEvent)
processor.process(impressionEvent2)

expect(dispatchStub).toHaveBeenCalledTimes(1)
expect(dispatchStub).toHaveBeenCalledWith({
url: 'https://logx.optimizely.com/v1/events',
httpVerb: 'POST',
params: makeBatchedEventV1([impressionEvent1, conversionEvent]),
})

await processor.stop()

expect(dispatchStub).toHaveBeenCalledTimes(2)

expect(dispatchStub).toHaveBeenCalledWith({
url: 'https://logx.optimizely.com/v1/events',
httpVerb: 'POST',
params: makeBatchedEventV1([impressionEvent2]),
})
})

it('should flush the current batch when it receives an event with a different context projectId than the current batch', async () => {
const impressionEvent1 = createImpressionEvent()
const conversionEvent = createConversionEvent()
const impressionEvent2 = createImpressionEvent()

impressionEvent2.context.projectId = 'projectId2'

processor.process(impressionEvent1)
processor.process(conversionEvent)

expect(dispatchStub).toHaveBeenCalledTimes(0)

processor.process(impressionEvent2)

expect(dispatchStub).toHaveBeenCalledTimes(1)
expect(dispatchStub).toHaveBeenCalledWith({
url: 'https://logx.optimizely.com/v1/events',
httpVerb: 'POST',
params: makeBatchedEventV1([impressionEvent1, conversionEvent]),
})

await processor.stop()

expect(dispatchStub).toHaveBeenCalledTimes(2)

expect(dispatchStub).toHaveBeenCalledWith({
url: 'https://logx.optimizely.com/v1/events',
httpVerb: 'POST',
Expand Down
40 changes: 19 additions & 21 deletions packages/event-processor/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
// TODO change this to use Managed from js-sdk-models when available
import { Managed } from './managed'
import { ConversionEvent, ImpressionEvent } from './events'
import { ConversionEvent, ImpressionEvent, areEventContextsEqual } from './events'
import { EventDispatcher, EventV1Request } from './eventDispatcher'
import { EventQueue, DefaultEventQueue, SingleEventQueue } from './eventQueue'
import { getLogger } from '@optimizely/js-sdk-logging'
Expand Down Expand Up @@ -69,10 +69,11 @@ export abstract class AbstractEventProcessor implements EventProcessor {

maxQueueSize = Math.max(1, maxQueueSize)
if (maxQueueSize > 1) {
this.queue = new DefaultEventQueue({
this.queue = new DefaultEventQueue<ProcessableEvents>({
flushInterval,
maxQueueSize,
sink: buffer => this.drainQueue(buffer),
batchComparator: areEventContextsEqual,
})
} else {
this.queue = new SingleEventQueue({
Expand All @@ -82,27 +83,26 @@ export abstract class AbstractEventProcessor implements EventProcessor {
this.notificationCenter = notificationCenter
}

drainQueue(buffer: ProcessableEvents[]): Promise<any> {
logger.debug('draining queue with %s events', buffer.length)
drainQueue(buffer: ProcessableEvents[]): Promise<void> {
return new Promise(resolve => {
logger.debug('draining queue with %s events', buffer.length)

const promises = this.groupEvents(buffer).map(eventGroup => {
const formattedEvent = this.formatEvents(eventGroup)
if (buffer.length === 0) {
resolve()
return
}

return new Promise(resolve => {
this.dispatcher.dispatchEvent(formattedEvent, () => {
resolve()
})

if (this.notificationCenter) {
this.notificationCenter.sendNotifications(
NOTIFICATION_TYPES.LOG_EVENT,
formattedEvent,
)
}
const formattedEvent = this.formatEvents(buffer)
this.dispatcher.dispatchEvent(formattedEvent, () => {
resolve()
})
if (this.notificationCenter) {
this.notificationCenter.sendNotifications(
NOTIFICATION_TYPES.LOG_EVENT,
formattedEvent,
)
}
})

return Promise.all(promises)
}

process(event: ProcessableEvents): void {
Expand All @@ -123,7 +123,5 @@ export abstract class AbstractEventProcessor implements EventProcessor {
this.queue.start()
}

protected abstract groupEvents(events: ProcessableEvents[]): ProcessableEvents[][]

protected abstract formatEvents(events: ProcessableEvents[]): EventV1Request
}
13 changes: 13 additions & 0 deletions packages/event-processor/src/eventQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,25 @@ export class DefaultEventQueue<K> implements EventQueue<K> {
private buffer: K[]
private maxQueueSize: number
private sink: EventQueueSink<K>
// batchComparator is called to determine whether two events can be included
// together in the same batch
private batchComparator: (eventA: K, eventB: K) => boolean

constructor({
flushInterval,
maxQueueSize,
sink,
batchComparator,
}: {
flushInterval: number
maxQueueSize: number
sink: EventQueueSink<K>
batchComparator: (eventA: K, eventB: K) => boolean
}) {
this.buffer = []
this.maxQueueSize = Math.max(maxQueueSize, 1)
this.sink = sink
this.batchComparator = batchComparator
this.timer = new Timer({
callback: this.flush.bind(this),
timeout: flushInterval,
Expand All @@ -112,6 +118,13 @@ export class DefaultEventQueue<K> implements EventQueue<K> {
}

enqueue(event: K): void {
// If new event cannot be included into the current batch, flush so it can
// be in its own new batch.
const bufferedEvent: K | undefined = this.buffer[0]
if (bufferedEvent && !this.batchComparator(bufferedEvent, event)) {
this.flush()
}

// start the timer when the first event is put in
if (this.buffer.length === 0) {
this.timer.refresh()
Expand Down
14 changes: 14 additions & 0 deletions packages/event-processor/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,17 @@ export interface ConversionEvent extends BaseEvent {
export type EventTags = {
[key: string]: string | number | null
}

export function areEventContextsEqual(eventA: BaseEvent, eventB: BaseEvent): boolean {
const contextA = eventA.context
const contextB = eventB.context
return (
contextA.accountId === contextB.accountId &&
contextA.projectId === contextB.projectId &&
contextA.clientName === contextB.clientName &&
contextA.clientVersion === contextB.clientVersion &&
contextA.revision === contextB.revision &&
contextA.anonymizeIP === contextB.anonymizeIP &&
contextA.botFiltering === contextB.botFiltering
)
}
25 changes: 16 additions & 9 deletions packages/event-processor/src/v1/v1EventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import { groupBy, objectValues } from '@optimizely/js-sdk-utils'
/**
* Copyright 2019, Optimizely
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { AbstractEventProcessor, ProcessableEvents } from '../eventProcessor'
import { EventV1Request } from '../eventDispatcher'
import { makeBatchedEventV1 } from './buildEventV1'

export class LogTierV1EventProcessor extends AbstractEventProcessor {
private buildGroupByKey(event: ProcessableEvents): string {
return objectValues(event.context).join('|')
}

protected groupEvents(events: ProcessableEvents[]): ProcessableEvents[][] {
return groupBy(events, event => this.buildGroupByKey(event))
}

protected formatEvents(events: ProcessableEvents[]): EventV1Request {
return {
url: 'https://logx.optimizely.com/v1/events',
Expand Down