Skip to content

Commit 9b89f3b

Browse files
authored
Merge branch 'coder:main' into patch-2
2 parents 5fc7d4b + 05facc9 commit 9b89f3b

File tree

16 files changed

+680
-332
lines changed

16 files changed

+680
-332
lines changed

.github/actions/setup-tf/action.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ runs:
77
- name: Install Terraform
88
uses: hashicorp/setup-terraform@v3
99
with:
10-
terraform_version: 1.5.7
10+
terraform_version: 1.6.6
1111
terraform_wrapper: false

.github/workflows/ci.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ jobs:
142142
143143
# Check for any typos
144144
- name: Check for typos
145-
uses: crate-ci/typos@v1.20.10
145+
uses: crate-ci/typos@v1.21.0
146146
with:
147147
config: .github/workflows/typos.toml
148148

@@ -861,7 +861,7 @@ jobs:
861861
- name: "Dependency Review"
862862
id: review
863863
# TODO: Replace this with the latest release once https://github.com/actions/dependency-review-action/pull/761 is merged.
864-
uses: actions/dependency-review-action@49fbbe0acb033b7824f26d00b005d7d598d76301
864+
uses: actions/dependency-review-action@82ab8f69c78827a746628706b5d2c3f87231fd4c
865865
with:
866866
allow-licenses: Apache-2.0, BSD-2-Clause, BSD-3-Clause, CC0-1.0, ISC, MIT, MIT-0, MPL-2.0
867867
allow-dependencies-licenses: "pkg:golang/github.com/pelletier/go-toml/v2"

