This repository was archived by the owner on Jul 11, 2025. It is now read-only.

[PLEASE IGNORE] Trying to trigger test to find out cause of failure #193

Closed
test/fixtures.py: 14 changes (10 additions, 4 deletions)
@@ -203,12 +203,13 @@ def open(self):
         env = self.kafka_run_class_env()

         # Party!
-        timeout = 5
-        max_timeout = 120
+        timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
+        max_timeout = 600 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
         backoff = 1
         end_at = time.time() + max_timeout
         tries = 1
         auto_port = (self.port is None)
+        orange_start = time.monotonic()
         while time.time() < end_at:
             if auto_port:
                 self.port = get_open_port()
@@ -227,6 +228,8 @@ def open(self):
             backoff += 1
         else:
             raise RuntimeError('Failed to start Zookeeper before max_timeout')
+        with open("/tmp/orange", "w") as orange_f:
+            orange_f.write("open " + str(time.monotonic() - orange_start) + "\n")
         self.out("Done!")
         atexit.register(self.close)
@@ -421,12 +424,13 @@ def start(self):
         env['KAFKA_OPTS'] = opts
         self.render_template(jaas_conf_template, jaas_conf, vars(self))

-        timeout = 5
-        max_timeout = 120
+        timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
+        max_timeout = 600 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
         backoff = 1
         end_at = time.time() + max_timeout
         tries = 1
         auto_port = (self.port is None)
+        orange_start = time.monotonic()
         while time.time() < end_at:
             # We have had problems with port conflicts on travis
             # so we will try a different port on each retry
@@ -451,6 +455,8 @@ def start(self):
             backoff += 1
         else:
             raise RuntimeError('Failed to start KafkaInstance before max_timeout')
+        with open("/tmp/orange", "w") as orange_f:
+            orange_f.write("start " + str(time.monotonic() - orange_start) + "\n")

         (self._client,) = self.get_clients(1, client_id='_internal_client')
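Both fixture hunks apply the same pattern: the per-attempt timeout and the overall startup deadline are scaled by optional environment variables (defaulting to 1, so behaviour is unchanged when they are unset), and the total wall-clock time spent in the startup loop is written to /tmp/orange. A minimal sketch of that pattern outside the fixture classes; the helper name and the probe callback below are illustrative, not part of this PR:

import os
import time

def wait_with_scaled_timeouts(probe, label, base_timeout=5, base_max=600):
    # Scale the defaults by the same env vars the patched fixtures read.
    timeout = base_timeout * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
    max_timeout = base_max * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
    backoff = 1
    end_at = time.time() + max_timeout
    started = time.monotonic()
    while time.time() < end_at:
        if probe(timeout):          # probe returns True once the service is up
            break
        time.sleep(backoff)
        timeout *= 2
        backoff += 1
    else:
        raise RuntimeError('Failed to start before max_timeout')
    # Timing breadcrumb, mirroring the diff; mode "w" truncates on each write.
    with open("/tmp/orange", "w") as orange_f:
        orange_f.write(label + " " + str(time.monotonic() - started) + "\n")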
test/test_consumer_group.py: 17 changes (15 additions, 2 deletions)
@@ -34,7 +34,10 @@ def test_consumer(kafka_broker, topic):
 def test_consumer_topics(kafka_broker, topic):
     consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
     # Necessary to drive the IO
-    consumer.poll(500)
+    orange_start = time.monotonic()
+    consumer.poll(5000)
+    with open("/tmp/orange", "w") as orange_f:
+        orange_f.write("test_consumer_topics " + str(time.monotonic() - orange_start) + "\n")
     assert topic in consumer.topics()
     assert len(consumer.partitions_for_topic(topic)) > 0
     consumer.close()
@@ -74,16 +77,21 @@ def consumer_thread(i):
         threads[i] = t

     try:
-        timeout = time.time() + 35
+        timeout = time.time() + 350
+        orange_start = time.monotonic()
         while True:
+            logging.info("num_consumers is %r", num_consumers)
             for c in range(num_consumers):
+                logging.info("c is %r", c)

                 # Verify all consumers have been created
                 if c not in consumers:
+                    logging.info("c %r not in consumers", c)
                     break

                 # Verify all consumers have an assignment
                 elif not consumers[c].assignment():
+                    logging.info("c %r does not have assignment", c)
                     break

                 # If all consumers exist and have an assignment
@@ -94,11 +102,13 @@ def consumer_thread(i):
                 # then log state and break while loop
                 generations = {consumer._coordinator._generation.generation_id
                                for consumer in list(consumers.values())}
+                logging.info("generations is %r", generations)

                 # New generation assignment is not complete until
                 # coordinator.rejoining = False
                 rejoining = any([consumer._coordinator.rejoining
                                  for consumer in list(consumers.values())])
+                logging.info("rejoining is %r", rejoining)

                 if not rejoining and len(generations) == 1:
                     for c, consumer in list(consumers.items()):
@@ -111,6 +121,9 @@ def consumer_thread(i):
                     logging.info('Rejoining: %s, generations: %s', rejoining, generations)
                     time.sleep(1)
             assert time.time() < timeout, "timeout waiting for assignments"
+            time.sleep(0.25)  # avoid busy retry
+        with open("/tmp/orange", "w") as orange_f:
+            orange_f.write("test_group " + str(time.monotonic() - orange_start) + "\n")

         logging.info('Group stabilized; verifying assignment')
         group_assignment = set()
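The consumer-group hunks raise the assignment deadline from 35s to 350s, log the group state on every iteration of the wait loop, add a short sleep between iterations, and record how long the group took to stabilize. Because every writer opens /tmp/orange with mode "w", each write truncates the file, so only the most recent label and duration survive a run. A small sketch for reading that breadcrumb back afterwards; purely illustrative, not part of the PR:

def read_orange_breadcrumb(path="/tmp/orange"):
    # Returns (label, seconds) from the last instrumented step, or None.
    try:
        with open(path) as f:
            label, seconds = f.read().split()
            return label, float(seconds)
    except (FileNotFoundError, ValueError):
        return None

print(read_orange_breadcrumb())  # e.g. ('test_group', 42.3) or None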
tox.ini: 28 changes (7 additions, 21 deletions)
@@ -1,19 +1,16 @@
 [tox]
-envlist = py{38,39,310,311,312,py}, docs
+envlist = py312

 [pytest]
-testpaths = kafka test
+#testpaths = test/test_consumer_group.py
+testpaths = test/test_sasl_integration.py
+#test/test_ssl_integration.py
 addopts = --durations=10
 log_format = %(created)f %(filename)-23s %(threadName)s %(message)s

-[gh-actions]
-python =
-    3.8: py38
-    3.9: py39
-    3.10: py310
-    3.11: py311
-    3.12: py312
-    pypy-3.9: pypy
-
 [testenv]
 deps =
@@ -30,22 +27,11 @@ deps =
     crc32c
     botocore
 commands =
-    pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka}
+    pytest {posargs:--cov=kafka -vvv --tb=long --capture=sys --show-capture=all --log-level=notset --log-cli-level=notset --log-file-level=notset}
 setenv =
     CRC32C_SW_MODE = auto
     PROJECT_ROOT = {toxinidir}
+    RETRY_TIMEOUT_MULTIPLIER = 0.25
+    MAX_TIMEOUT_MULTIPLIER = 1
 passenv = KAFKA_VERSION

-
-[testenv:pypy]
-# pylint is super slow on pypy...
-commands = pytest {posargs:--cov=kafka}
-
-[testenv:docs]
-deps =
-    sphinx_rtd_theme
-    sphinx
-
-commands =
-    sphinx-apidoc -o docs/apidoc/ kafka/
-    sphinx-build -b html docs/ docs/_build
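The tox changes narrow the run to a single Python version and one integration test module, make pytest output as verbose as possible, and export the two multipliers consumed by test/fixtures.py. The values travel through ordinary process environment variables, so the resulting arithmetic is easy to check in isolation; a quick sketch, with the hard-coded values below simply mirroring the setenv block rather than anything the PR adds:

import os

# With tox's setenv values, the fixture's initial retry timeout becomes
# 5 * 0.25 = 1.25s while the overall deadline stays 600 * 1 = 600s.
os.environ['RETRY_TIMEOUT_MULTIPLIER'] = '0.25'
os.environ['MAX_TIMEOUT_MULTIPLIER'] = '1'

timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
max_timeout = 600 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
assert (timeout, max_timeout) == (1.25, 600.0)
print(timeout, max_timeout)  # 1.25 600.0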