From 09dc241970b337ffeea717b666d9439195c99119 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Mon, 24 Apr 2023 11:48:05 +0000 Subject: [PATCH 01/10] Enable discovery (disco) debug Signed-off-by: Spike Curtis --- .github/workflows/ci.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 17711ea0fe6c0..6ab0b00017c2a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -301,6 +301,7 @@ jobs: echo "cover=false" >> $GITHUB_OUTPUT fi + export TS_DEBUG_DISCO=true gotestsum --junitfile="gotests.xml" --jsonfile="gotests.json" --packages="./..." -- -parallel=8 -timeout=7m -short -failfast $COVERAGE_FLAGS - name: Print test stats @@ -377,6 +378,7 @@ jobs: - name: Test with PostgreSQL Database run: | + export TS_DEBUG_DISCO=true make test-postgres - name: Print test stats From 504c0cf5877fcae52c698465174a075e1eb7def9 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Mon, 24 Apr 2023 12:26:10 +0000 Subject: [PATCH 02/10] Better debug on reconnectingPTY Signed-off-by: Spike Curtis --- agent/agent.go | 1 + coderd/workspaceapps/proxy.go | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 82f06e66fa2b2..212ebba89e3ec 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -967,6 +967,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m connectionID := uuid.NewString() logger = logger.With(slog.F("id", msg.ID), slog.F("connection_id", connectionID)) + logger.Debug(ctx, "handling ReconnectingPTY") defer func() { if err := retErr; err != nil { diff --git a/coderd/workspaceapps/proxy.go b/coderd/workspaceapps/proxy.go index 5ee0d4671537f..8d969e6bce0c6 100644 --- a/coderd/workspaceapps/proxy.go +++ b/coderd/workspaceapps/proxy.go @@ -600,6 +600,8 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) { if !ok { return } + log := s.Logger.With(slog.F("agent_id", appToken.AgentID)) + log.Debug(ctx, "resolved PTY request") values := r.URL.Query() parser := httpapi.NewQueryParamParser() @@ -632,19 +634,22 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) { agentConn, release, err := s.WorkspaceConnCache.Acquire(appToken.AgentID) if err != nil { - s.Logger.Debug(ctx, "dial workspace agent", slog.Error(err)) + log.Debug(ctx, "dial workspace agent", slog.Error(err)) _ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err)) return } defer release() + log.Debug(ctx, "dialed workspace agent") ptNetConn, err := agentConn.ReconnectingPTY(ctx, reconnect, uint16(height), uint16(width), r.URL.Query().Get("command")) if err != nil { - s.Logger.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err)) + log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err)) _ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial: %s", err)) return } defer ptNetConn.Close() + log.Debug(ctx, "obtained PTY") agentssh.Bicopy(ctx, wsNetConn, ptNetConn) + log.Debug(ctx, "pty Bicopy finished") } // wsNetConn wraps net.Conn created by websocket.NetConn(). 
Cancel func From b279f1a8aef8934d99ab4de743823bad1a3f8c76 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Tue, 25 Apr 2023 06:21:29 +0000 Subject: [PATCH 03/10] Agent logging in appstest Signed-off-by: Spike Curtis --- agent/agent.go | 3 +++ coderd/workspaceapps/apptest/setup.go | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/agent/agent.go b/agent/agent.go index 212ebba89e3ec..3936d580214d7 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -648,6 +648,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_ } break } + logger.Debug(ctx, "accepted pty conn", slog.F("remote", conn.RemoteAddr().String())) wg.Add(1) closed := make(chan struct{}) go func() { @@ -676,6 +677,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_ var msg codersdk.WorkspaceAgentReconnectingPTYInit err = json.Unmarshal(data, &msg) if err != nil { + logger.Warn(ctx, "failed to unmarshal init", slog.F("raw", data)) return } _ = a.handleReconnectingPTY(ctx, logger, msg, conn) @@ -787,6 +789,7 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error } func (a *agent) runStartupScript(ctx context.Context, script string) error { + a.logger.Debug(ctx, "running agent startup script") return a.runScript(ctx, "startup", script) } diff --git a/coderd/workspaceapps/apptest/setup.go b/coderd/workspaceapps/apptest/setup.go index 3fceb190c7268..29815dc55c5ae 100644 --- a/coderd/workspaceapps/apptest/setup.go +++ b/coderd/workspaceapps/apptest/setup.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/agent" "github.com/coder/coder/coderd/coderdtest" @@ -364,7 +365,7 @@ func createWorkspaceWithApps(t *testing.T, client *codersdk.Client, orgID uuid.U } agentCloser := agent.New(agent.Options{ Client: agentClient, - Logger: slogtest.Make(t, nil).Named("agent"), + Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug), }) t.Cleanup(func() { _ = agentCloser.Close() From 62f3155d266b4ec127b49f38920634849383345f Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Tue, 25 Apr 2023 06:55:05 +0000 Subject: [PATCH 04/10] More reconnectingPTY logging Signed-off-by: Spike Curtis --- agent/agent.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 3936d580214d7..ce76c92a3894b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1038,6 +1038,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m // 1. The timeout completed. // 2. The parent context was canceled. <-ctx.Done() + logger.Debug(ctx, "context done", slog.Error(ctx.Err())) _ = process.Kill() }() // We don't need to separately monitor for the process exiting. @@ -1049,6 +1050,8 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m read, err := rpty.ptty.OutputReader().Read(buffer) if err != nil { // When the PTY is closed, this is triggered. + // Error is typically a benign EOF, so only log for debugging. 
+ logger.Debug(ctx, "PTY output read error", slog.Error(err)) break } part := buffer[:read] @@ -1060,8 +1063,14 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m break } rpty.activeConnsMutex.Lock() - for _, conn := range rpty.activeConns { - _, _ = conn.Write(part) + for cid, conn := range rpty.activeConns { + _, err = conn.Write(part) + logger.Debug(ctx, + "wrote to active conn", + slog.F("other_conn_id", cid), + slog.F("data", part), + slog.Error(err), + ) } rpty.activeConnsMutex.Unlock() } From 1b9f3285f429128672de815d4c57eb5b7028394e Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Tue, 25 Apr 2023 11:08:00 +0000 Subject: [PATCH 05/10] Add logging to coordinator Signed-off-by: Spike Curtis --- agent/agent_test.go | 36 ++++++++++--------- coderd/coderd.go | 2 +- .../prometheusmetrics_test.go | 3 +- coderd/wsconncache/wsconncache_test.go | 6 ++-- enterprise/coderd/coderd.go | 2 +- tailnet/coordinator.go | 32 +++++++++++++++-- tailnet/coordinator_test.go | 15 +++++--- 7 files changed, 67 insertions(+), 29 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index 6527e82031f13..8b5a54fa7dd38 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -714,7 +714,7 @@ func TestAgent_UnixRemoteForwarding(t *testing.T) { var err error conn, err = net.Dial("unix", remoteSocketPath) return err == nil - }, testutil.WaitLong, testutil.IntervalFast) + }, testutil.WaitShort, testutil.IntervalFast) defer conn.Close() _, err = conn.Write([]byte("test")) require.NoError(t, err) @@ -879,6 +879,7 @@ func TestAgent_StartupScript(t *testing.T) { } t.Run("Success", func(t *testing.T) { t.Parallel() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) client := &client{ t: t, agentID: uuid.New(), @@ -887,12 +888,12 @@ func TestAgent_StartupScript(t *testing.T) { DERPMap: &tailcfg.DERPMap{}, }, statsChan: make(chan *agentsdk.Stats), - coordinator: tailnet.NewCoordinator(), + coordinator: tailnet.NewCoordinator(logger), } closer := agent.New(agent.Options{ Client: client, Filesystem: afero.NewMemMapFs(), - Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug), + Logger: logger.Named("agent"), ReconnectingPTYTimeout: 0, }) t.Cleanup(func() { @@ -910,6 +911,7 @@ func TestAgent_StartupScript(t *testing.T) { // script has written too many lines it will still succeed! 
t.Run("OverflowsAndSkips", func(t *testing.T) { t.Parallel() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) client := &client{ t: t, agentID: uuid.New(), @@ -927,12 +929,12 @@ func TestAgent_StartupScript(t *testing.T) { return codersdk.ReadBodyAsError(res) }, statsChan: make(chan *agentsdk.Stats), - coordinator: tailnet.NewCoordinator(), + coordinator: tailnet.NewCoordinator(logger), } closer := agent.New(agent.Options{ Client: client, Filesystem: afero.NewMemMapFs(), - Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug), + Logger: logger.Named("agent"), ReconnectingPTYTimeout: 0, }) t.Cleanup(func() { @@ -1282,7 +1284,7 @@ func TestAgent_Lifecycle(t *testing.T) { t.Run("ShutdownScriptOnce", func(t *testing.T) { t.Parallel() - + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) expected := "this-is-shutdown" client := &client{ t: t, @@ -1293,13 +1295,13 @@ func TestAgent_Lifecycle(t *testing.T) { ShutdownScript: "echo " + expected, }, statsChan: make(chan *agentsdk.Stats), - coordinator: tailnet.NewCoordinator(), + coordinator: tailnet.NewCoordinator(logger), } fs := afero.NewMemMapFs() agent := agent.New(agent.Options{ Client: client, - Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo), + Logger: logger.Named("agent"), Filesystem: fs, }) @@ -1548,9 +1550,10 @@ func TestAgent_Speedtest(t *testing.T) { func TestAgent_Reconnect(t *testing.T) { t.Parallel() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) // After the agent is disconnected from a coordinator, it's supposed // to reconnect! - coordinator := tailnet.NewCoordinator() + coordinator := tailnet.NewCoordinator(logger) defer coordinator.Close() agentID := uuid.New() @@ -1572,7 +1575,7 @@ func TestAgent_Reconnect(t *testing.T) { return "", nil }, Client: client, - Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo), + Logger: logger.Named("agent"), }) defer closer.Close() @@ -1587,8 +1590,8 @@ func TestAgent_Reconnect(t *testing.T) { func TestAgent_WriteVSCodeConfigs(t *testing.T) { t.Parallel() - - coordinator := tailnet.NewCoordinator() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + coordinator := tailnet.NewCoordinator(logger) defer coordinator.Close() client := &client{ @@ -1607,7 +1610,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) { return "", nil }, Client: client, - Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo), + Logger: logger.Named("agent"), Filesystem: filesystem, }) defer closer.Close() @@ -1698,10 +1701,11 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati afero.Fs, io.Closer, ) { + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) if metadata.DERPMap == nil { metadata.DERPMap = tailnettest.RunDERPAndSTUN(t) } - coordinator := tailnet.NewCoordinator() + coordinator := tailnet.NewCoordinator(logger) t.Cleanup(func() { _ = coordinator.Close() }) @@ -1718,7 +1722,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati closer := agent.New(agent.Options{ Client: c, Filesystem: fs, - Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug), + Logger: logger.Named("agent"), ReconnectingPTYTimeout: ptyTimeout, }) t.Cleanup(func() { @@ -1727,7 +1731,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati conn, err := tailnet.NewConn(&tailnet.Options{ Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)}, DERPMap: metadata.DERPMap, - Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug), + Logger: 
logger.Named("client"), }) require.NoError(t, err) clientConn, serverConn := net.Pipe() diff --git a/coderd/coderd.go b/coderd/coderd.go index 4013c0cc77e8b..53cae8721562c 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -221,7 +221,7 @@ func New(options *Options) *API { options.PrometheusRegistry = prometheus.NewRegistry() } if options.TailnetCoordinator == nil { - options.TailnetCoordinator = tailnet.NewCoordinator() + options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger) } if options.DERPServer == nil { options.DERPServer = derp.NewServer(key.NewNode(), tailnet.Logger(options.Logger.Named("derp"))) diff --git a/coderd/prometheusmetrics/prometheusmetrics_test.go b/coderd/prometheusmetrics/prometheusmetrics_test.go index 56d32cc6dd6de..9101288cca570 100644 --- a/coderd/prometheusmetrics/prometheusmetrics_test.go +++ b/coderd/prometheusmetrics/prometheusmetrics_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/coderd/coderdtest" @@ -298,7 +299,7 @@ func TestAgents(t *testing.T) { coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID) // given - coordinator := tailnet.NewCoordinator() + coordinator := tailnet.NewCoordinator(slogtest.Make(t, nil).Leveled(slog.LevelDebug)) coordinatorPtr := atomic.Pointer[tailnet.Coordinator]{} coordinatorPtr.Store(&coordinator) derpMap := tailnettest.RunDERPAndSTUN(t) diff --git a/coderd/wsconncache/wsconncache_test.go b/coderd/wsconncache/wsconncache_test.go index 24f0f241a123d..6fdecbcf7bf3f 100644 --- a/coderd/wsconncache/wsconncache_test.go +++ b/coderd/wsconncache/wsconncache_test.go @@ -156,10 +156,10 @@ func TestCache(t *testing.T) { func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Duration) *codersdk.WorkspaceAgentConn { t.Helper() - + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) manifest.DERPMap = tailnettest.RunDERPAndSTUN(t) - coordinator := tailnet.NewCoordinator() + coordinator := tailnet.NewCoordinator(logger) t.Cleanup(func() { _ = coordinator.Close() }) @@ -171,7 +171,7 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati manifest: manifest, coordinator: coordinator, }, - Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelInfo), + Logger: logger.Named("agent"), ReconnectingPTYTimeout: ptyTimeout, }) t.Cleanup(func() { diff --git a/enterprise/coderd/coderd.go b/enterprise/coderd/coderd.go index 3a7ac382506e2..0979a25809d43 100644 --- a/enterprise/coderd/coderd.go +++ b/enterprise/coderd/coderd.go @@ -390,7 +390,7 @@ func (api *API) updateEntitlements(ctx context.Context) error { } if changed, enabled := featureChanged(codersdk.FeatureHighAvailability); changed { - coordinator := agpltailnet.NewCoordinator() + coordinator := agpltailnet.NewCoordinator(api.Logger) if enabled { haCoordinator, err := tailnet.NewCoordinator(api.Logger, api.Pubsub) if err != nil { diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go index d7cbfc13db2ca..894d1de207cd9 100644 --- a/tailnet/coordinator.go +++ b/tailnet/coordinator.go @@ -13,6 +13,8 @@ import ( "sync/atomic" "time" + "cdr.dev/slog" + "github.com/google/uuid" lru "github.com/hashicorp/golang-lru/v2" "golang.org/x/exp/slices" @@ -111,16 +113,19 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func }, errChan } +const LoggerName = "coord" + // NewCoordinator constructs a new in-memory connection coordinator. 
This // coordinator is incompatible with multiple Coder replicas as all node data is // in-memory. -func NewCoordinator() Coordinator { +func NewCoordinator(logger slog.Logger) Coordinator { nameCache, err := lru.New[uuid.UUID, string](512) if err != nil { panic("make lru cache: " + err.Error()) } return &coordinator{ + logger: logger.Named(LoggerName), closed: false, nodes: map[uuid.UUID]*Node{}, agentSockets: map[uuid.UUID]*TrackedConn{}, @@ -137,6 +142,7 @@ func NewCoordinator() Coordinator { // This coordinator is incompatible with multiple Coder // replicas as all node data is in-memory. type coordinator struct { + logger slog.Logger mutex sync.RWMutex closed bool @@ -194,6 +200,8 @@ func (c *coordinator) AgentCount() int { // ServeClient accepts a WebSocket connection that wants to connect to an agent // with the specified ID. func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error { + logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent)) + logger.Debug(context.TODO(), "coordinating client") c.mutex.Lock() if c.closed { c.mutex.Unlock() @@ -210,6 +218,7 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) return xerrors.Errorf("marshal node: %w", err) } _, err = conn.Write(data) + logger.Debug(context.TODO(), "wrote initial node") if err != nil { return xerrors.Errorf("write nodes: %w", err) } @@ -230,7 +239,9 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) LastWrite: now, } c.mutex.Unlock() + logger.Debug(context.TODO(), "added tracked connection") defer func() { + logger.Debug(context.TODO(), "deleting tracked connection") c.mutex.Lock() defer c.mutex.Unlock() // Clean all traces of this connection from the map. @@ -259,11 +270,13 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) } func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json.Decoder) error { + logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent)) var node Node err := decoder.Decode(&node) if err != nil { return xerrors.Errorf("read json: %w", err) } + logger.Debug(context.TODO(), "got client node update", slog.F("node", node)) c.mutex.Lock() // Update the node of this client in our in-memory map. If an agent entirely @@ -274,6 +287,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json agentSocket, ok := c.agentSockets[agent] if !ok { c.mutex.Unlock() + logger.Debug(context.TODO(), "no agent socket") return nil } c.mutex.Unlock() @@ -291,6 +305,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json } return xerrors.Errorf("write json: %w", err) } + logger.Debug(context.TODO(), "sent client node to agent") return nil } @@ -298,6 +313,8 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json // ServeAgent accepts a WebSocket connection to an agent that // listens to incoming connections and publishes node updates. 
func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error { + logger := c.logger.With(slog.F("agent_id", id)) + logger.Debug(context.TODO(), "coordinating agent") c.mutex.Lock() if c.closed { c.mutex.Unlock() @@ -324,6 +341,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error return xerrors.Errorf("marshal json: %w", err) } _, err = conn.Write(data) + logger.Debug(context.TODO(), "wrote initial client(s) to agent", slog.F("nodes", nodes)) if err != nil { return xerrors.Errorf("write nodes: %w", err) } @@ -356,6 +374,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error } c.mutex.Unlock() + logger.Debug(context.TODO(), "added agent socket") defer func() { c.mutex.Lock() defer c.mutex.Unlock() @@ -365,6 +384,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique { delete(c.agentSockets, id) delete(c.nodes, id) + logger.Debug(context.TODO(), "deleted agent socket") } }() @@ -381,17 +401,20 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error } func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder) error { + logger := c.logger.With(slog.F("agent_id", id)) var node Node err := decoder.Decode(&node) if err != nil { return xerrors.Errorf("read json: %w", err) } + logger.Debug(context.TODO(), "decoded agent node", slog.F("node", node)) c.mutex.Lock() c.nodes[id] = &node connectionSockets, ok := c.agentToConnectionSockets[id] if !ok { c.mutex.Unlock() + logger.Debug(context.TODO(), "no client sockets") return nil } data, err := json.Marshal([]*Node{&node}) @@ -403,11 +426,14 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder // Publish the new node to every listening socket. 
var wg sync.WaitGroup wg.Add(len(connectionSockets)) - for _, connectionSocket := range connectionSockets { + for clientID, connectionSocket := range connectionSockets { + clientID := clientID connectionSocket := connectionSocket go func() { _ = connectionSocket.SetWriteDeadline(time.Now().Add(5 * time.Second)) - _, _ = connectionSocket.Write(data) + _, err := connectionSocket.Write(data) + logger.Debug(context.TODO(), "sent agent node to client", + slog.F("client_id", clientID), slog.Error(err)) wg.Done() }() } diff --git a/tailnet/coordinator_test.go b/tailnet/coordinator_test.go index 7dc90ff6f49f0..61117751cfc96 100644 --- a/tailnet/coordinator_test.go +++ b/tailnet/coordinator_test.go @@ -4,6 +4,9 @@ import ( "net" "testing" + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -16,7 +19,8 @@ func TestCoordinator(t *testing.T) { t.Parallel() t.Run("ClientWithoutAgent", func(t *testing.T) { t.Parallel() - coordinator := tailnet.NewCoordinator() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + coordinator := tailnet.NewCoordinator(logger) client, server := net.Pipe() sendNode, errChan := tailnet.ServeCoordinator(client, func(node []*tailnet.Node) error { return nil @@ -40,7 +44,8 @@ func TestCoordinator(t *testing.T) { t.Run("AgentWithoutClients", func(t *testing.T) { t.Parallel() - coordinator := tailnet.NewCoordinator() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + coordinator := tailnet.NewCoordinator(logger) client, server := net.Pipe() sendNode, errChan := tailnet.ServeCoordinator(client, func(node []*tailnet.Node) error { return nil @@ -64,7 +69,8 @@ func TestCoordinator(t *testing.T) { t.Run("AgentWithClient", func(t *testing.T) { t.Parallel() - coordinator := tailnet.NewCoordinator() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + coordinator := tailnet.NewCoordinator(logger) agentWS, agentServerWS := net.Pipe() defer agentWS.Close() @@ -148,7 +154,8 @@ func TestCoordinator(t *testing.T) { t.Run("AgentDoubleConnect", func(t *testing.T) { t.Parallel() - coordinator := tailnet.NewCoordinator() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + coordinator := tailnet.NewCoordinator(logger) agentWS1, agentServerWS1 := net.Pipe() defer agentWS1.Close() From ce1bf3e50c1a248f8742cabd856f433e0695fd58 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Thu, 27 Apr 2023 07:52:44 +0400 Subject: [PATCH 06/10] Update agent/agent.go Co-authored-by: Mathias Fredriksson --- agent/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/agent.go b/agent/agent.go index ce76c92a3894b..5604aa81232b9 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -648,7 +648,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_ } break } - logger.Debug(ctx, "accepted pty conn", slog.F("remote", conn.RemoteAddr().String())) + logger.Debug(ctx, "accepted conn", slog.F("remote", conn.RemoteAddr().String())) wg.Add(1) closed := make(chan struct{}) go func() { From c85417cc1f6e0203e67a7fdf5c3e1fa256cf446f Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Thu, 27 Apr 2023 07:52:50 +0400 Subject: [PATCH 07/10] Update agent/agent.go Co-authored-by: Mathias Fredriksson --- agent/agent.go | 1 - 1 file changed, 1 deletion(-) diff --git a/agent/agent.go b/agent/agent.go index 5604aa81232b9..27ca516bb9675 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -789,7 +789,6 @@ func (a *agent) runCoordinator(ctx 
context.Context, network *tailnet.Conn) error } func (a *agent) runStartupScript(ctx context.Context, script string) error { - a.logger.Debug(ctx, "running agent startup script") return a.runScript(ctx, "startup", script) } From 8d40b7939219a3bcc894fddca3ca729586abeb8f Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Thu, 27 Apr 2023 07:52:59 +0400 Subject: [PATCH 08/10] Update agent/agent.go Co-authored-by: Mathias Fredriksson --- agent/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/agent.go b/agent/agent.go index 27ca516bb9675..482bb303474c3 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -969,7 +969,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m connectionID := uuid.NewString() logger = logger.With(slog.F("id", msg.ID), slog.F("connection_id", connectionID)) - logger.Debug(ctx, "handling ReconnectingPTY") + logger.Debug(ctx, "starting handler") defer func() { if err := retErr; err != nil { From 1e2ba57f56c271637677775d3787a464d8c3cf8c Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Thu, 27 Apr 2023 07:53:15 +0400 Subject: [PATCH 09/10] Update agent/agent.go Co-authored-by: Mathias Fredriksson --- agent/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/agent.go b/agent/agent.go index 482bb303474c3..9a70564253b87 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1050,7 +1050,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m if err != nil { // When the PTY is closed, this is triggered. // Error is typically a benign EOF, so only log for debugging. - logger.Debug(ctx, "PTY output read error", slog.Error(err)) + logger.Debug(ctx, "unable to read pty output, command exited?", slog.Error(err)) break } part := buffer[:read] From 96d43cbaa593366db352345df2afdf35122ea893 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Thu, 27 Apr 2023 04:22:58 +0000 Subject: [PATCH 10/10] Clarify logs; remove unrelated changes Signed-off-by: Spike Curtis --- agent/agent.go | 13 +++++++------ agent/agent_test.go | 2 +- tailnet/coordinator.go | 34 ++++++++++++++++++---------------- 3 files changed, 26 insertions(+), 23 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 9a70564253b87..426cee1e0c8f9 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1064,12 +1064,13 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m rpty.activeConnsMutex.Lock() for cid, conn := range rpty.activeConns { _, err = conn.Write(part) - logger.Debug(ctx, - "wrote to active conn", - slog.F("other_conn_id", cid), - slog.F("data", part), - slog.Error(err), - ) + if err != nil { + logger.Debug(ctx, + "error writing to active conn", + slog.F("other_conn_id", cid), + slog.Error(err), + ) + } } rpty.activeConnsMutex.Unlock() } diff --git a/agent/agent_test.go b/agent/agent_test.go index 8b5a54fa7dd38..1d5a852f7dc26 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -714,7 +714,7 @@ func TestAgent_UnixRemoteForwarding(t *testing.T) { var err error conn, err = net.Dial("unix", remoteSocketPath) return err == nil - }, testutil.WaitShort, testutil.IntervalFast) + }, testutil.WaitLong, testutil.IntervalFast) defer conn.Close() _, err = conn.Write([]byte("test")) require.NoError(t, err) diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go index 894d1de207cd9..0fc790053a822 100644 --- a/tailnet/coordinator.go +++ b/tailnet/coordinator.go @@ -113,7 +113,7 @@ func ServeCoordinator(conn net.Conn, updateNodes func(node []*Node) error) (func }, 
errChan } -const LoggerName = "coord" +const loggerName = "coord" // NewCoordinator constructs a new in-memory connection coordinator. This // coordinator is incompatible with multiple Coder replicas as all node data is @@ -125,7 +125,7 @@ func NewCoordinator(logger slog.Logger) Coordinator { } return &coordinator{ - logger: logger.Named(LoggerName), + logger: logger.Named(loggerName), closed: false, nodes: map[uuid.UUID]*Node{}, agentSockets: map[uuid.UUID]*TrackedConn{}, @@ -201,7 +201,7 @@ func (c *coordinator) AgentCount() int { // with the specified ID. func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) error { logger := c.logger.With(slog.F("client_id", id), slog.F("agent_id", agent)) - logger.Debug(context.TODO(), "coordinating client") + logger.Debug(context.Background(), "coordinating client") c.mutex.Lock() if c.closed { c.mutex.Unlock() @@ -218,7 +218,7 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) return xerrors.Errorf("marshal node: %w", err) } _, err = conn.Write(data) - logger.Debug(context.TODO(), "wrote initial node") + logger.Debug(context.Background(), "wrote initial node") if err != nil { return xerrors.Errorf("write nodes: %w", err) } @@ -239,22 +239,24 @@ func (c *coordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) LastWrite: now, } c.mutex.Unlock() - logger.Debug(context.TODO(), "added tracked connection") + logger.Debug(context.Background(), "added tracked connection") defer func() { - logger.Debug(context.TODO(), "deleting tracked connection") c.mutex.Lock() defer c.mutex.Unlock() // Clean all traces of this connection from the map. delete(c.nodes, id) + logger.Debug(context.Background(), "deleted client node") connectionSockets, ok := c.agentToConnectionSockets[agent] if !ok { return } delete(connectionSockets, id) + logger.Debug(context.Background(), "deleted client connectionSocket from map") if len(connectionSockets) != 0 { return } delete(c.agentToConnectionSockets, agent) + logger.Debug(context.Background(), "deleted last client connectionSocket from map") }() decoder := json.NewDecoder(conn) @@ -276,7 +278,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json if err != nil { return xerrors.Errorf("read json: %w", err) } - logger.Debug(context.TODO(), "got client node update", slog.F("node", node)) + logger.Debug(context.Background(), "got client node update", slog.F("node", node)) c.mutex.Lock() // Update the node of this client in our in-memory map. If an agent entirely @@ -287,7 +289,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json agentSocket, ok := c.agentSockets[agent] if !ok { c.mutex.Unlock() - logger.Debug(context.TODO(), "no agent socket") + logger.Debug(context.Background(), "no agent socket, unable to send node") return nil } c.mutex.Unlock() @@ -305,7 +307,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json } return xerrors.Errorf("write json: %w", err) } - logger.Debug(context.TODO(), "sent client node to agent") + logger.Debug(context.Background(), "sent client node to agent") return nil } @@ -314,7 +316,7 @@ func (c *coordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *json // listens to incoming connections and publishes node updates. 
func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error { logger := c.logger.With(slog.F("agent_id", id)) - logger.Debug(context.TODO(), "coordinating agent") + logger.Debug(context.Background(), "coordinating agent") c.mutex.Lock() if c.closed { c.mutex.Unlock() @@ -341,7 +343,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error return xerrors.Errorf("marshal json: %w", err) } _, err = conn.Write(data) - logger.Debug(context.TODO(), "wrote initial client(s) to agent", slog.F("nodes", nodes)) + logger.Debug(context.Background(), "wrote initial client(s) to agent", slog.F("nodes", nodes)) if err != nil { return xerrors.Errorf("write nodes: %w", err) } @@ -374,7 +376,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error } c.mutex.Unlock() - logger.Debug(context.TODO(), "added agent socket") + logger.Debug(context.Background(), "added agent socket") defer func() { c.mutex.Lock() defer c.mutex.Unlock() @@ -384,7 +386,7 @@ func (c *coordinator) ServeAgent(conn net.Conn, id uuid.UUID, name string) error if idConn, ok := c.agentSockets[id]; ok && idConn.ID == unique { delete(c.agentSockets, id) delete(c.nodes, id) - logger.Debug(context.TODO(), "deleted agent socket") + logger.Debug(context.Background(), "deleted agent socket") } }() @@ -407,14 +409,14 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder if err != nil { return xerrors.Errorf("read json: %w", err) } - logger.Debug(context.TODO(), "decoded agent node", slog.F("node", node)) + logger.Debug(context.Background(), "decoded agent node", slog.F("node", node)) c.mutex.Lock() c.nodes[id] = &node connectionSockets, ok := c.agentToConnectionSockets[id] if !ok { c.mutex.Unlock() - logger.Debug(context.TODO(), "no client sockets") + logger.Debug(context.Background(), "no client sockets; unable to send node") return nil } data, err := json.Marshal([]*Node{&node}) @@ -432,7 +434,7 @@ func (c *coordinator) handleNextAgentMessage(id uuid.UUID, decoder *json.Decoder go func() { _ = connectionSocket.SetWriteDeadline(time.Now().Add(5 * time.Second)) _, err := connectionSocket.Write(data) - logger.Debug(context.TODO(), "sent agent node to client", + logger.Debug(context.Background(), "sent agent node to client", slog.F("client_id", clientID), slog.Error(err)) wg.Done() }()
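The change threaded through most of this series is that tailnet.NewCoordinator now takes a slog.Logger, names it "coord", and uses it for the per-client and per-agent debug lines shown above. Below is a minimal sketch of the updated call site, assuming a standalone test file; the package layout and the TestNewCoordinatorLogging name are illustrative only and not part of the series.

package tailnet_test

import (
	"testing"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/slogtest"

	"github.com/coder/coder/tailnet"
)

// TestNewCoordinatorLogging mirrors the call sites updated in this series:
// the in-memory coordinator now requires a logger, and tests pass a
// debug-leveled slogtest logger so the new coordinator debug output is
// captured in the test log.
func TestNewCoordinatorLogging(t *testing.T) {
	t.Parallel()

	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	coordinator := tailnet.NewCoordinator(logger)
	defer coordinator.Close()
}

The CI change in patch 01 can be reproduced locally the same way the workflow does it: export TS_DEBUG_DISCO=true before running the test suite so the embedded Tailscale code prints its disco (peer discovery) debug output alongside the new coordinator and reconnecting PTY logs.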