Skip to content

feat: add single tailnet support to pgcoord #9351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a20d318
feat: add single tailnet support to pgcoord
coadler Aug 25, 2023
efa2071
fixup! feat: add single tailnet support to pgcoord
coadler Aug 25, 2023
2976c00
fixup! feat: add single tailnet support to pgcoord
coadler Aug 25, 2023
a7b39df
separate table + use channels
coadler Aug 31, 2023
618b0e0
fix migrations
coadler Sep 7, 2023
f50f929
fixup! fix migrations
coadler Sep 7, 2023
3a5bb76
fixup! fix migrations
coadler Sep 8, 2023
3ddc783
fixup! fix migrations
coadler Sep 8, 2023
c84626a
Merge branch 'main' into colin/single-pgcoord
coadler Sep 13, 2023
a0cb904
Merge branch 'main' into colin/single-pgcoord
coadler Sep 14, 2023
dcb007d
add subscriber subsystem
coadler Sep 15, 2023
bdd7ef1
fixup! add subscriber subsystem
coadler Sep 15, 2023
751b22f
fixup! add subscriber subsystem
coadler Sep 15, 2023
3af1af1
fixup! add subscriber subsystem
coadler Sep 15, 2023
7762a73
querier <- subscriber
coadler Sep 19, 2023
08501e6
Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
eb681ff
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
390e837
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
a1c3acf
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
8256670
fixup! Merge branch 'main' into colin/single-pgcoord
coadler Sep 19, 2023
a75f6f1
add extensive multiagent tests
coadler Sep 20, 2023
d143d2d
use dedicated channels for querier subscribe and closing conns
coadler Sep 20, 2023
036094f
fixup! use dedicated channels for querier subscribe and closing conns
coadler Sep 20, 2023
e8a2b01
fixup! use dedicated channels for querier subscribe and closing conns
coadler Sep 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add extensive multiagent tests
  • Loading branch information
coadler committed Sep 20, 2023
commit a75f6f1cf35a6357cabf0e7676d2a4fffaf01b0c
354 changes: 354 additions & 0 deletions enterprise/tailnet/multiagent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,354 @@
package tailnet_test

import (
"context"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/enterprise/tailnet"
agpl "github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/testutil"
)

// TestPGCoordinator_MultiAgent tests a single coordinator with a MultiAgent
// connecting to one agent.
//
// +--------+
// agent1 ---> | coord1 | <--- client
// +--------+
func TestPGCoordinator_MultiAgent(t *testing.T) {
t.Parallel()
if !dbtestutil.WillUsePostgres() {
t.Skip("test only with postgres")
}

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
defer cancel()

logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
store, ps := dbtestutil.NewDB(t)
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
require.NoError(t, err)
defer coord1.Close()

agent1 := newTestAgent(t, coord1, "agent1")
defer agent1.close()
agent1.sendNode(&agpl.Node{PreferredDERP: 5})

id := uuid.New()
ma1 := coord1.ServeMultiAgent(id)
defer ma1.Close()

err = ma1.SubscribeAgent(agent1.id)
require.NoError(t, err)
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)

agent1.sendNode(&agpl.Node{PreferredDERP: 1})
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)

err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
require.NoError(t, err)
assertEventuallyHasDERPs(ctx, t, agent1, 3)

require.NoError(t, ma1.Close())
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_UnsubscribeRace tests a single coordinator with
// a MultiAgent connecting to one agent. It tries to race a call to Unsubscribe
// with the MultiAgent closing.
//
// +--------+
// agent1 ---> | coord1 | <--- client
// +--------+
func TestPGCoordinator_MultiAgent_UnsubscribeRace(t *testing.T) {
t.Parallel()
if !dbtestutil.WillUsePostgres() {
t.Skip("test only with postgres")
}

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
defer cancel()

logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
store, ps := dbtestutil.NewDB(t)
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
require.NoError(t, err)
defer coord1.Close()

agent1 := newTestAgent(t, coord1, "agent1")
defer agent1.close()
agent1.sendNode(&agpl.Node{PreferredDERP: 5})

id := uuid.New()
ma1 := coord1.ServeMultiAgent(id)
defer ma1.Close()

err = ma1.SubscribeAgent(agent1.id)
require.NoError(t, err)
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)

