|
1 | 1 | import glob
|
| 2 | +import logging |
2 | 3 | import os
|
3 | 4 | import select
|
4 | 5 | import shlex
|
|
11 | 12 | import time
|
12 | 13 | import unittest
|
13 | 14 |
|
14 |
| -from kafka.client import KafkaClient, ProduceRequest, FetchRequest |
| 15 | +from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest |
15 | 16 |
|
16 | 17 | def get_open_port():
|
17 | 18 | sock = socket.socket()
|
@@ -70,7 +71,7 @@ def run(self):
|
70 | 71 | killed = True
|
71 | 72 |
|
72 | 73 | if proc.poll() is not None:
|
73 |
| - #shutil.rmtree(self.tmpDir) |
| 74 | + shutil.rmtree(self.tmpDir) |
74 | 75 | if killed:
|
75 | 76 | break
|
76 | 77 | else:
|
@@ -101,6 +102,11 @@ def tearDownClass(cls):
|
101 | 102 | cls.kafka.close()
|
102 | 103 | cls.server.shouldDie.set()
|
103 | 104 |
|
| 105 | + def test_send_simple(self): |
| 106 | + self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3") |
| 107 | + self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'")) |
| 108 | + self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple")) |
| 109 | + |
104 | 110 | def test_produce(self):
|
105 | 111 | # Produce a message, check that the log got created
|
106 | 112 | req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
|
@@ -164,6 +170,46 @@ def test_check_offset(self):
|
164 | 170 | self.assertEquals(messages[0], message2)
|
165 | 171 | self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
|
166 | 172 |
|
| 173 | + def test_iterator(self): |
| 174 | + # Produce 100 messages |
| 175 | + messages = [] |
| 176 | + for i in range(100): |
| 177 | + messages.append(KafkaClient.create_message("testing %d" % i)) |
| 178 | + req = ProduceRequest("test-iterator", 0, messages) |
| 179 | + self.kafka.send_message_set(req) |
| 180 | + self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0")) |
| 181 | + self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'")) |
| 182 | + |
| 183 | + # Initialize an iterator of fetch size 64 bytes - big enough for one message |
| 184 | + # but not enough for all 100 messages |
| 185 | + cnt = 0 |
| 186 | + for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)): |
| 187 | + self.assertEquals(messages[i], msg) |
| 188 | + cnt += 1 |
| 189 | + self.assertEquals(cnt, 100) |
| 190 | + |
| 191 | + # Same thing, but don't auto paginate |
| 192 | + cnt = 0 |
| 193 | + for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)): |
| 194 | + self.assertEquals(messages[i], msg) |
| 195 | + cnt += 1 |
| 196 | + self.assertTrue(cnt < 100) |
| 197 | + |
| 198 | + def test_offset_request(self): |
| 199 | + # Produce a message to create the topic/partition |
| 200 | + message1 = KafkaClient.create_message("testing 1") |
| 201 | + req = ProduceRequest("test-offset-request", 0, [message1]) |
| 202 | + self.kafka.send_message_set(req) |
| 203 | + self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0")) |
| 204 | + self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'")) |
| 205 | + |
| 206 | + t1 = int(time.time()*1000) # now |
| 207 | + t2 = t1 + 60000 # one minute from now |
| 208 | + req = OffsetRequest("test-offset-request", 0, t1, 1024) |
| 209 | + print self.kafka.get_offsets(req) |
| 210 | + |
| 211 | + req = OffsetRequest("test-offset-request", 0, t2, 1024) |
| 212 | + print self.kafka.get_offsets(req) |
167 | 213 |
|
168 | 214 | if __name__ == "__main__":
|
169 | 215 | unittest.main()
|
0 commit comments