Skip to content

Commit ddc493e

Browse files
committed
feat: add WorkspaceUpdates rpc
1 parent 22985f7 commit ddc493e

18 files changed

+1422
-232
lines changed

coderd/coderd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,7 @@ func New(options *Options) *API {
10091009
r.Route("/roles", func(r chi.Router) {
10101010
r.Get("/", api.AssignableSiteRoles)
10111011
})
1012+
r.Get("/me/tailnet", api.tailnet)
10121013
r.Route("/{user}", func(r chi.Router) {
10131014
r.Use(httpmw.ExtractUserParam(options.Database))
10141015
r.Post("/convert-login", api.postConvertLoginType)

coderd/workspaceagents.go

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -846,26 +846,10 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
846846
return
847847
}
848848

849-
// Accept a resume_token query parameter to use the same peer ID.
850-
var (
851-
peerID = uuid.New()
852-
resumeToken = r.URL.Query().Get("resume_token")
853-
)
854-
if resumeToken != "" {
855-
var err error
856-
peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(resumeToken)
857-
if err != nil {
858-
httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{
859-
Message: workspacesdk.CoordinateAPIInvalidResumeToken,
860-
Detail: err.Error(),
861-
Validations: []codersdk.ValidationError{
862-
{Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken},
863-
},
864-
})
865-
return
866-
}
867-
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
868-
slog.F("peer_id", peerID.String()))
849+
peerID, err := api.handleResumeToken(ctx, rw, r)
850+
if err != nil {
851+
// handleResumeToken has already written the response.
852+
return
869853
}
870854

871855
api.WebsocketWaitMutex.Lock()
@@ -895,6 +879,28 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
895879
}
896880
}
897881

882+
// handleResumeToken accepts a resume_token query parameter to use the same peer ID
883+
func (api *API) handleResumeToken(ctx context.Context, rw http.ResponseWriter, r *http.Request) (peerID uuid.UUID, err error) {
884+
peerID = uuid.New()
885+
resumeToken := r.URL.Query().Get("resume_token")
886+
if resumeToken != "" {
887+
peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(resumeToken)
888+
if err != nil {
889+
httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{
890+
Message: workspacesdk.CoordinateAPIInvalidResumeToken,
891+
Detail: err.Error(),
892+
Validations: []codersdk.ValidationError{
893+
{Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken},
894+
},
895+
})
896+
return peerID, err
897+
}
898+
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
899+
slog.F("peer_id", peerID.String()))
900+
}
901+
return peerID, err
902+
}
903+
898904
// @Summary Post workspace agent log source
899905
// @ID post-workspace-agent-log-source
900906
// @Security CoderSessionToken

coderd/workspacebuilds.go

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -947,7 +947,7 @@ func (api *API) convertWorkspaceBuild(
947947
MaxDeadline: codersdk.NewNullTime(build.MaxDeadline, !build.MaxDeadline.IsZero()),
948948
Reason: codersdk.BuildReason(build.Reason),
949949
Resources: apiResources,
950-
Status: convertWorkspaceStatus(apiJob.Status, transition),
950+
Status: codersdk.ConvertWorkspaceStatus(apiJob.Status, transition),
951951
DailyCost: build.DailyCost,
952952
}, nil
953953
}
@@ -976,37 +976,3 @@ func convertWorkspaceResource(resource database.WorkspaceResource, agents []code
976976
DailyCost: resource.DailyCost,
977977
}
978978
}
979-
980-
func convertWorkspaceStatus(jobStatus codersdk.ProvisionerJobStatus, transition codersdk.WorkspaceTransition) codersdk.WorkspaceStatus {
981-
switch jobStatus {
982-
case codersdk.ProvisionerJobPending:
983-
return codersdk.WorkspaceStatusPending
984-
case codersdk.ProvisionerJobRunning:
985-
switch transition {
986-
case codersdk.WorkspaceTransitionStart:
987-
return codersdk.WorkspaceStatusStarting
988-
case codersdk.WorkspaceTransitionStop:
989-
return codersdk.WorkspaceStatusStopping
990-
case codersdk.WorkspaceTransitionDelete:
991-
return codersdk.WorkspaceStatusDeleting
992-
}
993-
case codersdk.ProvisionerJobSucceeded:
994-
switch transition {
995-
case codersdk.WorkspaceTransitionStart:
996-
return codersdk.WorkspaceStatusRunning
997-
case codersdk.WorkspaceTransitionStop:
998-
return codersdk.WorkspaceStatusStopped
999-
case codersdk.WorkspaceTransitionDelete:
1000-
return codersdk.WorkspaceStatusDeleted
1001-
}
1002-
case codersdk.ProvisionerJobCanceling:
1003-
return codersdk.WorkspaceStatusCanceling
1004-
case codersdk.ProvisionerJobCanceled:
1005-
return codersdk.WorkspaceStatusCanceled
1006-
case codersdk.ProvisionerJobFailed:
1007-
return codersdk.WorkspaceStatusFailed
1008-
}
1009-
1010-
// return error status since we should never get here
1011-
return codersdk.WorkspaceStatusFailed
1012-
}

