Adding sample python DAG and gcs_to_gcs operator as plugin #1678
Conversation
We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for all the commit author(s) or Co-authors. If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google.
The gcs_to_gcs operator is included as a plugin because it is not currently available in the current Composer Airflow version (1.9.0).
1. Uses gcs_to_gcs operator from plugins
2. Uses Cloud logging features
3. Dynamically generates tasks based on master csv file
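(For context, exposing a copied operator through the Airflow plugin mechanism looks roughly like the sketch below; the file, module, and plugin names are illustrative assumptions, not necessarily what this PR uses.)

# plugins/gcs_to_gcs_plugin.py -- hypothetical file and plugin names.
from airflow.plugins_manager import AirflowPlugin

# The operator module copied from Airflow 1.10; this import path is an assumption.
from gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator


class GcsToGcsPlugin(AirflowPlugin):
    """Register the copied operator so DAGs can use it on Airflow 1.9."""
    name = 'gcs_to_gcs_plugin'
    operators = [GoogleCloudStorageToGoogleCloudStorageOperator]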
CLAs look good, thanks!
@tswast can you please review and approve if appropriate?
- Enable export and import of multiple Avro files
from googleapiclient import errors

class GoogleCloudStorageHook(GoogleCloudBaseHook):
I'm a bit confused. Are these files copied from the Airflow repository? If so, could we provide download instructions rather than copying them here?
Correct, both the hook and the operator are from the Airflow master branch (1.10). I just copied this snapshot version to avoid breaking the code with future changes in the Airflow project.
Let me know if there is a more elegant way (I guess we can give download instructions from a specific git commit in the Airflow repo).
OK, I removed these and added instructions to download them with wget from the 1.10-stable branch.
Please remove the operator as well.
If we cannot, please retain the original license headers and move to a third_party/ directory per go/thirdparty#projects.
import logging

from airflow import models
from airflow.contrib.operators.bigquery_to_gcs import \
Import modules, not classes: from airflow.contrib.operators import bigquery_to_gcs. Likewise for other imports.
Done
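(A minimal sketch of the module-style imports being suggested; the DAG name and operator arguments below are illustrative, not taken from this PR.)

import datetime

from airflow import models
from airflow.contrib.operators import bigquery_to_gcs

default_args = {'start_date': datetime.datetime(2018, 1, 1)}

with models.DAG('bq_copy_example', default_args=default_args,
                schedule_interval=None) as dag:
    # Reference the class through its module rather than importing it directly.
    export_task = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='export_table',
        source_project_dataset_table='example-project.dataset.table',
        destination_cloud_storage_uris=['gs://example-bucket/table-*.avro'],
        export_format='AVRO')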
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* master_file_path - CSV file listing source and target tables, including
I suggest table_definition_file_path to avoid use of the word "master".
Done - I renamed it to "table_list_file_path".
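(For reference, a minimal sketch of reading those renamed Airflow variables at parse time; the variable names are the ones discussed in this thread.)

from airflow import models

# Values are configured in the Airflow UI or with the `airflow variables` CLI.
table_list_file_path = models.Variable.get('table_list_file_path')
source_bucket = models.Variable.get('gcs_source_bucket')
dest_bucket = models.Variable.get('gcs_dest_bucket')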
# --------------------------------------------------------------------------------

# Instantiates a client
client = google.cloud.logging.Client()
Is it common to set up logging within DAG parsing? Won't Airflow and/or Composer ensure logs get written to the correct place? Why is this code present?
The code is actually there to test logging against GCP Stackdriver while running in a local Airflow environment. I'll remove it to keep the example clean, since it is less relevant with Composer sending logs to Stackdriver automatically.
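(For reference, the removed parse-time setup was presumably along these lines -- a sketch based on the google-cloud-logging client, unnecessary on Composer where logs already flow to Stackdriver.)

import logging

import google.cloud.logging

# Attach a Stackdriver handler to the root logger so a local Airflow run also
# ships its logs to GCP.
client = google.cloud.logging.Client()
client.setup_logging(log_level=logging.INFO)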
# --------------------------------------------------------------------------------

def read_master_file(master_file):
read_table_definition_file
Done
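(A rough sketch of what the renamed helper might look like; the CSV layout -- a header row followed by source,dest pairs -- is an assumption.)

import csv


def read_table_list_file(table_list_file):
    """Read the table list CSV and return one record per data row."""
    records = []
    with open(table_list_file, 'r') as csv_file:
        reader = csv.reader(csv_file)
        next(reader)  # skip the header row
        for row in reader:
            records.append({'source': row[0], 'dest': row[1]})
    return records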
)

# Get the table list from master file
all_records = read_master_file(master_file_path)
@fenglu-g Do you have thoughts on doing dynamic calls to construct a DAG at parse time? I thought this was not a recommended practice.
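(For illustration, the parse-time task generation presumably looks something like this sketch; all_records, the bucket variables, dag, and the start/end tasks are assumed to be defined earlier in the DAG file, and the operator arguments are illustrative.)

# One copy task per record in the table list file, generated when the DAG is parsed.
for record in all_records:
    source_table = record['source']

    copy_task = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='copy_%s' % source_table.replace('.', '_'),
        source_bucket=source_bucket,
        source_object='%s-*.avro' % source_table,
        destination_bucket=dest_bucket,
        dag=dag)

    start >> copy_task >> end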
Looking good. A few more things to fix up.
Will this need to be updated in a few weeks once Composer supports Airflow 1.10?
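(If so, the change would presumably be limited to swapping the plugin import for the contrib module that ships with Airflow 1.10 -- a guess with illustrative names, not verified against this PR.)

# Airflow 1.9 on Composer: the operator comes from a local plugin, e.g. one
# registered under the (hypothetical) plugin name 'gcs_plugin'.
from airflow.operators import gcs_plugin
copy_operator = gcs_plugin.GoogleCloudStorageToGoogleCloudStorageOperator

# Airflow 1.10+: the same operator is available in contrib.
# from airflow.contrib.operators import gcs_to_gcs
# copy_operator = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator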
@@ -0,0 +1,172 @@
# Copyright 2018 Google LLC |
Please remove the operator as well.
If we cannot, please retain the original license headers (we should definitely not replace any license headers with Google copyright) and move to a third_party/ directory per go/thirdparty#projects
# --------------------------------------------------------------------------------

import csv
from datetime import datetime
Import modules, not classes/functions: import datetime.
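(As with the operator imports, a tiny illustrative sketch of keeping datetime qualified through its module.)

import datetime

# A common Airflow pattern: start the DAG at midnight yesterday, with every
# reference qualified through the datetime module.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(days=1),
    datetime.datetime.min.time())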
environment. This is a recommended sanity check by the official Airflow
docs: https://airflow.incubator.apache.org/tutorial.html#testing
"""
models.Variable.set('master_file_path', 'example_file_path')
Update variable name. Also, the file path should point to bq_copy_eu_to_us_sample.csv, right?
Use __file__ to get the path to this Python file and construct the path from there. For example, python-docs-samples/appengine/standard/i18n/i18n_utils.py, lines 180 to 181 at 35fe662:
locale_path = os.path.join(
    os.path.abspath(os.path.dirname(__file__)), 'locales')
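(Applied to this test, that suggestion might look roughly like the sketch below; the CSV filename comes from this thread, while the variable name and surrounding test layout are assumptions.)

import os.path

from airflow import models

# Point the Airflow variable at the sample CSV sitting next to this test file.
sample_csv_path = os.path.join(
    os.path.abspath(os.path.dirname(__file__)), 'bq_copy_eu_to_us_sample.csv')
models.Variable.set('table_list_file_path', sample_csv_path)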
""" | ||
models.Variable.set('master_file_path', 'example_file_path') | ||
models.Variable.set('gcs_source_bucket', 'example-project') | ||
models.Variable.set('gcs_dest_bucket', 'us-central1-f') |
Need to import bq_copy_across_locations so that we can verify that it correctly parses.
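(The parse check presumably amounts to importing the DAG module after the variables are set, along the lines of this sketch; the test and module names are assumptions based on this thread.)

def test_dag_parses():
    """Importing the DAG module is enough to confirm it parses cleanly."""
    models.Variable.set('table_list_file_path', sample_csv_path)
    models.Variable.set('gcs_source_bucket', 'example-source-bucket')
    models.Variable.set('gcs_dest_bucket', 'example-dest-bucket')

    import bq_copy_across_locations  # noqa: F401 -- parse-time check only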
…g for Composer DAG as plugin
I moved the plugins directory to third_party to keep nox from running the linter and tests on Apache Airflow code.
LGTM. I rearranged the files a little bit and updated the tests in e3adbda.
@dpebot merge when green
Okay! I'll merge when all statuses are green and all reviewers approve.
Author DAG, including master csv file sample and test:
Uses gcs_to_gcs operator from plugins
Uses Cloud logging features
Dynamically generates tasks based on master csv file