diff --git a/src/logger.ts b/src/logger.ts index 7adf1263..8157324b 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -25,7 +25,6 @@ export const LogId = { telemetryMetadataError: mongoLogId(1_002_005), telemetryDeviceIdFailure: mongoLogId(1_002_006), telemetryDeviceIdTimeout: mongoLogId(1_002_007), - telemetryContainerEnvFailure: mongoLogId(1_002_008), toolExecute: mongoLogId(1_003_001), toolExecuteFailure: mongoLogId(1_003_002), diff --git a/src/server.ts b/src/server.ts index 9012fdf5..b0e8e19c 100644 --- a/src/server.ts +++ b/src/server.ts @@ -130,7 +130,7 @@ export class Server { } } - this.telemetry.emitEvents([event]); + this.telemetry.emitEvents([event]).catch(() => {}); } private registerTools() { diff --git a/src/telemetry/telemetry.ts b/src/telemetry/telemetry.ts index 3f543341..ccf0eb41 100644 --- a/src/telemetry/telemetry.ts +++ b/src/telemetry/telemetry.ts @@ -7,152 +7,114 @@ import { MACHINE_METADATA } from "./constants.js"; import { EventCache } from "./eventCache.js"; import nodeMachineId from "node-machine-id"; import { getDeviceId } from "@mongodb-js/device-id"; -import fs from "fs/promises"; - -async function fileExists(filePath: string): Promise { - try { - await fs.access(filePath, fs.constants.F_OK); - return true; // File exists - } catch (e: unknown) { - if ( - e instanceof Error && - ( - e as Error & { - code: string; - } - ).code === "ENOENT" - ) { - return false; // File does not exist - } - throw e; // Re-throw unexpected errors - } -} -async function isContainerized(): Promise { - if (process.env.container) { - return true; - } - - const exists = await Promise.all(["/.dockerenv", "/run/.containerenv", "/var/run/.containerenv"].map(fileExists)); +type EventResult = { + success: boolean; + error?: Error; +}; - return exists.includes(true); -} +export const DEVICE_ID_TIMEOUT = 3000; export class Telemetry { + private isBufferingEvents: boolean = true; + /** Resolves when the device ID is retrieved or timeout occurs */ + public deviceIdPromise: Promise | undefined; private deviceIdAbortController = new AbortController(); private eventCache: EventCache; private getRawMachineId: () => Promise; - private getContainerEnv: () => Promise; - private cachedCommonProperties?: CommonProperties; - private flushing: boolean = false; private constructor( private readonly session: Session, private readonly userConfig: UserConfig, - { - eventCache, - getRawMachineId, - getContainerEnv, - }: { - eventCache: EventCache; - getRawMachineId: () => Promise; - getContainerEnv: () => Promise; - } + private readonly commonProperties: CommonProperties, + { eventCache, getRawMachineId }: { eventCache: EventCache; getRawMachineId: () => Promise } ) { this.eventCache = eventCache; this.getRawMachineId = getRawMachineId; - this.getContainerEnv = getContainerEnv; } static create( session: Session, userConfig: UserConfig, { + commonProperties = { ...MACHINE_METADATA }, eventCache = EventCache.getInstance(), getRawMachineId = () => nodeMachineId.machineId(true), - getContainerEnv = isContainerized, }: { eventCache?: EventCache; getRawMachineId?: () => Promise; - getContainerEnv?: () => Promise; + commonProperties?: CommonProperties; } = {} ): Telemetry { - const instance = new Telemetry(session, userConfig, { - eventCache, - getRawMachineId, - getContainerEnv, - }); + const instance = new Telemetry(session, userConfig, commonProperties, { eventCache, getRawMachineId }); + void instance.start(); return instance; } + private async start(): Promise { + if (!this.isTelemetryEnabled()) { + return; + } + this.deviceIdPromise = getDeviceId({ + getMachineId: () => this.getRawMachineId(), + onError: (reason, error) => { + switch (reason) { + case "resolutionError": + logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", String(error)); + break; + case "timeout": + logger.debug(LogId.telemetryDeviceIdTimeout, "telemetry", "Device ID retrieval timed out"); + break; + case "abort": + // No need to log in the case of aborts + break; + } + }, + abortSignal: this.deviceIdAbortController.signal, + }); + + this.commonProperties.device_id = await this.deviceIdPromise; + + this.isBufferingEvents = false; + } + public async close(): Promise { this.deviceIdAbortController.abort(); - await this.flush(); + this.isBufferingEvents = false; + await this.emitEvents(this.eventCache.getEvents()); } /** * Emits events through the telemetry pipeline * @param events - The events to emit */ - public emitEvents(events: BaseEvent[]): void { - void this.flush(events); + public async emitEvents(events: BaseEvent[]): Promise { + try { + if (!this.isTelemetryEnabled()) { + logger.info(LogId.telemetryEmitFailure, "telemetry", `Telemetry is disabled.`); + return; + } + + await this.emit(events); + } catch { + logger.debug(LogId.telemetryEmitFailure, "telemetry", `Error emitting telemetry events.`); + } } /** * Gets the common properties for events * @returns Object containing common properties for all events */ - private async getCommonProperties(): Promise { - if (!this.cachedCommonProperties) { - let deviceId: string | undefined; - let containerEnv: boolean | undefined; - try { - await Promise.all([ - getDeviceId({ - getMachineId: () => this.getRawMachineId(), - onError: (reason, error) => { - switch (reason) { - case "resolutionError": - logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", String(error)); - break; - case "timeout": - logger.debug( - LogId.telemetryDeviceIdTimeout, - "telemetry", - "Device ID retrieval timed out" - ); - break; - case "abort": - // No need to log in the case of aborts - break; - } - }, - abortSignal: this.deviceIdAbortController.signal, - }).then((id) => { - deviceId = id; - }), - this.getContainerEnv().then((env) => { - containerEnv = env; - }), - ]); - } catch (error: unknown) { - const err = error instanceof Error ? error : new Error(String(error)); - logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", err.message); - } - this.cachedCommonProperties = { - ...MACHINE_METADATA, - mcp_client_version: this.session.agentRunner?.version, - mcp_client_name: this.session.agentRunner?.name, - session_id: this.session.sessionId, - config_atlas_auth: this.session.apiClient.hasCredentials() ? "true" : "false", - config_connection_string: this.userConfig.connectionString ? "true" : "false", - is_container_env: containerEnv ? "true" : "false", - device_id: deviceId, - }; - } - - return this.cachedCommonProperties; + public getCommonProperties(): CommonProperties { + return { + ...this.commonProperties, + mcp_client_version: this.session.agentRunner?.version, + mcp_client_name: this.session.agentRunner?.name, + session_id: this.session.sessionId, + config_atlas_auth: this.session.apiClient.hasCredentials() ? "true" : "false", + config_connection_string: this.userConfig.connectionString ? "true" : "false", + }; } /** @@ -173,74 +135,60 @@ export class Telemetry { } /** - * Attempts to flush events through authenticated and unauthenticated clients + * Attempts to emit events through authenticated and unauthenticated clients * Falls back to caching if both attempts fail */ - public async flush(events?: BaseEvent[]): Promise { - if (!this.isTelemetryEnabled()) { - logger.info(LogId.telemetryEmitFailure, "telemetry", `Telemetry is disabled.`); - return; - } - - if (this.flushing) { - this.eventCache.appendEvents(events ?? []); - process.nextTick(async () => { - // try again if in the middle of a flush - await this.flush(); - }); + private async emit(events: BaseEvent[]): Promise { + if (this.isBufferingEvents) { + this.eventCache.appendEvents(events); return; } - this.flushing = true; + const cachedEvents = this.eventCache.getEvents(); + const allEvents = [...cachedEvents, ...events]; - try { - const cachedEvents = this.eventCache.getEvents(); - const allEvents = [...cachedEvents, ...(events ?? [])]; - if (allEvents.length <= 0) { - this.flushing = false; - return; - } - - logger.debug( - LogId.telemetryEmitStart, - "telemetry", - `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)` - ); + logger.debug( + LogId.telemetryEmitStart, + "telemetry", + `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)` + ); - await this.sendEvents(this.session.apiClient, allEvents); + const result = await this.sendEvents(this.session.apiClient, allEvents); + if (result.success) { this.eventCache.clearEvents(); logger.debug( LogId.telemetryEmitSuccess, "telemetry", `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}` ); - } catch (error: unknown) { - logger.debug( - LogId.telemetryEmitFailure, - "telemetry", - `Error sending event to client: ${error instanceof Error ? error.message : String(error)}` - ); - this.eventCache.appendEvents(events ?? []); - process.nextTick(async () => { - // try again - await this.flush(); - }); + return; } - this.flushing = false; + logger.debug( + LogId.telemetryEmitFailure, + "telemetry", + `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}` + ); + this.eventCache.appendEvents(events); } /** * Attempts to send events through the provided API client */ - private async sendEvents(client: ApiClient, events: BaseEvent[]): Promise { - const commonProperties = await this.getCommonProperties(); - - await client.sendEvents( - events.map((event) => ({ - ...event, - properties: { ...commonProperties, ...event.properties }, - })) - ); + private async sendEvents(client: ApiClient, events: BaseEvent[]): Promise { + try { + await client.sendEvents( + events.map((event) => ({ + ...event, + properties: { ...this.getCommonProperties(), ...event.properties }, + })) + ); + return { success: true }; + } catch (error) { + return { + success: false, + error: error instanceof Error ? error : new Error(String(error)), + }; + } } } diff --git a/src/telemetry/types.ts b/src/telemetry/types.ts index 05ce8f3f..d77cc010 100644 --- a/src/telemetry/types.ts +++ b/src/telemetry/types.ts @@ -71,5 +71,4 @@ export type CommonProperties = { config_atlas_auth?: TelemetryBoolSet; config_connection_string?: TelemetryBoolSet; session_id?: string; - is_container_env?: TelemetryBoolSet; } & CommonStaticProperties; diff --git a/src/tools/tool.ts b/src/tools/tool.ts index 37375f70..b7cce354 100644 --- a/src/tools/tool.ts +++ b/src/tools/tool.ts @@ -74,12 +74,12 @@ export abstract class ToolBase { logger.debug(LogId.toolExecute, "tool", `Executing tool ${this.name}`); const result = await this.execute(...args); - this.emitToolEvent(startTime, result, ...args); + await this.emitToolEvent(startTime, result, ...args).catch(() => {}); return result; } catch (error: unknown) { logger.error(LogId.toolExecuteFailure, "tool", `Error executing ${this.name}: ${error as string}`); const toolResult = await this.handleError(error, args[0] as ToolArgs); - this.emitToolEvent(startTime, toolResult, ...args); + await this.emitToolEvent(startTime, toolResult, ...args).catch(() => {}); return toolResult; } }; @@ -179,11 +179,11 @@ export abstract class ToolBase { * @param result - Whether the command succeeded or failed * @param args - The arguments passed to the tool */ - private emitToolEvent( + private async emitToolEvent( startTime: number, result: CallToolResult, ...args: Parameters> - ): void { + ): Promise { if (!this.telemetry.isTelemetryEnabled()) { return; } @@ -209,6 +209,6 @@ export abstract class ToolBase { event.properties.project_id = metadata.projectId; } - this.telemetry.emitEvents([event]); + await this.telemetry.emitEvents([event]); } } diff --git a/tests/integration/telemetry.test.ts b/tests/integration/telemetry.test.ts new file mode 100644 index 00000000..522c1154 --- /dev/null +++ b/tests/integration/telemetry.test.ts @@ -0,0 +1,28 @@ +import { createHmac } from "crypto"; +import { Telemetry } from "../../src/telemetry/telemetry.js"; +import { Session } from "../../src/session.js"; +import { config } from "../../src/config.js"; +import nodeMachineId from "node-machine-id"; + +describe("Telemetry", () => { + it("should resolve the actual machine ID", async () => { + const actualId: string = await nodeMachineId.machineId(true); + + const actualHashedId = createHmac("sha256", actualId.toUpperCase()).update("atlascli").digest("hex"); + + const telemetry = Telemetry.create( + new Session({ + apiBaseUrl: "", + }), + config + ); + + expect(telemetry.getCommonProperties().device_id).toBe(undefined); + expect(telemetry["isBufferingEvents"]).toBe(true); + + await telemetry.deviceIdPromise; + + expect(telemetry.getCommonProperties().device_id).toBe(actualHashedId); + expect(telemetry["isBufferingEvents"]).toBe(false); + }); +}); diff --git a/tests/unit/telemetry.test.ts b/tests/unit/telemetry.test.ts index 3e27f9eb..c1ae28ea 100644 --- a/tests/unit/telemetry.test.ts +++ b/tests/unit/telemetry.test.ts @@ -1,6 +1,6 @@ import { ApiClient } from "../../src/common/atlas/apiClient.js"; import { Session } from "../../src/session.js"; -import { Telemetry } from "../../src/telemetry/telemetry.js"; +import { DEVICE_ID_TIMEOUT, Telemetry } from "../../src/telemetry/telemetry.js"; import { BaseEvent, TelemetryResult } from "../../src/telemetry/types.js"; import { EventCache } from "../../src/telemetry/eventCache.js"; import { config } from "../../src/config.js"; @@ -16,8 +16,6 @@ const MockApiClient = ApiClient as jest.MockedClass; jest.mock("../../src/telemetry/eventCache.js"); const MockEventCache = EventCache as jest.MockedClass; -const nextTick = () => new Promise((resolve) => process.nextTick(resolve)); - describe("Telemetry", () => { const machineId = "test-machine-id"; const hashedMachineId = createHmac("sha256", machineId.toUpperCase()).update("atlascli").digest("hex"); @@ -26,11 +24,6 @@ describe("Telemetry", () => { let mockEventCache: jest.Mocked; let session: Session; let telemetry: Telemetry; - let telemetryConfig: { - eventCache: EventCache; - getRawMachineId: () => Promise; - getContainerEnv: () => Promise; - }; // Helper function to create properly typed test events function createTestEvent(options?: { @@ -84,11 +77,19 @@ describe("Telemetry", () => { expect(appendEvents.length).toBe(appendEventsCalls); if (sendEventsCalledWith) { - expect(sendEvents[0]?.[0]).toMatchObject(sendEventsCalledWith); + expect(sendEvents[0]?.[0]).toEqual( + sendEventsCalledWith.map((event) => ({ + ...event, + properties: { + ...telemetry.getCommonProperties(), + ...event.properties, + }, + })) + ); } if (appendEventsCalledWith) { - expect(appendEvents[0]?.[0]).toMatchObject(appendEventsCalledWith); + expect(appendEvents[0]?.[0]).toEqual(appendEventsCalledWith); } } @@ -124,13 +125,10 @@ describe("Telemetry", () => { setAgentRunner: jest.fn().mockResolvedValue(undefined), } as unknown as Session; - telemetryConfig = { + telemetry = Telemetry.create(session, config, { eventCache: mockEventCache, getRawMachineId: () => Promise.resolve(machineId), - getContainerEnv: () => Promise.resolve(false), - }; - - telemetry = Telemetry.create(session, config, telemetryConfig); + }); config.telemetry = "enabled"; }); @@ -140,8 +138,7 @@ describe("Telemetry", () => { it("should send events successfully", async () => { const testEvent = createTestEvent(); - telemetry.emitEvents([testEvent]); - await nextTick(); // wait for the event to be sent + await telemetry.emitEvents([testEvent]); verifyMockCalls({ sendEventsCalls: 1, @@ -155,8 +152,7 @@ describe("Telemetry", () => { const testEvent = createTestEvent(); - telemetry.emitEvents([testEvent]); - await nextTick(); // wait for the event to be sent + await telemetry.emitEvents([testEvent]); verifyMockCalls({ sendEventsCalls: 1, @@ -179,8 +175,7 @@ describe("Telemetry", () => { // Set up mock to return cached events mockEventCache.getEvents.mockReturnValueOnce([cachedEvent]); - telemetry.emitEvents([newEvent]); - await nextTick(); // wait for the event to be sent + await telemetry.emitEvents([newEvent]); verifyMockCalls({ sendEventsCalls: 1, @@ -189,7 +184,9 @@ describe("Telemetry", () => { }); }); - it("should correctly add common properties to events", async () => { + it("should correctly add common properties to events", () => { + const commonProps = telemetry.getCommonProperties(); + // Use explicit type assertion const expectedProps: Record = { mcp_client_version: "1.0.0", @@ -200,86 +197,48 @@ describe("Telemetry", () => { device_id: hashedMachineId, }; - const testEvent = createTestEvent(); - - telemetry.emitEvents([testEvent]); - await nextTick(); // wait for the event to be sent - - const checkEvent = { - ...testEvent, - properties: { - ...testEvent.properties, - ...expectedProps, - }, - }; - - verifyMockCalls({ - sendEventsCalls: 1, - clearEventsCalls: 1, - sendEventsCalledWith: [checkEvent], - }); + expect(commonProps).toMatchObject(expectedProps); }); - it("should send cache new event while sending another event", async () => { - const newEvent = createTestEvent({ - command: "new-command", - component: "new-component", + describe("machine ID resolution", () => { + beforeEach(() => { + jest.clearAllMocks(); + jest.useFakeTimers(); }); - const newEvent2 = createTestEvent({ - command: "new-command-2", - component: "new-component-2", + afterEach(() => { + jest.clearAllMocks(); + jest.useRealTimers(); }); - telemetry.emitEvents([newEvent]); - telemetry.emitEvents([newEvent2]); + it("should successfully resolve the machine ID", async () => { + telemetry = Telemetry.create(session, config, { + getRawMachineId: () => Promise.resolve(machineId), + }); - await nextTick(); // wait for the event to be sent + expect(telemetry["isBufferingEvents"]).toBe(true); + expect(telemetry.getCommonProperties().device_id).toBe(undefined); - verifyMockCalls({ - sendEventsCalls: 1, - clearEventsCalls: 1, - appendEventsCalls: 1, - sendEventsCalledWith: [newEvent], - appendEventsCalledWith: [newEvent2], - }); - }); + await telemetry.deviceIdPromise; - describe("machine ID resolution", () => { - it("should successfully resolve the machine ID", async () => { - const testEvent = createTestEvent(); - - telemetry.emitEvents([testEvent]); - await nextTick(); // wait for the event to be sent - - const checkEvent = { - ...testEvent, - properties: { - ...testEvent.properties, - device_id: hashedMachineId, - }, - }; - - verifyMockCalls({ - sendEventsCalls: 1, - clearEventsCalls: 1, - sendEventsCalledWith: [checkEvent], - }); + expect(telemetry["isBufferingEvents"]).toBe(false); + expect(telemetry.getCommonProperties().device_id).toBe(hashedMachineId); }); it("should handle machine ID resolution failure", async () => { const loggerSpy = jest.spyOn(logger, "debug"); telemetry = Telemetry.create(session, config, { - ...telemetryConfig, getRawMachineId: () => Promise.reject(new Error("Failed to get device ID")), }); - const testEvent = createTestEvent(); + expect(telemetry["isBufferingEvents"]).toBe(true); + expect(telemetry.getCommonProperties().device_id).toBe(undefined); - telemetry.emitEvents([testEvent]); + await telemetry.deviceIdPromise; - await nextTick(); // wait for the event to be sent + expect(telemetry["isBufferingEvents"]).toBe(false); + expect(telemetry.getCommonProperties().device_id).toBe("unknown"); expect(loggerSpy).toHaveBeenCalledWith( LogId.telemetryDeviceIdFailure, @@ -288,28 +247,27 @@ describe("Telemetry", () => { ); }); - it("should timeout if machine ID resolution takes too long", () => { + it("should timeout if machine ID resolution takes too long", async () => { const loggerSpy = jest.spyOn(logger, "debug"); - jest.useFakeTimers(); - - telemetry = Telemetry.create(session, config, { - ...telemetryConfig, - getRawMachineId: () => new Promise(() => {}), // Never resolves - }); + telemetry = Telemetry.create(session, config, { getRawMachineId: () => new Promise(() => {}) }); - const testEvent = createTestEvent(); + expect(telemetry["isBufferingEvents"]).toBe(true); + expect(telemetry.getCommonProperties().device_id).toBe(undefined); - telemetry.emitEvents([testEvent]); + jest.advanceTimersByTime(DEVICE_ID_TIMEOUT / 2); - jest.advanceTimersByTime(5000); + // Make sure the timeout doesn't happen prematurely. + expect(telemetry["isBufferingEvents"]).toBe(true); + expect(telemetry.getCommonProperties().device_id).toBe(undefined); - jest.useRealTimers(); + jest.advanceTimersByTime(DEVICE_ID_TIMEOUT); - expect(loggerSpy).toHaveBeenCalledTimes(2); + await telemetry.deviceIdPromise; - expect(loggerSpy).toHaveBeenNthCalledWith( - 2, + expect(telemetry.getCommonProperties().device_id).toBe("unknown"); + expect(telemetry["isBufferingEvents"]).toBe(false); + expect(loggerSpy).toHaveBeenCalledWith( LogId.telemetryDeviceIdTimeout, "telemetry", "Device ID retrieval timed out" @@ -330,12 +288,9 @@ describe("Telemetry", () => { it("should not send events", async () => { const testEvent = createTestEvent(); - telemetry.emitEvents([testEvent]); - await nextTick(); // wait for the event to be sent + await telemetry.emitEvents([testEvent]); - verifyMockCalls({ - sendEventsCalls: 0, - }); + verifyMockCalls(); }); }); @@ -358,12 +313,9 @@ describe("Telemetry", () => { it("should not send events", async () => { const testEvent = createTestEvent(); - telemetry.emitEvents([testEvent]); - await nextTick(); // wait for the event to be sent + await telemetry.emitEvents([testEvent]); - verifyMockCalls({ - sendEventsCalls: 0, - }); + verifyMockCalls(); }); }); });