From 74a6e344583bf70397da94a96504ef97a8544a62 Mon Sep 17 00:00:00 2001 From: Eugene Kostieiev Date: Fri, 27 Aug 2021 17:48:27 +0300 Subject: [PATCH 1/3] Update composer_storage_trigger.py script to support stable API for Airflow 2. --- .../functions/composer_storage_trigger.py | 15 ++++++++---- .../composer_storage_trigger_test.py | 23 +++++++++++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/composer/functions/composer_storage_trigger.py b/composer/functions/composer_storage_trigger.py index 22843952bbf..e42d090c840 100644 --- a/composer/functions/composer_storage_trigger.py +++ b/composer/functions/composer_storage_trigger.py @@ -21,6 +21,7 @@ IAM_SCOPE = 'https://www.googleapis.com/auth/iam' OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token' +USE_EXPERIMENTAL_API = True def trigger_dag(data, context=None): @@ -46,16 +47,22 @@ def trigger_dag(data, context=None): webserver_id = 'YOUR-TENANT-PROJECT' # The name of the DAG you wish to trigger dag_name = 'composer_sample_trigger_response_dag' + + if USE_EXPERIMENTAL_API: + endpoint = 'api/experimental/dags/{}/dag_runs'.format(dag_name) + json_data = {'conf': data, 'replace_microseconds': 'false'} + else: + endpoint = 'api/v1/dags/{}/dagRuns'.format(dag_name) + json_data = {'conf': data} webserver_url = ( 'https://' + webserver_id - + '.appspot.com/api/experimental/dags/' - + dag_name - + '/dag_runs' + + '.appspot.com/' + + endpoint ) # Make a POST request to IAP which then Triggers the DAG make_iap_request( - webserver_url, client_id, method='POST', json={"conf": data, "replace_microseconds": 'false'}) + webserver_url, client_id, method='POST', json=json_data) # This code is copied from diff --git a/composer/functions/composer_storage_trigger_test.py b/composer/functions/composer_storage_trigger_test.py index fa5e1d23d70..7d7410ac878 100644 --- a/composer/functions/composer_storage_trigger_test.py +++ b/composer/functions/composer_storage_trigger_test.py @@ -35,3 +35,26 @@ def test_iap_response_error(make_iap_request_mock): trigger_event = {'file': 'some-gcs-file'} with pytest.raises(Exception): composer_storage_trigger.trigger_dag(trigger_event) + + +@mock.patch('composer_storage_trigger.make_iap_request', autospec=True) +def test_experimental_api_endpoint(make_iap_request_mock): + composer_storage_trigger.trigger_dag({'test': 'a'}) + + make_iap_request_mock.assert_called_once_with( + 'https://YOUR-TENANT-PROJECT.appspot.com/api/experimental/dags/composer_sample_trigger_response_dag/dag_runs', + 'YOUR-CLIENT-ID', method='POST', + json={'conf': {'test': 'a'}, 'replace_microseconds': 'false'}, + ) + + +@mock.patch('composer_storage_trigger.make_iap_request', autospec=True) +@mock.patch('composer_storage_trigger.USE_EXPERIMENTAL_API', False) +def test_stable_api_endpoint(make_iap_request_mock): + composer_storage_trigger.trigger_dag({'test': 'a'}) + + make_iap_request_mock.assert_called_once_with( + 'https://YOUR-TENANT-PROJECT.appspot.com/api/v1/dags/composer_sample_trigger_response_dag/dagRuns', + 'YOUR-CLIENT-ID', method='POST', + json={'conf': {'test': 'a'}}, + ) From c1f8de0862dcc9204043de87c275c77eead1ff5f Mon Sep 17 00:00:00 2001 From: "Leah E. Cole" <6719667+leahecole@users.noreply.github.com> Date: Wed, 1 Sep 2021 10:54:14 -0700 Subject: [PATCH 2/3] Update composer/functions/composer_storage_trigger.py --- composer/functions/composer_storage_trigger.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/composer/functions/composer_storage_trigger.py b/composer/functions/composer_storage_trigger.py index e42d090c840..219d07a75b9 100644 --- a/composer/functions/composer_storage_trigger.py +++ b/composer/functions/composer_storage_trigger.py @@ -21,6 +21,8 @@ IAM_SCOPE = 'https://www.googleapis.com/auth/iam' OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token' +# If you are using the stable API, set this value to False +# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api USE_EXPERIMENTAL_API = True From 42c6a7b577b990ad91349362d704ef171d6eac3b Mon Sep 17 00:00:00 2001 From: "Leah E. Cole" <6719667+leahecole@users.noreply.github.com> Date: Wed, 1 Sep 2021 11:28:31 -0700 Subject: [PATCH 3/3] Apply suggestions from code review --- composer/functions/composer_storage_trigger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer/functions/composer_storage_trigger.py b/composer/functions/composer_storage_trigger.py index 219d07a75b9..475f1182f18 100644 --- a/composer/functions/composer_storage_trigger.py +++ b/composer/functions/composer_storage_trigger.py @@ -51,10 +51,10 @@ def trigger_dag(data, context=None): dag_name = 'composer_sample_trigger_response_dag' if USE_EXPERIMENTAL_API: - endpoint = 'api/experimental/dags/{}/dag_runs'.format(dag_name) + endpoint = f'api/experimental/dags/{dag_name}/dag_runs' json_data = {'conf': data, 'replace_microseconds': 'false'} else: - endpoint = 'api/v1/dags/{}/dagRuns'.format(dag_name) + endpoint = f'api/v1/dags/{dag_name}/dagRuns' json_data = {'conf': data} webserver_url = ( 'https://'