Skip to content

Commit fc01091

Browse files
committed
feat: add WorkspaceUpdates rpc
1 parent aaf8e86 commit fc01091

21 files changed

+1478
-232
lines changed

coderd/apidoc/docs.go

+19
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

+17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/coderd.go

+1
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,7 @@ func New(options *Options) *API {
10091009
r.Route("/roles", func(r chi.Router) {
10101010
r.Get("/", api.AssignableSiteRoles)
10111011
})
1012+
r.Get("/me/tailnet", api.tailnet)
10121013
r.Route("/{user}", func(r chi.Router) {
10131014
r.Use(httpmw.ExtractUserParam(options.Database))
10141015
r.Post("/convert-login", api.postConvertLoginType)

coderd/workspaceagents.go

+26-20
Original file line numberDiff line numberDiff line change
@@ -846,26 +846,10 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
846846
return
847847
}
848848

849-
// Accept a resume_token query parameter to use the same peer ID.
850-
var (
851-
peerID = uuid.New()
852-
resumeToken = r.URL.Query().Get("resume_token")
853-
)
854-
if resumeToken != "" {
855-
var err error
856-
peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(resumeToken)
857-
if err != nil {
858-
httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{
859-
Message: workspacesdk.CoordinateAPIInvalidResumeToken,
860-
Detail: err.Error(),
861-
Validations: []codersdk.ValidationError{
862-
{Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken},
863-
},
864-
})
865-
return
866-
}
867-
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
868-
slog.F("peer_id", peerID.String()))
849+
peerID, err := api.handleResumeToken(ctx, rw, r)
850+
if err != nil {
851+
// handleResumeToken has already written the response.
852+
return
869853
}
870854

871855
api.WebsocketWaitMutex.Lock()
@@ -895,6 +879,28 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
895879
}
896880
}
897881

882+
// handleResumeToken accepts a resume_token query parameter to use the same peer ID
883+
func (api *API) handleResumeToken(ctx context.Context, rw http.ResponseWriter, r *http.Request) (peerID uuid.UUID, err error) {
884+
peerID = uuid.New()
885+
resumeToken := r.URL.Query().Get("resume_token")
886+
if resumeToken != "" {
887+
peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(resumeToken)
888+
if err != nil {
889+
httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{
890+
Message: workspacesdk.CoordinateAPIInvalidResumeToken,
891+
Detail: err.Error(),
892+
Validations: []codersdk.ValidationError{
893+
{Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken},
894+
},
895+
})
896+
return peerID, err
897+
}
898+
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
899+
slog.F("peer_id", peerID.String()))
900+
}
901+
return peerID, err
902+
}
903+
898904
// @Summary Post workspace agent log source
899905
// @ID post-workspace-agent-log-source
900906
// @Security CoderSessionToken

coderd/workspacebuilds.go

+1-35
Original file line numberDiff line numberDiff line change
@@ -947,7 +947,7 @@ func (api *API) convertWorkspaceBuild(
947947
MaxDeadline: codersdk.NewNullTime(build.MaxDeadline, !build.MaxDeadline.IsZero()),
948948
Reason: codersdk.BuildReason(build.Reason),
949949
Resources: apiResources,
950-
Status: convertWorkspaceStatus(apiJob.Status, transition),
950+
Status: codersdk.ConvertWorkspaceStatus(apiJob.Status, transition),
951951
DailyCost: build.DailyCost,
952952
}, nil
953953
}
@@ -976,37 +976,3 @@ func convertWorkspaceResource(resource database.WorkspaceResource, agents []code
976976
DailyCost: resource.DailyCost,
977977
}
978978
}
979-
980-
func convertWorkspaceStatus(jobStatus codersdk.ProvisionerJobStatus, transition codersdk.WorkspaceTransition) codersdk.WorkspaceStatus {
981-
switch jobStatus {
982-
case codersdk.ProvisionerJobPending:
983-
return codersdk.WorkspaceStatusPending
984-
case codersdk.ProvisionerJobRunning:
985-
switch transition {
986-
case codersdk.WorkspaceTransitionStart:
987-
return codersdk.WorkspaceStatusStarting
988-
case codersdk.WorkspaceTransitionStop:
989-
return codersdk.WorkspaceStatusStopping
990-
case codersdk.WorkspaceTransitionDelete:
991-
return codersdk.WorkspaceStatusDeleting
992-
}
993-
case codersdk.ProvisionerJobSucceeded:
994-
switch transition {
995-
case codersdk.WorkspaceTransitionStart:
996-
return codersdk.WorkspaceStatusRunning
997-
case codersdk.WorkspaceTransitionStop:
998-
return codersdk.WorkspaceStatusStopped
999-
case codersdk.WorkspaceTransitionDelete:
1000-
return codersdk.WorkspaceStatusDeleted
1001-
}
1002-
case codersdk.ProvisionerJobCanceling:
1003-
return codersdk.WorkspaceStatusCanceling
1004-
case codersdk.ProvisionerJobCanceled:
1005-
return codersdk.WorkspaceStatusCanceled
1006-
case codersdk.ProvisionerJobFailed:
1007-
return codersdk.WorkspaceStatusFailed
1008-
}
1009-
1010-
// return error status since we should never get here
1011-
return codersdk.WorkspaceStatusFailed
1012-
}

