Skip to content

chore: rename Coordinator to CoordinatorV1 #11222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions agent/agenttest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewClient(t testing.TB,
agentID uuid.UUID,
manifest agentsdk.Manifest,
statsChan chan *agentsdk.Stats,
coordinator tailnet.Coordinator,
coordinator tailnet.CoordinatorV1,
) *Client {
if manifest.AgentID == uuid.Nil {
manifest.AgentID = agentID
Expand All @@ -47,7 +47,7 @@ type Client struct {
manifest agentsdk.Manifest
metadata map[string]agentsdk.Metadata
statsChan chan *agentsdk.Stats
coordinator tailnet.Coordinator
coordinator tailnet.CoordinatorV1
LastWorkspaceAgent func()
PatchWorkspaceLogs func() error
GetServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
Expand Down
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
defer closeBatcher()

// We use a separate coderAPICloser so the Enterprise API
// can have it's own close functions. This is cleaner
// can have its own close functions. This is cleaner
// than abstracting the Coder API itself.
coderAPI, coderAPICloser, err := newAPI(ctx, options)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -1508,7 +1508,7 @@ func convertWorkspaceAgentMetadataDesc(mds []database.WorkspaceAgentMetadatum) [
return metadata
}

func convertWorkspaceAgent(derpMap *tailcfg.DERPMap, coordinator tailnet.Coordinator,
func convertWorkspaceAgent(derpMap *tailcfg.DERPMap, coordinator tailnet.CoordinatorV1,
dbAgent database.WorkspaceAgent, apps []codersdk.WorkspaceApp, scripts []codersdk.WorkspaceAgentScript, logSources []codersdk.WorkspaceAgentLogSource,
agentInactiveDisconnectTimeout time.Duration, agentFallbackTroubleshootingURL string,
) (codersdk.WorkspaceAgent, error) {
Expand Down
2 changes: 1 addition & 1 deletion coderd/wsconncache/wsconncache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ type client struct {
t *testing.T
agentID uuid.UUID
manifest agentsdk.Manifest
coordinator tailnet.Coordinator
coordinator tailnet.CoordinatorV1
}

func (c *client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
Expand Down
19 changes: 19 additions & 0 deletions enterprise/tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/coder/coder/v2/coderd/util/slice"
"github.com/coder/coder/v2/codersdk"
agpl "github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
)

// NewCoordinator creates a new high availability coordinator
Expand Down Expand Up @@ -156,6 +157,24 @@ type haCoordinator struct {
legacyAgents map[uuid.UUID]struct{}
}

func (c *haCoordinator) Coordinate(ctx context.Context, _ uuid.UUID, _ string, _ agpl.TunnelAuth) (chan<- *proto.CoordinateRequest, <-chan *proto.CoordinateResponse) {
// HA Coordinator does NOT support v2 API and this is just here to appease the compiler and prevent
// panics while we build out v2 support elsewhere. We will retire the HA Coordinator in favor of
// PG Coordinator before we turn on the v2 API.
c.log.Warn(ctx, "v2 API invoked but unimplemented")
resp := make(chan *proto.CoordinateResponse)
close(resp)
req := make(chan *proto.CoordinateRequest)
go func() {
for {
if _, ok := <-req; !ok {
return
}
}
}()
return req, resp
}

// Node returns an in-memory node by ID.
func (c *haCoordinator) Node(id uuid.UUID) *agpl.Node {
c.mutex.Lock()
Expand Down
6 changes: 0 additions & 6 deletions enterprise/tailnet/pgcoord.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,6 @@ func newPGCoordInternal(
return c, nil
}

// NewPGCoordV2 creates a high-availability coordinator that stores state in the PostgreSQL database and
// receives notifications of updates via the pubsub.
func NewPGCoordV2(ctx context.Context, logger slog.Logger, ps pubsub.Pubsub, store database.Store) (agpl.CoordinatorV2, error) {
return newPGCoordInternal(ctx, logger, ps, store)
}

func (c *pgCoord) ServeMultiAgent(id uuid.UUID) agpl.MultiAgentConn {
return agpl.ServeMultiAgent(c, c.logger, id)
}
Expand Down
10 changes: 5 additions & 5 deletions enterprise/tailnet/pgcoord_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func TestPGCoordinator_BidirectionalTunnels(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
coordinator, err := tailnet.NewPGCoordV2(ctx, logger, ps, store)
coordinator, err := tailnet.NewPGCoord(ctx, logger, ps, store)
require.NoError(t, err)
defer coordinator.Close()
agpltest.BidirectionalTunnels(ctx, t, coordinator)
Expand All @@ -626,7 +626,7 @@ func TestPGCoordinator_GracefulDisconnect(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
coordinator, err := tailnet.NewPGCoordV2(ctx, logger, ps, store)
coordinator, err := tailnet.NewPGCoord(ctx, logger, ps, store)
require.NoError(t, err)
defer coordinator.Close()
agpltest.GracefulDisconnectTest(ctx, t, coordinator)
Expand All @@ -641,7 +641,7 @@ func TestPGCoordinator_Lost(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
coordinator, err := tailnet.NewPGCoordV2(ctx, logger, ps, store)
coordinator, err := tailnet.NewPGCoord(ctx, logger, ps, store)
require.NoError(t, err)
defer coordinator.Close()
agpltest.LostTest(ctx, t, coordinator)
Expand Down Expand Up @@ -676,7 +676,7 @@ func newTestConn(ids []uuid.UUID) *testConn {
return a
}

func newTestAgent(t *testing.T, coord agpl.Coordinator, name string, id ...uuid.UUID) *testConn {
func newTestAgent(t *testing.T, coord agpl.CoordinatorV1, name string, id ...uuid.UUID) *testConn {
a := newTestConn(id)
go func() {
err := coord.ServeAgent(a.serverWS, a.id, name)
Expand Down Expand Up @@ -731,7 +731,7 @@ func (c *testConn) waitForClose(ctx context.Context, t *testing.T) {
}
}

func newTestClient(t *testing.T, coord agpl.Coordinator, agentID uuid.UUID, id ...uuid.UUID) *testConn {
func newTestClient(t *testing.T, coord agpl.CoordinatorV1, agentID uuid.UUID, id ...uuid.UUID) *testConn {
c := newTestConn(id)
go func() {
err := coord.ServeClient(c.serverWS, c.id, agentID)
Expand Down
15 changes: 5 additions & 10 deletions tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
// └──────────────────┘ └────────────────────┘ └───────────────────┘ └──────────────────┘
// Coordinators have different guarantees for HA support.
type Coordinator interface {
CoordinatorV1
CoordinatorV2
}

type CoordinatorV1 interface {
// ServeHTTPDebug serves a debug webpage that shows the internal state of
// the coordinator.
ServeHTTPDebug(w http.ResponseWriter, r *http.Request)
Expand Down Expand Up @@ -143,16 +148,6 @@ func NewCoordinator(logger slog.Logger) Coordinator {
}
}

// NewCoordinatorV2 constructs a new in-memory connection coordinator. This
// coordinator is incompatible with multiple Coder replicas as all node data is
// in-memory.
func NewCoordinatorV2(logger slog.Logger) CoordinatorV2 {
return &coordinator{
core: newCore(logger.Named(LoggerName)),
closedChan: make(chan struct{}),
}
}

// coordinator exchanges nodes with agents to establish connections entirely in-memory.
// The Enterprise implementation provides this for high-availability.
// ┌──────────────────┐ ┌────────────────────┐ ┌───────────────────┐ ┌──────────────────┐
Expand Down
6 changes: 3 additions & 3 deletions tailnet/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,23 +357,23 @@ func TestCoordinator_AgentUpdateWhileClientConnects(t *testing.T) {
func TestCoordinator_BidirectionalTunnels(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
coordinator := tailnet.NewCoordinatorV2(logger)
coordinator := tailnet.NewCoordinator(logger)
ctx := testutil.Context(t, testutil.WaitShort)
test.BidirectionalTunnels(ctx, t, coordinator)
}

func TestCoordinator_GracefulDisconnect(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
coordinator := tailnet.NewCoordinatorV2(logger)
coordinator := tailnet.NewCoordinator(logger)
ctx := testutil.Context(t, testutil.WaitShort)
test.GracefulDisconnectTest(ctx, t, coordinator)
}

func TestCoordinator_Lost(t *testing.T) {
t.Parallel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
coordinator := tailnet.NewCoordinatorV2(logger)
coordinator := tailnet.NewCoordinator(logger)
ctx := testutil.Context(t, testutil.WaitShort)
test.LostTest(ctx, t, coordinator)
}
Expand Down