agent1.sendNode(&agpl.Node{PreferredDERP: 1})
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)

err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
require.NoError(t, err)
assertEventuallyHasDERPs(ctx, t, agent1, 3)

require.NoError(t, ma1.UnsubscribeAgent(agent1.id))
require.NoError(t, ma1.Close())
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_Unsubscribe tests a single coordinator with a
// MultiAgent connecting to one agent. It unsubscribes before closing, and
// ensures node updates are no longer propagated.
//
// +--------+
// agent1 ---> | coord1 | <--- client
// +--------+
func TestPGCoordinator_MultiAgent_Unsubscribe(t *testing.T) {
t.Parallel()
if !dbtestutil.WillUsePostgres() {
t.Skip("test only with postgres")
}

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
store, ps := dbtestutil.NewDB(t)
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
require.NoError(t, err)
defer coord1.Close()

agent1 := newTestAgent(t, coord1, "agent1")
defer agent1.close()
agent1.sendNode(&agpl.Node{PreferredDERP: 5})

id := uuid.New()
ma1 := coord1.ServeMultiAgent(id)
defer ma1.Close()

err = ma1.SubscribeAgent(agent1.id)
require.NoError(t, err)
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)

agent1.sendNode(&agpl.Node{PreferredDERP: 1})
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)

require.NoError(t, ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3}))
assertEventuallyHasDERPs(ctx, t, agent1, 3)

require.NoError(t, ma1.UnsubscribeAgent(agent1.id))
assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)

func() {
ctx, cancel := context.WithTimeout(ctx, testutil.IntervalSlow*3)
defer cancel()
require.NoError(t, ma1.UpdateSelf(&agpl.Node{PreferredDERP: 9}))
assertNeverHasDERPs(ctx, t, agent1, 9)
}()
func() {
ctx, cancel := context.WithTimeout(ctx, testutil.IntervalSlow*3)
defer cancel()
agent1.sendNode(&agpl.Node{PreferredDERP: 8})
assertMultiAgentNeverHasDERPs(ctx, t, ma1, 8)
}()

require.NoError(t, ma1.Close())
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_MultiCoordinator tests two coordinators with a
// MultiAgent connecting to an agent on a separate coordinator.
//
// +--------+
// agent1 ---> | coord1 |
// +--------+
// +--------+
// | coord2 | <--- client
// +--------+
func TestPGCoordinator_MultiAgent_MultiCoordinator(t *testing.T) {
t.Parallel()
if !dbtestutil.WillUsePostgres() {
t.Skip("test only with postgres")
}

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
defer cancel()

logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
store, ps := dbtestutil.NewDB(t)
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
require.NoError(t, err)
defer coord1.Close()
coord2, err := tailnet.NewPGCoord(ctx, logger.Named("coord2"), ps, store)
require.NoError(t, err)
defer coord2.Close()

agent1 := newTestAgent(t, coord1, "agent1")
defer agent1.close()
agent1.sendNode(&agpl.Node{PreferredDERP: 5})

id := uuid.New()
ma1 := coord2.ServeMultiAgent(id)
defer ma1.Close()

err = ma1.SubscribeAgent(agent1.id)
require.NoError(t, err)
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)

agent1.sendNode(&agpl.Node{PreferredDERP: 1})
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)

err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
require.NoError(t, err)
assertEventuallyHasDERPs(ctx, t, agent1, 3)

