Skip to content

Commit d9aafb1

Browse files
committed
Isn't it nice when tests actually find bugs
1 parent cab6fee commit d9aafb1

File tree

2 files changed

+51
-6
lines changed

2 files changed

+51
-6
lines changed

kafka/client.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,6 @@ def send_message_set(self, produceRequest):
454454
if sent == 0:
455455
raise RuntimeError("Kafka went away")
456456

457-
458457
def send_multi_message_set(self, produceRequests):
459458
"""
460459
Send a MultiProduceRequest
@@ -550,8 +549,8 @@ def get_offsets(self, offsetRequest):
550549
<offset> ::= <int64>
551550
552551
"""
553-
req = length_prefix_message(encode_offset_request(offsetRequest))
554-
log.debug("Sending %d bytes to Kafka", len(req))
552+
req = length_prefix_message(self.encode_offset_request(offsetRequest))
553+
log.debug("Sending OffsetRequest of %d bytes to Kafka", len(req))
555554
sent = self._sock.send(req)
556555
if sent == 0:
557556
raise RuntimeError("Kafka went away")
@@ -574,7 +573,7 @@ def send_messages_simple(self, topic, *payloads):
574573
topic: string
575574
payloads: strings
576575
"""
577-
messages = tuple([create_message(payload) for payload in payloads])
576+
messages = tuple([self.create_message(payload) for payload in payloads])
578577
self.send_message_set(ProduceRequest(topic, -1, messages))
579578

580579
def iter_messages(self, topic, partition, offset, size, auto=True):

test/integration.py

+48-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import glob
2+
import logging
23
import os
34
import select
45
import shlex
@@ -11,7 +12,7 @@
1112
import time
1213
import unittest
1314

14-
from kafka.client import KafkaClient, ProduceRequest, FetchRequest
15+
from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
1516

1617
def get_open_port():
1718
sock = socket.socket()
@@ -70,7 +71,7 @@ def run(self):
7071
killed = True
7172

7273
if proc.poll() is not None:
73-
#shutil.rmtree(self.tmpDir)
74+
shutil.rmtree(self.tmpDir)
7475
if killed:
7576
break
7677
else:
@@ -101,6 +102,11 @@ def tearDownClass(cls):
101102
cls.kafka.close()
102103
cls.server.shouldDie.set()
103104

105+
def test_send_simple(self):
106+
self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3")
107+
self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'"))
108+
self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple"))
109+
104110
def test_produce(self):
105111
# Produce a message, check that the log got created
106112
req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
@@ -164,6 +170,46 @@ def test_check_offset(self):
164170
self.assertEquals(messages[0], message2)
165171
self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
166172

173+
def test_iterator(self):
174+
# Produce 100 messages
175+
messages = []
176+
for i in range(100):
177+
messages.append(KafkaClient.create_message("testing %d" % i))
178+
req = ProduceRequest("test-iterator", 0, messages)
179+
self.kafka.send_message_set(req)
180+
self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0"))
181+
self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'"))
182+
183+
# Initialize an iterator of fetch size 64 bytes - big enough for one message
184+
# but not enough for all 100 messages
185+
cnt = 0
186+
for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)):
187+
self.assertEquals(messages[i], msg)
188+
cnt += 1
189+
self.assertEquals(cnt, 100)
190+
191+
# Same thing, but don't auto paginate
192+
cnt = 0
193+
for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)):
194+
self.assertEquals(messages[i], msg)
195+
cnt += 1
196+
self.assertTrue(cnt < 100)
197+
198+
def test_offset_request(self):
199+
# Produce a message to create the topic/partition
200+
message1 = KafkaClient.create_message("testing 1")
201+
req = ProduceRequest("test-offset-request", 0, [message1])
202+
self.kafka.send_message_set(req)
203+
self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0"))
204+
self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'"))
205+
206+
t1 = int(time.time()*1000) # now
207+
t2 = t1 + 60000 # one minute from now
208+
req = OffsetRequest("test-offset-request", 0, t1, 1024)
209+
print self.kafka.get_offsets(req)
210+
211+
req = OffsetRequest("test-offset-request", 0, t2, 1024)
212+
print self.kafka.get_offsets(req)
167213

168214
if __name__ == "__main__":
169215
unittest.main()

0 commit comments

Comments
 (0)