Skip to content

feat: HA tailnet coordinator #4170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
2 changes: 1 addition & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
if metadata.DERPMap == nil {
metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)
}
coordinator := tailnet.NewCoordinator()
coordinator := tailnet.NewMemoryCoordinator()
agentID := uuid.New()
statsCh := make(chan *codersdk.AgentStats)
closer := agent.New(agent.Options{
Expand Down
6 changes: 4 additions & 2 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Options struct {
TracerProvider trace.TracerProvider
AutoImportTemplates []AutoImportTemplate

TailnetCoordinator *tailnet.Coordinator
TailnetCoordinator tailnet.Coordinator
DERPMap *tailcfg.DERPMap

MetricsCacheRefreshInterval time.Duration
Expand Down Expand Up @@ -119,7 +119,7 @@ func New(options *Options) *API {
options.PrometheusRegistry = prometheus.NewRegistry()
}
if options.TailnetCoordinator == nil {
options.TailnetCoordinator = tailnet.NewCoordinator()
options.TailnetCoordinator = tailnet.NewMemoryCoordinator()
}
if options.Auditor == nil {
options.Auditor = audit.NewNop()
Expand Down Expand Up @@ -159,6 +159,7 @@ func New(options *Options) *API {
api.Auditor.Store(&options.Auditor)
api.WorkspaceQuotaEnforcer.Store(&options.WorkspaceQuotaEnforcer)
api.workspaceAgentCache = wsconncache.New(api.dialWorkspaceAgentTailnet, 0)
api.TailnetCoordinator.Store(&options.TailnetCoordinator)
api.derpServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger))
oauthConfigs := &httpmw.OAuth2Configs{
Github: options.GithubOAuth2Config,
Expand Down Expand Up @@ -531,6 +532,7 @@ type API struct {
Auditor atomic.Pointer[audit.Auditor]
WorkspaceClientCoordinateOverride atomic.Pointer[func(rw http.ResponseWriter) bool]
WorkspaceQuotaEnforcer atomic.Pointer[workspacequota.Enforcer]
TailnetCoordinator atomic.Pointer[tailnet.Coordinator]
HTTPAuth *HTTPAuthorizer

// APIHandler serves "/api/v2"
Expand Down
3 changes: 2 additions & 1 deletion coderd/database/pubsub_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func (m *memoryPubsub) Publish(event string, message []byte) error {
return nil
}
for _, listener := range listeners {
listener(context.Background(), message)
go listener(context.Background(), message)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion coderd/provisionerjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (api *API) provisionerJobResources(rw http.ResponseWriter, r *http.Request,
}
}

apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, agent, convertApps(dbApps), api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), agent, convertApps(dbApps), api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading job agent.",
Expand Down
18 changes: 9 additions & 9 deletions coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (api *API) workspaceAgent(rw http.ResponseWriter, r *http.Request) {
})
return
}
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, convertApps(dbApps), api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, convertApps(dbApps), api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand Down Expand Up @@ -78,7 +78,7 @@ func (api *API) workspaceAgentApps(rw http.ResponseWriter, r *http.Request) {
func (api *API) workspaceAgentMetadata(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
workspaceAgent := httpmw.WorkspaceAgent(r)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand All @@ -98,7 +98,7 @@ func (api *API) workspaceAgentMetadata(rw http.ResponseWriter, r *http.Request)
func (api *API) postWorkspaceAgentVersion(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
workspaceAgent := httpmw.WorkspaceAgent(r)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand Down Expand Up @@ -152,7 +152,7 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
httpapi.ResourceNotFound(rw)
return
}
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand Down Expand Up @@ -229,7 +229,7 @@ func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Req
return
}

apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Expand Down Expand Up @@ -376,7 +376,7 @@ func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (*
})
conn.SetNodeCallback(sendNodes)
go func() {
err := api.TailnetCoordinator.ServeClient(serverConn, uuid.New(), agentID)
err := (*api.TailnetCoordinator.Load()).ServeClient(serverConn, uuid.New(), agentID)
if err != nil {
_ = conn.Close()
}
Expand Down Expand Up @@ -514,7 +514,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
closeChan := make(chan struct{})
go func() {
defer close(closeChan)
err := api.TailnetCoordinator.ServeAgent(wsNetConn, workspaceAgent.ID)
err := (*api.TailnetCoordinator.Load()).ServeAgent(wsNetConn, workspaceAgent.ID)
if err != nil {
_ = conn.Close(websocket.StatusInternalError, err.Error())
return
Expand Down Expand Up @@ -583,7 +583,7 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
go httpapi.Heartbeat(ctx, conn)

defer conn.Close(websocket.StatusNormalClosure, "")
err = api.TailnetCoordinator.ServeClient(websocket.NetConn(ctx, conn, websocket.MessageBinary), uuid.New(), workspaceAgent.ID)
err = (*api.TailnetCoordinator.Load()).ServeClient(websocket.NetConn(ctx, conn, websocket.MessageBinary), uuid.New(), workspaceAgent.ID)
if err != nil {
_ = conn.Close(websocket.StatusInternalError, err.Error())
return
Expand All @@ -610,7 +610,7 @@ func convertApps(dbApps []database.WorkspaceApp) []codersdk.WorkspaceApp {
return apps
}

func convertWorkspaceAgent(derpMap *tailcfg.DERPMap, coordinator *tailnet.Coordinator, dbAgent database.WorkspaceAgent, apps []codersdk.WorkspaceApp, agentInactiveDisconnectTimeout time.Duration) (codersdk.WorkspaceAgent, error) {
func convertWorkspaceAgent(derpMap *tailcfg.DERPMap, coordinator tailnet.Coordinator, dbAgent database.WorkspaceAgent, apps []codersdk.WorkspaceApp, agentInactiveDisconnectTimeout time.Duration) (codersdk.WorkspaceAgent, error) {
var envs map[string]string
if dbAgent.EnvironmentVariables.Valid {
err := json.Unmarshal(dbAgent.EnvironmentVariables.RawMessage, &envs)
Expand Down
2 changes: 1 addition & 1 deletion coderd/workspacebuilds.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func (api *API) convertWorkspaceBuild(
apiAgents := make([]codersdk.WorkspaceAgent, 0)
for _, agent := range agents {
apps := appsByAgentID[agent.ID]
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, agent, convertApps(apps), api.AgentInactiveDisconnectTimeout)
apiAgent, err := convertWorkspaceAgent(api.DERPMap, *api.TailnetCoordinator.Load(), agent, convertApps(apps), api.AgentInactiveDisconnectTimeout)
if err != nil {
return codersdk.WorkspaceBuild{}, xerrors.Errorf("converting workspace agent: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion coderd/wsconncache/wsconncache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestCache(t *testing.T) {
func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeout time.Duration) *codersdk.AgentConn {
metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)

coordinator := tailnet.NewCoordinator()
coordinator := tailnet.NewMemoryCoordinator()
agentID := uuid.New()
closer := agent.New(agent.Options{
FetchMetadata: func(ctx context.Context) (codersdk.WorkspaceAgentMetadata, error) {
Expand Down
14 changes: 8 additions & 6 deletions codersdk/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ const (
)

const (
FeatureUserLimit = "user_limit"
FeatureAuditLog = "audit_log"
FeatureBrowserOnly = "browser_only"
FeatureSCIM = "scim"
FeatureWorkspaceQuota = "workspace_quota"
FeatureTemplateRBAC = "template_rbac"
FeatureUserLimit = "user_limit"
FeatureAuditLog = "audit_log"
FeatureBrowserOnly = "browser_only"
FeatureSCIM = "scim"
FeatureWorkspaceQuota = "workspace_quota"
FeatureTemplateRBAC = "template_rbac"
FeatureHighAvailability = "high_availability"
)

var FeatureNames = []string{
Expand All @@ -30,6 +31,7 @@ var FeatureNames = []string{
FeatureSCIM,
FeatureWorkspaceQuota,
FeatureTemplateRBAC,
FeatureHighAvailability,
}

type Feature struct {
Expand Down
1 change: 0 additions & 1 deletion codersdk/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"tailscale.com/tailcfg"

"cdr.dev/slog"

"github.com/coder/coder/tailnet"
"github.com/coder/retry"
)
Expand Down
2 changes: 2 additions & 0 deletions enterprise/cli/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func TestFeaturesList(t *testing.T) {
entitlements.Features[codersdk.FeatureTemplateRBAC].Entitlement)
assert.Equal(t, codersdk.EntitlementNotEntitled,
entitlements.Features[codersdk.FeatureSCIM].Entitlement)
assert.Equal(t, codersdk.EntitlementNotEntitled,
entitlements.Features[codersdk.FeatureHighAvailability].Entitlement)
assert.False(t, entitlements.HasLicense)
assert.False(t, entitlements.Experimental)
})
Expand Down
2 changes: 1 addition & 1 deletion enterprise/coderd/authorize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestCheckACLPermissions(t *testing.T) {
// Create adminClient, member, and org adminClient
adminUser := coderdtest.CreateFirstUser(t, adminClient)
_ = coderdenttest.AddLicense(t, adminClient, coderdenttest.LicenseOptions{
TemplateRBACEnabled: true,
TemplateRBAC: true,
})

memberClient := coderdtest.CreateAnotherUser(t, adminClient, adminUser.OrganizationID)
Expand Down
26 changes: 26 additions & 0 deletions enterprise/coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/coder/coder/enterprise/audit"
"github.com/coder/coder/enterprise/audit/backends"
"github.com/coder/coder/enterprise/coderd/license"
"github.com/coder/coder/enterprise/tailnet"
agpltailnet "github.com/coder/coder/tailnet"
)

// New constructs an Enterprise coderd API instance.
Expand Down Expand Up @@ -203,6 +205,30 @@ func (api *API) updateEntitlements(ctx context.Context) error {
api.AGPL.WorkspaceQuotaEnforcer.Store(&enforcer)
}

if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
coordinator := agpltailnet.NewMemoryCoordinator()
if enabled {
haCoordinator, err := tailnet.NewHACoordinator(api.Logger, api.Pubsub)
if err != nil {
api.Logger.Error(ctx, "unable to setup HA tailnet coordinator", slog.Error(err))
// If we try to setup the HA coordinator and it fails, nothing
// is actually changing.
changed = false
} else {
coordinator = haCoordinator
}
}

// Recheck changed in case the HA coordinator failed to set up.
if changed {
oldCoordinator := *api.AGPL.TailnetCoordinator.Swap(&coordinator)
err := oldCoordinator.Close()
if err != nil {
api.Logger.Error(ctx, "close old tailnet coordinator", slog.Error(err))
}
}
}

api.entitlements = entitlements

return nil
Expand Down
6 changes: 3 additions & 3 deletions enterprise/coderd/coderd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func TestEntitlements(t *testing.T) {
})
_ = coderdtest.CreateFirstUser(t, client)
coderdenttest.AddLicense(t, client, coderdenttest.LicenseOptions{
UserLimit: 100,
AuditLog: true,
TemplateRBACEnabled: true,
UserLimit: 100,
AuditLog: true,
TemplateRBAC: true,
})
res, err := client.Entitlements(context.Background())
require.NoError(t, err)
Expand Down
44 changes: 25 additions & 19 deletions enterprise/coderd/coderdenttest/coderdenttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,19 @@ func NewWithAPI(t *testing.T, options *Options) (*codersdk.Client, io.Closer, *c
}

type LicenseOptions struct {
AccountType string
AccountID string
Trial bool
AllFeatures bool
GraceAt time.Time
ExpiresAt time.Time
UserLimit int64
AuditLog bool
BrowserOnly bool
SCIM bool
WorkspaceQuota bool
TemplateRBACEnabled bool
AccountType string
AccountID string
Trial bool
AllFeatures bool
GraceAt time.Time
ExpiresAt time.Time
UserLimit int64
AuditLog bool
BrowserOnly bool
SCIM bool
WorkspaceQuota bool
TemplateRBAC bool
HighAvailability bool
}

// AddLicense generates a new license with the options provided and inserts it.
Expand Down Expand Up @@ -134,9 +135,13 @@ func GenerateLicense(t *testing.T, options LicenseOptions) string {
if options.WorkspaceQuota {
workspaceQuota = 1
}
highAvailability := int64(0)
if options.HighAvailability {
highAvailability = 1
}

rbacEnabled := int64(0)
if options.TemplateRBACEnabled {
if options.TemplateRBAC {
rbacEnabled = 1
}

Expand All @@ -154,12 +159,13 @@ func GenerateLicense(t *testing.T, options LicenseOptions) string {
Version: license.CurrentVersion,
AllFeatures: options.AllFeatures,
Features: license.Features{
UserLimit: options.UserLimit,
AuditLog: auditLog,
BrowserOnly: browserOnly,
SCIM: scim,
WorkspaceQuota: workspaceQuota,
TemplateRBAC: rbacEnabled,
UserLimit: options.UserLimit,
AuditLog: auditLog,
BrowserOnly: browserOnly,
SCIM: scim,
WorkspaceQuota: workspaceQuota,
HighAvailability: highAvailability,
TemplateRBAC: rbacEnabled,
},
}
tok := jwt.NewWithClaims(jwt.SigningMethodEdDSA, c)
Expand Down
2 changes: 1 addition & 1 deletion enterprise/coderd/coderdenttest/coderdenttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestAuthorizeAllEndpoints(t *testing.T) {
ctx, _ := testutil.Context(t)
admin := coderdtest.CreateFirstUser(t, client)
license := coderdenttest.AddLicense(t, client, coderdenttest.LicenseOptions{
TemplateRBACEnabled: true,
TemplateRBAC: true,
})
group, err := client.CreateGroup(ctx, admin.OrganizationID, codersdk.CreateGroupRequest{
Name: "testgroup",
Expand Down
Loading