From f33f2955233491edae6ba917b05ed15928c8be9b Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Tue, 6 Apr 2021 15:37:28 +0100 Subject: [PATCH 01/17] adding new gcp blog directory and sample dag for my blog --- composer/blog/gcp-cloud-blog/README.md | 2 + composer/blog/gcp-cloud-blog/__init__.py | 0 .../data-orchestration-blog-sample-dag.py | 70 +++++++++++++++++++ 3 files changed, 72 insertions(+) create mode 100644 composer/blog/gcp-cloud-blog/README.md create mode 100644 composer/blog/gcp-cloud-blog/__init__.py create mode 100644 composer/blog/gcp-cloud-blog/data-orchestration-with-composer/dags/data-orchestration-blog-sample-dag.py diff --git a/composer/blog/gcp-cloud-blog/README.md b/composer/blog/gcp-cloud-blog/README.md new file mode 100644 index 00000000000..30cc4210ffb --- /dev/null +++ b/composer/blog/gcp-cloud-blog/README.md @@ -0,0 +1,2 @@ +# GCP Cloud Blog +The code here is featured in posts found on the [Google Cloud Platform Blog](https://cloud.google.com/blog) \ No newline at end of file diff --git a/composer/blog/gcp-cloud-blog/__init__.py b/composer/blog/gcp-cloud-blog/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/composer/blog/gcp-cloud-blog/data-orchestration-with-composer/dags/data-orchestration-blog-sample-dag.py b/composer/blog/gcp-cloud-blog/data-orchestration-with-composer/dags/data-orchestration-blog-sample-dag.py new file mode 100644 index 00000000000..89bc9de7d11 --- /dev/null +++ b/composer/blog/gcp-cloud-blog/data-orchestration-with-composer/dags/data-orchestration-blog-sample-dag.py @@ -0,0 +1,70 @@ +from airflow.utils import dates +from airflow import models +from airflow.hooks.base_hook import BaseHook +from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor +from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator +from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator +from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator +from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator + +BUCKET_NAME = models.Variable.get("bucket_name") +FILE_NAME = models.Variable.get("file_name") +BQ_SQL_STRING = models.Variable.get("bq_sql_string") + +def on_failure_callback(context): + ti = context.get('task_instance') + slack_msg = """ + :red_circle: Task Failed. 
+ *Task*: {task} + *Dag*: {dag} + *Execution Time*: {exec_date} + *Log Url*: {log_url} + """.format( + task=ti.task_id, + dag=ti.dag_id, + log_url=ti.log_url, + exec_date=context.get('execution_date'), + ) + slack_webhook_token = BaseHook.get_connection('slack_connection').password + slack_error = SlackWebhookOperator( + task_id='post_slack_error', + http_conn_id='slack_connection', + channel='#airflow-alerts', + webhook_token=slack_webhook_token, + message=slack_msg) + slack_error.execute(context) + +with models.DAG( + 'transform-crm-workload', + schedule_interval=None, + start_date=dates.days_ago(0), + default_args={ 'on_failure_callback': on_failure_callback} +) as crm_workflow_dag: + + validate_file_exists = GCSObjectExistenceSensor( + task_id="validate_file_exists", + bucket=BUCKET_NAME, + object=FILE_NAME + ) + + start_dataflow_job = DataflowTemplatedJobStartOperator( + task_id="start-dataflow-template-job", + job_name='crm_wordcount', + template='gs://dataflow-templates/latest/Word_Count', + parameters={ + 'inputFile': "gs://{bucket}/{filename}".format(bucket=BUCKET_NAME, filename=FILE_NAME), + 'output': "gs://{bucket}/output".format(bucket=BUCKET_NAME)} + ) + + execute_bigquery_sql = BigQueryCheckOperator( + task_id='execute_bigquery_sql', + sql=BQ_SQL_STRING, + use_legacy_sql=False + ) + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", + bucket_name=BUCKET_NAME + ) + +validate_file_exists >> start_dataflow_job >> execute_bigquery_sql >> delete_bucket \ No newline at end of file From 5cb325bac36341ffc3b7131f8546b7bd7de23d32 Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Mon, 12 Apr 2021 11:33:03 +0100 Subject: [PATCH 02/17] added required files and moved to gcp tech blog folder --- composer/blog/gcp-cloud-blog/README.md | 2 - composer/blog/gcp-cloud-blog/__init__.py | 0 .../constraints.txt | 1 + .../data_orchestration_blog_sample_dag.py} | 7 ++- ...data_orchestration_blog_sample_dag_test.py | 41 ++++++++++++++++ .../noxfile_config.py | 48 +++++++++++++++++++ .../requirements-test.txt | 2 + .../requirements.txt | 1 + 8 files changed, 99 insertions(+), 3 deletions(-) delete mode 100644 composer/blog/gcp-cloud-blog/README.md delete mode 100644 composer/blog/gcp-cloud-blog/__init__.py create mode 100644 composer/blog/gcp-tech-blog/data-orchestration-with-composer/constraints.txt rename composer/blog/{gcp-cloud-blog/data-orchestration-with-composer/dags/data-orchestration-blog-sample-dag.py => gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py} (91%) create mode 100644 composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py create mode 100644 composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py create mode 100644 composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements-test.txt create mode 100644 composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt diff --git a/composer/blog/gcp-cloud-blog/README.md b/composer/blog/gcp-cloud-blog/README.md deleted file mode 100644 index 30cc4210ffb..00000000000 --- a/composer/blog/gcp-cloud-blog/README.md +++ /dev/null @@ -1,2 +0,0 @@ -# GCP Cloud Blog -The code here is featured in posts found on the [Google Cloud Platform Blog](https://cloud.google.com/blog) \ No newline at end of file diff --git a/composer/blog/gcp-cloud-blog/__init__.py b/composer/blog/gcp-cloud-blog/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git 
a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/constraints.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/constraints.txt
new file mode 100644
index 00000000000..5784f524232
--- /dev/null
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/constraints.txt
@@ -0,0 +1 @@
+SQLAlchemy==1.3.23 # must be under 1.4 until at least Airflow 2.0 (check airflow setup.py for restrictions)
diff --git a/composer/blog/gcp-cloud-blog/data-orchestration-with-composer/dags/data-orchestration-blog-sample-dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py
similarity index 91%
rename from composer/blog/gcp-cloud-blog/data-orchestration-with-composer/dags/data-orchestration-blog-sample-dag.py
rename to composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py
index 89bc9de7d11..03a1dc6b9cf 100644
--- a/composer/blog/gcp-cloud-blog/data-orchestration-with-composer/dags/data-orchestration-blog-sample-dag.py
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py
@@ -1,6 +1,7 @@
 from airflow.utils import dates
 from airflow import models
 from airflow.hooks.base_hook import BaseHook
+from airflow.utils.state import State
 from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
 from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
 from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
@@ -67,4 +68,8 @@ def on_failure_callback(context):
         bucket_name=BUCKET_NAME
     )
 
-validate_file_exists >> start_dataflow_job >> execute_bigquery_sql >> delete_bucket
\ No newline at end of file
+    validate_file_exists >> start_dataflow_job >> execute_bigquery_sql >> delete_bucket
+
+if __name__ == "__main__":
+    crm_workflow_dag.clear(dag_run_state=State.NONE)
+    crm_workflow_dag.run()
diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py
new file mode 100644
index 00000000000..c734853b1da
--- /dev/null
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py
@@ -0,0 +1,41 @@
+# Copyright 2021 Google LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# https://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import models
+import internal_unit_testing
+import pytest
+
+# user should substitute their project ID
+PROJECT_ID = 'rds-sandbox'
+BUCKET_NAME = 'rds_demo_data'
+FILE_NAME = 'incoming_100.txt'
+BQ_SQL_STRING = 'SELECT COUNT(*) FROM `rds-demos.iwd_demo.inspiring_women` LIMIT 1000'
+
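The DAG under test reads its Slack webhook token from an Airflow connection named `slack_connection` (`BaseHook.get_connection('slack_connection').password`), and the same id is passed as `http_conn_id` to the `SlackWebhookOperator` in the failure callback. That connection is never created anywhere in the sample; a minimal sketch of one way to register it programmatically, with placeholder host and token values that are not from this PR:

```python
# Sketch only: create the `slack_connection` the failure callback expects.
# The host and password values below are placeholders, not real credentials.
from airflow import settings
from airflow.models import Connection

slack_conn = Connection(
    conn_id="slack_connection",
    conn_type="http",
    host="https://hooks.slack.com/services",  # webhook base URL
    password="T000/B000/placeholder-token",   # webhook token, read via .password
)
session = settings.Session()
session.add(slack_conn)
session.commit()
```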
+@pytest.fixture(autouse=True, scope="function")
+# The fixture `airflow_database` lives in composer/conftest.py.
+def set_variables(airflow_database):
+    models.Variable.set('gcp_project', PROJECT_ID)
+    models.Variable.set('bucket_name', BUCKET_NAME)
+    models.Variable.set('file_name', FILE_NAME)
+    models.Variable.set('bq_sql_string', BQ_SQL_STRING)
+    yield
+    models.Variable.delete('gcp_project')
+    models.Variable.delete('bucket_name')
+    models.Variable.delete('file_name')
+    models.Variable.delete('bq_sql_string')
+
+
+def test_dag_import():
+    from . import data_orchestration_blog_sample_dag
+    internal_unit_testing.assert_has_valid_dag(data_orchestration_blog_sample_dag)
\ No newline at end of file
diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py
new file mode 100644
index 00000000000..31a17ad81f9
--- /dev/null
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py
@@ -0,0 +1,48 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Default TEST_CONFIG_OVERRIDE for python repos.
+
+# You can copy this file into your directory, then it will be imported from
+# the noxfile.py.
+
+# The source of truth:
+# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py
+
+import tempfile
+
+
+# Airflow creates a config file at installation, so we want to set
+# `AIRFLOW_HOME` envvar before running pytest.
+
+_tmpdir = tempfile.TemporaryDirectory()
+
+
+TEST_CONFIG_OVERRIDE = {
+    # You can opt out from the test for specific Python versions.
+    # Skipping for Python 3.9 due to numpy compilation failure.
+    "ignored_versions": ["2.7", "3.9"],
+    # Old samples are opted out of enforcing Python type hints
+    # All new samples should feature them
+    "enforce_type_hints": False,
+    # An envvar key for determining the project id to use. Change it
+    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
+    # build specific Cloud project. You can also use your own string
+    # to use your own Cloud project.
+    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
+    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
+    # A dictionary you want to inject into your test. Don't put any
+    # secrets here. These values will override predefined values.
+ "envs": {"AIRFLOW_HOME": _tmpdir.name}, +} diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements-test.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements-test.txt new file mode 100644 index 00000000000..21170ba2145 --- /dev/null +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements-test.txt @@ -0,0 +1,2 @@ +pytest==6.2.1 +git+https://github.com/GoogleCloudPlatform/python-docs-samples.git#egg=dag_test_utils&subdirectory=composer/dag_test_utils # DAG unit test utility diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt new file mode 100644 index 00000000000..ee1387155b3 --- /dev/null +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt @@ -0,0 +1 @@ +apache-airflow[gcp]==1.10.14 \ No newline at end of file From 732a6e95bdd861717079f3f5a77d0fe5215c4baa Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Mon, 12 Apr 2021 11:52:40 +0100 Subject: [PATCH 03/17] adding licence to example dag --- .../dags/data_orchestration_blog_sample_dag.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py index 03a1dc6b9cf..440eac5724d 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py @@ -1,3 +1,17 @@ +# Copyright 2021 Google LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# https://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from airflow.utils import dates from airflow import models from airflow.hooks.base_hook import BaseHook From 2711943ab7bea8eac1abb49a1f74bddd1b3604e5 Mon Sep 17 00:00:00 2001 From: rachael-ds <45947385+rachael-ds@users.noreply.github.com> Date: Tue, 13 Apr 2021 09:51:08 +0100 Subject: [PATCH 04/17] Apply suggestions from code review Commits from Leah Cole - replace my specific environment details with generic environment strings Co-authored-by: Leah E. 
Cole <6719667+leahecole@users.noreply.github.com> --- .../dags/data_orchestration_blog_sample_dag_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py index c734853b1da..9c2344cd384 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py @@ -17,10 +17,10 @@ import pytest # user should substitute their project ID -PROJECT_ID = 'rds-sandbox' -BUCKET_NAME = 'rds_demo_data' +PROJECT_ID = 'your-project-id' +BUCKET_NAME = 'your-bucket-name' FILE_NAME = 'incoming_100.txt' -BQ_SQL_STRING = 'SELECT COUNT(*) FROM `rds-demos.iwd_demo.inspiring_women` LIMIT 1000' +BQ_SQL_STRING = 'SELECT COUNT(*) FROM `your-project-id.iwd_demo.inspiring_women` LIMIT 1000' @pytest.fixture(autouse=True, scope="function") # The fixture `airflow_database` lives in composer/conftest.py. @@ -38,4 +38,4 @@ def set_variables(airflow_database): def test_dag_import(): from . import data_orchestration_blog_sample_dag - internal_unit_testing.assert_has_valid_dag(data_orchestration_blog_sample_dag) \ No newline at end of file + internal_unit_testing.assert_has_valid_dag(data_orchestration_blog_sample_dag) From 928681eb855e6a18d7ed7a3e5adb81c64fd37881 Mon Sep 17 00:00:00 2001 From: rachael-ds <45947385+rachael-ds@users.noreply.github.com> Date: Tue, 13 Apr 2021 16:20:50 +0100 Subject: [PATCH 05/17] Update composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt Co-authored-by: Leah E. 
Cole <6719667+leahecole@users.noreply.github.com> --- .../data-orchestration-with-composer/requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt index ee1387155b3..689304bfbbd 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt @@ -1 +1,2 @@ -apache-airflow[gcp]==1.10.14 \ No newline at end of file +apache-airflow[gcp]==1.10.14 +apache-airflow-backport-providers-google==2021.3.3 From 9e6be491594158d4f4c530937151ac0f3724d388 Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Tue, 13 Apr 2021 17:50:45 +0100 Subject: [PATCH 06/17] updated dataflow template and variables --- .../data_orchestration_blog_sample_dag.py | 30 ++++++++++++------- ...data_orchestration_blog_sample_dag_test.py | 15 ++++++---- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py index 440eac5724d..3b886c09c5f 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py @@ -22,10 +22,15 @@ from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator +# Assumes existence of the following Airflow Variables +PROJECT_ID = models.Variable.get("gcp_project") BUCKET_NAME = models.Variable.get("bucket_name") -FILE_NAME = models.Variable.get("file_name") -BQ_SQL_STRING = models.Variable.get("bq_sql_string") +DATA_FILE_NAME = models.Variable.get("file_name") +DATASET = models.Variable.get("bigquery_dataset") +TABLE = models.Variable.get("bigquery_table") +# Slack error notification example taken from Kaxil Naik's blog on Slack Integratin: +# https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105 def on_failure_callback(context): ti = context.get('task_instance') slack_msg = """ @@ -50,7 +55,7 @@ def on_failure_callback(context): slack_error.execute(context) with models.DAG( - 'transform-crm-workload', + 'transform_crm_workload', schedule_interval=None, start_date=dates.days_ago(0), default_args={ 'on_failure_callback': on_failure_callback} @@ -59,21 +64,26 @@ def on_failure_callback(context): validate_file_exists = GCSObjectExistenceSensor( task_id="validate_file_exists", bucket=BUCKET_NAME, - object=FILE_NAME + object=DATA_FILE_NAME ) start_dataflow_job = DataflowTemplatedJobStartOperator( task_id="start-dataflow-template-job", - job_name='crm_wordcount', - template='gs://dataflow-templates/latest/Word_Count', + job_name='crm_customers_transform', + template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery", parameters={ - 'inputFile': "gs://{bucket}/{filename}".format(bucket=BUCKET_NAME, filename=FILE_NAME), - 'output': "gs://{bucket}/output".format(bucket=BUCKET_NAME)} + "javascriptTextTransformGcsPath": "gs://{bucket}/crm_transform_udf.js".format(bucket=BUCKET_NAME), + "javascriptTextTransformFunctionName": "transform", + "JSONPath": "gs://{bucket}/crm_schema.json".format(bucket=BUCKET_NAME), + "outputTable": 
"{project_id}:{dataset}.{table}".format(project_id=PROJECT_ID, dataset=DATASET, table=TABLE), + "inputFilePattern": "gs://{bucket}/{filename}".format(bucket=BUCKET_NAME, filename=DATA_FILE_NAME), + "bigQueryLoadingTemporaryDirectory": "gs://{bucket}/tmp/".format(bucket=BUCKET_NAME) + } ) - + execute_bigquery_sql = BigQueryCheckOperator( task_id='execute_bigquery_sql', - sql=BQ_SQL_STRING, + sql='SELECT COUNT(*) FROM `{project_id}.{dataset}.{table}` LIMIT 1000'.format(project_id=PROJECT_ID, dataset=DATASET, table=TABLE), use_legacy_sql=False ) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py index 9c2344cd384..f429598594d 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py @@ -19,22 +19,25 @@ # user should substitute their project ID PROJECT_ID = 'your-project-id' BUCKET_NAME = 'your-bucket-name' -FILE_NAME = 'incoming_100.txt' -BQ_SQL_STRING = 'SELECT COUNT(*) FROM `your-project-id.iwd_demo.inspiring_women` LIMIT 1000' +DATA_FILE_NAME = 'gcs://tbc' +DATASET = 'your-output-dataset' +TABLE = 'your-output-table' + @pytest.fixture(autouse=True, scope="function") # The fixture `airflow_database` lives in composer/conftest.py. def set_variables(airflow_database): models.Variable.set('gcp_project', PROJECT_ID) models.Variable.set('bucket_name', BUCKET_NAME) - models.Variable.set('file_name', FILE_NAME) - models.Variable.set('bq_sql_string', BQ_SQL_STRING) + models.Variable.set('file_name', DATA_FILE_NAME) + models.Variable.set('bigquery_dataset', DATASET) + models.Variable.set('bigquery_table', TABLE) yield models.Variable.delete('gcp_project') models.Variable.delete('bucket_name') models.Variable.delete('file_name') - models.Variable.delete('bq_sql_string') - + models.Variable.delete('bigquery_dataset') + models.Variable.delete('bigquery_table') def test_dag_import(): from . 
import data_orchestration_blog_sample_dag From f2dad4724b303771e4439bfdb3589cf1108ba3d0 Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Fri, 23 Apr 2021 10:06:58 +0100 Subject: [PATCH 07/17] updated source destinations and added init.py --- .../__init__.py | 0 .../dags/__init__.py | 0 .../data_orchestration_blog_sample_dag.py | 41 +++++++++---------- ...data_orchestration_blog_sample_dag_test.py | 11 +---- 4 files changed, 22 insertions(+), 30 deletions(-) create mode 100644 composer/blog/gcp-tech-blog/data-orchestration-with-composer/__init__.py create mode 100644 composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/__init__.py diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/__init__.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/__init__.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py index 3b886c09c5f..2f69b761b79 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py @@ -18,18 +18,19 @@ from airflow.utils.state import State from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator -from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator +# Sample data +BUCKET_NAME = 'gs://cloud-samples-data/composer/data-orchestration-blog-example' +DATA_FILE_NAME = 'bike_station_data.csv' + # Assumes existence of the following Airflow Variables PROJECT_ID = models.Variable.get("gcp_project") -BUCKET_NAME = models.Variable.get("bucket_name") -DATA_FILE_NAME = models.Variable.get("file_name") DATASET = models.Variable.get("bigquery_dataset") TABLE = models.Variable.get("bigquery_table") -# Slack error notification example taken from Kaxil Naik's blog on Slack Integratin: +# Slack error notification example taken from Kaxil Naik's blog on Slack Integration: # https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105 def on_failure_callback(context): ti = context.get('task_instance') @@ -55,11 +56,11 @@ def on_failure_callback(context): slack_error.execute(context) with models.DAG( - 'transform_crm_workload', + 'dataflow_to_bq_workflow', schedule_interval=None, - start_date=dates.days_ago(0), - default_args={ 'on_failure_callback': on_failure_callback} -) as crm_workflow_dag: + start_date=dates.days_ago(1), + default_args={'on_failure_callback': on_failure_callback} +) as dag: validate_file_exists = GCSObjectExistenceSensor( task_id="validate_file_exists", @@ -67,17 +68,20 @@ def on_failure_callback(context): object=DATA_FILE_NAME ) +# See Launching Dataflow pipelines with Cloud Composer tutorial for further guidance +# https://cloud.google.com/composer/docs/how-to/using/using-dataflow-template-operator + 
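The `GCS_Text_to_BigQuery` template launched by the operator below pulls two side files from the bucket: the JavaScript UDF named by `javascriptTextTransformGcsPath`/`javascriptTextTransformFunctionName`, and the schema file named by `JSONPath`. Neither file's contents appear in this PR, so the following is only an illustrative sketch, held in Python strings for reference; the CSV column names are assumptions, not the real layout of `bike_station_data.csv`:

```python
# Assumed shapes only -- the real bike_station_data.csv columns are not shown
# in the PR. The template calls transform(line) once per input line and expects
# a JSON string back; the JSONPath file carries a top-level "BigQuery Schema" array.
UDF_TRANSFORM_JS = """
function transform(line) {
  var values = line.split(',');
  var obj = new Object();
  obj.station_id = values[0];
  obj.name = values[1];
  obj.status = values[2];
  return JSON.stringify(obj);
}
"""

BQ_SCHEMA_JSON = """
{
  "BigQuery Schema": [
    {"name": "station_id", "type": "STRING"},
    {"name": "name", "type": "STRING"},
    {"name": "status", "type": "STRING"}
  ]
}
"""
```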
start_dataflow_job = DataflowTemplatedJobStartOperator( task_id="start-dataflow-template-job", - job_name='crm_customers_transform', + job_name='csv_to_bq_transform', template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery", parameters={ - "javascriptTextTransformGcsPath": "gs://{bucket}/crm_transform_udf.js".format(bucket=BUCKET_NAME), "javascriptTextTransformFunctionName": "transform", - "JSONPath": "gs://{bucket}/crm_schema.json".format(bucket=BUCKET_NAME), - "outputTable": "{project_id}:{dataset}.{table}".format(project_id=PROJECT_ID, dataset=DATASET, table=TABLE), + "javascriptTextTransformGcsPath": "gs://{bucket}/udf_transform.js".format(bucket=BUCKET_NAME), + "JSONPath": "gs://{bucket}/bq_schema.json".format(bucket=BUCKET_NAME), "inputFilePattern": "gs://{bucket}/{filename}".format(bucket=BUCKET_NAME, filename=DATA_FILE_NAME), - "bigQueryLoadingTemporaryDirectory": "gs://{bucket}/tmp/".format(bucket=BUCKET_NAME) + "bigQueryLoadingTemporaryDirectory": "gs://{bucket}/tmp/".format(bucket=BUCKET_NAME), + "outputTable": "{project_id}:{dataset}.{table}".format(project_id=PROJECT_ID, dataset=DATASET, table=TABLE) } ) @@ -87,13 +91,8 @@ def on_failure_callback(context): use_legacy_sql=False ) - delete_bucket = GCSDeleteBucketOperator( - task_id="delete_bucket", - bucket_name=BUCKET_NAME - ) - - validate_file_exists >> start_dataflow_job >> execute_bigquery_sql >> delete_bucket + validate_file_exists >> start_dataflow_job >> execute_bigquery_sql if __name__ == "__main__": - crm_workflow_dag.clear(dag_run_state=State.NONE) - crm_workflow_dag.run() + dag.clear(dag_run_state=State.NONE) + dag.run() diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py index f429598594d..c978bdad15f 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py @@ -18,24 +18,17 @@ # user should substitute their project ID PROJECT_ID = 'your-project-id' -BUCKET_NAME = 'your-bucket-name' -DATA_FILE_NAME = 'gcs://tbc' -DATASET = 'your-output-dataset' -TABLE = 'your-output-table' - +DATASET = 'your-bq-output-dataset' +TABLE = 'your-bq-output-table' @pytest.fixture(autouse=True, scope="function") # The fixture `airflow_database` lives in composer/conftest.py. 
def set_variables(airflow_database): models.Variable.set('gcp_project', PROJECT_ID) - models.Variable.set('bucket_name', BUCKET_NAME) - models.Variable.set('file_name', DATA_FILE_NAME) models.Variable.set('bigquery_dataset', DATASET) models.Variable.set('bigquery_table', TABLE) yield models.Variable.delete('gcp_project') - models.Variable.delete('bucket_name') - models.Variable.delete('file_name') models.Variable.delete('bigquery_dataset') models.Variable.delete('bigquery_table') From 89767b8765afbdd08917f77944974471eb3dccc0 Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Fri, 23 Apr 2021 11:51:49 +0100 Subject: [PATCH 08/17] added beam package requirement and lint changes --- .../README.md | 3 +++ .../data_orchestration_blog_sample_dag.py | 24 +++++++++++-------- ...data_orchestration_blog_sample_dag_test.py | 2 ++ .../requirements.txt | 1 + 4 files changed, 20 insertions(+), 10 deletions(-) create mode 100644 composer/blog/gcp-tech-blog/data-orchestration-with-composer/README.md diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/README.md b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/README.md new file mode 100644 index 00000000000..56a8c512a7f --- /dev/null +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/README.md @@ -0,0 +1,3 @@ +# Data Orchestration with Cloud Composer + +Code found in this directory is part of a [blog post](https://cloud.google.com/blog/topics/developers-practitioners) published in April 2021 \ No newline at end of file diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py index 2f69b761b79..56761a29f67 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py @@ -12,15 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from airflow.utils import dates from airflow import models + from airflow.hooks.base_hook import BaseHook -from airflow.utils.state import State from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator +from airflow.utils.dates import days_ago +from airflow.utils.state import State + # Sample data BUCKET_NAME = 'gs://cloud-samples-data/composer/data-orchestration-blog-example' DATA_FILE_NAME = 'bike_station_data.csv' @@ -30,16 +32,17 @@ DATASET = models.Variable.get("bigquery_dataset") TABLE = models.Variable.get("bigquery_table") + # Slack error notification example taken from Kaxil Naik's blog on Slack Integration: # https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105 def on_failure_callback(context): ti = context.get('task_instance') slack_msg = """ - :red_circle: Task Failed. - *Task*: {task} - *Dag*: {dag} - *Execution Time*: {exec_date} - *Log Url*: {log_url} + :red_circle: Task Failed. 
+ *Task*: {task} + *Dag*: {dag} + *Execution Time*: {exec_date} + *Log Url*: {log_url} """.format( task=ti.task_id, dag=ti.dag_id, @@ -55,10 +58,11 @@ def on_failure_callback(context): message=slack_msg) slack_error.execute(context) + with models.DAG( 'dataflow_to_bq_workflow', schedule_interval=None, - start_date=dates.days_ago(1), + start_date=days_ago(1), default_args={'on_failure_callback': on_failure_callback} ) as dag: @@ -70,14 +74,13 @@ def on_failure_callback(context): # See Launching Dataflow pipelines with Cloud Composer tutorial for further guidance # https://cloud.google.com/composer/docs/how-to/using/using-dataflow-template-operator - start_dataflow_job = DataflowTemplatedJobStartOperator( task_id="start-dataflow-template-job", job_name='csv_to_bq_transform', template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery", parameters={ "javascriptTextTransformFunctionName": "transform", - "javascriptTextTransformGcsPath": "gs://{bucket}/udf_transform.js".format(bucket=BUCKET_NAME), + "javascriptTextTransformGcsPath": "gs://{bucket}/udf_transform.js".format(bucket=BUCKET_NAME), "JSONPath": "gs://{bucket}/bq_schema.json".format(bucket=BUCKET_NAME), "inputFilePattern": "gs://{bucket}/{filename}".format(bucket=BUCKET_NAME, filename=DATA_FILE_NAME), "bigQueryLoadingTemporaryDirectory": "gs://{bucket}/tmp/".format(bucket=BUCKET_NAME), @@ -93,6 +96,7 @@ def on_failure_callback(context): validate_file_exists >> start_dataflow_job >> execute_bigquery_sql + if __name__ == "__main__": dag.clear(dag_run_state=State.NONE) dag.run() diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py index c978bdad15f..86b7dcaeeaa 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py @@ -21,6 +21,7 @@ DATASET = 'your-bq-output-dataset' TABLE = 'your-bq-output-table' + @pytest.fixture(autouse=True, scope="function") # The fixture `airflow_database` lives in composer/conftest.py. def set_variables(airflow_database): @@ -32,6 +33,7 @@ def set_variables(airflow_database): models.Variable.delete('bigquery_dataset') models.Variable.delete('bigquery_table') + def test_dag_import(): from . 
import data_orchestration_blog_sample_dag internal_unit_testing.assert_has_valid_dag(data_orchestration_blog_sample_dag) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt index 689304bfbbd..f53289b8594 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt @@ -1,2 +1,3 @@ apache-airflow[gcp]==1.10.14 apache-airflow-backport-providers-google==2021.3.3 +apache-airflow-backport-providers-apache-beam==2021.3.13 \ No newline at end of file From 2100efc0653c1cc6c25b1f907ceadd3b3bca249e Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Fri, 23 Apr 2021 12:50:47 +0100 Subject: [PATCH 09/17] updated slack package and further lint changes --- .../dags/data_orchestration_blog_sample_dag.py | 8 ++++---- .../data-orchestration-with-composer/requirements.txt | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py index 56761a29f67..aeffe596e45 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py @@ -15,10 +15,10 @@ from airflow import models from airflow.hooks.base_hook import BaseHook -from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor -from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator -from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator +from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator +from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor +from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator from airflow.utils.dates import days_ago from airflow.utils.state import State @@ -80,7 +80,7 @@ def on_failure_callback(context): template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery", parameters={ "javascriptTextTransformFunctionName": "transform", - "javascriptTextTransformGcsPath": "gs://{bucket}/udf_transform.js".format(bucket=BUCKET_NAME), + "javascriptTextTransformGcsPath": "gs://{bucket}/udf_transform.js".format(bucket=BUCKET_NAME), "JSONPath": "gs://{bucket}/bq_schema.json".format(bucket=BUCKET_NAME), "inputFilePattern": "gs://{bucket}/{filename}".format(bucket=BUCKET_NAME, filename=DATA_FILE_NAME), "bigQueryLoadingTemporaryDirectory": "gs://{bucket}/tmp/".format(bucket=BUCKET_NAME), diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt index f53289b8594..73a91e8255b 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt @@ -1,3 +1,5 @@ apache-airflow[gcp]==1.10.14 +apache-airflow-backport-providers-apache-beam==2021.3.13 apache-airflow-backport-providers-google==2021.3.3 -apache-airflow-backport-providers-apache-beam==2021.3.13 \ No newline at end of 
file +apache-airflow-backport-providers-http==2021.4.10 +apache-airflow-backport-providers-slack==2021.3.3 \ No newline at end of file From 5b5b929c6821ba5c9b8954c46c03237495d7696a Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Fri, 23 Apr 2021 13:09:26 +0100 Subject: [PATCH 10/17] lint fix --- .../dags/data_orchestration_blog_sample_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py index aeffe596e45..9297621b605 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py @@ -87,7 +87,7 @@ def on_failure_callback(context): "outputTable": "{project_id}:{dataset}.{table}".format(project_id=PROJECT_ID, dataset=DATASET, table=TABLE) } ) - + execute_bigquery_sql = BigQueryCheckOperator( task_id='execute_bigquery_sql', sql='SELECT COUNT(*) FROM `{project_id}.{dataset}.{table}` LIMIT 1000'.format(project_id=PROJECT_ID, dataset=DATASET, table=TABLE), From ff62adb669e8cf9465977bc7a1c88cba6928d15b Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Mon, 26 Apr 2021 18:03:57 +0100 Subject: [PATCH 11/17] trigger build From ba8e02b970eb9cfa55eefa28747091dec307e9e1 Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Tue, 8 Jun 2021 16:37:02 +0100 Subject: [PATCH 12/17] updated to use Airflow 2.0 Signed-off-by: rachael-ds --- .../README.md | 2 +- .../conftest.py | 61 +++++++++++++++++++ .../data_orchestration_blog_sample_dag.py | 2 +- .../noxfile_config.py | 7 ++- .../requirements-test.txt | 4 +- .../requirements.txt | 9 ++- 6 files changed, 74 insertions(+), 11 deletions(-) create mode 100644 composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/README.md b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/README.md index 56a8c512a7f..7d0e250dd81 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/README.md +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/README.md @@ -1,3 +1,3 @@ # Data Orchestration with Cloud Composer -Code found in this directory is part of a [blog post](https://cloud.google.com/blog/topics/developers-practitioners) published in April 2021 \ No newline at end of file +Code found in this directory is part of a [blog post](https://cloud.google.com/blog/topics/developers-practitioners) published in June 2021 \ No newline at end of file diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py new file mode 100644 index 00000000000..0bdc0dbd4f0 --- /dev/null +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py @@ -0,0 +1,61 @@ +# Copyright 2019 Google LLC All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import tempfile + +import pytest + + +# this fixture initializes the Airflow DB once per session +# it is used by DAGs in both the blogs and workflows directories, +# unless there exists a conftest at a lower level +@pytest.fixture(scope="session") +def airflow_database(): + import airflow.utils.db + + # We use separate directory for local db path per session + # by setting AIRFLOW_HOME env var, which is done in noxfile_config.py. + + assert('AIRFLOW_HOME' in os.environ) + + airflow_home = os.environ["AIRFLOW_HOME"] + airflow_db = f"{airflow_home}/airflow.db" + + # reset both resets and initializes a new database + airflow.utils.db.resetdb() + + # Making sure we are using a data file there. + assert(os.path.isfile(airflow_db)) + +# this fixture initializes the Airflow DB once per session +# it is used by DAGs in both the blogs and workflows directories +@pytest.fixture(scope="session") +def airflow_database(): + import airflow.utils.db + + # We use separate directory for local db path per session + # by setting AIRFLOW_HOME env var, which is done in noxfile_config.py. + + assert('AIRFLOW_HOME' in os.environ) + + airflow_home = os.environ["AIRFLOW_HOME"] + airflow_db = f"{airflow_home}/airflow.db" + + # reset both resets and initializes a new database + airflow.utils.db.resetdb() + + # Making sure we are using a data file there. + assert(os.path.isfile(airflow_db)) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py index 9297621b605..06a117d39d2 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py @@ -14,7 +14,7 @@ from airflow import models -from airflow.hooks.base_hook import BaseHook +from airflow.hooks.base import BaseHook from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py index 31a17ad81f9..87a9d218289 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py @@ -31,8 +31,7 @@ TEST_CONFIG_OVERRIDE = { # You can opt out from the test for specific Python versions. - # Skipping for Python 3.9 due to numpy compilation failure. - "ignored_versions": ["2.7", "3.9"], + "ignored_versions": ["2.7", "3.6", "3.7", "3.9"], # Composer w/ Airflow 2 only supports Python 3.8 # Old samples are opted out of enforcing Python type hints # All new samples should feature them "enforce_type_hints": False, @@ -42,6 +41,10 @@ # to use your own Cloud project. 
"gcloud_project_env": "GOOGLE_CLOUD_PROJECT", # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": "20.2.4", # A dictionary you want to inject into your test. Don't put any # secrets here. These values will override predefined values. "envs": {"AIRFLOW_HOME": _tmpdir.name}, diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements-test.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements-test.txt index 21170ba2145..17d9dabaee1 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements-test.txt +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements-test.txt @@ -1,2 +1,2 @@ -pytest==6.2.1 -git+https://github.com/GoogleCloudPlatform/python-docs-samples.git#egg=dag_test_utils&subdirectory=composer/dag_test_utils # DAG unit test utility +pytest==6.2.4 +cloud-composer-dag-test-utils==1.0.0 \ No newline at end of file diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt index 73a91e8255b..060b683e35b 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt @@ -1,5 +1,4 @@ -apache-airflow[gcp]==1.10.14 -apache-airflow-backport-providers-apache-beam==2021.3.13 -apache-airflow-backport-providers-google==2021.3.3 -apache-airflow-backport-providers-http==2021.4.10 -apache-airflow-backport-providers-slack==2021.3.3 \ No newline at end of file +apache-airflow[google]==2.1.0 +apache-airflow-providers-apache-beam==2.0.0 +apache-airflow-providers-slack==3.0.0 +apache-airflow-providers-http==1.1.1 \ No newline at end of file From edd4aac37cd83eff3a13b270f1b97998bfdffc03 Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Tue, 8 Jun 2021 16:50:10 +0100 Subject: [PATCH 13/17] Fixed conftest.py file header and lint issues Signed-off-by: rachael-ds --- .../conftest.py | 33 ++++--------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py index 0bdc0dbd4f0..909f0429bcf 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py @@ -1,11 +1,11 @@ -# Copyright 2019 Google LLC All Rights Reserved. -# +# Copyright 2021 Google LLC + # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# + +# https://www.apache.org/licenses/LICENSE-2.0 + # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -14,13 +14,12 @@ import os -import tempfile import pytest # this fixture initializes the Airflow DB once per session -# it is used by DAGs in both the blogs and workflows directories, +# it is used by DAGs in both the blogs and workflows directories, # unless there exists a conftest at a lower level @pytest.fixture(scope="session") def airflow_database(): @@ -39,23 +38,3 @@ def airflow_database(): # Making sure we are using a data file there. assert(os.path.isfile(airflow_db)) - -# this fixture initializes the Airflow DB once per session -# it is used by DAGs in both the blogs and workflows directories -@pytest.fixture(scope="session") -def airflow_database(): - import airflow.utils.db - - # We use separate directory for local db path per session - # by setting AIRFLOW_HOME env var, which is done in noxfile_config.py. - - assert('AIRFLOW_HOME' in os.environ) - - airflow_home = os.environ["AIRFLOW_HOME"] - airflow_db = f"{airflow_home}/airflow.db" - - # reset both resets and initializes a new database - airflow.utils.db.resetdb() - - # Making sure we are using a data file there. - assert(os.path.isfile(airflow_db)) From 72a80fba98fbc04721183fa6f4c640913acf9ac3 Mon Sep 17 00:00:00 2001 From: rachael-ds <45947385+rachael-ds@users.noreply.github.com> Date: Wed, 9 Jun 2021 08:32:55 +0100 Subject: [PATCH 14/17] Apply suggestions from code review Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com> --- .../constraints.txt | 489 +++++++++++++++++- ...data_orchestration_blog_sample_dag_test.py | 2 +- .../requirements.txt | 4 +- 3 files changed, 491 insertions(+), 4 deletions(-) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/constraints.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/constraints.txt index 5784f524232..40217ff8887 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/constraints.txt +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/constraints.txt @@ -1 +1,488 @@ -SQLAlchemy==1.3.23 # must be under 1.4 until at least Airflow 2.0 (check airflow setup.py for restrictions) +# This file is from https://raw.githubusercontent.com/apache/airflow/constraints-2.0.1/constraints-3.8.txt +APScheduler==3.7.0 +Authlib==0.15.3 +Babel==2.9.1 +Flask-AppBuilder==3.3.0 +Flask-Babel==1.0.0 +Flask-Bcrypt==0.7.1 +Flask-Caching==1.10.1 +Flask-JWT-Extended==3.25.1 +Flask-Login==0.5.0 +Flask-OAuthlib==0.9.6 +Flask-OpenID==1.2.5 +Flask-SQLAlchemy==2.5.1 +Flask-WTF==0.15.1 +Flask==1.1.2 +GitPython==3.1.17 +HeapDict==1.0.1 +JPype1==1.2.1 +JayDeBeApi==1.2.3 +Jinja2==2.11.3 +Mako==1.1.4 +Markdown==3.3.3 +MarkupSafe==1.1.1 +PyHive==0.6.4 +PyJWT==1.7.1 +PyNaCl==1.4.0 +PySmbClient==0.1.5 +PyYAML==5.4.1 +Pygments==2.8.0 +SQLAlchemy-JSONField==1.0.0 +SQLAlchemy-Utils==0.36.8 +SQLAlchemy==1.3.23 +Sphinx==3.5.4 +Unidecode==1.2.0 +WTForms==2.3.3 +Werkzeug==1.0.1 +adal==1.2.7 +aiohttp==3.7.4.post0 +alabaster==0.7.12 +alembic==1.6.5 +amqp==2.6.1 +analytics-python==1.3.1 +ansiwrap==0.8.4 +apache-airflow-providers-amazon==1.4.0 +apache-airflow-providers-apache-cassandra==1.0.1 +apache-airflow-providers-apache-druid==1.1.0 +apache-airflow-providers-apache-hdfs==1.0.1 +apache-airflow-providers-apache-hive==1.0.3 +apache-airflow-providers-apache-kylin==1.0.1 +apache-airflow-providers-apache-livy==1.1.0 +apache-airflow-providers-apache-pig==1.0.1 +apache-airflow-providers-apache-pinot==1.0.1 +apache-airflow-providers-apache-spark==1.0.3 +apache-airflow-providers-apache-sqoop==1.0.1 +apache-airflow-providers-celery==1.0.1 
+apache-airflow-providers-cloudant==1.0.1 +apache-airflow-providers-cncf-kubernetes==1.2.0 +apache-airflow-providers-databricks==1.0.1 +apache-airflow-providers-datadog==1.0.1 +apache-airflow-providers-dingding==1.0.2 +apache-airflow-providers-discord==1.0.1 +apache-airflow-providers-docker==1.2.0 +apache-airflow-providers-elasticsearch==1.0.4 +apache-airflow-providers-exasol==1.1.1 +apache-airflow-providers-facebook==1.1.0 +apache-airflow-providers-ftp==1.1.0 +apache-airflow-providers-google==1.0.0 +apache-airflow-providers-grpc==1.1.0 +apache-airflow-providers-hashicorp==1.0.2 +apache-airflow-providers-http==1.1.1 +apache-airflow-providers-imap==1.0.1 +apache-airflow-providers-jdbc==1.0.1 +apache-airflow-providers-jenkins==1.1.0 +apache-airflow-providers-jira==1.0.2 +apache-airflow-providers-microsoft-azure==1.3.0 +apache-airflow-providers-microsoft-mssql==1.1.0 +apache-airflow-providers-microsoft-winrm==1.2.0 +apache-airflow-providers-mongo==1.0.1 +apache-airflow-providers-mysql==1.1.0 +apache-airflow-providers-odbc==1.0.1 +apache-airflow-providers-openfaas==1.1.1 +apache-airflow-providers-opsgenie==1.0.2 +apache-airflow-providers-oracle==1.1.0 +apache-airflow-providers-pagerduty==1.0.1 +apache-airflow-providers-papermill==1.0.2 +apache-airflow-providers-plexus==1.0.1 +apache-airflow-providers-postgres==1.0.2 +apache-airflow-providers-presto==1.0.2 +apache-airflow-providers-qubole==1.0.2 +apache-airflow-providers-redis==1.0.1 +apache-airflow-providers-salesforce==1.0.1 +apache-airflow-providers-samba==1.0.1 +apache-airflow-providers-segment==1.0.1 +apache-airflow-providers-sendgrid==1.0.2 +apache-airflow-providers-sftp==1.2.0 +apache-airflow-providers-singularity==1.1.0 +apache-airflow-providers-slack==2.0.0 +apache-airflow-providers-snowflake==1.3.0 +apache-airflow-providers-sqlite==1.0.2 +apache-airflow-providers-ssh==1.3.0 +apache-airflow-providers-telegram==1.0.2 +apache-airflow-providers-vertica==1.0.1 +apache-airflow-providers-yandex==1.0.1 +apache-airflow-providers-zendesk==1.0.1 +apipkg==1.5 +apispec==3.3.2 +appdirs==1.4.4 +argcomplete==1.12.3 +arrow==0.17.0 +asn1crypto==1.4.0 +astroid==2.5.7 +async-generator==1.10 +async-timeout==3.0.1 +atlasclient==1.0.0 +attrs==20.3.0 +aws-sam-translator==1.35.0 +aws-xray-sdk==2.8.0 +azure-batch==10.0.0 +azure-common==1.1.27 +azure-core==1.14.0 +azure-cosmos==3.2.0 +azure-datalake-store==0.0.52 +azure-identity==1.6.0 +azure-keyvault-certificates==4.2.1 +azure-keyvault-keys==4.3.1 +azure-keyvault-secrets==4.2.0 +azure-keyvault==4.1.0 +azure-kusto-data==0.1.0 +azure-mgmt-containerinstance==1.5.0 +azure-mgmt-core==1.2.2 +azure-mgmt-datalake-nspkg==3.0.1 +azure-mgmt-datalake-store==0.5.0 +azure-mgmt-nspkg==3.0.2 +azure-mgmt-resource==15.0.0 +azure-nspkg==3.0.2 +azure-storage-blob==12.8.1 +azure-storage-common==2.1.0 +azure-storage-file==2.1.0 +backcall==0.2.0 +bcrypt==3.2.0 +beautifulsoup4==4.9.3 +billiard==3.6.4.0 +black==20.8b1 +blinker==1.4 +boto3==1.15.18 +boto==2.49.0 +botocore==1.20.84 +bowler==0.9.0 +cached-property==1.5.2 +cachetools==4.2.2 +cassandra-driver==3.25.0 +cattrs==1.7.1 +celery==4.4.7 +certifi==2020.12.5 +cffi==1.14.5 +cfgv==3.3.0 +cfn-lint==0.49.2 +cgroupspy==0.1.6 +chardet==3.0.4 +click==7.1.2 +clickclick==20.10.2 +cloudant==2.14.0 +cloudpickle==1.6.0 +colorama==0.4.4 +colorlog==4.8.0 +commonmark==0.9.1 +connexion==2.7.0 +coverage==5.5 +croniter==0.3.37 +cryptography==3.4.5 +curlify==2.2.1 +cx-Oracle==8.2.0 +dask==2021.5.1 +datadog==0.41.0 +decorator==4.4.2 +defusedxml==0.7.1 +dill==0.3.3 +distlib==0.3.2 +distributed==2.30.1 
+dnspython==1.16.0 +docker-pycreds==0.4.0 +docker==3.7.3 +docopt==0.6.2 +docutils==0.17.1 +ecdsa==0.17.0 +elasticsearch-dbapi==0.2.3 +elasticsearch-dsl==7.3.0 +elasticsearch==7.13.0 +email-validator==1.1.2 +entrypoints==0.3 +eventlet==0.31.0 +execnet==1.8.1 +facebook-business==9.0.3 +fastavro==1.4.1 +filelock==3.0.12 +fissix==20.8.0 +flake8-colors==0.1.9 +flake8==3.9.2 +flaky==3.7.0 +flower==0.9.7 +freezegun==1.1.0 +fsspec==0.9.0 +future==0.18.2 +gcsfs==0.7.2 +gevent==21.1.2 +gitdb==4.0.7 +github3.py==1.3.0 +google-ads==7.0.0 +google-api-core==1.26.0 +google-api-python-client==1.12.8 +google-auth-httplib2==0.0.4 +google-auth-oauthlib==0.4.2 +google-auth==1.26.1 +google-cloud-automl==1.0.1 +google-cloud-bigquery-datatransfer==1.1.1 +google-cloud-bigquery-storage==2.2.1 +google-cloud-bigquery==2.8.0 +google-cloud-bigtable==1.7.0 +google-cloud-container==1.0.1 +google-cloud-core==1.6.0 +google-cloud-datacatalog==0.8.0 +google-cloud-dataproc==1.1.1 +google-cloud-dlp==1.0.0 +google-cloud-kms==1.4.0 +google-cloud-language==1.3.0 +google-cloud-logging==1.15.1 +google-cloud-memcache==0.3.0 +google-cloud-monitoring==1.1.0 +google-cloud-os-login==1.0.0 +google-cloud-pubsub==1.7.0 +google-cloud-redis==1.0.0 +google-cloud-secret-manager==1.0.0 +google-cloud-spanner==1.19.1 +google-cloud-speech==1.3.2 +google-cloud-storage==1.36.0 +google-cloud-tasks==1.5.0 +google-cloud-texttospeech==1.0.1 +google-cloud-translate==1.7.0 +google-cloud-videointelligence==1.16.1 +google-cloud-vision==1.0.0 +google-crc32c==1.1.2 +google-resumable-media==1.3.0 +googleapis-common-protos==1.52.0 +graphviz==0.16 +greenlet==1.1.0 +grpc-google-iam-v1==0.12.3 +grpcio-gcp==0.2.2 +grpcio==1.35.0 +gunicorn==19.10.0 +hdfs==2.6.0 +hmsclient==0.1.1 +httplib2==0.19.0 +humanize==3.6.0 +hvac==0.10.14 +identify==1.6.2 +idna==2.10 +imagesize==1.2.0 +importlib-metadata==1.7.0 +importlib-resources==1.5.0 +inflection==0.5.1 +iniconfig==1.1.1 +ipdb==0.13.8 +ipython-genutils==0.2.0 +ipython==7.24.0 +iso8601==0.1.14 +isodate==0.6.0 +isort==5.8.0 +itsdangerous==1.1.0 +jedi==0.18.0 +jira==2.0.0 +jmespath==0.10.0 +json-merge-patch==0.2 +jsondiff==1.3.0 +jsonpatch==1.32 +jsonpath-ng==1.5.2 +jsonpickle==2.0.0 +jsonpointer==2.0 +jsonschema==3.2.0 +junit-xml==1.9 +jupyter-client==6.1.12 +jupyter-core==4.7.1 +jwcrypto==0.8 +kombu==4.6.11 +kubernetes==11.0.0 +kylinpy==2.8.4 +lazy-object-proxy==1.4.3 +ldap3==2.9 +libcst==0.3.19 +lockfile==0.12.2 +marshmallow-enum==1.5.1 +marshmallow-oneofschema==2.1.0 +marshmallow-sqlalchemy==0.23.1 +marshmallow==3.10.0 +mccabe==0.6.1 +mock==4.0.2 +mongomock==3.22.1 +more-itertools==8.7.0 +moreorless==0.3.0 +moto==1.3.16 +msal-extensions==0.3.0 +msal==1.9.0 +msgpack==1.0.2 +msrest==0.6.21 +msrestazure==0.6.4 +multi-key-dict==2.0.3 +multidict==5.1.0 +mypy-extensions==0.4.3 +mypy==0.812 +mysql-connector-python==8.0.25 +mysqlclient==1.3.14 +natsort==7.1.1 +nbclient==0.5.3 +nbformat==5.1.3 +nest-asyncio==1.5.1 +networkx==2.5.1 +nodeenv==1.5.0 +nteract-scrapbook==0.4.2 +ntlm-auth==1.5.0 +numpy==1.20.3 +oauthlib==2.1.0 +openapi-spec-validator==0.2.9 +oscrypto==1.2.1 +packaging==20.9 +pandas-gbq==0.14.1 +pandas==1.2.4 +papermill==2.3.3 +parameterized==0.8.1 +paramiko==2.7.2 +parso==0.8.2 +pathspec==0.8.1 +pbr==5.6.0 +pdpyras==4.1.2 +pendulum==2.1.2 +pexpect==4.8.0 +pickleshare==0.7.5 +pinotdb==0.3.3 +pipdeptree==2.0.0 +pluggy==0.13.1 +ply==3.11 +portalocker==1.7.1 +pre-commit==2.10.1 +presto-python-client==0.7.0 +prison==0.1.3 +prometheus-client==0.8.0 +prompt-toolkit==3.0.18 +proto-plus==1.13.0 +protobuf==3.14.0 +psutil==5.8.0 
+psycopg2-binary==2.8.6
+ptyprocess==0.7.0
+py4j==0.10.9.2
+py==1.10.0
+pyOpenSSL==19.1.0
+pyarrow==3.0.0
+pyasn1-modules==0.2.8
+pyasn1==0.4.8
+pycodestyle==2.6.0
+pycountry==20.7.3
+pycparser==2.20
+pycryptodomex==3.10.1
+pydata-google-auth==1.1.0
+pydruid==0.6.2
+pyenchant==3.2.0
+pyexasol==0.17.0
+pyflakes==2.2.0
+pykerberos==1.2.1
+pylint==2.6.0
+pymongo==3.11.4
+pymssql==2.1.5
+pyodbc==4.0.30
+pyparsing==2.4.7
+pyrsistent==0.17.3
+pysftp==0.2.9
+pyspark==3.0.1
+pytest-cov==2.11.1
+pytest-forked==1.3.0
+pytest-instafail==0.4.2
+pytest-rerunfailures==9.1.1
+pytest-timeouts==1.2.1
+pytest-xdist==2.2.1
+pytest==6.2.2
+python-daemon==2.2.4
+python-dateutil==2.8.1
+python-editor==1.0.4
+python-http-client==3.3.2
+python-jenkins==1.7.0
+python-jose==3.2.0
+python-ldap==3.3.1
+python-nvd3==0.15.0
+python-slugify==4.0.1
+python-telegram-bot==13.0
+python3-openid==3.2.0
+pytz==2020.5
+pytzdata==2020.1
+pywinrm==0.4.2
+pyzmq==22.0.3
+qds-sdk==1.16.1
+redis==3.5.3
+regex==2020.11.13
+requests-kerberos==0.12.0
+requests-mock==1.8.0
+requests-ntlm==1.1.0
+requests-oauthlib==1.1.0
+requests-toolbelt==0.9.1
+requests==2.23.0
+responses==0.12.1
+rich==9.2.0
+rsa==4.7
+s3transfer==0.3.4
+sasl==0.2.1
+semver==2.13.0
+sendgrid==6.6.0
+sentinels==1.0.0
+sentry-sdk==0.20.3
+setproctitle==1.2.2
+simple-salesforce==1.10.1
+six==1.15.0
+slack-sdk==3.3.2
+slackclient==2.9.3
+smmap==3.0.5
+snakebite-py3==3.0.5
+snowballstemmer==2.1.0
+snowflake-connector-python==2.3.10
+snowflake-sqlalchemy==1.2.4
+sortedcontainers==2.3.0
+soupsieve==2.2.1
+sphinx-airflow-theme==0.0.2
+sphinx-argparse==0.2.5
+sphinx-autoapi==1.0.0
+sphinx-copybutton==0.3.1
+sphinx-jinja==1.1.1
+sphinx-rtd-theme==0.5.2
+sphinxcontrib-applehelp==1.0.2
+sphinxcontrib-devhelp==1.0.2
+sphinxcontrib-dotnetdomain==0.4
+sphinxcontrib-golangdomain==0.2.0.dev0
+sphinxcontrib-htmlhelp==1.0.3
+sphinxcontrib-httpdomain==1.7.0
+sphinxcontrib-jsmath==1.0.1
+sphinxcontrib-qthelp==1.0.3
+sphinxcontrib-redoc==1.6.0
+sphinxcontrib-serializinghtml==1.1.5
+sphinxcontrib-spelling==5.2.1
+spython==0.0.85
+sshpubkeys==3.3.1
+sshtunnel==0.1.5
+starkbank-ecdsa==1.1.0
+statsd==3.3.0
+swagger-ui-bundle==0.0.8
+tableauserverclient==0.14.1
+tabulate==0.8.9
+tblib==1.7.0
+tenacity==6.2.0
+termcolor==1.1.0
+testfixtures==6.17.1
+text-unidecode==1.3
+textwrap3==0.9.2
+thrift-sasl==0.4.3
+thrift==0.13.0
+toml==0.10.2
+toolz==0.11.1
+tornado==6.1
+tqdm==4.56.2
+traitlets==5.0.5
+typed-ast==1.4.3
+typing-extensions==3.7.4.3
+typing-inspect==0.6.0
+tzlocal==2.1
+unicodecsv==0.14.1
+uritemplate==3.0.1
+urllib3==1.25.11
+vertica-python==1.0.1
+vine==1.3.0
+virtualenv==20.4.7
+volatile==2.1.0
+watchtower==0.7.3
+wcwidth==0.2.5
+websocket-client==0.57.0
+wrapt==1.12.1
+xmltodict==0.12.0
+yamllint==1.26.1
+yandexcloud==0.73.0
+yarl==1.6.3
+zdesk==2.7.1
+zict==2.0.0
+zipp==3.4.1
+zope.event==4.5.0
+zope.interface==5.2.0
diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py
index 86b7dcaeeaa..4cc2d709e2b 100644
--- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py
@@ -23,7 +23,7 @@

 @pytest.fixture(autouse=True, scope="function")
-# The fixture `airflow_database` lives in composer/conftest.py.
+# The fixture `airflow_database` lives in ./conftest.py.
 def set_variables(airflow_database):
     models.Variable.set('gcp_project', PROJECT_ID)
     models.Variable.set('bigquery_dataset', DATASET)
diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt
index 060b683e35b..9c9eb16e556 100644
--- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/requirements.txt
@@ -1,4 +1,4 @@
-apache-airflow[google]==2.1.0
+apache-airflow[google]==2.0.1
 apache-airflow-providers-apache-beam==2.0.0
 apache-airflow-providers-slack==3.0.0
-apache-airflow-providers-http==1.1.1
\ No newline at end of file
+apache-airflow-providers-http==1.1.1

From e6e3a6f456819577ef33f79195f12f5dd27e1531 Mon Sep 17 00:00:00 2001
From: rachael-ds
Date: Wed, 16 Jun 2021 15:16:02 +0100
Subject: [PATCH 15/17] formatting changes after running nox black

---
 .../conftest.py                               |  4 +-
 .../data_orchestration_blog_sample_dag.py     | 67 +++++++++++--------
 ...data_orchestration_blog_sample_dag_test.py | 19 +++---
 .../noxfile_config.py                         |  7 +-
 4 files changed, 56 insertions(+), 41 deletions(-)

diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py
index 909f0429bcf..e7033247004 100644
--- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/conftest.py
@@ -28,7 +28,7 @@ def airflow_database():
     # We use separate directory for local db path per session
     # by setting AIRFLOW_HOME env var, which is done in noxfile_config.py.
-    assert('AIRFLOW_HOME' in os.environ)
+    assert "AIRFLOW_HOME" in os.environ
     airflow_home = os.environ["AIRFLOW_HOME"]
     airflow_db = f"{airflow_home}/airflow.db"
@@ -37,4 +37,4 @@
     airflow.utils.db.resetdb()

     # Making sure we are using a data file there.
-    assert(os.path.isfile(airflow_db))
+    assert os.path.isfile(airflow_db)
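One ordering detail behind this fixture is easy to miss: AIRFLOW_HOME must be set before anything imports airflow, because Airflow resolves its home directory, and with it the sqlite metadata DB path, the first time the package is imported. A minimal standalone sketch of the same isolation pattern (illustrative only, not part of the patch; it assumes an Airflow 2.x install and uses a temp directory in place of the per-session path that noxfile_config.py provides):

    import os
    import tempfile

    # Point Airflow at a throwaway home before the first airflow import.
    os.environ["AIRFLOW_HOME"] = tempfile.mkdtemp(prefix="airflow-test-")

    import airflow.utils.db  # imported after AIRFLOW_HOME on purpose

    # Drop and recreate the metadata DB under the new AIRFLOW_HOME.
    airflow.utils.db.resetdb()

    assert os.path.isfile(os.path.join(os.environ["AIRFLOW_HOME"], "airflow.db"))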
diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py
index 06a117d39d2..6ac2d0f19bd 100644
--- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py
@@ -24,8 +24,8 @@
 from airflow.utils.state import State

 # Sample data
-BUCKET_NAME = 'gs://cloud-samples-data/composer/data-orchestration-blog-example'
-DATA_FILE_NAME = 'bike_station_data.csv'
+BUCKET_NAME = "gs://cloud-samples-data/composer/data-orchestration-blog-example"
+DATA_FILE_NAME = "bike_station_data.csv"

 # Assumes existence of the following Airflow Variables
 PROJECT_ID = models.Variable.get("gcp_project")
@@ -36,7 +36,7 @@
 # Slack error notification example taken from Kaxil Naik's blog on Slack Integration:
 # https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
 def on_failure_callback(context):
-    ti = context.get('task_instance')
+    ti = context.get("task_instance")
     slack_msg = """
     :red_circle: Task Failed.
     *Task*: {task}
@@ -44,54 +44,63 @@ def on_failure_callback(context):
     *Dag*: {dag}
     *Execution Time*: {exec_date}
     *Log Url*: {log_url}
     """.format(
-        task=ti.task_id,
-        dag=ti.dag_id,
-        log_url=ti.log_url,
-        exec_date=context.get('execution_date'),
-    )
-    slack_webhook_token = BaseHook.get_connection('slack_connection').password
+        task=ti.task_id,
+        dag=ti.dag_id,
+        log_url=ti.log_url,
+        exec_date=context.get("execution_date"),
+    )
+    slack_webhook_token = BaseHook.get_connection("slack_connection").password
     slack_error = SlackWebhookOperator(
-        task_id='post_slack_error',
-        http_conn_id='slack_connection',
-        channel='#airflow-alerts',
+        task_id="post_slack_error",
+        http_conn_id="slack_connection",
+        channel="#airflow-alerts",
         webhook_token=slack_webhook_token,
-        message=slack_msg)
+        message=slack_msg,
+    )
     slack_error.execute(context)

 with models.DAG(
-    'dataflow_to_bq_workflow',
+    "dataflow_to_bq_workflow",
     schedule_interval=None,
     start_date=days_ago(1),
-    default_args={'on_failure_callback': on_failure_callback}
+    default_args={"on_failure_callback": on_failure_callback},
 ) as dag:

     validate_file_exists = GCSObjectExistenceSensor(
-        task_id="validate_file_exists",
-        bucket=BUCKET_NAME,
-        object=DATA_FILE_NAME
+        task_id="validate_file_exists", bucket=BUCKET_NAME, object=DATA_FILE_NAME
     )

-# See Launching Dataflow pipelines with Cloud Composer tutorial for further guidance
-# https://cloud.google.com/composer/docs/how-to/using/using-dataflow-template-operator
+    # See Launching Dataflow pipelines with Cloud Composer tutorial for further guidance
+    # https://cloud.google.com/composer/docs/how-to/using/using-dataflow-template-operator
     start_dataflow_job = DataflowTemplatedJobStartOperator(
         task_id="start-dataflow-template-job",
-        job_name='csv_to_bq_transform',
+        job_name="csv_to_bq_transform",
         template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
         parameters={
             "javascriptTextTransformFunctionName": "transform",
-            "javascriptTextTransformGcsPath": "gs://{bucket}/udf_transform.js".format(bucket=BUCKET_NAME),
+            "javascriptTextTransformGcsPath": "gs://{bucket}/udf_transform.js".format(
+                bucket=BUCKET_NAME
+            ),
             "JSONPath": "gs://{bucket}/bq_schema.json".format(bucket=BUCKET_NAME),
-            "inputFilePattern": "gs://{bucket}/{filename}".format(bucket=BUCKET_NAME, filename=DATA_FILE_NAME),
-            "bigQueryLoadingTemporaryDirectory": "gs://{bucket}/tmp/".format(bucket=BUCKET_NAME),
-            "outputTable": "{project_id}:{dataset}.{table}".format(project_id=PROJECT_ID, dataset=DATASET, table=TABLE)
-        }
+            "inputFilePattern": "gs://{bucket}/{filename}".format(
+                bucket=BUCKET_NAME, filename=DATA_FILE_NAME
+            ),
+            "bigQueryLoadingTemporaryDirectory": "gs://{bucket}/tmp/".format(
+                bucket=BUCKET_NAME
+            ),
+            "outputTable": "{project_id}:{dataset}.{table}".format(
+                project_id=PROJECT_ID, dataset=DATASET, table=TABLE
+            ),
+        },
     )

     execute_bigquery_sql = BigQueryCheckOperator(
-        task_id='execute_bigquery_sql',
-        sql='SELECT COUNT(*) FROM `{project_id}.{dataset}.{table}` LIMIT 1000'.format(project_id=PROJECT_ID, dataset=DATASET, table=TABLE),
-        use_legacy_sql=False
+        task_id="execute_bigquery_sql",
+        sql="SELECT COUNT(*) FROM `{project_id}.{dataset}.{table}`".format(
+            project_id=PROJECT_ID, dataset=DATASET, table=TABLE
+        ),
+        use_legacy_sql=False,
     )

     validate_file_exists >> start_dataflow_job >> execute_bigquery_sql
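One behavior of the check task above is worth spelling out for anyone adapting the sample: BigQueryCheckOperator inherits its pass/fail rule from SQLCheckOperator, which fetches the first row of the query result and fails the task if that row is missing or contains a falsy value. A bare SELECT COUNT(*) therefore fails exactly when the output table has no rows. A minimal sketch with placeholder project, dataset, and table names (not the sample's real ones):

    from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator

    # Fails the task when the first result row contains a falsy value,
    # i.e. when COUNT(*) returns 0 because the table is empty.
    non_empty_check = BigQueryCheckOperator(
        task_id="non_empty_check",
        sql="SELECT COUNT(*) FROM `example-project.example_dataset.example_table`",
        use_legacy_sql=False,
    )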
diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py
index 4cc2d709e2b..bbe14fdeff7 100644
--- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag_test.py
@@ -17,23 +17,24 @@
 import pytest

 # user should substitute their project ID
-PROJECT_ID = 'your-project-id'
-DATASET = 'your-bq-output-dataset'
-TABLE = 'your-bq-output-table'
+PROJECT_ID = "your-project-id"
+DATASET = "your-bq-output-dataset"
+TABLE = "your-bq-output-table"

 @pytest.fixture(autouse=True, scope="function")
 # The fixture `airflow_database` lives in ./conftest.py.
 def set_variables(airflow_database):
-    models.Variable.set('gcp_project', PROJECT_ID)
-    models.Variable.set('bigquery_dataset', DATASET)
-    models.Variable.set('bigquery_table', TABLE)
+    models.Variable.set("gcp_project", PROJECT_ID)
+    models.Variable.set("bigquery_dataset", DATASET)
+    models.Variable.set("bigquery_table", TABLE)
     yield
-    models.Variable.delete('gcp_project')
-    models.Variable.delete('bigquery_dataset')
-    models.Variable.delete('bigquery_table')
+    models.Variable.delete("gcp_project")
+    models.Variable.delete("bigquery_dataset")
+    models.Variable.delete("bigquery_table")

 def test_dag_import():
     from . import data_orchestration_blog_sample_dag
+
     internal_unit_testing.assert_has_valid_dag(data_orchestration_blog_sample_dag)

diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py
index 87a9d218289..89d7b0d164a 100644
--- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/noxfile_config.py
@@ -31,7 +31,12 @@
 TEST_CONFIG_OVERRIDE = {
     # You can opt out from the test for specific Python versions.
- "ignored_versions": ["2.7", "3.6", "3.7", "3.9"], # Composer w/ Airflow 2 only supports Python 3.8 + "ignored_versions": [ + "2.7", + "3.6", + "3.7", + "3.9", + ], # Composer w/ Airflow 2 only supports Python 3.8 # Old samples are opted out of enforcing Python type hints # All new samples should feature them "enforce_type_hints": False, From 0422943eda2d20143894b7a8b660ee00ae5ce0f8 Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Wed, 16 Jun 2021 17:11:35 +0100 Subject: [PATCH 16/17] updated to use f-strings --- .../data_orchestration_blog_sample_dag.py | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py index 6ac2d0f19bd..ee0b57f24b3 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py @@ -24,7 +24,7 @@ from airflow.utils.state import State # Sample data -BUCKET_NAME = "gs://cloud-samples-data/composer/data-orchestration-blog-example" +BUCKET_NAME = "cloud-samples-data/composer/data-orchestration-blog-example" DATA_FILE_NAME = "bike_station_data.csv" # Assumes existence of the following Airflow Variables @@ -79,27 +79,17 @@ def on_failure_callback(context): template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery", parameters={ "javascriptTextTransformFunctionName": "transform", - "javascriptTextTransformGcsPath": "gs://{bucket}/udf_transform.js".format( - bucket=BUCKET_NAME - ), - "JSONPath": "gs://{bucket}/bq_schema.json".format(bucket=BUCKET_NAME), - "inputFilePattern": "gs://{bucket}/{filename}".format( - bucket=BUCKET_NAME, filename=DATA_FILE_NAME - ), - "bigQueryLoadingTemporaryDirectory": "gs://{bucket}/tmp/".format( - bucket=BUCKET_NAME - ), - "outputTable": "{project_id}:{dataset}.{table}".format( - project_id=PROJECT_ID, dataset=DATASET, table=TABLE - ), + "javascriptTextTransformGcsPath": f"gs://{BUCKET_NAME}/udf_transform.js", + "JSONPath": f"gs://{BUCKET_NAME}/bq_schema.json", + "inputFilePattern": f"gs://{BUCKET_NAME}/{DATA_FILE_NAME}", + "bigQueryLoadingTemporaryDirectory": f"gs://{BUCKET_NAME}/tmp/", + "outputTable": f"{PROJECT_ID}:{DATASET}.{TABLE}", }, ) execute_bigquery_sql = BigQueryCheckOperator( task_id="execute_bigquery_sql", - sql="SELECT COUNT(*) FROM `{project_id}.{dataset}.{table}`".format( - project_id=PROJECT_ID, dataset=DATASET, table=TABLE - ), + sql=f"SELECT COUNT(*) FROM `{PROJECT_ID}.{DATASET}.{TABLE}`", use_legacy_sql=False, ) From 35c37348477d04984312ed327661690806f327c4 Mon Sep 17 00:00:00 2001 From: rachael-ds Date: Wed, 16 Jun 2021 18:00:20 +0100 Subject: [PATCH 17/17] updated slack connection formatting --- .../dags/data_orchestration_blog_sample_dag.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py index ee0b57f24b3..b3c0baf474c 100644 --- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py +++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py @@ -37,18 +37,13 @@ # 
From 35c37348477d04984312ed327661690806f327c4 Mon Sep 17 00:00:00 2001
From: rachael-ds
Date: Wed, 16 Jun 2021 18:00:20 +0100
Subject: [PATCH 17/17] updated slack connection formatting

---
 .../dags/data_orchestration_blog_sample_dag.py | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py
index ee0b57f24b3..b3c0baf474c 100644
--- a/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py
+++ b/composer/blog/gcp-tech-blog/data-orchestration-with-composer/dags/data_orchestration_blog_sample_dag.py
@@ -37,18 +37,13 @@
 # https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
 def on_failure_callback(context):
     ti = context.get("task_instance")
-    slack_msg = """
+    slack_msg = f"""
     :red_circle: Task Failed.
-    *Task*: {task}
-    *Dag*: {dag}
-    *Execution Time*: {exec_date}
-    *Log Url*: {log_url}
-    """.format(
-        task=ti.task_id,
-        dag=ti.dag_id,
-        log_url=ti.log_url,
-        exec_date=context.get("execution_date"),
-    )
+    *Task*: {ti.task_id}
+    *Dag*: {ti.dag_id}
+    *Execution Time*: {context.get('execution_date')}
+    *Log Url*: {ti.log_url}
+    """
     slack_webhook_token = BaseHook.get_connection("slack_connection").password
     slack_error = SlackWebhookOperator(
         task_id="post_slack_error",