Skip to content

feat(tailnet): add workspace updates support to Controller #15529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions tailnet/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,12 +1177,44 @@ func (c *Controller) Run(ctx context.Context) {
continue
}
c.logger.Info(c.ctx, "obtained tailnet API v2+ client")
err = c.precheckClientsAndControllers(tailnetClients)
if err != nil {
c.logger.Critical(c.ctx, "failed precheck", slog.Error(err))
_ = tailnetClients.Closer.Close()
continue
}
retrier.Reset()
c.runControllersOnce(tailnetClients)
c.logger.Info(c.ctx, "tailnet API v2+ connection lost")
}
}()
}

// precheckClientsAndControllers verifies that every configured controller has a matching client
// in the set returned by the dialer. It returns an error naming the first missing client, or nil
// when the set is usable. A failure here means the dialer is incompatible with the configured
// controllers, or is misconfigured with respect to the Tailnet API version.
func (c *Controller) precheckClientsAndControllers(clients ControlProtocolClients) error {
	switch {
	case clients.Coordinator == nil && c.CoordCtrl != nil:
		return xerrors.New("missing Coordinator client; have controller")
	case clients.DERP == nil && c.DERPCtrl != nil:
		return xerrors.New("missing DERPMap client; have controller")
	case clients.WorkspaceUpdates == nil && c.WorkspaceUpdatesCtrl != nil:
		return xerrors.New("missing WorkspaceUpdates client; have controller")

	// ResumeToken and Telemetry support is optional on the server side, but the clients
	// themselves must still exist so that calling them can yield an "unimplemented" error.
	case clients.ResumeToken == nil && c.ResumeTokenCtrl != nil:
		return xerrors.New("missing ResumeToken client; have controller")
	case clients.Telemetry == nil && c.TelemetryCtrl != nil:
		return xerrors.New("missing Telemetry client; have controller")
	}
	return nil
}

// runControllersOnce uses the provided clients to call into the controllers once. It is combined
// into one function so that a problem with one tears down the other and triggers a retry (if
// appropriate). We typically multiplex all RPCs over the same websocket, so we want them to share
Expand Down Expand Up @@ -1236,6 +1268,18 @@ func (c *Controller) runControllersOnce(clients ControlProtocolClients) {
}
}()
}
if c.WorkspaceUpdatesCtrl != nil {
wg.Add(1)
go func() {
defer wg.Done()
c.workspaceUpdates(clients.WorkspaceUpdates)
if c.ctx.Err() == nil {
// Main context is still active, but our workspace updates stream exited, due to
// some error. Close down all the rest of the clients so we'll exit and retry.
closeClients()
}
}()
}

// Refresh token is a little different, in that we don't want its controller to hold open the
// connection on its own. So we keep it separate from the other wait group, and cancel its
Expand Down Expand Up @@ -1308,6 +1352,30 @@ func (c *Controller) derpMap(client DERPClient) error {
}
}

// workspaceUpdates hands the client to the WorkspaceUpdates controller and blocks until either
// the Controller's context is done or the controller's waiter reports that the stream ended.
// The client is always closed on the way out. Stream errors other than io.EOF,
// context.Canceled and context.DeadlineExceeded are logged at error level.
func (c *Controller) workspaceUpdates(client WorkspaceUpdatesClient) {
	defer func() {
		c.logger.Debug(c.ctx, "exiting workspaceUpdates control routine")
		if closeErr := client.Close(); closeErr != nil {
			// Best effort: a failed close is only worth a debug line.
			c.logger.Debug(c.ctx, "error closing WorkspaceUpdates RPC", slog.Error(closeErr))
		}
	}()

	waiter := c.WorkspaceUpdatesCtrl.New(client)
	select {
	case <-c.ctx.Done():
		c.logger.Debug(c.ctx, "workspaceUpdates: context done")
	case streamErr := <-waiter.Wait():
		c.logger.Debug(c.ctx, "workspaceUpdates: wait done")
		switch {
		case streamErr == nil:
			// Stream finished without an error; nothing to report.
		case xerrors.Is(streamErr, io.EOF),
			xerrors.Is(streamErr, context.Canceled),
			xerrors.Is(streamErr, context.DeadlineExceeded):
			// Ordinary ways for the stream to end; not worth an error log.
		default:
			c.logger.Error(c.ctx, "workspace updates stream error", slog.Error(streamErr))
		}
	}
}