coderd/workspaces.go

+77
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"io"
910
"net/http"
1011
"slices"
1112
"strconv"
@@ -15,6 +16,7 @@ import (
1516
"github.com/go-chi/chi/v5"
1617
"github.com/google/uuid"
1718
"golang.org/x/xerrors"
19+
"nhooyr.io/websocket"
1820

1921
"cdr.dev/slog"
2022
"github.com/coder/coder/v2/agent/proto"
@@ -36,6 +38,7 @@ import (
3638
"github.com/coder/coder/v2/coderd/wsbuilder"
3739
"github.com/coder/coder/v2/codersdk"
3840
"github.com/coder/coder/v2/codersdk/agentsdk"
41+
"github.com/coder/coder/v2/tailnet"
3942
)
4043

4144
var (
@@ -2068,6 +2071,11 @@ func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUI
20682071
api.Logger.Warn(ctx, "failed to publish workspace update",
20692072
slog.F("workspace_id", workspaceID), slog.Error(err))
20702073
}
2074+
err = api.Pubsub.Publish(codersdk.AllWorkspacesNotifyChannel, []byte(workspaceID.String()))
2075+
if err != nil {
2076+
api.Logger.Warn(ctx, "failed to publish all workspaces update",
2077+
slog.F("workspace_id", workspaceID), slog.Error(err))
2078+
}
20712079
}
20722080

20732081
func (api *API) publishWorkspaceAgentLogsUpdate(ctx context.Context, workspaceAgentID uuid.UUID, m agentsdk.LogsNotifyMessage) {
@@ -2080,3 +2088,72 @@ func (api *API) publishWorkspaceAgentLogsUpdate(ctx context.Context, workspaceAg
20802088
api.Logger.Warn(ctx, "failed to publish workspace agent logs update", slog.F("workspace_agent_id", workspaceAgentID), slog.Error(err))
20812089
}
20822090
}
2091+
2092+
// @Summary Coordinate multiple workspace agents
2093+
// @ID coordinate-multiple-workspace-agents
2094+
// @Security CoderSessionToken
2095+
// @Tags Workspaces
2096+
// @Success 101
2097+
// @Router /users/me/tailnet [get]
2098+
func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
2099+
ctx := r.Context()
2100+
owner := httpmw.UserParam(r)
2101+
ownerRoles := httpmw.UserAuthorization(r)
2102+
2103+
// Check if the actor is allowed to access any workspace owned by the user.
2104+
if !api.Authorize(r, policy.ActionSSH, rbac.ResourceWorkspace.WithOwner(owner.ID.String())) {
2105+
httpapi.ResourceNotFound(rw)
2106+
return
2107+
}
2108+
2109+
version := "1.0"
2110+
qv := r.URL.Query().Get("version")
2111+
if qv != "" {
2112+
version = qv
2113+
}
2114+
if err := proto.CurrentVersion.Validate(version); err != nil {
2115+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2116+
Message: "Unknown or unsupported API version",
2117+
Validations: []codersdk.ValidationError{
2118+
{Field: "version", Detail: err.Error()},
2119+
},
2120+
})
2121+
return
2122+
}
2123+
2124+
peerID, err := api.handleResumeToken(ctx, rw, r)
2125+
if err != nil {
2126+
// handleResumeToken has already written the response.
2127+
return
2128+
}
2129+
2130+
api.WebsocketWaitMutex.Lock()
2131+
api.WebsocketWaitGroup.Add(1)
2132+
api.WebsocketWaitMutex.Unlock()
2133+
defer api.WebsocketWaitGroup.Done()
2134+
2135+
conn, err := websocket.Accept(rw, r, nil)
2136+
if err != nil {
2137+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2138+
Message: "Failed to accept websocket.",
2139+
Detail: err.Error(),
2140+
})
2141+
return
2142+
}
2143+
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
2144+
defer wsNetConn.Close()
2145+
defer conn.Close(websocket.StatusNormalClosure, "")
2146+
2147+
go httpapi.Heartbeat(ctx, conn)
2148+
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
2149+
PeerID: peerID,
2150+
UserID: owner.ID,
2151+
Subject: &ownerRoles,
2152+
Authz: api.Authorizer,
2153+
Database: api.Database,
2154+
})
2155+
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
2156+
_ = conn.Close(websocket.StatusInternalError, err.Error())
2157+
return
2158+
}
2159+
}

