feat: Add high availability for multiple replicas #4555

Merged
merged 86 commits into from
Oct 17, 2022
Commits
35b2fed
feat: HA tailnet coordinator
coadler Sep 22, 2022
68a812b
fixup! feat: HA tailnet coordinator
coadler Sep 23, 2022
774c5da
fixup! feat: HA tailnet coordinator
coadler Sep 23, 2022
bd82c5e
remove printlns
coadler Sep 23, 2022
02e079d
Merge branch 'main' into colin/pg-coordinate
coadler Oct 7, 2022
fbad8d0
close all connections on coordinator
coadler Oct 7, 2022
46803aa
impelement high availability feature
coadler Oct 7, 2022
d38391e
fixup! impelement high availability feature
coadler Oct 7, 2022
a0bcd64
fixup! impelement high availability feature
coadler Oct 7, 2022
1f33018
fixup! impelement high availability feature
coadler Oct 7, 2022
b6a5070
fixup! impelement high availability feature
coadler Oct 7, 2022
1883430
Add replicas
kylecarbs Oct 12, 2022
7dc968c
Add DERP meshing to arbitrary addresses
kylecarbs Oct 12, 2022
1dcf0d0
Move packages to highavailability folder
kylecarbs Oct 12, 2022
5c43d63
Merge branch 'main' into colin/pg-coordinate
kylecarbs Oct 12, 2022
4804269
Merge branch 'colin/pg-coordinate' into replica
kylecarbs Oct 12, 2022
289e139
Move coordinator to high availability package
kylecarbs Oct 12, 2022
585bc1d
Add flags for HA
kylecarbs Oct 12, 2022
fdb3557
Rename to replicasync
kylecarbs Oct 13, 2022
9124b00
Denest packages for replicas
kylecarbs Oct 13, 2022
d5555f6
Add test for multiple replicas
kylecarbs Oct 13, 2022
8dfc261
Fix coordination test
kylecarbs Oct 13, 2022
ff5968b
Add HA to the helm chart
kylecarbs Oct 13, 2022
557b390
Rename function pointer
kylecarbs Oct 13, 2022
186a5e2
Add warnings for HA
kylecarbs Oct 13, 2022
de5b13b
Add the ability to block endpoints
kylecarbs Oct 13, 2022
9a50ac4
Add flag to disable P2P connections
kylecarbs Oct 14, 2022
6fa941f
Wow, I made the tests pass
kylecarbs Oct 14, 2022
abff96b
Add replicas endpoint
kylecarbs Oct 14, 2022
d6ce216
Ensure close kills replica
kylecarbs Oct 14, 2022
c3786a5
Merge branch 'main' into replica
kylecarbs Oct 14, 2022
d7cc0ff
Update sql
kylecarbs Oct 14, 2022
9914840
Add database latency to high availability
kylecarbs Oct 15, 2022
c1aa3d2
Pipe TLS to DERP mesh
kylecarbs Oct 15, 2022
0cc4263
Fix DERP mesh with TLS
kylecarbs Oct 15, 2022
f9177e4
Add tests for TLS
kylecarbs Oct 15, 2022
ee59d88
Fix replica sync TLS
kylecarbs Oct 15, 2022
8641e58
Fix RootCA for replica meshing
kylecarbs Oct 15, 2022
3dfb796
Remove ID from replicasync
kylecarbs Oct 15, 2022
ec2c1f1
Fix getting certificates for meshing
kylecarbs Oct 15, 2022
590f0f8
Remove excessive locking
kylecarbs Oct 15, 2022
d8580d1
Fix linting
kylecarbs Oct 15, 2022
ae956fb
Store mesh key in the database
kylecarbs Oct 15, 2022
d703e2d
Fix replica key for tests
kylecarbs Oct 15, 2022
9bb021c
Fix types gen
kylecarbs Oct 15, 2022
76c9e2c
Fix unlocking unlocked
kylecarbs Oct 15, 2022
09e87b0
Fix race in tests
kylecarbs Oct 15, 2022
18c0464
Update enterprise/derpmesh/derpmesh.go
kylecarbs Oct 15, 2022
6f25b2d
Rename to syncReplicas
kylecarbs Oct 15, 2022
efb6ece
Merge branch 'replica' of github.com:coder/coder into replica
kylecarbs Oct 15, 2022
1e85039
Reuse http client
kylecarbs Oct 15, 2022
ae0aa5f
Delete old replicas on a CRON
kylecarbs Oct 15, 2022
332d435
Merge branch 'main' into replica
kylecarbs Oct 15, 2022
bd7fb13
Fix race condition in connection tests
kylecarbs Oct 15, 2022
bb5b347
Fix linting
kylecarbs Oct 15, 2022
76e0511
Fix nil type
kylecarbs Oct 15, 2022
1ff5f7d
Move pubsub to in-memory for twenty test
kylecarbs Oct 16, 2022
b732184
Add comment for configuration tweaking
kylecarbs Oct 16, 2022
38465ac
Fix leak with transport
kylecarbs Oct 16, 2022
72555e2
Fix close leak in derpmesh
kylecarbs Oct 16, 2022
e54072a
Fix race when creating server
kylecarbs Oct 16, 2022
27d5f40
Remove handler update
kylecarbs Oct 16, 2022
4d0b1d8
Skip test on Windows
kylecarbs Oct 16, 2022
129f5ba
Fix DERP mesh test
kylecarbs Oct 16, 2022
4e5d30e
Wrap HTTP handler replacement in mutex
kylecarbs Oct 16, 2022
0359a7e
Fix error message for relay
kylecarbs Oct 16, 2022
f364d1f
Fix API handler for normal tests
kylecarbs Oct 16, 2022
423a47e
Fix speedtest
kylecarbs Oct 16, 2022
c3a77fe
Fix replica resend
kylecarbs Oct 16, 2022
729f8a0
Fix derpmesh send
kylecarbs Oct 16, 2022
ae0bc5d
Ping async
kylecarbs Oct 16, 2022
d7d50db
Increase wait time of template version jobd
kylecarbs Oct 16, 2022
77d23dc
Fix race when closing replica sync
kylecarbs Oct 16, 2022
435bbbb
Add name to client
kylecarbs Oct 16, 2022
9b7c41a
Log the derpmap being used
kylecarbs Oct 17, 2022
9615402
Don't connect if DERP is empty
kylecarbs Oct 17, 2022
bcb97ac
Improve agent coordinator logging
kylecarbs Oct 17, 2022
e2f6a19
Fix lock in coordinator
kylecarbs Oct 17, 2022
c855c9b
Fix relay addr
kylecarbs Oct 17, 2022
a0e5cab
Fix race when updating durations
kylecarbs Oct 17, 2022
9878fc5
Fix client publish race
kylecarbs Oct 17, 2022
7a40bf8
Run pubsub loop in a queue
kylecarbs Oct 17, 2022
08b9681
Store agent nodes in order
kylecarbs Oct 17, 2022
79991a9
Fix coordinator locking
kylecarbs Oct 17, 2022
020171b
Check for closed pipe
kylecarbs Oct 17, 2022
6a57554
Merge branch 'main' into replica
kylecarbs Oct 17, 2022
Add replicas endpoint
kylecarbs committed Oct 14, 2022
commit abff96b103bcc4d6a72697154d95477bd9b69aed
4 changes: 4 additions & 0 deletions coderd/rbac/object.go
@@ -146,6 +146,10 @@ var (
ResourceDeploymentFlags = Object{
Type: "deployment_flags",
}

ResourceReplicas = Object{
Type: "replicas",
}
)

// Object is used to create objects for authz checks when you have none in
26 changes: 0 additions & 26 deletions codersdk/deployment.go

This file was deleted.

42 changes: 42 additions & 0 deletions codersdk/replicas.go
@@ -0,0 +1,42 @@
package codersdk

import (
"context"
"encoding/json"
"net/http"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"
)

type Replica struct {
// ID is the unique identifier for the replica.
ID uuid.UUID `json:"id"`
// Hostname is the hostname of the replica.
Hostname string `json:"hostname"`
// CreatedAt is when the replica was first seen.
CreatedAt time.Time `json:"created_at"`
// RelayAddress is the accessible address to relay DERP connections.
RelayAddress string `json:"relay_address"`
// RegionID is the region of the replica.
RegionID int32 `json:"region_id"`
// Error is the error reported by the replica, if any; it is empty otherwise.
Error string `json:"error"`
}

// Replicas fetches the list of replicas.
func (c *Client) Replicas(ctx context.Context) ([]Replica, error) {
res, err := c.Request(ctx, http.MethodGet, "/api/v2/replicas", nil)
if err != nil {
return nil, xerrors.Errorf("execute request: %w", err)
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, readBodyAsError(res)
}

var replicas []Replica
return replicas, json.NewDecoder(res.Body).Decode(&replicas)
}
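
A minimal sketch of how a caller might consume this endpoint, assuming only the JSON shape shown above. The local `replica` struct, the `fetchReplicas` and `fetchFromStub` helpers, and the stub server with its payload are hypothetical stand-ins for illustration; real callers would go through `codersdk.Client.Replicas`. `ID` is kept as a string here to avoid the `uuid` dependency.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// replica mirrors the JSON fields of codersdk.Replica. It is a local,
// hypothetical stand-in so the sketch needs only the standard library.
type replica struct {
	ID           string    `json:"id"`
	Hostname     string    `json:"hostname"`
	CreatedAt    time.Time `json:"created_at"`
	RelayAddress string    `json:"relay_address"`
	RegionID     int32     `json:"region_id"`
	Error        string    `json:"error"`
}

// fetchReplicas issues GET {baseURL}/api/v2/replicas and decodes the body.
func fetchReplicas(ctx context.Context, client *http.Client, baseURL string) ([]replica, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/api/v2/replicas", nil)
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	res, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("execute request: %w", err)
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status code %d", res.StatusCode)
	}
	var replicas []replica
	if err := json.NewDecoder(res.Body).Decode(&replicas); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}
	return replicas, nil
}

// newStubServer serves a fixed two-replica response. The payload is
// invented sample data, not output from a real deployment.
func newStubServer() *httptest.Server {
	const body = `[
		{"id":"a","hostname":"coder-0","created_at":"2022-10-14T00:00:00Z","relay_address":"http://10.0.0.1:3000","region_id":999,"error":""},
		{"id":"b","hostname":"coder-1","created_at":"2022-10-14T00:05:00Z","relay_address":"http://10.0.0.2:3000","region_id":999,"error":"Failed to dial peers"}
	]`
	return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/api/v2/replicas" {
			http.NotFound(rw, r)
			return
		}
		rw.Header().Set("Content-Type", "application/json")
		_, _ = rw.Write([]byte(body))
	}))
}

// fetchFromStub wires the stub server and the client helper together.
func fetchFromStub() ([]replica, error) {
	srv := newStubServer()
	defer srv.Close()
	return fetchReplicas(context.Background(), srv.Client(), srv.URL)
}

func main() {
	replicas, err := fetchFromStub()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, r := range replicas {
		fmt.Printf("%s region=%d relay=%q error=%q\n", r.Hostname, r.RegionID, r.RelayAddress, r.Error)
	}
}
```

A non-empty `error` field on a replica is the signal a caller would likely surface to an operator, since the endpoint itself still returns 200 in that case.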
4 changes: 4 additions & 0 deletions codersdk/workspaceagents.go
@@ -331,13 +331,17 @@ func (c *Client) ListenWorkspaceAgentTailnet(ctx context.Context) (net.Conn, err
return websocket.NetConn(ctx, conn, websocket.MessageBinary), nil
}

// @typescript-ignore DialWorkspaceAgentOptions
type DialWorkspaceAgentOptions struct {
Logger slog.Logger
// BlockEndpoints forces connections to be relayed through DERP by blocking direct endpoints.
BlockEndpoints bool
}

func (c *Client) DialWorkspaceAgent(ctx context.Context, agentID uuid.UUID, options *DialWorkspaceAgentOptions) (*AgentConn, error) {
if options == nil {
options = &DialWorkspaceAgentOptions{}
}
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("/api/v2/workspaceagents/%s/connection", agentID), nil)
if err != nil {
return nil, err
Expand Down
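
The nil check in `DialWorkspaceAgent` lets callers pass `nil` when they want default behavior. A rough sketch of that pattern follows; `dialOptions`, `resolveOptions`, and `connectionMode` are hypothetical names used only for illustration, and the returned strings are not values produced by Coder.

```go
package main

import "fmt"

// dialOptions is a hypothetical stand-in for DialWorkspaceAgentOptions,
// reduced to the one field added in this commit.
type dialOptions struct {
	// BlockEndpoints disables direct endpoints so traffic is relayed.
	BlockEndpoints bool
}

// resolveOptions returns a usable options value even when the caller
// passed nil, mirroring the nil check in DialWorkspaceAgent.
func resolveOptions(options *dialOptions) *dialOptions {
	if options == nil {
		options = &dialOptions{}
	}
	return options
}

// connectionMode describes which paths remain available for the given
// options. The labels are illustrative only.
func connectionMode(options *dialOptions) string {
	if resolveOptions(options).BlockEndpoints {
		return "relay-only"
	}
	return "direct-or-relay"
}

func main() {
	fmt.Println(connectionMode(nil))
	fmt.Println(connectionMode(&dialOptions{BlockEndpoints: true}))
}
```

Defaulting inside the function keeps existing call sites that pass `nil` working when new fields are added to the options struct.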
2 changes: 1 addition & 1 deletion enterprise/cli/features_test.go
@@ -57,7 +57,7 @@ func TestFeaturesList(t *testing.T) {
var entitlements codersdk.Entitlements
err := json.Unmarshal(buf.Bytes(), &entitlements)
require.NoError(t, err, "unmarshal JSON output")
- assert.Len(t, entitlements.Features, 6)
+ assert.Len(t, entitlements.Features, 7)
assert.Empty(t, entitlements.Warnings)
assert.Equal(t, codersdk.EntitlementNotEntitled,
entitlements.Features[codersdk.FeatureUserLimit].Entitlement)
4 changes: 4 additions & 0 deletions enterprise/coderd/coderd.go
@@ -59,6 +59,10 @@ func New(ctx context.Context, options *Options) (*API, error) {

api.AGPL.APIHandler.Group(func(r chi.Router) {
r.Get("/entitlements", api.serveEntitlements)
r.Route("/replicas", func(r chi.Router) {
r.Use(apiKeyMiddleware)
r.Get("/", api.replicas)
})
r.Route("/licenses", func(r chi.Router) {
r.Use(apiKeyMiddleware)
r.Post("/", api.postLicense)
2 changes: 1 addition & 1 deletion enterprise/coderd/coderd_test.go
@@ -85,7 +85,7 @@ func TestEntitlements(t *testing.T) {
assert.False(t, res.HasLicense)
al = res.Features[codersdk.FeatureAuditLog]
assert.Equal(t, codersdk.EntitlementNotEntitled, al.Entitlement)
- assert.True(t, al.Enabled)
+ assert.False(t, al.Enabled)
})
t.Run("Pubsub", func(t *testing.T) {
t.Parallel()
4 changes: 4 additions & 0 deletions enterprise/coderd/coderdenttest/coderdenttest_test.go
@@ -58,6 +58,10 @@ func TestAuthorizeAllEndpoints(t *testing.T) {
AssertAction: rbac.ActionRead,
AssertObject: rbac.ResourceLicense,
}
assertRoute["GET:/api/v2/replicas"] = coderdtest.RouteCheck{
AssertAction: rbac.ActionRead,
AssertObject: rbac.ResourceReplicas,
}
assertRoute["DELETE:/api/v2/licenses/{id}"] = coderdtest.RouteCheck{
AssertAction: rbac.ActionDelete,
AssertObject: rbac.ResourceLicense,
35 changes: 35 additions & 0 deletions enterprise/coderd/replicas.go
@@ -1 +1,36 @@
package coderd

import (
"net/http"

"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/httpapi"
"github.com/coder/coder/coderd/rbac"
"github.com/coder/coder/codersdk"
)

// replicas returns the list of replicas that are active in Coder.
func (api *API) replicas(rw http.ResponseWriter, r *http.Request) {
if !api.AGPL.Authorize(r, rbac.ActionRead, rbac.ResourceReplicas) {
httpapi.ResourceNotFound(rw)
return
}

replicas := api.replicaManager.All()
res := make([]codersdk.Replica, 0, len(replicas))
for _, replica := range replicas {
res = append(res, convertReplica(replica))
}
httpapi.Write(r.Context(), rw, http.StatusOK, res)
}

func convertReplica(replica database.Replica) codersdk.Replica {
return codersdk.Replica{
ID: replica.ID,
Hostname: replica.Hostname,
CreatedAt: replica.CreatedAt,
RelayAddress: replica.RelayAddress,
RegionID: replica.RegionID,
Error: replica.Error.String,
}
}
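
The handler above answers unauthorized callers with a not-found response rather than a forbidden one, and it builds the result with `make(..., 0, len(...))` so that an empty list encodes as `[]` instead of `null`. The following is a sketch of both behaviors using only the standard library; `replicasHandler`, `respond`, and the string-typed replica list are hypothetical simplifications, and `http.Error` stands in for `httpapi.ResourceNotFound`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// replicasHandler sketches the authorize-then-list flow. The authorize
// and list callbacks are hypothetical stand-ins for api.AGPL.Authorize
// and api.replicaManager.All.
func replicasHandler(authorize func(*http.Request) bool, list func() []string) http.HandlerFunc {
	return func(rw http.ResponseWriter, r *http.Request) {
		if !authorize(r) {
			// Hide the resource from callers that may not read it.
			http.Error(rw, "resource not found", http.StatusNotFound)
			return
		}
		names := list()
		// A non-nil, zero-length slice encodes as [] rather than null.
		res := make([]string, 0, len(names))
		res = append(res, names...)
		rw.Header().Set("Content-Type", "application/json")
		rw.WriteHeader(http.StatusOK)
		_ = json.NewEncoder(rw).Encode(res)
	}
}

// respond runs one request through the handler and returns the status
// code and the trimmed response body.
func respond(authorized bool, names []string) (int, string) {
	h := replicasHandler(
		func(*http.Request) bool { return authorized },
		func() []string { return names },
	)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v2/replicas", nil))
	return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
	code, body := respond(true, []string{"coder-0", "coder-1"})
	fmt.Println(code, body)
	code, body = respond(true, nil)
	fmt.Println(code, body)
	code, _ = respond(false, []string{"coder-0"})
	fmt.Println(code)
}
```

Returning not-found for unauthorized reads avoids confirming that the resource exists to callers who lack access.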
5 changes: 5 additions & 0 deletions enterprise/coderd/replicas_test.go
@@ -63,6 +63,10 @@ func TestReplicas(t *testing.T) {
},
})
secondClient.SessionToken = firstClient.SessionToken
replicas, err := secondClient.Replicas(context.Background())
require.NoError(t, err)
require.Len(t, replicas, 2)

agentID := setupWorkspaceAgent(t, firstClient, firstUser)
conn, err := secondClient.DialWorkspaceAgent(context.Background(), agentID, &codersdk.DialWorkspaceAgentOptions{
BlockEndpoints: true,
@@ -76,5 +80,6 @@ func TestReplicas(t *testing.T) {
return err == nil
}, testutil.WaitLong, testutil.IntervalFast)
_ = conn.Close()

})
}
8 changes: 8 additions & 0 deletions enterprise/replicasync/replicasync.go
@@ -101,6 +101,14 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, pubsub data
if err != nil {
return nil, xerrors.Errorf("run replica: %w", err)
}
peers := server.Regional()
if len(peers) > 0 {
self := server.Self()
if self.RelayAddress == "" {
return nil, xerrors.Errorf("a relay address must be specified when running multiple replicas in the same region")
}
}

err = server.subscribe(ctx)
if err != nil {
return nil, xerrors.Errorf("subscribe: %w", err)
Expand Down
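
The startup check added in this hunk can be read as a small rule: a replica with at least one peer in its region must advertise a relay address. A minimal sketch of that rule in isolation follows, assuming the caller has already computed the regional peers; the `replica` struct and `validateRelayAddress` function are hypothetical names, not part of `replicasync`.

```go
package main

import (
	"errors"
	"fmt"
)

// replica is a hypothetical, reduced stand-in for database.Replica.
type replica struct {
	Hostname     string
	RelayAddress string
}

// errRelayAddressRequired carries the same message as the error
// returned by replicasync.New in the hunk above.
var errRelayAddressRequired = errors.New("a relay address must be specified when running multiple replicas in the same region")

// validateRelayAddress rejects a replica that has regional peers but no
// relay address. A replica without peers is allowed to omit it.
func validateRelayAddress(self replica, regionalPeers []replica) error {
	if len(regionalPeers) > 0 && self.RelayAddress == "" {
		return errRelayAddressRequired
	}
	return nil
}

func main() {
	peer := replica{Hostname: "something"}

	// Single replica: no relay address needed.
	fmt.Println(validateRelayAddress(replica{Hostname: "solo"}, nil))

	// Second replica in the same region without a relay address: rejected.
	fmt.Println(validateRelayAddress(replica{Hostname: "second"}, []replica{peer}))

	// Same situation with a relay address: accepted.
	fmt.Println(validateRelayAddress(replica{Hostname: "second", RelayAddress: "http://169.254.169.254"}, []replica{peer}))
}
```

Failing at startup, rather than later when a relayed connection is attempted, surfaces the misconfiguration at the point where the operator can most easily fix it.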
36 changes: 24 additions & 12 deletions enterprise/replicasync/replicasync_test.go
@@ -5,7 +5,6 @@ import (
"net/http"
"net/http/httptest"
"sync"
- "sync/atomic"
"testing"
"time"

@@ -66,6 +65,25 @@ func TestReplica(t *testing.T) {
_ = server.Close()
require.NoError(t, err)
})
t.Run("ErrorsWithoutRelayAddress", func(t *testing.T) {
// Ensures that creating a replica fails when a peer already exists
// in the same region and no relay address is specified.
t.Parallel()
db, pubsub := dbtestutil.NewDB(t)
_, err := db.InsertReplica(context.Background(), database.InsertReplicaParams{
ID: uuid.New(),
CreatedAt: database.Now(),
StartedAt: database.Now(),
UpdatedAt: database.Now(),
Hostname: "something",
})
require.NoError(t, err)
_, err = replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replicasync.Options{
ID: uuid.New(),
})
require.Error(t, err)
require.Equal(t, "a relay address must be specified when running multiple replicas in the same region", err.Error())
})
t.Run("ConnectsToPeerReplica", func(t *testing.T) {
// Ensures that the replica reports a successful status for
// accessing all of its peers.
@@ -85,7 +103,8 @@ func TestReplica(t *testing.T) {
})
require.NoError(t, err)
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replicasync.Options{
- ID: uuid.New(),
+ ID: uuid.New(),
+ RelayAddress: "http://169.254.169.254",
})
require.NoError(t, err)
require.Len(t, server.Regional(), 1)
@@ -96,12 +115,6 @@ func TestReplica(t *testing.T) {
t.Run("ConnectsToFakePeerWithError", func(t *testing.T) {
t.Parallel()
db, pubsub := dbtestutil.NewDB(t)
- var count atomic.Int32
- cancel, err := pubsub.Subscribe(replicasync.PubsubEvent, func(ctx context.Context, message []byte) {
- count.Add(1)
- })
- require.NoError(t, err)
- defer cancel()
peer, err := db.InsertReplica(context.Background(), database.InsertReplicaParams{
ID: uuid.New(),
CreatedAt: database.Now(),
@@ -113,16 +126,15 @@ func TestReplica(t *testing.T) {
})
require.NoError(t, err)
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, replicasync.Options{
- ID: uuid.New(),
- PeerTimeout: 1 * time.Millisecond,
+ ID: uuid.New(),
+ PeerTimeout: 1 * time.Millisecond,
+ RelayAddress: "http://169.254.169.254",
})
require.NoError(t, err)
require.Len(t, server.Regional(), 1)
require.Equal(t, peer.ID, server.Regional()[0].ID)
require.True(t, server.Self().Error.Valid)
require.Contains(t, server.Self().Error.String, "Failed to dial peers")
- // Once for the initial creation of a replica, and another time for the error.
- require.Equal(t, int32(2), count.Load())
_ = server.Close()
})
t.Run("RefreshOnPublish", func(t *testing.T) {
11 changes: 11 additions & 0 deletions site/src/api/typesGenerated.ts
@@ -268,6 +268,7 @@ export interface DeploymentFlags {
readonly derp_server_region_code: StringFlag
readonly derp_server_region_name: StringFlag
readonly derp_server_stun_address: StringArrayFlag
readonly derp_server_relay_address: StringFlag
readonly derp_config_url: StringFlag
readonly derp_config_path: StringFlag
readonly prom_enabled: BoolFlag
@@ -522,6 +523,16 @@ export interface PutExtendWorkspaceRequest {
readonly deadline: string
}

// From codersdk/replicas.go
export interface Replica {
readonly id: string
readonly hostname: string
readonly created_at: string
readonly relay_address: string
readonly region_id: number
readonly error: string
}

// From codersdk/error.go
export interface Response {
readonly message: string