Skip to content

Commit b053da2

Browse files
committed
Merge branch 'master' into add_tests
kafka/client.py contained duplicate copies of same refactor, merged. Move test/test_integration.py changes into test/test_producer_integration. Conflicts: kafka/client.py servers/0.8.0/kafka-src test/test_integration.py
2 parents efcf58b + 3b18043 commit b053da2

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

kafka/producer.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import logging
44
import time
5+
import random
56

67
from Queue import Empty
78
from collections import defaultdict
@@ -180,14 +181,20 @@ class SimpleProducer(Producer):
180181
batch_send - If True, messages are sent in batches
181182
batch_send_every_n - If set, messages are sent in batches of this size
182183
batch_send_every_t - If set, messages are sent after this timeout
184+
random_start - If true, randomize the initial partition which the
185+
first message block will be published to, otherwise
186+
if false, the first message block will always publish
187+
to partition 0 before cycling through each partition
183188
"""
184189
def __init__(self, client, async=False,
185190
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
186191
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
187192
batch_send=False,
188193
batch_send_every_n=BATCH_SEND_MSG_COUNT,
189-
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
194+
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
195+
random_start=False):
190196
self.partition_cycles = {}
197+
self.random_start = random_start
191198
super(SimpleProducer, self).__init__(client, async, req_acks,
192199
ack_timeout, batch_send,
193200
batch_send_every_n,
@@ -198,6 +205,13 @@ def _next_partition(self, topic):
198205
if topic not in self.client.topic_partitions:
199206
self.client.load_metadata_for_topics(topic)
200207
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
208+
209+
# Randomize the initial partition that is returned
210+
if self.random_start:
211+
num_partitions = len(self.client.topic_partitions[topic])
212+
for _ in xrange(random.randint(0, num_partitions-1)):
213+
self.partition_cycles[topic].next()
214+
201215
return self.partition_cycles[topic].next()
202216

203217
def send_messages(self, topic, *msg):

test/test_producer_integration.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,29 +124,47 @@ def test_simple_producer(self):
124124
start_offset1 = self.current_offset(self.topic, 1)
125125
producer = SimpleProducer(self.client)
126126

127-
# Will go to partition 0
128-
msg1, msg2, msg3, msg4, msg5 = [ str(uuid.uuid4()) for x in xrange(5) ]
127+
# Goes to the first partition (partition 0); random_start defaults to False.
129128
resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
130129
self.assert_produce_response(resp, start_offset0)
131130

132-
# Will go to partition 1
131+
# Goes to the next partition (partition 1) in round-robin order.
133132
resp = producer.send_messages(self.topic, self.msg("three"))
134133
self.assert_produce_response(resp, start_offset1)
135134

136135
self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
137136
self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
138137

139-
# Will go to partition 0
138+
# Goes back to the first partition because there are only two partitions
140139
resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
141140
self.assert_produce_response(resp, start_offset0+2)
142141
self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
143142

144143
producer.stop()
145144

146145
@kafka_versions("all")
147-
def test_round_robin_partitioner(self):
148-
msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ]
146+
def test_producer_random_order(self):
147+
producer = SimpleProducer(self.client, random_start = True)
148+
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
149+
resp2 = producer.send_messages(self.topic, self.msg("three"))
150+
resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
151+
152+
self.assertEqual(resp1[0].partition, resp3[0].partition)
153+
self.assertNotEqual(resp1[0].partition, resp2[0].partition)
154+
155+
@kafka_versions("all")
156+
def test_producer_ordered_start(self):
157+
producer = SimpleProducer(self.client, random_start = False)
158+
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
159+
resp2 = producer.send_messages(self.topic, self.msg("three"))
160+
resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
149161

162+
self.assertEqual(resp1[0].partition, 0)
163+
self.assertEqual(resp2[0].partition, 1)
164+
self.assertEqual(resp3[0].partition, 0)
165+
166+
@kafka_versions("all")
167+
def test_round_robin_partitioner(self):
150168
start_offset0 = self.current_offset(self.topic, 0)
151169
start_offset1 = self.current_offset(self.topic, 1)
152170

0 commit comments

Comments
 (0)