Skip to content

Commit 0c4d2c3

Browse files
committed
Start creating log sending loop
1 parent b86c400 commit 0c4d2c3

22 files changed

+540
-220
lines changed

agent/agent.go

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"sync"
2626
"time"
2727

28-
"github.com/ammario/prefixsuffix"
2928
"github.com/armon/circbuf"
3029
"github.com/gliderlabs/ssh"
3130
"github.com/google/uuid"
@@ -202,6 +201,19 @@ func (a *agent) runLoop(ctx context.Context) {
202201
}
203202
}
204203

204+
func (a *agent) appendStartupLogsLoop(ctx context.Context, logs []agentsdk.StartupLog) {
205+
for r := retry.New(time.Second, 15*time.Second); r.Wait(ctx); {
206+
err := a.client.AppendStartupLogs(ctx, logs)
207+
if err == nil {
208+
return
209+
}
210+
if errors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
211+
return
212+
}
213+
a.logger.Error(ctx, "failed to append startup logs", slog.Error(err))
214+
}
215+
}
216+
205217
// reportLifecycleLoop reports the current lifecycle state once.
206218
// Only the latest state is reported, intermediate states may be
207219
// lost if the agent can't communicate with the API.
@@ -649,9 +661,55 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) error {
649661
_ = fileWriter.Close()
650662
}()
651663

