From 95839109db095356ad75e36bc32771717791e5ff Mon Sep 17 00:00:00 2001
From: Kyle Carberry
Date: Mon, 22 May 2023 16:28:00 +0000
Subject: [PATCH] feat: add agentsdk function for sending logs

This enables external API consumers to easily send logs in order and with failure tolerance.
---
 codersdk/agentsdk/agentsdk.go      | 124 +++++++++++++++++++++++++++
 codersdk/agentsdk/agentsdk_test.go | 135 +++++++++++++++++++++++++++++
 2 files changed, 259 insertions(+)
 create mode 100644 codersdk/agentsdk/agentsdk_test.go

diff --git a/codersdk/agentsdk/agentsdk.go b/codersdk/agentsdk/agentsdk.go
index 0a5d30c0f1d9a..b6208f68074ea 100644
--- a/codersdk/agentsdk/agentsdk.go
+++ b/codersdk/agentsdk/agentsdk.go
@@ -10,6 +10,7 @@ import (
 	"net/http/cookiejar"
 	"net/url"
 	"strconv"
+	"sync"
 	"time"
 
 	"cloud.google.com/go/compute/metadata"
@@ -585,6 +586,129 @@ func (c *Client) PatchStartupLogs(ctx context.Context, req PatchStartupLogs) err
 	return nil
 }
 