codersdk/provisionerdaemons.go

+34
Original file line numberDiff line numberDiff line change
@@ -402,3 +402,37 @@ func (c *Client) DeleteProvisionerKey(ctx context.Context, organizationID uuid.U
402402
}
403403
return nil
404404
}
405+
406+
func ConvertWorkspaceStatus(jobStatus ProvisionerJobStatus, transition WorkspaceTransition) WorkspaceStatus {
407+
switch jobStatus {
408+
case ProvisionerJobPending:
409+
return WorkspaceStatusPending
410+
case ProvisionerJobRunning:
411+
switch transition {
412+
case WorkspaceTransitionStart:
413+
return WorkspaceStatusStarting
414+
case WorkspaceTransitionStop:
415+
return WorkspaceStatusStopping
416+
case WorkspaceTransitionDelete:
417+
return WorkspaceStatusDeleting
418+
}
419+
case ProvisionerJobSucceeded:
420+
switch transition {
421+
case WorkspaceTransitionStart:
422+
return WorkspaceStatusRunning
423+
case WorkspaceTransitionStop:
424+
return WorkspaceStatusStopped
425+
case WorkspaceTransitionDelete:
426+
return WorkspaceStatusDeleted
427+
}
428+
case ProvisionerJobCanceling:
429+
return WorkspaceStatusCanceling
430+
case ProvisionerJobCanceled:
431+
return WorkspaceStatusCanceled
432+
case ProvisionerJobFailed:
433+
return WorkspaceStatusFailed
434+
}
435+
436+
// return error status since we should never get here
437+
return WorkspaceStatusFailed
438+
}

codersdk/workspaces.go

+2
Original file line numberDiff line numberDiff line change
@@ -661,3 +661,5 @@ func (c *Client) WorkspaceTimings(ctx context.Context, id uuid.UUID) (WorkspaceT
661661
func WorkspaceNotifyChannel(id uuid.UUID) string {
662662
return fmt.Sprintf("workspace:%s", id)
663663
}
664+
665+
const AllWorkspacesNotifyChannel = "all-workspaces"

codersdk/workspacesdk/connector_internal_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,11 @@ func (f *fakeDRPCClient) RefreshResumeToken(_ context.Context, _ *proto.RefreshR
571571
}, nil
572572
}
573573

574+
// WorkspaceUpdates implements proto.DRPCTailnetClient.
575+
func (*fakeDRPCClient) WorkspaceUpdates(context.Context, *proto.WorkspaceUpdatesRequest) (proto.DRPCTailnet_WorkspaceUpdatesClient, error) {
576+
panic("unimplemented")
577+
}
578+
574579
type fakeDRPCConn struct{}
575580

576581
var _ drpc.Conn = &fakeDRPCConn{}

docs/reference/api/workspaces.md

+20
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

enterprise/tailnet/connio.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ var errDisconnect = xerrors.New("graceful disconnect")
133133

134134
func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
135135
c.logger.Debug(c.peerCtx, "got request")
136-
err := c.auth.Authorize(req)
136+
err := c.auth.Authorize(c.coordCtx, req)
137137
if err != nil {
138138
c.logger.Warn(c.peerCtx, "unauthorized request", slog.Error(err))
139139
return xerrors.Errorf("authorize request: %w", err)

0 commit comments

Comments
 (0)