Skip to content

gh-110697: test_os TimerfdTests uses selectors #110789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 67 additions & 102 deletions Lib/test/test_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import os
import pickle
import select
import selectors
import shutil
import signal
import socket
Expand Down Expand Up @@ -3940,6 +3941,11 @@ def timerfd_create(self, *args, **kwargs):
self.addCleanup(os.close, fd)
return fd

def read_count_signaled(self, fd):
# read 8 bytes
data = os.read(fd, 8)
return int.from_bytes(data, byteorder=sys.byteorder)

def test_timerfd_initval(self):
fd = self.timerfd_create(time.CLOCK_REALTIME)

Expand All @@ -3962,25 +3968,22 @@ def test_timerfd_initval(self):
self.assertAlmostEqual(next_expiration, initial_expiration, places=3)

def test_timerfd_non_blocking(self):
size = 8 # read 8 bytes
fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)

# 0.1 second later
initial_expiration = 0.1
_, _ = os.timerfd_settime(fd, initial=initial_expiration, interval=0)
os.timerfd_settime(fd, initial=initial_expiration, interval=0)

# read() raises OSError with errno is EAGAIN for non-blocking timer.
with self.assertRaises(OSError) as ctx:
_ = os.read(fd, size)
self.read_count_signaled(fd)
self.assertEqual(ctx.exception.errno, errno.EAGAIN)

# Wait more than 0.1 seconds
time.sleep(initial_expiration + 0.1)

# confirm if timerfd is readable and read() returns 1 as bytes.
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
self.assertEqual(count_signaled, 1)
self.assertEqual(self.read_count_signaled(fd), 1)

def test_timerfd_negative(self):
one_sec_in_nsec = 10**9
Expand All @@ -3995,25 +3998,24 @@ def test_timerfd_negative(self):
for flags in test_flags:
with self.subTest(flags=flags, initial=initial, interval=interval):
with self.assertRaises(OSError) as context:
_, _ = os.timerfd_settime(fd, flags=flags, initial=initial, interval=interval)
os.timerfd_settime(fd, flags=flags, initial=initial, interval=interval)
self.assertEqual(context.exception.errno, errno.EINVAL)

with self.assertRaises(OSError) as context:
initial_ns = int( one_sec_in_nsec * initial )
interval_ns = int( one_sec_in_nsec * interval )
_, _ = os.timerfd_settime_ns(fd, flags=flags, initial=initial_ns, interval=interval_ns)
os.timerfd_settime_ns(fd, flags=flags, initial=initial_ns, interval=interval_ns)
self.assertEqual(context.exception.errno, errno.EINVAL)

def test_timerfd_interval(self):
size = 8 # read 8 bytes
fd = self.timerfd_create(time.CLOCK_REALTIME)

# 1 second
initial_expiration = 1
# 0.5 second
interval = 0.5

_, _ = os.timerfd_settime(fd, initial=initial_expiration, interval=interval)
os.timerfd_settime(fd, initial=initial_expiration, interval=interval)

# timerfd_gettime
next_expiration, interval2 = os.timerfd_gettime(fd)
Expand All @@ -4023,22 +4025,17 @@ def test_timerfd_interval(self):
count = 3
t = time.perf_counter()
for _ in range(count):
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
self.assertEqual(count_signaled, 1)
self.assertEqual(self.read_count_signaled(fd), 1)
t = time.perf_counter() - t

total_time = initial_expiration + interval * (count - 1)
self.assertGreater(t, total_time)

# wait 3.5 time of interval
time.sleep( (count+0.5) * interval)
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
self.assertEqual(count_signaled, count)
self.assertEqual(self.read_count_signaled(fd), count)

def test_timerfd_TFD_TIMER_ABSTIME(self):
size = 8 # read 8 bytes
fd = self.timerfd_create(time.CLOCK_REALTIME)

now = time.clock_gettime(time.CLOCK_REALTIME)
Expand All @@ -4049,7 +4046,7 @@ def test_timerfd_TFD_TIMER_ABSTIME(self):
# not interval timer
interval = 0

_, _ = os.timerfd_settime(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration, interval=interval)
os.timerfd_settime(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration, interval=interval)

