Skip to content

[PLEASE IGNORE] Trying to trigger test to find out cause of failure #193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,13 @@ def open(self):
env = self.kafka_run_class_env()

# Party!
timeout = 5
max_timeout = 120
timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
max_timeout = 600 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
backoff = 1
end_at = time.time() + max_timeout
tries = 1
auto_port = (self.port is None)
orange_start = time.monotonic()
while time.time() < end_at:
if auto_port:
self.port = get_open_port()
Expand All @@ -227,6 +228,8 @@ def open(self):
backoff += 1
else:
raise RuntimeError('Failed to start Zookeeper before max_timeout')
with open("/tmp/orange", "w") as orange_f:
orange_f.write("open " + str(time.monotonic() - orange_start) + "\n")
self.out("Done!")
atexit.register(self.close)

Expand Down Expand Up @@ -421,12 +424,13 @@ def start(self):
env['KAFKA_OPTS'] = opts
self.render_template(jaas_conf_template, jaas_conf, vars(self))

timeout = 5
max_timeout = 120
timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
max_timeout = 600 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
backoff = 1
end_at = time.time() + max_timeout
tries = 1
auto_port = (self.port is None)
orange_start = time.monotonic()
while time.time() < end_at:
# We have had problems with port conflicts on travis
# so we will try a different port on each retry
Expand All @@ -451,6 +455,8 @@ def start(self):
backoff += 1
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
with open("/tmp/orange", "w") as orange_f:
orange_f.write("start " + str(time.monotonic() - orange_start) + "\n")

(self._client,) = self.get_clients(1, client_id='_internal_client')

Expand Down
17 changes: 15 additions & 2 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ def test_consumer(kafka_broker, topic):
def test_consumer_topics(kafka_broker, topic):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
# Necessary to drive the IO
consumer.poll(500)
orange_start = time.monotonic()
consumer.poll(5000)
with open("/tmp/orange", "w") as orange_f:
orange_f.write("test_consumer_topics " + str(time.monotonic() - orange_start) + "\n")
assert topic in consumer.topics()
assert len(consumer.partitions_for_topic(topic)) > 0
consumer.close()
Expand Down Expand Up @@ -74,16 +77,21 @@ def consumer_thread(i):
threads[i] = t

try:
timeout = time.time() + 35
timeout = time.time() + 350
orange_start = time.monotonic()
while True:
logging.info("num_consumers is %r", num_consumers)
for c in range(num_consumers):
logging.info("c is %r", c)

# Verify all consumers have been created
if c not in consumers:
logging.info("c %r not in consumers", c)
break

# Verify all consumers have an assignment
elif not consumers[c].assignment():
logging.info("c %r does not have assignment", c)
break

# If all consumers exist and have an assignment
Expand All @@ -94,11 +102,13 @@ def consumer_thread(i):
# then log state and break while loop
generations = {consumer._coordinator._generation.generation_id
for consumer in list(consumers.values())}
logging.info("generations is %r", generations)

# New generation assignment is not complete until
# coordinator.rejoining = False
rejoining = any([consumer._coordinator.rejoining
for consumer in list(consumers.values())])
logging.info("rejoining is %r", rejoining)

if not rejoining and len(generations) == 1:
for c, consumer in list(consumers.items()):
Expand All @@ -111,6 +121,9 @@ def consumer_thread(i):
logging.info('Rejoining: %s, generations: %s', rejoining, generations)
time.sleep(1)
assert time.time() < timeout, "timeout waiting for assignments"
time.sleep(0.25) # avoid busy retry
with open("/tmp/orange", "w") as orange_f:
orange_f.write("test_group " + str(time.monotonic() - orange_start) + "\n")

logging.info('Group stabilized; verifying assignment')
group_assignment = set()
Expand Down
28 changes: 7 additions & 21 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
[tox]
envlist = py{38,39,310,311,312,py}, docs
envlist = py312

[pytest]
testpaths = kafka test
#testpaths = test/test_consumer_group.py
testpaths = test/test_sasl_integration.py
#test/test_ssl_integration.py
addopts = --durations=10
log_format = %(created)f %(filename)-23s %(threadName)s %(message)s

[gh-actions]
python =
3.8: py38
3.9: py39
3.10: py310
3.11: py311
3.12: py312
pypy-3.9: pypy

[testenv]
deps =
Expand All @@ -30,22 +27,11 @@ deps =
crc32c
botocore
commands =
pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka}
pytest {posargs:--cov=kafka -vvv --tb=long --capture=sys --show-capture=all --log-level=notset --log-cli-level=notset --log-file-level=notset}
setenv =
CRC32C_SW_MODE = auto
PROJECT_ROOT = {toxinidir}
RETRY_TIMEOUT_MULTIPLIER = 0.25
MAX_TIMEOUT_MULTIPLIER = 1
passenv = KAFKA_VERSION


[testenv:pypy]
# pylint is super slow on pypy...
commands = pytest {posargs:--cov=kafka}

[testenv:docs]
deps =
sphinx_rtd_theme
sphinx

commands =
sphinx-apidoc -o docs/apidoc/ kafka/
sphinx-build -b html docs/ docs/_build