Skip to content

Commit c8befde

Browse files
committed
feat: add agentsdk function for sending logs
This enables external API consumers to easily send logs in order and with failure tolerance.
1 parent b8c07ff commit c8befde

File tree

2 files changed

+192
-0
lines changed

2 files changed

+192
-0
lines changed

codersdk/agentsdk/agentsdk.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,97 @@ func (c *Client) PatchStartupLogs(ctx context.Context, req PatchStartupLogs) err
585585
return nil
586586
}
587587

588+
// QueueStartupLogs debounces log messages at an interval and bulk-writes them using the patch method.
589+
// No requests are made immediately when calling this function, all happened async.
590+
//
591+
// This function is used by API consumers to send startup logs to the server.
592+
func (c *Client) QueueStartupLogs(ctx context.Context, debounce time.Duration) (func(log StartupLog), io.Closer) {
593+
if debounce == 0 {
594+
debounce = 250 * time.Millisecond
595+
}
596+
logChan := make(chan StartupLog, 100)
597+
closeChan := make(chan struct{})
598+
closed := make(chan struct{})
599+
600+
var lastSendErr error
601+
sendLogsWithRetry := func(logs []StartupLog) {
602+
// We don't want to send logs indefinitely, as idle services could
603+
// be spamming a coderd instance. Instead, we extend a lengthy timeout
604+
// to allow for intermittent service disruption.
605+
ctx, cancelFunc := context.WithTimeout(ctx, 15*time.Minute)
606+
defer cancelFunc()
607+
for r := retry.New(250*time.Millisecond, 5*time.Second); r.Wait(ctx); {
608+
err := c.PatchStartupLogs(ctx, PatchStartupLogs{
609+
Logs: logs,
610+
})
611+
if err == nil {
612+
lastSendErr = nil
613+
return
614+
}
615+
var sdkErr *codersdk.Error
616+
if xerrors.As(err, &sdkErr) {
617+
lastSendErr = sdkErr
618+
if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
619+
c.SDK.Logger.Warn(ctx, "startup logs too large, dropping logs", slog.F("count", len(logs)))
620+
break
621+
}
622+
if sdkErr.StatusCode() == http.StatusUnauthorized {
623+
continue
624+
}
625+
}
626+
c.SDK.Logger.Warn(ctx, "upload startup logs failed", slog.Error(err), slog.F("count", len(logs)))
627+
}
628+
if ctx.Err() != nil {
629+
c.SDK.Logger.Warn(ctx, "startup logs failed to send in time", slog.Error(lastSendErr), slog.F("count", len(logs)))
630+
}
631+
}
632+
633+
go func() {
634+
defer close(closed)
635+
queuedLogs := make([]StartupLog, 0, 100)
636+
for {
637+
select {
638+
case <-time.After(debounce):
639+
if len(queuedLogs) == 0 {
640+
break
641+
}
642+
sendLogsWithRetry(queuedLogs)
643+
queuedLogs = nil
644+
case log := <-logChan:
645+
queuedLogs = append(queuedLogs, log)
646+
if len(queuedLogs) < 100 {
647+
break
648+
}
649+
sendLogsWithRetry(queuedLogs)
650+
queuedLogs = nil
651+
case <-closeChan:
652+
close(logChan)
653+
for log := range logChan {
654+
queuedLogs = append(queuedLogs, log)
655+
}
656+
if len(queuedLogs) > 0 {
657+
sendLogsWithRetry(queuedLogs)
658+
}
659+
return
660+
case <-ctx.Done():
661+
// Return immediately without sending, because we can't
662+
// send requests with a closed context.
663+
return
664+
}
665+
}
666+
}()
667+
return func(log StartupLog) {
668+
select {
669+
case logChan <- log:
670+
case <-ctx.Done():
671+
}
672+
}, closeFunc(func() error {
673+
close(closeChan)
674+
<-closed
675+
return lastSendErr
676+
})
677+
}
678+
588679
type GitAuthResponse struct {
589680
Username string `json:"username"`
590681
Password string `json:"password"`

codersdk/agentsdk/agentsdk_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package agentsdk_test
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"net/http/httptest"
8+
"net/url"
9+
"strconv"
10+
"testing"
11+
"time"
12+
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/coder/coder/codersdk"
16+
"github.com/coder/coder/codersdk/agentsdk"
17+
)
18+
19+
func TestQueueStartupLogs(t *testing.T) {
20+
t.Parallel()
21+
t.Run("Spam", func(t *testing.T) {
22+
t.Parallel()
23+
lastLog := 0
24+
totalLogs := 1000
25+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
26+
req := agentsdk.PatchStartupLogs{}
27+
err := json.NewDecoder(r.Body).Decode(&req)
28+
require.NoError(t, err)
29+
for _, log := range req.Logs {
30+
require.Equal(t, strconv.Itoa(lastLog), log.Output)
31+
lastLog++
32+
}
33+
}))
34+
srvURL, err := url.Parse(srv.URL)
35+
require.NoError(t, err)
36+
client := agentsdk.New(srvURL)
37+
sendLog, closer := client.QueueStartupLogs(context.Background(), 0)
38+
for i := 0; i < totalLogs; i++ {
39+
sendLog(agentsdk.StartupLog{
40+
CreatedAt: time.Now(),
41+
Output: strconv.Itoa(i),
42+
Level: codersdk.LogLevelInfo,
43+
})
44+
}
45+
err = closer.Close()
46+
require.NoError(t, err)
47+
require.Equal(t, totalLogs, lastLog)
48+
})
49+
t.Run("Debounce", func(t *testing.T) {
50+
t.Parallel()
51+
got := make(chan agentsdk.StartupLog)
52+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
53+
req := agentsdk.PatchStartupLogs{}
54+
err := json.NewDecoder(r.Body).Decode(&req)
55+
require.NoError(t, err)
56+
for _, log := range req.Logs {
57+
got <- log
58+
}
59+
}))
60+
srvURL, err := url.Parse(srv.URL)
61+
require.NoError(t, err)
62+
client := agentsdk.New(srvURL)
63+
sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond)
64+
sendLog(agentsdk.StartupLog{
65+
Output: "hello",
66+
})
67+
gotLog := <-got
68+
require.Equal(t, "hello", gotLog.Output)
69+
err = closer.Close()
70+
require.NoError(t, err)
71+
})
72+
t.Run("RetryOnError", func(t *testing.T) {
73+
t.Parallel()
74+
got := make(chan agentsdk.StartupLog)
75+
first := true
76+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
77+
if first {
78+
w.WriteHeader(http.StatusInternalServerError)
79+
first = false
80+
return
81+
}
82+
req := agentsdk.PatchStartupLogs{}
83+
err := json.NewDecoder(r.Body).Decode(&req)
84+
require.NoError(t, err)
85+
for _, log := range req.Logs {
86+
got <- log
87+
}
88+
}))
89+
srvURL, err := url.Parse(srv.URL)
90+
require.NoError(t, err)
91+
client := agentsdk.New(srvURL)
92+
sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond)
93+
sendLog(agentsdk.StartupLog{
94+
Output: "hello",
95+
})
96+
gotLog := <-got
97+
require.Equal(t, "hello", gotLog.Output)
98+
err = closer.Close()
99+
require.NoError(t, err)
100+
})
101+
}

0 commit comments

Comments
 (0)