From 6549330212f21ea85ba39b48252e52822e7b0a2b Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Fri, 9 Aug 2019 11:09:14 -0700 Subject: [PATCH 1/9] Instead of sending multiple requests at flush time, one for each context, flush as soon as a different context is observed --- .../event-processor/src/eventProcessor.ts | 24 ++++++------------ packages/event-processor/src/eventQueue.ts | 13 ++++++++++ packages/event-processor/src/events.ts | 6 +++++ .../src/v1/v1EventProcessor.ts | 25 ++++++++++++------- 4 files changed, 43 insertions(+), 25 deletions(-) diff --git a/packages/event-processor/src/eventProcessor.ts b/packages/event-processor/src/eventProcessor.ts index 62f8b4597..8020f3a18 100644 --- a/packages/event-processor/src/eventProcessor.ts +++ b/packages/event-processor/src/eventProcessor.ts @@ -15,7 +15,7 @@ */ // TODO change this to use Managed from js-sdk-models when available import { Managed } from './managed' -import { ConversionEvent, ImpressionEvent } from './events' +import { ConversionEvent, ImpressionEvent, areEventContextsEqual } from './events' import { EventDispatcher, EventV1Request, @@ -51,10 +51,11 @@ export abstract class AbstractEventProcessor implements EventProcessor { maxQueueSize = Math.max(1, maxQueueSize) if (maxQueueSize > 1) { - this.queue = new DefaultEventQueue({ + this.queue = new DefaultEventQueue({ flushInterval: Math.max(flushInterval, MIN_FLUSH_INTERVAL), maxQueueSize, sink: buffer => this.drainQueue(buffer), + batchComparator: areEventContextsEqual, }) } else { this.queue = new SingleEventQueue({ @@ -63,20 +64,13 @@ export abstract class AbstractEventProcessor implements EventProcessor { } } - drainQueue(buffer: ProcessableEvents[]): Promise { - logger.debug('draining queue with %s events', buffer.length) - - const promises = this.groupEvents(buffer).map(eventGroup => { - const formattedEvent = this.formatEvents(eventGroup) - - return new Promise((resolve) => { - this.dispatcher.dispatchEvent(formattedEvent, () => { - resolve() - }) + drainQueue(buffer: ProcessableEvents[]): Promise { + return new Promise(resolve => { + logger.debug('draining queue with %s events', buffer.length) + this.dispatcher.dispatchEvent(this.formatEvents(buffer), () => { + resolve() }) }) - - return Promise.all(promises) } process(event: ProcessableEvents): void { @@ -97,7 +91,5 @@ export abstract class AbstractEventProcessor implements EventProcessor { this.queue.start() } - protected abstract groupEvents(events: ProcessableEvents[]): ProcessableEvents[][] - protected abstract formatEvents(events: ProcessableEvents[]): EventV1Request } diff --git a/packages/event-processor/src/eventQueue.ts b/packages/event-processor/src/eventQueue.ts index a8c997dc8..870970a3d 100644 --- a/packages/event-processor/src/eventQueue.ts +++ b/packages/event-processor/src/eventQueue.ts @@ -82,19 +82,25 @@ export class DefaultEventQueue implements EventQueue { private buffer: K[] private maxQueueSize: number private sink: EventQueueSink + // batchComparator is called to determine whether two events can be included + // together in the same batch + private batchComparator: (eventA: K, eventB: K) => boolean constructor({ flushInterval, maxQueueSize, sink, + batchComparator, }: { flushInterval: number maxQueueSize: number sink: EventQueueSink + batchComparator: (eventA: K, eventB: K) => boolean }) { this.buffer = [] this.maxQueueSize = Math.max(maxQueueSize, 1) this.sink = sink + this.batchComparator = batchComparator this.timer = new Timer({ callback: this.flush.bind(this), timeout: flushInterval, @@ -112,6 +118,13 @@ export class DefaultEventQueue implements EventQueue { } enqueue(event: K): void { + // If new event cannot be included into the current batch, flush so it can + // be in its own new batch. + const bufferedEvent: K | undefined = this.buffer[0] + if (bufferedEvent && !this.batchComparator(bufferedEvent, event)) { + this.flush() + } + // start the timer when the first event is put in if (this.buffer.length === 0) { this.timer.refresh() diff --git a/packages/event-processor/src/events.ts b/packages/event-processor/src/events.ts index cb5661c2f..3be354847 100644 --- a/packages/event-processor/src/events.ts +++ b/packages/event-processor/src/events.ts @@ -80,3 +80,9 @@ export interface ConversionEvent extends BaseEvent { export type EventTags = { [key: string]: string | number | null } + +export function areEventContextsEqual(eventA: BaseEvent, eventB: BaseEvent): boolean { + return Object.keys(eventA.context).every( + contextKey => eventA.context[contextKey] === eventB.context[contextKey], + ) +} diff --git a/packages/event-processor/src/v1/v1EventProcessor.ts b/packages/event-processor/src/v1/v1EventProcessor.ts index 9117c0403..3796f5f63 100644 --- a/packages/event-processor/src/v1/v1EventProcessor.ts +++ b/packages/event-processor/src/v1/v1EventProcessor.ts @@ -1,17 +1,24 @@ -import { groupBy, objectValues } from '@optimizely/js-sdk-utils' +/** + * Copyright 2019, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + import { AbstractEventProcessor, ProcessableEvents } from '../eventProcessor' import { EventV1Request } from '../eventDispatcher' import { makeBatchedEventV1 } from './buildEventV1' export class LogTierV1EventProcessor extends AbstractEventProcessor { - private buildGroupByKey(event: ProcessableEvents): string { - return objectValues(event.context).join('|') - } - - protected groupEvents(events: ProcessableEvents[]): ProcessableEvents[][] { - return groupBy(events, event => this.buildGroupByKey(event)) - } - protected formatEvents(events: ProcessableEvents[]): EventV1Request { return { url: 'https://logx.optimizely.com/v1/events', From 8e4c088c275c840c0a5700f4c33fd1ba5c5fb4c9 Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Fri, 9 Aug 2019 11:44:32 -0700 Subject: [PATCH 2/9] Don't dispatch an event if nothing is in the buffer upon flush --- packages/event-processor/src/eventProcessor.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/event-processor/src/eventProcessor.ts b/packages/event-processor/src/eventProcessor.ts index 8020f3a18..b6d55d44f 100644 --- a/packages/event-processor/src/eventProcessor.ts +++ b/packages/event-processor/src/eventProcessor.ts @@ -67,6 +67,12 @@ export abstract class AbstractEventProcessor implements EventProcessor { drainQueue(buffer: ProcessableEvents[]): Promise { return new Promise(resolve => { logger.debug('draining queue with %s events', buffer.length) + + if (buffer.length === 0) { + resolve() + return + } + this.dispatcher.dispatchEvent(this.formatEvents(buffer), () => { resolve() }) From 675d2150db7b9d0512c677be1f4f469eb14dda05 Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Fri, 9 Aug 2019 13:51:43 -0700 Subject: [PATCH 3/9] Update unit tests --- .../__tests__/eventQueue.spec.ts | 28 +++++++++++++++++++ .../__tests__/v1EventProcessor.spec.ts | 14 ++++++---- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/packages/event-processor/__tests__/eventQueue.spec.ts b/packages/event-processor/__tests__/eventQueue.spec.ts index 5a263d9f7..02f5a0b52 100644 --- a/packages/event-processor/__tests__/eventQueue.spec.ts +++ b/packages/event-processor/__tests__/eventQueue.spec.ts @@ -56,6 +56,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: -1, sink: sinkFn, + batchComparator: () => true }) queue.start() @@ -76,6 +77,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 0, sink: sinkFn, + batchComparator: () => true }) queue.start() @@ -96,6 +98,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 3, sink: sinkFn, + batchComparator: () => true }) queue.start() @@ -123,6 +126,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) queue.start() @@ -145,12 +149,33 @@ describe('eventQueue', () => { queue.stop() }) + it('should invoke the sink function when an event incompatable with the current batch (according to batchComparator) is received', () => { + const sinkFn = jest.fn() + const queue = new DefaultEventQueue({ + flushInterval: 100, + maxQueueSize: 100, + sink: sinkFn, + batchComparator: (n1, n2) => (n1 % 2) === (n2 % 2) + }) + + queue.start() + + queue.enqueue(2) + queue.enqueue(4) + expect(sinkFn).not.toHaveBeenCalled() + + queue.enqueue(3) + expect(sinkFn).toHaveBeenCalledTimes(1) + expect(sinkFn).toHaveBeenCalledWith([2, 4]) + }) + it('stop() should flush the existing queue and call timer.stop()', () => { const sinkFn = jest.fn() const queue = new DefaultEventQueue({ flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) jest.spyOn(queue.timer, 'stop') @@ -174,6 +199,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) jest.spyOn(queue.timer, 'refresh') @@ -196,6 +222,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) expect(queue.stop()).toBe(promise) @@ -207,6 +234,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) queue.start() diff --git a/packages/event-processor/__tests__/v1EventProcessor.spec.ts b/packages/event-processor/__tests__/v1EventProcessor.spec.ts index 5484e15e4..ce4414b5c 100644 --- a/packages/event-processor/__tests__/v1EventProcessor.spec.ts +++ b/packages/event-processor/__tests__/v1EventProcessor.spec.ts @@ -291,27 +291,31 @@ describe('LogTierV1EventProcessor', () => { }) }) - it("should flush two batches if the event context isn't the same", () => { + it("should flush the current batch when it receives an event with a different context than the current batch", async () => { const impressionEvent1 = createImpressionEvent() - const impressionEvent2 = createImpressionEvent() const conversionEvent = createConversionEvent() + const impressionEvent2 = createImpressionEvent() impressionEvent2.context.revision = '2' processor.process(impressionEvent1) - processor.process(impressionEvent2) + processor.process(conversionEvent) expect(dispatchStub).toHaveBeenCalledTimes(0) - processor.process(conversionEvent) + processor.process(impressionEvent2) - expect(dispatchStub).toHaveBeenCalledTimes(2) + expect(dispatchStub).toHaveBeenCalledTimes(1) expect(dispatchStub).toHaveBeenCalledWith({ url: 'https://logx.optimizely.com/v1/events', httpVerb: 'POST', params: makeBatchedEventV1([impressionEvent1, conversionEvent]), }) + await processor.stop() + + expect(dispatchStub).toHaveBeenCalledTimes(2) + expect(dispatchStub).toHaveBeenCalledWith({ url: 'https://logx.optimizely.com/v1/events', httpVerb: 'POST', From dd035eaf26fba6695b3fe2b3aaa68780a40e5436 Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Fri, 9 Aug 2019 14:30:57 -0700 Subject: [PATCH 4/9] More test fixes, and check context key equivalence --- .../__tests__/eventQueue.spec.ts | 2 +- .../__tests__/v1EventProcessor.spec.ts | 35 ++++++++++++++++++- packages/event-processor/src/events.ts | 7 +++- 3 files changed, 41 insertions(+), 3 deletions(-) diff --git a/packages/event-processor/__tests__/eventQueue.spec.ts b/packages/event-processor/__tests__/eventQueue.spec.ts index 02f5a0b52..029df62d7 100644 --- a/packages/event-processor/__tests__/eventQueue.spec.ts +++ b/packages/event-processor/__tests__/eventQueue.spec.ts @@ -149,7 +149,7 @@ describe('eventQueue', () => { queue.stop() }) - it('should invoke the sink function when an event incompatable with the current batch (according to batchComparator) is received', () => { + it('should invoke the sink function when an item incompatable with the current batch (according to batchComparator) is received', () => { const sinkFn = jest.fn() const queue = new DefaultEventQueue({ flushInterval: 100, diff --git a/packages/event-processor/__tests__/v1EventProcessor.spec.ts b/packages/event-processor/__tests__/v1EventProcessor.spec.ts index ce4414b5c..947ccbe76 100644 --- a/packages/event-processor/__tests__/v1EventProcessor.spec.ts +++ b/packages/event-processor/__tests__/v1EventProcessor.spec.ts @@ -291,7 +291,7 @@ describe('LogTierV1EventProcessor', () => { }) }) - it("should flush the current batch when it receives an event with a different context than the current batch", async () => { + it('should flush the current batch when it receives an event with a different context revision than the current batch', async () => { const impressionEvent1 = createImpressionEvent() const conversionEvent = createConversionEvent() const impressionEvent2 = createImpressionEvent() @@ -323,6 +323,39 @@ describe('LogTierV1EventProcessor', () => { }) }) + it('should flush the current batch when it receives an event with a different context projectId than the current batch', async () => { + const impressionEvent1 = createImpressionEvent() + const conversionEvent = createConversionEvent() + const impressionEvent2 = createImpressionEvent() + + impressionEvent2.context.projectId = 'projectId2' + + processor.process(impressionEvent1) + processor.process(conversionEvent) + + expect(dispatchStub).toHaveBeenCalledTimes(0) + + processor.process(impressionEvent2) + + expect(dispatchStub).toHaveBeenCalledTimes(1) + expect(dispatchStub).toHaveBeenCalledWith({ + url: 'https://logx.optimizely.com/v1/events', + httpVerb: 'POST', + params: makeBatchedEventV1([impressionEvent1, conversionEvent]), + }) + + await processor.stop() + + expect(dispatchStub).toHaveBeenCalledTimes(2) + + expect(dispatchStub).toHaveBeenCalledWith({ + url: 'https://logx.optimizely.com/v1/events', + httpVerb: 'POST', + params: makeBatchedEventV1([impressionEvent2]), + }) + + }) + it('should flush the queue when the flush interval happens', () => { const impressionEvent1 = createImpressionEvent() diff --git a/packages/event-processor/src/events.ts b/packages/event-processor/src/events.ts index 3be354847..d64bdfc3d 100644 --- a/packages/event-processor/src/events.ts +++ b/packages/event-processor/src/events.ts @@ -82,7 +82,12 @@ export type EventTags = { } export function areEventContextsEqual(eventA: BaseEvent, eventB: BaseEvent): boolean { - return Object.keys(eventA.context).every( + const eventAContextKeys = Object.keys(eventA.context) + const eventBContextKeys = Object.keys(eventB.context) + if (eventAContextKeys.length !== eventBContextKeys.length) { + return false + } + return eventAContextKeys.every( contextKey => eventA.context[contextKey] === eventB.context[contextKey], ) } From 350cf8ece4fa13854d25788438033ce3e96a09c9 Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Fri, 9 Aug 2019 14:39:34 -0700 Subject: [PATCH 5/9] Don't iterate in event context equality check --- packages/event-processor/src/events.ts | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/packages/event-processor/src/events.ts b/packages/event-processor/src/events.ts index d64bdfc3d..2285ec859 100644 --- a/packages/event-processor/src/events.ts +++ b/packages/event-processor/src/events.ts @@ -82,12 +82,15 @@ export type EventTags = { } export function areEventContextsEqual(eventA: BaseEvent, eventB: BaseEvent): boolean { - const eventAContextKeys = Object.keys(eventA.context) - const eventBContextKeys = Object.keys(eventB.context) - if (eventAContextKeys.length !== eventBContextKeys.length) { - return false - } - return eventAContextKeys.every( - contextKey => eventA.context[contextKey] === eventB.context[contextKey], + const contextA = eventA.context + const contextB = eventB.context + return ( + contextA.accountId === contextB.accountId && + contextA.projectId === contextB.projectId && + contextA.clientName === contextB.clientName && + contextA.clientVersion === contextB.clientVersion && + contextA.revision === contextB.revision && + contextA.anonymizeIP === contextB.anonymizeIP && + contextA.botFiltering === contextB.botFiltering ) } From 23a6ad177ce1087560347ccf1bfdc28b672f8636 Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Fri, 9 Aug 2019 14:40:48 -0700 Subject: [PATCH 6/9] Remove newline --- packages/event-processor/__tests__/v1EventProcessor.spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/event-processor/__tests__/v1EventProcessor.spec.ts b/packages/event-processor/__tests__/v1EventProcessor.spec.ts index 947ccbe76..457894946 100644 --- a/packages/event-processor/__tests__/v1EventProcessor.spec.ts +++ b/packages/event-processor/__tests__/v1EventProcessor.spec.ts @@ -353,7 +353,6 @@ describe('LogTierV1EventProcessor', () => { httpVerb: 'POST', params: makeBatchedEventV1([impressionEvent2]), }) - }) it('should flush the queue when the flush interval happens', () => { From ae405384053c588c9dae95e4c0bdced5d0eff24f Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Mon, 12 Aug 2019 15:11:57 -0700 Subject: [PATCH 7/9] Only call formatEvent once --- packages/event-processor/src/eventProcessor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/event-processor/src/eventProcessor.ts b/packages/event-processor/src/eventProcessor.ts index 41bf9c4a4..96887e976 100644 --- a/packages/event-processor/src/eventProcessor.ts +++ b/packages/event-processor/src/eventProcessor.ts @@ -93,7 +93,7 @@ export abstract class AbstractEventProcessor implements EventProcessor { } const formattedEvent = this.formatEvents(buffer) - this.dispatcher.dispatchEvent(this.formatEvents(buffer), () => { + this.dispatcher.dispatchEvent(formattedEvent, () => { resolve() }) if (this.notificationCenter) { From ff8415bbcc00a6cedae34154dff5da9ba00b33f4 Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Tue, 13 Aug 2019 08:58:59 -0700 Subject: [PATCH 8/9] Update CHANGELOG --- packages/event-processor/CHANGELOG.MD | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/event-processor/CHANGELOG.MD b/packages/event-processor/CHANGELOG.MD index ce7ad0fec..0b9044afd 100644 --- a/packages/event-processor/CHANGELOG.MD +++ b/packages/event-processor/CHANGELOG.MD @@ -13,6 +13,7 @@ Changes that have landed but are not yet released. ### Changed - Removed transformers, interceptors, and callbacks from `AbstractEventProcessor` +- Removed grouping events by context and dispatching one event per group at flush time. Instead, only maintain one group and flush immediately when an incompatible event is processed. ## [0.2.1] - June 6, 2019 From 95874b4247bbe6d73286b5fffdf1565cb6964507 Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Tue, 13 Aug 2019 09:14:05 -0700 Subject: [PATCH 9/9] Address review feedback --- .../event-processor/__tests__/eventQueue.spec.ts | 16 ++++++++++------ .../__tests__/v1EventProcessor.spec.ts | 3 +++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/packages/event-processor/__tests__/eventQueue.spec.ts b/packages/event-processor/__tests__/eventQueue.spec.ts index 029df62d7..7722410ad 100644 --- a/packages/event-processor/__tests__/eventQueue.spec.ts +++ b/packages/event-processor/__tests__/eventQueue.spec.ts @@ -151,22 +151,26 @@ describe('eventQueue', () => { it('should invoke the sink function when an item incompatable with the current batch (according to batchComparator) is received', () => { const sinkFn = jest.fn() - const queue = new DefaultEventQueue({ + const queue = new DefaultEventQueue({ flushInterval: 100, maxQueueSize: 100, sink: sinkFn, - batchComparator: (n1, n2) => (n1 % 2) === (n2 % 2) + // This batchComparator returns true when the argument strings start with the same letter + batchComparator: (s1, s2) => s1[0] === s2[0] }) queue.start() - queue.enqueue(2) - queue.enqueue(4) + queue.enqueue('a1') + queue.enqueue('a2') + // After enqueuing these strings, both starting with 'a', the sinkFn should not yet be called. Thus far all the items enqueued are + // compatible according to the batchComparator. expect(sinkFn).not.toHaveBeenCalled() - queue.enqueue(3) + // Enqueuing a string starting with 'b' should cause the sinkFn to be called + queue.enqueue('b1') expect(sinkFn).toHaveBeenCalledTimes(1) - expect(sinkFn).toHaveBeenCalledWith([2, 4]) + expect(sinkFn).toHaveBeenCalledWith(['a1', 'a2']) }) it('stop() should flush the existing queue and call timer.stop()', () => { diff --git a/packages/event-processor/__tests__/v1EventProcessor.spec.ts b/packages/event-processor/__tests__/v1EventProcessor.spec.ts index 93768e47b..5b372c361 100644 --- a/packages/event-processor/__tests__/v1EventProcessor.spec.ts +++ b/packages/event-processor/__tests__/v1EventProcessor.spec.ts @@ -297,6 +297,9 @@ describe('LogTierV1EventProcessor', () => { const conversionEvent = createConversionEvent() const impressionEvent2 = createImpressionEvent() + // createImpressionEvent and createConversionEvent create events with revision '1' + // We modify this one's revision to '2' in order to test that the queue is flushed + // when an event with a different revision is processed. impressionEvent2.context.revision = '2' processor.process(impressionEvent1)