Data Engineering Series #3: Apache Airflow - the modern Workflow management tool. Getting Started


Srinidhi

Posted on March 30, 2021


Why such attention towards Airflow?

Search interest in Airflow over time. Source: Google Trends

"The software industry has seen a notable rise in the development of tools to manage data. These tools range from storage solutions that house data to pipelines that transport data".

Data-driven companies like Airbnb and Quizlet rely on these data pipelines, and maintaining them means dealing with tedious tasks like

  • Scheduling
  • Testing
  • Handling errors
  • Versioning
  • Scaling

the current data flow setup.

We call these pipelines data workflows. And a well-known tool in this space is Apache Airflow.

Airflow solves those tedious tasks through the features covered below, each with its own section in this post:

  • DAGs
  • Retry
  • Alerts
  • WebUI
  • Scheduling
  • API

And best of all, Airflow is an open-source tool with a rapidly growing user base and contributor community 🙂


Without further ado, let's dive deep into the workflow framework and create a minimalistic workflow pipeline that leverages all the Airflow features listed above.

✨ DAGS


Every workflow is constructed as a Directed Acyclic Graph (DAG).
A DAG is defined in a .py file, which should ideally have three sections configured:
  1. DAG configuration - default_args
  2. DAG instance - DAG()
  3. DAG tasks - Operators


DAG Configuration - default_args

default_args is used to set properties (arguments) that are common to all the tasks in the DAG.

# Initializing the default arguments that we'll pass to our DAG
from datetime import datetime

default_args = {
    'owner': 'Srinidhi',
    'start_date': datetime(2018, 1, 1),
    'retries': 1,                                 # retry a failed task once before marking it failed
    'on_failure_callback': task_failure_alert     # custom alert function (see the code linked in the Alerts section)
}

DAG Instance - DAG()

# Creating a DAG instance
from airflow import DAG

my_workflow_dag = DAG(
    'first_dag',
    default_args=default_args,
    description='Sample DAG for DE Series',
    schedule_interval='0 0 * * *',   # cron notation: run every day at midnight
)


DAG Tasks - Operators

Airflow Operators are used to create the individual tasks in a DAG. Properties specific to a task are configured in its operator.

Some of the most commonly used Airflow Operators:

PythonOperator - Calls a Python function. An alternative way to create a Python function task is the TaskFlow API, available from Airflow 2.0 (see the sketch below).
BashOperator - Executes a UNIX command
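
Since the TaskFlow API came up, here's a minimal sketch of the decorator style, assuming Airflow 2.0+ and the default_args defined earlier; the DAG name taskflow_demo is hypothetical. The classic operator style used in the rest of this post follows right after.

# TaskFlow API sketch (Airflow 2.0+): @task turns a plain Python function into a task
from airflow.decorators import task

with DAG('taskflow_demo', default_args=default_args, schedule_interval='@daily') as demo_dag:

    @task
    def say_hello():
        print("Hello from the TaskFlow API")

    say_hello()   # calling the decorated function registers it as a task in demo_dag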


# Operators
# (import paths below are for Airflow 1.x; in Airflow 2.0 use airflow.operators.bash and airflow.operators.python)
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Python function used by task_2 below
def print_fn(ds, **kwargs):
    """
    Implement your python task logic here.
    """
    # Prints the execution date passed in through the provide_context parameter
    print(ds)

# Bash operator task that executes a unix command - echo
task_1 = BashOperator(
    task_id='first_task',
    bash_command='echo "First Task"',
    dag=my_workflow_dag,
)

# Python operator task that prints the details of the current run using the context variables
task_2 = PythonOperator(
    task_id='second_task',
    python_callable=print_fn,
    provide_context=True,
    on_success_callback=task_success_alert,   # custom alert function (see the code linked in the Alerts section)
    dag=my_workflow_dag,
)
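One thing the snippet above doesn't show is task ordering. A small addition (not part of the original snippet) chains the two tasks so task_2 runs only after task_1 succeeds:

# Declare the dependency: task_1 runs first, then task_2
task_1 >> task_2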

Now that we have our DAG ready, let's see where I have configured the features in it.

✨ RETRY


Airflow handles errors and failures gracefully.
If you prefer to re-run a failed task a few times before aborting the workflow run, use the retries argument: an erroneous task will then be re-executed up to the defined number of times before it's marked as failed.
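
Retries can also be set per task, together with a wait between attempts via retry_delay. A minimal sketch with a hypothetical task:

from datetime import timedelta

# Hypothetical flaky task: retried twice, waiting 5 minutes between attempts
flaky_task = BashOperator(
    task_id='flaky_task',
    bash_command='exit 1',               # always fails, just to demonstrate retries
    retries=2,
    retry_delay=timedelta(minutes=5),
    dag=my_workflow_dag,
)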

✨ ALERTS


The on_success_callback and on_failure_callback arguments are used to trigger an action when a task succeeds or fails respectively. This is useful for sending personalized alerts to your team via Slack, Email, or any other API call when a workflow task succeeds or fails.
👇 Sample Slack notification received from Airflow


View Code - Success, Failure
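
If you'd rather not click through, here is a minimal sketch of what task_success_alert and task_failure_alert could look like; they just print here, while the linked code builds the full Slack notification. The context argument is supplied by Airflow and carries run metadata:

def task_failure_alert(context):
    # context holds run details such as the task instance and execution date
    print(f"Task failed: {context['task_instance_key_str']}")

def task_success_alert(context):
    print(f"Task succeeded: {context['task_instance_key_str']}")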

✨ WEBUI


Executing airflow webserver in the CLI starts the webserver service, which serves the Airflow WebUI at 0.0.0.0:8080.

Since Airflow is built on the Flask framework, you can even extend the WebUI by creating additional pages using Flask AppBuilder.

✨ SCHEDULING


The scheduler service - the heart of Airflow - needs to be up and running for DAGs to execute.
Once Airflow is installed, execute airflow scheduler to start the scheduler service.


In addition to that, DAGs can be made to run automatically at a particular time by providing a cron expression (or an Airflow preset) in schedule_interval.
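
A few illustrative values (the DAG names here are hypothetical, reusing the default_args from earlier):

daily_dag = DAG('daily_dag', default_args=default_args, schedule_interval='0 0 * * *')     # every day at midnight (cron)
hourly_dag = DAG('hourly_dag', default_args=default_args, schedule_interval='@hourly')     # Airflow preset
weekly_dag = DAG('weekly_dag', default_args=default_args, schedule_interval='30 6 * * 1')  # Mondays at 06:30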

✨ API


Airflow 2.0 has released a stable REST API that opens the door to automation with Airflow. With the API, one can trigger DAGs, read their state, and perform other operations that are otherwise done through the WebUI.
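For example, here is a minimal sketch of triggering the DAG above via the Airflow 2.0 stable REST API, assuming the webserver is reachable at localhost:8080 with basic auth enabled; the credentials below are placeholders.

import requests

response = requests.post(
    "http://localhost:8080/api/v1/dags/first_dag/dagRuns",   # stable REST API endpoint for DAG runs
    auth=("airflow_user", "airflow_password"),                # placeholder credentials
    json={"conf": {}},                                        # optional run configuration
)
print(response.status_code, response.json())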

Setting up your Airflow Instance

Here are a few great resources:
  • On Windows with Docker
  • On Ubuntu (local setup)
  • On Kubernetes
  • On AWS using CloudFormation

If you don't have the bandwidth to set up and maintain Airflow on your own infrastructure, here are some commercial Airflow-as-a-service providers:

Google Cloud Composer - a managed service built atop Google Cloud and Airflow.
Astronomer.io - in addition to hosting Airflow on their infrastructure, they provide Airflow-focused solutions and support services.


Is Airflow the only option for workflows?

Certainly not. It depends on different factors.
Say your entire product is hosted on a single cloud provider (AWS / Azure / GCP) and you are fine with vendor lock-in.
Then the workflow services native to that cloud provider may serve you just as well.

Whereas if you want to host your product across multiple clouds, Airflow will be a better fit. There are other workflow tools on the market similar to Airflow:
  • Prefect
  • Luigi
  • Dagster


Airflow Best Practices / Tips:

  1. Once you have Airflow installed, modify the configuration of your Airflow instance in the master config file - airflow.cfg.
  2. Avoid building complex data processing logic in Airflow, as Airflow is not intended to work as a data processing engine; its focus is orchestrating data flow jobs. (For data processing, run the processing script on a batch compute or Spark framework and invoke it from Airflow.)
  3. Use macros and templates to avoid hard-coding values (see the sketch after this list).
  4. While migrating the Airflow metadata DB to a new database, use "airflow upgradedb" (in 2.0, airflow db upgrade) instead of "airflow initdb" (in 2.0, airflow db init).
  5. When you need a task that isn't available out of the box, create a custom operator by extending the BaseOperator class so it can be re-used.
  6. Use Airflow Plugins to connect to third-party tools/data sources.
  7. Use a Postgres database as Airflow's metadata DB to use the Local and Celery executors.
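
For tip 3, a minimal templating sketch (the task here is hypothetical): Jinja macros such as {{ ds }} are rendered at runtime, so the execution date never has to be hard-coded.

templated_task = BashOperator(
    task_id='templated_task',
    bash_command='echo "Processing data for {{ ds }}"',   # {{ ds }} is the run's execution date (YYYY-MM-DD)
    dag=my_workflow_dag,
)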

If you have any queries or any more tips that you think might be useful, comment below on the post.

Going forward, I'll publish detailed posts on tools and frameworks used by Data Engineers day in and day out.


Follow for updates.
