-
Notifications
You must be signed in to change notification settings - Fork 887
feat: add agentsdk function for sending logs #7629
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ import ( | |
"net/http/cookiejar" | ||
"net/url" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
"cloud.google.com/go/compute/metadata" | ||
|
@@ -585,6 +586,109 @@ func (c *Client) PatchStartupLogs(ctx context.Context, req PatchStartupLogs) err | |
return nil | ||
} | ||
|
||
// QueueStartupLogs debounces log messages at an interval and bulk-writes them using the patch method. | ||
// No requests are made immediately when calling this function, all happened async. | ||
// | ||
// This function is used by API consumers to send startup logs to the server. | ||
func (c *Client) QueueStartupLogs(ctx context.Context, debounce time.Duration) (func(log StartupLog), io.Closer) { | ||
if debounce == 0 { | ||
debounce = 250 * time.Millisecond | ||
} | ||
logChan := make(chan StartupLog, 100) | ||
closeChan := make(chan struct{}) | ||
closed := make(chan struct{}) | ||
closeMutex := sync.Mutex{} | ||
|
||
var lastSendErr error | ||
sendLogsWithRetry := func(logs []StartupLog) { | ||
// We don't want to send logs indefinitely, as idle services could | ||
// be spamming a coderd instance. Instead, we extend a lengthy timeout | ||
// to allow for intermittent service disruption. | ||
ctx, cancelFunc := context.WithTimeout(ctx, 15*time.Minute) | ||
defer cancelFunc() | ||
for r := retry.New(250*time.Millisecond, 5*time.Second); r.Wait(ctx); { | ||
err := c.PatchStartupLogs(ctx, PatchStartupLogs{ | ||
Logs: logs, | ||
}) | ||
if err == nil { | ||
lastSendErr = nil | ||
return | ||
} | ||
var sdkErr *codersdk.Error | ||
if xerrors.As(err, &sdkErr) { | ||
lastSendErr = sdkErr | ||
if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge { | ||
c.SDK.Logger.Warn(ctx, "startup logs too large, dropping logs", slog.F("count", len(logs))) | ||
break | ||
} | ||
if sdkErr.StatusCode() == http.StatusUnauthorized { | ||
continue | ||
} | ||
} | ||
c.SDK.Logger.Warn(ctx, "upload startup logs failed", slog.Error(err), slog.F("count", len(logs))) | ||
} | ||
if ctx.Err() != nil { | ||
c.SDK.Logger.Warn(ctx, "startup logs failed to send in time", slog.Error(lastSendErr), slog.F("count", len(logs))) | ||
} | ||
} | ||
|
||
go func() { | ||
defer close(closed) | ||
queuedLogs := make([]StartupLog, 0, 100) | ||
for { | ||
select { | ||
case <-time.After(debounce): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion: Consider a timer vs I also wonder if we should avoid starting a new timer because 100 log entries at a rate of 249ms/line will take ~25s to be sent to the server. |
||
if len(queuedLogs) == 0 { | ||
break | ||
} | ||
sendLogsWithRetry(queuedLogs) | ||
queuedLogs = nil | ||
case log := <-logChan: | ||
queuedLogs = append(queuedLogs, log) | ||
if len(queuedLogs) < 100 { | ||
break | ||
} | ||
sendLogsWithRetry(queuedLogs) | ||
queuedLogs = nil | ||
case <-closeChan: | ||
close(logChan) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The https://go.dev/play/p/-qzJ3Wo1TYp (Try running a few times.) |
||
for log := range logChan { | ||
queuedLogs = append(queuedLogs, log) | ||
} | ||
if len(queuedLogs) > 0 { | ||
sendLogsWithRetry(queuedLogs) | ||
} | ||
return | ||
case <-ctx.Done(): | ||
// Return immediately without sending, because we can't | ||
// send requests with a closed context. | ||
return | ||
} | ||
} | ||
}() | ||
return func(log StartupLog) { | ||
closeMutex.Lock() | ||
defer closeMutex.Unlock() | ||
select { | ||
case <-closed: | ||
return | ||
case logChan <- log: | ||
case <-ctx.Done(): | ||
} | ||
}, closeFunc(func() error { | ||
closeMutex.Lock() | ||
defer closeMutex.Unlock() | ||
select { | ||
case <-closed: | ||
return nil | ||
default: | ||
} | ||
close(closeChan) | ||
<-closed | ||
return lastSendErr | ||
}) | ||
} | ||
|
||
type GitAuthResponse struct { | ||
Username string `json:"username"` | ||
Password string `json:"password"` | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package agentsdk_test | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"net/http" | ||
"net/http/httptest" | ||
"net/url" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/coder/coder/codersdk" | ||
"github.com/coder/coder/codersdk/agentsdk" | ||
) | ||
|
||
func TestQueueStartupLogs(t *testing.T) { | ||
t.Parallel() | ||
t.Run("Spam", func(t *testing.T) { | ||
t.Parallel() | ||
lastLog := 0 | ||
totalLogs := 1000 | ||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
req := agentsdk.PatchStartupLogs{} | ||
err := json.NewDecoder(r.Body).Decode(&req) | ||
require.NoError(t, err) | ||
for _, log := range req.Logs { | ||
require.Equal(t, strconv.Itoa(lastLog), log.Output) | ||
lastLog++ | ||
} | ||
})) | ||
srvURL, err := url.Parse(srv.URL) | ||
require.NoError(t, err) | ||
client := agentsdk.New(srvURL) | ||
sendLog, closer := client.QueueStartupLogs(context.Background(), 0) | ||
for i := 0; i < totalLogs; i++ { | ||
sendLog(agentsdk.StartupLog{ | ||
CreatedAt: time.Now(), | ||
Output: strconv.Itoa(i), | ||
Level: codersdk.LogLevelInfo, | ||
}) | ||
} | ||
err = closer.Close() | ||
require.NoError(t, err) | ||
require.Equal(t, totalLogs, lastLog) | ||
}) | ||
t.Run("Debounce", func(t *testing.T) { | ||
t.Parallel() | ||
got := make(chan agentsdk.StartupLog) | ||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
req := agentsdk.PatchStartupLogs{} | ||
err := json.NewDecoder(r.Body).Decode(&req) | ||
require.NoError(t, err) | ||
for _, log := range req.Logs { | ||
got <- log | ||
} | ||
})) | ||
srvURL, err := url.Parse(srv.URL) | ||
require.NoError(t, err) | ||
client := agentsdk.New(srvURL) | ||
sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond) | ||
sendLog(agentsdk.StartupLog{ | ||
Output: "hello", | ||
}) | ||
gotLog := <-got | ||
require.Equal(t, "hello", gotLog.Output) | ||
err = closer.Close() | ||
require.NoError(t, err) | ||
}) | ||
t.Run("RetryOnError", func(t *testing.T) { | ||
t.Parallel() | ||
got := make(chan agentsdk.StartupLog) | ||
first := true | ||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
if first { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
first = false | ||
return | ||
} | ||
req := agentsdk.PatchStartupLogs{} | ||
err := json.NewDecoder(r.Body).Decode(&req) | ||
require.NoError(t, err) | ||
for _, log := range req.Logs { | ||
got <- log | ||
} | ||
})) | ||
srvURL, err := url.Parse(srv.URL) | ||
require.NoError(t, err) | ||
client := agentsdk.New(srvURL) | ||
sendLog, closer := client.QueueStartupLogs(context.Background(), time.Millisecond) | ||
sendLog(agentsdk.StartupLog{ | ||
Output: "hello", | ||
}) | ||
gotLog := <-got | ||
require.Equal(t, "hello", gotLog.Output) | ||
err = closer.Close() | ||
require.NoError(t, err) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be captured and shared, i.e. `if lastSendErr == nil { lastSendErr = ctx.Err() }`, or some such.