Apache Airflow - Deep Dive | All you need to know about Airflow
Ali KHYAR
Posted on February 24, 2023
This blog was originally published on ali-khyar.com. If you are interested in learning more on similar subjects, visit here.
Table of Contents
Airflow for creating and orchestrating data pipelines
Airflow's single node architecture vs multi-nodes architecture
Databases and Executors (Sequential, Local and Celery)
Grouping tasks (SubDAGs and TaskGroups)
Tasks conditioning (BranchPythonOperator)
What is a data pipeline?
A data pipeline is a set of processes or tools that are used to move data from one place to another, and to transform and process that data along the way.
A simple example of a data pipeline might involve extracting data from a source system, such as a database or a CSV file, and then using a series of transformation steps to clean and prepare the data for loading into a destination system, such as a data warehouse or a machine learning model.
A data pipeline typically includes several stages, such as data extraction, data transformation, data validation, and data loading. These stages may involve a combination of manual and automated processes, and may include a variety of different tools and technologies.
Data pipelines serve many use cases, such as Extract, Transform, Load (ETL); Extract, Transform, Load and Analyze (ETLA); Extract, Load, Transform (ELT); and many more.
The complexity of the pipeline can vary depending on the scope and purpose; it can be as simple as gathering and combining data from multiple sources into a single, unified view or database. This can be done for a variety of reasons, such as to improve data quality, reduce data redundancy, or to make it easier to analyze and report on the data.
For example, imagine a company that has been acquired by another company and now has multiple databases containing information about customers, sales, and inventory. In order to more easily analyze and report on the company's performance, the data from these multiple databases would need to be consolidated into a single database. This process would involve extracting the relevant data from each of the individual databases, cleaning and standardizing the data, and then loading it into the consolidated database.
A simple example of an ETL step in a data pipeline, using Python, is the following:
- Import the necessary libraries: pandas to load data from the source, and SQLAlchemy's create_engine to connect to a PostgreSQL database
--------------------------------------------------------------
import pandas as pd
from sqlalchemy import create_engine
--------------------------------------------------------------
- Read the sample file (Extract)
--------------------------------------------------------------
df = pd.read_csv("data.csv")
df["date"] = pd.to_datetime(df["date"])
--------------------------------------------------------------
- Add a total column (Transform)
--------------------------------------------------------------
df["total"] = df["price"] * df["quantity"]
--------------------------------------------------------------
- Load the data into the destination (Load)
--------------------------------------------------------------
engine = create_engine("postgresql://username:password@host:port/database")
df.to_sql("sales", engine, if_exists="replace")
--------------------------------------------------------------
Overall, data pipelines allow organizations to collect, process, and analyze large amounts of data, which helps them make data-driven decisions and improve business operations. A pipeline's architecture is typically based on the following components:
- Data sources: These are the systems or sources from which data is extracted, such as databases, file systems, or external APIs.
- Data extraction: This step involves extracting the data from the sources and converting it into a format that can be used downstream in the pipeline.
- Data transformation: This step involves cleaning, formatting, and transforming the data to make it usable for the next step in the pipeline.
- Data loading: This step involves loading the transformed data into the target system, such as a data warehouse or a data lake.
- Data validation: This step involves validating the data to ensure that it meets the quality standards and requirements before it is loaded into the target system.
- Data monitoring: This step involves monitoring the pipeline to ensure that it is running smoothly and that data is flowing through it as expected.
- Error handling: This step involves handling any errors that may occur during the pipeline and alerting the appropriate parties.
Some data pipeline architectures may also include additional steps such as data enrichment, or data warehousing for data analysis and reporting.
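To make the components above more concrete, here is a minimal sketch that chains extraction, validation, transformation, loading, and basic error handling. It uses only the Python standard library. The sample data, the function names, and the sales table are assumptions made up for illustration, not part of any real system.

```python
import csv
import io
import sqlite3

# Hypothetical sample input standing in for a real source system.
RAW_CSV = """id,price,quantity
1,9.99,3
2,abc,1
3,4.50,2
"""


def extract(text):
    """Extract: parse CSV text into a list of dict rows."""
    return list(csv.DictReader(io.StringIO(text)))


def validate(rows):
    """Validate: separate rows that parse cleanly from rows that do not."""
    valid, rejected = [], []
    for row in rows:
        try:
            float(row["price"])
            int(row["quantity"])
            valid.append(row)
        except ValueError:
            rejected.append(row)  # error handling: keep bad rows for review
    return valid, rejected


def transform(rows):
    """Transform: cast types and add a computed total column."""
    return [
        (int(r["id"]), float(r["price"]), int(r["quantity"]),
         round(float(r["price"]) * int(r["quantity"]), 2))
        for r in rows
    ]


def load(records, conn):
    """Load: write the transformed records into the target table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS sales "
        "(id INTEGER, price REAL, quantity INTEGER, total REAL)"
    )
    conn.executemany("INSERT INTO sales VALUES (?, ?, ?, ?)", records)
    conn.commit()


def run_pipeline(text, conn):
    """Run all stages and report how many rows were loaded and rejected."""
    valid, rejected = validate(extract(text))
    load(transform(valid), conn)
    return len(valid), len(rejected)


conn = sqlite3.connect(":memory:")  # in-memory database as a stand-in target
loaded, rejected = run_pipeline(RAW_CSV, conn)
print(f"loaded={loaded} rejected={rejected}")
```

Keeping each stage in its own function mirrors how the stages would later become separate tasks in an orchestrator.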
Airflow for creating and orchestrating data pipelines
As we saw, a data pipeline is a set of tasks, running one after another or in parallel, that move data from a source system to a target one.
Airflow is a popular open-source tool used to manage and schedule data pipeline tasks. It allows for the creation, management, and monitoring of workflows, which can include multiple tasks that are dependent on each other. These tasks can be defined as Python functions and can be scheduled to run on a specific schedule or triggered by certain events. Airflow also provides a web interface for monitoring the status of tasks and troubleshooting any issues that may arise.
Airflow is composed of 5 essential components/services:
- Webserver: a Flask server, run with Gunicorn, that serves the UI
- Scheduler: the daemon in charge of scheduling workflows
- Metastore: a database where metadata is stored. A database is compatible as long as it is supported by SQLAlchemy (Postgres recommended)
- Executor: defines how tasks should be executed. The most commonly used ones are:
  - Sequential Executor: This is the simplest executor, which runs tasks sequentially on the same machine as the Airflow scheduler. It is the default executor and is suitable for small and simple use cases.
  - Local Executor: This executor runs tasks concurrently on the same machine as the Airflow scheduler. It is similar to the Sequential Executor but allows for parallelism.
  - Celery Executor: This executor runs tasks concurrently on a separate worker machine or a group of worker machines. It uses the Celery distributed task queue to manage the execution of tasks.
  - Kubernetes Executor: This executor runs tasks within a Kubernetes cluster. It allows for scaling the number of worker nodes up or down based on task demands.
  - Dask Executor: This executor runs tasks concurrently on a separate worker machine or a group of worker machines. It uses the Dask distributed task scheduler to manage the execution of tasks.
- Worker: the process or subprocess executing the task
DAGs and Operators:
When you start learning Airflow you hear about DAGs a lot. DAG is an abbreviation for "Directed Acyclic Graph". It is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG defines how tasks are executed, what their dependencies are, and in what order they run.
DAGs are written in Python and can be scheduled to run at a specific interval or triggered by an external event. A DAG has 2 main components: Tasks and Operators. An operator is a template that defines what a task does, and each task is an instance of an operator inside a DAG. Dependencies between tasks specify the order of execution, for example that one task should run only after another task has completed successfully.
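The idea of a directed acyclic graph can be sketched without Airflow at all. The snippet below uses the standard library's graphlib module (Python 3.9+) to derive a valid execution order from declared dependencies and to reject a graph that contains a cycle. The task names are hypothetical, and this only illustrates the concept; it is not how Airflow itself schedules tasks.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

# Each key is a task; its value is the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"transform"},
    "load": {"validate"},
}


def execution_order(deps):
    """Return one order in which every task runs after its dependencies."""
    return list(TopologicalSorter(deps).static_order())


def is_acyclic(deps):
    """Return True when the dependency graph contains no cycle."""
    try:
        TopologicalSorter(deps).prepare()
        return True
    except CycleError:
        return False


order = execution_order(dependencies)
print(order)
print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # two tasks waiting on each other
```

The "acyclic" part matters because a cycle would mean two tasks each waiting for the other, so neither could ever start.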
You can divide operators into 3 types:
- Action operators: execute something, such as a bash command or a Python function
- Transfer operators: transfer data between systems, such as SFTPOperator, which transfers files over the SFTP protocol
- Sensor operators: check a criterion (a condition or a state) before continuing to the next task or tasks, hence the name (to sense, to wait). A sensor for Azure Blob Storage (WasbBlobSensor in the Microsoft Azure provider), for instance, checks for the existence of a specific blob in an Azure Blob Storage container and waits until it appears.
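The waiting behaviour of a sensor can be sketched as a simple polling loop. The parameter names poke_interval and timeout are borrowed from Airflow's sensor vocabulary, but the function below is a plain-Python illustration under that assumption, and blob_exists is a made-up condition standing in for a real storage check.

```python
import time


def wait_for(condition, poke_interval=0.01, timeout=1.0):
    """Call `condition` repeatedly until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True  # condition met: downstream work may continue
        if time.monotonic() >= deadline:
            return False  # gave up waiting
        time.sleep(poke_interval)


# Hypothetical condition: pretend the blob shows up on the third check.
checks = {"count": 0}


def blob_exists():
    checks["count"] += 1
    return checks["count"] >= 3


found = wait_for(blob_exists)
print(found, checks["count"])
```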
In addition, Airflow has the following concepts:
- Task instance: what a task is called once it is executed, i.e. a specific run of a task
- Workflow: the combination of DAGs, operators, tasks, and dependencies
Important notes:
Airflow is not:
- a data streaming solution: if you need to process data every second, Airflow is not the right tool
- a data processing framework: if you have terabytes of data, go with Spark or another solution optimized for such workloads. If you process that much data inside Airflow tasks, you may end up with out-of-memory errors. Still, you can use SparkSubmitOperator to trigger a Spark job that runs outside Airflow.
Airflow's single node architecture vs multi-nodes architecture
When starting with Airflow you are probably using a single machine. In Airflow terms this is called a single-node architecture, in which the Airflow components interact as follows:
Those components communicate through the Metastore. Keeping the Queue inside the Executor may not be the most scalable design, but it is well suited to a single-node architecture, dev environments, and workloads with a limited number of tasks.
The following is how pipelines are executed in the single node architecture, but also applicable to the multi-node architecture:
- You have a DAGs folder, for example dags-folder, where the data pipeline code is stored.
- Both the Webserver and the Scheduler parse dags-folder.
- When it's time for a DAG to be executed, the Scheduler creates a DagRun object (an instantiation of the DAG) in the Metastore.
- When a task of the DagRun is ready to run, the Scheduler creates a TaskInstance object for it.
- The Scheduler sends the TaskInstance to the Executor.
- The Executor runs the TaskInstance and updates its status in the Metastore.
- Once the TaskInstance is done, the Executor records its final state.
- The Scheduler checks the DagRun status. Once the run is done, the Webserver shows the updated status in the UI.
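The flow above can be mimicked with a toy simulation in which a dictionary plays the Metastore and a deque plays the executor's queue. Every name here is an assumption chosen for illustration. The sketch ignores task dependencies and failures, and it is not Airflow's real implementation.

```python
from collections import deque

# Stand-ins for the metadata database and the executor queue.
metastore = {"dag_runs": {}, "task_instances": {}}
queue = deque()


def schedule(dag_id, run_id, task_ids):
    """Scheduler: record a DagRun and queue one TaskInstance per task."""
    metastore["dag_runs"][(dag_id, run_id)] = "running"
    for task_id in task_ids:
        metastore["task_instances"][(dag_id, run_id, task_id)] = "queued"
        queue.append((dag_id, run_id, task_id))


def execute_all():
    """Executor: run each queued task instance and record its state."""
    while queue:
        key = queue.popleft()
        metastore["task_instances"][key] = "running"
        # ... the real work of the task would happen here ...
        metastore["task_instances"][key] = "success"


def finalize(dag_id, run_id):
    """Scheduler: mark the DagRun successful once all its tasks succeeded."""
    states = [
        state
        for (d, r, _), state in metastore["task_instances"].items()
        if (d, r) == (dag_id, run_id)
    ]
    if all(state == "success" for state in states):
        metastore["dag_runs"][(dag_id, run_id)] = "success"
    return metastore["dag_runs"][(dag_id, run_id)]


schedule("example_dag", "run_1", ["extract", "process", "store"])
execute_all()
final_state = finalize("example_dag", "run_1")
print(final_state)
```

Note how the components never call each other directly for state: everything they know about a run comes from the shared store, which is the role the Metastore plays.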
To execute as many tasks as you want, you should use the Multi-Node Architecture (AKA Celery), where the Queue is an external third-party service like RabbitMQ or Redis. With Celery, you can have many tasks running, spread across different nodes (workers). The Multi-Node Architecture looks like the figure below:
Airflow Setup
You can install Airflow with pip/pip3 using the following command: pip3 install apache-airflow==version --constraint path-to-constraints.
The --constraint (or -c) flag specifies the path or URL of a constraints file. This is a plain text file that lists package names together with the version number or version range allowed for each one. pip uses it to pin the versions of the packages being installed, rather than picking the latest versions available.
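The Airflow project publishes constraints files on GitHub, one per Airflow version and Python version. As a small sketch, the helper below builds that URL and the matching install command. The version numbers used in the example call are assumptions; replace them with the versions you actually run.

```python
import sys


def constraints_url(airflow_version, python_version=None):
    """Build the URL of the published constraints file for the given versions."""
    if python_version is None:
        # Default to the major.minor version of the running interpreter.
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


def pip_install_command(airflow_version, python_version=None):
    """Return the pip command that installs Airflow with pinned dependencies."""
    url = constraints_url(airflow_version, python_version)
    return f'pip3 install "apache-airflow=={airflow_version}" --constraint "{url}"'


# Example versions only (assumed for illustration).
print(pip_install_command("2.5.1", "3.8"))
```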
To initialize the metastore, run the command airflow db init. This command also creates some additional folders and files (logs, configuration files…). By default, if you don't specify another database to use, Airflow creates a SQLite database named airflow.db.
To start the webserver, run airflow webserver and visit localhost:8080.
In Airflow no user is created by default. You should create users manually from the CLI. To create a user, run the following command:
--------------------------------------------------------------
airflow users create -u admin -p admin -f Ali -l Khyar -r Admin -e admin@airflow.com
--------------------------------------------------------------
Airflow UI Views:
Workflow visualization is crucial for understanding and managing workflows. The following are five views that can be used to visualize Airflow's workflows:
- Tree View: The Tree View is a hierarchical view of all the tasks within a DAG. It shows all the tasks and their dependencies in a tree structure. This view is useful for understanding the overall structure of a workflow and for identifying failed or skipped tasks.
- Graph View: The Graph View displays a DAG and its tasks in a graphical view. This view is useful for visualizing the structure of a workflow and identifying dependencies between tasks. Users can zoom in and out and rearrange tasks to get a better understanding of the workflow.
- Gantt Chart View: The Gantt Chart View displays the tasks and their dependencies in a timeline. This view is useful for identifying the start and end times of tasks and how they relate to each other.
- Task Instance Details View: The Task Instance Details View displays detailed information about a specific task, including its start and end time, duration, and status. Users can also view logs for the task, which can help with debugging and troubleshooting.
- Code View: The Code View displays the code that defines a DAG. This view is useful for understanding how a workflow is defined. It is read-only, so changes are made in the DAG file itself.
DAGs in action:
As we already said, a DAG represents a data pipeline, which consists of tasks (nodes) and dependencies (edges) between them. Tasks are created using operators. There are many types of operators available in Airflow, including PythonOperator, BashOperator, SqliteOperator, and more.
Each operator represents a specific task in the pipeline. For example, let's say we have a data pipeline that involves:
- extracting user data from an API
- processing it using Python functions
- storing it in a SQLite database.
We could create the following tasks using Airflow operators:
- SqliteOperator: create table
- HttpSensor: check if API is available
- PythonOperator: extract user data
- PythonOperator: process user data
- BashOperator: store user data in SQLite database
We would then define a DAG folder to store our DAGs, and create a DAG file that specifies the order in which these tasks should be executed. It's important to note that combining cleaning and processing data into one Airflow operator is not a best practice, because a failure in one step then forces the whole task to be rerun. Instead, each step should be its own task.
Code example of the above scenario:
--------------------------------------------------------------
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# the two imports below require the HTTP and SQLite provider packages
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.sqlite.operators.sqlite import SqliteOperator


def my_extraction_function(param1, param2):
    # placeholder: call the API and return the extracted user data
    pass


def my_processing_function(param1, param2):
    # placeholder: clean and reshape the extracted user data
    pass


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 23),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# creating an example dag
dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A DAG to demonstrate the use of Airflow operators',
    schedule_interval='@daily',
)

# task to create SQLite table
create_table = SqliteOperator(
    task_id='create_table',
    sql='CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT NOT NULL, email TEXT NOT NULL)',
    sqlite_conn_id='my_db',  # ID of an Airflow connection pointing at the SQLite file
    dag=dag,
)

# task to check if API is available
check_api = HttpSensor(
    task_id='check_api',
    endpoint='api/health',
    http_conn_id='my_api',
    dag=dag,
)

# task to extract user data
extract_user_data = PythonOperator(
    task_id='extract_user_data',
    python_callable=my_extraction_function,
    op_kwargs={'param1': 'value1', 'param2': 'value2'},
    dag=dag,
)

# task to process user data
process_user_data = PythonOperator(
    task_id='process_user_data',
    python_callable=my_processing_function,
    op_kwargs={'param1': 'value1', 'param2': 'value2'},
    dag=dag,
)

# task to store user data in SQLite database
store_user_data = BashOperator(
    task_id='store_user_data',
    bash_command='python /path/to/my_script.py --arg1 value1 --arg2 value2',
    dag=dag,
)

# define task dependencies
create_table >> check_api >> extract_user_data >> process_user_data >> store_user_data
--------------------------------------------------------------
To test our DAG, we can use the airflow tasks test command, which runs an individual task within the DAG without checking its dependencies or recording its state in the Metastore, for example airflow tasks test example_dag create_table 2023-02-23.
To share data between our tasks we can use the XComs mechanism. XComs allow us to pass data between tasks by creating a key-value pair in the Metastore.
For example, the extract user data task could create an XCom containing the extracted user data as a JSON object, which could then be retrieved by the process user data task using the XCom API. We will see more about this later in this blog.
DAGS Scheduling:
One of the key features of Airflow is its ability to schedule tasks based on a variety of criteria. You can define a DAG's start date and its schedule interval.
- The start date determines when the DAG should begin running.
- The schedule interval determines how often it should be executed.
For example, let's say we have a task that needs to run every 10 minutes, starting on 01 January 2020 at 10:00am. We can define such a task like the following:
--------------------------------------------------------------
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1, 10, 0, 0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dummy_dag',
    default_args=default_args,
    schedule_interval=timedelta(minutes=10),
)


def dummy_task():
    print("dummy_task")


task = PythonOperator(
    task_id='dummy_task',
    python_callable=dummy_task,
    dag=dag,
)
--------------------------------------------------------------
The above code creates dummy_dag, with a start date of 01 January 2020 at 10:00am and a schedule interval of 10 minutes. We've also defined a PythonOperator called dummy_task that will be executed every 10 minutes.
One thing to note: the task won't start executing immediately at 10:00am. Instead, Airflow waits until the first schedule interval has elapsed before triggering the task. In this case, the task is first triggered at 10:10am on January 1st, 2020. That run carries an execution date of 10:00am, because in Airflow the execution date marks the start of the interval a run covers, not the moment the run is triggered.
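The relationship between the start date, the schedule interval, and the moment a run is triggered comes down to simple date arithmetic. The helper below is a plain-Python sketch of that arithmetic (schedule_preview is a made-up name, not an Airflow function): each run covers one interval and is triggered when that interval ends.

```python
from datetime import datetime, timedelta


def schedule_preview(start_date, interval, count):
    """Return (execution_date, triggered_at) pairs for the first `count` runs."""
    runs = []
    for i in range(count):
        execution_date = start_date + i * interval  # start of the covered interval
        triggered_at = execution_date + interval    # the run fires when it ends
        runs.append((execution_date, triggered_at))
    return runs


preview = schedule_preview(datetime(2020, 1, 1, 10, 0), timedelta(minutes=10), 3)
for execution_date, triggered_at in preview:
    print(f"execution date {execution_date:%H:%M} -> triggered at {triggered_at:%H:%M}")
```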
Backfilling And CatchUp
Two super important concepts in DAGs are Backfilling and CatchUp.
Backfilling is the process of running past instances of a DAG that were missed because the schedule was not set up at the time or because the DAG was paused. It can be achieved by setting the start_date parameter of the DAG and using the airflow dags backfill command. This command manually triggers DAG runs for all intervals between a given start date and end date.
Catchup is a feature in Airflow that allows a DAG to process all the DAG runs it missed during a period of inactivity, either because the DAG was paused or because the scheduler was not running. By default, catchup is set to True, which means that the scheduler will process any missed DAG runs when the DAG is unpaused. This ensures that all historical data is processed and accounted for.
Here's an example to illustrate the use of backfilling and catchup in Airflow:
Let's say we have a DAG
- scheduled to run @daily
- with a start date of January 1, 2023
- The DAG is paused for 3 days
- then restarted on January 5, 2023.
Since catchup is set to True by default, Airflow will automatically run DAG instances for January 2, 3, and 4, in addition to the January 5 instance.
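The scenario above can be worked out with a few lines of date arithmetic. This is a simplified sketch, assuming a fixed daily interval and the convention that a run is created once the interval it covers has ended. The function name runs_to_create is hypothetical, and the real scheduler takes more factors into account.

```python
from datetime import datetime, timedelta


def runs_to_create(last_execution_date, now, interval=timedelta(days=1), catchup=True):
    """List the execution dates of runs that became due while the DAG was inactive."""
    missed = []
    candidate = last_execution_date + interval
    # A run is due once the interval it covers has fully elapsed.
    while candidate + interval <= now:
        missed.append(candidate)
        candidate += interval
    # Without catchup, only the most recent due interval is run.
    return missed if catchup else missed[-1:]


last_run = datetime(2023, 1, 1)   # last interval processed before the pause
restart = datetime(2023, 1, 5)    # moment the DAG is unpaused

with_catchup = runs_to_create(last_run, restart, catchup=True)
without_catchup = runs_to_create(last_run, restart, catchup=False)
print([d.strftime("%b %d") for d in with_catchup])
print([d.strftime("%b %d") for d in without_catchup])
```

With catchup the sketch yields the intervals of January 2, 3 and 4. Without it, only the latest of them is run.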
To deactivate CatchUp, set it to False when instantiating the DAG:
--------------------------------------------------------------
with DAG(...
catchup=False
....)
--------------------------------------------------------------
Notes: all dates in Airflow are in UTC, so don't get confused if DAGs are not executed at the time you expect in your local timezone.
You can change that with the default_timezone setting in airflow.cfg.
This is not recommended, though: keep everything in UTC.
Databases and Executors:
Executors in Airflow define how many tasks you can execute in parallel.
It's important to understand the order in which tasks will be executed. Specifically, if you have two tasks that depend on the same upstream task, which one will be executed first?
Consider a DAG with four tasks: task1, task2, task3, and task4. Task1 is the first task in the sequence, and task2 and task3 both come right after it. Which of the two will be executed first?
SequentialExecutor:
With the SequentialExecutor, the answer to the previous question is that they will be executed sequentially, one after the other, because this executor runs only one task at a time. The bit shift operator (>>) only defines the dependencies between tasks; the executor decides how many of the ready tasks run at once. In this case, task2 will be executed first, followed by task3. This sequential execution is useful for debugging, as it allows you to see the output of each task before moving on to the next one. To use sequential execution, set the executor parameter to SequentialExecutor in your Airflow configuration file. This executor is the default if the configuration file is not touched. You'll also need the sql_alchemy_conn parameter, which tells Airflow which database to use as the Metastore.
You can discover the values for these parameters by running the following commands in your terminal (under pipenv):
--------------------------------------------------------------
airflow config get-value core sql_alchemy_conn
airflow config get-value core executor
--------------------------------------------------------------
It's worth noting that if you're using a SQLite database to store your DAG metadata, you won't be able to run multiple write operations at the same time. This means that if you have multiple tasks that are trying to write to the database simultaneously, you may run into issues. If you anticipate a high volume of write operations, you may want to consider using a different database backend that can handle concurrent writes more effectively.
LocalExecutor:
As you can see, the SequentialExecutor is not that useful if you want to run multiple tasks in parallel on a single machine. Here comes the LocalExecutor to help increase the efficiency of workflows and reduce overall execution time.
To change the executor to LocalExecutor:
- First, you should have a PostgreSQL database.
- Install the necessary packages by running the command pip install 'apache-airflow[postgres]'
- Update the Airflow configuration file (airflow.cfg) by changing the sql_alchemy_conn parameter in the [core] section to the Postgres connection string.
- Verify that the database is set up correctly by running the command airflow db check
- Change the executor to LocalExecutor by updating the executor parameter in the [core] section of airflow.cfg.
- Initialize the database by running the command airflow db init
- Create a user account by running the command airflow users create --username admin --password admin --role Admin --firstname ali --lastname khyar --email admin@airflow.com
- Start the Airflow webserver and scheduler by running the commands airflow webserver and airflow scheduler, respectively.
- Run your DAG and check the Gantt view to see parallel execution in action.
Figure: with the LocalExecutor, task2 and task3 run in parallel on a single machine, at the same time, as subprocesses.
The above steps allow us to use the LocalExecutor instead of the SequentialExecutor, hence running tasks in parallel and improving execution time.
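The difference between running task2 and task3 one after the other and running them side by side can be sketched with the standard library. The snippet uses a thread pool purely for simplicity, which is an assumption of this sketch: the LocalExecutor itself works with subprocesses. The task names follow the four-task example.

```python
from concurrent.futures import ThreadPoolExecutor


def run_task(name):
    """Placeholder for the real work of a task."""
    return f"{name} done"


def run_sequential(names):
    """Run tasks one at a time, as a sequential executor would."""
    return [run_task(name) for name in names]


def run_parallel(names, max_workers=2):
    """Run independent tasks concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_task, names))


# task1 first, then task2 and task3 side by side, then task4.
results = (
    run_sequential(["task1"])
    + run_parallel(["task2", "task3"])
    + run_sequential(["task4"])
)
print(results)
```

Only tasks with no dependency between them, like task2 and task3 here, can be handed to the pool together.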
OK, the LocalExecutor is nice: it allows us to run tasks in parallel on a single machine. But what if our single machine runs out of resources? How can we scale Airflow?
Here come the KubernetesExecutor and the CeleryExecutor to save the day.
CeleryExecutor:
The CeleryExecutor allows Airflow to scale out across worker nodes, using the distributed task system provided by Celery to spread execution among multiple machines.
To configure the CeleryExecutor, follow the steps below:
- Install the necessary packages by running the command pip install 'apache-airflow[celery]'
- Install Redis by running the command sudo apt update && sudo apt install redis-server
- Modify the Redis configuration file (sudo nano /etc/redis/redis.conf) by adding the following line:
-------------------------
supervised systemd
-------------------------
- Restart the Redis service by running sudo systemctl restart redis.service, then run sudo systemctl status redis.service to ensure that it is running correctly.
- In the Airflow configuration file (airflow.cfg), change the executor to CeleryExecutor, update the broker_url parameter to redis://localhost:6379/0 (where 0 is the index of the Redis database), and set the result_backend parameter to the same database as sql_alchemy_conn. Celery expects a db+ prefix on SQLAlchemy URLs, for example db+postgresql://...
- To interact with Redis from Airflow, install the apache-airflow[redis] package: pip install 'apache-airflow[redis]'
Celery parameters (Good to Know):
To optimize task execution with the CeleryExecutor you can adjust the parameters below in the airflow.cfg file:
- parallelism: the maximum number of tasks that can be executed concurrently across the entire Airflow installation. The default value is 32. If you set it to 1, Airflow will behave like a sequential executor.
- dag_concurrency: the maximum number of tasks that can be executed concurrently for a specific DAG. The default is 16, and it can be overridden at the DAG level by setting the concurrency parameter. Since Airflow 2.2 these two settings are named max_active_tasks_per_dag and max_active_tasks.
- max_active_runs_per_dag: the maximum number of DAG runs that can be executed concurrently for a specific DAG. The default value is 16, and it can be overridden at the DAG level by setting the max_active_runs parameter.
Note: parallelism takes priority over dag_concurrency, because it is the global cap. If you have set parallelism to a low value, it limits the number of tasks that can be executed concurrently across the entire Airflow installation, regardless of the value of dag_concurrency. If you have set parallelism to a high value, each DAG is still limited by its own dag_concurrency. In other words, the lower of the two limits applies.
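How the two limits interact can be shown with a small calculation. The function below is a simplified sketch, not the scheduler's real algorithm: it ignores tasks that are already running, pools, and priorities, and hands out slots to DAGs in dictionary order. The DAG names are made up.

```python
def runnable_now(queued_per_dag, parallelism=32, dag_concurrency=16):
    """Return how many queued tasks of each DAG may start under both limits."""
    slots_left = parallelism  # global cap across the whole installation
    allowed = {}
    for dag_id, queued in queued_per_dag.items():
        # Each DAG is capped by its own limit and by what is left globally.
        granted = min(queued, dag_concurrency, slots_left)
        allowed[dag_id] = granted
        slots_left -= granted
    return allowed


# Three hypothetical DAGs with 20 queued tasks each, default limits.
print(runnable_now({"dag_a": 20, "dag_b": 20, "dag_c": 20}))
# With parallelism=1 only a single task can run, whatever dag_concurrency says.
print(runnable_now({"dag_a": 20}, parallelism=1))
```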
Grouping tasks:
Sometimes the tasks within a DAG become hard to manage, either because there are many of them or because complex processing is involved, so you need to group tasks. You can either go with SubDAGs (not recommended, but good to know) or with TaskGroups.
The idea is to move from something like this:
to something like this:
SubDAGs:
A SubDAG allows you to bundle related tasks within a DAG into a manageable DAG (DAG within a DAG).
You create a SubDAG by writing a function that returns a DAG object encapsulating the tasks. Here's an example of a SubDAG named subdag_task:
--------------------------------------------------------------
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator


def subdag(dag_id, default_args):
    # build and return the child DAG that groups the related tasks
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=None,
    )
    task_1 = BashOperator(
        task_id='subdag_task_1',
        bash_command='echo "SubDAG task 1"',
        dag=dag,
    )
    task_2 = BashOperator(
        task_id='subdag_task_2',
        bash_command='echo "SubDAG task 2"',
        dag=dag,
    )
    task_1 >> task_2
    return dag


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2022, 2, 23),
}

with DAG(dag_id='parent_dag', default_args=default_args, schedule_interval=None) as dag:
    task_1 = BashOperator(
        task_id='parent_task_1',
        bash_command='echo "Parent DAG task 1"',
        dag=dag,
    )
    # the SubDAG id must be "<parent_dag_id>.<task_id>"
    subdag_task = SubDagOperator(
        task_id='subdag_task',
        subdag=subdag('parent_dag.subdag_task', default_args=default_args),
        dag=dag,
    )
    task_2 = BashOperator(
        task_id='parent_task_2',
        bash_command='echo "Parent DAG task 2"',
        dag=dag,
    )
    task_1 >> subdag_task >> task_2
--------------------------------------------------------------
SubDAGs seem like a cool feature, but they have their dark side, like everything in life. Even if you change the executor for Airflow, tasks within the SubDAG will use the SequentialExecutor by default, which slows down the total execution time. Plus, you may fall into deadlocks (DAGs waiting for each other to complete, causing a circular dependency that cannot be resolved).
Hence TaskGroups were introduced in Airflow 2.0.
TaskGroups:
TaskGroups allow you to group tasks and manage them more easily. They are defined with the TaskGroup class as follows:
--------------------------------------------------------------
from airflow import DAG
from airflow.utils.task_group import TaskGroup

with DAG(dag_id='my_dag', ...) as dag:
    task_1 = ...
    with TaskGroup('group_1') as group_1:
        task_2 = ...
        task_3 = ...
    task_4 = ...
--------------------------------------------------------------
Then you can set dependencies the usual way, using the bit shift operator (>>) or set_upstream. For the example above you can use:
task_4.set_upstream(group_1)
You can go as crazy as you want with TaskGroups, for example by nesting groups, as in the following:
--------------------------------------------------------------
from airflow import DAG
from airflow.utils.task_group import TaskGroup

with DAG(dag_id='my_dag', ...) as dag:
    task_1 = ...
    with TaskGroup('group_1') as group_1:
        task_2 = ...
        with TaskGroup('subgroup_1') as subgroup_1:
            task_3 = ...
            task_4 = ...
        task_5 = ...
    task_6 = ...
--------------------------------------------------------------
By grouping tasks with TaskGroup, you make your DAG code cleaner, more manageable, and easier to read.
Sharing data with XComs:
We mentioned XComs earlier without getting into detail. XComs in Airflow are a way of exchanging data between tasks. They are basically key-value pairs with a timestamp, and they are used through push and pull operations. Let's suppose we have a task that downloads files from a storage account and we need to pass the list of downloaded files (file_list) to a downstream task. Here's how the push operation can be done from a task:
--------------------------------------------------------------
def download_files(**context):  # hypothetical python_callable of the upstream task
    file_list = ['file1.txt', 'file2.txt', 'file3.txt']
    task_instance = context['task_instance']
    task_instance.xcom_push(key='file_list', value=file_list)
--------------------------------------------------------------
A pull operation in another task can be done as shown below:
--------------------------------------------------------------
def process_files(**context):  # hypothetical python_callable of the downstream task
    task_instance = context['task_instance']
    file_list = task_instance.xcom_pull(key='file_list')
    for file in file_list:
        print(file)  # placeholder: download or process the file here
--------------------------------------------------------------
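To picture what happens behind push and pull, here is a toy key-value store written in plain Python. It is only a sketch under stated assumptions: the class XComStore and its method signatures are invented for illustration, and values are stored as JSON strings to reflect the advice of keeping XComs small and serializable.

```python
import json


class XComStore:
    """Toy stand-in for the XCom table: values keyed by DAG, task, and key."""

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, task_id, key, value):
        # Store a serialized copy, so only JSON-friendly values are accepted.
        self._rows[(dag_id, task_id, key)] = json.dumps(value)

    def pull(self, dag_id, task_id, key, default=None):
        raw = self._rows.get((dag_id, task_id, key))
        return default if raw is None else json.loads(raw)


store = XComStore()

# Upstream task pushes the list of downloaded files.
store.push("my_dag", "download_files", "file_list", ["file1.txt", "file2.txt", "file3.txt"])

# Downstream task pulls it back by naming the task and key that produced it.
file_list = store.pull("my_dag", "download_files", "file_list")
print(file_list)
```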
Tasks conditioning:
I don't know if "tasks conditioning" is the official name, but the idea is to choose which downstream task to execute based on an upstream value pushed to XComs. Such operations can be done using BranchPythonOperator to define task execution rules. Here's an example where choose_next_task is the function that selects the next task to execute based on the XCom value of data_type:
--------------------------------------------------------------
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator


def choose_next_task(**context):
    # return the task_id of the branch to follow
    data_type = context['task_instance'].xcom_pull(key='data_type')
    if data_type == 'A':
        return 'task_A'
    elif data_type == 'B':
        return 'task_B'
    else:
        return 'task_C'


# in Airflow 2 the context is passed automatically, so provide_context is not needed
branching_task = BranchPythonOperator(
    task_id='branching_task',
    python_callable=choose_next_task,
)
task_A = BashOperator(
    task_id='task_A',
    bash_command='echo "Data type A processed"',
)
task_B = BashOperator(
    task_id='task_B',
    bash_command='echo "Data type B processed"',
)
task_C = BashOperator(
    task_id='task_C',
    bash_command='echo "Data type not recognized"',
)
branching_task >> [task_A, task_B, task_C]
--------------------------------------------------------------
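The effect of branching on the downstream tasks can be sketched in plain Python: the task whose ID is returned runs, and the other candidates are skipped. The helper run_branch is a made-up name for this illustration, and here the data type is passed in directly instead of being pulled from XComs.

```python
def choose_next_task(data_type):
    """Return the ID of the task to follow for the given data type."""
    if data_type == "A":
        return "task_A"
    elif data_type == "B":
        return "task_B"
    return "task_C"


def run_branch(choose, candidates, **kwargs):
    """Mark the chosen task as run and every other candidate as skipped."""
    chosen = choose(**kwargs)
    return {
        task_id: ("success" if task_id == chosen else "skipped")
        for task_id in candidates
    }


states = run_branch(choose_next_task, ["task_A", "task_B", "task_C"], data_type="B")
print(states)
```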
Trigger rules:
Sometimes we don't need all upstream tasks to succeed before running a downstream task, or we want to react as soon as any of them fails. This can be done through trigger rules, which let you run a downstream task based on the final execution status of its upstream tasks. There are 9 different trigger rules:
all_success: (default) all parents have succeeded
all_failed: all parents are in a failed or upstream_failed state
all_done: all parents are done with their execution
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped
none_failed_or_skipped: all parents have not failed (failed or upstream_failed) and at least one parent has succeeded (named none_failed_min_one_success in newer releases)
none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
dummy: dependencies are just for show, trigger at will
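The definitions above translate almost directly into boolean checks. The function below is a plain-Python sketch that evaluates rules from the list for a set of upstream states. It assumes all upstream tasks have already finished, so it does not model the "fires as soon as" timing of one_failed and one_success, and should_run is an invented name rather than an Airflow API.

```python
FAILED_STATES = ("failed", "upstream_failed")
DONE_STATES = ("success", "skipped") + FAILED_STATES


def should_run(trigger_rule, upstream_states):
    """Decide whether a task may run, given the final states of its parents."""
    states = list(upstream_states)
    rules = {
        "all_success": all(s == "success" for s in states),
        "all_failed": all(s in FAILED_STATES for s in states),
        "all_done": all(s in DONE_STATES for s in states),
        "one_failed": any(s in FAILED_STATES for s in states),
        "one_success": any(s == "success" for s in states),
        "none_failed": not any(s in FAILED_STATES for s in states),
        "none_skipped": not any(s == "skipped" for s in states),
        "dummy": True,
    }
    return rules[trigger_rule]


# One parent succeeded and one was skipped (for example after a branch).
print(should_run("all_success", ["success", "skipped"]))
print(should_run("none_failed", ["success", "skipped"]))
```

The last two calls show why none_failed is a common choice for a task that joins the branches of a BranchPythonOperator: a skipped parent blocks all_success but not none_failed.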
Below is an example where each task is set to run as soon as one of its upstream tasks has failed, which is the kind of rule you would use for alerting. This is achieved using trigger_rule='one_failed':
--------------------------------------------------------------
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 23),
}

with DAG('example_alerting', default_args=default_args, schedule_interval=None) as dag:
    task_1 = BashOperator(
        task_id='task_1',
        bash_command='echo "Hello World from Task 1"',
        trigger_rule='one_failed',
    )
    task_2 = BashOperator(
        task_id='task_2',
        bash_command='echo "Hello World from Task 2"',
        trigger_rule='one_failed',
    )
    task_3 = BashOperator(
        task_id='task_3',
        bash_command='echo "Hello World from Task 3"',
        trigger_rule='one_failed',
    )
    task_4 = BashOperator(
        task_id='task_4',
        bash_command='echo "Hello World from Task 4"',
        trigger_rule='one_failed',
    )
    task_1 >> [task_2, task_3] >> task_4
--------------------------------------------------------------
Conclusion:
Apache Airflow is an awesome tool to create and manage data pipelines. Thanks to AkumenIA for introducing such a great tool to me. In the next blog about Airflow, we are going to see how to use it in the cloud with Kubernetes (AKS), and how to monitor its cluster using ELK. Thank you for reading.