diff --git a/composer/data/python2_script.py b/composer/data/python2_script.py new file mode 100644 index 00000000000..aec80592f75 --- /dev/null +++ b/composer/data/python2_script.py @@ -0,0 +1,15 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +print 'Output from Python 2.' # noqa diff --git a/composer/workflows/bashoperator_python2.py b/composer/workflows/bashoperator_python2.py new file mode 100644 index 00000000000..29bcdb7af79 --- /dev/null +++ b/composer/workflows/bashoperator_python2.py @@ -0,0 +1,44 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START composer_bashoperator_python2] +import datetime + +from airflow import models +from airflow.operators import bash_operator + + +yesterday = datetime.datetime.combine( + datetime.datetime.today() - datetime.timedelta(1), + datetime.datetime.min.time()) + + +default_dag_args = { + # Setting start date as yesterday starts the DAG immediately when it is + # detected in the Cloud Storage bucket. + 'start_date': yesterday, +} + +with models.DAG( + 'composer_sample_bashoperator_python2', + schedule_interval=datetime.timedelta(days=1), + default_args=default_dag_args) as dag: + + run_python2 = bash_operator.BashOperator( + task_id='run_python2', + # This example runs a Python script from the data folder to prevent + # Airflow from attempting to parse the script as a DAG. + bash_command='python2 /home/airflow/gcs/data/python2_script.py', + ) +# [END composer_bashoperator_python2] diff --git a/composer/workflows/bashoperator_python2_test.py b/composer/workflows/bashoperator_python2_test.py new file mode 100644 index 00000000000..7e035607f6b --- /dev/null +++ b/composer/workflows/bashoperator_python2_test.py @@ -0,0 +1,26 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import unit_testing + + +def test_dag_import(): + """Test that the DAG file can be successfully imported. + + This tests that the DAG can be parsed, but does not run it in an Airflow + environment. This is a recommended sanity check by the official Airflow + docs: https://airflow.incubator.apache.org/tutorial.html#testing + """ + from . import bashoperator_python2 as module + unit_testing.assert_has_valid_dag(module) diff --git a/composer/workflows/pythonvirtualenvoperator_python2.py b/composer/workflows/pythonvirtualenvoperator_python2.py new file mode 100644 index 00000000000..af8c983a9dd --- /dev/null +++ b/composer/workflows/pythonvirtualenvoperator_python2.py @@ -0,0 +1,63 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START composer_pythonvirtualenvoperator_python2] +import datetime + +from airflow import models +from airflow.operators import python_operator + + +def python2_function(): + """A function which has not been converted to Python 3.""" + # Use the global variable virtualenv_string_args to pass in values when the + # Python version differs from that used by the Airflow process. + global virtualenv_string_args + + # Imports must happen within the function when run with the + # PythonVirtualenvOperator. + import cStringIO + import logging + + arg0 = virtualenv_string_args[0] + buffer = cStringIO.StringIO() + buffer.write('Wrote an ASCII string to buffer:\n') + buffer.write(arg0) + logging.info(buffer.getvalue()) + + +yesterday = datetime.datetime.combine( + datetime.datetime.today() - datetime.timedelta(1), + datetime.datetime.min.time()) + + +default_dag_args = { + # Setting start date as yesterday starts the DAG immediately when it is + # detected in the Cloud Storage bucket. + 'start_date': yesterday, +} + +with models.DAG( + 'composer_sample_pythonvirtualenvoperator_python2', + schedule_interval=datetime.timedelta(days=1), + default_args=default_dag_args) as dag: + + # Use the PythonVirtualenvOperator to select an explicit python_version. + run_python2 = python_operator.PythonVirtualenvOperator( + task_id='run_python2', + python_callable=python2_function, + python_version='2', + string_args=['An example input string'], + ) +# [END composer_pythonvirtualenvoperator_python2] diff --git a/composer/workflows/pythonvirtualenvoperator_python2_test.py b/composer/workflows/pythonvirtualenvoperator_python2_test.py new file mode 100644 index 00000000000..da603550a2c --- /dev/null +++ b/composer/workflows/pythonvirtualenvoperator_python2_test.py @@ -0,0 +1,31 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import pytest + +from . import unit_testing + + +@pytest.mark.skipif(sys.version_info >= (3, 0), reason="requires Python 2") +def test_dag_import(): + """Test that the DAG file can be successfully imported. + + This tests that the DAG can be parsed, but does not run it in an Airflow + environment. This is a recommended sanity check by the official Airflow + docs: https://airflow.incubator.apache.org/tutorial.html#testing + """ + from . import pythonvirtualenvoperator_python2 as module + unit_testing.assert_has_valid_dag(module)