From 3a68749a3507d66442eff47969a8004d6f7c82c1 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Fri, 18 Aug 2017 14:12:24 +0200 Subject: [PATCH 01/14] AF_UNIX SOCK_DGRAM support in asyncio --- Lib/asyncio/selector_events.py | 118 +++++++++++++++++++++++++++++++++ Lib/asyncio/unix_events.py | 61 +++++++++++++++++ 2 files changed, 179 insertions(+) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 4b403560c31b2d..dc1d97d9e96c79 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -101,6 +101,11 @@ def _make_datagram_transport(self, sock, protocol, return _SelectorDatagramTransport(self, sock, protocol, address, waiter, extra) + def _make_socket_datagram_transport(self, sock, protocol, address, + waiter=None, extra=None): + return _SelectorSocketDatagramTransport(self, sock, protocol, address, + waiter, extra) + def close(self): if self.is_running(): raise RuntimeError("Cannot close a running event loop") @@ -1135,3 +1140,116 @@ def _sendto_ready(self): self._loop._remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) + + +class _SelectorSocketDatagramTransport(_SelectorTransport): + + def __init__(self, loop, sock, protocol, address, waiter=None, extra=None): + super().__init__(loop, sock, protocol, extra) + self._address = address + self._closing_waiter = None + self._loop.call_soon(self._protocol.connection_made, self) + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop._add_reader, self._sock_fd, + self._read_ready) + if waiter is not None: + # only wake up the waiter when connection_made() has been called + self._loop.call_soon(futures._set_result_unless_cancelled, + waiter, None) + + def _read_ready(self): + if self._conn_lost: + return + try: + data = self._sock.recv(self.max_size) + except (BlockingIOError, InterruptedError): + pass + except Exception as exc: + self._fatal_error(exc, 'Fatal read error on datagram transport') + else: + self._protocol.datagram_received(data) + + def send(self, data): + if not isinstance(data, (bytes, bytearray, memoryview)): + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) + if not data: + return + + if self._conn_lost: + if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: + logger.warning('socket.send() raised exception.') + self._conn_lost += 1 + return + + if not self._buffer: + # Attempt to send it right away first. + try: + self._sock.sendto(data, self._address) + return + except (BlockingIOError, InterruptedError): + self._loop._add_writer(self._sock_fd, self._sendto_ready) + except OSError as exc: + self._protocol.error_received(exc) + return + except Exception as exc: + self._fatal_error(exc, + 'Fatal write error on datagram transport') + return + + # Ensure that what we buffer is immutable. + self._buffer.append(bytes(data)) + self._maybe_pause_protocol() + + def _sendto_ready(self): + while self._buffer: + data = self._buffer.popleft() + try: + self._sock.sendto(data, self._address) + except (BlockingIOError, InterruptedError): + self._buffer.appendleft(data) # Try again later. + break + except OSError as exc: + self._protocol.error_received(exc) + return + except Exception as exc: + self._fatal_error(exc, + 'Fatal write error on datagram transport') + return + + self._maybe_resume_protocol() # May append to buffer. + if not self._buffer: + self._loop._remove_writer(self._sock_fd) + if self._closing: + self._call_connection_lost(None) + + def close(self): + if self._closing: + return + self._closing = True + self._closing_waiter = self._loop.create_future() + self._loop._remove_reader(self._sock_fd) + if not self._buffer: + self._conn_lost += 1 + self._loop._remove_writer(self._sock_fd) + self._loop.call_soon(self._call_connection_lost, None) + + @coroutine + def wait_closed(self): + if self._closing_waiter: + yield from self._closing_waiter + else: + return + + def _call_connection_lost(self, exc): + try: + if self._protocol_connected: + self._protocol.connection_lost(exc) + finally: + self._sock.close() + self._sock = None + self._protocol = None + self._loop = None + + if self._closing_waiter: + self._closing_waiter.set_result(None) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index bf682a1a98a39f..49c3b7b7c52d09 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -307,6 +307,67 @@ def create_unix_server(self, protocol_factory, path=None, *, self._start_serving(protocol_factory, sock, ssl, server) return server + @coroutine + def create_unix_datagram_endpoint(self, protocol_factory, path=None, *, + sock=None, bind=False): + if path is not None: + if sock is not None: + raise ValueError( + 'path and sock can not be specified at the same time') + + path = _fspath(path) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + + # Check for abstract socket. `str` and `bytes` paths are supported. + if bind and path[0] not in (0, '\x00'): + try: + if stat.S_ISSOCK(os.stat(path).st_mode): + os.remove(path) + except FileNotFoundError: + pass + except OSError as err: + # Directory may have permissions only to create socket. + logger.error('Unable to check or remove stale UNIX' + ' socket %r: %r', path, err) + + try: + sock.setblocking(False) + if bind: + sock.bind(path) + else: + yield from self.sock_connect(sock, path) + except: + sock.close() + raise + else: + if sock is None: + raise ValueError( + 'path was not specified, and no sock specified') + + if bind: + raise ValueError( + 'bind and sock can not be specified at the same time') + + if (sock.family != socket.AF_UNIX or + not base_events._is_dgram_socket(sock)): + raise ValueError( + 'A UNIX Domain Datagram Socket was expected, got {!r}' + .format(sock)) + sock.setblocking(False) + path = sock.getsockname() or sock.getpeername() + + protocol = protocol_factory() + waiter = self.create_future() + transport = self._make_socket_datagram_transport(sock, protocol, path, + waiter) + + try: + yield from waiter + except: + transport.close() + raise + + return transport, protocol if hasattr(os, 'set_blocking'): def _set_nonblocking(fd): From a2d0430ad8d9618f767419eeb9910a0806256073 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Mon, 21 Aug 2017 10:36:41 +0200 Subject: [PATCH 02/14] Changes required by Haypo review --- Lib/asyncio/selector_events.py | 8 ++++++-- Lib/asyncio/unix_events.py | 13 +++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index dc1d97d9e96c79..ebbc039fe19949 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1144,6 +1144,8 @@ def _sendto_ready(self): class _SelectorSocketDatagramTransport(_SelectorTransport): + _buffer_factory = collections.deque + def __init__(self, loop, sock, protocol, address, waiter=None, extra=None): super().__init__(loop, sock, protocol, extra) self._address = address @@ -1164,6 +1166,8 @@ def _read_ready(self): data = self._sock.recv(self.max_size) except (BlockingIOError, InterruptedError): pass + except OSError as exc: + self._protocol.error_received(exc) except Exception as exc: self._fatal_error(exc, 'Fatal read error on datagram transport') else: @@ -1188,7 +1192,7 @@ def send(self, data): self._sock.sendto(data, self._address) return except (BlockingIOError, InterruptedError): - self._loop._add_writer(self._sock_fd, self._sendto_ready) + self._loop._add_writer(self._sock_fd, self._send_ready) except OSError as exc: self._protocol.error_received(exc) return @@ -1201,7 +1205,7 @@ def send(self, data): self._buffer.append(bytes(data)) self._maybe_pause_protocol() - def _sendto_ready(self): + def _send_ready(self): while self._buffer: data = self._buffer.popleft() try: diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 49c3b7b7c52d09..0cbfc70bae2631 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -263,8 +263,6 @@ def create_unix_server(self, protocol_factory, path=None, *, 'path and sock can not be specified at the same time') path = _fspath(path) - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - # Check for abstract socket. `str` and `bytes` paths are supported. if path[0] not in (0, '\x00'): try: @@ -274,8 +272,10 @@ def create_unix_server(self, protocol_factory, path=None, *, pass except OSError as err: # Directory may have permissions only to create socket. - logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err) + logger.error('Unable to check or remove stale UNIX' + ' socket %r: %r', path, err) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: sock.bind(path) except OSError as exc: @@ -316,8 +316,6 @@ def create_unix_datagram_endpoint(self, protocol_factory, path=None, *, 'path and sock can not be specified at the same time') path = _fspath(path) - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - # Check for abstract socket. `str` and `bytes` paths are supported. if bind and path[0] not in (0, '\x00'): try: @@ -330,6 +328,7 @@ def create_unix_datagram_endpoint(self, protocol_factory, path=None, *, logger.error('Unable to check or remove stale UNIX' ' socket %r: %r', path, err) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) try: sock.setblocking(False) if bind: @@ -354,7 +353,9 @@ def create_unix_datagram_endpoint(self, protocol_factory, path=None, *, 'A UNIX Domain Datagram Socket was expected, got {!r}' .format(sock)) sock.setblocking(False) - path = sock.getsockname() or sock.getpeername() + path = sock.getsockname() + if not path: + path = sock.getpeername() protocol = protocol_factory() waiter = self.create_future() From 77dfbefd0e58eb9f6c580392b18c7ffc183f7039 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Mon, 21 Aug 2017 11:03:15 +0200 Subject: [PATCH 03/14] Split create_unix_datagram into connection & server Split the loop.create_unix_datagram_endpoint method into loop.create_unix_datagram_connection and loop.create_unix_datagram_server for cleaner API --- Lib/asyncio/unix_events.py | 102 ++++++++++++++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 0cbfc70bae2631..21179cddbdffea 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -308,8 +308,106 @@ def create_unix_server(self, protocol_factory, path=None, *, return server @coroutine - def create_unix_datagram_endpoint(self, protocol_factory, path=None, *, - sock=None, bind=False): + def create_unix_datagram_connection(self, protocol_factory, path=None, *, + sock=None): + if path is not None: + if sock is not None: + raise ValueError( + 'path and sock can not be specified at the same time') + + path = _fspath(path) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + try: + sock.setblocking(False) + yield from self.sock_connect(sock, path) + except: + sock.close() + raise + else: + if sock is None: + raise ValueError( + 'path was not specified, and no sock specified') + + if (sock.family != socket.AF_UNIX or + not base_events._is_dgram_socket(sock)): + raise ValueError( + 'A UNIX Domain Datagram Socket was expected, got {!r}' + .format(sock)) + + sock.setblocking(False) + path = sock.getpeername() + + protocol = protocol_factory() + waiter = self.create_future() + transport = self._make_socket_datagram_transport(sock, protocol, path, + waiter) + + try: + yield from waiter + except: + transport.close() + raise + + return transport, protocol + + @coroutine + def create_unix_datagram_server(self, protocol_factory, path=None, *, + sock=None): + if path is not None: + if sock is not None: + raise ValueError( + 'path and sock can not be specified at the same time') + + path = _fspath(path) + # Check for abstract socket. `str` and `bytes` paths are supported. + if path[0] not in (0, '\x00'): + try: + if stat.S_ISSOCK(os.stat(path).st_mode): + os.remove(path) + except FileNotFoundError: + pass + except OSError as err: + # Directory may have permissions only to create socket. + logger.error('Unable to check or remove stale UNIX' + ' socket %r: %r', path, err) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + try: + sock.setblocking(False) + sock.bind(path) + except: + sock.close() + raise + else: + if sock is None: + raise ValueError( + 'path was not specified, and no sock specified') + + if (sock.family != socket.AF_UNIX or + not base_events._is_dgram_socket(sock)): + raise ValueError( + 'A UNIX Domain Datagram Socket was expected, got {!r}' + .format(sock)) + + sock.setblocking(False) + path = sock.getsockname() + + protocol = protocol_factory() + waiter = self.create_future() + transport = self._make_socket_datagram_transport(sock, protocol, path, + waiter) + + try: + yield from waiter + except: + transport.close() + raise + + return transport, protocol + + @coroutine + def _create_unix_datagram_endpoint(self, protocol_factory, path=None, *, + sock=None, bind=False): if path is not None: if sock is not None: raise ValueError( From df3b696bae6a006232ddd73e2cd74f1d8020b200 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Mon, 21 Aug 2017 11:06:12 +0200 Subject: [PATCH 04/14] delete create_unix_datagram_endpoint --- Lib/asyncio/unix_events.py | 63 -------------------------------------- 1 file changed, 63 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 21179cddbdffea..d0bc08d92a0d02 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -405,69 +405,6 @@ def create_unix_datagram_server(self, protocol_factory, path=None, *, return transport, protocol - @coroutine - def _create_unix_datagram_endpoint(self, protocol_factory, path=None, *, - sock=None, bind=False): - if path is not None: - if sock is not None: - raise ValueError( - 'path and sock can not be specified at the same time') - - path = _fspath(path) - # Check for abstract socket. `str` and `bytes` paths are supported. - if bind and path[0] not in (0, '\x00'): - try: - if stat.S_ISSOCK(os.stat(path).st_mode): - os.remove(path) - except FileNotFoundError: - pass - except OSError as err: - # Directory may have permissions only to create socket. - logger.error('Unable to check or remove stale UNIX' - ' socket %r: %r', path, err) - - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - try: - sock.setblocking(False) - if bind: - sock.bind(path) - else: - yield from self.sock_connect(sock, path) - except: - sock.close() - raise - else: - if sock is None: - raise ValueError( - 'path was not specified, and no sock specified') - - if bind: - raise ValueError( - 'bind and sock can not be specified at the same time') - - if (sock.family != socket.AF_UNIX or - not base_events._is_dgram_socket(sock)): - raise ValueError( - 'A UNIX Domain Datagram Socket was expected, got {!r}' - .format(sock)) - sock.setblocking(False) - path = sock.getsockname() - if not path: - path = sock.getpeername() - - protocol = protocol_factory() - waiter = self.create_future() - transport = self._make_socket_datagram_transport(sock, protocol, path, - waiter) - - try: - yield from waiter - except: - transport.close() - raise - - return transport, protocol - if hasattr(os, 'set_blocking'): def _set_nonblocking(fd): os.set_blocking(fd, False) From e5069ba4d74e931ccc45c57b097a87e0c8fcefec Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Mon, 21 Aug 2017 11:06:46 +0200 Subject: [PATCH 05/14] loop.create_unix_datagram_* documentation --- Doc/library/asyncio-eventloop.rst | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 83bbb70b037928..3d1b8a8a44fd35 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -409,6 +409,25 @@ Creating connections Availability: UNIX. +.. coroutinemethod:: AbstractEventLoop.create_unix_datagram_connection(protocol_factory, path=None, \*, sock=None) + + Create UNIX connection: socket family :py:data:`~socket.AF_UNIX`, socket + type :py:data:`~socket.SOCK_DGRAM`. The :py:data:`~socket.AF_UNIX` socket + family is used to communicate between processes on the same machine + efficiently. + + This method is a :ref:`coroutine ` which will try to + establish the connection in the background. When successful, the + coroutine returns a ``(transport, protocol)`` pair. + + *path* is the name of a UNIX domain socket, and is required unless a *sock* + parameter is specified. Abstract UNIX sockets, :class:`str`, and + :class:`bytes` paths are supported. + + See the :meth:`AbstractEventLoop.create_connection` method for parameters. + + Availability: UNIX. + Creating listening connections ------------------------------ @@ -476,7 +495,16 @@ Creating listening connections .. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None) Similar to :meth:`AbstractEventLoop.create_server`, but specific to the - socket family :py:data:`~socket.AF_UNIX`. + socket family :py:data:`~socket.AF_UNIX`, socket type :py:data:`~socket.SOCK_STREAM`. + + This method is a :ref:`coroutine `. + + Availability: UNIX. + +.. coroutinemethod:: AbstractEventLoop.create_unix_datagram_server(protocol_factory, path=None, \*, sock=None) + + Similar to :meth:`AbstractEventLoop.create_server`, but specific to the + socket family :py:data:`~socket.AF_UNIX`, socket type :py:data:`~socket.SOCK_DGRAM`. This method is a :ref:`coroutine `. From db73adaf25dd637bd9edcac5ebb6f9288fa524a9 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Mon, 21 Aug 2017 15:15:29 +0200 Subject: [PATCH 06/14] Tests --- Lib/asyncio/protocols.py | 15 +- Lib/asyncio/unix_events.py | 9 + Lib/test/test_asyncio/test_selector_events.py | 258 ++++++++++++++++++ Lib/test/test_asyncio/test_unix_events.py | 121 ++++++++ 4 files changed, 402 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py index 80fcac9a82dfb2..0d99ce55df4d40 100644 --- a/Lib/asyncio/protocols.py +++ b/Lib/asyncio/protocols.py @@ -1,7 +1,7 @@ """Abstract Protocol class.""" __all__ = ['BaseProtocol', 'Protocol', 'DatagramProtocol', - 'SubprocessProtocol'] + 'SubprocessProtocol', 'UnixDatagramProtocol'] class BaseProtocol: @@ -113,6 +113,19 @@ def error_received(self, exc): """ +class UnixDatagramProtocol(BaseProtocol): + """Interface for unix socket datagram protocol.""" + + def datagram_received(self, data): + """Called when some datagram is received.""" + + def error_received(self, exc): + """Called when a send or receive operation raises an OSError. + + (Other than BlockingIOError or InterruptedError.) + """ + + class SubprocessProtocol(BaseProtocol): """Interface for protocol for subprocess calls.""" diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index d0bc08d92a0d02..7c2ff4f6ae92c6 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -375,6 +375,15 @@ def create_unix_datagram_server(self, protocol_factory, path=None, *, try: sock.setblocking(False) sock.bind(path) + except OSError as exc: + sock.close() + if exc.errno == errno.EADDRINUSE: + # Let's improve the error message by adding + # with what exact address it occurs. + msg = 'Address {!r} is already in use'.format(path) + raise OSError(errno.EADDRINUSE, msg) from None + else: + raise except: sock.close() raise diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index c50b3e49565c92..00140d092c789a 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -17,6 +17,7 @@ from asyncio.selector_events import _SelectorSslTransport from asyncio.selector_events import _SelectorSocketTransport from asyncio.selector_events import _SelectorDatagramTransport +from asyncio.selector_events import _SelectorSocketDatagramTransport MOCK_ANY = mock.ANY @@ -1619,6 +1620,7 @@ def test_sendto_buffer_memoryview(self): [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], list(transport._buffer)) + print(transport._buffer) self.assertIsInstance(transport._buffer[1][0], bytes) def test_sendto_tryagain(self): @@ -1787,5 +1789,261 @@ def test_fatal_error_connected(self, m_exc): 'Fatal error on transport\nprotocol:.*\ntransport:.*'), exc_info=(ConnectionRefusedError, MOCK_ANY, MOCK_ANY)) + +class SelectorSocketDatagramTransportTests(test_utils.TestCase): + + def setUp(self): + super().setUp() + self.loop = self.new_test_loop() + self.protocol = test_utils.make_test_protocol( + asyncio.UnixDatagramProtocol) + self.sock = mock.Mock(socket.socket) + self.sock_fd = self.sock.fileno.return_value = 7 + self.address = 'testsocket' + + def socket_datagram_transport(self, waiter=None): + transport = _SelectorSocketDatagramTransport(self.loop, self.sock, + self.protocol, + address=self.address, + waiter=waiter) + self.addCleanup(close_transport, transport) + return transport + + def test_ctor(self): + waiter = asyncio.Future(loop=self.loop) + tr = self.socket_datagram_transport(waiter=waiter) + self.loop.run_until_complete(waiter) + + self.loop.assert_reader(7, tr._read_ready) + test_utils.run_briefly(self.loop) + self.protocol.connection_made.assert_called_with(tr) + + def test_ctor_with_waiter(self): + waiter = asyncio.Future(loop=self.loop) + self.socket_datagram_transport(waiter=waiter) + self.loop.run_until_complete(waiter) + self.assertIsNone(waiter.result()) + + def test_read_ready(self): + transport = self.socket_datagram_transport() + self.sock.recv.return_value = b'data' + transport._read_ready() + self.protocol.datagram_received.assert_called_with(b'data') + + def test_read_ready_tryagain(self): + self.sock.recv.side_effect = BlockingIOError + + transport = self.socket_datagram_transport() + transport._fatal_error = mock.Mock() + transport._read_ready() + + self.assertFalse(transport._fatal_error.called) + + def test_read_ready_err(self): + err = self.sock.recv.side_effect = ValueError() + + transport = self.socket_datagram_transport() + transport._fatal_error = mock.Mock() + transport._read_ready() + transport._fatal_error.assert_called_with(err, + 'Fatal read error on' + ' datagram transport') + + def test_read_ready_oserr(self): + err = self.sock.recv.side_effect = OSError() + + transport = self.socket_datagram_transport() + transport._fatal_error = mock.Mock() + transport._read_ready() + self.protocol.error_received.assert_called_with(err) + + def test_send(self): + data = b'data' + transport = self.socket_datagram_transport() + transport.send(data) + self.assertTrue(self.sock.sendto.called) + self.assertEqual(self.sock.sendto.call_args[0], (data, self.address)) + + def test_send_bytearray(self): + data = bytearray(b'data') + transport = self.socket_datagram_transport() + transport.send(data) + self.assertTrue(self.sock.sendto.called) + self.assertEqual( + self.sock.sendto.call_args[0], (data, self.address)) + + def test_send_memoryview(self): + data = memoryview(b'data') + transport = self.socket_datagram_transport() + transport.send(data) + self.assertTrue(self.sock.sendto.called) + self.assertEqual(self.sock.sendto.call_args[0], (data, self.address)) + + def test_send_no_data(self): + transport = self.socket_datagram_transport() + transport._buffer.append(b'data') + transport.send(b'') + self.assertFalse(self.sock.sendto.called) + self.assertEqual([b'data'], list(transport._buffer)) + + def test_send_buffer(self): + transport = self.socket_datagram_transport() + transport._buffer.append(b'data1') + transport.send(b'data2') + self.assertFalse(self.sock.sendto.called) + self.assertEqual([b'data1', b'data2'], list(transport._buffer)) + + def test_send_buffer_bytearray(self): + data2 = bytearray(b'data2') + transport = self.socket_datagram_transport() + transport._buffer.append(b'data1') + transport.send(data2) + self.assertFalse(self.sock.sendto.called) + self.assertEqual([b'data1', b'data2'], list(transport._buffer)) + self.assertIsInstance(transport._buffer[1], bytes) + + def test_send_buffer_memoryview(self): + data2 = memoryview(b'data2') + transport = self.socket_datagram_transport() + transport._buffer.append(b'data1') + transport.send(data2) + self.assertFalse(self.sock.sendto.called) + self.assertEqual([b'data1', b'data2'], list(transport._buffer)) + self.assertIsInstance(transport._buffer[1], bytes) + + def test_send_tryagain(self): + data = b'data' + self.sock.sendto.side_effect = BlockingIOError + + transport = self.socket_datagram_transport() + transport.send(data) + + self.loop.assert_writer(7, transport._send_ready) + self.assertEqual([b'data'], list(transport._buffer)) + + @mock.patch('asyncio.selector_events.logger') + def test_sendto_exception(self, m_log): + data = b'data' + err = self.sock.sendto.side_effect = RuntimeError() + + transport = self.socket_datagram_transport() + transport._fatal_error = mock.Mock() + transport.send(data) + + self.assertTrue(transport._fatal_error.called) + transport._fatal_error.assert_called_with( + err, + 'Fatal write error on datagram transport') + transport._conn_lost = 1 + + transport.send(data) + transport.send(data) + transport.send(data) + transport.send(data) + transport.send(data) + m_log.warning.assert_called_with('socket.send() raised exception.') + + def test_send_error_received(self): + data = b'data' + + self.sock.sendto.side_effect = ConnectionRefusedError + + transport = self.socket_datagram_transport() + transport._fatal_error = mock.Mock() + transport.send(data) + + self.assertEqual(transport._conn_lost, 0) + self.assertFalse(transport._fatal_error.called) + self.assertTrue(self.protocol.error_received.called) + + def test_send_str(self): + transport = self.socket_datagram_transport() + self.assertRaises(TypeError, transport.send, 'str') + + def test_send_closing(self): + transport = self.socket_datagram_transport() + transport.close() + self.assertEqual(transport._conn_lost, 1) + transport.send(b'data') + self.assertEqual(transport._conn_lost, 2) + + def test_send_ready(self): + data = b'data' + self.sock.sendto.return_value = len(data) + + transport = self.socket_datagram_transport() + transport._buffer.append(data) + self.loop._add_writer(7, transport._send_ready) + transport._send_ready() + self.assertTrue(self.sock.sendto.called) + self.assertEqual(self.sock.sendto.call_args[0], (data, self.address)) + self.assertFalse(self.loop.writers) + + def test_send_ready_closing(self): + data = b'data' + self.sock.sendto.return_value = len(data) + + transport = self.socket_datagram_transport() + transport._closing = True + transport._buffer.append(data) + self.loop._add_writer(7, transport._send_ready) + transport._send_ready() + self.sock.sendto.assert_called_with(data, self.address) + self.assertFalse(self.loop.writers) + self.sock.close.assert_called_with() + self.protocol.connection_lost.assert_called_with(None) + + def test_send_ready_no_data(self): + transport = self.socket_datagram_transport() + self.loop._add_writer(7, transport._send_ready) + transport._send_ready() + self.assertFalse(self.sock.sendto.called) + self.assertFalse(self.loop.writers) + + def test_send_ready_tryagain(self): + self.sock.sendto.side_effect = BlockingIOError + + transport = self.socket_datagram_transport() + transport._buffer.extend([b'data1', b'data2']) + self.loop._add_writer(7, transport._send_ready) + transport._send_ready() + + self.loop.assert_writer(7, transport._send_ready) + self.assertEqual([b'data1', b'data2'], list(transport._buffer)) + + def test_send_ready_exception(self): + err = self.sock.sendto.side_effect = RuntimeError() + + transport = self.socket_datagram_transport() + transport._fatal_error = mock.Mock() + transport._buffer.append(b'data') + transport._send_ready() + + transport._fatal_error.assert_called_with( + err, + 'Fatal write error on datagram transport') + + def test_send_ready_error_received(self): + self.sock.sendto.side_effect = ConnectionRefusedError + + transport = self.socket_datagram_transport() + transport._fatal_error = mock.Mock() + transport._buffer.append(b'data') + transport._send_ready() + + self.assertFalse(transport._fatal_error.called) + self.assertTrue(self.protocol.error_received.called) + + @mock.patch('asyncio.base_events.logger.error') + def test_fatal_error_connected(self, m_exc): + transport = self.socket_datagram_transport() + err = ConnectionRefusedError() + transport._fatal_error(err) + self.assertFalse(self.protocol.error_received.called) + m_exc.assert_called_with( + test_utils.MockPattern( + 'Fatal error on transport\nprotocol:.*\ntransport:.*'), + exc_info=(ConnectionRefusedError, MOCK_ANY, MOCK_ANY)) + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 11f0890d65f964..2b92a728aed261 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -372,6 +372,127 @@ def test_create_unix_connection_ssl_noserverhost(self): self.loop.run_until_complete(coro) + def test_create_unix_datagram_server_existing_path_sock(self): + with test_utils.unix_socket_path() as path: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.bind(path) + sock.close() + + coro = self.loop.create_unix_datagram_server(mock.MagicMock, path) + transport, protocol = self.loop.run_until_complete(coro) + transport.close() + self.loop.run_until_complete(transport.wait_closed()) + assert protocol.connection_made.call_count == 1 + + @unittest.skipUnless(hasattr(os, 'fspath'), 'no os.fspath') + def test_create_unix_datagram_server_pathlib(self): + with test_utils.unix_socket_path() as path: + path = pathlib.Path(path) + coro = self.loop.create_unix_datagram_server(mock.MagicMock, path) + transport, protocol = self.loop.run_until_complete(coro) + transport.close() + self.loop.run_until_complete(transport.wait_closed()) + assert protocol.connection_made.call_count == 1 + + def test_create_unix_datagram_server_existing_path_nonsock(self): + with tempfile.NamedTemporaryFile() as file: + coro = self.loop.create_unix_datagram_server(mock.MagicMock, + file.name) + with self.assertRaisesRegex(OSError, + 'Address.*is already in use'): + self.loop.run_until_complete(coro) + + def test_create_unix_datagram_server_nopath_nosock(self): + coro = self.loop.create_unix_datagram_server(mock.MagicMock, path=None) + with self.assertRaisesRegex(ValueError, + 'path was not specified, and no sock'): + self.loop.run_until_complete(coro) + + def test_create_unix_datagram_server_path_inetsock(self): + sock = socket.socket() + with sock: + coro = self.loop.create_unix_datagram_server(mock.MagicMock, + path=None, sock=sock) + with self.assertRaisesRegex(ValueError, + 'A UNIX Domain Datagram.*was ' + 'expected'): + self.loop.run_until_complete(coro) + + def test_create_unix_datagram_server_path_stream(self): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + with sock: + coro = self.loop.create_unix_datagram_server(mock.MagicMock, + path=None, sock=sock) + with self.assertRaisesRegex(ValueError, + 'A UNIX Domain Datagram.*was ' + 'expected'): + self.loop.run_until_complete(coro) + + @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), + 'no socket.SOCK_NONBLOCK (linux only)') + def test_create_unix_datagram_server_path_stream_bittype(self): + sock = socket.socket( + socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_NONBLOCK) + with tempfile.NamedTemporaryFile() as file: + fn = file.name + try: + with sock: + sock.bind(fn) + coro = self.loop.create_unix_datagram_server(mock.MagicMock, + path=None, + sock=sock) + transport, protocol = self.loop.run_until_complete(coro) + transport.close() + self.loop.run_until_complete(transport.wait_closed()) + assert protocol.connection_made.call_count == 1 + finally: + os.unlink(fn) + + def test_create_unix_datagram_connection_path_inetsock(self): + sock = socket.socket() + with sock: + coro = self.loop.create_unix_datagram_connection(mock.MagicMock, + path=None, + sock=sock) + with self.assertRaisesRegex(ValueError, + 'A UNIX Domain Datagram.*was ' + 'expected'): + self.loop.run_until_complete(coro) + + @mock.patch('asyncio.unix_events.socket') + def test_create_unix_datagram_server_bind_error(self, m_socket): + # Ensure that the socket is closed on any bind error + sock = mock.Mock() + m_socket.socket.return_value = sock + + sock.bind.side_effect = OSError + coro = self.loop.create_unix_datagram_server(mock.MagicMock, + path="/test") + with self.assertRaises(OSError): + self.loop.run_until_complete(coro) + self.assertTrue(sock.close.called) + + sock.bind.side_effect = MemoryError + coro = self.loop.create_unix_datagram_server(mock.MagicMock, + path="/test") + with self.assertRaises(MemoryError): + self.loop.run_until_complete(coro) + self.assertTrue(sock.close.called) + + def test_create_unix_datagram_connection_path_sock(self): + coro = self.loop.create_unix_datagram_connection( + lambda: None, os.devnull, sock=object()) + with self.assertRaisesRegex(ValueError, 'path and sock can not be'): + self.loop.run_until_complete(coro) + + def test_create_unix_datagram_connection_nopath_nosock(self): + coro = self.loop.create_unix_datagram_connection( + lambda: None, None) + with self.assertRaisesRegex(ValueError, + 'path was not specified, and no sock' + ' specified'): + self.loop.run_until_complete(coro) + class UnixReadPipeTransportTests(test_utils.TestCase): From 6234c20b72a95a4ff9832d62f32af9ec97777812 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Tue, 22 Aug 2017 09:35:59 +0200 Subject: [PATCH 07/14] fix travis --- Lib/asyncio/selector_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index ebbc039fe19949..7b4beb324b3af7 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1237,7 +1237,7 @@ def close(self): self._conn_lost += 1 self._loop._remove_writer(self._sock_fd) self._loop.call_soon(self._call_connection_lost, None) - + @coroutine def wait_closed(self): if self._closing_waiter: From f82fd5fe90669bddd94def9c1457418da1c5876d Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Tue, 22 Aug 2017 09:44:59 +0200 Subject: [PATCH 08/14] Remove not needed print --- Lib/test/test_asyncio/test_selector_events.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 00140d092c789a..d40ada27a0f2ac 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1620,7 +1620,6 @@ def test_sendto_buffer_memoryview(self): [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], list(transport._buffer)) - print(transport._buffer) self.assertIsInstance(transport._buffer[1][0], bytes) def test_sendto_tryagain(self): From 43db3c1b9c3951df38bbad5a1d09bf79b79585ab Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Tue, 22 Aug 2017 11:11:53 +0200 Subject: [PATCH 09/14] Misc/News --- .../next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst diff --git a/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst b/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst new file mode 100644 index 00000000000000..e30b93aba898b6 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst @@ -0,0 +1,2 @@ +Added `create_unix_datagram_connection` and `create_unix_datagram_server` to +unix event loop. From 94ee0b68f3b41e711d3d57610d8f5d31789629f3 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Wed, 23 Aug 2017 15:19:53 +0200 Subject: [PATCH 10/14] Haypore review 2. Use sock_fd, allow sending empty data --- Lib/asyncio/selector_events.py | 2 -- Lib/test/test_asyncio/test_selector_events.py | 8 +++----- Lib/test/test_asyncio/test_unix_events.py | 7 +++++-- .../next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst | 4 ++-- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 7b4beb324b3af7..fee139ea23e3cc 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1177,8 +1177,6 @@ def send(self, data): if not isinstance(data, (bytes, bytearray, memoryview)): raise TypeError('data argument must be a bytes-like object, ' 'not %r' % type(data).__name__) - if not data: - return if self._conn_lost: if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index d40ada27a0f2ac..d3aa527bb7a203 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1813,7 +1813,7 @@ def test_ctor(self): tr = self.socket_datagram_transport(waiter=waiter) self.loop.run_until_complete(waiter) - self.loop.assert_reader(7, tr._read_ready) + self.loop.assert_reader(self.sock_fd, tr._read_ready) test_utils.run_briefly(self.loop) self.protocol.connection_made.assert_called_with(tr) @@ -1880,10 +1880,8 @@ def test_send_memoryview(self): def test_send_no_data(self): transport = self.socket_datagram_transport() - transport._buffer.append(b'data') transport.send(b'') - self.assertFalse(self.sock.sendto.called) - self.assertEqual([b'data'], list(transport._buffer)) + self.assertTrue(self.sock.sendto.called) def test_send_buffer(self): transport = self.socket_datagram_transport() @@ -1985,7 +1983,7 @@ def test_send_ready_closing(self): transport = self.socket_datagram_transport() transport._closing = True transport._buffer.append(data) - self.loop._add_writer(7, transport._send_ready) + self.loop._add_writer(self.sock_fd, transport._send_ready) transport._send_ready() self.sock.sendto.assert_called_with(data, self.address) self.assertFalse(self.loop.writers) diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 2b92a728aed261..ab2d30b2109b6d 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -398,10 +398,13 @@ def test_create_unix_datagram_server_existing_path_nonsock(self): with tempfile.NamedTemporaryFile() as file: coro = self.loop.create_unix_datagram_server(mock.MagicMock, file.name) - with self.assertRaisesRegex(OSError, - 'Address.*is already in use'): + with self.assertRaises(OSError) as exc: self.loop.run_until_complete(coro) + assert exc.exception.errno == errno.EADDRINUSE + self.assertRegex(exc.exception.strerror, + 'Address.*is already in use') + def test_create_unix_datagram_server_nopath_nosock(self): coro = self.loop.create_unix_datagram_server(mock.MagicMock, path=None) with self.assertRaisesRegex(ValueError, diff --git a/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst b/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst index e30b93aba898b6..91df382879c9e4 100644 --- a/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst +++ b/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst @@ -1,2 +1,2 @@ -Added `create_unix_datagram_connection` and `create_unix_datagram_server` to -unix event loop. +Added in asyncio `create_unix_datagram_connection` and +`create_unix_datagram_server` to unix event loop. From c27a05eb6f1004664eb0d59f77d6afb0dbb0bf06 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Thu, 24 Aug 2017 15:25:23 +0200 Subject: [PATCH 11/14] bpo-31245: asyncio datagram support for AF_UNIX --- Lib/asyncio/base_events.py | 6 + Lib/asyncio/selector_events.py | 120 --------- Lib/asyncio/transports.py | 2 +- Lib/asyncio/unix_events.py | 112 +------- Lib/test/test_asyncio/test_base_events.py | 11 + Lib/test/test_asyncio/test_selector_events.py | 255 ------------------ Lib/test/test_asyncio/test_unix_events.py | 124 --------- .../2017-08-22-11-05-35.bpo-31245.AniZuz.rst | 3 +- 8 files changed, 22 insertions(+), 611 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 33b8f4887c6a64..c594579ac42425 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -859,6 +859,12 @@ def create_datagram_endpoint(self, protocol_factory, if family == 0: raise ValueError('unexpected address family') addr_pairs_info = (((family, proto), (None, None)),) + elif family == socket.AF_UNIX: + for addr in (local_addr, remote_addr): + if addr is not None: + assert isinstance(addr, str) + addr_pairs_info = (((family, proto), + (local_addr, remote_addr)), ) else: # join address by (family, protocol) addr_infos = collections.OrderedDict() diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index fee139ea23e3cc..4b403560c31b2d 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -101,11 +101,6 @@ def _make_datagram_transport(self, sock, protocol, return _SelectorDatagramTransport(self, sock, protocol, address, waiter, extra) - def _make_socket_datagram_transport(self, sock, protocol, address, - waiter=None, extra=None): - return _SelectorSocketDatagramTransport(self, sock, protocol, address, - waiter, extra) - def close(self): if self.is_running(): raise RuntimeError("Cannot close a running event loop") @@ -1140,118 +1135,3 @@ def _sendto_ready(self): self._loop._remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) - - -class _SelectorSocketDatagramTransport(_SelectorTransport): - - _buffer_factory = collections.deque - - def __init__(self, loop, sock, protocol, address, waiter=None, extra=None): - super().__init__(loop, sock, protocol, extra) - self._address = address - self._closing_waiter = None - self._loop.call_soon(self._protocol.connection_made, self) - # only start reading when connection_made() has been called - self._loop.call_soon(self._loop._add_reader, self._sock_fd, - self._read_ready) - if waiter is not None: - # only wake up the waiter when connection_made() has been called - self._loop.call_soon(futures._set_result_unless_cancelled, - waiter, None) - - def _read_ready(self): - if self._conn_lost: - return - try: - data = self._sock.recv(self.max_size) - except (BlockingIOError, InterruptedError): - pass - except OSError as exc: - self._protocol.error_received(exc) - except Exception as exc: - self._fatal_error(exc, 'Fatal read error on datagram transport') - else: - self._protocol.datagram_received(data) - - def send(self, data): - if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be a bytes-like object, ' - 'not %r' % type(data).__name__) - - if self._conn_lost: - if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: - logger.warning('socket.send() raised exception.') - self._conn_lost += 1 - return - - if not self._buffer: - # Attempt to send it right away first. - try: - self._sock.sendto(data, self._address) - return - except (BlockingIOError, InterruptedError): - self._loop._add_writer(self._sock_fd, self._send_ready) - except OSError as exc: - self._protocol.error_received(exc) - return - except Exception as exc: - self._fatal_error(exc, - 'Fatal write error on datagram transport') - return - - # Ensure that what we buffer is immutable. - self._buffer.append(bytes(data)) - self._maybe_pause_protocol() - - def _send_ready(self): - while self._buffer: - data = self._buffer.popleft() - try: - self._sock.sendto(data, self._address) - except (BlockingIOError, InterruptedError): - self._buffer.appendleft(data) # Try again later. - break - except OSError as exc: - self._protocol.error_received(exc) - return - except Exception as exc: - self._fatal_error(exc, - 'Fatal write error on datagram transport') - return - - self._maybe_resume_protocol() # May append to buffer. - if not self._buffer: - self._loop._remove_writer(self._sock_fd) - if self._closing: - self._call_connection_lost(None) - - def close(self): - if self._closing: - return - self._closing = True - self._closing_waiter = self._loop.create_future() - self._loop._remove_reader(self._sock_fd) - if not self._buffer: - self._conn_lost += 1 - self._loop._remove_writer(self._sock_fd) - self._loop.call_soon(self._call_connection_lost, None) - - @coroutine - def wait_closed(self): - if self._closing_waiter: - yield from self._closing_waiter - else: - return - - def _call_connection_lost(self, exc): - try: - if self._protocol_connected: - self._protocol.connection_lost(exc) - finally: - self._sock.close() - self._sock = None - self._protocol = None - self._loop = None - - if self._closing_waiter: - self._closing_waiter.set_result(None) diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py index a94079f433adfc..1edbd46160f596 100644 --- a/Lib/asyncio/transports.py +++ b/Lib/asyncio/transports.py @@ -88,7 +88,7 @@ def get_write_buffer_size(self): """Return the current size of the write buffer.""" raise NotImplementedError - def write(self, data): + def nuwrite(self, data): """Write some data bytes to the transport. This does not block; it buffers the data and arranges for it diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 7c2ff4f6ae92c6..bf682a1a98a39f 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -263,6 +263,8 @@ def create_unix_server(self, protocol_factory, path=None, *, 'path and sock can not be specified at the same time') path = _fspath(path) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # Check for abstract socket. `str` and `bytes` paths are supported. if path[0] not in (0, '\x00'): try: @@ -272,10 +274,8 @@ def create_unix_server(self, protocol_factory, path=None, *, pass except OSError as err: # Directory may have permissions only to create socket. - logger.error('Unable to check or remove stale UNIX' - ' socket %r: %r', path, err) + logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err) - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: sock.bind(path) except OSError as exc: @@ -307,112 +307,6 @@ def create_unix_server(self, protocol_factory, path=None, *, self._start_serving(protocol_factory, sock, ssl, server) return server - @coroutine - def create_unix_datagram_connection(self, protocol_factory, path=None, *, - sock=None): - if path is not None: - if sock is not None: - raise ValueError( - 'path and sock can not be specified at the same time') - - path = _fspath(path) - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - try: - sock.setblocking(False) - yield from self.sock_connect(sock, path) - except: - sock.close() - raise - else: - if sock is None: - raise ValueError( - 'path was not specified, and no sock specified') - - if (sock.family != socket.AF_UNIX or - not base_events._is_dgram_socket(sock)): - raise ValueError( - 'A UNIX Domain Datagram Socket was expected, got {!r}' - .format(sock)) - - sock.setblocking(False) - path = sock.getpeername() - - protocol = protocol_factory() - waiter = self.create_future() - transport = self._make_socket_datagram_transport(sock, protocol, path, - waiter) - - try: - yield from waiter - except: - transport.close() - raise - - return transport, protocol - - @coroutine - def create_unix_datagram_server(self, protocol_factory, path=None, *, - sock=None): - if path is not None: - if sock is not None: - raise ValueError( - 'path and sock can not be specified at the same time') - - path = _fspath(path) - # Check for abstract socket. `str` and `bytes` paths are supported. - if path[0] not in (0, '\x00'): - try: - if stat.S_ISSOCK(os.stat(path).st_mode): - os.remove(path) - except FileNotFoundError: - pass - except OSError as err: - # Directory may have permissions only to create socket. - logger.error('Unable to check or remove stale UNIX' - ' socket %r: %r', path, err) - - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - try: - sock.setblocking(False) - sock.bind(path) - except OSError as exc: - sock.close() - if exc.errno == errno.EADDRINUSE: - # Let's improve the error message by adding - # with what exact address it occurs. - msg = 'Address {!r} is already in use'.format(path) - raise OSError(errno.EADDRINUSE, msg) from None - else: - raise - except: - sock.close() - raise - else: - if sock is None: - raise ValueError( - 'path was not specified, and no sock specified') - - if (sock.family != socket.AF_UNIX or - not base_events._is_dgram_socket(sock)): - raise ValueError( - 'A UNIX Domain Datagram Socket was expected, got {!r}' - .format(sock)) - - sock.setblocking(False) - path = sock.getsockname() - - protocol = protocol_factory() - waiter = self.create_future() - transport = self._make_socket_datagram_transport(sock, protocol, path, - waiter) - - try: - yield from waiter - except: - transport.close() - raise - - return transport, protocol if hasattr(os, 'set_blocking'): def _set_nonblocking(fd): diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 3f1ec651742e43..a0ce9fa55e06af 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1528,6 +1528,17 @@ def test_create_datagram_endpoint_sock(self): self.loop.run_until_complete(protocol.done) self.assertEqual('CLOSED', protocol.state) + @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') + def test_create_datagram_endpoint_sock_unix(self): + fut = self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(create_future=True, loop=self.loop), + family=socket.AF_UNIX) + transport, protocol = self.loop.run_until_complete(fut) + assert transport._sock.family == socket.AF_UNIX + transport.close() + self.loop.run_until_complete(protocol.done) + self.assertEqual('CLOSED', protocol.state) + def test_create_datagram_endpoint_sock_sockopts(self): class FakeSock: type = socket.SOCK_DGRAM diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index d3aa527bb7a203..c50b3e49565c92 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -17,7 +17,6 @@ from asyncio.selector_events import _SelectorSslTransport from asyncio.selector_events import _SelectorSocketTransport from asyncio.selector_events import _SelectorDatagramTransport -from asyncio.selector_events import _SelectorSocketDatagramTransport MOCK_ANY = mock.ANY @@ -1788,259 +1787,5 @@ def test_fatal_error_connected(self, m_exc): 'Fatal error on transport\nprotocol:.*\ntransport:.*'), exc_info=(ConnectionRefusedError, MOCK_ANY, MOCK_ANY)) - -class SelectorSocketDatagramTransportTests(test_utils.TestCase): - - def setUp(self): - super().setUp() - self.loop = self.new_test_loop() - self.protocol = test_utils.make_test_protocol( - asyncio.UnixDatagramProtocol) - self.sock = mock.Mock(socket.socket) - self.sock_fd = self.sock.fileno.return_value = 7 - self.address = 'testsocket' - - def socket_datagram_transport(self, waiter=None): - transport = _SelectorSocketDatagramTransport(self.loop, self.sock, - self.protocol, - address=self.address, - waiter=waiter) - self.addCleanup(close_transport, transport) - return transport - - def test_ctor(self): - waiter = asyncio.Future(loop=self.loop) - tr = self.socket_datagram_transport(waiter=waiter) - self.loop.run_until_complete(waiter) - - self.loop.assert_reader(self.sock_fd, tr._read_ready) - test_utils.run_briefly(self.loop) - self.protocol.connection_made.assert_called_with(tr) - - def test_ctor_with_waiter(self): - waiter = asyncio.Future(loop=self.loop) - self.socket_datagram_transport(waiter=waiter) - self.loop.run_until_complete(waiter) - self.assertIsNone(waiter.result()) - - def test_read_ready(self): - transport = self.socket_datagram_transport() - self.sock.recv.return_value = b'data' - transport._read_ready() - self.protocol.datagram_received.assert_called_with(b'data') - - def test_read_ready_tryagain(self): - self.sock.recv.side_effect = BlockingIOError - - transport = self.socket_datagram_transport() - transport._fatal_error = mock.Mock() - transport._read_ready() - - self.assertFalse(transport._fatal_error.called) - - def test_read_ready_err(self): - err = self.sock.recv.side_effect = ValueError() - - transport = self.socket_datagram_transport() - transport._fatal_error = mock.Mock() - transport._read_ready() - transport._fatal_error.assert_called_with(err, - 'Fatal read error on' - ' datagram transport') - - def test_read_ready_oserr(self): - err = self.sock.recv.side_effect = OSError() - - transport = self.socket_datagram_transport() - transport._fatal_error = mock.Mock() - transport._read_ready() - self.protocol.error_received.assert_called_with(err) - - def test_send(self): - data = b'data' - transport = self.socket_datagram_transport() - transport.send(data) - self.assertTrue(self.sock.sendto.called) - self.assertEqual(self.sock.sendto.call_args[0], (data, self.address)) - - def test_send_bytearray(self): - data = bytearray(b'data') - transport = self.socket_datagram_transport() - transport.send(data) - self.assertTrue(self.sock.sendto.called) - self.assertEqual( - self.sock.sendto.call_args[0], (data, self.address)) - - def test_send_memoryview(self): - data = memoryview(b'data') - transport = self.socket_datagram_transport() - transport.send(data) - self.assertTrue(self.sock.sendto.called) - self.assertEqual(self.sock.sendto.call_args[0], (data, self.address)) - - def test_send_no_data(self): - transport = self.socket_datagram_transport() - transport.send(b'') - self.assertTrue(self.sock.sendto.called) - - def test_send_buffer(self): - transport = self.socket_datagram_transport() - transport._buffer.append(b'data1') - transport.send(b'data2') - self.assertFalse(self.sock.sendto.called) - self.assertEqual([b'data1', b'data2'], list(transport._buffer)) - - def test_send_buffer_bytearray(self): - data2 = bytearray(b'data2') - transport = self.socket_datagram_transport() - transport._buffer.append(b'data1') - transport.send(data2) - self.assertFalse(self.sock.sendto.called) - self.assertEqual([b'data1', b'data2'], list(transport._buffer)) - self.assertIsInstance(transport._buffer[1], bytes) - - def test_send_buffer_memoryview(self): - data2 = memoryview(b'data2') - transport = self.socket_datagram_transport() - transport._buffer.append(b'data1') - transport.send(data2) - self.assertFalse(self.sock.sendto.called) - self.assertEqual([b'data1', b'data2'], list(transport._buffer)) - self.assertIsInstance(transport._buffer[1], bytes) - - def test_send_tryagain(self): - data = b'data' - self.sock.sendto.side_effect = BlockingIOError - - transport = self.socket_datagram_transport() - transport.send(data) - - self.loop.assert_writer(7, transport._send_ready) - self.assertEqual([b'data'], list(transport._buffer)) - - @mock.patch('asyncio.selector_events.logger') - def test_sendto_exception(self, m_log): - data = b'data' - err = self.sock.sendto.side_effect = RuntimeError() - - transport = self.socket_datagram_transport() - transport._fatal_error = mock.Mock() - transport.send(data) - - self.assertTrue(transport._fatal_error.called) - transport._fatal_error.assert_called_with( - err, - 'Fatal write error on datagram transport') - transport._conn_lost = 1 - - transport.send(data) - transport.send(data) - transport.send(data) - transport.send(data) - transport.send(data) - m_log.warning.assert_called_with('socket.send() raised exception.') - - def test_send_error_received(self): - data = b'data' - - self.sock.sendto.side_effect = ConnectionRefusedError - - transport = self.socket_datagram_transport() - transport._fatal_error = mock.Mock() - transport.send(data) - - self.assertEqual(transport._conn_lost, 0) - self.assertFalse(transport._fatal_error.called) - self.assertTrue(self.protocol.error_received.called) - - def test_send_str(self): - transport = self.socket_datagram_transport() - self.assertRaises(TypeError, transport.send, 'str') - - def test_send_closing(self): - transport = self.socket_datagram_transport() - transport.close() - self.assertEqual(transport._conn_lost, 1) - transport.send(b'data') - self.assertEqual(transport._conn_lost, 2) - - def test_send_ready(self): - data = b'data' - self.sock.sendto.return_value = len(data) - - transport = self.socket_datagram_transport() - transport._buffer.append(data) - self.loop._add_writer(7, transport._send_ready) - transport._send_ready() - self.assertTrue(self.sock.sendto.called) - self.assertEqual(self.sock.sendto.call_args[0], (data, self.address)) - self.assertFalse(self.loop.writers) - - def test_send_ready_closing(self): - data = b'data' - self.sock.sendto.return_value = len(data) - - transport = self.socket_datagram_transport() - transport._closing = True - transport._buffer.append(data) - self.loop._add_writer(self.sock_fd, transport._send_ready) - transport._send_ready() - self.sock.sendto.assert_called_with(data, self.address) - self.assertFalse(self.loop.writers) - self.sock.close.assert_called_with() - self.protocol.connection_lost.assert_called_with(None) - - def test_send_ready_no_data(self): - transport = self.socket_datagram_transport() - self.loop._add_writer(7, transport._send_ready) - transport._send_ready() - self.assertFalse(self.sock.sendto.called) - self.assertFalse(self.loop.writers) - - def test_send_ready_tryagain(self): - self.sock.sendto.side_effect = BlockingIOError - - transport = self.socket_datagram_transport() - transport._buffer.extend([b'data1', b'data2']) - self.loop._add_writer(7, transport._send_ready) - transport._send_ready() - - self.loop.assert_writer(7, transport._send_ready) - self.assertEqual([b'data1', b'data2'], list(transport._buffer)) - - def test_send_ready_exception(self): - err = self.sock.sendto.side_effect = RuntimeError() - - transport = self.socket_datagram_transport() - transport._fatal_error = mock.Mock() - transport._buffer.append(b'data') - transport._send_ready() - - transport._fatal_error.assert_called_with( - err, - 'Fatal write error on datagram transport') - - def test_send_ready_error_received(self): - self.sock.sendto.side_effect = ConnectionRefusedError - - transport = self.socket_datagram_transport() - transport._fatal_error = mock.Mock() - transport._buffer.append(b'data') - transport._send_ready() - - self.assertFalse(transport._fatal_error.called) - self.assertTrue(self.protocol.error_received.called) - - @mock.patch('asyncio.base_events.logger.error') - def test_fatal_error_connected(self, m_exc): - transport = self.socket_datagram_transport() - err = ConnectionRefusedError() - transport._fatal_error(err) - self.assertFalse(self.protocol.error_received.called) - m_exc.assert_called_with( - test_utils.MockPattern( - 'Fatal error on transport\nprotocol:.*\ntransport:.*'), - exc_info=(ConnectionRefusedError, MOCK_ANY, MOCK_ANY)) - if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index ab2d30b2109b6d..11f0890d65f964 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -372,130 +372,6 @@ def test_create_unix_connection_ssl_noserverhost(self): self.loop.run_until_complete(coro) - def test_create_unix_datagram_server_existing_path_sock(self): - with test_utils.unix_socket_path() as path: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - sock.bind(path) - sock.close() - - coro = self.loop.create_unix_datagram_server(mock.MagicMock, path) - transport, protocol = self.loop.run_until_complete(coro) - transport.close() - self.loop.run_until_complete(transport.wait_closed()) - assert protocol.connection_made.call_count == 1 - - @unittest.skipUnless(hasattr(os, 'fspath'), 'no os.fspath') - def test_create_unix_datagram_server_pathlib(self): - with test_utils.unix_socket_path() as path: - path = pathlib.Path(path) - coro = self.loop.create_unix_datagram_server(mock.MagicMock, path) - transport, protocol = self.loop.run_until_complete(coro) - transport.close() - self.loop.run_until_complete(transport.wait_closed()) - assert protocol.connection_made.call_count == 1 - - def test_create_unix_datagram_server_existing_path_nonsock(self): - with tempfile.NamedTemporaryFile() as file: - coro = self.loop.create_unix_datagram_server(mock.MagicMock, - file.name) - with self.assertRaises(OSError) as exc: - self.loop.run_until_complete(coro) - - assert exc.exception.errno == errno.EADDRINUSE - self.assertRegex(exc.exception.strerror, - 'Address.*is already in use') - - def test_create_unix_datagram_server_nopath_nosock(self): - coro = self.loop.create_unix_datagram_server(mock.MagicMock, path=None) - with self.assertRaisesRegex(ValueError, - 'path was not specified, and no sock'): - self.loop.run_until_complete(coro) - - def test_create_unix_datagram_server_path_inetsock(self): - sock = socket.socket() - with sock: - coro = self.loop.create_unix_datagram_server(mock.MagicMock, - path=None, sock=sock) - with self.assertRaisesRegex(ValueError, - 'A UNIX Domain Datagram.*was ' - 'expected'): - self.loop.run_until_complete(coro) - - def test_create_unix_datagram_server_path_stream(self): - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - with sock: - coro = self.loop.create_unix_datagram_server(mock.MagicMock, - path=None, sock=sock) - with self.assertRaisesRegex(ValueError, - 'A UNIX Domain Datagram.*was ' - 'expected'): - self.loop.run_until_complete(coro) - - @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), - 'no socket.SOCK_NONBLOCK (linux only)') - def test_create_unix_datagram_server_path_stream_bittype(self): - sock = socket.socket( - socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_NONBLOCK) - with tempfile.NamedTemporaryFile() as file: - fn = file.name - try: - with sock: - sock.bind(fn) - coro = self.loop.create_unix_datagram_server(mock.MagicMock, - path=None, - sock=sock) - transport, protocol = self.loop.run_until_complete(coro) - transport.close() - self.loop.run_until_complete(transport.wait_closed()) - assert protocol.connection_made.call_count == 1 - finally: - os.unlink(fn) - - def test_create_unix_datagram_connection_path_inetsock(self): - sock = socket.socket() - with sock: - coro = self.loop.create_unix_datagram_connection(mock.MagicMock, - path=None, - sock=sock) - with self.assertRaisesRegex(ValueError, - 'A UNIX Domain Datagram.*was ' - 'expected'): - self.loop.run_until_complete(coro) - - @mock.patch('asyncio.unix_events.socket') - def test_create_unix_datagram_server_bind_error(self, m_socket): - # Ensure that the socket is closed on any bind error - sock = mock.Mock() - m_socket.socket.return_value = sock - - sock.bind.side_effect = OSError - coro = self.loop.create_unix_datagram_server(mock.MagicMock, - path="/test") - with self.assertRaises(OSError): - self.loop.run_until_complete(coro) - self.assertTrue(sock.close.called) - - sock.bind.side_effect = MemoryError - coro = self.loop.create_unix_datagram_server(mock.MagicMock, - path="/test") - with self.assertRaises(MemoryError): - self.loop.run_until_complete(coro) - self.assertTrue(sock.close.called) - - def test_create_unix_datagram_connection_path_sock(self): - coro = self.loop.create_unix_datagram_connection( - lambda: None, os.devnull, sock=object()) - with self.assertRaisesRegex(ValueError, 'path and sock can not be'): - self.loop.run_until_complete(coro) - - def test_create_unix_datagram_connection_nopath_nosock(self): - coro = self.loop.create_unix_datagram_connection( - lambda: None, None) - with self.assertRaisesRegex(ValueError, - 'path was not specified, and no sock' - ' specified'): - self.loop.run_until_complete(coro) - class UnixReadPipeTransportTests(test_utils.TestCase): diff --git a/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst b/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst index 91df382879c9e4..ce495ea0ba3783 100644 --- a/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst +++ b/Misc/NEWS.d/next/Library/2017-08-22-11-05-35.bpo-31245.AniZuz.rst @@ -1,2 +1 @@ -Added in asyncio `create_unix_datagram_connection` and -`create_unix_datagram_server` to unix event loop. +Added support for AF_UNIX socket in asyncio `create_datagram_endpoint`. From 567fdf9369baab9788806b09c59be37774ece771 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Thu, 24 Aug 2017 16:26:23 +0200 Subject: [PATCH 12/14] documentation --- Doc/library/asyncio-eventloop.rst | 37 +++++-------------------------- Lib/asyncio/events.py | 4 ++-- Lib/asyncio/protocols.py | 15 +------------ Lib/asyncio/transports.py | 2 +- 4 files changed, 9 insertions(+), 49 deletions(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 3d1b8a8a44fd35..69c78489398fb2 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -341,9 +341,10 @@ Creating connections .. coroutinemethod:: AbstractEventLoop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, \*, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None) - Create datagram connection: socket family :py:data:`~socket.AF_INET` or - :py:data:`~socket.AF_INET6` depending on *host* (or *family* if specified), - socket type :py:data:`~socket.SOCK_DGRAM`. *protocol_factory* must be a + Create datagram connection: socket family :py:data:`~socket.AF_INET`, + :py:data:`~socket.AF_INET6` or :py:data:`~socket.AF_UNIX` depending on + *host* (or *family* if specified), socket type + :py:data:`~socket.SOCK_DGRAM`. *protocol_factory* must be a callable returning a :ref:`protocol ` instance. This method is a :ref:`coroutine ` which will try to @@ -409,25 +410,6 @@ Creating connections Availability: UNIX. -.. coroutinemethod:: AbstractEventLoop.create_unix_datagram_connection(protocol_factory, path=None, \*, sock=None) - - Create UNIX connection: socket family :py:data:`~socket.AF_UNIX`, socket - type :py:data:`~socket.SOCK_DGRAM`. The :py:data:`~socket.AF_UNIX` socket - family is used to communicate between processes on the same machine - efficiently. - - This method is a :ref:`coroutine ` which will try to - establish the connection in the background. When successful, the - coroutine returns a ``(transport, protocol)`` pair. - - *path* is the name of a UNIX domain socket, and is required unless a *sock* - parameter is specified. Abstract UNIX sockets, :class:`str`, and - :class:`bytes` paths are supported. - - See the :meth:`AbstractEventLoop.create_connection` method for parameters. - - Availability: UNIX. - Creating listening connections ------------------------------ @@ -495,16 +477,7 @@ Creating listening connections .. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None) Similar to :meth:`AbstractEventLoop.create_server`, but specific to the - socket family :py:data:`~socket.AF_UNIX`, socket type :py:data:`~socket.SOCK_STREAM`. - - This method is a :ref:`coroutine `. - - Availability: UNIX. - -.. coroutinemethod:: AbstractEventLoop.create_unix_datagram_server(protocol_factory, path=None, \*, sock=None) - - Similar to :meth:`AbstractEventLoop.create_server`, but specific to the - socket family :py:data:`~socket.AF_UNIX`, socket type :py:data:`~socket.SOCK_DGRAM`. + socket family :py:data:`~socket.AF_UNIX`. This method is a :ref:`coroutine `. diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 6af91374ecface..562ba2abe60985 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -378,8 +378,8 @@ def create_datagram_endpoint(self, protocol_factory, protocol_factory must be a callable returning a protocol instance. - socket family AF_INET or socket.AF_INET6 depending on host (or - family if specified), socket type SOCK_DGRAM. + socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on + host (or family if specified), socket type SOCK_DGRAM. reuse_address tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py index 0d99ce55df4d40..80fcac9a82dfb2 100644 --- a/Lib/asyncio/protocols.py +++ b/Lib/asyncio/protocols.py @@ -1,7 +1,7 @@ """Abstract Protocol class.""" __all__ = ['BaseProtocol', 'Protocol', 'DatagramProtocol', - 'SubprocessProtocol', 'UnixDatagramProtocol'] + 'SubprocessProtocol'] class BaseProtocol: @@ -113,19 +113,6 @@ def error_received(self, exc): """ -class UnixDatagramProtocol(BaseProtocol): - """Interface for unix socket datagram protocol.""" - - def datagram_received(self, data): - """Called when some datagram is received.""" - - def error_received(self, exc): - """Called when a send or receive operation raises an OSError. - - (Other than BlockingIOError or InterruptedError.) - """ - - class SubprocessProtocol(BaseProtocol): """Interface for protocol for subprocess calls.""" diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py index 1edbd46160f596..a94079f433adfc 100644 --- a/Lib/asyncio/transports.py +++ b/Lib/asyncio/transports.py @@ -88,7 +88,7 @@ def get_write_buffer_size(self): """Return the current size of the write buffer.""" raise NotImplementedError - def nuwrite(self, data): + def write(self, data): """Write some data bytes to the transport. This does not block; it buffers the data and arranges for it From ff759759b87f0e0433335cb789cb98c3f166d454 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Thu, 24 Aug 2017 16:35:47 +0200 Subject: [PATCH 13/14] verify socket.AF_UNIX is defined --- Lib/asyncio/base_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index c594579ac42425..53b83e9a26268f 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -859,7 +859,7 @@ def create_datagram_endpoint(self, protocol_factory, if family == 0: raise ValueError('unexpected address family') addr_pairs_info = (((family, proto), (None, None)),) - elif family == socket.AF_UNIX: + elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: for addr in (local_addr, remote_addr): if addr is not None: assert isinstance(addr, str) From 16b76091500557eed45000669ac7b8f18efb9514 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Mon, 30 Oct 2017 10:02:21 +0100 Subject: [PATCH 14/14] Raise TypeError if addr is not string --- Lib/asyncio/base_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 53b83e9a26268f..2a5a4f98952be8 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -861,8 +861,8 @@ def create_datagram_endpoint(self, protocol_factory, addr_pairs_info = (((family, proto), (None, None)),) elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: for addr in (local_addr, remote_addr): - if addr is not None: - assert isinstance(addr, str) + if addr is not None and not isistance(addr, str): + raise TypeError('string is expected') addr_pairs_info = (((family, proto), (local_addr, remote_addr)), ) else: