From a6d0579d3cadd3826dd364b01bc12a2173139abc Mon Sep 17 00:00:00 2001
From: William Barnhart
Date: Fri, 8 Mar 2024 18:30:02 -0500
Subject: [PATCH 1/7] Update README.rst

---
 README.rst | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.rst b/README.rst
index 78a92a884..64f4fb854 100644
--- a/README.rst
+++ b/README.rst
@@ -17,6 +17,7 @@ Kafka Python client
    :target: https://github.com/dpkp/kafka-python/blob/master/setup.py

+**DUE TO ISSUES WITH RELEASES, IT IS SUGGESTED TO USE https://github.com/wbarnha/kafka-python-ng FOR THE TIME BEING**

 Python client for the Apache Kafka distributed stream processing system.
 kafka-python is designed to function much like the official java client, with a

From deebd8f06eaf951b8f44628e917262b08c84da39 Mon Sep 17 00:00:00 2001
From: William Barnhart
Date: Tue, 23 Apr 2024 09:57:05 -0400
Subject: [PATCH 2/7] Fix badge typo in README.rst

---
 README.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.rst b/README.rst
index ce82c6d3b..794d59a61 100644
--- a/README.rst
+++ b/README.rst
@@ -11,7 +11,7 @@ Kafka Python client
    :target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE
 .. image:: https://img.shields.io/pypi/dw/kafka-python-ng.svg
    :target: https://pypistats.org/packages/kafka-python-ng
-.. image:: https://img.shields.io/pypi/v/kafka-python.svg
+.. image:: https://img.shields.io/pypi/v/kafka-python-ng.svg
    :target: https://pypi.org/project/kafka-python-ng
 .. image:: https://img.shields.io/pypi/implementation/kafka-python-ng
    :target: https://github.com/wbarnha/kafka-python-ng/blob/master/setup.py

From 5e461a7e017130fb9115add8d64291d6966267e9 Mon Sep 17 00:00:00 2001
From: William Barnhart
Date: Fri, 12 Jul 2024 01:37:38 -0400
Subject: [PATCH 3/7] Patch pylint warnings so tests pass again (#184)

* stop pylint complaint for uncovered conditional flow
* add todo to revisit
* formatting makes me happy :)
* Fix errors raised by new version of Pylint so tests pass again
---
 kafka/admin/client.py           | 5 +++++
 kafka/coordinator/consumer.py   | 5 +++++
 kafka/record/default_records.py | 8 +++++---
 kafka/record/legacy_records.py  | 2 ++
 4 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 5b01f8fe6..f74e09a80 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -503,6 +503,8 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
                 topics=topics,
                 allow_auto_topic_creation=auto_topic_creation
             )
+        else:
+            raise IncompatibleBrokerVersion(f"MetadataRequest for {version} is not supported")

         future = self._send_request_to_node(
             self._client.least_loaded_node(),
@@ -1010,6 +1012,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id,
     def _describe_consumer_groups_process_response(self, response):
         """Process a DescribeGroupsResponse into a group description."""
         if response.API_VERSION <= 3:
+            group_description = None
             assert len(response.groups) == 1
             for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
                 if isinstance(response_field, Array):
@@ -1045,6 +1048,8 @@
                     if response.API_VERSION <=2:
                         described_group_information_list.append(None)
                     group_description = GroupInformation._make(described_group_information_list)
+            if group_description is None:
+                raise Errors.BrokerResponseError("No group description received")
             error_code = group_description.error_code
             error_type = Errors.for_code(error_code)
             # Java has the note: KAFKA-6789, we can retry based on the error code

diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cf82b69fe..351641981 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -628,10 +628,15 @@ def _send_offset_commit_request(self, offsets):
                     ) for partition, offset in partitions.items()]
                 ) for topic, partitions in offset_data.items()]
             )
+        else:
+            # TODO: We really shouldn't need this here to begin with, but I'd like to get
+            # pylint to stop complaining.
+            raise Exception(f"Unsupported Broker API: {self.config['api_version']}")

         log.debug("Sending offset-commit request with %s for group %s to %s",
                   offsets, self.group_id, node_id)

+        future = Future()
         _f = self._client.send(node_id, request)
         _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())

diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 8b630cc8b..06be57621 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -187,12 +187,14 @@ def _maybe_uncompress(self) -> None:
             data = memoryview(self._buffer)[self._pos:]
             if compression_type == self.CODEC_GZIP:
                 uncompressed = gzip_decode(data)
-            if compression_type == self.CODEC_SNAPPY:
+            elif compression_type == self.CODEC_SNAPPY:
                 uncompressed = snappy_decode(data.tobytes())
-            if compression_type == self.CODEC_LZ4:
+            elif compression_type == self.CODEC_LZ4:
                 uncompressed = lz4_decode(data.tobytes())
-            if compression_type == self.CODEC_ZSTD:
+            elif compression_type == self.CODEC_ZSTD:
                 uncompressed = zstd_decode(data.tobytes())
+            else:
+                raise NotImplementedError(f"Compression type {compression_type} is not supported")
             self._buffer = bytearray(uncompressed)
             self._pos = 0
             self._decompressed = True

diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 4439462f6..44b365b06 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -461,6 +461,8 @@ def _maybe_compress(self) -> bool:
                 compressed = lz4_encode_old_kafka(data)
             else:
                 compressed = lz4_encode(data)
+        else:
+            raise NotImplementedError(f"Compression type {self._compression_type} is not supported")
         size = self.size_in_bytes(
             0, timestamp=0, key=None, value=compressed)
         # We will try to reuse the same buffer if we have enough space
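All four changes in this patch share one shape: a run of independent `if` tests becomes an exhaustive `if`/`elif`/`else` chain whose final branch raises on unsupported input. A minimal runnable sketch of that pattern, with stdlib codecs standing in for the kafka.codec helpers (the `uncompress` name and the codec set here are illustrative, not from the codebase):

    import gzip
    import zlib

    def uncompress(compression_type: str, data: bytes) -> bytes:
        # Every branch either binds `uncompressed` or raises, so the
        # variable can never be read before assignment. That is the
        # guarantee the newer pylint could not establish for the
        # original chain of independent `if` statements.
        if compression_type == "gzip":
            uncompressed = gzip.decompress(data)
        elif compression_type == "deflate":
            uncompressed = zlib.decompress(data)
        else:
            raise NotImplementedError(f"Compression type {compression_type} is not supported")
        return uncompressed

    assert uncompress("gzip", gzip.compress(b"hello")) == b"hello"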
From 401896b42a32c356a5453859ae576d166b051afd Mon Sep 17 00:00:00 2001
From: William Barnhart
Date: Wed, 17 Jul 2024 11:35:53 -0400
Subject: [PATCH 4/7] Update README.rst to close #179

---
 README.rst | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/README.rst b/README.rst
index c2b3d3e67..1a6a8050a 100644
--- a/README.rst
+++ b/README.rst
@@ -47,6 +47,11 @@ documentation, please see readthedocs and/or python's inline help.

     $ pip install kafka-python-ng

+For those who are concerned regarding the security of this package:
+This project uses https://docs.pypi.org/trusted-publishers/ in GitHub
+Actions to publish artifacts in https://github.com/wbarnha/kafka-python-ng/deployments/pypi.
+This project was forked to keep the project alive for future versions of
+Python and Kafka, since `kafka-python` is unable to publish releases in the meantime.

 KafkaConsumer
 *************

From 31a6b92e3ff5265dc1f184250115532a30618cc2 Mon Sep 17 00:00:00 2001
From: Orange Kao
Date: Sat, 10 Aug 2024 00:00:51 +1000
Subject: [PATCH 5/7] Avoid busy retry (#192)

test/test_consumer_group.py::test_group and
test/test_admin_integration.py::test_describe_consumer_group_exists
busy-retry while waiting for group assignments, which might have left
the Java broker without enough CPU time on the GitHub runner and caused
test failures.
---
 test/test_admin_integration.py | 1 +
 test/test_consumer_group.py    | 1 +
 2 files changed, 2 insertions(+)

diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 283023049..0eb06b18d 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -220,6 +220,7 @@ def consumer_thread(i, group_id):
         else:
             sleep(1)
         assert time() < timeout, "timeout waiting for assignments"
+        sleep(0.25)

     info('Group stabilized; verifying assignment')
     output = kafka_admin_client.describe_consumer_groups(group_id_list)

diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index ed6863fa2..abd0cfe09 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -111,6 +111,7 @@ def consumer_thread(i):
                 logging.info('Rejoining: %s, generations: %s', rejoining, generations)
                 time.sleep(1)
             assert time.time() < timeout, "timeout waiting for assignments"
+            time.sleep(0.25)

         logging.info('Group stabilized; verifying assignment')
         group_assignment = set()
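Both tests poll group state in a loop, and the added quarter-second sleep keeps the polling thread from monopolizing a CPU that the broker JVM also needs. The same idiom as a generic standalone helper (`wait_until` and its defaults are illustrative, not part of the test suite):

    import time

    def wait_until(condition, timeout=30.0, poll_interval=0.25):
        # Re-check `condition` until it holds, yielding the CPU between
        # attempts instead of spinning in a tight loop.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if condition():
                return
            time.sleep(poll_interval)
        raise AssertionError("timeout waiting for condition")

    # Returns immediately here; a real caller would pass something like
    # `lambda: consumer.assignment()`.
    wait_until(lambda: True)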
From 9bee9fc599c473437ebec8d90dd22ae7ed7a9bc8 Mon Sep 17 00:00:00 2001
From: debuggings
Date: Thu, 15 Aug 2024 10:41:27 +0800
Subject: [PATCH 6/7] fix scram username character escape (#196)

According to [rfc5802](https://datatracker.ietf.org/doc/html/rfc5802),
special characters in the username must be escaped before it is sent to
the server.

> The characters ',' or '=' in usernames are sent as '=2C' and '=3D'
> respectively. If the server receives a username that contains '=' not
> followed by either '2C' or '3D', then the server MUST fail the
> authentication.
---
 kafka/scram.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/scram.py b/kafka/scram.py
index 74f4716bd..236ae2149 100644
--- a/kafka/scram.py
+++ b/kafka/scram.py
@@ -30,7 +30,7 @@ def __init__(self, user, password, mechanism):
         self.server_signature = None

     def first_message(self):
-        client_first_bare = f'n={self.user},r={self.nonce}'
+        client_first_bare = f'n={self.user.replace("=","=3D").replace(",","=2C")},r={self.nonce}'
         self.auth_message += client_first_bare
         return 'n,,' + client_first_bare
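The order of the two replacements in the new f-string matters. The same logic as a standalone helper (the `escape_username` name is illustrative; kafka/scram.py keeps the expression inline):

    def escape_username(user: str) -> str:
        # Escape '=' first: running ',' -> '=2C' first would introduce
        # an '=' that the second pass then rewrites, corrupting the
        # comma's escape to '=3D2C'.
        return user.replace("=", "=3D").replace(",", "=2C")

    assert escape_username("bob") == "bob"
    assert escape_username("a=b,c") == "a=3Db=2Cc"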
From 61046232200688ceaba9726ab963b643b223b1d4 Mon Sep 17 00:00:00 2001
From: Orange Kao
Date: Thu, 3 Oct 2024 10:51:51 +1000
Subject: [PATCH 7/7] Improve test/test_consumer_integration.py in GitHub
 runner (#194)

test/test_consumer_integration.py::test_kafka_consumer__blocking failed in
https://github.com/wbarnha/kafka-python-ng/actions/runs/10361086008/job/28680735389?pr=186
because the blocking iteration took 592 ms to finish, just over the
500 ms limit. The output from the GitHub runner is attached below.

This commit increases TIMEOUT_MS so the test is less likely to fail on a
GitHub runner.

    # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
    messages = []
    with Timer() as t:
        for i in range(5):
            msg = next(consumer)
            messages.append(msg)
    assert_message_count(messages, 5)
>   assert t.interval < (TIMEOUT_MS / 1000.0)
E   assert 0.5929090976715088 < (500 / 1000.0)
E    +  where 0.5929090976715088 = <...>.interval

Co-authored-by: William Barnhart
---
 test/test_consumer_integration.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index d3165cd63..62aad5f97 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -61,7 +61,7 @@ def test_kafka_consumer_unsupported_encoding(

 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages):
-    TIMEOUT_MS = 500
+    TIMEOUT_MS = 1000
     consumer = kafka_consumer_factory(auto_offset_reset='earliest',
                                       enable_auto_commit=False,
                                       consumer_timeout_ms=TIMEOUT_MS)
@@ -70,7 +70,7 @@ def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages):
     consumer.unsubscribe()
     consumer.assign([TopicPartition(topic, 0)])

-    # Ask for 5 messages, nothing in queue, block 500ms
+    # Ask for 5 messages, nothing in queue, block 1000ms
     with Timer() as t:
         with pytest.raises(StopIteration):
             msg = next(consumer)
@@ -87,7 +87,7 @@ def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages):
     assert_message_count(messages, 5)
     assert t.interval < (TIMEOUT_MS / 1000.0)

-    # Ask for 10 messages, get 5 back, block 500ms
+    # Ask for 10 messages, get 5 back, block 1000ms
     messages = []
     with Timer() as t:
         with pytest.raises(StopIteration):
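The failure output reads `t.interval` from a `Timer` context manager supplied by the test utilities. A minimal sketch of such a helper, assuming it only needs to record elapsed wall-clock time (the suite's real implementation may do more):

    import time

    class Timer:
        # Records how long the `with` block took in `self.interval`.
        def __enter__(self):
            self.start = time.monotonic()
            return self

        def __exit__(self, *exc_info):
            self.interval = time.monotonic() - self.start
            return False  # do not swallow exceptions raised in the block

    with Timer() as t:
        time.sleep(0.1)
    assert t.interval >= 0.1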