Skip to content

Commit bbd6444

Browse files
author
Dana Powers
committed
ConsumerCoordinator cleanups
- default assignors to RoundRobinPartitionAssignor - check offsets types in commit_offsets_* methods - succeed future in _send_offset_commit_request when no offsets - raise exception if no subscribed topics in group_protocols() - fix _subscription typo in metadata listener callbacks - short circuit if no partitions passed to fetch_committed_offsets - line-wrap comments - return future from commit_offsets_async - return future value from commit_offsets_sync - fix self._failed_request callback partial args - comment out metrics class for now
1 parent 35ed2e7 commit bbd6444

File tree

2 files changed

+36
-13
lines changed

2 files changed

+36
-13
lines changed

kafka/coordinator/base.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ def _handle_heartbeat_failure(self, e):
621621
etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
622622
self._client.schedule(self, etd)
623623

624-
624+
'''
625625
class GroupCoordinatorMetrics(object):
626626
def __init__(self, metrics, prefix, tags=None):
627627
self.metrics = metrics
@@ -674,5 +674,4 @@ def __init__(self, metrics, prefix, tags=None):
674674
"The number of seconds since the last controller heartbeat",
675675
tags), lastHeartbeat)
676676
"""
677-
678-
677+
'''

kafka/coordinator/consumer.py

+34-10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import six
99

1010
from .base import BaseCoordinator
11+
from .assignors.roundrobin import RoundRobinPartitionAssignor
1112
from .protocol import (
1213
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
1314
ConsumerProtocol)
@@ -29,7 +30,7 @@ class ConsumerCoordinator(BaseCoordinator):
2930
'enable_auto_commit': True,
3031
'auto_commit_interval_ms': 5000,
3132
'default_offset_commit_callback': lambda offsets, response: True,
32-
'assignors': (),
33+
'assignors': (RoundRobinPartitionAssignor,),
3334
'session_timeout_ms': 30000,
3435
'heartbeat_interval_ms': 3000,
3536
'retry_backoff_ms': 100,
@@ -100,6 +101,7 @@ def protocol_type(self):
100101
def group_protocols(self):
101102
"""Returns list of preferred (protocols, metadata)"""
102103
topics = self._subscription.subscription
104+
assert topics is not None, 'Consumer has not subscribed to topics'
103105
metadata_list = []
104106
for assignor in self.config['assignors']:
105107
metadata = assignor.metadata(topics)
@@ -111,7 +113,7 @@ def _handle_metadata_update(self, cluster):
111113
# if we encounter any unauthorized topics, raise an exception
112114
# TODO
113115
#if self._cluster.unauthorized_topics:
114-
# raise Errors.TopicAuthorizationError(self._cluster.unauthorized_topics)
116+
# raise TopicAuthorizationError(self._cluster.unauthorized_topics)
115117

116118
if self._subscription.subscribed_pattern:
117119
topics = []
@@ -122,7 +124,8 @@ def _handle_metadata_update(self, cluster):
122124
self._subscription.change_subscription(topics)
123125
self._client.set_topics(self._subscription.group_subscription())
124126

125-
# check if there are any changes to the metadata which should trigger a rebalance
127+
# check if there are any changes to the metadata which should trigger
128+
# a rebalance
126129
if self._subscription_metadata_changed():
127130
if self.config['api_version'] >= (0, 9):
128131
self._subscription.mark_for_reassignment()
@@ -182,7 +185,7 @@ def _on_join_complete(self, generation, member_id, protocol,
182185
# execute the user's callback after rebalance
183186
if self._subscription.listener:
184187
try:
185-
self._subscriptions.listener.on_partitions_assigned(assigned)
188+
self._subscription.listener.on_partitions_assigned(assigned)
186189
except Exception:
187190
log.exception("User provided listener failed on partition"
188191
" assignment: %s", assigned)
@@ -263,6 +266,9 @@ def fetch_committed_offsets(self, partitions):
263266
Returns:
264267
dict: {TopicPartition: OffsetAndMetadata}
265268
"""
269+
if not partitions:
270+
return {}
271+
266272
while True:
267273
if self.config['api_version'] >= (0, 8, 2):
268274
self.ensure_coordinator_known()
@@ -297,11 +303,16 @@ def commit_offsets_async(self, offsets, callback=None):
297303
Returns:
298304
Future: indicating whether the commit was successful or not
299305
"""
306+
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
307+
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
308+
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
309+
offsets.values()))
300310
if callback is None:
301311
callback = self.config['default_offset_commit_callback']
302312
self._subscription.needs_fetch_committed_offsets = True
303313
future = self._send_offset_commit_request(offsets)
304314
future.add_both(callback, offsets)
315+
return future
305316

306317
def commit_offsets_sync(self, offsets):
307318
"""Commit specific offsets synchronously.
@@ -314,6 +325,10 @@ def commit_offsets_sync(self, offsets):
314325
315326
Raises error on failure
316327
"""
328+
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
329+
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
330+
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
331+
offsets.values()))
317332
if not offsets:
318333
return
319334

@@ -325,7 +340,7 @@ def commit_offsets_sync(self, offsets):
325340
self._client.poll(future=future)
326341

327342
if future.succeeded():
328-
return
343+
return future.value
329344

330345
if not future.retriable():
331346
raise future.exception # pylint: disable-msg=raising-bad-type
@@ -369,16 +384,20 @@ def _send_offset_commit_request(self, offsets):
369384
Returns:
370385
Future: indicating whether the commit was successful or not
371386
"""
387+
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
388+
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
389+
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
390+
offsets.values()))
391+
if not offsets:
392+
return Future().success(None)
393+
372394
if self.config['api_version'] >= (0, 8, 2):
373395
if self.coordinator_unknown():
374396
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
375397
node_id = self.coordinator_id
376398
else:
377399
node_id = self._client.least_loaded_node()
378400

379-
if not offsets:
380-
return Future().failure(None)
381-
382401
# create the offset commit request
383402
offset_data = collections.defaultdict(dict)
384403
for tp, offset in six.iteritems(offsets):
@@ -428,7 +447,7 @@ def _send_offset_commit_request(self, offsets):
428447
future = Future()
429448
_f = self._client.send(node_id, request)
430449
_f.add_callback(self._handle_offset_commit_response, offsets, future)
431-
_f.add_errback(self._failed_request, future)
450+
_f.add_errback(self._failed_request, node_id, request, future)
432451
return future
433452

434453
def _handle_offset_commit_response(self, offsets, future, response):
@@ -513,6 +532,11 @@ def _send_offset_fetch_request(self, partitions):
513532
Returns:
514533
Future: resolves to dict of offsets: {TopicPartition: int}
515534
"""
535+
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
536+
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
537+
if not partitions:
538+
return Future().success({})
539+
516540
if self.config['api_version'] >= (0, 8, 2):
517541
if self.coordinator_unknown():
518542
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
@@ -541,7 +565,7 @@ def _send_offset_fetch_request(self, partitions):
541565
future = Future()
542566
_f = self._client.send(node_id, request)
543567
_f.add_callback(self._handle_offset_fetch_response, future)
544-
_f.add_errback(self._failed_request, future)
568+
_f.add_errback(self._failed_request, node_id, request, future)
545569
return future
546570

547571
def _handle_offset_fetch_response(self, future, response):

0 commit comments

Comments
 (0)