Best Practices Apache Airflow

Download as pdf or txt
Download as pdf or txt
You are on page 1of 28

Airflow

A CLAIRVOYANT Story

www.clairvoyantsoft.com
Quick Poll

|2
Robert Sanders Shekhar Vemuri

Big Data Manager and Engineer CTO

One of the early employees of Clairvoyant, Robert Shekhar works with clients across various industries and
primarily works with clients to enable them along their helps define data strategy, and lead the implementation of
big data journey. Robert has deep background in web data engineering and data science efforts.
and enterprise systems, working on full-stack
implementations and then focusing on Data Was Co-founder and CTO of Blue Canary, a Predictive
management platforms. analytics solution to help with student retention, Blue
Canary was later Acquired by Blackboard in 2015.

|3
About

Background Awards & Recognition

Boutique consulting firm centered on building data solutions and


products

All things Web and Data Engineering, Analytics, ML and User


Experience to bring it all together

Support core Hadoop platform, data engineering pipelines and


provide administrative and devops expertise focused on Hadoop

|4
Currently working on building a data security solution to help enterprises
discover, secure and monitor sensitive data in their environment.

|5
Agenda

● What is Apache Airflow?


○ Features
○ Architecture
● Use Cases
● Lessons Learned
● Best Practices
● Scaling & High Availability
● Deployment, Management & More
● Questions

|6
Why?

● Mostly used Cron and Oozie


● Did some crazy things with Java and Quartz in a past life
● Lot of operational support was going into debugging Oozie workloads and issues we ran into
with that
○ 4+ Years of working with Oozie “built expertise??”
● Needed a scalable, open source, user friendly engine for
○ Internal product needs
○ Client engagements
○ Making our Ops and Support teams lives easier

| 10
Scheduler Landscape

| 11
What is Apache Airflow?

● “Apache Airflow is an Open Source platform to programmatically Author, Schedule and Monitor workflows”
○ Workflows as Python Code (this is huge!!!!!)
○ Provides monitoring tools like alerts and a web interface
● Written in Python
● Apache Incubator Project
○ Joined Apache Foundation in early 2016
○ https://github.com/apache/incubator-airflow/

| 12
Why use Apache Airflow?

● Lightweight Workflow Platform


● Full blown Python scripts as DSL
● More flexible execution and workflow generation
● Feature Rich Web Interface
● Worker Processes can Scale Horizontally and Vertically
● Extensible

| 13
Building Blocks

| 14
Executors

What are Executors? Different Executors

Executors are the mechanism by which task ● SequentialExecutor


instances get run. ● LocalExecutor
● CeleryExecutor
● MesosExecutor
● KubernetesExecutor (Coming Soon)

| 15
Single Node Deployment

| 16
Multi-Node Deployment

| 17
Defining a Workflow

# Library Imports
from airflow.models import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

# Define global variables and default arguments


START_DATE = datetime.now() - timedelta(minutes=1)
default_args = dict(
'owner'='Airflow',
'retries'=1,
'retry_delay'=timedelta(minutes=5),
)
# Define the DAG
dag = DAG('dag_id', default_args=default_args, schedule_interval='0 0 * * *’, start_date=START_DATE)

# Define the Tasks


task1 = BashOperator(task_id='task1', bash_command="echo 'Task 1'", dag=dag)
task2 = BashOperator(task_id='task2', bash_command="echo 'Task 2'", dag=dag)
task3 = BashOperator(task_id='task3', bash_command="echo 'Task 3'", dag=dag)

# Define the Task Relationships


task1.set_downstream(task2)
task2.set_downstream(task3)

| 18
Defining a Dynamic Workflow

dag = DAG('dag_id', …)

last_task = None
for i in range(1, 3):

task = BashOperator(
task_id='task' + str(i),
bash_command="echo 'Task" + str(i) + "'",
dag=dag)

if last_task is None:
last_task = task
else:
last_task.set_downstream(task)

last_task = task

| 19
Operators

● Action Operators
○ BashOperator(bash_command))
○ SSHOperator(ssh_hook, ssh_conn_id, remote_host, command)
○ PythonOperator(python_callable=python_function)
● Transfer Operators
○ HiveToMySqlTransfer(sql, mysql_table, hiveserver2_conn_id, mysql_conn_id, mysql_preoperator, mysql_postoperator, bulk_load)
○ MySqlToHiveTransfer(sql, hive_table, create, recreate, partition, delimiter, mysql_conn_id, hive_cli_conn_id, tblproperties)
● Sensor Operators
○ HdfsSensor(filepath, hdfs_conn_id, ignored_ext, ignore_copying, file_size, hook)
○ HttpSensor(endpoint, http_conn_id, method, request_params, headers, response_check, extra_options)

