Skip to content

Commit 5417e09

Browse files
committed
Merge remote-tracking branch 'origin/main' into user-params
2 parents 69ed7c0 + 383eed9 commit 5417e09

File tree

95 files changed

+1770
-809
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+1770
-809
lines changed

.github/workflows/ci.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ jobs:
141141
142142
# Check for any typos
143143
- name: Check for typos
144-
uses: crate-ci/typos@v1.17.1
144+
uses: crate-ci/typos@v1.17.2
145145
with:
146146
config: .github/workflows/typos.toml
147147

.github/workflows/pr-auto-assign.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ jobs:
1414
runs-on: ubuntu-latest
1515
steps:
1616
- name: Assign author
17-
uses: toshimaru/auto-author-assign@v2.0.1
17+
uses: toshimaru/auto-author-assign@v2.1.0

Makefile

+6-2
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,9 @@ gen: \
476476
site/e2e/provisionerGenerated.ts \
477477
site/src/theme/icons.json \
478478
examples/examples.gen.json \
479-
tailnet/tailnettest/coordinatormock.go
479+
tailnet/tailnettest/coordinatormock.go \
480+
tailnet/tailnettest/coordinateemock.go \
481+
tailnet/tailnettest/multiagentmock.go
480482
.PHONY: gen
481483

482484
# Mark all generated files as fresh so make thinks they're up-to-date. This is
@@ -504,6 +506,8 @@ gen/mark-fresh:
504506
site/src/theme/icons.json \
505507
examples/examples.gen.json \
506508
tailnet/tailnettest/coordinatormock.go \
509+
tailnet/tailnettest/coordinateemock.go \
510+
tailnet/tailnettest/multiagentmock.go \
507511
"
508512
for file in $$files; do
509513
echo "$$file"
@@ -531,7 +535,7 @@ coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $
531535
coderd/database/dbmock/dbmock.go: coderd/database/db.go coderd/database/querier.go
532536
go generate ./coderd/database/dbmock/
533537

534-
tailnet/tailnettest/coordinatormock.go: tailnet/coordinator.go
538+
tailnet/tailnettest/coordinatormock.go tailnet/tailnettest/multiagentmock.go tailnet/tailnettest/coordinateemock.go: tailnet/coordinator.go tailnet/multiagent.go
535539
go generate ./tailnet/tailnettest/
536540

537541
tailnet/proto/tailnet.pb.go: tailnet/proto/tailnet.proto

agent/agent.go

