@@ -7,11 +7,14 @@ import (
7
7
"strconv"
8
8
"strings"
9
9
"sync/atomic"
10
+ "time"
10
11
11
12
"github.com/google/uuid"
12
13
"github.com/hashicorp/yamux"
14
+ "storj.io/drpc"
13
15
"storj.io/drpc/drpcmux"
14
16
"storj.io/drpc/drpcserver"
17
+ "tailscale.com/tailcfg"
15
18
16
19
"cdr.dev/slog"
17
20
"github.com/coder/coder/v2/tailnet/proto"
@@ -92,10 +95,22 @@ type ClientService struct {
92
95
93
96
// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
94
97
// loaded on each processed connection.
95
- func NewClientService (logger slog.Logger , coordPtr * atomic.Pointer [Coordinator ]) (* ClientService , error ) {
98
+ func NewClientService (
99
+ logger slog.Logger ,
100
+ coordPtr * atomic.Pointer [Coordinator ],
101
+ derpMapUpdateFrequency time.Duration ,
102
+ derpMapFn func () * tailcfg.DERPMap ,
103
+ ) (
104
+ * ClientService , error ,
105
+ ) {
96
106
s := & ClientService {logger : logger , coordPtr : coordPtr }
97
107
mux := drpcmux .New ()
98
- drpcService := NewDRPCService (logger , coordPtr )
108
+ drpcService := & DRPCService {
109
+ CoordPtr : coordPtr ,
110
+ Logger : logger ,
111
+ DerpMapUpdateFrequency : derpMapUpdateFrequency ,
112
+ DerpMapFn : derpMapFn ,
113
+ }
99
114
err := proto .DRPCRegisterClient (mux , drpcService )
100
115
if err != nil {
101
116
return nil , xerrors .Errorf ("register DRPC service: %w" , err )
@@ -145,20 +160,42 @@ func (s *ClientService) ServeClient(ctx context.Context, version string, conn ne
145
160
146
161
// DRPCService is the dRPC-based, version 2.x of the tailnet API and implements proto.DRPCClientServer
147
162
type DRPCService struct {
148
- coordPtr * atomic.Pointer [Coordinator ]
149
- logger slog.Logger
163
+ CoordPtr * atomic.Pointer [Coordinator ]
164
+ Logger slog.Logger
165
+ DerpMapUpdateFrequency time.Duration
166
+ DerpMapFn func () * tailcfg.DERPMap
150
167
}
151
168
152
- func NewDRPCService (logger slog.Logger , coordPtr * atomic.Pointer [Coordinator ]) * DRPCService {
153
- return & DRPCService {
154
- coordPtr : coordPtr ,
155
- logger : logger ,
156
- }
169
+ type StreamDERPMapsStream interface {
170
+ drpc.Stream
171
+ Send (* proto.DERPMap ) error
157
172
}
158
173
159
- func (* DRPCService ) StreamDERPMaps (* proto.StreamDERPMapsRequest , proto.DRPCClient_StreamDERPMapsStream ) error {
160
- // TODO integrate with Dean's PR implementation
161
- return xerrors .New ("unimplemented" )
174
+ func (s * DRPCService ) StreamDERPMaps (_ * proto.StreamDERPMapsRequest , stream proto.DRPCClient_StreamDERPMapsStream ) error {
175
+ defer stream .Close ()
176
+
177
+ ticker := time .NewTicker (s .DerpMapUpdateFrequency )
178
+ defer ticker .Stop ()
179
+
180
+ var lastDERPMap * tailcfg.DERPMap
181
+ for {
182
+ derpMap := s .DerpMapFn ()
183
+ if lastDERPMap == nil || ! CompareDERPMaps (lastDERPMap , derpMap ) {
184
+ protoDERPMap := DERPMapToProto (derpMap )
185
+ err := stream .Send (protoDERPMap )
186
+ if err != nil {
187
+ return xerrors .Errorf ("send derp map: %w" , err )
188
+ }
189
+ lastDERPMap = derpMap
190
+ }
191
+
192
+ ticker .Reset (s .DerpMapUpdateFrequency )
193
+ select {
194
+ case <- stream .Context ().Done ():
195
+ return nil
196
+ case <- ticker .C :
197
+ }
198
+ }
162
199
}
163
200
164
201
func (s * DRPCService ) CoordinateTailnet (stream proto.DRPCClient_CoordinateTailnetStream ) error {
@@ -168,9 +205,9 @@ func (s *DRPCService) CoordinateTailnet(stream proto.DRPCClient_CoordinateTailne
168
205
_ = stream .Close ()
169
206
return xerrors .New ("no Stream ID" )
170
207
}
171
- logger := s .logger .With (slog .F ("peer_id" , streamID ), slog .F ("name" , streamID .Name ))
208
+ logger := s .Logger .With (slog .F ("peer_id" , streamID ), slog .F ("name" , streamID .Name ))
172
209
logger .Debug (ctx , "starting tailnet Coordinate" )
173
- coord := * (s .coordPtr .Load ())
210
+ coord := * (s .CoordPtr .Load ())
174
211
reqs , resps := coord .Coordinate (ctx , streamID .ID , streamID .Name , streamID .Auth )
175
212
c := communicator {
176
213
logger : logger ,
0 commit comments