Skip to content

feat: HA tailnet coordinator #4170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Prev Previous commit
Next Next commit
impelement high availability feature
  • Loading branch information
coadler committed Oct 7, 2022
commit 46803aa38ba2d4189f687bda248f01bf933bf18e
2 changes: 2 additions & 0 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func New(options *Options) *API {
api.Auditor.Store(&options.Auditor)
api.WorkspaceQuotaEnforcer.Store(&options.WorkspaceQuotaEnforcer)
api.workspaceAgentCache = wsconncache.New(api.dialWorkspaceAgentTailnet, 0)
api.TailnetCoordinator.Store(&options.TailnetCoordinator)
api.derpServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger))
oauthConfigs := &httpmw.OAuth2Configs{
Github: options.GithubOAuth2Config,
Expand Down Expand Up @@ -525,6 +526,7 @@ type API struct {
Auditor atomic.Pointer[audit.Auditor]
WorkspaceClientCoordinateOverride atomic.Pointer[func(rw http.ResponseWriter) bool]
WorkspaceQuotaEnforcer atomic.Pointer[workspacequota.Enforcer]
TailnetCoordinator atomic.Pointer[tailnet.Coordinator]
HTTPAuth *HTTPAuthorizer

// APIHandler serves "/api/v2"
Expand Down
2 changes: 1 addition & 1 deletion coderd/provisionerjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (api *API) provisionerJobResources(rw http.ResponseWriter, r *http.Request,
}
}

apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, agent, convertApps(dbApps), api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), agent, convertApps(dbApps), api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading job agent.",
Expand Down
16 changes: 8 additions & 8 deletions coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (api *API) workspaceAgent(rw http.ResponseWriter, r *http.Request) {
})
return
}
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, convertApps(dbApps), api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, convertApps(dbApps), api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand Down Expand Up @@ -77,7 +77,7 @@ func (api *API) workspaceAgentApps(rw http.ResponseWriter, r *http.Request) {
func (api *API) workspaceAgentMetadata(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
workspaceAgent := httpmw.WorkspaceAgent(r)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand All @@ -97,7 +97,7 @@ func (api *API) workspaceAgentMetadata(rw http.ResponseWriter, r *http.Request)
func (api *API) postWorkspaceAgentVersion(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
workspaceAgent := httpmw.WorkspaceAgent(r)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand Down Expand Up @@ -151,7 +151,7 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
httpapi.ResourceNotFound(rw)
return
}
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand Down Expand Up @@ -228,7 +228,7 @@ func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Req
return
}

apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand Down Expand Up @@ -322,7 +322,7 @@ func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (*
})
conn.SetNodeCallback(sendNodes)
go func() {
err := api.TailnetCoordinator.ServeClient(serverConn, uuid.New(), agentID)
err := (*api.TailnetCoordinator.Load()).ServeClient(serverConn, uuid.New(), agentID)
if err != nil {
_ = conn.Close()
}
Expand Down Expand Up @@ -460,7 +460,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
closeChan := make(chan struct{})
go func() {
defer close(closeChan)
err := api.TailnetCoordinator.ServeAgent(wsNetConn, workspaceAgent.ID)
err := (*api.TailnetCoordinator.Load()).ServeAgent(wsNetConn, workspaceAgent.ID)
if err != nil {
_ = conn.Close(websocket.StatusInternalError, err.Error())
return
Expand Down Expand Up @@ -529,7 +529,7 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
go httpapi.Heartbeat(ctx, conn)

defer conn.Close(websocket.StatusNormalClosure, "")
err = api.TailnetCoordinator.ServeClient(websocket.NetConn(ctx, conn, websocket.MessageBinary), uuid.New(), workspaceAgent.ID)
err = (*api.TailnetCoordinator.Load()).ServeClient(websocket.NetConn(ctx, conn, websocket.MessageBinary), uuid.New(), workspaceAgent.ID)
if err != nil {
_ = conn.Close(websocket.StatusInternalError, err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion coderd/workspacebuilds.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func (api *API) convertWorkspaceBuild(
apiAgents := make([]codersdk.WorkspaceAgent, 0)
for _, agent := range agents {
apps := appsByAgentID[agent.ID]
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, agent, convertApps(apps), api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), agent, convertApps(apps), api.AgentInactiveDisconnectTimeout)
if err != nil {
return codersdk.WorkspaceBuild{}, xerrors.Errorf("converting workspace agent: %w", err)
}
Expand Down
24 changes: 21 additions & 3 deletions enterprise/coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/coder/coder/enterprise/audit"
"github.com/coder/coder/enterprise/audit/backends"
"github.com/coder/coder/enterprise/coderd/license"
"github.com/coder/coder/enterprise/tailnet"
agpltailnet "github.com/coder/coder/tailnet"
)

// New constructs an Enterprise coderd API instance.
Expand Down Expand Up @@ -171,11 +173,27 @@ func (api *API) updateEntitlements(ctx context.Context) error {
}

if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
enforcer := workspacequota.NewNop()
coordinator := agpltailnet.NewMemoryCoordinator()
if enabled {
enforcer = NewEnforcer(api.Options.UserWorkspaceQuota)
haCoordinator, err := tailnet.NewHACoordinator(api.Logger, api.Pubsub)
if err != nil {
api.Logger.Error(ctx, "unable to setup HA tailnet coordinator", slog.Error(err))
// If we try to setup the HA coordinator and it fails, nothing
// is actually changing.
changed = false
} else {
coordinator = haCoordinator
}
}

// Recheck changed in case the HA coordinator failed to set up.
if changed {
oldCoordinator := *api.AGPL.TailnetCoordinator.Swap(&coordinator)
err := oldCoordinator.Close()
if err != nil {
api.Logger.Error(ctx, "unable to setup HA tailnet coordinator", slog.Error(err))
}
}
api.AGPL.WorkspaceQuotaEnforcer.Store(&enforcer)
}

api.entitlements = entitlements
Expand Down