Skip to content

Commit ab6e559

Browse files
committed
move core impl to coderd
1 parent 2c7fff2 commit ab6e559

File tree

13 files changed

+251
-212
lines changed

13 files changed

+251
-212
lines changed

cli/server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,13 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
723723
options.Database = dbmetrics.New(options.Database, options.PrometheusRegistry)
724724
}
725725

726+
wsUpdates, err := coderd.NewUpdatesProvider(ctx, options.Database, options.Pubsub)
727+
if err != nil {
728+
return xerrors.Errorf("create workspace updates provider: %w", err)
729+
}
730+
options.WorkspaceUpdatesProvider = wsUpdates
731+
defer wsUpdates.Stop()
732+
726733
var deploymentID string
727734
err = options.Database.InTx(func(tx database.Store) error {
728735
// This will block until the lock is acquired, and will be

coderd/apidoc/docs.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/coderd.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ type Options struct {
229229

230230
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
231231

232+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
233+
232234
// This janky function is used in telemetry to parse fields out of the raw
233235
// JWT. It needs to be passed through like this because license parsing is
234236
// under the enterprise license, and can't be imported into AGPL.
@@ -592,12 +594,13 @@ func New(options *Options) *API {
592594
panic("CoordinatorResumeTokenProvider is nil")
593595
}
594596
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
595-
Logger: api.Logger.Named("tailnetclient"),
596-
CoordPtr: &api.TailnetCoordinator,
597-
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
598-
DERPMapFn: api.DERPMap,
599-
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
600-
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
597+
Logger: api.Logger.Named("tailnetclient"),
598+
CoordPtr: &api.TailnetCoordinator,
599+
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
600+
DERPMapFn: api.DERPMap,
601+
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
602+
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
603+
WorkspaceUpdatesProvider: api.Options.WorkspaceUpdatesProvider,
601604
})
602605
if err != nil {
603606
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))

coderd/coderdtest/coderdtest.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ type Options struct {
159159
WorkspaceUsageTrackerFlush chan int
160160
WorkspaceUsageTrackerTick chan time.Time
161161

162+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
163+
162164
NotificationsEnqueuer notifications.Enqueuer
163165
}
164166

@@ -251,6 +253,15 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
251253
options.NotificationsEnqueuer = new(testutil.FakeNotificationsEnqueuer)
252254
}
253255

256+
if options.WorkspaceUpdatesProvider == nil {
257+
var err error
258+
ctx, cancel := context.WithCancel(context.Background())
259+
options.WorkspaceUpdatesProvider, err = coderd.NewUpdatesProvider(ctx, options.Database, options.Pubsub)
260+
require.NoError(t, err)
261+
t.Cleanup(cancel)
262+
t.Cleanup(options.WorkspaceUpdatesProvider.Stop)
263+
}
264+
254265
accessControlStore := &atomic.Pointer[dbauthz.AccessControlStore]{}
255266
var acs dbauthz.AccessControlStore = dbauthz.AGPLTemplateAccessControlStore{}
256267
accessControlStore.Store(&acs)
@@ -524,6 +535,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
524535
HealthcheckTimeout: options.HealthcheckTimeout,
525536
HealthcheckRefresh: options.HealthcheckRefresh,
526537
StatsBatcher: options.StatsBatcher,
538+
WorkspaceUpdatesProvider: options.WorkspaceUpdatesProvider,
527539
WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions,
528540
AllowWorkspaceRenames: options.AllowWorkspaceRenames,
529541
NewTicker: options.NewTicker,

coderd/workspaceagents.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/coder/coder/v2/coderd/externalauth"
3333
"github.com/coder/coder/v2/coderd/httpapi"
3434
"github.com/coder/coder/v2/coderd/httpmw"
35+
"github.com/coder/coder/v2/coderd/rbac"
3536
"github.com/coder/coder/v2/coderd/rbac/policy"
3637
"github.com/coder/coder/v2/coderd/wspubsub"
3738
"github.com/coder/coder/v2/codersdk"
@@ -1482,6 +1483,89 @@ func (api *API) workspaceAgentsExternalAuthListen(ctx context.Context, rw http.R
14821483
}
14831484
}
14841485

