Skip to content

Commit 61fa0b2

Browse files
authored
Convert remaining KafkaConsumer tests to pytest (dpkp#1886)
This makes it so the only remaining use of `unittest` is in the old tests of the deprecated `Simple*` clients. All `KafkaConsumer` tests are migrated to `pytest`. I also had to bump the test iterations up on one of the tests, I think there was a race condition there that was more commonly hit under pytest , planning to cleanup that in a followup PR. See dpkp#1886 (comment) for details.
1 parent 6e6d0cc commit 61fa0b2

File tree

4 files changed

+284
-256
lines changed

4 files changed

+284
-256
lines changed

test/conftest.py

+26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import absolute_import
22

3+
import uuid
4+
35
import pytest
46

57
from test.testutil import env_kafka_version, random_string
@@ -137,3 +139,27 @@ def _set_conn_state(state):
137139
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
138140
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
139141
return conn
142+
143+
144+
@pytest.fixture()
145+
def send_messages(topic, kafka_producer, request):
146+
"""A factory that returns a send_messages function with a pre-populated
147+
topic topic / producer."""
148+
149+
def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request):
150+
"""
151+
messages is typically `range(0,100)`
152+
partition is an int
153+
"""
154+
messages_and_futures = [] # [(message, produce_future),]
155+
for i in number_range:
156+
# request.node.name provides the test name (including parametrized values)
157+
encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8')
158+
future = kafka_producer.send(topic, value=encoded_msg, partition=partition)
159+
messages_and_futures.append((encoded_msg, future))
160+
kafka_producer.flush()
161+
for (msg, f) in messages_and_futures:
162+
assert f.succeeded()
163+
return [msg for (msg, f) in messages_and_futures]
164+
165+
return _send_messages

test/test_consumer_group.py

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def test_consumer(kafka_broker, topic):
2929
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
3030
consumer.close()
3131

32+
3233
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
3334
def test_consumer_topics(kafka_broker, topic):
3435
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
@@ -38,6 +39,7 @@ def test_consumer_topics(kafka_broker, topic):
3839
assert len(consumer.partitions_for_topic(topic)) > 0
3940
consumer.close()
4041

42+
4143
@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
4244
def test_group(kafka_broker, topic):
4345
num_partitions = 4

0 commit comments

Comments
 (0)