From 615f316b1f6aed10b03848a269c881639d7f6d03 Mon Sep 17 00:00:00 2001
From: Spike Curtis
Date: Fri, 15 Nov 2024 12:46:35 +0400
Subject: [PATCH] feat: adds support for WorkspaceUpdates to tailnet.Controller

---
 tailnet/controllers.go      |  71 +++++++++++++++++++
 tailnet/controllers_test.go | 151 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 222 insertions(+)

diff --git a/tailnet/controllers.go b/tailnet/controllers.go
index 7ddcfd9e8de36..0afe74efb837e 100644
--- a/tailnet/controllers.go
+++ b/tailnet/controllers.go
@@ -1177,12 +1177,44 @@ func (c *Controller) Run(ctx context.Context) {
 				continue
 			}
 			c.logger.Info(c.ctx, "obtained tailnet API v2+ client")
+			err = c.precheckClientsAndControllers(tailnetClients)
+			if err != nil {
+				c.logger.Critical(c.ctx, "failed precheck", slog.Error(err))
+				_ = tailnetClients.Closer.Close()
+				continue
+			}
+			retrier.Reset()
 			c.runControllersOnce(tailnetClients)
 			c.logger.Info(c.ctx, "tailnet API v2+ connection lost")
 		}
 	}()
 }
 
+// precheckClientsAndControllers checks that the set of clients we got is compatible with the
+// configured controllers. These checks will fail if the dialer is incompatible with the set of
+// controllers, or not configured correctly with respect to Tailnet API version.
+func (c *Controller) precheckClientsAndControllers(clients ControlProtocolClients) error {
+	if clients.Coordinator == nil && c.CoordCtrl != nil {
+		return xerrors.New("missing Coordinator client; have controller")
+	}
+	if clients.DERP == nil && c.DERPCtrl != nil {
+		return xerrors.New("missing DERPMap client; have controller")
+	}
+	if clients.WorkspaceUpdates == nil && c.WorkspaceUpdatesCtrl != nil {
+		return xerrors.New("missing WorkspaceUpdates client; have controller")
+	}
+
+	// Telemetry and ResumeToken support is considered optional, but the clients must be present
+	// so that we can call the functions and get an "unimplemented" error.
+	if clients.ResumeToken == nil && c.ResumeTokenCtrl != nil {
+		return xerrors.New("missing ResumeToken client; have controller")
+	}
+	if clients.Telemetry == nil && c.TelemetryCtrl != nil {
+		return xerrors.New("missing Telemetry client; have controller")
+	}
+	return nil
+}
+
 // runControllersOnce uses the provided clients to call into the controllers once. It is combined
 // into one function so that a problem with one tears down the other and triggers a retry (if
 // appropriate). We typically multiplex all RPCs over the same websocket, so we want them to share
@@ -1236,6 +1268,18 @@ func (c *Controller) runControllersOnce(clients ControlProtocolClients) {
 			}
 		}()
 	}
+	if c.WorkspaceUpdatesCtrl != nil {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			c.workspaceUpdates(clients.WorkspaceUpdates)
+			if c.ctx.Err() == nil {
+				// Main context is still active, but our workspace updates stream exited, due to
+				// some error. Close down all the rest of the clients so we'll exit and retry.
+				closeClients()
+			}
+		}()
+	}
 
 	// Refresh token is a little different, in that we don't want its controller to hold open the
 	// connection on its own. So we keep it separate from the other wait group, and cancel its
@@ -1308,6 +1352,33 @@ func (c *Controller) derpMap(client DERPClient) error {
 	}
 }
 
+// workspaceUpdates passes the client to the configured WorkspaceUpdatesCtrl and blocks until the
+// main context is done or the returned CloserWaiter reports on Wait. The client is always closed
+// on exit; Wait errors other than io.EOF and context cancellation/deadline are logged as errors.
+func (c *Controller) workspaceUpdates(client WorkspaceUpdatesClient) {
+	defer func() {
+		c.logger.Debug(c.ctx, "exiting workspaceUpdates control routine")
+		cErr := client.Close()
+		if cErr != nil {
+			c.logger.Debug(c.ctx, "error closing WorkspaceUpdates RPC", slog.Error(cErr))
+		}
+	}()
+	cw := c.WorkspaceUpdatesCtrl.New(client)
+	select {
+	case <-c.ctx.Done():
+		c.logger.Debug(c.ctx, "workspaceUpdates: context done")
+		return
+	case err := <-cw.Wait():
+		c.logger.Debug(c.ctx, "workspaceUpdates: wait done")
+		if err != nil &&
+			!xerrors.Is(err, io.EOF) &&
+			!xerrors.Is(err, context.Canceled) &&
+			!xerrors.Is(err, context.DeadlineExceeded) {
+			c.logger.Error(c.ctx, "workspace updates stream error", slog.Error(err))
+		}
+	}
+}
+
 func (c *Controller) refreshToken(ctx context.Context, client ResumeTokenClient) {
 	cw := c.ResumeTokenCtrl.New(client)
 	go func() {
diff --git a/tailnet/controllers_test.go b/tailnet/controllers_test.go
index 994a524b5c258..01db2664a7c6a 100644
--- a/tailnet/controllers_test.go
+++ b/tailnet/controllers_test.go
@@ -1135,6 +1135,49 @@ func TestController_TelemetrySuccess(t *testing.T) {
 	require.Equal(t, []byte("test event"), testEvents[0].Id)
 }
 
+func TestController_WorkspaceUpdates(t *testing.T) {
+	t.Parallel()
+	theError := xerrors.New("a bad thing happened")
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, &slogtest.Options{
+		IgnoredErrorIs: append(slogtest.DefaultIgnoredErrorIs, theError),
+	}).Leveled(slog.LevelDebug)
+
+	fClient := newFakeWorkspaceUpdateClient(testCtx, t)
+	dialer := &fakeWorkspaceUpdatesDialer{
+		client: fClient,
+	}
+
+	uut := tailnet.NewController(logger.Named("ctrl"), dialer)
+	fCtrl := newFakeUpdatesController(ctx, t)
+	uut.WorkspaceUpdatesCtrl = fCtrl
+	uut.Run(ctx)
+
+	// it should dial and pass the client to the controller
+	call := testutil.RequireRecvCtx(testCtx, t, fCtrl.calls)
+	require.Equal(t, fClient, call.client)
+	fCW := newFakeCloserWaiter()
+	testutil.RequireSendCtx[tailnet.CloserWaiter](testCtx, t, call.resp, fCW)
+
+	// if the CloserWaiter exits...
+	testutil.RequireSendCtx(testCtx, t, fCW.errCh, theError)
+
+	// it should close, redial and reconnect
+	cCall := testutil.RequireRecvCtx(testCtx, t, fClient.close)
+	testutil.RequireSendCtx(testCtx, t, cCall, nil)
+
+	call = testutil.RequireRecvCtx(testCtx, t, fCtrl.calls)
+	require.Equal(t, fClient, call.client)
+	fCW = newFakeCloserWaiter()
+	testutil.RequireSendCtx[tailnet.CloserWaiter](testCtx, t, call.resp, fCW)
+
+	// canceling the context should close the client
+	cancel()
+	cCall = testutil.RequireRecvCtx(testCtx, t, fClient.close)
+	testutil.RequireSendCtx(testCtx, t, cCall, nil)
+}
+
 type fakeTailnetConn struct {
 	peersLostCh chan struct{}
 }
@@ -1717,3 +1760,111 @@ func TestTunnelAllWorkspaceUpdatesController_HandleErrors(t *testing.T) {
 		})
 	}
 }
