Skip to content

Commit 6c15325

Browse files
authored
feat: speed up processing incoming packets (#1352)
1 parent a014c7c commit 6c15325

File tree

7 files changed

+47
-47
lines changed

7 files changed

+47
-47
lines changed

src/zeroconf/_cache.pxd

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,23 @@ cdef class DNSCache:
2626

2727
cpdef bint async_add_records(self, object entries)
2828

29-
cpdef async_remove_records(self, object entries)
29+
cpdef void async_remove_records(self, object entries)
3030

3131
@cython.locals(
3232
store=cython.dict,
3333
)
34-
cpdef async_get_unique(self, DNSRecord entry)
34+
cpdef DNSRecord async_get_unique(self, DNSRecord entry)
3535

3636
@cython.locals(
3737
record=DNSRecord,
3838
)
39-
cpdef async_expire(self, double now)
39+
cpdef list async_expire(self, double now)
4040

4141
@cython.locals(
4242
records=cython.dict,
4343
record=DNSRecord,
4444
)
45-
cpdef async_all_by_details(self, str name, object type_, object class_)
45+
cpdef list async_all_by_details(self, str name, object type_, object class_)
4646

4747
cpdef cython.dict async_entries_with_name(self, str name)
4848

@@ -51,7 +51,7 @@ cdef class DNSCache:
5151
@cython.locals(
5252
cached_entry=DNSRecord,
5353
)
54-
cpdef get_by_details(self, str name, object type_, object class_)
54+
cpdef DNSRecord get_by_details(self, str name, object type_, object class_)
5555

5656
@cython.locals(
5757
records=cython.dict,
@@ -62,12 +62,12 @@ cdef class DNSCache:
6262
@cython.locals(
6363
store=cython.dict,
6464
)
65-
cdef _async_add(self, DNSRecord record)
65+
cdef bint _async_add(self, DNSRecord record)
6666

67-
cdef _async_remove(self, DNSRecord record)
67+
cdef void _async_remove(self, DNSRecord record)
6868

6969
@cython.locals(
7070
record=DNSRecord,
7171
created_double=double,
7272
)
73-
cpdef async_mark_unique_records_older_than_1s_to_expire(self, cython.set unique_types, object answers, double now)
73+
cpdef void async_mark_unique_records_older_than_1s_to_expire(self, cython.set unique_types, object answers, double now)

src/zeroconf/_core.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,16 +182,17 @@ def __init__(
182182
self.registry = ServiceRegistry()
183183
self.cache = DNSCache()
184184
self.question_history = QuestionHistory()
185+
186+
self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY)
187+
self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY)
188+
185189
self.query_handler = QueryHandler(self)
186190
self.record_manager = RecordManager(self)
187191

188192
self._notify_futures: Set[asyncio.Future] = set()
189193
self.loop: Optional[asyncio.AbstractEventLoop] = None
190194
self._loop_thread: Optional[threading.Thread] = None
191195

192-
self.out_queue = MulticastOutgoingQueue(self, 0, _AGGREGATION_DELAY)
193-
self.out_delay_queue = MulticastOutgoingQueue(self, _ONE_SECOND, _PROTECTED_AGGREGATION_DELAY)
194-
195196
self.start()
196197

197198
@property

src/zeroconf/_handlers/answers.pxd

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ from .._protocol.outgoing cimport DNSOutgoing
77

88
cdef class QuestionAnswers:
99

10-
cdef public object ucast
11-
cdef public object mcast_now
12-
cdef public object mcast_aggregate
13-
cdef public object mcast_aggregate_last_second
10+
cdef public dict ucast
11+
cdef public dict mcast_now
12+
cdef public dict mcast_aggregate
13+
cdef public dict mcast_aggregate_last_second
1414

1515

1616
cdef class AnswerGroup:
@@ -25,11 +25,11 @@ cdef class AnswerGroup:
2525
cdef object _FLAGS_QR_RESPONSE_AA
2626
cdef object NAME_GETTER
2727

28-
cpdef construct_outgoing_multicast_answers(cython.dict answers)
28+
cpdef DNSOutgoing construct_outgoing_multicast_answers(cython.dict answers)
2929

30-
cpdef construct_outgoing_unicast_answers(
30+
cpdef DNSOutgoing construct_outgoing_unicast_answers(
3131
cython.dict answers, bint ucast_source, cython.list questions, object id_
3232
)
3333

3434
@cython.locals(answer=DNSRecord, additionals=cython.set, additional=DNSRecord)
35-
cdef _add_answers_additionals(DNSOutgoing out, cython.dict answers)
35+
cdef void _add_answers_additionals(DNSOutgoing out, cython.dict answers)

src/zeroconf/_handlers/multicast_outgoing_queue.pxd

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ cdef class MulticastOutgoingQueue:
1919
cdef object _aggregation_delay
2020

2121
@cython.locals(last_group=AnswerGroup, random_int=cython.uint)
22-
cpdef async_add(self, double now, cython.dict answers)
22+
cpdef void async_add(self, double now, cython.dict answers)
2323

2424
@cython.locals(pending=AnswerGroup)
25-
cdef _remove_answers_from_queue(self, cython.dict answers)
25+
cdef void _remove_answers_from_queue(self, cython.dict answers)
2626

27-
cpdef async_ready(self)
27+
cpdef void async_ready(self)

src/zeroconf/_handlers/query_handler.pxd

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ cdef class _QueryResponse:
5353
cdef cython.set _mcast_aggregate_last_second
5454

5555
@cython.locals(record=DNSRecord)
56-
cdef add_qu_question_response(self, cython.dict answers)
56+
cdef void add_qu_question_response(self, cython.dict answers)
5757

58-
cdef add_ucast_question_response(self, cython.dict answers)
58+
cdef void add_ucast_question_response(self, cython.dict answers)
5959

6060
@cython.locals(answer=DNSRecord, question=DNSQuestion)
61-
cdef add_mcast_question_response(self, cython.dict answers)
61+
cdef void add_mcast_question_response(self, cython.dict answers)
6262

6363
@cython.locals(maybe_entry=DNSRecord)
6464
cdef bint _has_mcast_within_one_quarter_ttl(self, DNSRecord record)
@@ -74,15 +74,17 @@ cdef class QueryHandler:
7474
cdef ServiceRegistry registry
7575
cdef DNSCache cache
7676
cdef QuestionHistory question_history
77+
cdef MulticastOutgoingQueue out_queue
78+
cdef MulticastOutgoingQueue out_delay_queue
7779

7880
@cython.locals(service=ServiceInfo)
79-
cdef _add_service_type_enumeration_query_answers(self, list types, cython.dict answer_set, DNSRRSet known_answers)
81+
cdef void _add_service_type_enumeration_query_answers(self, list types, cython.dict answer_set, DNSRRSet known_answers)
8082

8183
@cython.locals(service=ServiceInfo)
82-
cdef _add_pointer_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers)
84+
cdef void _add_pointer_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers)
8385

