From bec32d99dca75a2e39b048927782d1cea3ceb445 Mon Sep 17 00:00:00 2001 From: Xiong Ding Date: Mon, 8 Apr 2024 17:57:09 -0700 Subject: [PATCH 1/7] Fix ssl connection after wrap_ssl --- kafka/client_async.py | 8 ++++++-- kafka/conn.py | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index b46b879f9..7658d4b24 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -257,7 +257,7 @@ def _can_connect(self, node_id): conn = self._conns[node_id] return conn.disconnected() and not conn.blacked_out() - def _conn_state_change(self, node_id, sock, conn): + def _conn_state_change(self, node_id, sock, conn, ssl_upgraded = False): with self._lock: if conn.connecting(): # SSL connections can enter this state 2x (second during Handshake) @@ -266,7 +266,11 @@ def _conn_state_change(self, node_id, sock, conn): try: self._selector.register(sock, selectors.EVENT_WRITE, conn) except KeyError: - self._selector.modify(sock, selectors.EVENT_WRITE, conn) + if ssl_upgraded: + self._selector.unregister(sock) + self._selector.register(sock, selectors.EVENT_WRITE, conn) + else: + self._selector.modify(sock, selectors.EVENT_WRITE, conn) if self.cluster.is_bootstrap(node_id): self._last_bootstrap = time.time() diff --git a/kafka/conn.py b/kafka/conn.py index 745e4bca6..9f1153bdc 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -378,10 +378,10 @@ def connect(self): if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): log.debug('%s: initiating SSL handshake', self) - self.state = ConnectionStates.HANDSHAKE - self.config['state_change_callback'](self.node_id, self._sock, self) # _wrap_ssl can alter the connection state -- disconnects on failure self._wrap_ssl() + self.state = ConnectionStates.HANDSHAKE + self.config['state_change_callback'](self.node_id, self._sock, self, True) elif self.config['security_protocol'] == 'SASL_PLAINTEXT': log.debug('%s: initiating SASL authentication', self) From 5c8ec94dc5997cc35ca1677d28c673ae387e9273 Mon Sep 17 00:00:00 2001 From: Xiong Ding Date: Mon, 8 Apr 2024 17:59:05 -0700 Subject: [PATCH 2/7] test --- kafka/client_async.py | 5 +++++ servers/2.6.0/resources/kafka.properties | 7 +++++-- test/conftest.py | 4 +++- test/fixtures.py | 11 +++++++++-- test/service.py | 2 ++ tox.ini | 3 ++- 6 files changed, 26 insertions(+), 6 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 7658d4b24..ae3c06f00 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -359,6 +359,7 @@ def _should_recycle_connection(self, conn): return False def _maybe_connect(self, node_id): + # TEST """Idempotent non-blocking connection attempt to the given node id.""" with self._lock: conn = self._conns.get(node_id) @@ -752,6 +753,7 @@ def least_loaded_node(self): """ nodes = [broker.nodeId for broker in self.cluster.brokers()] random.shuffle(nodes) + print(f"xxxx xiong {nodes}") inflight = float('inf') found = None @@ -894,8 +896,11 @@ def check_version(self, node_id=None, timeout=2, strict=False): # It is possible that least_loaded_node falls back to bootstrap, # which can block for an increasing backoff period try_node = node_id or self.least_loaded_node() + import sys + print(f"xxxx xiong {node_id}, {self.least_loaded_node()}, {try_node}", file=sys. stderr) if try_node is None: self._lock.release() + # breakpoint() raise Errors.NoBrokersAvailable() self._maybe_connect(try_node) conn = self._conns[try_node] diff --git a/servers/2.6.0/resources/kafka.properties b/servers/2.6.0/resources/kafka.properties index 5775cfdc4..156b8f142 100644 --- a/servers/2.6.0/resources/kafka.properties +++ b/servers/2.6.0/resources/kafka.properties @@ -21,8 +21,8 @@ broker.id={broker_id} ############################# Socket Server Settings ############################# -listeners={transport}://{host}:{port} -security.inter.broker.protocol={transport} +listeners=PLAINTEXT://:9092,SSL://:{port} +advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:{port} {sasl_config} @@ -32,9 +32,12 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar + + authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/test/conftest.py b/test/conftest.py index 824c0fa76..59f1cb9c5 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -18,7 +18,9 @@ def zookeeper(): @pytest.fixture(scope="module") def kafka_broker(kafka_broker_factory): """Return a Kafka broker fixture""" - return kafka_broker_factory()[0] + # return kafka_broker_factory()[0] + return kafka_broker_factory(transport='SSL')[0] + # return kafka_broker_factory(transport='SSL', sasl_mechanism='PLAIN')[0] @pytest.fixture(scope="module") diff --git a/test/fixtures.py b/test/fixtures.py index 4ed515da3..34857e654 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -38,7 +38,7 @@ def gen_ssl_resources(directory): # Step 1 keytool -keystore kafka.server.keystore.jks -alias localhost -validity 1 \ - -genkey -storepass foobar -keypass foobar \ + -genkey -keyalg RSA -storepass foobar -keypass foobar \ -dname "CN=localhost, OU=kafka-python, O=kafka-python, L=SF, ST=CA, C=US" \ -ext SAN=dns:localhost @@ -410,6 +410,11 @@ def start(self): jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf") properties_template = self.test_resource("kafka.properties") jaas_conf_template = self.test_resource("kafka_server_jaas.conf") + # ssl_keystore = self.tmp_dir.join("kafka.server.keystore.jks") + # ssl_trustore = self.tmp_dir.join("kafka.server.keystore.jks") + self.ssl_dir = self.tmp_dir + gen_ssl_resources(self.tmp_dir.strpath) + print(f"---- xiong temp {self.tmp_dir}") args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) env = self.kafka_run_class_env() @@ -516,7 +521,7 @@ def stop(self): def close(self): self.stop() if self.tmp_dir is not None: - self.tmp_dir.remove() + # self.tmp_dir.remove() self.tmp_dir = None self.out("Done!") @@ -641,6 +646,8 @@ def _enrich_client_params(self, params, **defaults): if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): params.setdefault('sasl_plain_username', self.broker_user) params.setdefault('sasl_plain_password', self.broker_password) + params.setdefault("ssl_cafile", self.ssl_dir.join('ca-cert').strpath) + params.setdefault("security_protocol", "SSL") return params @staticmethod diff --git a/test/service.py b/test/service.py index 045d780e7..824dd56ca 100644 --- a/test/service.py +++ b/test/service.py @@ -90,11 +90,13 @@ def run(self): if self.child.stdout in rds: line = self.child.stdout.readline().decode('utf-8').rstrip() if line: + print(line) self.captured_stdout.append(line) if self.child.stderr in rds: line = self.child.stderr.readline().decode('utf-8').rstrip() if line: + print(line) self.captured_stderr.append(line) if self.child.poll() is not None: diff --git a/tox.ini b/tox.ini index a574dc136..b45f94bd4 100644 --- a/tox.ini +++ b/tox.ini @@ -30,7 +30,8 @@ deps = crc32c botocore commands = - pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka} + # pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka} + pytest {posargs:-s test/test_admin_integration.py::test_describe_configs_broker_resource_returns_configs} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} From 04786cd36c29c1ea4f92ed41e0fe422487031bec Mon Sep 17 00:00:00 2001 From: Xiong Ding Date: Wed, 10 Apr 2024 08:02:10 +0000 Subject: [PATCH 3/7] refactor --- kafka/client_async.py | 4 --- servers/2.6.0/resources/kafka.properties | 7 ++--- test/conftest.py | 13 ++++++---- test/fixtures.py | 33 ++++++++++++++++++------ tox.ini | 2 +- 5 files changed, 36 insertions(+), 23 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index ae3c06f00..c99b025db 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -359,7 +359,6 @@ def _should_recycle_connection(self, conn): return False def _maybe_connect(self, node_id): - # TEST """Idempotent non-blocking connection attempt to the given node id.""" with self._lock: conn = self._conns.get(node_id) @@ -753,7 +752,6 @@ def least_loaded_node(self): """ nodes = [broker.nodeId for broker in self.cluster.brokers()] random.shuffle(nodes) - print(f"xxxx xiong {nodes}") inflight = float('inf') found = None @@ -897,10 +895,8 @@ def check_version(self, node_id=None, timeout=2, strict=False): # which can block for an increasing backoff period try_node = node_id or self.least_loaded_node() import sys - print(f"xxxx xiong {node_id}, {self.least_loaded_node()}, {try_node}", file=sys. stderr) if try_node is None: self._lock.release() - # breakpoint() raise Errors.NoBrokersAvailable() self._maybe_connect(try_node) conn = self._conns[try_node] diff --git a/servers/2.6.0/resources/kafka.properties b/servers/2.6.0/resources/kafka.properties index 156b8f142..37f54f5bf 100644 --- a/servers/2.6.0/resources/kafka.properties +++ b/servers/2.6.0/resources/kafka.properties @@ -21,8 +21,8 @@ broker.id={broker_id} ############################# Socket Server Settings ############################# -listeners=PLAINTEXT://:9092,SSL://:{port} -advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:{port} +listeners={listeners_config} +advertised.listeners={advertised_listeners_config} {sasl_config} @@ -32,12 +32,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar - - authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=true - # The port the socket server listens on #port=9092 diff --git a/test/conftest.py b/test/conftest.py index 59f1cb9c5..edbd8f1e0 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -15,12 +15,15 @@ def zookeeper(): zk_instance.close() -@pytest.fixture(scope="module") -def kafka_broker(kafka_broker_factory): +# Right now, only test SSL for 2.6.0. +# TODO: enable it for all Kafka versions. +_transport_param = ["PLAINTEXT", "SSL"] if env_kafka_version() == (2, 6, 0) else ["PLAINTEXT"] + + +@pytest.fixture(scope="module", params=_transport_param) +def kafka_broker(kafka_broker_factory, request): """Return a Kafka broker fixture""" - # return kafka_broker_factory()[0] - return kafka_broker_factory(transport='SSL')[0] - # return kafka_broker_factory(transport='SSL', sasl_mechanism='PLAIN')[0] + return kafka_broker_factory(transport=request.param)[0] @pytest.fixture(scope="module") diff --git a/test/fixtures.py b/test/fixtures.py index 34857e654..efceb1bb9 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -281,6 +281,7 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.host = host self.port = port + self.ssl_port = None self.broker_id = broker_id self.auto_create_topic = auto_create_topic @@ -289,7 +290,7 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.sasl_mechanism = sasl_mechanism.upper() else: self.sasl_mechanism = None - self.ssl_dir = self.test_resource('ssl') + self.ssl_dir = None # TODO: checking for port connection would be better than scanning logs # until then, we need the pattern to work across all supported broker versions @@ -372,8 +373,22 @@ def _add_scram_user(self): def sasl_enabled(self): return self.sasl_mechanism is not None + def _listeners_config(self): + if self.ssl_port: + return f"PLAINTEXT://{self.host}:{self.port},{self.transport}://{self.host}:{self.ssl_port}" + return f"{self.transport}://{self.host}:{self.port}" + + def _advertised_listeners_config(self): + if self.ssl_port: + return f"PLAINTEXT://{self.host}:{self.port},{self.transport}://{self.host}:{self.ssl_port}" + return f"{self.transport}://{self.host}:{self.port}" + def bootstrap_server(self): - return '%s:%d' % (self.host, self.port) + port = self.port + if self.transport in ["SASL_SSL", "SSL"]: + port = self.ssl_port + assert port + return '%s:%d' % (self.host, port) def kafka_run_class_env(self): env = super(KafkaFixture, self).kafka_run_class_env() @@ -410,11 +425,8 @@ def start(self): jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf") properties_template = self.test_resource("kafka.properties") jaas_conf_template = self.test_resource("kafka_server_jaas.conf") - # ssl_keystore = self.tmp_dir.join("kafka.server.keystore.jks") - # ssl_trustore = self.tmp_dir.join("kafka.server.keystore.jks") self.ssl_dir = self.tmp_dir - gen_ssl_resources(self.tmp_dir.strpath) - print(f"---- xiong temp {self.tmp_dir}") + gen_ssl_resources(self.ssl_dir.strpath) args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) env = self.kafka_run_class_env() @@ -436,6 +448,10 @@ def start(self): # unless the fixture was passed a specific port if auto_port: self.port = get_open_port() + if self.transport in ["SSL", "SASL_SSL"]: + self.ssl_port = get_open_port() + self.listeners_config = self._listeners_config() + self.advertised_listeners_config = self._advertised_listeners_config() self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) self.render_template(properties_template, properties, vars(self)) @@ -646,8 +662,9 @@ def _enrich_client_params(self, params, **defaults): if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): params.setdefault('sasl_plain_username', self.broker_user) params.setdefault('sasl_plain_password', self.broker_password) - params.setdefault("ssl_cafile", self.ssl_dir.join('ca-cert').strpath) - params.setdefault("security_protocol", "SSL") + if self.transport in ["SASL_SSL", "SSL"]: + params.setdefault("ssl_cafile", self.ssl_dir.join('ca-cert').strpath) + params.setdefault("security_protocol", self.transport) return params @staticmethod diff --git a/tox.ini b/tox.ini index b45f94bd4..09fee9935 100644 --- a/tox.ini +++ b/tox.ini @@ -31,7 +31,7 @@ deps = botocore commands = # pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka} - pytest {posargs:-s test/test_admin_integration.py::test_describe_configs_broker_resource_returns_configs} + pytest {posargs:-s test/} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} From 1d446c10e1f081af03bf17cb8ac34118a3c16261 Mon Sep 17 00:00:00 2001 From: Xiong Ding Date: Wed, 10 Apr 2024 09:00:48 +0000 Subject: [PATCH 4/7] remove global level --- kafka/client_async.py | 1 - test/conftest.py | 11 ++----- test/test_ssl_integration.py | 64 ++++++++++++++++++++++++++++++++++++ tox.ini | 2 +- 4 files changed, 68 insertions(+), 10 deletions(-) create mode 100644 test/test_ssl_integration.py diff --git a/kafka/client_async.py b/kafka/client_async.py index c99b025db..7658d4b24 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -894,7 +894,6 @@ def check_version(self, node_id=None, timeout=2, strict=False): # It is possible that least_loaded_node falls back to bootstrap, # which can block for an increasing backoff period try_node = node_id or self.least_loaded_node() - import sys if try_node is None: self._lock.release() raise Errors.NoBrokersAvailable() diff --git a/test/conftest.py b/test/conftest.py index edbd8f1e0..824c0fa76 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -15,15 +15,10 @@ def zookeeper(): zk_instance.close() -# Right now, only test SSL for 2.6.0. -# TODO: enable it for all Kafka versions. -_transport_param = ["PLAINTEXT", "SSL"] if env_kafka_version() == (2, 6, 0) else ["PLAINTEXT"] - - -@pytest.fixture(scope="module", params=_transport_param) -def kafka_broker(kafka_broker_factory, request): +@pytest.fixture(scope="module") +def kafka_broker(kafka_broker_factory): """Return a Kafka broker fixture""" - return kafka_broker_factory(transport=request.param)[0] + return kafka_broker_factory()[0] @pytest.fixture(scope="module") diff --git a/test/test_ssl_integration.py b/test/test_ssl_integration.py new file mode 100644 index 000000000..7ef2d9a78 --- /dev/null +++ b/test/test_ssl_integration.py @@ -0,0 +1,64 @@ +import logging +import uuid + +import pytest + +from kafka.admin import NewTopic +from kafka.protocol.metadata import MetadataRequest_v1 +from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore + + +@pytest.fixture(scope="module") +def ssl_kafka(request, kafka_broker_factory): + return kafka_broker_factory(transport="SSL")[0] + + +def test_admin(request, ssl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + admin, = ssl_kafka.get_admin_clients(1) + admin.create_topics([NewTopic(topic_name, 1, 1)]) + assert topic_name in ssl_kafka.get_topic_names() + + +def test_produce_and_consume(request, ssl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + ssl_kafka.create_topics([topic_name], num_partitions=2) + producer, = ssl_kafka.get_producers(1) + + messages_and_futures = [] # [(message, produce_future),] + for i in range(100): + encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8") + future = producer.send(topic_name, value=encoded_msg, partition=i % 2) + messages_and_futures.append((encoded_msg, future)) + producer.flush() + + for (msg, f) in messages_and_futures: + assert f.succeeded() + + consumer, = ssl_kafka.get_consumers(1, [topic_name]) + messages = {0: [], 1: []} + for i, message in enumerate(consumer, 1): + logging.debug("Consumed message %s", repr(message)) + messages[message.partition].append(message) + if i >= 100: + break + + assert_message_count(messages[0], 50) + assert_message_count(messages[1], 50) + + +def test_client(request, ssl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + ssl_kafka.create_topics([topic_name], num_partitions=1) + + client, = ssl_kafka.get_clients(1) + request = MetadataRequest_v1(None) + client.send(0, request) + for _ in range(10): + result = client.poll(timeout_ms=10000) + if len(result) > 0: + break + else: + raise RuntimeError("Couldn't fetch topic response from Broker.") + result = result[0] + assert topic_name in [t[1] for t in result.topics] diff --git a/tox.ini b/tox.ini index 09fee9935..45d5d1918 100644 --- a/tox.ini +++ b/tox.ini @@ -31,7 +31,7 @@ deps = botocore commands = # pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka} - pytest {posargs:-s test/} + pytest {posargs:-s test/test_ssl_integration.py} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} From 01371373736800822ca2246fb23204261a5c826d Mon Sep 17 00:00:00 2001 From: Xiong Ding Date: Wed, 10 Apr 2024 09:17:06 +0000 Subject: [PATCH 5/7] test --- servers/2.6.0/resources/kafka.properties | 1 + test/fixtures.py | 5 ++++- test/test_ssl_integration.py | 3 +++ tox.ini | 4 ++-- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/servers/2.6.0/resources/kafka.properties b/servers/2.6.0/resources/kafka.properties index 37f54f5bf..3e6db1eb3 100644 --- a/servers/2.6.0/resources/kafka.properties +++ b/servers/2.6.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={listeners_config} advertised.listeners={advertised_listeners_config} +security.inter.broker.protocol={transport} {sasl_config} diff --git a/test/fixtures.py b/test/fixtures.py index efceb1bb9..e2e281959 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -374,11 +374,13 @@ def sasl_enabled(self): return self.sasl_mechanism is not None def _listeners_config(self): + return f"{self.transport}://{self.host}:{self.port}" if self.ssl_port: return f"PLAINTEXT://{self.host}:{self.port},{self.transport}://{self.host}:{self.ssl_port}" return f"{self.transport}://{self.host}:{self.port}" def _advertised_listeners_config(self): + return f"{self.transport}://{self.host}:{self.port}" if self.ssl_port: return f"PLAINTEXT://{self.host}:{self.port},{self.transport}://{self.host}:{self.ssl_port}" return f"{self.transport}://{self.host}:{self.port}" @@ -386,7 +388,8 @@ def _advertised_listeners_config(self): def bootstrap_server(self): port = self.port if self.transport in ["SASL_SSL", "SSL"]: - port = self.ssl_port + # port = self.ssl_port + pass assert port return '%s:%d' % (self.host, port) diff --git a/test/test_ssl_integration.py b/test/test_ssl_integration.py index 7ef2d9a78..8453e7831 100644 --- a/test/test_ssl_integration.py +++ b/test/test_ssl_integration.py @@ -13,6 +13,7 @@ def ssl_kafka(request, kafka_broker_factory): return kafka_broker_factory(transport="SSL")[0] +@pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Inter broker SSL was implemented at version 0.9") def test_admin(request, ssl_kafka): topic_name = special_to_underscore(request.node.name + random_string(4)) admin, = ssl_kafka.get_admin_clients(1) @@ -20,6 +21,7 @@ def test_admin(request, ssl_kafka): assert topic_name in ssl_kafka.get_topic_names() +@pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Inter broker SSL was implemented at version 0.9") def test_produce_and_consume(request, ssl_kafka): topic_name = special_to_underscore(request.node.name + random_string(4)) ssl_kafka.create_topics([topic_name], num_partitions=2) @@ -47,6 +49,7 @@ def test_produce_and_consume(request, ssl_kafka): assert_message_count(messages[1], 50) +@pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Inter broker SSL was implemented at version 0.9") def test_client(request, ssl_kafka): topic_name = special_to_underscore(request.node.name + random_string(4)) ssl_kafka.create_topics([topic_name], num_partitions=1) diff --git a/tox.ini b/tox.ini index 45d5d1918..9d64f2bca 100644 --- a/tox.ini +++ b/tox.ini @@ -30,8 +30,8 @@ deps = crc32c botocore commands = - # pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka} - pytest {posargs:-s test/test_ssl_integration.py} + pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka} + # pytest {posargs:-s test/test_ssl_integration.py} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} From e686d6bd512f6ed2d35052a9bb2bdf8dfec56552 Mon Sep 17 00:00:00 2001 From: Xiong Ding Date: Wed, 10 Apr 2024 10:04:21 +0000 Subject: [PATCH 6/7] revert test --- servers/2.6.0/resources/kafka.properties | 3 +-- test/fixtures.py | 26 ++---------------------- test/service.py | 2 -- tox.ini | 1 - 4 files changed, 3 insertions(+), 29 deletions(-) diff --git a/servers/2.6.0/resources/kafka.properties b/servers/2.6.0/resources/kafka.properties index 3e6db1eb3..5775cfdc4 100644 --- a/servers/2.6.0/resources/kafka.properties +++ b/servers/2.6.0/resources/kafka.properties @@ -21,8 +21,7 @@ broker.id={broker_id} ############################# Socket Server Settings ############################# -listeners={listeners_config} -advertised.listeners={advertised_listeners_config} +listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} {sasl_config} diff --git a/test/fixtures.py b/test/fixtures.py index e2e281959..998dc429f 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -281,7 +281,6 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.host = host self.port = port - self.ssl_port = None self.broker_id = broker_id self.auto_create_topic = auto_create_topic @@ -373,25 +372,8 @@ def _add_scram_user(self): def sasl_enabled(self): return self.sasl_mechanism is not None - def _listeners_config(self): - return f"{self.transport}://{self.host}:{self.port}" - if self.ssl_port: - return f"PLAINTEXT://{self.host}:{self.port},{self.transport}://{self.host}:{self.ssl_port}" - return f"{self.transport}://{self.host}:{self.port}" - - def _advertised_listeners_config(self): - return f"{self.transport}://{self.host}:{self.port}" - if self.ssl_port: - return f"PLAINTEXT://{self.host}:{self.port},{self.transport}://{self.host}:{self.ssl_port}" - return f"{self.transport}://{self.host}:{self.port}" - def bootstrap_server(self): - port = self.port - if self.transport in ["SASL_SSL", "SSL"]: - # port = self.ssl_port - pass - assert port - return '%s:%d' % (self.host, port) + return '%s:%d' % (self.host, self.port) def kafka_run_class_env(self): env = super(KafkaFixture, self).kafka_run_class_env() @@ -451,10 +433,6 @@ def start(self): # unless the fixture was passed a specific port if auto_port: self.port = get_open_port() - if self.transport in ["SSL", "SASL_SSL"]: - self.ssl_port = get_open_port() - self.listeners_config = self._listeners_config() - self.advertised_listeners_config = self._advertised_listeners_config() self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) self.render_template(properties_template, properties, vars(self)) @@ -540,7 +518,7 @@ def stop(self): def close(self): self.stop() if self.tmp_dir is not None: - # self.tmp_dir.remove() + self.tmp_dir.remove() self.tmp_dir = None self.out("Done!") diff --git a/test/service.py b/test/service.py index 824dd56ca..045d780e7 100644 --- a/test/service.py +++ b/test/service.py @@ -90,13 +90,11 @@ def run(self): if self.child.stdout in rds: line = self.child.stdout.readline().decode('utf-8').rstrip() if line: - print(line) self.captured_stdout.append(line) if self.child.stderr in rds: line = self.child.stderr.readline().decode('utf-8').rstrip() if line: - print(line) self.captured_stderr.append(line) if self.child.poll() is not None: diff --git a/tox.ini b/tox.ini index 9d64f2bca..a574dc136 100644 --- a/tox.ini +++ b/tox.ini @@ -31,7 +31,6 @@ deps = botocore commands = pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka} - # pytest {posargs:-s test/test_ssl_integration.py} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} From f76365c66e4addbc4fb6e9d984bf1b929a28d180 Mon Sep 17 00:00:00 2001 From: Xiong Ding Date: Wed, 10 Apr 2024 16:34:30 +0000 Subject: [PATCH 7/7] address comments --- kafka/client_async.py | 7 +++++-- kafka/conn.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 7658d4b24..984cd81fb 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -257,7 +257,7 @@ def _can_connect(self, node_id): conn = self._conns[node_id] return conn.disconnected() and not conn.blacked_out() - def _conn_state_change(self, node_id, sock, conn, ssl_upgraded = False): + def _conn_state_change(self, node_id, sock, conn): with self._lock: if conn.connecting(): # SSL connections can enter this state 2x (second during Handshake) @@ -266,7 +266,10 @@ def _conn_state_change(self, node_id, sock, conn, ssl_upgraded = False): try: self._selector.register(sock, selectors.EVENT_WRITE, conn) except KeyError: - if ssl_upgraded: + # SSL detaches the original socket, and transfers the + # underlying file descriptor to a new SSLSocket. We should + # explicitly unregister the original socket. + if conn.state == ConnectionStates.HANDSHAKE: self._selector.unregister(sock) self._selector.register(sock, selectors.EVENT_WRITE, conn) else: diff --git a/kafka/conn.py b/kafka/conn.py index 9f1153bdc..b9ef0e2d9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -381,7 +381,7 @@ def connect(self): # _wrap_ssl can alter the connection state -- disconnects on failure self._wrap_ssl() self.state = ConnectionStates.HANDSHAKE - self.config['state_change_callback'](self.node_id, self._sock, self, True) + self.config['state_change_callback'](self.node_id, self._sock, self) elif self.config['security_protocol'] == 'SASL_PLAINTEXT': log.debug('%s: initiating SASL authentication', self)