Creating Data Pipelines as DAGs in Apache Airflow (Part 1)

franklinobasy

Posted on February 20, 2023

A DAG is a special kind of graph.

So, what exactly is a graph?

A graph is used to express or illustrate relationships among objects. In more technical terms, graphs are used to describe any set of nodes and the edges (relationships) between the nodes. The image below is a simple illustration of a graph.

graph

A graph whose edges have no direction, that is, the edges connect two nodes without specifying which one is the source and which one is the destination, is known as an "undirected graph." The image above is an example of an undirected graph. These graphs can be used to represent symmetric relationships, such as those found in social networks with bidirectional friendships.

In a directed graph, by contrast, every edge has a defined direction: it joins a source node to a destination node.

a directed graph
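As a rough sketch of the difference, both kinds of graph can be stored as adjacency sets in plain Python. The helper name build_graph and the node labels A, B, and C are made up for this illustration:

```python
def build_graph(edges, directed):
    """Build an adjacency mapping (node -> set of neighbours) from edge pairs."""
    adjacency = {}
    for source, destination in edges:
        adjacency.setdefault(source, set()).add(destination)
        # Register the destination even if it has no outgoing edges.
        adjacency.setdefault(destination, set())
        if not directed:
            # An undirected edge can be traversed both ways.
            adjacency[destination].add(source)
    return adjacency


edges = [("A", "B"), ("B", "C")]
undirected = build_graph(edges, directed=False)
directed = build_graph(edges, directed=True)

print(undirected["B"])  # B is linked to both A and C
print(directed["B"])    # B only points to C
```

In the undirected version each edge is recorded on both endpoints, while the directed version records it on the source node only.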

A path that begins and ends at the same node and contains at least one edge is called a cycle.

A graph is referred to as "cyclic" if it has at least one cycle. On the other hand, an "acyclic" graph is a graph with no cycles.

cycle in a graph
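One common way to check whether a directed graph is cyclic is a depth-first search that tracks which nodes are currently being visited. The sketch below assumes the graph is given as a dict mapping each node to a list of its successors; the function name has_cycle and the sample graphs are illustrative only:

```python
def has_cycle(graph):
    """Return True if the directed graph (node -> list of successors) has a cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, in progress, finished
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for successor in graph.get(node, []):
            state = color.get(successor, WHITE)
            if state == GRAY:
                # We reached a node that is still on the current path.
                return True
            if state == WHITE and visit(successor):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in list(graph))


cyclic = {"A": ["B"], "B": ["C"], "C": ["A"]}
acyclic = {"A": ["B", "C"], "B": ["C"], "C": []}

print(has_cycle(cyclic))   # True
print(has_cycle(acyclic))  # False
```

The recursion is fine for small graphs like these; a very deep graph would need an iterative version.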

Now that we understand what a graph is, the difference between directed and undirected graphs, and the difference between cyclic and acyclic graphs, let's define what a DAG is.

What is a DAG?

A DAG (directed acyclic graph) is a directed graph that has no cycles. As a result, following the direction of the edges from any node in a DAG never leads back to that same node.

Because they are acyclic, DAGs are especially helpful for describing workflows or dependencies between jobs. Each node in a DAG represents a task to be completed, and the edges show how those tasks depend on one another. By following the directed edges of the DAG, you can work out the order in which the tasks must be completed to satisfy all of the dependencies.

For example, imagine you want to extract data from a source, transform the data, and load the data into a database. We can sketch a DAG illustration for this workflow as follows:

a DAG

From the illustration above, we see three different nodes that represent a sequence of tasks. The first task is data extraction; once the extraction is complete, the next task is initiated, which transforms the data. After the transformation is complete, the last task is to load the data into a database.
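The ordering described above can be derived mechanically with a topological sort. Here is a minimal sketch using graphlib from the Python standard library (Python 3.9+); the task names extract, transform, and load are labels chosen for this example:

```python
from graphlib import TopologicalSorter

# Each key maps a task to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# static_order() yields the tasks so that every dependency comes first.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['extract', 'transform', 'load']
```

If the dependencies contained a cycle, static_order() would raise graphlib.CycleError, which is exactly the situation a DAG rules out.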

Let's consider another example:

Suppose you want to receive several log files from various servers, parse them, and then either send the parsed data to a visualization server if there are no issues or email the DevOps team if there are. Converting this to a DAG, we have:

a DAG

The DAG shown above resembles a tree.

As the saying goes:

All trees are DAGs, but not all DAGs are trees.

A node in a tree can only have one parent, but the "Task 4" node has several parents, hence this DAG cannot be classified as a tree.
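The single-parent rule is easy to check in code. The sketch below counts the parents of every node and assumes the input is already acyclic; the graphs, node names, and function names are hypothetical and are not taken from the figure:

```python
def parent_counts(children):
    """Count how many parents each node has (children: node -> list of child nodes)."""
    counts = {node: 0 for node in children}
    for kids in children.values():
        for kid in kids:
            counts[kid] = counts.get(kid, 0) + 1
    return counts


def is_tree_shaped(children):
    """True if an acyclic graph has exactly one root and no node with several parents."""
    counts = parent_counts(children)
    roots = [node for node, count in counts.items() if count == 0]
    return len(roots) == 1 and all(count <= 1 for count in counts.values())


# Hypothetical DAG in which one node has several parents.
fan_in_dag = {"task_1": ["task_4"], "task_2": ["task_4"], "task_3": ["task_4"], "task_4": []}
# Hypothetical tree: every node except the root has exactly one parent.
simple_tree = {"root": ["a", "b"], "a": ["c"], "b": [], "c": []}

print(is_tree_shaped(fan_in_dag))   # False
print(is_tree_shaped(simple_tree))  # True
```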

DAGs in Apache Airflow

DAGs are used to represent workflows or pipelines in Apache Airflow.

Each individual task in your data pipeline is represented as a node in a DAG, and any dependency between two tasks is represented as a directed edge. In other words, an edge specifies the order in which the two tasks should run. Airflow therefore uses DAGs to specify which tasks should run and in what order.

In Apache Airflow, the structure of a DAG is represented using a Python script. As a result, the tasks and their dependencies are described as code. Also, the DAG's script contains scheduling instructions as code.

Each task performed within a DAG is also written in Python and is implemented by an operator.

What are Airflow DAG Operators?

In Apache Airflow, operators are the building blocks of a workflow: each operator defines an individual task that is executed as part of the DAG. Operators can perform a wide variety of actions, such as running a Python function, executing a SQL query, or running a shell command.

Many pre-built operators in Airflow can be utilized right away, including the following:

  1. BashOperator: executes a bash command or script.

  2. PythonOperator: executes a Python function.

  3. SQLExecuteQueryOperator: executes a SQL query (shipped in the apache-airflow-providers-common-sql provider package).

These are just a few of the many operators that Airflow offers. To carry out particular activities or interface with external systems, users can also write their own custom operators.

DAG as Code

To code along in this section, ensure you have the airflow Python library installed in your Python environment.

pip install apache-airflow

The typical logical blocks of a DAG definition in a Python script are as follows:

  1. Library imports

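    Import the required Python libraries.

    from airflow import DAG
    # In Airflow 2.x, BashOperator lives in airflow.operators.bash; the older
    # airflow.operators.bash_operator path is deprecated.
    from airflow.operators.bash import BashOperator
    from airflow.utils.dates import days_ago
    from datetime import timedelta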
    
  2. DAG arguments

    Next, this block specifies default arguments for the DAG object

    args = {
        'owner': 'John Doe',
        'start_date': days_ago(1),
        'email': ['johndoe@random.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
  3. DAG definition

    To instantiate the DAG, you need a DAG ID (here, example_dag), the default arguments, a description, and a schedule interval.

    dag = DAG(
        'example_dag',
        default_args=args,
        description="John Doe's DAG",
        schedule_interval=timedelta(days=1),
    )
    
  4. Task definitions

    Here we create the individual task definitions using the BashOperator. These task definitions are the nodes of the DAG.

    task_1 = BashOperator(
        task_id='task_1',
        bash_command='echo "Task 1"',
        dag=dag
    )
    
    task_2 = BashOperator(
        task_id='task_2',
        bash_command='echo "Task 2"',
        dag=dag
    )
    
    task_3 = BashOperator(
        task_id='task_3',
        bash_command='echo "Task 3"',
        dag=dag
    )
    
  5. Task pipeline

    The task pipeline specifies the dependencies between the tasks.

    task_1.set_downstream(task_2)
    task_2.set_downstream(task_3)
    

    Here, task_2 depends on task_1, and task_3 depends on task_2.
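Airflow also accepts the bitshift form task_1 >> task_2 >> task_3 as an equivalent of the set_downstream calls above. To show why such a chain works, here is a toy sketch in plain Python. The Task class below is a made-up stand-in for illustration, not Airflow's actual implementation:

```python
class Task:
    """Toy stand-in for an Airflow task; NOT Airflow's real implementation."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        # Record that `other` must run after this task.
        self.downstream.append(other)

    def __rshift__(self, other):
        # `a >> b` declares the same dependency as a.set_downstream(b).
        self.set_downstream(other)
        # Returning the right-hand task lets `a >> b >> c` chain left to right.
        return other


task_1, task_2, task_3 = Task("task_1"), Task("task_2"), Task("task_3")
task_1 >> task_2 >> task_3

print([t.task_id for t in task_1.downstream])  # ['task_2']
print([t.task_id for t in task_2.downstream])  # ['task_3']
```

Both spellings describe the same edges of the DAG; the bitshift form just reads more like the pipeline it describes.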

The Airflow Scheduler monitors the tasks and dependencies you declared in your DAG and triggers each task once its dependencies are complete, which lets you distribute a workflow across an array of workers. Once a Scheduler instance is running, your DAGs begin to execute according to the start_date set in each DAG's code, and the Scheduler then starts each subsequent DAG run at the schedule interval you specified.
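To make the timing concrete, here is a small sketch in plain Python (it does not import Airflow). It assumes Airflow 2.x interval semantics, where each run covers one schedule interval and is triggered only once that interval has ended. The function name first_runs and the fixed start date are hypothetical, and catchup and time zone settings are ignored:

```python
from datetime import datetime, timedelta


def first_runs(start_date, interval, count):
    """Return (interval_start, earliest_trigger_time) pairs for the first runs."""
    runs = []
    for i in range(count):
        interval_start = start_date + i * interval
        # Assumption: a run fires only after its interval is over.
        runs.append((interval_start, interval_start + interval))
    return runs


start = datetime(2023, 2, 19)  # hypothetical fixed start date
for interval_start, trigger_time in first_runs(start, timedelta(days=1), 3):
    print(interval_start.date(), "->", trigger_time.date())
```

Under these assumptions, a daily DAG whose start date is February 19 would see its first run triggered on February 20.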

Here's the full code:

# import libraries
from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x path; bash_operator is deprecated
from airflow.utils.dates import days_ago
from datetime import timedelta

# Specify default arguments
args = {
    'owner': 'John Doe',
    'start_date': days_ago(1),
    'email': ['johndoe@random.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate the DAG object
dag = DAG(
    'example_dag',
    default_args=args,
    description="John Doe's DAG",
    schedule_interval=timedelta(days=1),
)

# define the tasks
task_1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Task 1"',
    dag=dag
)

task_2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Task 2"',
    dag=dag
)

task_3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Task 3"',
    dag=dag
)

# Specify the pipeline dependencies
task_1.set_downstream(task_2)
task_2.set_downstream(task_3)

One of the main benefits of Apache Airflow's approach to modelling data pipelines as DAGs is that the pipelines are expressed as code. When workflows are defined as code, they become more:

  1. Maintainable: By reading the code, developers can see exactly what the pipeline does.

  2. Versionable: Code revisions can easily be tracked by a version control system such as Git.

  3. Collaborative: Developer teams may easily work together on the creation and upkeep of the code throughout the entire workflow.

  4. Testable: Unit tests can be run after any changes to make sure the code continues to function as intended.