coderd/workspaces.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"io"
910
"net/http"
1011
"slices"
1112
"strconv"
@@ -15,6 +16,7 @@ import (
1516
"github.com/go-chi/chi/v5"
1617
"github.com/google/uuid"
1718
"golang.org/x/xerrors"
19+
"nhooyr.io/websocket"
1820

1921
"cdr.dev/slog"
2022
"github.com/coder/coder/v2/agent/proto"
@@ -36,6 +38,7 @@ import (
3638
"github.com/coder/coder/v2/coderd/wsbuilder"
3739
"github.com/coder/coder/v2/codersdk"
3840
"github.com/coder/coder/v2/codersdk/agentsdk"
41+
"github.com/coder/coder/v2/tailnet"
3942
)
4043

4144
var (
@@ -2068,6 +2071,11 @@ func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUI
20682071
api.Logger.Warn(ctx, "failed to publish workspace update",
20692072
slog.F("workspace_id", workspaceID), slog.Error(err))
20702073
}
2074+
err = api.Pubsub.Publish(codersdk.AllWorkspacesNotifyChannel, []byte(workspaceID.String()))
2075+
if err != nil {
2076+
api.Logger.Warn(ctx, "failed to publish all workspaces update",
2077+
slog.F("workspace_id", workspaceID), slog.Error(err))
2078+
}
20712079
}
20722080

20732081
func (api *API) publishWorkspaceAgentLogsUpdate(ctx context.Context, workspaceAgentID uuid.UUID, m agentsdk.LogsNotifyMessage) {
@@ -2080,3 +2088,72 @@ func (api *API) publishWorkspaceAgentLogsUpdate(ctx context.Context, workspaceAg
20802088
api.Logger.Warn(ctx, "failed to publish workspace agent logs update", slog.F("workspace_agent_id", workspaceAgentID), slog.Error(err))
20812089
}
20822090
}
2091+
2092+
// @Summary Coordinate multiple workspace agents
2093+
// @ID coordinate-multiple-workspace-agents
2094+
// @Security CoderSessionToken
2095+
// @Tags Workspaces
2096+
// @Success 101
2097+
// @Router /users/me/tailnet [get]
2098+
func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
2099+
ctx := r.Context()
2100+
owner := httpmw.UserParam(r)
2101+
ownerRoles := httpmw.UserAuthorization(r)
2102+
2103+
// Check if the actor is allowed to access any workspace owned by the user.
2104+
if !api.Authorize(r, policy.ActionSSH, rbac.ResourceWorkspace.WithOwner(owner.ID.String())) {
2105+
httpapi.ResourceNotFound(rw)
2106+
return
2107+
}
2108+
2109+
version := "1.0"
2110+
qv := r.URL.Query().Get("version")
2111+
if qv != "" {
2112+
version = qv
2113+
}
2114+
if err := proto.CurrentVersion.Validate(version); err != nil {
2115+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2116+
Message: "Unknown or unsupported API version",
2117+
Validations: []codersdk.ValidationError{
2118+
{Field: "version", Detail: err.Error()},
2119+
},
2120+
})
2121+
return
2122+
}
2123+
2124+
peerID, err := api.handleResumeToken(ctx, rw, r)
2125+
if err != nil {
2126+
// handleResumeToken has already written the response.
2127+
return
2128+
}
2129+
2130+
api.WebsocketWaitMutex.Lock()
2131+
api.WebsocketWaitGroup.Add(1)
2132+
api.WebsocketWaitMutex.Unlock()
2133+
defer api.WebsocketWaitGroup.Done()
2134+
2135+
conn, err := websocket.Accept(rw, r, nil)
2136+
if err != nil {
2137+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
2138+
Message: "Failed to accept websocket.",
2139+
Detail: err.Error(),
2140+
})
2141+
return
2142+
}
2143+
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
2144+
defer wsNetConn.Close()
2145+
defer conn.Close(websocket.StatusNormalClosure, "")
2146+
2147+
go httpapi.Heartbeat(ctx, conn)
2148+
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
2149+
PeerID: peerID,
2150+
UserID: owner.ID,
2151+
Subject: &ownerRoles,
2152+
Authz: api.Authorizer,
2153+
Database: api.Database,
2154+
})
2155+
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
2156+
_ = conn.Close(websocket.StatusInternalError, err.Error())
2157+
return
2158+
}
2159+
}

