|
1 | 1 | """Test the interactive interpreter."""
|
2 | 2 |
|
3 | 3 | import os
|
| 4 | +import select |
4 | 5 | import subprocess
|
5 | 6 | import sys
|
6 | 7 | import unittest
|
7 | 8 | from textwrap import dedent
|
8 | 9 | from test import support
|
9 |
| -from test.support import cpython_only, has_subprocess_support, SuppressCrashReport |
10 |
| -from test.support.script_helper import assert_python_failure, kill_python, assert_python_ok |
| 10 | +from test.support import ( |
| 11 | + cpython_only, |
| 12 | + has_subprocess_support, |
| 13 | + os_helper, |
| 14 | + SuppressCrashReport, |
| 15 | + SHORT_TIMEOUT, |
| 16 | +) |
| 17 | +from test.support.script_helper import kill_python |
11 | 18 | from test.support.import_helper import import_module
|
12 | 19 |
|
| 20 | +try: |
| 21 | + import pty |
| 22 | +except ImportError: |
| 23 | + pty = None |
| 24 | + |
13 | 25 |
|
14 | 26 | if not has_subprocess_support:
|
15 | 27 | raise unittest.SkipTest("test module requires subprocess")
|
@@ -195,9 +207,56 @@ def bar(x):
|
195 | 207 | expected = "(30, None, [\'def foo(x):\\n\', \' return x + 1\\n\', \'\\n\'], \'<stdin>\')"
|
196 | 208 | self.assertIn(expected, output, expected)
|
197 | 209 |
|
198 |
| - def test_asyncio_repl_no_tty_fails(self): |
199 |
| - assert assert_python_failure("-m", "asyncio") |
| 210 | + def test_asyncio_repl_reaches_python_startup_script(self): |
| 211 | + with os_helper.temp_dir() as tmpdir: |
| 212 | + script = os.path.join(tmpdir, "pythonstartup.py") |
| 213 | + with open(script, "w") as f: |
| 214 | + f.write("print('pythonstartup done!')" + os.linesep) |
| 215 | + f.write("exit(0)" + os.linesep) |
| 216 | + |
| 217 | + env = os.environ.copy() |
| 218 | + env["PYTHONSTARTUP"] = script |
| 219 | + subprocess.check_call( |
| 220 | + [sys.executable, "-m", "asyncio"], |
| 221 | + stdout=subprocess.PIPE, |
| 222 | + stderr=subprocess.PIPE, |
| 223 | + env=env, |
| 224 | + timeout=SHORT_TIMEOUT, |
| 225 | + ) |
| 226 | + |
| 227 | + @unittest.skipUnless(pty, "requires pty") |
| 228 | + def test_asyncio_repl_is_ok(self): |
| 229 | + m, s = pty.openpty() |
| 230 | + cmd = [sys.executable, "-m", "asyncio"] |
| 231 | + proc = subprocess.Popen( |
| 232 | + cmd, |
| 233 | + stdin=s, |
| 234 | + stdout=s, |
| 235 | + stderr=s, |
| 236 | + text=True, |
| 237 | + close_fds=True, |
| 238 | + env=os.environ, |
| 239 | + ) |
| 240 | + os.close(s) |
| 241 | + os.write(m, b"await asyncio.sleep(0)\n") |
| 242 | + os.write(m, b"exit()\n") |
| 243 | + output = [] |
| 244 | + while select.select([m], [], [], SHORT_TIMEOUT)[0]: |
| 245 | + try: |
| 246 | + data = os.read(m, 1024).decode("utf-8") |
| 247 | + if not data: |
| 248 | + break |
| 249 | + except OSError: |
| 250 | + break |
| 251 | + output.append(data) |
| 252 | + os.close(m) |
| 253 | + try: |
| 254 | + exit_code = proc.wait(timeout=SHORT_TIMEOUT) |
| 255 | + except subprocess.TimeoutExpired: |
| 256 | + proc.kill() |
| 257 | + exit_code = proc.wait() |
200 | 258 |
|
| 259 | + self.assertEqual(exit_code, 0) |
201 | 260 |
|
202 | 261 | class TestInteractiveModeSyntaxErrors(unittest.TestCase):
|
203 | 262 |
|
|
0 commit comments