@@ -2,52 +2,72 @@ package tailnet
2
2
3
3
import (
4
4
"context"
5
- "encoding/json"
6
5
"io"
7
- "net"
6
+ "sync"
7
+ "sync/atomic"
8
+ "time"
8
9
9
10
"github.com/google/uuid"
10
11
"golang.org/x/xerrors"
11
- "nhooyr.io/websocket"
12
12
13
13
"cdr.dev/slog"
14
+
14
15
agpl "github.com/coder/coder/v2/tailnet"
16
+ "github.com/coder/coder/v2/tailnet/proto"
15
17
)
16
18
17
19
// connIO manages the reading and writing to a connected client or agent. Agent connIOs have their client field set to
18
20
// uuid.Nil. It reads node updates via its decoder, then pushes them onto the bindings channel. It receives mappings
19
21
// via its updates TrackedConn, which then writes them.
20
22
type connIO struct {
21
- pCtx context.Context
22
- ctx context.Context
23
- cancel context.CancelFunc
24
- logger slog.Logger
25
- decoder * json.Decoder
26
- updates * agpl.TrackedConn
27
- bindings chan <- binding
23
+ id uuid.UUID
24
+ pCtx context.Context
25
+ ctx context.Context
26
+ cancel context.CancelFunc
27
+ logger slog.Logger
28
+ requests <- chan * proto.CoordinateRequest
29
+ responses chan <- * proto.CoordinateResponse
30
+ bindings chan <- binding
31
+ tunnels chan <- tunnel
32
+ auth agpl.TunnelAuth
33
+ mu sync.Mutex
34
+ closed bool
35
+
36
+ name string
37
+ start int64
38
+ lastWrite int64
39
+ overwrites int64
28
40
}
29
41
30
42
func newConnIO (pCtx context.Context ,
31
43
logger slog.Logger ,
32
44
bindings chan <- binding ,
33
- conn net.Conn ,
45
+ tunnels chan <- tunnel ,
46
+ requests <- chan * proto.CoordinateRequest ,
47
+ responses chan <- * proto.CoordinateResponse ,
34
48
id uuid.UUID ,
35
49
name string ,
36
- kind agpl.QueueKind ,
50
+ auth agpl.TunnelAuth ,
37
51
) * connIO {
38
52
ctx , cancel := context .WithCancel (pCtx )
53
+ now := time .Now ().Unix ()
39
54
c := & connIO {
40
- pCtx : pCtx ,
41
- ctx : ctx ,
42
- cancel : cancel ,
43
- logger : logger ,
44
- decoder : json .NewDecoder (conn ),
45
- updates : agpl .NewTrackedConn (ctx , cancel , conn , id , logger , name , 0 , kind ),
46
- bindings : bindings ,
55
+ id : id ,
56
+ pCtx : pCtx ,
57
+ ctx : ctx ,
58
+ cancel : cancel ,
59
+ logger : logger .With (slog .F ("name" , name )),
60
+ requests : requests ,
61
+ responses : responses ,
62
+ bindings : bindings ,
63
+ tunnels : tunnels ,
64
+ auth : auth ,
65
+ name : name ,
66
+ start : now ,
67
+ lastWrite : now ,
47
68
}
48
69
go c .recvLoop ()
49
- go c .updates .SendUpdates ()
50
- logger .Info (ctx , "serving connection" )
70
+ c .logger .Info (pCtx , "serving connection" )
51
71
return c
52
72
}
53
73
@@ -56,82 +76,142 @@ func (c *connIO) recvLoop() {
56
76
// withdraw bindings when we exit. We need to use the parent context here, since our own context might be
57
77
// canceled, but we still need to withdraw bindings.
58
78
b := binding {
59
- bKey : bKey {
60
- id : c .UniqueID (),
61
- kind : c .Kind (),
62
- },
79
+ bKey : bKey (c .UniqueID ()),
63
80
}
64
81
if err := sendCtx (c .pCtx , c .bindings , b ); err != nil {
65
- c .logger .Debug (c .ctx , "parent context expired while withdrawing bindings" , slog .Error (err ))
82
+ c .logger .Debug (c .pCtx , "parent context expired while withdrawing bindings" , slog .Error (err ))
66
83
}
67
84
}()
68
- defer c .cancel ()
85
+ defer c .Close ()
69
86
for {
70
- var node agpl.Node
71
- err := c .decoder .Decode (& node )
87
+ req , err := recvCtx (c .ctx , c .requests )
72
88
if err != nil {
73
- if xerrors .Is (err , io .EOF ) ||
74
- xerrors .Is (err , io .ErrClosedPipe ) ||
75
- xerrors .Is (err , context .Canceled ) ||
89
+ if xerrors .Is (err , context .Canceled ) ||
76
90
xerrors .Is (err , context .DeadlineExceeded ) ||
77
- websocket . CloseStatus (err ) > 0 {
78
- c .logger .Debug (c .ctx , "exiting recvLoop" , slog .Error (err ))
91
+ xerrors . Is (err , io . EOF ) {
92
+ c .logger .Debug (c .pCtx , "exiting recvLoop" , slog .Error (err ))
79
93
} else {
80
- c .logger .Error (c .ctx , "failed to decode Node update " , slog .Error (err ))
94
+ c .logger .Error (c .pCtx , "failed to receive request " , slog .Error (err ))
81
95
}
82
96
return
83
97
}
84
- c .logger .Debug (c .ctx , "got node update" , slog .F ("node" , node ))
98
+ if err := c .handleRequest (req ); err != nil {
99
+ return
100
+ }
101
+ }
102
+ }
103
+
104
+ func (c * connIO ) handleRequest (req * proto.CoordinateRequest ) error {
105
+ c .logger .Debug (c .ctx , "got request" )
106
+ if req .UpdateSelf != nil {
107
+ c .logger .Debug (c .ctx , "got node update" , slog .F ("node" , req .UpdateSelf ))
85
108
b := binding {
86
- bKey : bKey {
87
- id : c .UniqueID (),
88
- kind : c .Kind (),
109
+ bKey : bKey (c .UniqueID ()),
110
+ node : req .UpdateSelf .Node ,
111
+ }
112
+ if err := sendCtx (c .pCtx , c .bindings , b ); err != nil {
113
+ c .logger .Debug (c .ctx , "failed to send binding, context expired?" , slog .Error (err ))
114
+ return err
115
+ }
116
+ }
117
+ if req .AddTunnel != nil {
118
+ c .logger .Debug (c .ctx , "got add tunnel" , slog .F ("tunnel" , req .AddTunnel ))
119
+ dst , err := uuid .FromBytes (req .AddTunnel .Uuid )
120
+ if err != nil {
121
+ c .logger .Error (c .ctx , "unable to convert bytes to UUID" , slog .Error (err ))
122
+ // this shouldn't happen unless there is a client error. Close the connection so the client
123
+ // doesn't just happily continue thinking everything is fine.
124
+ return err
125
+ }
126
+ if ! c .auth .Authorize (dst ) {
127
+ return xerrors .New ("unauthorized tunnel" )
128
+ }
129
+ t := tunnel {
130
+ tKey : tKey {
131
+ src : c .UniqueID (),
132
+ dst : dst ,
89
133
},
90
- node : & node ,
134
+ active : true ,
91
135
}
92
- if err := sendCtx (c .ctx , c .bindings , b ); err != nil {
93
- c .logger .Debug (c .ctx , "recvLoop ctx expired" , slog .Error (err ))
94
- return
136
+ if err := sendCtx (c .pCtx , c .tunnels , t ); err != nil {
137
+ c .logger .Debug (c .ctx , "failed to send add tunnel, context expired? " , slog .Error (err ))
138
+ return err
95
139
}
96
140
}
141
+ if req .RemoveTunnel != nil {
142
+ c .logger .Debug (c .ctx , "got remove tunnel" , slog .F ("tunnel" , req .RemoveTunnel ))
143
+ dst , err := uuid .FromBytes (req .RemoveTunnel .Uuid )
144
+ if err != nil {
145
+ c .logger .Error (c .ctx , "unable to convert bytes to UUID" , slog .Error (err ))
146
+ // this shouldn't happen unless there is a client error. Close the connection so the client
147
+ // doesn't just happily continue thinking everything is fine.
148
+ return err
149
+ }
150
+ t := tunnel {
151
+ tKey : tKey {
152
+ src : c .UniqueID (),
153
+ dst : dst ,
154
+ },
155
+ active : false ,
156
+ }
157
+ if err := sendCtx (c .pCtx , c .tunnels , t ); err != nil {
158
+ c .logger .Debug (c .ctx , "failed to send remove tunnel, context expired?" , slog .Error (err ))
159
+ return err
160
+ }
161
+ }
162
+ // TODO: (spikecurtis) support Disconnect
163
+ return nil
97
164
}
98
165
99
166
func (c * connIO ) UniqueID () uuid.UUID {
100
- return c .updates .UniqueID ()
101
- }
102
-
103
- func (c * connIO ) Kind () agpl.QueueKind {
104
- return c .updates .Kind ()
167
+ return c .id
105
168
}
106
169
107
- func (c * connIO ) Enqueue (n []* agpl.Node ) error {
108
- return c .updates .Enqueue (n )
170
+ func (c * connIO ) Enqueue (resp * proto.CoordinateResponse ) error {
171
+ atomic .StoreInt64 (& c .lastWrite , time .Now ().Unix ())
172
+ c .mu .Lock ()
173
+ closed := c .closed
174
+ c .mu .Unlock ()
175
+ if closed {
176
+ return xerrors .New ("connIO closed" )
177
+ }
178
+ select {
179
+ case <- c .pCtx .Done ():
180
+ return c .pCtx .Err ()
181
+ case c .responses <- resp :
182
+ return nil
183
+ default :
184
+ return agpl .ErrWouldBlock
185
+ }
109
186
}
110
187
111
188
func (c * connIO ) Name () string {
112
- return c .updates . Name ()
189
+ return c .name
113
190
}
114
191
115
192
func (c * connIO ) Stats () (start int64 , lastWrite int64 ) {
116
- return c .updates . Stats ( )
193
+ return c .start , atomic . LoadInt64 ( & c . lastWrite )
117
194
}
118
195
119
196
func (c * connIO ) Overwrites () int64 {
120
- return c . updates . Overwrites ( )
197
+ return atomic . LoadInt64 ( & c . overwrites )
121
198
}
122
199
123
200
// CoordinatorClose is used by the coordinator when closing a Queue. It
124
201
// should skip removing itself from the coordinator.
125
202
func (c * connIO ) CoordinatorClose () error {
126
- c .cancel ()
127
- return c .updates .CoordinatorClose ()
203
+ return c .Close ()
128
204
}
129
205
130
206
func (c * connIO ) Done () <- chan struct {} {
131
207
return c .ctx .Done ()
132
208
}
133
209
134
210
func (c * connIO ) Close () error {
211
+ c .mu .Lock ()
212
+ defer c .mu .Unlock ()
135
213
c .cancel ()
136
- return c .updates .Close ()
214
+ c .closed = true
215
+ close (c .responses )
216
+ return nil
137
217
}
0 commit comments