Skip to content

Commit 16992ee

Browse files
authored
feat(tailnet): add workspace updates support to Controller (#15529)
re: #14730. Adds support in `tailnet.Controller` for WorkspaceUpdates. Also checks the configured controllers against the clients returned by the dialer: if we connect with a dialer that doesn't support an RPC (for instance, the in-memory dialer for ServerTailnet doesn't support WorkspaceUpdates) while a controller expecting that RPC is configured, the precheck returns an error, which is logged at critical level, and the clients are closed instead of being handed to the controllers.
1 parent aa0dc2d commit 16992ee

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

tailnet/controllers.go

+68
Original file line numberDiff line numberDiff line change
@@ -1177,12 +1177,44 @@ func (c *Controller) Run(ctx context.Context) {
11771177
continue
11781178
}
11791179
c.logger.Info(c.ctx, "obtained tailnet API v2+ client")
1180+
err = c.precheckClientsAndControllers(tailnetClients)
1181+
if err != nil {
1182+
c.logger.Critical(c.ctx, "failed precheck", slog.Error(err))
1183+
_ = tailnetClients.Closer.Close()
1184+
continue
1185+
}
1186+
retrier.Reset()
11801187
c.runControllersOnce(tailnetClients)
11811188
c.logger.Info(c.ctx, "tailnet API v2+ connection lost")
11821189
}
11831190
}()
11841191
}
11851192

1193+
// precheckClientsAndControllers checks that the set of clients we got is compatible with the
1194+
// configured controllers. These checks will fail if the dialer is incompatible with the set of
1195+
// controllers, or not configured correctly with respect to Tailnet API version.
1196+
func (c *Controller) precheckClientsAndControllers(clients ControlProtocolClients) error {
1197+
if clients.Coordinator == nil && c.CoordCtrl != nil {
1198+
return xerrors.New("missing Coordinator client; have controller")
1199+
}
1200+
if clients.DERP == nil && c.DERPCtrl != nil {
1201+
return xerrors.New("missing DERPMap client; have controller")
1202+
}
1203+
if clients.WorkspaceUpdates == nil && c.WorkspaceUpdatesCtrl != nil {
1204+
return xerrors.New("missing WorkspaceUpdates client; have controller")
1205+
}
1206+
1207+
// Telemetry and ResumeToken support is considered optional, but the clients must be present
1208+
// so that we can call the functions and get an "unimplemented" error.
1209+
if clients.ResumeToken == nil && c.ResumeTokenCtrl != nil {
1210+
return xerrors.New("missing ResumeToken client; have controller")
1211+
}
1212+
if clients.Telemetry == nil && c.TelemetryCtrl != nil {
1213+
return xerrors.New("missing Telemetry client; have controller")
1214+
}
1215+
return nil
1216+
}
1217+
11861218
// runControllersOnce uses the provided clients to call into the controllers once. It is combined
11871219
// into one function so that a problem with one tears down the other and triggers a retry (if
11881220
// appropriate). We typically multiplex all RPCs over the same websocket, so we want them to share
@@ -1236,6 +1268,18 @@ func (c *Controller) runControllersOnce(clients ControlProtocolClients) {
12361268
}
12371269
}()
12381270
}
1271+
if c.WorkspaceUpdatesCtrl != nil {
1272+
wg.Add(1)
1273+
go func() {
1274+
defer wg.Done()
1275+
c.workspaceUpdates(clients.WorkspaceUpdates)
1276+
if c.ctx.Err() == nil {
1277+
// Main context is still active, but our workspace updates stream exited, due to
1278+
// some error. Close down all the rest of the clients so we'll exit and retry.
1279+
closeClients()
1280+
}
1281+
}()
1282+
}
12391283

12401284
// Refresh token is a little different, in that we don't want its controller to hold open the
12411285
// connection on its own. So we keep it separate from the other wait group, and cancel its
@@ -1308,6 +1352,30 @@ func (c *Controller) derpMap(client DERPClient) error {
13081352
}
13091353
}
13101354

