@@ -90,12 +90,31 @@ func LogsWriter(ctx context.Context, sender func(ctx context.Context, log ...Log
90
90
}
91
91
}
92
92
93
+ // LogsSenderFlushTimeout changes the default flush timeout (250ms),
94
+ // this is mostly useful for tests.
95
+ func LogsSenderFlushTimeout (timeout time.Duration ) func (* logsSenderOptions ) {
96
+ return func (o * logsSenderOptions ) {
97
+ o .flushTimeout = timeout
98
+ }
99
+ }
100
+
101
+ type logsSenderOptions struct {
102
+ flushTimeout time.Duration
103
+ }
104
+
93
105
// LogsSender will send agent startup logs to the server. Calls to
94
106
// sendLog are non-blocking and will return an error if flushAndClose
95
107
// has been called. Calling sendLog concurrently is not supported. If
96
108
// the context passed to flushAndClose is canceled, any remaining logs
97
109
// will be discarded.
98
- func LogsSender (sourceID uuid.UUID , patchLogs func (ctx context.Context , req PatchLogs ) error , logger slog.Logger ) (sendLog func (ctx context.Context , log ... Log ) error , flushAndClose func (context.Context ) error ) {
110
+ func LogsSender (sourceID uuid.UUID , patchLogs func (ctx context.Context , req PatchLogs ) error , logger slog.Logger , opts ... func (* logsSenderOptions )) (sendLog func (ctx context.Context , log ... Log ) error , flushAndClose func (context.Context ) error ) {
111
+ o := logsSenderOptions {
112
+ flushTimeout : 250 * time .Millisecond ,
113
+ }
114
+ for _ , opt := range opts {
115
+ opt (& o )
116
+ }
117
+
99
118
// The main context is used to close the sender goroutine and cancel
100
119
// any outbound requests to the API. The shutdown context is used to
101
120
// signal the sender goroutine to flush logs and then exit.
@@ -109,10 +128,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
109
128
// Set flushTimeout and backlogLimit so that logs are uploaded
110
129
// once every 250ms or when 100 logs have been added to the
111
130
// backlog, whichever comes first.
112
- flushTimeout := 250 * time .Millisecond
113
131
backlogLimit := 100
114
132
115
- flush := time .NewTicker (flushTimeout )
133
+ flush := time .NewTicker (o . flushTimeout )
116
134
117
135
var backlog []Log
118
136
defer func () {
@@ -153,8 +171,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
153
171
// error occurs. Note that we use the main context here,
154
172
// meaning these requests won't be interrupted by
155
173
// shutdown.
156
- for r := retry .New (time .Second , 5 * time .Second ); r .Wait (ctx ) && ctx .Err () == nil ; {
157
- err := patchLogs (ctx , PatchLogs {
174
+ var err error
175
+ for r := retry .New (time .Second , 5 * time .Second ); r .Wait (ctx ); {
176
+ err = patchLogs (ctx , PatchLogs {
158
177
Logs : backlog ,
159
178
LogSourceID : sourceID ,
160
179
})
@@ -163,26 +182,27 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
163
182
}
164
183
165
184
if errors .Is (err , context .Canceled ) {
166
- return
185
+ break
167
186
}
168
187
// This error is expected to be codersdk.Error, but it has
169
188
// private fields so we can't fake it in tests.
170
189
var statusErr interface { StatusCode () int }
171
190
if errors .As (err , & statusErr ) {
172
191
if statusErr .StatusCode () == http .StatusRequestEntityTooLarge {
173
192
logger .Warn (ctx , "startup logs too large, discarding logs" , slog .F ("discarded_logs_count" , len (backlog )), slog .Error (err ))
193
+ err = nil
174
194
break
175
195
}
176
196
}
177
197
logger .Error (ctx , "startup logs sender failed to upload logs, retrying later" , slog .F ("logs_count" , len (backlog )), slog .Error (err ))
178
198
}
179
- if ctx . Err () != nil {
199
+ if err != nil {
180
200
return
181
201
}
182
202
backlog = nil
183
203
184
204
// Anchor flush to the last log upload.
185
- flush .Reset (flushTimeout )
205
+ flush .Reset (o . flushTimeout )
186
206
}
187
207
if done {
188
208
return
0 commit comments