Skip to content

Commit 4fa1833

Browse files
committed
feat: add WorkspaceUpdates rpc
1 parent 8be52ed commit 4fa1833

19 files changed

+1812
-237
lines changed

cli/server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,13 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
728728
options.Database = dbmetrics.NewDBMetrics(options.Database, options.Logger, options.PrometheusRegistry)
729729
}
730730

731+
wsUpdates, err := coderd.NewUpdatesProvider(logger.Named("workspace_updates"), options.Database, options.Pubsub)
732+
if err != nil {
733+
return xerrors.Errorf("create workspace updates provider: %w", err)
734+
}
735+
options.WorkspaceUpdatesProvider = wsUpdates
736+
defer wsUpdates.Stop()
737+
731738
var deploymentID string
732739
err = options.Database.InTx(func(tx database.Store) error {
733740
// This will block until the lock is acquired, and will be

coderd/apidoc/docs.go

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/coderd.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ type Options struct {
227227

228228
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
229229

230+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
231+
230232
// This janky function is used in telemetry to parse fields out of the raw
231233
// JWT. It needs to be passed through like this because license parsing is
232234
// under the enterprise license, and can't be imported into AGPL.
@@ -652,12 +654,13 @@ func New(options *Options) *API {
652654
panic("CoordinatorResumeTokenProvider is nil")
653655
}
654656
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
655-
Logger: api.Logger.Named("tailnetclient"),
656-
CoordPtr: &api.TailnetCoordinator,
657-
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
658-
DERPMapFn: api.DERPMap,
659-
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
660-
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
657+
Logger: api.Logger.Named("tailnetclient"),
658+
CoordPtr: &api.TailnetCoordinator,
659+
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
660+
DERPMapFn: api.DERPMap,
661+
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
662+
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
663+
WorkspaceUpdatesProvider: api.Options.WorkspaceUpdatesProvider,
661664
})
662665
if err != nil {
663666
api.Logger.Fatal(context.Background(), "failed to initialize tailnet client service", slog.Error(err))
@@ -1070,6 +1073,7 @@ func New(options *Options) *API {
10701073
r.Route("/roles", func(r chi.Router) {
10711074
r.Get("/", api.AssignableSiteRoles)
10721075
})
1076+
r.Get("/me/tailnet", api.tailnet)
10731077
r.Route("/{user}", func(r chi.Router) {
10741078
r.Use(httpmw.ExtractUserParam(options.Database))
10751079
r.Post("/convert-login", api.postConvertLoginType)

coderd/coderdtest/coderdtest.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,12 @@ type Options struct {
159159
DatabaseRolluper *dbrollup.Rolluper
160160
WorkspaceUsageTrackerFlush chan int
161161
WorkspaceUsageTrackerTick chan time.Time
162-
NotificationsEnqueuer notifications.Enqueuer
163162
APIKeyEncryptionCache cryptokeys.EncryptionKeycache
164163
OIDCConvertKeyCache cryptokeys.SigningKeycache
165164
Clock quartz.Clock
165+
NotificationsEnqueuer notifications.Enqueuer
166+
167+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
166168
}
167169

168170
// New constructs a codersdk client connected to an in-memory API instance.
@@ -254,6 +256,13 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
254256
options.NotificationsEnqueuer = new(testutil.FakeNotificationsEnqueuer)
255257
}
256258

259+
if options.WorkspaceUpdatesProvider == nil {
260+
var err error
261+
options.WorkspaceUpdatesProvider, err = coderd.NewUpdatesProvider(options.Logger.Named("workspace_updates"), options.Database, options.Pubsub)
262+
require.NoError(t, err)
263+
t.Cleanup(options.WorkspaceUpdatesProvider.Stop)
264+
}
265+
257266
accessControlStore := &atomic.Pointer[dbauthz.AccessControlStore]{}
258267
var acs dbauthz.AccessControlStore = dbauthz.AGPLTemplateAccessControlStore{}
259268
accessControlStore.Store(&acs)
@@ -531,6 +540,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
531540
HealthcheckTimeout: options.HealthcheckTimeout,
532541
HealthcheckRefresh: options.HealthcheckRefresh,
533542
StatsBatcher: options.StatsBatcher,
543+
WorkspaceUpdatesProvider: options.WorkspaceUpdatesProvider,
534544
WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions,
535545
AllowWorkspaceRenames: options.AllowWorkspaceRenames,
536546
NewTicker: options.NewTicker,

coderd/workspaceagents.go

Lines changed: 97 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -844,31 +844,10 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
844844
return
845845
}
846846

847-
// Accept a resume_token query parameter to use the same peer ID.
848-
var (
849-
peerID = uuid.New()
850-
resumeToken = r.URL.Query().Get("resume_token")
851-
)
852-
if resumeToken != "" {
853-
var err error
854-
peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx, resumeToken)
855-
// If the token is missing the key ID, it's probably an old token in which
856-
// case we just want to generate a new peer ID.
857-
if xerrors.Is(err, jwtutils.ErrMissingKeyID) {
858-
peerID = uuid.New()
859-
} else if err != nil {
860-
httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{
861-
Message: workspacesdk.CoordinateAPIInvalidResumeToken,
862-
Detail: err.Error(),
863-
Validations: []codersdk.ValidationError{
864-
{Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken},
865-
},
866-
})
867-
return
868-
} else {
869-
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
870-
slog.F("peer_id", peerID.String()))
871-
}
847+
peerID, err := api.handleResumeToken(ctx, rw, r)
848+
if err != nil {
849+
// handleResumeToken has already written the response.
850+
return
872851
}
873852

874853
api.WebsocketWaitMutex.Lock()
@@ -898,6 +877,33 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
898877
}
899878
}
900879

