Skip to content

feat: HA tailnet coordinator #4170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Prev Previous commit
Next Next commit
close all connections on coordinator
  • Loading branch information
coadler committed Oct 7, 2022
commit fbad8d075ddfb47d99c9cd7f2d1696ded78266ed
12 changes: 7 additions & 5 deletions codersdk/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ const (
)

const (
FeatureUserLimit = "user_limit"
FeatureAuditLog = "audit_log"
FeatureBrowserOnly = "browser_only"
FeatureSCIM = "scim"
FeatureWorkspaceQuota = "workspace_quota"
FeatureUserLimit = "user_limit"
FeatureAuditLog = "audit_log"
FeatureBrowserOnly = "browser_only"
FeatureSCIM = "scim"
FeatureWorkspaceQuota = "workspace_quota"
FeatureHighAvailability = "high_availability"
)

var FeatureNames = []string{
Expand All @@ -28,6 +29,7 @@ var FeatureNames = []string{
FeatureBrowserOnly,
FeatureSCIM,
FeatureWorkspaceQuota,
FeatureHighAvailability,
}

type Feature struct {
Expand Down
8 changes: 8 additions & 0 deletions enterprise/coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ func (api *API) updateEntitlements(ctx context.Context) error {
api.AGPL.WorkspaceQuotaEnforcer.Store(&enforcer)
}

if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
enforcer := workspacequota.NewNop()
if enabled {
enforcer = NewEnforcer(api.Options.UserWorkspaceQuota)
}
api.AGPL.WorkspaceQuotaEnforcer.Store(&enforcer)
}

api.entitlements = entitlements

return nil
Expand Down
25 changes: 19 additions & 6 deletions enterprise/coderd/license/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ import (
)

// Entitlements processes licenses to return whether features are enabled or not.
func Entitlements(ctx context.Context, db database.Store, logger slog.Logger, keys map[string]ed25519.PublicKey, enablements map[string]bool) (codersdk.Entitlements, error) {
func Entitlements(
ctx context.Context,
db database.Store,
logger slog.Logger,
keys map[string]ed25519.PublicKey,
enablements map[string]bool,
) (codersdk.Entitlements, error) {
now := time.Now()
// Default all entitlements to be disabled.
entitlements := codersdk.Entitlements{
Expand Down Expand Up @@ -96,6 +102,12 @@ func Entitlements(ctx context.Context, db database.Store, logger slog.Logger, ke
Enabled: enablements[codersdk.FeatureWorkspaceQuota],
}
}
if claims.Features.HighAvailability > 0 {
entitlements.Features[codersdk.FeatureHighAvailability] = codersdk.Feature{
Entitlement: entitlement,
Enabled: enablements[codersdk.FeatureHighAvailability],
}
}
if claims.AllFeatures {
allFeatures = true
}
Expand Down Expand Up @@ -165,11 +177,12 @@ var (
)

type Features struct {
UserLimit int64 `json:"user_limit"`
AuditLog int64 `json:"audit_log"`
BrowserOnly int64 `json:"browser_only"`
SCIM int64 `json:"scim"`
WorkspaceQuota int64 `json:"workspace_quota"`
UserLimit int64 `json:"user_limit"`
AuditLog int64 `json:"audit_log"`
BrowserOnly int64 `json:"browser_only"`
SCIM int64 `json:"scim"`
WorkspaceQuota int64 `json:"workspace_quota"`
HighAvailability int64 `json:"high_availability"`
}

type Claims struct {
Expand Down
30 changes: 29 additions & 1 deletion enterprise/tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"golang.org/x/xerrors"

"cdr.dev/slog"

"github.com/coder/coder/coderd/database"
agpl "github.com/coder/coder/tailnet"
)
Expand Down Expand Up @@ -288,8 +287,37 @@ func (c *haCoordinator) hangleAgentUpdate(id uuid.UUID, decoder *json.Decoder) (
return &node, nil
}

// Close closes all of the open connections in the coordinator and stops the
// coordinator from accepting new connections.
func (c *haCoordinator) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()

close(c.close)

wg := sync.WaitGroup{}

wg.Add(len(c.agentSockets))
for _, socket := range c.agentSockets {
socket := socket
go func() {
_ = socket.Close()
wg.Done()
}()
}

for _, connMap := range c.agentToConnectionSockets {
wg.Add(len(connMap))
for _, socket := range connMap {
socket := socket
go func() {
_ = socket.Close()
wg.Done()
}()
}
}

wg.Wait()
return nil
}

Expand Down
48 changes: 46 additions & 2 deletions tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func
// in-memory.
func NewMemoryCoordinator() Coordinator {
return &memoryCoordinator{
closed: false,
nodes: map[uuid.UUID]*Node{},
agentSockets: map[uuid.UUID]net.Conn{},
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]net.Conn{},
Expand All @@ -112,7 +113,8 @@ func NewMemoryCoordinator() Coordinator {
// This coordinator is incompatible with multiple Coder
// replicas as all node data is in-memory.
type memoryCoordinator struct {
mutex sync.Mutex
mutex sync.Mutex
closed bool

// nodes maps agent and connection IDs their respective node.
nodes map[uuid.UUID]*Node
Expand All @@ -135,6 +137,11 @@ func (c *memoryCoordinator) Node(id uuid.UUID) *Node {
// with the specified ID.
func (c *memoryCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error {
c.mutex.Lock()

if c.closed {
return xerrors.New("coordinator is closed")
}

// When a new connection is requested, we update it with the latest
// node of the agent. This allows the connection to establish.
node, ok := c.nodes[agent]
Expand Down Expand Up @@ -229,6 +236,11 @@ func (c *memoryCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder
// listens to incoming connections and publishes node updates.
func (c *memoryCoordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
c.mutex.Lock()

if c.closed {
return xerrors.New("coordinator is closed")
}

sockets, ok := c.agentToConnectionSockets[id]
if ok {
// Publish all nodes that want to connect to the
Expand Down Expand Up @@ -320,4 +332,36 @@ func (c *memoryCoordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.D
return nil
}

func (*memoryCoordinator) Close() error { return nil }
// Close closes all of the open connections in the coordinator and stops the
// coordinator from accepting new connections.
func (c *memoryCoordinator) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()

c.closed = true

wg := sync.WaitGroup{}

wg.Add(len(c.agentSockets))
for _, socket := range c.agentSockets {
socket := socket
go func() {
_ = socket.Close()
wg.Done()
}()
}

for _, connMap := range c.agentToConnectionSockets {
wg.Add(len(connMap))
for _, socket := range connMap {
socket := socket
go func() {
_ = socket.Close()
wg.Done()
}()
}
}

wg.Wait()
return nil
}