Skip to content

Commit 21cc1ad

Browse files
leahecoleengelke
andauthored
add airflow 1 samples to legacy samples (GoogleCloudPlatform#5896)
* add airflow 1 samples to legacy samples * rename to airflow_1_samples * rename directory * airfow -> airflow * remove anything featuring python 2 * fix lint Co-authored-by: Charles Engelke <engelke@google.com>
1 parent 62e7c2c commit 21cc1ad

40 files changed

+2296
-0
lines changed

composer/airflow_1_samples/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Airflow 1 Legacy Samples
2+
3+
The samples in this directory are compatible with supported versions of Airflow 1.10.x in Cloud Composer. They will be supported as long as Airflow 1 is supported in Cloud Composer. To see the versions of Airflow currently supported in Cloud Composer, check out the [versions list](https://cloud.google.com/composer/docs/concepts/versioning/composer-versions).

composer/airflow_1_samples/__init__.py

Whitespace-only changes.
Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# [START composer_metadb_cleanup_airflow_1]
16+
"""
17+
A maintenance workflow that you can deploy into Airflow to periodically clean
18+
out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
19+
having too much data in your Airflow MetaStore.
20+
21+
## Authors
22+
23+
The DAG is a fork of [teamclairvoyant repository.](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup)
24+
25+
## Usage
26+
27+
1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME,
28+
ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values
29+
30+
2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. Each
31+
dictionary in the list features the following parameters:
32+
- airflow_db_model: Model imported from airflow.models corresponding to
33+
a table in the airflow metadata database
34+
- age_check_column: Column in the model/table to use for calculating max
35+
date of data deletion
36+
- keep_last: Boolean to specify whether to preserve last run instance
37+
- keep_last_filters: List of filters to preserve data from deleting
38+
during clean-up, such as DAG runs where the external trigger is set to 0.
39+
- keep_last_group_by: Option to specify column by which to group the
40+
database entries and perform aggregate functions.
41+
42+
3. Create and Set the following Variables in the Airflow Web Server
43+
(Admin -> Variables)
44+
- airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain
45+
the log files if not already provided in the conf. If this is set to 30,
46+
the job will remove those files that are 30 days old or older.
47+
48+
4. Put the DAG in your gcs bucket.
49+
"""
50+
from datetime import datetime, timedelta
51+
import logging
52+
import os
53+
54+
import airflow
55+
from airflow import settings
56+
from airflow.configuration import conf
57+
from airflow.jobs import BaseJob
58+
from airflow.models import DAG, DagModel, DagRun, Log, SlaMiss, \
59+
TaskInstance, Variable, XCom
60+
from airflow.operators.python_operator import PythonOperator
61+
import dateutil.parser
62+
from sqlalchemy import and_, func
63+
from sqlalchemy.exc import ProgrammingError
64+
from sqlalchemy.orm import load_only
65+
66+
try:
67+
# airflow.utils.timezone is available from v1.10 onwards
68+
from airflow.utils import timezone
69+
now = timezone.utcnow
70+
except ImportError:
71+
now = datetime.utcnow
72+
73+
# airflow-db-cleanup: the DAG id is derived from this file's basename.
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
# Start one day in the past so the first run is schedulable immediately.
START_DATE = airflow.utils.dates.days_ago(1)
# How often to Run. @daily - Once a day at Midnight (UTC)
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = "operations"
# List of email address to send email alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that are 30 days old or older.

DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
    Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))
# Prints the database entries which will be getting deleted; set to False
# to avoid printing large lists and slowdown process
PRINT_DELETES = False
# Whether the job should delete the db entries or not. Included if you want to
# temporarily avoid deleting the db entries.
ENABLE_DELETE = True
93+
# List of all the objects that will be deleted. Comment out the DB objects you
# want to skip.
def _cleanup_entry(model, age_column, keep_last=False,
                   keep_last_filters=None, keep_last_group_by=None):
    """Build one cleanup descriptor for a metadata table.

    Returns a dict with the keys consumed by cleanup_function:
    airflow_db_model, age_check_column, keep_last, keep_last_filters,
    keep_last_group_by.
    """
    return {
        "airflow_db_model": model,
        "age_check_column": age_column,
        "keep_last": keep_last,
        "keep_last_filters": keep_last_filters,
        "keep_last_group_by": keep_last_group_by
    }


