1
- from __future__ import absolute_import
1
+ from __future__ import absolute_import, division
2
2
3
3
import copy
4
4
import logging
@@ -125,19 +125,34 @@ class KafkaConsumer(six.Iterator):
125
125
distribute partition ownership amongst consumer instances when
126
126
group management is used.
127
127
Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
128
+ max_poll_records (int): The maximum number of records returned in a
129
+ single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
130
+ max_poll_interval_ms (int): The maximum delay between invocations of
131
+ :meth:`~kafka.KafkaConsumer.poll` when using consumer group
132
+ management. This places an upper bound on the amount of time that
133
+ the consumer can be idle before fetching more records. If
134
+ :meth:`~kafka.KafkaConsumer.poll` is not called before expiration
135
+ of this timeout, then the consumer is considered failed and the
136
+ group will rebalance in order to reassign the partitions to another
137
+ member. Default: 300000
138
+ session_timeout_ms (int): The timeout used to detect failures when
139
+ using Kafka's group management facilities. The consumer sends
140
+ periodic heartbeats to indicate its liveness to the broker. If
141
+ no heartbeats are received by the broker before the expiration of
142
+ this session timeout, then the broker will remove this consumer
143
+ from the group and initiate a rebalance. Note that the value must
144
+ be in the allowable range as configured in the broker configuration
145
+ by group.min.session.timeout.ms and group.max.session.timeout.ms.
146
+ Default: 10000
128
147
heartbeat_interval_ms (int): The expected time in milliseconds
129
148
between heartbeats to the consumer coordinator when using
130
- Kafka's group management feature. Heartbeats are used to ensure
149
+ Kafka's group management facilities. Heartbeats are used to ensure
131
150
that the consumer's session stays active and to facilitate
132
151
rebalancing when new consumers join or leave the group. The
133
152
value must be set lower than session_timeout_ms, but typically
134
153
should be set no higher than 1/3 of that value. It can be
135
154
adjusted even lower to control the expected time for normal
136
155
rebalances. Default: 3000
137
- session_timeout_ms (int): The timeout used to detect failures when
138
- using Kafka's group management facilities. Default: 30000
139
- max_poll_records (int): The maximum number of records returned in a
140
- single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
141
156
receive_buffer_bytes (int): The size of the TCP receive buffer
142
157
(SO_RCVBUF) to use when reading data. Default: None (relies on
143
158
system defaults). The java client defaults to 32768.
@@ -236,7 +251,7 @@ class KafkaConsumer(six.Iterator):
236
251
'fetch_min_bytes' : 1 ,
237
252
'fetch_max_bytes' : 52428800 ,
238
253
'max_partition_fetch_bytes' : 1 * 1024 * 1024 ,
239
- 'request_timeout_ms' : 40 * 1000 ,
254
+ 'request_timeout_ms' : 305000 , # chosen to be higher than the default of max_poll_interval_ms
240
255
'retry_backoff_ms' : 100 ,
241
256
'reconnect_backoff_ms' : 50 ,
242
257
'reconnect_backoff_max_ms' : 1000 ,
@@ -248,9 +263,10 @@ class KafkaConsumer(six.Iterator):
248
263
'check_crcs' : True ,
249
264
'metadata_max_age_ms' : 5 * 60 * 1000 ,
250
265
'partition_assignment_strategy' : (RangePartitionAssignor , RoundRobinPartitionAssignor ),
251
- 'heartbeat_interval_ms' : 3000 ,
252
- 'session_timeout_ms' : 30000 ,
253
266
'max_poll_records' : 500 ,
267
+ 'max_poll_interval_ms' : 300000 ,
268
+ 'session_timeout_ms' : 10000 ,
269
+ 'heartbeat_interval_ms' : 3000 ,
254
270
'receive_buffer_bytes' : None ,
255
271
'send_buffer_bytes' : None ,
256
272
'socket_options' : [(socket .IPPROTO_TCP , socket .TCP_NODELAY , 1 )],
@@ -278,15 +294,16 @@ class KafkaConsumer(six.Iterator):
278
294
'sasl_plain_password' : None ,
279
295
'sasl_kerberos_service_name' : 'kafka'
280
296
}
297
+ DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
281
298
282
299
def __init__ (self , * topics , ** configs ):
283
- self .config = copy .copy (self .DEFAULT_CONFIG )
284
- for key in self .config :
285
- if key in configs :
286
- self .config [key ] = configs .pop (key )
287
-
288
300
# Only check for extra config keys in top-level class
289
- assert not configs , 'Unrecognized configs: %s' % configs
301
+ extra_configs = set (configs ).difference (self .DEFAULT_CONFIG )
302
+ if extra_configs :
303
+ raise KafkaConfigurationError ("Unrecognized configs: %s" % extra_configs )
304
+
305
+ self .config = copy .copy (self .DEFAULT_CONFIG )
306
+ self .config .update (configs )
290
307
291
308
deprecated = {'smallest' : 'earliest' , 'largest' : 'latest' }
292
309
if self .config ['auto_offset_reset' ] in deprecated :
@@ -296,12 +313,7 @@ def __init__(self, *topics, **configs):
296
313
self .config ['auto_offset_reset' ] = new_config
297
314
298
315
request_timeout_ms = self .config ['request_timeout_ms' ]
299
- session_timeout_ms = self .config ['session_timeout_ms' ]
300
316
fetch_max_wait_ms = self .config ['fetch_max_wait_ms' ]
301
- if request_timeout_ms <= session_timeout_ms :
302
- raise KafkaConfigurationError (
303
- "Request timeout (%s) must be larger than session timeout (%s)" %
304
- (request_timeout_ms , session_timeout_ms ))
305
317
if request_timeout_ms <= fetch_max_wait_ms :
306
318
raise KafkaConfigurationError ("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
307
319
(request_timeout_ms , fetch_max_wait_ms ))
@@ -330,6 +342,25 @@ def __init__(self, *topics, **configs):
330
342
if self .config ['api_version' ] is None :
331
343
self .config ['api_version' ] = self ._client .config ['api_version' ]
332
344
345
+ # Coordinator configurations are different for older brokers
346
+ # max_poll_interval_ms is not supported directly -- it must the be
347
+ # the same as session_timeout_ms. If the user provides one of them,
348
+ # use it for both. Otherwise use the old default of 30secs
349
+ if self .config ['api_version' ] < (0 , 10 , 1 ):
350
+ if 'session_timeout_ms' not in configs :
351
+ if 'max_poll_interval_ms' in configs :
352
+ self .config ['session_timeout_ms' ] = configs ['max_poll_interval_ms' ]
353
+ else :
354
+ self .config ['session_timeout_ms' ] = self .DEFAULT_SESSION_TIMEOUT_MS_0_9
355
+ if 'max_poll_interval_ms' not in configs :
356
+ self .config ['max_poll_interval_ms' ] = self .config ['session_timeout_ms' ]
357
+
358
+ if self .config ['group_id' ] is not None :
359
+ if self .config ['request_timeout_ms' ] <= self .config ['session_timeout_ms' ]:
360
+ raise KafkaConfigurationError (
361
+ "Request timeout (%s) must be larger than session timeout (%s)" %
362
+ (self .config ['request_timeout_ms' ], self .config ['session_timeout_ms' ]))
363
+
333
364
self ._subscription = SubscriptionState (self .config ['auto_offset_reset' ])
334
365
self ._fetcher = Fetcher (
335
366
self ._client , self ._subscription , self ._metrics , ** self .config )
@@ -587,12 +618,7 @@ def _poll_once(self, timeout_ms, max_records):
587
618
Returns:
588
619
dict: Map of topic to list of records (may be empty).
589
620
"""
590
- if self ._use_consumer_group ():
591
- self ._coordinator .ensure_active_group ()
592
-
593
- # 0.8.2 brokers support kafka-backed offset storage via group coordinator
594
- elif self .config ['group_id' ] is not None and self .config ['api_version' ] >= (0 , 8 , 2 ):
595
- self ._coordinator .ensure_coordinator_ready ()
621
+ self ._coordinator .poll ()
596
622
597
623
# Fetch positions if we have partitions we're subscribed to that we
598
624
# don't know the offset for
@@ -614,6 +640,7 @@ def _poll_once(self, timeout_ms, max_records):
614
640
# Send any new fetches (won't resend pending fetches)
615
641
self ._fetcher .send_fetches ()
616
642
643
+ timeout_ms = min (timeout_ms , self ._coordinator .time_to_next_poll ())
617
644
self ._client .poll (timeout_ms = timeout_ms )
618
645
records , _ = self ._fetcher .fetched_records (max_records )
619
646
return records
@@ -1014,13 +1041,7 @@ def _message_generator(self):
1014
1041
assert self .assignment () or self .subscription () is not None , 'No topic subscription or manual partition assignment'
1015
1042
while time .time () < self ._consumer_timeout :
1016
1043
1017
- if self ._use_consumer_group ():
1018
- self ._coordinator .ensure_coordinator_ready ()
1019
- self ._coordinator .ensure_active_group ()
1020
-
1021
- # 0.8.2 brokers support kafka-backed offset storage via group coordinator
1022
- elif self .config ['group_id' ] is not None and self .config ['api_version' ] >= (0 , 8 , 2 ):
1023
- self ._coordinator .ensure_coordinator_ready ()
1044
+ self ._coordinator .poll ()
1024
1045
1025
1046
# Fetch offsets for any subscribed partitions that we arent tracking yet
1026
1047
if not self ._subscription .has_all_fetch_positions ():
@@ -1068,19 +1089,8 @@ def _message_generator(self):
1068
1089
1069
1090
def _next_timeout (self ):
1070
1091
timeout = min (self ._consumer_timeout ,
1071
- self ._client ._delayed_tasks .next_at () + time .time (),
1072
- self ._client .cluster .ttl () / 1000.0 + time .time ())
1073
-
1074
- # Although the delayed_tasks timeout above should cover processing
1075
- # HeartbeatRequests, it is still possible that HeartbeatResponses
1076
- # are left unprocessed during a long _fetcher iteration without
1077
- # an intermediate poll(). And because tasks are responsible for
1078
- # rescheduling themselves, an unprocessed response will prevent
1079
- # the next heartbeat from being sent. This check should help
1080
- # avoid that.
1081
- if self ._use_consumer_group ():
1082
- heartbeat = time .time () + self ._coordinator .heartbeat .ttl ()
1083
- timeout = min (timeout , heartbeat )
1092
+ self ._client .cluster .ttl () / 1000.0 + time .time (),
1093
+ self ._coordinator .time_to_next_poll () + time .time ())
1084
1094
return timeout
1085
1095
1086
1096
def __iter__ (self ): # pylint: disable=non-iterator-returned
0 commit comments