@@ -83,13 +83,50 @@ def setUp(self):
83
83
self .kafka = KafkaClient ("localhost" , port )
84
84
85
85
def test_produce (self ):
86
- req = ProduceRequest ("my-topic " , 0 , [KafkaClient .create_message ("testing" )])
86
+ req = ProduceRequest ("test-produce " , 0 , [KafkaClient .create_message ("testing" )])
87
87
self .kafka .send_message_set (req )
88
- self .assertTrue (self .server .wait_for ("Created log for 'my-topic '-0" ))
88
+ self .assertTrue (self .server .wait_for ("Created log for 'test-produce '-0" ))
89
89
90
- req = ProduceRequest ("my-topic " , 1 , [KafkaClient .create_message ("testing" )])
90
+ req = ProduceRequest ("test-produce " , 1 , [KafkaClient .create_message ("testing" )])
91
91
self .kafka .send_message_set (req )
92
- self .assertTrue (self .server .wait_for ("Created log for 'my-topic'-1" ))
92
+ self .assertTrue (self .server .wait_for ("Created log for 'test-produce'-1" ))
93
+
94
+ def test_produce_consume (self ):
95
+ message1 = KafkaClient .create_message ("testing 1" )
96
+ message2 = KafkaClient .create_message ("testing 2" )
97
+ req = ProduceRequest ("test-produce-consume" , 0 , [message1 , message2 ])
98
+ self .kafka .send_message_set (req )
99
+ self .assertTrue (self .server .wait_for ("Created log for 'test-produce-consume'-0" ))
100
+ time .sleep (1 )
101
+ req = FetchRequest ("test-produce-consume" , 0 , 0 , 1024 )
102
+ (messages , req ) = self .kafka .get_message_set (req )
103
+ self .assertEquals (len (messages ), 2 )
104
+ self .assertEquals (messages [0 ], message1 )
105
+ self .assertEquals (messages [1 ], message2 )
106
+
107
+ message3 = KafkaClient .create_message ("testing 3" )
108
+ message4 = KafkaClient .create_message ("testing 4" )
109
+ req = ProduceRequest ("test-produce-consume" , 1 , [message3 , message4 ])
110
+ self .kafka .send_message_set (req )
111
+ self .assertTrue (self .server .wait_for ("Created log for 'test-produce-consume'-1" ))
112
+ time .sleep (1 )
113
+ req = FetchRequest ("test-produce-consume" , 1 , 0 , 1024 )
114
+ (messages , req ) = self .kafka .get_message_set (req )
115
+ self .assertEquals (len (messages ), 2 )
116
+ self .assertEquals (messages [0 ], message3 )
117
+ self .assertEquals (messages [1 ], message4 )
118
+
119
+ def test_check_offset (self ):
120
+ message1 = KafkaClient .create_message ("testing 1" )
121
+ req = ProduceRequest ("test-check-offset" , 0 , [message1 ])
122
+ self .kafka .send_message_set (req )
123
+ self .assertTrue (self .server .wait_for ("Created log for 'test-check-offset'-0" ))
124
+ time .sleep (1 )
125
+ req = FetchRequest ("test-check-offset" , 0 , 0 , 1024 )
126
+ (messages , req ) = self .kafka .get_message_set (req )
127
+ self .assertEquals (len (messages ), 1 )
128
+ self .assertEquals (messages [0 ], message1 )
129
+ assertEquals (req .offset , len (KafkaClient .encode_message (message1 )))
93
130
94
131
def tearDown (self ):
95
132
self .kafka .close ()
0 commit comments