1
1
from itertools import izip_longest , repeat
2
2
import logging
3
+ import time
3
4
from threading import Lock
4
5
from multiprocessing import Process , Queue , Event , Value
6
+ from Queue import Empty
5
7
6
8
from kafka .common import (
7
9
ErrorMapping , FetchRequest ,
@@ -412,14 +414,14 @@ def __init__(self, client, group, topic, auto_commit=True,
412
414
413
415
# Initiate the base consumer class
414
416
super (MultiProcessConsumer , self ).__init__ (client , group , topic ,
415
- partitions = partitions ,
417
+ partitions = None ,
416
418
auto_commit = auto_commit ,
417
419
auto_commit_every_n = auto_commit_every_n ,
418
420
auto_commit_every_t = auto_commit_every_t )
419
421
420
422
# Variables for managing and controlling the data flow from
421
423
# consumer child process to master
422
- self .queue = Queue () # Child consumers dump messages into this
424
+ self .queue = Queue (1024 ) # Child consumers dump messages into this
423
425
self .start = Event () # Indicates the consumers to start fetch
424
426
self .exit = Event () # Requests the consumers to shutdown
425
427
self .pause = Event () # Requests the consumers to pause fetch
@@ -441,9 +443,11 @@ def __init__(self, client, group, topic, auto_commit=True,
441
443
442
444
self .procs = []
443
445
for chunk in chunks :
444
- proc = Process (target = _self ._consume , args = (chunk ,))
446
+ chunk = filter (lambda x : x is not None , list (chunk ))
447
+ proc = Process (target = self ._consume , args = (chunk ,))
445
448
proc .daemon = True
446
449
proc .start ()
450
+ time .sleep (0.2 )
447
451
self .procs .append (proc )
448
452
449
453
def _consume (self , partitions ):
@@ -468,7 +472,7 @@ def _consume(self, partitions):
468
472
self .start .wait ()
469
473
470
474
# If we are asked to quit, do so
471
- if self .exit .isSet ():
475
+ if self .exit .is_set ():
472
476
break
473
477
474
478
# Consume messages and add them to the queue. If the controller
@@ -488,6 +492,11 @@ def _consume(self, partitions):
488
492
self .pause .wait ()
489
493
break
490
494
495
+ # In case we did not receive any message, give up the CPU for
496
+ # a while before we try again
497
+ if count == 0 :
498
+ time .sleep (0.1 )
499
+
491
500
consumer .stop ()
492
501
493
502
def stop (self ):
@@ -507,21 +516,22 @@ def __iter__(self):
507
516
# Trigger the consumer procs to start off.
508
517
# We will iterate till there are no more messages available
509
518
self .size .value = 0
510
- self .start .set ()
511
519
self .pause .set ()
512
520
513
521
while True :
522
+ self .start .set ()
514
523
try :
515
524
# We will block for a small while so that the consumers get
516
525
# a chance to run and put some messages in the queue
517
- partition , message = self .queue .get (block = True , timeout = 0. 1 )
518
- except Queue . Empty :
526
+ partition , message = self .queue .get (block = True , timeout = 1 )
527
+ except Empty :
519
528
break
520
529
521
- yield message
522
-
523
530
# Count, check and commit messages if necessary
524
531
self .offsets [partition ] = message .offset
532
+ self .start .clear ()
533
+ yield message
534
+
525
535
self .count_since_commit += 1
526
536
self ._auto_commit ()
527
537
@@ -555,7 +565,7 @@ def get_messages(self, count=1, block=True, timeout=10):
555
565
556
566
try :
557
567
partition , message = self .queue .get (block , timeout )
558
- except Queue . Empty :
568
+ except Empty :
559
569
break
560
570
561
571
messages .append (message )
0 commit comments