Commit 65c8eb1

Got MultiProcessConsumer working

Other changes:
* Put a message size restriction on the shared queue - to prevent message overload
* Wait for a while after each process is started (in the constructor)
* Wait for a while in each child if the consumer does not return any messages, just to be nice to the CPU
* Control the start event more granularly - this prevents infinite loops if control does not return to the generator, e.g.:
    for msg in consumer:
        assert False
* Update message status before yield
1 parent 99da57f commit 65c8eb1
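The changes above boil down to one coordination pattern: child processes fetch into a bounded multiprocessing.Queue while the parent gates them with start/exit events. A minimal, self-contained sketch of that pattern (not kafka-python's actual class; the names `child`, `quit_` and the timings are illustrative only):

import time
from multiprocessing import Process, Queue, Event

try:
    from Queue import Empty        # Python 2, as in the diff below
except ImportError:
    from queue import Empty        # Python 3 fallback

def child(queue, start, quit_, n):
    """Stand-in 'fetcher': pushes numbered items while the parent allows it."""
    i = 0
    while not quit_.is_set():
        start.wait(0.1)            # only fetch while the parent wants data
        if not start.is_set():
            continue
        queue.put((n, i))          # blocks if the bounded queue is full
        i += 1
        time.sleep(0.01)           # be nice to the CPU, as the commit does

if __name__ == '__main__':
    queue = Queue(1024)            # size-restricted shared queue
    start, quit_ = Event(), Event()

    procs = []
    for n in range(2):
        proc = Process(target=child, args=(queue, start, quit_, n))
        proc.daemon = True
        proc.start()
        time.sleep(0.2)            # give each child a moment to come up
        procs.append(proc)

    got = 0
    while got < 10:
        start.set()                # ask for data
        try:
            item = queue.get(block=True, timeout=1)
        except Empty:
            break
        start.clear()              # stop fetching until we ask again
        print(item)
        got += 1

    quit_.set()
    start.set()                    # wake any child still waiting on start
    for proc in procs:
        proc.join(1)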

kafka/consumer.py

Lines changed: 20 additions & 10 deletions
@@ -1,7 +1,9 @@
 from itertools import izip_longest, repeat
 import logging
+import time
 from threading import Lock
 from multiprocessing import Process, Queue, Event, Value
+from Queue import Empty

 from kafka.common import (
     ErrorMapping, FetchRequest,
@@ -412,14 +414,14 @@ def __init__(self, client, group, topic, auto_commit=True,

         # Initiate the base consumer class
         super(MultiProcessConsumer, self).__init__(client, group, topic,
-                                                   partitions=partitions,
+                                                   partitions=None,
                                                    auto_commit=auto_commit,
                                                    auto_commit_every_n=auto_commit_every_n,
                                                    auto_commit_every_t=auto_commit_every_t)

         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = Queue()      # Child consumers dump messages into this
+        self.queue = Queue(1024)  # Child consumers dump messages into this
         self.start = Event()      # Indicates the consumers to start fetch
         self.exit = Event()       # Requests the consumers to shutdown
         self.pause = Event()      # Requests the consumers to pause fetch
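The switch from Queue() to Queue(1024) is the "message size restriction" from the commit message: once 1024 items are pending, put() blocks (or raises Full in non-blocking mode), so a fast child fetcher cannot pile up unbounded messages in the parent. A tiny sketch of that back-pressure, assuming Python 2 as in this module:

from multiprocessing import Queue
from Queue import Full        # Python 2 ('from queue import Full' on Python 3)

q = Queue(3)                  # deliberately tiny bound, just for illustration
for i in range(3):
    q.put(i)                  # these fit within the bound

try:
    q.put(99, block=False)    # a fourth item is refused straight away
except Full:
    print("queue full - the producer has to wait for the consumer")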
@@ -441,9 +443,11 @@ def __init__(self, client, group, topic, auto_commit=True,

         self.procs = []
         for chunk in chunks:
-            proc = Process(target=_self._consume, args=(chunk,))
+            chunk = filter(lambda x: x is not None, list(chunk))
+            proc = Process(target=self._consume, args=(chunk,))
             proc.daemon = True
             proc.start()
+            time.sleep(0.2)
             self.procs.append(proc)

     def _consume(self, partitions):
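The new filter line strips the None padding that the chunking step introduces. Assuming __init__ groups partitions with the usual izip_longest grouper idiom (the exact grouping in the file may differ; the partition ids below are made up), the last chunk is padded with None when the partition count does not divide evenly, and a None "partition" must not reach a child consumer:

from itertools import izip_longest   # Python 2, matching the module

partitions = [0, 1, 2, 3, 4]          # hypothetical partition ids
per_proc = 2
chunks = izip_longest(*[iter(partitions)] * per_proc)

for chunk in chunks:
    chunk = filter(lambda x: x is not None, list(chunk))
    print(chunk)        # [0, 1], then [2, 3], then [4] - no None padding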
@@ -468,7 +472,7 @@ def _consume(self, partitions):
             self.start.wait()

             # If we are asked to quit, do so
-            if self.exit.isSet():
+            if self.exit.is_set():
                 break

             # Consume messages and add them to the queue. If the controller
@@ -488,6 +492,11 @@ def _consume(self, partitions):
                     self.pause.wait()
                     break

+            # In case we did not receive any message, give up the CPU for
+            # a while before we try again
+            if count == 0:
+                time.sleep(0.1)
+
         consumer.stop()

     def stop(self):
@@ -507,21 +516,22 @@ def __iter__(self):
         # Trigger the consumer procs to start off.
         # We will iterate till there are no more messages available
         self.size.value = 0
-        self.start.set()
         self.pause.set()

         while True:
+            self.start.set()
             try:
                 # We will block for a small while so that the consumers get
                 # a chance to run and put some messages in the queue
-                partition, message = self.queue.get(block=True, timeout=0.1)
-            except Queue.Empty:
+                partition, message = self.queue.get(block=True, timeout=1)
+            except Empty:
                 break

-            yield message
-
             # Count, check and commit messages if necessary
             self.offsets[partition] = message.offset
+            self.start.clear()
+            yield message
+
             self.count_since_commit += 1
             self._auto_commit()

@@ -555,7 +565,7 @@ def get_messages(self, count=1, block=True, timeout=10):

             try:
                 partition, message = self.queue.get(block, timeout)
-            except Queue.Empty:
+            except Empty:
                 break

             messages.append(message)
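The `except Empty` change pairs with the new `from Queue import Empty` import: in this namespace `Queue` is the factory pulled in by `from multiprocessing import Queue`, which has no `Empty` attribute, so the old `except Queue.Empty:` would raise an AttributeError the moment a get() timed out. Multiprocessing queues raise the stdlib Empty exception, so catching it directly is the fix. A small sketch (Python 2 spelling, as in the module):

from multiprocessing import Queue
from Queue import Empty       # 'from queue import Empty' on Python 3

q = Queue(10)
try:
    q.get(block=True, timeout=0.1)    # nothing queued, so this times out
except Empty:
    print("timed out waiting for a message")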
