Skip to content

Commit c04820d

Browse files
committed
Bring acks and timeout down to the client
1 parent 8bf624e commit c04820d

File tree

3 files changed

+11
-10
lines changed

3 files changed

+11
-10
lines changed

kafka/client.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def close(self):
143143
for conn in self.conns.values():
144144
conn.close()
145145

146-
def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
146+
def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None):
147147
"""
148148
Encode and send some ProduceRequests
149149
@@ -162,8 +162,8 @@ def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
162162
list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
163163
"""
164164
resps = self._send_broker_aware_request(payloads,
165-
KafkaProtocol.encode_produce_request,
166-
KafkaProtocol.decode_produce_response)
165+
partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout),
166+
KafkaProtocol.decode_produce_response)
167167
out = []
168168
for resp in resps:
169169
# Check for errors

kafka/producer.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ def __init__(self, client, topic):
1616
self.client._load_metadata_for_topics(topic)
1717
self.next_partition = cycle(self.client.topic_partitions[topic])
1818

19-
def send_message(self, msg):
19+
def send_messages(self, *msg):
2020
req = ProduceRequest(self.topic, self.next_partition.next(),
21-
messages=[create_message(msg)])
21+
messages=[create_message(m) for m in msg])
2222
resp = self.client.send_produce_request([req])[0]
2323
assert resp.error == 0

test/integration.py

+6-5
Original file line numberDiff line numberDiff line change
@@ -359,22 +359,23 @@ def test_commit_fetch_offsets(self):
359359

360360
def test_simple_producer(self):
361361
producer = SimpleProducer(self.client, "test_simple_producer")
362-
producer.send_message("one")
363-
producer.send_message("two")
362+
producer.send_messages("one", "two")
363+
producer.send_messages("three")
364364

365365
fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024)
366366
fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024)
367367
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2])
368368
self.assertEquals(fetch_resp1.error, 0)
369-
self.assertEquals(fetch_resp1.highwaterMark, 1)
369+
self.assertEquals(fetch_resp1.highwaterMark, 2)
370370
messages = list(fetch_resp1.messages)
371-
self.assertEquals(len(messages), 1)
371+
self.assertEquals(len(messages), 2)
372372
self.assertEquals(messages[0].message.value, "one")
373+
self.assertEquals(messages[1].message.value, "two")
373374
self.assertEquals(fetch_resp2.error, 0)
374375
self.assertEquals(fetch_resp2.highwaterMark, 1)
375376
messages = list(fetch_resp2.messages)
376377
self.assertEquals(len(messages), 1)
377-
self.assertEquals(messages[0].message.value, "two")
378+
self.assertEquals(messages[0].message.value, "three")
378379

379380
class TestConsumer(unittest.TestCase):
380381
@classmethod

0 commit comments

Comments
 (0)