Skip to content

Commit 29f5619

Browse files
committed
Merge pull request dpkp#233 from dpkp/str_join_speedup
Improve string concatenation performance on pypy and python 3
2 parents 931670f + 55e377b commit 29f5619

File tree

3 files changed: +91 additions, -71 deletions

kafka/protocol.py

Lines changed: 71 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,13 @@ def _encode_message_set(cls, messages):
7171
Offset => int64
7272
MessageSize => int32
7373
"""
74-
message_set = b""
74+
message_set = []
7575
for message in messages:
7676
encoded_message = KafkaProtocol._encode_message(message)
77-
message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
78-
return message_set
77+
message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
78+
len(encoded_message),
79+
encoded_message))
80+
return b''.join(message_set)
7981

8082
@classmethod
8183
def _encode_message(cls, message):
@@ -95,9 +97,11 @@ def _encode_message(cls, message):
9597
Value => bytes
9698
"""
9799
if message.magic == 0:
98-
msg = struct.pack('>BB', message.magic, message.attributes)
99-
msg += write_int_string(message.key)
100-
msg += write_int_string(message.value)
100+
msg = b''.join([
101+
struct.pack('>BB', message.magic, message.attributes),
102+
write_int_string(message.key),
103+
write_int_string(message.value)
104+
])
101105
crc = crc32(msg)
102106
msg = struct.pack('>I%ds' % len(msg), crc, msg)
103107
else:
@@ -197,21 +201,24 @@ def encode_produce_request(cls, client_id, correlation_id,
197201
payloads = [] if payloads is None else payloads
198202
grouped_payloads = group_by_topic_and_partition(payloads)
199203

200-
message = cls._encode_message_header(client_id, correlation_id,
201-
KafkaProtocol.PRODUCE_KEY)
204+
message = []
205+
message.append(cls._encode_message_header(client_id, correlation_id,
206+
KafkaProtocol.PRODUCE_KEY))
202207

203-
message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
208+
message.append(struct.pack('>hii', acks, timeout,
209+
len(grouped_payloads)))
204210

205211
for topic, topic_payloads in grouped_payloads.items():
206-
message += struct.pack('>h%dsi' % len(topic),
207-
len(topic), topic, len(topic_payloads))
212+
message.append(struct.pack('>h%dsi' % len(topic), len(topic), topic,
213+
len(topic_payloads)))
208214

209215
for partition, payload in topic_payloads.items():
210216
msg_set = KafkaProtocol._encode_message_set(payload.messages)
211-
message += struct.pack('>ii%ds' % len(msg_set), partition,
212-
len(msg_set), msg_set)
217+
message.append(struct.pack('>ii%ds' % len(msg_set), partition,
218+
len(msg_set), msg_set))
213219

214-
return struct.pack('>i%ds' % len(message), len(message), message)
220+
msg = b''.join(message)
221+
return struct.pack('>i%ds' % len(msg), len(msg), msg)
215222

216223
@classmethod
217224
def decode_produce_response(cls, data):
@@ -254,21 +261,23 @@ def encode_fetch_request(cls, client_id, correlation_id, payloads=None,
254261
payloads = [] if payloads is None else payloads
255262
grouped_payloads = group_by_topic_and_partition(payloads)
256263

257-
message = cls._encode_message_header(client_id, correlation_id,
258-
KafkaProtocol.FETCH_KEY)
264+
message = []
265+
message.append(cls._encode_message_header(client_id, correlation_id,
266+
KafkaProtocol.FETCH_KEY))
259267

260268
# -1 is the replica id
261-
message += struct.pack('>iiii', -1, max_wait_time, min_bytes,
262-
len(grouped_payloads))
269+
message.append(struct.pack('>iiii', -1, max_wait_time, min_bytes,
270+
len(grouped_payloads)))
263271

264272
for topic, topic_payloads in grouped_payloads.items():
265-
message += write_short_string(topic)
266-
message += struct.pack('>i', len(topic_payloads))
273+
message.append(write_short_string(topic))
274+
message.append(struct.pack('>i', len(topic_payloads)))
267275
for partition, payload in topic_payloads.items():
268-
message += struct.pack('>iqi', partition, payload.offset,
269-
payload.max_bytes)
276+
message.append(struct.pack('>iqi', partition, payload.offset,
277+
payload.max_bytes))
270278

271-
return struct.pack('>i%ds' % len(message), len(message), message)
279+
msg = b''.join(message)
280+
return struct.pack('>i%ds' % len(msg), len(msg), msg)
272281

273282
@classmethod
274283
def decode_fetch_response(cls, data):
@@ -301,21 +310,23 @@ def encode_offset_request(cls, client_id, correlation_id, payloads=None):
301310
payloads = [] if payloads is None else payloads
302311
grouped_payloads = group_by_topic_and_partition(payloads)
303312

304-
message = cls._encode_message_header(client_id, correlation_id,
305-
KafkaProtocol.OFFSET_KEY)
313+
message = []
314+
message.append(cls._encode_message_header(client_id, correlation_id,
315+
KafkaProtocol.OFFSET_KEY))
306316

307317
# -1 is the replica id
308-
message += struct.pack('>ii', -1, len(grouped_payloads))
318+
message.append(struct.pack('>ii', -1, len(grouped_payloads)))
309319

310320
for topic, topic_payloads in grouped_payloads.items():
311-
message += write_short_string(topic)
312-
message += struct.pack('>i', len(topic_payloads))
321+
message.append(write_short_string(topic))
322+
message.append(struct.pack('>i', len(topic_payloads)))
313323

314324
for partition, payload in topic_payloads.items():
315-
message += struct.pack('>iqi', partition, payload.time,
316-
payload.max_offsets)
325+
message.append(struct.pack('>iqi', partition, payload.time,
326+
payload.max_offsets))
317327

318-
return struct.pack('>i%ds' % len(message), len(message), message)
328+
msg = b''.join(message)
329+
return struct.pack('>i%ds' % len(msg), len(msg), msg)
319330

320331
@classmethod
321332
def decode_offset_response(cls, data):
@@ -360,15 +371,17 @@ def encode_metadata_request(cls, client_id, correlation_id, topics=None,
360371
else:
361372
topics = payloads
362373

363-
message = cls._encode_message_header(client_id, correlation_id,
364-
KafkaProtocol.METADATA_KEY)
374+
message = []
375+
message.append(cls._encode_message_header(client_id, correlation_id,
376+
KafkaProtocol.METADATA_KEY))
365377

366-
message += struct.pack('>i', len(topics))
378+
message.append(struct.pack('>i', len(topics)))
367379

368380
for topic in topics:
369-
message += struct.pack('>h%ds' % len(topic), len(topic), topic)
381+
message.append(struct.pack('>h%ds' % len(topic), len(topic), topic))
370382

371-
return write_int_string(message)
383+
msg = b''.join(message)
384+
return write_int_string(msg)
372385

373386
@classmethod
374387
def decode_metadata_response(cls, data):
@@ -435,20 +448,22 @@ def encode_offset_commit_request(cls, client_id, correlation_id,
435448
"""
436449
grouped_payloads = group_by_topic_and_partition(payloads)
437450

438-
message = cls._encode_message_header(client_id, correlation_id,
439-
KafkaProtocol.OFFSET_COMMIT_KEY)
440-
message += write_short_string(group)
441-
message += struct.pack('>i', len(grouped_payloads))
451+
message = []
452+
message.append(cls._encode_message_header(client_id, correlation_id,
453+
KafkaProtocol.OFFSET_COMMIT_KEY))
454+
message.append(write_short_string(group))
455+
message.append(struct.pack('>i', len(grouped_payloads)))
442456

443457
for topic, topic_payloads in grouped_payloads.items():
444-
message += write_short_string(topic)
445-
message += struct.pack('>i', len(topic_payloads))
458+
message.append(write_short_string(topic))
459+
message.append(struct.pack('>i', len(topic_payloads)))
446460

447461
for partition, payload in topic_payloads.items():
448-
message += struct.pack('>iq', partition, payload.offset)
449-
message += write_short_string(payload.metadata)
462+
message.append(struct.pack('>iq', partition, payload.offset))
463+
message.append(write_short_string(payload.metadata))
450464

451-
return struct.pack('>i%ds' % len(message), len(message), message)
465+
msg = b''.join(message)
466+
return struct.pack('>i%ds' % len(msg), len(msg), msg)
452467

453468
@classmethod
454469
def decode_offset_commit_response(cls, data):
@@ -484,20 +499,23 @@ def encode_offset_fetch_request(cls, client_id, correlation_id,
484499
payloads: list of OffsetFetchRequest
485500
"""
486501
grouped_payloads = group_by_topic_and_partition(payloads)
487-
message = cls._encode_message_header(client_id, correlation_id,
488-
KafkaProtocol.OFFSET_FETCH_KEY)
489502

490-
message += write_short_string(group)
491-
message += struct.pack('>i', len(grouped_payloads))
503+
message = []
504+
message.append(cls._encode_message_header(client_id, correlation_id,
505+
KafkaProtocol.OFFSET_FETCH_KEY))
506+
507+
message.append(write_short_string(group))
508+
message.append(struct.pack('>i', len(grouped_payloads)))
492509

493510
for topic, topic_payloads in grouped_payloads.items():
494-
message += write_short_string(topic)
495-
message += struct.pack('>i', len(topic_payloads))
511+
message.append(write_short_string(topic))
512+
message.append(struct.pack('>i', len(topic_payloads)))
496513

497514
for partition, payload in topic_payloads.items():
498-
message += struct.pack('>i', partition)
515+
message.append(struct.pack('>i', partition))
499516

500-
return struct.pack('>i%ds' % len(message), len(message), message)
517+
msg = b''.join(message)
518+
return struct.pack('>i%ds' % len(msg), len(msg), msg)
501519

502520
@classmethod
503521
def decode_offset_fetch_response(cls, data):

test/test_producer_integration.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import time
33
import uuid
44

5+
from six.moves import range
6+
57
from kafka import (
68
SimpleProducer, KeyedProducer,
79
create_message, create_gzip_message, create_snappy_message,

test/test_protocol.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -453,31 +453,31 @@ def test_encode_metadata_request_with_topics(self):
453453
self.assertEqual(encoded, expected)
454454

455455
def _create_encoded_metadata_response(self, brokers, topics):
456-
encoded = struct.pack('>ii', 3, len(brokers))
456+
encoded = []
457+
encoded.append(struct.pack('>ii', 3, len(brokers)))
457458
for broker in brokers:
458-
encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId,
459-
len(broker.host), broker.host, broker.port)
459+
encoded.append(struct.pack('>ih%dsi' % len(broker.host),
460+
broker.nodeId, len(broker.host),
461+
broker.host, broker.port))
460462

461-
encoded += struct.pack('>i', len(topics))
463+
encoded.append(struct.pack('>i', len(topics)))
462464
for topic in topics:
463-
encoded += struct.pack('>hh%dsi' % len(topic.topic),
464-
topic.error, len(topic.topic),
465-
topic.topic, len(topic.partitions))
465+
encoded.append(struct.pack('>hh%dsi' % len(topic.topic),
466+
topic.error, len(topic.topic),
467+
topic.topic, len(topic.partitions)))
466468
for metadata in topic.partitions:
467-
encoded += struct.pack('>hiii',
468-
metadata.error,
469-
metadata.partition,
470-
metadata.leader,
471-
len(metadata.replicas))
469+
encoded.append(struct.pack('>hiii', metadata.error,
470+
metadata.partition, metadata.leader,
471+
len(metadata.replicas)))
472472
if len(metadata.replicas) > 0:
473-
encoded += struct.pack('>%di' % len(metadata.replicas),
474-
*metadata.replicas)
473+
encoded.append(struct.pack('>%di' % len(metadata.replicas),
474+
*metadata.replicas))
475475

476-
encoded += struct.pack('>i', len(metadata.isr))
476+
encoded.append(struct.pack('>i', len(metadata.isr)))
477477
if len(metadata.isr) > 0:
478-
encoded += struct.pack('>%di' % len(metadata.isr),
479-
*metadata.isr)
480-
return encoded
478+
encoded.append(struct.pack('>%di' % len(metadata.isr),
479+
*metadata.isr))
480+
return b''.join(encoded)
481481

482482
def test_decode_metadata_response(self):
483483
node_brokers = [

0 commit comments

Comments (0)