From c170ba09f8f34292f94a69739e8a90d199c51dba Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Fri, 9 Aug 2024 04:40:50 +0000 Subject: [PATCH] [PLEASE IGNORE] Trying to trigger test to find out cause of failure --- test/fixtures.py | 14 ++++++++++---- test/test_consumer_group.py | 17 +++++++++++++++-- tox.ini | 28 +++++++--------------------- 3 files changed, 32 insertions(+), 27 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 998dc429f..0e1bf0323 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -203,12 +203,13 @@ def open(self): env = self.kafka_run_class_env() # Party! - timeout = 5 - max_timeout = 120 + timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1)) + max_timeout = 600 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1)) backoff = 1 end_at = time.time() + max_timeout tries = 1 auto_port = (self.port is None) + orange_start = time.monotonic() while time.time() < end_at: if auto_port: self.port = get_open_port() @@ -227,6 +228,8 @@ def open(self): backoff += 1 else: raise RuntimeError('Failed to start Zookeeper before max_timeout') + with open("/tmp/orange", "w") as orange_f: + orange_f.write("open " + str(time.monotonic() - orange_start) + "\n") self.out("Done!") atexit.register(self.close) @@ -421,12 +424,13 @@ def start(self): env['KAFKA_OPTS'] = opts self.render_template(jaas_conf_template, jaas_conf, vars(self)) - timeout = 5 - max_timeout = 120 + timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1)) + max_timeout = 600 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1)) backoff = 1 end_at = time.time() + max_timeout tries = 1 auto_port = (self.port is None) + orange_start = time.monotonic() while time.time() < end_at: # We have had problems with port conflicts on travis # so we will try a different port on each retry @@ -451,6 +455,8 @@ def start(self): backoff += 1 else: raise RuntimeError('Failed to start KafkaInstance before max_timeout') + with open("/tmp/orange", "w") as orange_f: + orange_f.write("start " + str(time.monotonic() - orange_start) + "\n") (self._client,) = self.get_clients(1, client_id='_internal_client') diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index ed6863fa2..871331275 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -34,7 +34,10 @@ def test_consumer(kafka_broker, topic): def test_consumer_topics(kafka_broker, topic): consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) # Necessary to drive the IO - consumer.poll(500) + orange_start = time.monotonic() + consumer.poll(5000) + with open("/tmp/orange", "w") as orange_f: + orange_f.write("test_consumer_topics " + str(time.monotonic() - orange_start) + "\n") assert topic in consumer.topics() assert len(consumer.partitions_for_topic(topic)) > 0 consumer.close() @@ -74,16 +77,21 @@ def consumer_thread(i): threads[i] = t try: - timeout = time.time() + 35 + timeout = time.time() + 350 + orange_start = time.monotonic() while True: + logging.info("num_consumers is %r", num_consumers) for c in range(num_consumers): + logging.info("c is %r", c) # Verify all consumers have been created if c not in consumers: + logging.info("c %r not in consumers", c) break # Verify all consumers have an assignment elif not consumers[c].assignment(): + logging.info("c %r does not have assignment", c) break # If all consumers exist and have an assignment @@ -94,11 +102,13 @@ def consumer_thread(i): # then log state and break while loop generations = {consumer._coordinator._generation.generation_id for consumer in list(consumers.values())} + logging.info("generations is %r", generations) # New generation assignment is not complete until # coordinator.rejoining = False rejoining = any([consumer._coordinator.rejoining for consumer in list(consumers.values())]) + logging.info("rejoining is %r", rejoining) if not rejoining and len(generations) == 1: for c, consumer in list(consumers.items()): @@ -111,6 +121,9 @@ def consumer_thread(i): logging.info('Rejoining: %s, generations: %s', rejoining, generations) time.sleep(1) assert time.time() < timeout, "timeout waiting for assignments" + time.sleep(0.25) # avoid busy retry + with open("/tmp/orange", "w") as orange_f: + orange_f.write("test_group " + str(time.monotonic() - orange_start) + "\n") logging.info('Group stabilized; verifying assignment') group_assignment = set() diff --git a/tox.ini b/tox.ini index a574dc136..31e92f82e 100644 --- a/tox.ini +++ b/tox.ini @@ -1,19 +1,16 @@ [tox] -envlist = py{38,39,310,311,312,py}, docs +envlist = py312 [pytest] -testpaths = kafka test +#testpaths = test/test_consumer_group.py +testpaths = test/test_sasl_integration.py +#test/test_ssl_integration.py addopts = --durations=10 log_format = %(created)f %(filename)-23s %(threadName)s %(message)s [gh-actions] python = - 3.8: py38 - 3.9: py39 - 3.10: py310 - 3.11: py311 3.12: py312 - pypy-3.9: pypy [testenv] deps = @@ -30,22 +27,11 @@ deps = crc32c botocore commands = - pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka} + pytest {posargs:--cov=kafka -vvv --tb=long --capture=sys --show-capture=all --log-level=notset --log-cli-level=notset --log-file-level=notset} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} + RETRY_TIMEOUT_MULTIPLIER = 0.25 + MAX_TIMEOUT_MULTIPLIER = 1 passenv = KAFKA_VERSION - -[testenv:pypy] -# pylint is super slow on pypy... -commands = pytest {posargs:--cov=kafka} - -[testenv:docs] -deps = - sphinx_rtd_theme - sphinx - -commands = - sphinx-apidoc -o docs/apidoc/ kafka/ - sphinx-build -b html docs/ docs/_build