func (c *Controller) refreshToken(ctx context.Context, client ResumeTokenClient) {
cw := c.ResumeTokenCtrl.New(client)
go func() {
Expand Down
137 changes: 137 additions & 0 deletions tailnet/controllers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,49 @@ func TestController_TelemetrySuccess(t *testing.T) {
require.Equal(t, []byte("test event"), testEvents[0].Id)
}

// TestController_WorkspaceUpdates checks that the Controller passes the dialed WorkspaceUpdates
// client to the configured controller, that it closes the client and starts over when the
// controller's waiter reports an error, and that canceling the run context closes the client.
func TestController_WorkspaceUpdates(t *testing.T) {
	t.Parallel()
	streamErr := xerrors.New("a bad thing happened")
	testCtx := testutil.Context(t, testutil.WaitShort)
	runCtx, stopRun := context.WithCancel(testCtx)
	logger := slogtest.Make(t, &slogtest.Options{
		// The injected stream error gets logged at error level; don't fail the test for it.
		IgnoredErrorIs: append(slogtest.DefaultIgnoredErrorIs, streamErr),
	}).Leveled(slog.LevelDebug)

	fakeClient := newFakeWorkspaceUpdateClient(testCtx, t)
	fakeDialer := &fakeWorkspaceUpdatesDialer{client: fakeClient}

	uut := tailnet.NewController(logger.Named("ctrl"), fakeDialer)
	fakeCtrl := newFakeUpdatesController(runCtx, t)
	uut.WorkspaceUpdatesCtrl = fakeCtrl
	uut.Run(runCtx)

	// acceptNew waits for the unit under test to call New on the fake controller, checks that
	// it was given the dialed client, and answers with a fresh CloserWaiter.
	acceptNew := func() *fakeCloserWaiter {
		newCall := testutil.RequireRecvCtx(testCtx, t, fakeCtrl.calls)
		require.Equal(t, fakeClient, newCall.client)
		waiter := newFakeCloserWaiter()
		testutil.RequireSendCtx[tailnet.CloserWaiter](testCtx, t, newCall.resp, waiter)
		return waiter
	}
	// ackClose waits for the unit under test to close the client and lets that close succeed.
	ackClose := func() {
		closeCall := testutil.RequireRecvCtx(testCtx, t, fakeClient.close)
		testutil.RequireSendCtx(testCtx, t, closeCall, nil)
	}

	// First connection: the client is dialed and handed to the controller.
	firstWaiter := acceptNew()

	// The controller's waiter fails...
	testutil.RequireSendCtx(testCtx, t, firstWaiter.errCh, streamErr)

	// ...so the client is closed, and a redial hands the client to the controller again.
	ackClose()
	_ = acceptNew()

	// Canceling the run context must also close the client.
	stopRun()
	ackClose()
}

// fakeTailnetConn is a test fake holding a single signal channel.
// NOTE(review): its methods are outside this view; presumably it stands in for a tailnet
// connection and uses peersLostCh to report "all peers lost" events — confirm against them.
type fakeTailnetConn struct {
// peersLostCh carries payload-free signals (empty struct values).
peersLostCh chan struct{}
}
Expand Down Expand Up @@ -1717,3 +1760,97 @@ func TestTunnelAllWorkspaceUpdatesController_HandleErrors(t *testing.T) {
})
}
}

// fakeWorkspaceUpdatesController is a test fake whose New method publishes each invocation on
// the calls channel and then blocks until the test supplies the CloserWaiter to return.
type fakeWorkspaceUpdatesController struct {
// ctx bounds how long New blocks; once it is done New reports a test error and gives up.
ctx context.Context
// t receives the error reported when New gives up.
t testing.TB
// calls delivers one *newWorkspaceUpdatesCall per New invocation to the test.
calls chan *newWorkspaceUpdatesCall
}