codersdk/provisionerdaemons.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,3 +387,37 @@ func (c *Client) DeleteProvisionerKey(ctx context.Context, organizationID uuid.U
387387
}
388388
return nil
389389
}
390+
391+
func ConvertWorkspaceStatus(jobStatus ProvisionerJobStatus, transition WorkspaceTransition) WorkspaceStatus {
392+
switch jobStatus {
393+
case ProvisionerJobPending:
394+
return WorkspaceStatusPending
395+
case ProvisionerJobRunning:
396+
switch transition {
397+
case WorkspaceTransitionStart:
398+
return WorkspaceStatusStarting
399+
case WorkspaceTransitionStop:
400+
return WorkspaceStatusStopping
401+
case WorkspaceTransitionDelete:
402+
return WorkspaceStatusDeleting
403+
}
404+
case ProvisionerJobSucceeded:
405+
switch transition {
406+
case WorkspaceTransitionStart:
407+
return WorkspaceStatusRunning
408+
case WorkspaceTransitionStop:
409+
return WorkspaceStatusStopped
410+
case WorkspaceTransitionDelete:
411+
return WorkspaceStatusDeleted
412+
}
413+
case ProvisionerJobCanceling:
414+
return WorkspaceStatusCanceling
415+
case ProvisionerJobCanceled:
416+
return WorkspaceStatusCanceled
417+
case ProvisionerJobFailed:
418+
return WorkspaceStatusFailed
419+
}
420+
421+
// return error status since we should never get here
422+
return WorkspaceStatusFailed
423+
}

codersdk/workspaces.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,3 +661,5 @@ func (c *Client) WorkspaceTimings(ctx context.Context, id uuid.UUID) (WorkspaceT
661661
func WorkspaceNotifyChannel(id uuid.UUID) string {
662662
return fmt.Sprintf("workspace:%s", id)
663663
}
664+
665+
const AllWorkspacesNotifyChannel = "all-workspaces"

codersdk/workspacesdk/connector_internal_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,11 @@ func (f *fakeDRPCClient) RefreshResumeToken(_ context.Context, _ *proto.RefreshR
571571
}, nil
572572
}
573573

574+
// WorkspaceUpdates implements proto.DRPCTailnetClient.
575+
func (*fakeDRPCClient) WorkspaceUpdates(context.Context, *proto.WorkspaceUpdatesRequest) (proto.DRPCTailnet_WorkspaceUpdatesClient, error) {
576+
panic("unimplemented")
577+
}
578+
574579
type fakeDRPCConn struct{}
575580

576581
var _ drpc.Conn = &fakeDRPCConn{}

enterprise/tailnet/connio.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ var errDisconnect = xerrors.New("graceful disconnect")
133133

134134
func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
135135
c.logger.Debug(c.peerCtx, "got request")
136-
err := c.auth.Authorize(req)
136+
err := c.auth.Authorize(c.coordCtx, req)
137137
if err != nil {
138138
c.logger.Warn(c.peerCtx, "unauthorized request", slog.Error(err))
139139
return xerrors.Errorf("authorize request: %w", err)

tailnet/convert.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"tailscale.com/tailcfg"
1010
"tailscale.com/types/key"
1111

12+
"github.com/coder/coder/v2/codersdk"
1213
"github.com/coder/coder/v2/tailnet/proto"
1314
)
1415

@@ -270,3 +271,30 @@ func DERPNodeFromProto(node *proto.DERPMap_Region_Node) *tailcfg.DERPNode {
270271
CanPort80: node.CanPort_80,
271272
}
272273
}
274+
275+
func WorkspaceStatusToProto(status codersdk.WorkspaceStatus) proto.Workspace_Status {
276+
switch status {
277+
case codersdk.WorkspaceStatusCanceled:
278+
return proto.Workspace_CANCELED
279+
case codersdk.WorkspaceStatusCanceling:
280+
return proto.Workspace_CANCELING
281+
case codersdk.WorkspaceStatusDeleted:
282+
return proto.Workspace_DELETED
283+
case codersdk.WorkspaceStatusDeleting:
284+
return proto.Workspace_DELETING
285+
case codersdk.WorkspaceStatusFailed:
286+
return proto.Workspace_FAILED
287+
case codersdk.WorkspaceStatusPending:
288+
return proto.Workspace_PENDING
289+
case codersdk.WorkspaceStatusRunning:
290+
return proto.Workspace_RUNNING
291+
case codersdk.WorkspaceStatusStarting:
292+
return proto.Workspace_STARTING
293+
case codersdk.WorkspaceStatusStopped:
294+
return proto.Workspace_STOPPED
295+
case codersdk.WorkspaceStatusStopping:
296+
return proto.Workspace_STOPPING
297+
default:
298+
return proto.Workspace_UNKNOWN
299+
}
300+
}

tailnet/coordinator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ func (c *core) handleRequest(p *peer, req *proto.CoordinateRequest) error {
577577
return ErrAlreadyRemoved
578578
}
579579

580-
if err := pr.auth.Authorize(req); err != nil {
580+
if err := pr.auth.Authorize(context.Background(), req); err != nil {
581581
return xerrors.Errorf("authorize request: %w", err)
582582
}
583583

0 commit comments

Comments
 (0)