8486
@cython.locals(service=ServiceInfo, dns_address=DNSAddress)
85-
cdef _add_address_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers, cython.uint type_)
87+
cdef void _add_address_answers(self, list services, cython.dict answer_set, DNSRRSet known_answers, cython.uint type_)
8688

8789
@cython.locals(question_lower_name=str, type_=cython.uint, service=ServiceInfo)
8890
cdef cython.dict _answer_question(self, DNSQuestion question, unsigned int strategy_type, list types, list services, DNSRRSet known_answers)
@@ -102,13 +104,11 @@ cdef class QueryHandler:
102104
cpdef QuestionAnswers async_response(self, cython.list msgs, cython.bint unicast_source)
103105

104106
@cython.locals(name=str, question_lower_name=str)
105-
cdef _get_answer_strategies(self, DNSQuestion question)
107+
cdef list _get_answer_strategies(self, DNSQuestion question)
106108

107109
@cython.locals(
108110
first_packet=DNSIncoming,
109111
ucast_source=bint,
110-
out_queue=MulticastOutgoingQueue,
111-
out_delay_queue=MulticastOutgoingQueue
112112
)
113113
cpdef void handle_assembled_query(
114114
self,

src/zeroconf/_handlers/query_handler.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,16 @@ def _has_mcast_record_in_last_second(self, record: DNSRecord) -> bool:
191191
class QueryHandler:
192192
"""Query the ServiceRegistry."""
193193

194-
__slots__ = ("zc", "registry", "cache", "question_history")
194+
__slots__ = ("zc", "registry", "cache", "question_history", "out_queue", "out_delay_queue")
195195

196196
def __init__(self, zc: 'Zeroconf') -> None:
197197
"""Init the query handler."""
198198
self.zc = zc
199199
self.registry = zc.registry
200200
self.cache = zc.cache
201201
self.question_history = zc.question_history
202+
self.out_queue = zc.out_queue
203+
self.out_delay_queue = zc.out_delay_queue
202204

203205
def _add_service_type_enumeration_query_answers(
204206
self, types: List[str], answer_set: _AnswerWithAdditionalsType, known_answers: DNSRRSet
@@ -301,7 +303,7 @@ def async_response( # pylint: disable=unused-argument
301303
"""
302304
strategies: List[_AnswerStrategy] = []
303305
for msg in msgs:
304-
for question in msg.questions:
306+
for question in msg._questions:
305307
strategies.extend(self._get_answer_strategies(question))
306308

307309
if not strategies:
@@ -311,7 +313,8 @@ def async_response( # pylint: disable=unused-argument
311313
return None
312314

313315
is_probe = False
314-
questions = msg.questions
316+
msg = msgs[0]
317+
questions = msg._questions
315318
# Only decode known answers if we are not a probe and we have
316319
# at least one answer strategy
317320
answers: List[DNSRecord] = []
@@ -321,7 +324,6 @@ def async_response( # pylint: disable=unused-argument
321324
else:
322325
answers.extend(msg.answers())
323326

324-
msg = msgs[0]
325327
query_res = _QueryResponse(self.cache, questions, is_probe, msg.now)
326328
known_answers = DNSRRSet(answers)
327329
known_answers_set: Optional[Set[DNSRecord]] = None
@@ -412,13 +414,12 @@ def handle_assembled_query(
412414
packet will be in packets.
413415
"""
414416
first_packet = packets[0]
415-
now = first_packet.now
416417
ucast_source = port != _MDNS_PORT
417418
question_answers = self.async_response(packets, ucast_source)
418-
if not question_answers:
419+
if question_answers is None:
419420
return
420421
if question_answers.ucast:
421-
questions = first_packet.questions
422+
questions = first_packet._questions
422423
id_ = first_packet.id
423424
out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_)
424425
# When sending unicast, only send back the reply
@@ -428,11 +429,9 @@ def handle_assembled_query(
428429
if question_answers.mcast_now:
429430
self.zc.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now))
430431
if question_answers.mcast_aggregate:
431-
out_queue = self.zc.out_queue
432-
out_queue.async_add(now, question_answers.mcast_aggregate)
432+
self.out_queue.async_add(first_packet.now, question_answers.mcast_aggregate)
433433
if question_answers.mcast_aggregate_last_second:
434434
# https://datatracker.ietf.org/doc/html/rfc6762#section-14
435435
# If we broadcast it in the last second, we have to delay
436436
# at least a second before we send it again
437-
out_delay_queue = self.zc.out_delay_queue
438-
out_delay_queue.async_add(now, question_answers.mcast_aggregate_last_second)
437+
self.out_delay_queue.async_add(first_packet.now, question_answers.mcast_aggregate_last_second)

src/zeroconf/_protocol/incoming.pxd

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ cdef class DNSIncoming:
7070

7171
cpdef bint is_probe(self)
7272

73-
cpdef answers(self)
73+
cpdef list answers(self)
7474

7575
cpdef bint is_response(self)
7676

@@ -86,16 +86,16 @@ cdef class DNSIncoming:
8686
cdef unsigned int _decode_labels_at_offset(self, unsigned int off, cython.list labels, cython.set seen_pointers)
8787

8888
@cython.locals(offset="unsigned int")
89-
cdef _read_header(self)
89+
cdef void _read_header(self)
9090

91-
cdef _initial_parse(self)
91+
cdef void _initial_parse(self)
9292

9393
@cython.locals(
9494
end="unsigned int",
9595
length="unsigned int",
9696
offset="unsigned int"
9797
)
98-
cdef _read_others(self)
98+
cdef void _read_others(self)
9999

100100
@cython.locals(offset="unsigned int")
101101
cdef _read_questions(self)
@@ -123,6 +123,6 @@ cdef class DNSIncoming:
123123
i="unsigned int",
124124
bitmap_length="unsigned int",
125125
)
126-
cdef _read_bitmap(self, unsigned int end)
126+
cdef list _read_bitmap(self, unsigned int end)
127127

128-
cdef _read_name(self)
128+
cdef str _read_name(self)

0 commit comments

Comments
 (0)