Skip to content

Commit f0a3842

Browse files
committed
Add test_kqueue.py from Cpython v3.11.2
1 parent 6c6290d commit f0a3842

File tree

1 file changed

+261
-0
lines changed

1 file changed

+261
-0
lines changed

Lib/test/test_kqueue.py

+261
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
"""
2+
Tests for kqueue wrapper.
3+
"""
4+
import errno
5+
import os
6+
import select
7+
import socket
8+
import time
9+
import unittest
10+
11+
if not hasattr(select, "kqueue"):
12+
raise unittest.SkipTest("test works only on BSD")
13+
14+
class TestKQueue(unittest.TestCase):
15+
def test_create_queue(self):
16+
kq = select.kqueue()
17+
self.assertTrue(kq.fileno() > 0, kq.fileno())
18+
self.assertTrue(not kq.closed)
19+
kq.close()
20+
self.assertTrue(kq.closed)
21+
self.assertRaises(ValueError, kq.fileno)
22+
23+
def test_create_event(self):
24+
from operator import lt, le, gt, ge
25+
26+
fd = os.open(os.devnull, os.O_WRONLY)
27+
self.addCleanup(os.close, fd)
28+
29+
ev = select.kevent(fd)
30+
other = select.kevent(1000)
31+
self.assertEqual(ev.ident, fd)
32+
self.assertEqual(ev.filter, select.KQ_FILTER_READ)
33+
self.assertEqual(ev.flags, select.KQ_EV_ADD)
34+
self.assertEqual(ev.fflags, 0)
35+
self.assertEqual(ev.data, 0)
36+
self.assertEqual(ev.udata, 0)
37+
self.assertEqual(ev, ev)
38+
self.assertNotEqual(ev, other)
39+
self.assertTrue(ev < other)
40+
self.assertTrue(other >= ev)
41+
for op in lt, le, gt, ge:
42+
self.assertRaises(TypeError, op, ev, None)
43+
self.assertRaises(TypeError, op, ev, 1)
44+
self.assertRaises(TypeError, op, ev, "ev")
45+
46+
ev = select.kevent(fd, select.KQ_FILTER_WRITE)
47+
self.assertEqual(ev.ident, fd)
48+
self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
49+
self.assertEqual(ev.flags, select.KQ_EV_ADD)
50+
self.assertEqual(ev.fflags, 0)
51+
self.assertEqual(ev.data, 0)
52+
self.assertEqual(ev.udata, 0)
53+
self.assertEqual(ev, ev)
54+
self.assertNotEqual(ev, other)
55+
56+
ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
57+
self.assertEqual(ev.ident, fd)
58+
self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
59+
self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
60+
self.assertEqual(ev.fflags, 0)
61+
self.assertEqual(ev.data, 0)
62+
self.assertEqual(ev.udata, 0)
63+
self.assertEqual(ev, ev)
64+
self.assertNotEqual(ev, other)
65+
66+
ev = select.kevent(1, 2, 3, 4, 5, 6)
67+
self.assertEqual(ev.ident, 1)
68+
self.assertEqual(ev.filter, 2)
69+
self.assertEqual(ev.flags, 3)
70+
self.assertEqual(ev.fflags, 4)
71+
self.assertEqual(ev.data, 5)
72+
self.assertEqual(ev.udata, 6)
73+
self.assertEqual(ev, ev)
74+
self.assertNotEqual(ev, other)
75+
76+
bignum = 0x7fff
77+
ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
78+
self.assertEqual(ev.ident, bignum)
79+
self.assertEqual(ev.filter, 1)
80+
self.assertEqual(ev.flags, 2)
81+
self.assertEqual(ev.fflags, 3)
82+
self.assertEqual(ev.data, bignum - 1)
83+
self.assertEqual(ev.udata, bignum)
84+
self.assertEqual(ev, ev)
85+
self.assertNotEqual(ev, other)
86+
87+
# Issue 11973
88+
bignum = 0xffff
89+
ev = select.kevent(0, 1, bignum)
90+
self.assertEqual(ev.ident, 0)
91+
self.assertEqual(ev.filter, 1)
92+
self.assertEqual(ev.flags, bignum)
93+
self.assertEqual(ev.fflags, 0)
94+
self.assertEqual(ev.data, 0)
95+
self.assertEqual(ev.udata, 0)
96+
self.assertEqual(ev, ev)
97+
self.assertNotEqual(ev, other)
98+
99+
# Issue 11973
100+
bignum = 0xffffffff
101+
ev = select.kevent(0, 1, 2, bignum)
102+
self.assertEqual(ev.ident, 0)
103+
self.assertEqual(ev.filter, 1)
104+
self.assertEqual(ev.flags, 2)
105+
self.assertEqual(ev.fflags, bignum)
106+
self.assertEqual(ev.data, 0)
107+
self.assertEqual(ev.udata, 0)
108+
self.assertEqual(ev, ev)
109+
self.assertNotEqual(ev, other)
110+
111+
112+
def test_queue_event(self):
113+
serverSocket = socket.create_server(('127.0.0.1', 0))
114+
client = socket.socket()
115+
client.setblocking(False)
116+
try:
117+
client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
118+
except OSError as e:
119+
self.assertEqual(e.args[0], errno.EINPROGRESS)
120+
else:
121+
#raise AssertionError("Connect should have raised EINPROGRESS")
122+
pass # FreeBSD doesn't raise an exception here
123+
server, addr = serverSocket.accept()
124+
125+
kq = select.kqueue()
126+
kq2 = select.kqueue.fromfd(kq.fileno())
127+
128+
ev = select.kevent(server.fileno(),
129+
select.KQ_FILTER_WRITE,
130+
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
131+
kq.control([ev], 0)
132+
ev = select.kevent(server.fileno(),
133+
select.KQ_FILTER_READ,
134+
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
135+
kq.control([ev], 0)
136+
ev = select.kevent(client.fileno(),
137+
select.KQ_FILTER_WRITE,
138+
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
139+
kq2.control([ev], 0)
140+
ev = select.kevent(client.fileno(),
141+
select.KQ_FILTER_READ,
142+
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
143+
kq2.control([ev], 0)
144+
145+
events = kq.control(None, 4, 1)
146+
events = set((e.ident, e.filter) for e in events)
147+
self.assertEqual(events, set([
148+
(client.fileno(), select.KQ_FILTER_WRITE),
149+
(server.fileno(), select.KQ_FILTER_WRITE)]))
150+
151+
client.send(b"Hello!")
152+
server.send(b"world!!!")
153+
154+
# We may need to call it several times
155+
for i in range(10):
156+
events = kq.control(None, 4, 1)
157+
if len(events) == 4:
158+
break
159+
time.sleep(1.0)
160+
else:
161+
self.fail('timeout waiting for event notifications')
162+
163+
events = set((e.ident, e.filter) for e in events)
164+
self.assertEqual(events, set([
165+
(client.fileno(), select.KQ_FILTER_WRITE),
166+
(client.fileno(), select.KQ_FILTER_READ),
167+
(server.fileno(), select.KQ_FILTER_WRITE),
168+
(server.fileno(), select.KQ_FILTER_READ)]))
169+
170+
# Remove completely client, and server read part
171+
ev = select.kevent(client.fileno(),
172+
select.KQ_FILTER_WRITE,
173+
select.KQ_EV_DELETE)
174+
kq.control([ev], 0)
175+
ev = select.kevent(client.fileno(),
176+
select.KQ_FILTER_READ,
177+
select.KQ_EV_DELETE)
178+
kq.control([ev], 0)
179+
ev = select.kevent(server.fileno(),
180+
select.KQ_FILTER_READ,
181+
select.KQ_EV_DELETE)
182+
kq.control([ev], 0, 0)
183+
184+
events = kq.control([], 4, 0.99)
185+
events = set((e.ident, e.filter) for e in events)
186+
self.assertEqual(events, set([
187+
(server.fileno(), select.KQ_FILTER_WRITE)]))
188+
189+
client.close()
190+
server.close()
191+
serverSocket.close()
192+
193+
def testPair(self):
194+
kq = select.kqueue()
195+
a, b = socket.socketpair()
196+
197+
a.send(b'foo')
198+
event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
199+
event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
200+
r = kq.control([event1, event2], 1, 1)
201+
self.assertTrue(r)
202+
self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
203+
self.assertEqual(b.recv(r[0].data), b'foo')
204+
205+
a.close()
206+
b.close()
207+
kq.close()
208+
209+
def test_issue30058(self):
210+
# changelist must be an iterable
211+
kq = select.kqueue()
212+
a, b = socket.socketpair()
213+
ev = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
214+
215+
kq.control([ev], 0)
216+
# not a list
217+
kq.control((ev,), 0)
218+
# __len__ is not consistent with __iter__
219+
class BadList:
220+
def __len__(self):
221+
return 0
222+
def __iter__(self):
223+
for i in range(100):
224+
yield ev
225+
kq.control(BadList(), 0)
226+
# doesn't have __len__
227+
kq.control(iter([ev]), 0)
228+
229+
a.close()
230+
b.close()
231+
kq.close()
232+
233+
def test_close(self):
234+
open_file = open(__file__, "rb")
235+
self.addCleanup(open_file.close)
236+
fd = open_file.fileno()
237+
kqueue = select.kqueue()
238+
239+
# test fileno() method and closed attribute
240+
self.assertIsInstance(kqueue.fileno(), int)
241+
self.assertFalse(kqueue.closed)
242+
243+
# test close()
244+
kqueue.close()
245+
self.assertTrue(kqueue.closed)
246+
self.assertRaises(ValueError, kqueue.fileno)
247+
248+
# close() can be called more than once
249+
kqueue.close()
250+
251+
# operations must fail with ValueError("I/O operation on closed ...")
252+
self.assertRaises(ValueError, kqueue.control, None, 4)
253+
254+
def test_fd_non_inheritable(self):
255+
kqueue = select.kqueue()
256+
self.addCleanup(kqueue.close)
257+
self.assertEqual(os.get_inheritable(kqueue.fileno()), False)
258+
259+
260+
if __name__ == "__main__":
261+
unittest.main()

0 commit comments

Comments
 (0)