diff --git a/README.rst b/README.rst
index ce82c6d3b..1a6a8050a 100644
--- a/README.rst
+++ b/README.rst
@@ -11,12 +11,13 @@ Kafka Python client
    :target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE
 .. image:: https://img.shields.io/pypi/dw/kafka-python-ng.svg
    :target: https://pypistats.org/packages/kafka-python-ng
-.. image:: https://img.shields.io/pypi/v/kafka-python.svg
+.. image:: https://img.shields.io/pypi/v/kafka-python-ng.svg
    :target: https://pypi.org/project/kafka-python-ng
 .. image:: https://img.shields.io/pypi/implementation/kafka-python-ng
    :target: https://github.com/wbarnha/kafka-python-ng/blob/master/setup.py
 
+**DUE TO ISSUES WITH RELEASES, IT IS SUGGESTED TO USE https://github.com/wbarnha/kafka-python-ng FOR THE TIME BEING**
 
 Python client for the Apache Kafka distributed stream processing system.
 kafka-python-ng is designed to function much like the official java client, with a
@@ -46,6 +47,11 @@ documentation, please see readthedocs and/or python's inline help.
 
 $ pip install kafka-python-ng
 
+For those concerned about the security of this package: releases are
+published via https://docs.pypi.org/trusted-publishers/ from GitHub
+Actions, with artifacts visible at https://github.com/wbarnha/kafka-python-ng/deployments/pypi.
+This project was forked to keep it alive for future versions of
+Python and Kafka, since `kafka-python` is currently unable to publish releases.
 
 KafkaConsumer
 *************
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 5b01f8fe6..f74e09a80 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -503,6 +503,8 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
                 topics=topics,
                 allow_auto_topic_creation=auto_topic_creation
             )
+        else:
+            raise IncompatibleBrokerVersion(f"MetadataRequest for {version} is not supported")
 
         future = self._send_request_to_node(
             self._client.least_loaded_node(),
@@ -1010,6 +1012,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id,
     def _describe_consumer_groups_process_response(self, response):
         """Process a DescribeGroupsResponse into a group description."""
         if response.API_VERSION <= 3:
+            group_description = None
             assert len(response.groups) == 1
             for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
                 if isinstance(response_field, Array):
@@ -1045,6 +1048,8 @@ def _describe_consumer_groups_process_response(self, response):
                 if response.API_VERSION <=2:
                     described_group_information_list.append(None)
                 group_description = GroupInformation._make(described_group_information_list)
+            if group_description is None:
+                raise Errors.BrokerResponseError("No group description received")
             error_code = group_description.error_code
             error_type = Errors.for_code(error_code)
             # Java has the note: KAFKA-6789, we can retry based on the error code
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cf82b69fe..351641981 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -628,10 +628,15 @@ def _send_offset_commit_request(self, offsets):
                     ) for partition, offset in partitions.items()]
                 ) for topic, partitions in offset_data.items()]
             )
+        else:
+            # This branch should be unreachable, but without it `request` may be
+            # referenced before assignment below (which is what pylint flags).
+ raise Exception(f"Unsupported Broker API: {self.config['api_version']}") log.debug("Sending offset-commit request with %s for group %s to %s", offsets, self.group_id, node_id) + future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time()) diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 8b630cc8b..06be57621 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -187,12 +187,14 @@ def _maybe_uncompress(self) -> None: data = memoryview(self._buffer)[self._pos:] if compression_type == self.CODEC_GZIP: uncompressed = gzip_decode(data) - if compression_type == self.CODEC_SNAPPY: + elif compression_type == self.CODEC_SNAPPY: uncompressed = snappy_decode(data.tobytes()) - if compression_type == self.CODEC_LZ4: + elif compression_type == self.CODEC_LZ4: uncompressed = lz4_decode(data.tobytes()) - if compression_type == self.CODEC_ZSTD: + elif compression_type == self.CODEC_ZSTD: uncompressed = zstd_decode(data.tobytes()) + else: + raise NotImplementedError(f"Compression type {compression_type} is not supported") self._buffer = bytearray(uncompressed) self._pos = 0 self._decompressed = True diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index 4439462f6..44b365b06 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -461,6 +461,8 @@ def _maybe_compress(self) -> bool: compressed = lz4_encode_old_kafka(data) else: compressed = lz4_encode(data) + else: + raise NotImplementedError(f"Compression type {self._compression_type} is not supported") size = self.size_in_bytes( 0, timestamp=0, key=None, value=compressed) # We will try to reuse the same buffer if we have enough space diff --git a/kafka/scram.py b/kafka/scram.py index 74f4716bd..236ae2149 100644 --- a/kafka/scram.py +++ b/kafka/scram.py @@ -30,7 +30,7 @@ def __init__(self, user, password, mechanism): self.server_signature = None def first_message(self): - client_first_bare = f'n={self.user},r={self.nonce}' + client_first_bare = f'n={self.user.replace("=","=3D").replace(",","=2C")},r={self.nonce}' self.auth_message += client_first_bare return 'n,,' + client_first_bare diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 283023049..0eb06b18d 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -220,6 +220,7 @@ def consumer_thread(i, group_id): else: sleep(1) assert time() < timeout, "timeout waiting for assignments" + sleep(0.25) info('Group stabilized; verifying assignment') output = kafka_admin_client.describe_consumer_groups(group_id_list) diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index ed6863fa2..abd0cfe09 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -111,6 +111,7 @@ def consumer_thread(i): logging.info('Rejoining: %s, generations: %s', rejoining, generations) time.sleep(1) assert time.time() < timeout, "timeout waiting for assignments" + time.sleep(0.25) logging.info('Group stabilized; verifying assignment') group_assignment = set() diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index d3165cd63..62aad5f97 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -61,7 +61,7 @@ def test_kafka_consumer_unsupported_encoding( @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def 
test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages): - TIMEOUT_MS = 500 + TIMEOUT_MS = 1000 consumer = kafka_consumer_factory(auto_offset_reset='earliest', enable_auto_commit=False, consumer_timeout_ms=TIMEOUT_MS) @@ -70,7 +70,7 @@ def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages): consumer.unsubscribe() consumer.assign([TopicPartition(topic, 0)]) - # Ask for 5 messages, nothing in queue, block 500ms + # Ask for 5 messages, nothing in queue, block 1000ms with Timer() as t: with pytest.raises(StopIteration): msg = next(consumer) @@ -87,7 +87,7 @@ def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages): assert_message_count(messages, 5) assert t.interval < (TIMEOUT_MS / 1000.0) - # Ask for 10 messages, get 5 back, block 500ms + # Ask for 10 messages, get 5 back, block 1000ms messages = [] with Timer() as t: with pytest.raises(StopIteration):
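
A note on the kafka/record/default_records.py and kafka/record/legacy_records.py hunks: both apply the same pattern, dispatching on the codec with elif/else so that an unrecognized codec raises NotImplementedError instead of leaving the result variable unbound and failing later with a confusing NameError. A minimal standalone sketch of the pattern, using stdlib codecs in place of Kafka's snappy/lz4/zstd bindings (the codec ids here are illustrative, not Kafka's real attribute-mask values):

    import gzip
    import lzma

    # Illustrative codec ids; the real values come from Kafka's record
    # attributes bitmask (CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4, CODEC_ZSTD).
    CODEC_GZIP, CODEC_LZMA = 1, 2

    def decode(codec, data):
        if codec == CODEC_GZIP:
            uncompressed = gzip.decompress(data)
        elif codec == CODEC_LZMA:
            uncompressed = lzma.decompress(data)
        else:
            # Without this branch, `uncompressed` stays unbound for an unknown
            # codec and the caller crashes with a NameError further down.
            raise NotImplementedError(f"Compression type {codec} is not supported")
        return uncompressed

    assert decode(CODEC_GZIP, gzip.compress(b"payload")) == b"payload"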
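
On the kafka/scram.py hunk: RFC 5802 reserves ',' and '=' in SASL names, so a username containing either character must be encoded as '=2C' and '=3D' before it is embedded in the SCRAM client-first-message. A minimal sketch of that escaping (the helper name escape_saslname is illustrative, not part of kafka-python-ng):

    def escape_saslname(user):
        # '=' must be escaped first; otherwise the '=' introduced by the
        # ',' -> '=2C' replacement would itself be rewritten to '=3D2C'.
        return user.replace("=", "=3D").replace(",", "=2C")

    assert escape_saslname("alice") == "alice"
    assert escape_saslname("user=admin,ou=kafka") == "user=3Dadmin=2Cou=3Dkafka"
    # client_first_bare then becomes: f'n={escape_saslname(user)},r={nonce}'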
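
On the test/test_consumer_integration.py hunk: a KafkaConsumer iterator raises StopIteration once no message arrives within consumer_timeout_ms, so the 500 ms budget could flake on slow CI brokers; doubling it to 1000 ms trades a slightly longer test for stability. Usage sketch (the broker address is an assumption):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',  # assumed local broker
        auto_offset_reset='earliest',
        consumer_timeout_ms=1000,  # stop iterating after 1s of silence
    )
    for msg in consumer:  # the for-loop absorbs the StopIteration
        print(msg.offset, msg.value)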