Skip to content

Commit 0e96fa3

Browse files
committed
feat: modify PG Coordinator to work with new v2 Tailnet API
1 parent 0d829dd commit 0e96fa3

File tree

6 files changed

+767
-709
lines changed

6 files changed

+767
-709
lines changed

enterprise/tailnet/connio.go

+136-56
Original file line numberDiff line numberDiff line change
@@ -2,52 +2,72 @@ package tailnet
22

33
import (
44
"context"
5-
"encoding/json"
65
"io"
7-
"net"
6+
"sync"
7+
"sync/atomic"
8+
"time"
89

910
"github.com/google/uuid"
1011
"golang.org/x/xerrors"
11-
"nhooyr.io/websocket"
1212

1313
"cdr.dev/slog"
14+
1415
agpl "github.com/coder/coder/v2/tailnet"
16+
"github.com/coder/coder/v2/tailnet/proto"
1517
)
1618

1719
// connIO manages the reading and writing to a connected client or agent. Agent connIOs have their client field set to
1820
// uuid.Nil. It reads node updates via its decoder, then pushes them onto the bindings channel. It receives mappings
1921
// via its updates TrackedConn, which then writes them.
2022
type connIO struct {
21-
pCtx context.Context
22-
ctx context.Context
23-
cancel context.CancelFunc
24-
logger slog.Logger
25-
decoder *json.Decoder
26-
updates *agpl.TrackedConn
27-
bindings chan<- binding
23+
id uuid.UUID
24+
pCtx context.Context
25+
ctx context.Context
26+
cancel context.CancelFunc
27+
logger slog.Logger
28+
requests <-chan *proto.CoordinateRequest
29+
responses chan<- *proto.CoordinateResponse
30+
bindings chan<- binding
31+
tunnels chan<- tunnel
32+
auth agpl.TunnelAuth
33+
mu sync.Mutex
34+
closed bool
35+
36+
name string
37+
start int64
38+
lastWrite int64
39+
overwrites int64
2840
}
2941

3042
func newConnIO(pCtx context.Context,
3143
logger slog.Logger,
3244
bindings chan<- binding,
33-
conn net.Conn,
45+
tunnels chan<- tunnel,
46+
requests <-chan *proto.CoordinateRequest,
47+
responses chan<- *proto.CoordinateResponse,
3448
id uuid.UUID,
3549
name string,
36-
kind agpl.QueueKind,
50+
auth agpl.TunnelAuth,
3751
) *connIO {
3852
ctx, cancel := context.WithCancel(pCtx)
53+
now := time.Now().Unix()
3954
c := &connIO{
40-
pCtx: pCtx,
41-
ctx: ctx,
42-
cancel: cancel,
43-
logger: logger,
44-
decoder: json.NewDecoder(conn),
45-
updates: agpl.NewTrackedConn(ctx, cancel, conn, id, logger, name, 0, kind),
46-
bindings: bindings,
55+
id: id,
56+
pCtx: pCtx,
57+
ctx: ctx,
58+
cancel: cancel,
59+
logger: logger.With(slog.F("name", name)),
60+
requests: requests,
61+
responses: responses,
62+
bindings: bindings,
63+
tunnels: tunnels,
64+
auth: auth,
65+
name: name,
66+
start: now,
67+
lastWrite: now,
4768
}
4869
go c.recvLoop()
49-
go c.updates.SendUpdates()
50-
logger.Info(ctx, "serving connection")
70+
c.logger.Info(pCtx, "serving connection")
5171
return c
5272
}
5373

