Data Engineering Series #3: Apache Airflow - the modern Workflow management tool. Getting Started
Srinidhi
Posted on March 30, 2021
Why such attention towards Airflow?
Search interest in Airflow over time. Source: Google Trends
"The software industry has seen a notable rise in the development of tools to manage data. These tools range from storage solutions that house data to pipelines that transport data".
Data-driven companies like Airbnb and Quizlet rely on these data pipelines, and that means handling tedious tasks like
- Scheduling
- Testing
- Handling errors
- Versioning
- Scaling
for their current data flow setup.
We call these pipelines data workflows. And a well-known tool in this space is Apache Airflow.
Airflow solves each of those tedious tasks with a dedicated feature: DAGs, retries, alerts, a WebUI, scheduling, and an API. Each one gets its own section below, so jump ahead to the feature you're interested in.
And best of all, Airflow is an open-source tool with a rapidly growing user base and contributor community 🙂
Without further ado,
let's dive into the workflow framework and create a minimal workflow pipeline that leverages all the Airflow features listed above.
✨ DAGS
Every workflow is constructed as a Directed Acyclic Graph (DAG).
A DAG is created in a .py file, which should ideally have three sections configured:
- DAG configuration - default_args
- DAG instance - DAG()
- DAG tasks - Operators
DAG Configuration - default_args
default_args is used to set properties (arguments) that are common to all the tasks.
# Initializing the default arguments that we'll pass to our DAG
from datetime import datetime  # needed for start_date

default_args = {
    'owner': 'Srinidhi',
    'start_date': datetime(2018, 1, 1),
    'retries': 1,
    'on_failure_callback': task_failure_alert  # alert function covered in the Alerts section below
}
DAG Instance - DAG()
# Creating a DAG instance
from airflow import DAG

my_workflow_dag = DAG(
    'first_dag',
    default_args=default_args,
    description='Sample DAG for DE Series',
    schedule_interval='0 0 * * *',  # cron expression: run daily at midnight
)
DAG Tasks - Operators
Airflow operators are used to create the individual tasks in a DAG; properties specific to a task are configured in its operator.
Some of the most commonly used Airflow operators:
- PythonOperator - calls a Python function. An alternative way to create a Python-function task is the TaskFlow API, available from Airflow 2.0 (see the sketch after this list).
- BashOperator - executes a UNIX command.
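As a quick aside, here is a minimal sketch of what the TaskFlow-API style looks like, assuming Airflow 2.0+; the dag id and task body are illustrative and not part of this post's DAG. The rest of this post sticks with the classic-operator style, shown next.

# A minimal TaskFlow-API sketch (Airflow 2.0+); dag id and task body are illustrative
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval='@daily', start_date=datetime(2021, 1, 1), catchup=False)
def taskflow_example():
    @task
    def say_hello():
        # An ordinary Python function; the decorator turns it into an Airflow task
        print("Hello from TaskFlow")

    say_hello()

# Instantiating the decorated function registers the DAG
taskflow_dag = taskflow_example()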
#Operators
# Airflow 2.0 import paths; in 1.10 these live under airflow.operators.bash_operator / python_operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Bash operator task that executes the unix command - echo
task_1 = BashOperator(
    task_id='first_task',
    bash_command='echo "First Task"',
    dag=my_workflow_dag,
)

# Python operator that prints the details of the current job using the context variable
task_2 = PythonOperator(
    task_id='second_task',
    python_callable=print_fn,
    provide_context=True,  # Airflow 1.x argument; in 2.0 the context is passed to the callable automatically
    on_success_callback=task_success_alert,
    dag=my_workflow_dag,
)
#Python Function
def print_fn(ds, **kwargs):
    """
    Implement your python task logic here.
    """
    # Prints the job's execution date (ds), passed in through the task context
    print(ds)
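The snippet above defines both tasks but not their order. Assuming the intended flow is first_task followed by second_task (my assumption, not stated in the post), a one-line dependency chains them:

# Assumed ordering: run second_task only after first_task completes
task_1 >> task_2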
Now that we have our DAG ready, let's see where I have configured the features in it.
✨ RETRY
Airflow handles errors and failures gracefully.
If you prefer to re-run a failed task multiple times before aborting the workflow run, use the retries argument, so that an erroneous task is re-executed up to the defined number of times before it's marked as failed.
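As a minimal sketch, retries (and the optional retry_delay, an extra argument not used in the DAG above) can be set in default_args; the values here are only examples.

# Illustrative retry settings; retry_delay is optional and not part of the DAG above
from datetime import timedelta

default_args = {
    'owner': 'Srinidhi',
    'retries': 3,                          # re-run a failed task up to 3 times
    'retry_delay': timedelta(minutes=5),   # wait 5 minutes between attempts
}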
✨ ALERTS
The on_success_callback and on_failure_callback arguments are used to trigger actions once a task succeeds or fails respectively. This is useful for sending personalized alerts to your internal team via Slack, email, or any other API call when a workflow task succeeds or fails.
👇 Sample Slack notification received from Airflow
View Code - Success, Failure
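Since the linked code isn't reproduced here, below is a minimal sketch of what task_success_alert and task_failure_alert might look like; the Slack webhook URL and message format are my assumptions, not the author's actual implementation.

# Hypothetical callback functions posting to a Slack incoming webhook
import json
import urllib.request

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder webhook URL

def _notify(message):
    # Send a simple JSON payload to the (assumed) Slack webhook
    data = json.dumps({"text": message}).encode("utf-8")
    request = urllib.request.Request(
        SLACK_WEBHOOK_URL, data=data, headers={"Content-Type": "application/json"}
    )
    urllib.request.urlopen(request)

def task_success_alert(context):
    # Airflow passes the task context dict to the callback
    _notify(f"Task succeeded: {context['task_instance'].task_id}")

def task_failure_alert(context):
    _notify(f"Task failed: {context['task_instance'].task_id}")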
✨ WEBUI
Executing airflow webserver in the CLI starts the webserver service, which serves the Airflow WebUI at http://0.0.0.0:8080.
Since Airflow is built on the Flask framework, you can even extend the WebUI by creating additional pages using Flask AppBuilder (a minimal plugin sketch follows).
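For illustration, here is a minimal sketch of such a Flask AppBuilder plugin, assuming Airflow 2.0's plugin interface; the view name, category, and page content are hypothetical.

# Hypothetical plugin adding a custom page to the Airflow WebUI (drop into the plugins/ folder)
from airflow.plugins_manager import AirflowPlugin
from flask_appbuilder import BaseView as AppBuilderBaseView, expose

class MyCustomView(AppBuilderBaseView):
    default_view = "index"

    @expose("/")
    def index(self):
        # A real plugin would usually render a template; a plain string keeps the sketch short
        return "Hello from a custom Airflow WebUI page"

class MyWebUIPlugin(AirflowPlugin):
    name = "my_webui_plugin"
    appbuilder_views = [
        {"name": "My Page", "category": "Custom Pages", "view": MyCustomView()}
    ]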
✨ SCHEDULING
The scheduler service, the heart of Airflow, needs to be up and running for DAGs to execute.
Once Airflow is installed, execute
airflow scheduler
to start the scheduler service.
In addition, DAGs can be made to run automatically at a particular time by providing a cron expression in schedule_interval (a few scheduling variations are sketched below).
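As a sketch of the options, schedule_interval accepts a cron expression, one of Airflow's preset strings, or a timedelta; the dag ids below are illustrative.

# Equivalent and alternative ways to express a schedule (dag ids are illustrative)
from datetime import datetime, timedelta
from airflow import DAG

# Cron expression: every day at midnight (same schedule as first_dag above)
cron_dag = DAG('daily_cron_dag', start_date=datetime(2021, 1, 1),
               schedule_interval='0 0 * * *')

# Airflow preset string: shorthand for the same daily schedule
preset_dag = DAG('daily_preset_dag', start_date=datetime(2021, 1, 1),
                 schedule_interval='@daily')

# timedelta: run every 6 hours, measured from the previous run
interval_dag = DAG('six_hourly_dag', start_date=datetime(2021, 1, 1),
                   schedule_interval=timedelta(hours=6))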
✨ API
Airflow 2.0 ships with a stable REST API that opens the door to automation with Airflow. With the API, you can trigger DAGs, read their status, and perform other operations that are otherwise done through the WebUI.
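As a minimal sketch, triggering a DAG run through the 2.0 REST API can look like this (using the requests library); the host, credentials, and basic-auth backend are assumptions about your deployment.

# Hypothetical example: trigger first_dag via the Airflow 2.0 REST API
import requests

response = requests.post(
    "http://localhost:8080/api/v1/dags/first_dag/dagRuns",  # assumed host and port
    auth=("admin", "admin"),   # placeholder credentials (basic-auth backend assumed)
    json={"conf": {}},         # optional run configuration
)
response.raise_for_status()
print(response.json())         # details of the newly created DAG run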
Setting up your Airflow Instance
Here are a few great resources:
- On Windows with Docker
- On Ubuntu (local setup)
- On Kubernetes
- On AWS using CloudFormation
If you don't have the bandwidth to set up and maintain Airflow on your own infrastructure, here are the commercial Airflow-as-a-service providers:
- Google Cloud Composer - a managed service built atop Google Cloud and Airflow.
- Astronomer.io - in addition to hosting Airflow in their infrastructure, they provide Airflow-focused solutions and support services.
Is Airflow the only option for workflows?
Certainly Not. It depends on different factors.
Say your entire product is hosted with a single cloud provider (AWS / Azure / GCP) and you are fine with vendor lock-in.
Then,
- For AWS, AWS Step Functions will be a good option.
- For Azure, you can opt for Azure Data Factory.
- For GCP, Google Cloud Composer will be the best fit.
Whereas if you want to host your product across multiple clouds, Airflow will be a better fit. There are other workflow tools in the market similar to Airflow:
- Prefect
- Luigi
- Dagster
Airflow Best practices / Tips:
- Once you have Airflow installed, modify the configurations of your Airflow instance in the master config file - airflow.cfg
- Avoid building complex data processing logic in Airflow, as Airflow is not intended to work as a data processing engine; its focus is orchestrating data flow jobs. (For data processing, run the processing script on a batch compute or Spark framework and invoke it from Airflow.)
- Use macros and templates to avoid hard-coding values.
- While migrating the Airflow metadata DB to a new database, use "airflow upgradedb" (in 2.0, "airflow db upgrade") instead of "airflow initdb" (in 2.0, "airflow db init").
- When you need a task that isn't available out of the box, create a custom operator by extending the BaseOperator class so it can be reused (see the sketch after this list).
- Use Airflow plugins to connect to third-party tools/data sources.
- Use a Postgres database as Airflow's metadata DB in order to use the Local and Celery executors.
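Here is a minimal sketch of such a custom operator; the greeting logic and the name parameter are hypothetical, purely to show the structure.

# Hypothetical custom operator built by extending BaseOperator
from airflow.models import BaseOperator

class GreetOperator(BaseOperator):
    def __init__(self, name, **kwargs):
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        # execute() runs when the task executes; its return value is pushed to XCom
        message = f"Hello, {self.name}!"
        print(message)
        return message

# Usage inside a DAG file:
# greet = GreetOperator(task_id='greet_task', name='Data Engineers', dag=my_workflow_dag)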
If you have any queries or any more tips that you think might be useful, comment below on the post.
Going forward, I'll publish detailed posts on tools and frameworks used by Data Engineers day in and day out.