From 512f40464e7dfa60131573406fad3b0a129d1883 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 29 Feb 2024 13:54:26 +0000 Subject: [PATCH 1/3] concurrent fetch --- support/support.go | 250 ++++++++++++++++++++++++++++++++------------- 1 file changed, 178 insertions(+), 72 deletions(-) diff --git a/support/support.go b/support/support.go index 58c9f332298ac..104d1bf51f209 100644 --- a/support/support.go +++ b/support/support.go @@ -5,6 +5,7 @@ import ( "io" "net/http" "strings" + "sync" "golang.org/x/xerrors" @@ -61,84 +62,141 @@ type Deps struct { } func DeploymentInfo(ctx context.Context, client *codersdk.Client, log slog.Logger) Deployment { - var d Deployment - - bi, err := client.BuildInfo(ctx) - if err != nil { - log.Error(ctx, "fetch build info", slog.Error(err)) - } else { + var ( + d Deployment + m sync.Mutex + wg sync.WaitGroup + ) + + wg.Add(1) + go func() { + defer wg.Done() + bi, err := client.BuildInfo(ctx) + if err != nil { + log.Error(ctx, "fetch build info", slog.Error(err)) + return + } + m.Lock() d.BuildInfo = &bi - } + m.Unlock() + }() - dc, err := client.DeploymentConfig(ctx) - if err != nil { - log.Error(ctx, "fetch deployment config", slog.Error(err)) - } else { + wg.Add(1) + go func() { + defer wg.Done() + dc, err := client.DeploymentConfig(ctx) + if err != nil { + log.Error(ctx, "fetch deployment config", slog.Error(err)) + return + } + m.Lock() d.Config = dc - } + m.Unlock() + }() - hr, err := client.DebugHealth(ctx) - if err != nil { - log.Error(ctx, "fetch health report", slog.Error(err)) - } else { + wg.Add(1) + go func() { + defer wg.Done() + hr, err := client.DebugHealth(ctx) + if err != nil { + log.Error(ctx, "fetch health report", slog.Error(err)) + return + } + m.Lock() d.HealthReport = &hr - } + m.Unlock() + }() - exp, err := client.Experiments(ctx) - if err != nil { - log.Error(ctx, "fetch experiments", slog.Error(err)) - } else { + wg.Add(1) + go func() { + defer wg.Done() + exp, err := client.Experiments(ctx) + if err != nil { + log.Error(ctx, "fetch experiments", slog.Error(err)) + return + } + m.Lock() d.Experiments = exp - } + m.Unlock() + }() + wg.Wait() return d } func NetworkInfo(ctx context.Context, client *codersdk.Client, log slog.Logger, agentID uuid.UUID) Network { - var n Network - - coordResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/coordinator", nil) - if err != nil { - log.Error(ctx, "fetch coordinator debug page", slog.Error(err)) - } else { + var ( + n Network + m sync.Mutex + wg sync.WaitGroup + ) + + wg.Add(1) + go func() { + defer wg.Done() + coordResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/coordinator", nil) + if err != nil { + log.Error(ctx, "fetch coordinator debug page", slog.Error(err)) + return + } defer coordResp.Body.Close() bs, err := io.ReadAll(coordResp.Body) if err != nil { log.Error(ctx, "read coordinator debug page", slog.Error(err)) - } else { - n.CoordinatorDebug = string(bs) + return } - } + m.Lock() + n.CoordinatorDebug = string(bs) + m.Unlock() + }() - tailResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/tailnet", nil) - if err != nil { - log.Error(ctx, "fetch tailnet debug page", slog.Error(err)) - } else { + wg.Add(1) + go func() { + defer wg.Done() + tailResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/tailnet", nil) + if err != nil { + log.Error(ctx, "fetch tailnet debug page", slog.Error(err)) + return + } defer tailResp.Body.Close() bs, err := io.ReadAll(tailResp.Body) if err != nil { log.Error(ctx, "read tailnet debug page", slog.Error(err)) - } else { - n.TailnetDebug = string(bs) + return } - } + m.Lock() + n.TailnetDebug = string(bs) + m.Unlock() + }() - if agentID != uuid.Nil { + wg.Add(1) + go func() { + defer wg.Done() + if agentID == uuid.Nil { + log.Warn(ctx, "agent id required for agent connection info") + return + } connInfo, err := client.WorkspaceAgentConnectionInfo(ctx, agentID) if err != nil { log.Error(ctx, "fetch agent conn info", slog.Error(err), slog.F("agent_id", agentID.String())) - } else { - n.NetcheckLocal = &connInfo + return } - } else { - log.Warn(ctx, "agent id required for agent connection info") - } + m.Lock() + n.NetcheckLocal = &connInfo + m.Unlock() + }() + + wg.Wait() return n } func WorkspaceInfo(ctx context.Context, client *codersdk.Client, log slog.Logger, workspaceID, agentID uuid.UUID) Workspace { - var w Workspace + var ( + w Workspace + m sync.Mutex + wg sync.WaitGroup + ) if workspaceID == uuid.Nil { log.Error(ctx, "no workspace id specified") @@ -149,44 +207,66 @@ func WorkspaceInfo(ctx context.Context, client *codersdk.Client, log slog.Logger log.Error(ctx, "no agent id specified") } + // dependency, cannot fetch concurrently ws, err := client.Workspace(ctx, workspaceID) if err != nil { log.Error(ctx, "fetch workspace", slog.Error(err), slog.F("workspace_id", workspaceID)) return w } - - agt, err := client.WorkspaceAgent(ctx, agentID) - if err != nil { - log.Error(ctx, "fetch workspace agent", slog.Error(err), slog.F("agent_id", agentID)) - } - w.Workspace = ws - w.Agent = agt - buildLogCh, closer, err := client.WorkspaceBuildLogsAfter(ctx, ws.LatestBuild.ID, 0) - if err != nil { - log.Error(ctx, "fetch provisioner job logs", slog.Error(err), slog.F("job_id", ws.LatestBuild.Job.ID.String())) - } else { + wg.Add(1) + go func() { + defer wg.Done() + agt, err := client.WorkspaceAgent(ctx, agentID) + if err != nil { + log.Error(ctx, "fetch workspace agent", slog.Error(err), slog.F("agent_id", agentID)) + } + m.Lock() + w.Agent = agt + m.Unlock() + }() + + wg.Add(1) + go func() { + defer wg.Done() + buildLogCh, closer, err := client.WorkspaceBuildLogsAfter(ctx, ws.LatestBuild.ID, 0) + if err != nil { + log.Error(ctx, "fetch provisioner job logs", slog.Error(err), slog.F("job_id", ws.LatestBuild.Job.ID.String())) + return + } defer closer.Close() + var logs []codersdk.ProvisionerJobLog for log := range buildLogCh { - w.BuildLogs = append(w.BuildLogs, log) + logs = append(w.BuildLogs, log) } - } - - if len(w.Workspace.LatestBuild.Resources) == 0 { - log.Warn(ctx, "workspace build has no resources") - return w - } + m.Lock() + w.BuildLogs = logs + m.Unlock() + }() - agentLogCh, closer, err := client.WorkspaceAgentLogsAfter(ctx, agentID, 0, false) - if err != nil { - log.Error(ctx, "fetch agent startup logs", slog.Error(err), slog.F("agent_id", agentID.String())) - } else { + wg.Add(1) + go func() { + defer wg.Done() + if len(w.Workspace.LatestBuild.Resources) == 0 { + log.Warn(ctx, "workspace build has no resources") + return + } + agentLogCh, closer, err := client.WorkspaceAgentLogsAfter(ctx, agentID, 0, false) + if err != nil { + log.Error(ctx, "fetch agent startup logs", slog.Error(err), slog.F("agent_id", agentID.String())) + } defer closer.Close() + var logs []codersdk.WorkspaceAgentLog for logChunk := range agentLogCh { - w.AgentStartupLogs = append(w.AgentStartupLogs, logChunk...) + logs = append(w.AgentStartupLogs, logChunk...) } - } + m.Lock() + w.AgentStartupLogs = logs + m.Unlock() + }() + + wg.Wait() return w } @@ -225,9 +305,35 @@ func Run(ctx context.Context, d *Deps) (*Bundle, error) { } } - b.Deployment = DeploymentInfo(ctx, d.Client, d.Log) - b.Workspace = WorkspaceInfo(ctx, d.Client, d.Log, d.WorkspaceID, d.AgentID) - b.Network = NetworkInfo(ctx, d.Client, d.Log, d.AgentID) + var ( + wg sync.WaitGroup + m sync.Mutex + ) + wg.Add(1) + go func() { + defer wg.Done() + di := DeploymentInfo(ctx, d.Client, d.Log) + m.Lock() + b.Deployment = di + m.Unlock() + }() + wg.Add(1) + go func() { + defer wg.Done() + wi := WorkspaceInfo(ctx, d.Client, d.Log, d.WorkspaceID, d.AgentID) + m.Lock() + b.Workspace = wi + m.Unlock() + }() + wg.Add(1) + go func() { + defer wg.Done() + ni := NetworkInfo(ctx, d.Client, d.Log, d.AgentID) + m.Lock() + b.Network = ni + m.Unlock() + }() + wg.Wait() return &b, nil } From b0cd05a3d95e9f419ba06910f340fc11fe0a424b Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 5 Mar 2024 14:19:18 +0000 Subject: [PATCH 2/3] apply suggestions from pr --- support/support.go | 166 +++++++++++++++++---------------------------- 1 file changed, 63 insertions(+), 103 deletions(-) diff --git a/support/support.go b/support/support.go index 104d1bf51f209..72485a01f4061 100644 --- a/support/support.go +++ b/support/support.go @@ -7,6 +7,7 @@ import ( "strings" "sync" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/google/uuid" @@ -62,141 +63,106 @@ type Deps struct { } func DeploymentInfo(ctx context.Context, client *codersdk.Client, log slog.Logger) Deployment { - var ( - d Deployment - m sync.Mutex - wg sync.WaitGroup - ) - - wg.Add(1) - go func() { - defer wg.Done() + // Note: each goroutine assigns to a different struct field, hence no mutex. + var d Deployment + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { bi, err := client.BuildInfo(ctx) if err != nil { - log.Error(ctx, "fetch build info", slog.Error(err)) - return + return xerrors.Errorf("fetch build info: %w", err) } - m.Lock() d.BuildInfo = &bi - m.Unlock() - }() + return nil + }) - wg.Add(1) - go func() { - defer wg.Done() + eg.Go(func() error { dc, err := client.DeploymentConfig(ctx) if err != nil { - log.Error(ctx, "fetch deployment config", slog.Error(err)) - return + return xerrors.Errorf("fetch deployment config: %w", err) } - m.Lock() d.Config = dc - m.Unlock() - }() + return nil + }) - wg.Add(1) - go func() { - defer wg.Done() + eg.Go(func() error { hr, err := client.DebugHealth(ctx) if err != nil { - log.Error(ctx, "fetch health report", slog.Error(err)) - return + return xerrors.Errorf("fetch health report: %w", err) } - m.Lock() d.HealthReport = &hr - m.Unlock() - }() + return nil + }) - wg.Add(1) - go func() { - defer wg.Done() + eg.Go(func() error { exp, err := client.Experiments(ctx) if err != nil { - log.Error(ctx, "fetch experiments", slog.Error(err)) - return + return xerrors.Errorf("fetch experiments: %w", err) } - m.Lock() d.Experiments = exp - m.Unlock() - }() + return nil + }) + + if err := eg.Wait(); err != nil { + log.Error(ctx, "fetch deployment information", slog.Error(err)) + } - wg.Wait() return d } func NetworkInfo(ctx context.Context, client *codersdk.Client, log slog.Logger, agentID uuid.UUID) Network { - var ( - n Network - m sync.Mutex - wg sync.WaitGroup - ) + var n Network - wg.Add(1) - go func() { - defer wg.Done() + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { coordResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/coordinator", nil) if err != nil { - log.Error(ctx, "fetch coordinator debug page", slog.Error(err)) - return + return xerrors.Errorf("fetch coordinator debug page: %w", err) } defer coordResp.Body.Close() bs, err := io.ReadAll(coordResp.Body) if err != nil { - log.Error(ctx, "read coordinator debug page", slog.Error(err)) - return + return xerrors.Errorf("read coordinator debug page: %w", err) } - m.Lock() n.CoordinatorDebug = string(bs) - m.Unlock() - }() + return nil + }) - wg.Add(1) - go func() { - defer wg.Done() + eg.Go(func() error { tailResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/tailnet", nil) if err != nil { - log.Error(ctx, "fetch tailnet debug page", slog.Error(err)) - return + return xerrors.Errorf("fetch tailnet debug page: %w", err) } defer tailResp.Body.Close() bs, err := io.ReadAll(tailResp.Body) if err != nil { - log.Error(ctx, "read tailnet debug page", slog.Error(err)) - return + return xerrors.Errorf("read tailnet debug page: %w", err) } - m.Lock() n.TailnetDebug = string(bs) - m.Unlock() - }() + return nil + }) - wg.Add(1) - go func() { - defer wg.Done() + eg.Go(func() error { if agentID == uuid.Nil { log.Warn(ctx, "agent id required for agent connection info") - return + return nil } connInfo, err := client.WorkspaceAgentConnectionInfo(ctx, agentID) if err != nil { - log.Error(ctx, "fetch agent conn info", slog.Error(err), slog.F("agent_id", agentID.String())) - return + return xerrors.Errorf("fetch agent conn info: %w", err) } - m.Lock() n.NetcheckLocal = &connInfo - m.Unlock() - }() + return nil + }) - wg.Wait() + if err := eg.Wait(); err != nil { + log.Error(ctx, "fetch network information", slog.Error(err)) + } return n } func WorkspaceInfo(ctx context.Context, client *codersdk.Client, log slog.Logger, workspaceID, agentID uuid.UUID) Workspace { - var ( - w Workspace - m sync.Mutex - wg sync.WaitGroup - ) + var w Workspace if workspaceID == uuid.Nil { log.Error(ctx, "no workspace id specified") @@ -215,58 +181,52 @@ func WorkspaceInfo(ctx context.Context, client *codersdk.Client, log slog.Logger } w.Workspace = ws - wg.Add(1) - go func() { - defer wg.Done() + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { agt, err := client.WorkspaceAgent(ctx, agentID) if err != nil { - log.Error(ctx, "fetch workspace agent", slog.Error(err), slog.F("agent_id", agentID)) + return xerrors.Errorf("fetch workspace agent: %w", err) } - m.Lock() w.Agent = agt - m.Unlock() - }() + return nil + }) - wg.Add(1) - go func() { - defer wg.Done() + eg.Go(func() error { buildLogCh, closer, err := client.WorkspaceBuildLogsAfter(ctx, ws.LatestBuild.ID, 0) if err != nil { - log.Error(ctx, "fetch provisioner job logs", slog.Error(err), slog.F("job_id", ws.LatestBuild.Job.ID.String())) - return + return xerrors.Errorf("fetch provisioner job logs: %w", err) } defer closer.Close() var logs []codersdk.ProvisionerJobLog for log := range buildLogCh { logs = append(w.BuildLogs, log) } - m.Lock() w.BuildLogs = logs - m.Unlock() - }() + return nil + }) - wg.Add(1) - go func() { - defer wg.Done() + eg.Go(func() error { if len(w.Workspace.LatestBuild.Resources) == 0 { log.Warn(ctx, "workspace build has no resources") - return + return nil } agentLogCh, closer, err := client.WorkspaceAgentLogsAfter(ctx, agentID, 0, false) if err != nil { - log.Error(ctx, "fetch agent startup logs", slog.Error(err), slog.F("agent_id", agentID.String())) + return xerrors.Errorf("fetch agent startup logs: %w", err) } defer closer.Close() var logs []codersdk.WorkspaceAgentLog for logChunk := range agentLogCh { logs = append(w.AgentStartupLogs, logChunk...) } - m.Lock() w.AgentStartupLogs = logs - m.Unlock() - }() + return nil + }) - wg.Wait() + if err := eg.Wait(); err != nil { + log.Error(ctx, "fetch workspace information", slog.Error(err)) + } return w } From a1e1dc4f21856ed2a3675d941a9a792dfb95f4a8 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 5 Mar 2024 15:45:28 +0000 Subject: [PATCH 3/3] do not early exit on error --- support/support.go | 56 +++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/support/support.go b/support/support.go index 72485a01f4061..0a1e31a968c31 100644 --- a/support/support.go +++ b/support/support.go @@ -5,7 +5,6 @@ import ( "io" "net/http" "strings" - "sync" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" @@ -64,8 +63,11 @@ type Deps struct { func DeploymentInfo(ctx context.Context, client *codersdk.Client, log slog.Logger) Deployment { // Note: each goroutine assigns to a different struct field, hence no mutex. - var d Deployment - eg, ctx := errgroup.WithContext(ctx) + var ( + d Deployment + eg errgroup.Group + ) + eg.Go(func() error { bi, err := client.BuildInfo(ctx) if err != nil { @@ -110,9 +112,11 @@ func DeploymentInfo(ctx context.Context, client *codersdk.Client, log slog.Logge } func NetworkInfo(ctx context.Context, client *codersdk.Client, log slog.Logger, agentID uuid.UUID) Network { - var n Network + var ( + n Network + eg errgroup.Group + ) - eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { coordResp, err := client.Request(ctx, http.MethodGet, "/api/v2/debug/coordinator", nil) if err != nil { @@ -162,7 +166,10 @@ func NetworkInfo(ctx context.Context, client *codersdk.Client, log slog.Logger, } func WorkspaceInfo(ctx context.Context, client *codersdk.Client, log slog.Logger, workspaceID, agentID uuid.UUID) Workspace { - var w Workspace + var ( + w Workspace + eg errgroup.Group + ) if workspaceID == uuid.Nil { log.Error(ctx, "no workspace id specified") @@ -181,8 +188,6 @@ func WorkspaceInfo(ctx context.Context, client *codersdk.Client, log slog.Logger } w.Workspace = ws - eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { agt, err := client.WorkspaceAgent(ctx, agentID) if err != nil { @@ -265,35 +270,24 @@ func Run(ctx context.Context, d *Deps) (*Bundle, error) { } } - var ( - wg sync.WaitGroup - m sync.Mutex - ) - wg.Add(1) - go func() { - defer wg.Done() + var eg errgroup.Group + eg.Go(func() error { di := DeploymentInfo(ctx, d.Client, d.Log) - m.Lock() b.Deployment = di - m.Unlock() - }() - wg.Add(1) - go func() { - defer wg.Done() + return nil + }) + eg.Go(func() error { wi := WorkspaceInfo(ctx, d.Client, d.Log, d.WorkspaceID, d.AgentID) - m.Lock() b.Workspace = wi - m.Unlock() - }() - wg.Add(1) - go func() { - defer wg.Done() + return nil + }) + eg.Go(func() error { ni := NetworkInfo(ctx, d.Client, d.Log, d.AgentID) - m.Lock() b.Network = ni - m.Unlock() - }() - wg.Wait() + return nil + }) + + _ = eg.Wait() return &b, nil }