Skip to content

Reconfigure tests to complete in a more timely manner and skip some iterations for Kafka 0.8.2 and Python 3.12 #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 8, 2024
10 changes: 7 additions & 3 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ jobs:
- "3.10"
- "3.11"
- "3.12"
- "pypy3.9"
experimental: [ false ]
include:
- python-version: "pypy3.9"
experimental: true
- python-version: "~3.13.0-0"
experimental: true
steps:
Expand Down Expand Up @@ -115,11 +114,11 @@ jobs:
needs:
- build-sdist
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
fail-fast: false
matrix:
kafka-version:
- "0.8.2.2"
- "0.9.0.1"
- "0.10.2.2"
- "0.11.0.2"
Expand All @@ -128,6 +127,11 @@ jobs:
- "2.4.0"
- "2.5.0"
- "2.6.0"
experimental: [false]
include:
- kafka-version: '0.8.2.2'
experimental: true
continue-on-error: ${{ matrix.experimental }}
steps:
- name: Checkout the source code
uses: actions/checkout@v4
Expand Down
5 changes: 5 additions & 0 deletions test/test_admin_integration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import platform

import pytest

from logging import info
Expand Down Expand Up @@ -151,6 +153,9 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client):
group_description = kafka_admin_client.describe_consumer_groups(['test'])


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Works on PyPy if run locally, but not in CI/CD pipeline."
)
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
"""Tests that the describe consumer group call returns valid consumer group information
Expand Down
4 changes: 4 additions & 0 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections
import logging
import platform
import threading
import time

Expand Down Expand Up @@ -40,6 +41,9 @@ def test_consumer_topics(kafka_broker, topic):
consumer.close()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Works on PyPy if run locally, but not in CI/CD pipeline."
)
@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_group(kafka_broker, topic):
num_partitions = 4
Expand Down
1 change: 1 addition & 0 deletions test/test_partitioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest


from kafka.partitioner import DefaultPartitioner, murmur2


Expand Down
6 changes: 5 additions & 1 deletion test/test_producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import gc
import platform
import sys
import time
import threading

Expand All @@ -10,6 +11,7 @@
from test.testutil import env_kafka_version, random_string


@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
def test_buffer_pool():
pool = SimpleBufferPool(1000, 1000)

Expand All @@ -21,8 +23,8 @@ def test_buffer_pool():
buf2 = pool.allocate(1000, 1000)
assert buf2.read() == b''


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_end_to_end(kafka_broker, compression):
if compression == 'lz4':
Expand Down Expand Up @@ -70,6 +72,7 @@ def test_end_to_end(kafka_broker, compression):

@pytest.mark.skipif(platform.python_implementation() != 'CPython',
reason='Test relies on CPython-specific gc policies')
@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
def test_kafka_producer_gc_cleanup():
gc.collect()
threads = threading.active_count()
Expand All @@ -81,6 +84,7 @@ def test_kafka_producer_gc_cleanup():


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
Expand Down