@@ -71,11 +71,13 @@ def _encode_message_set(cls, messages):
         Offset => int64
         MessageSize => int32
         """
-        message_set = b""
+        message_set = []
         for message in messages:
             encoded_message = KafkaProtocol._encode_message(message)
-            message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
-        return message_set
+            message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
+                                           len(encoded_message),
+                                           encoded_message))
+        return b''.join(message_set)
 
     @classmethod
     def _encode_message(cls, message):
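For context on the pattern applied throughout this diff: repeated `bytes += ...` copies the whole accumulated buffer on each iteration, so total work can grow quadratically with the size of the message set, while collecting the packed chunks in a list and joining once copies each byte a single time. A minimal standalone sketch of the two approaches (the helper names and sample data below are illustrative, not part of kafka-python):

import struct

def build_by_concat(chunks):
    # quadratic in total size: every += reallocates and copies the buffer so far
    buf = b""
    for chunk in chunks:
        buf += struct.pack('>i%ds' % len(chunk), len(chunk), chunk)
    return buf

def build_by_join(chunks):
    # linear: accumulate the packed pieces in a list, copy once at the end
    parts = []
    for chunk in chunks:
        parts.append(struct.pack('>i%ds' % len(chunk), len(chunk), chunk))
    return b''.join(parts)

assert build_by_concat([b'foo', b'bar']) == build_by_join([b'foo', b'bar'])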
@@ -95,9 +97,11 @@ def _encode_message(cls, message):
         Value => bytes
         """
         if message.magic == 0:
-            msg = struct.pack('>BB', message.magic, message.attributes)
-            msg += write_int_string(message.key)
-            msg += write_int_string(message.value)
+            msg = b''.join([
+                struct.pack('>BB', message.magic, message.attributes),
+                write_int_string(message.key),
+                write_int_string(message.value)
+            ])
             crc = crc32(msg)
             msg = struct.pack('>I%ds' % len(msg), crc, msg)
         else:
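For reference, the layout this branch produces for a magic-0 message is the (magic, attributes, key, value) body with its CRC32 prepended as an unsigned big-endian int32. A rough standalone sketch of that framing using only the stdlib; the helpers below stand in for kafka-python's own crc32 and write_int_string, which are not shown in this diff:

import struct
import zlib

def encode_int_bytes(value):
    # Kafka wire convention: int32 length prefix, -1 for a null value
    if value is None:
        return struct.pack('>i', -1)
    return struct.pack('>i%ds' % len(value), len(value), value)

def encode_message_v0(key, value, attributes=0):
    body = b''.join([
        struct.pack('>BB', 0, attributes),   # magic byte 0, attributes byte
        encode_int_bytes(key),
        encode_int_bytes(value),
    ])
    crc = zlib.crc32(body) & 0xffffffff      # force unsigned so '>I' accepts it
    return struct.pack('>I%ds' % len(body), crc, body)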
@@ -197,21 +201,24 @@ def encode_produce_request(cls, client_id, correlation_id,
         payloads = [] if payloads is None else payloads
         grouped_payloads = group_by_topic_and_partition(payloads)
 
-        message = cls._encode_message_header(client_id, correlation_id,
-                                             KafkaProtocol.PRODUCE_KEY)
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                   KafkaProtocol.PRODUCE_KEY))
 
-        message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
+        message.append(struct.pack('>hii', acks, timeout,
+                                    len(grouped_payloads)))
 
         for topic, topic_payloads in grouped_payloads.items():
-            message += struct.pack('>h%dsi' % len(topic),
-                                   len(topic), topic, len(topic_payloads))
+            message.append(struct.pack('>h%dsi' % len(topic), len(topic), topic,
+                                        len(topic_payloads)))
 
             for partition, payload in topic_payloads.items():
                 msg_set = KafkaProtocol._encode_message_set(payload.messages)
-                message += struct.pack('>ii%ds' % len(msg_set), partition,
-                                       len(msg_set), msg_set)
+                message.append(struct.pack('>ii%ds' % len(msg_set), partition,
+                                            len(msg_set), msg_set))
 
-        return struct.pack('>i%ds' % len(message), len(message), message)
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
 
     @classmethod
     def decode_produce_response(cls, data):
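Most of these request encoders end the same way: the joined payload is wrapped in a size-delimited frame, a big-endian int32 byte count followed by the payload itself, which is how requests are delimited on the broker socket (encode_metadata_request delegates what appears to be the same framing to write_int_string). A minimal round-trip sketch of that framing (function names and the sample payload are illustrative):

import struct

def frame_request(payload):
    # int32 big-endian length prefix, then the payload bytes
    return struct.pack('>i%ds' % len(payload), len(payload), payload)

def read_frame(buf):
    # inverse: peel off the 4-byte size, return (payload, remaining bytes)
    (size,) = struct.unpack('>i', buf[:4])
    return buf[4:4 + size], buf[4 + size:]

framed = frame_request(b'\x00\x00\x00\x01request-body')
payload, rest = read_frame(framed)
assert payload == b'\x00\x00\x00\x01request-body' and rest == b''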
@@ -254,21 +261,23 @@ def encode_fetch_request(cls, client_id, correlation_id, payloads=None,
         payloads = [] if payloads is None else payloads
         grouped_payloads = group_by_topic_and_partition(payloads)
 
-        message = cls._encode_message_header(client_id, correlation_id,
-                                             KafkaProtocol.FETCH_KEY)
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                   KafkaProtocol.FETCH_KEY))
 
         # -1 is the replica id
-        message += struct.pack('>iiii', -1, max_wait_time, min_bytes,
-                               len(grouped_payloads))
+        message.append(struct.pack('>iiii', -1, max_wait_time, min_bytes,
+                                    len(grouped_payloads)))
 
         for topic, topic_payloads in grouped_payloads.items():
-            message += write_short_string(topic)
-            message += struct.pack('>i', len(topic_payloads))
+            message.append(write_short_string(topic))
+            message.append(struct.pack('>i', len(topic_payloads)))
             for partition, payload in topic_payloads.items():
-                message += struct.pack('>iqi', partition, payload.offset,
-                                       payload.max_bytes)
+                message.append(struct.pack('>iqi', partition, payload.offset,
+                                            payload.max_bytes))
 
-        return struct.pack('>i%ds' % len(message), len(message), message)
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
 
     @classmethod
     def decode_fetch_response(cls, data):
@@ -301,21 +310,23 @@ def encode_offset_request(cls, client_id, correlation_id, payloads=None):
         payloads = [] if payloads is None else payloads
         grouped_payloads = group_by_topic_and_partition(payloads)
 
-        message = cls._encode_message_header(client_id, correlation_id,
-                                             KafkaProtocol.OFFSET_KEY)
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                   KafkaProtocol.OFFSET_KEY))
 
         # -1 is the replica id
-        message += struct.pack('>ii', -1, len(grouped_payloads))
+        message.append(struct.pack('>ii', -1, len(grouped_payloads)))
 
         for topic, topic_payloads in grouped_payloads.items():
-            message += write_short_string(topic)
-            message += struct.pack('>i', len(topic_payloads))
+            message.append(write_short_string(topic))
+            message.append(struct.pack('>i', len(topic_payloads)))
 
             for partition, payload in topic_payloads.items():
-                message += struct.pack('>iqi', partition, payload.time,
-                                       payload.max_offsets)
+                message.append(struct.pack('>iqi', partition, payload.time,
+                                            payload.max_offsets))
 
-        return struct.pack('>i%ds' % len(message), len(message), message)
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
 
     @classmethod
     def decode_offset_response(cls, data):
@@ -360,15 +371,17 @@ def encode_metadata_request(cls, client_id, correlation_id, topics=None,
         else:
             topics = payloads
 
-        message = cls._encode_message_header(client_id, correlation_id,
-                                             KafkaProtocol.METADATA_KEY)
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                   KafkaProtocol.METADATA_KEY))
 
-        message += struct.pack('>i', len(topics))
+        message.append(struct.pack('>i', len(topics)))
 
         for topic in topics:
-            message += struct.pack('>h%ds' % len(topic), len(topic), topic)
+            message.append(struct.pack('>h%ds' % len(topic), len(topic), topic))
 
-        return write_int_string(message)
+        msg = b''.join(message)
+        return write_int_string(msg)
 
     @classmethod
     def decode_metadata_response(cls, data):
@@ -435,20 +448,22 @@ def encode_offset_commit_request(cls, client_id, correlation_id,
         """
         grouped_payloads = group_by_topic_and_partition(payloads)
 
-        message = cls._encode_message_header(client_id, correlation_id,
-                                             KafkaProtocol.OFFSET_COMMIT_KEY)
-        message += write_short_string(group)
-        message += struct.pack('>i', len(grouped_payloads))
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                   KafkaProtocol.OFFSET_COMMIT_KEY))
+        message.append(write_short_string(group))
+        message.append(struct.pack('>i', len(grouped_payloads)))
 
         for topic, topic_payloads in grouped_payloads.items():
-            message += write_short_string(topic)
-            message += struct.pack('>i', len(topic_payloads))
+            message.append(write_short_string(topic))
+            message.append(struct.pack('>i', len(topic_payloads)))
 
             for partition, payload in topic_payloads.items():
-                message += struct.pack('>iq', partition, payload.offset)
-                message += write_short_string(payload.metadata)
+                message.append(struct.pack('>iq', partition, payload.offset))
+                message.append(write_short_string(payload.metadata))
 
-        return struct.pack('>i%ds' % len(message), len(message), message)
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
 
     @classmethod
     def decode_offset_commit_response(cls, data):
@@ -484,20 +499,23 @@ def encode_offset_fetch_request(cls, client_id, correlation_id,
         payloads: list of OffsetFetchRequest
         """
         grouped_payloads = group_by_topic_and_partition(payloads)
-        message = cls._encode_message_header(client_id, correlation_id,
-                                             KafkaProtocol.OFFSET_FETCH_KEY)
 
-        message += write_short_string(group)
-        message += struct.pack('>i', len(grouped_payloads))
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                   KafkaProtocol.OFFSET_FETCH_KEY))
+
+        message.append(write_short_string(group))
+        message.append(struct.pack('>i', len(grouped_payloads)))
 
         for topic, topic_payloads in grouped_payloads.items():
-            message += write_short_string(topic)
-            message += struct.pack('>i', len(topic_payloads))
+            message.append(write_short_string(topic))
+            message.append(struct.pack('>i', len(topic_payloads)))
 
             for partition, payload in topic_payloads.items():
-                message += struct.pack('>i', partition)
+                message.append(struct.pack('>i', partition))
 
-        return struct.pack('>i%ds' % len(message), len(message), message)
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
 
     @classmethod
     def decode_offset_fetch_response(cls, data):
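One detail that recurs across these hunks: once message is a list, len(message) counts chunks rather than bytes and struct.pack('>i%ds' % ...) cannot take a list, so each encoder now binds msg = b''.join(message) before sizing the frame. A tiny illustration of the distinction (sample data is arbitrary):

import struct

chunks = [b'\x00\x01', b'abc']
msg = b''.join(chunks)

assert len(chunks) == 2    # number of pieces appended
assert len(msg) == 5       # number of bytes actually framed
framed = struct.pack('>i%ds' % len(msg), len(msg), msg)
assert framed == b'\x00\x00\x00\x05\x00\x01abc'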