Skip to content

Commit a694bb3

Browse files
committed
move core impl to coderd
1 parent 073c57f commit a694bb3

File tree

13 files changed

+251
-212
lines changed

13 files changed

+251
-212
lines changed

cli/server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,13 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
723723
options.Database = dbmetrics.New(options.Database, options.PrometheusRegistry)
724724
}
725725

726+
wsUpdates, err := coderd.NewUpdatesProvider(ctx, options.Database, options.Pubsub)
727+
if err != nil {
728+
return xerrors.Errorf("create workspace updates provider: %w", err)
729+
}
730+
options.WorkspaceUpdatesProvider = wsUpdates
731+
defer wsUpdates.Stop()
732+
726733
var deploymentID string
727734
err = options.Database.InTx(func(tx database.Store) error {
728735
// This will block until the lock is acquired, and will be

coderd/apidoc/docs.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/coderd.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ type Options struct {
229229

230230
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
231231

232+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
233+
232234
// This janky function is used in telemetry to parse fields out of the raw
233235
// JWT. It needs to be passed through like this because license parsing is
234236
// under the enterprise license, and can't be imported into AGPL.
@@ -592,12 +594,13 @@ func New(options *Options) *API {
592594
panic("CoordinatorResumeTokenProvider is nil")
593595
}
594596
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
595-
Logger: api.Logger.Named("tailnetclient"),
596-
CoordPtr: &api.TailnetCoordinator,
597-
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
598-
DERPMapFn: api.DERPMap,
599-
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
600-
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
597+
Logger: api.Logger.Named("tailnetclient"),
598+
CoordPtr: &api.TailnetCoordinator,
599+
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
600+
DERPMapFn: api.DERPMap,
601+
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
602+
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
603+
WorkspaceUpdatesProvider: api.Options.WorkspaceUpdatesProvider,
601604
})
602605
if err != nil {
603606
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))

coderd/coderdtest/coderdtest.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ type Options struct {
159159
WorkspaceUsageTrackerFlush chan int
160160
WorkspaceUsageTrackerTick chan time.Time
161161

162+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
163+
162164
NotificationsEnqueuer notifications.Enqueuer
163165
}
164166

@@ -251,6 +253,15 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
251253
options.NotificationsEnqueuer = new(testutil.FakeNotificationsEnqueuer)
252254
}
253255

256+
if options.WorkspaceUpdatesProvider == nil {
257+
var err error
258+
ctx, cancel := context.WithCancel(context.Background())
259+
options.WorkspaceUpdatesProvider, err = coderd.NewUpdatesProvider(ctx, options.Database, options.Pubsub)
260+
require.NoError(t, err)
261+
t.Cleanup(cancel)
262+
t.Cleanup(options.WorkspaceUpdatesProvider.Stop)
263+
}
264+
254265
accessControlStore := &atomic.Pointer[dbauthz.AccessControlStore]{}
255266
var acs dbauthz.AccessControlStore = dbauthz.AGPLTemplateAccessControlStore{}
256267
accessControlStore.Store(&acs)
@@ -524,6 +535,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
524535
HealthcheckTimeout: options.HealthcheckTimeout,
525536
HealthcheckRefresh: options.HealthcheckRefresh,
526537
StatsBatcher: options.StatsBatcher,
538+
WorkspaceUpdatesProvider: options.WorkspaceUpdatesProvider,
527539
WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions,
528540
AllowWorkspaceRenames: options.AllowWorkspaceRenames,
529541
NewTicker: options.NewTicker,

coderd/workspaceagents.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/coder/coder/v2/coderd/externalauth"
3333
"github.com/coder/coder/v2/coderd/httpapi"
3434
"github.com/coder/coder/v2/coderd/httpmw"
35+
"github.com/coder/coder/v2/coderd/rbac"
3536
"github.com/coder/coder/v2/coderd/rbac/policy"
3637
"github.com/coder/coder/v2/codersdk"
3738
"github.com/coder/coder/v2/codersdk/agentsdk"
@@ -1481,6 +1482,89 @@ func (api *API) workspaceAgentsExternalAuthListen(ctx context.Context, rw http.R
14811482
}
14821483
}
14831484

