Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cli/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func (r *RootCmd) ping() *serpent.Command {
_, _ = fmt.Fprintln(inv.Stderr, "Direct connections disabled.")
opts.BlockEndpoints = true
}
if !r.noNetworkTelemetry {
opts.EnableTelemetry = true
}
conn, err := workspacesdk.New(client).DialAgent(ctx, workspaceAgent.ID, opts)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions cli/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func (r *RootCmd) portForward() *serpent.Command {
_, _ = fmt.Fprintln(inv.Stderr, "Direct connections disabled.")
opts.BlockEndpoints = true
}
if !r.noNetworkTelemetry {
opts.EnableTelemetry = true
}
conn, err := workspacesdk.New(client).DialAgent(ctx, workspaceAgent.ID, opts)
if err != nil {
return err
Expand Down
39 changes: 24 additions & 15 deletions cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,20 @@ var (
)

const (
varURL = "url"
varToken = "token"
varAgentToken = "agent-token"
varAgentTokenFile = "agent-token-file"
varAgentURL = "agent-url"
varHeader = "header"
varHeaderCommand = "header-command"
varNoOpen = "no-open"
varNoVersionCheck = "no-version-warning"
varNoFeatureWarning = "no-feature-warning"
varForceTty = "force-tty"
varVerbose = "verbose"
varDisableDirect = "disable-direct-connections"
varURL = "url"
varToken = "token"
varAgentToken = "agent-token"
varAgentTokenFile = "agent-token-file"
varAgentURL = "agent-url"
varHeader = "header"
varHeaderCommand = "header-command"
varNoOpen = "no-open"
varNoVersionCheck = "no-version-warning"
varNoFeatureWarning = "no-feature-warning"
varForceTty = "force-tty"
varVerbose = "verbose"
varDisableDirect = "disable-direct-connections"
varDisableNetworkTelemetry = "disable-network-telemetry"

notLoggedInMessage = "You are not logged in. Try logging in using 'coder login <url>'."

Expand Down Expand Up @@ -435,6 +436,13 @@ func (r *RootCmd) Command(subcommands []*serpent.Command) (*serpent.Command, err
Value: serpent.BoolOf(&r.disableDirect),
Group: globalGroup,
},
{
Flag: varDisableNetworkTelemetry,
Env: "CODER_DISABLE_NETWORK_TELEMETRY",
Description: "Disable network telemetry.",
Value: serpent.BoolOf(&r.noNetworkTelemetry),
Group: globalGroup,
},
{
Flag: "debug-http",
Description: "Debug codersdk HTTP requests.",
Expand Down Expand Up @@ -481,8 +489,9 @@ type RootCmd struct {
disableDirect bool
debugHTTP bool

noVersionCheck bool
noFeatureWarning bool
noNetworkTelemetry bool
noVersionCheck bool
noFeatureWarning bool
}

// InitClient authenticates the client with files from disk
Expand Down
4 changes: 4 additions & 0 deletions cli/speedtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func (r *RootCmd) speedtest() *serpent.Command {
_, _ = fmt.Fprintln(inv.Stderr, "Direct connections disabled.")
opts.BlockEndpoints = true
}
if !r.noNetworkTelemetry {
opts.EnableTelemetry = true
}
if pcapFile != "" {
s := capture.New()
opts.CaptureHook = s.LogPacket
Expand Down Expand Up @@ -183,6 +186,7 @@ func (r *RootCmd) speedtest() *serpent.Command {
outputResult.Intervals[i] = interval
}
}
conn.Conn.SendSpeedtestTelemetry(outputResult.Overall.ThroughputMbits)
out, err := formatter.Format(inv.Context(), outputResult)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions cli/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,9 @@ func (r *RootCmd) ssh() *serpent.Command {
}
conn, err := workspacesdk.New(client).
DialAgent(ctx, workspaceAgent.ID, &workspacesdk.DialAgentOptions{
Logger: logger,
BlockEndpoints: r.disableDirect,
Logger: logger,
BlockEndpoints: r.disableDirect,
EnableTelemetry: !r.noNetworkTelemetry,
})
if err != nil {
return xerrors.Errorf("dial agent: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions cli/testdata/coder_--help.golden
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ variables or flags.
--disable-direct-connections bool, $CODER_DISABLE_DIRECT_CONNECTIONS
Disable direct (P2P) connections to workspaces.

--disable-network-telemetry bool, $CODER_DISABLE_NETWORK_TELEMETRY
Disable network telemetry.

--global-config string, $CODER_CONFIG_DIR (default: ~/.config/coderv2)
Path to the global `coder` config directory.

Expand Down
40 changes: 14 additions & 26 deletions coderd/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,9 +1163,9 @@ type Netcheck struct {
IPv4 bool `json:"ipv4"`
IPv6CanSend bool `json:"ipv6_can_send"`
IPv4CanSend bool `json:"ipv4_can_send"`
OSHasIPv6 bool `json:"os_has_ipv6"`
ICMPv4 bool `json:"icmpv4"`

OSHasIPv6 *bool `json:"os_has_ipv6"`
MappingVariesByDestIP *bool `json:"mapping_varies_by_dest_ip"`
HairPinning *bool `json:"hair_pinning"`
UPnP *bool `json:"upnp"`
Expand Down Expand Up @@ -1210,9 +1210,9 @@ func netcheckFromProto(proto *tailnetproto.Netcheck) Netcheck {
IPv4: proto.IPv4,
IPv6CanSend: proto.IPv6CanSend,
IPv4CanSend: proto.IPv4CanSend,
OSHasIPv6: proto.OSHasIPv6,
ICMPv4: proto.ICMPv4,

OSHasIPv6: protoBool(proto.OSHasIPv6),
MappingVariesByDestIP: protoBool(proto.MappingVariesByDestIP),
HairPinning: protoBool(proto.HairPinning),
UPnP: protoBool(proto.UPnP),
Expand All @@ -1221,33 +1221,28 @@ func netcheckFromProto(proto *tailnetproto.Netcheck) Netcheck {

PreferredDERP: proto.PreferredDERP,

RegionLatency: durationMapFromProto(proto.RegionLatency),
RegionV4Latency: durationMapFromProto(proto.RegionV4Latency),
RegionV6Latency: durationMapFromProto(proto.RegionV6Latency),

GlobalV4: netcheckIPFromProto(proto.GlobalV4),
GlobalV6: netcheckIPFromProto(proto.GlobalV6),

CaptivePortal: protoBool(proto.CaptivePortal),
}
}

// NetworkEvent and all related structs come from tailnet.proto.
type NetworkEvent struct {
ID uuid.UUID `json:"id"`
Time time.Time `json:"time"`
Application string `json:"application"`
Status string `json:"status"` // connected, disconnected
DisconnectionReason string `json:"disconnection_reason"`
ClientType string `json:"client_type"` // cli, agent, coderd, wsproxy
NodeIDSelf uint64 `json:"node_id_self"`
NodeIDRemote uint64 `json:"node_id_remote"`
P2PEndpoint NetworkEventP2PEndpoint `json:"p2p_endpoint"`
LogIPHashes map[string]NetworkEventIPFields `json:"log_ip_hashes"`
HomeDERP string `json:"home_derp"`
Logs []string `json:"logs"`
DERPMap DERPMap `json:"derp_map"`
LatestNetcheck Netcheck `json:"latest_netcheck"`
ID uuid.UUID `json:"id"`
Time time.Time `json:"time"`
Application string `json:"application"`
Status string `json:"status"` // connected, disconnected
DisconnectionReason string `json:"disconnection_reason"`
ClientType string `json:"client_type"` // cli, agent, coderd, wsproxy
NodeIDSelf uint64 `json:"node_id_self"`
NodeIDRemote uint64 `json:"node_id_remote"`
P2PEndpoint NetworkEventP2PEndpoint `json:"p2p_endpoint"`
HomeDERP string `json:"home_derp"`
DERPMap DERPMap `json:"derp_map"`
LatestNetcheck Netcheck `json:"latest_netcheck"`

ConnectionAge *time.Duration `json:"connection_age"`
ConnectionSetup *time.Duration `json:"connection_setup"`
Expand Down Expand Up @@ -1281,11 +1276,6 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, er
return NetworkEvent{}, xerrors.Errorf("parse id %q: %w", proto.Id, err)
}

logIPHashes := make(map[string]NetworkEventIPFields, len(proto.LogIpHashes))
for k, v := range proto.LogIpHashes {
logIPHashes[k] = ipFieldsFromProto(v)
}

return NetworkEvent{
ID: id,
Time: proto.Time.AsTime(),
Expand All @@ -1296,9 +1286,7 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, er
NodeIDSelf: proto.NodeIdSelf,
NodeIDRemote: proto.NodeIdRemote,
P2PEndpoint: p2pEndpointFromProto(proto.P2PEndpoint),
LogIPHashes: logIPHashes,
HomeDERP: proto.HomeDerp,
Logs: proto.Logs,
DERPMap: derpMapFromProto(proto.DerpMap),
LatestNetcheck: netcheckFromProto(proto.LatestNetcheck),

Expand Down
2 changes: 2 additions & 0 deletions codersdk/workspacesdk/agentconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (c *AgentConn) SSH(ctx context.Context) (*gonet.TCPConn, error) {
return nil, xerrors.Errorf("workspace agent not reachable in time: %v", ctx.Err())
}

c.Conn.SendConnectedTelemetry(c.agentAddress(), tailnet.TelemetryApplicationSSH)
return c.Conn.DialContextTCP(ctx, netip.AddrPortFrom(c.agentAddress(), AgentSSHPort))
}

Expand Down Expand Up @@ -185,6 +186,7 @@ func (c *AgentConn) Speedtest(ctx context.Context, direction speedtest.Direction
return nil, xerrors.Errorf("workspace agent not reachable in time: %v", ctx.Err())
}

c.Conn.SendConnectedTelemetry(c.agentAddress(), tailnet.TelemetryApplicationSpeedtest)
speedConn, err := c.Conn.DialContextTCP(ctx, netip.AddrPortFrom(c.agentAddress(), AgentSpeedtestPort))
if err != nil {
return nil, xerrors.Errorf("dial speedtest: %w", err)
Expand Down
88 changes: 62 additions & 26 deletions codersdk/workspacesdk/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ import (
"io"
"net/http"
"slices"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"storj.io/drpc"
"storj.io/drpc/drpcerr"
"tailscale.com/tailcfg"

"cdr.dev/slog"
Expand All @@ -38,6 +42,7 @@ type tailnetConn interface {
//
// 1) run the Coordinate API and pass node information back and forth
// 2) stream DERPMap updates and program the Conn
// 3) Send network telemetry events
//
// These functions share the same websocket, and so are combined here so that if we hit a problem
// we tear the whole thing down and start over with a new websocket.
Expand All @@ -58,32 +63,32 @@ type tailnetAPIConnector struct {
coordinateURL string
dialOptions *websocket.DialOptions
conn tailnetConn
customDialFn func() (proto.DRPCTailnetClient, error)

clientMu sync.RWMutex
client proto.DRPCTailnetClient

connected chan error
isFirst bool
closed chan struct{}

// Only set to true if we get a response from the server that it doesn't support
// network telemetry.
telemetryUnavailable atomic.Bool
}

// runTailnetAPIConnector creates and runs a tailnetAPIConnector
func runTailnetAPIConnector(
ctx context.Context, logger slog.Logger,
agentID uuid.UUID, coordinateURL string, dialOptions *websocket.DialOptions,
conn tailnetConn,
) *tailnetAPIConnector {
tac := &tailnetAPIConnector{
// Create a new tailnetAPIConnector without running it
func newTailnetAPIConnector(ctx context.Context, logger slog.Logger, agentID uuid.UUID, coordinateURL string, dialOptions *websocket.DialOptions) *tailnetAPIConnector {
return &tailnetAPIConnector{
ctx: ctx,
logger: logger,
agentID: agentID,
coordinateURL: coordinateURL,
dialOptions: dialOptions,
conn: conn,
conn: nil,
connected: make(chan error, 1),
closed: make(chan struct{}),
}
tac.gracefulCtx, tac.cancelGracefulCtx = context.WithCancel(context.Background())
go tac.manageGracefulTimeout()
go tac.run()
return tac
}

// manageGracefulTimeout allows the gracefulContext to last 1 second longer than the main context
Expand All @@ -99,21 +104,27 @@ func (tac *tailnetAPIConnector) manageGracefulTimeout() {
}
}

func (tac *tailnetAPIConnector) run() {
tac.isFirst = true
defer close(tac.closed)
for retrier := retry.New(50*time.Millisecond, 10*time.Second); retrier.Wait(tac.ctx); {
tailnetClient, err := tac.dial()
if xerrors.Is(err, &codersdk.Error{}) {
return
}
if err != nil {
continue
// Runs a tailnetAPIConnector using the provided connection
func (tac *tailnetAPIConnector) runConnector(conn tailnetConn) {
tac.conn = conn
tac.gracefulCtx, tac.cancelGracefulCtx = context.WithCancel(context.Background())
go tac.manageGracefulTimeout()
go func() {
tac.isFirst = true
defer close(tac.closed)
for retrier := retry.New(50*time.Millisecond, 10*time.Second); retrier.Wait(tac.ctx); {
tailnetClient, err := tac.dial()
if err != nil {
continue
}
tac.clientMu.Lock()
tac.client = tailnetClient
tac.clientMu.Unlock()
tac.logger.Debug(tac.ctx, "obtained tailnet API v2+ client")
tac.coordinateAndDERPMap(tailnetClient)
tac.logger.Debug(tac.ctx, "tailnet API v2+ connection lost")
}
tac.logger.Debug(tac.ctx, "obtained tailnet API v2+ client")
tac.coordinateAndDERPMap(tailnetClient)
tac.logger.Debug(tac.ctx, "tailnet API v2+ connection lost")
}
}()
}

var permanentErrorStatuses = []int{
Expand All @@ -123,6 +134,9 @@ var permanentErrorStatuses = []int{
}

func (tac *tailnetAPIConnector) dial() (proto.DRPCTailnetClient, error) {
if tac.customDialFn != nil {
return tac.customDialFn()
}
tac.logger.Debug(tac.ctx, "dialing Coder tailnet v2+ API")
// nolint:bodyclose
ws, res, err := websocket.Dial(tac.ctx, tac.coordinateURL, tac.dialOptions)
Expand Down Expand Up @@ -194,7 +208,10 @@ func (tac *tailnetAPIConnector) coordinateAndDERPMap(client proto.DRPCTailnetCli
// we do NOT want to gracefully disconnect on the coordinate() routine. So, we'll just
// close the underlying connection. This will trigger a retry of the control plane in
// run().
tac.clientMu.Lock()
client.DRPCConn().Close()
tac.client = nil
tac.clientMu.Unlock()
// Note that derpMap() logs it own errors, we don't bother here.
}
}()
Expand Down Expand Up @@ -258,3 +275,22 @@ func (tac *tailnetAPIConnector) derpMap(client proto.DRPCTailnetClient) error {
tac.conn.SetDERPMap(dm)
}
}

func (tac *tailnetAPIConnector) SendTelemetryEvent(event *proto.TelemetryEvent) {
tac.clientMu.RLock()
// We hold the lock for the entire telemetry request, but this would only block
// a coordinate retry, and closing the connection.
defer tac.clientMu.RUnlock()
if tac.client == nil || tac.telemetryUnavailable.Load() {
return
}
ctx, cancel := context.WithTimeout(tac.ctx, 5*time.Second)
defer cancel()
_, err := tac.client.PostTelemetry(ctx, &proto.TelemetryRequest{
Events: []*proto.TelemetryEvent{event},
})
if drpcerr.Code(err) == drpcerr.Unimplemented || drpc.ProtocolError.Has(err) && strings.Contains(err.Error(), "unknown rpc: ") {
tac.logger.Debug(tac.ctx, "attempted to send telemetry to a server that doesn't support it", slog.Error(err))
tac.telemetryUnavailable.Store(true)
}
}
Loading