1355+
func (c *Controller) workspaceUpdates(client WorkspaceUpdatesClient) {
1356+
defer func() {
1357+
c.logger.Debug(c.ctx, "exiting workspaceUpdates control routine")
1358+
cErr := client.Close()
1359+
if cErr != nil {
1360+
c.logger.Debug(c.ctx, "error closing WorkspaceUpdates RPC", slog.Error(cErr))
1361+
}
1362+
}()
1363+
cw := c.WorkspaceUpdatesCtrl.New(client)
1364+
select {
1365+
case <-c.ctx.Done():
1366+
c.logger.Debug(c.ctx, "workspaceUpdates: context done")
1367+
return
1368+
case err := <-cw.Wait():
1369+
c.logger.Debug(c.ctx, "workspaceUpdates: wait done")
1370+
if err != nil &&
1371+
!xerrors.Is(err, io.EOF) &&
1372+
!xerrors.Is(err, context.Canceled) &&
1373+
!xerrors.Is(err, context.DeadlineExceeded) {
1374+
c.logger.Error(c.ctx, "workspace updates stream error", slog.Error(err))
1375+
}
1376+
}
1377+
}
1378+
13111379
func (c *Controller) refreshToken(ctx context.Context, client ResumeTokenClient) {
13121380
cw := c.ResumeTokenCtrl.New(client)
13131381
go func() {

tailnet/controllers_test.go

+137
Original file line numberDiff line numberDiff line change
@@ -1135,6 +1135,49 @@ func TestController_TelemetrySuccess(t *testing.T) {
11351135
require.Equal(t, []byte("test event"), testEvents[0].Id)
11361136
}
11371137

1138+
func TestController_WorkspaceUpdates(t *testing.T) {
1139+
t.Parallel()
1140+
theError := xerrors.New("a bad thing happened")
1141+
testCtx := testutil.Context(t, testutil.WaitShort)
1142+
ctx, cancel := context.WithCancel(testCtx)
1143+
logger := slogtest.Make(t, &slogtest.Options{
1144+
IgnoredErrorIs: append(slogtest.DefaultIgnoredErrorIs, theError),
1145+
}).Leveled(slog.LevelDebug)
1146+
1147+
fClient := newFakeWorkspaceUpdateClient(testCtx, t)
1148+
dialer := &fakeWorkspaceUpdatesDialer{
1149+
client: fClient,
1150+
}
1151+
1152+
uut := tailnet.NewController(logger.Named("ctrl"), dialer)
1153+
fCtrl := newFakeUpdatesController(ctx, t)
1154+
uut.WorkspaceUpdatesCtrl = fCtrl
1155+
uut.Run(ctx)
1156+
1157+
// it should dial and pass the client to the controller
1158+
call := testutil.RequireRecvCtx(testCtx, t, fCtrl.calls)
1159+
require.Equal(t, fClient, call.client)
1160+
fCW := newFakeCloserWaiter()
1161+
testutil.RequireSendCtx[tailnet.CloserWaiter](testCtx, t, call.resp, fCW)
1162+
1163+
// if the CloserWaiter exits...
1164+
testutil.RequireSendCtx(testCtx, t, fCW.errCh, theError)
1165+
1166+
// it should close, redial and reconnect
1167+
cCall := testutil.RequireRecvCtx(testCtx, t, fClient.close)
1168+
testutil.RequireSendCtx(testCtx, t, cCall, nil)
1169+
1170+
call = testutil.RequireRecvCtx(testCtx, t, fCtrl.calls)
1171+
require.Equal(t, fClient, call.client)
1172+
fCW = newFakeCloserWaiter()
1173+
testutil.RequireSendCtx[tailnet.CloserWaiter](testCtx, t, call.resp, fCW)
1174+
1175+
// canceling the context should close the client
1176+
cancel()
1177+
cCall = testutil.RequireRecvCtx(testCtx, t, fClient.close)
1178+
testutil.RequireSendCtx(testCtx, t, cCall, nil)
1179+
}
1180+
11381181
// fakeTailnetConn is a test double whose only state is a signalling channel.
// NOTE(review): its methods are not visible in this hunk; presumably peersLostCh is signalled
// when the fake is told that peers were lost — confirm against the method set.
type fakeTailnetConn struct {
	peersLostCh chan struct{}
}
@@ -1717,3 +1760,97 @@ func TestTunnelAllWorkspaceUpdatesController_HandleErrors(t *testing.T) {
17171760
})
17181761
}
17191762
}
1763+
1764+
type fakeWorkspaceUpdatesController struct {
1765+
ctx context.Context
1766+
t testing.TB
1767+
calls chan *newWorkspaceUpdatesCall
1768+
}
1769+
1770+
type newWorkspaceUpdatesCall struct {
1771+
client tailnet.WorkspaceUpdatesClient
1772+
resp chan<- tailnet.CloserWaiter
1773+
}
1774+
1775+
func (f fakeWorkspaceUpdatesController) New(client tailnet.WorkspaceUpdatesClient) tailnet.CloserWaiter {
1776+
resps := make(chan tailnet.CloserWaiter)
1777+
call := &newWorkspaceUpdatesCall{
1778+
client: client,
1779+
resp: resps,
1780+
}
1781+
select {
1782+
case <-f.ctx.Done():
1783+
f.t.Error("timed out waiting to send New call")
1784+
cw := newFakeCloserWaiter()
1785+
cw.errCh <- f.ctx.Err()
1786+
return cw
1787+
case f.calls <- call:
1788+
// OK
1789+
}
1790+
select {
1791+
case <-f.ctx.Done():
1792+
f.t.Error("timed out waiting to get New call response")
1793+
cw := newFakeCloserWaiter()
1794+
cw.errCh <- f.ctx.Err()
1795+
return cw
1796+
case resp := <-resps:
1797+
return resp
1798+
}
1799+
}
1800+
1801+
func newFakeUpdatesController(ctx context.Context, t *testing.T) *fakeWorkspaceUpdatesController {
1802+
return &fakeWorkspaceUpdatesController{
1803+
ctx: ctx,
1804+
t: t,
1805+
calls: make(chan *newWorkspaceUpdatesCall),
1806+
}
1807+
}
1808+
1809+
// fakeCloserWaiter is a channel-driven test double that tests hand to the Controller as a
// tailnet.CloserWaiter. The test controls what Close returns and when Wait yields an error.
type fakeCloserWaiter struct {
	// closeCalls receives, for each Close call, the channel on which the test must send that
	// call's result.
	closeCalls chan chan error
	// errCh is the channel handed out by Wait.
	errCh chan error
}

