Skip to content

Commit 289e139

Browse files
committed
Move coordinator to high availability package
1 parent 4804269 commit 289e139

File tree

8 files changed

+33
-30
lines changed

8 files changed

+33
-30
lines changed

agent/agent_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
560560
if metadata.DERPMap == nil {
561561
metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)
562562
}
563-
coordinator := tailnet.NewMemoryCoordinator()
563+
coordinator := tailnet.NewCoordinator()
564564
agentID := uuid.New()
565565
statsCh := make(chan *codersdk.AgentStats)
566566
closer := agent.New(agent.Options{

coderd/coderd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func New(options *Options) *API {
119119
options.PrometheusRegistry = prometheus.NewRegistry()
120120
}
121121
if options.TailnetCoordinator == nil {
122-
options.TailnetCoordinator = tailnet.NewMemoryCoordinator()
122+
options.TailnetCoordinator = tailnet.NewCoordinator()
123123
}
124124
if options.Auditor == nil {
125125
options.Auditor = audit.NewNop()

coderd/wsconncache/wsconncache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func TestCache(t *testing.T) {
143143
func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeout time.Duration) *codersdk.AgentConn {
144144
metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)
145145

146-
coordinator := tailnet.NewMemoryCoordinator()
146+
coordinator := tailnet.NewCoordinator()
147147
agentID := uuid.New()
148148
closer := agent.New(agent.Options{
149149
FetchMetadata: func(ctx context.Context) (codersdk.WorkspaceAgentMetadata, error) {

enterprise/coderd/coderd.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import (
2323
"github.com/coder/coder/enterprise/audit"
2424
"github.com/coder/coder/enterprise/audit/backends"
2525
"github.com/coder/coder/enterprise/coderd/license"
26-
"github.com/coder/coder/enterprise/tailnet"
27-
agpltailnet "github.com/coder/coder/tailnet"
26+
"github.com/coder/coder/enterprise/highavailability"
27+
"github.com/coder/coder/tailnet"
2828
)
2929

3030
// New constructs an Enterprise coderd API instance.
@@ -206,9 +206,9 @@ func (api *API) updateEntitlements(ctx context.Context) error {
206206
}
207207

208208
if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
209-
coordinator := agpltailnet.NewMemoryCoordinator()
209+
coordinator := tailnet.NewCoordinator()
210210
if enabled {
211-
haCoordinator, err := tailnet.NewHACoordinator(api.Logger, api.Pubsub)
211+
haCoordinator, err := highavailability.NewCoordinator(api.Logger, api.Pubsub)
212212
if err != nil {
213213
api.Logger.Error(ctx, "unable to setup HA tailnet coordinator", slog.Error(err))
214214
// If we try to setup the HA coordinator and it fails, nothing

enterprise/tailnet/coordinator.go renamed to enterprise/highavailability/coordinator.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package tailnet
1+
package highavailability
22

33
import (
44
"bytes"
@@ -18,7 +18,9 @@ import (
1818
agpl "github.com/coder/coder/tailnet"
1919
)
2020

21-
func NewHACoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinator, error) {
21+
// NewCoordinator creates a new high availability coordinator
22+
// that uses PostgreSQL pubsub to exchange handshakes.
23+
func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinator, error) {
2224
coord := &haCoordinator{
2325
id: uuid.New(),
2426
log: logger,

enterprise/tailnet/coordinator_test.go renamed to enterprise/highavailability/coordinator_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package tailnet_test
1+
package highavailability_test
22

33
import (
44
"net"
@@ -11,7 +11,7 @@ import (
1111
"cdr.dev/slog/sloggers/slogtest"
1212

1313
"github.com/coder/coder/coderd/database"
14-
"github.com/coder/coder/enterprise/tailnet"
14+
"github.com/coder/coder/enterprise/highavailability"
1515
agpl "github.com/coder/coder/tailnet"
1616
"github.com/coder/coder/testutil"
1717
)
@@ -20,7 +20,7 @@ func TestCoordinatorSingle(t *testing.T) {
2020
t.Parallel()
2121
t.Run("ClientWithoutAgent", func(t *testing.T) {
2222
t.Parallel()
23-
coordinator, err := tailnet.NewHACoordinator(slogtest.Make(t, nil), database.NewPubsubInMemory())
23+
coordinator, err := highavailability.NewCoordinator(slogtest.Make(t, nil), database.NewPubsubInMemory())
2424
require.NoError(t, err)
2525
defer coordinator.Close()
2626

@@ -48,7 +48,7 @@ func TestCoordinatorSingle(t *testing.T) {
4848

4949
t.Run("AgentWithoutClients", func(t *testing.T) {
5050
t.Parallel()
51-
coordinator, err := tailnet.NewHACoordinator(slogtest.Make(t, nil), database.NewPubsubInMemory())
51+
coordinator, err := highavailability.NewCoordinator(slogtest.Make(t, nil), database.NewPubsubInMemory())
5252
require.NoError(t, err)
5353
defer coordinator.Close()
5454

@@ -76,7 +76,7 @@ func TestCoordinatorSingle(t *testing.T) {
7676
t.Run("AgentWithClient", func(t *testing.T) {
7777
t.Parallel()
7878

79-
coordinator, err := tailnet.NewHACoordinator(slogtest.Make(t, nil), database.NewPubsubInMemory())
79+
coordinator, err := highavailability.NewCoordinator(slogtest.Make(t, nil), database.NewPubsubInMemory())
8080
require.NoError(t, err)
8181
defer coordinator.Close()
8282

@@ -169,11 +169,11 @@ func TestCoordinatorHA(t *testing.T) {
169169

170170
pubsub := database.NewPubsubInMemory()
171171

172-
coordinator1, err := tailnet.NewHACoordinator(slogtest.Make(t, nil), pubsub)
172+
coordinator1, err := highavailability.NewCoordinator(slogtest.Make(t, nil), pubsub)
173173
require.NoError(t, err)
174174
defer coordinator1.Close()
175175

176-
coordinator2, err := tailnet.NewHACoordinator(slogtest.Make(t, nil), pubsub)
176+
coordinator2, err := highavailability.NewCoordinator(slogtest.Make(t, nil), pubsub)
177177
require.NoError(t, err)
178178
defer coordinator2.Close()
179179

tailnet/coordinator.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,25 +94,26 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func
9494
}, errChan
9595
}
9696

97-
// NewMemoryCoordinator constructs a new in-memory connection coordinator. This
97+
// NewCoordinator constructs a new in-memory connection coordinator. This
9898
// coordinator is incompatible with multiple Coder replicas as all node data is
9999
// in-memory.
100-
func NewMemoryCoordinator() Coordinator {
101-
return &memoryCoordinator{
100+
func NewCoordinator() Coordinator {
101+
return &coordinator{
102102
closed: false,
103103
nodes: map[uuid.UUID]*Node{},
104104
agentSockets: map[uuid.UUID]net.Conn{},
105105
agentToConnectionSockets: map[uuid.UUID]map[uuid.UUID]net.Conn{},
106106
}
107107
}
108108

109-
// MemoryCoordinator exchanges nodes with agents to establish connections.
109+
// coordinator exchanges nodes with agents to establish connections entirely in-memory.
110+
// The Enterprise implementation provides this for high-availability.
110111
// ┌──────────────────┐ ┌────────────────────┐ ┌───────────────────┐ ┌──────────────────┐
111112
// │tailnet.Coordinate├──►│tailnet.AcceptClient│◄─►│tailnet.AcceptAgent│◄──┤tailnet.Coordinate│
112113
// └──────────────────┘ └────────────────────┘ └───────────────────┘ └──────────────────┘
113114
// This coordinator is incompatible with multiple Coder
114115
// replicas as all node data is in-memory.
115-
type memoryCoordinator struct {
116+
type coordinator struct {
116117
mutex sync.Mutex
117118
closed bool
118119

@@ -126,7 +127,7 @@ type memoryCoordinator struct {
126127
}
127128

128129
// Node returns an in-memory node by ID.
129-
func (c *memoryCoordinator) Node(id uuid.UUID) *Node {
130+
func (c *coordinator) Node(id uuid.UUID) *Node {
130131
c.mutex.Lock()
131132
defer c.mutex.Unlock()
132133
node := c.nodes[id]
@@ -135,7 +136,7 @@ func (c *memoryCoordinator) Node(id uuid.UUID) *Node {
135136

136137
// ServeClient accepts a WebSocket connection that wants to connect to an agent
137138
// with the specified ID.
138-
func (c *memoryCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error {
139+
func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error {
139140
c.mutex.Lock()
140141

141142
if c.closed {
@@ -194,7 +195,7 @@ func (c *memoryCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.
194195
}
195196
}
196197

197-
func (c *memoryCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json.Decoder) error {
198+
func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json.Decoder) error {
198199
var node Node
199200
err := decoder.Decode(&node)
200201
if err != nil {
@@ -234,7 +235,7 @@ func (c *memoryCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder
234235

235236
// ServeAgent accepts a WebSocket connection to an agent that
236237
// listens to incoming connections and publishes node updates.
237-
func (c *memoryCoordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
238+
func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
238239
c.mutex.Lock()
239240

240241
if c.closed {
@@ -293,7 +294,7 @@ func (c *memoryCoordinator) ServeAgent(conn net.Conn, id uuid.UUID) error {
293294
}
294295
}
295296

296-
func (c *memoryCoordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder) error {
297+
func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder) error {
297298
var node Node
298299
err := decoder.Decode(&node)
299300
if err != nil {
@@ -334,7 +335,7 @@ func (c *memoryCoordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.D
334335

335336
// Close closes all of the open connections in the coordinator and stops the
336337
// coordinator from accepting new connections.
337-
func (c *memoryCoordinator) Close() error {
338+
func (c *coordinator) Close() error {
338339
c.mutex.Lock()
339340
defer c.mutex.Unlock()
340341

tailnet/coordinator_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestCoordinator(t *testing.T) {
1616
t.Parallel()
1717
t.Run("ClientWithoutAgent", func(t *testing.T) {
1818
t.Parallel()
19-
coordinator := tailnet.NewMemoryCoordinator()
19+
coordinator := tailnet.NewCoordinator()
2020
client, server := net.Pipe()
2121
sendNode, errChan := tailnet.ServeCoordinator(client, func(node []*tailnet.Node) error {
2222
return nil
@@ -40,7 +40,7 @@ func TestCoordinator(t *testing.T) {
4040

4141
t.Run("AgentWithoutClients", func(t *testing.T) {
4242
t.Parallel()
43-
coordinator := tailnet.NewMemoryCoordinator()
43+
coordinator := tailnet.NewCoordinator()
4444
client, server := net.Pipe()
4545
sendNode, errChan := tailnet.ServeCoordinator(client, func(node []*tailnet.Node) error {
4646
return nil
@@ -64,7 +64,7 @@ func TestCoordinator(t *testing.T) {
6464

6565
t.Run("AgentWithClient", func(t *testing.T) {
6666
t.Parallel()
67-
coordinator := tailnet.NewMemoryCoordinator()
67+
coordinator := tailnet.NewCoordinator()
6868

6969
agentWS, agentServerWS := net.Pipe()
7070
defer agentWS.Close()

0 commit comments

Comments
 (0)