agent/agent.go

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -807,56 +807,48 @@ func (a *agent) run() (retErr error) {
807807
// coordination <--------------------------+
808808
// derp map subscriber <----------------+
809809
// stats report loop <---------------+
810-
networkOK := make(chan struct{})
811-
manifestOK := make(chan struct{})
810+
networkOK := newCheckpoint(a.logger)
811+
manifestOK := newCheckpoint(a.logger)
812812

813813
connMan.start("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
814814

815815
connMan.start("app health reporter", gracefulShutdownBehaviorStop,
816816
func(ctx context.Context, conn drpc.Conn) error {
817-
select {
818-
case <-ctx.Done():
819-
return nil
820-
case <-manifestOK:
821-
manifest := a.manifest.Load()
822-
NewWorkspaceAppHealthReporter(
823-
a.logger, manifest.Apps, agentsdk.AppHealthPoster(proto.NewDRPCAgentClient(conn)),
824-
)(ctx)
825-
return nil
817+
if err := manifestOK.wait(ctx); err != nil {
818+
return xerrors.Errorf("no manifest: %w", err)
826819
}
820+
manifest := a.manifest.Load()
821+
NewWorkspaceAppHealthReporter(
822+
a.logger, manifest.Apps, agentsdk.AppHealthPoster(proto.NewDRPCAgentClient(conn)),
823+
)(ctx)
824+
return nil
827825
})
828826

829827
connMan.start("create or update network", gracefulShutdownBehaviorStop,
830828
a.createOrUpdateNetwork(manifestOK, networkOK))
831829

832830
connMan.start("coordination", gracefulShutdownBehaviorStop,
833831
func(ctx context.Context, conn drpc.Conn) error {
834-
select {
835-
case <-ctx.Done():
836-
return nil
837-
case <-networkOK:
832+
if err := networkOK.wait(ctx); err != nil {
833+
return xerrors.Errorf("no network: %w", err)
838834
}
839835
return a.runCoordinator(ctx, conn, a.network)
840836
},
841837
)
842838

843839
connMan.start("derp map subscriber", gracefulShutdownBehaviorStop,
844840
func(ctx context.Context, conn drpc.Conn) error {
845-
select {
846-
case <-ctx.Done():
847-
return nil
848-
case <-networkOK:
841+
if err := networkOK.wait(ctx); err != nil {
842+
return xerrors.Errorf("no network: %w", err)
849843
}
850844
return a.runDERPMapSubscriber(ctx, conn, a.network)
851845
})
852846

853847
connMan.start("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)
854848

855849
connMan.start("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, conn drpc.Conn) error {
856-
select {
857-
case <-ctx.Done():
858-
return nil
859-
case <-networkOK:
850+
if err := networkOK.wait(ctx); err != nil {
851+
return xerrors.Errorf("no network: %w", err)
860852
}
861853
return a.statsReporter.reportLoop(ctx, proto.NewDRPCAgentClient(conn))
862854
})
@@ -865,8 +857,17 @@ func (a *agent) run() (retErr error) {
865857
}
866858

867859
// handleManifest returns a function that fetches and processes the manifest
868-
func (a *agent) handleManifest(manifestOK chan<- struct{}) func(ctx context.Context, conn drpc.Conn) error {
860+
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, conn drpc.Conn) error {
869861
return func(ctx context.Context, conn drpc.Conn) error {
862+
var (
863+
sentResult = false
864+
err error
865+
)
866+
defer func() {
867+
if !sentResult {
868+
manifestOK.complete(err)
869+
}
870+
}()
870871
aAPI := proto.NewDRPCAgentClient(conn)
871872
mp, err := aAPI.GetManifest(ctx, &proto.GetManifestRequest{})
872873
if err != nil {
@@ -903,14 +904,12 @@ func (a *agent) handleManifest(manifestOK chan<- struct{}) func(ctx context.Cont
903904
Subsystems: subsys,
904905
}})
905906
if err != nil {
906-
if xerrors.Is(err, context.Canceled) {
907-
return nil
908-
}
909907
return xerrors.Errorf("update workspace agent startup: %w", err)
910908
}
911909

912910
oldManifest := a.manifest.Swap(&manifest)
913-
close(manifestOK)
911+
manifestOK.complete(nil)
912+
sentResult = true
914913

915914
// The startup script should only execute on the first run!
916915
if oldManifest == nil {
@@ -971,14 +970,15 @@ func (a *agent) handleManifest(manifestOK chan<- struct{}) func(ctx context.Cont
971970

972971
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
973972
// the tailnet using the information in the manifest
974-
func (a *agent) createOrUpdateNetwork(manifestOK <-chan struct{}, networkOK chan<- struct{}) func(context.Context, drpc.Conn) error {
975-
return func(ctx context.Context, _ drpc.Conn) error {
976-
select {
977-
case <-ctx.Done():
978-
return nil
979-
case <-manifestOK:
973+
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, drpc.Conn) error {
974+
return func(ctx context.Context, _ drpc.Conn) (retErr error) {
975+
if err := manifestOK.wait(ctx); err != nil {
976+
return xerrors.Errorf("no manifest: %w", err)
980977
}
981978
var err error
979+
defer func() {
980+
networkOK.complete(retErr)
981+
}()
982982
manifest := a.manifest.Load()
983983
a.closeMutex.Lock()
984984
network := a.network
@@ -1014,7 +1014,6 @@ func (a *agent) createOrUpdateNetwork(manifestOK <-chan struct{}, networkOK chan
10141014
network.SetDERPForceWebSockets(manifest.DERPForceWebSockets)
10151015
network.SetBlockEndpoints(manifest.DisableDirectConnections)
10161016
}
1017-
close(networkOK)
10181017
return nil
10191018
}
10201019
}

agent/checkpoint.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"runtime"
6+
"sync"
7+
8+
"cdr.dev/slog"
9+
)
10+
11+
// checkpoint allows a goroutine to communicate when it is OK to proceed beyond some async condition
12+
// to other dependent goroutines.
13+
type checkpoint struct {
14+
logger slog.Logger
15+
mu sync.Mutex
16+
called bool
17+
done chan struct{}
18+
err error
19+
}
20+
21+
// complete the checkpoint. Pass nil to indicate the checkpoint was ok. It is an error to call this
22+
// more than once.
23+
func (c *checkpoint) complete(err error) {
24+
c.mu.Lock()
25+
defer c.mu.Unlock()
26+
if c.called {
27+
b := make([]byte, 2048)
28+
n := runtime.Stack(b, false)
29+
c.logger.Critical(context.Background(), "checkpoint complete called more than once", slog.F("stacktrace", b[:n]))
30+
return
31+
}
32+
c.called = true
33+
c.err = err
34+
close(c.done)
35+
}
36+
37+
func (c *checkpoint) wait(ctx context.Context) error {
38+
select {
39+
case <-ctx.Done():
40+
return ctx.Err()
41+
case <-c.done:
42+
return c.err
43+
}
44+
}
45+
46+
func newCheckpoint(logger slog.Logger) *checkpoint {
47+
return &checkpoint{
48+
logger: logger,
49+
done: make(chan struct{}),
50+
}
51+
}

agent/checkpoint_internal_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package agent
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
"golang.org/x/xerrors"
8+
9+
"cdr.dev/slog/sloggers/slogtest"
10+
"github.com/coder/coder/v2/testutil"
11+
)
12+
13+
func TestCheckpoint_CompleteWait(t *testing.T) {
14+
t.Parallel()
15+
logger := slogtest.Make(t, nil)
16+
ctx := testutil.Context(t, testutil.WaitShort)
17+
uut := newCheckpoint(logger)
18+
err := xerrors.New("test")
19+
uut.complete(err)
20+
got := uut.wait(ctx)
21+
require.Equal(t, err, got)
22+
}
23+
24+
func TestCheckpoint_CompleteTwice(t *testing.T) {
25+
t.Parallel()
26+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
27+
ctx := testutil.Context(t, testutil.WaitShort)
28+
uut := newCheckpoint(logger)
29+
err := xerrors.New("test")
30+
uut.complete(err)
31+
uut.complete(nil) // drops CRITICAL log
32+
got := uut.wait(ctx)
33+
require.Equal(t, err, got)
34+
}
35+
36+
func TestCheckpoint_WaitComplete(t *testing.T) {
37+
t.Parallel()
38+
logger := slogtest.Make(t, nil)
39+
ctx := testutil.Context(t, testutil.WaitShort)
40+
uut := newCheckpoint(logger)
41+
err := xerrors.New("test")
42+
errCh := make(chan error, 1)
43+
go func() {
44+
errCh <- uut.wait(ctx)
45+
}()
46+
uut.complete(err)
47+
got := testutil.RequireRecvCtx(ctx, t, errCh)
48+
require.Equal(t, err, got)
49+
}

coderd/database/dbpurge/dbpurge_test.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package dbpurge_test
22

33
import (
4+
"bufio"
5+
"bytes"
46
"context"
57
"database/sql"
8+
"encoding/json"
9+
"fmt"
610
"testing"
711
"time"
812

@@ -50,50 +54,59 @@ func TestDeleteOldWorkspaceAgentStats(t *testing.T) {
5054
if t.Failed() {
5155
t.Logf("Test failed, printing rows...")
5256
ctx := testutil.Context(t, testutil.WaitShort)
57+
buf := &bytes.Buffer{}
58+
enc := json.NewEncoder(buf)
59+
enc.SetIndent("", "\t")
5360
wasRows, err := db.GetWorkspaceAgentStats(ctx, now.AddDate(0, -7, 0))
5461
if err == nil {
55-
for _, row := range wasRows {
56-
t.Logf("workspace agent stat: %v", row)
57-
}
62+
_, _ = fmt.Fprintf(buf, "workspace agent stats: ")
63+
_ = enc.Encode(wasRows)
5864
}
5965
tusRows, err := db.GetTemplateUsageStats(context.Background(), database.GetTemplateUsageStatsParams{
6066
StartTime: now.AddDate(0, -7, 0),
6167
EndTime: now,
6268
})
6369
if err == nil {
64-
for _, row := range tusRows {
65-
t.Logf("template usage stat: %v", row)
66-
}
70+
_, _ = fmt.Fprintf(buf, "template usage stats: ")
71+
_ = enc.Encode(tusRows)
6772
}
73+
s := bufio.NewScanner(buf)
74+
for s.Scan() {
75+
t.Log(s.Text())
76+
}
77+
_ = s.Err()
6878
}
6979
}()
7080

7181
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
7282
defer cancel()
7383

7484
// given
85+
// Note: We use increments of 2 hours to ensure we avoid any DST
86+
// conflicts, verifying DST behavior is beyond the scope of this
87+
// test.
7588
// Let's use RxBytes to identify stat entries.
76-
// Stat inserted 6 months + 1 hour ago, should be deleted.
89+
// Stat inserted 6 months + 2 hours ago, should be deleted.
7790
first := dbgen.WorkspaceAgentStat(t, db, database.WorkspaceAgentStat{
78-
CreatedAt: now.AddDate(0, -6, 0).Add(-time.Hour),
91+
CreatedAt: now.AddDate(0, -6, 0).Add(-2 * time.Hour),
7992
ConnectionCount: 1,
8093
ConnectionMedianLatencyMS: 1,
8194
RxBytes: 1111,
8295
SessionCountSSH: 1,
8396
})
8497

85-
// Stat inserted 6 months - 1 hour ago, should not be deleted before rollup.
98+
// Stat inserted 6 months - 2 hours ago, should not be deleted before rollup.
8699
second := dbgen.WorkspaceAgentStat(t, db, database.WorkspaceAgentStat{
87-
CreatedAt: now.AddDate(0, -6, 0).Add(time.Hour),
100+
CreatedAt: now.AddDate(0, -6, 0).Add(2 * time.Hour),
88101
ConnectionCount: 1,
89102
ConnectionMedianLatencyMS: 1,
90103
RxBytes: 2222,
91104
SessionCountSSH: 1,
92105
})
93106

94-
// Stat inserted 6 months - 1 day - 2 hour ago, should not be deleted at all.
107+
// Stat inserted 6 months - 1 day - 4 hours ago, should not be deleted at all.
95108
third := dbgen.WorkspaceAgentStat(t, db, database.WorkspaceAgentStat{
96-
CreatedAt: now.AddDate(0, -6, 0).AddDate(0, 0, 1).Add(2 * time.Hour),
109+
CreatedAt: now.AddDate(0, -6, 0).AddDate(0, 0, 1).Add(4 * time.Hour),
97110
ConnectionCount: 1,
98111
ConnectionMedianLatencyMS: 1,
99112
RxBytes: 3333,

codersdk/agentsdk/agentsdk.go

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77
"io"
8-
"net"
98
"net/http"
109
"net/http/cookiejar"
1110
"net/url"
@@ -206,14 +205,11 @@ func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
206205
return nil, codersdk.ReadBodyAsError(res)
207206
}
208207

209-
_, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
208+
// Set the read limit to 4 MiB -- about the limit for protobufs. This needs to be larger than
209+
// the default because some of our protocols can include large messages like startup scripts.
210+
conn.SetReadLimit(1 << 22)
211+
netConn := websocket.NetConn(ctx, conn, websocket.MessageBinary)
210212

211-
netConn := &closeNetConn{
212-
Conn: wsNetConn,
213-
closeFunc: func() {
214-
_ = conn.Close(websocket.StatusGoingAway, "ConnectRPC closed")
215-
},
216-
}
217213
config := yamux.DefaultConfig()
218214
config.LogOutput = nil
219215
config.Logger = slog.Stdlib(ctx, c.SDK.Logger(), slog.LevelInfo)
@@ -618,13 +614,3 @@ func LogsNotifyChannel(agentID uuid.UUID) string {
618614
type LogsNotifyMessage struct {
619615
CreatedAfter int64 `json:"created_after"`
620616
}
621-
622-
type closeNetConn struct {
623-
net.Conn
624-
closeFunc func()
625-
}
626-
627-
func (c *closeNetConn) Close() error {
628-
c.closeFunc()
629-
return c.Conn.Close()
630-
}

0 commit comments

Comments
 (0)