diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index bfe7beb6c0e..e65b720e89c 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -92,6 +92,7 @@ /composer/**/* @GoogleCloudPlatform/cloud-dpes-composer @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers /pubsub/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers /pubsublite/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers +/managedkafka/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers /cloud_tasks/**/* @GoogleCloudPlatform/torus-dpe @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers # For practicing diff --git a/managedkafka/snippets/clusters/clusters_test.py b/managedkafka/snippets/clusters/clusters_test.py new file mode 100644 index 00000000000..9417283d42d --- /dev/null +++ b/managedkafka/snippets/clusters/clusters_test.py @@ -0,0 +1,154 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock +from unittest.mock import MagicMock + +import create_cluster +import delete_cluster +import get_cluster +from google.api_core.operation import Operation +from google.cloud import managedkafka_v1 +import list_clusters +import pytest +import update_cluster + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +CLUSTER_ID = "test-cluster-id" + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.create_cluster") +def test_create_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + cpu = 3 + memory_bytes = 3221225472 + subnet = "test-subnet" + operation = mock.MagicMock(spec=Operation) + cluster = managedkafka_v1.Cluster() + cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path( + PROJECT_ID, REGION, CLUSTER_ID + ) + operation.result = mock.MagicMock(return_value=cluster) + mock_method.return_value = operation + + create_cluster.create_cluster( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + subnet=subnet, + cpu=cpu, + memory_bytes=memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Created cluster" in out + assert CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.get_cluster") +def test_get_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + cluster = managedkafka_v1.Cluster() + cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path( + PROJECT_ID, REGION, CLUSTER_ID + ) + mock_method.return_value = cluster + + get_cluster.get_cluster( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got cluster" in out + assert CLUSTER_ID in out + mock_method.assert_called_once() + + 
+@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.update_cluster") +def test_update_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + new_memory_bytes = 3221225475 + operation = mock.MagicMock(spec=Operation) + cluster = managedkafka_v1.Cluster() + cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path( + PROJECT_ID, REGION, CLUSTER_ID + ) + cluster.capacity_config.memory_bytes = new_memory_bytes + operation.result = mock.MagicMock(return_value=cluster) + mock_method.return_value = operation + + update_cluster.update_cluster( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + memory_bytes=new_memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Updated cluster" in out + assert CLUSTER_ID in out + assert str(new_memory_bytes) in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.list_clusters") +def test_list_clusters( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + cluster = managedkafka_v1.Cluster() + cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path( + PROJECT_ID, REGION, CLUSTER_ID + ) + response = [cluster] + mock_method.return_value = response + + list_clusters.list_clusters( + project_id=PROJECT_ID, + region=REGION, + ) + + out, _ = capsys.readouterr() + assert "Got cluster" in out + assert CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.delete_cluster") +def test_delete_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + operation = mock.MagicMock(spec=Operation) + mock_method.return_value = operation + + delete_cluster.delete_cluster( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted cluster" in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/clusters/create_cluster.py b/managedkafka/snippets/clusters/create_cluster.py new file mode 100644 index 00000000000..562c0dd0612 --- /dev/null +++ b/managedkafka/snippets/clusters/create_cluster.py @@ -0,0 +1,71 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_create_cluster] +from google.api_core.exceptions import GoogleAPICallError +from google.cloud import managedkafka_v1 + + +def create_cluster( + project_id: str, + region: str, + cluster_id: str, + subnet: str, + cpu: int, + memory_bytes: int, +) -> None: + """ + Create a Kafka cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + subnet: VPC subnet from which the cluster is accessible. The expected format is projects/{project_id}/regions{region}/subnetworks/{subnetwork}. + cpu: Number of vCPUs to provision for the cluster. + memory_bytes: The memory to provision for the cluster in bytes. 
+ + Raises: + This method will raise the exception if the operation errors or + the timeout before the operation completes is reached. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + cluster = managedkafka_v1.Cluster() + cluster.name = client.cluster_path(project_id, region, cluster_id) + cluster.capacity_config.vcpu_count = cpu + cluster.capacity_config.memory_bytes = memory_bytes + cluster.gcp_config.access_config.network_configs.subnet = subnet + cluster.rebalance_config.mode = ( + managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP + ) + + request = managedkafka_v1.CreateClusterRequest( + parent=client.common_location_path(project_id, region), + cluster_id=cluster_id, + cluster=cluster, + ) + + try: + # The duration of this operation can vary considerably, typically taking 10-40 minutes. + # We can set a timeout of 3000s (50 minutes). + operation = client.create_cluster(request=request, timeout=3000) + response = operation.result() + print("Created cluster:", response) + except GoogleAPICallError: + print(operation.operation.error) + + +# [END managedkafka_create_cluster] diff --git a/managedkafka/snippets/clusters/delete_cluster.py b/managedkafka/snippets/clusters/delete_cluster.py new file mode 100644 index 00000000000..6dc3bfef504 --- /dev/null +++ b/managedkafka/snippets/clusters/delete_cluster.py @@ -0,0 +1,52 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_delete_cluster] +from google.api_core.exceptions import GoogleAPICallError +from google.cloud import managedkafka_v1 + + +def delete_cluster( + project_id: str, + region: str, + cluster_id: str, +) -> None: + """ + Delete a Kafka cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + + Raises: + This method will raise the exception if the operation errors or + the timeout before the operation completes is reached. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + request = managedkafka_v1.DeleteClusterRequest( + name=client.cluster_path(project_id, region, cluster_id), + ) + + try: + operation = client.delete_cluster(request=request) + operation.result() + print("Deleted cluster") + except GoogleAPICallError: + print(operation.operation.error) + + +# [END managedkafka_delete_cluster] diff --git a/managedkafka/snippets/clusters/get_cluster.py b/managedkafka/snippets/clusters/get_cluster.py new file mode 100644 index 00000000000..4f9f7096ef3 --- /dev/null +++ b/managedkafka/snippets/clusters/get_cluster.py @@ -0,0 +1,46 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_get_cluster] +from google.cloud import managedkafka_v1 + + +def get_cluster( + project_id: str, + region: str, + cluster_id: str, +) -> managedkafka_v1.Cluster: + """ + Get a Kafka cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + cluster_path = client.cluster_path(project_id, region, cluster_id) + request = managedkafka_v1.GetClusterRequest( + name=cluster_path, + ) + + cluster = client.get_cluster(request=request) + print("Got cluster:", cluster) + + return cluster + + +# [END managedkafka_get_cluster] diff --git a/managedkafka/snippets/clusters/list_clusters.py b/managedkafka/snippets/clusters/list_clusters.py new file mode 100644 index 00000000000..643df8ad060 --- /dev/null +++ b/managedkafka/snippets/clusters/list_clusters.py @@ -0,0 +1,46 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_list_clusters] +from typing import List + +from google.cloud import managedkafka_v1 + + +def list_clusters( + project_id: str, + region: str, +) -> List[str]: + """ + List Kafka clusters in a given project ID and region. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + request = managedkafka_v1.ListClustersRequest( + parent=client.common_location_path(project_id, region), + ) + + response = client.list_clusters(request=request) + for cluster in response: + print("Got cluster:", cluster) + + return [cluster.name for cluster in response] + + +# [END managedkafka_list_clusters] diff --git a/managedkafka/snippets/clusters/update_cluster.py b/managedkafka/snippets/clusters/update_cluster.py new file mode 100644 index 00000000000..fe5206bab02 --- /dev/null +++ b/managedkafka/snippets/clusters/update_cluster.py @@ -0,0 +1,59 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# [START managedkafka_update_cluster] +from google.api_core.exceptions import GoogleAPICallError +from google.cloud import managedkafka_v1 +from google.protobuf import field_mask_pb2 + + +def update_cluster( + project_id: str, region: str, cluster_id: str, memory_bytes: int +) -> None: + """ + Update a Kafka cluster. For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + memory_bytes: The memory to provision for the cluster in bytes. + + Raises: + This method will raise the exception if the operation errors or + the timeout before the operation completes is reached. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + cluster = managedkafka_v1.Cluster() + cluster.name = client.cluster_path(project_id, region, cluster_id) + cluster.capacity_config.memory_bytes = memory_bytes + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.append("capacity_config.memory_bytes") + + request = managedkafka_v1.UpdateClusterRequest( + update_mask=update_mask, + cluster=cluster, + ) + + try: + operation = client.update_cluster(request=request) + response = operation.result() + print("Updated cluster:", response) + except GoogleAPICallError: + print(operation.operation.error) + + +# [END managedkafka_update_cluster] diff --git a/managedkafka/snippets/consumergroups/consumer_groups_test.py b/managedkafka/snippets/consumergroups/consumer_groups_test.py new file mode 100644 index 00000000000..8467470bcdf --- /dev/null +++ b/managedkafka/snippets/consumergroups/consumer_groups_test.py @@ -0,0 +1,132 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from unittest import mock +from unittest.mock import MagicMock + +import delete_consumer_group +import get_consumer_group +from google.cloud import managedkafka_v1 +import list_consumer_groups +import pytest +import update_consumer_group + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +CLUSTER_ID = "test-cluster-id" +CONSUMER_GROUP_ID = "test-consumer-group-id" + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.get_consumer_group") +def test_get_consumer_group( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + consumer_group = managedkafka_v1.ConsumerGroup() + consumer_group.name = managedkafka_v1.ManagedKafkaClient.consumer_group_path( + PROJECT_ID, REGION, CLUSTER_ID, CONSUMER_GROUP_ID + ) + mock_method.return_value = consumer_group + + get_consumer_group.get_consumer_group( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + consumer_group_id=CONSUMER_GROUP_ID, + ) + + out, _ = capsys.readouterr() + assert "Got consumer group" in out + assert CONSUMER_GROUP_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.update_consumer_group") +def test_update_consumer_group( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + new_partition_offsets = {10: 10} + topic_path = managedkafka_v1.ManagedKafkaClient.topic_path( + PROJECT_ID, REGION, CLUSTER_ID, "test-topic-id" + ) + consumer_group = managedkafka_v1.ConsumerGroup() + consumer_group.name = managedkafka_v1.ManagedKafkaClient.consumer_group_path( + PROJECT_ID, REGION, CLUSTER_ID, CONSUMER_GROUP_ID + ) + topic_metadata = managedkafka_v1.ConsumerTopicMetadata() + for partition, offset in new_partition_offsets.items(): + partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset) + topic_metadata.partitions = {partition: partition_metadata} + consumer_group.topics = { + topic_path: topic_metadata, + } + mock_method.return_value = consumer_group + + update_consumer_group.update_consumer_group( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + consumer_group_id=CONSUMER_GROUP_ID, + topic_path=topic_path, + partition_offsets=new_partition_offsets, + ) + + out, _ = capsys.readouterr() + assert "Updated consumer group" in out + assert topic_path in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.list_consumer_groups") +def test_list_consumer_groups( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + consumer_group = managedkafka_v1.ConsumerGroup() + consumer_group.name = managedkafka_v1.ManagedKafkaClient.consumer_group_path( + PROJECT_ID, REGION, CLUSTER_ID, CONSUMER_GROUP_ID + ) + response = [consumer_group] + mock_method.return_value = response + + list_consumer_groups.list_consumer_groups( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got consumer group" in out + assert CONSUMER_GROUP_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.delete_consumer_group") +def test_delete_consumer_group( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + mock_method.return_value = None + + delete_consumer_group.delete_consumer_group( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + consumer_group_id=CONSUMER_GROUP_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted consumer group" in out + mock_method.assert_called_once() diff --git 
a/managedkafka/snippets/consumergroups/delete_consumer_group.py b/managedkafka/snippets/consumergroups/delete_consumer_group.py new file mode 100644 index 00000000000..b3102079493 --- /dev/null +++ b/managedkafka/snippets/consumergroups/delete_consumer_group.py @@ -0,0 +1,55 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_delete_consumergroup] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 + + +def delete_consumer_group( + project_id: str, + region: str, + cluster_id: str, + consumer_group_id: str, +) -> None: + """ + Delete a Kafka consumer group. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + consumer_group_id: ID of the Kafka consumer group. + + Raises: + This method will raise the exception if the consumer group is not found. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + consumer_group_path = client.consumer_group_path( + project_id, region, cluster_id, consumer_group_id + ) + request = managedkafka_v1.DeleteConsumerGroupRequest( + name=consumer_group_path, + ) + + try: + client.delete_consumer_group(request=request) + print("Deleted consumer group") + except NotFound: + print(f"Consumer group {consumer_group_path} not found") + + +# [END managedkafka_delete_consumergroup] diff --git a/managedkafka/snippets/consumergroups/get_consumer_group.py b/managedkafka/snippets/consumergroups/get_consumer_group.py new file mode 100644 index 00000000000..b293899aa35 --- /dev/null +++ b/managedkafka/snippets/consumergroups/get_consumer_group.py @@ -0,0 +1,50 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_get_consumergroup] +from google.cloud import managedkafka_v1 + + +def get_consumer_group( + project_id: str, + region: str, + cluster_id: str, + consumer_group_id: str, +) -> managedkafka_v1.ConsumerGroup: + """ + Get a Kafka consumer group. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + consumer_group_id: ID of the Kafka consumer group. 
+ """ + + client = managedkafka_v1.ManagedKafkaClient() + + consumer_group_path = client.consumer_group_path( + project_id, region, cluster_id, consumer_group_id + ) + request = managedkafka_v1.GetConsumerGroupRequest( + name=consumer_group_path, + ) + + consumer_group = client.get_consumer_group(request=request) + print("Got consumer group:", consumer_group) + + return consumer_group + + +# [END managedkafka_get_consumergroup] diff --git a/managedkafka/snippets/consumergroups/list_consumer_groups.py b/managedkafka/snippets/consumergroups/list_consumer_groups.py new file mode 100644 index 00000000000..bf2bb78acf0 --- /dev/null +++ b/managedkafka/snippets/consumergroups/list_consumer_groups.py @@ -0,0 +1,47 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_list_consumergroups] +from typing import List + +from google.cloud import managedkafka_v1 + + +def list_consumer_groups( + project_id: str, + region: str, + cluster_id: str, +) -> List[str]: + """ + List Kafka consumer groups in a cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + request = managedkafka_v1.ListConsumerGroupsRequest( + parent=client.cluster_path(project_id, region, cluster_id), + ) + + response = client.list_consumer_groups(request=request) + for consumer_group in response: + print("Got consumer group:", consumer_group) + + return [consumer_group.name for consumer_group in response] + +# [END managedkafka_list_consumergroups] diff --git a/managedkafka/snippets/consumergroups/update_consumer_group.py b/managedkafka/snippets/consumergroups/update_consumer_group.py new file mode 100644 index 00000000000..a255dcadd3a --- /dev/null +++ b/managedkafka/snippets/consumergroups/update_consumer_group.py @@ -0,0 +1,74 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_update_consumergroup] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 +from google.protobuf import field_mask_pb2 + + +def update_consumer_group( + project_id: str, + region: str, + cluster_id: str, + consumer_group_id: str, + topic_path: str, + partition_offsets: dict[int, int], +) -> None: + """ + Update a single partition's offset in a Kafka consumer group. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. 
+        consumer_group_id: ID of the Kafka consumer group.
+        topic_path: Name of the Kafka topic.
+        partition_offsets: Configuration of the topic, represented as a map of partition indexes to their offset value.
+
+    Raises:
+        This method will raise the exception if the consumer group is not found.
+    """
+
+    client = managedkafka_v1.ManagedKafkaClient()
+
+    consumer_group = managedkafka_v1.ConsumerGroup()
+    consumer_group.name = client.consumer_group_path(
+        project_id, region, cluster_id, consumer_group_id
+    )
+
+    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
+    for partition, offset in partition_offsets.items():
+        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
+        topic_metadata.partitions[partition] = partition_metadata
+    consumer_group.topics = {
+        topic_path: topic_metadata,
+    }
+
+    update_mask = field_mask_pb2.FieldMask()
+    update_mask.paths.append("topics")
+
+    request = managedkafka_v1.UpdateConsumerGroupRequest(
+        update_mask=update_mask,
+        consumer_group=consumer_group,
+    )
+
+    try:
+        response = client.update_consumer_group(request=request)
+        print("Updated consumer group:", response)
+    except NotFound:
+        print(f"Consumer group {consumer_group.name} not found")
+
+
+# [END managedkafka_update_consumergroup]
diff --git a/managedkafka/snippets/requirements.txt b/managedkafka/snippets/requirements.txt
new file mode 100644
index 00000000000..3b44d914c72
--- /dev/null
+++ b/managedkafka/snippets/requirements.txt
@@ -0,0 +1,2 @@
+protobuf==5.27.2
+pytest==8.2.2
diff --git a/managedkafka/snippets/topics/create_topic.py b/managedkafka/snippets/topics/create_topic.py
new file mode 100644
index 00000000000..f758d5624f5
--- /dev/null
+++ b/managedkafka/snippets/topics/create_topic.py
@@ -0,0 +1,66 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START managedkafka_create_topic]
+from google.api_core.exceptions import AlreadyExists
+from google.cloud import managedkafka_v1
+
+
+def create_topic(
+    project_id: str,
+    region: str,
+    cluster_id: str,
+    topic_id: str,
+    partition_count: int,
+    replication_factor: int,
+    configs: dict[str, str],
+) -> None:
+    """
+    Create a Kafka topic.
+
+    Args:
+        project_id: Google Cloud project ID.
+        region: Cloud region.
+        cluster_id: ID of the Kafka cluster.
+        topic_id: ID of the Kafka topic.
+        partition_count: Number of partitions in a topic.
+        replication_factor: Number of replicas of each partition.
+        configs: Configuration of the topic. For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs.
+
+    Raises:
+        This method will raise the exception if the topic already exists.
+ """ + + client = managedkafka_v1.ManagedKafkaClient() + + topic = managedkafka_v1.Topic() + topic.name = client.topic_path(project_id, region, cluster_id, topic_id) + topic.partition_count = partition_count + topic.replication_factor = replication_factor + topic.configs = configs + + request = managedkafka_v1.CreateTopicRequest( + parent=client.cluster_path(project_id, region, cluster_id), + topic_id=topic_id, + topic=topic, + ) + + try: + response = client.create_topic(request=request) + print("Created topic:", response.name) + except AlreadyExists: + print(f"{topic.name} already exists") + + +# [END managedkafka_create_topic] diff --git a/managedkafka/snippets/topics/delete_topic.py b/managedkafka/snippets/topics/delete_topic.py new file mode 100644 index 00000000000..e9390eb1ba0 --- /dev/null +++ b/managedkafka/snippets/topics/delete_topic.py @@ -0,0 +1,51 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_delete_topic] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 + + +def delete_topic( + project_id: str, + region: str, + cluster_id: str, + topic_id: str, +) -> None: + """ + Delete a Kafka topic. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + topic_id: ID of the Kafka topic. + + Raises: + This method will raise the exception if the topic is not found. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + topic_path = client.topic_path(project_id, region, cluster_id, topic_id) + request = managedkafka_v1.DeleteTopicRequest(name=topic_path) + + try: + client.delete_topic(request=request) + print("Deleted topic") + except NotFound: + print(f"Topic {topic_path} not found") + + +# [END managedkafka_delete_topic] diff --git a/managedkafka/snippets/topics/get_topic.py b/managedkafka/snippets/topics/get_topic.py new file mode 100644 index 00000000000..9f69410b895 --- /dev/null +++ b/managedkafka/snippets/topics/get_topic.py @@ -0,0 +1,48 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_get_topic] +from google.cloud import managedkafka_v1 + + +def get_topic( + project_id: str, + region: str, + cluster_id: str, + topic_id: str, +) -> managedkafka_v1.Topic: + """ + Get a Kafka topic. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + topic_id: ID of the Kafka topic. 
+ """ + + client = managedkafka_v1.ManagedKafkaClient() + + topic_path = client.topic_path(project_id, region, cluster_id, topic_id) + request = managedkafka_v1.GetTopicRequest( + name=topic_path, + ) + + topic = client.get_topic(request=request) + print("Got topic:", topic) + + return topic + + +# [END managedkafka_get_topic] diff --git a/managedkafka/snippets/topics/list_topics.py b/managedkafka/snippets/topics/list_topics.py new file mode 100644 index 00000000000..2d0ade0e0e0 --- /dev/null +++ b/managedkafka/snippets/topics/list_topics.py @@ -0,0 +1,47 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_list_topics] +from typing import List + +from google.cloud import managedkafka_v1 + + +def list_topics( + project_id: str, + region: str, + cluster_id: str, +) -> List[str]: + """ + List Kafka topics in a cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + request = managedkafka_v1.ListTopicsRequest( + parent=client.cluster_path(project_id, region, cluster_id), + ) + + response = client.list_topics(request=request) + for topic in response: + print("Got topic:", topic) + + return [topic.name for topic in response] + +# [END managedkafka_list_topics] diff --git a/managedkafka/snippets/topics/topics_test.py b/managedkafka/snippets/topics/topics_test.py new file mode 100644 index 00000000000..62bd11fc4f0 --- /dev/null +++ b/managedkafka/snippets/topics/topics_test.py @@ -0,0 +1,160 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+from unittest import mock
+from unittest.mock import MagicMock
+
+import create_topic
+import delete_topic
+import get_topic
+from google.cloud import managedkafka_v1
+import list_topics
+import pytest
+import update_topic
+
+PROJECT_ID = "test-project-id"
+REGION = "us-central1"
+CLUSTER_ID = "test-cluster-id"
+TOPIC_ID = "test-topic-id"
+
+
+@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.create_topic")
+def test_create_topic(
+    mock_method: MagicMock,
+    capsys: pytest.CaptureFixture[str],
+):
+    partition_count = 10
+    replication_factor = 3
+    configs = {"min.insync.replicas": "1"}
+    topic = managedkafka_v1.Topic()
+    topic.name = managedkafka_v1.ManagedKafkaClient.topic_path(
+        PROJECT_ID, REGION, CLUSTER_ID, TOPIC_ID
+    )
+    topic.partition_count = partition_count
+    topic.replication_factor = replication_factor
+    topic.configs = configs
+    mock_method.return_value = topic
+
+    create_topic.create_topic(
+        project_id=PROJECT_ID,
+        region=REGION,
+        cluster_id=CLUSTER_ID,
+        topic_id=TOPIC_ID,
+        partition_count=partition_count,
+        replication_factor=replication_factor,
+        configs=configs,
+    )
+
+    out, _ = capsys.readouterr()
+    assert "Created topic" in out
+    assert TOPIC_ID in out
+    mock_method.assert_called_once()
+
+
+@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.get_topic")
+def test_get_topic(
+    mock_method: MagicMock,
+    capsys: pytest.CaptureFixture[str],
+):
+    topic = managedkafka_v1.Topic()
+    topic.name = managedkafka_v1.ManagedKafkaClient.topic_path(
+        PROJECT_ID, REGION, CLUSTER_ID, TOPIC_ID
+    )
+    mock_method.return_value = topic
+
+    get_topic.get_topic(
+        project_id=PROJECT_ID,
+        region=REGION,
+        cluster_id=CLUSTER_ID,
+        topic_id=TOPIC_ID,
+    )
+
+    out, _ = capsys.readouterr()
+    assert "Got topic" in out
+    assert TOPIC_ID in out
+    mock_method.assert_called_once()
+
+
+@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.update_topic")
+def test_update_topic(
+    mock_method: MagicMock,
+    capsys: pytest.CaptureFixture[str],
+):
+    new_partition_count = 20
+    new_configs = {"min.insync.replicas": "2"}
+    topic = managedkafka_v1.Topic()
+    topic.name = managedkafka_v1.ManagedKafkaClient.topic_path(
+        PROJECT_ID, REGION, CLUSTER_ID, TOPIC_ID
+    )
+    topic.partition_count = new_partition_count
+    topic.configs = new_configs
+    mock_method.return_value = topic
+
+    update_topic.update_topic(
+        project_id=PROJECT_ID,
+        region=REGION,
+        cluster_id=CLUSTER_ID,
+        topic_id=TOPIC_ID,
+        partition_count=new_partition_count,
+        configs=new_configs,
+    )
+
+    out, _ = capsys.readouterr()
+    assert "Updated topic" in out
+    assert TOPIC_ID in out
+    assert 'min.insync.replicas"\n value: "2"' in out
+    mock_method.assert_called_once()
+
+
+@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.list_topics")
+def test_list_topics(
+    mock_method: MagicMock,
+    capsys: pytest.CaptureFixture[str],
+):
+    topic = managedkafka_v1.Topic()
+    topic.name = managedkafka_v1.ManagedKafkaClient.topic_path(
+        PROJECT_ID, REGION, CLUSTER_ID, TOPIC_ID
+    )
+    response = [topic]
+    mock_method.return_value = response
+
+    list_topics.list_topics(
+        project_id=PROJECT_ID,
+        region=REGION,
+        cluster_id=CLUSTER_ID,
+    )
+
+    out, _ = capsys.readouterr()
+    assert "Got topic" in out
+    assert TOPIC_ID in out
+    mock_method.assert_called_once()
+
+
+@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.delete_topic")
+def test_delete_topic(
+    mock_method: MagicMock,
+    capsys: pytest.CaptureFixture[str],
+):
+    mock_method.return_value = None
+
+    delete_topic.delete_topic(
+        project_id=PROJECT_ID,
+        region=REGION,
+        cluster_id=CLUSTER_ID,
+        topic_id=TOPIC_ID,
+    )
+
+    out, _ = capsys.readouterr()
+    assert "Deleted topic" in out
+    mock_method.assert_called_once()
diff --git a/managedkafka/snippets/topics/update_topic.py b/managedkafka/snippets/topics/update_topic.py
new file mode 100644
index 00000000000..8ee71960eb3
--- /dev/null
+++ b/managedkafka/snippets/topics/update_topic.py
@@ -0,0 +1,65 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START managedkafka_update_topic]
+from google.api_core.exceptions import NotFound
+from google.cloud import managedkafka_v1
+from google.protobuf import field_mask_pb2
+
+
+def update_topic(
+    project_id: str,
+    region: str,
+    cluster_id: str,
+    topic_id: str,
+    partition_count: int,
+    configs: dict[str, str],
+) -> None:
+    """
+    Update a Kafka topic. For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
+
+    Args:
+        project_id: Google Cloud project ID.
+        region: Cloud region.
+        cluster_id: ID of the Kafka cluster.
+        topic_id: ID of the Kafka topic.
+        partition_count: Number of partitions in a topic.
+        configs: Configuration of the topic.
+
+    Raises:
+        This method will raise the exception if the topic is not found.
+    """
+
+    client = managedkafka_v1.ManagedKafkaClient()
+
+    topic = managedkafka_v1.Topic()
+    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
+    topic.partition_count = partition_count
+    topic.configs = configs
+    update_mask = field_mask_pb2.FieldMask()
+    update_mask.paths.extend(["partition_count", "configs"])
+
+    request = managedkafka_v1.UpdateTopicRequest(
+        update_mask=update_mask,
+        topic=topic,
+    )
+
+    try:
+        response = client.update_topic(request=request)
+        print("Updated topic:", response)
+    except NotFound:
+        print(f"Topic {topic.name} not found")
+
+
+# [END managedkafka_update_topic]
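
The snippets above are standalone functions, and the tests exercise them only against mocks. For reference, a minimal driver that wires the topic snippets together end to end is sketched below; it is illustrative only and not part of this change. The project, region, cluster, and topic values are placeholders, the cluster is assumed to already exist, Application Default Credentials are assumed, the google-cloud-managedkafka client library must be installed alongside the pinned requirements, and the script is assumed to run from managedkafka/snippets/topics so the snippet modules are importable.

# Illustrative driver for the topic snippets (sketch with placeholder values).
import create_topic
import delete_topic
import get_topic
import list_topics
import update_topic

PROJECT_ID = "my-project-id"  # placeholder
REGION = "us-central1"  # placeholder
CLUSTER_ID = "my-cluster-id"  # placeholder; the cluster must already exist
TOPIC_ID = "my-topic-id"  # placeholder


def run_topic_samples() -> None:
    # Create a topic with an initial partition count and replication factor.
    create_topic.create_topic(
        project_id=PROJECT_ID,
        region=REGION,
        cluster_id=CLUSTER_ID,
        topic_id=TOPIC_ID,
        partition_count=10,
        replication_factor=3,
        configs={"min.insync.replicas": "1"},
    )
    # Read the topic back.
    get_topic.get_topic(
        project_id=PROJECT_ID,
        region=REGION,
        cluster_id=CLUSTER_ID,
        topic_id=TOPIC_ID,
    )
    # Grow the partition count and tighten a config; update_topic sends a field mask internally.
    update_topic.update_topic(
        project_id=PROJECT_ID,
        region=REGION,
        cluster_id=CLUSTER_ID,
        topic_id=TOPIC_ID,
        partition_count=20,
        configs={"min.insync.replicas": "2"},
    )
    # List all topics in the cluster.
    list_topics.list_topics(
        project_id=PROJECT_ID,
        region=REGION,
        cluster_id=CLUSTER_ID,
    )
    # Clean up.
    delete_topic.delete_topic(
        project_id=PROJECT_ID,
        region=REGION,
        cluster_id=CLUSTER_ID,
        topic_id=TOPIC_ID,
    )


if __name__ == "__main__":
    run_topic_samples()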