@@ -176,7 +176,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
176
176
177
177
t0 := dbtime .Now ()
178
178
ls1 := uuid.UUID {0x11 }
179
- hugeLog := make ([]byte , logOutputMaxBytes + 1 )
179
+ hugeLog := make ([]byte , maxBytesPerBatch + 1 )
180
180
for i := range hugeLog {
181
181
hugeLog [i ] = 'q'
182
182
}
@@ -246,14 +246,14 @@ func TestLogSender_Batch(t *testing.T) {
246
246
gotLogs += len (req .Logs )
247
247
wire , err := protobuf .Marshal (req )
248
248
require .NoError (t , err )
249
- require .Less (t , len (wire ), logOutputMaxBytes , "wire should not exceed 1MiB" )
249
+ require .Less (t , len (wire ), maxBytesPerBatch , "wire should not exceed 1MiB" )
250
250
testutil .RequireSendCtx (ctx , t , fDest .resps , & proto.BatchCreateLogsResponse {})
251
251
req = testutil .RequireRecvCtx (ctx , t , fDest .reqs )
252
252
require .NotNil (t , req )
253
253
gotLogs += len (req .Logs )
254
254
wire , err = protobuf .Marshal (req )
255
255
require .NoError (t , err )
256
- require .Less (t , len (wire ), logOutputMaxBytes , "wire should not exceed 1MiB" )
256
+ require .Less (t , len (wire ), maxBytesPerBatch , "wire should not exceed 1MiB" )
257
257
require .Equal (t , 60000 , gotLogs )
258
258
testutil .RequireSendCtx (ctx , t , fDest .resps , & proto.BatchCreateLogsResponse {})
259
259
@@ -262,6 +262,68 @@ func TestLogSender_Batch(t *testing.T) {
262
262
require .NoError (t , err )
263
263
}
264
264
265
+ func TestLogSender_MaxQueuedLogs (t * testing.T ) {
266
+ t .Parallel ()
267
+ testCtx := testutil .Context (t , testutil .WaitShort )
268
+ ctx , cancel := context .WithCancel (testCtx )
269
+ logger := slogtest .Make (t , nil ).Leveled (slog .LevelDebug )
270
+ fDest := newFakeLogDest ()
271
+ uut := newLogSender (logger )
272
+
273
+ t0 := dbtime .Now ()
274
+ ls1 := uuid.UUID {0x11 }
275
+ n := 4
276
+ hugeLog := make ([]byte , maxBytesQueued / n )
277
+ for i := range hugeLog {
278
+ hugeLog [i ] = 'q'
279
+ }
280
+ var logs []agentsdk.Log
281
+ for i := 0 ; i < n ; i ++ {
282
+ logs = append (logs , agentsdk.Log {
283
+ CreatedAt : t0 ,
284
+ Output : string (hugeLog ),
285
+ Level : codersdk .LogLevelInfo ,
286
+ })
287
+ }
288
+ err := uut .enqueue (ls1 , logs ... )
289
+ require .NoError (t , err )
290
+
291
+ // we're now right at the limit of output
292
+ require .Equal (t , maxBytesQueued , uut .outputLen )
293
+
294
+ // adding more logs should error...
295
+ ls2 := uuid.UUID {0x22 }
296
+ err = uut .enqueue (ls2 , logs ... )
297
+ require .ErrorIs (t , err , MaxQueueExceededError )
298
+
299
+ loopErr := make (chan error , 1 )
300
+ go func () {
301
+ err := uut .sendLoop (ctx , fDest )
302
+ loopErr <- err
303
+ }()
304
+
305
+ // ...but, it should still queue up one log from source #2, so that we would exceed the database
306
+ // limit. These come over a total of 3 updates, because due to overhead, the n logs from source
307
+ // #1 come in 2 updates, plus 1 update for source #2.
308
+ logsBySource := make (map [uuid.UUID ]int )
309
+ for i := 0 ; i < 3 ; i ++ {
310
+ req := testutil .RequireRecvCtx (ctx , t , fDest .reqs )
311
+ require .NotNil (t , req )
312
+ srcID , err := uuid .FromBytes (req .LogSourceId )
313
+ require .NoError (t , err )
314
+ logsBySource [srcID ] += len (req .Logs )
315
+ testutil .RequireSendCtx (ctx , t , fDest .resps , & proto.BatchCreateLogsResponse {})
316
+ }
317
+ require .Equal (t , map [uuid.UUID ]int {
318
+ ls1 : n ,
319
+ ls2 : 1 ,
320
+ }, logsBySource )
321
+
322
+ cancel ()
323
+ err = testutil .RequireRecvCtx (testCtx , t , loopErr )
324
+ require .NoError (t , err )
325
+ }
326
+
265
327
type fakeLogDest struct {
266
328
reqs chan * proto.BatchCreateLogsRequest
267
329
resps chan * proto.BatchCreateLogsResponse
0 commit comments