diff --git a/composer/functions/composer_storage_trigger.py b/composer/functions/composer_storage_trigger.py index 22843952bbf..475f1182f18 100644 --- a/composer/functions/composer_storage_trigger.py +++ b/composer/functions/composer_storage_trigger.py @@ -21,6 +21,9 @@ IAM_SCOPE = 'https://www.googleapis.com/auth/iam' OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token' +# If you are using the stable API, set this value to False +# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api +USE_EXPERIMENTAL_API = True def trigger_dag(data, context=None): @@ -46,16 +49,22 @@ def trigger_dag(data, context=None): webserver_id = 'YOUR-TENANT-PROJECT' # The name of the DAG you wish to trigger dag_name = 'composer_sample_trigger_response_dag' + + if USE_EXPERIMENTAL_API: + endpoint = f'api/experimental/dags/{dag_name}/dag_runs' + json_data = {'conf': data, 'replace_microseconds': 'false'} + else: + endpoint = f'api/v1/dags/{dag_name}/dagRuns' + json_data = {'conf': data} webserver_url = ( 'https://' + webserver_id - + '.appspot.com/api/experimental/dags/' - + dag_name - + '/dag_runs' + + '.appspot.com/' + + endpoint ) # Make a POST request to IAP which then Triggers the DAG make_iap_request( - webserver_url, client_id, method='POST', json={"conf": data, "replace_microseconds": 'false'}) + webserver_url, client_id, method='POST', json=json_data) # This code is copied from diff --git a/composer/functions/composer_storage_trigger_test.py b/composer/functions/composer_storage_trigger_test.py index fa5e1d23d70..7d7410ac878 100644 --- a/composer/functions/composer_storage_trigger_test.py +++ b/composer/functions/composer_storage_trigger_test.py @@ -35,3 +35,26 @@ def test_iap_response_error(make_iap_request_mock): trigger_event = {'file': 'some-gcs-file'} with pytest.raises(Exception): composer_storage_trigger.trigger_dag(trigger_event) + + +@mock.patch('composer_storage_trigger.make_iap_request', autospec=True) +def test_experimental_api_endpoint(make_iap_request_mock): + composer_storage_trigger.trigger_dag({'test': 'a'}) + + make_iap_request_mock.assert_called_once_with( + 'https://YOUR-TENANT-PROJECT.appspot.com/api/experimental/dags/composer_sample_trigger_response_dag/dag_runs', + 'YOUR-CLIENT-ID', method='POST', + json={'conf': {'test': 'a'}, 'replace_microseconds': 'false'}, + ) + + +@mock.patch('composer_storage_trigger.make_iap_request', autospec=True) +@mock.patch('composer_storage_trigger.USE_EXPERIMENTAL_API', False) +def test_stable_api_endpoint(make_iap_request_mock): + composer_storage_trigger.trigger_dag({'test': 'a'}) + + make_iap_request_mock.assert_called_once_with( + 'https://YOUR-TENANT-PROJECT.appspot.com/api/v1/dags/composer_sample_trigger_response_dag/dagRuns', + 'YOUR-CLIENT-ID', method='POST', + json={'conf': {'test': 'a'}}, + )