@@ -35,8 +35,10 @@ def build_kafka_classpath():
35
35
return cp
36
36
37
37
class KafkaFixture (Thread ):
38
- def __init__ (self , host , port ):
38
+ def __init__ (self , host , port , broker_id , zk_chroot = None ):
39
39
Thread .__init__ (self )
40
+ self .broker_id = broker_id
41
+ self .zk_chroot = zk_chroot
40
42
self .port = port
41
43
self .capture = ""
42
44
self .shouldDie = Event ()
@@ -50,19 +52,24 @@ def run(self):
50
52
stdout = open (os .path .join (logDir , 'stdout' ), 'w' )
51
53
52
54
# Create the config file
53
- zkChroot = "kafka-python_%s" % self .tmpDir .replace ("/" , "_" )
55
+ if self .zk_chroot is None :
56
+ self .zk_chroot = "kafka-python_%s" % self .tmpDir .replace ("/" , "_" )
54
57
logConfig = "test/resources/log4j.properties"
55
58
configFile = os .path .join (self .tmpDir , 'server.properties' )
56
59
f = open ('test/resources/server.properties' , 'r' )
57
60
props = f .read ()
58
61
f = open (configFile , 'w' )
59
- f .write (props % {'kafka.port' : self .port , 'kafka.tmp.dir' : logDir , 'kafka.partitions' : 2 , 'zk.chroot' : zkChroot })
62
+ f .write (props % {'broker.id' : self .broker_id ,
63
+ 'kafka.port' : self .port ,
64
+ 'kafka.tmp.dir' : logDir ,
65
+ 'kafka.partitions' : 2 ,
66
+ 'zk.chroot' : self .zk_chroot })
60
67
f .close ()
61
68
62
69
cp = build_kafka_classpath ()
63
70
64
71
# Create the Zookeeper chroot
65
- args = shlex .split ("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp , zkChroot ))
72
+ args = shlex .split ("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp , self . zk_chroot ))
66
73
proc = subprocess .Popen (args )
67
74
ret = proc .wait ()
68
75
assert ret == 0
@@ -123,7 +130,7 @@ def setUpClass(cls):
123
130
cls .client = KafkaClient (host , port )
124
131
else :
125
132
port = get_open_port ()
126
- cls .server = KafkaFixture ("localhost" , port )
133
+ cls .server = KafkaFixture ("localhost" , port , 0 )
127
134
cls .server .start ()
128
135
cls .server .wait_for ("Kafka server started" )
129
136
cls .client = KafkaClient ("localhost" , port )
@@ -367,10 +374,55 @@ def test_simple_producer(self):
367
374
self .assertEquals (len (messages ), 1 )
368
375
self .assertEquals (messages [0 ].message .value , "two" )
369
376
370
- # Consumer Tests
377
+ class TestConsumer (unittest .TestCase ):
378
+ @classmethod
379
+ def setUpClass (cls ):
380
+ # Broker 0
381
+ port = get_open_port ()
382
+ cls .server1 = KafkaFixture ("localhost" , port , 0 )
383
+ cls .server1 .start ()
384
+ cls .server1 .wait_for ("Kafka server started" )
385
+
386
+ # Broker 1
387
+ zk = cls .server1 .zk_chroot
388
+ port = get_open_port ()
389
+ cls .server2 = KafkaFixture ("localhost" , port , 1 , zk )
390
+ cls .server2 .start ()
391
+ cls .server2 .wait_for ("Kafka server started" )
392
+
393
+ # Client bootstraps from broker 1
394
+ cls .client = KafkaClient ("localhost" , port )
395
+
396
+ @classmethod
397
+ def tearDownClass (cls ):
398
+ cls .client .close ()
399
+ cls .server1 .close ()
400
+ cls .server2 .close ()
371
401
372
402
def test_consumer (self ):
403
+ produce1 = ProduceRequest ("test_consumer" , 0 , messages = [
404
+ KafkaProtocol .create_message ("Test message 0 %d" % i ) for i in range (100 )
405
+ ])
406
+
407
+ produce2 = ProduceRequest ("test_consumer" , 1 , messages = [
408
+ KafkaProtocol .create_message ("Test message 1 %d" % i ) for i in range (100 )
409
+ ])
410
+
411
+ for resp in self .client .send_produce_request ([produce1 ]):
412
+ self .assertEquals (resp .error , 0 )
413
+ self .assertEquals (resp .offset , 0 )
414
+
415
+ for resp in self .client .send_produce_request ([produce2 ]):
416
+ self .assertEquals (resp .error , 0 )
417
+ self .assertEquals (resp .offset , 0 )
418
+
373
419
consumer = SimpleConsumer (self .client , "group1" , "test_consumer" )
420
+ all_messages = []
421
+ for message in consumer :
422
+ all_messages .append (message )
423
+
424
+ self .assertEquals (len (all_messages ), 200 )
425
+ self .assertEquals (len (all_messages ), len (set (all_messages ))) # make sure there are no dupes
374
426
375
427
if __name__ == "__main__" :
376
428
logging .basicConfig (level = logging .INFO )
0 commit comments