# timerfd_gettime
# Note: timerfd_gettime returns relative values even if TFD_TIMER_ABSTIME is specified.
Expand All @@ -4058,15 +4055,13 @@ def test_timerfd_TFD_TIMER_ABSTIME(self):
self.assertAlmostEqual(next_expiration, offset, places=3)

t = time.perf_counter()
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
count_signaled = self.read_count_signaled(fd)
t = time.perf_counter() - t
self.assertEqual(count_signaled, 1)

self.assertGreater(t, offset - self.CLOCK_RES)

def test_timerfd_select(self):
size = 8 # read 8 bytes
fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)

rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
Expand All @@ -4077,56 +4072,74 @@ def test_timerfd_select(self):
# every 0.125 second
interval = 0.125

_, _ = os.timerfd_settime(fd, initial=initial_expiration, interval=interval)
os.timerfd_settime(fd, initial=initial_expiration, interval=interval)

count = 3
t = time.perf_counter()
for _ in range(count):
rfd, wfd, xfd = select.select([fd], [fd], [fd], initial_expiration + interval)
self.assertEqual((rfd, wfd, xfd), ([fd], [], []))
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
self.assertEqual(count_signaled, 1)
self.assertEqual(self.read_count_signaled(fd), 1)
t = time.perf_counter() - t

total_time = initial_expiration + interval * (count - 1)
self.assertGreater(t, total_time)

def test_timerfd_epoll(self):
size = 8 # read 8 bytes
def check_timerfd_poll(self, nanoseconds):
fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)

ep = select.epoll()
ep.register(fd, select.EPOLLIN)
self.addCleanup(ep.close)
selector = selectors.DefaultSelector()
selector.register(fd, selectors.EVENT_READ)
self.addCleanup(selector.close)

sec_to_nsec = 10 ** 9
# 0.25 second
initial_expiration = 0.25
initial_expiration_ns = sec_to_nsec // 4
# every 0.125 second
interval = 0.125
interval_ns = sec_to_nsec // 8

_, _ = os.timerfd_settime(fd, initial=initial_expiration, interval=interval)
if nanoseconds:
os.timerfd_settime_ns(fd,
initial=initial_expiration_ns,
interval=interval_ns)
else:
os.timerfd_settime(fd,
initial=initial_expiration_ns / sec_to_nsec,
interval=interval_ns / sec_to_nsec)

count = 3
t = time.perf_counter()
if nanoseconds:
t = time.perf_counter_ns()
else:
t = time.perf_counter()
for i in range(count):
timeout_margin = interval
timeout_margin_ns = interval_ns
if i == 0:
timeout = initial_expiration + interval + timeout_margin
timeout_ns = initial_expiration_ns + interval_ns + timeout_margin_ns
else:
timeout = interval + timeout_margin
# epoll timeout is in seconds.
events = ep.poll(timeout)
self.assertEqual(events, [(fd, select.EPOLLIN)])
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
self.assertEqual(count_signaled, 1)
timeout_ns = interval_ns + timeout_margin_ns

t = time.perf_counter() - t
ready = selector.select(timeout_ns / sec_to_nsec)
self.assertEqual(len(ready), 1, ready)
event = ready[0][1]
self.assertEqual(event, selectors.EVENT_READ)

total_time = initial_expiration + interval * (count - 1)
self.assertGreater(t, total_time)
ep.unregister(fd)
self.assertEqual(self.read_count_signaled(fd), 1)

total_time = initial_expiration_ns + interval_ns * (count - 1)
if nanoseconds:
dt = time.perf_counter_ns() - t
self.assertGreater(dt, total_time)
else:
dt = time.perf_counter() - t
self.assertGreater(dt, total_time / sec_to_nsec)
selector.unregister(fd)

def test_timerfd_poll(self):
self.check_timerfd_poll(False)

def test_timerfd_ns_poll(self):
self.check_timerfd_poll(True)

def test_timerfd_ns_initval(self):
one_sec_in_nsec = 10**9
Expand All @@ -4153,7 +4166,6 @@ def test_timerfd_ns_initval(self):
self.assertAlmostEqual(next_expiration_ns, initial_expiration_ns, delta=limit_error)

def test_timerfd_ns_interval(self):
size = 8 # read 8 bytes
one_sec_in_nsec = 10**9
limit_error = one_sec_in_nsec // 10**3
fd = self.timerfd_create(time.CLOCK_REALTIME)
Expand All @@ -4163,7 +4175,7 @@ def test_timerfd_ns_interval(self):
# every 0.5 second
interval_ns = one_sec_in_nsec // 2

