test_sasl_integration.py (forked from dpkp/kafka-python)
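"""Integration tests for SASL authentication: PLAIN and SCRAM-SHA-256/512 over SASL_PLAINTEXT."""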
import logging
import uuid

import pytest

from kafka.admin import NewTopic
from kafka.protocol.metadata import MetadataRequest_v1
from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore
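

# The fixture below is parametrized over the SASL mechanisms under test; each
# param carries a skipif mark so it only runs when KAFKA_VERSION supports it.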
@pytest.fixture(
    params=[
        pytest.param(
            "PLAIN", marks=pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10")
        ),
        pytest.param(
            "SCRAM-SHA-256",
            marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
        ),
        pytest.param(
            "SCRAM-SHA-512",
            marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
        ),
    ]
)
def sasl_kafka(request, kafka_broker_factory):
    sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0]
    yield sasl_kafka
    # Teardown: dump the broker's logs so authentication failures are easy to inspect.
    sasl_kafka.child.dump_logs()
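

# Smoke-test the admin client over an authenticated connection: create a topic
# and confirm the broker reports it.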
def test_admin(request, sasl_kafka):
    topic_name = special_to_underscore(request.node.name + random_string(4))

    admin, = sasl_kafka.get_admin_clients(1)
    admin.create_topics([NewTopic(topic_name, 1, 1)])
    assert topic_name in sasl_kafka.get_topic_names()
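

# Round-trip 100 messages through a two-partition topic and verify that the
# even/odd partitioning yields exactly 50 messages per partition.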
def test_produce_and_consume(request, sasl_kafka):
    topic_name = special_to_underscore(request.node.name + random_string(4))
    sasl_kafka.create_topics([topic_name], num_partitions=2)

    producer, = sasl_kafka.get_producers(1)
    messages_and_futures = []  # [(message, produce_future),]
    for i in range(100):
        encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8")
        future = producer.send(topic_name, value=encoded_msg, partition=i % 2)
        messages_and_futures.append((encoded_msg, future))
    producer.flush()

    # Every send should have been acknowledged by the broker.
    for (msg, f) in messages_and_futures:
        assert f.succeeded()

    consumer, = sasl_kafka.get_consumers(1, [topic_name])
    messages = {0: [], 1: []}
    for i, message in enumerate(consumer, 1):
        logging.debug("Consumed message %s", repr(message))
        messages[message.partition].append(message)
        if i >= 100:
            break

    assert_message_count(messages[0], 50)
    assert_message_count(messages[1], 50)
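

# Drive the low-level client directly: send a MetadataRequest and check that
# the newly created topic appears in the response.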
def test_client(request, sasl_kafka):
    topic_name = special_to_underscore(request.node.name + random_string(4))
    sasl_kafka.create_topics([topic_name], num_partitions=1)

    client, = sasl_kafka.get_clients(1)
    # Use a distinct name so the pytest `request` fixture is not shadowed.
    metadata_request = MetadataRequest_v1(None)
    client.send(0, metadata_request)
    for _ in range(10):
        result = client.poll(timeout_ms=10000)
        if len(result) > 0:
            break
    else:
        raise RuntimeError("Couldn't fetch topic response from broker.")
    result = result[0]
    assert topic_name in [t[1] for t in result.topics]
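

# For reference, a minimal sketch of the client-side configuration these tests
# exercise through the harness. The bootstrap address and credentials below are
# assumptions for illustration, not values used by the fixtures above; it is
# left commented out so importing this module never opens a broker connection.
#
# from kafka import KafkaProducer
#
# producer = KafkaProducer(
#     bootstrap_servers="localhost:9092",  # assumed broker address
#     security_protocol="SASL_PLAINTEXT",
#     sasl_mechanism="PLAIN",  # or "SCRAM-SHA-256" / "SCRAM-SHA-512"
#     sasl_plain_username="alice",  # assumed credentials
#     sasl_plain_password="alice-secret",
# )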