diff --git a/packages/event-processor/CHANGELOG.MD b/packages/event-processor/CHANGELOG.MD index ce7ad0fec..0b9044afd 100644 --- a/packages/event-processor/CHANGELOG.MD +++ b/packages/event-processor/CHANGELOG.MD @@ -13,6 +13,7 @@ Changes that have landed but are not yet released. ### Changed - Removed transformers, interceptors, and callbacks from `AbstractEventProcessor` +- Removed grouping events by context and dispatching one event per group at flush time. Instead, only maintain one group and flush immediately when an incompatible event is processed. ## [0.2.1] - June 6, 2019 diff --git a/packages/event-processor/__tests__/eventQueue.spec.ts b/packages/event-processor/__tests__/eventQueue.spec.ts index 5a263d9f7..7722410ad 100644 --- a/packages/event-processor/__tests__/eventQueue.spec.ts +++ b/packages/event-processor/__tests__/eventQueue.spec.ts @@ -56,6 +56,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: -1, sink: sinkFn, + batchComparator: () => true }) queue.start() @@ -76,6 +77,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 0, sink: sinkFn, + batchComparator: () => true }) queue.start() @@ -96,6 +98,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 3, sink: sinkFn, + batchComparator: () => true }) queue.start() @@ -123,6 +126,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) queue.start() @@ -145,12 +149,37 @@ describe('eventQueue', () => { queue.stop() }) + it('should invoke the sink function when an item incompatable with the current batch (according to batchComparator) is received', () => { + const sinkFn = jest.fn() + const queue = new DefaultEventQueue({ + flushInterval: 100, + maxQueueSize: 100, + sink: sinkFn, + // This batchComparator returns true when the argument strings start with the same letter + batchComparator: (s1, s2) => s1[0] === s2[0] + }) + + queue.start() + + queue.enqueue('a1') + queue.enqueue('a2') + // After enqueuing these strings, both starting with 'a', the sinkFn should not yet be called. Thus far all the items enqueued are + // compatible according to the batchComparator. + expect(sinkFn).not.toHaveBeenCalled() + + // Enqueuing a string starting with 'b' should cause the sinkFn to be called + queue.enqueue('b1') + expect(sinkFn).toHaveBeenCalledTimes(1) + expect(sinkFn).toHaveBeenCalledWith(['a1', 'a2']) + }) + it('stop() should flush the existing queue and call timer.stop()', () => { const sinkFn = jest.fn() const queue = new DefaultEventQueue({ flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) jest.spyOn(queue.timer, 'stop') @@ -174,6 +203,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) jest.spyOn(queue.timer, 'refresh') @@ -196,6 +226,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) expect(queue.stop()).toBe(promise) @@ -207,6 +238,7 @@ describe('eventQueue', () => { flushInterval: 100, maxQueueSize: 100, sink: sinkFn, + batchComparator: () => true }) queue.start() diff --git a/packages/event-processor/__tests__/v1EventProcessor.spec.ts b/packages/event-processor/__tests__/v1EventProcessor.spec.ts index c1373c27b..5b372c361 100644 --- a/packages/event-processor/__tests__/v1EventProcessor.spec.ts +++ b/packages/event-processor/__tests__/v1EventProcessor.spec.ts @@ -292,27 +292,66 @@ describe('LogTierV1EventProcessor', () => { }) }) - it("should flush two batches if the event context isn't the same", () => { + it('should flush the current batch when it receives an event with a different context revision than the current batch', async () => { const impressionEvent1 = createImpressionEvent() - const impressionEvent2 = createImpressionEvent() const conversionEvent = createConversionEvent() + const impressionEvent2 = createImpressionEvent() + // createImpressionEvent and createConversionEvent create events with revision '1' + // We modify this one's revision to '2' in order to test that the queue is flushed + // when an event with a different revision is processed. impressionEvent2.context.revision = '2' processor.process(impressionEvent1) - processor.process(impressionEvent2) + processor.process(conversionEvent) expect(dispatchStub).toHaveBeenCalledTimes(0) - processor.process(conversionEvent) + processor.process(impressionEvent2) + + expect(dispatchStub).toHaveBeenCalledTimes(1) + expect(dispatchStub).toHaveBeenCalledWith({ + url: 'https://logx.optimizely.com/v1/events', + httpVerb: 'POST', + params: makeBatchedEventV1([impressionEvent1, conversionEvent]), + }) + + await processor.stop() expect(dispatchStub).toHaveBeenCalledTimes(2) + + expect(dispatchStub).toHaveBeenCalledWith({ + url: 'https://logx.optimizely.com/v1/events', + httpVerb: 'POST', + params: makeBatchedEventV1([impressionEvent2]), + }) + }) + + it('should flush the current batch when it receives an event with a different context projectId than the current batch', async () => { + const impressionEvent1 = createImpressionEvent() + const conversionEvent = createConversionEvent() + const impressionEvent2 = createImpressionEvent() + + impressionEvent2.context.projectId = 'projectId2' + + processor.process(impressionEvent1) + processor.process(conversionEvent) + + expect(dispatchStub).toHaveBeenCalledTimes(0) + + processor.process(impressionEvent2) + + expect(dispatchStub).toHaveBeenCalledTimes(1) expect(dispatchStub).toHaveBeenCalledWith({ url: 'https://logx.optimizely.com/v1/events', httpVerb: 'POST', params: makeBatchedEventV1([impressionEvent1, conversionEvent]), }) + await processor.stop() + + expect(dispatchStub).toHaveBeenCalledTimes(2) + expect(dispatchStub).toHaveBeenCalledWith({ url: 'https://logx.optimizely.com/v1/events', httpVerb: 'POST', diff --git a/packages/event-processor/src/eventProcessor.ts b/packages/event-processor/src/eventProcessor.ts index d42c0c1fa..96887e976 100644 --- a/packages/event-processor/src/eventProcessor.ts +++ b/packages/event-processor/src/eventProcessor.ts @@ -15,7 +15,7 @@ */ // TODO change this to use Managed from js-sdk-models when available import { Managed } from './managed' -import { ConversionEvent, ImpressionEvent } from './events' +import { ConversionEvent, ImpressionEvent, areEventContextsEqual } from './events' import { EventDispatcher, EventV1Request } from './eventDispatcher' import { EventQueue, DefaultEventQueue, SingleEventQueue } from './eventQueue' import { getLogger } from '@optimizely/js-sdk-logging' @@ -69,10 +69,11 @@ export abstract class AbstractEventProcessor implements EventProcessor { maxQueueSize = Math.max(1, maxQueueSize) if (maxQueueSize > 1) { - this.queue = new DefaultEventQueue({ + this.queue = new DefaultEventQueue({ flushInterval, maxQueueSize, sink: buffer => this.drainQueue(buffer), + batchComparator: areEventContextsEqual, }) } else { this.queue = new SingleEventQueue({ @@ -82,27 +83,26 @@ export abstract class AbstractEventProcessor implements EventProcessor { this.notificationCenter = notificationCenter } - drainQueue(buffer: ProcessableEvents[]): Promise { - logger.debug('draining queue with %s events', buffer.length) + drainQueue(buffer: ProcessableEvents[]): Promise { + return new Promise(resolve => { + logger.debug('draining queue with %s events', buffer.length) - const promises = this.groupEvents(buffer).map(eventGroup => { - const formattedEvent = this.formatEvents(eventGroup) + if (buffer.length === 0) { + resolve() + return + } - return new Promise(resolve => { - this.dispatcher.dispatchEvent(formattedEvent, () => { - resolve() - }) - - if (this.notificationCenter) { - this.notificationCenter.sendNotifications( - NOTIFICATION_TYPES.LOG_EVENT, - formattedEvent, - ) - } + const formattedEvent = this.formatEvents(buffer) + this.dispatcher.dispatchEvent(formattedEvent, () => { + resolve() }) + if (this.notificationCenter) { + this.notificationCenter.sendNotifications( + NOTIFICATION_TYPES.LOG_EVENT, + formattedEvent, + ) + } }) - - return Promise.all(promises) } process(event: ProcessableEvents): void { @@ -123,7 +123,5 @@ export abstract class AbstractEventProcessor implements EventProcessor { this.queue.start() } - protected abstract groupEvents(events: ProcessableEvents[]): ProcessableEvents[][] - protected abstract formatEvents(events: ProcessableEvents[]): EventV1Request } diff --git a/packages/event-processor/src/eventQueue.ts b/packages/event-processor/src/eventQueue.ts index a8c997dc8..870970a3d 100644 --- a/packages/event-processor/src/eventQueue.ts +++ b/packages/event-processor/src/eventQueue.ts @@ -82,19 +82,25 @@ export class DefaultEventQueue implements EventQueue { private buffer: K[] private maxQueueSize: number private sink: EventQueueSink + // batchComparator is called to determine whether two events can be included + // together in the same batch + private batchComparator: (eventA: K, eventB: K) => boolean constructor({ flushInterval, maxQueueSize, sink, + batchComparator, }: { flushInterval: number maxQueueSize: number sink: EventQueueSink + batchComparator: (eventA: K, eventB: K) => boolean }) { this.buffer = [] this.maxQueueSize = Math.max(maxQueueSize, 1) this.sink = sink + this.batchComparator = batchComparator this.timer = new Timer({ callback: this.flush.bind(this), timeout: flushInterval, @@ -112,6 +118,13 @@ export class DefaultEventQueue implements EventQueue { } enqueue(event: K): void { + // If new event cannot be included into the current batch, flush so it can + // be in its own new batch. + const bufferedEvent: K | undefined = this.buffer[0] + if (bufferedEvent && !this.batchComparator(bufferedEvent, event)) { + this.flush() + } + // start the timer when the first event is put in if (this.buffer.length === 0) { this.timer.refresh() diff --git a/packages/event-processor/src/events.ts b/packages/event-processor/src/events.ts index cb5661c2f..2285ec859 100644 --- a/packages/event-processor/src/events.ts +++ b/packages/event-processor/src/events.ts @@ -80,3 +80,17 @@ export interface ConversionEvent extends BaseEvent { export type EventTags = { [key: string]: string | number | null } + +export function areEventContextsEqual(eventA: BaseEvent, eventB: BaseEvent): boolean { + const contextA = eventA.context + const contextB = eventB.context + return ( + contextA.accountId === contextB.accountId && + contextA.projectId === contextB.projectId && + contextA.clientName === contextB.clientName && + contextA.clientVersion === contextB.clientVersion && + contextA.revision === contextB.revision && + contextA.anonymizeIP === contextB.anonymizeIP && + contextA.botFiltering === contextB.botFiltering + ) +} diff --git a/packages/event-processor/src/v1/v1EventProcessor.ts b/packages/event-processor/src/v1/v1EventProcessor.ts index 9117c0403..3796f5f63 100644 --- a/packages/event-processor/src/v1/v1EventProcessor.ts +++ b/packages/event-processor/src/v1/v1EventProcessor.ts @@ -1,17 +1,24 @@ -import { groupBy, objectValues } from '@optimizely/js-sdk-utils' +/** + * Copyright 2019, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + import { AbstractEventProcessor, ProcessableEvents } from '../eventProcessor' import { EventV1Request } from '../eventDispatcher' import { makeBatchedEventV1 } from './buildEventV1' export class LogTierV1EventProcessor extends AbstractEventProcessor { - private buildGroupByKey(event: ProcessableEvents): string { - return objectValues(event.context).join('|') - } - - protected groupEvents(events: ProcessableEvents[]): ProcessableEvents[][] { - return groupBy(events, event => this.buildGroupByKey(event)) - } - protected formatEvents(events: ProcessableEvents[]): EventV1Request { return { url: 'https://logx.optimizely.com/v1/events',