@@ -14,7 +14,7 @@
 from .util import read_short_string, read_int_string
 from .util import relative_unpack
 from .util import write_short_string, write_int_string
-from .util import group_list_by_key
+from .util import group_by_topic_and_partition
 from .util import BufferUnderflowError, ChecksumError

 log = logging.getLogger("kafka")
@@ -33,7 +33,7 @@
 # Response payloads
 ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
 FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
-OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"])
+OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
 OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
 OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
 BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
@@ -74,6 +74,9 @@ class KafkaProtocol(object):
     OFFSET_FETCH_KEY = 7

     ATTRIBUTE_CODEC_MASK = 0x03
+    CODEC_NONE = 0x00
+    CODEC_GZIP = 0x01
+    CODEC_SNAPPY = 0x02

     ###################
     # Private API #
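The new constants name the two low-order bits of a message's attributes byte; the decoder below only ever looks at those bits. A standalone sketch of that bit test, using plain ints rather than importing anything from this module:

CODEC_MASK = 0x03
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY = 0x00, 0x01, 0x02

def codec_of(attributes):
    # Only bits 0-1 of the attributes byte identify the compression codec;
    # the remaining bits are ignored by this decoder.
    return attributes & CODEC_MASK

assert codec_of(0x00) == CODEC_NONE
assert codec_of(0x01) == CODEC_GZIP
assert codec_of(0x02) == CODEC_SNAPPY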
@@ -171,13 +174,13 @@ def _decode_message(cls, data, offset):

         (key, cur) = read_int_string(data, cur)
         (value, cur) = read_int_string(data, cur)
-        if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0:
+        if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
             yield (offset, Message(magic, att, key, value))
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1:
+        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
             gz = gzip_decode(value)
             for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
                 yield (offset, message)
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2:
+        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
             snp = snappy_decode(value)
             for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
                 yield (offset, message)
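For the compressed branches, the wrapper message's value is just a gzip- or snappy-compressed inner message set, which is fed back through the same iterator. A minimal round trip of the gzip case; these helpers are stand-ins for the gzip_encode/gzip_decode used above, which this diff does not show:

import gzip
import io

def gzip_encode(payload):
    # Stand-in for the codec helper assumed to be imported elsewhere in the file.
    buf = io.BytesIO()
    f = gzip.GzipFile(fileobj=buf, mode="w")
    f.write(payload)
    f.close()
    return buf.getvalue()

def gzip_decode(payload):
    f = gzip.GzipFile(fileobj=io.BytesIO(payload), mode="r")
    out = f.read()
    f.close()
    return out

# A (fake) encoded inner message set survives the compress/decompress round trip.
inner_message_set = b"\x00" * 32
assert gzip_decode(gzip_encode(inner_message_set)) == inner_message_set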
@@ -214,8 +217,25 @@ def create_gzip_message(cls, payloads, key=None):
         message_set = KafkaProtocol._encode_message_set(
             [KafkaProtocol.create_message(payload) for payload in payloads])
         gzipped = gzip_encode(message_set)
-        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)

+    @classmethod
+    def create_snappy_message(cls, payloads, key=None):
+        """
+        Construct a Snappy Message containing multiple Messages
+
+        The given payloads will be encoded, compressed, and sent as a single atomic
+        message to Kafka.
+
+        Params
+        ======
+        payloads: list(bytes), a list of payloads to be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+        """
+        message_set = KafkaProtocol._encode_message_set(
+            [KafkaProtocol.create_message(payload) for payload in payloads])
+        snapped = snappy_encode(message_set)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)

     @classmethod
     def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
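A hypothetical usage sketch for the two batch constructors added above. The import path is an assumption (it is not visible in this diff), and create_snappy_message additionally needs the python-snappy bindings installed:

from kafka.client import KafkaProtocol  # assumed import path, not shown in this diff

# Each helper wraps several payloads into one compressed Message that the broker
# stores and hands back as a single unit.
gz_batch = KafkaProtocol.create_gzip_message(["payload-1", "payload-2"])
snappy_batch = KafkaProtocol.create_snappy_message(["payload-1", "payload-2"])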
@@ -234,14 +254,14 @@ def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1,
             -1: waits for all replicas to be in sync
         timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
-        message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
-            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
-            for payload in payloads:
+        message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
+            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
+            for partition, payload in topic_payloads.items():
                 message_set = KafkaProtocol._encode_message_set(payload.messages)
-                message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set)
+                message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
         return struct.pack('>i%ds' % len(message), len(message), message)

     @classmethod
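The encoders now lean on group_by_topic_and_partition from .util. Its implementation is not part of this diff, but the loops above imply it returns a dict of dicts keyed by topic and then partition; a sketch of that assumed shape:

from collections import defaultdict, namedtuple

def group_by_topic_and_partition(payloads):
    # Assumed shape: {topic: {partition: payload}} -- matches how the encoders
    # iterate (outer loop over topics, inner loop over (partition, payload) pairs).
    grouped = defaultdict(dict)
    for payload in payloads:
        grouped[payload.topic][payload.partition] = payload
    return grouped

# Any payload type with .topic and .partition attributes works; Req is hypothetical.
Req = namedtuple("Req", ["topic", "partition", "messages"])
grouped = group_by_topic_and_partition([Req("foo8", 0, []), Req("foo8", 1, [])])
assert sorted(grouped["foo8"].keys()) == [0, 1]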
@@ -276,15 +296,15 @@ def encode_fetch_request(cls, client_id, correlation_id, payloads=[], max_wait_t
         max_wait_time: int, how long to block waiting on min_bytes of data
         min_bytes: int, the minimum number of bytes to accumulate before returning the response
         """
-
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
-        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads)) # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
         return struct.pack('>i%ds' % len(message), len(message), message)

     @classmethod
@@ -308,14 +328,14 @@ def decode_fetch_response_iter(cls, data):

     @classmethod
     def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
-        message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.time, payload.max_offsets)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
         return struct.pack('>i%ds' % len(message), len(message), message)

     @classmethod
@@ -332,8 +352,12 @@ def decode_offset_response(cls, data):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
             for i in range(num_partitions):
-                ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
-                yield OffsetResponse(topic, partition, error, offset)
+                ((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
+                offsets = []
+                for j in range(num_offsets):
+                    ((offset,), cur) = relative_unpack('>q', data, cur)
+                    offsets.append(offset)
+                yield OffsetResponse(topic, partition, error, tuple(offsets))

     @classmethod
     def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
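After this change an OffsetResponse carries a tuple of offsets (one partition can report several), not a single value. A small self-contained illustration of the new shape, re-declaring the namedtuple from the top of the file:

from collections import namedtuple

OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])

# What decode_offset_response now yields for one partition:
resp = OffsetResponse(topic="foo8", partition=0, error=0, offsets=(42, 0))
latest = resp.offsets[0] if resp.offsets else None  # callers index into the tuple now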
@@ -400,15 +424,15 @@ def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads
         group: string, the consumer group you are committing offsets for
         payloads: list of OffsetCommitRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iq', payload.partition, payload.offset)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iq', partition, payload.offset)
                 message += write_short_string(payload.metadata)
         return struct.pack('>i%ds' % len(message), len(message), message)

@@ -421,6 +445,7 @@ def decode_offset_commit_response(cls, data):
         ======
         data: bytes to decode
         """
+        data = data[2:]  # TODO remove me when versionId is removed
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
         (client_id, cur) = read_short_string(data, cur)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
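The new slice drops the leading 2-byte versionId that this response currently carries (hence the TODO). A standalone illustration of what those two bytes are and why the slice lines the buffer up for the int32 correlation id that follows:

import struct

raw = struct.pack('>hi', 0, 1234)   # int16 versionId, then int32 correlation_id
data = raw[2:]                      # same effect as the data[2:] above
(correlation_id,) = struct.unpack('>i', data[:4])
assert correlation_id == 1234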
@@ -443,15 +468,15 @@ def encode_offset_fetch_request(cls, client_id, correlation_id, group, payloads)
         group: string, the consumer group you are fetching offsets for
         payloads: list of OffsetFetchRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>i', payload.partition)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>i', partition)
         return struct.pack('>i%ds' % len(message), len(message), message)

     @classmethod
@@ -493,6 +518,9 @@ def __init__(self, host, port, bufsize=4096):
         self._sock.connect((host, port))
         self._sock.settimeout(10)

+    def __str__(self):
+        return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
+
     ###################
     # Private API #
     ###################
@@ -536,6 +564,8 @@ def _consume_response_iter(self):
     # Public API #
     ##################

+    # TODO multiplex socket communication to allow for multi-threaded clients
+
     def send(self, requestId, payload):
         "Send a request to Kafka"
         sent = self._sock.sendall(payload)
@@ -566,6 +596,10 @@ def __init__(self, host, port, bufsize=4096):
         self.topics_to_brokers = {} # topic_id -> broker_id
         self.load_metadata_for_topics()

+    def close(self):
+        for conn in self.conns.values():
+            conn.close()
+
     def get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
@@ -626,20 +660,14 @@ def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
         ======
         list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
         """
-        key_fn = lambda x: (x.topic, x.partition)
-
-        # Note the order of the incoming payloads
-        original_keys = [key_fn(payload) for payload in payloads]
-
-        # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
         # Group the produce requests by which broker they go to
+        original_keys = []
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+        for payload in payloads:
+            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)] += payloads
+            original_keys.append((payload.topic, payload.partition))

-        # Accumulate the responses in a dictionary, keyed by key_fn
+        # Accumulate the responses in a dictionary
         acc = {}

         # For each broker, send the list of request payloads
@@ -657,11 +685,10 @@ def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
                         (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
                 # Run the callback
                 if callback is not None:
-                    acc[key_fn(produce_response)] = callback(produce_response)
+                    acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
                 else:
-                    acc[key_fn(produce_response)] = produce_response
+                    acc[(produce_response.topic, produce_response.partition)] = produce_response

-        print(acc)
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
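A hedged usage sketch for the rewritten send_produce_request. The import path and broker address are assumptions; the ProduceRequest fields (topic, partition, messages) are the ones the encoder above reads:

from kafka.client import KafkaClient, KafkaProtocol, ProduceRequest  # assumed import path

client = KafkaClient("localhost", 9092)  # any reachable broker bootstraps the metadata
req = ProduceRequest(topic="foo8", partition=0,
                     messages=[KafkaProtocol.create_message("hello")])
# Responses come back in the same order as the input payloads; a callback, if
# given, replaces each ProduceResponse with whatever it returns.
errors = list(client.send_produce_request([req], callback=lambda r: r.error))
client.close()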
@@ -672,20 +699,14 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
         Payloads are grouped by topic and partition so they can be pipelined to the same
         brokers.
         """
-        key_fn = lambda x: (x.topic, x.partition)
-
-        # Note the order of the incoming payloads
-        original_keys = [key_fn(payload) for payload in payloads]
-
-        # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
         # Group the produce requests by which broker they go to
+        original_keys = []
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+        for payload in payloads:
+            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            original_keys.append((payload.topic, payload.partition))

-        # Accumulate the responses in a dictionary, keyed by key_fn
+        # Accumulate the responses in a dictionary, keyed by topic+partition
         acc = {}

         # For each broker, send the list of request payloads
@@ -703,9 +724,9 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
                         (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
                 # Run the callback
                 if callback is not None:
-                    acc[key_fn(fetch_response)] = callback(fetch_response)
+                    acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response)
                 else:
-                    acc[key_fn(fetch_response)] = fetch_response
+                    acc[(fetch_response.topic, fetch_response.partition)] = fetch_response

         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
@@ -720,11 +741,30 @@ def try_send_request(self, requestId, request):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
-            except Exception:
-                log.warning("Could not commit offset to server %s, trying next server", conn)
+            except Exception, e:
+                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
                 continue
         return None

+    def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
+        requestId = self.next_id()
+        request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            if fail_on_error is True:
+                raise Exception("All servers failed to process request")
+            else:
+                return None
+        out = []
+        for offset_response in KafkaProtocol.decode_offset_response(response):
+            if fail_on_error == True and offset_response.error != 0:
+                raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error)
+            if callback is not None:
+                out.append(callback(offset_response))
+            else:
+                out.append(offset_response)
+        return out
+
     def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
         requestId = self.next_id()
         request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
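A hypothetical call into the new send_offset_request. OffsetRequest's fields (topic, partition, time, max_offsets) are the ones encode_offset_request reads above; the import path is an assumption, and time=-1 is the conventional "latest offset" request:

from kafka.client import KafkaClient, OffsetRequest  # assumed import path

client = KafkaClient("localhost", 9092)
req = OffsetRequest(topic="foo8", partition=0, time=-1, max_offsets=1)
(resp,) = client.send_offset_request([req])
print(resp.offsets)  # a tuple, per the decode_offset_response change above
client.close()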
@@ -737,6 +777,7 @@ def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, cal
         out = []
         for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
             if fail_on_error == True and offset_commit_response.error != 0:
+                print(offset_commit_response)
                 raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
             if callback is not None:
                 out.append(callback(offset_commit_response))
@@ -770,7 +811,7 @@ def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, call

    topic = "foo8"
    # Bootstrap connection
-    conn = KafkaClient("localhost", 9092)
+    conn = KafkaClient("localhost", 49720)

    # Create some Messages
    messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
@@ -799,7 +840,6 @@ def init_offsets(offset_response):
            return 0
        else:
            return offset_response.offset
-
    # Load offsets
    (offset1, offset2) = conn.send_offset_fetch_request(
        group="group1",