Skip to content

Commit ad024d1

Browse files
authored
KAFKA-3888 Use background thread to process consumer heartbeats (dpkp#1266)
1 parent 995664c commit ad024d1

14 files changed

+977
-738
lines changed

kafka/client_async.py

+182-283
Large diffs are not rendered by default.

kafka/conn.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,7 @@ def can_send_more(self):
685685
def recv(self):
686686
"""Non-blocking network receive.
687687
688-
Return list of (response, future)
688+
Return list of (response, future) tuples
689689
"""
690690
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
691691
log.warning('%s cannot recv: socket not connected', self)

kafka/consumer/fetcher.py

+3
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,9 @@ def _create_fetch_requests(self):
674674
fetchable[node_id][partition.topic].append(partition_info)
675675
log.debug("Adding fetch request for partition %s at offset %d",
676676
partition, position)
677+
else:
678+
log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
679+
partition, node_id)
677680

678681
if self.config['api_version'] >= (0, 11, 0):
679682
version = 4

kafka/consumer/group.py

+56-46
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from __future__ import absolute_import
1+
from __future__ import absolute_import, division
22

33
import copy
44
import logging
@@ -125,19 +125,34 @@ class KafkaConsumer(six.Iterator):
125125
distribute partition ownership amongst consumer instances when
126126
group management is used.
127127
Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
128+
max_poll_records (int): The maximum number of records returned in a
129+
single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
130+
max_poll_interval_ms (int): The maximum delay between invocations of
131+
:meth:`~kafka.KafkaConsumer.poll` when using consumer group
132+
management. This places an upper bound on the amount of time that
133+
the consumer can be idle before fetching more records. If
134+
:meth:`~kafka.KafkaConsumer.poll` is not called before expiration
135+
of this timeout, then the consumer is considered failed and the
136+
group will rebalance in order to reassign the partitions to another
137+
member. Default: 300000
138+
session_timeout_ms (int): The timeout used to detect failures when
139+
using Kafka's group management facilities. The consumer sends
140+
periodic heartbeats to indicate its liveness to the broker. If
141+
no heartbeats are received by the broker before the expiration of
142+
this session timeout, then the broker will remove this consumer
143+
from the group and initiate a rebalance. Note that the value must
144+
be in the allowable range as configured in the broker configuration
145+
by group.min.session.timeout.ms and group.max.session.timeout.ms.
146+
Default: 10000
128147
heartbeat_interval_ms (int): The expected time in milliseconds
129148
between heartbeats to the consumer coordinator when using
130-
Kafka's group management feature. Heartbeats are used to ensure
149+
Kafka's group management facilities. Heartbeats are used to ensure
131150
that the consumer's session stays active and to facilitate
132151
rebalancing when new consumers join or leave the group. The
133152
value must be set lower than session_timeout_ms, but typically
134153
should be set no higher than 1/3 of that value. It can be
135154
adjusted even lower to control the expected time for normal
136155
rebalances. Default: 3000
137-
session_timeout_ms (int): The timeout used to detect failures when
138-
using Kafka's group management facilities. Default: 30000
139-
max_poll_records (int): The maximum number of records returned in a
140-
single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
141156
receive_buffer_bytes (int): The size of the TCP receive buffer
142157
(SO_RCVBUF) to use when reading data. Default: None (relies on
143158
system defaults). The java client defaults to 32768.
@@ -236,7 +251,7 @@ class KafkaConsumer(six.Iterator):
236251
'fetch_min_bytes': 1,
237252
'fetch_max_bytes': 52428800,
238253
'max_partition_fetch_bytes': 1 * 1024 * 1024,
239-
'request_timeout_ms': 40 * 1000,
254+
'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
240255
'retry_backoff_ms': 100,
241256
'reconnect_backoff_ms': 50,
242257
'reconnect_backoff_max_ms': 1000,
@@ -248,9 +263,10 @@ class KafkaConsumer(six.Iterator):
248263
'check_crcs': True,
249264
'metadata_max_age_ms': 5 * 60 * 1000,
250265
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
251-
'heartbeat_interval_ms': 3000,
252-
'session_timeout_ms': 30000,
253266
'max_poll_records': 500,
267+
'max_poll_interval_ms': 300000,
268+
'session_timeout_ms': 10000,
269+
'heartbeat_interval_ms': 3000,
254270
'receive_buffer_bytes': None,
255271
'send_buffer_bytes': None,
256272
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
@@ -278,15 +294,16 @@ class KafkaConsumer(six.Iterator):
278294
'sasl_plain_password': None,
279295
'sasl_kerberos_service_name': 'kafka'
280296
}
297+
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
281298

282299
def __init__(self, *topics, **configs):
283-
self.config = copy.copy(self.DEFAULT_CONFIG)
284-
for key in self.config:
285-
if key in configs:
286-
self.config[key] = configs.pop(key)
287-
288300
# Only check for extra config keys in top-level class
289-
assert not configs, 'Unrecognized configs: %s' % configs
301+
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
302+
if extra_configs:
303+
raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
304+
305+
self.config = copy.copy(self.DEFAULT_CONFIG)
306+
self.config.update(configs)
290307

