@@ -7,114 +7,152 @@ import { MACHINE_METADATA } from "./constants.js";
7
7
import { EventCache } from "./eventCache.js" ;
8
8
import nodeMachineId from "node-machine-id" ;
9
9
import { getDeviceId } from "@mongodb-js/device-id" ;
10
+ import fs from "fs/promises" ;
11
+
12
/**
 * Checks whether a filesystem entry exists at the given path.
 *
 * @param filePath - Path to probe.
 * @returns `true` when `fs.access` succeeds, `false` when it fails because the
 *          path does not exist (`ENOENT`) or because one of its parent
 *          components is not a directory (`ENOTDIR`).
 * @throws Re-throws any other error (for example a permission error) unchanged,
 *         so that unexpected failures are not mistaken for "missing".
 */
async function fileExists(filePath: string): Promise<boolean> {
    try {
        await fs.access(filePath, fs.constants.F_OK);
        return true; // File exists
    } catch (e: unknown) {
        // Node attaches the system error code to the Error object as `code`.
        const code = e instanceof Error ? (e as Error & { code?: unknown }).code : undefined;

        // ENOTDIR means a parent component is a regular file, so the requested
        // entry cannot exist either; treat it the same as ENOENT.
        if (code === "ENOENT" || code === "ENOTDIR") {
            return false; // File does not exist
        }

        throw e; // Re-throw unexpected errors
    }
}
10
30
11
- type EventResult = {
12
- success : boolean ;
13
- error ?: Error ;
14
- } ;
31
/**
 * Detects whether the current process appears to run inside a container.
 *
 * The `container` environment variable is checked first; when it is set to a
 * non-empty value no filesystem access happens. Otherwise three marker files
 * are probed in parallel and the presence of any one of them counts as a hit.
 *
 * @returns `true` when the environment variable is set or a marker file exists.
 * @throws Rejects when any of the `fileExists` probes rejects.
 */
