Skip to content

Commit ba97700

Browse files
committed
spike changes
1 parent d7f72a8 commit ba97700

17 files changed

+175
-293
lines changed

coderd/database/dbauthz/dbauthz.go

-7
Original file line numberDiff line numberDiff line change
@@ -2645,13 +2645,6 @@ func (q *querier) ListWorkspaceAgentPortShares(ctx context.Context, workspaceID
26452645
return q.db.ListWorkspaceAgentPortShares(ctx, workspaceID)
26462646
}
26472647

2648-
func (q *querier) PublishReadyForHandshake(ctx context.Context, arg database.PublishReadyForHandshakeParams) error {
2649-
if err := q.authorizeContext(ctx, rbac.ActionUpdate, rbac.ResourceTailnetCoordinator); err != nil {
2650-
return err
2651-
}
2652-
return q.db.PublishReadyForHandshake(ctx, arg)
2653-
}
2654-
26552648
func (q *querier) ReduceWorkspaceAgentShareLevelToAuthenticatedByTemplate(ctx context.Context, templateID uuid.UUID) error {
26562649
template, err := q.db.GetTemplateByID(ctx, templateID)
26572650
if err != nil {

coderd/database/dbauthz/dbauthz_test.go

-5
Original file line numberDiff line numberDiff line change
@@ -1829,11 +1829,6 @@ func (s *MethodTestSuite) TestTailnetFunctions() {
18291829
Asserts(rbac.ResourceTailnetCoordinator, rbac.ActionCreate).
18301830
Errors(dbmem.ErrUnimplemented)
18311831
}))
1832-
s.Run("PublishReadyForHandshake", s.Subtest(func(db database.Store, check *expects) {
1833-
check.Args(database.PublishReadyForHandshakeParams{}).
1834-
Asserts(rbac.ResourceTailnetCoordinator, rbac.ActionUpdate).
1835-
Errors(dbmem.ErrUnimplemented)
1836-
}))
18371832
}
18381833

