Skip to content

Add test_poll.py from Cpython v3.11.2 #4888

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions Lib/test/test_poll.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
# Test case for the os.poll() function

import os
import subprocess
import random
import select
import threading
import time
import unittest
from test.support import (
cpython_only, requires_subprocess, requires_working_socket
)
from test.support import threading_helper
from test.support.os_helper import TESTFN


try:
select.poll
except AttributeError:
raise unittest.SkipTest("select.poll not defined")

requires_working_socket(module=True)

def find_ready_matching(ready, flag):
match = []
for fd, mode in ready:
if mode & flag:
match.append(fd)
return match

class PollTests(unittest.TestCase):

def test_poll1(self):
# Basic functional test of poll object
# Create a bunch of pipe and test that poll works with them.

p = select.poll()

NUM_PIPES = 12
MSG = b" This is a test."
MSG_LEN = len(MSG)
readers = []
writers = []
r2w = {}
w2r = {}

for i in range(NUM_PIPES):
rd, wr = os.pipe()
p.register(rd)
p.modify(rd, select.POLLIN)
p.register(wr, select.POLLOUT)
readers.append(rd)
writers.append(wr)
r2w[rd] = wr
w2r[wr] = rd

bufs = []

while writers:
ready = p.poll()
ready_writers = find_ready_matching(ready, select.POLLOUT)
if not ready_writers:
raise RuntimeError("no pipes ready for writing")
wr = random.choice(ready_writers)
os.write(wr, MSG)

ready = p.poll()
ready_readers = find_ready_matching(ready, select.POLLIN)
if not ready_readers:
raise RuntimeError("no pipes ready for reading")
rd = random.choice(ready_readers)
buf = os.read(rd, MSG_LEN)
self.assertEqual(len(buf), MSG_LEN)
bufs.append(buf)
os.close(r2w[rd]) ; os.close( rd )
p.unregister( r2w[rd] )
p.unregister( rd )
writers.remove(r2w[rd])

self.assertEqual(bufs, [MSG] * NUM_PIPES)

def test_poll_unit_tests(self):
# returns NVAL for invalid file descriptor
FD, w = os.pipe()
os.close(FD)
os.close(w)
p = select.poll()
p.register(FD)
r = p.poll()
self.assertEqual(r[0], (FD, select.POLLNVAL))

with open(TESTFN, 'w') as f:
fd = f.fileno()
p = select.poll()
p.register(f)
r = p.poll()
self.assertEqual(r[0][0], fd)
r = p.poll()
self.assertEqual(r[0], (fd, select.POLLNVAL))
os.unlink(TESTFN)

# type error for invalid arguments
p = select.poll()
self.assertRaises(TypeError, p.register, p)
self.assertRaises(TypeError, p.unregister, p)

# can't unregister non-existent object
p = select.poll()
self.assertRaises(KeyError, p.unregister, 3)

# Test error cases
pollster = select.poll()
class Nope:
pass

class Almost:
def fileno(self):
return 'fileno'

self.assertRaises(TypeError, pollster.register, Nope(), 0)
self.assertRaises(TypeError, pollster.register, Almost(), 0)

# Another test case for poll(). This is copied from the test case for
# select(), modified to use poll() instead.

@requires_subprocess()
def test_poll2(self):
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
bufsize=0)
self.enterContext(proc)
p = proc.stdout
pollster = select.poll()
pollster.register( p, select.POLLIN )
for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
fdlist = pollster.poll(tout)
if (fdlist == []):
continue
fd, flags = fdlist[0]
if flags & select.POLLHUP:
line = p.readline()
if line != b"":
self.fail('error: pipe seems to be closed, but still returns data')
continue

elif flags & select.POLLIN:
line = p.readline()
if not line:
break
self.assertEqual(line, b'testing...\n')
continue
else:
self.fail('Unexpected return value from select.poll: %s' % fdlist)

# TODO: RUSTPYTHON int overflow
@unittest.expectedFailure
def test_poll3(self):
# test int overflow
pollster = select.poll()
pollster.register(1)

self.assertRaises(OverflowError, pollster.poll, 1 << 64)

x = 2 + 3
if x != 5:
self.fail('Overflow must have occurred')

# Issues #15989, #17919
self.assertRaises(ValueError, pollster.register, 0, -1)
self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
self.assertRaises(ValueError, pollster.modify, 1, -1)
self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)

@cpython_only
def test_poll_c_limits(self):
from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
pollster = select.poll()
pollster.register(1)

# Issues #15989, #17919
self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

@unittest.skip("TODO: RUSTPYTHON fd reallocation")
@threading_helper.reap_threads
def test_threaded_poll(self):
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
rfds = []
for i in range(10):
fd = os.dup(r)
self.addCleanup(os.close, fd)
rfds.append(fd)
pollster = select.poll()
for fd in rfds:
pollster.register(fd, select.POLLIN)
t = threading.Thread(target=pollster.poll)
t.start()
try:
time.sleep(0.5)
# trigger ufds array reallocation
for fd in rfds:
pollster.unregister(fd)
pollster.register(w, select.POLLOUT)
self.assertRaises(RuntimeError, pollster.poll)
finally:
# and make the call to poll() from the thread return
os.write(w, b'spam')
t.join()

# TODO: RUSTPYTHON add support for negative timeout
@unittest.expectedFailure
@unittest.skipUnless(threading, 'Threading required for this test.')
@threading_helper.reap_threads
def test_poll_blocks_with_negative_ms(self):
for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
# Create two file descriptors. This will be used to unlock
# the blocking call to poll.poll inside the thread
r, w = os.pipe()
pollster = select.poll()
pollster.register(r, select.POLLIN)

poll_thread = threading.Thread(target=pollster.poll, args=(timeout_ms,))
poll_thread.start()
poll_thread.join(timeout=0.1)
self.assertTrue(poll_thread.is_alive())

# Write to the pipe so pollster.poll unblocks and the thread ends.
os.write(w, b'spam')
poll_thread.join()
self.assertFalse(poll_thread.is_alive())
os.close(r)
os.close(w)


if __name__ == '__main__':
unittest.main()