Skip to content

Commit a75f6f1

Browse files
committed
add extensive multiagent tests
1 parent 8256670 commit a75f6f1

File tree

3 files changed

+407
-49
lines changed

3 files changed

+407
-49
lines changed

enterprise/tailnet/multiagent_test.go

Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
package tailnet_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/google/uuid"
8+
"github.com/stretchr/testify/require"
9+
10+
"cdr.dev/slog"
11+
"cdr.dev/slog/sloggers/slogtest"
12+
"github.com/coder/coder/v2/coderd/database/dbtestutil"
13+
"github.com/coder/coder/v2/enterprise/tailnet"
14+
agpl "github.com/coder/coder/v2/tailnet"
15+
"github.com/coder/coder/v2/testutil"
16+
)
17+
18+
// TestPGCoordinator_MultiAgent tests a single coordinator with a MultiAgent
19+
// connecting to one agent.
20+
//
21+
// +--------+
22+
// agent1 ---> | coord1 | <--- client
23+
// +--------+
24+
func TestPGCoordinator_MultiAgent(t *testing.T) {
25+
t.Parallel()
26+
if !dbtestutil.WillUsePostgres() {
27+
t.Skip("test only with postgres")
28+
}
29+
30+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
31+
defer cancel()
32+
33+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
34+
store, ps := dbtestutil.NewDB(t)
35+
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
36+
require.NoError(t, err)
37+
defer coord1.Close()
38+
39+
agent1 := newTestAgent(t, coord1, "agent1")
40+
defer agent1.close()
41+
agent1.sendNode(&agpl.Node{PreferredDERP: 5})
42+
43+
id := uuid.New()
44+
ma1 := coord1.ServeMultiAgent(id)
45+
defer ma1.Close()
46+
47+
err = ma1.SubscribeAgent(agent1.id)
48+
require.NoError(t, err)
49+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)
50+
51+
agent1.sendNode(&agpl.Node{PreferredDERP: 1})
52+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)
53+
54+
err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
55+
require.NoError(t, err)
56+
assertEventuallyHasDERPs(ctx, t, agent1, 3)
57+
58+
require.NoError(t, ma1.Close())
59+
require.NoError(t, agent1.close())
60+
61+
assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
62+
assertEventuallyNoAgents(ctx, t, store, agent1.id)
63+
}
64+
65+
// TestPGCoordinator_MultiAgent_UnsubscribeRace tests a single coordinator with
66+
// a MultiAgent connecting to one agent. It tries to race a call to Unsubscribe
67+
// with the MultiAgent closing.
68+
//
69+
// +--------+
70+
// agent1 ---> | coord1 | <--- client
71+
// +--------+
72+
func TestPGCoordinator_MultiAgent_UnsubscribeRace(t *testing.T) {
73+
t.Parallel()
74+
if !dbtestutil.WillUsePostgres() {
75+
t.Skip("test only with postgres")
76+
}
77+
78+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
79+
defer cancel()
80+
81+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
82+
store, ps := dbtestutil.NewDB(t)
83+
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
84+
require.NoError(t, err)
85+
defer coord1.Close()
86+
87+
agent1 := newTestAgent(t, coord1, "agent1")
88+
defer agent1.close()
89+
agent1.sendNode(&agpl.Node{PreferredDERP: 5})
90+
91+
id := uuid.New()
92+
ma1 := coord1.ServeMultiAgent(id)
93+
defer ma1.Close()
94+
95+
err = ma1.SubscribeAgent(agent1.id)
96+
require.NoError(t, err)
97+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)
98+
99+
agent1.sendNode(&agpl.Node{PreferredDERP: 1})
100+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)
101+
102+
err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
103+
require.NoError(t, err)
104+
assertEventuallyHasDERPs(ctx, t, agent1, 3)
105+
106+
require.NoError(t, ma1.UnsubscribeAgent(agent1.id))
107+
require.NoError(t, ma1.Close())
108+
require.NoError(t, agent1.close())
109+
110+
assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
111+
assertEventuallyNoAgents(ctx, t, store, agent1.id)
112+
}
113+
114+
// TestPGCoordinator_MultiAgent_Unsubscribe tests a single coordinator with a
115+
// MultiAgent connecting to one agent. It unsubscribes before closing, and
116+
// ensures node updates are no longer propagated.
117+
//
118+
// +--------+
119+
// agent1 ---> | coord1 | <--- client
120+
// +--------+
121+
func TestPGCoordinator_MultiAgent_Unsubscribe(t *testing.T) {
122+
t.Parallel()
123+
if !dbtestutil.WillUsePostgres() {
124+
t.Skip("test only with postgres")
125+
}
126+
127+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
128+
defer cancel()
129+
130+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
131+
store, ps := dbtestutil.NewDB(t)
132+
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
133+
require.NoError(t, err)
134+
defer coord1.Close()
135+
136+
agent1 := newTestAgent(t, coord1, "agent1")
137+
defer agent1.close()
138+
agent1.sendNode(&agpl.Node{PreferredDERP: 5})
139+
140+
id := uuid.New()
141+
ma1 := coord1.ServeMultiAgent(id)
142+
defer ma1.Close()
143+
144+
err = ma1.SubscribeAgent(agent1.id)
145+
require.NoError(t, err)
146+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)
147+
148+
agent1.sendNode(&agpl.Node{PreferredDERP: 1})
149+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)
150+
151+
require.NoError(t, ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3}))
152+
assertEventuallyHasDERPs(ctx, t, agent1, 3)
153+
154+
require.NoError(t, ma1.UnsubscribeAgent(agent1.id))
155+
assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
156+
157+
func() {
158+
ctx, cancel := context.WithTimeout(ctx, testutil.IntervalSlow*3)
159+
defer cancel()
160+
require.NoError(t, ma1.UpdateSelf(&agpl.Node{PreferredDERP: 9}))
161+
assertNeverHasDERPs(ctx, t, agent1, 9)
162+
}()
163+
func() {
164+
ctx, cancel := context.WithTimeout(ctx, testutil.IntervalSlow*3)
165+
defer cancel()
166+
agent1.sendNode(&agpl.Node{PreferredDERP: 8})
167+
assertMultiAgentNeverHasDERPs(ctx, t, ma1, 8)
168+
}()
169+
170+
require.NoError(t, ma1.Close())
171+
require.NoError(t, agent1.close())
172+
173+
assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
174+
assertEventuallyNoAgents(ctx, t, store, agent1.id)
175+
}
176+
177+
// TestPGCoordinator_MultiAgent_MultiCoordinator tests two coordinators with a
178+
// MultiAgent connecting to an agent on a separate coordinator.
179+
//
180+
// +--------+
181+
// agent1 ---> | coord1 |
182+
// +--------+
183+
// +--------+
184+
// | coord2 | <--- client
185+
// +--------+
186+
func TestPGCoordinator_MultiAgent_MultiCoordinator(t *testing.T) {
187+
t.Parallel()
188+
if !dbtestutil.WillUsePostgres() {
189+
t.Skip("test only with postgres")
190+
}
191+
192+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
193+
defer cancel()
194+
195+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
196+
store, ps := dbtestutil.NewDB(t)
197+
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
198+
require.NoError(t, err)
199+
defer coord1.Close()
200+
coord2, err := tailnet.NewPGCoord(ctx, logger.Named("coord2"), ps, store)
201+
require.NoError(t, err)
202+
defer coord2.Close()
203+
204+
agent1 := newTestAgent(t, coord1, "agent1")
205+
defer agent1.close()
206+
agent1.sendNode(&agpl.Node{PreferredDERP: 5})
207+
208+
id := uuid.New()
209+
ma1 := coord2.ServeMultiAgent(id)
210+
defer ma1.Close()
211+
212+
err = ma1.SubscribeAgent(agent1.id)
213+
require.NoError(t, err)
214+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)
215+
216+
agent1.sendNode(&agpl.Node{PreferredDERP: 1})
217+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)
218+
219+
err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
220+
require.NoError(t, err)
221+
assertEventuallyHasDERPs(ctx, t, agent1, 3)
222+
223+
require.NoError(t, ma1.Close())
224+
require.NoError(t, agent1.close())
225+
226+
assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
227+
assertEventuallyNoAgents(ctx, t, store, agent1.id)
228+
}
229+
230+
// TestPGCoordinator_MultiAgent_MultiCoordinator_UpdateBeforeSubscribe tests two
231+
// coordinators with a MultiAgent connecting to an agent on a separate
232+
// coordinator. The MultiAgent updates its own node before subscribing.
233+
//
234+
// +--------+
235+
// agent1 ---> | coord1 |
236+
// +--------+
237+
// +--------+
238+
// | coord2 | <--- client
239+
// +--------+
240+
func TestPGCoordinator_MultiAgent_MultiCoordinator_UpdateBeforeSubscribe(t *testing.T) {
241+
t.Parallel()
242+
if !dbtestutil.WillUsePostgres() {
243+
t.Skip("test only with postgres")
244+
}
245+
246+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
247+
defer cancel()
248+
249+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
250+
store, ps := dbtestutil.NewDB(t)
251+
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
252+
require.NoError(t, err)
253+
defer coord1.Close()
254+
coord2, err := tailnet.NewPGCoord(ctx, logger.Named("coord2"), ps, store)
255+
require.NoError(t, err)
256+
defer coord2.Close()
257+
258+
agent1 := newTestAgent(t, coord1, "agent1")
259+
defer agent1.close()
260+
agent1.sendNode(&agpl.Node{PreferredDERP: 5})
261+
262+
id := uuid.New()
263+
ma1 := coord2.ServeMultiAgent(id)
264+
defer ma1.Close()
265+
266+
err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
267+
require.NoError(t, err)
268+
269+
err = ma1.SubscribeAgent(agent1.id)
270+
require.NoError(t, err)
271+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)
272+
assertEventuallyHasDERPs(ctx, t, agent1, 3)
273+
274+
agent1.sendNode(&agpl.Node{PreferredDERP: 1})
275+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)
276+
277+
require.NoError(t, ma1.Close())
278+
require.NoError(t, agent1.close())
279+
280+
assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
281+
assertEventuallyNoAgents(ctx, t, store, agent1.id)
282+
}
283+
284+
// TestPGCoordinator_MultiAgent_TwoAgents tests three coordinators with a
285+
// MultiAgent connecting to two agents on separate coordinators.
286+
//
287+
// +--------+
288+
// agent1 ---> | coord1 |
289+
// +--------+
290+
// +--------+
291+
// agent2 ---> | coord2 |
292+
// +--------+
293+
// +--------+
294+
// | coord3 | <--- client
295+
// +--------+
296+
func TestPGCoordinator_MultiAgent_TwoAgents(t *testing.T) {
297+
t.Parallel()
298+
if !dbtestutil.WillUsePostgres() {
299+
t.Skip("test only with postgres")
300+
}
301+
302+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
303+
defer cancel()
304+
305+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
306+
store, ps := dbtestutil.NewDB(t)
307+
coord1, err := tailnet.NewPGCoord(ctx, logger.Named("coord1"), ps, store)
308+
require.NoError(t, err)
309+
defer coord1.Close()
310+
coord2, err := tailnet.NewPGCoord(ctx, logger.Named("coord2"), ps, store)
311+
require.NoError(t, err)
312+
defer coord2.Close()
313+
coord3, err := tailnet.NewPGCoord(ctx, logger.Named("coord3"), ps, store)
314+
require.NoError(t, err)
315+
defer coord3.Close()
316+
317+
agent1 := newTestAgent(t, coord1, "agent1")
318+
defer agent1.close()
319+
agent1.sendNode(&agpl.Node{PreferredDERP: 5})
320+
321+
agent2 := newTestAgent(t, coord2, "agent2")
322+
defer agent1.close()
323+
agent2.sendNode(&agpl.Node{PreferredDERP: 6})
324+
325+
id := uuid.New()
326+
ma1 := coord2.ServeMultiAgent(id)
327+
defer ma1.Close()
328+
329+
err = ma1.SubscribeAgent(agent1.id)
330+
require.NoError(t, err)
331+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 5)
332+
333+
agent1.sendNode(&agpl.Node{PreferredDERP: 1})
334+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 1)
335+
336+
err = ma1.SubscribeAgent(agent2.id)
337+
require.NoError(t, err)
338+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 6)
339+
340+
agent2.sendNode(&agpl.Node{PreferredDERP: 2})
341+
assertMultiAgentEventuallyHasDERPs(ctx, t, ma1, 2)
342+
343+
err = ma1.UpdateSelf(&agpl.Node{PreferredDERP: 3})
344+
require.NoError(t, err)
345+
assertEventuallyHasDERPs(ctx, t, agent1, 3)
346+
assertEventuallyHasDERPs(ctx, t, agent2, 3)
347+
348+
require.NoError(t, ma1.Close())
349+
require.NoError(t, agent1.close())
350+
require.NoError(t, agent2.close())
351+
352+
assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
353+
assertEventuallyNoAgents(ctx, t, store, agent1.id)
354+
}

enterprise/tailnet/pgcoord.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -942,9 +942,10 @@ func (q *querier) removeClientSubscription(c agpl.Queue, agentID uuid.UUID) {
942942
q.mu.Lock()
943943
defer q.mu.Unlock()
944944

945-
// agentID: uuid.Nil indicates that a client is going away. The querier
946-
// handles that in cleanupConn below instead.
947-
if agentID == uuid.Nil {
945+
// Allow duplicate unsubscribes. It's possible for cleanupConn to race with
946+
// an external call to removeClientSubscription, so we just ensure the
947+
// client subscription exists before attempting to remove it.
948+
if _, ok := q.clientSubscriptions[c.UniqueID()][agentID]; !ok {
948949
return
949950
}
950951

0 commit comments

Comments
 (0)