1485+
// @Summary Coordinate multiple workspace agents
1486+
// @ID coordinate-multiple-workspace-agents
1487+
// @Security CoderSessionToken
1488+
// @Tags Agents
1489+
// @Success 101
1490+
// @Router /users/me/tailnet [get]
1491+
func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
1492+
ctx := r.Context()
1493+
owner := httpmw.UserParam(r)
1494+
ownerRoles := httpmw.UserAuthorization(r)
1495+
1496+
// Check if the actor is allowed to access any workspace owned by the user.
1497+
if !api.Authorize(r, policy.ActionSSH, rbac.ResourceWorkspace.WithOwner(owner.ID.String())) {
1498+
httpapi.ResourceNotFound(rw)
1499+
return
1500+
}
1501+
1502+
version := "1.0"
1503+
qv := r.URL.Query().Get("version")
1504+
if qv != "" {
1505+
version = qv
1506+
}
1507+
if err := proto.CurrentVersion.Validate(version); err != nil {
1508+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1509+
Message: "Unknown or unsupported API version",
1510+
Validations: []codersdk.ValidationError{
1511+
{Field: "version", Detail: err.Error()},
1512+
},
1513+
})
1514+
return
1515+
}
1516+
1517+
peerID, err := api.handleResumeToken(ctx, rw, r)
1518+
if err != nil {
1519+
// handleResumeToken has already written the response.
1520+
return
1521+
}
1522+
1523+
api.WebsocketWaitMutex.Lock()
1524+
api.WebsocketWaitGroup.Add(1)
1525+
api.WebsocketWaitMutex.Unlock()
1526+
defer api.WebsocketWaitGroup.Done()
1527+
1528+
conn, err := websocket.Accept(rw, r, nil)
1529+
if err != nil {
1530+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1531+
Message: "Failed to accept websocket.",
1532+
Detail: err.Error(),
1533+
})
1534+
return
1535+
}
1536+
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
1537+
defer wsNetConn.Close()
1538+
defer conn.Close(websocket.StatusNormalClosure, "")
1539+
1540+
go httpapi.Heartbeat(ctx, conn)
1541+
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
1542+
PeerID: peerID,
1543+
UserID: owner.ID,
1544+
AuthFn: authAgentFn(api.Database, api.Authorizer, &ownerRoles),
1545+
})
1546+
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
1547+
_ = conn.Close(websocket.StatusInternalError, err.Error())
1548+
return
1549+
}
1550+
}
1551+
1552+
// authAgentFn accepts a subject, and returns a closure that authorizes against
1553+
// passed agent IDs.
1554+
func authAgentFn(db database.Store, auth rbac.Authorizer, user *rbac.Subject) func(context.Context, uuid.UUID) error {
1555+
return func(ctx context.Context, agentID uuid.UUID) error {
1556+
ws, err := db.GetWorkspaceByAgentID(ctx, agentID)
1557+
if err != nil {
1558+
return xerrors.Errorf("get workspace by agent id: %w", err)
1559+
}
1560+
err = auth.Authorize(ctx, *user, policy.ActionSSH, ws.RBACObject())
1561+
if err != nil {
1562+
return xerrors.Errorf("workspace agent not found or you do not have permission: %w", sql.ErrNoRows)
1563+
}
1564+
return nil
1565+
}
1566+
}
1567+
14841568
// createExternalAuthResponse creates an ExternalAuthResponse based on the
14851569
// provider type. This is to support legacy `/workspaceagents/me/gitauth`
14861570
// which uses `Username` and `Password`.

coderd/workspaces.go

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9-
"io"
109
"net/http"
1110
"slices"
1211
"strconv"
@@ -16,7 +15,6 @@ import (
1615
"github.com/go-chi/chi/v5"
1716
"github.com/google/uuid"
1817
"golang.org/x/xerrors"
19-
"nhooyr.io/websocket"
2018

2119
"cdr.dev/slog"
2220
"github.com/coder/coder/v2/agent/proto"
@@ -38,7 +36,6 @@ import (
3836
"github.com/coder/coder/v2/coderd/wsbuilder"
3937
"github.com/coder/coder/v2/codersdk"
4038
"github.com/coder/coder/v2/codersdk/agentsdk"
41-
"github.com/coder/coder/v2/tailnet"
4239
)
4340

4441
var (
@@ -2109,72 +2106,3 @@ func (api *API) publishWorkspaceAgentLogsUpdate(ctx context.Context, workspaceAg
21092106
api.Logger.Warn(ctx, "failed to publish workspace agent logs update", slog.F("workspace_agent_id", workspaceAgentID), slog.Error(err))
21102107
}
21112108
}
2112-
2113-
// @Summary Coordinate multiple workspace agents
2114-
// @ID coordinate-multiple-workspace-agents
2115-
// @Security CoderSessionToken
2116-
// @Tags Workspaces
2117-
// @Success 101
2118-
// @Router /users/me/tailnet [get]
2119-
func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
2120-
ctx := r.Context()
2121-
owner := httpmw.UserParam(r)
2122-
ownerRoles := httpmw.UserAuthorization(r)
2123-
2124-
// Check if the actor is allowed to access any workspace owned by the user.
2125-
if !api.Authorize(r, policy.ActionSSH, rbac.ResourceWorkspace.WithOwner(owner.ID.String())) {
2126-
httpapi.ResourceNotFound(rw)
2127-
return
2128-
}
2129-
2130-
version := "1.0"
2131-
qv := r.URL.Query().Get("version")
2132-
if qv != "" {
2133-
version = qv
2134-
}
2135-
if err := proto.CurrentVersion.Validate(version); err != nil {
2136-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2137-
Message: "Unknown or unsupported API version",
2138-
Validations: []codersdk.ValidationError{
2139-
{Field: "version", Detail: err.Error()},
2140-
},
2141-
})
2142-
return
2143-
}
2144-
2145-
peerID, err := api.handleResumeToken(ctx, rw, r)
2146-
if err != nil {
2147-
// handleResumeToken has already written the response.
2148-
return
2149-
}
2150-
2151-
api.WebsocketWaitMutex.Lock()
2152-
api.WebsocketWaitGroup.Add(1)
2153-
api.WebsocketWaitMutex.Unlock()
2154-
defer api.WebsocketWaitGroup.Done()
2155-
2156-
conn, err := websocket.Accept(rw, r, nil)
2157-
if err != nil {
2158-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2159-
Message: "Failed to accept websocket.",
2160-
Detail: err.Error(),
2161-
})
2162-
return
2163-
}
2164-
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
2165-
defer wsNetConn.Close()
2166-
defer conn.Close(websocket.StatusNormalClosure, "")
2167-
2168-
go httpapi.Heartbeat(ctx, conn)
2169-
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
2170-
PeerID: peerID,
2171-
UserID: owner.ID,
2172-
Subject: &ownerRoles,
2173-
Authz: api.Authorizer,
2174-
Database: api.Database,
2175-
})
2176-
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
2177-
_ = conn.Close(websocket.StatusInternalError, err.Error())
2178-
return
2179-
}
2180-
}

0 commit comments

Comments
 (0)