From 356757471d11d653acde5ca7463fd58287bec14b Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Fri, 17 Apr 2020 16:45:20 -0700 Subject: [PATCH 1/3] [iot] fix: use fixtures for resource setup/teardown fixes #2861 fixes #3323 --- .../cloudiot_mqtt_example_test.py | 356 +++++++++--------- .../mqtt_example/requirements-test.txt | 1 + 2 files changed, 170 insertions(+), 187 deletions(-) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py index 4f57b31ca28..0d7966b8f20 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py @@ -17,16 +17,16 @@ import time import uuid +import backoff +from googleapiclient.errors import HttpError from google.cloud import pubsub +from google.api_core.exceptions import NotFound +import pytest # Add manager for bootstrapping device registry / device for testing sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'manager')) # noqa -from flaky import flaky -import manager - -import pytest - import cloudiot_mqtt_example +import manager cloud_region = 'us-central1' @@ -40,6 +40,8 @@ service_account_json = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] pubsub_topic = 'projects/{}/topics/{}'.format(project_id, topic_id) + +# This format is used in the `../manager.py::clean_up_registries()`. registry_id = 'test-registry-{}-{}'.format(uuid.uuid1(), int(time.time())) mqtt_bridge_hostname = 'mqtt.googleapis.com' @@ -57,25 +59,143 @@ def test_topic(): pubsub_client.delete_topic(topic_path) -def test_event(test_topic, capsys): - device_id = device_id_template.format('RSA256') - manager.open_registry( +@pytest.fixture(scope='module') +def test_registry_id(): + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def create_registry(): + manager.open_registry( service_account_json, project_id, cloud_region, pubsub_topic, registry_id) - manager.create_rs256_device( - service_account_json, project_id, cloud_region, registry_id, + create_registry() + + yield registry_id + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def delete_registry(): + try: + manager.delete_registry( + service_account_json, project_id, cloud_region, registry_id) + except NotFound as e: + # We ignore this case. + print("The registry doesn't exist: detail: {}".format(str(e))) + + delete_registry() + + +@pytest.fixture(scope='module') +def rsa256_device_id(test_registry_id): + device_id = device_id_template.format('RSA256') + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def create_device(): + manager.create_rs256_device( + service_account_json, project_id, cloud_region, test_registry_id, device_id, rsa_cert_path) - manager.get_device( + create_device() + + yield device_id + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def delete_device(): + try: + manager.delete_device( + service_account_json, project_id, cloud_region, + test_registry_id, device_id) + except NotFound as e: + # We ignore this case. + print("The device doesn't exist: detail: {}".format(str(e))) + + delete_device() + + +@pytest.fixture(scope='module') +def device_and_gateways(): + device_id = device_id_template.format('noauthbind') + gateway_id = device_id_template.format('RS256') + bad_gateway_id = device_id_template.format('RS256-err') + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def create_device(): + manager.create_device( service_account_json, project_id, cloud_region, registry_id, device_id) + create_device() + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def create_gateways(): + manager.create_gateway( + service_account_json, project_id, cloud_region, registry_id, + None, gateway_id, rsa_cert_path, 'RS256') + manager.create_gateway( + service_account_json, project_id, cloud_region, registry_id, + None, bad_gateway_id, rsa_cert_path, 'RS256') + + create_gateways() + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def bind_device_to_gateways(): + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, bad_gateway_id) + + bind_device_to_gateways() + + yield (device_id, gateway_id, bad_gateway_id) + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def unbind(): + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, gateway_id) + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, registry_id, + device_id, bad_gateway_id) + + unbind() + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def delete_device(): + try: + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + device_id) + except NotFound as e: + # We ignore this case. + print("The device doesn't exist: detail: {}".format(str(e))) + + delete_device() + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def delete_gateways(): + try: + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + gateway_id) + manager.delete_device( + service_account_json, project_id, cloud_region, registry_id, + bad_gateway_id) + except NotFound as e: + # We ignore this case. + print("The gateway doesn't exist: detail: {}".format(str(e))) + + delete_gateways() + + +def test_event(test_topic, test_registry_id, rsa256_device_id, capsys): + manager.get_device( + service_account_json, project_id, cloud_region, test_registry_id, + rsa256_device_id) sub_topic = 'events' - mqtt_topic = '/devices/{}/{}'.format(device_id, sub_topic) + mqtt_topic = '/devices/{}/{}'.format(rsa256_device_id, sub_topic) client = cloudiot_mqtt_example.get_client( - project_id, cloud_region, registry_id, device_id, + project_id, cloud_region, test_registry_id, rsa256_device_id, rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) @@ -85,39 +205,23 @@ def test_event(test_topic, capsys): client.loop_stop() manager.get_state( - service_account_json, project_id, cloud_region, registry_id, - device_id) - - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, project_id, cloud_region, test_registry_id, + rsa256_device_id) out, _ = capsys.readouterr() assert 'on_publish' in out -def test_state(test_topic, capsys): - device_id = device_id_template.format('RSA256') - manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) - - manager.create_rs256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, rsa_cert_path) - +def test_state(test_topic, test_registry_id, rsa256_device_id, capsys): manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, test_registry_id, + rsa256_device_id) sub_topic = 'state' - mqtt_topic = '/devices/{}/{}'.format(device_id, sub_topic) + mqtt_topic = '/devices/{}/{}'.format(rsa256_device_id, sub_topic) client = cloudiot_mqtt_example.get_client( - project_id, cloud_region, registry_id, device_id, + project_id, cloud_region, test_registry_id, rsa256_device_id, rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) client.publish(mqtt_topic, 'state test', qos=1) @@ -128,37 +232,21 @@ def test_state(test_topic, capsys): client.loop_stop() manager.get_state( - service_account_json, project_id, cloud_region, registry_id, - device_id) - - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, project_id, cloud_region, test_registry_id, + rsa256_device_id) out, _ = capsys.readouterr() assert 'on_publish' in out assert 'binary_data: "state test"' in out -def test_config(test_topic, capsys): - device_id = device_id_template.format('RSA256') - manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) - - manager.create_rs256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, rsa_cert_path) - +def test_config(test_topic, test_registry_id, rsa256_device_id, capsys): manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, test_registry_id, + rsa256_device_id) client = cloudiot_mqtt_example.get_client( - project_id, cloud_region, registry_id, device_id, + project_id, cloud_region, test_registry_id, rsa256_device_id, rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) client.loop_start() @@ -168,43 +256,19 @@ def test_config(test_topic, capsys): client.loop_stop() manager.get_state( - service_account_json, project_id, cloud_region, registry_id, - device_id) - - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, project_id, cloud_region, test_registry_id, + rsa256_device_id) out, _ = capsys.readouterr() assert "Received message" in out - assert '/devices/{}/config'.format(device_id) in out - - -@flaky(max_runs=5, min_passes=1) -def test_receive_command(capsys): - device_id = device_id_template.format('RSA256') - manager.create_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + assert '/devices/{}/config'.format(rsa256_device_id) in out - exists = False - devices = manager.list_devices( - service_account_json, project_id, cloud_region, registry_id) - for device in devices: - if device.get('id') == device_id: - exists = True - - if not exists: - manager.create_rs256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, rsa_cert_path) +@pytest.mark.flaky(max_runs=5, min_passes=1) +def test_receive_command(test_registry_id, rsa256_device_id, capsys): # Exercize the functionality client = cloudiot_mqtt_example.get_client( - project_id, cloud_region, registry_id, device_id, + project_id, cloud_region, test_registry_id, rsa256_device_id, rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) client.loop_start() @@ -215,42 +279,23 @@ def test_receive_command(capsys): time.sleep(1) manager.send_command( - service_account_json, project_id, cloud_region, registry_id, - device_id, 'me want cookies') + service_account_json, project_id, cloud_region, test_registry_id, + rsa256_device_id, 'me want cookies') # Process commands for i in range(1, 5): client.loop() time.sleep(1) - # Clean up - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) - out, _ = capsys.readouterr() assert 'on_connect' in out # Verify can connect assert '\'me want cookies\'' in out # Verify can receive command -@flaky(max_runs=5, min_passes=1) -def test_gateway_listen_for_bound_device_configs(test_topic, capsys): - gateway_id = device_id_template.format('RS256') - device_id = device_id_template.format('noauthbind') - manager.create_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) - manager.create_gateway( - service_account_json, project_id, cloud_region, registry_id, - None, gateway_id, rsa_cert_path, 'RS256') - manager.create_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.bind_device_to_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) +@pytest.mark.flaky(max_runs=5, min_passes=1) +def test_gateway_listen_for_bound_device_configs( + test_topic, test_registry_id, device_and_gateways, capsys): + (device_id, gateway_id, _) = device_and_gateways # Setup for listening for config messages num_messages = 0 @@ -259,44 +304,19 @@ def test_gateway_listen_for_bound_device_configs(test_topic, capsys): # Connect the gateway cloudiot_mqtt_example.listen_for_messages( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id, num_messages, rsa_private_path, - 'RS256', ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, - jwt_exp_time, listen_time, None) - - # Clean up - manager.unbind_device_from_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - gateway_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, project_id, cloud_region, test_registry_id, + device_id, gateway_id, num_messages, rsa_private_path, 'RS256', + ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, jwt_exp_time, + listen_time, None) out, _ = capsys.readouterr() assert 'Received message' in out -@flaky(max_runs=5, min_passes=1) -def test_gateway_send_data_for_device(test_topic, capsys): - gateway_id = device_id_template.format('RS256') - device_id = device_id_template.format('noauthbind') - manager.create_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) - manager.create_gateway( - service_account_json, project_id, cloud_region, registry_id, - None, gateway_id, rsa_cert_path, 'RS256') - manager.create_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.bind_device_to_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) +@pytest.mark.flaky(max_runs=5, min_passes=1) +def test_gateway_send_data_for_device( + test_topic, test_registry_id, device_and_gateways, capsys): + (device_id, gateway_id, _) = device_and_gateways # Setup for listening for config messages num_messages = 5 @@ -310,39 +330,14 @@ def test_gateway_send_data_for_device(test_topic, capsys): 'RS256', ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, jwt_exp_time, listen_time) - # Clean up - manager.unbind_device_from_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - gateway_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) - out, _ = capsys.readouterr() assert 'Publishing message 5/5' in out assert 'Out of memory' not in out # Indicates could not connect -def test_gateway_trigger_error_topic(test_topic, capsys): - gateway_id = device_id_template.format('RS256-err') - device_id = device_id_template.format('noauthbind') - manager.create_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) - manager.create_gateway( - service_account_json, project_id, cloud_region, registry_id, - None, gateway_id, rsa_cert_path, 'RS256') - manager.create_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.bind_device_to_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) +def test_gateway_trigger_error_topic( + test_topic, test_registry_id, device_and_gateways, capsys): + (device_id, _, gateway_id) = device_and_gateways # Setup for listening for config messages num_messages = 4 @@ -364,18 +359,5 @@ def trigger_error(client): 'RS256', ca_cert_path, 'mqtt.googleapis.com', 8883, 20, 15, trigger_error) - # Clean up - manager.unbind_device_from_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - gateway_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) - out, _ = capsys.readouterr() assert 'GATEWAY_ATTACHMENT_ERROR' in out diff --git a/iot/api-client/mqtt_example/requirements-test.txt b/iot/api-client/mqtt_example/requirements-test.txt index 781d4326c94..132ae92cb32 100644 --- a/iot/api-client/mqtt_example/requirements-test.txt +++ b/iot/api-client/mqtt_example/requirements-test.txt @@ -1 +1,2 @@ pytest==5.3.2 +backoff==1.10.0 From f2c71b39efe82df23af22ac9e980815eed04e97f Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Sat, 18 Apr 2020 17:42:34 -0700 Subject: [PATCH 2/3] avoid topic name conflict --- .../cloudiot_mqtt_example_test.py | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py index 0d7966b8f20..fc0d5ace5f3 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py @@ -20,11 +20,12 @@ import backoff from googleapiclient.errors import HttpError from google.cloud import pubsub +from google.api_core.exceptions import AlreadyExists from google.api_core.exceptions import NotFound import pytest # Add manager for bootstrapping device registry / device for testing -sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'manager')) # noqa +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'manager')) # noqa import cloudiot_mqtt_example import manager @@ -34,7 +35,7 @@ ca_cert_path = 'resources/roots.pem' rsa_cert_path = 'resources/rsa_cert.pem' rsa_private_path = 'resources/rsa_private.pem' -topic_id = 'test-device-events-{}'.format(int(time.time())) +topic_id = 'test-device-events-{}'.format(uuid.uuid4()) project_id = os.environ['GCLOUD_PROJECT'] service_account_json = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] @@ -42,7 +43,7 @@ pubsub_topic = 'projects/{}/topics/{}'.format(project_id, topic_id) # This format is used in the `../manager.py::clean_up_registries()`. -registry_id = 'test-registry-{}-{}'.format(uuid.uuid1(), int(time.time())) +registry_id = 'test-registry-{}-{}'.format(uuid.uuid4(), int(time.time())) mqtt_bridge_hostname = 'mqtt.googleapis.com' mqtt_bridge_port = 443 @@ -50,13 +51,29 @@ @pytest.fixture(scope='module') def test_topic(): - topic = manager.create_iot_topic(project_id, topic_id) + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def create_topic(): + try: + return manager.create_iot_topic(project_id, topic_id) + except AlreadyExists as e: + # We ignore this case. + print("The topic already exists, detail: {}".format(str(e))) + + topic = create_topic() yield topic pubsub_client = pubsub.PublisherClient() topic_path = pubsub_client.topic_path(project_id, topic_id) - pubsub_client.delete_topic(topic_path) + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def delete_topic(): + try: + pubsub_client.delete_topic(topic_path) + except NotFound as e: + # We ignore this case. + print("The topic doesn't exist: detail: {}".format(str(e))) + + delete_topic() @pytest.fixture(scope='module') From b6686e2f4cc4df7fd36e3116484be3238b3a752d Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Sat, 18 Apr 2020 20:58:45 -0700 Subject: [PATCH 3/3] refactored out the fixtures * share the fixtures within two tests * correct dependencies between fixtures * use threading for cordinating between background and foreground threads. --- .../cloudiot_mqtt_example_test.py | 238 +++--------------- .../mqtt_example/cloudiot_mqtt_image.py | 17 +- .../mqtt_example/cloudiot_mqtt_image_test.py | 107 ++------ iot/api-client/mqtt_example/fixtures.py | 218 ++++++++++++++++ 4 files changed, 285 insertions(+), 295 deletions(-) create mode 100644 iot/api-client/mqtt_example/fixtures.py diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py index fc0d5ace5f3..50b6accd413 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_example_test.py @@ -15,204 +15,38 @@ import os import sys import time -import uuid -import backoff -from googleapiclient.errors import HttpError -from google.cloud import pubsub -from google.api_core.exceptions import AlreadyExists -from google.api_core.exceptions import NotFound import pytest # Add manager for bootstrapping device registry / device for testing sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'manager')) # noqa import cloudiot_mqtt_example import manager - +from fixtures import test_topic # noqa +from fixtures import test_registry_id # noqa +from fixtures import test_device_id # noqa +from fixtures import device_and_gateways # noqa cloud_region = 'us-central1' -device_id_template = 'test-device-{}' ca_cert_path = 'resources/roots.pem' -rsa_cert_path = 'resources/rsa_cert.pem' rsa_private_path = 'resources/rsa_private.pem' -topic_id = 'test-device-events-{}'.format(uuid.uuid4()) - project_id = os.environ['GCLOUD_PROJECT'] service_account_json = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] -pubsub_topic = 'projects/{}/topics/{}'.format(project_id, topic_id) - -# This format is used in the `../manager.py::clean_up_registries()`. -registry_id = 'test-registry-{}-{}'.format(uuid.uuid4(), int(time.time())) - mqtt_bridge_hostname = 'mqtt.googleapis.com' mqtt_bridge_port = 443 -@pytest.fixture(scope='module') -def test_topic(): - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def create_topic(): - try: - return manager.create_iot_topic(project_id, topic_id) - except AlreadyExists as e: - # We ignore this case. - print("The topic already exists, detail: {}".format(str(e))) - - topic = create_topic() - - yield topic - - pubsub_client = pubsub.PublisherClient() - topic_path = pubsub_client.topic_path(project_id, topic_id) - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def delete_topic(): - try: - pubsub_client.delete_topic(topic_path) - except NotFound as e: - # We ignore this case. - print("The topic doesn't exist: detail: {}".format(str(e))) - - delete_topic() - - -@pytest.fixture(scope='module') -def test_registry_id(): - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def create_registry(): - manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) - - create_registry() - - yield registry_id - - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def delete_registry(): - try: - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) - except NotFound as e: - # We ignore this case. - print("The registry doesn't exist: detail: {}".format(str(e))) - - delete_registry() - - -@pytest.fixture(scope='module') -def rsa256_device_id(test_registry_id): - device_id = device_id_template.format('RSA256') - - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def create_device(): - manager.create_rs256_device( - service_account_json, project_id, cloud_region, test_registry_id, - device_id, rsa_cert_path) - - create_device() - - yield device_id - - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def delete_device(): - try: - manager.delete_device( - service_account_json, project_id, cloud_region, - test_registry_id, device_id) - except NotFound as e: - # We ignore this case. - print("The device doesn't exist: detail: {}".format(str(e))) - - delete_device() - - -@pytest.fixture(scope='module') -def device_and_gateways(): - device_id = device_id_template.format('noauthbind') - gateway_id = device_id_template.format('RS256') - bad_gateway_id = device_id_template.format('RS256-err') - - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def create_device(): - manager.create_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - create_device() - - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def create_gateways(): - manager.create_gateway( - service_account_json, project_id, cloud_region, registry_id, - None, gateway_id, rsa_cert_path, 'RS256') - manager.create_gateway( - service_account_json, project_id, cloud_region, registry_id, - None, bad_gateway_id, rsa_cert_path, 'RS256') - - create_gateways() - - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def bind_device_to_gateways(): - manager.bind_device_to_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) - manager.bind_device_to_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, bad_gateway_id) - - bind_device_to_gateways() - - yield (device_id, gateway_id, bad_gateway_id) - - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def unbind(): - manager.unbind_device_from_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) - manager.unbind_device_from_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, bad_gateway_id) - - unbind() - - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def delete_device(): - try: - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - except NotFound as e: - # We ignore this case. - print("The device doesn't exist: detail: {}".format(str(e))) - - delete_device() - - @backoff.on_exception(backoff.expo, HttpError, max_time=60) - def delete_gateways(): - try: - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - gateway_id) - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - bad_gateway_id) - except NotFound as e: - # We ignore this case. - print("The gateway doesn't exist: detail: {}".format(str(e))) - - delete_gateways() - - -def test_event(test_topic, test_registry_id, rsa256_device_id, capsys): +def test_event(test_topic, test_registry_id, test_device_id, capsys): # noqa manager.get_device( service_account_json, project_id, cloud_region, test_registry_id, - rsa256_device_id) + test_device_id) sub_topic = 'events' - mqtt_topic = '/devices/{}/{}'.format(rsa256_device_id, sub_topic) + mqtt_topic = '/devices/{}/{}'.format(test_device_id, sub_topic) client = cloudiot_mqtt_example.get_client( - project_id, cloud_region, test_registry_id, rsa256_device_id, + project_id, cloud_region, test_registry_id, test_device_id, rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) @@ -223,22 +57,22 @@ def test_event(test_topic, test_registry_id, rsa256_device_id, capsys): manager.get_state( service_account_json, project_id, cloud_region, test_registry_id, - rsa256_device_id) + test_device_id) out, _ = capsys.readouterr() assert 'on_publish' in out -def test_state(test_topic, test_registry_id, rsa256_device_id, capsys): +def test_state(test_topic, test_registry_id, test_device_id, capsys): # noqa manager.get_device( service_account_json, project_id, cloud_region, test_registry_id, - rsa256_device_id) + test_device_id) sub_topic = 'state' - mqtt_topic = '/devices/{}/{}'.format(rsa256_device_id, sub_topic) + mqtt_topic = '/devices/{}/{}'.format(test_device_id, sub_topic) client = cloudiot_mqtt_example.get_client( - project_id, cloud_region, test_registry_id, rsa256_device_id, + project_id, cloud_region, test_registry_id, test_device_id, rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) client.publish(mqtt_topic, 'state test', qos=1) @@ -250,20 +84,20 @@ def test_state(test_topic, test_registry_id, rsa256_device_id, capsys): manager.get_state( service_account_json, project_id, cloud_region, test_registry_id, - rsa256_device_id) + test_device_id) out, _ = capsys.readouterr() assert 'on_publish' in out assert 'binary_data: "state test"' in out -def test_config(test_topic, test_registry_id, rsa256_device_id, capsys): +def test_config(test_topic, test_registry_id, test_device_id, capsys): # noqa manager.get_device( service_account_json, project_id, cloud_region, test_registry_id, - rsa256_device_id) + test_device_id) client = cloudiot_mqtt_example.get_client( - project_id, cloud_region, test_registry_id, rsa256_device_id, + project_id, cloud_region, test_registry_id, test_device_id, rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) client.loop_start() @@ -274,18 +108,18 @@ def test_config(test_topic, test_registry_id, rsa256_device_id, capsys): manager.get_state( service_account_json, project_id, cloud_region, test_registry_id, - rsa256_device_id) + test_device_id) out, _ = capsys.readouterr() assert "Received message" in out - assert '/devices/{}/config'.format(rsa256_device_id) in out + assert '/devices/{}/config'.format(test_device_id) in out @pytest.mark.flaky(max_runs=5, min_passes=1) -def test_receive_command(test_registry_id, rsa256_device_id, capsys): +def test_receive_command(test_registry_id, test_device_id, capsys): # noqa # Exercize the functionality client = cloudiot_mqtt_example.get_client( - project_id, cloud_region, test_registry_id, rsa256_device_id, + project_id, cloud_region, test_registry_id, test_device_id, rsa_private_path, 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443) client.loop_start() @@ -297,7 +131,7 @@ def test_receive_command(test_registry_id, rsa256_device_id, capsys): manager.send_command( service_account_json, project_id, cloud_region, test_registry_id, - rsa256_device_id, 'me want cookies') + test_device_id, 'me want cookies') # Process commands for i in range(1, 5): @@ -311,7 +145,7 @@ def test_receive_command(test_registry_id, rsa256_device_id, capsys): @pytest.mark.flaky(max_runs=5, min_passes=1) def test_gateway_listen_for_bound_device_configs( - test_topic, test_registry_id, device_and_gateways, capsys): + test_topic, test_registry_id, device_and_gateways, capsys): # noqa (device_id, gateway_id, _) = device_and_gateways # Setup for listening for config messages @@ -332,7 +166,7 @@ def test_gateway_listen_for_bound_device_configs( @pytest.mark.flaky(max_runs=5, min_passes=1) def test_gateway_send_data_for_device( - test_topic, test_registry_id, device_and_gateways, capsys): + test_topic, test_registry_id, device_and_gateways, capsys): # noqa (device_id, gateway_id, _) = device_and_gateways # Setup for listening for config messages @@ -342,10 +176,10 @@ def test_gateway_send_data_for_device( # Connect the gateway cloudiot_mqtt_example.send_data_from_bound_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id, num_messages, rsa_private_path, - 'RS256', ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, - jwt_exp_time, listen_time) + service_account_json, project_id, cloud_region, test_registry_id, + device_id, gateway_id, num_messages, rsa_private_path, 'RS256', + ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port, jwt_exp_time, + listen_time) out, _ = capsys.readouterr() assert 'Publishing message 5/5' in out @@ -353,7 +187,7 @@ def test_gateway_send_data_for_device( def test_gateway_trigger_error_topic( - test_topic, test_registry_id, device_and_gateways, capsys): + test_topic, test_registry_id, device_and_gateways, capsys): # noqa (device_id, _, gateway_id) = device_and_gateways # Setup for listening for config messages @@ -365,16 +199,14 @@ def trigger_error(client): # Connect the gateway cloudiot_mqtt_example.listen_for_messages( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id, num_messages, rsa_private_path, - 'RS256', ca_cert_path, 'mqtt.googleapis.com', 443, - 20, 42, trigger_error) + service_account_json, project_id, cloud_region, test_registry_id, + device_id, gateway_id, num_messages, rsa_private_path, 'RS256', + ca_cert_path, 'mqtt.googleapis.com', 443, 20, 42, trigger_error) # Try to connect the gateway aagin on 8883 cloudiot_mqtt_example.listen_for_messages( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id, num_messages, rsa_private_path, - 'RS256', ca_cert_path, 'mqtt.googleapis.com', 8883, - 20, 15, trigger_error) + service_account_json, project_id, cloud_region, test_registry_id, + device_id, gateway_id, num_messages, rsa_private_path, 'RS256', + ca_cert_path, 'mqtt.googleapis.com', 8883, 20, 15, trigger_error) out, _ = capsys.readouterr() assert 'GATEWAY_ATTACHMENT_ERROR' in out diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_image.py b/iot/api-client/mqtt_example/cloudiot_mqtt_image.py index 41e4963c030..60d7ded3e25 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_image.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_image.py @@ -18,6 +18,7 @@ import io import os import sys +import threading import time from google.cloud import pubsub @@ -51,15 +52,18 @@ def transmit_image( # [END iot_mqtt_image] -def receive_image(project_id, sub_name, prefix, extension, duration): +def receive_image(project_id, subscription_path, prefix, extension, timeout): """Receieve images transmitted to a PubSub subscription.""" subscriber = pubsub.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, sub_name) global count count = 0 file_pattern = '{}-{}.{}' + # Set up a callback to acknowledge a message. This closes around an event + # so that it can signal that it is done and the main thread can continue. + job_done = threading.Event() + def callback(message): global count try: @@ -71,17 +75,18 @@ def callback(message): file_pattern.format(prefix, count, extension), 'wb') as f: f.write(image_data) message.ack() + # Signal to the main thread that we can exit. + job_done.set() except binascii.Error: message.ack() # To move forward if a message can't be processed subscriber.subscribe(subscription_path, callback=callback) - sleep_count = 0 print('Listening for messages on {}'.format(subscription_path)) - while sleep_count < duration: - time.sleep(1) - sleep_count = sleep_count + 1 + finished = job_done.wait(timeout=timeout) + if not finished: + print("No event received before the timeout.") def parse_command_line_args(): diff --git a/iot/api-client/mqtt_example/cloudiot_mqtt_image_test.py b/iot/api-client/mqtt_example/cloudiot_mqtt_image_test.py index bfebd4a99ce..2a41857982f 100644 --- a/iot/api-client/mqtt_example/cloudiot_mqtt_image_test.py +++ b/iot/api-client/mqtt_example/cloudiot_mqtt_image_test.py @@ -14,122 +14,57 @@ # limitations under the License. import os import sys -import time -import uuid - -from google.cloud import pubsub - -import pytest # Add manager as library -sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'manager')) # noqa +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'manager')) # noqa import cloudiot_mqtt_image -from flaky import flaky import manager +from fixtures import test_topic # noqa +from fixtures import test_subscription # noqa +from fixtures import test_registry_id # noqa +from fixtures import test_device_id # noqa cloud_region = 'us-central1' -device_id_template = 'test-device-{}' ca_cert_path = 'resources/roots.pem' -rsa_cert_path = 'resources/rsa_cert.pem' rsa_private_path = 'resources/rsa_private.pem' -topic_id = 'test-device-events-{}'.format(int(time.time())) -subscription_name = 'test-device-images-{}'.format(int(time.time())) - project_id = os.environ['GCLOUD_PROJECT'] service_account_json = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] - -pubsub_topic = 'projects/{}/topics/{}'.format(project_id, topic_id) -registry_id = 'test-registry-{}-{}'.format(uuid.uuid1(), int(time.time())) - image_path = './resources/owlister_hootie.png' -mqtt_bridge_hostname = 'mqtt.googleapis.com' -mqtt_bridge_port = 443 - - -@pytest.fixture(scope='module') -def test_topic(): - topic = manager.create_iot_topic(project_id, topic_id) - - yield topic - - pubsub_client = pubsub.PublisherClient() - topic_path = pubsub_client.topic_path(project_id, topic_id) - pubsub_client.delete_topic(topic_path) - -def test_image(test_topic, capsys): +def test_image(test_topic, test_registry_id, test_device_id, capsys): # noqa """Send an inage to a device registry""" - device_id = device_id_template.format('RSA256') - manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) - - manager.create_rs256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, rsa_cert_path) - manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, test_registry_id, + test_device_id) cloudiot_mqtt_image.transmit_image( - cloud_region, registry_id, device_id, rsa_private_path, ca_cert_path, - image_path, project_id, service_account_json) - - # Clean up - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + cloud_region, test_registry_id, test_device_id, rsa_private_path, + ca_cert_path, image_path, project_id, service_account_json) out, _ = capsys.readouterr() assert 'on_publish' in out -@flaky(max_runs=5, min_passes=1) -def test_image_recv(test_topic, capsys): +def test_image_recv( + test_topic, # noqa + test_subscription, # noqa + test_registry_id, # noqa + test_device_id, # noqa + capsys): """Transmit an image with IoT Core and receive it from PubSub""" - subscriber = pubsub.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path( - project_id, subscription_name) - - subscriber.create_subscription(subscription_path, topic_path) - time.sleep(10) - - device_id = device_id_template.format('RSA256') - manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) - - manager.create_rs256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, rsa_cert_path) manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, test_registry_id, + test_device_id) cloudiot_mqtt_image.transmit_image( - cloud_region, registry_id, device_id, rsa_private_path, ca_cert_path, - image_path, project_id, service_account_json) - - time.sleep(10) + cloud_region, test_registry_id, test_device_id, rsa_private_path, + ca_cert_path, image_path, project_id, service_account_json) cloudiot_mqtt_image.receive_image( - project_id, subscription_name, 'test', 'png', 60) - - # Clean up - subscriber.delete_subscription(subscription_path) - - manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + project_id, test_subscription.name, 'test', 'png', 120) out, _ = capsys.readouterr() assert 'Received image' in out diff --git a/iot/api-client/mqtt_example/fixtures.py b/iot/api-client/mqtt_example/fixtures.py new file mode 100644 index 00000000000..4fc7be9aa7e --- /dev/null +++ b/iot/api-client/mqtt_example/fixtures.py @@ -0,0 +1,218 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import sys +import time +import uuid + +import backoff +from googleapiclient.errors import HttpError +from google.cloud import pubsub +from google.api_core.exceptions import AlreadyExists +from google.api_core.exceptions import NotFound +import pytest + +# Add manager as library +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'manager')) # noqa +import manager + + +cloud_region = 'us-central1' +device_id_template = 'test-device-{}' +rsa_cert_path = 'resources/rsa_cert.pem' +topic_id = 'test-device-events-{}'.format(uuid.uuid4()) +subscription_name = 'test-device-images-{}'.format(uuid.uuid4()) +project_id = os.environ['GCLOUD_PROJECT'] +service_account_json = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] +registry_id = 'test-registry-{}-{}'.format(uuid.uuid4(), int(time.time())) + + +@pytest.fixture(scope='session') +def test_topic(): + pubsub_client = pubsub.PublisherClient() + try: + topic = manager.create_iot_topic(project_id, topic_id) + except AlreadyExists as e: + print("The topic already exists, detail: {}".format(str(e))) + # Ignore the error, fetch the topic + topic = pubsub_client.get_topic( + pubsub_client.topic_path(project_id, topic_id)) + + yield topic + + topic_path = pubsub_client.topic_path(project_id, topic_id) + try: + pubsub_client.delete_topic(topic_path) + except NotFound as e: + # We ignore this case. + print("The topic doesn't exist: detail: {}".format(str(e))) + + +@pytest.fixture(scope='session') +def test_subscription(test_topic): + subscriber = pubsub.SubscriberClient() + subscription_path = subscriber.subscription_path( + project_id, subscription_name) + + try: + subscription = subscriber.create_subscription( + subscription_path, test_topic.name) + except AlreadyExists as e: + print("The topic already exists, detail: {}".format(str(e))) + # Ignore the error, fetch the subscription + subscription = subscriber.get_subscription(subscription_path) + + yield subscription + + try: + subscriber.delete_subscription(subscription_path) + except NotFound as e: + # We ignore this case. + print("The subscription doesn't exist: detail: {}".format(str(e))) + + +@pytest.fixture(scope='session') +def test_registry_id(test_topic): + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def create_registry(): + manager.open_registry( + service_account_json, project_id, cloud_region, test_topic.name, + registry_id) + + create_registry() + + yield registry_id + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def delete_registry(): + try: + manager.delete_registry( + service_account_json, project_id, cloud_region, registry_id) + except NotFound as e: + # We ignore this case. + print("The registry doesn't exist: detail: {}".format(str(e))) + + delete_registry() + + +@pytest.fixture(scope='session') +def test_device_id(test_registry_id): + device_id = device_id_template.format('RSA256') + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def create_device(): + try: + manager.create_rs256_device( + service_account_json, project_id, cloud_region, test_registry_id, + device_id, rsa_cert_path) + except AlreadyExists as e: + # We ignore this case. + print("The device already exists: detail: {}".format(str(e))) + + create_device() + + yield device_id + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def delete_device(): + try: + manager.delete_device( + service_account_json, project_id, cloud_region, + test_registry_id, device_id) + except NotFound as e: + # We ignore this case. + print("The device doesn't exist: detail: {}".format(str(e))) + + delete_device() + + +@pytest.fixture(scope='module') +def device_and_gateways(test_registry_id): + device_id = device_id_template.format('noauthbind') + gateway_id = device_id_template.format('RS256') + bad_gateway_id = device_id_template.format('RS256-err') + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def create_device(): + manager.create_device( + service_account_json, project_id, cloud_region, test_registry_id, + device_id) + create_device() + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def create_gateways(): + manager.create_gateway( + service_account_json, project_id, cloud_region, test_registry_id, + None, gateway_id, rsa_cert_path, 'RS256') + manager.create_gateway( + service_account_json, project_id, cloud_region, test_registry_id, + None, bad_gateway_id, rsa_cert_path, 'RS256') + + create_gateways() + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def bind_device_to_gateways(): + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, test_registry_id, + device_id, gateway_id) + manager.bind_device_to_gateway( + service_account_json, project_id, cloud_region, test_registry_id, + device_id, bad_gateway_id) + + bind_device_to_gateways() + + yield (device_id, gateway_id, bad_gateway_id) + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def unbind(): + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, test_registry_id, + device_id, gateway_id) + manager.unbind_device_from_gateway( + service_account_json, project_id, cloud_region, test_registry_id, + device_id, bad_gateway_id) + + unbind() + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def delete_device(): + try: + manager.delete_device( + service_account_json, project_id, cloud_region, + test_registry_id, device_id) + except NotFound as e: + # We ignore this case. + print("The device doesn't exist: detail: {}".format(str(e))) + + delete_device() + + @backoff.on_exception(backoff.expo, HttpError, max_time=60) + def delete_gateways(): + try: + manager.delete_device( + service_account_json, project_id, cloud_region, + test_registry_id, gateway_id) + except NotFound as e: + # We ignore this case. + print("The gateway doesn't exist: detail: {}".format(str(e))) + try: + manager.delete_device( + service_account_json, project_id, cloud_region, + test_registry_id, bad_gateway_id) + except NotFound as e: + # We ignore this case. + print("The gateway doesn't exist: detail: {}".format(str(e))) + + delete_gateways()