diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 17711ea0fe6c0..6ab0b00017c2a 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -301,6 +301,7 @@ jobs:
             echo "cover=false" >> $GITHUB_OUTPUT
           fi
 
+          export TS_DEBUG_DISCO=true
           gotestsum --junitfile="gotests.xml" --jsonfile="gotests.json" --packages="./..." -- -parallel=8 -timeout=7m -short -failfast $COVERAGE_FLAGS
 
       - name: Print test stats
@@ -377,6 +378,7 @@ jobs:
 
       - name: Test with PostgreSQL Database
         run: |
+          export TS_DEBUG_DISCO=true
           make test-postgres
 
       - name: Print test stats
diff --git a/agent/agent.go b/agent/agent.go
index 82f06e66fa2b2..426cee1e0c8f9 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -648,6 +648,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
 			}
 			break
 		}
+		logger.Debug(ctx, "accepted conn", slog.F("remote", conn.RemoteAddr().String()))
 		wg.Add(1)
 		closed := make(chan struct{})
 		go func() {
@@ -676,6 +677,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
 			var msg codersdk.WorkspaceAgentReconnectingPTYInit
 			err = json.Unmarshal(data, &msg)
 			if err != nil {
+				logger.Warn(ctx, "failed to unmarshal init", slog.F("raw", data))
 				return
 			}
 			_ = a.handleReconnectingPTY(ctx, logger, msg, conn)
@@ -967,6 +969,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
 
 	connectionID := uuid.NewString()
 	logger = logger.With(slog.F("id", msg.ID), slog.F("connection_id", connectionID))
+	logger.Debug(ctx, "starting handler")
 
 	defer func() {
 		if err := retErr; err != nil {
@@ -1034,6 +1037,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
 			// 1. The timeout completed.
 			// 2. The parent context was canceled.
 			<-ctx.Done()
+			logger.Debug(ctx, "context done", slog.Error(ctx.Err()))
 			_ = process.Kill()
 		}()
 		// We don't need to separately monitor for the process exiting.
@@ -1045,6 +1049,8 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
 			read, err := rpty.ptty.OutputReader().Read(buffer)
 			if err != nil {
 				// When the PTY is closed, this is triggered.
+				// Error is typically a benign EOF, so only log for debugging.
+				logger.Debug(ctx, "unable to read pty output, command exited?", slog.Error(err))
 				break
 			}
 			part := buffer[:read]
@@ -1056,8 +1062,15 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
 				break
 			}
 			rpty.activeConnsMutex.Lock()
-			for _, conn := range rpty.activeConns {
-				_, _ = conn.Write(part)
+			for cid, conn := range rpty.activeConns {
+				_, err = conn.Write(part)
+				if err != nil {
+					logger.Debug(ctx,
+						"error writing to active conn",
+						slog.F("other_conn_id", cid),
+						slog.Error(err),
+					)
+				}
 			}
 			rpty.activeConnsMutex.Unlock()
 		}
diff --git a/agent/agent_test.go b/agent/agent_test.go
index 6527e82031f13..1d5a852f7dc26 100644
--- a/agent/agent_test.go
+++ b/agent/agent_test.go
@@ -879,6 +879,7 @@ func TestAgent_StartupScript(t *testing.T) {
 	}
 	t.Run("Success", func(t *testing.T) {
 		t.Parallel()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 		client := &client{
 			t:       t,
 			agentID: uuid.New(),
@@ -887,12 +888,12 @@ func TestAgent_StartupScript(t *testing.T) {
 				DERPMap: &tailcfg.DERPMap{},
 			},
 			statsChan:   make(chan *agentsdk.Stats),
-			coordinator: tailnet.NewCoordinator(),
+			coordinator: tailnet.NewCoordinator(logger),
 		}
 		closer := agent.New(agent.Options{
 			Client:                 client,
 			Filesystem:             afero.NewMemMapFs(),
-			Logger:                 slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
+			Logger:                 logger.Named("agent"),
 			ReconnectingPTYTimeout: 0,
 		})
 		t.Cleanup(func() {
@@ -910,6 +911,7 @@ func TestAgent_StartupScript(t *testing.T) {
 	// script has written too many lines it will still succeed!
 	t.Run("OverflowsAndSkips", func(t *testing.T) {
 		t.Parallel()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 		client := &client{
 			t:       t,
 			agentID: uuid.New(),
@@ -927,12 +929,12 @@ func TestAgent_StartupScript(t *testing.T) {
 				return codersdk.ReadBodyAsError(res)
 			},
 			statsChan:   make(chan *agentsdk.Stats),
-			coordinator: tailnet.NewCoordinator(),
+			coordinator: tailnet.NewCoordinator(logger),
 		}
 		closer := agent.New(agent.Options{
 			Client:                 client,
 			Filesystem:             afero.NewMemMapFs(),
-			Logger:                 slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
+			Logger:                 logger.Named("agent"),
 			ReconnectingPTYTimeout: 0,
 		})
 		t.Cleanup(func() {
@@ -1282,7 +1284,7 @@ func TestAgent_Lifecycle(t *testing.T) {
 
 	t.Run("ShutdownScriptOnce", func(t *testing.T) {
 		t.Parallel()
-
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 		expected := "this-is-shutdown"
 		client := &client{
 			t:       t,
@@ -1293,13 +1295,13 @@ func TestAgent_Lifecycle(t *testing.T) {
 				ShutdownScript: "echo " + expected,
 			},
 			statsChan:   make(chan *agentsdk.Stats),
-			coordinator: tailnet.NewCoordinator(),
+			coordinator: tailnet.NewCoordinator(logger),
 		}
 
 		fs := afero.NewMemMapFs()
 		agent := agent.New(agent.Options{
 			Client:     client,
-			Logger:     slogtest.Make(t, nil).Leveled(slog.LevelInfo),
+			Logger:     logger.Named("agent"),
 			Filesystem: fs,
 		})
 
@@ -1548,9 +1550,10 @@ func TestAgent_Speedtest(t *testing.T) {
 
 func TestAgent_Reconnect(t *testing.T) {
 	t.Parallel()
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	// After the agent is disconnected from a coordinator, it's supposed
 	// to reconnect!
-	coordinator := tailnet.NewCoordinator()
+	coordinator := tailnet.NewCoordinator(logger)
 	defer coordinator.Close()
 
 	agentID := uuid.New()
@@ -1572,7 +1575,7 @@ func TestAgent_Reconnect(t *testing.T) {
 			return "", nil
 		},
 		Client: client,
-		Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
+		Logger: logger.Named("agent"),
 	})
 	defer closer.Close()
 
@@ -1587,8 +1590,8 @@ func TestAgent_Reconnect(t *testing.T) {
 
 func TestAgent_WriteVSCodeConfigs(t *testing.T) {
 	t.Parallel()
-
-	coordinator := tailnet.NewCoordinator()
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	coordinator := tailnet.NewCoordinator(logger)
 	defer coordinator.Close()
 
 	client := &client{
@@ -1607,7 +1610,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
 			return "", nil
 		},
 		Client:     client,
-		Logger:     slogtest.Make(t, nil).Leveled(slog.LevelInfo),
+		Logger:     logger.Named("agent"),
 		Filesystem: filesystem,
 	})
 	defer closer.Close()
@@ -1698,10 +1701,11 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
 	afero.Fs,
 	io.Closer,
 ) {
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	if metadata.DERPMap == nil {
 		metadata.DERPMap = tailnettest.RunDERPAndSTUN(t)
 	}
-	coordinator := tailnet.NewCoordinator()
+	coordinator := tailnet.NewCoordinator(logger)
 	t.Cleanup(func() {
 		_ = coordinator.Close()
 	})
@@ -1718,7 +1722,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
 	closer := agent.New(agent.Options{
 		Client:                 c,
 		Filesystem:             fs,
-		Logger:                 slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
+		Logger:                 logger.Named("agent"),
 		ReconnectingPTYTimeout: ptyTimeout,
 	})
 	t.Cleanup(func() {
@@ -1727,7 +1731,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
 	conn, err := tailnet.NewConn(&tailnet.Options{
 		Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
 		DERPMap:   metadata.DERPMap,
-		Logger:    slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
+		Logger:    logger.Named("client"),
 	})
 	require.NoError(t, err)
 	clientConn, serverConn := net.Pipe()
diff --git a/coderd/coderd.go b/coderd/coderd.go
index 4013c0cc77e8b..53cae8721562c 100644
--- a/coderd/coderd.go
+++ b/coderd/coderd.go
@@ -221,7 +221,7 @@ func New(options *Options) *API {
 		options.PrometheusRegistry = prometheus.NewRegistry()
 	}
 	if options.TailnetCoordinator == nil {
-		options.TailnetCoordinator = tailnet.NewCoordinator()
+		options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
 	}
 	if options.DERPServer == nil {
 		options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp")))
diff --git a/coderd/prometheusmetrics/prometheusmetrics_test.go b/coderd/prometheusmetrics/prometheusmetrics_test.go
index 56d32cc6dd6de..9101288cca570 100644
--- a/coderd/prometheusmetrics/prometheusmetrics_test.go
+++ b/coderd/prometheusmetrics/prometheusmetrics_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"
 
 	"github.com/coder/coder/coderd/coderdtest"
@@ -298,7 +299,7 @@ func TestAgents(t *testing.T) {
 		coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
 
 		// given
-		coordinator := tailnet.NewCoordinator()
+		coordinator := tailnet.NewCoordinator(slogtest.Make(t, nil).Leveled(slog.LevelDebug))
 		coordinatorPtr := atomic.Pointer[tailnet.Coordinator]{}
 		coordinatorPtr.Store(&coordinator)
 		derpMap := tailnettest.RunDERPAndSTUN(t)
diff --git a/coderd/workspaceapps/apptest/setup.go b/coderd/workspaceapps/apptest/setup.go
index 3fceb190c7268..29815dc55c5ae 100644
--- a/coderd/workspaceapps/apptest/setup.go
+++ b/coderd/workspaceapps/apptest/setup.go
@@ -16,6 +16,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"
 
 	"github.com/coder/coder/agent"
 	"github.com/coder/coder/coderd/coderdtest"
@@ -364,7 +365,7 @@ func createWorkspaceWithApps(t *testing.T, client *codersdk.Client, orgID uuid.U
 	}
 	agentCloser := agent.New(agent.Options{
 		Client: agentClient,
-		Logger: slogtest.Make(t, nil).Named("agent"),
+		Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
 	})
 	t.Cleanup(func() {
 		_ = agentCloser.Close()
diff --git a/coderd/workspaceapps/proxy.go b/coderd/workspaceapps/proxy.go
index 5ee0d4671537f..8d969e6bce0c6 100644
--- a/coderd/workspaceapps/proxy.go
+++ b/coderd/workspaceapps/proxy.go
@@ -600,6 +600,8 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
 	if !ok {
 		return
 	}
+	log := s.Logger.With(slog.F("agent_id", appToken.AgentID))
+	log.Debug(ctx, "resolved PTY request")
 
 	values := r.URL.Query()
 	parser := httpapi.NewQueryParamParser()
@@ -632,19 +634,22 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
 
 	agentConn, release, err := s.WorkspaceConnCache.Acquire(appToken.AgentID)
 	if err != nil {
-		s.Logger.Debug(ctx, "dial workspace agent", slog.Error(err))
+		log.Debug(ctx, "dial workspace agent", slog.Error(err))
 		_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err))
 		return
 	}
 	defer release()
+	log.Debug(ctx, "dialed workspace agent")
 	ptNetConn, err := agentConn.ReconnectingPTY(ctx, reconnect, uint16(height), uint16(width), r.URL.Query().Get("command"))
 	if err != nil {
-		s.Logger.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err))
+		log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err))
 		_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial: %s", err))
 		return
 	}
 	defer ptNetConn.Close()
+	log.Debug(ctx, "obtained PTY")
 	agentssh.Bicopy(ctx, wsNetConn, ptNetConn)
+	log.Debug(ctx, "pty Bicopy finished")
 }
 
 // wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func
diff --git a/coderd/wsconncache/wsconncache_test.go b/coderd/wsconncache/wsconncache_test.go
index 24f0f241a123d..6fdecbcf7bf3f 100644
--- a/coderd/wsconncache/wsconncache_test.go
+++ b/coderd/wsconncache/wsconncache_test.go
@@ -156,10 +156,10 @@ func TestCache(t *testing.T) {
 
 func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Duration) *codersdk.WorkspaceAgentConn {
 	t.Helper()
-
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
 	manifest.DERPMap = tailnettest.RunDERPAndSTUN(t)
 
-	coordinator := tailnet.NewCoordinator()
+	coordinator := tailnet.NewCoordinator(logger)
 	t.Cleanup(func() {
 		_ = coordinator.Close()
 	})
@@ -171,7 +171,7 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
 			manifest:    manifest,
 			coordinator: coordinator,
 		},
-		Logger:                 slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelInfo),
+		Logger:                 logger.Named("agent"),
 		ReconnectingPTYTimeout: ptyTimeout,
 	})
 	t.Cleanup(func() {
diff --git a/enterprise/coderd/coderd.go b/enterprise/coderd/coderd.go
index 3a7ac382506e2..0979a25809d43 100644
--- a/enterprise/coderd/coderd.go
+++ b/enterprise/coderd/coderd.go
@@ -390,7 +390,7 @@ func (api *API) updateEntitlements(ctx context.Context) error {
 	}
 
 	if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed {
-		coordinator := agpltailnet.NewCoordinator()
+		coordinator := agpltailnet.NewCoordinator(api.Logger)
 		if enabled {
 			haCoordinator, err := tailnet.NewCoordinator(api.Logger, api.Pubsub)
 			if err != nil {
diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go
index d7cbfc13db2ca..0fc790053a822 100644
--- a/tailnet/coordinator.go
+++ b/tailnet/coordinator.go
@@ -13,6 +13,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"cdr.dev/slog"
+
 	"github.com/google/uuid"
 	lru "github.com/hashicorp/golang-lru/v2"
 	"golang.org/x/exp/slices"
@@ -111,16 +113,19 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func
 	}, errChan
 }
 
+const loggerName = "coord"
+
 // NewCoordinator constructs a new in-memory connection coordinator. This
 // coordinator is incompatible with multiple Coder replicas as all node data is
 // in-memory.
-func NewCoordinator() Coordinator {
+func NewCoordinator(logger slog.Logger) Coordinator {
 	nameCache, err := lru.New[uuid.UUID, string](512)
 	if err != nil {
 		panic("make lru cache: " + err.Error())
 	}
 
 	return &coordinator{
+		logger:                   logger.Named(loggerName),
 		closed:                   false,
 		nodes:                    map[uuid.UUID]*Node{},
 		agentSockets:             map[uuid.UUID]*TrackedConn{},
@@ -137,6 +142,7 @@ func NewCoordinator() Coordinator {
 // This coordinator is incompatible with multiple Coder
 // replicas as all node data is in-memory.
 type coordinator struct {
+	logger slog.Logger
 	mutex  sync.RWMutex
 	closed bool
 
@@ -194,6 +200,8 @@ func (c *coordinator) AgentCount() int {
 // ServeClient accepts a WebSocket connection that wants to connect to an agent
 // with the specified ID.
 func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error {
+	logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent))
+	logger.Debug(context.Background(), "coordinating client")
 	c.mutex.Lock()
 	if c.closed {
 		c.mutex.Unlock()
@@ -210,6 +218,7 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
 		return xerrors.Errorf("marshal node: %w", err)
 	}
 	_, err = conn.Write(data)
+	logger.Debug(context.Background(), "wrote initial node")
 	if err != nil {
 		return xerrors.Errorf("write nodes: %w", err)
 	}
@@ -230,20 +239,24 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
 		LastWrite: now,
 	}
 	c.mutex.Unlock()
+	logger.Debug(context.Background(), "added tracked connection")
 	defer func() {
 		c.mutex.Lock()
 		defer c.mutex.Unlock()
 		// Clean all traces of this connection from the map.
 		delete(c.nodes, id)
+		logger.Debug(context.Background(), "deleted client node")
 		connectionSockets, ok := c.agentToConnectionSockets[agent]
 		if !ok {
 			return
 		}
 		delete(connectionSockets, id)
+		logger.Debug(context.Background(), "deleted client connectionSocket from map")
 		if len(connectionSockets) != 0 {
 			return
 		}
 		delete(c.agentToConnectionSockets, agent)
+		logger.Debug(context.Background(), "deleted last client connectionSocket from map")
 	}()
 
 	decoder := json.NewDecoder(conn)
@@ -259,11 +272,13 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID)
 }
 
 func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json.Decoder) error {
+	logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent))
 	var node Node
 	err := decoder.Decode(&node)
 	if err != nil {
 		return xerrors.Errorf("read json: %w", err)
 	}
+	logger.Debug(context.Background(), "got client node update", slog.F("node", node))
 
 	c.mutex.Lock()
 	// Update the node of this client in our in-memory map. If an agent entirely
@@ -274,6 +289,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
 	agentSocket, ok := c.agentSockets[agent]
 	if !ok {
 		c.mutex.Unlock()
+		logger.Debug(context.Background(), "no agent socket, unable to send node")
 		return nil
 	}
 	c.mutex.Unlock()
@@ -291,6 +307,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
 		}
 		return xerrors.Errorf("write json: %w", err)
 	}
+	logger.Debug(context.Background(), "sent client node to agent")
 
 	return nil
 }
@@ -298,6 +315,8 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json
 // ServeAgent accepts a WebSocket connection to an agent that
 // listens to incoming connections and publishes node updates.
 func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
+	logger := c.logger.With(slog.F("agent_id", id))
+	logger.Debug(context.Background(), "coordinating agent")
 	c.mutex.Lock()
 	if c.closed {
 		c.mutex.Unlock()
@@ -324,6 +343,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
 			return xerrors.Errorf("marshal json: %w", err)
 		}
 		_, err = conn.Write(data)
+		logger.Debug(context.Background(), "wrote initial client(s) to agent", slog.F("nodes", nodes))
 		if err != nil {
 			return xerrors.Errorf("write nodes: %w", err)
 		}
@@ -356,6 +376,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
 	}
 	c.mutex.Unlock()
+	logger.Debug(context.Background(), "added agent socket")
 	defer func() {
 		c.mutex.Lock()
 		defer c.mutex.Unlock()
@@ -365,6 +386,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
 		if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique {
 			delete(c.agentSockets, id)
 			delete(c.nodes, id)
+			logger.Debug(context.Background(), "deleted agent socket")
 		}
 	}()
 
@@ -381,17 +403,20 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error
 }
 
 func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder) error {
+	logger := c.logger.With(slog.F("agent_id", id))
 	var node Node
 	err := decoder.Decode(&node)
 	if err != nil {
 		return xerrors.Errorf("read json: %w", err)
 	}
+	logger.Debug(context.Background(), "decoded agent node", slog.F("node", node))
 
 	c.mutex.Lock()
 	c.nodes[id] = &node
 	connectionSockets, ok := c.agentToConnectionSockets[id]
 	if !ok {
 		c.mutex.Unlock()
+		logger.Debug(context.Background(), "no client sockets; unable to send node")
 		return nil
 	}
 	data, err := json.Marshal([]*Node{&node})
@@ -403,11 +428,14 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder
 	// Publish the new node to every listening socket.
 	var wg sync.WaitGroup
 	wg.Add(len(connectionSockets))
-	for _, connectionSocket := range connectionSockets {
+	for clientID, connectionSocket := range connectionSockets {
+		clientID := clientID
 		connectionSocket := connectionSocket
 		go func() {
 			_ = connectionSocket.SetWriteDeadline(time.Now().Add(5 * time.Second))
-			_, _ = connectionSocket.Write(data)
+			_, err := connectionSocket.Write(data)
+			logger.Debug(context.Background(), "sent agent node to client",
+				slog.F("client_id", clientID), slog.Error(err))
 			wg.Done()
 		}()
 	}
diff --git a/tailnet/coordinator_test.go b/tailnet/coordinator_test.go
index 7dc90ff6f49f0..61117751cfc96 100644
--- a/tailnet/coordinator_test.go
+++ b/tailnet/coordinator_test.go
@@ -4,6 +4,9 @@ import (
 	"net"
 	"testing"
 
+	"cdr.dev/slog"
+	"cdr.dev/slog/sloggers/slogtest"
+
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -16,7 +19,8 @@ func TestCoordinator(t *testing.T) {
 	t.Parallel()
 	t.Run("ClientWithoutAgent", func(t *testing.T) {
 		t.Parallel()
-		coordinator := tailnet.NewCoordinator()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		coordinator := tailnet.NewCoordinator(logger)
 		client, server := net.Pipe()
 		sendNode, errChan := tailnet.ServeCoordinator(client, func(node []*tailnet.Node) error {
 			return nil
@@ -40,7 +44,8 @@ func TestCoordinator(t *testing.T) {
 
 	t.Run("AgentWithoutClients", func(t *testing.T) {
 		t.Parallel()
-		coordinator := tailnet.NewCoordinator()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		coordinator := tailnet.NewCoordinator(logger)
 		client, server := net.Pipe()
 		sendNode, errChan := tailnet.ServeCoordinator(client, func(node []*tailnet.Node) error {
 			return nil
@@ -64,7 +69,8 @@ func TestCoordinator(t *testing.T) {
 
 	t.Run("AgentWithClient", func(t *testing.T) {
 		t.Parallel()
-		coordinator := tailnet.NewCoordinator()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		coordinator := tailnet.NewCoordinator(logger)
 
 		agentWS, agentServerWS := net.Pipe()
 		defer agentWS.Close()
@@ -148,7 +154,8 @@ func TestCoordinator(t *testing.T) {
 
 	t.Run("AgentDoubleConnect", func(t *testing.T) {
 		t.Parallel()
-		coordinator := tailnet.NewCoordinator()
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		coordinator := tailnet.NewCoordinator(logger)
 
 		agentWS1, agentServerWS1 := net.Pipe()
 		defer agentWS1.Close()