Skip to content

feat: add agentsdk function for sending logs #7629

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions codersdk/agentsdk/agentsdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/cookiejar"
"net/url"
"strconv"
"sync"
"time"

"cloud.google.com/go/compute/metadata"
Expand Down Expand Up @@ -585,6 +586,109 @@ func (c *Client) PatchStartupLogs(ctx context.Context, req PatchStartupLogs) err
return nil
}

// QueueStartupLogs debounces log messages at an interval and bulk-writes them using the patch method.
// No requests are made immediately when calling this function, all happened async.
//
// This function is used by API consumers to send startup logs to the server.
func (c *Client) QueueStartupLogs(ctx context.Context, debounce time.Duration) (func(log StartupLog), io.Closer) {
if debounce == 0 {
debounce = 250 * time.Millisecond
}
logChan := make(chan StartupLog, 100)
closeChan := make(chan struct{})
closed := make(chan struct{})
closeMutex := sync.Mutex{}

var lastSendErr error
sendLogsWithRetry := func(logs []StartupLog) {
// We don't want to send logs indefinitely, as idle services could
// be spamming a coderd instance. Instead, we extend a lengthy timeout
// to allow for intermittent service disruption.
ctx, cancelFunc := context.WithTimeout(ctx, 15*time.Minute)
defer cancelFunc()
for r := retry.New(250*time.Millisecond, 5*time.Second); r.Wait(ctx); {
err := c.PatchStartupLogs(ctx, PatchStartupLogs{
Logs: logs,
})
if err == nil {
lastSendErr = nil
return
}
var sdkErr *codersdk.Error
if xerrors.As(err, &sdkErr) {
lastSendErr = sdkErr
if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge {
c.SDK.Logger.Warn(ctx, "startup logs too large, dropping logs", slog.F("count", len(logs)))
break
}
if sdkErr.StatusCode() == http.StatusUnauthorized {
continue
}
}
c.SDK.Logger.Warn(ctx, "upload startup logs failed", slog.Error(err), slog.F("count", len(logs)))
}
if ctx.Err() != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be captured and shared, i.e. if lastSendErr == nil { lastSendErr = ctx.Err(); } or some such.

c.SDK.Logger.Warn(ctx, "startup logs failed to send in time", slog.Error(lastSendErr), slog.F("count", len(logs)))
}
}

go func() {
defer close(closed)
queuedLogs := make([]StartupLog, 0, 100)
for {
select {
case <-time.After(debounce):
Copy link
Member

@mafredri mafredri Jun 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Consider a timer vs time.After for conserving resources/cleaner shutdown. Let's say we log 1000 lines in 100ms, we will have ~1000 of these timers.

I also wonder if we should avoid starting a new timer because 100 log entries at a rate of 249ms/line will take ~25s to be sent to the server.

if len(queuedLogs) == 0 {
break
}
sendLogsWithRetry(queuedLogs)
queuedLogs = nil
case log := <-logChan:
queuedLogs = append(queuedLogs, log)
if len(queuedLogs) < 100 {
break
}
sendLogsWithRetry(queuedLogs)
queuedLogs = nil
case <-closeChan:
close(logChan)
Copy link
Member

@mafredri mafredri Jun 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The func(log StartupLog) may panic after this since it can try to send on the closed channel. It should be left open.

https://go.dev/play/p/-qzJ3Wo1TYp (Try running a few times.)

for log := range logChan {
queuedLogs = append(queuedLogs, log)
}
if len(queuedLogs) > 0 {
sendLogsWithRetry(queuedLogs)
}
return
case <-ctx.Done():
// Return immediately without sending, because we can't
// send requests with a closed context.
return
}
}
}()
return func(log StartupLog) {
closeMutex.Lock()
defer closeMutex.Unlock()
select {
case <-closed:
return
case logChan <- log:
case <-ctx.Done():
}
}, closeFunc(func() error {
closeMutex.Lock()
defer closeMutex.Unlock()
select {
case <-closed:
return nil
default:
}
close(closeChan)
<-closed
return lastSendErr
})
}

