Skip to content

Commit b1298a3

Browse files
feat: add WorkspaceUpdates tailnet RPC (#14847)
Closes #14716. Closes #14717. Adds a new user-scoped tailnet API endpoint (`api/v2/tailnet`) with a new RPC stream for receiving updates on workspaces owned by a specific user, as defined in #14716. When a stream is started, the `WorkspaceUpdatesProvider` will begin listening on the user-scoped pubsub events implemented in #14964. When a relevant event type is seen (such as a workspace state transition), the provider will query the DB for all the workspaces (and agents) owned by the user. This gets compared against the result of the previous query to produce a set of workspace updates. Workspace updates can be requested for any user ID; however, only workspaces on which the authorised user is permitted to perform `ActionRead` will have their updates streamed. Opening a tunnel to an agent requires that the user can perform `ActionSSH` against the workspace containing it.
1 parent f941e78 commit b1298a3

25 files changed

+2220
-271
lines changed

coderd/apidoc/docs.go

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/coderd.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,8 @@ func New(options *Options) *API {
493493
}
494494
}
495495

496+
updatesProvider := NewUpdatesProvider(options.Logger.Named("workspace_updates"), options.Pubsub, options.Database, options.Authorizer)
497+
496498
// Start a background process that rotates keys. We intentionally start this after the caches
497499
// are created to force initial requests for a key to populate the caches. This helps catch
498500
// bugs that may only occur when a key isn't precached in tests and the latency cost is minimal.
@@ -523,6 +525,7 @@ func New(options *Options) *API {
523525
metricsCache: metricsCache,
524526
Auditor: atomic.Pointer[audit.Auditor]{},
525527
TailnetCoordinator: atomic.Pointer[tailnet.Coordinator]{},
528+
UpdatesProvider: updatesProvider,
526529
TemplateScheduleStore: options.TemplateScheduleStore,
527530
UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
528531
AccessControlStore: options.AccessControlStore,
@@ -652,12 +655,13 @@ func New(options *Options) *API {
652655
panic("CoordinatorResumeTokenProvider is nil")
653656
}
654657
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
655-
Logger: api.Logger.Named("tailnetclient"),
656-
CoordPtr: &api.TailnetCoordinator,
657-
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
658-
DERPMapFn: api.DERPMap,
659-
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
660-
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
658+
Logger: api.Logger.Named("tailnetclient"),
659+
CoordPtr: &api.TailnetCoordinator,
660+
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
661+
DERPMapFn: api.DERPMap,
662+
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
663+
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
664+
WorkspaceUpdatesProvider: api.UpdatesProvider,
661665
})
662666
if err != nil {
663667
api.Logger.Fatal(context.Background(), "failed to initialize tailnet client service", slog.Error(err))
@@ -1327,6 +1331,10 @@ func New(options *Options) *API {
13271331
})
13281332
r.Get("/dispatch-methods", api.notificationDispatchMethods)
13291333
})
1334+
r.Route("/tailnet", func(r chi.Router) {
1335+
r.Use(apiKeyMiddleware)
1336+
r.Get("/", api.tailnetRPCConn)
1337+
})
13301338
})
13311339

