Skip to content

Commit b2fc817

Browse files
committed
spike changes
1 parent 053172f commit b2fc817

18 files changed

+179
-306
lines changed

coderd/database/dbauthz/dbauthz.go

-7
Original file line numberDiff line numberDiff line change
@@ -2645,13 +2645,6 @@ func (q *querier) ListWorkspaceAgentPortShares(ctx context.Context, workspaceID
26452645
return q.db.ListWorkspaceAgentPortShares(ctx, workspaceID)
26462646
}
26472647

2648-
func (q *querier) PublishReadyForHandshake(ctx context.Context, arg database.PublishReadyForHandshakeParams) error {
2649-
if err := q.authorizeContext(ctx, rbac.ActionUpdate, rbac.ResourceTailnetCoordinator); err != nil {
2650-
return err
2651-
}
2652-
return q.db.PublishReadyForHandshake(ctx, arg)
2653-
}
2654-
26552648
func (q *querier) ReduceWorkspaceAgentShareLevelToAuthenticatedByTemplate(ctx context.Context, templateID uuid.UUID) error {
26562649
template, err := q.db.GetTemplateByID(ctx, templateID)
26572650
if err != nil {

coderd/database/dbauthz/dbauthz_test.go

-5
Original file line numberDiff line numberDiff line change
@@ -1829,11 +1829,6 @@ func (s *MethodTestSuite) TestTailnetFunctions() {
18291829
Asserts(rbac.ResourceTailnetCoordinator, rbac.ActionCreate).
18301830
Errors(dbmem.ErrUnimplemented)
18311831
}))
1832-
s.Run("PublishReadyForHandshake", s.Subtest(func(db database.Store, check *expects) {
1833-
check.Args(database.PublishReadyForHandshakeParams{}).
1834-
Asserts(rbac.ResourceTailnetCoordinator, rbac.ActionUpdate).
1835-
Errors(dbmem.ErrUnimplemented)
1836-
}))
18371832
}
18381833

