@@ -138,7 +138,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
138
138
Arguments:
139
139
140
140
payloads: list of object-like entities with a topic (str) and
141
- partition (int) attribute
141
+ partition (int) attribute; payloads with duplicate topic-partitions
142
+ are not supported.
142
143
143
144
encode_fn: a method to encode the list of payloads to a request body,
144
145
must accept client_id, correlation_id, and payloads as
@@ -152,6 +153,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
152
153
153
154
List of response objects in the same order as the supplied payloads
154
155
"""
156
+ # encoders / decoders do not maintain ordering currently
157
+ # so we need to keep this so we can rebuild order before returning
158
+ original_ordering = [(p .topic , p .partition ) for p in payloads ]
159
+
155
160
# Group the requests by topic+partition
156
161
brokers_for_payloads = []
157
162
payloads_by_broker = collections .defaultdict (list )
@@ -165,7 +170,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
165
170
166
171
# For each broker, send the list of request payloads
167
172
# and collect the responses and errors
168
- responses_by_broker = collections . defaultdict ( list )
173
+ responses = {}
169
174
broker_failures = []
170
175
for broker , payloads in payloads_by_broker .items ():
171
176
requestId = self ._next_id ()
@@ -184,7 +189,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
184
189
'to server %s: %s' , requestId , broker , e )
185
190
186
191
for payload in payloads :
187
- responses_by_broker [broker ].append (FailedPayloadsError (payload ))
192
+ topic_partition = (payload .topic , payload .partition )
193
+ responses [topic_partition ] = FailedPayloadsError (payload )
188
194
189
195
# No exception, try to get response
190
196
else :
@@ -196,7 +202,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
196
202
log .debug ('Request %s does not expect a response '
197
203
'(skipping conn.recv)' , requestId )
198
204
for payload in payloads :
199
- responses_by_broker [broker ].append (None )
205
+ topic_partition = (payload .topic , payload .partition )
206
+ responses [topic_partition ] = None
200
207
continue
201
208
202
209
try :
@@ -208,12 +215,17 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
208
215
requestId , broker , e )
209
216
210
217
for payload in payloads :
211
- responses_by_broker [broker ].append (FailedPayloadsError (payload ))
218
+ topic_partition = (payload .topic , payload .partition )
219
+ responses [topic_partition ] = FailedPayloadsError (payload )
212
220
213
221
else :
222
+ _resps = []
214
223
for payload_response in decoder_fn (response ):
215
- responses_by_broker [broker ].append (payload_response )
216
- log .debug ('Response %s: %s' , requestId , responses_by_broker [broker ])
224
+ topic_partition = (payload_response .topic ,
225
+ payload_response .partition )
226
+ responses [topic_partition ] = payload_response
227
+ _resps .append (payload_response )
228
+ log .debug ('Response %s: %s' , requestId , _resps )
217
229
218
230
# Connection errors generally mean stale metadata
219
231
# although sometimes it means incorrect api request
@@ -223,9 +235,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
223
235
self .reset_all_metadata ()
224
236
225
237
# Return responses in the same order as provided
226
- responses_by_payload = [responses_by_broker [broker ].pop (0 )
227
- for broker in brokers_for_payloads ]
228
- return responses_by_payload
238
+ return [responses [tp ] for tp in original_ordering ]
229
239
230
240
def __repr__ (self ):
231
241
return '<KafkaClient client_id=%s>' % (self .client_id )
0 commit comments