Skip to content

Commit be54e89

Browse files
committed
Add test_poll.py from Cpython v3.11.2
1 parent 5f17d28 commit be54e89

File tree

1 file changed

+241
-0
lines changed

1 file changed

+241
-0
lines changed

Lib/test/test_poll.py

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
# Test case for the os.poll() function
2+
3+
import os
4+
import subprocess
5+
import random
6+
import select
7+
import threading
8+
import time
9+
import unittest
10+
from test.support import (
11+
cpython_only, requires_subprocess, requires_working_socket
12+
)
13+
from test.support import threading_helper
14+
from test.support.os_helper import TESTFN
15+
16+
17+
try:
18+
select.poll
19+
except AttributeError:
20+
raise unittest.SkipTest("select.poll not defined")
21+
22+
requires_working_socket(module=True)
23+
24+
def find_ready_matching(ready, flag):
25+
match = []
26+
for fd, mode in ready:
27+
if mode & flag:
28+
match.append(fd)
29+
return match
30+
31+
class PollTests(unittest.TestCase):
32+
33+
def test_poll1(self):
34+
# Basic functional test of poll object
35+
# Create a bunch of pipe and test that poll works with them.
36+
37+
p = select.poll()
38+
39+
NUM_PIPES = 12
40+
MSG = b" This is a test."
41+
MSG_LEN = len(MSG)
42+
readers = []
43+
writers = []
44+
r2w = {}
45+
w2r = {}
46+
47+
for i in range(NUM_PIPES):
48+
rd, wr = os.pipe()
49+
p.register(rd)
50+
p.modify(rd, select.POLLIN)
51+
p.register(wr, select.POLLOUT)
52+
readers.append(rd)
53+
writers.append(wr)
54+
r2w[rd] = wr
55+
w2r[wr] = rd
56+
57+
bufs = []
58+
59+
while writers:
60+
ready = p.poll()
61+
ready_writers = find_ready_matching(ready, select.POLLOUT)
62+
if not ready_writers:
63+
raise RuntimeError("no pipes ready for writing")
64+
wr = random.choice(ready_writers)
65+
os.write(wr, MSG)
66+
67+
ready = p.poll()
68+
ready_readers = find_ready_matching(ready, select.POLLIN)
69+
if not ready_readers:
70+
raise RuntimeError("no pipes ready for reading")
71+
rd = random.choice(ready_readers)
72+
buf = os.read(rd, MSG_LEN)
73+
self.assertEqual(len(buf), MSG_LEN)
74+
bufs.append(buf)
75+
os.close(r2w[rd]) ; os.close( rd )
76+
p.unregister( r2w[rd] )
77+
p.unregister( rd )
78+
writers.remove(r2w[rd])
79+
80+
self.assertEqual(bufs, [MSG] * NUM_PIPES)
81+
82+
def test_poll_unit_tests(self):
83+
# returns NVAL for invalid file descriptor
84+
FD, w = os.pipe()
85+
os.close(FD)
86+
os.close(w)
87+
p = select.poll()
88+
p.register(FD)
89+
r = p.poll()
90+
self.assertEqual(r[0], (FD, select.POLLNVAL))
91+
92+
with open(TESTFN, 'w') as f:
93+
fd = f.fileno()
94+
p = select.poll()
95+
p.register(f)
96+
r = p.poll()
97+
self.assertEqual(r[0][0], fd)
98+
r = p.poll()
99+
self.assertEqual(r[0], (fd, select.POLLNVAL))
100+
os.unlink(TESTFN)
101+
102+
# type error for invalid arguments
103+
p = select.poll()
104+
self.assertRaises(TypeError, p.register, p)
105+
self.assertRaises(TypeError, p.unregister, p)
106+
107+
# can't unregister non-existent object
108+
p = select.poll()
109+
self.assertRaises(KeyError, p.unregister, 3)
110+
111+
# Test error cases
112+
pollster = select.poll()
113+
class Nope:
114+
pass
115+
116+
class Almost:
117+
def fileno(self):
118+
return 'fileno'
119+
120+
self.assertRaises(TypeError, pollster.register, Nope(), 0)
121+
self.assertRaises(TypeError, pollster.register, Almost(), 0)
122+
123+
# Another test case for poll(). This is copied from the test case for
124+
# select(), modified to use poll() instead.
125+
126+
@requires_subprocess()
127+
def test_poll2(self):
128+
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
129+
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
130+
bufsize=0)
131+
self.enterContext(proc)
132+
p = proc.stdout
133+
pollster = select.poll()
134+
pollster.register( p, select.POLLIN )
135+
for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
136+
fdlist = pollster.poll(tout)
137+
if (fdlist == []):
138+
continue
139+
fd, flags = fdlist[0]
140+
if flags & select.POLLHUP:
141+
line = p.readline()
142+
if line != b"":
143+
self.fail('error: pipe seems to be closed, but still returns data')
144+
continue
145+
146+
elif flags & select.POLLIN:
147+
line = p.readline()
148+
if not line:
149+
break
150+
self.assertEqual(line, b'testing...\n')
151+
continue
152+
else:
153+
self.fail('Unexpected return value from select.poll: %s' % fdlist)
154+
155+
# TODO: RUSTPYTHON int overflow
156+
@unittest.expectedFailure
157+
def test_poll3(self):
158+
# test int overflow
159+
pollster = select.poll()
160+
pollster.register(1)
161+
162+
self.assertRaises(OverflowError, pollster.poll, 1 << 64)
163+
164+
x = 2 + 3
165+
if x != 5:
166+
self.fail('Overflow must have occurred')
167+
168+
# Issues #15989, #17919
169+
self.assertRaises(ValueError, pollster.register, 0, -1)
170+
self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
171+
self.assertRaises(ValueError, pollster.modify, 1, -1)
172+
self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
173+
174+
@cpython_only
175+
def test_poll_c_limits(self):
176+
from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
177+
pollster = select.poll()
178+
pollster.register(1)
179+
180+
# Issues #15989, #17919
181+
self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
182+
self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
183+
self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
184+
self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
185+
186+
@unittest.skip("TODO: RUSTPYTHON")
187+
@threading_helper.reap_threads
188+
def test_threaded_poll(self):
189+
r, w = os.pipe()
190+
self.addCleanup(os.close, r)
191+
self.addCleanup(os.close, w)
192+
rfds = []
193+
for i in range(10):
194+
fd = os.dup(r)
195+
self.addCleanup(os.close, fd)
196+
rfds.append(fd)
197+
pollster = select.poll()
198+
for fd in rfds:
199+
pollster.register(fd, select.POLLIN)
200+
201+
t = threading.Thread(target=pollster.poll)
202+
t.start()
203+
try:
204+
time.sleep(0.5)
205+
# trigger ufds array reallocation
206+
for fd in rfds:
207+
pollster.unregister(fd)
208+
pollster.register(w, select.POLLOUT)
209+
self.assertRaises(RuntimeError, pollster.poll)
210+
finally:
211+
# and make the call to poll() from the thread return
212+
os.write(w, b'spam')
213+
t.join()
214+
215+
# TODO: RUSTPYTHON
216+
@unittest.expectedFailure
217+
@unittest.skipUnless(threading, 'Threading required for this test.')
218+
@threading_helper.reap_threads
219+
def test_poll_blocks_with_negative_ms(self):
220+
for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
221+
# Create two file descriptors. This will be used to unlock
222+
# the blocking call to poll.poll inside the thread
223+
r, w = os.pipe()
224+
pollster = select.poll()
225+
pollster.register(r, select.POLLIN)
226+
227+
poll_thread = threading.Thread(target=pollster.poll, args=(timeout_ms,))
228+
poll_thread.start()
229+
poll_thread.join(timeout=0.1)
230+
self.assertTrue(poll_thread.is_alive())
231+
232+
# Write to the pipe so pollster.poll unblocks and the thread ends.
233+
os.write(w, b'spam')
234+
poll_thread.join()
235+
self.assertFalse(poll_thread.is_alive())
236+
os.close(r)
237+
os.close(w)
238+
239+
240+
if __name__ == '__main__':
241+
unittest.main()

0 commit comments

Comments
 (0)