@@ -56,82 +76,142 @@ func (c *connIO) recvLoop() {
5676
// withdraw bindings when we exit. We need to use the parent context here, since our own context might be
5777
// canceled, but we still need to withdraw bindings.
5878
b := binding{
59-
bKey: bKey{
60-
id: c.UniqueID(),
61-
kind: c.Kind(),
62-
},
79+
bKey: bKey(c.UniqueID()),
6380
}
6481
if err := sendCtx(c.pCtx, c.bindings, b); err != nil {
65-
c.logger.Debug(c.ctx, "parent context expired while withdrawing bindings", slog.Error(err))
82+
c.logger.Debug(c.pCtx, "parent context expired while withdrawing bindings", slog.Error(err))
6683
}
6784
}()
68-
defer c.cancel()
85+
defer c.Close()
6986
for {
70-
var node agpl.Node
71-
err := c.decoder.Decode(&node)
87+
req, err := recvCtx(c.ctx, c.requests)
7288
if err != nil {
73-
if xerrors.Is(err, io.EOF) ||
74-
xerrors.Is(err, io.ErrClosedPipe) ||
75-
xerrors.Is(err, context.Canceled) ||
89+
if xerrors.Is(err, context.Canceled) ||
7690
xerrors.Is(err, context.DeadlineExceeded) ||
77-
websocket.CloseStatus(err) > 0 {
78-
c.logger.Debug(c.ctx, "exiting recvLoop", slog.Error(err))
91+
xerrors.Is(err, io.EOF) {
92+
c.logger.Debug(c.pCtx, "exiting recvLoop", slog.Error(err))
7993
} else {
80-
c.logger.Error(c.ctx, "failed to decode Node update", slog.Error(err))
94+
c.logger.Error(c.pCtx, "failed to receive request", slog.Error(err))
8195
}
8296
return
8397
}
84-
c.logger.Debug(c.ctx, "got node update", slog.F("node", node))
98+
if err := c.handleRequest(req); err != nil {
99+
return
100+
}
101+
}
102+
}
103+
104+
func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
105+
c.logger.Debug(c.ctx, "got request")
106+
if req.UpdateSelf != nil {
107+
c.logger.Debug(c.ctx, "got node update", slog.F("node", req.UpdateSelf))
85108
b := binding{
86-
bKey: bKey{
87-
id: c.UniqueID(),
88-
kind: c.Kind(),
109+
bKey: bKey(c.UniqueID()),
110+
node: req.UpdateSelf.Node,
111+
}
112+
if err := sendCtx(c.pCtx, c.bindings, b); err != nil {
113+
c.logger.Debug(c.ctx, "failed to send binding, context expired?", slog.Error(err))
114+
return err
115+
}
116+
}
117+
if req.AddTunnel != nil {
118+
c.logger.Debug(c.ctx, "got add tunnel", slog.F("tunnel", req.AddTunnel))
119+
dst, err := uuid.FromBytes(req.AddTunnel.Uuid)
120+
if err != nil {
121+
c.logger.Error(c.ctx, "unable to convert bytes to UUID", slog.Error(err))
122+
// this shouldn't happen unless there is a client error. Close the connection so the client
123+
// doesn't just happily continue thinking everything is fine.
124+
return err
125+
}
126+
if !c.auth.Authorize(dst) {
127+
return xerrors.New("unauthorized tunnel")
128+
}
129+
t := tunnel{
130+
tKey: tKey{
131+
src: c.UniqueID(),
132+
dst: dst,
89133
},
90-
node: &node,
134+
active: true,
91135
}
92-
if err := sendCtx(c.ctx, c.bindings, b); err != nil {
93-
c.logger.Debug(c.ctx, "recvLoop ctx expired", slog.Error(err))
94-
return
136+
if err := sendCtx(c.pCtx, c.tunnels, t); err != nil {
137+
c.logger.Debug(c.ctx, "failed to send add tunnel, context expired?", slog.Error(err))
138+
return err
95139
}
96140
}
141+
if req.RemoveTunnel != nil {
142+
c.logger.Debug(c.ctx, "got remove tunnel", slog.F("tunnel", req.RemoveTunnel))
143+
dst, err := uuid.FromBytes(req.RemoveTunnel.Uuid)
144+
if err != nil {
145+
c.logger.Error(c.ctx, "unable to convert bytes to UUID", slog.Error(err))
146+
// this shouldn't happen unless there is a client error. Close the connection so the client
147+
// doesn't just happily continue thinking everything is fine.
148+
return err
149+
}
150+
t := tunnel{
151+
tKey: tKey{
152+
src: c.UniqueID(),
153+
dst: dst,
154+
},
155+
active: false,
156+
}
157+
if err := sendCtx(c.pCtx, c.tunnels, t); err != nil {
158+
c.logger.Debug(c.ctx, "failed to send remove tunnel, context expired?", slog.Error(err))
159+
return err
160+
}
161+
}
162+
// TODO: (spikecurtis) support Disconnect
163+
return nil
97164
}
98165

99166
func (c *connIO) UniqueID() uuid.UUID {
100-
return c.updates.UniqueID()
101-
}
102-
103-
func (c *connIO) Kind() agpl.QueueKind {
104-
return c.updates.Kind()
167+
return c.id
105168
}
106169

107-
func (c *connIO) Enqueue(n []*agpl.Node) error {
108-
return c.updates.Enqueue(n)
170+
func (c *connIO) Enqueue(resp *proto.CoordinateResponse) error {
171+
atomic.StoreInt64(&c.lastWrite, time.Now().Unix())
172+
c.mu.Lock()
173+
closed := c.closed
174+
c.mu.Unlock()
175+
if closed {
176+
return xerrors.New("connIO closed")
177+
}
178+
select {
179+
case <-c.pCtx.Done():
180+
return c.pCtx.Err()
181+
case c.responses <- resp:
182+
return nil
183+
default:
184+
return agpl.ErrWouldBlock
185+
}
109186
}
110187

111188
func (c *connIO) Name() string {
112-
return c.updates.Name()
189+
return c.name
113190
}
114191

115192
func (c *connIO) Stats() (start int64, lastWrite int64) {
116-
return c.updates.Stats()
193+
return c.start, atomic.LoadInt64(&c.lastWrite)
117194
}
118195

119196
func (c *connIO) Overwrites() int64 {
120-
return c.updates.Overwrites()
197+
return atomic.LoadInt64(&c.overwrites)
121198
}
122199

123200
// CoordinatorClose is used by the coordinator when closing a Queue. It
124201
// should skip removing itself from the coordinator.
125202
func (c *connIO) CoordinatorClose() error {
126-
c.cancel()
127-
return c.updates.CoordinatorClose()
203+
return c.Close()
128204
}
129205

130206
func (c *connIO) Done() <-chan struct{} {
131207
return c.ctx.Done()
132208
}
133209

134210
func (c *connIO) Close() error {
211+
c.mu.Lock()
212+
defer c.mu.Unlock()
135213
c.cancel()
136-
return c.updates.Close()
214+
c.closed = true
215+
close(c.responses)
216+
return nil
137217
}

0 commit comments

Comments
 (0)