880+
// handleResumeToken accepts a resume_token query parameter to use the same peer ID
881+
func (api *API) handleResumeToken(ctx context.Context, rw http.ResponseWriter, r *http.Request) (peerID uuid.UUID, err error) {
882+
peerID = uuid.New()
883+
resumeToken := r.URL.Query().Get("resume_token")
884+
if resumeToken != "" {
885+
peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx, resumeToken)
886+
// If the token is missing the key ID, it's probably an old token in which
887+
// case we just want to generate a new peer ID.
888+
if xerrors.Is(err, jwtutils.ErrMissingKeyID) {
889+
peerID = uuid.New()
890+
} else if err != nil {
891+
httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{
892+
Message: workspacesdk.CoordinateAPIInvalidResumeToken,
893+
Detail: err.Error(),
894+
Validations: []codersdk.ValidationError{
895+
{Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken},
896+
},
897+
})
898+
return
899+
} else {
900+
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
901+
slog.F("peer_id", peerID.String()))
902+
}
903+
}
904+
return peerID, err
905+
}
906+
901907
// @Summary Post workspace agent log source
902908
// @ID post-workspace-agent-log-source
903909
// @Security CoderSessionToken
@@ -1469,6 +1475,72 @@ func (api *API) workspaceAgentsExternalAuthListen(ctx context.Context, rw http.R
14691475
}
14701476
}
14711477

1478+
// @Summary Coordinate multiple workspace agents
1479+
// @ID coordinate-multiple-workspace-agents
1480+
// @Security CoderSessionToken
1481+
// @Tags Agents
1482+
// @Success 101
1483+
// @Router /users/me/tailnet [get]
1484+
func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
1485+
ctx := r.Context()
1486+
apiKey, ok := httpmw.APIKeyOptional(r)
1487+
if !ok {
1488+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1489+
Message: "Cannot use \"me\" without a valid session.",
1490+
})
1491+
return
1492+
}
1493+
1494+
version := "2.0"
1495+
qv := r.URL.Query().Get("version")
1496+
if qv != "" {
1497+
version = qv
1498+
}
1499+
if err := proto.CurrentVersion.Validate(version); err != nil {
1500+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1501+
Message: "Unknown or unsupported API version",
1502+
Validations: []codersdk.ValidationError{
1503+
{Field: "version", Detail: err.Error()},
1504+
},
1505+
})
1506+
return
1507+
}
1508+
1509+
peerID, err := api.handleResumeToken(ctx, rw, r)
1510+
if err != nil {
1511+
// handleResumeToken has already written the response.
1512+
return
1513+
}
1514+
1515+
api.WebsocketWaitMutex.Lock()
1516+
api.WebsocketWaitGroup.Add(1)
1517+
api.WebsocketWaitMutex.Unlock()
1518+
defer api.WebsocketWaitGroup.Done()
1519+
1520+
conn, err := websocket.Accept(rw, r, nil)
1521+
if err != nil {
1522+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
1523+
Message: "Failed to accept websocket.",
1524+
Detail: err.Error(),
1525+
})
1526+
return
1527+
}
1528+
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
1529+
defer wsNetConn.Close()
1530+
defer conn.Close(websocket.StatusNormalClosure, "")
1531+
1532+
go httpapi.Heartbeat(ctx, conn)
1533+
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
1534+
PeerID: peerID,
1535+
UserID: apiKey.UserID,
1536+
UpdatesProvider: api.WorkspaceUpdatesProvider,
1537+
})
1538+
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
1539+
_ = conn.Close(websocket.StatusInternalError, err.Error())
1540+
return
1541+
}
1542+
}
1543+
14721544
// createExternalAuthResponse creates an ExternalAuthResponse based on the
14731545
// provider type. This is to support legacy `/workspaceagents/me/gitauth`
14741546
// which uses `Username` and `Password`.

0 commit comments

Comments
 (0)