Commit d0abb07

Protocol and low-level client done, adding tests
1 parent 5eb79c0 commit d0abb07

File tree

6 files changed: +397 -273 lines

kafka-src

Submodule kafka-src updated 578 files

kafka/client08.py

+110 -70
@@ -14,7 +14,7 @@
 from .util import read_short_string, read_int_string
 from .util import relative_unpack
 from .util import write_short_string, write_int_string
-from .util import group_list_by_key
+from .util import group_by_topic_and_partition
 from .util import BufferUnderflowError, ChecksumError
 
 log = logging.getLogger("kafka")
@@ -33,7 +33,7 @@
 # Response payloads
 ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
 FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
-OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"])
+OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
 OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
 OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
 BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
@@ -74,6 +74,9 @@ class KafkaProtocol(object):
     OFFSET_FETCH_KEY = 7
 
     ATTRIBUTE_CODEC_MASK = 0x03
+    CODEC_NONE = 0x00
+    CODEC_GZIP = 0x01
+    CODEC_SNAPPY = 0x02
 
     ###################
     #   Private API   #
@@ -171,13 +174,13 @@ def _decode_message(cls, data, offset):
 
         (key, cur) = read_int_string(data, cur)
         (value, cur) = read_int_string(data, cur)
-        if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0:
+        if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
             yield (offset, Message(magic, att, key, value))
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1:
+        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
             gz = gzip_decode(value)
             for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
                 yield (offset, message)
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2:
+        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
             snp = snappy_decode(value)
             for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
                 yield (offset, message)
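
Note: the CODEC_* constants introduced above live in the low two bits of the message attributes byte, which is what the decoder's mask tests rely on. A minimal illustration, using only values from this diff:

    ATTRIBUTE_CODEC_MASK = 0x03   # low two bits of the attributes byte
    CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY = 0x00, 0x01, 0x02

    att = 0x01                    # attributes byte of a gzip-compressed message
    assert att & ATTRIBUTE_CODEC_MASK == CODEC_GZIP
    assert 0x00 & ATTRIBUTE_CODEC_MASK == CODEC_NONE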
@@ -214,8 +217,25 @@ def create_gzip_message(cls, payloads, key=None):
         message_set = KafkaProtocol._encode_message_set(
             [KafkaProtocol.create_message(payload) for payload in payloads])
         gzipped = gzip_encode(message_set)
-        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)
 
+    @classmethod
+    def create_snappy_message(cls, payloads, key=None):
+        """
+        Construct a Snappy Message containing multiple Messages
+
+        The given payloads will be encoded, compressed, and sent as a single atomic
+        message to Kafka.
+
+        Params
+        ======
+        payloads: list(bytes), a list of payloads to be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+        """
+        message_set = KafkaProtocol._encode_message_set(
+            [KafkaProtocol.create_message(payload) for payload in payloads])
+        snapped = snappy_encode(message_set)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)
 
     @classmethod
     def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
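
Note: a sketch of how the new create_snappy_message might be exercised, assuming the snappy codec behind snappy_encode/snappy_decode is importable. Field 1 of the Message tuple is the attributes byte, per the constructor calls above:

    msg = KafkaProtocol.create_snappy_message(["payload 1", "payload 2"])
    assert msg[1] & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY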
@@ -234,14 +254,14 @@ def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1,
             -1: waits for all replicas to be in sync
         timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
-        message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
-            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
-            for payload in payloads:
+        message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
+            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
+            for partition, payload in topic_payloads.items():
                 message_set = KafkaProtocol._encode_message_set(payload.messages)
-                message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set)
+                message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
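
Note: group_by_topic_and_partition replaces group_list_by_key throughout this file, but its implementation (in kafka/util.py) is not part of this diff. A plausible sketch, consistent with how the encoders consume it — a dict of topic -> dict of partition -> payload:

    from collections import defaultdict

    def group_by_topic_and_partition(payloads):
        out = defaultdict(dict)
        for payload in payloads:
            out[payload.topic][payload.partition] = payload
        return out

One consequence the encoders rely on: at most one payload survives per (topic, partition) pair.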
@@ -276,15 +296,15 @@ def encode_fetch_request(cls, client_id, correlation_id, payloads=[], max_wait_t
         max_wait_time: int, how long to block waiting on min_bytes of data
         min_bytes: int, the minimum number of bytes to accumulate before returning the response
         """
-
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
-        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads)) # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -308,14 +328,14 @@ def decode_fetch_response_iter(cls, data):
 
     @classmethod
     def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
-        message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.time, payload.max_offsets)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
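
Note: the '>iqi' format packs each partition entry as a big-endian int32 partition, int64 time, and int32 max_offsets, 16 bytes in total. In the Kafka offset API a time of -1 requests the latest offset and -2 the earliest:

    import struct

    entry = struct.pack('>iqi', 0, -1, 1)   # partition 0, latest, one offset back
    assert len(entry) == 16                 # 4 + 8 + 4 bytes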
@@ -332,8 +352,12 @@ def decode_offset_response(cls, data):
         (topic, cur) = read_short_string(data, cur)
         ((num_partitions,), cur) = relative_unpack('>i', data, cur)
         for i in range(num_partitions):
-            ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
-            yield OffsetResponse(topic, partition, error, offset)
+            ((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
+            offsets = []
+            for j in range(num_offsets):
+                ((offset,), cur) = relative_unpack('>q', data, cur)
+                offsets.append(offset)
+            yield OffsetResponse(topic, partition, error, tuple(offsets))
 
     @classmethod
     def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
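
Note: the decoder leans on the relative_unpack helper imported at the top of the file, which this diff does not show. The calls above assume a contract like the following sketch — unpack fmt at cursor cur, returning the value tuple together with the advanced cursor:

    import struct

    class BufferUnderflowError(Exception):
        pass

    def relative_unpack(fmt, data, cur):
        size = struct.calcsize(fmt)
        if len(data) < cur + size:
            raise BufferUnderflowError("Not enough data left")
        return struct.unpack(fmt, data[cur:cur + size]), cur + size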
@@ -400,15 +424,15 @@ def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads
         group: string, the consumer group you are committing offsets for
         payloads: list of OffsetCommitRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iq', payload.partition, payload.offset)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iq', partition, payload.offset)
                 message += write_short_string(payload.metadata)
         return struct.pack('>i%ds' % len(message), len(message), message)

@@ -421,6 +445,7 @@ def decode_offset_commit_response(cls, data):
         ======
         data: bytes to decode
         """
+        data = data[2:] # TODO remove me when versionId is removed
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
         (client_id, cur) = read_short_string(data, cur)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
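
Note: the added data = data[2:] works around a leading versionId that the broker still prepends to this response; judging by the two-byte slice it is an int16, after which the int32 correlation id follows, as the next line expects. Roughly:

    import struct

    raw = struct.pack('>hi', 0, 42)   # int16 versionId, then int32 correlation_id
    body = raw[2:]                    # the slice drops the versionId
    (correlation_id,) = struct.unpack('>i', body)
    assert correlation_id == 42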
@@ -443,15 +468,15 @@ def encode_offset_fetch_request(cls, client_id, correlation_id, group, payloads)
         group: string, the consumer group you are fetching offsets for
         payloads: list of OffsetFetchRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>i', payload.partition)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>i', partition)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -493,6 +518,9 @@ def __init__(self, host, port, bufsize=4096):
         self._sock.connect((host, port))
         self._sock.settimeout(10)
 
+    def __str__(self):
+        return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
+
     ###################
     #   Private API   #
     ###################
@@ -536,6 +564,8 @@ def _consume_response_iter(self):
     #   Public API   #
     ##################
 
+    # TODO multiplex socket communication to allow for multi-threaded clients
+
     def send(self, requestId, payload):
         "Send a request to Kafka"
         sent = self._sock.sendall(payload)
@@ -566,6 +596,10 @@ def __init__(self, host, port, bufsize=4096):
         self.topics_to_brokers = {} # topic_id -> broker_id
         self.load_metadata_for_topics()
 
+    def close(self):
+        for conn in self.conns.values():
+            conn.close()
+
     def get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
@@ -626,20 +660,14 @@ def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
         ======
         list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
         """
-        key_fn = lambda x: (x.topic, x.partition)
-
-        # Note the order of the incoming payloads
-        original_keys = [key_fn(payload) for payload in payloads]
-
-        # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
         # Group the produce requests by which broker they go to
+        original_keys = []
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+        for payload in payloads:
+            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            original_keys.append((payload.topic, payload.partition))
 
-        # Accumulate the responses in a dictionary, keyed by key_fn
+        # Accumulate the responses in a dictionary
        acc = {}
 
         # For each broker, send the list of request payloads
@@ -657,11 +685,10 @@ def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
                     (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
             # Run the callback
             if callback is not None:
-                acc[key_fn(produce_response)] = callback(produce_response)
+                acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
             else:
-                acc[key_fn(produce_response)] = produce_response
+                acc[(produce_response.topic, produce_response.partition)] = produce_response
 
-        print(acc)
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
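
Note: the original_keys/acc pattern above (and in send_fetch_request below) preserves the caller's payload order even though responses arrive grouped by broker. In miniature:

    original_keys = [("t1", 0), ("t2", 0), ("t1", 1)]           # request order
    acc = {("t2", 0): "r2", ("t1", 1): "r3", ("t1", 0): "r1"}   # arrival order varies
    assert [acc[k] for k in original_keys] == ["r1", "r2", "r3"]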

@@ -672,20 +699,14 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
         Payloads are grouped by topic and partition so they can be pipelined to the same
         brokers.
         """
-        key_fn = lambda x: (x.topic, x.partition)
-
-        # Note the order of the incoming payloads
-        original_keys = [key_fn(payload) for payload in payloads]
-
-        # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
         # Group the produce requests by which broker they go to
+        original_keys = []
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+        for payload in payloads:
+            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            original_keys.append((payload.topic, payload.partition))
 
-        # Accumulate the responses in a dictionary, keyed by key_fn
+        # Accumulate the responses in a dictionary, keyed by topic+partition
         acc = {}
 
         # For each broker, send the list of request payloads
@@ -703,9 +724,9 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
                     (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
             # Run the callback
             if callback is not None:
-                acc[key_fn(fetch_response)] = callback(fetch_response)
+                acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response)
             else:
-                acc[key_fn(fetch_response)] = fetch_response
+                acc[(fetch_response.topic, fetch_response.partition)] = fetch_response
 
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
@@ -720,11 +741,30 @@ def try_send_request(self, requestId, request):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
-            except Exception:
-                log.warning("Could not commit offset to server %s, trying next server", conn)
+            except Exception, e:
+                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
                 continue
         return None
 
+    def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
+        requestId = self.next_id()
+        request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            if fail_on_error is True:
+                raise Exception("All servers failed to process request")
+            else:
+                return None
+        out = []
+        for offset_response in KafkaProtocol.decode_offset_response(response):
+            if fail_on_error == True and offset_response.error != 0:
+                raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error)
+            if callback is not None:
+                out.append(callback(offset_response))
+            else:
+                out.append(offset_response)
+        return out
+
     def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
         requestId = self.next_id()
         request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
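
Note: hypothetical usage of the new send_offset_request. The diff does not show the OffsetRequest payload type, so the namedtuple below is an assumption whose fields mirror what encode_offset_request reads (topic, partition, time, max_offsets):

    from collections import namedtuple

    # hypothetical request type, not part of this diff
    OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"])

    client = KafkaClient("localhost", 9092)
    for resp in client.send_offset_request([OffsetRequest("foo8", 0, -1, 1)]):
        print("%s:%d -> %r" % (resp.topic, resp.partition, resp.offsets))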
@@ -737,6 +777,7 @@ def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, cal
         out = []
         for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
             if fail_on_error == True and offset_commit_response.error != 0:
+                print(offset_commit_response)
                 raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
             if callback is not None:
                 out.append(callback(offset_commit_response))
@@ -770,7 +811,7 @@ def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, call
 
 topic = "foo8"
 # Bootstrap connection
-conn = KafkaClient("localhost", 9092)
+conn = KafkaClient("localhost", 49720)
 
 # Create some Messages
 messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
@@ -799,7 +840,6 @@ def init_offsets(offset_response):
         return 0
     else:
         return offset_response.offset
-
 # Load offsets
 (offset1, offset2) = conn.send_offset_fetch_request(
     group="group1",