// newFakeCloserWaiter returns a fakeCloserWaiter with an unbuffered closeCalls channel and an
// errCh buffered to one, so a single error can be queued before anyone is receiving from Wait.
func newFakeCloserWaiter() *fakeCloserWaiter {
	w := new(fakeCloserWaiter)
	w.closeCalls = make(chan chan error)
	w.errCh = make(chan error, 1)
	return w
}

// Wait returns the channel on which the test delivers the waiter's terminal error.
func (f *fakeCloserWaiter) Wait() <-chan error { return f.errCh }

// Close announces itself on closeCalls and then returns whatever error the test sends back on
// the per-call reply channel. If ctx is done before either step completes, ctx.Err() is returned
// instead.
func (f *fakeCloserWaiter) Close(ctx context.Context) error {
	reply := make(chan error)

	// Step 1: offer the reply channel to the test.
	select {
	case f.closeCalls <- reply:
		// OK
	case <-ctx.Done():
		return ctx.Err()
	}

	// Step 2: wait for the test to decide the outcome.
	select {
	case result := <-reply:
		return result
	case <-ctx.Done():
		return ctx.Err()
	}
}
1840+
1841+
type fakeWorkspaceUpdatesDialer struct {
1842+
client tailnet.WorkspaceUpdatesClient
1843+
}
1844+
1845+
func (f *fakeWorkspaceUpdatesDialer) Dial(_ context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) {
1846+
return tailnet.ControlProtocolClients{
1847+
WorkspaceUpdates: f.client,
1848+
Closer: fakeCloser{},
1849+
}, nil
1850+
}
1851+
1852+
// fakeCloser is a stateless closer for tests; closing it does nothing.
type fakeCloser struct{}

// Close always succeeds and returns nil.
func (fakeCloser) Close() error { return nil }

0 commit comments

Comments
 (0)