Skip to content

Commit 66b6b4a

Browse files
author
Dana Powers
committed
Fix KafkaClient request / response ordering
1 parent 6117265 commit 66b6b4a

File tree

1 file changed

+20
-10
lines changed

1 file changed

+20
-10
lines changed

kafka/client.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
138138
Arguments:
139139
140140
payloads: list of object-like entities with a topic (str) and
141-
partition (int) attribute
141+
partition (int) attribute; payloads with duplicate topic-partitions
142+
are not supported.
142143
143144
encode_fn: a method to encode the list of payloads to a request body,
144145
must accept client_id, correlation_id, and payloads as
@@ -152,6 +153,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
152153
153154
List of response objects in the same order as the supplied payloads
154155
"""
156+
# encoders / decoders do not maintain ordering currently
157+
# so we need to keep this so we can rebuild order before returning
158+
original_ordering = [(p.topic, p.partition) for p in payloads]
159+
155160
# Group the requests by topic+partition
156161
brokers_for_payloads = []
157162
payloads_by_broker = collections.defaultdict(list)
@@ -165,7 +170,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
165170

166171
# For each broker, send the list of request payloads
167172
# and collect the responses and errors
168-
responses_by_broker = collections.defaultdict(list)
173+
responses = {}
169174
broker_failures = []
170175
for broker, payloads in payloads_by_broker.items():
171176
requestId = self._next_id()
@@ -184,7 +189,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
184189
'to server %s: %s', requestId, broker, e)
185190

186191
for payload in payloads:
187-
responses_by_broker[broker].append(FailedPayloadsError(payload))
192+
topic_partition = (payload.topic, payload.partition)
193+
responses[topic_partition] = FailedPayloadsError(payload)
188194

189195
# No exception, try to get response
190196
else:
@@ -196,7 +202,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
196202
log.debug('Request %s does not expect a response '
197203
'(skipping conn.recv)', requestId)
198204
for payload in payloads:
199-
responses_by_broker[broker].append(None)
205+
topic_partition = (payload.topic, payload.partition)
206+
responses[topic_partition] = None
200207
continue
201208

202209
try:
@@ -208,12 +215,17 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
208215
requestId, broker, e)
209216

210217
for payload in payloads:
211-
responses_by_broker[broker].append(FailedPayloadsError(payload))
218+
topic_partition = (payload.topic, payload.partition)
219+
responses[topic_partition] = FailedPayloadsError(payload)
212220

213221
else:
222+
_resps = []
214223
for payload_response in decoder_fn(response):
215-
responses_by_broker[broker].append(payload_response)
216-
log.debug('Response %s: %s', requestId, responses_by_broker[broker])
224+
topic_partition = (payload_response.topic,
225+
payload_response.partition)
226+
responses[topic_partition] = payload_response
227+
_resps.append(payload_response)
228+
log.debug('Response %s: %s', requestId, _resps)
217229

218230
# Connection errors generally mean stale metadata
219231
# although sometimes it means incorrect api request
@@ -223,9 +235,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
223235
self.reset_all_metadata()
224236

225237
# Return responses in the same order as provided
226-
responses_by_payload = [responses_by_broker[broker].pop(0)
227-
for broker in brokers_for_payloads]
228-
return responses_by_payload
238+
return [responses[tp] for tp in original_ordering]
229239

230240
def __repr__(self):
231241
return '<KafkaClient client_id=%s>' % (self.client_id)

0 commit comments

Comments
 (0)