Skip to content

Commit b97d17e

Browse files
authored
Managed kafka code samples (GoogleCloudPlatform#11994)
* samples(managedkafka): Add snippets for all API methods * address checklist * Add requirements.txt. Address stylistic/organizational comments. Refactor get/list to return the values they are retrieving. * address updating consumer group * Remove blank space
1 parent ebc9766 commit b97d17e

19 files changed

+1226
-0
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
/composer/**/* @GoogleCloudPlatform/cloud-dpes-composer @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers
9393
/pubsub/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers
9494
/pubsublite/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers
95+
/managedkafka/**/* @GoogleCloudPlatform/api-pubsub-and-pubsublite @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers
9596
/cloud_tasks/**/* @GoogleCloudPlatform/torus-dpe @GoogleCloudPlatform/python-samples-reviewers @GoogleCloudPlatform/cloud-samples-reviewers
9697

9798
# For practicing
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from unittest import mock
16+
from unittest.mock import MagicMock
17+
18+
import create_cluster
19+
import delete_cluster
20+
import get_cluster
21+
from google.api_core.operation import Operation
22+
from google.cloud import managedkafka_v1
23+
import list_clusters
24+
import pytest
25+
import update_cluster
26+
27+
PROJECT_ID = "test-project-id"
28+
REGION = "us-central1"
29+
CLUSTER_ID = "test-cluster-id"
30+
31+
32+
@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.create_cluster")
33+
def test_create_cluster(
34+
mock_method: MagicMock,
35+
capsys: pytest.CaptureFixture[str],
36+
):
37+
cpu = 3
38+
memory_bytes = 3221225472
39+
subnet = "test-subnet"
40+
operation = mock.MagicMock(spec=Operation)
41+
cluster = managedkafka_v1.Cluster()
42+
cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path(
43+
PROJECT_ID, REGION, CLUSTER_ID
44+
)
45+
operation.result = mock.MagicMock(return_value=cluster)
46+
mock_method.return_value = operation
47+
48+
create_cluster.create_cluster(
49+
project_id=PROJECT_ID,
50+
region=REGION,
51+
cluster_id=CLUSTER_ID,
52+
subnet=subnet,
53+
cpu=cpu,
54+
memory_bytes=memory_bytes,
55+
)
56+
57+
out, _ = capsys.readouterr()
58+
assert "Created cluster" in out
59+
assert CLUSTER_ID in out
60+
mock_method.assert_called_once()
61+
62+
63+
@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.get_cluster")
64+
def test_get_cluster(
65+
mock_method: MagicMock,
66+
capsys: pytest.CaptureFixture[str],
67+
):
68+
cluster = managedkafka_v1.Cluster()
69+
cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path(
70+
PROJECT_ID, REGION, CLUSTER_ID
71+
)
72+
mock_method.return_value = cluster
73+
74+
get_cluster.get_cluster(
75+
project_id=PROJECT_ID,
76+
region=REGION,
77+
cluster_id=CLUSTER_ID,
78+
)
79+
80+
out, _ = capsys.readouterr()
81+
assert "Got cluster" in out
82+
assert CLUSTER_ID in out
83+
mock_method.assert_called_once()
84+
85+
86+
@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.update_cluster")
87+
def test_update_cluster(
88+
mock_method: MagicMock,
89+
capsys: pytest.CaptureFixture[str],
90+
):
91+
new_memory_bytes = 3221225475
92+
operation = mock.MagicMock(spec=Operation)
93+
cluster = managedkafka_v1.Cluster()
94+
cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path(
95+
PROJECT_ID, REGION, CLUSTER_ID
96+
)
97+
cluster.capacity_config.memory_bytes = new_memory_bytes
98+
operation.result = mock.MagicMock(return_value=cluster)
99+
mock_method.return_value = operation
100+
101+
update_cluster.update_cluster(
102+
project_id=PROJECT_ID,
103+
region=REGION,
104+
cluster_id=CLUSTER_ID,
105+
memory_bytes=new_memory_bytes,
106+
)
107+
108+
out, _ = capsys.readouterr()
109+
assert "Updated cluster" in out
110+
assert CLUSTER_ID in out
111+
assert str(new_memory_bytes) in out
112+
mock_method.assert_called_once()
113+
114+
115+
@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.list_clusters")
116+
def test_list_clusters(
117+
mock_method: MagicMock,
118+
capsys: pytest.CaptureFixture[str],
119+
):
120+
cluster = managedkafka_v1.Cluster()
121+
cluster.name = managedkafka_v1.ManagedKafkaClient.cluster_path(
122+
PROJECT_ID, REGION, CLUSTER_ID
123+
)
124+
response = [cluster]
125+
mock_method.return_value = response
126+
127+
list_clusters.list_clusters(
128+
project_id=PROJECT_ID,
129+
region=REGION,
130+
)
131+
132+
out, _ = capsys.readouterr()
133+
assert "Got cluster" in out
134+
assert CLUSTER_ID in out
135+
mock_method.assert_called_once()
136+
137+
138+
@mock.patch("google.cloud.managedkafka_v1.ManagedKafkaClient.delete_cluster")
139+
def test_delete_cluster(
140+
mock_method: MagicMock,
141+
capsys: pytest.CaptureFixture[str],
142+
):
143+
operation = mock.MagicMock(spec=Operation)
144+
mock_method.return_value = operation
145+
146+
delete_cluster.delete_cluster(
147+
project_id=PROJECT_ID,
148+
region=REGION,
149+
cluster_id=CLUSTER_ID,
150+
)
151+
152+
out, _ = capsys.readouterr()
153+
assert "Deleted cluster" in out
154+
mock_method.assert_called_once()
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# [START managedkafka_create_cluster]
16+
from google.api_core.exceptions import GoogleAPICallError
17+
from google.cloud import managedkafka_v1
18+
19+
20+
def create_cluster(
21+
project_id: str,
22+
region: str,
23+
cluster_id: str,
24+
subnet: str,
25+
cpu: int,
26+
memory_bytes: int,
27+
) -> None:
28+
"""
29+
Create a Kafka cluster.
30+
31+
Args:
32+
project_id: Google Cloud project ID.
33+
region: Cloud region.
34+
cluster_id: ID of the Kafka cluster.
35+
subnet: VPC subnet from which the cluster is accessible. The expected format is projects/{project_id}/regions{region}/subnetworks/{subnetwork}.
36+
cpu: Number of vCPUs to provision for the cluster.
37+
memory_bytes: The memory to provision for the cluster in bytes.
38+
39+
Raises:
40+
This method will raise the exception if the operation errors or
41+
the timeout before the operation completes is reached.
42+
"""
43+
44+
client = managedkafka_v1.ManagedKafkaClient()
45+
46+
cluster = managedkafka_v1.Cluster()
47+
cluster.name = client.cluster_path(project_id, region, cluster_id)
48+
cluster.capacity_config.vcpu_count = cpu
49+
cluster.capacity_config.memory_bytes = memory_bytes
50+
cluster.gcp_config.access_config.network_configs.subnet = subnet
51+
cluster.rebalance_config.mode = (
52+
managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
53+
)
54+
55+
request = managedkafka_v1.CreateClusterRequest(
56+
parent=client.common_location_path(project_id, region),
57+
cluster_id=cluster_id,
58+
cluster=cluster,
59+
)
60+
61+
try:
62+
# The duration of this operation can vary considerably, typically taking 10-40 minutes.
63+
# We can set a timeout of 3000s (50 minutes).
64+
operation = client.create_cluster(request=request, timeout=3000)
65+
response = operation.result()
66+
print("Created cluster:", response)
67+
except GoogleAPICallError:
68+
print(operation.operation.error)
69+
70+
71+
# [END managedkafka_create_cluster]
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# [START managedkafka_delete_cluster]
16+
from google.api_core.exceptions import GoogleAPICallError
17+
from google.cloud import managedkafka_v1
18+
19+
20+
def delete_cluster(
21+
project_id: str,
22+
region: str,
23+
cluster_id: str,
24+
) -> None:
25+
"""
26+
Delete a Kafka cluster.
27+
28+
Args:
29+
project_id: Google Cloud project ID.
30+
region: Cloud region.
31+
cluster_id: ID of the Kafka cluster.
32+
33+
Raises:
34+
This method will raise the exception if the operation errors or
35+
the timeout before the operation completes is reached.
36+
"""
37+
38+
client = managedkafka_v1.ManagedKafkaClient()
39+
40+
request = managedkafka_v1.DeleteClusterRequest(
41+
name=client.cluster_path(project_id, region, cluster_id),
42+
)
43+
44+
try:
45+
operation = client.delete_cluster(request=request)
46+
operation.result()
47+
print("Deleted cluster")
48+
except GoogleAPICallError:
49+
print(operation.operation.error)
50+
51+
52+
# [END managedkafka_delete_cluster]
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# [START managedkafka_get_cluster]
16+
from google.cloud import managedkafka_v1
17+
18+
19+
def get_cluster(
20+
project_id: str,
21+
region: str,
22+
cluster_id: str,
23+
) -> managedkafka_v1.Cluster:
24+
"""
25+
Get a Kafka cluster.
26+
27+
Args:
28+
project_id: Google Cloud project ID.
29+
region: Cloud region.
30+
cluster_id: ID of the Kafka cluster.
31+
"""
32+
33+
client = managedkafka_v1.ManagedKafkaClient()
34+
35+
cluster_path = client.cluster_path(project_id, region, cluster_id)
36+
request = managedkafka_v1.GetClusterRequest(
37+
name=cluster_path,
38+
)
39+
40+
cluster = client.get_cluster(request=request)
41+
print("Got cluster:", cluster)
42+
43+
return cluster
44+
45+
46+
# [END managedkafka_get_cluster]
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# [START managedkafka_list_clusters]
16+
from typing import List
17+
18+
from google.cloud import managedkafka_v1
19+
20+
21+
def list_clusters(
22+
project_id: str,
23+
region: str,
24+
) -> List[str]:
25+
"""
26+
List Kafka clusters in a given project ID and region.
27+
28+
Args:
29+
project_id: Google Cloud project ID.
30+
region: Cloud region.
31+
"""
32+
33+
client = managedkafka_v1.ManagedKafkaClient()
34+
35+
request = managedkafka_v1.ListClustersRequest(
36+
parent=client.common_location_path(project_id, region),
37+
)
38+
39+
response = client.list_clusters(request=request)
40+
for cluster in response:
41+
print("Got cluster:", cluster)
42+
43+
return [cluster.name for cluster in response]
44+
45+
46+
# [END managedkafka_list_clusters]

0 commit comments

Comments
 (0)