1486+
// @Summary Coordinate multiple workspace agents
1487+
// @ID coordinate-multiple-workspace-agents
1488+
// @Security CoderSessionToken
1489+
// @Tags Agents
1490+
// @Success 101
1491+
// @Router /users/me/tailnet [get]
1492+
func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
1493+
ctx := r.Context()
1494+
owner := httpmw.UserParam(r)
1495+
ownerRoles := httpmw.UserAuthorization(r)
1496+
1497+
// Check if the actor is allowed to access any workspace owned by the user.
1498+
if !api.Authorize(r, policy.ActionSSH, rbac.ResourceWorkspace.WithOwner(owner.ID.String())) {
1499+
httpapi.ResourceNotFound(rw)
1500+
return
1501+
}
1502+
1503+
version := "1.0"
1504+
qv := r.URL.Query().Get("version")
1505+
if qv != "" {
1506+
version = qv
1507+
}
1508+
if err := proto.CurrentVersion.Validate(version); err != nil {
1509+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1510+
Message: "Unknown or unsupported API version",
1511+
Validations: []codersdk.ValidationError{
1512+
{Field: "version", Detail: err.Error()},
1513+
},
1514+
})
1515+
return
1516+
}
1517+
1518+
peerID, err := api.handleResumeToken(ctx, rw, r)
1519+
if err != nil {
1520+
// handleResumeToken has already written the response.
1521+
return
1522+
}
1523+
1524+
api.WebsocketWaitMutex.Lock()
1525+
api.WebsocketWaitGroup.Add(1)
1526+
api.WebsocketWaitMutex.Unlock()
1527+
defer api.WebsocketWaitGroup.Done()
1528+
1529+
conn, err := websocket.Accept(rw, r, nil)
1530+
if err != nil {
1531+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1532+
Message: "Failed to accept websocket.",
1533+
Detail: err.Error(),
1534+
})
1535+
return
1536+
}
1537+
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
1538+
defer wsNetConn.Close()
1539+
defer conn.Close(websocket.StatusNormalClosure, "")
1540+
1541+
go httpapi.Heartbeat(ctx, conn)
1542+
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
1543+
PeerID: peerID,
1544+
UserID: owner.ID,
1545+
AuthFn: authAgentFn(api.Database, api.Authorizer, &ownerRoles),
1546+
})
1547+
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
1548+
_ = conn.Close(websocket.StatusInternalError, err.Error())
1549+
return
1550+
}
1551+
}
1552+
1553+
// authAgentFn accepts a subject, and returns a closure that authorizes against
1554+
// passed agent IDs.
1555+
func authAgentFn(db database.Store, auth rbac.Authorizer, user *rbac.Subject) func(context.Context, uuid.UUID) error {
1556+
return func(ctx context.Context, agentID uuid.UUID) error {
1557+
ws, err := db.GetWorkspaceByAgentID(ctx, agentID)
1558+
if err != nil {
1559+
return xerrors.Errorf("get workspace by agent id: %w", err)
1560+
}
1561+
err = auth.Authorize(ctx, *user, policy.ActionSSH, ws.RBACObject())
1562+
if err != nil {
1563+
return xerrors.Errorf("workspace agent not found or you do not have permission: %w", sql.ErrNoRows)
1564+
}
1565+
return nil
1566+
}
1567+
}
1568+
14851569
// createExternalAuthResponse creates an ExternalAuthResponse based on the
14861570
// provider type. This is to support legacy `/workspaceagents/me/gitauth`
14871571
// which uses `Username` and `Password`.

coderd/workspaces.go

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9-
"io"
109
"net/http"
1110
"slices"
1211
"strconv"
@@ -16,7 +15,6 @@ import (
1615
"github.com/go-chi/chi/v5"
1716
"github.com/google/uuid"
1817
"golang.org/x/xerrors"
19-
"nhooyr.io/websocket"
2018

2119
"cdr.dev/slog"
2220
"github.com/coder/coder/v2/agent/proto"
@@ -39,7 +37,6 @@ import (
3937
"github.com/coder/coder/v2/coderd/wspubsub"
4038
"github.com/coder/coder/v2/codersdk"
4139
"github.com/coder/coder/v2/codersdk/agentsdk"
42-
"github.com/coder/coder/v2/tailnet"
4340
)
4441

4542
var (
@@ -2110,72 +2107,3 @@ func (api *API) publishWorkspaceAgentLogsUpdate(ctx context.Context, workspaceAg
21102107
api.Logger.Warn(ctx, "failed to publish workspace agent logs update", slog.F("workspace_agent_id", workspaceAgentID), slog.Error(err))
21112108
}
21122109
}
2113-
2114-
// @Summary Coordinate multiple workspace agents
2115-
// @ID coordinate-multiple-workspace-agents
2116-
// @Security CoderSessionToken
2117-
// @Tags Workspaces
2118-
// @Success 101
2119-
// @Router /users/me/tailnet [get]
2120-
func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
2121-
ctx := r.Context()
2122-
owner := httpmw.UserParam(r)
2123-
ownerRoles := httpmw.UserAuthorization(r)
2124-
2125-
// Check if the actor is allowed to access any workspace owned by the user.
2126-
if !api.Authorize(r, policy.ActionSSH, rbac.ResourceWorkspace.WithOwner(owner.ID.String())) {
2127-
httpapi.ResourceNotFound(rw)
2128-
return
2129-
}
2130-
2131-
version := "1.0"
2132-
qv := r.URL.Query().Get("version")
2133-
if qv != "" {
2134-
version = qv
2135-
}
2136-
if err := proto.CurrentVersion.Validate(version); err != nil {
2137-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2138-
Message: "Unknown or unsupported API version",
2139-
Validations: []codersdk.ValidationError{
2140-
{Field: "version", Detail: err.Error()},
2141-
},
2142-
})
2143-
return
2144-
}
2145-
2146-
peerID, err := api.handleResumeToken(ctx, rw, r)
2147-
if err != nil {
2148-
// handleResumeToken has already written the response.
2149-
return
2150-
}
2151-
2152-
api.WebsocketWaitMutex.Lock()
2153-
api.WebsocketWaitGroup.Add(1)
2154-
api.WebsocketWaitMutex.Unlock()
2155-
defer api.WebsocketWaitGroup.Done()
2156-
2157-
conn, err := websocket.Accept(rw, r, nil)
2158-
if err != nil {
2159-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2160-
Message: "Failed to accept websocket.",
2161-
Detail: err.Error(),
2162-
})
2163-
return
2164-
}
2165-
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
2166-
defer wsNetConn.Close()
2167-
defer conn.Close(websocket.StatusNormalClosure, "")
2168-
2169-
go httpapi.Heartbeat(ctx, conn)
2170-
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
2171-
PeerID: peerID,
2172-
UserID: owner.ID,
2173-
Subject: &ownerRoles,
2174-
Authz: api.Authorizer,
2175-
Database: api.Database,
2176-
})
2177-
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
2178-
_ = conn.Close(websocket.StatusInternalError, err.Error())
2179-
return
2180-
}
2181-
}

0 commit comments

Comments
 (0)