async function isContainerized(): Promise<boolean> {
    if (process.env.container) {
        return true;
    }

    const markerFiles = ["/.dockerenv", "/run/.containerenv", "/var/run/.containerenv"];
    const probeResults = await Promise.all(markerFiles.map((markerFile) => fileExists(markerFile)));

    return probeResults.some((found) => found);
}
17
40
18
41
export class Telemetry {
19
- private isBufferingEvents : boolean = true ;
20
- /** Resolves when the device ID is retrieved or timeout occurs */
21
- public deviceIdPromise : Promise < string > | undefined ;
22
42
private deviceIdAbortController = new AbortController ( ) ;
23
43
private eventCache : EventCache ;
24
44
private getRawMachineId : ( ) => Promise < string > ;
45
+ private getContainerEnv : ( ) => Promise < boolean > ;
46
+ private cachedCommonProperties ?: CommonProperties ;
47
+ private flushing : boolean = false ;
25
48
26
49
/**
 * Stores the session, the user configuration and the injected collaborators.
 * The constructor is private; instances are obtained through `Telemetry.create`.
 *
 * @param session - Session whose API client and identifiers are used for events.
 * @param userConfig - User configuration consulted when building common properties.
 * @param dependencies - Event cache plus the machine-id and container-detection callbacks.
 */
private constructor(
    private readonly session: Session,
    private readonly userConfig: UserConfig,
    dependencies: {
        eventCache: EventCache;
        getRawMachineId: () => Promise<string>;
        getContainerEnv: () => Promise<boolean>;
    }
) {
    this.eventCache = dependencies.eventCache;
    this.getRawMachineId = dependencies.getRawMachineId;
    this.getContainerEnv = dependencies.getContainerEnv;
}
35
66
36
67
/**
 * Factory for `Telemetry` instances.
 *
 * Every collaborator in the third argument is optional; omitted ones default to
 * the shared `EventCache` instance, the `node-machine-id` lookup and the
 * `isContainerized` probe respectively.
 *
 * @param session - Session the telemetry instance reports for.
 * @param userConfig - User configuration passed through to the instance.
 * @returns A new `Telemetry` instance. Nothing is started here.
 */
static create(
    session: Session,
    userConfig: UserConfig,
    {
        eventCache = EventCache.getInstance(),
        getRawMachineId = () => nodeMachineId.machineId(true),
        getContainerEnv = isContainerized,
    }: {
        eventCache?: EventCache;
        getRawMachineId?: () => Promise<string>;
        getContainerEnv?: () => Promise<boolean>;
    } = {}
): Telemetry {
    return new Telemetry(session, userConfig, { eventCache, getRawMachineId, getContainerEnv });
}
81
88
82
89
/**
 * Shuts telemetry down: signals the abort controller handed to the device ID
 * lookup, then makes one more attempt to flush whatever is in the event cache.
 */
public async close(): Promise<void> {
    // Abort first so a still-pending device ID lookup is told to stop.
    this.deviceIdAbortController.abort();

    await this.flush();
}
87
93
88
94
/**
 * Hands events to the telemetry pipeline without waiting for delivery.
 * The flush is started and its promise deliberately discarded, so this
 * method returns immediately and reports nothing about the outcome.
 * @param events - The events to emit
 */
public emitEvents(events: BaseEvent[]): void {
    void this.flush(events);
}
104
101
105
102
/**
 * Gets the common properties for events.
 *
 * The properties are computed on the first call and cached on the instance;
 * later calls return the cached object. The device ID lookup and the container
 * detection run in parallel, and each handles its own failure: a failing
 * lookup is logged and leaves only its own value unset, so a container
 * detection error can no longer cause the device ID to be dropped.
 *
 * @returns Object containing common properties for all events
 */
private async getCommonProperties(): Promise<CommonProperties> {
    if (this.cachedCommonProperties) {
        return this.cachedCommonProperties;
    }

    const toMessage = (error: unknown): string => (error instanceof Error ? error.message : String(error));

    // Resolves the device ID; yields undefined instead of rejecting.
    const resolveDeviceId = async (): Promise<string | undefined> => {
        try {
            return await getDeviceId({
                getMachineId: () => this.getRawMachineId(),
                onError: (reason, error) => {
                    switch (reason) {
                        case "resolutionError":
                            logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", String(error));
                            break;
                        case "timeout":
                            logger.debug(
                                LogId.telemetryDeviceIdTimeout,
                                "telemetry",
                                "Device ID retrieval timed out"
                            );
                            break;
                        case "abort":
                            // No need to log in the case of aborts
                            break;
                    }
                },
                abortSignal: this.deviceIdAbortController.signal,
            });
        } catch (error: unknown) {
            logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", toMessage(error));
            return undefined;
        }
    };

    // Resolves the container flag; yields undefined instead of rejecting.
    const resolveContainerEnv = async (): Promise<boolean | undefined> => {
        try {
            return await this.getContainerEnv();
        } catch (error: unknown) {
            // Logged under the device-id failure id because no dedicated id is
            // visible in this file; the prefix keeps the two cases apart.
            logger.debug(
                LogId.telemetryDeviceIdFailure,
                "telemetry",
                `Container detection failed: ${toMessage(error)}`
            );
            return undefined;
        }
    };

    // Neither helper rejects, so Promise.all only provides the parallelism.
    const [deviceId, containerEnv] = await Promise.all([resolveDeviceId(), resolveContainerEnv()]);

    this.cachedCommonProperties = {
        ...MACHINE_METADATA,
        mcp_client_version: this.session.agentRunner?.version,
        mcp_client_name: this.session.agentRunner?.name,
        session_id: this.session.sessionId,
        config_atlas_auth: this.session.apiClient.hasCredentials() ? "true" : "false",
        config_connection_string: this.userConfig.connectionString ? "true" : "false",
        // An undetermined container state is reported as "false".
        is_container_env: containerEnv ? "true" : "false",
        device_id: deviceId,
    };

    return this.cachedCommonProperties;
}
119
157
120
158
/**
@@ -135,60 +173,74 @@ export class Telemetry {
135
173
}
136
174
137
175
/**
 * Attempts to flush the cached events, plus any events passed in, through the
 * session's API client. Falls back to caching if the attempt fails.
 *
 * Behaviour:
 * - When telemetry is disabled, nothing is sent or cached.
 * - When another flush is already running, the given events are only appended
 *   to the event cache and the call returns immediately; the running flush
 *   keeps sending until the cache is empty, so it delivers them.
 * - Otherwise the batch is taken out of the cache before it is sent. Events
 *   that arrive during the send therefore stay in the cache and are picked up
 *   by the next loop iteration instead of being cleared together with the
 *   batch that was just delivered.
 * - On failure the unsent batch is put back into the cache and the method
 *   returns. No retry is scheduled: re-scheduling via `process.nextTick`
 *   re-enters synchronously and keeps the event loop from reaching the I/O the
 *   pending send depends on. The events are sent by the next flush instead.
 *
 * @param events - Optional new events to send together with the cached ones.
 */
public async flush(events?: BaseEvent[]): Promise<void> {
    if (!this.isTelemetryEnabled()) {
        logger.info(LogId.telemetryEmitFailure, "telemetry", `Telemetry is disabled.`);
        return;
    }

    if (this.flushing) {
        // Park the events; the in-flight flush drains the cache before it finishes.
        this.eventCache.appendEvents(events ?? []);
        return;
    }

    this.flushing = true;

    // Events that are currently neither delivered nor stored in the cache.
    let unsent: BaseEvent[] = events ?? [];

    try {
        for (;;) {
            // Copy, in case getEvents() exposes a live collection that clearEvents() empties.
            const cachedEvents = [...this.eventCache.getEvents()];
            const allEvents = [...cachedEvents, ...unsent];
            if (allEvents.length === 0) {
                return;
            }

            logger.debug(
                LogId.telemetryEmitStart,
                "telemetry",
                `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`
            );

            // Take ownership of the batch so that events appended while the
            // send is pending remain in the cache untouched.
            this.eventCache.clearEvents();
            unsent = allEvents;

            await this.sendEvents(this.session.apiClient, allEvents);
            unsent = [];

            logger.debug(
                LogId.telemetryEmitSuccess,
                "telemetry",
                `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`
            );
        }
    } catch (error: unknown) {
        logger.debug(
            LogId.telemetryEmitFailure,
            "telemetry",
            `Error sending event to client: ${error instanceof Error ? error.message : String(error)}`
        );
        // Keep the undelivered events for the next flush attempt.
        this.eventCache.appendEvents(unsent);
    } finally {
        // Always release the guard, including on the early return above.
        this.flushing = false;
    }
}
174
232
175
233
/**
 * Sends events through the provided API client after merging the common
 * properties into each one. Properties set on an individual event take
 * precedence over common properties of the same name.
 *
 * @param client - API client used to deliver the events.
 * @param events - Events to send.
 * @throws Rejects when resolving the common properties or the client call fails.
 */
private async sendEvents(client: ApiClient, events: BaseEvent[]): Promise<void> {
    const commonProperties = await this.getCommonProperties();

    const payload = events.map((event) => {
        const properties = { ...commonProperties, ...event.properties };
        return { ...event, properties };
    });

    await client.sendEvents(payload);
}
194
246
}
0 commit comments