Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit a18a073

Browse files
authored
fix(event processor): Flush current batch when event with different context received (optimizely#341)
Summary: Prior to this change, event processor's buffer could contain events with different contexts. At flush time, the buffered events would be grouped by context and a request would be dispatched for each group. With this change, we introduce an invariant: all events in the buffer share the same context. As soon as an event with different context is received, the queue is flushed. With this restriction, at most one request is dispatched per flush. - A new comparator function (batchComparator) is provided to DefaultEventQueue. When enqueuing an event, batchComparator is used to determine whether the incoming event can be included into the current batch. If not, we flush. - LogTierV1EventProcessor provides a batchComparator when constructing a DefaultEventQueue that checks equality of all context properties. - In AbstractEventProcessor, the grouping step is removed from the flush process. Test plan: New unit tests. Manually tested. Issues: https://optimizely.atlassian.net/browse/OASIS-5160
1 parent 0ab1759 commit a18a073

File tree

7 files changed

+138
-34
lines changed

7 files changed

+138
-34
lines changed

packages/event-processor/CHANGELOG.MD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Changes that have landed but are not yet released.
1313

1414
### Changed
1515
- Removed transformers, interceptors, and callbacks from `AbstractEventProcessor`
16+
- Removed grouping events by context and dispatching one event per group at flush time. Instead, only maintain one group and flush immediately when an incompatible event is processed.
1617

1718
## [0.2.1] - June 6, 2019
1819

packages/event-processor/__tests__/eventQueue.spec.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ describe('eventQueue', () => {
5656
flushInterval: 100,
5757
maxQueueSize: -1,
5858
sink: sinkFn,
59+
batchComparator: () => true
5960
})
6061

6162
queue.start()
@@ -76,6 +77,7 @@ describe('eventQueue', () => {
7677
flushInterval: 100,
7778
maxQueueSize: 0,
7879
sink: sinkFn,
80+
batchComparator: () => true
7981
})
8082

8183
queue.start()
@@ -96,6 +98,7 @@ describe('eventQueue', () => {
9698
flushInterval: 100,
9799
maxQueueSize: 3,
98100
sink: sinkFn,
101+
batchComparator: () => true
99102
})
100103

101104
queue.start()
@@ -123,6 +126,7 @@ describe('eventQueue', () => {
123126
flushInterval: 100,
124127
maxQueueSize: 100,
125128
sink: sinkFn,
129+
batchComparator: () => true
126130
})
127131

128132
queue.start()
@@ -145,12 +149,37 @@ describe('eventQueue', () => {
145149
queue.stop()
146150
})
147151

152+
it('should invoke the sink function when an item incompatable with the current batch (according to batchComparator) is received', () => {
153+
const sinkFn = jest.fn()
154+
const queue = new DefaultEventQueue<string>({
155+
flushInterval: 100,
156+
maxQueueSize: 100,
157+
sink: sinkFn,
158+
// This batchComparator returns true when the argument strings start with the same letter
159+
batchComparator: (s1, s2) => s1[0] === s2[0]
160+
})
161+
162+
queue.start()
163+
164+
queue.enqueue('a1')
165+
queue.enqueue('a2')
166+
// After enqueuing these strings, both starting with 'a', the sinkFn should not yet be called. Thus far all the items enqueued are
167+
// compatible according to the batchComparator.
168+
expect(sinkFn).not.toHaveBeenCalled()
169+
170+
// Enqueuing a string starting with 'b' should cause the sinkFn to be called
171+
queue.enqueue('b1')
172+
expect(sinkFn).toHaveBeenCalledTimes(1)
173+
expect(sinkFn).toHaveBeenCalledWith(['a1', 'a2'])
174+
})
175+
148176
it('stop() should flush the existing queue and call timer.stop()', () => {
149177
const sinkFn = jest.fn()
150178
const queue = new DefaultEventQueue<number>({
151179
flushInterval: 100,
152180
maxQueueSize: 100,
153181
sink: sinkFn,
182+
batchComparator: () => true
154183
})
155184

156185
jest.spyOn(queue.timer, 'stop')
@@ -174,6 +203,7 @@ describe('eventQueue', () => {
174203
flushInterval: 100,
175204
maxQueueSize: 100,
176205
sink: sinkFn,
206+
batchComparator: () => true
177207
})
178208

179209
jest.spyOn(queue.timer, 'refresh')
@@ -196,6 +226,7 @@ describe('eventQueue', () => {
196226
flushInterval: 100,
197227
maxQueueSize: 100,
198228
sink: sinkFn,
229+
batchComparator: () => true
199230
})
200231

201232
expect(queue.stop()).toBe(promise)
@@ -207,6 +238,7 @@ describe('eventQueue', () => {
207238
flushInterval: 100,
208239
maxQueueSize: 100,
209240
sink: sinkFn,
241+
batchComparator: () => true
210242
})
211243

212244
queue.start()

packages/event-processor/__tests__/v1EventProcessor.spec.ts

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -292,27 +292,66 @@ describe('LogTierV1EventProcessor', () => {
292292
})
293293
})
294294

295-
it("should flush two batches if the event context isn't the same", () => {
295+
it('should flush the current batch when it receives an event with a different context revision than the current batch', async () => {
296296
const impressionEvent1 = createImpressionEvent()
297-
const impressionEvent2 = createImpressionEvent()
298297
const conversionEvent = createConversionEvent()
298+
const impressionEvent2 = createImpressionEvent()
299299

300+
// createImpressionEvent and createConversionEvent create events with revision '1'
301+
// We modify this one's revision to '2' in order to test that the queue is flushed
302+
// when an event with a different revision is processed.
300303
impressionEvent2.context.revision = '2'
301304

302305
processor.process(impressionEvent1)
303-
processor.process(impressionEvent2)
306+
processor.process(conversionEvent)
304307

305308
expect(dispatchStub).toHaveBeenCalledTimes(0)
306309

307-
processor.process(conversionEvent)
310+
processor.process(impressionEvent2)
311+
312+
expect(dispatchStub).toHaveBeenCalledTimes(1)
313+
expect(dispatchStub).toHaveBeenCalledWith({
314+
url: 'https://logx.optimizely.com/v1/events',
315+
httpVerb: 'POST',
316+
params: makeBatchedEventV1([impressionEvent1, conversionEvent]),
317+
})
318+
319+
await processor.stop()
308320

309321
expect(dispatchStub).toHaveBeenCalledTimes(2)
322+
323+
expect(dispatchStub).toHaveBeenCalledWith({
324+
url: 'https://logx.optimizely.com/v1/events',
325+
httpVerb: 'POST',
326+
params: makeBatchedEventV1([impressionEvent2]),
327+
})
328+
})
329+
330+
it('should flush the current batch when it receives an event with a different context projectId than the current batch', async () => {
331+
const impressionEvent1 = createImpressionEvent()
332+
const conversionEvent = createConversionEvent()
333+
const impressionEvent2 = createImpressionEvent()
334+
335+
impressionEvent2.context.projectId = 'projectId2'
336+
337+
processor.process(impressionEvent1)
338+
processor.process(conversionEvent)
339+
340+
expect(dispatchStub).toHaveBeenCalledTimes(0)
341+
342+
processor.process(impressionEvent2)
343+
344+
expect(dispatchStub).toHaveBeenCalledTimes(1)
310345
expect(dispatchStub).toHaveBeenCalledWith({
311346
url: 'https://logx.optimizely.com/v1/events',
312347
httpVerb: 'POST',
313348
params: makeBatchedEventV1([impressionEvent1, conversionEvent]),
314349
})
315350

351+
await processor.stop()
352+
353+
expect(dispatchStub).toHaveBeenCalledTimes(2)
354+
316355
expect(dispatchStub).toHaveBeenCalledWith({
317356
url: 'https://logx.optimizely.com/v1/events',
318357
httpVerb: 'POST',

packages/event-processor/src/eventProcessor.ts

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
// TODO change this to use Managed from js-sdk-models when available
1717
import { Managed } from './managed'
18-
import { ConversionEvent, ImpressionEvent } from './events'
18+
import { ConversionEvent, ImpressionEvent, areEventContextsEqual } from './events'
1919
import { EventDispatcher, EventV1Request } from './eventDispatcher'
2020
import { EventQueue, DefaultEventQueue, SingleEventQueue } from './eventQueue'
2121
import { getLogger } from '@optimizely/js-sdk-logging'
@@ -69,10 +69,11 @@ export abstract class AbstractEventProcessor implements EventProcessor {
6969

7070
maxQueueSize = Math.max(1, maxQueueSize)
7171
if (maxQueueSize > 1) {
72-
this.queue = new DefaultEventQueue({
72+
this.queue = new DefaultEventQueue<ProcessableEvents>({
7373
flushInterval,
7474
maxQueueSize,
7575
sink: buffer => this.drainQueue(buffer),
76+
batchComparator: areEventContextsEqual,
7677
})
7778
} else {
7879
this.queue = new SingleEventQueue({
@@ -82,27 +83,26 @@ export abstract class AbstractEventProcessor implements EventProcessor {
8283
this.notificationCenter = notificationCenter
8384
}
8485

85-
drainQueue(buffer: ProcessableEvents[]): Promise<any> {
86-
logger.debug('draining queue with %s events', buffer.length)
86+
drainQueue(buffer: ProcessableEvents[]): Promise<void> {
87+
return new Promise(resolve => {
88+
logger.debug('draining queue with %s events', buffer.length)
8789

88-
const promises = this.groupEvents(buffer).map(eventGroup => {
89-
const formattedEvent = this.formatEvents(eventGroup)
90+
if (buffer.length === 0) {
91+
resolve()
92+
return
93+
}
9094

91-
return new Promise(resolve => {
92-
this.dispatcher.dispatchEvent(formattedEvent, () => {
93-
resolve()
94-
})
95-
96-
if (this.notificationCenter) {
97-
this.notificationCenter.sendNotifications(
98-
NOTIFICATION_TYPES.LOG_EVENT,
99-
formattedEvent,
100-
)
101-
}
95+
const formattedEvent = this.formatEvents(buffer)
96+
this.dispatcher.dispatchEvent(formattedEvent, () => {
97+
resolve()
10298
})
99+
if (this.notificationCenter) {
100+
this.notificationCenter.sendNotifications(
101+
NOTIFICATION_TYPES.LOG_EVENT,
102+
formattedEvent,
103+
)
104+
}
103105
})
104-
105-
return Promise.all(promises)
106106
}
107107

108108
process(event: ProcessableEvents): void {
@@ -123,7 +123,5 @@ export abstract class AbstractEventProcessor implements EventProcessor {
123123
this.queue.start()
124124
}
125125

126-
protected abstract groupEvents(events: ProcessableEvents[]): ProcessableEvents[][]
127-
128126
protected abstract formatEvents(events: ProcessableEvents[]): EventV1Request
129127
}

packages/event-processor/src/eventQueue.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,25 @@ export class DefaultEventQueue<K> implements EventQueue<K> {
8282
private buffer: K[]
8383
private maxQueueSize: number
8484
private sink: EventQueueSink<K>
85+
// batchComparator is called to determine whether two events can be included
86+
// together in the same batch
87+
private batchComparator: (eventA: K, eventB: K) => boolean
8588

8689
constructor({
8790
flushInterval,
8891
maxQueueSize,
8992
sink,
93+
batchComparator,
9094
}: {
9195
flushInterval: number
9296
maxQueueSize: number
9397
sink: EventQueueSink<K>
98+
batchComparator: (eventA: K, eventB: K) => boolean
9499
}) {
95100
this.buffer = []
96101
this.maxQueueSize = Math.max(maxQueueSize, 1)
97102
this.sink = sink
103+
this.batchComparator = batchComparator
98104
this.timer = new Timer({
99105
callback: this.flush.bind(this),
100106
timeout: flushInterval,
@@ -112,6 +118,13 @@ export class DefaultEventQueue<K> implements EventQueue<K> {
112118
}
113119

114120
enqueue(event: K): void {
121+
// If new event cannot be included into the current batch, flush so it can
122+
// be in its own new batch.
123+
const bufferedEvent: K | undefined = this.buffer[0]
124+
if (bufferedEvent && !this.batchComparator(bufferedEvent, event)) {
125+
this.flush()
126+
}
127+
115128
// start the timer when the first event is put in
116129
if (this.buffer.length === 0) {
117130
this.timer.refresh()

packages/event-processor/src/events.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,17 @@ export interface ConversionEvent extends BaseEvent {
8080
export type EventTags = {
8181
[key: string]: string | number | null
8282
}
83+
84+
export function areEventContextsEqual(eventA: BaseEvent, eventB: BaseEvent): boolean {
85+
const contextA = eventA.context
86+
const contextB = eventB.context
87+
return (
88+
contextA.accountId === contextB.accountId &&
89+
contextA.projectId === contextB.projectId &&
90+
contextA.clientName === contextB.clientName &&
91+
contextA.clientVersion === contextB.clientVersion &&
92+
contextA.revision === contextB.revision &&
93+
contextA.anonymizeIP === contextB.anonymizeIP &&
94+
contextA.botFiltering === contextB.botFiltering
95+
)
96+
}

packages/event-processor/src/v1/v1EventProcessor.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
1-
import { groupBy, objectValues } from '@optimizely/js-sdk-utils'
1+
/**
2+
* Copyright 2019, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
217
import { AbstractEventProcessor, ProcessableEvents } from '../eventProcessor'
318
import { EventV1Request } from '../eventDispatcher'
419
import { makeBatchedEventV1 } from './buildEventV1'
520

621
export class LogTierV1EventProcessor extends AbstractEventProcessor {
7-
private buildGroupByKey(event: ProcessableEvents): string {
8-
return objectValues(event.context).join('|')
9-
}
10-
11-
protected groupEvents(events: ProcessableEvents[]): ProcessableEvents[][] {
12-
return groupBy(events, event => this.buildGroupByKey(event))
13-
}
14-
1522
protected formatEvents(events: ProcessableEvents[]): EventV1Request {
1623
return {
1724
url: 'https://logx.optimizely.com/v1/events',

0 commit comments

Comments
 (0)