diff --git a/packages/event-processor/CHANGELOG.MD b/packages/event-processor/CHANGELOG.MD index 75ec3410e..09cce83ee 100644 --- a/packages/event-processor/CHANGELOG.MD +++ b/packages/event-processor/CHANGELOG.MD @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] Changes that have landed but are not yet released. +### New Features +- Promise returned from `stop` method of `EventProcessor` now tracks the state of all in-flight dispatcher requests, not just the final request that was triggered at the time `stop` was called ## [0.3.2] - October 21, 2019 diff --git a/packages/event-processor/__tests__/requestTracker.spec.ts b/packages/event-processor/__tests__/requestTracker.spec.ts new file mode 100644 index 000000000..f6318de50 --- /dev/null +++ b/packages/event-processor/__tests__/requestTracker.spec.ts @@ -0,0 +1,64 @@ +/** + * Copyright 2020, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import RequestTracker from '../src/requestTracker' + +describe('requestTracker', () => { + describe('onRequestsComplete', () => { + it('returns an immediately-fulfilled promise when no requests are in flight', async () => { + const tracker = new RequestTracker() + await tracker.onRequestsComplete() + }) + + it('returns a promise that fulfills after in-flight requests are complete', async () => { + let resolveReq1: () => void + const req1 = new Promise(resolve => { + resolveReq1 = resolve + }) + let resolveReq2: () => void + const req2 = new Promise(resolve => { + resolveReq2 = resolve + }) + let resolveReq3: () => void + const req3 = new Promise(resolve => { + resolveReq3 = resolve + }) + + const tracker = new RequestTracker() + tracker.trackRequest(req1) + tracker.trackRequest(req2) + tracker.trackRequest(req3) + + let reqsComplete = false + const reqsCompletePromise = tracker.onRequestsComplete().then(() => { + reqsComplete = true + }) + + resolveReq1!() + await req1 + expect(reqsComplete).toBe(false) + + resolveReq2!() + await req2 + expect(reqsComplete).toBe(false) + + resolveReq3!() + await req3 + await reqsCompletePromise + expect(reqsComplete).toBe(true) + }) + }) +}) diff --git a/packages/event-processor/__tests__/v1EventProcessor.spec.ts b/packages/event-processor/__tests__/v1EventProcessor.spec.ts index ce6484244..2a58047dc 100644 --- a/packages/event-processor/__tests__/v1EventProcessor.spec.ts +++ b/packages/event-processor/__tests__/v1EventProcessor.spec.ts @@ -1,5 +1,5 @@ /** - * Copyright 2019, Optimizely + * Copyright 2019-2020, Optimizely * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -261,6 +261,40 @@ describe('LogTierV1EventProcessor', () => { // not have been called again. expect(dispatcher.dispatchEvent).toBeCalledTimes(0) }) + + it('should resolve the stop promise after all dispatcher requests are done', async () => { + const dispatchCbs: Array = [] + const dispatcher = { + dispatchEvent: jest.fn((event: EventV1Request, callback: EventDispatcherCallback) => { + dispatchCbs.push(callback) + }) + } + + const processor = new LogTierV1EventProcessor({ + dispatcher, + flushInterval: 100, + maxQueueSize: 2, + }) + processor.start() + + for (let i = 0; i < 4; i++) { + processor.process(createImpressionEvent()) + } + expect(dispatchCbs.length).toBe(2) + + let stopPromiseResolved = false + const stopPromise = processor.stop().then(() => { + stopPromiseResolved = true + }) + expect(stopPromiseResolved).toBe(false) + + dispatchCbs[0]({ statusCode: 204 }) + jest.advanceTimersByTime(100) + expect(stopPromiseResolved).toBe(false) + dispatchCbs[1]({ statusCode: 204 }) + await stopPromise + expect(stopPromiseResolved).toBe(true) + }) }) describe('when maxQueueSize = 1', () => { diff --git a/packages/event-processor/src/eventProcessor.ts b/packages/event-processor/src/eventProcessor.ts index 96887e976..1eeba4422 100644 --- a/packages/event-processor/src/eventProcessor.ts +++ b/packages/event-processor/src/eventProcessor.ts @@ -1,5 +1,5 @@ /** - * Copyright 2019, Optimizely + * Copyright 2019-2020, Optimizely * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import { EventDispatcher, EventV1Request } from './eventDispatcher' import { EventQueue, DefaultEventQueue, SingleEventQueue } from './eventQueue' import { getLogger } from '@optimizely/js-sdk-logging' import { NOTIFICATION_TYPES, NotificationCenter } from '@optimizely/js-sdk-utils' +import RequestTracker from './requestTracker'; const logger = getLogger('EventProcessor') @@ -38,6 +39,7 @@ export abstract class AbstractEventProcessor implements EventProcessor { protected dispatcher: EventDispatcher protected queue: EventQueue private notificationCenter?: NotificationCenter + private requestTracker: RequestTracker constructor({ dispatcher, @@ -81,10 +83,12 @@ export abstract class AbstractEventProcessor implements EventProcessor { }) } this.notificationCenter = notificationCenter + + this.requestTracker = new RequestTracker() } drainQueue(buffer: ProcessableEvents[]): Promise { - return new Promise(resolve => { + const reqPromise = new Promise(resolve => { logger.debug('draining queue with %s events', buffer.length) if (buffer.length === 0) { @@ -103,6 +107,8 @@ export abstract class AbstractEventProcessor implements EventProcessor { ) } }) + this.requestTracker.trackRequest(reqPromise) + return reqPromise } process(event: ProcessableEvents): void { @@ -110,9 +116,10 @@ export abstract class AbstractEventProcessor implements EventProcessor { } stop(): Promise { + // swallow - an error stopping this queue shouldn't prevent this from stopping try { - // swallow, an error stopping this queue should prevent this from stopping - return this.queue.stop() + this.queue.stop() + return this.requestTracker.onRequestsComplete() } catch (e) { logger.error('Error stopping EventProcessor: "%s"', e.message, e) } diff --git a/packages/event-processor/src/requestTracker.ts b/packages/event-processor/src/requestTracker.ts new file mode 100644 index 000000000..e3f774690 --- /dev/null +++ b/packages/event-processor/src/requestTracker.ts @@ -0,0 +1,60 @@ +/** + * Copyright 2020, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * RequestTracker keeps track of in-flight requests for EventProcessor using + * an internal counter. It exposes methods for adding a new request to be + * tracked, and getting a Promise representing the completion of currently + * tracked requests. + */ +class RequestTracker { + private reqsInFlightCount: number = 0 + private reqsCompleteResolvers: Array<() => void> = [] + + /** + * Track the argument request (represented by a Promise). reqPromise will feed + * into the state of Promises returned by onRequestsComplete. + * @param {Promise} reqPromise + */ + public trackRequest(reqPromise: Promise): void { + this.reqsInFlightCount++ + const onReqComplete = () => { + this.reqsInFlightCount-- + if (this.reqsInFlightCount === 0) { + this.reqsCompleteResolvers.forEach(resolver => resolver()) + this.reqsCompleteResolvers = [] + } + } + reqPromise.then(onReqComplete, onReqComplete) + } + + /** + * Return a Promise that fulfills after all currently-tracked request promises + * are resolved. + * @return {Promise} + */ + public onRequestsComplete(): Promise { + return new Promise(resolve => { + if (this.reqsInFlightCount === 0) { + resolve() + } else { + this.reqsCompleteResolvers.push(resolve) + } + }) + } +} + +export default RequestTracker