18391834
func (s *MethodTestSuite) TestDBCrypt() {

coderd/database/dbmem/dbmem.go

-4
Original file line numberDiff line numberDiff line change
@@ -6742,10 +6742,6 @@ func (q *FakeQuerier) ListWorkspaceAgentPortShares(_ context.Context, workspaceI
67426742
return shares, nil
67436743
}
67446744

6745-
func (*FakeQuerier) PublishReadyForHandshake(context.Context, database.PublishReadyForHandshakeParams) error {
6746-
return ErrUnimplemented
6747-
}
6748-
67496745
func (q *FakeQuerier) ReduceWorkspaceAgentShareLevelToAuthenticatedByTemplate(_ context.Context, templateID uuid.UUID) error {
67506746
err := validateDatabaseType(templateID)
67516747
if err != nil {

coderd/database/dbmetrics/dbmetrics.go

-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/querier.go

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

-17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/tailnet.sql

-6
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,6 @@ FROM tailnet_tunnels
207207
INNER JOIN tailnet_peers ON tailnet_tunnels.src_id = tailnet_peers.id
208208
WHERE tailnet_tunnels.dst_id = $1;
209209

210-
-- name: PublishReadyForHandshake :exec
211-
SELECT pg_notify(
212-
'tailnet_ready_for_handshake',
213-
format('%s,%s', sqlc.arg('to')::text, sqlc.arg('from')::text)
214-
);
215-
216210
-- For PG Coordinator HTMLDebug
217211

218212
-- name: GetAllTailnetCoordinators :many

enterprise/tailnet/connio.go

+31-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package tailnet
22

33
import (
44
"context"
5+
"fmt"
6+
"slices"
57
"sync"
68
"sync/atomic"
79
"time"
@@ -35,6 +37,8 @@ type connIO struct {
3537
mu sync.Mutex
3638
closed bool
3739
disconnected bool
40+
// latest is the most recent, unfiltered snapshot of the mappings we know about
41+
latest []mapping
3842

3943
name string
4044
start int64
@@ -204,10 +208,23 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
204208
return err
205209
}
206210

207-
if err := agpl.SendCtx(c.coordCtx, c.rfhs, readyForHandshake{hKey: hKey{
211+
mappings := c.getLatestMapping()
212+
if !slices.ContainsFunc(mappings, func(mapping mapping) bool {
213+
return mapping.peer == dst
214+
}) {
215+
c.logger.Debug(c.peerCtx, "cannot process ready for handshake, src isn't peered with dst",
216+
slog.F("dst", dst.String()),
217+
)
218+
_ = c.Enqueue(&proto.CoordinateResponse{
219+
Error: fmt.Sprintf("you do not share a tunnel with %q", dst.String()),
220+
})
221+
return nil
222+
}
223+
224+
if err := agpl.SendCtx(c.coordCtx, c.rfhs, readyForHandshake{
208225
src: c.id,
209226
dst: dst,
210-
}}); err != nil {
227+
}); err != nil {
211228
c.logger.Debug(c.peerCtx, "failed to send ready for handshake", slog.Error(err))
212229
return err
213230
}
@@ -216,6 +233,18 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
216233
return nil
217234
}
218235

236+
func (c *connIO) setLatestMapping(latest []mapping) {
237+
c.mu.Lock()
238+
defer c.mu.Unlock()
239+
c.latest = latest
240+
}
241+
242+
func (c *connIO) getLatestMapping() []mapping {
243+
c.mu.Lock()
244+
defer c.mu.Unlock()
245+
return c.latest
246+
}
247+
219248
func (c *connIO) UniqueID() uuid.UUID {
220249
return c.id
221250
}

enterprise/tailnet/handshaker.go

+16-76
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,16 @@ package tailnet
22

33
import (
44
"context"
5-
"slices"
5+
"fmt"
66
"sync"
77

8-
"github.com/cenkalti/backoff/v4"
98
"github.com/google/uuid"
109

1110
"cdr.dev/slog"
12-
"github.com/coder/coder/v2/coderd/database"
11+
"github.com/coder/coder/v2/coderd/database/pubsub"
1312
)
1413

1514
type readyForHandshake struct {
16-
hKey
17-
}
18-
19-
type hKey struct {
2015
src uuid.UUID
2116
dst uuid.UUID
2217
}
@@ -25,30 +20,26 @@ type handshaker struct {
2520
ctx context.Context
2621
logger slog.Logger
2722
coordinatorID uuid.UUID
28-
store database.Store
23+
pubsub pubsub.Pubsub
2924
updates <-chan readyForHandshake
3025

31-
workQ *workQ[hKey]
32-
3326
workerWG sync.WaitGroup
3427
}
3528

3629
func newHandshaker(ctx context.Context,
3730
logger slog.Logger,
3831
id uuid.UUID,
39-
store database.Store,
32+
ps pubsub.Pubsub,
4033
updates <-chan readyForHandshake,
4134
startWorkers <-chan struct{},
4235
) *handshaker {
4336
s := &handshaker{
4437
ctx: ctx,
4538
logger: logger,
4639
coordinatorID: id,
47-
store: store,
40+
pubsub: ps,
4841
updates: updates,
49-
workQ: newWorkQ[hKey](ctx),
5042
}
51-
go s.handle()
5243
// add to the waitgroup immediately to avoid any races waiting for it before
5344
// the workers start.
5445
s.workerWG.Add(numHandshakerWorkers)
@@ -61,73 +52,22 @@ func newHandshaker(ctx context.Context,
6152
return s
6253
}
6354

64-
func (t *handshaker) handle() {
65-
for {
66-
select {
67-
case <-t.ctx.Done():
68-
t.logger.Debug(t.ctx, "handshaker exiting", slog.Error(t.ctx.Err()))
69-
return
70-
case rfh := <-t.updates:
71-
t.workQ.enqueue(rfh.hKey)
72-
}
73-
}
74-
}
75-
7655
func (t *handshaker) worker() {
7756
defer t.workerWG.Done()
78-
eb := backoff.NewExponentialBackOff()
79-
eb.MaxElapsedTime = 0 // retry indefinitely
80-
eb.MaxInterval = dbMaxBackoff
81-
bkoff := backoff.WithContext(eb, t.ctx)
57+
8258
for {
83-
hk, err := t.workQ.acquire()
84-
if err != nil {
85-
// context expired
59+
select {
60+
case <-t.ctx.Done():
61+
t.logger.Debug(t.ctx, "handshaker worker exiting", slog.Error(t.ctx.Err()))
8662
return
87-
}
88-
err = backoff.Retry(func() error {
89-
return t.writeOne(hk)
90-
}, bkoff)
91-
if err != nil {
92-
bkoff.Reset()
93-
}
94-
t.workQ.done(hk)
95-
}
96-
}
97-
98-
func (t *handshaker) writeOne(hk hKey) error {
99-
logger := t.logger.With(
100-
slog.F("src_id", hk.src),
101-
slog.F("dst_id", hk.dst),
102-
)
10363

104-
peers, err := t.store.GetTailnetTunnelPeerIDs(t.ctx, hk.src)
105-
if err != nil {
106-
if !database.IsQueryCanceledError(err) {
107-
logger.Error(t.ctx, "get tunnel peers ids", slog.Error(err))
108-
}
109-
return err
110-
}
111-
112-
if !slices.ContainsFunc(peers, func(peer database.GetTailnetTunnelPeerIDsRow) bool {
113-
return peer.PeerID == hk.dst
114-
}) {
115-
// In the in-memory coordinator we return an error to the client, but
116-
// this isn't really possible here.
117-
logger.Warn(t.ctx, "cannot process ready for handshake, src isn't peered with dst")
118-
return nil
119-
}
120-
121-
err = t.store.PublishReadyForHandshake(t.ctx, database.PublishReadyForHandshakeParams{
122-
To: hk.dst.String(),
123-
From: hk.src.String(),
124-
})
125-
if err != nil {
126-
if !database.IsQueryCanceledError(err) {
127-
logger.Error(t.ctx, "publish ready for handshake", slog.Error(err))
64+
case rfh := <-t.updates:
65+
err := t.pubsub.Publish(eventReadyForHandshake, []byte(fmt.Sprintf(
66+
"%s,%s", rfh.dst.String(), rfh.src.String(),
67+
)))
68+
if err != nil {
69+
t.logger.Error(t.ctx, "publish ready for handshake", slog.Error(err))
70+
}
12871
}
129-
return err
13072
}
131-
132-
return nil
13373
}

enterprise/tailnet/handshaker_internal_test.go

-45
This file was deleted.

0 commit comments

Comments
 (0)