diff --git a/google/cloud/tasks_v2/services/cloud_tasks/transports/base.py b/google/cloud/tasks_v2/services/cloud_tasks/transports/base.py index e6e49b63..58dfe560 100644 --- a/google/cloud/tasks_v2/services/cloud_tasks/transports/base.py +++ b/google/cloud/tasks_v2/services/cloud_tasks/transports/base.py @@ -106,7 +106,6 @@ def __init__( credentials, _ = google.auth.load_credentials_from_file( credentials_file, **scopes_kwargs, quota_project_id=quota_project_id ) - elif credentials is None: credentials, _ = google.auth.default( **scopes_kwargs, quota_project_id=quota_project_id diff --git a/google/cloud/tasks_v2beta2/services/cloud_tasks/transports/base.py b/google/cloud/tasks_v2beta2/services/cloud_tasks/transports/base.py index 493e32f2..28d7ef5b 100644 --- a/google/cloud/tasks_v2beta2/services/cloud_tasks/transports/base.py +++ b/google/cloud/tasks_v2beta2/services/cloud_tasks/transports/base.py @@ -106,7 +106,6 @@ def __init__( credentials, _ = google.auth.load_credentials_from_file( credentials_file, **scopes_kwargs, quota_project_id=quota_project_id ) - elif credentials is None: credentials, _ = google.auth.default( **scopes_kwargs, quota_project_id=quota_project_id diff --git a/google/cloud/tasks_v2beta3/services/cloud_tasks/transports/base.py b/google/cloud/tasks_v2beta3/services/cloud_tasks/transports/base.py index 8e7d3c1f..f62c8386 100644 --- a/google/cloud/tasks_v2beta3/services/cloud_tasks/transports/base.py +++ b/google/cloud/tasks_v2beta3/services/cloud_tasks/transports/base.py @@ -106,7 +106,6 @@ def __init__( credentials, _ = google.auth.load_credentials_from_file( credentials_file, **scopes_kwargs, quota_project_id=quota_project_id ) - elif credentials is None: credentials, _ = google.auth.default( **scopes_kwargs, quota_project_id=quota_project_id diff --git a/tests/unit/gapic/tasks_v2/test_cloud_tasks.py b/tests/unit/gapic/tasks_v2/test_cloud_tasks.py index 12dcbe41..3c511e52 100644 --- a/tests/unit/gapic/tasks_v2/test_cloud_tasks.py +++ b/tests/unit/gapic/tasks_v2/test_cloud_tasks.py @@ -251,20 +251,20 @@ def test_cloud_tasks_client_client_options( # unsupported value. with mock.patch.dict(os.environ, {"GOOGLE_API_USE_MTLS_ENDPOINT": "Unsupported"}): with pytest.raises(MutualTLSChannelError): - client = client_class() + client = client_class(transport=transport_name) # Check the case GOOGLE_API_USE_CLIENT_CERTIFICATE has unsupported value. with mock.patch.dict( os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"} ): with pytest.raises(ValueError): - client = client_class() + client = client_class(transport=transport_name) # Check the case quota_project_id is provided options = client_options.ClientOptions(quota_project_id="octopus") with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) patched.assert_called_once_with( credentials=None, credentials_file=None, @@ -321,7 +321,7 @@ def test_cloud_tasks_client_mtls_env_auto( ) with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) if use_client_cert_env == "false": expected_client_cert_source = None @@ -416,7 +416,7 @@ def test_cloud_tasks_client_client_options_scopes( options = client_options.ClientOptions(scopes=["1", "2"],) with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) patched.assert_called_once_with( credentials=None, credentials_file=None, @@ -447,7 +447,7 @@ def test_cloud_tasks_client_client_options_credentials_file( options = client_options.ClientOptions(credentials_file="credentials.json") with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) patched.assert_called_once_with( credentials=None, credentials_file="credentials.json", @@ -478,9 +478,8 @@ def test_cloud_tasks_client_client_options_from_dict(): ) -def test_list_queues( - transport: str = "grpc", request_type=cloudtasks.ListQueuesRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.ListQueuesRequest, dict,]) +def test_list_queues(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -507,10 +506,6 @@ def test_list_queues( assert response.next_page_token == "next_page_token_value" -def test_list_queues_from_dict(): - test_list_queues(request_type=dict) - - def test_list_queues_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -680,8 +675,10 @@ async def test_list_queues_flattened_error_async(): ) -def test_list_queues_pager(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_queues_pager(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_queues), "__call__") as call: @@ -712,8 +709,10 @@ def test_list_queues_pager(): assert all(isinstance(i, queue.Queue) for i in results) -def test_list_queues_pages(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_queues_pages(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_queues), "__call__") as call: @@ -794,7 +793,8 @@ async def test_list_queues_async_pages(): assert page_.raw_page.next_page_token == token -def test_get_queue(transport: str = "grpc", request_type=cloudtasks.GetQueueRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.GetQueueRequest, dict,]) +def test_get_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -822,10 +822,6 @@ def test_get_queue(transport: str = "grpc", request_type=cloudtasks.GetQueueRequ assert response.state == queue.Queue.State.RUNNING -def test_get_queue_from_dict(): - test_get_queue(request_type=dict) - - def test_get_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -992,9 +988,8 @@ async def test_get_queue_flattened_error_async(): ) -def test_create_queue( - transport: str = "grpc", request_type=cloudtasks.CreateQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.CreateQueueRequest, dict,]) +def test_create_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1022,10 +1017,6 @@ def test_create_queue( assert response.state == gct_queue.Queue.State.RUNNING -def test_create_queue_from_dict(): - test_create_queue(request_type=dict) - - def test_create_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1206,9 +1197,8 @@ async def test_create_queue_flattened_error_async(): ) -def test_update_queue( - transport: str = "grpc", request_type=cloudtasks.UpdateQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.UpdateQueueRequest, dict,]) +def test_update_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1236,10 +1226,6 @@ def test_update_queue( assert response.state == gct_queue.Queue.State.RUNNING -def test_update_queue_from_dict(): - test_update_queue(request_type=dict) - - def test_update_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1422,9 +1408,8 @@ async def test_update_queue_flattened_error_async(): ) -def test_delete_queue( - transport: str = "grpc", request_type=cloudtasks.DeleteQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.DeleteQueueRequest, dict,]) +def test_delete_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1448,10 +1433,6 @@ def test_delete_queue( assert response is None -def test_delete_queue_from_dict(): - test_delete_queue(request_type=dict) - - def test_delete_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1614,9 +1595,8 @@ async def test_delete_queue_flattened_error_async(): ) -def test_purge_queue( - transport: str = "grpc", request_type=cloudtasks.PurgeQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.PurgeQueueRequest, dict,]) +def test_purge_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1644,10 +1624,6 @@ def test_purge_queue( assert response.state == queue.Queue.State.RUNNING -def test_purge_queue_from_dict(): - test_purge_queue(request_type=dict) - - def test_purge_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1814,9 +1790,8 @@ async def test_purge_queue_flattened_error_async(): ) -def test_pause_queue( - transport: str = "grpc", request_type=cloudtasks.PauseQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.PauseQueueRequest, dict,]) +def test_pause_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1844,10 +1819,6 @@ def test_pause_queue( assert response.state == queue.Queue.State.RUNNING -def test_pause_queue_from_dict(): - test_pause_queue(request_type=dict) - - def test_pause_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2014,9 +1985,8 @@ async def test_pause_queue_flattened_error_async(): ) -def test_resume_queue( - transport: str = "grpc", request_type=cloudtasks.ResumeQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.ResumeQueueRequest, dict,]) +def test_resume_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2044,10 +2014,6 @@ def test_resume_queue( assert response.state == queue.Queue.State.RUNNING -def test_resume_queue_from_dict(): - test_resume_queue(request_type=dict) - - def test_resume_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2214,9 +2180,8 @@ async def test_resume_queue_flattened_error_async(): ) -def test_get_iam_policy( - transport: str = "grpc", request_type=iam_policy_pb2.GetIamPolicyRequest -): +@pytest.mark.parametrize("request_type", [iam_policy_pb2.GetIamPolicyRequest, dict,]) +def test_get_iam_policy(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2242,10 +2207,6 @@ def test_get_iam_policy( assert response.etag == b"etag_blob" -def test_get_iam_policy_from_dict(): - test_get_iam_policy(request_type=dict) - - def test_get_iam_policy_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2427,9 +2388,8 @@ async def test_get_iam_policy_flattened_error_async(): ) -def test_set_iam_policy( - transport: str = "grpc", request_type=iam_policy_pb2.SetIamPolicyRequest -): +@pytest.mark.parametrize("request_type", [iam_policy_pb2.SetIamPolicyRequest, dict,]) +def test_set_iam_policy(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2455,10 +2415,6 @@ def test_set_iam_policy( assert response.etag == b"etag_blob" -def test_set_iam_policy_from_dict(): - test_set_iam_policy(request_type=dict) - - def test_set_iam_policy_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2640,9 +2596,10 @@ async def test_set_iam_policy_flattened_error_async(): ) -def test_test_iam_permissions( - transport: str = "grpc", request_type=iam_policy_pb2.TestIamPermissionsRequest -): +@pytest.mark.parametrize( + "request_type", [iam_policy_pb2.TestIamPermissionsRequest, dict,] +) +def test_test_iam_permissions(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2671,10 +2628,6 @@ def test_test_iam_permissions( assert response.permissions == ["permissions_value"] -def test_test_iam_permissions_from_dict(): - test_test_iam_permissions(request_type=dict) - - def test_test_iam_permissions_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2890,7 +2843,8 @@ async def test_test_iam_permissions_flattened_error_async(): ) -def test_list_tasks(transport: str = "grpc", request_type=cloudtasks.ListTasksRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.ListTasksRequest, dict,]) +def test_list_tasks(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2917,10 +2871,6 @@ def test_list_tasks(transport: str = "grpc", request_type=cloudtasks.ListTasksRe assert response.next_page_token == "next_page_token_value" -def test_list_tasks_from_dict(): - test_list_tasks(request_type=dict) - - def test_list_tasks_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3090,8 +3040,10 @@ async def test_list_tasks_flattened_error_async(): ) -def test_list_tasks_pager(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_tasks_pager(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_tasks), "__call__") as call: @@ -3119,8 +3071,10 @@ def test_list_tasks_pager(): assert all(isinstance(i, task.Task) for i in results) -def test_list_tasks_pages(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_tasks_pages(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_tasks), "__call__") as call: @@ -3192,7 +3146,8 @@ async def test_list_tasks_async_pages(): assert page_.raw_page.next_page_token == token -def test_get_task(transport: str = "grpc", request_type=cloudtasks.GetTaskRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.GetTaskRequest, dict,]) +def test_get_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3228,10 +3183,6 @@ def test_get_task(transport: str = "grpc", request_type=cloudtasks.GetTaskReques assert response.view == task.Task.View.BASIC -def test_get_task_from_dict(): - test_get_task(request_type=dict) - - def test_get_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3405,9 +3356,8 @@ async def test_get_task_flattened_error_async(): ) -def test_create_task( - transport: str = "grpc", request_type=cloudtasks.CreateTaskRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.CreateTaskRequest, dict,]) +def test_create_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3443,10 +3393,6 @@ def test_create_task( assert response.view == gct_task.Task.View.BASIC -def test_create_task_from_dict(): - test_create_task(request_type=dict) - - def test_create_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3634,9 +3580,8 @@ async def test_create_task_flattened_error_async(): ) -def test_delete_task( - transport: str = "grpc", request_type=cloudtasks.DeleteTaskRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.DeleteTaskRequest, dict,]) +def test_delete_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3660,10 +3605,6 @@ def test_delete_task( assert response is None -def test_delete_task_from_dict(): - test_delete_task(request_type=dict) - - def test_delete_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3826,7 +3767,8 @@ async def test_delete_task_flattened_error_async(): ) -def test_run_task(transport: str = "grpc", request_type=cloudtasks.RunTaskRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.RunTaskRequest, dict,]) +def test_run_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3862,10 +3804,6 @@ def test_run_task(transport: str = "grpc", request_type=cloudtasks.RunTaskReques assert response.view == task.Task.View.BASIC -def test_run_task_from_dict(): - test_run_task(request_type=dict) - - def test_run_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -4577,7 +4515,7 @@ def test_parse_common_location_path(): assert expected == actual -def test_client_withDEFAULT_CLIENT_INFO(): +def test_client_with_default_client_info(): client_info = gapic_v1.client_info.ClientInfo() with mock.patch.object( diff --git a/tests/unit/gapic/tasks_v2beta2/test_cloud_tasks.py b/tests/unit/gapic/tasks_v2beta2/test_cloud_tasks.py index 37590e76..7613f45c 100644 --- a/tests/unit/gapic/tasks_v2beta2/test_cloud_tasks.py +++ b/tests/unit/gapic/tasks_v2beta2/test_cloud_tasks.py @@ -251,20 +251,20 @@ def test_cloud_tasks_client_client_options( # unsupported value. with mock.patch.dict(os.environ, {"GOOGLE_API_USE_MTLS_ENDPOINT": "Unsupported"}): with pytest.raises(MutualTLSChannelError): - client = client_class() + client = client_class(transport=transport_name) # Check the case GOOGLE_API_USE_CLIENT_CERTIFICATE has unsupported value. with mock.patch.dict( os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"} ): with pytest.raises(ValueError): - client = client_class() + client = client_class(transport=transport_name) # Check the case quota_project_id is provided options = client_options.ClientOptions(quota_project_id="octopus") with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) patched.assert_called_once_with( credentials=None, credentials_file=None, @@ -321,7 +321,7 @@ def test_cloud_tasks_client_mtls_env_auto( ) with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) if use_client_cert_env == "false": expected_client_cert_source = None @@ -416,7 +416,7 @@ def test_cloud_tasks_client_client_options_scopes( options = client_options.ClientOptions(scopes=["1", "2"],) with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) patched.assert_called_once_with( credentials=None, credentials_file=None, @@ -447,7 +447,7 @@ def test_cloud_tasks_client_client_options_credentials_file( options = client_options.ClientOptions(credentials_file="credentials.json") with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) patched.assert_called_once_with( credentials=None, credentials_file="credentials.json", @@ -478,9 +478,8 @@ def test_cloud_tasks_client_client_options_from_dict(): ) -def test_list_queues( - transport: str = "grpc", request_type=cloudtasks.ListQueuesRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.ListQueuesRequest, dict,]) +def test_list_queues(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -507,10 +506,6 @@ def test_list_queues( assert response.next_page_token == "next_page_token_value" -def test_list_queues_from_dict(): - test_list_queues(request_type=dict) - - def test_list_queues_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -680,8 +675,10 @@ async def test_list_queues_flattened_error_async(): ) -def test_list_queues_pager(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_queues_pager(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_queues), "__call__") as call: @@ -712,8 +709,10 @@ def test_list_queues_pager(): assert all(isinstance(i, queue.Queue) for i in results) -def test_list_queues_pages(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_queues_pages(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_queues), "__call__") as call: @@ -794,7 +793,8 @@ async def test_list_queues_async_pages(): assert page_.raw_page.next_page_token == token -def test_get_queue(transport: str = "grpc", request_type=cloudtasks.GetQueueRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.GetQueueRequest, dict,]) +def test_get_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -828,10 +828,6 @@ def test_get_queue(transport: str = "grpc", request_type=cloudtasks.GetQueueRequ assert response.state == queue.Queue.State.RUNNING -def test_get_queue_from_dict(): - test_get_queue(request_type=dict) - - def test_get_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -998,9 +994,8 @@ async def test_get_queue_flattened_error_async(): ) -def test_create_queue( - transport: str = "grpc", request_type=cloudtasks.CreateQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.CreateQueueRequest, dict,]) +def test_create_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1034,10 +1029,6 @@ def test_create_queue( assert response.state == gct_queue.Queue.State.RUNNING -def test_create_queue_from_dict(): - test_create_queue(request_type=dict) - - def test_create_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1218,9 +1209,8 @@ async def test_create_queue_flattened_error_async(): ) -def test_update_queue( - transport: str = "grpc", request_type=cloudtasks.UpdateQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.UpdateQueueRequest, dict,]) +def test_update_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1254,10 +1244,6 @@ def test_update_queue( assert response.state == gct_queue.Queue.State.RUNNING -def test_update_queue_from_dict(): - test_update_queue(request_type=dict) - - def test_update_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1440,9 +1426,8 @@ async def test_update_queue_flattened_error_async(): ) -def test_delete_queue( - transport: str = "grpc", request_type=cloudtasks.DeleteQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.DeleteQueueRequest, dict,]) +def test_delete_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1466,10 +1451,6 @@ def test_delete_queue( assert response is None -def test_delete_queue_from_dict(): - test_delete_queue(request_type=dict) - - def test_delete_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1632,9 +1613,8 @@ async def test_delete_queue_flattened_error_async(): ) -def test_purge_queue( - transport: str = "grpc", request_type=cloudtasks.PurgeQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.PurgeQueueRequest, dict,]) +def test_purge_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1668,10 +1648,6 @@ def test_purge_queue( assert response.state == queue.Queue.State.RUNNING -def test_purge_queue_from_dict(): - test_purge_queue(request_type=dict) - - def test_purge_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1838,9 +1814,8 @@ async def test_purge_queue_flattened_error_async(): ) -def test_pause_queue( - transport: str = "grpc", request_type=cloudtasks.PauseQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.PauseQueueRequest, dict,]) +def test_pause_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1874,10 +1849,6 @@ def test_pause_queue( assert response.state == queue.Queue.State.RUNNING -def test_pause_queue_from_dict(): - test_pause_queue(request_type=dict) - - def test_pause_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2044,9 +2015,8 @@ async def test_pause_queue_flattened_error_async(): ) -def test_resume_queue( - transport: str = "grpc", request_type=cloudtasks.ResumeQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.ResumeQueueRequest, dict,]) +def test_resume_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2080,10 +2050,6 @@ def test_resume_queue( assert response.state == queue.Queue.State.RUNNING -def test_resume_queue_from_dict(): - test_resume_queue(request_type=dict) - - def test_resume_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2250,9 +2216,8 @@ async def test_resume_queue_flattened_error_async(): ) -def test_get_iam_policy( - transport: str = "grpc", request_type=iam_policy_pb2.GetIamPolicyRequest -): +@pytest.mark.parametrize("request_type", [iam_policy_pb2.GetIamPolicyRequest, dict,]) +def test_get_iam_policy(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2278,10 +2243,6 @@ def test_get_iam_policy( assert response.etag == b"etag_blob" -def test_get_iam_policy_from_dict(): - test_get_iam_policy(request_type=dict) - - def test_get_iam_policy_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2463,9 +2424,8 @@ async def test_get_iam_policy_flattened_error_async(): ) -def test_set_iam_policy( - transport: str = "grpc", request_type=iam_policy_pb2.SetIamPolicyRequest -): +@pytest.mark.parametrize("request_type", [iam_policy_pb2.SetIamPolicyRequest, dict,]) +def test_set_iam_policy(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2491,10 +2451,6 @@ def test_set_iam_policy( assert response.etag == b"etag_blob" -def test_set_iam_policy_from_dict(): - test_set_iam_policy(request_type=dict) - - def test_set_iam_policy_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2676,9 +2632,10 @@ async def test_set_iam_policy_flattened_error_async(): ) -def test_test_iam_permissions( - transport: str = "grpc", request_type=iam_policy_pb2.TestIamPermissionsRequest -): +@pytest.mark.parametrize( + "request_type", [iam_policy_pb2.TestIamPermissionsRequest, dict,] +) +def test_test_iam_permissions(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2707,10 +2664,6 @@ def test_test_iam_permissions( assert response.permissions == ["permissions_value"] -def test_test_iam_permissions_from_dict(): - test_test_iam_permissions(request_type=dict) - - def test_test_iam_permissions_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2926,7 +2879,8 @@ async def test_test_iam_permissions_flattened_error_async(): ) -def test_list_tasks(transport: str = "grpc", request_type=cloudtasks.ListTasksRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.ListTasksRequest, dict,]) +def test_list_tasks(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2953,10 +2907,6 @@ def test_list_tasks(transport: str = "grpc", request_type=cloudtasks.ListTasksRe assert response.next_page_token == "next_page_token_value" -def test_list_tasks_from_dict(): - test_list_tasks(request_type=dict) - - def test_list_tasks_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3126,8 +3076,10 @@ async def test_list_tasks_flattened_error_async(): ) -def test_list_tasks_pager(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_tasks_pager(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_tasks), "__call__") as call: @@ -3155,8 +3107,10 @@ def test_list_tasks_pager(): assert all(isinstance(i, task.Task) for i in results) -def test_list_tasks_pages(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_tasks_pages(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_tasks), "__call__") as call: @@ -3228,7 +3182,8 @@ async def test_list_tasks_async_pages(): assert page_.raw_page.next_page_token == token -def test_get_task(transport: str = "grpc", request_type=cloudtasks.GetTaskRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.GetTaskRequest, dict,]) +def test_get_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3260,10 +3215,6 @@ def test_get_task(transport: str = "grpc", request_type=cloudtasks.GetTaskReques assert response.view == task.Task.View.BASIC -def test_get_task_from_dict(): - test_get_task(request_type=dict) - - def test_get_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3430,9 +3381,8 @@ async def test_get_task_flattened_error_async(): ) -def test_create_task( - transport: str = "grpc", request_type=cloudtasks.CreateTaskRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.CreateTaskRequest, dict,]) +def test_create_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3464,10 +3414,6 @@ def test_create_task( assert response.view == gct_task.Task.View.BASIC -def test_create_task_from_dict(): - test_create_task(request_type=dict) - - def test_create_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3648,9 +3594,8 @@ async def test_create_task_flattened_error_async(): ) -def test_delete_task( - transport: str = "grpc", request_type=cloudtasks.DeleteTaskRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.DeleteTaskRequest, dict,]) +def test_delete_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3674,10 +3619,6 @@ def test_delete_task( assert response is None -def test_delete_task_from_dict(): - test_delete_task(request_type=dict) - - def test_delete_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3840,9 +3781,8 @@ async def test_delete_task_flattened_error_async(): ) -def test_lease_tasks( - transport: str = "grpc", request_type=cloudtasks.LeaseTasksRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.LeaseTasksRequest, dict,]) +def test_lease_tasks(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3866,10 +3806,6 @@ def test_lease_tasks( assert isinstance(response, cloudtasks.LeaseTasksResponse) -def test_lease_tasks_from_dict(): - test_lease_tasks(request_type=dict) - - def test_lease_tasks_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -4052,9 +3988,8 @@ async def test_lease_tasks_flattened_error_async(): ) -def test_acknowledge_task( - transport: str = "grpc", request_type=cloudtasks.AcknowledgeTaskRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.AcknowledgeTaskRequest, dict,]) +def test_acknowledge_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -4078,10 +4013,6 @@ def test_acknowledge_task( assert response is None -def test_acknowledge_task_from_dict(): - test_acknowledge_task(request_type=dict) - - def test_acknowledge_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -4258,9 +4189,8 @@ async def test_acknowledge_task_flattened_error_async(): ) -def test_renew_lease( - transport: str = "grpc", request_type=cloudtasks.RenewLeaseRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.RenewLeaseRequest, dict,]) +def test_renew_lease(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -4292,10 +4222,6 @@ def test_renew_lease( assert response.view == task.Task.View.BASIC -def test_renew_lease_from_dict(): - test_renew_lease(request_type=dict) - - def test_renew_lease_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -4488,9 +4414,8 @@ async def test_renew_lease_flattened_error_async(): ) -def test_cancel_lease( - transport: str = "grpc", request_type=cloudtasks.CancelLeaseRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.CancelLeaseRequest, dict,]) +def test_cancel_lease(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -4522,10 +4447,6 @@ def test_cancel_lease( assert response.view == task.Task.View.BASIC -def test_cancel_lease_from_dict(): - test_cancel_lease(request_type=dict) - - def test_cancel_lease_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -4706,7 +4627,8 @@ async def test_cancel_lease_flattened_error_async(): ) -def test_run_task(transport: str = "grpc", request_type=cloudtasks.RunTaskRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.RunTaskRequest, dict,]) +def test_run_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -4738,10 +4660,6 @@ def test_run_task(transport: str = "grpc", request_type=cloudtasks.RunTaskReques assert response.view == task.Task.View.BASIC -def test_run_task_from_dict(): - test_run_task(request_type=dict) - - def test_run_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -5450,7 +5368,7 @@ def test_parse_common_location_path(): assert expected == actual -def test_client_withDEFAULT_CLIENT_INFO(): +def test_client_with_default_client_info(): client_info = gapic_v1.client_info.ClientInfo() with mock.patch.object( diff --git a/tests/unit/gapic/tasks_v2beta3/test_cloud_tasks.py b/tests/unit/gapic/tasks_v2beta3/test_cloud_tasks.py index cf93e421..858eca12 100644 --- a/tests/unit/gapic/tasks_v2beta3/test_cloud_tasks.py +++ b/tests/unit/gapic/tasks_v2beta3/test_cloud_tasks.py @@ -251,20 +251,20 @@ def test_cloud_tasks_client_client_options( # unsupported value. with mock.patch.dict(os.environ, {"GOOGLE_API_USE_MTLS_ENDPOINT": "Unsupported"}): with pytest.raises(MutualTLSChannelError): - client = client_class() + client = client_class(transport=transport_name) # Check the case GOOGLE_API_USE_CLIENT_CERTIFICATE has unsupported value. with mock.patch.dict( os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"} ): with pytest.raises(ValueError): - client = client_class() + client = client_class(transport=transport_name) # Check the case quota_project_id is provided options = client_options.ClientOptions(quota_project_id="octopus") with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) patched.assert_called_once_with( credentials=None, credentials_file=None, @@ -321,7 +321,7 @@ def test_cloud_tasks_client_mtls_env_auto( ) with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) if use_client_cert_env == "false": expected_client_cert_source = None @@ -416,7 +416,7 @@ def test_cloud_tasks_client_client_options_scopes( options = client_options.ClientOptions(scopes=["1", "2"],) with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) patched.assert_called_once_with( credentials=None, credentials_file=None, @@ -447,7 +447,7 @@ def test_cloud_tasks_client_client_options_credentials_file( options = client_options.ClientOptions(credentials_file="credentials.json") with mock.patch.object(transport_class, "__init__") as patched: patched.return_value = None - client = client_class(transport=transport_name, client_options=options) + client = client_class(client_options=options, transport=transport_name) patched.assert_called_once_with( credentials=None, credentials_file="credentials.json", @@ -478,9 +478,8 @@ def test_cloud_tasks_client_client_options_from_dict(): ) -def test_list_queues( - transport: str = "grpc", request_type=cloudtasks.ListQueuesRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.ListQueuesRequest, dict,]) +def test_list_queues(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -507,10 +506,6 @@ def test_list_queues( assert response.next_page_token == "next_page_token_value" -def test_list_queues_from_dict(): - test_list_queues(request_type=dict) - - def test_list_queues_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -680,8 +675,10 @@ async def test_list_queues_flattened_error_async(): ) -def test_list_queues_pager(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_queues_pager(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_queues), "__call__") as call: @@ -712,8 +709,10 @@ def test_list_queues_pager(): assert all(isinstance(i, queue.Queue) for i in results) -def test_list_queues_pages(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_queues_pages(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_queues), "__call__") as call: @@ -794,7 +793,8 @@ async def test_list_queues_async_pages(): assert page_.raw_page.next_page_token == token -def test_get_queue(transport: str = "grpc", request_type=cloudtasks.GetQueueRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.GetQueueRequest, dict,]) +def test_get_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -830,10 +830,6 @@ def test_get_queue(transport: str = "grpc", request_type=cloudtasks.GetQueueRequ assert response.type_ == queue.Queue.Type.PULL -def test_get_queue_from_dict(): - test_get_queue(request_type=dict) - - def test_get_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1005,9 +1001,8 @@ async def test_get_queue_flattened_error_async(): ) -def test_create_queue( - transport: str = "grpc", request_type=cloudtasks.CreateQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.CreateQueueRequest, dict,]) +def test_create_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1043,10 +1038,6 @@ def test_create_queue( assert response.type_ == gct_queue.Queue.Type.PULL -def test_create_queue_from_dict(): - test_create_queue(request_type=dict) - - def test_create_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1232,9 +1223,8 @@ async def test_create_queue_flattened_error_async(): ) -def test_update_queue( - transport: str = "grpc", request_type=cloudtasks.UpdateQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.UpdateQueueRequest, dict,]) +def test_update_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1270,10 +1260,6 @@ def test_update_queue( assert response.type_ == gct_queue.Queue.Type.PULL -def test_update_queue_from_dict(): - test_update_queue(request_type=dict) - - def test_update_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1461,9 +1447,8 @@ async def test_update_queue_flattened_error_async(): ) -def test_delete_queue( - transport: str = "grpc", request_type=cloudtasks.DeleteQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.DeleteQueueRequest, dict,]) +def test_delete_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1487,10 +1472,6 @@ def test_delete_queue( assert response is None -def test_delete_queue_from_dict(): - test_delete_queue(request_type=dict) - - def test_delete_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1653,9 +1634,8 @@ async def test_delete_queue_flattened_error_async(): ) -def test_purge_queue( - transport: str = "grpc", request_type=cloudtasks.PurgeQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.PurgeQueueRequest, dict,]) +def test_purge_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1691,10 +1671,6 @@ def test_purge_queue( assert response.type_ == queue.Queue.Type.PULL -def test_purge_queue_from_dict(): - test_purge_queue(request_type=dict) - - def test_purge_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -1866,9 +1842,8 @@ async def test_purge_queue_flattened_error_async(): ) -def test_pause_queue( - transport: str = "grpc", request_type=cloudtasks.PauseQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.PauseQueueRequest, dict,]) +def test_pause_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -1904,10 +1879,6 @@ def test_pause_queue( assert response.type_ == queue.Queue.Type.PULL -def test_pause_queue_from_dict(): - test_pause_queue(request_type=dict) - - def test_pause_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2079,9 +2050,8 @@ async def test_pause_queue_flattened_error_async(): ) -def test_resume_queue( - transport: str = "grpc", request_type=cloudtasks.ResumeQueueRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.ResumeQueueRequest, dict,]) +def test_resume_queue(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2117,10 +2087,6 @@ def test_resume_queue( assert response.type_ == queue.Queue.Type.PULL -def test_resume_queue_from_dict(): - test_resume_queue(request_type=dict) - - def test_resume_queue_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2292,9 +2258,8 @@ async def test_resume_queue_flattened_error_async(): ) -def test_get_iam_policy( - transport: str = "grpc", request_type=iam_policy_pb2.GetIamPolicyRequest -): +@pytest.mark.parametrize("request_type", [iam_policy_pb2.GetIamPolicyRequest, dict,]) +def test_get_iam_policy(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2320,10 +2285,6 @@ def test_get_iam_policy( assert response.etag == b"etag_blob" -def test_get_iam_policy_from_dict(): - test_get_iam_policy(request_type=dict) - - def test_get_iam_policy_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2505,9 +2466,8 @@ async def test_get_iam_policy_flattened_error_async(): ) -def test_set_iam_policy( - transport: str = "grpc", request_type=iam_policy_pb2.SetIamPolicyRequest -): +@pytest.mark.parametrize("request_type", [iam_policy_pb2.SetIamPolicyRequest, dict,]) +def test_set_iam_policy(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2533,10 +2493,6 @@ def test_set_iam_policy( assert response.etag == b"etag_blob" -def test_set_iam_policy_from_dict(): - test_set_iam_policy(request_type=dict) - - def test_set_iam_policy_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2718,9 +2674,10 @@ async def test_set_iam_policy_flattened_error_async(): ) -def test_test_iam_permissions( - transport: str = "grpc", request_type=iam_policy_pb2.TestIamPermissionsRequest -): +@pytest.mark.parametrize( + "request_type", [iam_policy_pb2.TestIamPermissionsRequest, dict,] +) +def test_test_iam_permissions(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2749,10 +2706,6 @@ def test_test_iam_permissions( assert response.permissions == ["permissions_value"] -def test_test_iam_permissions_from_dict(): - test_test_iam_permissions(request_type=dict) - - def test_test_iam_permissions_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -2968,7 +2921,8 @@ async def test_test_iam_permissions_flattened_error_async(): ) -def test_list_tasks(transport: str = "grpc", request_type=cloudtasks.ListTasksRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.ListTasksRequest, dict,]) +def test_list_tasks(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -2995,10 +2949,6 @@ def test_list_tasks(transport: str = "grpc", request_type=cloudtasks.ListTasksRe assert response.next_page_token == "next_page_token_value" -def test_list_tasks_from_dict(): - test_list_tasks(request_type=dict) - - def test_list_tasks_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3168,8 +3118,10 @@ async def test_list_tasks_flattened_error_async(): ) -def test_list_tasks_pager(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_tasks_pager(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_tasks), "__call__") as call: @@ -3197,8 +3149,10 @@ def test_list_tasks_pager(): assert all(isinstance(i, task.Task) for i in results) -def test_list_tasks_pages(): - client = CloudTasksClient(credentials=ga_credentials.AnonymousCredentials,) +def test_list_tasks_pages(transport_name: str = "grpc"): + client = CloudTasksClient( + credentials=ga_credentials.AnonymousCredentials, transport=transport_name, + ) # Mock the actual call within the gRPC stub, and fake the request. with mock.patch.object(type(client.transport.list_tasks), "__call__") as call: @@ -3270,7 +3224,8 @@ async def test_list_tasks_async_pages(): assert page_.raw_page.next_page_token == token -def test_get_task(transport: str = "grpc", request_type=cloudtasks.GetTaskRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.GetTaskRequest, dict,]) +def test_get_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3306,10 +3261,6 @@ def test_get_task(transport: str = "grpc", request_type=cloudtasks.GetTaskReques assert response.view == task.Task.View.BASIC -def test_get_task_from_dict(): - test_get_task(request_type=dict) - - def test_get_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3483,9 +3434,8 @@ async def test_get_task_flattened_error_async(): ) -def test_create_task( - transport: str = "grpc", request_type=cloudtasks.CreateTaskRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.CreateTaskRequest, dict,]) +def test_create_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3521,10 +3471,6 @@ def test_create_task( assert response.view == gct_task.Task.View.BASIC -def test_create_task_from_dict(): - test_create_task(request_type=dict) - - def test_create_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3712,9 +3658,8 @@ async def test_create_task_flattened_error_async(): ) -def test_delete_task( - transport: str = "grpc", request_type=cloudtasks.DeleteTaskRequest -): +@pytest.mark.parametrize("request_type", [cloudtasks.DeleteTaskRequest, dict,]) +def test_delete_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3738,10 +3683,6 @@ def test_delete_task( assert response is None -def test_delete_task_from_dict(): - test_delete_task(request_type=dict) - - def test_delete_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -3904,7 +3845,8 @@ async def test_delete_task_flattened_error_async(): ) -def test_run_task(transport: str = "grpc", request_type=cloudtasks.RunTaskRequest): +@pytest.mark.parametrize("request_type", [cloudtasks.RunTaskRequest, dict,]) +def test_run_task(request_type, transport: str = "grpc"): client = CloudTasksClient( credentials=ga_credentials.AnonymousCredentials(), transport=transport, ) @@ -3940,10 +3882,6 @@ def test_run_task(transport: str = "grpc", request_type=cloudtasks.RunTaskReques assert response.view == task.Task.View.BASIC -def test_run_task_from_dict(): - test_run_task(request_type=dict) - - def test_run_task_empty_call(): # This test is a coverage failsafe to make sure that totally empty calls, # i.e. request == None and no flattened fields passed, work. @@ -4655,7 +4593,7 @@ def test_parse_common_location_path(): assert expected == actual -def test_client_withDEFAULT_CLIENT_INFO(): +def test_client_with_default_client_info(): client_info = gapic_v1.client_info.ClientInfo() with mock.patch.object(