18391834
func (s *MethodTestSuite) TestDBCrypt() {

coderd/database/dbmem/dbmem.go

-4
Original file line numberDiff line numberDiff line change
@@ -6742,10 +6742,6 @@ func (q *FakeQuerier) ListWorkspaceAgentPortShares(_ context.Context, workspaceI
67426742
return shares, nil
67436743
}
67446744

6745-
func (*FakeQuerier) PublishReadyForHandshake(context.Context, database.PublishReadyForHandshakeParams) error {
6746-
return ErrUnimplemented
6747-
}
6748-
67496745
func (q *FakeQuerier) ReduceWorkspaceAgentShareLevelToAuthenticatedByTemplate(_ context.Context, templateID uuid.UUID) error {
67506746
err := validateDatabaseType(templateID)
67516747
if err != nil {

coderd/database/dbmetrics/dbmetrics.go

-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/querier.go

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

-17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/tailnet.sql

-6
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,6 @@ FROM tailnet_tunnels
207207
INNER JOIN tailnet_peers ON tailnet_tunnels.src_id = tailnet_peers.id
208208
WHERE tailnet_tunnels.dst_id = $1;
209209

210-
-- name: PublishReadyForHandshake :exec
211-
SELECT pg_notify(
212-
'tailnet_ready_for_handshake',
213-
format('%s,%s', sqlc.arg('to')::text, sqlc.arg('from')::text)
214-
);
215-
216210
-- For PG Coordinator HTMLDebug
217211

218212
-- name: GetAllTailnetCoordinators :many

enterprise/tailnet/connio.go

+29
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package tailnet
22

33
import (
44
"context"
5+
"fmt"
6+
"slices"
57
"sync"
68
"sync/atomic"
79
"time"
@@ -35,6 +37,8 @@ type connIO struct {
3537
mu sync.Mutex
3638
closed bool
3739
disconnected bool
40+
// latest is the most recent, unfiltered snapshot of the mappings we know about
41+
latest []mapping
3842

3943
name string
4044
start int64
@@ -204,6 +208,19 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
204208
return err
205209
}
206210

211+
mappings := c.getLatestMapping()
212+
if !slices.ContainsFunc(mappings, func(mapping mapping) bool {
213+
return mapping.peer == dst
214+
}) {
215+
c.logger.Debug(c.peerCtx, "cannot process ready for handshake, src isn't peered with dst",
216+
slog.F("dst", dst.String()),
217+
)
218+
_ = c.Enqueue(&proto.CoordinateResponse{
219+
Error: fmt.Sprintf("you do not share a tunnel with %q", dst.String()),
220+
})
221+
return nil
222+
}
223+
207224
if err := agpl.SendCtx(c.coordCtx, c.rfhs, readyForHandshake{hKey: hKey{
208225
src: c.id,
209226
dst: dst,
@@ -216,6 +233,18 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
216233
return nil
217234
}
218235

236+
func (c *connIO) setLatestMapping(latest []mapping) {
237+
c.mu.Lock()
238+
defer c.mu.Unlock()
239+
c.latest = latest
240+
}
241+
242+
func (c *connIO) getLatestMapping() []mapping {
243+
c.mu.Lock()
244+
defer c.mu.Unlock()
245+
return c.latest
246+
}
247+
219248
func (c *connIO) UniqueID() uuid.UUID {
220249
return c.id
221250
}

enterprise/tailnet/handshaker.go

+16-68
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package tailnet
22

33
import (
44
"context"
5-
"slices"
5+
"fmt"
66
"sync"
77

8-
"github.com/cenkalti/backoff/v4"
98
"github.com/google/uuid"
109

1110
"cdr.dev/slog"
1211
"github.com/coder/coder/v2/coderd/database"
12+
"github.com/coder/coder/v2/coderd/database/pubsub"
1313
)
1414

1515
type readyForHandshake struct {
@@ -25,7 +25,7 @@ type handshaker struct {
2525
ctx context.Context
2626
logger slog.Logger
2727
coordinatorID uuid.UUID
28-
store database.Store
28+
pubsub pubsub.Pubsub
2929
updates <-chan readyForHandshake
3030

3131
workQ *workQ[hKey]
@@ -36,19 +36,18 @@ type handshaker struct {
3636
func newHandshaker(ctx context.Context,
3737
logger slog.Logger,
3838
id uuid.UUID,
39-
store database.Store,
39+
ps pubsub.Pubsub,
4040
updates <-chan readyForHandshake,
4141
startWorkers <-chan struct{},
4242
) *handshaker {
4343
s := &handshaker{
4444
ctx: ctx,
4545
logger: logger,
4646
coordinatorID: id,
47-
store: store,
47+
pubsub: ps,
4848
updates: updates,
4949
workQ: newWorkQ[hKey](ctx),
5050
}
51-
go s.handle()
5251
// add to the waitgroup immediately to avoid any races waiting for it before
5352
// the workers start.
5453
s.workerWG.Add(numHandshakerWorkers)
@@ -61,73 +60,22 @@ func newHandshaker(ctx context.Context,
6160
return s
6261
}
6362

64-
func (t *handshaker) handle() {
65-
for {
66-
select {
67-
case <-t.ctx.Done():
68-
t.logger.Debug(t.ctx, "handshaker exiting", slog.Error(t.ctx.Err()))
69-
return
70-
case rfh := <-t.updates:
71-
t.workQ.enqueue(rfh.hKey)
72-
}
73-
}
74-
}
75-
7663
func (t *handshaker) worker() {
7764
defer t.workerWG.Done()
78-
eb := backoff.NewExponentialBackOff()
79-
eb.MaxElapsedTime = 0 // retry indefinitely
80-
eb.MaxInterval = dbMaxBackoff
81-
bkoff := backoff.WithContext(eb, t.ctx)
65+
8266
for {
83-
hk, err := t.workQ.acquire()
84-
if err != nil {
85-
// context expired
67+
select {
68+
case <-t.ctx.Done():
69+
t.logger.Debug(t.ctx, "handshaker worker exiting", slog.Error(t.ctx.Err()))
8670
return
87-
}
88-
err = backoff.Retry(func() error {
89-
return t.writeOne(hk)
90-
}, bkoff)
91-
if err != nil {
92-
bkoff.Reset()
93-
}
94-
t.workQ.done(hk)
95-
}
96-
}
97-
98-
func (t *handshaker) writeOne(hk hKey) error {
99-
logger := t.logger.With(
100-
slog.F("src_id", hk.src),
101-
slog.F("dst_id", hk.dst),
102-
)
103-
104-
peers, err := t.store.GetTailnetTunnelPeerIDs(t.ctx, hk.src)
105-
if err != nil {
106-
if !database.IsQueryCanceledError(err) {
107-
logger.Error(t.ctx, "get tunnel peers ids", slog.Error(err))
108-
}
109-
return err
110-
}
111-
112-
if !slices.ContainsFunc(peers, func(peer database.GetTailnetTunnelPeerIDsRow) bool {
113-
return peer.PeerID == hk.dst
114-
}) {
115-
// In the in-memory coordinator we return an error to the client, but
116-
// this isn't really possible here.
117-
logger.Warn(t.ctx, "cannot process ready for handshake, src isn't peered with dst")
118-
return nil
119-
}
12071

121-
err = t.store.PublishReadyForHandshake(t.ctx, database.PublishReadyForHandshakeParams{
122-
To: hk.dst.String(),
123-
From: hk.src.String(),
124-
})
125-
if err != nil {
126-
if !database.IsQueryCanceledError(err) {
127-
logger.Error(t.ctx, "publish ready for handshake", slog.Error(err))
72+
case rfh := <-t.updates:
73+
err := t.pubsub.Publish(eventReadyForHandshake, []byte(fmt.Sprintf(
74+
"%s,%s", rfh.dst.String(), rfh.src.String(),
75+
)))
76+
if err != nil && !database.IsQueryCanceledError(err) {
77+
t.logger.Error(t.ctx, "publish ready for handshake", slog.Error(err))
78+
}
12879
}
129-
return err
13080
}
131-
132-
return nil
13381
}

enterprise/tailnet/handshaker_internal_test.go

-45
This file was deleted.

0 commit comments

Comments
 (0)