Skip to content

Commit c4077dd

Browse files
authored
feat: reduce overhead to send responses (#1135)
1 parent d45c2f9 commit c4077dd

File tree

1 file changed

+64
-24
lines changed

1 file changed

+64
-24
lines changed

src/zeroconf/_core.py

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import sys
2929
import threading
3030
from types import TracebackType # noqa # used in type hints
31-
from typing import Awaitable, Dict, List, Optional, Tuple, Type, Union, cast
31+
from typing import Any, Awaitable, Dict, List, Optional, Tuple, Type, Union, cast
3232

3333
from ._cache import DNSCache
3434
from ._dns import DNSQuestion, DNSQuestionType
@@ -105,6 +105,48 @@
105105
_REGISTER_BROADCASTS = 3
106106

107107

108+
class _WrappedTransport:
109+
"""A wrapper for transports."""
110+
111+
__slots__ = (
112+
'transport',
113+
'is_ipv6',
114+
'sock',
115+
'fileno',
116+
'sock_name',
117+
)
118+
119+
def __init__(
120+
self,
121+
transport: asyncio.DatagramTransport,
122+
is_ipv6: bool,
123+
sock: socket.socket,
124+
fileno: int,
125+
sock_name: Any,
126+
) -> None:
127+
"""Initialize the wrapped transport.
128+
129+
These attributes are used when sending packets.
130+
"""
131+
self.transport = transport
132+
self.is_ipv6 = is_ipv6
133+
self.sock = sock
134+
self.fileno = fileno
135+
self.sock_name = sock_name
136+
137+
138+
def _make_wrapped_transport(transport: asyncio.DatagramTransport) -> _WrappedTransport:
139+
"""Make a wrapped transport."""
140+
sock: socket.socket = transport.get_extra_info('socket')
141+
return _WrappedTransport(
142+
transport=transport,
143+
is_ipv6=sock.family == socket.AF_INET6,
144+
sock=sock,
145+
fileno=sock.fileno(),
146+
sock_name=sock.getsockname(),
147+
)
148+
149+
108150
class AsyncEngine:
109151
"""An engine wraps sockets in the event loop."""
110152

@@ -117,8 +159,8 @@ def __init__(
117159
self.loop: Optional[asyncio.AbstractEventLoop] = None
118160
self.zc = zeroconf
119161
self.protocols: List[AsyncListener] = []
120-
self.readers: List[asyncio.DatagramTransport] = []
121-
self.senders: List[asyncio.DatagramTransport] = []
162+
self.readers: List[_WrappedTransport] = []
163+
self.senders: List[_WrappedTransport] = []
122164
self.running_event: Optional[asyncio.Event] = None
123165
self._listen_socket = listen_socket
124166
self._respond_sockets = respond_sockets
@@ -158,9 +200,9 @@ async def _async_create_endpoints(self) -> None:
158200
for s in reader_sockets:
159201
transport, protocol = await loop.create_datagram_endpoint(lambda: AsyncListener(self.zc), sock=s)
160202
self.protocols.append(cast(AsyncListener, protocol))
161-
self.readers.append(cast(asyncio.DatagramTransport, transport))
203+
self.readers.append(_make_wrapped_transport(cast(asyncio.DatagramTransport, transport)))
162204
if s in sender_sockets:
163-
self.senders.append(cast(asyncio.DatagramTransport, transport))
205+
self.senders.append(_make_wrapped_transport(cast(asyncio.DatagramTransport, transport)))
164206

165207
def _async_cache_cleanup(self) -> None:
166208
"""Periodic cache cleanup."""
@@ -186,8 +228,8 @@ def _async_shutdown(self) -> None:
186228
"""Shutdown transports and sockets."""
187229
assert self.running_event is not None
188230
self.running_event.clear()
189-
for transport in itertools.chain(self.senders, self.readers):
190-
transport.close()
231+
for wrapped_transport in itertools.chain(self.senders, self.readers):
232+
wrapped_transport.transport.close()
191233

192234
def close(self) -> None:
193235
"""Close from sync context.
@@ -221,7 +263,7 @@ def __init__(self, zc: 'Zeroconf') -> None:
221263
self.zc = zc
222264
self.data: Optional[bytes] = None
223265
self.last_time: float = 0
224-
self.transport: Optional[asyncio.DatagramTransport] = None
266+
self.transport: Optional[_WrappedTransport] = None
225267
self.sock_description: Optional[str] = None
226268
self._deferred: Dict[str, List[DNSIncoming]] = {}
227269
self._timers: Dict[str, asyncio.TimerHandle] = {}
@@ -309,7 +351,7 @@ def handle_query_or_defer(
309351
msg: DNSIncoming,
310352
addr: str,
311353
port: int,
312-
transport: asyncio.DatagramTransport,
354+
transport: _WrappedTransport,
313355
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
314356
) -> None:
315357
"""Deal with incoming query packets. Provides a response if
@@ -341,7 +383,7 @@ def _respond_query(
341383
msg: Optional[DNSIncoming],
342384
addr: str,
343385
port: int,
344-
transport: asyncio.DatagramTransport,
386+
transport: _WrappedTransport,
345387
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
346388
) -> None:
347389
"""Respond to a query and reassemble any truncated deferred packets."""
@@ -362,27 +404,25 @@ def error_received(self, exc: Exception) -> None:
362404
self.log_exception_once(exc, msg_str, exc)
363405

364406
def connection_made(self, transport: asyncio.BaseTransport) -> None:
365-
self.transport = cast(asyncio.DatagramTransport, transport)
366-
sock_name = self.transport.get_extra_info('sockname')
367-
sock_fileno = self.transport.get_extra_info('socket').fileno()
368-
self.sock_description = f"{sock_fileno} ({sock_name})"
407+
wrapped_transport = _make_wrapped_transport(cast(asyncio.DatagramTransport, transport))
408+
self.transport = wrapped_transport
409+
self.sock_description = f"{wrapped_transport.fileno} ({wrapped_transport.sock_name})"
369410

370411
def connection_lost(self, exc: Optional[Exception]) -> None:
371412
"""Handle connection lost."""
372413

373414

374415
def async_send_with_transport(
375416
log_debug: bool,
376-
transport: asyncio.DatagramTransport,
417+
transport: _WrappedTransport,
377418
packet: bytes,
378419
packet_num: int,
379420
out: DNSOutgoing,
380421
addr: Optional[str],
381422
port: int,
382423
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
383424
) -> None:
384-
s = transport.get_extra_info('socket')
385-
ipv6_socket = s.family == socket.AF_INET6
425+
ipv6_socket = transport.is_ipv6
386426
if addr is None:
387427
real_addr = _MDNS_ADDR6 if ipv6_socket else _MDNS_ADDR
388428
else:
@@ -394,8 +434,8 @@ def async_send_with_transport(
394434
'Sending to (%s, %d) via [socket %s (%s)] (%d bytes #%d) %r as %r...',
395435
real_addr,
396436
port or _MDNS_PORT,
397-
s.fileno(),
398-
transport.get_extra_info('sockname'),
437+
transport.fileno,
438+
transport.sock_name,
399439
len(packet),
400440
packet_num + 1,
401441
out,
@@ -404,9 +444,9 @@ def async_send_with_transport(
404444
# Get flowinfo and scopeid for the IPV6 socket to create a complete IPv6
405445
# address tuple: https://docs.python.org/3.6/library/socket.html#socket-families
406446
if ipv6_socket and not v6_flow_scope:
407-
_, _, sock_flowinfo, sock_scopeid = s.getsockname()
447+
_, _, sock_flowinfo, sock_scopeid = transport.sock_name
408448
v6_flow_scope = (sock_flowinfo, sock_scopeid)
409-
transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope))
449+
transport.transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope))
410450

411451

412452
class Zeroconf(QuietLogger):
@@ -832,7 +872,7 @@ def handle_assembled_query(
832872
packets: List[DNSIncoming],
833873
addr: str,
834874
port: int,
835-
transport: asyncio.DatagramTransport,
875+
transport: _WrappedTransport,
836876
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
837877
) -> None:
838878
"""Respond to a (re)assembled query.
@@ -870,7 +910,7 @@ def send(
870910
addr: Optional[str] = None,
871911
port: int = _MDNS_PORT,
872912
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
873-
transport: Optional[asyncio.DatagramTransport] = None,
913+
transport: Optional[_WrappedTransport] = None,
874914
) -> None:
875915
"""Sends an outgoing packet threadsafe."""
876916
assert self.loop is not None
@@ -882,7 +922,7 @@ def async_send(
882922
addr: Optional[str] = None,
883923
port: int = _MDNS_PORT,
884924
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
885-
transport: Optional[asyncio.DatagramTransport] = None,
925+
transport: Optional[_WrappedTransport] = None,
886926
) -> None:
887927
"""Sends an outgoing packet."""
888928
if self.done:

0 commit comments

Comments
 (0)