Skip to content

Commit 478de24

Browse files
committed
Adding some integration tests
1 parent 51d8bbb commit 478de24

File tree

3 files changed

+44
-8
lines changed

3 files changed

+44
-8
lines changed

example.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
def produce_example(kafka):
66
message = kafka.create_message("testing")
7-
request = ProduceRequest("my-topic", 0, [message])
7+
request = ProduceRequest("my-topic", -1, [message])
88
kafka.send_message_set(request)
99

1010
def consume_example(kafka):

kafka.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -584,18 +584,17 @@ def get_offsets(self, offsetRequest):
584584
# Simple User API #
585585
#######################
586586

587-
def send_messages_simple(self, topic, partition, *payloads):
587+
def send_messages_simple(self, topic, *payloads):
588588
"""
589589
Send one or more strings to Kafka
590590
591591
Params
592592
======
593593
topic: string
594-
partition: int
595594
payloads: strings
596595
"""
597596
messages = tuple([create_message(payload) for payload in payloads])
598-
self.send_message_set(ProduceRequest(topic, partition, messages))
597+
self.send_message_set(ProduceRequest(topic, -1, messages))
599598

600599
def iter_messages(self, topic, partition, offset, size, auto=True):
601600
"""

test/integration.py

+41-4
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,50 @@ def setUp(self):
8383
self.kafka = KafkaClient("localhost", port)
8484

8585
def test_produce(self):
86-
req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
86+
req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
8787
self.kafka.send_message_set(req)
88-
self.assertTrue(self.server.wait_for("Created log for 'my-topic'-0"))
88+
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
8989

90-
req = ProduceRequest("my-topic", 1, [KafkaClient.create_message("testing")])
90+
req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
9191
self.kafka.send_message_set(req)
92-
self.assertTrue(self.server.wait_for("Created log for 'my-topic'-1"))
92+
self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
93+
94+
def test_produce_consume(self):
95+
message1 = KafkaClient.create_message("testing 1")
96+
message2 = KafkaClient.create_message("testing 2")
97+
req = ProduceRequest("test-produce-consume", 0, [message1, message2])
98+
self.kafka.send_message_set(req)
99+
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
100+
time.sleep(1)
101+
req = FetchRequest("test-produce-consume", 0, 0, 1024)
102+
(messages, req) = self.kafka.get_message_set(req)
103+
self.assertEquals(len(messages), 2)
104+
self.assertEquals(messages[0], message1)
105+
self.assertEquals(messages[1], message2)
106+
107+
message3 = KafkaClient.create_message("testing 3")
108+
message4 = KafkaClient.create_message("testing 4")
109+
req = ProduceRequest("test-produce-consume", 1, [message3, message4])
110+
self.kafka.send_message_set(req)
111+
self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
112+
time.sleep(1)
113+
req = FetchRequest("test-produce-consume", 1, 0, 1024)
114+
(messages, req) = self.kafka.get_message_set(req)
115+
self.assertEquals(len(messages), 2)
116+
self.assertEquals(messages[0], message3)
117+
self.assertEquals(messages[1], message4)
118+
119+
def test_check_offset(self):
120+
message1 = KafkaClient.create_message("testing 1")
121+
req = ProduceRequest("test-check-offset", 0, [message1])
122+
self.kafka.send_message_set(req)
123+
self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
124+
time.sleep(1)
125+
req = FetchRequest("test-check-offset", 0, 0, 1024)
126+
(messages, req) = self.kafka.get_message_set(req)
127+
self.assertEquals(len(messages), 1)
128+
self.assertEquals(messages[0], message1)
129+
assertEquals(req.offset, len(KafkaClient.encode_message(message1)))
93130

94131
def tearDown(self):
95132
self.kafka.close()

0 commit comments

Comments
 (0)