_, _ = os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)
os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)

# timerfd_gettime
next_expiration_ns, interval_ns2 = os.timerfd_gettime_ns(fd)
Expand All @@ -4173,23 +4185,18 @@ def test_timerfd_ns_interval(self):
count = 3
t = time.perf_counter_ns()
for _ in range(count):
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
self.assertEqual(count_signaled, 1)
self.assertEqual(self.read_count_signaled(fd), 1)
t = time.perf_counter_ns() - t

total_time_ns = initial_expiration_ns + interval_ns * (count - 1)
self.assertGreater(t, total_time_ns)

# wait 3.5 time of interval
time.sleep( (count+0.5) * interval_ns / one_sec_in_nsec)
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
self.assertEqual(count_signaled, count)
self.assertEqual(self.read_count_signaled(fd), count)


def test_timerfd_ns_TFD_TIMER_ABSTIME(self):
size = 8 # read 8 bytes
one_sec_in_nsec = 10**9
limit_error = one_sec_in_nsec // 10**3
fd = self.timerfd_create(time.CLOCK_REALTIME)
Expand All @@ -4202,7 +4209,7 @@ def test_timerfd_ns_TFD_TIMER_ABSTIME(self):
# not interval timer
interval_ns = 0

_, _ = os.timerfd_settime_ns(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration_ns, interval=interval_ns)
os.timerfd_settime_ns(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration_ns, interval=interval_ns)

# timerfd_gettime
# Note: timerfd_gettime returns relative values even if TFD_TIMER_ABSTIME is specified.
Expand All @@ -4211,15 +4218,13 @@ def test_timerfd_ns_TFD_TIMER_ABSTIME(self):
self.assertLess(abs(next_expiration_ns - offset_ns), limit_error)

t = time.perf_counter_ns()
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
count_signaled = self.read_count_signaled(fd)
t = time.perf_counter_ns() - t
self.assertEqual(count_signaled, 1)

self.assertGreater(t, offset_ns - self.CLOCK_RES_NS)

def test_timerfd_ns_select(self):
size = 8 # read 8 bytes
one_sec_in_nsec = 10**9

fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)
Expand All @@ -4232,59 +4237,19 @@ def test_timerfd_ns_select(self):
# every 0.125 second
interval_ns = one_sec_in_nsec // 8

_, _ = os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)
os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)

count = 3
t = time.perf_counter_ns()
for _ in range(count):
rfd, wfd, xfd = select.select([fd], [fd], [fd], (initial_expiration_ns + interval_ns) / 1e9 )
self.assertEqual((rfd, wfd, xfd), ([fd], [], []))
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
self.assertEqual(count_signaled, 1)
self.assertEqual(self.read_count_signaled(fd), 1)
t = time.perf_counter_ns() - t

total_time_ns = initial_expiration_ns + interval_ns * (count - 1)
self.assertGreater(t, total_time_ns)

def test_timerfd_ns_epoll(self):
size = 8 # read 8 bytes
one_sec_in_nsec = 10**9
fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)

ep = select.epoll()
ep.register(fd, select.EPOLLIN)
self.addCleanup(ep.close)

# 0.25 second
initial_expiration_ns = one_sec_in_nsec // 4
# every 0.125 second
interval_ns = one_sec_in_nsec // 8

_, _ = os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)

count = 3
t = time.perf_counter_ns()
for i in range(count):
timeout_margin_ns = interval_ns
if i == 0:
timeout_ns = initial_expiration_ns + interval_ns + timeout_margin_ns
else:
timeout_ns = interval_ns + timeout_margin_ns

# epoll timeout is in seconds.
events = ep.poll(timeout_ns / one_sec_in_nsec)
self.assertEqual(events, [(fd, select.EPOLLIN)])
n = os.read(fd, size)
count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
self.assertEqual(count_signaled, 1)

t = time.perf_counter_ns() - t

total_time = initial_expiration_ns + interval_ns * (count - 1)
self.assertGreater(t, total_time)
ep.unregister(fd)

class OSErrorTests(unittest.TestCase):
def setUp(self):
class Str(str):
Expand Down