Skip to content

Commit 9583910

Browse files
committed
feat: add agentsdk function for sending logs
This enables external API consumers to easily send logs in order and with failure tolerance.
1 parent b8c07ff commit 9583910

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

codersdk/agentsdk/agentsdk.go

+104
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/http/cookiejar"
1111
"net/url"
1212
"strconv"
13+
"sync"
1314
"time"
1415

1516
"cloud.google.com/go/compute/metadata"
@@ -585,6 +586,109 @@ func (c *Client) PatchStartupLogs(ctx context.Context, req PatchStartupLogs) err
585586
return nil
586587
}
587588

589+
// QueueStartupLogs debounces log messages at an interval and bulk-writes them using the patch method.
590+
// No requests are made immediately when calling this function, all happened async.
591+
//
592+
// This function is used by API consumers to send startup logs to the server.
593+
func (c *Client) QueueStartupLogs(ctx context.Context, debounce time.Duration) (func(log StartupLog), io.Closer) {
594+
if debounce == 0 {
595+
debounce = 250 * time.Millisecond
596+
}
597+
logChan := make(chan StartupLog, 100)
598+
closeChan := make(chan struct{})
599+
closed := make(chan struct{})
600+
closeMutex := sync.Mutex{}
601+
602+
var lastSendErr error
603+
sendLogsWithRetry := func(logs []StartupLog) {
604+
// We don't want to send logs indefinitely, as idle services could
605+
// be spamming a coderd instance. Instead, we extend a lengthy timeout
606+
// to allow for intermittent service disruption.
607+
ctx, cancelFunc := context.WithTimeout(ctx, 15*time.Minute)
608+
defer cancelFunc()
609+
for r := retry.New(250*time.Millisecond, 5*time.Second); r.Wait(ctx); {
610+
err := c.PatchStartupLogs(ctx, PatchStartupLogs{
611+
Logs: logs,
612+
})
613+
if err == nil {
614+
lastSendErr = nil
615+
return
616+
}
617+
var sdkErr *codersdk.Error
618+
if xerrors.As(err, &sdkErr) {
619+
lastSendErr = sdkErr
620+
if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
621+
c.SDK.Logger.Warn(ctx, "startup logs too large, dropping logs", slog.F("count", len(logs)))
622+
break
623+
}
624+
if sdkErr.StatusCode() == http.StatusUnauthorized {
625+
continue
626+
}
627+
}
628+
c.SDK.Logger.Warn(ctx, "upload startup logs failed", slog.Error(err), slog.F("count", len(logs)))
629+
}
630+
if ctx.Err() != nil {
631+
c.SDK.Logger.Warn(ctx, "startup logs failed to send in time", slog.Error(lastSendErr), slog.F("count", len(logs)))
632+
}
633+
}
634+
635+
go func() {
636+
defer close(closed)
637+
queuedLogs := make([]StartupLog, 0, 100)
638+
for {
639+
select {
640+
case <-time.After(debounce):
641+
if len(queuedLogs) == 0 {
642+
break
643+
}
644+
sendLogsWithRetry(queuedLogs)
645+
queuedLogs = nil
646+
case log := <-logChan:
647+
queuedLogs = append(queuedLogs, log)
648+
if len(queuedLogs) < 100 {
649+
break
650+
}
651+
sendLogsWithRetry(queuedLogs)
652+
queuedLogs = nil
653+
case <-closeChan:
654+
close(logChan)
655+
for log := range logChan {
656+
queuedLogs = append(queuedLogs, log)
657+
}
658+
if len(queuedLogs) > 0 {
659+
sendLogsWithRetry(queuedLogs)
660+
}
661+
return
662+
case <-ctx.Done():
663+
// Return immediately without sending, because we can't
664+
// send requests with a closed context.
665+
return
666+
}
667+
}
668+
}()
669+
return func(log StartupLog) {
670+
closeMutex.Lock()
671+
defer closeMutex.Unlock()
672+
select {
673+
case <-closed:
674+
return
675+
case logChan <- log:
676+
case <-ctx.Done():
677+
}
678+
}, closeFunc(func() error {
679+
closeMutex.Lock()
680+
defer closeMutex.Unlock()
681+
select {
682+
case <-closed:
683+
return nil
684+
default:
685+
}
686+
close(closeChan)
687+
<-closed
688+
return lastSendErr
689+
})
690+
}
691+
588692
type GitAuthResponse struct {
589693
Username string `json:"username"`
590694
Password string `json:"password"`

codersdk/agentsdk/agentsdk_test.go

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package agentsdk_test
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"net/http/httptest"
8+
"net/url"
9+
"strconv"
10+
"testing"
11+
"time"
12+
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/coder/coder/codersdk"
16+
"github.com/coder/coder/codersdk/agentsdk"
17+
)
18+
19+
func TestQueueStartupLogs(t *testing.T) {
20+
t.Parallel()
21+
t.Run("Spam", func(t *testing.T) {
22+
t.Parallel()
23+
lastLog := 0
24+
totalLogs := 1000
25+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
26+
req := agentsdk.PatchStartupLogs{}
27+
err := json.NewDecoder(r.Body).Decode(&req)
28+
require.NoError(t, err)
29+
for _, log := range req.Logs {
30+
require.Equal(t, strconv.Itoa(lastLog), log.Output)
31+
lastLog++
32+
}
33+
}))
34+
srvURL, err := url.Parse(srv.URL)
35+
require.NoError(t, err)
36+
client := agentsdk.New(srvURL)
37+
sendLog, closer := client.QueueStartupLogs(context.Background(), 0)
38+
for i := 0; i < totalLogs; i++ {
39+
sendLog(agentsdk.StartupLog{
40+
CreatedAt: time.Now(),
41+
Output: strconv.Itoa(i),
42+
Level: codersdk.LogLevelInfo,
43+
})
44+
}
45+
err = closer.Close()
46+
require.NoError(t, err)
47+
require.Equal(t, totalLogs, lastLog)
48+
})
49+
t.Run("Debounce", func(t *testing.T) {
50+
t.Parallel()
51+
got := make(chan agentsdk.StartupLog)
52+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
53+
req := agentsdk.PatchStartupLogs{}
54+
err := json.NewDecoder(r.Body).Decode(&req)
55+
require.NoError(t, err)
56+
for _, log := range req.Logs {
57+
got <- log
58+
}
59+
}))
60+
srvURL, err := url.Parse(srv.URL)
61+
require.NoError(t, err)
62+
client := agentsdk.New(srvURL)
63+
sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond)
64+
sendLog(agentsdk.StartupLog{
65+
Output: "hello",
66+
})
67+
gotLog := <-got
68+
require.Equal(t, "hello", gotLog.Output)
69+
err = closer.Close()
70+
require.NoError(t, err)
71+
})
72+
t.Run("RetryOnError", func(t *testing.T) {
73+
t.Parallel()
74+
got := make(chan agentsdk.StartupLog)
75+
first := true
76+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
77+
if first {
78+
w.WriteHeader(http.StatusInternalServerError)
79+
first = false
80+
return
81+
}
82+
req := agentsdk.PatchStartupLogs{}
83+
err := json.NewDecoder(r.Body).Decode(&req)
84+
require.NoError(t, err)
85+
for _, log := range req.Logs {
86+
got <- log
87+
}
88+
}))
89+
srvURL, err := url.Parse(srv.URL)
90+
require.NoError(t, err)
91+
client := agentsdk.New(srvURL)
92+
sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond)
93+
sendLog(agentsdk.StartupLog{
94+
Output: "hello",
95+
})
96+
gotLog := <-got
97+
require.Equal(t, "hello", gotLog.Output)
98+
err = closer.Close()
99+
require.NoError(t, err)
100+
})
101+
}

0 commit comments

Comments
 (0)