diff --git a/cli/vpndaemon_windows.go b/cli/vpndaemon_windows.go index 004fb6493b0c1..d09733817d787 100644 --- a/cli/vpndaemon_windows.go +++ b/cli/vpndaemon_windows.go @@ -60,7 +60,7 @@ func (r *RootCmd) vpnDaemonRun() *serpent.Command { defer pipe.Close() logger.Info(ctx, "starting tunnel") - tunnel, err := vpn.NewTunnel(ctx, logger, pipe) + tunnel, err := vpn.NewTunnel(ctx, logger, pipe, vpn.NewClient()) if err != nil { return xerrors.Errorf("create new tunnel for client: %w", err) } diff --git a/codersdk/wsjson/encoder.go b/codersdk/wsjson/encoder.go index 4cde05984e690..3ba5d8745d121 100644 --- a/codersdk/wsjson/encoder.go +++ b/codersdk/wsjson/encoder.go @@ -27,6 +27,7 @@ func (e *Encoder[T]) Encode(v T) error { return nil } +// nolint: revive // complains that Decoder has the same function name func (e *Encoder[T]) Close(c websocket.StatusCode) error { return e.conn.Close(c, "") } diff --git a/tailnet/conn.go b/tailnet/conn.go index 17affa770d5ee..ff96211702485 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -14,6 +14,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/google/uuid" + "github.com/tailscale/wireguard-go/tun" "golang.org/x/xerrors" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -113,6 +114,8 @@ type Options struct { DNSConfigurator dns.OSConfigurator // Router is optional, and is passed to the underlying wireguard engine. Router router.Router + // TUNDev is optional, and is passed to the underlying wireguard engine. + TUNDev tun.Device } // TelemetrySink allows tailnet.Conn to send network telemetry to the Coder @@ -143,6 +146,8 @@ func NewConn(options *Options) (conn *Conn, err error) { return nil, xerrors.New("At least one IP range must be provided") } + netns.SetEnabled(options.TUNDev != nil) + var telemetryStore *TelemetryStore if options.TelemetrySink != nil { var err error @@ -187,6 +192,7 @@ func NewConn(options *Options) (conn *Conn, err error) { SetSubsystem: sys.Set, DNS: options.DNSConfigurator, Router: options.Router, + Tun: options.TUNDev, }) if err != nil { return nil, xerrors.Errorf("create wgengine: %w", err) @@ -197,11 +203,14 @@ func NewConn(options *Options) (conn *Conn, err error) { } }() wireguardEngine.InstallCaptureHook(options.CaptureHook) - dialer.UseNetstackForIP = func(ip netip.Addr) bool { - _, ok := wireguardEngine.PeerForIP(ip) - return ok + if options.TUNDev == nil { + dialer.UseNetstackForIP = func(ip netip.Addr) bool { + _, ok := wireguardEngine.PeerForIP(ip) + return ok + } } + wireguardEngine = wgengine.NewWatchdog(wireguardEngine) sys.Set(wireguardEngine) magicConn := sys.MagicSock.Get() @@ -244,11 +253,12 @@ func NewConn(options *Options) (conn *Conn, err error) { return nil, xerrors.Errorf("create netstack: %w", err) } - dialer.NetstackDialTCP = func(ctx context.Context, dst netip.AddrPort) (net.Conn, error) { - return netStack.DialContextTCP(ctx, dst) + if options.TUNDev == nil { + dialer.NetstackDialTCP = func(ctx context.Context, dst netip.AddrPort) (net.Conn, error) { + return netStack.DialContextTCP(ctx, dst) + } + netStack.ProcessLocalIPs = true } - netStack.ProcessLocalIPs = true - wireguardEngine = wgengine.NewWatchdog(wireguardEngine) cfgMaps := newConfigMaps( options.Logger, diff --git a/tailnet/controllers.go b/tailnet/controllers.go index 29278482a1c13..e0a57660624e2 100644 --- a/tailnet/controllers.go +++ b/tailnet/controllers.go @@ -7,6 +7,7 @@ import ( "maps" "math" "net/netip" + "slices" "strings" "sync" "time" @@ -19,6 +20,7 @@ import ( 
"tailscale.com/util/dnsname" "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/tailnet/proto" "github.com/coder/quartz" @@ -112,6 +114,11 @@ type DNSHostsSetter interface { SetDNSHosts(hosts map[dnsname.FQDN][]netip.Addr) error } +// UpdatesHandler is anything that expects a stream of workspace update diffs. +type UpdatesHandler interface { + Update(WorkspaceUpdate) error +} + // ControlProtocolClients represents an abstract interface to the tailnet control plane via a set // of protocol clients. The Closer should close all the clients (e.g. by closing the underlying // connection). @@ -855,65 +862,121 @@ func (r *basicResumeTokenRefresher) refresh() { r.timer.Reset(dur, "basicResumeTokenRefresher", "refresh") } -type tunnelAllWorkspaceUpdatesController struct { +type TunnelAllWorkspaceUpdatesController struct { coordCtrl *TunnelSrcCoordController dnsHostSetter DNSHostsSetter + updateHandler UpdatesHandler ownerUsername string logger slog.Logger + + mu sync.Mutex + updater *tunnelUpdater } -type workspace struct { - id uuid.UUID - name string - agents map[uuid.UUID]agent +type Workspace struct { + ID uuid.UUID + Name string + Status proto.Workspace_Status + + ownerUsername string + agents map[uuid.UUID]*Agent } -// addAllDNSNames adds names for all of its agents to the given map of names -func (w workspace) addAllDNSNames(names map[dnsname.FQDN][]netip.Addr, owner string) error { - for _, a := range w.agents { +// updateDNSNames updates the DNS names for all agents in the workspace. +func (w *Workspace) updateDNSNames() error { + for id, a := range w.agents { + names := make(map[dnsname.FQDN][]netip.Addr) // TODO: technically, DNS labels cannot start with numbers, but the rules are often not // strictly enforced. 
- fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.%s.me.coder.", a.name, w.name)) - if err != nil { - return err - } - names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.id)} - fqdn, err = dnsname.ToFQDN(fmt.Sprintf("%s.%s.%s.coder.", a.name, w.name, owner)) + fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.%s.me.coder.", a.Name, w.Name)) if err != nil { return err } - names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.id)} - } - if len(w.agents) == 1 { - fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.coder.", w.name)) + names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.ID)} + fqdn, err = dnsname.ToFQDN(fmt.Sprintf("%s.%s.%s.coder.", a.Name, w.Name, w.ownerUsername)) if err != nil { return err } - for _, a := range w.agents { - names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.id)} + names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.ID)} + if len(w.agents) == 1 { + fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.coder.", w.Name)) + if err != nil { + return err + } + for _, a := range w.agents { + names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.ID)} + } } + a.Hosts = names + w.agents[id] = a } return nil } -type agent struct { - id uuid.UUID - name string +type Agent struct { + ID uuid.UUID + Name string + WorkspaceID uuid.UUID + Hosts map[dnsname.FQDN][]netip.Addr +} + +func (a *Agent) Clone() Agent { + hosts := make(map[dnsname.FQDN][]netip.Addr, len(a.Hosts)) + for k, v := range a.Hosts { + hosts[k] = slices.Clone(v) + } + return Agent{ + ID: a.ID, + Name: a.Name, + WorkspaceID: a.WorkspaceID, + Hosts: hosts, + } } -func (t *tunnelAllWorkspaceUpdatesController) New(client WorkspaceUpdatesClient) CloserWaiter { +func (t *TunnelAllWorkspaceUpdatesController) New(client WorkspaceUpdatesClient) CloserWaiter { + t.mu.Lock() + defer t.mu.Unlock() updater := &tunnelUpdater{ client: client, errChan: make(chan error, 1), logger: t.logger, coordCtrl: t.coordCtrl, dnsHostsSetter: t.dnsHostSetter, + updateHandler: t.updateHandler, ownerUsername: t.ownerUsername, recvLoopDone: make(chan struct{}), - workspaces: make(map[uuid.UUID]*workspace), + workspaces: make(map[uuid.UUID]*Workspace), + } + t.updater = updater + go t.updater.recvLoop() + return t.updater +} + +func (t *TunnelAllWorkspaceUpdatesController) CurrentState() (WorkspaceUpdate, error) { + t.mu.Lock() + defer t.mu.Unlock() + if t.updater == nil { + return WorkspaceUpdate{}, xerrors.New("no updater") + } + t.updater.Lock() + defer t.updater.Unlock() + out := WorkspaceUpdate{ + UpsertedWorkspaces: make([]*Workspace, 0, len(t.updater.workspaces)), + UpsertedAgents: make([]*Agent, 0, len(t.updater.workspaces)), + DeletedWorkspaces: make([]*Workspace, 0), + DeletedAgents: make([]*Agent, 0), + } + for _, w := range t.updater.workspaces { + out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &Workspace{ + ID: w.ID, + Name: w.Name, + Status: w.Status, + }) + for _, a := range w.agents { + out.UpsertedAgents = append(out.UpsertedAgents, ptr.Ref(a.Clone())) + } } - go updater.recvLoop() - return updater + return out, nil } type tunnelUpdater struct { @@ -922,14 +985,13 @@ type tunnelUpdater struct { client WorkspaceUpdatesClient coordCtrl *TunnelSrcCoordController dnsHostsSetter DNSHostsSetter + updateHandler UpdatesHandler ownerUsername string recvLoopDone chan struct{} - // don't need the mutex since only manipulated by the recvLoop - workspaces map[uuid.UUID]*workspace - sync.Mutex - closed bool + workspaces map[uuid.UUID]*Workspace + closed bool } func (t *tunnelUpdater) Close(ctx 
context.Context) error { @@ -990,18 +1052,68 @@ func (t *tunnelUpdater) recvLoop() { } } +type WorkspaceUpdate struct { + UpsertedWorkspaces []*Workspace + UpsertedAgents []*Agent + DeletedWorkspaces []*Workspace + DeletedAgents []*Agent +} + +func (w *WorkspaceUpdate) Clone() WorkspaceUpdate { + clone := WorkspaceUpdate{ + UpsertedWorkspaces: make([]*Workspace, len(w.UpsertedWorkspaces)), + UpsertedAgents: make([]*Agent, len(w.UpsertedAgents)), + DeletedWorkspaces: make([]*Workspace, len(w.DeletedWorkspaces)), + DeletedAgents: make([]*Agent, len(w.DeletedAgents)), + } + for i, ws := range w.UpsertedWorkspaces { + clone.UpsertedWorkspaces[i] = &Workspace{ + ID: ws.ID, + Name: ws.Name, + Status: ws.Status, + } + } + for i, a := range w.UpsertedAgents { + clone.UpsertedAgents[i] = ptr.Ref(a.Clone()) + } + for i, ws := range w.DeletedWorkspaces { + clone.DeletedWorkspaces[i] = &Workspace{ + ID: ws.ID, + Name: ws.Name, + Status: ws.Status, + } + } + for i, a := range w.DeletedAgents { + clone.DeletedAgents[i] = ptr.Ref(a.Clone()) + } + return clone +} + func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error { + t.Lock() + defer t.Unlock() + + currentUpdate := WorkspaceUpdate{ + UpsertedWorkspaces: []*Workspace{}, + UpsertedAgents: []*Agent{}, + DeletedWorkspaces: []*Workspace{}, + DeletedAgents: []*Agent{}, + } + for _, uw := range update.UpsertedWorkspaces { workspaceID, err := uuid.FromBytes(uw.Id) if err != nil { return xerrors.Errorf("failed to parse workspace ID: %w", err) } - w := workspace{ - id: workspaceID, - name: uw.Name, - agents: make(map[uuid.UUID]agent), + w := &Workspace{ + ID: workspaceID, + Name: uw.Name, + Status: uw.Status, + ownerUsername: t.ownerUsername, + agents: make(map[uuid.UUID]*Agent), } - t.upsertWorkspace(w) + t.upsertWorkspaceLocked(w) + currentUpdate.UpsertedWorkspaces = append(currentUpdate.UpsertedWorkspaces, w) } // delete agents before deleting workspaces, since the agents have workspace ID references @@ -1014,17 +1126,22 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error { if err != nil { return xerrors.Errorf("failed to parse workspace ID: %w", err) } - err = t.deleteAgent(workspaceID, agentID) + deletedAgent, err := t.deleteAgentLocked(workspaceID, agentID) if err != nil { return xerrors.Errorf("failed to delete agent: %w", err) } + currentUpdate.DeletedAgents = append(currentUpdate.DeletedAgents, deletedAgent) } for _, dw := range update.DeletedWorkspaces { workspaceID, err := uuid.FromBytes(dw.Id) if err != nil { return xerrors.Errorf("failed to parse workspace ID: %w", err) } - t.deleteWorkspace(workspaceID) + deletedWorkspace, err := t.deleteWorkspaceLocked(workspaceID) + if err != nil { + return xerrors.Errorf("failed to delete workspace: %w", err) + } + currentUpdate.DeletedWorkspaces = append(currentUpdate.DeletedWorkspaces, deletedWorkspace) } // upsert agents last, after all workspaces have been added and deleted, since agents reference @@ -1038,17 +1155,18 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error { if err != nil { return xerrors.Errorf("failed to parse workspace ID: %w", err) } - a := agent{name: ua.Name, id: agentID} - err = t.upsertAgent(workspaceID, a) + a := &Agent{Name: ua.Name, ID: agentID, WorkspaceID: workspaceID} + err = t.upsertAgentLocked(workspaceID, a) if err != nil { return xerrors.Errorf("failed to upsert agent: %w", err) } + currentUpdate.UpsertedAgents = append(currentUpdate.UpsertedAgents, a) } - allAgents := t.allAgentIDs() + allAgents := 
t.allAgentIDsLocked() t.coordCtrl.SyncDestinations(allAgents) + dnsNames := t.updateDNSNamesLocked() if t.dnsHostsSetter != nil { t.logger.Debug(context.Background(), "updating dns hosts") - dnsNames := t.allDNSNames() err := t.dnsHostsSetter.SetDNSHosts(dnsNames) if err != nil { return xerrors.Errorf("failed to set DNS hosts: %w", err) @@ -1056,41 +1174,60 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error { } else { t.logger.Debug(context.Background(), "skipping setting DNS names because we have no setter") } + if t.updateHandler != nil { + t.logger.Debug(context.Background(), "calling update handler") + err := t.updateHandler.Update(currentUpdate.Clone()) + if err != nil { + t.logger.Error(context.Background(), "failed to call update handler", slog.Error(err)) + } + } return nil } -func (t *tunnelUpdater) upsertWorkspace(w workspace) { - old, ok := t.workspaces[w.id] +func (t *tunnelUpdater) upsertWorkspaceLocked(w *Workspace) *Workspace { + old, ok := t.workspaces[w.ID] if !ok { - t.workspaces[w.id] = &w - return + t.workspaces[w.ID] = w + return w } - old.name = w.name + old.Name = w.Name + old.Status = w.Status + old.ownerUsername = w.ownerUsername + return w } -func (t *tunnelUpdater) deleteWorkspace(id uuid.UUID) { +func (t *tunnelUpdater) deleteWorkspaceLocked(id uuid.UUID) (*Workspace, error) { + w, ok := t.workspaces[id] + if !ok { + return nil, xerrors.Errorf("workspace %s not found", id) + } delete(t.workspaces, id) + return w, nil } -func (t *tunnelUpdater) upsertAgent(workspaceID uuid.UUID, a agent) error { +func (t *tunnelUpdater) upsertAgentLocked(workspaceID uuid.UUID, a *Agent) error { w, ok := t.workspaces[workspaceID] if !ok { return xerrors.Errorf("workspace %s not found", workspaceID) } - w.agents[a.id] = a + w.agents[a.ID] = a return nil } -func (t *tunnelUpdater) deleteAgent(workspaceID, id uuid.UUID) error { +func (t *tunnelUpdater) deleteAgentLocked(workspaceID, id uuid.UUID) (*Agent, error) { w, ok := t.workspaces[workspaceID] if !ok { - return xerrors.Errorf("workspace %s not found", workspaceID) + return nil, xerrors.Errorf("workspace %s not found", workspaceID) + } + a, ok := w.agents[id] + if !ok { + return nil, xerrors.Errorf("agent %s not found in workspace %s", id, workspaceID) } delete(w.agents, id) - return nil + return a, nil } -func (t *tunnelUpdater) allAgentIDs() []uuid.UUID { +func (t *tunnelUpdater) allAgentIDsLocked() []uuid.UUID { out := make([]uuid.UUID, 0, len(t.workspaces)) for _, w := range t.workspaces { for id := range w.agents { @@ -1100,41 +1237,54 @@ func (t *tunnelUpdater) allAgentIDs() []uuid.UUID { return out } -func (t *tunnelUpdater) allDNSNames() map[dnsname.FQDN][]netip.Addr { +// updateDNSNamesLocked updates the DNS names for all workspaces in the tunnelUpdater. +// t.Mutex must be held. +func (t *tunnelUpdater) updateDNSNamesLocked() map[dnsname.FQDN][]netip.Addr { names := make(map[dnsname.FQDN][]netip.Addr) for _, w := range t.workspaces { - err := w.addAllDNSNames(names, t.ownerUsername) + err := w.updateDNSNames() if err != nil { // This should never happen in production, because converting the FQDN only fails // if names are too long, and we put strict length limits on agent, workspace, and user // names. 
t.logger.Critical(context.Background(), "failed to include DNS name(s)", - slog.F("workspace_id", w.id), + slog.F("workspace_id", w.ID), slog.Error(err)) } + for _, a := range w.agents { + for name, addrs := range a.Hosts { + names[name] = addrs + } + } } return names } -type TunnelAllOption func(t *tunnelAllWorkspaceUpdatesController) +type TunnelAllOption func(t *TunnelAllWorkspaceUpdatesController) // WithDNS configures the TunnelAllWorkspaceUpdatesController to set DNS names for all workspaces // and agents it learns about. func WithDNS(d DNSHostsSetter, ownerUsername string) TunnelAllOption { - return func(t *tunnelAllWorkspaceUpdatesController) { + return func(t *TunnelAllWorkspaceUpdatesController) { t.dnsHostSetter = d t.ownerUsername = ownerUsername } } +func WithHandler(h UpdatesHandler) TunnelAllOption { + return func(t *TunnelAllWorkspaceUpdatesController) { + t.updateHandler = h + } +} + // NewTunnelAllWorkspaceUpdatesController creates a WorkspaceUpdatesController that creates tunnels // (via the TunnelSrcCoordController) to all agents received over the WorkspaceUpdates RPC. If a // DNSHostsSetter is provided, it also programs DNS hosts based on the agent and workspace names. func NewTunnelAllWorkspaceUpdatesController( logger slog.Logger, c *TunnelSrcCoordController, opts ...TunnelAllOption, -) WorkspaceUpdatesController { - t := &tunnelAllWorkspaceUpdatesController{logger: logger, coordCtrl: c} +) *TunnelAllWorkspaceUpdatesController { + t := &TunnelAllWorkspaceUpdatesController{logger: logger, coordCtrl: c} for _, opt := range opts { opt(t) } diff --git a/tailnet/controllers_test.go b/tailnet/controllers_test.go index 70da418698fb2..ee3c07ff745ac 100644 --- a/tailnet/controllers_test.go +++ b/tailnet/controllers_test.go @@ -7,6 +7,7 @@ import ( "net" "net/netip" "slices" + "strings" "sync" "sync/atomic" "testing" @@ -1451,10 +1452,35 @@ func (f *fakeDNSSetter) SetDNSHosts(hosts map[dnsname.FQDN][]netip.Addr) error { } } +func newFakeUpdateHandler(ctx context.Context, t testing.TB) *fakeUpdateHandler { + return &fakeUpdateHandler{ + ctx: ctx, + t: t, + ch: make(chan tailnet.WorkspaceUpdate), + } +} + +type fakeUpdateHandler struct { + ctx context.Context + t testing.TB + ch chan tailnet.WorkspaceUpdate +} + +func (f *fakeUpdateHandler) Update(wu tailnet.WorkspaceUpdate) error { + f.t.Helper() + select { + case <-f.ctx.Done(): + return timeoutOnFakeErr + case f.ch <- wu: + // OK + } + return nil +} + func setupConnectedAllWorkspaceUpdatesController( ctx context.Context, t testing.TB, logger slog.Logger, opts ...tailnet.TunnelAllOption, ) ( - *fakeCoordinatorClient, *fakeWorkspaceUpdateClient, + *fakeCoordinatorClient, *fakeWorkspaceUpdateClient, *tailnet.TunnelAllWorkspaceUpdatesController, ) { fConn := &fakeCoordinatee{} tsc := tailnet.NewTunnelSrcCoordController(logger, fConn) @@ -1484,7 +1510,7 @@ func setupConnectedAllWorkspaceUpdatesController( err := testutil.RequireRecvCtx(ctx, t, updateCW.Wait()) require.ErrorIs(t, err, io.EOF) }) - return coordC, updateC + return coordC, updateC, uut } func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) { @@ -1492,9 +1518,12 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) { ctx := testutil.Context(t, testutil.WaitShort) logger := testutil.Logger(t) + fUH := newFakeUpdateHandler(ctx, t) fDNS := newFakeDNSSetter(ctx, t) - coordC, updateC := setupConnectedAllWorkspaceUpdatesController(ctx, t, logger, - tailnet.WithDNS(fDNS, "testy")) + coordC, updateC, updateCtrl :=
setupConnectedAllWorkspaceUpdatesController(ctx, t, logger, + tailnet.WithDNS(fDNS, "testy"), + tailnet.WithHandler(fUH), + ) // Initial update contains 2 workspaces with 1 & 2 agents, respectively w1ID := testUUID(1) @@ -1528,19 +1557,71 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) { require.Contains(t, adds, w2a1ID) require.Contains(t, adds, w2a2ID) + ws1a1IP := netip.MustParseAddr("fd60:627a:a42b:0101::") + w2a1IP := netip.MustParseAddr("fd60:627a:a42b:0201::") + w2a2IP := netip.MustParseAddr("fd60:627a:a42b:0202::") + // Also triggers setting DNS hosts expectedDNS := map[dnsname.FQDN][]netip.Addr{ - "w1a1.w1.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, - "w2a1.w2.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0201::")}, - "w2a2.w2.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0202::")}, - "w1a1.w1.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, - "w2a1.w2.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0201::")}, - "w2a2.w2.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0202::")}, - "w1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, + "w1a1.w1.me.coder.": {ws1a1IP}, + "w2a1.w2.me.coder.": {w2a1IP}, + "w2a2.w2.me.coder.": {w2a2IP}, + "w1a1.w1.testy.coder.": {ws1a1IP}, + "w2a1.w2.testy.coder.": {w2a1IP}, + "w2a2.w2.testy.coder.": {w2a2IP}, + "w1.coder.": {ws1a1IP}, } dnsCall := testutil.RequireRecvCtx(ctx, t, fDNS.calls) require.Equal(t, expectedDNS, dnsCall.hosts) testutil.RequireSendCtx(ctx, t, dnsCall.err, nil) + + currentState := tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{ + {ID: w1ID, Name: "w1"}, + {ID: w2ID, Name: "w2"}, + }, + UpsertedAgents: []*tailnet.Agent{ + { + ID: w1a1ID, Name: "w1a1", WorkspaceID: w1ID, + Hosts: map[dnsname.FQDN][]netip.Addr{ + "w1.coder.": {ws1a1IP}, + "w1a1.w1.me.coder.": {ws1a1IP}, + "w1a1.w1.testy.coder.": {ws1a1IP}, + }, + }, + { + ID: w2a1ID, Name: "w2a1", WorkspaceID: w2ID, + Hosts: map[dnsname.FQDN][]netip.Addr{ + "w2a1.w2.me.coder.": {w2a1IP}, + "w2a1.w2.testy.coder.": {w2a1IP}, + }, + }, + { + ID: w2a2ID, Name: "w2a2", WorkspaceID: w2ID, + Hosts: map[dnsname.FQDN][]netip.Addr{ + "w2a2.w2.me.coder.": {w2a2IP}, + "w2a2.w2.testy.coder.": {w2a2IP}, + }, + }, + }, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + } + + // And the callback + cbUpdate := testutil.RequireRecvCtx(ctx, t, fUH.ch) + require.Equal(t, currentState, cbUpdate) + + // Current recvState should match + recvState, err := updateCtrl.CurrentState() + require.NoError(t, err) + slices.SortFunc(recvState.UpsertedWorkspaces, func(a, b *tailnet.Workspace) int { + return strings.Compare(a.Name, b.Name) + }) + slices.SortFunc(recvState.UpsertedAgents, func(a, b *tailnet.Agent) int { + return strings.Compare(a.Name, b.Name) + }) + require.Equal(t, currentState, recvState) } func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) { @@ -1548,13 +1629,19 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) { ctx := testutil.Context(t, testutil.WaitShort) logger := testutil.Logger(t) + fUH := newFakeUpdateHandler(ctx, t) fDNS := newFakeDNSSetter(ctx, t) - coordC, updateC := setupConnectedAllWorkspaceUpdatesController(ctx, t, logger, - tailnet.WithDNS(fDNS, "testy")) + coordC, updateC, updateCtrl := setupConnectedAllWorkspaceUpdatesController(ctx, t, logger, + tailnet.WithDNS(fDNS, "testy"), + tailnet.WithHandler(fUH), + ) w1ID := testUUID(1) w1a1ID := testUUID(1, 1) w1a2ID := testUUID(1, 2) + ws1a1IP := 
netip.MustParseAddr("fd60:627a:a42b:0101::") + ws1a2IP := netip.MustParseAddr("fd60:627a:a42b:0102::") + initUp := &proto.WorkspaceUpdate{ UpsertedWorkspaces: []*proto.Workspace{ {Id: w1ID[:], Name: "w1"}, @@ -1574,14 +1661,37 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) { // DNS for w1a1 expectedDNS := map[dnsname.FQDN][]netip.Addr{ - "w1a1.w1.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, - "w1a1.w1.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, - "w1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, + "w1a1.w1.testy.coder.": {ws1a1IP}, + "w1a1.w1.me.coder.": {ws1a1IP}, + "w1.coder.": {ws1a1IP}, } dnsCall := testutil.RequireRecvCtx(ctx, t, fDNS.calls) require.Equal(t, expectedDNS, dnsCall.hosts) testutil.RequireSendCtx(ctx, t, dnsCall.err, nil) + initRecvUp := tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{ + {ID: w1ID, Name: "w1"}, + }, + UpsertedAgents: []*tailnet.Agent{ + {ID: w1a1ID, Name: "w1a1", WorkspaceID: w1ID, Hosts: map[dnsname.FQDN][]netip.Addr{ + "w1a1.w1.testy.coder.": {ws1a1IP}, + "w1a1.w1.me.coder.": {ws1a1IP}, + "w1.coder.": {ws1a1IP}, + }}, + }, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + } + + cbUpdate := testutil.RequireRecvCtx(ctx, t, fUH.ch) + require.Equal(t, initRecvUp, cbUpdate) + + // Current state should match initial + state, err := updateCtrl.CurrentState() + require.NoError(t, err) + require.Equal(t, initRecvUp, state) + // Send update that removes w1a1 and adds w1a2 agentUpdate := &proto.WorkspaceUpdate{ UpsertedAgents: []*proto.Agent{ @@ -1606,13 +1716,51 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) { // DNS contains only w1a2 expectedDNS = map[dnsname.FQDN][]netip.Addr{ - "w1a2.w1.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0102::")}, - "w1a2.w1.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0102::")}, - "w1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0102::")}, + "w1a2.w1.testy.coder.": {ws1a2IP}, + "w1a2.w1.me.coder.": {ws1a2IP}, + "w1.coder.": {ws1a2IP}, } dnsCall = testutil.RequireRecvCtx(ctx, t, fDNS.calls) require.Equal(t, expectedDNS, dnsCall.hosts) testutil.RequireSendCtx(ctx, t, dnsCall.err, nil) + + cbUpdate = testutil.RequireRecvCtx(ctx, t, fUH.ch) + sndRecvUpdate := tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{}, + UpsertedAgents: []*tailnet.Agent{ + {ID: w1a2ID, Name: "w1a2", WorkspaceID: w1ID, Hosts: map[dnsname.FQDN][]netip.Addr{ + "w1a2.w1.testy.coder.": {ws1a2IP}, + "w1a2.w1.me.coder.": {ws1a2IP}, + "w1.coder.": {ws1a2IP}, + }}, + }, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{ + {ID: w1a1ID, Name: "w1a1", WorkspaceID: w1ID, Hosts: map[dnsname.FQDN][]netip.Addr{ + "w1a1.w1.testy.coder.": {ws1a1IP}, + "w1a1.w1.me.coder.": {ws1a1IP}, + "w1.coder.": {ws1a1IP}, + }}, + }, + } + require.Equal(t, sndRecvUpdate, cbUpdate) + + state, err = updateCtrl.CurrentState() + require.NoError(t, err) + require.Equal(t, tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{ + {ID: w1ID, Name: "w1"}, + }, + UpsertedAgents: []*tailnet.Agent{ + {ID: w1a2ID, Name: "w1a2", WorkspaceID: w1ID, Hosts: map[dnsname.FQDN][]netip.Addr{ + "w1a2.w1.testy.coder.": {ws1a2IP}, + "w1a2.w1.me.coder.": {ws1a2IP}, + "w1.coder.": {ws1a2IP}, + }}, + }, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, state) } func TestTunnelAllWorkspaceUpdatesController_DNSError(t *testing.T) { @@ -1635,6 +1783,8 @@ func 
TestTunnelAllWorkspaceUpdatesController_DNSError(t *testing.T) { w1ID := testUUID(1) w1a1ID := testUUID(1, 1) + ws1a1IP := netip.MustParseAddr("fd60:627a:a42b:0101::") + initUp := &proto.WorkspaceUpdate{ UpsertedWorkspaces: []*proto.Workspace{ {Id: w1ID[:], Name: "w1"}, @@ -1648,9 +1798,9 @@ func TestTunnelAllWorkspaceUpdatesController_DNSError(t *testing.T) { // DNS for w1a1 expectedDNS := map[dnsname.FQDN][]netip.Addr{ - "w1a1.w1.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, - "w1a1.w1.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, - "w1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")}, + "w1a1.w1.me.coder.": {ws1a1IP}, + "w1a1.w1.testy.coder.": {ws1a1IP}, + "w1.coder.": {ws1a1IP}, } dnsCall := testutil.RequireRecvCtx(ctx, t, fDNS.calls) require.Equal(t, expectedDNS, dnsCall.hosts) @@ -1778,6 +1928,10 @@ type fakeWorkspaceUpdatesController struct { calls chan *newWorkspaceUpdatesCall } +func (*fakeWorkspaceUpdatesController) CurrentState() *proto.WorkspaceUpdate { + panic("unimplemented") +} + type newWorkspaceUpdatesCall struct { client tailnet.WorkspaceUpdatesClient resp chan<- tailnet.CloserWaiter diff --git a/vpn/client.go b/vpn/client.go new file mode 100644 index 0000000000000..954f27a7aa668 --- /dev/null +++ b/vpn/client.go @@ -0,0 +1,180 @@ +package vpn + +import ( + "context" + "net/http" + "net/netip" + "net/url" + + "golang.org/x/xerrors" + "nhooyr.io/websocket" + "tailscale.com/net/dns" + "tailscale.com/wgengine/router" + + "github.com/tailscale/wireguard-go/tun" + + "cdr.dev/slog" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/codersdk/workspacesdk" + "github.com/coder/coder/v2/tailnet" + "github.com/coder/coder/v2/tailnet/proto" + "github.com/coder/quartz" +) + +type Conn interface { + CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) + Close() error +} + +type vpnConn struct { + *tailnet.Conn + + cancelFn func() + controller *tailnet.Controller + updatesCtrl *tailnet.TunnelAllWorkspaceUpdatesController +} + +func (c *vpnConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) { + return c.updatesCtrl.CurrentState() +} + +func (c *vpnConn) Close() error { + c.cancelFn() + <-c.controller.Closed() + return c.Conn.Close() +} + +type client struct{} + +type Client interface { + NewConn(ctx context.Context, serverURL *url.URL, token string, options *Options) (Conn, error) +} + +func NewClient() Client { + return &client{} +} + +type Options struct { + Headers http.Header + Logger slog.Logger + DNSConfigurator dns.OSConfigurator + Router router.Router + TUNFileDescriptor *int + UpdateHandler tailnet.UpdatesHandler +} + +func (*client) NewConn(initCtx context.Context, serverURL *url.URL, token string, options *Options) (vpnC Conn, err error) { + if options == nil { + options = &Options{} + } + + if options.Headers == nil { + options.Headers = http.Header{} + } + + var dev tun.Device + if options.TUNFileDescriptor != nil { + // No-op on non-Darwin platforms. + dev, err = makeTUN(*options.TUNFileDescriptor) + if err != nil { + return nil, xerrors.Errorf("make TUN: %w", err) + } + } + + headers := options.Headers + sdk := codersdk.New(serverURL) + sdk.SetSessionToken(token) + sdk.HTTPClient.Transport = &codersdk.HeaderTransport{ + Transport: http.DefaultTransport, + Header: headers, + } + + // New context, separate from initCtx. We don't want to cancel the + // connection if initCtx is canceled. 
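A minimal, standalone sketch of the context split described in the comment above and implemented just below (hypothetical helper and names; the real setup work is the user lookup, connection-info fetch, and dial that follow):

package main

import (
	"context"
	"fmt"
	"time"
)

// setup stands in for the initialization work that is bounded by initCtx.
func setup(initCtx context.Context) error {
	select {
	case <-initCtx.Done():
		return initCtx.Err()
	case <-time.After(10 * time.Millisecond): // pretend setup succeeds
		return nil
	}
}

func main() {
	initCtx, initCancel := context.WithTimeout(context.Background(), time.Second)
	defer initCancel()
	// The connection gets its own context so that canceling initCtx after
	// setup cannot tear the connection down; cancel is retained for Close.
	connCtx, cancel := context.WithCancel(context.Background())
	if err := setup(initCtx); err != nil {
		cancel()
		fmt.Println("setup failed:", err)
		return
	}
	initCancel() // the setup deadline no longer applies
	fmt.Println("connection context still live:", connCtx.Err() == nil) // true
}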
+ ctx, cancel := context.WithCancel(context.Background()) + defer func() { + if err != nil { + cancel() + } + }() + + rpcURL, err := sdk.URL.Parse("/api/v2/tailnet") + if err != nil { + return nil, xerrors.Errorf("parse rpc url: %w", err) + } + + me, err := sdk.User(initCtx, codersdk.Me) + if err != nil { + return nil, xerrors.Errorf("get user: %w", err) + } + + connInfo, err := workspacesdk.New(sdk).AgentConnectionInfoGeneric(initCtx) + if err != nil { + return nil, xerrors.Errorf("get connection info: %w", err) + } + + headers.Set(codersdk.SessionTokenHeader, token) + dialer := workspacesdk.NewWebsocketDialer(options.Logger, rpcURL, &websocket.DialOptions{ + HTTPClient: sdk.HTTPClient, + HTTPHeader: headers, + CompressionMode: websocket.CompressionDisabled, + }, workspacesdk.WithWorkspaceUpdates(&proto.WorkspaceUpdatesRequest{ + WorkspaceOwnerId: tailnet.UUIDToByteSlice(me.ID), + })) + + ip := tailnet.CoderServicePrefix.RandomAddr() + conn, err := tailnet.NewConn(&tailnet.Options{ + Addresses: []netip.Prefix{netip.PrefixFrom(ip, 128)}, + DERPMap: connInfo.DERPMap, + DERPHeader: &headers, + DERPForceWebSockets: connInfo.DERPForceWebSockets, + Logger: options.Logger, + BlockEndpoints: connInfo.DisableDirectConnections, + DNSConfigurator: options.DNSConfigurator, + Router: options.Router, + TUNDev: dev, + }) + if err != nil { + return nil, xerrors.Errorf("create tailnet: %w", err) + } + defer func() { + if err != nil { + _ = conn.Close() + } + }() + + clk := quartz.NewReal() + controller := tailnet.NewController(options.Logger, dialer) + coordCtrl := tailnet.NewTunnelSrcCoordController(options.Logger, conn) + controller.ResumeTokenCtrl = tailnet.NewBasicResumeTokenController(options.Logger, clk) + controller.CoordCtrl = coordCtrl + controller.DERPCtrl = tailnet.NewBasicDERPController(options.Logger, conn) + updatesCtrl := tailnet.NewTunnelAllWorkspaceUpdatesController( + options.Logger, + coordCtrl, + tailnet.WithDNS(conn, me.Name), + tailnet.WithHandler(options.UpdateHandler), + ) + controller.WorkspaceUpdatesCtrl = updatesCtrl + controller.Run(ctx) + + options.Logger.Debug(ctx, "running tailnet API v2+ connector") + + select { + case <-initCtx.Done(): + return nil, xerrors.Errorf("timed out waiting for coordinator and derp map: %w", initCtx.Err()) + case err = <-dialer.Connected(): + if err != nil { + options.Logger.Error(ctx, "failed to connect to tailnet v2+ API", slog.Error(err)) + return nil, xerrors.Errorf("start connector: %w", err) + } + options.Logger.Debug(ctx, "connected to tailnet v2+ API") + } + + return &vpnConn{ + Conn: conn, + cancelFn: cancel, + controller: controller, + updatesCtrl: updatesCtrl, + }, nil +} diff --git a/vpn/client_test.go b/vpn/client_test.go new file mode 100644 index 0000000000000..7aac3433bdb6d --- /dev/null +++ b/vpn/client_test.go @@ -0,0 +1,188 @@ +package vpn_test + +import ( + "net/http" + "net/http/httptest" + "net/url" + "sync/atomic" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "nhooyr.io/websocket" + "tailscale.com/net/dns" + "tailscale.com/tailcfg" + + "github.com/coder/coder/v2/coderd/httpapi" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/codersdk/workspacesdk" + "github.com/coder/coder/v2/tailnet" + "github.com/coder/coder/v2/tailnet/proto" + "github.com/coder/coder/v2/tailnet/tailnettest" + "github.com/coder/coder/v2/testutil" + "github.com/coder/coder/v2/vpn" +) + +func TestClient_WorkspaceUpdates(t 
*testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + logger := testutil.Logger(t) + + userID := uuid.UUID{1} + wsID := uuid.UUID{2} + peerID := uuid.UUID{3} + + fCoord := tailnettest.NewFakeCoordinator() + var coord tailnet.Coordinator = fCoord + coordPtr := atomic.Pointer[tailnet.Coordinator]{} + coordPtr.Store(&coord) + ctrl := gomock.NewController(t) + mProvider := tailnettest.NewMockWorkspaceUpdatesProvider(ctrl) + + mSub := tailnettest.NewMockSubscription(ctrl) + outUpdateCh := make(chan *proto.WorkspaceUpdate, 1) + inUpdateCh := make(chan tailnet.WorkspaceUpdate, 1) + mProvider.EXPECT().Subscribe(gomock.Any(), userID).Times(1).Return(mSub, nil) + mSub.EXPECT().Updates().MinTimes(1).Return(outUpdateCh) + mSub.EXPECT().Close().Times(1).Return(nil) + + svc, err := tailnet.NewClientService(tailnet.ClientServiceOptions{ + Logger: logger, + CoordPtr: &coordPtr, + DERPMapUpdateFrequency: time.Hour, + DERPMapFn: func() *tailcfg.DERPMap { return &tailcfg.DERPMap{} }, + WorkspaceUpdatesProvider: mProvider, + ResumeTokenProvider: tailnet.NewInsecureTestResumeTokenProvider(), + }) + require.NoError(t, err) + + user := make(chan struct{}) + connInfo := make(chan struct{}) + serveErrCh := make(chan error) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/v2/users/me": + httpapi.Write(ctx, w, http.StatusOK, codersdk.User{ + ReducedUser: codersdk.ReducedUser{ + MinimalUser: codersdk.MinimalUser{ + ID: userID, + }, + }, + }) + user <- struct{}{} + + case "/api/v2/workspaceagents/connection": + httpapi.Write(ctx, w, http.StatusOK, workspacesdk.AgentConnectionInfo{ + DisableDirectConnections: false, + }) + connInfo <- struct{}{} + + case "/api/v2/tailnet": + // need 2.3 for WorkspaceUpdates RPC + cVer := r.URL.Query().Get("version") + assert.Equal(t, "2.3", cVer) + + sws, err := websocket.Accept(w, r, nil) + if !assert.NoError(t, err) { + return + } + wsCtx, nc := codersdk.WebsocketNetConn(ctx, sws, websocket.MessageBinary) + serveErrCh <- svc.ServeConnV2(wsCtx, nc, tailnet.StreamID{ + Name: "client", + ID: peerID, + // Auth can be nil as we use a mock update provider + Auth: tailnet.ClientUserCoordinateeAuth{ + Auth: nil, + }, + }) + default: + http.NotFound(w, r) + } + })) + t.Cleanup(server.Close) + + svrURL, err := url.Parse(server.URL) + require.NoError(t, err) + connErrCh := make(chan error) + connCh := make(chan vpn.Conn) + go func() { + conn, err := vpn.NewClient().NewConn(ctx, svrURL, "fakeToken", &vpn.Options{ + UpdateHandler: updateHandler(func(wu tailnet.WorkspaceUpdate) error { + inUpdateCh <- wu + return nil + }), + DNSConfigurator: &noopConfigurator{}, + }) + connErrCh <- err + connCh <- conn + }() + testutil.RequireRecvCtx(ctx, t, user) + testutil.RequireRecvCtx(ctx, t, connInfo) + err = testutil.RequireRecvCtx(ctx, t, connErrCh) + require.NoError(t, err) + conn := testutil.RequireRecvCtx(ctx, t, connCh) + + // Send a workspace update + update := &proto.WorkspaceUpdate{ + UpsertedWorkspaces: []*proto.Workspace{ + { + Id: wsID[:], + }, + }, + } + testutil.RequireSendCtx(ctx, t, outUpdateCh, update) + + // It'll be received by the update handler + recvUpdate := testutil.RequireRecvCtx(ctx, t, inUpdateCh) + require.Len(t, recvUpdate.UpsertedWorkspaces, 1) + require.Equal(t, wsID, recvUpdate.UpsertedWorkspaces[0].ID) + + // And be reflected on the Conn's state + state, err := conn.CurrentWorkspaceState() + require.NoError(t, err) + require.Equal(t, tailnet.WorkspaceUpdate{ + 
UpsertedWorkspaces: []*tailnet.Workspace{ + { + ID: wsID, + }, + }, + UpsertedAgents: []*tailnet.Agent{}, + DeletedWorkspaces: []*tailnet.Workspace{}, + DeletedAgents: []*tailnet.Agent{}, + }, state) + + // Close the conn + conn.Close() + err = testutil.RequireRecvCtx(ctx, t, serveErrCh) + require.NoError(t, err) +} + +type updateHandler func(tailnet.WorkspaceUpdate) error + +func (h updateHandler) Update(u tailnet.WorkspaceUpdate) error { + return h(u) +} + +type noopConfigurator struct{} + +func (*noopConfigurator) Close() error { + return nil +} + +func (*noopConfigurator) GetBaseConfig() (dns.OSConfig, error) { + return dns.OSConfig{}, nil +} + +func (*noopConfigurator) SetDNS(dns.OSConfig) error { + return nil +} + +func (*noopConfigurator) SupportsSplitDNS() bool { + return true +} + +var _ dns.OSConfigurator = (*noopConfigurator)(nil) diff --git a/vpn/dylib/lib.go b/vpn/dylib/lib.go index 346937c384e95..63062d2ed5bf8 100644 --- a/vpn/dylib/lib.go +++ b/vpn/dylib/lib.go @@ -22,7 +22,7 @@ const ( ) // OpenTunnel creates a new VPN tunnel by `dup`ing the provided 'PIPE' -// file descriptors for reading, writing, and logging. +// file descriptors for reading and writing. // //export OpenTunnel func OpenTunnel(cReadFD, cWriteFD int32) int32 { @@ -46,8 +46,11 @@ func OpenTunnel(cReadFD, cWriteFD int32) int32 { return ErrOpenPipe } - // Logs will be sent over the protocol - _, err = vpn.NewTunnel(ctx, slog.Make(), conn) + _, err = vpn.NewTunnel(ctx, slog.Make(), conn, vpn.NewClient(), + vpn.UseAsDNSConfig(), + vpn.UseAsRouter(), + vpn.UseAsLogger(), + ) if err != nil { unix.Close(readFD) unix.Close(writeFD) diff --git a/vpn/tun.go b/vpn/tun.go new file mode 100644 index 0000000000000..f8c51bff34390 --- /dev/null +++ b/vpn/tun.go @@ -0,0 +1,10 @@ +//go:build !darwin + +package vpn + +import "github.com/tailscale/wireguard-go/tun" + +// This is a no-op on non-Darwin platforms. 
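// The Darwin implementation in vpn/tun_darwin.go below dups the provided file
// descriptor into a wireguard-go TUN device; when the device is nil,
// tailnet.Conn keeps the netstack dialing path (see the options.TUNDev checks
// in tailnet/conn.go earlier in this diff).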
+func makeTUN(int) (tun.Device, error) { + return nil, nil +} diff --git a/vpn/tun_darwin.go b/vpn/tun_darwin.go new file mode 100644 index 0000000000000..f710c75575009 --- /dev/null +++ b/vpn/tun_darwin.go @@ -0,0 +1,30 @@ +//go:build darwin + +package vpn + +import ( + "os" + + "github.com/tailscale/wireguard-go/tun" + "golang.org/x/sys/unix" + "golang.org/x/xerrors" +) + +func makeTUN(tunFD int) (tun.Device, error) { + dupTunFd, err := unix.Dup(tunFD) + if err != nil { + return nil, xerrors.Errorf("dup tun fd: %w", err) + } + + err = unix.SetNonblock(dupTunFd, true) + if err != nil { + unix.Close(dupTunFd) + return nil, xerrors.Errorf("set nonblock: %w", err) + } + fileTun, err := tun.CreateTUNFromFile(os.NewFile(uintptr(dupTunFd), "/dev/tun"), 0) + if err != nil { + unix.Close(dupTunFd) + return nil, xerrors.Errorf("create TUN from File: %w", err) + } + return fileTun, nil +} diff --git a/vpn/tunnel.go b/vpn/tunnel.go index f077f8eb1a442..dae555483cf99 100644 --- a/vpn/tunnel.go +++ b/vpn/tunnel.go @@ -6,32 +6,56 @@ import ( "encoding/json" "fmt" "io" + "net/http" + "net/url" "reflect" "strconv" "sync" "unicode" "golang.org/x/xerrors" + "tailscale.com/net/dns" + "tailscale.com/wgengine/router" "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/util/ptr" + "github.com/coder/coder/v2/tailnet" ) type Tunnel struct { speaker[*TunnelMessage, *ManagerMessage, ManagerMessage] ctx context.Context - logger slog.Logger requestLoopDone chan struct{} + logger slog.Logger + logMu sync.Mutex logs []*TunnelMessage + + client Client + conn Conn + + // clientLogger is a separate logger from `logger` when the `UseAsLogger` + // option is used, to avoid the tunnel using itself as a sink for its own + // logs, which could lead to deadlocks. + clientLogger slog.Logger + // router and dnsConfigurator may be nil + router router.Router + dnsConfigurator dns.OSConfigurator } +type TunnelOption func(t *Tunnel) + func NewTunnel( - ctx context.Context, logger slog.Logger, conn io.ReadWriteCloser, + ctx context.Context, + logger slog.Logger, + mgrConn io.ReadWriteCloser, + client Client, + opts ...TunnelOption, ) (*Tunnel, error) { logger = logger.Named("vpn") s, err := newSpeaker[*TunnelMessage, *ManagerMessage]( - ctx, logger, conn, SpeakerRoleTunnel, SpeakerRoleManager) + ctx, logger, mgrConn, SpeakerRoleTunnel, SpeakerRoleManager) if err != nil { return nil, err } @@ -40,7 +64,13 @@ func NewTunnel( speaker: *(s), ctx: ctx, logger: logger, + clientLogger: logger, requestLoopDone: make(chan struct{}), + client: client, + } + + for _, opt := range opts { + opt(t) } t.speaker.start() go t.requestLoop() @@ -55,6 +85,14 @@ func (t *Tunnel) requestLoop() { if err := req.sendReply(resp); err != nil { t.logger.Debug(t.ctx, "failed to send RPC reply", slog.Error(err)) } + if _, ok := resp.GetMsg().(*TunnelMessage_Stop); ok { + // TODO: Wait for the reply to be sent before closing the speaker. + // err := t.speaker.Close() + // if err != nil { + // t.logger.Error(t.ctx, "failed to close speaker", slog.Error(err)) + // } + return + } continue } // Not a unary RPC.
We don't know of any message types that are neither a response nor a @@ -70,12 +108,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage { resp.Rpc = &RPC{ResponseTo: msgID} switch msg := req.GetMsg().(type) { case *ManagerMessage_GetPeerUpdate: - // TODO: actually get the peer updates + state, err := t.conn.CurrentWorkspaceState() + if err != nil { + t.logger.Critical(t.ctx, "failed to get current workspace state", slog.Error(err)) + } resp.Msg = &TunnelMessage_PeerUpdate{ - PeerUpdate: &PeerUpdate{ - UpsertedWorkspaces: nil, - UpsertedAgents: nil, - }, + PeerUpdate: convertWorkspaceUpdate(state), } return resp case *ManagerMessage_Start: @@ -84,27 +122,35 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage { slog.F("url", startReq.CoderUrl), slog.F("tunnel_fd", startReq.TunnelFileDescriptor), ) - // TODO: actually start the tunnel + err := t.start(startReq) + var errStr string + if err != nil { + t.logger.Error(t.ctx, "failed to start tunnel", slog.Error(err)) + errStr = err.Error() + } resp.Msg = &TunnelMessage_Start{ Start: &StartResponse{ - Success: true, + Success: err == nil, + ErrorMessage: errStr, }, } return resp case *ManagerMessage_Stop: t.logger.Info(t.ctx, "stopping CoderVPN tunnel") - // TODO: actually stop the tunnel - resp.Msg = &TunnelMessage_Stop{ - Stop: &StopResponse{ - Success: true, - }, - } - err := t.speaker.Close() + err := t.stop(msg.Stop) + var errStr string if err != nil { - t.logger.Error(t.ctx, "failed to close speaker", slog.Error(err)) + t.logger.Error(t.ctx, "failed to stop tunnel", slog.Error(err)) + errStr = err.Error() } else { t.logger.Info(t.ctx, "CoderVPN tunnel stopped") } + resp.Msg = &TunnelMessage_Stop{ + Stop: &StopResponse{ + Success: err == nil, + ErrorMessage: errStr, + }, + } return resp default: t.logger.Warn(t.ctx, "unhandled manager request", slog.F("request", msg)) @@ -112,6 +158,24 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage { } } +func UseAsRouter() TunnelOption { + return func(t *Tunnel) { + t.router = NewRouter(t) + } +} + +func UseAsLogger() TunnelOption { + return func(t *Tunnel) { + t.clientLogger = slog.Make(t) + } +} + +func UseAsDNSConfig() TunnelOption { + return func(t *Tunnel) { + t.dnsConfigurator = NewDNSConfigurator(t) + } +} + // ApplyNetworkSettings sends a request to the manager to apply the given network settings func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRequest) error { msg, err := t.speaker.unaryRPC(ctx, &TunnelMessage{ @@ -129,6 +193,65 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe return nil } +func (t *Tunnel) Update(update tailnet.WorkspaceUpdate) error { + msg := &TunnelMessage{ + Msg: &TunnelMessage_PeerUpdate{ + PeerUpdate: convertWorkspaceUpdate(update), + }, + } + select { + case <-t.ctx.Done(): + return t.ctx.Err() + case t.sendCh <- msg: + } + return nil +} + +func (t *Tunnel) start(req *StartRequest) error { + rawURL := req.GetCoderUrl() + if rawURL == "" { + return xerrors.New("missing coder url") + } + svrURL, err := url.Parse(rawURL) + if err != nil { + return xerrors.Errorf("parse url %q: %w", rawURL, err) + } + apiToken := req.GetApiToken() + if apiToken == "" { + return xerrors.New("missing api token") + } + // Initialize the map: Add on a nil http.Header panics. + header := make(http.Header) + for _, h := range req.GetHeaders() { + header.Add(h.GetName(), h.GetValue()) + } + + if t.conn == nil { + t.conn, err = t.client.NewConn( + t.ctx, + svrURL, + apiToken, + &Options{ + Headers: header,
Logger: t.clientLogger, + DNSConfigurator: t.dnsConfigurator, + Router: t.router, + TUNFileDescriptor: ptr.Ref(int(req.GetTunnelFileDescriptor())), + UpdateHandler: t, + }, + ) + } else { + t.logger.Warn(t.ctx, "asked to start tunnel, but tunnel is already running") + } + return err +} + +func (t *Tunnel) stop(*StopRequest) error { + if t.conn == nil { + return nil + } + return t.conn.Close() +} + var _ slog.Sink = &Tunnel{} func (t *Tunnel) LogEntry(_ context.Context, e slog.SinkEntry) { @@ -170,6 +293,60 @@ func sinkEntryToPb(e slog.SinkEntry) *Log { return l } +func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate { + out := &PeerUpdate{ + UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)), + UpsertedAgents: make([]*Agent, len(update.UpsertedAgents)), + DeletedWorkspaces: make([]*Workspace, len(update.DeletedWorkspaces)), + DeletedAgents: make([]*Agent, len(update.DeletedAgents)), + } + for i, ws := range update.UpsertedWorkspaces { + out.UpsertedWorkspaces[i] = &Workspace{ + Id: tailnet.UUIDToByteSlice(ws.ID), + Name: ws.Name, + Status: Workspace_Status(ws.Status), + } + } + for i, agent := range update.UpsertedAgents { + fqdn := make([]string, 0, len(agent.Hosts)) + for name := range agent.Hosts { + fqdn = append(fqdn, name.WithTrailingDot()) + } + out.UpsertedAgents[i] = &Agent{ + Id: tailnet.UUIDToByteSlice(agent.ID), + Name: agent.Name, + WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID), + Fqdn: fqdn, + IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()}, + // TODO: Populate + LastHandshake: nil, + } + } + for i, ws := range update.DeletedWorkspaces { + out.DeletedWorkspaces[i] = &Workspace{ + Id: tailnet.UUIDToByteSlice(ws.ID), + Name: ws.Name, + Status: Workspace_Status(ws.Status), + } + } + for i, agent := range update.DeletedAgents { + fqdn := make([]string, 0, len(agent.Hosts)) + for name := range agent.Hosts { + fqdn = append(fqdn, name.WithTrailingDot()) + } + out.DeletedAgents[i] = &Agent{ + Id: tailnet.UUIDToByteSlice(agent.ID), + Name: agent.Name, + WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID), + Fqdn: fqdn, + IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()}, + // TODO: Populate + LastHandshake: nil, + } + } + return out +} + // the following are taken from sloghuman: func formatValue(v interface{}) string { diff --git a/vpn/tunnel_internal_test.go b/vpn/tunnel_internal_test.go new file mode 100644 index 0000000000000..ed5a7e429dccd --- /dev/null +++ b/vpn/tunnel_internal_test.go @@ -0,0 +1,280 @@ +package vpn + +import ( + "context" + "net" + "net/url" + "sync" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/tailnet" + "github.com/coder/coder/v2/testutil" +) + +func newFakeClient(ctx context.Context, t *testing.T) *fakeClient { + return &fakeClient{ + t: t, + ctx: ctx, + ch: make(chan *fakeConn, 1), + } +} + +type fakeClient struct { + t *testing.T + ctx context.Context + ch chan *fakeConn +} + +var _ Client = (*fakeClient)(nil) + +func (f *fakeClient) NewConn(context.Context, *url.URL, string, *Options) (Conn, error) { + select { + case <-f.ctx.Done(): + return nil, f.ctx.Err() + case conn := <-f.ch: + return conn, nil + } +} + +func newFakeConn(state tailnet.WorkspaceUpdate) *fakeConn { + return &fakeConn{ + closed: make(chan struct{}), + state: state, + } +} + +type fakeConn struct { + state tailnet.WorkspaceUpdate + closed chan struct{} + doClose sync.Once +} + +var _ Conn = (*fakeConn)(nil) + 
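As an aside, `var _ Conn = (*fakeConn)(nil)` above is the standard compile-time interface check; a self-contained sketch of the idiom with hypothetical types:

package main

import "fmt"

type Closer interface{ Close() error }

type fake struct{}

func (fake) Close() error { return nil }

// Compilation fails on this line if fake ever stops satisfying Closer.
var _ Closer = fake{}

func main() { fmt.Println(fake{}.Close()) }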
+func (f *fakeConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) { + return f.state, nil +} + +func (f *fakeConn) Close() error { + f.doClose.Do(func() { + close(f.closed) + }) + return nil +} + +func TestTunnel_StartStop(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + client := newFakeClient(ctx, t) + conn := newFakeConn(tailnet.WorkspaceUpdate{}) + + _, mgr := setupTunnel(t, ctx, client) + + errCh := make(chan error, 1) + var resp *TunnelMessage + // When: we start the tunnel + go func() { + r, err := mgr.unaryRPC(ctx, &ManagerMessage{ + Msg: &ManagerMessage_Start{ + Start: &StartRequest{ + TunnelFileDescriptor: 2, + CoderUrl: "https://coder.example.com", + ApiToken: "fakeToken", + }, + }, + }) + resp = r + errCh <- err + }() + // Then: `NewConn` is called, + testutil.RequireSendCtx(ctx, t, client.ch, conn) + // And: a response is received + err := testutil.RequireRecvCtx(ctx, t, errCh) + require.NoError(t, err) + _, ok := resp.Msg.(*TunnelMessage_Start) + require.True(t, ok) + + // When: we stop the tunnel + go func() { + r, err := mgr.unaryRPC(ctx, &ManagerMessage{ + Msg: &ManagerMessage_Stop{}, + }) + resp = r + errCh <- err + }() + // Then: `Close` is called on the connection + testutil.RequireRecvCtx(ctx, t, conn.closed) + // And: a Stop response is received + err = testutil.RequireRecvCtx(ctx, t, errCh) + require.NoError(t, err) + _, ok = resp.Msg.(*TunnelMessage_Stop) + require.True(t, ok) + + err = mgr.Close() + require.NoError(t, err) +} + +func TestTunnel_PeerUpdate(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + + wsID1 := uuid.UUID{1} + wsID2 := uuid.UUID{2} + + client := newFakeClient(ctx, t) + conn := newFakeConn(tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{ + { + ID: wsID1, + }, + { + ID: wsID2, + }, + }, + }) + + tun, mgr := setupTunnel(t, ctx, client) + + errCh := make(chan error, 1) + var resp *TunnelMessage + go func() { + r, err := mgr.unaryRPC(ctx, &ManagerMessage{ + Msg: &ManagerMessage_Start{ + Start: &StartRequest{ + TunnelFileDescriptor: 2, + CoderUrl: "https://coder.example.com", + ApiToken: "fakeToken", + }, + }, + }) + resp = r + errCh <- err + }() + testutil.RequireSendCtx(ctx, t, client.ch, conn) + err := testutil.RequireRecvCtx(ctx, t, errCh) + require.NoError(t, err) + _, ok := resp.Msg.(*TunnelMessage_Start) + require.True(t, ok) + + err = tun.Update(tailnet.WorkspaceUpdate{ + UpsertedWorkspaces: []*tailnet.Workspace{ + { + ID: wsID2, + }, + }, + }) + require.NoError(t, err) + // Then: the tunnel sends a PeerUpdate message + req := testutil.RequireRecvCtx(ctx, t, mgr.requests) + require.Nil(t, req.msg.Rpc) + require.NotNil(t, req.msg.GetPeerUpdate()) + require.Len(t, req.msg.GetPeerUpdate().UpsertedWorkspaces, 1) + require.Equal(t, wsID2[:], req.msg.GetPeerUpdate().UpsertedWorkspaces[0].Id) + + // When: the manager requests a PeerUpdate + go func() { + r, err := mgr.unaryRPC(ctx, &ManagerMessage{ + Msg: &ManagerMessage_GetPeerUpdate{}, + }) + resp = r + errCh <- err + }() + // Then: a PeerUpdate message is sent using the Conn's state + err = testutil.RequireRecvCtx(ctx, t, errCh) + require.NoError(t, err) + _, ok = resp.Msg.(*TunnelMessage_PeerUpdate) + require.True(t, ok) + require.Len(t, resp.GetPeerUpdate().UpsertedWorkspaces, 2) + require.Equal(t, wsID1[:], resp.GetPeerUpdate().UpsertedWorkspaces[0].Id) + require.Equal(t, wsID2[:], resp.GetPeerUpdate().UpsertedWorkspaces[1].Id) +} + +func TestTunnel_NetworkSettings(t *testing.T) { + t.Parallel() 
+ + ctx := testutil.Context(t, testutil.WaitShort) + + client := newFakeClient(ctx, t) + conn := newFakeConn(tailnet.WorkspaceUpdate{}) + + tun, mgr := setupTunnel(t, ctx, client) + + errCh := make(chan error, 1) + var resp *TunnelMessage + go func() { + r, err := mgr.unaryRPC(ctx, &ManagerMessage{ + Msg: &ManagerMessage_Start{ + Start: &StartRequest{ + TunnelFileDescriptor: 2, + CoderUrl: "https://coder.example.com", + ApiToken: "fakeToken", + }, + }, + }) + resp = r + errCh <- err + }() + testutil.RequireSendCtx(ctx, t, client.ch, conn) + err := testutil.RequireRecvCtx(ctx, t, errCh) + require.NoError(t, err) + _, ok := resp.Msg.(*TunnelMessage_Start) + require.True(t, ok) + + // When: we inform the tunnel of network settings + go func() { + err := tun.ApplyNetworkSettings(ctx, &NetworkSettingsRequest{ + Mtu: 1200, + }) + errCh <- err + }() + // Then: the tunnel sends a NetworkSettings message + req := testutil.RequireRecvCtx(ctx, t, mgr.requests) + require.NotNil(t, req.msg.Rpc) + require.Equal(t, uint32(1200), req.msg.GetNetworkSettings().Mtu) + go func() { + testutil.RequireSendCtx(ctx, t, mgr.sendCh, &ManagerMessage{ + Rpc: &RPC{ResponseTo: req.msg.Rpc.MsgId}, + Msg: &ManagerMessage_NetworkSettings{ + NetworkSettings: &NetworkSettingsResponse{ + Success: true, + }, + }, + }) + }() + // And: `ApplyNetworkSettings` returns without error once the manager responds + err = testutil.RequireRecvCtx(ctx, t, errCh) + require.NoError(t, err) +} + +//nolint:revive // t takes precedence +func setupTunnel(t *testing.T, ctx context.Context, client *fakeClient) (*Tunnel, *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]) { + mp, tp := net.Pipe() + t.Cleanup(func() { _ = mp.Close() }) + t.Cleanup(func() { _ = tp.Close() }) + logger := testutil.Logger(t) + + var tun *Tunnel + var mgr *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage] + errCh := make(chan error, 2) + go func() { + tunnel, err := NewTunnel(ctx, logger.Named("tunnel"), tp, client) + tun = tunnel + errCh <- err + }() + go func() { + manager, err := newSpeaker[*ManagerMessage, *TunnelMessage](ctx, logger.Named("manager"), mp, SpeakerRoleManager, SpeakerRoleTunnel) + mgr = manager + errCh <- err + }() + err := testutil.RequireRecvCtx(ctx, t, errCh) + require.NoError(t, err) + err = testutil.RequireRecvCtx(ctx, t, errCh) + require.NoError(t, err) + mgr.start() + return tun, mgr +} diff --git a/vpn/vpn.pb.go b/vpn/vpn.pb.go index 6e47440814993..181f57d617fa4 100644 --- a/vpn/vpn.pb.go +++ b/vpn/vpn.pb.go @@ -718,7 +718,7 @@ type Agent struct { Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // UUID Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` WorkspaceId []byte `protobuf:"bytes,3,opt,name=workspace_id,json=workspaceId,proto3" json:"workspace_id,omitempty"` // UUID - Fqdn string `protobuf:"bytes,4,opt,name=fqdn,proto3" json:"fqdn,omitempty"` + Fqdn []string `protobuf:"bytes,4,rep,name=fqdn,proto3" json:"fqdn,omitempty"` IpAddrs []string `protobuf:"bytes,5,rep,name=ip_addrs,json=ipAddrs,proto3" json:"ip_addrs,omitempty"` // last_handshake is the primary indicator of whether we are connected to a peer. Zero value or // anything longer than 5 minutes ago means there is a problem. 
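The last_handshake comment above implies a simple health rule; a sketch of that rule (illustrative only, with hypothetical names, not code from this change):

package main

import (
	"fmt"
	"time"
)

// peerHealthy reports whether the last WireGuard handshake is recent enough:
// a zero value, or one older than five minutes, indicates a problem.
func peerHealthy(lastHandshake time.Time) bool {
	return !lastHandshake.IsZero() && time.Since(lastHandshake) <= 5*time.Minute
}

func main() {
	fmt.Println(peerHealthy(time.Time{}))                     // false: never completed
	fmt.Println(peerHealthy(time.Now().Add(-time.Minute)))    // true
	fmt.Println(peerHealthy(time.Now().Add(-10*time.Minute))) // false: stale
}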
@@ -778,11 +778,11 @@ func (x *Agent) GetWorkspaceId() []byte { return nil } -func (x *Agent) GetFqdn() string { +func (x *Agent) GetFqdn() []string { if x != nil { return x.Fqdn } - return "" + return nil } func (x *Agent) GetIpAddrs() []string { @@ -953,9 +953,10 @@ type StartRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TunnelFileDescriptor int32 `protobuf:"varint,1,opt,name=tunnel_file_descriptor,json=tunnelFileDescriptor,proto3" json:"tunnel_file_descriptor,omitempty"` - CoderUrl string `protobuf:"bytes,2,opt,name=coder_url,json=coderUrl,proto3" json:"coder_url,omitempty"` - ApiToken string `protobuf:"bytes,3,opt,name=api_token,json=apiToken,proto3" json:"api_token,omitempty"` + TunnelFileDescriptor int32 `protobuf:"varint,1,opt,name=tunnel_file_descriptor,json=tunnelFileDescriptor,proto3" json:"tunnel_file_descriptor,omitempty"` + CoderUrl string `protobuf:"bytes,2,opt,name=coder_url,json=coderUrl,proto3" json:"coder_url,omitempty"` + ApiToken string `protobuf:"bytes,3,opt,name=api_token,json=apiToken,proto3" json:"api_token,omitempty"` + Headers []*StartRequest_Header `protobuf:"bytes,4,rep,name=headers,proto3" json:"headers,omitempty"` } func (x *StartRequest) Reset() { @@ -1011,6 +1012,13 @@ func (x *StartRequest) GetApiToken() string { return "" } +func (x *StartRequest) GetHeaders() []*StartRequest_Header { + if x != nil { + return x.Headers + } + return nil +} + type StartResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1579,6 +1587,62 @@ func (x *NetworkSettingsRequest_IPv6Settings_IPv6Route) GetRouter() string { return "" } +// Additional HTTP headers added to all requests +type StartRequest_Header struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *StartRequest_Header) Reset() { + *x = StartRequest_Header{} + if protoimpl.UnsafeEnabled { + mi := &file_vpn_vpn_proto_msgTypes[20] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StartRequest_Header) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StartRequest_Header) ProtoMessage() {} + +func (x *StartRequest_Header) ProtoReflect() protoreflect.Message { + mi := &file_vpn_vpn_proto_msgTypes[20] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StartRequest_Header.ProtoReflect.Descriptor instead. 
+func (*StartRequest_Header) Descriptor() ([]byte, []int) { + return file_vpn_vpn_proto_rawDescGZIP(), []int{10, 0} +} + +func (x *StartRequest_Header) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *StartRequest_Header) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + var File_vpn_vpn_proto protoreflect.FileDescriptor var file_vpn_vpn_proto_rawDesc = []byte{ @@ -1680,7 +1744,7 @@ var file_vpn_vpn_proto_rawDesc = []byte{ 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x77, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x71, 0x64, 0x6e, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x66, 0x71, 0x64, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x69, + 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x66, 0x71, 0x64, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x69, 0x70, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x69, 0x70, 0x41, 0x64, 0x64, 0x72, 0x73, 0x12, 0x41, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, @@ -1775,30 +1839,37 @@ var file_vpn_vpn_proto_rawDesc = []byte{ 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x7e, 0x0a, 0x0c, 0x53, - 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x16, 0x74, - 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x72, - 0x69, 0x70, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x14, 0x74, 0x75, 0x6e, - 0x6e, 0x65, 0x6c, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, - 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x12, 0x1b, - 0x0a, 0x09, 0x61, 0x70, 0x69, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x08, 0x61, 0x70, 0x69, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x4e, 0x0a, 0x0d, 0x53, - 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, - 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, - 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x0d, 0x0a, 0x0b, 0x53, - 0x74, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x4d, 0x0a, 0x0c, 0x53, 0x74, - 0x6f, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, - 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, - 0x63, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x39, 0x5a, 0x1d, 
0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f, - 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x76, 0x70, 0x6e, 0xaa, 0x02, 0x17, 0x43, 0x6f, 0x64, - 0x65, 0x72, 0x2e, 0x44, 0x65, 0x73, 0x6b, 0x74, 0x6f, 0x70, 0x2e, 0x56, 0x70, 0x6e, 0x2e, 0x50, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xe6, 0x01, 0x0a, 0x0c, + 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x16, + 0x74, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x64, 0x65, 0x73, 0x63, + 0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x14, 0x74, 0x75, + 0x6e, 0x6e, 0x65, 0x6c, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, + 0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x75, 0x72, 0x6c, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x12, + 0x1b, 0x0a, 0x09, 0x61, 0x70, 0x69, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x61, 0x70, 0x69, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x32, 0x0a, 0x07, + 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, + 0x76, 0x70, 0x6e, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, + 0x1a, 0x32, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x22, 0x4e, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, + 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x22, 0x0d, 0x0a, 0x0b, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x22, 0x4d, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a, + 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x42, 0x39, 0x5a, 0x1d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, + 0x76, 0x70, 0x6e, 0xaa, 0x02, 0x17, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x44, 0x65, 0x73, 0x6b, + 0x74, 0x6f, 0x70, 0x2e, 0x56, 0x70, 0x6e, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1814,7 +1885,7 @@ func file_vpn_vpn_proto_rawDescGZIP() []byte { } var file_vpn_vpn_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_vpn_vpn_proto_msgTypes = 
make([]protoimpl.MessageInfo, 20) +var file_vpn_vpn_proto_msgTypes = make([]protoimpl.MessageInfo, 21) var file_vpn_vpn_proto_goTypes = []interface{}{ (Log_Level)(0), // 0: vpn.Log.Level (Workspace_Status)(0), // 1: vpn.Workspace.Status @@ -1838,7 +1909,8 @@ var file_vpn_vpn_proto_goTypes = []interface{}{ (*NetworkSettingsRequest_IPv6Settings)(nil), // 19: vpn.NetworkSettingsRequest.IPv6Settings (*NetworkSettingsRequest_IPv4Settings_IPv4Route)(nil), // 20: vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route (*NetworkSettingsRequest_IPv6Settings_IPv6Route)(nil), // 21: vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route - (*timestamppb.Timestamp)(nil), // 22: google.protobuf.Timestamp + (*StartRequest_Header)(nil), // 22: vpn.StartRequest.Header + (*timestamppb.Timestamp)(nil), // 23: google.protobuf.Timestamp } var file_vpn_vpn_proto_depIdxs = []int32{ 2, // 0: vpn.ManagerMessage.rpc:type_name -> vpn.RPC @@ -1859,19 +1931,20 @@ var file_vpn_vpn_proto_depIdxs = []int32{ 8, // 15: vpn.PeerUpdate.deleted_workspaces:type_name -> vpn.Workspace 9, // 16: vpn.PeerUpdate.deleted_agents:type_name -> vpn.Agent 1, // 17: vpn.Workspace.status:type_name -> vpn.Workspace.Status - 22, // 18: vpn.Agent.last_handshake:type_name -> google.protobuf.Timestamp + 23, // 18: vpn.Agent.last_handshake:type_name -> google.protobuf.Timestamp 17, // 19: vpn.NetworkSettingsRequest.dns_settings:type_name -> vpn.NetworkSettingsRequest.DNSSettings 18, // 20: vpn.NetworkSettingsRequest.ipv4_settings:type_name -> vpn.NetworkSettingsRequest.IPv4Settings 19, // 21: vpn.NetworkSettingsRequest.ipv6_settings:type_name -> vpn.NetworkSettingsRequest.IPv6Settings - 20, // 22: vpn.NetworkSettingsRequest.IPv4Settings.included_routes:type_name -> vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route - 20, // 23: vpn.NetworkSettingsRequest.IPv4Settings.excluded_routes:type_name -> vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route - 21, // 24: vpn.NetworkSettingsRequest.IPv6Settings.included_routes:type_name -> vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route - 21, // 25: vpn.NetworkSettingsRequest.IPv6Settings.excluded_routes:type_name -> vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route - 26, // [26:26] is the sub-list for method output_type - 26, // [26:26] is the sub-list for method input_type - 26, // [26:26] is the sub-list for extension type_name - 26, // [26:26] is the sub-list for extension extendee - 0, // [0:26] is the sub-list for field type_name + 22, // 22: vpn.StartRequest.headers:type_name -> vpn.StartRequest.Header + 20, // 23: vpn.NetworkSettingsRequest.IPv4Settings.included_routes:type_name -> vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route + 20, // 24: vpn.NetworkSettingsRequest.IPv4Settings.excluded_routes:type_name -> vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route + 21, // 25: vpn.NetworkSettingsRequest.IPv6Settings.included_routes:type_name -> vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route + 21, // 26: vpn.NetworkSettingsRequest.IPv6Settings.excluded_routes:type_name -> vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route + 27, // [27:27] is the sub-list for method output_type + 27, // [27:27] is the sub-list for method input_type + 27, // [27:27] is the sub-list for extension type_name + 27, // [27:27] is the sub-list for extension extendee + 0, // [0:27] is the sub-list for field type_name } func init() { file_vpn_vpn_proto_init() } @@ -2120,6 +2193,18 @@ func file_vpn_vpn_proto_init() { return nil } } + file_vpn_vpn_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} { + switch v := 
v.(*StartRequest_Header); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_vpn_vpn_proto_msgTypes[1].OneofWrappers = []interface{}{ (*ManagerMessage_GetPeerUpdate)(nil), @@ -2140,7 +2225,7 @@ func file_vpn_vpn_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_vpn_vpn_proto_rawDesc, NumEnums: 2, - NumMessages: 20, + NumMessages: 21, NumExtensions: 0, NumServices: 0, }, diff --git a/vpn/vpn.proto b/vpn/vpn.proto index 1d21f7cabb1a7..10dfeb3916aa6 100644 --- a/vpn/vpn.proto +++ b/vpn/vpn.proto @@ -105,7 +105,7 @@ message Agent { bytes id = 1; // UUID string name = 2; bytes workspace_id = 3; // UUID - string fqdn = 4; + repeated string fqdn = 4; repeated string ip_addrs = 5; // last_handshake is the primary indicator of whether we are connected to a peer. Zero value or // anything longer than 5 minutes ago means there is a problem. @@ -179,6 +179,12 @@ message StartRequest { int32 tunnel_file_descriptor = 1; string coder_url = 2; string api_token = 3; + // Additional HTTP headers added to all requests + message Header { + string name = 1; + string value = 2; + } + repeated Header headers = 4; } message StartResponse {
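For context on the new `StartRequest.headers` field: the sketch below is not part of this diff, and shows how a caller might populate the field and how the receiving side could translate it into a standard `http.Header` before constructing its HTTP client. The helper names are hypothetical; only the generated `StartRequest` and `StartRequest_Header` types come from this change, and the sketch assumes it compiles alongside them in the `vpn` package.

```go
package vpn

import "net/http"

// buildStartRequest is a hypothetical helper that attaches extra HTTP
// headers to the StartRequest sent by the manager process.
func buildStartRequest(url, token string, extra map[string]string) *StartRequest {
	req := &StartRequest{
		TunnelFileDescriptor: 3, // illustrative file descriptor
		CoderUrl:             url,
		ApiToken:             token,
	}
	for name, value := range extra {
		req.Headers = append(req.Headers, &StartRequest_Header{Name: name, Value: value})
	}
	return req
}

// headersToHTTP is a hypothetical helper the tunnel side could use to
// convert the repeated Header messages into an http.Header.
func headersToHTTP(hs []*StartRequest_Header) http.Header {
	h := make(http.Header, len(hs))
	for _, hdr := range hs {
		h.Add(hdr.Name, hdr.Value) // Add lets the same name carry multiple values
	}
	return h
}
```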
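Similarly, because `Agent.fqdn` is now `repeated`, the generated getter changes from `GetFqdn() string` to `GetFqdn() []string`, and callers that previously displayed a single name must handle zero or more. A minimal, hypothetical consumer (the trailing-dot trimming is illustrative, since FQDNs are often stored with a trailing root dot):

```go
package vpn

import "strings"

// agentDNSNames is a hypothetical consumer of the now-repeated fqdn field.
// GetFqdn returns nil for a nil Agent, so no explicit nil check is needed.
func agentDNSNames(agent *Agent) []string {
	names := make([]string, 0, len(agent.GetFqdn()))
	for _, fqdn := range agent.GetFqdn() {
		// Drop the trailing root dot for display purposes.
		names = append(names, strings.TrimSuffix(fqdn, "."))
	}
	return names
}
```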