13321340
if options.SwaggerEndpoint {
@@ -1408,6 +1416,8 @@ type API struct {
14081416
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
14091417
PortSharer atomic.Pointer[portsharing.PortSharer]
14101418

1419+
UpdatesProvider tailnet.WorkspaceUpdatesProvider
1420+
14111421
HTTPAuth *HTTPAuthorizer
14121422

14131423
// APIHandler serves "/api/v2"
@@ -1489,6 +1499,7 @@ func (api *API) Close() error {
14891499
_ = api.OIDCConvertKeyCache.Close()
14901500
_ = api.AppSigningKeyCache.Close()
14911501
_ = api.AppEncryptionKeyCache.Close()
1502+
_ = api.UpdatesProvider.Close()
14921503
return nil
14931504
}
14941505

coderd/database/dbfake/dbfake.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,14 @@ func (b WorkspaceBuildBuilder) Do() WorkspaceResponse {
224224
}
225225
_ = dbgen.WorkspaceBuildParameters(b.t, b.db, b.params)
226226

227+
if b.ws.Deleted {
228+
err = b.db.UpdateWorkspaceDeletedByID(ownerCtx, database.UpdateWorkspaceDeletedByIDParams{
229+
ID: b.ws.ID,
230+
Deleted: true,
231+
})
232+
require.NoError(b.t, err)
233+
}
234+
227235
if b.ps != nil {
228236
msg, err := json.Marshal(wspubsub.WorkspaceEvent{
229237
Kind: wspubsub.WorkspaceEventKindStateChange,

coderd/workspaceagents.go

Lines changed: 114 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/coder/coder/v2/coderd/httpapi"
3434
"github.com/coder/coder/v2/coderd/httpmw"
3535
"github.com/coder/coder/v2/coderd/jwtutils"
36+
"github.com/coder/coder/v2/coderd/rbac"
3637
"github.com/coder/coder/v2/coderd/rbac/policy"
3738
"github.com/coder/coder/v2/coderd/wspubsub"
3839
"github.com/coder/coder/v2/codersdk"
@@ -844,31 +845,10 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
844845
return
845846
}
846847

847-
// Accept a resume_token query parameter to use the same peer ID.
848-
var (
849-
peerID = uuid.New()
850-
resumeToken = r.URL.Query().Get("resume_token")
851-
)
852-
if resumeToken != "" {
853-
var err error
854-
peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx, resumeToken)
855-
// If the token is missing the key ID, it's probably an old token in which
856-
// case we just want to generate a new peer ID.
857-
if xerrors.Is(err, jwtutils.ErrMissingKeyID) {
858-
peerID = uuid.New()
859-
} else if err != nil {
860-
httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{
861-
Message: workspacesdk.CoordinateAPIInvalidResumeToken,
862-
Detail: err.Error(),
863-
Validations: []codersdk.ValidationError{
864-
{Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken},
865-
},
866-
})
867-
return
868-
} else {
869-
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
870-
slog.F("peer_id", peerID.String()))
871-
}
848+
peerID, err := api.handleResumeToken(ctx, rw, r)
849+
if err != nil {
850+
// handleResumeToken has already written the response.
851+
return
872852
}
873853

874854
api.WebsocketWaitMutex.Lock()
@@ -891,13 +871,47 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
891871
go httpapi.Heartbeat(ctx, conn)
892872

893873
defer conn.Close(websocket.StatusNormalClosure, "")
894-
err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, peerID, workspaceAgent.ID)
874+
err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{
875+
Name: "client",
876+
ID: peerID,
877+
Auth: tailnet.ClientCoordinateeAuth{
878+
AgentID: workspaceAgent.ID,
879+
},
880+
})
895881
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
896882
_ = conn.Close(websocket.StatusInternalError, err.Error())
897883
return
898884
}
899885
}
900886

887+
// handleResumeToken accepts a resume_token query parameter to use the same peer ID
888+
func (api *API) handleResumeToken(ctx context.Context, rw http.ResponseWriter, r *http.Request) (peerID uuid.UUID, err error) {
889+
peerID = uuid.New()
890+
resumeToken := r.URL.Query().Get("resume_token")
891+
if resumeToken != "" {
892+
peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx, resumeToken)
893+
// If the token is missing the key ID, it's probably an old token in which
894+
// case we just want to generate a new peer ID.
895+
if xerrors.Is(err, jwtutils.ErrMissingKeyID) {
896+
peerID = uuid.New()
897+
err = nil
898+
} else if err != nil {
899+
httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{
900+
Message: workspacesdk.CoordinateAPIInvalidResumeToken,
901+
Detail: err.Error(),
902+
Validations: []codersdk.ValidationError{
903+
{Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken},
904+
},
905+
})
906+
return peerID, err
907+
} else {
908+
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
909+
slog.F("peer_id", peerID.String()))
910+
}
911+
}
912+
return peerID, err
913+
}
914+
901915
// @Summary Post workspace agent log source
902916
// @ID post-workspace-agent-log-source
903917
// @Security CoderSessionToken
@@ -1469,6 +1483,80 @@ func (api *API) workspaceAgentsExternalAuthListen(ctx context.Context, rw http.R
14691483
}
14701484
}
14711485

