Skip to content

Commit e1d62e0

Browse files
andr-04asvetlov
authored andcommitted
bpo-32015: Asyncio looping during simultaneously socket read/write an… (python#4386)
* bpo-32015: Asyncio cycling during simultaneously socket read/write and reconnection * Tests fix * Tests fix * News add * Add new unit tests
1 parent 56935a5 commit e1d62e0

File tree

3 files changed

+79
-38
lines changed

3 files changed

+79
-38
lines changed

Lib/asyncio/selector_events.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -370,25 +370,25 @@ def sock_recv(self, sock, n):
370370
if self._debug and sock.gettimeout() != 0:
371371
raise ValueError("the socket must be non-blocking")
372372
fut = self.create_future()
373-
self._sock_recv(fut, False, sock, n)
373+
self._sock_recv(fut, None, sock, n)
374374
return fut
375375

376-
def _sock_recv(self, fut, registered, sock, n):
376+
def _sock_recv(self, fut, registered_fd, sock, n):
377377
# _sock_recv() can add itself as an I/O callback if the operation can't
378378
# be done immediately. Don't use it directly, call sock_recv().
379-
fd = sock.fileno()
380-
if registered:
379+
if registered_fd is not None:
381380
# Remove the callback early. It should be rare that the
382381
# selector says the fd is ready but the call still returns
383382
# EAGAIN, and I am willing to take a hit in that case in
384383
# order to simplify the common case.
385-
self.remove_reader(fd)
384+
self.remove_reader(registered_fd)
386385
if fut.cancelled():
387386
return
388387
try:
389388
data = sock.recv(n)
390389
except (BlockingIOError, InterruptedError):
391-
self.add_reader(fd, self._sock_recv, fut, True, sock, n)
390+
fd = sock.fileno()
391+
self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
392392
except Exception as exc:
393393
fut.set_exception(exc)
394394
else:
@@ -405,25 +405,25 @@ def sock_recv_into(self, sock, buf):
405405
if self._debug and sock.gettimeout() != 0:
406406
raise ValueError("the socket must be non-blocking")
407407
fut = self.create_future()
408-
self._sock_recv_into(fut, False, sock, buf)
408+
self._sock_recv_into(fut, None, sock, buf)
409409
return fut
410410

411-
def _sock_recv_into(self, fut, registered, sock, buf):
411+
def _sock_recv_into(self, fut, registered_fd, sock, buf):
412412
# _sock_recv_into() can add itself as an I/O callback if the operation
413413
# can't be done immediately. Don't use it directly, call sock_recv_into().
414-
fd = sock.fileno()
415-
if registered:
414+
if registered_fd is not None:
416415
# Remove the callback early. It should be rare that the
417416
# selector says the fd is ready but the call still returns
418417
# EAGAIN, and I am willing to take a hit in that case in
419418
# order to simplify the common case.
420-
self.remove_reader(fd)
419+
self.remove_reader(registered_fd)
421420
if fut.cancelled():
422421
return
423422
try:
424423
nbytes = sock.recv_into(buf)
425424
except (BlockingIOError, InterruptedError):
426-
self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
425+
fd = sock.fileno()
426+
self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
427427
except Exception as exc:
428428
fut.set_exception(exc)
429429
else:
@@ -444,16 +444,14 @@ def sock_sendall(self, sock, data):
444444
raise ValueError("the socket must be non-blocking")
445445
fut = self.create_future()
446446
if data:
447-
self._sock_sendall(fut, False, sock, data)
447+
self._sock_sendall(fut, None, sock, data)
448448
else:
449449
fut.set_result(None)
450450
return fut
451451

452-
def _sock_sendall(self, fut, registered, sock, data):
453-
fd = sock.fileno()
454-
455-
if registered:
456-
self.remove_writer(fd)
452+
def _sock_sendall(self, fut, registered_fd, sock, data):
453+
if registered_fd is not None:
454+
self.remove_writer(registered_fd)
457455
if fut.cancelled():
458456
return
459457

@@ -470,7 +468,8 @@ def _sock_sendall(self, fut, registered, sock, data):
470468
else:
471469
if n:
472470
data = data[n:]
473-
self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
471+
fd = sock.fileno()
472+
self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
474473

475474
@coroutine
476475
def sock_connect(self, sock, address):

Lib/test/test_asyncio/test_selector_events.py

Lines changed: 59 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,35 @@ def test_sock_recv(self):
182182

183183
f = self.loop.sock_recv(sock, 1024)
184184
self.assertIsInstance(f, asyncio.Future)
185-
self.loop._sock_recv.assert_called_with(f, False, sock, 1024)
185+
self.loop._sock_recv.assert_called_with(f, None, sock, 1024)
186+
187+
def test_sock_recv_reconnection(self):
188+
sock = mock.Mock()
189+
sock.fileno.return_value = 10
190+
sock.recv.side_effect = BlockingIOError
191+
192+
self.loop.add_reader = mock.Mock()
193+
self.loop.remove_reader = mock.Mock()
194+
fut = self.loop.sock_recv(sock, 1024)
195+
callback = self.loop.add_reader.call_args[0][1]
196+
params = self.loop.add_reader.call_args[0][2:]
197+
198+
# emulate the old socket has closed, but the new one has
199+
# the same fileno, so callback is called with old (closed) socket
200+
sock.fileno.return_value = -1
201+
sock.recv.side_effect = OSError(9)
202+
callback(*params)
203+
204+
self.assertIsInstance(fut.exception(), OSError)
205+
self.assertEqual((10,), self.loop.remove_reader.call_args[0])
186206

187207
def test__sock_recv_canceled_fut(self):
188208
sock = mock.Mock()
189209

190210
f = asyncio.Future(loop=self.loop)
191211
f.cancel()
192212

193-
self.loop._sock_recv(f, False, sock, 1024)
213+
self.loop._sock_recv(f, None, sock, 1024)
194214
self.assertFalse(sock.recv.called)
195215

196216
def test__sock_recv_unregister(self):
@@ -201,7 +221,7 @@ def test__sock_recv_unregister(self):
201221
f.cancel()
202222

203223
self.loop.remove_reader = mock.Mock()
204-
self.loop._sock_recv(f, True, sock, 1024)
224+
self.loop._sock_recv(f, 10, sock, 1024)
205225
self.assertEqual((10,), self.loop.remove_reader.call_args[0])
206226

207227
def test__sock_recv_tryagain(self):
@@ -211,8 +231,8 @@ def test__sock_recv_tryagain(self):
211231
sock.recv.side_effect = BlockingIOError
212232

213233
self.loop.add_reader = mock.Mock()
214-
self.loop._sock_recv(f, False, sock, 1024)
215-
self.assertEqual((10, self.loop._sock_recv, f, True, sock, 1024),
234+
self.loop._sock_recv(f, None, sock, 1024)
235+
self.assertEqual((10, self.loop._sock_recv, f, 10, sock, 1024),
216236
self.loop.add_reader.call_args[0])
217237

218238
def test__sock_recv_exception(self):
@@ -221,7 +241,7 @@ def test__sock_recv_exception(self):
221241
sock.fileno.return_value = 10
222242
err = sock.recv.side_effect = OSError()
223243

224-
self.loop._sock_recv(f, False, sock, 1024)
244+
self.loop._sock_recv(f, None, sock, 1024)
225245
self.assertIs(err, f.exception())
226246

227247
def test_sock_sendall(self):
@@ -231,7 +251,7 @@ def test_sock_sendall(self):
231251
f = self.loop.sock_sendall(sock, b'data')
232252
self.assertIsInstance(f, asyncio.Future)
233253
self.assertEqual(
234-
(f, False, sock, b'data'),
254+
(f, None, sock, b'data'),
235255
self.loop._sock_sendall.call_args[0])
236256

237257
def test_sock_sendall_nodata(self):
@@ -244,13 +264,33 @@ def test_sock_sendall_nodata(self):
244264
self.assertIsNone(f.result())
245265
self.assertFalse(self.loop._sock_sendall.called)
246266

267+
def test_sock_sendall_reconnection(self):
268+
sock = mock.Mock()
269+
sock.fileno.return_value = 10
270+
sock.send.side_effect = BlockingIOError
271+
272+
self.loop.add_writer = mock.Mock()
273+
self.loop.remove_writer = mock.Mock()
274+
fut = self.loop.sock_sendall(sock, b'data')
275+
callback = self.loop.add_writer.call_args[0][1]
276+
params = self.loop.add_writer.call_args[0][2:]
277+
278+
# emulate the old socket has closed, but the new one has
279+
# the same fileno, so callback is called with old (closed) socket
280+
sock.fileno.return_value = -1
281+
sock.send.side_effect = OSError(9)
282+
callback(*params)
283+
284+
self.assertIsInstance(fut.exception(), OSError)
285+
self.assertEqual((10,), self.loop.remove_writer.call_args[0])
286+
247287
def test__sock_sendall_canceled_fut(self):
248288
sock = mock.Mock()
249289

250290
f = asyncio.Future(loop=self.loop)
251291
f.cancel()
252292

253-
self.loop._sock_sendall(f, False, sock, b'data')
293+
self.loop._sock_sendall(f, None, sock, b'data')
254294
self.assertFalse(sock.send.called)
255295

256296
def test__sock_sendall_unregister(self):
@@ -261,7 +301,7 @@ def test__sock_sendall_unregister(self):
261301
f.cancel()
262302

263303
self.loop.remove_writer = mock.Mock()
264-
self.loop._sock_sendall(f, True, sock, b'data')
304+
self.loop._sock_sendall(f, 10, sock, b'data')
265305
self.assertEqual((10,), self.loop.remove_writer.call_args[0])
266306

267307
def test__sock_sendall_tryagain(self):
@@ -271,9 +311,9 @@ def test__sock_sendall_tryagain(self):
271311
sock.send.side_effect = BlockingIOError
272312

273313
self.loop.add_writer = mock.Mock()
274-
self.loop._sock_sendall(f, False, sock, b'data')
314+
self.loop._sock_sendall(f, None, sock, b'data')
275315
self.assertEqual(
276-
(10, self.loop._sock_sendall, f, True, sock, b'data'),
316+
(10, self.loop._sock_sendall, f, 10, sock, b'data'),
277317
self.loop.add_writer.call_args[0])
278318

279319
def test__sock_sendall_interrupted(self):
@@ -283,9 +323,9 @@ def test__sock_sendall_interrupted(self):
283323
sock.send.side_effect = InterruptedError
284324

285325
self.loop.add_writer = mock.Mock()
286-
self.loop._sock_sendall(f, False, sock, b'data')
326+
self.loop._sock_sendall(f, None, sock, b'data')
287327
self.assertEqual(
288-
(10, self.loop._sock_sendall, f, True, sock, b'data'),
328+
(10, self.loop._sock_sendall, f, 10, sock, b'data'),
289329
self.loop.add_writer.call_args[0])
290330

291331
def test__sock_sendall_exception(self):
@@ -294,7 +334,7 @@ def test__sock_sendall_exception(self):
294334
sock.fileno.return_value = 10
295335
err = sock.send.side_effect = OSError()
296336

297-
self.loop._sock_sendall(f, False, sock, b'data')
337+
self.loop._sock_sendall(f, None, sock, b'data')
298338
self.assertIs(f.exception(), err)
299339

300340
def test__sock_sendall(self):
@@ -304,7 +344,7 @@ def test__sock_sendall(self):
304344
sock.fileno.return_value = 10
305345
sock.send.return_value = 4
306346

307-
self.loop._sock_sendall(f, False, sock, b'data')
347+
self.loop._sock_sendall(f, None, sock, b'data')
308348
self.assertTrue(f.done())
309349
self.assertIsNone(f.result())
310350

@@ -316,10 +356,10 @@ def test__sock_sendall_partial(self):
316356
sock.send.return_value = 2
317357

318358
self.loop.add_writer = mock.Mock()
319-
self.loop._sock_sendall(f, False, sock, b'data')
359+
self.loop._sock_sendall(f, None, sock, b'data')
320360
self.assertFalse(f.done())
321361
self.assertEqual(
322-
(10, self.loop._sock_sendall, f, True, sock, b'ta'),
362+
(10, self.loop._sock_sendall, f, 10, sock, b'ta'),
323363
self.loop.add_writer.call_args[0])
324364

325365
def test__sock_sendall_none(self):
@@ -330,10 +370,10 @@ def test__sock_sendall_none(self):
330370
sock.send.return_value = 0
331371

332372
self.loop.add_writer = mock.Mock()
333-
self.loop._sock_sendall(f, False, sock, b'data')
373+
self.loop._sock_sendall(f, None, sock, b'data')
334374
self.assertFalse(f.done())
335375
self.assertEqual(
336-
(10, self.loop._sock_sendall, f, True, sock, b'data'),
376+
(10, self.loop._sock_sendall, f, 10, sock, b'data'),
337377
self.loop.add_writer.call_args[0])
338378

339379
def test_sock_connect_timeout(self):
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fixed the looping of asyncio in the case of reconnection the socket during
2+
waiting async read/write from/to the socket.

0 commit comments

Comments
 (0)