DATABASE_OBJECTS = [
    _cleanup_entry(BaseJob, BaseJob.latest_heartbeat),
    # DagRuns: keep each DAG's most recent scheduled (non-triggered) run.
    _cleanup_entry(DagRun, DagRun.execution_date,
                   keep_last=True,
                   keep_last_filters=[DagRun.external_trigger.is_(False)],
                   keep_last_group_by=DagRun.dag_id),
    _cleanup_entry(TaskInstance, TaskInstance.execution_date),
    _cleanup_entry(Log, Log.dttm),
    _cleanup_entry(XCom, XCom.execution_date),
    _cleanup_entry(SlaMiss, SlaMiss.execution_date),
    _cleanup_entry(DagModel, DagModel.last_scheduler_run),
]
138+
139+
# Check for TaskReschedule model. Each probe below imports a model that only
# exists in some Airflow 1.x releases and, when available, registers it for
# cleanup. The broad `except Exception` is deliberate: a missing model must
# not break DAG parsing — the error is logged and the table is skipped.
try:
    from airflow.models import TaskReschedule
    DATABASE_OBJECTS.append({
        "airflow_db_model": TaskReschedule,
        "age_check_column": TaskReschedule.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for TaskFail model
try:
    from airflow.models import TaskFail
    DATABASE_OBJECTS.append({
        "airflow_db_model": TaskFail,
        "age_check_column": TaskFail.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for RenderedTaskInstanceFields model
try:
    from airflow.models import RenderedTaskInstanceFields
    DATABASE_OBJECTS.append({
        "airflow_db_model": RenderedTaskInstanceFields,
        "age_check_column": RenderedTaskInstanceFields.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for ImportError model
# NOTE: on success this import shadows the builtin ImportError at module
# level; harmless here because nothing below catches ImportError by name.
try:
    from airflow.models import ImportError
    DATABASE_OBJECTS.append({
        "airflow_db_model": ImportError,
        "age_check_column": ImportError.timestamp,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)
194+
195+
# Check for celery executor: under CeleryExecutor the celery result-backend
# tables (Task / TaskSet) also accumulate rows, so add them to the purge list.
airflow_executor = str(conf.get("core", "executor"))
logging.info("Airflow Executor: " + str(airflow_executor))
if airflow_executor == "CeleryExecutor":
    logging.info("Including Celery Modules")
    try:
        from celery.backends.database.models import Task, TaskSet

        celery_tables = [
            {
                "airflow_db_model": Task,
                "age_check_column": Task.date_done,
                "keep_last": False,
                "keep_last_filters": None,
                "keep_last_group_by": None
            },
            {
                "airflow_db_model": TaskSet,
                "age_check_column": TaskSet.date_done,
                "keep_last": False,
                "keep_last_filters": None,
                "keep_last_group_by": None
            },
        ]
        DATABASE_OBJECTS.extend(celery_tables)
    except Exception as e:
        # Best-effort: log and continue if celery is not importable.
        logging.error(e)
218+
219+
# Shared SQLAlchemy session against the Airflow metadata database; every
# cleanup task below issues its queries through this one session.
session = settings.Session()

default_args = dict(
    owner=DAG_OWNER_NAME,
    depends_on_past=False,
    email=ALERT_EMAIL_ADDRESSES,
    email_on_failure=True,
    email_on_retry=False,
    start_date=START_DATE,
    retries=1,
    retry_delay=timedelta(minutes=1),
)

dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE,
)
# Older Airflow releases lack these attributes, so guard before assigning.
if hasattr(dag, "doc_md"):
    dag.doc_md = __doc__
if hasattr(dag, "catchup"):
    dag.catchup = False
241+
242+
243+
def print_configuration_function(**context):
    """Resolve the retention window and publish the cutoff date via XCom.

    Reads ``maxDBEntryAgeInDays`` from the triggering ``dag_run.conf`` when
    present and valid (>= 1); otherwise falls back to
    DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS. Pushes the resulting cutoff timestamp
    (ISO-8601 string, XCom key ``max_date``) for the downstream cleanup tasks.
    """
    logging.info("Loading Configurations...")
    dag_run_conf = context.get("dag_run").conf
    logging.info("dag_run.conf: " + str(dag_run_conf))
    max_db_entry_age_in_days = None
    if dag_run_conf:
        max_db_entry_age_in_days = dag_run_conf.get(
            "maxDBEntryAgeInDays", None)
    # BUG FIX: previously this logged the entire conf dict instead of the
    # extracted value.
    logging.info("maxDBEntryAgeInDays from dag_run.conf: " +
                 str(max_db_entry_age_in_days))
    if max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1:
        logging.info(
            "maxDBEntryAgeInDays conf variable isn't included or Variable " +
            "value is less than 1. Using Default '" +
            str(DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS) + "'")
        max_db_entry_age_in_days = DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS
    # Entries whose age column is older than max_date become delete candidates.
    max_date = now() + timedelta(-max_db_entry_age_in_days)
    logging.info("Finished Loading Configurations")
    logging.info("")

    logging.info("Configurations:")
    logging.info("max_db_entry_age_in_days: " + str(max_db_entry_age_in_days))
    logging.info("max_date: " + str(max_date))
    logging.info("enable_delete: " + str(ENABLE_DELETE))
    logging.info("session: " + str(session))
    logging.info("")

    logging.info("Setting max_execution_date to XCom for Downstream Processes")
    context["ti"].xcom_push(key="max_date", value=max_date.isoformat())
271+
272+
273+
# Runs first; resolves the retention config and publishes max_date via XCom
# for every downstream cleanup task.
print_configuration = PythonOperator(
    dag=dag,
    task_id="print_configuration",
    provide_context=True,
    python_callable=print_configuration_function,
)
278+
279+
280+
def cleanup_function(**context):
    """Delete aged rows from one metadata table (params["airflow_db_model"]).

    Pulls the cutoff date pushed by print_configuration via XCom, builds a
    query for rows whose params["age_check_column"] is older than that
    cutoff (optionally preserving each group's most recent entry when
    keep_last is set), then bulk-deletes them when ENABLE_DELETE is True.
    Tables missing from the metadata DB are logged and skipped.
    """

    logging.info("Retrieving max_execution_date from XCom")
    max_date = context["ti"].xcom_pull(
        task_ids=print_configuration.task_id, key="max_date")
    max_date = dateutil.parser.parse(max_date)  # stored as iso8601 str in xcom

    airflow_db_model = context["params"].get("airflow_db_model")
    state = context["params"].get("state")
    age_check_column = context["params"].get("age_check_column")
    keep_last = context["params"].get("keep_last")
    keep_last_filters = context["params"].get("keep_last_filters")
    keep_last_group_by = context["params"].get("keep_last_group_by")

    logging.info("Configurations:")
    logging.info("max_date: " + str(max_date))
    logging.info("enable_delete: " + str(ENABLE_DELETE))
    logging.info("session: " + str(session))
    logging.info("airflow_db_model: " + str(airflow_db_model))
    logging.info("state: " + str(state))
    logging.info("age_check_column: " + str(age_check_column))
    logging.info("keep_last: " + str(keep_last))
    logging.info("keep_last_filters: " + str(keep_last_filters))
    logging.info("keep_last_group_by: " + str(keep_last_group_by))

    logging.info("")

    logging.info("Running Cleanup Process...")

    try:
        # Only load the age column; the delete is by primary key anyway.
        query = session.query(airflow_db_model).options(
            load_only(age_check_column))

        logging.info("INITIAL QUERY : " + str(query))

        if keep_last:

            subquery = session.query(func.max(DagRun.execution_date))
            # workaround for MySQL "table specified twice" issue
            # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
            if keep_last_filters is not None:
                for entry in keep_last_filters:
                    subquery = subquery.filter(entry)

                logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))

            if keep_last_group_by is not None:
                subquery = subquery.group_by(keep_last_group_by)
                logging.info(
                    "SUB QUERY [keep_last_group_by]: " +
                    str(subquery))

            subquery = subquery.from_self()

            # Keep rows that are either recent enough or each group's latest.
            query = query.filter(
                and_(age_check_column.notin_(subquery)),
                and_(age_check_column <= max_date))

        else:
            query = query.filter(age_check_column <= max_date,)

        if PRINT_DELETES:
            entries_to_delete = query.all()

            logging.info("Query: " + str(query))
            logging.info("Process will be Deleting the following " +
                         str(airflow_db_model.__name__) + "(s):")
            for entry in entries_to_delete:
                date = str(entry.__dict__[str(age_check_column).split(".")[1]])
                logging.info("\tEntry: " + str(entry) + ", Date: " + date)

            logging.info("Process will be Deleting "
                         + str(len(entries_to_delete)) + " "
                         + str(airflow_db_model.__name__) + "(s)")
        else:
            # logging.warn is a deprecated alias (removed in Python 3.13);
            # use logging.warning.
            logging.warning(
                "You've opted to skip printing the db entries to be deleted. "
                "Set PRINT_DELETES to True to show entries!!!")

        if ENABLE_DELETE:
            logging.info("Performing Delete...")
            # using bulk delete
            query.delete(synchronize_session=False)
            session.commit()
            logging.info("Finished Performing Delete")
        else:
            logging.warning("You've opted to skip deleting the db entries. "
                            "Set ENABLE_DELETE to True to delete entries!!!")

        logging.info("Finished Running Cleanup Process")

    except ProgrammingError as e:
        logging.error(e)
        # BUG FIX: the implicit string concatenation previously produced
        # "...metadata.Skipping..." with no separating space.
        logging.error(
            str(airflow_db_model) + " is not present in the metadata. "
            "Skipping...")
376+
377+
378+
# Fan out: one cleanup task per configured table, each downstream of
# print_configuration so it can pull max_date from XCom.
for purge_spec in DATABASE_OBJECTS:
    cleanup_op = PythonOperator(
        dag=dag,
        task_id="cleanup_" + str(purge_spec["airflow_db_model"].__name__),
        python_callable=cleanup_function,
        params=purge_spec,
        provide_context=True,
    )
    print_configuration >> cleanup_op
388+
389+
# [END composer_metadb_cleanup_airflow_1]

0 commit comments

Comments
 (0)