Skip to content

Commit 585bc1d

Browse files
committed
Add flags for HA
1 parent 289e139 commit 585bc1d

File tree

15 files changed

+190
-28
lines changed

15 files changed

+190
-28
lines changed

cli/config/file.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ func (r Root) Session() File {
1313
return File(filepath.Join(string(r), "session"))
1414
}
1515

16+
// ReplicaID returns the file that stores the unique identifier for this Coder server.
17+
func (r Root) ReplicaID() File {
18+
return File(filepath.Join(string(r), "replica_id"))
19+
}
20+
1621
func (r Root) URL() File {
1722
return File(filepath.Join(string(r), "url"))
1823
}

cli/deployment/flags.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ func Flags() *codersdk.DeploymentFlags {
8585
Description: "Addresses for STUN servers to establish P2P connections. Set empty to disable P2P connections.",
8686
Default: []string{"stun.l.google.com:19302"},
8787
},
88+
DerpServerRelayAddress: &codersdk.StringFlag{
89+
Name: "DERP Server Relay Address",
90+
Flag: "derp-server-relay-address",
91+
EnvVar: "CODER_DERP_SERVER_RELAY_ADDRESS",
92+
Description: "An HTTP address that is accessible by other replicas to relay DERP traffic. Required for high availability.",
93+
Enterprise: true,
94+
},
8895
DerpConfigURL: &codersdk.StringFlag{
8996
Name: "DERP Config URL",
9097
Flag: "derp-config-url",
@@ -123,6 +130,14 @@ func Flags() *codersdk.DeploymentFlags {
123130
Description: "The bind address to serve pprof.",
124131
Default: "127.0.0.1:6060",
125132
},
133+
HighAvailability: &codersdk.BoolFlag{
134+
Name: "High Availability",
135+
Flag: "high-availability",
136+
EnvVar: "CODER_HIGH_AVAILABILITY",
137+
Description: "Specifies whether high availability is enabled.",
138+
Default: true,
139+
Enterprise: true,
140+
},
126141
CacheDir: &codersdk.StringFlag{
127142
Name: "Cache Directory",
128143
Flag: "cache-dir",

cli/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func Core() []*cobra.Command {
100100
}
101101

102102
func AGPL() []*cobra.Command {
103-
all := append(Core(), Server(deployment.Flags(), func(_ context.Context, o *coderd.Options) (*coderd.API, error) {
103+
all := append(Core(), Server(deployment.Flags(), func(_ context.Context, _ config.Root, o *coderd.Options) (*coderd.API, error) {
104104
return coderd.New(o), nil
105105
}))
106106
return all

cli/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ import (
6767
)
6868

6969
// nolint:gocyclo
70-
func Server(dflags *codersdk.DeploymentFlags, newAPI func(context.Context, *coderd.Options) (*coderd.API, error)) *cobra.Command {
70+
func Server(dflags *codersdk.DeploymentFlags, newAPI func(context.Context, config.Root, *coderd.Options) (*coderd.API, error)) *cobra.Command {
7171
root := &cobra.Command{
7272
Use: "server",
7373
Short: "Start a Coder server",
@@ -463,7 +463,7 @@ func Server(dflags *codersdk.DeploymentFlags, newAPI func(context.Context, *code
463463
), dflags.PromAddress.Value, "prometheus")()
464464
}
465465

466-
coderAPI, err := newAPI(ctx, options)
466+
coderAPI, err := newAPI(ctx, config, options)
467467
if err != nil {
468468
return err
469469
}

coderd/coderd.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ type Options struct {
7777
AutoImportTemplates []AutoImportTemplate
7878

7979
TailnetCoordinator tailnet.Coordinator
80+
DERPServer *derp.Server
8081
DERPMap *tailcfg.DERPMap
8182

8283
MetricsCacheRefreshInterval time.Duration
@@ -121,6 +122,9 @@ func New(options *Options) *API {
121122
if options.TailnetCoordinator == nil {
122123
options.TailnetCoordinator = tailnet.NewCoordinator()
123124
}
125+
if options.DERPServer == nil {
126+
options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger))
127+
}
124128
if options.Auditor == nil {
125129
options.Auditor = audit.NewNop()
126130
}
@@ -160,7 +164,6 @@ func New(options *Options) *API {
160164
api.WorkspaceQuotaEnforcer.Store(&options.WorkspaceQuotaEnforcer)
161165
api.workspaceAgentCache = wsconncache.New(api.dialWorkspaceAgentTailnet, 0)
162166
api.TailnetCoordinator.Store(&options.TailnetCoordinator)
163-
api.derpServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger))
164167
oauthConfigs := &httpmw.OAuth2Configs{
165168
Github: options.GithubOAuth2Config,
166169
OIDC: options.OIDCConfig,
@@ -228,7 +231,7 @@ func New(options *Options) *API {
228231
r.Route("/%40{user}/{workspace_and_agent}/apps/{workspaceapp}", apps)
229232
r.Route("/@{user}/{workspace_and_agent}/apps/{workspaceapp}", apps)
230233
r.Route("/derp", func(r chi.Router) {
231-
r.Get("/", derphttp.Handler(api.derpServer).ServeHTTP)
234+
r.Get("/", derphttp.Handler(api.DERPServer).ServeHTTP)
232235
// This is used when UDP is blocked, and latency must be checked via HTTP(s).
233236
r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) {
234237
w.WriteHeader(http.StatusOK)
@@ -540,7 +543,6 @@ type API struct {
540543
// RootHandler serves "/"
541544
RootHandler chi.Router
542545

543-
derpServer *derp.Server
544546
metricsCache *metricscache.Cache
545547
siteHandler http.Handler
546548
websocketWaitMutex sync.Mutex

coderd/coderdtest/coderdtest.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ type Options struct {
8181
MetricsCacheRefreshInterval time.Duration
8282
AgentStatsRefreshInterval time.Duration
8383
DeploymentFlags *codersdk.DeploymentFlags
84+
85+
// Overriding the database is heavily discouraged.
86+
// It should only be used in cases where multiple Coder
87+
// test instances are running against the same database.
88+
Database database.Store
89+
Pubsub database.Pubsub
8490
}
8591

8692
// New constructs a codersdk client connected to an in-memory API instance.
@@ -135,13 +141,14 @@ func NewOptions(t *testing.T, options *Options) (*httptest.Server, context.Cance
135141
close(options.AutobuildStats)
136142
})
137143
}
138-
139-
db, pubsub := dbtestutil.NewDB(t)
144+
if options.Database == nil {
145+
options.Database, options.Pubsub = dbtestutil.NewDB(t)
146+
}
140147

141148
ctx, cancelFunc := context.WithCancel(context.Background())
142149
lifecycleExecutor := executor.New(
143150
ctx,
144-
db,
151+
options.Database,
145152
slogtest.Make(t, nil).Named("autobuild.executor").Leveled(slog.LevelDebug),
146153
options.AutobuildTicker,
147154
).WithStatsChannel(options.AutobuildStats)
@@ -181,8 +188,8 @@ func NewOptions(t *testing.T, options *Options) (*httptest.Server, context.Cance
181188
AppHostname: options.AppHostname,
182189
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
183190
CacheDir: t.TempDir(),
184-
Database: db,
185-
Pubsub: pubsub,
191+
Database: options.Database,
192+
Pubsub: options.Pubsub,
186193

187194
Auditor: options.Auditor,
188195
AWSCertificates: options.AWSCertificates,

codersdk/flags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type DeploymentFlags struct {
1919
DerpServerRegionCode *StringFlag `json:"derp_server_region_code" typescript:",notnull"`
2020
DerpServerRegionName *StringFlag `json:"derp_server_region_name" typescript:",notnull"`
2121
DerpServerSTUNAddresses *StringArrayFlag `json:"derp_server_stun_address" typescript:",notnull"`
22+
DerpServerRelayAddress *StringFlag `json:"derp_server_relay_address" typescript:",notnull"`
2223
DerpConfigURL *StringFlag `json:"derp_config_url" typescript:",notnull"`
2324
DerpConfigPath *StringFlag `json:"derp_config_path" typescript:",notnull"`
2425
PromEnabled *BoolFlag `json:"prom_enabled" typescript:",notnull"`
@@ -59,6 +60,7 @@ type DeploymentFlags struct {
5960
Verbose *BoolFlag `json:"verbose" typescript:",notnull"`
6061
AuditLogging *BoolFlag `json:"audit_logging" typescript:",notnull"`
6162
BrowserOnly *BoolFlag `json:"browser_only" typescript:",notnull"`
63+
HighAvailability *BoolFlag `json:"high_availability" typescript:",notnull"`
6264
SCIMAuthHeader *StringFlag `json:"scim_auth_header" typescript:",notnull"`
6365
UserWorkspaceQuota *IntFlag `json:"user_workspace_quota" typescript:",notnull"`
6466
}

codersdk/replicas.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package codersdk
2+
3+
import (
4+
"time"
5+
6+
"github.com/google/uuid"
7+
)
8+
9+
type Replica struct {
10+
// ID is the unique identifier for the replica.
11+
ID uuid.UUID `json:"id"`
12+
// Hostname is the hostname of the replica.
13+
Hostname string `json:"hostname"`
14+
// CreatedAt is when the replica was first seen.
15+
CreatedAt time.Time `json:"created_at"`
16+
// Active determines whether the replica is online.
17+
Active bool `json:"active"`
18+
// RelayAddress is the accessible address to relay DERP connections.
19+
RelayAddress string `json:"relay_address"`
20+
// Error describes an error associated with the replica, if any.
21+
Error string `json:"error"`
22+
}

enterprise/cli/server.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@ package cli
33
import (
44
"context"
55

6+
"github.com/google/uuid"
67
"github.com/spf13/cobra"
78

9+
"cdr.dev/slog"
10+
11+
"github.com/coder/coder/cli/config"
812
"github.com/coder/coder/cli/deployment"
913
"github.com/coder/coder/enterprise/coderd"
1014

@@ -14,14 +18,29 @@ import (
1418

1519
func server() *cobra.Command {
1620
dflags := deployment.Flags()
17-
cmd := agpl.Server(dflags, func(ctx context.Context, options *agplcoderd.Options) (*agplcoderd.API, error) {
21+
cmd := agpl.Server(dflags, func(ctx context.Context, cfg config.Root, options *agplcoderd.Options) (*agplcoderd.API, error) {
22+
replicaIDRaw, err := cfg.ReplicaID().Read()
23+
if err != nil {
24+
replicaIDRaw = uuid.NewString()
25+
}
26+
replicaID, err := uuid.Parse(replicaIDRaw)
27+
if err != nil {
28+
options.Logger.Warn(ctx, "failed to parse replica id", slog.Error(err), slog.F("replica_id", replicaIDRaw))
29+
replicaID = uuid.New()
30+
}
1831
o := &coderd.Options{
1932
AuditLogging: dflags.AuditLogging.Value,
2033
BrowserOnly: dflags.BrowserOnly.Value,
2134
SCIMAPIKey: []byte(dflags.SCIMAuthHeader.Value),
2235
UserWorkspaceQuota: dflags.UserWorkspaceQuota.Value,
23-
RBACEnabled: true,
24-
Options: options,
36+
RBAC: true,
37+
HighAvailability: dflags.HighAvailability.Value,
38+
39+
ReplicaID: replicaID,
40+
DERPServerRelayAddress: dflags.DerpServerRelayAddress.Value,
41+
DERPServerRegionID: dflags.DerpServerRegionID.Value,
42+
43+
Options: options,
2544
}
2645
api, err := coderd.New(ctx, o)
2746
if err != nil {

enterprise/coderd/coderd.go

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/cenkalti/backoff/v4"
1313
"github.com/go-chi/chi/v5"
14+
"github.com/google/uuid"
1415

1516
"cdr.dev/slog"
1617
"github.com/coder/coder/coderd"
@@ -24,6 +25,8 @@ import (
2425
"github.com/coder/coder/enterprise/audit/backends"
2526
"github.com/coder/coder/enterprise/coderd/license"
2627
"github.com/coder/coder/enterprise/highavailability"
28+
"github.com/coder/coder/enterprise/highavailability/derpmesh"
29+
"github.com/coder/coder/enterprise/highavailability/replica"
2730
"github.com/coder/coder/tailnet"
2831
)
2932

@@ -43,6 +46,7 @@ func New(ctx context.Context, options *Options) (*API, error) {
4346
Options: options,
4447
cancelEntitlementsLoop: cancelFunc,
4548
}
49+
4650
oauthConfigs := &httpmw.OAuth2Configs{
4751
Github: options.GithubOAuth2Config,
4852
OIDC: options.OIDCConfig,
@@ -113,7 +117,27 @@ func New(ctx context.Context, options *Options) (*API, error) {
113117
})
114118
}
115119

116-
err := api.updateEntitlements(ctx)
120+
// If high availability is disabled and multiple replicas appear, show an error.
121+
// If high availability is enabled and the built-in DERP is enabled but the DERP relay address isn't set, show an error.
122+
// We need to explicitly block meshing if high availability is disabled, because the meshing code would otherwise just work.
123+
// SetAddresses([]string{})
124+
125+
api.AGPL.RootHandler.Route("/replicas", func(r chi.Router) {
126+
127+
})
128+
129+
var err error
130+
api.replica, err = replica.New(ctx, options.Logger, options.Database, options.Pubsub, replica.Options{
131+
ID: options.ReplicaID,
132+
RelayAddress: options.DERPServerRelayAddress,
133+
RegionID: int32(options.DERPServerRegionID),
134+
})
135+
if err != nil {
136+
return nil, xerrors.Errorf("initialize replica: %w", err)
137+
}
138+
api.derpMesh = derpmesh.New(options.Logger, api.DERPServer)
139+
140+
err = api.updateEntitlements(ctx)
117141
if err != nil {
118142
return nil, xerrors.Errorf("update entitlements: %w", err)
119143
}
@@ -125,12 +149,18 @@ func New(ctx context.Context, options *Options) (*API, error) {
125149
type Options struct {
126150
*coderd.Options
127151

128-
RBACEnabled bool
152+
RBAC bool
129153
AuditLogging bool
130154
// Whether to block non-browser connections.
131155
BrowserOnly bool
132156
SCIMAPIKey []byte
133157
UserWorkspaceQuota int
158+
HighAvailability bool
159+
160+
// Used for high availability.
161+
DERPServerRelayAddress string
162+
DERPServerRegionID int
163+
ReplicaID uuid.UUID
134164

135165
EntitlementsUpdateInterval time.Duration
136166
Keys map[string]ed25519.PublicKey
@@ -140,13 +170,20 @@ type API struct {
140170
AGPL *coderd.API
141171
*Options
142172

173+
// Detects multiple Coder replicas running at the same time.
174+
replica *replica.Server
175+
// Meshes DERP connections from multiple replicas.
176+
derpMesh *derpmesh.Mesh
177+
143178
cancelEntitlementsLoop func()
144179
entitlementsMu sync.RWMutex
145180
entitlements codersdk.Entitlements
146181
}
147182

148183
func (api *API) Close() error {
149184
api.cancelEntitlementsLoop()
185+
_ = api.replica.Close()
186+
_ = api.derpMesh.Close()
150187
return api.AGPL.Close()
151188
}
152189

@@ -155,11 +192,12 @@ func (api *API) updateEntitlements(ctx context.Context) error {
155192
defer api.entitlementsMu.Unlock()
156193

157194
entitlements, err := license.Entitlements(ctx, api.Database, api.Logger, api.Keys, map[string]bool{
158-
codersdk.FeatureAuditLog: api.AuditLogging,
159-
codersdk.FeatureBrowserOnly: api.BrowserOnly,
160-
codersdk.FeatureSCIM: len(api.SCIMAPIKey) != 0,
161-
codersdk.FeatureWorkspaceQuota: api.UserWorkspaceQuota != 0,
162-
codersdk.FeatureTemplateRBAC: api.RBACEnabled,
195+
codersdk.FeatureAuditLog: api.AuditLogging,
196+
codersdk.FeatureBrowserOnly: api.BrowserOnly,
197+
codersdk.FeatureSCIM: len(api.SCIMAPIKey) != 0,
198+
codersdk.FeatureWorkspaceQuota: api.UserWorkspaceQuota != 0,
199+
codersdk.FeatureHighAvailability: api.HighAvailability,
200+
codersdk.FeatureTemplateRBAC: api.RBAC,
163201
})
164202
if err != nil {
165203
return err
@@ -210,13 +248,23 @@ func (api *API) updateEntitlements(ctx context.Context) error {
210248
if enabled {
211249
haCoordinator, err := highavailability.NewCoordinator(api.Logger, api.Pubsub)
212250
if err != nil {
213-
api.Logger.Error(ctx, "unable to setup HA tailnet coordinator", slog.Error(err))
251+
api.Logger.Error(ctx, "unable to set up high availability coordinator", slog.Error(err))
214252
// If we try to set up the HA coordinator and it fails, nothing
215253
// is actually changing.
216254
changed = false
217255
} else {
218256
coordinator = haCoordinator
219257
}
258+
259+
api.replica.SetCallback(func() {
260+
addresses := make([]string, 0)
261+
for _, replica := range api.replica.Regional() {
262+
addresses = append(addresses, replica.RelayAddress)
263+
}
264+
api.derpMesh.SetAddresses(addresses)
265+
})
266+
} else {
267+
api.derpMesh.SetAddresses([]string{})
220268
}
221269

222270
// Recheck changed in case the HA coordinator failed to set up.

0 commit comments

Comments
 (0)