diff --git a/agent/agentssh/x11.go b/agent/agentssh/x11.go index 05d9f866c16f6..28286a5da9305 100644 --- a/agent/agentssh/x11.go +++ b/agent/agentssh/x11.go @@ -83,16 +83,17 @@ func (*Server) x11Callback(_ ssh.Context, _ ssh.X11) bool { // x11Handler is called when a session has requested X11 forwarding. // It listens for X11 connections and forwards them to the client. -func (x *x11Forwarder) x11Handler(ctx ssh.Context, sshSession ssh.Session) (displayNumber int, handled bool) { +func (x *x11Forwarder) x11Handler(sshCtx ssh.Context, sshSession ssh.Session) (displayNumber int, handled bool) { x11, hasX11 := sshSession.X11() if !hasX11 { return -1, false } - serverConn, valid := ctx.Value(ssh.ContextKeyConn).(*gossh.ServerConn) + serverConn, valid := sshCtx.Value(ssh.ContextKeyConn).(*gossh.ServerConn) if !valid { - x.logger.Warn(ctx, "failed to get server connection") + x.logger.Warn(sshCtx, "failed to get server connection") return -1, false } + ctx := slog.With(sshCtx, slog.F("session_id", fmt.Sprintf("%x", serverConn.SessionID()))) hostname, err := os.Hostname() if err != nil { @@ -127,6 +128,7 @@ func (x *x11Forwarder) x11Handler(ctx ssh.Context, sshSession ssh.Session) (disp }() go x.listenForConnections(ctx, x11session, serverConn, x11) + x.logger.Debug(ctx, "X11 forwarding started", slog.F("display", x11session.display)) return x11session.display, true } diff --git a/agent/agentssh/x11_test.go b/agent/agentssh/x11_test.go index a680c088de703..c8e3dd45c1db5 100644 --- a/agent/agentssh/x11_test.go +++ b/agent/agentssh/x11_test.go @@ -5,6 +5,7 @@ import ( "bytes" "encoding/hex" "fmt" + "io" "net" "os" "path/filepath" @@ -127,3 +128,162 @@ func TestServer_X11(t *testing.T) { _, err = fs.Stat(filepath.Join(home, ".Xauthority")) require.NoError(t, err) } + +func TestServer_X11_EvictionLRU(t *testing.T) { + t.Parallel() + if runtime.GOOS != "linux" { + t.Skip("X11 forwarding is only supported on Linux") + } + + ctx := testutil.Context(t, testutil.WaitLong) + logger := testutil.Logger(t) + fs := afero.NewMemMapFs() + + // Use in-process networking for X11 forwarding. + inproc := testutil.NewInProcNet() + + cfg := &agentssh.Config{ + X11Net: inproc, + } + + s, err := agentssh.NewServer(ctx, logger, prometheus.NewRegistry(), fs, agentexec.DefaultExecer, cfg) + require.NoError(t, err) + defer s.Close() + err = s.UpdateHostSigner(42) + require.NoError(t, err) + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + done := testutil.Go(t, func() { + err := s.Serve(ln) + assert.Error(t, err) + }) + + c := sshClient(t, ln.Addr().String()) + + // Calculate how many simultaneous X11 sessions we can create given the + // configured port range. + startPort := agentssh.X11StartPort + agentssh.X11DefaultDisplayOffset + maxSessions := agentssh.X11MaxPort - startPort + 1 + require.Greater(t, maxSessions, 0, "expected a positive maxSessions value") + + // shellSession holds references to the session and its standard streams so + // that the test can keep them open (and optionally interact with them) for + // the lifetime of the test. If we don't start the Shell with pipes in place, + // the session will be torn down asynchronously during the test. + type shellSession struct { + sess *gossh.Session + stdin io.WriteCloser + stdout io.Reader + stderr io.Reader + // scanner is used to read the output of the session, line by line. + scanner *bufio.Scanner + } + + sessions := make([]shellSession, 0, maxSessions) + for i := 0; i < maxSessions; i++ { + sess, err := c.NewSession() + require.NoError(t, err) + + _, err = sess.SendRequest("x11-req", true, gossh.Marshal(ssh.X11{ + AuthProtocol: "MIT-MAGIC-COOKIE-1", + AuthCookie: hex.EncodeToString([]byte(fmt.Sprintf("cookie%d", i))), + ScreenNumber: uint32(0), + })) + require.NoError(t, err) + + stdin, err := sess.StdinPipe() + require.NoError(t, err) + stdout, err := sess.StdoutPipe() + require.NoError(t, err) + stderr, err := sess.StderrPipe() + require.NoError(t, err) + require.NoError(t, sess.Shell()) + + // The SSH server lazily starts the session. We need to write a command + // and read back to ensure the X11 forwarding is started. + scanner := bufio.NewScanner(stdout) + msg := fmt.Sprintf("ready-%d", i) + _, err = stdin.Write([]byte("echo " + msg + "\n")) + require.NoError(t, err) + // Read until we get the message (first token may be empty due to shell prompt) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if strings.Contains(line, msg) { + break + } + } + require.NoError(t, scanner.Err()) + + sessions = append(sessions, shellSession{ + sess: sess, + stdin: stdin, + stdout: stdout, + stderr: stderr, + scanner: scanner, + }) + } + + // Create one more session which should evict the first (LRU) session and + // therefore reuse the very first display/port. + extraSess, err := c.NewSession() + require.NoError(t, err) + + _, err = extraSess.SendRequest("x11-req", true, gossh.Marshal(ssh.X11{ + AuthProtocol: "MIT-MAGIC-COOKIE-1", + AuthCookie: hex.EncodeToString([]byte("extra")), + ScreenNumber: uint32(0), + })) + require.NoError(t, err) + + // Ask the remote side for the DISPLAY value so we can extract the display + // number that was assigned to this session. + out, err := extraSess.Output("echo DISPLAY=$DISPLAY") + require.NoError(t, err) + + // Example output line: "DISPLAY=localhost:10.0". + var newDisplayNumber int + { + sc := bufio.NewScanner(bytes.NewReader(out)) + for sc.Scan() { + line := strings.TrimSpace(sc.Text()) + if strings.HasPrefix(line, "DISPLAY=") { + parts := strings.SplitN(line, ":", 2) + require.Len(t, parts, 2) + displayPart := parts[1] + if strings.Contains(displayPart, ".") { + displayPart = strings.SplitN(displayPart, ".", 2)[0] + } + var convErr error + newDisplayNumber, convErr = strconv.Atoi(displayPart) + require.NoError(t, convErr) + break + } + } + require.NoError(t, sc.Err()) + } + + // The display number should have wrapped around to the starting value. + assert.Equal(t, agentssh.X11DefaultDisplayOffset, newDisplayNumber, "expected display number to be reused after eviction") + + // validate that the first session was torn down. + _, err = sessions[0].stdin.Write([]byte("echo DISPLAY=$DISPLAY\n")) + require.ErrorIs(t, err, io.EOF) + err = sessions[0].sess.Wait() + require.Error(t, err) + + // Cleanup. + for _, sh := range sessions[1:] { + err = sh.stdin.Close() + require.NoError(t, err) + err = sh.sess.Wait() + require.NoError(t, err) + } + err = extraSess.Close() + require.ErrorIs(t, err, io.EOF) + + err = s.Close() + require.NoError(t, err) + _ = testutil.TryReceive(ctx, t, done) +}