@@ -124,29 +124,47 @@ def test_simple_producer(self):
124
124
start_offset1 = self .current_offset (self .topic , 1 )
125
125
producer = SimpleProducer (self .client )
126
126
127
- # Will go to partition 0
128
- msg1 , msg2 , msg3 , msg4 , msg5 = [ str (uuid .uuid4 ()) for x in xrange (5 ) ]
127
+ # Goes to first partition, randomly.
129
128
resp = producer .send_messages (self .topic , self .msg ("one" ), self .msg ("two" ))
130
129
self .assert_produce_response (resp , start_offset0 )
131
130
132
- # Will go to partition 1
131
+ # Goes to the next partition, randomly.
133
132
resp = producer .send_messages (self .topic , self .msg ("three" ))
134
133
self .assert_produce_response (resp , start_offset1 )
135
134
136
135
self .assert_fetch_offset (0 , start_offset0 , [ self .msg ("one" ), self .msg ("two" ) ])
137
136
self .assert_fetch_offset (1 , start_offset1 , [ self .msg ("three" ) ])
138
137
139
- # Will go to partition 0
138
+ # Goes back to the first partition because there's only two partitions
140
139
resp = producer .send_messages (self .topic , self .msg ("four" ), self .msg ("five" ))
141
140
self .assert_produce_response (resp , start_offset0 + 2 )
142
141
self .assert_fetch_offset (0 , start_offset0 , [ self .msg ("one" ), self .msg ("two" ), self .msg ("four" ), self .msg ("five" ) ])
143
142
144
143
producer .stop ()
145
144
146
145
@kafka_versions ("all" )
147
- def test_round_robin_partitioner (self ):
148
- msg1 , msg2 , msg3 , msg4 = [ str (uuid .uuid4 ()) for _ in range (4 ) ]
146
+ def test_producer_random_order (self ):
147
+ producer = SimpleProducer (self .client , random_start = True )
148
+ resp1 = producer .send_messages (self .topic , self .msg ("one" ), self .msg ("two" ))
149
+ resp2 = producer .send_messages (self .topic , self .msg ("three" ))
150
+ resp3 = producer .send_messages (self .topic , self .msg ("four" ), self .msg ("five" ))
151
+
152
+ self .assertEqual (resp1 [0 ].partition , resp3 [0 ].partition )
153
+ self .assertNotEqual (resp1 [0 ].partition , resp2 [0 ].partition )
154
+
155
+ @kafka_versions ("all" )
156
+ def test_producer_ordered_start (self ):
157
+ producer = SimpleProducer (self .client , random_start = False )
158
+ resp1 = producer .send_messages (self .topic , self .msg ("one" ), self .msg ("two" ))
159
+ resp2 = producer .send_messages (self .topic , self .msg ("three" ))
160
+ resp3 = producer .send_messages (self .topic , self .msg ("four" ), self .msg ("five" ))
149
161
162
+ self .assertEqual (resp1 [0 ].partition , 0 )
163
+ self .assertEqual (resp2 [0 ].partition , 1 )
164
+ self .assertEqual (resp3 [0 ].partition , 0 )
165
+
166
+ @kafka_versions ("all" )
167
+ def test_round_robin_partitioner (self ):
150
168
start_offset0 = self .current_offset (self .topic , 0 )
151
169
start_offset1 = self .current_offset (self .topic , 1 )
152
170
0 commit comments