+34-37
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ type Options struct {
8989
type Client interface {
9090
Manifest(ctx context.Context) (agentsdk.Manifest, error)
9191
Listen(ctx context.Context) (drpc.Conn, error)
92-
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error)
9392
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
9493
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
9594
PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error
@@ -822,10 +821,22 @@ func (a *agent) run(ctx context.Context) error {
822821
network.SetBlockEndpoints(manifest.DisableDirectConnections)
823822
}
824823

824+
// Listen returns the dRPC connection we use for both Coordinator and DERPMap updates
825+
conn, err := a.client.Listen(ctx)
826+
if err != nil {
827+
return err
828+
}
829+
defer func() {
830+
cErr := conn.Close()
831+
if cErr != nil {
832+
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
833+
}
834+
}()
835+
825836
eg, egCtx := errgroup.WithContext(ctx)
826837
eg.Go(func() error {
827838
a.logger.Debug(egCtx, "running tailnet connection coordinator")
828-
err := a.runCoordinator(egCtx, network)
839+
err := a.runCoordinator(egCtx, conn, network)
829840
if err != nil {
830841
return xerrors.Errorf("run coordinator: %w", err)
831842
}
@@ -834,7 +845,7 @@ func (a *agent) run(ctx context.Context) error {
834845

835846
eg.Go(func() error {
836847
a.logger.Debug(egCtx, "running derp map subscriber")
837-
err := a.runDERPMapSubscriber(egCtx, network)
848+
err := a.runDERPMapSubscriber(egCtx, conn, network)
838849
if err != nil {
839850
return xerrors.Errorf("run derp map subscriber: %w", err)
840851
}
@@ -1056,21 +1067,8 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
10561067

10571068
// runCoordinator runs a coordinator and returns whether a reconnect
10581069
// should occur.
1059-
func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error {
1060-
ctx, cancel := context.WithCancel(ctx)
1061-
defer cancel()
1062-
1063-
conn, err := a.client.Listen(ctx)
1064-
if err != nil {
1065-
return err
1066-
}
1067-
defer func() {
1068-
cErr := conn.Close()
1069-
if cErr != nil {
1070-
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
1071-
}
1072-
}()
1073-
1070+
func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
1071+
defer a.logger.Debug(ctx, "disconnected from coordination RPC")
10741072
tClient := tailnetproto.NewDRPCTailnetClient(conn)
10751073
coordinate, err := tClient.Coordinate(ctx)
10761074
if err != nil {
@@ -1082,7 +1080,7 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10821080
a.logger.Debug(ctx, "error closing Coordinate client", slog.Error(err))
10831081
}
10841082
}()
1085-
a.logger.Info(ctx, "connected to coordination endpoint")
1083+
a.logger.Info(ctx, "connected to coordination RPC")
10861084
coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
10871085
select {
10881086
case <-ctx.Done():
@@ -1093,30 +1091,29 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10931091
}
10941092

10951093
// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
1096-
func (a *agent) runDERPMapSubscriber(ctx context.Context, network *tailnet.Conn) error {
1094+
func (a *agent) runDERPMapSubscriber(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
1095+
defer a.logger.Debug(ctx, "disconnected from derp map RPC")
10971096
ctx, cancel := context.WithCancel(ctx)
10981097
defer cancel()
1099-
1100-
updates, closer, err := a.client.DERPMapUpdates(ctx)
1098+
tClient := tailnetproto.NewDRPCTailnetClient(conn)
1099+
stream, err := tClient.StreamDERPMaps(ctx, &tailnetproto.StreamDERPMapsRequest{})
11011100
if err != nil {
1102-
return err
1101+
return xerrors.Errorf("stream DERP Maps: %w", err)
11031102
}
1104-
defer closer.Close()
1105-
1106-
a.logger.Info(ctx, "connected to derp map endpoint")
1103+
defer func() {
1104+
cErr := stream.Close()
1105+
if cErr != nil {
1106+
a.logger.Debug(ctx, "error closing DERPMap stream", slog.Error(err))
1107+
}
1108+
}()
1109+
a.logger.Info(ctx, "connected to derp map RPC")
11071110
for {
1108-
select {
1109-
case <-ctx.Done():
1110-
return ctx.Err()
1111-
case update := <-updates:
1112-
if update.Err != nil {
1113-
return update.Err
1114-
}
1115-
if update.DERPMap != nil && !tailnet.CompareDERPMaps(network.DERPMap(), update.DERPMap) {
1116-
a.logger.Info(ctx, "updating derp map due to detected changes")
1117-
network.SetDERPMap(update.DERPMap)
1118-
}
1111+
dmp, err := stream.Recv()
1112+
if err != nil {
1113+
return xerrors.Errorf("recv DERPMap error: %w", err)
11191114
}
1115+
dm := tailnet.DERPMapFromProto(dmp)
1116+
network.SetDERPMap(dm)
11201117
}
11211118
}
11221119

agent/agent_test.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -1349,6 +1349,7 @@ func TestAgent_Lifecycle(t *testing.T) {
13491349
make(chan *agentsdk.Stats, 50),
13501350
tailnet.NewCoordinator(logger),
13511351
)
1352+
defer client.Close()
13521353

13531354
fs := afero.NewMemMapFs()
13541355
agent := agent.New(agent.Options{
@@ -1683,13 +1684,18 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16831684
statsCh,
16841685
coordinator,
16851686
)
1687+
t.Cleanup(func() {
1688+
t.Log("closing client")
1689+
client.Close()
1690+
})
16861691
uut := agent.New(agent.Options{
16871692
Client: client,
16881693
Filesystem: fs,
16891694
Logger: logger.Named("agent"),
16901695
ReconnectingPTYTimeout: time.Minute,
16911696
})
16921697
t.Cleanup(func() {
1698+
t.Log("closing agent")
16931699
_ = uut.Close()
16941700
})
16951701

@@ -1718,6 +1724,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17181724
if err != nil {
17191725
t.Logf("error closing in-memory coordination: %s", err.Error())
17201726
}
1727+
t.Logf("closed coordination %s", name)
17211728
})
17221729
// Force DERP.
17231730
conn.SetBlockEndpoints(true)
@@ -1753,11 +1760,9 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17531760
}
17541761

17551762
// Push a new DERP map to the agent.
1756-
err := client.PushDERPMapUpdate(agentsdk.DERPMapUpdate{
1757-
DERPMap: newDerpMap,
1758-
})
1763+
err := client.PushDERPMapUpdate(newDerpMap)
17591764
require.NoError(t, err)
1760-
t.Logf("client Pushed DERPMap update")
1765+
t.Logf("pushed DERPMap update to agent")
17611766

17621767
require.Eventually(t, func() bool {
17631768
conn := uut.TailnetConn()
@@ -1826,6 +1831,7 @@ func TestAgent_Reconnect(t *testing.T) {
18261831
statsCh,
18271832
coordinator,
18281833
)
1834+
defer client.Close()
18291835
initialized := atomic.Int32{}
18301836
closer := agent.New(agent.Options{
18311837
ExchangeToken: func(ctx context.Context) (string, error) {
@@ -1862,6 +1868,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
18621868
make(chan *agentsdk.Stats, 50),
18631869
coordinator,
18641870
)
1871+
defer client.Close()
18651872
filesystem := afero.NewMemMapFs()
18661873
closer := agent.New(agent.Options{
18671874
ExchangeToken: func(ctx context.Context) (string, error) {
@@ -2039,6 +2046,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20392046
statsCh := make(chan *agentsdk.Stats, 50)
20402047
fs := afero.NewMemMapFs()
20412048
c := agenttest.NewClient(t, logger.Named("agent"), metadata.AgentID, metadata, statsCh, coordinator)
2049+
t.Cleanup(c.Close)
20422050

20432051
options := agent.Options{
20442052
Client: c,

agent/agentssh/agentssh.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -681,7 +681,11 @@ func (s *Server) CreateCommand(ctx context.Context, script string, env []string)
681681

682682
// This adds the ports dialog to code-server that enables
683683
// proxying a port dynamically.
684-
cmd.Env = append(cmd.Env, fmt.Sprintf("VSCODE_PROXY_URI=%s", manifest.VSCodePortProxyURI))
684+
// If this is empty string, do not set anything. Code-server auto defaults
685+
// using its basepath to construct a path based port proxy.
686+
if manifest.VSCodePortProxyURI != "" {
687+
cmd.Env = append(cmd.Env, fmt.Sprintf("VSCODE_PROXY_URI=%s", manifest.VSCodePortProxyURI))
688+
}
685689

686690
// Hide Coder message on code-server's "Getting Started" page
687691
cmd.Env = append(cmd.Env, "CS_DISABLE_GETTING_STARTED_OVERRIDE=true")

agent/agenttest/client.go

+15-21
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ func NewClient(t testing.TB,
3939
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
4040
coordPtr.Store(&coordinator)
4141
mux := drpcmux.New()
42+
derpMapUpdates := make(chan *tailcfg.DERPMap)
4243
drpcService := &tailnet.DRPCService{
43-
CoordPtr: &coordPtr,
44-
Logger: logger,
45-
// TODO: handle DERPMap too!
46-
DerpMapUpdateFrequency: time.Hour,
47-
DerpMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
44+
CoordPtr: &coordPtr,
45+
Logger: logger,
46+
DerpMapUpdateFrequency: time.Microsecond,
47+
DerpMapFn: func() *tailcfg.DERPMap { return <-derpMapUpdates },
4848
}
4949
err := proto.DRPCRegisterTailnet(mux, drpcService)
5050
require.NoError(t, err)
@@ -64,7 +64,7 @@ func NewClient(t testing.TB,
6464
statsChan: statsChan,
6565
coordinator: coordinator,
6666
server: server,
67-
derpMapUpdates: make(chan agentsdk.DERPMapUpdate),
67+
derpMapUpdates: derpMapUpdates,
6868
}
6969
}
7070

@@ -85,23 +85,26 @@ type Client struct {
8585
lifecycleStates []codersdk.WorkspaceAgentLifecycle
8686
startup agentsdk.PostStartupRequest
8787
logs []agentsdk.Log
88-
derpMapUpdates chan agentsdk.DERPMapUpdate
88+
derpMapUpdates chan *tailcfg.DERPMap
89+
derpMapOnce sync.Once
90+
}
91+
92+
func (c *Client) Close() {
93+
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
8994
}
9095

9196
func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
9297
return c.manifest, nil
9398
}
9499

95-
func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
100+
func (c *Client) Listen(ctx context.Context) (drpc.Conn, error) {
96101
conn, lis := drpcsdk.MemTransportPipe()
97-
closed := make(chan struct{})
98102
c.LastWorkspaceAgent = func() {
99103
_ = conn.Close()
100104
_ = lis.Close()
101-
<-closed
102105
}
103106
c.t.Cleanup(c.LastWorkspaceAgent)
104-
serveCtx, cancel := context.WithCancel(context.Background())
107+
serveCtx, cancel := context.WithCancel(ctx)
105108
c.t.Cleanup(cancel)
106109
auth := tailnet.AgentTunnelAuth{}
107110
streamID := tailnet.StreamID{
@@ -112,7 +115,6 @@ func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
112115
serveCtx = tailnet.WithStreamID(serveCtx, streamID)
113116
go func() {
114117
_ = c.server.Serve(serveCtx, lis)
115-
close(closed)
116118
}()
117119
return conn, nil
118120
}
@@ -235,7 +237,7 @@ func (c *Client) GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerCo
235237
return codersdk.ServiceBannerConfig{}, nil
236238
}
237239

238-
func (c *Client) PushDERPMapUpdate(update agentsdk.DERPMapUpdate) error {
240+
func (c *Client) PushDERPMapUpdate(update *tailcfg.DERPMap) error {
239241
timer := time.NewTimer(testutil.WaitShort)
240242
defer timer.Stop()
241243
select {
@@ -247,14 +249,6 @@ func (c *Client) PushDERPMapUpdate(update agentsdk.DERPMapUpdate) error {
247249
return nil
248250
}
249251

250-
func (c *Client) DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error) {
251-
closed := make(chan struct{})
252-
return c.derpMapUpdates, closeFunc(func() error {
253-
close(closed)
254-
return nil
255-
}), nil
256-
}
257-
258252
type closeFunc func() error
259253

260254
func (c closeFunc) Close() error {

agent/proto/version.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package proto
2+
3+
import (
4+
"github.com/coder/coder/v2/tailnet/proto"
5+
)
6+
7+
// CurrentVersion is the current version of the agent API. It is tied to the
8+
// tailnet API version to avoid confusion, since agents connect to the tailnet
9+
// API over the same websocket.
10+
var CurrentVersion = proto.CurrentVersion

cli/restart.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (r *RootCmd) restart() *clibase.Cmd {
6464
// It's possible for a workspace build to fail due to the template requiring starting
6565
// workspaces with the active version.
6666
if cerr, ok := codersdk.AsError(err); ok && cerr.StatusCode() == http.StatusForbidden {
67-
_, _ = fmt.Fprintln(inv.Stdout, "Failed to restart with the template version from your last build. Policy may require you to restart with the current active template version.")
67+
_, _ = fmt.Fprintln(inv.Stdout, "Unable to restart the workspace with the template version from the last build. Policy may require you to restart with the current active template version.")
6868
build, err = startWorkspace(inv, client, workspace, parameterFlags, WorkspaceUpdate)
6969
if err != nil {
7070
return xerrors.Errorf("start workspace with active template version: %w", err)

0 commit comments

Comments
 (0)