
Conversation

dsdinter
Contributor

Author DAG, including master CSV file sample and test:

Uses gcs_to_gcs operator from plugins
Uses Cloud logging features
Dynamically generates tasks based on master csv file

@googlebot

We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for all the commit author(s) or Co-authors. If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google.
In order to pass this check, please resolve this problem and have the pull request author add another comment and the bot will run again. If the bot doesn't comment, it means it doesn't think anything has changed.

@googlebot googlebot added the cla: no This human has *not* signed the Contributor License Agreement. label Aug 31, 2018
1. Uses gcs_to_gcs operator from plugins (this is because it is not currently available in the current Composer Airflow version, 1.9.0)
2. Uses Cloud logging features
3. Dynamically generates tasks based on master CSV file
@googlebot

CLAs look good, thanks!

@googlebot googlebot added cla: yes This human has signed the Contributor License Agreement. and removed cla: no This human has *not* signed the Contributor License Agreement. labels Sep 1, 2018
@dsdinter
Contributor Author

dsdinter commented Sep 3, 2018

@tswast can you please review and approve if appropriate?

from googleapiclient import errors


class GoogleCloudStorageHook(GoogleCloudBaseHook):
Contributor

I'm a bit confused. Are these files copied from the Airflow repository? If so, could we provide download instructions rather than copying them here?

Contributor Author

Correct, both the hook and the operator are from the Airflow master branch (1.10); I just copied this snapshot version to avoid breaking the code with future changes in the Airflow project.
Let me know if there is a more elegant way (I guess I can give download instructions from a specific git commit in the Airflow repo).

Contributor Author

OK, I removed these and added instructions to download them with wget from the 1.10-stable branch.

Contributor

Please remove the operator as well.

If we cannot, please retain the original license headers and move it to a third_party/ directory per go/thirdparty#projects

import logging

from airflow import models
from airflow.contrib.operators.bigquery_to_gcs import \
Contributor

Import modules, not classes. from airflow.contrib.operators import bigquery_to_gcs. Likewise for other imports.
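
For example (a minimal sketch of the suggested style; the variable name is a placeholder, only the operator class comes from the diff above):

```python
# Discouraged: importing the class directly.
# from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator

# Preferred: import the module and reference the class through it.
from airflow.contrib.operators import bigquery_to_gcs

export_operator_cls = bigquery_to_gcs.BigQueryToCloudStorageOperator
```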

Contributor Author

Done


This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* master_file_path - CSV file listing source and target tables, including
Contributor

I suggest table_definition_file_path to avoid use of the word "master"

Contributor Author

Done - I renamed it to "table_list_file_path"
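
For reference, the DAG would read these at parse time roughly like this (a sketch; the variable names reflect the rename in this thread and the bucket variables shown in the test further down):

```python
from airflow import models

# Read the three Airflow variables this DAG relies on. Values are set per
# environment (Airflow UI: Admin -> Variables, or the `airflow variables` CLI).
table_list_file_path = models.Variable.get('table_list_file_path')
source_bucket = models.Variable.get('gcs_source_bucket')
dest_bucket = models.Variable.get('gcs_dest_bucket')
```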

# --------------------------------------------------------------------------------

# Instantiates a client
client = google.cloud.logging.Client()
Contributor

Is it common to set up logging within DAG parsing? Won't Airflow and/or Composer ensure logs get written to the correct place? Why is this code present?

Contributor Author

The code is actually there to test logging against GCP Stackdriver while running in a local Airflow environment. I would remove it to keep the example clean, as it is less relevant with Composer sending logs to Stackdriver automatically.
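
For context, the parse-time setup boils down to attaching the Cloud Logging handler to the root logger, which only matters when running outside Composer (a minimal sketch of the pattern being removed):

```python
import logging

import google.cloud.logging

# Attach the Stackdriver handler so a local Airflow run ships its logs to GCP.
# Composer already forwards logs to Stackdriver, so this is redundant there.
client = google.cloud.logging.Client()
client.setup_logging(log_level=logging.INFO)
```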

# --------------------------------------------------------------------------------


def read_master_file(master_file):
Contributor

read_table_definition_file

Contributor Author

Done
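
A minimal sketch of what the renamed helper might look like (the function name and the source/target column layout are assumptions based on this thread, not the final code):

```python
import csv
import logging


def read_table_list(table_list_file):
    """Read the table list CSV and return one source/target dict per row.

    Assumes a header row followed by `source_table,target_table` pairs.
    """
    table_list = []
    logging.info('Reading table list file: %s', table_list_file)
    with open(table_list_file, 'r') as csv_file:
        reader = csv.reader(csv_file)
        next(reader)  # skip the header row
        for row in reader:
            table_list.append({'table_source': row[0], 'table_dest': row[1]})
    return table_list
```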

)

# Get the table list from master file
all_records = read_master_file(master_file_path)
Contributor

@fenglu-g Do you have thoughts on doing dynamic calls to construct a DAG at parse time? I thought this was not a recommended practice.
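
For reference, the parse-time pattern under discussion looks roughly like this (a sketch with hypothetical table names and bucket; the real DAG builds its record list from the table list CSV):

```python
import datetime

from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.operators import dummy_operator

# Hypothetical records; the real DAG reads these from the table list CSV.
all_records = [
    {'table_source': 'src_dataset.table_a', 'table_dest': 'dst_dataset.table_a'},
    {'table_source': 'src_dataset.table_b', 'table_dest': 'dst_dataset.table_b'},
]
source_bucket = 'example-source-bucket'  # placeholder

with models.DAG(
        'bq_copy_sketch',
        schedule_interval=None,
        start_date=datetime.datetime(2018, 1, 1)) as dag:
    start = dummy_operator.DummyOperator(task_id='start')
    end = dummy_operator.DummyOperator(task_id='end')

    # One export task per record, generated when the scheduler parses the file.
    for record in all_records:
        export_task = bigquery_to_gcs.BigQueryToCloudStorageOperator(
            task_id='export_{}'.format(record['table_source'].replace('.', '_')),
            source_project_dataset_table=record['table_source'],
            destination_cloud_storage_uris=[
                'gs://{}/{}.csv'.format(source_bucket, record['table_source'])])
        start >> export_task >> end
```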

Contributor

@tswast tswast left a comment

Looking good. A few more things to fix up.

Will this need to be updated in a few weeks once Composer supports Airflow 1.10?

@@ -0,0 +1,172 @@
# Copyright 2018 Google LLC
Contributor

Please remove the operator as well.

If we cannot, please retain the original license headers (we should definitely not replace any license headers with a Google copyright) and move it to a third_party/ directory per go/thirdparty#projects

# --------------------------------------------------------------------------------

import csv
from datetime import datetime
Contributor

Import modules, not classes/functions. import datetime.

environment. This is a recommended sanity check by the official Airflow
docs: https://airflow.incubator.apache.org/tutorial.html#testing
"""
models.Variable.set('master_file_path', 'example_file_path')
Contributor

Update variable name. Also, the file path should point to bq_copy_eu_to_us_sample.csv, right?

Use __file__ to get the path to this Python file and construct the path from there. For example:

locale_path = os.path.join(
    os.path.abspath(os.path.dirname(__file__)), 'locales')

"""
models.Variable.set('master_file_path', 'example_file_path')
models.Variable.set('gcs_source_bucket', 'example-project')
models.Variable.set('gcs_dest_bucket', 'us-central1-f')
Contributor

Need to import bq_copy_across_locations so that we can verify that it correctly parses.
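
Putting both suggestions together, the test might end up looking roughly like this (a sketch; the module name, sample file name, and variable names are taken from this thread and may differ from the final layout):

```python
import os

from airflow import models


def test_dag_import():
    """Set the variables the DAG reads, then import it to check it parses."""
    example_file_path = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        'bq_copy_eu_to_us_sample.csv')
    models.Variable.set('table_list_file_path', example_file_path)
    models.Variable.set('gcs_source_bucket', 'example-project')
    models.Variable.set('gcs_dest_bucket', 'us-central1-f')

    import bq_copy_across_locations  # noqa: F401 (importing parses the DAG)
```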

@tswast
Contributor

tswast commented Sep 7, 2018

LGTM. I rearranged the files a little bit and updated the tests in e3adbda

@tswast
Contributor

tswast commented Sep 7, 2018

@dpebot merge when green

@dpebot
Collaborator

dpebot commented Sep 7, 2018

Okay! I'll merge when all statuses are green and all reviewers approve.

@dpebot dpebot added the automerge Merge the pull request once unit tests and other checks pass. label Sep 7, 2018
@dpebot dpebot self-assigned this Sep 7, 2018
@tswast tswast merged commit 16de3b4 into GoogleCloudPlatform:master Sep 8, 2018