1486+
// tailnetRPCConn upgrades the request to a websocket and serves the tailnet
// client RPC API over it for the requesting user. The protocol version comes
// from the optional "version" query parameter (default "2.0"), and the peer ID
// may be resumed via the "resume_token" query parameter.
//
// @Summary User-scoped tailnet RPC connection
1487+
// @ID user-scoped-tailnet-rpc-connection
1488+
// @Security CoderSessionToken
1489+
// @Tags Agents
1490+
// @Success 101
1491+
// @Router /tailnet [get]
1492+
func (api *API) tailnetRPCConn(rw http.ResponseWriter, r *http.Request) {
1493+
ctx := r.Context()
1494+
1495+
// Default to version "2.0" when the client does not specify one.
version := "2.0"
1496+
qv := r.URL.Query().Get("version")
1497+
if qv != "" {
1498+
version = qv
1499+
}
1500+
if err := proto.CurrentVersion.Validate(version); err != nil {
1501+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1502+
Message: "Unknown or unsupported API version",
1503+
Validations: []codersdk.ValidationError{
1504+
{Field: "version", Detail: err.Error()},
1505+
},
1506+
})
1507+
return
1508+
}
1509+
1510+
peerID, err := api.handleResumeToken(ctx, rw, r)
1511+
if err != nil {
1512+
// handleResumeToken has already written the response.
1513+
return
1514+
}
1515+
1516+
// Used to authorize tunnel request
// The prepared ActionSSH filter for workspace resources is handed to the
// rbacAuthorizer below, together with the database handle.
1517+
sshPrep, err := api.HTTPAuth.AuthorizeSQLFilter(r, policy.ActionSSH, rbac.ResourceWorkspace.Type)
1518+
if err != nil {
1519+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
1520+
Message: "Internal error preparing sql filter.",
1521+
Detail: err.Error(),
1522+
})
1523+
return
1524+
}
1525+
1526+
// Register this connection in WebsocketWaitGroup while holding
// WebsocketWaitMutex; presumably this lets shutdown wait for open
// websockets — confirm against API.Close.
api.WebsocketWaitMutex.Lock()
1527+
api.WebsocketWaitGroup.Add(1)
1528+
api.WebsocketWaitMutex.Unlock()
1529+
defer api.WebsocketWaitGroup.Done()
1530+
1531+
conn, err := websocket.Accept(rw, r, nil)
1532+
if err != nil {
1533+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1534+
Message: "Failed to accept websocket.",
1535+
Detail: err.Error(),
1536+
})
1537+
return
1538+
}
1539+
// Wrap the websocket for use as the RPC transport with binary messages;
// ctx is replaced by the context returned from the wrapper.
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
1540+
defer wsNetConn.Close()
1541+
defer conn.Close(websocket.StatusNormalClosure, "")
1542+
1543+
go httpapi.Heartbeat(ctx, conn)
1544+
err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{
1545+
Name: "client",
1546+
ID: peerID,
1547+
Auth: tailnet.ClientUserCoordinateeAuth{
1548+
Auth: &rbacAuthorizer{
1549+
sshPrep: sshPrep,
1550+
db: api.Database,
1551+
},
1552+
},
1553+
})
1554+
// io.EOF and context cancellation are not reported to the peer; any other
// error closes the websocket with an internal-error status and the error text.
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
1555+
_ = conn.Close(websocket.StatusInternalError, err.Error())
1556+
return
1557+
}
1558+
}
1559+
14721560
// createExternalAuthResponse creates an ExternalAuthResponse based on the
14731561
// provider type. This is to support legacy `/workspaceagents/me/gitauth`
14741562
// which uses `Username` and `Password`.

0 commit comments

Comments
 (0)