8
8
import six
9
9
10
10
from .base import BaseCoordinator
11
+ from .assignors .roundrobin import RoundRobinPartitionAssignor
11
12
from .protocol import (
12
13
ConsumerProtocolMemberMetadata , ConsumerProtocolMemberAssignment ,
13
14
ConsumerProtocol )
@@ -29,7 +30,7 @@ class ConsumerCoordinator(BaseCoordinator):
29
30
'enable_auto_commit' : True ,
30
31
'auto_commit_interval_ms' : 5000 ,
31
32
'default_offset_commit_callback' : lambda offsets , response : True ,
32
- 'assignors' : (),
33
+ 'assignors' : (RoundRobinPartitionAssignor , ),
33
34
'session_timeout_ms' : 30000 ,
34
35
'heartbeat_interval_ms' : 3000 ,
35
36
'retry_backoff_ms' : 100 ,
@@ -100,6 +101,7 @@ def protocol_type(self):
100
101
def group_protocols (self ):
101
102
"""Returns list of preferred (protocols, metadata)"""
102
103
topics = self ._subscription .subscription
104
+ assert topics is not None , 'Consumer has not subscribed to topics'
103
105
metadata_list = []
104
106
for assignor in self .config ['assignors' ]:
105
107
metadata = assignor .metadata (topics )
@@ -111,7 +113,7 @@ def _handle_metadata_update(self, cluster):
111
113
# if we encounter any unauthorized topics, raise an exception
112
114
# TODO
113
115
#if self._cluster.unauthorized_topics:
114
- # raise Errors. TopicAuthorizationError(self._cluster.unauthorized_topics)
116
+ # raise TopicAuthorizationError(self._cluster.unauthorized_topics)
115
117
116
118
if self ._subscription .subscribed_pattern :
117
119
topics = []
@@ -122,7 +124,8 @@ def _handle_metadata_update(self, cluster):
122
124
self ._subscription .change_subscription (topics )
123
125
self ._client .set_topics (self ._subscription .group_subscription ())
124
126
125
- # check if there are any changes to the metadata which should trigger a rebalance
127
+ # check if there are any changes to the metadata which should trigger
128
+ # a rebalance
126
129
if self ._subscription_metadata_changed ():
127
130
if self .config ['api_version' ] >= (0 , 9 ):
128
131
self ._subscription .mark_for_reassignment ()
@@ -182,7 +185,7 @@ def _on_join_complete(self, generation, member_id, protocol,
182
185
# execute the user's callback after rebalance
183
186
if self ._subscription .listener :
184
187
try :
185
- self ._subscriptions .listener .on_partitions_assigned (assigned )
188
+ self ._subscription .listener .on_partitions_assigned (assigned )
186
189
except Exception :
187
190
log .exception ("User provided listener failed on partition"
188
191
" assignment: %s" , assigned )
@@ -263,6 +266,9 @@ def fetch_committed_offsets(self, partitions):
263
266
Returns:
264
267
dict: {TopicPartition: OffsetAndMetadata}
265
268
"""
269
+ if not partitions :
270
+ return {}
271
+
266
272
while True :
267
273
if self .config ['api_version' ] >= (0 , 8 , 2 ):
268
274
self .ensure_coordinator_known ()
@@ -297,11 +303,16 @@ def commit_offsets_async(self, offsets, callback=None):
297
303
Returns:
298
304
Future: indicating whether the commit was successful or not
299
305
"""
306
+ assert self .config ['api_version' ] >= (0 , 8 , 1 ), 'Unsupported Broker API'
307
+ assert all (map (lambda k : isinstance (k , TopicPartition ), offsets ))
308
+ assert all (map (lambda v : isinstance (v , OffsetAndMetadata ),
309
+ offsets .values ()))
300
310
if callback is None :
301
311
callback = self .config ['default_offset_commit_callback' ]
302
312
self ._subscription .needs_fetch_committed_offsets = True
303
313
future = self ._send_offset_commit_request (offsets )
304
314
future .add_both (callback , offsets )
315
+ return future
305
316
306
317
def commit_offsets_sync (self , offsets ):
307
318
"""Commit specific offsets synchronously.
@@ -314,6 +325,10 @@ def commit_offsets_sync(self, offsets):
314
325
315
326
Raises error on failure
316
327
"""
328
+ assert self .config ['api_version' ] >= (0 , 8 , 1 ), 'Unsupported Broker API'
329
+ assert all (map (lambda k : isinstance (k , TopicPartition ), offsets ))
330
+ assert all (map (lambda v : isinstance (v , OffsetAndMetadata ),
331
+ offsets .values ()))
317
332
if not offsets :
318
333
return
319
334
@@ -325,7 +340,7 @@ def commit_offsets_sync(self, offsets):
325
340
self ._client .poll (future = future )
326
341
327
342
if future .succeeded ():
328
- return
343
+ return future . value
329
344
330
345
if not future .retriable ():
331
346
raise future .exception # pylint: disable-msg=raising-bad-type
@@ -369,16 +384,20 @@ def _send_offset_commit_request(self, offsets):
369
384
Returns:
370
385
Future: indicating whether the commit was successful or not
371
386
"""
387
+ assert self .config ['api_version' ] >= (0 , 8 , 1 ), 'Unsupported Broker API'
388
+ assert all (map (lambda k : isinstance (k , TopicPartition ), offsets ))
389
+ assert all (map (lambda v : isinstance (v , OffsetAndMetadata ),
390
+ offsets .values ()))
391
+ if not offsets :
392
+ return Future ().success (None )
393
+
372
394
if self .config ['api_version' ] >= (0 , 8 , 2 ):
373
395
if self .coordinator_unknown ():
374
396
return Future ().failure (Errors .GroupCoordinatorNotAvailableError )
375
397
node_id = self .coordinator_id
376
398
else :
377
399
node_id = self ._client .least_loaded_node ()
378
400
379
- if not offsets :
380
- return Future ().failure (None )
381
-
382
401
# create the offset commit request
383
402
offset_data = collections .defaultdict (dict )
384
403
for tp , offset in six .iteritems (offsets ):
@@ -428,7 +447,7 @@ def _send_offset_commit_request(self, offsets):
428
447
future = Future ()
429
448
_f = self ._client .send (node_id , request )
430
449
_f .add_callback (self ._handle_offset_commit_response , offsets , future )
431
- _f .add_errback (self ._failed_request , future )
450
+ _f .add_errback (self ._failed_request , node_id , request , future )
432
451
return future
433
452
434
453
def _handle_offset_commit_response (self , offsets , future , response ):
@@ -513,6 +532,11 @@ def _send_offset_fetch_request(self, partitions):
513
532
Returns:
514
533
Future: resolves to dict of offsets: {TopicPartition: int}
515
534
"""
535
+ assert self .config ['api_version' ] >= (0 , 8 , 1 ), 'Unsupported Broker API'
536
+ assert all (map (lambda k : isinstance (k , TopicPartition ), partitions ))
537
+ if not partitions :
538
+ return Future ().success ({})
539
+
516
540
if self .config ['api_version' ] >= (0 , 8 , 2 ):
517
541
if self .coordinator_unknown ():
518
542
return Future ().failure (Errors .GroupCoordinatorNotAvailableError )
@@ -541,7 +565,7 @@ def _send_offset_fetch_request(self, partitions):
541
565
future = Future ()
542
566
_f = self ._client .send (node_id , request )
543
567
_f .add_callback (self ._handle_offset_fetch_response , future )
544
- _f .add_errback (self ._failed_request , future )
568
+ _f .add_errback (self ._failed_request , node_id , request , future )
545
569
return future
546
570
547
571
def _handle_offset_fetch_response (self , future , response ):
0 commit comments