@@ -7,11 +7,13 @@ import (
7
7
"strconv"
8
8
"strings"
9
9
"sync/atomic"
10
+ "time"
10
11
11
12
"github.com/google/uuid"
12
13
"github.com/hashicorp/yamux"
13
14
"storj.io/drpc/drpcmux"
14
15
"storj.io/drpc/drpcserver"
16
+ "tailscale.com/tailcfg"
15
17
16
18
"cdr.dev/slog"
17
19
"github.com/coder/coder/v2/tailnet/proto"
@@ -92,10 +94,22 @@ type ClientService struct {
92
94
93
95
// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
94
96
// loaded on each processed connection.
95
- func NewClientService (logger slog.Logger , coordPtr * atomic.Pointer [Coordinator ]) (* ClientService , error ) {
97
+ func NewClientService (
98
+ logger slog.Logger ,
99
+ coordPtr * atomic.Pointer [Coordinator ],
100
+ derpMapUpdateFrequency time.Duration ,
101
+ derpMapFn func () * tailcfg.DERPMap ,
102
+ ) (
103
+ * ClientService , error ,
104
+ ) {
96
105
s := & ClientService {logger : logger , coordPtr : coordPtr }
97
106
mux := drpcmux .New ()
98
- drpcService := NewDRPCService (logger , coordPtr )
107
+ drpcService := & DRPCService {
108
+ CoordPtr : coordPtr ,
109
+ Logger : logger ,
110
+ DerpMapUpdateFrequency : derpMapUpdateFrequency ,
111
+ DerpMapFn : derpMapFn ,
112
+ }
99
113
err := proto .DRPCRegisterClient (mux , drpcService )
100
114
if err != nil {
101
115
return nil , xerrors .Errorf ("register DRPC service: %w" , err )
@@ -145,20 +159,37 @@ func (s *ClientService) ServeClient(ctx context.Context, version string, conn ne
145
159
146
160
// DRPCService is the dRPC-based, version 2.x of the tailnet API and implements proto.DRPCClientServer
147
161
type DRPCService struct {
148
- coordPtr * atomic.Pointer [Coordinator ]
149
- logger slog.Logger
162
+ CoordPtr * atomic.Pointer [Coordinator ]
163
+ Logger slog.Logger
164
+ DerpMapUpdateFrequency time.Duration
165
+ DerpMapFn func () * tailcfg.DERPMap
150
166
}
151
167
152
- func NewDRPCService (logger slog.Logger , coordPtr * atomic.Pointer [Coordinator ]) * DRPCService {
153
- return & DRPCService {
154
- coordPtr : coordPtr ,
155
- logger : logger ,
156
- }
157
- }
168
+ func (s * DRPCService ) StreamDERPMaps (_ * proto.StreamDERPMapsRequest , stream proto.DRPCClient_StreamDERPMapsStream ) error {
169
+ defer stream .Close ()
170
+
171
+ ticker := time .NewTicker (s .DerpMapUpdateFrequency )
172
+ defer ticker .Stop ()
158
173
159
- func (* DRPCService ) StreamDERPMaps (* proto.StreamDERPMapsRequest , proto.DRPCClient_StreamDERPMapsStream ) error {
160
- // TODO integrate with Dean's PR implementation
161
- return xerrors .New ("unimplemented" )
174
+ var lastDERPMap * tailcfg.DERPMap
175
+ for {
176
+ derpMap := s .DerpMapFn ()
177
+ if lastDERPMap == nil || ! CompareDERPMaps (lastDERPMap , derpMap ) {
178
+ protoDERPMap := DERPMapToProto (derpMap )
179
+ err := stream .Send (protoDERPMap )
180
+ if err != nil {
181
+ return xerrors .Errorf ("send derp map: %w" , err )
182
+ }
183
+ lastDERPMap = derpMap
184
+ }
185
+
186
+ ticker .Reset (s .DerpMapUpdateFrequency )
187
+ select {
188
+ case <- stream .Context ().Done ():
189
+ return nil
190
+ case <- ticker .C :
191
+ }
192
+ }
162
193
}
163
194
164
195
func (s * DRPCService ) CoordinateTailnet (stream proto.DRPCClient_CoordinateTailnetStream ) error {
@@ -168,9 +199,9 @@ func (s *DRPCService) CoordinateTailnet(stream proto.DRPCClient_CoordinateTailne
168
199
_ = stream .Close ()
169
200
return xerrors .New ("no Stream ID" )
170
201
}
171
- logger := s .logger .With (slog .F ("peer_id" , streamID ), slog .F ("name" , streamID .Name ))
202
+ logger := s .Logger .With (slog .F ("peer_id" , streamID ), slog .F ("name" , streamID .Name ))
172
203
logger .Debug (ctx , "starting tailnet Coordinate" )
173
- coord := * (s .coordPtr .Load ())
204
+ coord := * (s .CoordPtr .Load ())
174
205
reqs , resps := coord .Coordinate (ctx , streamID .ID , streamID .Name , streamID .Auth )
175
206
c := communicator {
176
207
logger : logger ,
0 commit comments