Skip to content

Commit c13ee1d

Browse files
committed
Fix cases of single partition
1 parent 7b2a08f commit c13ee1d

File tree

2 files changed

+5
-2
lines changed

2 files changed

+5
-2
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
5959

6060
## Multiprocess consumer
6161
```python
62+
from kafka.consume import MultiProcessConsumer
63+
6264
# This will split the number of partitions among two processes
6365
consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2)
6466

kafka/consumer.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,11 +437,12 @@ def __init__(self, client, group, topic, auto_commit=True,
437437
partitions_per_proc += 1
438438

439439
# The final set of chunks
440-
chunks = map(None, *[iter(partitions)] * int(partitions_per_proc))
440+
chunker = lambda *x: [] + list(x)
441+
chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
441442

442443
self.procs = []
443444
for chunk in chunks:
444-
chunk = filter(lambda x: x is not None, list(chunk))
445+
chunk = filter(lambda x: x is not None, chunk)
445446
proc = Process(target=self._consume, args=(chunk,))
446447
proc.daemon = True
447448
proc.start()

0 commit comments

Comments
 (0)