Skip to content

Commit 3499e2f

Browse files
committed
Some work on a simple consumer
1 parent 1b72132 commit 3499e2f

File tree

3 files changed

+91
-11
lines changed

3 files changed

+91
-11
lines changed

kafka/client.py

+32-4
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,7 @@ def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
685685
for produce_response in KafkaProtocol.decode_produce_response(response):
686686
# Check for errors
687687
if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
688-
raise Exception("ProduceRequest for %s failed with errorcode=%d",
688+
raise Exception("ProduceRequest for %s failed with errorcode=%d" %
689689
(TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
690690
# Run the callback
691691
if callback is not None:
@@ -825,14 +825,17 @@ def send_message(self, msg):
825825
resp = self.client.send_produce_request([req]).next()
826826

827827
class SimpleConsumer(object):
828+
"""
829+
A simple consumer implementation that consumes all partitions for a topic
830+
"""
828831
def __init__(self, client, group, topic):
829832
self.client = client
830833
self.topic = topic
831834
self.group = group
832835
self.client.load_metadata_for_topics(topic)
833836
self.offsets = {}
834837

835-
def get_or_init_offset(resp):
838+
def get_or_init_offset_callback(resp):
836839
if resp.error == ErrorMapping.NO_ERROR:
837840
return resp.offset
838841
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
@@ -843,8 +846,33 @@ def get_or_init_offset(resp):
843846

844847
for partition in self.client.topic_partitions[topic]:
845848
req = OffsetFetchRequest(topic, partition)
846-
(offset,) = self.client.send_offset_fetch_request(group, [req], callback=get_or_init_offset, fail_on_error=False)
849+
(offset,) = self.client.send_offset_fetch_request(group, [req],
850+
callback=get_or_init_offset_callback, fail_on_error=False)
847851
self.offsets[partition] = offset
848852

849-
print self.offsets
853+
def __iter__(self):
854+
iters = {}
855+
for partition, offset in self.offsets.items():
856+
iters[partition] = self.__iter_partition__(partition, offset)
857+
858+
while True:
859+
for it in iters.values():
860+
yield it.next()
861+
862+
def __iter_partition__(self, partition, offset):
863+
while True:
864+
req = FetchRequest(self.topic, partition, offset, 1024)
865+
(resp,) = self.client.send_fetch_request([req])
866+
assert resp.topic == self.topic
867+
assert resp.partition == partition
868+
next_offset = None
869+
for message in resp.messages:
870+
next_offset = message.offset
871+
yield message
872+
if next_offset is None:
873+
raise StopIteration("No more messages")
874+
else:
875+
offset = next_offset + 1
876+
# Commit offset here?
877+
850878

test/integration.py

+58-6
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@ def build_kafka_classpath():
3535
return cp
3636

3737
class KafkaFixture(Thread):
38-
def __init__(self, host, port):
38+
def __init__(self, host, port, broker_id, zk_chroot=None):
3939
Thread.__init__(self)
40+
self.broker_id = broker_id
41+
self.zk_chroot = zk_chroot
4042
self.port = port
4143
self.capture = ""
4244
self.shouldDie = Event()
@@ -50,19 +52,24 @@ def run(self):
5052
stdout = open(os.path.join(logDir, 'stdout'), 'w')
5153

5254
# Create the config file
53-
zkChroot = "kafka-python_%s" % self.tmpDir.replace("/", "_")
55+
if self.zk_chroot is None:
56+
self.zk_chroot= "kafka-python_%s" % self.tmpDir.replace("/", "_")
5457
logConfig = "test/resources/log4j.properties"
5558
configFile = os.path.join(self.tmpDir, 'server.properties')
5659
f = open('test/resources/server.properties', 'r')
5760
props = f.read()
5861
f = open(configFile, 'w')
59-
f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2, 'zk.chroot': zkChroot})
62+
f.write(props % {'broker.id': self.broker_id,
63+
'kafka.port': self.port,
64+
'kafka.tmp.dir': logDir,
65+
'kafka.partitions': 2,
66+
'zk.chroot': self.zk_chroot})
6067
f.close()
6168

6269
cp = build_kafka_classpath()
6370

6471
# Create the Zookeeper chroot
65-
args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, zkChroot))
72+
args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot))
6673
proc = subprocess.Popen(args)
6774
ret = proc.wait()
6875
assert ret == 0
@@ -123,7 +130,7 @@ def setUpClass(cls):
123130
cls.client = KafkaClient(host, port)
124131
else:
125132
port = get_open_port()
126-
cls.server = KafkaFixture("localhost", port)
133+
cls.server = KafkaFixture("localhost", port, 0)
127134
cls.server.start()
128135
cls.server.wait_for("Kafka server started")
129136
cls.client = KafkaClient("localhost", port)
@@ -367,10 +374,55 @@ def test_simple_producer(self):
367374
self.assertEquals(len(messages), 1)
368375
self.assertEquals(messages[0].message.value, "two")
369376

370-
# Consumer Tests
377+
class TestConsumer(unittest.TestCase):
378+
@classmethod
379+
def setUpClass(cls):
380+
# Broker 0
381+
port = get_open_port()
382+
cls.server1 = KafkaFixture("localhost", port, 0)
383+
cls.server1.start()
384+
cls.server1.wait_for("Kafka server started")
385+
386+
# Broker 1
387+
zk = cls.server1.zk_chroot
388+
port = get_open_port()
389+
cls.server2 = KafkaFixture("localhost", port, 1, zk)
390+
cls.server2.start()
391+
cls.server2.wait_for("Kafka server started")
392+
393+
# Client bootstraps from broker 1
394+
cls.client = KafkaClient("localhost", port)
395+
396+
@classmethod
397+
def tearDownClass(cls):
398+
cls.client.close()
399+
cls.server1.close()
400+
cls.server2.close()
371401

372402
def test_consumer(self):
403+
produce1 = ProduceRequest("test_consumer", 0, messages=[
404+
KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100)
405+
])
406+
407+
produce2 = ProduceRequest("test_consumer", 1, messages=[
408+
KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100)
409+
])
410+
411+
for resp in self.client.send_produce_request([produce1]):
412+
self.assertEquals(resp.error, 0)
413+
self.assertEquals(resp.offset, 0)
414+
415+
for resp in self.client.send_produce_request([produce2]):
416+
self.assertEquals(resp.error, 0)
417+
self.assertEquals(resp.offset, 0)
418+
373419
consumer = SimpleConsumer(self.client, "group1", "test_consumer")
420+
all_messages = []
421+
for message in consumer:
422+
all_messages.append(message)
423+
424+
self.assertEquals(len(all_messages), 200)
425+
self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes
374426

375427
if __name__ == "__main__":
376428
logging.basicConfig(level=logging.INFO)

test/resources/server.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
############################# Server Basics #############################
1818

1919
# The id of the broker. This must be set to a unique integer for each broker.
20-
broker.id=0
20+
broker.id=%(broker.id)d
2121

2222
############################# Socket Server Settings #############################
2323

0 commit comments

Comments
 (0)