+
+// fakeWorkspaceUpdatesController is a test double whose New method reports each call on the
+// calls channel and returns whatever CloserWaiter the test sends back on the call's resp channel.
+type fakeWorkspaceUpdatesController struct {
+	ctx   context.Context
+	t     testing.TB
+	calls chan *newWorkspaceUpdatesCall
+}
+
+// newWorkspaceUpdatesCall carries the client passed to New and the channel for the test's reply.
+type newWorkspaceUpdatesCall struct {
+	client tailnet.WorkspaceUpdatesClient
+	resp   chan<- tailnet.CloserWaiter
+}
+
+// New hands the call to the test and blocks for the reply. If f.ctx ends first, it fails the test
+// and returns a fakeCloserWaiter whose Wait channel already holds the context error.
+func (f fakeWorkspaceUpdatesController) New(client tailnet.WorkspaceUpdatesClient) tailnet.CloserWaiter {
+	resps := make(chan tailnet.CloserWaiter)
+	call := &newWorkspaceUpdatesCall{
+		client: client,
+		resp:   resps,
+	}
+	select {
+	case <-f.ctx.Done():
+		f.t.Error("timed out waiting to send New call")
+		cw := newFakeCloserWaiter()
+		cw.errCh <- f.ctx.Err()
+		return cw
+	case f.calls <- call:
+		// OK
+	}
+	select {
+	case <-f.ctx.Done():
+		f.t.Error("timed out waiting to get New call response")
+		cw := newFakeCloserWaiter()
+		cw.errCh <- f.ctx.Err()
+		return cw
+	case resp := <-resps:
+		return resp
+	}
+}
+
+func newFakeUpdatesController(ctx context.Context, t *testing.T) *fakeWorkspaceUpdatesController {
+	return &fakeWorkspaceUpdatesController{
+		ctx:   ctx,
+		t:     t,
+		calls: make(chan *newWorkspaceUpdatesCall),
+	}
+}
+
+// fakeCloserWaiter is a CloserWaiter whose Wait channel (errCh) and Close results (closeCalls)
+// are driven by the test.
+type fakeCloserWaiter struct {
+	closeCalls chan chan error
+	errCh      chan error
+}
+
+// Close offers a reply channel on closeCalls and returns the error the test sends on it, or
+// ctx.Err() if ctx ends first.
+func (f *fakeCloserWaiter) Close(ctx context.Context) error {
+	errRes := make(chan error)
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case f.closeCalls <- errRes:
+		// OK
+	}
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case err := <-errRes:
+		return err
+	}
+}
+
+// Wait exposes errCh, the channel on which the test sends the result to deliver.
+func (f *fakeCloserWaiter) Wait() <-chan error {
+	return f.errCh
+}
+
+// newFakeCloserWaiter buffers errCh by one so an error can be queued without a waiting receiver.
+func newFakeCloserWaiter() *fakeCloserWaiter {
+	return &fakeCloserWaiter{
+		closeCalls: make(chan chan error),
+		errCh:      make(chan error, 1),
+	}
+}
+
+// fakeWorkspaceUpdatesDialer is a dialer that always succeeds and supplies only the
+// WorkspaceUpdates client and a no-op Closer.
+type fakeWorkspaceUpdatesDialer struct {
+	client tailnet.WorkspaceUpdatesClient
+}
+
+func (f *fakeWorkspaceUpdatesDialer) Dial(_ context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) {
+	return tailnet.ControlProtocolClients{
+		WorkspaceUpdates: f.client,
+		Closer:           fakeCloser{},
+	}, nil
+}
+
+// fakeCloser is a Closer whose Close always succeeds.
+type fakeCloser struct{}
+
+func (fakeCloser) Close() error {
+	return nil
+}