Skip to content

bpo-33654: Support BufferedProtocol in set_protocol() and start_tls() #7130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 28, 2018
Prev Previous commit
Next Next commit
Fix proactor bugs
  • Loading branch information
1st1 committed May 27, 2018
commit 0238da4f3e9c0768de2ad324f30f53e6f451042c
22 changes: 18 additions & 4 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,9 @@ def set_protocol(self, protocol):

super().set_protocol(protocol)

if not self._paused:
if self.is_reading():
# reset reading callback / buffers / self._read_fut
self.pause_reading()
self._reschedule_on_resume = True
self.resume_reading()

def is_reading(self):
Expand All @@ -190,6 +189,13 @@ def pause_reading(self):
self._paused = True

if self._read_fut is not None and not self._read_fut.done():
# TODO: This is an ugly hack to cancel the current read future
# *and* avoid potential race conditions, as read cancellation
# goes through `future.cancel()` and `loop.call_soon()`.
# We then this special attribute in the reader callback to
# exit *immediately* without doing any cleanup/rescheduling.
self._read_fut.__asyncio_cancelled_on_pause__ = True

self._read_fut.cancel()
self._read_fut = None
self._reschedule_on_resume = True
Expand Down Expand Up @@ -225,6 +231,10 @@ def _loop_reading(self, fut=None):
self._loop_reading_cb(fut)

def _loop_reading__data_received(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
return

if self._paused:
self._reschedule_on_resume = True
return
Expand Down Expand Up @@ -267,14 +277,18 @@ def _loop_reading__data_received(self, fut):
if not self._closing:
raise
else:
self._read_fut.add_done_callback(self._loop_reading)
self._read_fut.add_done_callback(self._loop_reading__data_received)
finally:
if data:
self._protocol.data_received(data)
elif data == b'':
self._loop_reading__on_eof()

def _loop_reading__get_buffer(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
return

if self._paused:
self._reschedule_on_resume = True
return
Expand Down Expand Up @@ -335,7 +349,7 @@ def _loop_reading__get_buffer(self, fut):
try:
# schedule a new read
self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
self._read_fut.add_done_callback(self._loop_reading)
self._read_fut.add_done_callback(self._loop_reading__get_buffer)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
Expand Down
6 changes: 3 additions & 3 deletions Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2095,7 +2095,7 @@ async def connect(cmd=None, **kwds):

class SendfileBase:

DATA = b"12345abcde" * 16 * 1024 # 160 KiB
DATA = b"12345abcde" * 64 * 1024 # 64 KiB (don't use smaller sizes)

@classmethod
def setUpClass(cls):
Expand Down Expand Up @@ -2452,7 +2452,7 @@ def test_sendfile_ssl_close_peer_after_receiving(self):
self.assertEqual(srv_proto.data, self.DATA)
self.assertEqual(self.file.tell(), len(self.DATA))

def test_sendfile_close_peer_in_middle_of_receiving(self):
def test_sendfile_close_peer_in_the_middle_of_receiving(self):
srv_proto, cli_proto = self.prepare_sendfile(close_after=1024)
with self.assertRaises(ConnectionError):
self.run_loop(
Expand All @@ -2465,7 +2465,7 @@ def test_sendfile_close_peer_in_middle_of_receiving(self):
self.file.tell())
self.assertTrue(cli_proto.transport.is_closing())

def test_sendfile_fallback_close_peer_in_middle_of_receiving(self):
def test_sendfile_fallback_close_peer_in_the_middle_of_receiving(self):

def sendfile_native(transp, file, offset, count):
# to raise SendfileNotAvailableError
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ def test_proto_type_switch(self):
buf_proto.get_buffer.side_effect = lambda hint: buf

tr.set_protocol(buf_proto)
test_utils.run_briefly(self.loop)
res = asyncio.Future(loop=self.loop)
res.set_result(4)

Expand All @@ -556,6 +557,7 @@ def test_proto_buf_switch(self):
buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
buf = bytearray(4)
buf_proto.get_buffer.side_effect = lambda hint: buf
tr._read_fut.done.side_effect = lambda: False
tr.set_protocol(buf_proto)
self.assertFalse(buf_proto.get_buffer.called)
test_utils.run_briefly(self.loop)
Expand Down
1 change: 0 additions & 1 deletion Lib/test/test_asyncio/test_sslproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,6 @@ def new_loop(self):

@unittest.skipIf(ssl is None, 'No ssl module')
@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
@unittest.skipIf(os.environ.get('APPVEYOR'), 'XXX: issue 32458')
class ProactorStartTLSTests(BaseStartTLS, unittest.TestCase):

def new_loop(self):
Expand Down