Skip to content

Commit 19da14b

Browse files
committed
move core impl to coderd
1 parent 22ae813 commit 19da14b

File tree

13 files changed

+251
-212
lines changed

13 files changed

+251
-212
lines changed

cli/server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,13 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
719719
options.Database = dbmetrics.New(options.Database, options.PrometheusRegistry)
720720
}
721721

722+
wsUpdates, err := coderd.NewUpdatesProvider(ctx, options.Database, options.Pubsub)
723+
if err != nil {
724+
return xerrors.Errorf("create workspace updates provider: %w", err)
725+
}
726+
options.WorkspaceUpdatesProvider = wsUpdates
727+
defer wsUpdates.Stop()
728+
722729
var deploymentID string
723730
err = options.Database.InTx(func(tx database.Store) error {
724731
// This will block until the lock is acquired, and will be

coderd/apidoc/docs.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/coderd.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ type Options struct {
228228

229229
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
230230

231+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
232+
231233
// This janky function is used in telemetry to parse fields out of the raw
232234
// JWT. It needs to be passed through like this because license parsing is
233235
// under the enterprise license, and can't be imported into AGPL.
@@ -591,12 +593,13 @@ func New(options *Options) *API {
591593
panic("CoordinatorResumeTokenProvider is nil")
592594
}
593595
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
594-
Logger: api.Logger.Named("tailnetclient"),
595-
CoordPtr: &api.TailnetCoordinator,
596-
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
597-
DERPMapFn: api.DERPMap,
598-
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
599-
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
596+
Logger: api.Logger.Named("tailnetclient"),
597+
CoordPtr: &api.TailnetCoordinator,
598+
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
599+
DERPMapFn: api.DERPMap,
600+
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
601+
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
602+
WorkspaceUpdatesProvider: api.Options.WorkspaceUpdatesProvider,
600603
})
601604
if err != nil {
602605
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))

coderd/coderdtest/coderdtest.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ type Options struct {
159159
WorkspaceUsageTrackerFlush chan int
160160
WorkspaceUsageTrackerTick chan time.Time
161161

162+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
163+
162164
NotificationsEnqueuer notifications.Enqueuer
163165
}
164166

@@ -251,6 +253,15 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
251253
options.NotificationsEnqueuer = new(testutil.FakeNotificationsEnqueuer)
252254
}
253255

256+
if options.WorkspaceUpdatesProvider == nil {
257+
var err error
258+
ctx, cancel := context.WithCancel(context.Background())
259+
options.WorkspaceUpdatesProvider, err = coderd.NewUpdatesProvider(ctx, options.Database, options.Pubsub)
260+
require.NoError(t, err)
261+
t.Cleanup(cancel)
262+
t.Cleanup(options.WorkspaceUpdatesProvider.Stop)
263+
}
264+
254265
accessControlStore := &atomic.Pointer[dbauthz.AccessControlStore]{}
255266
var acs dbauthz.AccessControlStore = dbauthz.AGPLTemplateAccessControlStore{}
256267
accessControlStore.Store(&acs)
@@ -524,6 +535,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
524535
HealthcheckTimeout: options.HealthcheckTimeout,
525536
HealthcheckRefresh: options.HealthcheckRefresh,
526537
StatsBatcher: options.StatsBatcher,
538+
WorkspaceUpdatesProvider: options.WorkspaceUpdatesProvider,
527539
WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions,
528540
AllowWorkspaceRenames: options.AllowWorkspaceRenames,
529541
NewTicker: options.NewTicker,

coderd/workspaceagents.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/coder/coder/v2/coderd/externalauth"
3333
"github.com/coder/coder/v2/coderd/httpapi"
3434
"github.com/coder/coder/v2/coderd/httpmw"
35+
"github.com/coder/coder/v2/coderd/rbac"
3536
"github.com/coder/coder/v2/coderd/rbac/policy"
3637
"github.com/coder/coder/v2/codersdk"
3738
"github.com/coder/coder/v2/codersdk/agentsdk"
@@ -1472,6 +1473,89 @@ func (api *API) workspaceAgentsExternalAuthListen(ctx context.Context, rw http.R
14721473
}
14731474
}
14741475