type GitAuthResponse struct {
Username string `json:"username"`
Password string `json:"password"`
Expand Down
101 changes: 101 additions & 0 deletions codersdk/agentsdk/agentsdk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package agentsdk_test

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/coder/coder/codersdk"
"github.com/coder/coder/codersdk/agentsdk"
)

// TestQueueStartupLogs exercises Client.QueueStartupLogs against an in-process
// HTTP server that stands in for coderd.
//
// Assertions made inside the HTTP handlers use t.Errorf rather than the
// require package: handlers run on server goroutines, and require calls
// t.FailNow, which must only be called from the goroutine running the test.
func TestQueueStartupLogs(t *testing.T) {
	t.Parallel()

	// receiveTimeout bounds how long a subtest waits for the server to
	// observe a log, so a regression fails the test instead of hanging it.
	const receiveTimeout = 30 * time.Second

	// Spam queues many logs back-to-back and verifies that all of them
	// arrive at the server exactly once and in order.
	t.Run("Spam", func(t *testing.T) {
		t.Parallel()
		lastLog := 0
		totalLogs := 1000
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			req := agentsdk.PatchStartupLogs{}
			if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
				t.Errorf("decode request: %v", err)
				return
			}
			for _, log := range req.Logs {
				if want := strconv.Itoa(lastLog); log.Output != want {
					t.Errorf("unexpected log output: got %q, want %q", log.Output, want)
				}
				lastLog++
			}
		}))
		t.Cleanup(srv.Close)
		srvURL, err := url.Parse(srv.URL)
		require.NoError(t, err)
		client := agentsdk.New(srvURL)
		sendLog, closer := client.QueueStartupLogs(context.Background(), 0)
		for i := 0; i < totalLogs; i++ {
			sendLog(agentsdk.StartupLog{
				CreatedAt: time.Now(),
				Output:    strconv.Itoa(i),
				Level:     codersdk.LogLevelInfo,
			})
		}
		err = closer.Close()
		require.NoError(t, err)
		require.Equal(t, totalLogs, lastLog)
	})

	// Debounce verifies that a single queued log is uploaded after the
	// debounce interval without Close having been called.
	t.Run("Debounce", func(t *testing.T) {
		t.Parallel()
		got := make(chan agentsdk.StartupLog)
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			req := agentsdk.PatchStartupLogs{}
			if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
				t.Errorf("decode request: %v", err)
				return
			}
			for _, log := range req.Logs {
				select {
				case got <- log:
				case <-r.Context().Done():
					return
				}
			}
		}))
		t.Cleanup(srv.Close)
		srvURL, err := url.Parse(srv.URL)
		require.NoError(t, err)
		client := agentsdk.New(srvURL)
		sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond)
		sendLog(agentsdk.StartupLog{
			Output: "hello",
		})
		select {
		case gotLog := <-got:
			require.Equal(t, "hello", gotLog.Output)
		case <-time.After(receiveTimeout):
			t.Fatal("timed out waiting for the debounced log to be uploaded")
		}
		err = closer.Close()
		require.NoError(t, err)
	})

	// RetryOnError verifies that a batch is retried after the server answers
	// the first request with an internal server error.
	t.Run("RetryOnError", func(t *testing.T) {
		t.Parallel()
		got := make(chan agentsdk.StartupLog)
		first := true
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if first {
				w.WriteHeader(http.StatusInternalServerError)
				first = false
				return
			}
			req := agentsdk.PatchStartupLogs{}
			if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
				t.Errorf("decode request: %v", err)
				return
			}
			for _, log := range req.Logs {
				select {
				case got <- log:
				case <-r.Context().Done():
					return
				}
			}
		}))
		t.Cleanup(srv.Close)
		srvURL, err := url.Parse(srv.URL)
		require.NoError(t, err)
		client := agentsdk.New(srvURL)
		sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond)
		sendLog(agentsdk.StartupLog{
			Output: "hello",
		})
		select {
		case gotLog := <-got:
			require.Equal(t, "hello", gotLog.Output)
		case <-time.After(receiveTimeout):
			t.Fatal("timed out waiting for the retried log to be uploaded")
		}
		err = closer.Close()
		require.NoError(t, err)
	})
}