// newWorkspaceUpdatesCall records one invocation of fakeWorkspaceUpdatesController.New.
type newWorkspaceUpdatesCall struct {
// client is the argument New was called with.
client tailnet.WorkspaceUpdatesClient
// resp is send-only: the test sends the CloserWaiter that New should return.
resp chan<- tailnet.CloserWaiter
}

// New publishes the invocation on f.calls and then waits for the test to send back the
// CloserWaiter to return. If f.ctx is done before either step completes, it reports a test
// error and returns a CloserWaiter whose Wait channel already holds the context's error.
func (f fakeWorkspaceUpdatesController) New(client tailnet.WorkspaceUpdatesClient) tailnet.CloserWaiter {
	respCh := make(chan tailnet.CloserWaiter)

	// alreadyFailed builds the fallback waiter; errCh is buffered, so the send cannot block.
	alreadyFailed := func() tailnet.CloserWaiter {
		w := newFakeCloserWaiter()
		w.errCh <- f.ctx.Err()
		return w
	}

	// Step 1: announce the call to the test.
	pending := &newWorkspaceUpdatesCall{client: client, resp: respCh}
	select {
	case <-f.ctx.Done():
		f.t.Error("timed out waiting to send New call")
		return alreadyFailed()
	case f.calls <- pending:
	}

	// Step 2: wait for the test's answer.
	select {
	case <-f.ctx.Done():
		f.t.Error("timed out waiting to get New call response")
		return alreadyFailed()
	case answer := <-respCh:
		return answer
	}
}

// newFakeUpdatesController builds a fakeWorkspaceUpdatesController bound to ctx and t, with an
// unbuffered calls channel so each New invocation must be received by the test.
func newFakeUpdatesController(ctx context.Context, t *testing.T) *fakeWorkspaceUpdatesController {
	callCh := make(chan *newWorkspaceUpdatesCall)
	return &fakeWorkspaceUpdatesController{ctx: ctx, t: t, calls: callCh}
}

// fakeCloserWaiter is a test fake with Close and Wait methods. The test drives it directly:
// it receives from closeCalls to observe a Close call and answers on the received channel,
// and it sends on errCh to make Wait deliver an error.
type fakeCloserWaiter struct {
	// closeCalls delivers, for each Close invocation, the channel on which to reply.
	closeCalls chan chan error
	// errCh is the channel handed out by Wait.
	errCh chan error
}

// Close announces itself on closeCalls and returns whatever error the test replies with.
// If ctx is done before the announcement is received or before a reply arrives, it returns
// ctx.Err() instead.
func (f *fakeCloserWaiter) Close(ctx context.Context) error {
	reply := make(chan error)

	select {
	case f.closeCalls <- reply:
		// The test has picked up the call; now wait for its verdict.
	case <-ctx.Done():
		return ctx.Err()
	}

	select {
	case result := <-reply:
		return result
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Wait returns the receive side of errCh.
func (f *fakeCloserWaiter) Wait() <-chan error {
	return f.errCh
}

// newFakeCloserWaiter returns a fakeCloserWaiter with an unbuffered closeCalls channel and an
// errCh that can buffer a single error.
func newFakeCloserWaiter() *fakeCloserWaiter {
	w := new(fakeCloserWaiter)
	w.closeCalls = make(chan chan error)
	w.errCh = make(chan error, 1)
	return w
}

// fakeWorkspaceUpdatesDialer is a dialer fake that never fails and always hands out the same
// WorkspaceUpdates client.
type fakeWorkspaceUpdatesDialer struct {
	// client is returned as the WorkspaceUpdates client on every Dial.
	client tailnet.WorkspaceUpdatesClient
}

// Dial ignores both arguments and returns a client set containing only the configured
// WorkspaceUpdates client and a no-op Closer. The error is always nil.
func (f *fakeWorkspaceUpdatesDialer) Dial(_ context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) {
	clients := tailnet.ControlProtocolClients{Closer: fakeCloser{}}
	clients.WorkspaceUpdates = f.client
	return clients, nil
}

// fakeCloser is a stateless stand-in closer; fakeWorkspaceUpdatesDialer.Dial uses it as the
// Closer of the client set it returns.
type fakeCloser struct{}

// Close does nothing and always returns nil.
func (fakeCloser) Close() error {
return nil
}
Loading