1476+
// @Summary Coordinate multiple workspace agents
1477+
// @ID coordinate-multiple-workspace-agents
1478+
// @Security CoderSessionToken
1479+
// @Tags Agents
1480+
// @Success 101
1481+
// @Router /users/me/tailnet [get]
1482+
func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
1483+
ctx := r.Context()
1484+
owner := httpmw.UserParam(r)
1485+
ownerRoles := httpmw.UserAuthorization(r)
1486+
1487+
// Check if the actor is allowed to access any workspace owned by the user.
1488+
if !api.Authorize(r, policy.ActionSSH, rbac.ResourceWorkspace.WithOwner(owner.ID.String())) {
1489+
httpapi.ResourceNotFound(rw)
1490+
return
1491+
}
1492+
1493+
version := "1.0"
1494+
qv := r.URL.Query().Get("version")
1495+
if qv != "" {
1496+
version = qv
1497+
}
1498+
if err := proto.CurrentVersion.Validate(version); err != nil {
1499+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1500+
Message: "Unknown or unsupported API version",
1501+
Validations: []codersdk.ValidationError{
1502+
{Field: "version", Detail: err.Error()},
1503+
},
1504+
})
1505+
return
1506+
}
1507+
1508+
peerID, err := api.handleResumeToken(ctx, rw, r)
1509+
if err != nil {
1510+
// handleResumeToken has already written the response.
1511+
return
1512+
}
1513+
1514+
api.WebsocketWaitMutex.Lock()
1515+
api.WebsocketWaitGroup.Add(1)
1516+
api.WebsocketWaitMutex.Unlock()
1517+
defer api.WebsocketWaitGroup.Done()
1518+
1519+
conn, err := websocket.Accept(rw, r, nil)
1520+
if err != nil {
1521+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1522+
Message: "Failed to accept websocket.",
1523+
Detail: err.Error(),
1524+
})
1525+
return
1526+
}
1527+
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
1528+
defer wsNetConn.Close()
1529+
defer conn.Close(websocket.StatusNormalClosure, "")
1530+
1531+
go httpapi.Heartbeat(ctx, conn)
1532+
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
1533+
PeerID: peerID,
1534+
UserID: owner.ID,
1535+
AuthFn: authAgentFn(api.Database, api.Authorizer, &ownerRoles),
1536+
})
1537+
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
1538+
_ = conn.Close(websocket.StatusInternalError, err.Error())
1539+
return
1540+
}
1541+
}
1542+
1543+
// authAgentFn accepts a subject, and returns a closure that authorizes against
1544+
// passed agent IDs.
1545+
func authAgentFn(db database.Store, auth rbac.Authorizer, user *rbac.Subject) func(context.Context, uuid.UUID) error {
1546+
return func(ctx context.Context, agentID uuid.UUID) error {
1547+
ws, err := db.GetWorkspaceByAgentID(ctx, agentID)
1548+
if err != nil {
1549+
return xerrors.Errorf("get workspace by agent id: %w", err)
1550+
}
1551+
err = auth.Authorize(ctx, *user, policy.ActionSSH, ws.RBACObject())
1552+
if err != nil {
1553+
return xerrors.Errorf("workspace agent not found or you do not have permission: %w", sql.ErrNoRows)
1554+
}
1555+
return nil
1556+
}
1557+
}
1558+
14751559
// createExternalAuthResponse creates an ExternalAuthResponse based on the
14761560
// provider type. This is to support legacy `/workspaceagents/me/gitauth`
14771561
// which uses `Username` and `Password`.

coderd/workspaces.go

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9-
"io"
109
"net/http"
1110
"slices"
1211
"strconv"
@@ -16,7 +15,6 @@ import (
1615
"github.com/go-chi/chi/v5"
1716
"github.com/google/uuid"
1817
"golang.org/x/xerrors"
19-
"nhooyr.io/websocket"
2018

2119
"cdr.dev/slog"
2220
"github.com/coder/coder/v2/agent/proto"
@@ -38,7 +36,6 @@ import (
3836
"github.com/coder/coder/v2/coderd/wsbuilder"
3937
"github.com/coder/coder/v2/codersdk"
4038
"github.com/coder/coder/v2/codersdk/agentsdk"
41-
"github.com/coder/coder/v2/tailnet"
4239
)
4340

4441
var (
@@ -2088,72 +2085,3 @@ func (api *API) publishWorkspaceAgentLogsUpdate(ctx context.Context, workspaceAg
20882085
api.Logger.Warn(ctx, "failed to publish workspace agent logs update", slog.F("workspace_agent_id", workspaceAgentID), slog.Error(err))
20892086
}
20902087
}
2091-
2092-
// @Summary Coordinate multiple workspace agents
2093-
// @ID coordinate-multiple-workspace-agents
2094-
// @Security CoderSessionToken
2095-
// @Tags Workspaces
2096-
// @Success 101
2097-
// @Router /users/me/tailnet [get]
2098-
func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
2099-
ctx := r.Context()
2100-
owner := httpmw.UserParam(r)
2101-
ownerRoles := httpmw.UserAuthorization(r)
2102-
2103-
// Check if the actor is allowed to access any workspace owned by the user.
2104-
if !api.Authorize(r, policy.ActionSSH, rbac.ResourceWorkspace.WithOwner(owner.ID.String())) {
2105-
httpapi.ResourceNotFound(rw)
2106-
return
2107-
}
2108-
2109-
version := "1.0"
2110-
qv := r.URL.Query().Get("version")
2111-
if qv != "" {
2112-
version = qv
2113-
}
2114-
if err := proto.CurrentVersion.Validate(version); err != nil {
2115-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2116-
Message: "Unknown or unsupported API version",
2117-
Validations: []codersdk.ValidationError{
2118-
{Field: "version", Detail: err.Error()},
2119-
},
2120-
})
2121-
return
2122-
}
2123-
2124-
peerID, err := api.handleResumeToken(ctx, rw, r)
2125-
if err != nil {
2126-
// handleResumeToken has already written the response.
2127-
return
2128-
}
2129-
2130-
api.WebsocketWaitMutex.Lock()
2131-
api.WebsocketWaitGroup.Add(1)
2132-
api.WebsocketWaitMutex.Unlock()
2133-
defer api.WebsocketWaitGroup.Done()
2134-
2135-
conn, err := websocket.Accept(rw, r, nil)
2136-
if err != nil {
2137-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2138-
Message: "Failed to accept websocket.",
2139-
Detail: err.Error(),
2140-
})
2141-
return
2142-
}
2143-
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
2144-
defer wsNetConn.Close()
2145-
defer conn.Close(websocket.StatusNormalClosure, "")
2146-
2147-
go httpapi.Heartbeat(ctx, conn)
2148-
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
2149-
PeerID: peerID,
2150-
UserID: owner.ID,
2151-
Subject: &ownerRoles,
2152-
Authz: api.Authorizer,
2153-
Database: api.Database,
2154-
})
2155-
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
2156-
_ = conn.Close(websocket.StatusInternalError, err.Error())
2157-
return
2158-
}
2159-
}

0 commit comments

Comments
 (0)