@@ -126,8 +126,8 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
126
126
auto_commit_every_t = auto_commit_every_t )
127
127
128
128
if max_buffer_size is not None and buffer_size > max_buffer_size :
129
- raise ValueError (" buffer_size (%d) is greater than "
130
- " max_buffer_size (%d)" %
129
+ raise ValueError (' buffer_size (%d) is greater than '
130
+ ' max_buffer_size (%d)' %
131
131
(buffer_size , max_buffer_size ))
132
132
self .buffer_size = buffer_size
133
133
self .max_buffer_size = max_buffer_size
@@ -227,7 +227,7 @@ def seek(self, offset, whence):
227
227
self .offsets [resp .partition ] = \
228
228
resp .offsets [0 ] + deltas [resp .partition ]
229
229
else :
230
- raise ValueError (" Unexpected value for `whence`, %d" % whence )
230
+ raise ValueError (' Unexpected value for `whence`, %d' % whence )
231
231
232
232
# Reset queue and fetch offsets since they are invalid
233
233
self .fetch_offsets = self .offsets .copy ()
@@ -250,35 +250,32 @@ def get_messages(self, count=1, block=True, timeout=0.1):
250
250
"""
251
251
messages = []
252
252
if timeout is not None :
253
- max_time = time .time () + timeout
253
+ timeout + = time .time ()
254
254
255
255
new_offsets = {}
256
- while count > 0 and (timeout is None or timeout > 0 ):
257
- result = self ._get_message (block , timeout , get_partition_info = True ,
256
+ log .debug ('getting %d messages' , count )
257
+ while len (messages ) < count :
258
+ block_time = timeout - time .time ()
259
+ log .debug ('calling _get_message block=%s timeout=%s' , block , block_time )
260
+ result = self ._get_message (block , block_time ,
261
+ get_partition_info = True ,
258
262
update_offset = False )
259
- if result :
260
- partition , message = result
261
- if self .partition_info :
262
- messages .append (result )
263
- else :
264
- messages .append (message )
265
- new_offsets [partition ] = message .offset + 1
266
- count -= 1
267
- else :
268
- # Ran out of messages for the last request.
269
- if not block :
270
- # If we're not blocking, break.
271
- break
263
+ log .debug ('got %s from _get_messages' , result )
264
+ if not result :
265
+ if block and (timeout is None or time .time () <= timeout ):
266
+ continue
267
+ break
272
268
273
- # If we have a timeout, reduce it to the
274
- # appropriate value
275
- if timeout is not None :
276
- timeout = max_time - time . time ()
269
+ partition , message = result
270
+ _msg = ( partition , message ) if self . partition_info else message
271
+ messages . append ( _msg )
272
+ new_offsets [ partition ] = message . offset + 1
277
273
278
274
# Update and commit offsets if necessary
279
275
self .offsets .update (new_offsets )
280
276
self .count_since_commit += len (messages )
281
277
self ._auto_commit ()
278
+ log .debug ('got %d messages: %s' , len (messages ), messages )
282
279
return messages
283
280
284
281
def get_message (self , block = True , timeout = 0.1 , get_partition_info = None ):
@@ -292,10 +289,16 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
292
289
If get_partition_info is True, returns (partition, message)
293
290
If get_partition_info is False, returns message
294
291
"""
295
- if self .queue .empty ():
292
+ start_at = time .time ()
293
+ while self .queue .empty ():
296
294
# We're out of messages, go grab some more.
295
+ log .debug ('internal queue empty, fetching more messages' )
297
296
with FetchContext (self , block , timeout ):
298
297
self ._fetch ()
298
+
299
+ if not block or time .time () > (start_at + timeout ):
300
+ break
301
+
299
302
try :
300
303
partition , message = self .queue .get_nowait ()
301
304
@@ -314,6 +317,7 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
314
317
else :
315
318
return message
316
319
except Empty :
320
+ log .debug ('internal queue empty after fetch - returning None' )
317
321
return None
318
322
319
323
def __iter__ (self ):
@@ -396,20 +400,20 @@ def _fetch(self):
396
400
except ConsumerFetchSizeTooSmall :
397
401
if (self .max_buffer_size is not None and
398
402
buffer_size == self .max_buffer_size ):
399
- log .error (" Max fetch size %d too small" ,
403
+ log .error (' Max fetch size %d too small' ,
400
404
self .max_buffer_size )
401
405
raise
402
406
if self .max_buffer_size is None :
403
407
buffer_size *= 2
404
408
else :
405
409
buffer_size = min (buffer_size * 2 ,
406
410
self .max_buffer_size )
407
- log .warn ( " Fetch size too small, increase to %d (2x) "
408
- " and retry" , buffer_size )
411
+ log .warning ( ' Fetch size too small, increase to %d (2x) '
412
+ ' and retry' , buffer_size )
409
413
retry_partitions [partition ] = buffer_size
410
414
except ConsumerNoMoreData as e :
411
- log .debug (" Iteration was ended by %r" , e )
415
+ log .debug (' Iteration was ended by %r' , e )
412
416
except StopIteration :
413
417
# Stop iterating through this partition
414
- log .debug (" Done iterating over partition %s" % partition )
418
+ log .debug (' Done iterating over partition %s' , partition )
415
419
partitions = retry_partitions
0 commit comments