Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
chore: add DRPC tailnet & cli implementation
  • Loading branch information
ethanndickson committed Jul 2, 2024
commit 974c570320d14cc2ff9dfbe2d9e76b7e1eb99acb
3 changes: 3 additions & 0 deletions cli/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func (r *RootCmd) ping() *serpent.Command {
_, _ = fmt.Fprintln(inv.Stderr, "Direct connections disabled.")
opts.BlockEndpoints = true
}
if r.noNetworkTelemetry {
opts.EnableTelemetry = true
}
conn, err := workspacesdk.New(client).DialAgent(ctx, workspaceAgent.ID, opts)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions cli/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func (r *RootCmd) portForward() *serpent.Command {
_, _ = fmt.Fprintln(inv.Stderr, "Direct connections disabled.")
opts.BlockEndpoints = true
}
if r.noNetworkTelemetry {
opts.EnableTelemetry = true
}
conn, err := workspacesdk.New(client).DialAgent(ctx, workspaceAgent.ID, opts)
if err != nil {
return err
Expand Down
55 changes: 41 additions & 14 deletions cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
)

const (
<<<<<<< HEAD
varURL = "url"
varToken = "token"
varAgentToken = "agent-token"
Expand All @@ -65,6 +66,23 @@ const (
varForceTty = "force-tty"
varVerbose = "verbose"
varDisableDirect = "disable-direct-connections"
=======
varURL = "url"
varToken = "token"
varAgentToken = "agent-token"
varAgentTokenFile = "agent-token-file"
varAgentURL = "agent-url"
varHeader = "header"
varHeaderCommand = "header-command"
varNoOpen = "no-open"
varNoVersionCheck = "no-version-warning"
varNoFeatureWarning = "no-feature-warning"
varForceTty = "force-tty"
varVerbose = "verbose"
varOrganizationSelect = "organization"
varDisableDirect = "disable-direct-connections"
varDisableNetworkTelemetry = "disable-network-telemetry"
>>>>>>> 365c3bc71 (chore: add DRPC tailnet & cli implementation)

notLoggedInMessage = "You are not logged in. Try logging in using 'coder login <url>'."

Expand Down Expand Up @@ -435,6 +453,13 @@ func (r *RootCmd) Command(subcommands []*serpent.Command) (*serpent.Command, err
Value: serpent.BoolOf(&r.disableDirect),
Group: globalGroup,
},
{
Flag: varDisableNetworkTelemetry,
Env: "CODER_DISABLE_NETWORK_TELEMETRY",
Description: "Disable network telemetry.",
Value: serpent.BoolOf(&r.noNetworkTelemetry),
Group: globalGroup,
},
{
Flag: "debug-http",
Description: "Debug codersdk HTTP requests.",
Expand Down Expand Up @@ -466,20 +491,22 @@ func (r *RootCmd) Command(subcommands []*serpent.Command) (*serpent.Command, err

// RootCmd contains parameters and helpers useful to all commands.
type RootCmd struct {
clientURL *url.URL
token string
globalConfig string
header []string
headerCommand string
agentToken string
agentTokenFile string
agentURL *url.URL
forceTTY bool
noOpen bool
verbose bool
versionFlag bool
disableDirect bool
debugHTTP bool
clientURL *url.URL
token string
globalConfig string
header []string
headerCommand string
agentToken string
agentTokenFile string
agentURL *url.URL
forceTTY bool
noOpen bool
verbose bool
organizationSelect string
versionFlag bool
disableDirect bool
debugHTTP bool
noNetworkTelemetry bool

noVersionCheck bool
noFeatureWarning bool
Expand Down
3 changes: 3 additions & 0 deletions cli/speedtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func (r *RootCmd) speedtest() *serpent.Command {
_, _ = fmt.Fprintln(inv.Stderr, "Direct connections disabled.")
opts.BlockEndpoints = true
}
if r.noNetworkTelemetry {
opts.EnableTelemetry = true
}
if pcapFile != "" {
s := capture.New()
opts.CaptureHook = s.LogPacket
Expand Down
5 changes: 3 additions & 2 deletions cli/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,9 @@ func (r *RootCmd) ssh() *serpent.Command {
}
conn, err := workspacesdk.New(client).
DialAgent(ctx, workspaceAgent.ID, &workspacesdk.DialAgentOptions{
Logger: logger,
BlockEndpoints: r.disableDirect,
Logger: logger,
BlockEndpoints: r.disableDirect,
EnableTelemetry: !r.noNetworkTelemetry,
})
if err != nil {
return xerrors.Errorf("dial agent: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions cli/testdata/coder_--help.golden
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ variables or flags.
--disable-direct-connections bool, $CODER_DISABLE_DIRECT_CONNECTIONS
Disable direct (P2P) connections to workspaces.

--disable-network-telemetry bool, $CODER_DISABLE_NETWORK_TELEMETRY
Disable network telemetry.

--global-config string, $CODER_CONFIG_DIR (default: ~/.config/coderv2)
Path to the global `coder` config directory.

Expand Down
76 changes: 50 additions & 26 deletions codersdk/workspacesdk/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,28 @@ type tailnetAPIConnector struct {
coordinateURL string
dialOptions *websocket.DialOptions
conn tailnetConn
customDialFn func() (proto.DRPCTailnetClient, error)

clientMu sync.RWMutex
client proto.DRPCTailnetClient

connected chan error
isFirst bool
closed chan struct{}
}

// runTailnetAPIConnector creates and runs a tailnetAPIConnector
func runTailnetAPIConnector(
ctx context.Context, logger slog.Logger,
agentID uuid.UUID, coordinateURL string, dialOptions *websocket.DialOptions,
conn tailnetConn,
) *tailnetAPIConnector {
tac := &tailnetAPIConnector{
// Create a new tailnetAPIConnector without running it
func newTailnetAPIConnector(ctx context.Context, logger slog.Logger, agentID uuid.UUID, coordinateURL string, dialOptions *websocket.DialOptions) *tailnetAPIConnector {
return &tailnetAPIConnector{
ctx: ctx,
logger: logger,
agentID: agentID,
coordinateURL: coordinateURL,
dialOptions: dialOptions,
conn: conn,
conn: nil,
connected: make(chan error, 1),
closed: make(chan struct{}),
}
tac.gracefulCtx, tac.cancelGracefulCtx = context.WithCancel(context.Background())
go tac.manageGracefulTimeout()
go tac.run()
return tac
}

// manageGracefulTimeout allows the gracefulContext to last 1 second longer than the main context
Expand All @@ -99,21 +95,27 @@ func (tac *tailnetAPIConnector) manageGracefulTimeout() {
}
}

func (tac *tailnetAPIConnector) run() {
tac.isFirst = true
defer close(tac.closed)
for retrier := retry.New(50*time.Millisecond, 10*time.Second); retrier.Wait(tac.ctx); {
tailnetClient, err := tac.dial()
if xerrors.Is(err, &codersdk.Error{}) {
return
}
if err != nil {
continue
// Runs a tailnetAPIConnector using the provided connection
func (tac *tailnetAPIConnector) runConnector(conn tailnetConn) {
tac.conn = conn
tac.gracefulCtx, tac.cancelGracefulCtx = context.WithCancel(context.Background())
go tac.manageGracefulTimeout()
go func() {
tac.isFirst = true
defer close(tac.closed)
for retrier := retry.New(50*time.Millisecond, 10*time.Second); retrier.Wait(tac.ctx); {
tailnetClient, err := tac.dial()
if err != nil {
continue
}
tac.clientMu.Lock()
tac.client = tailnetClient
tac.clientMu.Unlock()
tac.logger.Debug(tac.ctx, "obtained tailnet API v2+ client")
tac.coordinateAndDERPMap(tailnetClient)
tac.logger.Debug(tac.ctx, "tailnet API v2+ connection lost")
}
tac.logger.Debug(tac.ctx, "obtained tailnet API v2+ client")
tac.coordinateAndDERPMap(tailnetClient)
tac.logger.Debug(tac.ctx, "tailnet API v2+ connection lost")
}
}()
}

var permanentErrorStatuses = []int{
Expand All @@ -123,6 +125,10 @@ var permanentErrorStatuses = []int{
}

func (tac *tailnetAPIConnector) dial() (proto.DRPCTailnetClient, error) {
if tac.customDialFn != nil {
return tac.customDialFn()
}

tac.logger.Debug(tac.ctx, "dialing Coder tailnet v2+ API")
// nolint:bodyclose
ws, res, err := websocket.Dial(tac.ctx, tac.coordinateURL, tac.dialOptions)
Expand Down Expand Up @@ -194,7 +200,10 @@ func (tac *tailnetAPIConnector) coordinateAndDERPMap(client proto.DRPCTailnetCli
// we do NOT want to gracefully disconnect on the coordinate() routine. So, we'll just
// close the underlying connection. This will trigger a retry of the control plane in
// run().
tac.clientMu.Lock()
client.DRPCConn().Close()
tac.client = nil
tac.clientMu.Unlock()
// Note that derpMap() logs it own errors, we don't bother here.
}
}()
Expand Down Expand Up @@ -258,3 +267,18 @@ func (tac *tailnetAPIConnector) derpMap(client proto.DRPCTailnetClient) error {
tac.conn.SetDERPMap(dm)
}
}

func (tac *tailnetAPIConnector) SendTelemetryEvent(event *proto.TelemetryEvent) {
tac.clientMu.RLock()
// We hold the lock for the entire telemetry request, but this would only block
// a coordinate retry, and closing the connection.
defer tac.clientMu.RUnlock()
if tac.client == nil {
return
}
ctx, cancel := context.WithTimeout(tac.ctx, 5*time.Second)
defer cancel()
_, _ = tac.client.PostTelemetry(ctx, &proto.TelemetryRequest{
Events: []*proto.TelemetryEvent{event},
})
}
3 changes: 2 additions & 1 deletion codersdk/workspacesdk/connector_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func TestTailnetAPIConnector_Disconnects(t *testing.T) {

fConn := newFakeTailnetConn()

uut := runTailnetAPIConnector(ctx, logger, agentID, svr.URL, &websocket.DialOptions{}, fConn)
uut := newTailnetAPIConnector(ctx, logger, agentID, svr.URL, &websocket.DialOptions{})
uut.runConnector(fConn)

call := testutil.RequireRecvCtx(ctx, t, fCoord.CoordinateCalls)
reqTun := testutil.RequireRecvCtx(ctx, t, call.Reqs)
Expand Down
63 changes: 35 additions & 28 deletions codersdk/workspacesdk/workspacesdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ type DialAgentOptions struct {
// CaptureHook is a callback that captures Disco packets and packets sent
// into the tailnet tunnel.
CaptureHook capture.Callback
// Whether the client will send network telemetry events
// Enable instead of Disable so it's initialized to false
EnableTelemetry bool
}

func (c *Client) DialAgent(dialCtx context.Context, agentID uuid.UUID, options *DialAgentOptions) (agentConn *AgentConn, err error) {
Expand All @@ -196,29 +199,6 @@ func (c *Client) DialAgent(dialCtx context.Context, agentID uuid.UUID, options *
options.BlockEndpoints = true
}

ip := tailnet.IP()
var header http.Header
if headerTransport, ok := c.client.HTTPClient.Transport.(*codersdk.HeaderTransport); ok {
header = headerTransport.Header
}
conn, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(ip, 128)},
DERPMap: connInfo.DERPMap,
DERPHeader: &header,
DERPForceWebSockets: connInfo.DERPForceWebSockets,
Logger: options.Logger,
BlockEndpoints: c.client.DisableDirectConnections || options.BlockEndpoints,
CaptureHook: options.CaptureHook,
})
if err != nil {
return nil, xerrors.Errorf("create tailnet: %w", err)
}
defer func() {
if err != nil {
_ = conn.Close()
}
}()

headers := make(http.Header)
tokenHeader := codersdk.SessionTokenHeader
if c.client.SessionTokenHeader != "" {
Expand Down Expand Up @@ -251,16 +231,43 @@ func (c *Client) DialAgent(dialCtx context.Context, agentID uuid.UUID, options *
q.Add("version", "2.0")
coordinateURL.RawQuery = q.Encode()

connector := runTailnetAPIConnector(ctx, options.Logger,
agentID, coordinateURL.String(),
connector := newTailnetAPIConnector(ctx, options.Logger, agentID, coordinateURL.String(),
&websocket.DialOptions{
HTTPClient: c.client.HTTPClient,
HTTPHeader: headers,
// Need to disable compression to avoid a data-race.
CompressionMode: websocket.CompressionDisabled,
},
conn,
)
})

ip := tailnet.IP()
var header http.Header
if headerTransport, ok := c.client.HTTPClient.Transport.(*codersdk.HeaderTransport); ok {
header = headerTransport.Header
}
var telemetrySink tailnet.TelemetrySink
if options.EnableTelemetry {
telemetrySink = connector
}
conn, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(ip, 128)},
DERPMap: connInfo.DERPMap,
DERPHeader: &header,
DERPForceWebSockets: connInfo.DERPForceWebSockets,
Logger: options.Logger,
BlockEndpoints: c.client.DisableDirectConnections || options.BlockEndpoints,
CaptureHook: options.CaptureHook,
TelemetrySink: telemetrySink,
})
if err != nil {
return nil, xerrors.Errorf("create tailnet: %w", err)
}
defer func() {
if err != nil {
_ = conn.Close()
}
}()
connector.runConnector(conn)

options.Logger.Debug(ctx, "running tailnet API v2+ connector")

select {
Expand Down
9 changes: 9 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ Enable verbose output.

Disable direct (P2P) connections to workspaces.

### --disable-network-telemetry

| | |
| ----------- | --------------------------------------------- |
| Type | <code>bool</code> |
| Environment | <code>$CODER_DISABLE_NETWORK_TELEMETRY</code> |

Disable network telemetry.

### --global-config

| | |
Expand Down
3 changes: 3 additions & 0 deletions enterprise/cli/testdata/coder_--help.golden
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ variables or flags.
--disable-direct-connections bool, $CODER_DISABLE_DIRECT_CONNECTIONS
Disable direct (P2P) connections to workspaces.

--disable-network-telemetry bool, $CODER_DISABLE_NETWORK_TELEMETRY
Disable network telemetry.

--global-config string, $CODER_CONFIG_DIR (default: ~/.config/coderv2)
Path to the global `coder` config directory.

Expand Down
Loading