291308
deprecated = {'smallest': 'earliest', 'largest': 'latest'}
292309
if self.config['auto_offset_reset'] in deprecated:
@@ -296,12 +313,7 @@ def __init__(self, *topics, **configs):
296313
self.config['auto_offset_reset'] = new_config
297314

298315
request_timeout_ms = self.config['request_timeout_ms']
299-
session_timeout_ms = self.config['session_timeout_ms']
300316
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
301-
if request_timeout_ms <= session_timeout_ms:
302-
raise KafkaConfigurationError(
303-
"Request timeout (%s) must be larger than session timeout (%s)" %
304-
(request_timeout_ms, session_timeout_ms))
305317
if request_timeout_ms <= fetch_max_wait_ms:
306318
raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
307319
(request_timeout_ms, fetch_max_wait_ms))
@@ -330,6 +342,25 @@ def __init__(self, *topics, **configs):
330342
if self.config['api_version'] is None:
331343
self.config['api_version'] = self._client.config['api_version']
332344

345+
# Coordinator configurations are different for older brokers
346+
# max_poll_interval_ms is not supported directly -- it must be
347+
# the same as session_timeout_ms. If the user provides one of them,
348+
# use it for both. Otherwise use the old default of 30secs
349+
if self.config['api_version'] < (0, 10, 1):
350+
if 'session_timeout_ms' not in configs:
351+
if 'max_poll_interval_ms' in configs:
352+
self.config['session_timeout_ms'] = configs['max_poll_interval_ms']
353+
else:
354+
self.config['session_timeout_ms'] = self.DEFAULT_SESSION_TIMEOUT_MS_0_9
355+
if 'max_poll_interval_ms' not in configs:
356+
self.config['max_poll_interval_ms'] = self.config['session_timeout_ms']
357+
358+
if self.config['group_id'] is not None:
359+
if self.config['request_timeout_ms'] <= self.config['session_timeout_ms']:
360+
raise KafkaConfigurationError(
361+
"Request timeout (%s) must be larger than session timeout (%s)" %
362+
(self.config['request_timeout_ms'], self.config['session_timeout_ms']))
363+
333364
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
334365
self._fetcher = Fetcher(
335366
self._client, self._subscription, self._metrics, **self.config)
@@ -587,12 +618,7 @@ def _poll_once(self, timeout_ms, max_records):
587618
Returns:
588619
dict: Map of topic to list of records (may be empty).
589620
"""
590-
if self._use_consumer_group():
591-
self._coordinator.ensure_active_group()
592-
593-
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
594-
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
595-
self._coordinator.ensure_coordinator_ready()
621+
self._coordinator.poll()
596622

597623
# Fetch positions if we have partitions we're subscribed to that we
598624
# don't know the offset for
@@ -614,6 +640,7 @@ def _poll_once(self, timeout_ms, max_records):
614640
# Send any new fetches (won't resend pending fetches)
615641
self._fetcher.send_fetches()
616642

643+
timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll())
617644
self._client.poll(timeout_ms=timeout_ms)
618645
records, _ = self._fetcher.fetched_records(max_records)
619646
return records
@@ -1014,13 +1041,7 @@ def _message_generator(self):
10141041
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
10151042
while time.time() < self._consumer_timeout:
10161043

1017-
if self._use_consumer_group():
1018-
self._coordinator.ensure_coordinator_ready()
1019-
self._coordinator.ensure_active_group()
1020-
1021-
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
1022-
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
1023-
self._coordinator.ensure_coordinator_ready()
1044+
self._coordinator.poll()
10241045

10251046
# Fetch offsets for any subscribed partitions that we arent tracking yet
10261047
if not self._subscription.has_all_fetch_positions():
@@ -1068,19 +1089,8 @@ def _message_generator(self):
10681089

10691090
def _next_timeout(self):
10701091
timeout = min(self._consumer_timeout,
1071-
self._client._delayed_tasks.next_at() + time.time(),
1072-
self._client.cluster.ttl() / 1000.0 + time.time())
1073-
1074-
# Although the delayed_tasks timeout above should cover processing
1075-
# HeartbeatRequests, it is still possible that HeartbeatResponses
1076-
# are left unprocessed during a long _fetcher iteration without
1077-
# an intermediate poll(). And because tasks are responsible for
1078-
# rescheduling themselves, an unprocessed response will prevent
1079-
# the next heartbeat from being sent. This check should help
1080-
# avoid that.
1081-
if self._use_consumer_group():
1082-
heartbeat = time.time() + self._coordinator.heartbeat.ttl()
1083-
timeout = min(timeout, heartbeat)
1092+
self._client.cluster.ttl() / 1000.0 + time.time(),
1093+
self._coordinator.time_to_next_poll() + time.time())
10841094
return timeout
10851095

10861096
def __iter__(self): # pylint: disable=non-iterator-returned

0 commit comments

Comments
 (0)