652-
saver := &prefixsuffix.Saver{N: 512 << 10}
653-
writer := io.MultiWriter(saver, fileWriter)
664+
startupLogsReader, startupLogsWriter := io.Pipe()
665+
writer := io.MultiWriter(startupLogsWriter, fileWriter)
666+
667+
queuedLogs := make([]agentsdk.StartupLog, 0)
668+
var flushLogsTimer *time.Timer
669+
var logMutex sync.Mutex
670+
flushQueuedLogs := func() {
671+
logMutex.Lock()
672+
if flushLogsTimer != nil {
673+
flushLogsTimer.Stop()
674+
}
675+
toSend := make([]agentsdk.StartupLog, len(queuedLogs))
676+
copy(toSend, queuedLogs)
677+
logMutex.Unlock()
678+
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
679+
err := a.client.AppendStartupLogs(ctx, toSend)
680+
if err == nil {
681+
break
682+
}
683+
a.logger.Error(ctx, "upload startup logs", slog.Error(err))
684+
}
685+
if ctx.Err() != nil {
686+
return
687+
}
688+
logMutex.Lock()
689+
queuedLogs = queuedLogs[len(toSend):]
690+
logMutex.Unlock()
691+
}
692+
queueLog := func(log agentsdk.StartupLog) {
693+
logMutex.Lock()
694+
defer logMutex.Unlock()
695+
queuedLogs = append(queuedLogs, log)
696+
if flushLogsTimer != nil {
697+
flushLogsTimer.Reset(100 * time.Millisecond)
698+
return
699+
}
700+
if len(queuedLogs) > 100 {
701+
go flushQueuedLogs()
702+
return
703+
}
704+
}
705+
go func() {
706+
scanner := bufio.NewScanner(startupLogsReader)
707+
for scanner.Scan() {
708+
709+
}
710+
}()
654711
defer func() {
712+
655713
// err := a.client.AppendStartupLogs(ctx, agentsdk.InsertOrUpdateStartupLogsRequest{
656714
// Output: string(saver.Bytes()),
657715
// })

agent/agent_test.go

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ import (
3131
"github.com/stretchr/testify/require"
3232
"go.uber.org/goleak"
3333
"golang.org/x/crypto/ssh"
34-
"golang.org/x/text/encoding/unicode"
35-
"golang.org/x/text/transform"
3634
"golang.org/x/xerrors"
3735
"tailscale.com/net/speedtest"
3836
"tailscale.com/tailcfg"
@@ -744,33 +742,15 @@ func TestAgent_StartupScript(t *testing.T) {
744742
}
745743
content := "output\n"
746744
//nolint:dogsled
747-
_, client, _, fs, _ := setupAgent(t, agentsdk.Metadata{
745+
_, client, _, _, _ := setupAgent(t, agentsdk.Metadata{
748746
StartupScript: "echo " + content,
749747
}, 0)
750-
var gotContent string
751-
require.Eventually(t, func() bool {
752-
outputPath := filepath.Join(os.TempDir(), "coder-startup-script.log")
753-
content, err := afero.ReadFile(fs, outputPath)
754-
if err != nil {
755-
t.Logf("read file %q: %s", outputPath, err)
756-
return false
757-
}
758-
if len(content) == 0 {
759-
t.Logf("no content in %q", outputPath)
760-
return false
761-
}
762-
if runtime.GOOS == "windows" {
763-
// Windows uses UTF16! 🪟🪟🪟
764-
content, _, err = transform.Bytes(unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewDecoder(), content)
765-
if !assert.NoError(t, err) {
766-
return false
767-
}
768-
}
769-
gotContent = string(content)
770-
return true
748+
assert.Eventually(t, func() bool {
749+
got := client.getLifecycleStates()
750+
return len(got) > 0 && got[len(got)-1] == codersdk.WorkspaceAgentLifecycleReady
771751
}, testutil.WaitShort, testutil.IntervalMedium)
772-
require.Equal(t, content, gotContent)
773-
require.Equal(t, content, client.getLogs())
752+
753+
require.Len(t, client.getStartupLogs(), 1)
774754
}
775755

776756
func TestAgent_Lifecycle(t *testing.T) {
@@ -1500,7 +1480,7 @@ type client struct {
15001480
mu sync.Mutex // Protects following.
15011481
lifecycleStates []codersdk.WorkspaceAgentLifecycle
15021482
startup agentsdk.PostStartupRequest
1503-
logs agentsdk.InsertOrUpdateStartupLogsRequest
1483+
logs []agentsdk.StartupLog
15041484
}
15051485

15061486
func (c *client) Metadata(_ context.Context) (agentsdk.Metadata, error) {
@@ -1585,16 +1565,16 @@ func (c *client) PostStartup(_ context.Context, startup agentsdk.PostStartupRequ
15851565
return nil
15861566
}
15871567

1588-
func (c *client) getLogs() string {
1568+
func (c *client) getStartupLogs() []agentsdk.StartupLog {
15891569
c.mu.Lock()
15901570
defer c.mu.Unlock()
1591-
return c.logs.Output
1571+
return c.logs
15921572
}
15931573

1594-
func (c *client) InsertOrUpdateStartupLogs(_ context.Context, logs agentsdk.InsertOrUpdateStartupLogsRequest) error {
1574+
func (c *client) AppendStartupLogs(_ context.Context, logs []agentsdk.StartupLog) error {
15951575
c.mu.Lock()
15961576
defer c.mu.Unlock()
1597-
c.logs = logs
1577+
c.logs = append(c.logs, logs...)
15981578
return nil
15991579
}
16001580

coderd/apidoc/docs.go

Lines changed: 59 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 53 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/coderd.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -587,10 +587,8 @@ func New(options *Options) *API {
587587
r.Route("/me", func(r chi.Router) {
588588
r.Use(httpmw.ExtractWorkspaceAgent(options.Database))
589589
r.Get("/metadata", api.workspaceAgentMetadata)
590-
r.Route("/startup", func(r chi.Router) {
591-
r.Post("/", api.postWorkspaceAgentStartup)
592-
r.Patch("/logs", api.patchWorkspaceAgentStartupLogs)
593-
})
590+
r.Post("/startup", api.postWorkspaceAgentStartup)
591+
r.Patch("/startup-logs", api.patchWorkspaceAgentStartupLogs)
594592
r.Post("/app-health", api.postWorkspaceAppHealth)
595593
r.Get("/gitauth", api.workspaceAgentsGitAuth)
596594
r.Get("/gitsshkey", api.agentGitSSHKey)
@@ -606,6 +604,7 @@ func New(options *Options) *API {
606604
)
607605
r.Get("/", api.workspaceAgent)
608606
r.Get("/pty", api.workspaceAgentPTY)
607+
r.Get("/startup-logs", api.workspaceAgentStartupLogs)
609608
r.Get("/listening-ports", api.workspaceAgentListeningPorts)
610609
r.Get("/connection", api.workspaceAgentConnection)
611610
r.Get("/coordinate", api.workspaceAgentClientCoordinate)
@@ -648,7 +647,6 @@ func New(options *Options) *API {
648647
r.Get("/parameters", api.workspaceBuildParameters)
649648
r.Get("/resources", api.workspaceBuildResources)
650649
r.Get("/state", api.workspaceBuildState)
651-
r.Get("/startup-script-logs", api.startupScriptLogs)
652650
})
653651
r.Route("/authcheck", func(r chi.Router) {
654652
r.Use(apiKeyMiddleware)

coderd/database/dbauthz/querier.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,21 +263,21 @@ func (q *querier) GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (data
263263
return job, nil
264264
}
265265

266-
func (q *querier) GetProvisionerLogsByIDBetween(ctx context.Context, arg database.GetProvisionerLogsByIDBetweenParams) ([]database.ProvisionerJobLog, error) {
266+
func (q *querier) GetProvisionerLogsAfterID(ctx context.Context, arg database.GetProvisionerLogsAfterIDParams) ([]database.ProvisionerJobLog, error) {
267267
// Authorized read on job lets the actor also read the logs.
268268
_, err := q.GetProvisionerJobByID(ctx, arg.JobID)
269269
if err != nil {
270270
return nil, err
271271
}
272-
return q.db.GetProvisionerLogsByIDBetween(ctx, arg)
272+
return q.db.GetProvisionerLogsAfterID(ctx, arg)
273273
}
274274

275-
func (q *querier) GetWorkspaceAgentStartupLogsBetween(ctx context.Context, arg database.GetWorkspaceAgentStartupLogsBetweenParams) ([]database.WorkspaceAgentStartupLog, error) {
275+
func (q *querier) GetWorkspaceAgentStartupLogsAfter(ctx context.Context, arg database.GetWorkspaceAgentStartupLogsAfterParams) ([]database.WorkspaceAgentStartupLog, error) {
276276
_, err := q.GetWorkspaceAgentByID(ctx, arg.AgentID)
277277
if err != nil {
278278
return nil, err
279279
}
280-
return q.db.GetWorkspaceAgentStartupLogsBetween(ctx, arg)
280+
return q.db.GetWorkspaceAgentStartupLogsAfter(ctx, arg)
281281
}
282282

283283
func (q *querier) GetLicenses(ctx context.Context) ([]database.License, error) {

0 commit comments

Comments
 (0)