Many More

| 20
First Use Case (Description)

● Kogni discovers sensitive data across all data sources enterprise


● Need to configure scans with various schedules, work standalone or with a spark cluster
● Orchestrate, execute and manage dozens of pipelines that scan and ingest data in a secure
fashion
● Needed a tool to manage this outside of the core platform
● Started with exporting Oozie configuration from the core app - but conditional aspects and
visibility became an issue
● Needed something that supported deep DAGs and Broad DAGs

| 21
Second Use Case (Description)

● Daily ETL Batch Process to Ingest data into Hadoop


○ Extract
■ 1226 tables from 23 databases
○ Transform
■ Impala scripts to join and transform data
○ Load
■ Impala scripts to load data into common final tables
● Other requirements
○ Make it extensible to allow the client to import more databases and tables in the future
○ Status emails to be sent out after daily job to report on success and failures
● Solution
○ Create a DAG that dynamically generates the workflow based off data in a Metastore

| 22
Second Use Case (Architecture)

| 23
Second Use Case (DAG)
1,000 ft view 100,000 ft view

| 24
Lessons Learned

● Support
● Documentation
● Bugs and Odd Behavior
● Monitor Performance with Charts
● Tune Retries
● Tune Parallelism

| 25
Best Practices

● Load Data Incrementally


● Process Historic Data with Backfill operations
● Enforce Idempotency (retry safe)
● Execute Conditionally (BranchPythonOperator, ShortCuircuitOperator)
● Alert if there are failures (task failures and SLA misses) (Email/Slack)
● Use Sensor Operators to determine when to Start a Task (if applicable)
● Build Validation into your Workflows
● Test as much - but needs some thought

| 26
Scaling & High Availability

| 27
High Availability for the Scheduler
Scheduler Failover Controller: https://github.com/teamclairvoyant/airflow-scheduler-failover-controller

| 28
Deployment & Management

● PIP Install airflow site packages on all Nodes


● Set AIRFLOW_HOME env variable before setup
● Utilize MySQL or PostgreSQL as a Metastore
● Update Web App Port
● Utilize SystemD or Upstart Scripts (https://github.com/apache/incubator-
airflow/tree/master/scripts)
● Set Log Location
○ Local File System, S3 Bucket, Google Cloud Storage
● Daemon Monitoring (Nagios)
● Cloudera Manager CSD (Coming Soon)

| 29
Security

● Web App Authentication


○ Password
○ LDAP
○ OAuth: Google, GitHub
● Role Based Access Control (RBAC) (Coming Soon)
● Protect airflow.cfg (expose_config, read access to airflow.cfg)
● Web App SSL
● Kerberos Ticket Renewer

| 30
Testing

● PyUnit - Unit Testing


● Test DAG Tasks Individually

airflow test [--subdir SUBDIR] [--dry_run] [--task_params


TASK_PARAMS_JSON] dag_id task_id execution_date

● Airflow Unit Test Mode - Loads configurations from the unittests.cfg file

[tests]
unit_test_mode = true

● Always at the very least ensure that the DAG is valid (can be done as part of CI)
● Take it a step ahead by mock pipeline testing(with inputs and outputs) (especially if your DAGs
are broad)

| 31

You might also like