+// QueueStartupLogs debounces log messages at an interval and bulk-writes them
+// using the patch method. No requests are made immediately when calling this
+// function; everything happens asynchronously.
+//
+// This function is used by API consumers to send startup logs to the server.
+//
+// A zero debounce defaults to 250ms. The returned function queues one log and
+// does nothing once the queue has shut down. Closing the returned io.Closer
+// flushes any queued logs and returns the most recent upload's error, if any.
+func (c *Client) QueueStartupLogs(ctx context.Context, debounce time.Duration) (func(log StartupLog), io.Closer) {
+	if debounce == 0 {
+		debounce = 250 * time.Millisecond
+	}
+	logChan := make(chan StartupLog, 100)
+	closeChan := make(chan struct{})
+	closed := make(chan struct{})
+	closeMutex := sync.Mutex{}
+
+	var lastSendErr error
+	sendLogsWithRetry := func(logs []StartupLog) {
+		// We don't want to send logs indefinitely, as idle services could
+		// be spamming a coderd instance. Instead, we extend a lengthy timeout
+		// to allow for intermittent service disruption.
+		ctx, cancelFunc := context.WithTimeout(ctx, 15*time.Minute)
+		defer cancelFunc()
+		for r := retry.New(250*time.Millisecond, 5*time.Second); r.Wait(ctx); {
+			err := c.PatchStartupLogs(ctx, PatchStartupLogs{
+				Logs: logs,
+			})
+			if err == nil {
+				lastSendErr = nil
+				return
+			}
+			// Record every failure, not only API errors, so transport errors
+			// reach the timeout warning below and the error returned by Close.
+			lastSendErr = err
+			var sdkErr *codersdk.Error
+			if xerrors.As(err, &sdkErr) {
+				lastSendErr = sdkErr
+				if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
+					c.SDK.Logger.Warn(ctx, "startup logs too large, dropping logs", slog.F("count", len(logs)))
+					break
+				}
+				if sdkErr.StatusCode() == http.StatusUnauthorized {
+					// NOTE(review): retried silently, without the warning
+					// below; the reason isn't documented here, so confirm.
+					continue
+				}
+			}
+			c.SDK.Logger.Warn(ctx, "upload startup logs failed", slog.Error(err), slog.F("count", len(logs)))
+		}
+		if ctx.Err() != nil {
+			c.SDK.Logger.Warn(ctx, "startup logs failed to send in time", slog.Error(lastSendErr), slog.F("count", len(logs)))
+		}
+	}
+
+	go func() {
+		defer close(closed)
+		queuedLogs := make([]StartupLog, 0, 100)
+		for {
+			select {
+			case <-time.After(debounce):
+				if len(queuedLogs) == 0 {
+					// Nothing queued; break leaves the select and restarts the debounce wait.
+					break
+				}
+				sendLogsWithRetry(queuedLogs)
+				queuedLogs = nil
+			case log := <-logChan:
+				queuedLogs = append(queuedLogs, log)
+				if len(queuedLogs) < 100 {
+					break
+				}
+				sendLogsWithRetry(queuedLogs)
+				queuedLogs = nil
+			case <-closeChan:
+				close(logChan)
+				for log := range logChan {
+					queuedLogs = append(queuedLogs, log)
+				}
+				if len(queuedLogs) > 0 {
+					sendLogsWithRetry(queuedLogs)
+				}
+				return
+			case <-ctx.Done():
+				// Return immediately without sending, because we can't
+				// send requests with a closed context.
+				return
+			}
+		}
+	}()
+
+	sendLog := func(log StartupLog) {
+		closeMutex.Lock()
+		defer closeMutex.Unlock()
+		// Check for shutdown on its own first. After Close, logChan is closed,
+		// and a select offering both cases picks one at random, so the send
+		// case could be chosen and panic.
+		select {
+		case <-closed:
+			return
+		default:
+		}
+		select {
+		case logChan <- log:
+		case <-ctx.Done():
+		}
+	}
+	closer := closeFunc(func() error {
+		closeMutex.Lock()
+		defer closeMutex.Unlock()
+		select {
+		case <-closed:
+			return nil
+		default:
+		}
+		close(closeChan)
+		<-closed
+		return lastSendErr
+	})
+	return sendLog, closer
+}
+
 type GitAuthResponse struct {
 	Username string `json:"username"`
 	Password string `json:"password"`
diff --git a/codersdk/agentsdk/agentsdk_test.go b/codersdk/agentsdk/agentsdk_test.go
new file mode 100644
index 0000000000000..89cb0a0a9bfe8
--- /dev/null
+++ b/codersdk/agentsdk/agentsdk_test.go
@@ -0,0 +1,135 @@
+package agentsdk_test
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/coder/coder/codersdk"
+	"github.com/coder/coder/codersdk/agentsdk"
+)
+
+// TestQueueStartupLogs exercises Client.QueueStartupLogs against a local HTTP
+// server. Handlers run on server goroutines, so they report failures with
+// t.Errorf; require calls t.FailNow, which is only valid on the test goroutine.
+func TestQueueStartupLogs(t *testing.T) {
+	t.Parallel()
+	t.Run("Spam", func(t *testing.T) {
+		t.Parallel()
+		lastLog := 0
+		totalLogs := 1000
+		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			req := agentsdk.PatchStartupLogs{}
+			if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+				t.Errorf("decode request: %v", err)
+				return
+			}
+			for _, log := range req.Logs {
+				if want := strconv.Itoa(lastLog); log.Output != want {
+					t.Errorf("log out of order: got %q, want %q", log.Output, want)
+					return
+				}
+				lastLog++
+			}
+		}))
+		t.Cleanup(srv.Close)
+		srvURL, err := url.Parse(srv.URL)
+		require.NoError(t, err)
+		client := agentsdk.New(srvURL)
+		sendLog, closer := client.QueueStartupLogs(context.Background(), 0)
+		for i := 0; i < totalLogs; i++ {
+			sendLog(agentsdk.StartupLog{
+				CreatedAt: time.Now(),
+				Output:    strconv.Itoa(i),
+				Level:     codersdk.LogLevelInfo,
+			})
+		}
+		err = closer.Close()
+		require.NoError(t, err)
+		require.Equal(t, totalLogs, lastLog)
+	})
+	t.Run("Debounce", func(t *testing.T) {
+		t.Parallel()
+		got := make(chan agentsdk.StartupLog)
+		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			req := agentsdk.PatchStartupLogs{}
+			if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+				t.Errorf("decode request: %v", err)
+				return
+			}
+			for _, log := range req.Logs {
+				got <- log
+			}
+		}))
+		t.Cleanup(srv.Close)
+		srvURL, err := url.Parse(srv.URL)
+		require.NoError(t, err)
+		client := agentsdk.New(srvURL)
+		sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond)
+		sendLog(agentsdk.StartupLog{
+			Output: "hello",
+		})
+		gotLog := <-got
+		require.Equal(t, "hello", gotLog.Output)
+		err = closer.Close()
+		require.NoError(t, err)
+	})
+	t.Run("RetryOnError", func(t *testing.T) {
+		t.Parallel()
+		got := make(chan agentsdk.StartupLog)
+		first := true
+		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			if first {
+				w.WriteHeader(http.StatusInternalServerError)
+				first = false
+				return
+			}
+			req := agentsdk.PatchStartupLogs{}
+			if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+				t.Errorf("decode request: %v", err)
+				return
+			}
+			for _, log := range req.Logs {
+				got <- log
+			}
+		}))
+		t.Cleanup(srv.Close)
+		srvURL, err := url.Parse(srv.URL)
+		require.NoError(t, err)
+		client := agentsdk.New(srvURL)
+		sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond)
+		sendLog(agentsdk.StartupLog{
+			Output: "hello",
+		})
+		gotLog := <-got
+		require.Equal(t, "hello", gotLog.Output)
+		err = closer.Close()
+		require.NoError(t, err)
+	})
+	t.Run("SendAfterClose", func(t *testing.T) {
+		t.Parallel()
+		srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
+		t.Cleanup(srv.Close)
+		srvURL, err := url.Parse(srv.URL)
+		require.NoError(t, err)
+		client := agentsdk.New(srvURL)
+		sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond)
+		err = closer.Close()
+		require.NoError(t, err)
+		// The queue channel is closed by now; sending must be a no-op
+		// rather than a panic. Repeat because select picks ready cases
+		// at random.
+		for i := 0; i < 100; i++ {
+			sendLog(agentsdk.StartupLog{
+				Output: "late",
+			})
+		}
+	})
+}