From 78d8b755f284b70a0784bce3dbe75273141153c1 Mon Sep 17 00:00:00 2001 From: Adam Luong Date: Tue, 2 Jul 2024 14:22:15 -0400 Subject: [PATCH 1/5] samples(managedkafka): Add snippets for all API methods --- .../snippets/clusters/clusters_test.py | 157 +++++++++++++++++ .../snippets/clusters/create_cluster.py | 70 ++++++++ .../snippets/clusters/delete_cluster.py | 52 ++++++ managedkafka/snippets/clusters/get_cluster.py | 51 ++++++ .../snippets/clusters/list_clusters.py | 42 +++++ .../snippets/clusters/update_cluster.py | 59 +++++++ .../consumergroups/consumer_groups_test.py | 134 +++++++++++++++ .../consumergroups/delete_consumer_group.py | 55 ++++++ .../consumergroups/get_consumer_group.py | 55 ++++++ .../consumergroups/list_consumer_groups.py | 44 +++++ .../consumergroups/update_consumer_group.py | 74 ++++++++ managedkafka/snippets/topics/create_topic.py | 66 +++++++ managedkafka/snippets/topics/delete_topic.py | 51 ++++++ managedkafka/snippets/topics/get_topic.py | 53 ++++++ managedkafka/snippets/topics/list_topics.py | 44 +++++ managedkafka/snippets/topics/topics_test.py | 162 ++++++++++++++++++ managedkafka/snippets/topics/update_topic.py | 65 +++++++ 17 files changed, 1234 insertions(+) create mode 100644 managedkafka/snippets/clusters/clusters_test.py create mode 100644 managedkafka/snippets/clusters/create_cluster.py create mode 100644 managedkafka/snippets/clusters/delete_cluster.py create mode 100644 managedkafka/snippets/clusters/get_cluster.py create mode 100644 managedkafka/snippets/clusters/list_clusters.py create mode 100644 managedkafka/snippets/clusters/update_cluster.py create mode 100644 managedkafka/snippets/consumergroups/consumer_groups_test.py create mode 100644 managedkafka/snippets/consumergroups/delete_consumer_group.py create mode 100644 managedkafka/snippets/consumergroups/get_consumer_group.py create mode 100644 managedkafka/snippets/consumergroups/list_consumer_groups.py create mode 100644 managedkafka/snippets/consumergroups/update_consumer_group.py create mode 100644 managedkafka/snippets/topics/create_topic.py create mode 100644 managedkafka/snippets/topics/delete_topic.py create mode 100644 managedkafka/snippets/topics/get_topic.py create mode 100644 managedkafka/snippets/topics/list_topics.py create mode 100644 managedkafka/snippets/topics/topics_test.py create mode 100644 managedkafka/snippets/topics/update_topic.py diff --git a/managedkafka/snippets/clusters/clusters_test.py b/managedkafka/snippets/clusters/clusters_test.py new file mode 100644 index 00000000000..18d02aee7de --- /dev/null +++ b/managedkafka/snippets/clusters/clusters_test.py @@ -0,0 +1,157 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
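+
+# Unit tests for the Managed Kafka cluster snippets. Each test patches the
+# corresponding ManagedKafkaClient method, so no real cluster is touched, and
+# then asserts on the snippet's printed output.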
+ +from unittest import mock +from unittest.mock import MagicMock +from google.cloud import managedkafka_v1 +from google.api_core.operation import Operation +from google.protobuf import message + +import get_cluster +import create_cluster +import delete_cluster +import update_cluster +import list_clusters + +import pytest + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +CLUSTER_ID = "test-cluster-id" + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.create_cluster") +def test_create_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + cpu = 3 + memory_bytes = 3221225472 + subnet = "test-subnet" + operation = mock.MagicMock(spec=Operation) + cluster = managedkafka_v1.Cluster() + cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path( + PROJECT_ID, REGION, CLUSTER_ID + ) + operation.result = mock.MagicMock(return_value=cluster) + mock_method.return_value = operation + + create_cluster.create_cluster( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + subnet=subnet, + cpu=cpu, + memory_bytes=memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Created cluster" in out + assert CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.get_cluster") +def test_get_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + cluster = managedkafka_v1.Cluster() + cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path( + PROJECT_ID, REGION, CLUSTER_ID + ) + mock_method.return_value = cluster + + get_cluster.get_cluster( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got cluster" in out + assert CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.update_cluster") +def test_update_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + new_memory_bytes = 3221225475 + operation = mock.MagicMock(spec=Operation) + cluster = managedkafka_v1.Cluster() + cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path( + PROJECT_ID, REGION, CLUSTER_ID + ) + cluster.capacity_config.memory_bytes = new_memory_bytes + operation.result = mock.MagicMock(return_value=cluster) + mock_method.return_value = operation + + update_cluster.update_cluster( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + memory_bytes=new_memory_bytes, + ) + + out, _ = capsys.readouterr() + assert "Updated cluster" in out + assert CLUSTER_ID in out + assert str(new_memory_bytes) in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.list_clusters") +def test_list_clusters( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + cluster = managedkafka_v1.Cluster() + cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path( + PROJECT_ID, REGION, CLUSTER_ID + ) + response = managedkafka_v1.ListClustersResponse() + response.clusters.append(cluster) + mock_method.return_value = [response] + + list_clusters.list_clusters( + project_id=PROJECT_ID, + region=REGION, + ) + + out, _ = capsys.readouterr() + assert "Got cluster" in out + assert CLUSTER_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.delete_cluster") +def test_delete_cluster( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + operation = mock.MagicMock(spec=Operation) + 
mock_method.return_value = operation + + delete_cluster.delete_cluster( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted cluster" in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/clusters/create_cluster.py b/managedkafka/snippets/clusters/create_cluster.py new file mode 100644 index 00000000000..83e65eb8942 --- /dev/null +++ b/managedkafka/snippets/clusters/create_cluster.py @@ -0,0 +1,70 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_create_cluster] +from google.api_core.exceptions import GoogleAPICallError +from google.cloud import managedkafka_v1 + + +def create_cluster( + project_id: str, + region: str, + cluster_id: str, + subnet: str, + cpu: int, + memory_bytes: int, +) -> None: + """ + Create a Kafka cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + subnet: VPC subnet from which the cluster is accessible. + cpu: Number of vCPUs to provision for the cluster. + memory_bytes: The memory to provision for the cluster in bytes. + + Raises: + This method will raise the exception if the operation errors or + the timeout before the operation completes is reached. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + cluster = managedkafka_v1.Cluster() + cluster.name = client.cluster_path(project_id, region, cluster_id) + cluster.capacity_config.vcpu_count = cpu + cluster.capacity_config.memory_bytes = memory_bytes + cluster.gcp_config.access_config.network_configs.subnet = subnet + cluster.rebalance_config.mode = ( + managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP + ) + + request = managedkafka_v1.CreateClusterRequest( + parent=client.common_location_path(project_id, region), + cluster_id=cluster_id, + cluster=cluster, + ) + + try: + operation = client.create_cluster(request=request) + # The duration of this operation can vary considerably, typically taking 10-40 minutes. + response = operation.result() + print("Created cluster:", response) + except GoogleAPICallError: + print(operation.operation.error) + + +# [END managedkafka_create_cluster] diff --git a/managedkafka/snippets/clusters/delete_cluster.py b/managedkafka/snippets/clusters/delete_cluster.py new file mode 100644 index 00000000000..6dc3bfef504 --- /dev/null +++ b/managedkafka/snippets/clusters/delete_cluster.py @@ -0,0 +1,52 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_delete_cluster] +from google.api_core.exceptions import GoogleAPICallError +from google.cloud import managedkafka_v1 + + +def delete_cluster( + project_id: str, + region: str, + cluster_id: str, +) -> None: + """ + Delete a Kafka cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + + Raises: + This method will raise the exception if the operation errors or + the timeout before the operation completes is reached. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + request = managedkafka_v1.DeleteClusterRequest( + name=client.cluster_path(project_id, region, cluster_id), + ) + + try: + operation = client.delete_cluster(request=request) + operation.result() + print("Deleted cluster") + except GoogleAPICallError: + print(operation.operation.error) + + +# [END managedkafka_delete_cluster] diff --git a/managedkafka/snippets/clusters/get_cluster.py b/managedkafka/snippets/clusters/get_cluster.py new file mode 100644 index 00000000000..b3f5e674bf4 --- /dev/null +++ b/managedkafka/snippets/clusters/get_cluster.py @@ -0,0 +1,51 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_get_cluster] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 + + +def get_cluster( + project_id: str, + region: str, + cluster_id: str, +) -> None: + """ + Get a Kafka cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + + Raises: + This method will raise the exception if the cluster is not found. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + cluster_path = client.cluster_path(project_id, region, cluster_id) + request = managedkafka_v1.GetClusterRequest( + name=cluster_path, + ) + + try: + cluster = client.get_cluster(request=request) + print("Got cluster:", cluster) + except NotFound: + print(f"Cluster {cluster_path} not found") + + +# [END managedkafka_get_cluster] diff --git a/managedkafka/snippets/clusters/list_clusters.py b/managedkafka/snippets/clusters/list_clusters.py new file mode 100644 index 00000000000..e38181719e0 --- /dev/null +++ b/managedkafka/snippets/clusters/list_clusters.py @@ -0,0 +1,42 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
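+
+# Lists every Managed Kafka cluster under the parent
+# projects/{project_id}/locations/{region}.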
+ +# [START managedkafka_list_clusters] +from google.cloud import managedkafka_v1 + + +def list_clusters( + project_id: str, + region: str, +) -> None: + """ + List Kafka clusters in a given project ID and region. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + request = managedkafka_v1.ListClustersRequest( + parent=client.common_location_path(project_id, region), + ) + + response = client.list_clusters(request=request) + for cluster in response: + print("Got cluster:", cluster) + + +# [END managedkafka_list_clusters] diff --git a/managedkafka/snippets/clusters/update_cluster.py b/managedkafka/snippets/clusters/update_cluster.py new file mode 100644 index 00000000000..9ea1687ff92 --- /dev/null +++ b/managedkafka/snippets/clusters/update_cluster.py @@ -0,0 +1,59 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_update_cluster] +from google.api_core.exceptions import GoogleAPICallError +from google.cloud import managedkafka_v1 +from google.protobuf import field_mask_pb2 + + +def update_cluster( + project_id: str, region: str, cluster_id: str, memory_bytes: int +) -> None: + """ + Update a Kafka cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + memory_bytes: The memory to provision for the cluster in bytes. + + Raises: + This method will raise the exception if the operation errors or + the timeout before the operation completes is reached. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + cluster = managedkafka_v1.Cluster() + cluster.name = client.cluster_path(project_id, region, cluster_id) + cluster.capacity_config.memory_bytes = memory_bytes + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.append("capacity_config.memory_bytes") + + request = managedkafka_v1.UpdateClusterRequest( + update_mask=update_mask, + cluster=cluster, + ) + + try: + operation = client.update_cluster(request=request) + response = operation.result() + print("Updated cluster:", response) + except GoogleAPICallError: + print(operation.operation.error) + + +# [END managedkafka_update_cluster] diff --git a/managedkafka/snippets/consumergroups/consumer_groups_test.py b/managedkafka/snippets/consumergroups/consumer_groups_test.py new file mode 100644 index 00000000000..8d7a3e5b0b8 --- /dev/null +++ b/managedkafka/snippets/consumergroups/consumer_groups_test.py @@ -0,0 +1,134 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock +from unittest.mock import MagicMock +from google.cloud import managedkafka_v1 + +import get_consumer_group +import update_consumer_group +import list_consumer_groups +import delete_consumer_group + +import pytest + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +CLUSTER_ID = "test-cluster-id" +CONSUMER_GROUP_ID = "test-consumer-group-id" + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.get_consumer_group") +def test_get_consumer_group( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + consumer_group = managedkafka_v1.ConsumerGroup() + consumer_group.name = managedkafka_v1.ManagedKafkaClient.consumer_group_path( + PROJECT_ID, REGION, CLUSTER_ID, CONSUMER_GROUP_ID + ) + mock_method.return_value = consumer_group + + get_consumer_group.get_consumer_group( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + consumer_group_id=CONSUMER_GROUP_ID, + ) + + out, _ = capsys.readouterr() + assert "Got consumer group" in out + assert CONSUMER_GROUP_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.update_consumer_group") +def test_update_consumer_group( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + new_partition_offsets = {10: 10} + topic_path = managedkafka_v1.ManagedKafkaClient.topic_path( + PROJECT_ID, REGION, CLUSTER_ID, "test-topic-id" + ) + consumer_group = managedkafka_v1.ConsumerGroup() + consumer_group.name = managedkafka_v1.ManagedKafkaClient.consumer_group_path( + PROJECT_ID, REGION, CLUSTER_ID, CONSUMER_GROUP_ID + ) + topic_metadata = managedkafka_v1.ConsumerTopicMetadata() + for partition, offset in new_partition_offsets.items(): + partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset) + topic_metadata.partitions = {partition: partition_metadata} + consumer_group.topics = { + topic_path: topic_metadata, + } + mock_method.return_value = consumer_group + + update_consumer_group.update_consumer_group( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + consumer_group_id=CONSUMER_GROUP_ID, + topic_path=topic_path, + partition_offsets=new_partition_offsets, + ) + + out, _ = capsys.readouterr() + assert "Updated consumer group" in out + assert topic_path in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.list_consumer_groups") +def test_list_consumer_groups( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + consumer_group = managedkafka_v1.ConsumerGroup() + consumer_group.name = managedkafka_v1.ManagedKafkaClient.consumer_group_path( + PROJECT_ID, REGION, CLUSTER_ID, CONSUMER_GROUP_ID + ) + response = managedkafka_v1.ListConsumerGroupsResponse() + response.consumer_groups.append(consumer_group) + mock_method.return_value = [response] + + list_consumer_groups.list_consumer_groups( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got consumer group" in out + assert CONSUMER_GROUP_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.delete_consumer_group") +def test_delete_consumer_group( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + mock_method.return_value = None + + delete_consumer_group.delete_consumer_group( + project_id=PROJECT_ID, + region=REGION, + 
cluster_id=CLUSTER_ID, + consumer_group_id=CONSUMER_GROUP_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted consumer group" in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/consumergroups/delete_consumer_group.py b/managedkafka/snippets/consumergroups/delete_consumer_group.py new file mode 100644 index 00000000000..b3102079493 --- /dev/null +++ b/managedkafka/snippets/consumergroups/delete_consumer_group.py @@ -0,0 +1,55 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_delete_consumergroup] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 + + +def delete_consumer_group( + project_id: str, + region: str, + cluster_id: str, + consumer_group_id: str, +) -> None: + """ + Delete a Kafka consumer group. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + consumer_group_id: ID of the Kafka consumer group. + + Raises: + This method will raise the exception if the consumer group is not found. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + consumer_group_path = client.consumer_group_path( + project_id, region, cluster_id, consumer_group_id + ) + request = managedkafka_v1.DeleteConsumerGroupRequest( + name=consumer_group_path, + ) + + try: + client.delete_consumer_group(request=request) + print("Deleted consumer group") + except NotFound: + print(f"Consumer group {consumer_group_path} not found") + + +# [END managedkafka_delete_consumergroup] diff --git a/managedkafka/snippets/consumergroups/get_consumer_group.py b/managedkafka/snippets/consumergroups/get_consumer_group.py new file mode 100644 index 00000000000..7626cbbf850 --- /dev/null +++ b/managedkafka/snippets/consumergroups/get_consumer_group.py @@ -0,0 +1,55 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_get_consumergroup] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 + + +def get_consumer_group( + project_id: str, + region: str, + cluster_id: str, + consumer_group_id: str, +) -> None: + """ + Get a Kafka consumer group. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + consumer_group_id: ID of the Kafka consumer group. + + Raises: + This method will raise the exception if the consumer group is not found. 
+ """ + + client = managedkafka_v1.ManagedKafkaClient() + + consumer_group_path = client.consumer_group_path( + project_id, region, cluster_id, consumer_group_id + ) + request = managedkafka_v1.GetConsumerGroupRequest( + name=consumer_group_path, + ) + + try: + consumer_group = client.get_consumer_group(request=request) + print("Got consumer group:", consumer_group) + except NotFound: + print(f"Consumer group {consumer_group_path} not found") + + +# [END managedkafka_get_consumergroup] diff --git a/managedkafka/snippets/consumergroups/list_consumer_groups.py b/managedkafka/snippets/consumergroups/list_consumer_groups.py new file mode 100644 index 00000000000..44e09f90bd5 --- /dev/null +++ b/managedkafka/snippets/consumergroups/list_consumer_groups.py @@ -0,0 +1,44 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_list_consumergroups] +from google.cloud import managedkafka_v1 + + +def list_consumer_groups( + project_id: str, + region: str, + cluster_id: str, +) -> None: + """ + List Kafka consumer groups in a cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + request = managedkafka_v1.ListConsumerGroupsRequest( + parent=client.cluster_path(project_id, region, cluster_id), + ) + + response = client.list_consumer_groups(request=request) + for consumer_group in response: + print("Got consumer group:", consumer_group) + + +# [END managedkafka_list_consumergroups] diff --git a/managedkafka/snippets/consumergroups/update_consumer_group.py b/managedkafka/snippets/consumergroups/update_consumer_group.py new file mode 100644 index 00000000000..a841f361f21 --- /dev/null +++ b/managedkafka/snippets/consumergroups/update_consumer_group.py @@ -0,0 +1,74 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_update_consumergroup] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 +from google.protobuf import field_mask_pb2 + + +def update_consumer_group( + project_id: str, + region: str, + cluster_id: str, + consumer_group_id: str, + topic_path: str, + partition_offsets: dict[int, int], +) -> None: + """ + Update a Kafka consumer group. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + consumer_group_id: ID of the Kafka consumer group. + topic_path: Name of the Kafka topic. 
+ partition_offsets: Configuration of the topics. + + Raises: + This method will raise the exception if the consumer group is not found. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + consumer_group = managedkafka_v1.ConsumerGroup() + consumer_group.name = client.consumer_group_path( + project_id, region, cluster_id, consumer_group_id + ) + + topic_metadata = managedkafka_v1.ConsumerTopicMetadata() + for partition, offset in partition_offsets.items(): + partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset) + topic_metadata.partitions = {partition: partition_metadata} + consumer_group.topics = { + topic_path: topic_metadata, + } + + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.append("topics") + + request = managedkafka_v1.UpdateConsumerGroupRequest( + update_mask=update_mask, + consumer_group=consumer_group, + ) + + try: + response = client.update_consumer_group(request=request) + print("Updated consumer group:", response) + except NotFound: + print(f"Consumer group {consumer_group.name} not found") + + +# [END managedkafka_update_consumergroup] diff --git a/managedkafka/snippets/topics/create_topic.py b/managedkafka/snippets/topics/create_topic.py new file mode 100644 index 00000000000..9a5c13dae83 --- /dev/null +++ b/managedkafka/snippets/topics/create_topic.py @@ -0,0 +1,66 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_create_topic] +from google.api_core.exceptions import AlreadyExists +from google.cloud import managedkafka_v1 + + +def create_topic( + project_id: str, + region: str, + cluster_id: str, + topic_id: str, + partition_count: int, + replication_factor: int, + configs: dict[str, str], +) -> None: + """ + Create a Kafka topic. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + topic_id: ID of the Kafka topic. + partition_count: Number of partitions in a topic.. + replication_factor: Number of replicas of each partition. + configs: Configuration of the topic. + + Raises: + This method will raise the exception if the topic already exists. 
+ """ + + client = managedkafka_v1.ManagedKafkaClient() + + topic = managedkafka_v1.Topic() + topic.name = client.topic_path(project_id, region, cluster_id, topic_id) + topic.partition_count = partition_count + topic.replication_factor = replication_factor + topic.configs = configs + + request = managedkafka_v1.CreateTopicRequest( + parent=client.cluster_path(project_id, region, cluster_id), + topic_id=topic_id, + topic=topic, + ) + + try: + response = client.create_topic(request=request) + print("Created topic:", response.name) + except AlreadyExists: + print(f"{topic.name} already exists") + + +# [END managedkafka_create_topic] diff --git a/managedkafka/snippets/topics/delete_topic.py b/managedkafka/snippets/topics/delete_topic.py new file mode 100644 index 00000000000..e9390eb1ba0 --- /dev/null +++ b/managedkafka/snippets/topics/delete_topic.py @@ -0,0 +1,51 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_delete_topic] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 + + +def delete_topic( + project_id: str, + region: str, + cluster_id: str, + topic_id: str, +) -> None: + """ + Delete a Kafka topic. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + topic_id: ID of the Kafka topic. + + Raises: + This method will raise the exception if the topic is not found. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + topic_path = client.topic_path(project_id, region, cluster_id, topic_id) + request = managedkafka_v1.DeleteTopicRequest(name=topic_path) + + try: + client.delete_topic(request=request) + print("Deleted topic") + except NotFound: + print(f"Topic {topic_path} not found") + + +# [END managedkafka_delete_topic] diff --git a/managedkafka/snippets/topics/get_topic.py b/managedkafka/snippets/topics/get_topic.py new file mode 100644 index 00000000000..82ab30e475a --- /dev/null +++ b/managedkafka/snippets/topics/get_topic.py @@ -0,0 +1,53 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_get_topic] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 + + +def get_topic( + project_id: str, + region: str, + cluster_id: str, + topic_id: str, +) -> None: + """ + Get a Kafka topic. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + topic_id: ID of the Kafka topic. 
+ + Raises: + This method will raise the exception if the topic is not found. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + topic_path = client.topic_path(project_id, region, cluster_id, topic_id) + request = managedkafka_v1.GetTopicRequest( + name=topic_path, + ) + + try: + topic = client.get_topic(request=request) + print("Got topic:", topic) + except NotFound: + print(f"Topic {topic_path} not found") + + +# [END managedkafka_get_topic] diff --git a/managedkafka/snippets/topics/list_topics.py b/managedkafka/snippets/topics/list_topics.py new file mode 100644 index 00000000000..141e775c420 --- /dev/null +++ b/managedkafka/snippets/topics/list_topics.py @@ -0,0 +1,44 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_list_topics] +from google.cloud import managedkafka_v1 + + +def list_topics( + project_id: str, + region: str, + cluster_id: str, +) -> None: + """ + List Kafka topics in a cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + request = managedkafka_v1.ListTopicsRequest( + parent=client.cluster_path(project_id, region, cluster_id), + ) + + response = client.list_topics(request=request) + for topic in response: + print("Got topic:", topic) + + +# [END managedkafka_list_topics] diff --git a/managedkafka/snippets/topics/topics_test.py b/managedkafka/snippets/topics/topics_test.py new file mode 100644 index 00000000000..e744c9aaebd --- /dev/null +++ b/managedkafka/snippets/topics/topics_test.py @@ -0,0 +1,162 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
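+
+# Unit tests for the Managed Kafka topic snippets. The ManagedKafkaClient calls
+# are mocked, so each test only checks the printed output and that the expected
+# client method was called exactly once.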
+ +from unittest import mock +from unittest.mock import MagicMock +from google.cloud import managedkafka_v1 + +import create_topic +import get_topic +import update_topic +import list_topics +import delete_topic + +import pytest + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +CLUSTER_ID = "test-cluster-id" +TOPIC_ID = "test-topic-id" + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.create_topic") +def test_create_topic( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + partition_count = 10 + replication_factor = 3 + configs = {"min.insync.replicas": "1"} + topic = managedkafka_v1.Topic() + topic.name = managedkafka_v1.ManagedKafkaClient.topic_path( + PROJECT_ID, REGION, CLUSTER_ID, TOPIC_ID + ) + topic.partition_count = partition_count + topic.replication_factor = replication_factor + topic.configs = configs + mock_method.return_value = topic + + create_topic.create_topic( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + topic_id=TOPIC_ID, + partition_count=partition_count, + replication_factor=replication_factor, + configs=configs, + ) + + out, _ = capsys.readouterr() + assert "Created topic" in out + assert TOPIC_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.get_topic") +def test_get_topic( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + topic = managedkafka_v1.Topic() + topic.name = managedkafka_v1.ManagedKafkaClient.topic_path( + PROJECT_ID, REGION, CLUSTER_ID, TOPIC_ID + ) + mock_method.return_value = topic + + get_topic.get_topic( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + topic_id=TOPIC_ID, + ) + + out, _ = capsys.readouterr() + assert "Got topic" in out + assert TOPIC_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.update_topic") +def test_update_topic( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + new_partition_count = 20 + new_configs = {"min.insync.replicas": "2"} + topic = managedkafka_v1.Topic() + topic.name = managedkafka_v1.ManagedKafkaClient.topic_path( + PROJECT_ID, REGION, CLUSTER_ID, TOPIC_ID + ) + topic.partition_count + topic.configs = new_configs + mock_method.return_value = topic + + update_topic.update_topic( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + topic_id=TOPIC_ID, + partition_count=new_partition_count, + configs=new_configs, + ) + + out, _ = capsys.readouterr() + assert "Updated topic" in out + assert TOPIC_ID in out + assert 'min.insync.replicas"\n value: "2"' in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.list_topics") +def test_list_topics( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + topic = managedkafka_v1.Topic() + topic.name = managedkafka_v1.ManagedKafkaClient.topic_path( + PROJECT_ID, REGION, CLUSTER_ID, TOPIC_ID + ) + response = managedkafka_v1.ListTopicsResponse() + response.topics.append(topic) + mock_method.return_value = [response] + + list_topics.list_topics( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got topic" in out + assert TOPIC_ID in out + mock_method.assert_called_once() + + +@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.delete_topic") +def test_delete_topic( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +): + mock_method.return_value = None + + 
delete_topic.delete_topic( + project_id=PROJECT_ID, + region=REGION, + cluster_id=CLUSTER_ID, + topic_id=TOPIC_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted topic" in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/topics/update_topic.py b/managedkafka/snippets/topics/update_topic.py new file mode 100644 index 00000000000..072fa85893f --- /dev/null +++ b/managedkafka/snippets/topics/update_topic.py @@ -0,0 +1,65 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START managedkafka_update_topic] +from google.api_core.exceptions import NotFound +from google.cloud import managedkafka_v1 +from google.protobuf import field_mask_pb2 + + +def update_topic( + project_id: str, + region: str, + cluster_id: str, + topic_id: str, + partition_count: int, + configs: dict[str, str], +) -> None: + """ + Update a Kafka topic. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + cluster_id: ID of the Kafka cluster. + topic_id: ID of the Kafka topic. + partition_count: Number of partitions in a topic.. + configs: Configuration of the topic. + + Raises: + This method will raise the exception if the topic is not found. + """ + + client = managedkafka_v1.ManagedKafkaClient() + + topic = managedkafka_v1.Topic() + topic.name = client.topic_path(project_id, region, cluster_id, topic_id) + topic.partition_count = partition_count + topic.configs = configs + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.extend(["partition_count", "configs"]) + + request = managedkafka_v1.UpdateTopicRequest( + update_mask=update_mask, + topic=topic, + ) + + try: + response = client.update_topic(request=request) + print("Updated topic:", response) + except NotFound: + print(f"Topic {topic.name} not found") + + +# [END managedkafka_update_topic] From 89e56918df7ee5a8fcbfd84005b0b1614ead910e Mon Sep 17 00:00:00 2001 From: Adam Luong Date: Tue, 2 Jul 2024 14:38:34 -0400 Subject: [PATCH 2/5] address checklist --- .github/CODEOWNERS | 1 + managedkafka/snippets/clusters/clusters_test.py | 10 ++++------ .../snippets/consumergroups/consumer_groups_test.py | 7 +++---- managedkafka/snippets/topics/topics_test.py | 7 +++---- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index bfe7beb6c0e..e65b720e89c 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -92,6 +92,7 @@ /composer/**/* @GoogleCloudPlatform/cloud-dpes-composer @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers /pubsub/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers /pubsublite/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers +/managedkafka/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers 
/cloud_tasks/**/* @GoogleCloudPlatform/torus-dpe @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers # For practicing diff --git a/managedkafka/snippets/clusters/clusters_test.py b/managedkafka/snippets/clusters/clusters_test.py index 18d02aee7de..771d608f35f 100644 --- a/managedkafka/snippets/clusters/clusters_test.py +++ b/managedkafka/snippets/clusters/clusters_test.py @@ -14,17 +14,15 @@ from unittest import mock from unittest.mock import MagicMock -from google.cloud import managedkafka_v1 -from google.api_core.operation import Operation -from google.protobuf import message -import get_cluster import create_cluster import delete_cluster -import update_cluster +import get_cluster +from google.api_core.operation import Operation +from google.cloud import managedkafka_v1 import list_clusters - import pytest +import update_cluster PROJECT_ID = "test-project-id" REGION = "us-central1" diff --git a/managedkafka/snippets/consumergroups/consumer_groups_test.py b/managedkafka/snippets/consumergroups/consumer_groups_test.py index 8d7a3e5b0b8..16517a8d9e1 100644 --- a/managedkafka/snippets/consumergroups/consumer_groups_test.py +++ b/managedkafka/snippets/consumergroups/consumer_groups_test.py @@ -14,14 +14,13 @@ from unittest import mock from unittest.mock import MagicMock -from google.cloud import managedkafka_v1 +import delete_consumer_group import get_consumer_group -import update_consumer_group +from google.cloud import managedkafka_v1 import list_consumer_groups -import delete_consumer_group - import pytest +import update_consumer_group PROJECT_ID = "test-project-id" REGION = "us-central1" diff --git a/managedkafka/snippets/topics/topics_test.py b/managedkafka/snippets/topics/topics_test.py index e744c9aaebd..06a53117b78 100644 --- a/managedkafka/snippets/topics/topics_test.py +++ b/managedkafka/snippets/topics/topics_test.py @@ -14,15 +14,14 @@ from unittest import mock from unittest.mock import MagicMock -from google.cloud import managedkafka_v1 import create_topic +import delete_topic import get_topic -import update_topic +from google.cloud import managedkafka_v1 import list_topics -import delete_topic - import pytest +import update_topic PROJECT_ID = "test-project-id" REGION = "us-central1" From 3d41acecccb9ca0b83e9cf569000dd1fbfadbacc Mon Sep 17 00:00:00 2001 From: Adam Luong Date: Thu, 11 Jul 2024 16:10:37 -0400 Subject: [PATCH 3/5] Add requirements.txt. Address stylistic/organizational comments. Refactor get/list to return the values they are retrieving. 
--- managedkafka/snippets/clusters/clusters_test.py | 5 ++--- managedkafka/snippets/clusters/create_cluster.py | 5 +++-- managedkafka/snippets/clusters/get_cluster.py | 15 +++++---------- managedkafka/snippets/clusters/list_clusters.py | 6 +++++- managedkafka/snippets/clusters/update_cluster.py | 2 +- .../consumergroups/consumer_groups_test.py | 5 ++--- .../snippets/consumergroups/get_consumer_group.py | 15 +++++---------- .../consumergroups/list_consumer_groups.py | 5 ++++- .../consumergroups/update_consumer_group.py | 2 +- managedkafka/snippets/requirements.txt | 2 ++ managedkafka/snippets/topics/create_topic.py | 2 +- managedkafka/snippets/topics/get_topic.py | 15 +++++---------- managedkafka/snippets/topics/list_topics.py | 5 ++++- managedkafka/snippets/topics/topics_test.py | 5 ++--- managedkafka/snippets/topics/update_topic.py | 2 +- 15 files changed, 43 insertions(+), 48 deletions(-) create mode 100644 managedkafka/snippets/requirements.txt diff --git a/managedkafka/snippets/clusters/clusters_test.py b/managedkafka/snippets/clusters/clusters_test.py index 771d608f35f..9417283d42d 100644 --- a/managedkafka/snippets/clusters/clusters_test.py +++ b/managedkafka/snippets/clusters/clusters_test.py @@ -121,9 +121,8 @@ def test_list_clusters( cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path( PROJECT_ID, REGION, CLUSTER_ID ) - response = managedkafka_v1.ListClustersResponse() - response.clusters.append(cluster) - mock_method.return_value = [response] + response = [cluster] + mock_method.return_value = response list_clusters.list_clusters( project_id=PROJECT_ID, diff --git a/managedkafka/snippets/clusters/create_cluster.py b/managedkafka/snippets/clusters/create_cluster.py index 83e65eb8942..562c0dd0612 100644 --- a/managedkafka/snippets/clusters/create_cluster.py +++ b/managedkafka/snippets/clusters/create_cluster.py @@ -32,7 +32,7 @@ def create_cluster( project_id: Google Cloud project ID. region: Cloud region. cluster_id: ID of the Kafka cluster. - subnet: VPC subnet from which the cluster is accessible. + subnet: VPC subnet from which the cluster is accessible. The expected format is projects/{project_id}/regions{region}/subnetworks/{subnetwork}. cpu: Number of vCPUs to provision for the cluster. memory_bytes: The memory to provision for the cluster in bytes. @@ -59,8 +59,9 @@ def create_cluster( ) try: - operation = client.create_cluster(request=request) # The duration of this operation can vary considerably, typically taking 10-40 minutes. + # We can set a timeout of 3000s (50 minutes). + operation = client.create_cluster(request=request, timeout=3000) response = operation.result() print("Created cluster:", response) except GoogleAPICallError: diff --git a/managedkafka/snippets/clusters/get_cluster.py b/managedkafka/snippets/clusters/get_cluster.py index b3f5e674bf4..4f9f7096ef3 100644 --- a/managedkafka/snippets/clusters/get_cluster.py +++ b/managedkafka/snippets/clusters/get_cluster.py @@ -13,7 +13,6 @@ # limitations under the License. # [START managedkafka_get_cluster] -from google.api_core.exceptions import NotFound from google.cloud import managedkafka_v1 @@ -21,7 +20,7 @@ def get_cluster( project_id: str, region: str, cluster_id: str, -) -> None: +) -> managedkafka_v1.Cluster: """ Get a Kafka cluster. @@ -29,9 +28,6 @@ def get_cluster( project_id: Google Cloud project ID. region: Cloud region. cluster_id: ID of the Kafka cluster. - - Raises: - This method will raise the exception if the cluster is not found. 
""" client = managedkafka_v1.ManagedKafkaClient() @@ -41,11 +37,10 @@ def get_cluster( name=cluster_path, ) - try: - cluster = client.get_cluster(request=request) - print("Got cluster:", cluster) - except NotFound: - print(f"Cluster {cluster_path} not found") + cluster = client.get_cluster(request=request) + print("Got cluster:", cluster) + + return cluster # [END managedkafka_get_cluster] diff --git a/managedkafka/snippets/clusters/list_clusters.py b/managedkafka/snippets/clusters/list_clusters.py index e38181719e0..643df8ad060 100644 --- a/managedkafka/snippets/clusters/list_clusters.py +++ b/managedkafka/snippets/clusters/list_clusters.py @@ -13,13 +13,15 @@ # limitations under the License. # [START managedkafka_list_clusters] +from typing import List + from google.cloud import managedkafka_v1 def list_clusters( project_id: str, region: str, -) -> None: +) -> List[str]: """ List Kafka clusters in a given project ID and region. @@ -38,5 +40,7 @@ def list_clusters( for cluster in response: print("Got cluster:", cluster) + return [cluster.name for cluster in response] + # [END managedkafka_list_clusters] diff --git a/managedkafka/snippets/clusters/update_cluster.py b/managedkafka/snippets/clusters/update_cluster.py index 9ea1687ff92..fe5206bab02 100644 --- a/managedkafka/snippets/clusters/update_cluster.py +++ b/managedkafka/snippets/clusters/update_cluster.py @@ -22,7 +22,7 @@ def update_cluster( project_id: str, region: str, cluster_id: str, memory_bytes: int ) -> None: """ - Update a Kafka cluster. + Update a Kafka cluster. For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties. Args: project_id: Google Cloud project ID. diff --git a/managedkafka/snippets/consumergroups/consumer_groups_test.py b/managedkafka/snippets/consumergroups/consumer_groups_test.py index 16517a8d9e1..8467470bcdf 100644 --- a/managedkafka/snippets/consumergroups/consumer_groups_test.py +++ b/managedkafka/snippets/consumergroups/consumer_groups_test.py @@ -98,9 +98,8 @@ def test_list_consumer_groups( consumer_group.name = managedkafka_v1.ManagedKafkaClient.consumer_group_path( PROJECT_ID, REGION, CLUSTER_ID, CONSUMER_GROUP_ID ) - response = managedkafka_v1.ListConsumerGroupsResponse() - response.consumer_groups.append(consumer_group) - mock_method.return_value = [response] + response = [consumer_group] + mock_method.return_value = response list_consumer_groups.list_consumer_groups( project_id=PROJECT_ID, diff --git a/managedkafka/snippets/consumergroups/get_consumer_group.py b/managedkafka/snippets/consumergroups/get_consumer_group.py index 7626cbbf850..b293899aa35 100644 --- a/managedkafka/snippets/consumergroups/get_consumer_group.py +++ b/managedkafka/snippets/consumergroups/get_consumer_group.py @@ -13,7 +13,6 @@ # limitations under the License. # [START managedkafka_get_consumergroup] -from google.api_core.exceptions import NotFound from google.cloud import managedkafka_v1 @@ -22,7 +21,7 @@ def get_consumer_group( region: str, cluster_id: str, consumer_group_id: str, -) -> None: +) -> managedkafka_v1.ConsumerGroup: """ Get a Kafka consumer group. @@ -31,9 +30,6 @@ def get_consumer_group( region: Cloud region. cluster_id: ID of the Kafka cluster. consumer_group_id: ID of the Kafka consumer group. - - Raises: - This method will raise the exception if the consumer group is not found. 
""" client = managedkafka_v1.ManagedKafkaClient() @@ -45,11 +41,10 @@ def get_consumer_group( name=consumer_group_path, ) - try: - consumer_group = client.get_consumer_group(request=request) - print("Got consumer group:", consumer_group) - except NotFound: - print(f"Consumer group {consumer_group_path} not found") + consumer_group = client.get_consumer_group(request=request) + print("Got consumer group:", consumer_group) + + return consumer_group # [END managedkafka_get_consumergroup] diff --git a/managedkafka/snippets/consumergroups/list_consumer_groups.py b/managedkafka/snippets/consumergroups/list_consumer_groups.py index 44e09f90bd5..bf2bb78acf0 100644 --- a/managedkafka/snippets/consumergroups/list_consumer_groups.py +++ b/managedkafka/snippets/consumergroups/list_consumer_groups.py @@ -13,6 +13,8 @@ # limitations under the License. # [START managedkafka_list_consumergroups] +from typing import List + from google.cloud import managedkafka_v1 @@ -20,7 +22,7 @@ def list_consumer_groups( project_id: str, region: str, cluster_id: str, -) -> None: +) -> List[str]: """ List Kafka consumer groups in a cluster. @@ -40,5 +42,6 @@ def list_consumer_groups( for consumer_group in response: print("Got consumer group:", consumer_group) + return [consumer_group.name for consumer_group in response] # [END managedkafka_list_consumergroups] diff --git a/managedkafka/snippets/consumergroups/update_consumer_group.py b/managedkafka/snippets/consumergroups/update_consumer_group.py index a841f361f21..75c2feeb8a2 100644 --- a/managedkafka/snippets/consumergroups/update_consumer_group.py +++ b/managedkafka/snippets/consumergroups/update_consumer_group.py @@ -27,7 +27,7 @@ def update_consumer_group( partition_offsets: dict[int, int], ) -> None: """ - Update a Kafka consumer group. + Update a single partition's offset in a Kafka consumer group. Args: project_id: Google Cloud project ID. diff --git a/managedkafka/snippets/requirements.txt b/managedkafka/snippets/requirements.txt new file mode 100644 index 00000000000..3b44d914c72 --- /dev/null +++ b/managedkafka/snippets/requirements.txt @@ -0,0 +1,2 @@ +protobuf==5.27.2 +pytest==8.2.2 diff --git a/managedkafka/snippets/topics/create_topic.py b/managedkafka/snippets/topics/create_topic.py index 9a5c13dae83..f758d5624f5 100644 --- a/managedkafka/snippets/topics/create_topic.py +++ b/managedkafka/snippets/topics/create_topic.py @@ -36,7 +36,7 @@ def create_topic( topic_id: ID of the Kafka topic. partition_count: Number of partitions in a topic.. replication_factor: Number of replicas of each partition. - configs: Configuration of the topic. + configs: Configuration of the topic. For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs. Raises: This method will raise the exception if the topic already exists. diff --git a/managedkafka/snippets/topics/get_topic.py b/managedkafka/snippets/topics/get_topic.py index 82ab30e475a..9f69410b895 100644 --- a/managedkafka/snippets/topics/get_topic.py +++ b/managedkafka/snippets/topics/get_topic.py @@ -13,7 +13,6 @@ # limitations under the License. # [START managedkafka_get_topic] -from google.api_core.exceptions import NotFound from google.cloud import managedkafka_v1 @@ -22,7 +21,7 @@ def get_topic( region: str, cluster_id: str, topic_id: str, -) -> None: +) -> managedkafka_v1.Topic: """ Get a Kafka topic. @@ -31,9 +30,6 @@ def get_topic( region: Cloud region. cluster_id: ID of the Kafka cluster. topic_id: ID of the Kafka topic. 
- - Raises: - This method will raise the exception if the topic is not found. """ client = managedkafka_v1.ManagedKafkaClient() @@ -43,11 +39,10 @@ def get_topic( name=topic_path, ) - try: - topic = client.get_topic(request=request) - print("Got topic:", topic) - except NotFound: - print(f"Topic {topic_path} not found") + topic = client.get_topic(request=request) + print("Got topic:", topic) + + return topic # [END managedkafka_get_topic] diff --git a/managedkafka/snippets/topics/list_topics.py b/managedkafka/snippets/topics/list_topics.py index 141e775c420..2d0ade0e0e0 100644 --- a/managedkafka/snippets/topics/list_topics.py +++ b/managedkafka/snippets/topics/list_topics.py @@ -13,6 +13,8 @@ # limitations under the License. # [START managedkafka_list_topics] +from typing import List + from google.cloud import managedkafka_v1 @@ -20,7 +22,7 @@ def list_topics( project_id: str, region: str, cluster_id: str, -) -> None: +) -> List[str]: """ List Kafka topics in a cluster. @@ -40,5 +42,6 @@ def list_topics( for topic in response: print("Got topic:", topic) + return [topic.name for topic in response] # [END managedkafka_list_topics] diff --git a/managedkafka/snippets/topics/topics_test.py b/managedkafka/snippets/topics/topics_test.py index 06a53117b78..62bd11fc4f0 100644 --- a/managedkafka/snippets/topics/topics_test.py +++ b/managedkafka/snippets/topics/topics_test.py @@ -126,9 +126,8 @@ def test_list_topics( topic.name = managedkafka_v1.ManagedKafkaClient.topic_path( PROJECT_ID, REGION, CLUSTER_ID, TOPIC_ID ) - response = managedkafka_v1.ListTopicsResponse() - response.topics.append(topic) - mock_method.return_value = [response] + response = [topic] + mock_method.return_value = response list_topics.list_topics( project_id=PROJECT_ID, diff --git a/managedkafka/snippets/topics/update_topic.py b/managedkafka/snippets/topics/update_topic.py index 072fa85893f..8ee71960eb3 100644 --- a/managedkafka/snippets/topics/update_topic.py +++ b/managedkafka/snippets/topics/update_topic.py @@ -27,7 +27,7 @@ def update_topic( configs: dict[str, str], ) -> None: """ - Update a Kafka topic. + Update a Kafka topic. For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties. Args: project_id: Google Cloud project ID. From a7bcc50dc71ae695ed01cb5c17070b53eeb976a3 Mon Sep 17 00:00:00 2001 From: Adam Luong Date: Sun, 14 Jul 2024 20:21:21 -0400 Subject: [PATCH 4/5] address updating consumer group --- .../snippets/consumergroups/update_consumer_group.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/managedkafka/snippets/consumergroups/update_consumer_group.py b/managedkafka/snippets/consumergroups/update_consumer_group.py index 75c2feeb8a2..44ba261ab93 100644 --- a/managedkafka/snippets/consumergroups/update_consumer_group.py +++ b/managedkafka/snippets/consumergroups/update_consumer_group.py @@ -35,7 +35,7 @@ def update_consumer_group( cluster_id: ID of the Kafka cluster. consumer_group_id: ID of the Kafka consumer group. topic_path: Name of the Kafka topic. - partition_offsets: Configuration of the topics. + partition_offsets: Configuration of the topic, represented as a map of partition indexes to their offset value. Raises: This method will raise the exception if the consumer group is not found. 
@@ -47,11 +47,11 @@ def update_consumer_group( consumer_group.name = client.consumer_group_path( project_id, region, cluster_id, consumer_group_id ) - + topic_metadata = managedkafka_v1.ConsumerTopicMetadata() for partition, offset in partition_offsets.items(): partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset) - topic_metadata.partitions = {partition: partition_metadata} + topic_metadata.partitions[partition] = partition_metadata consumer_group.topics = { topic_path: topic_metadata, } From 1b391fb6b7f682eb56501762def015eb9e09f676 Mon Sep 17 00:00:00 2001 From: Adam Luong Date: Sun, 14 Jul 2024 20:22:54 -0400 Subject: [PATCH 5/5] Remove blank space --- managedkafka/snippets/consumergroups/update_consumer_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/snippets/consumergroups/update_consumer_group.py b/managedkafka/snippets/consumergroups/update_consumer_group.py index 44ba261ab93..a255dcadd3a 100644 --- a/managedkafka/snippets/consumergroups/update_consumer_group.py +++ b/managedkafka/snippets/consumergroups/update_consumer_group.py @@ -47,7 +47,7 @@ def update_consumer_group( consumer_group.name = client.consumer_group_path( project_id, region, cluster_id, consumer_group_id ) - + topic_metadata = managedkafka_v1.ConsumerTopicMetadata() for partition, offset in partition_offsets.items(): partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
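
Taken together, these snippets cover the full cluster and topic lifecycle. The sketch below is one minimal way to drive them end to end. It is illustrative only: the project, region, and resource IDs are placeholders rather than values from this change, the snippet modules are assumed to be importable (they live under the clusters/ and topics/ directories), and the google-cloud-managedkafka client plus Application Default Credentials must be available.

# Illustrative driver for the snippets above; all IDs are placeholders.
import create_cluster  # managedkafka/snippets/clusters/create_cluster.py
import delete_cluster  # managedkafka/snippets/clusters/delete_cluster.py
import create_topic    # managedkafka/snippets/topics/create_topic.py
import update_topic    # managedkafka/snippets/topics/update_topic.py
import delete_topic    # managedkafka/snippets/topics/delete_topic.py

PROJECT_ID = "my-project"   # placeholder
REGION = "us-central1"
CLUSTER_ID = "my-cluster"   # placeholder
TOPIC_ID = "my-topic"       # placeholder
# Placeholder subnet in the documented format.
SUBNET = f"projects/{PROJECT_ID}/regions/{REGION}/subnetworks/default"

# Provision a small cluster; this blocks until the long-running operation completes.
create_cluster.create_cluster(
    project_id=PROJECT_ID,
    region=REGION,
    cluster_id=CLUSTER_ID,
    subnet=SUBNET,
    cpu=3,
    memory_bytes=3 * 1024**3,  # 3 GiB, the same capacity the unit tests use
)

# Create a topic, tighten its config, then remove it.
create_topic.create_topic(
    project_id=PROJECT_ID,
    region=REGION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    partition_count=10,
    replication_factor=3,
    configs={"min.insync.replicas": "1"},
)
update_topic.update_topic(
    project_id=PROJECT_ID,
    region=REGION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    partition_count=20,
    configs={"min.insync.replicas": "2"},
)
delete_topic.delete_topic(
    project_id=PROJECT_ID,
    region=REGION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
)

# Tear the cluster down once the topic is gone.
delete_cluster.delete_cluster(
    project_id=PROJECT_ID,
    region=REGION,
    cluster_id=CLUSTER_ID,
)

Against a real project the create step alone typically takes 10-40 minutes, which is why the create_cluster snippet passes a 3000-second timeout to client.create_cluster.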