require.NoError(t, ma1.Close())
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_MultiCoordinator_UpdateBeforeSubscribe tests two
// coordinators with a MultiAgent connecting to an agent on a separate
// coordinator. The MultiAgent updates its own node before subscribing.
//
// +--------+
// agent1 ---> | coord1 |
// +--------+
// +--------+
// | coord2 | <--- client
// +--------+
func TestPGCoordinator_MultiAgent_MultiCoordinator_UpdateBeforeSubscribe(t *testing.T) {
t.Parallel()
if !dbtestutil.WillUsePostgres() {
t.Skip("test only with postgres")
}

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
defer cancel()

logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
store, ps := dbtestutil.NewDB(t)
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
require.NoError(t, err)
defer coord1.Close()
coord2, err := tailnet.NewPGCoord(ctx, logger.Named("coord2"), ps, store)
require.NoError(t, err)
defer coord2.Close()

agent1 := newTestAgent(t, coord1, "agent1")
defer agent1.close()
agent1.sendNode(&agpl.Node{PreferredDERP: 5})

id := uuid.New()
ma1 := coord2.ServeMultiAgent(id)
defer ma1.Close()

err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
require.NoError(t, err)

err = ma1.SubscribeAgent(agent1.id)
require.NoError(t, err)
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)
assertEventuallyHasDERPs(ctx, t, agent1, 3)

agent1.sendNode(&agpl.Node{PreferredDERP: 1})
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)

require.NoError(t, ma1.Close())
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_TwoAgents tests three coordinators with a
// MultiAgent connecting to two agents on separate coordinators.
//
// +--------+
// agent1 ---> | coord1 |
// +--------+
// +--------+
// agent2 ---> | coord2 |
// +--------+
// +--------+
// | coord3 | <--- client
// +--------+
func TestPGCoordinator_MultiAgent_TwoAgents(t *testing.T) {
t.Parallel()
if !dbtestutil.WillUsePostgres() {
t.Skip("test only with postgres")
}

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
defer cancel()

logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
store, ps := dbtestutil.NewDB(t)
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
require.NoError(t, err)
defer coord1.Close()
coord2, err := tailnet.NewPGCoord(ctx, logger.Named("coord2"), ps, store)
require.NoError(t, err)
defer coord2.Close()
coord3, err := tailnet.NewPGCoord(ctx, logger.Named("coord3"), ps, store)
require.NoError(t, err)
defer coord3.Close()

agent1 := newTestAgent(t, coord1, "agent1")
defer agent1.close()
agent1.sendNode(&agpl.Node{PreferredDERP: 5})

agent2 := newTestAgent(t, coord2, "agent2")
defer agent1.close()
agent2.sendNode(&agpl.Node{PreferredDERP: 6})

id := uuid.New()
ma1 := coord2.ServeMultiAgent(id)
defer ma1.Close()

err = ma1.SubscribeAgent(agent1.id)
require.NoError(t, err)
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)

agent1.sendNode(&agpl.Node{PreferredDERP: 1})
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)

err = ma1.SubscribeAgent(agent2.id)
require.NoError(t, err)
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 6)

agent2.sendNode(&agpl.Node{PreferredDERP: 2})
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 2)

err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
require.NoError(t, err)
assertEventuallyHasDERPs(ctx, t, agent1, 3)
assertEventuallyHasDERPs(ctx, t, agent2, 3)

require.NoError(t, ma1.Close())
require.NoError(t, agent1.close())
require.NoError(t, agent2.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
}
7 changes: 4 additions & 3 deletions enterprise/tailnet/pgcoord.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,9 +942,10 @@ func (q *querier) removeClientSubscription(c agpl.Queue, agentID uuid.UUID) {
q.mu.Lock()
defer q.mu.Unlock()

// agentID: uuid.Nil indicates that a client is going away. The querier
// handles that in cleanupConn below instead.
if agentID == uuid.Nil {
// Allow duplicate unsubscribes. It's possible for cleanupConn to race with
// an external call to removeClientSubscription, so we just ensure the
// client subscription exists before attempting to remove it.
if _, ok := q.clientSubscriptions[c.UniqueID()][agentID]; !ok {
return
}

Expand Down
Loading