A No-code workflow (DAG) executor

yohamta

Yota Hamada

Posted on May 1, 2022


About

We built Dagu, a tool that executes DAGs (directed acyclic graphs) from declarative YAML definitions. Dagu also comes with a web UI for visualizing workflows.

In this article, I explain why we created it and how it works.


Why not Airflow or Prefect?

Airflow and Prefect are powerful and valuable tools, but they require writing Python code to manage workflows. Our ETL pipeline already consists of hundreds of thousands of lines of complex Perl and shell scripts, and adding another layer of Python on top would make it even more complicated. We needed a more lightweight solution, so we developed a no-code workflow execution engine.

How does it work?

  • Dagu is a single command and it uses the file system to store data in JSON format. Therefore, no DBMS or cloud service is required.
  • Dagu executes DAGs defined in declarative YAML format. Existing programs can be used without any modification.

📖 Usage

  • dagu start [--params=<params>] <file> - run a DAG
  • dagu status <file> - display the current status of the DAG
  • dagu retry --req=<request-id> <file> - retry the failed/canceled DAG
  • dagu stop <file> - stop a DAG execution by sending a TERM signal
  • dagu dry [--params=<params>] <file> - dry-run a DAG
  • dagu server - start a web server for web UI

User interfaces

The web UI can be started with the dagu server command.

  • DAGs: The DAGs page displays all workflows and their real-time status. To create a new workflow, click the button in the top-right corner.
  • Detail: The detail page displays the real-time status, logs, and all workflow configurations.
  • History: The history page allows you to check past execution results and logs.

How to try

1. Installation

Download the latest binary from the Releases page and place it in a directory on your $PATH, for example /usr/local/bin.

2. Download an example DAG definition

Download this example and place it in the current directory with a .yaml extension.

3. Start Web UI server

Start the server with dagu server and browse to http://127.0.0.1:8000 to explore the Web UI.

4. Running the DAG

You can start the example DAG from the Web UI by clicking the Start button in the top-right corner.


Architecture

Dagu uses plain JSON files as its history database and Unix sockets to communicate with running processes.

[Figure: Dagu architecture]
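
To make the "plain JSON files as a history database" idea concrete, here is a minimal Python sketch. It is not Dagu's actual storage code: the one-record-per-line layout, the file name, and the record fields (request_id, status) are all assumptions made for illustration.

```python
import json
import tempfile
from pathlib import Path


def append_status(history_file: Path, status: dict) -> None:
    """Append one status record as a single JSON line (hypothetical layout)."""
    with history_file.open("a", encoding="utf-8") as f:
        f.write(json.dumps(status) + "\n")


def latest_status(history_file: Path):
    """Return the most recently written record, or None if there is none."""
    if not history_file.exists():
        return None
    lines = history_file.read_text(encoding="utf-8").splitlines()
    return json.loads(lines[-1]) if lines else None


# Usage: record two state changes for one run, then read back the latest one.
with tempfile.TemporaryDirectory() as tmp:
    history = Path(tmp) / "example.history.jsonl"  # hypothetical file name
    append_status(history, {"request_id": "r1", "status": "running"})
    append_status(history, {"request_id": "r1", "status": "finished"})
    print(latest_status(history))
```

Because every record is an ordinary file on disk, nothing beyond the file system is needed to inspect or back up the history, which is the point the architecture makes.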

How to define DAG in YAML files

Minimal

A minimal DAG definition is as simple as:



name: minimal configuration          # DAG's name
steps:                               # Steps inside the DAG
  - name: step 1                     # Step's name (should be unique within the file)
    command: python main_1.py        # Command and arguments to execute
  - name: step 2
    command: python main_2.py
    depends:
      - step 1                       # [optional] Name of the step to depend on



Using environment variables

Environment variables can be defined and used throughout the file using the env field.



name: example
env:
  SOME_DIR: ${HOME}/batch
steps:
  - name: some task in some dir
    dir: ${SOME_DIR}
    command: python main.py
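
As a rough mental model of how ${VAR} references in env could be resolved, the Python sketch below expands each declared value against the variables known so far. It is an illustration only, not Dagu's implementation; the function names resolve_env and expand and the home directory /home/alice are hypothetical.

```python
from string import Template


def resolve_env(declared: dict, base: dict) -> dict:
    """Expand ${VAR} references in declared values, in declaration order."""
    env = dict(base)
    for key, value in declared.items():
        # safe_substitute leaves unknown ${VAR} references untouched
        env[key] = Template(value).safe_substitute(env)
    return env


def expand(field: str, env: dict) -> str:
    """Expand ${VAR} references in a single field value such as dir."""
    return Template(field).safe_substitute(env)


# Usage: mirror the example above with an assumed HOME of /home/alice.
env = resolve_env({"SOME_DIR": "${HOME}/batch"}, {"HOME": "/home/alice"})
print(expand("${SOME_DIR}", env))
```

Resolving in declaration order is what lets a value such as /usr/local/bin:${PATH} refer to the previous value of the same variable.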



Using DAG parameters

Parameters can be defined and referenced throughout a file using the params field. Each parameter can be referenced as $1, $2, and so on. Parameters can also be command substitutions or environment variables. You can override the default values with the --params= option of the start command.



name: example
params: param1 param2
steps:
  - name: some task with parameters
    command: python main.py $1 $2
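
The positional references can be pictured with a small Python sketch. This is an assumption about the general idea, not Dagu's code: expand_params is a hypothetical helper that splits the params string the way a shell would and replaces $1, $2, and so on in a command.

```python
import re
import shlex


def expand_params(command: str, params: str) -> str:
    """Replace $1, $2, ... in command with the whitespace-separated params."""
    values = shlex.split(params)  # honors quotes, e.g. '"hello world" x'

    def replace(match):
        index = int(match.group(1)) - 1
        # Leave the reference untouched when no such parameter exists.
        return values[index] if 0 <= index < len(values) else match.group(0)

    return re.sub(r"\$(\d+)", replace, command)


# Default values from the params field:
print(expand_params("python main.py $1 $2", "param1 param2"))
# Values supplied at start time replace the defaults:
print(expand_params("python main.py $1 $2", "2022-05-01 full"))
```

Passing a different params string to the same command corresponds to overriding the defaults when the DAG is started.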



Using command substitution

You can use command substitution in field values. A string enclosed in backquotes (`) is evaluated as a command and replaced with its standard output.



name: minimal configuration          
env:
  TODAY: "`date '+%Y%m%d'`"
steps:                               
  - name: hello
    command: "echo hello, today is ${TODAY}"
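
The following Python sketch shows one way backquote substitution could work in principle. It is a simplified illustration under stated assumptions rather than Dagu's implementation: substitute_commands is a hypothetical name, the command is run through the system shell, and trailing newlines are dropped as a shell would do.

```python
import re
import subprocess

BACKQUOTED = re.compile(r"`([^`]+)`")


def substitute_commands(value: str) -> str:
    """Replace every `command` in value with that command's standard output."""

    def run(match):
        result = subprocess.run(
            match.group(1),
            shell=True,           # evaluate the text as a shell command
            capture_output=True,
            text=True,
            check=True,           # raise if the command exits with an error
        )
        return result.stdout.rstrip("\n")

    return BACKQUOTED.sub(run, value)


# Usage: the TODAY value from the example above.
print(substitute_commands("`date '+%Y%m%d'`"))
```

Values without backquotes pass through unchanged, so the same function can be applied to every field.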



All available fields

All of the following settings are available. By combining settings, you have granular control over how the workflow runs.



name: all configuration              # DAG's name
description: run a DAG               # DAG's description
env:                                 # Environment variables
  LOG_DIR: ${HOME}/logs
  PATH: /usr/local/bin:${PATH}
logDir: ${LOG_DIR}                   # Log directory to write standard output
histRetentionDays: 3                 # Execution history retention days (not for log files)
delaySec: 1                          # Interval seconds between steps
maxActiveRuns: 1                     # Maximum number of steps running in parallel
params: param1 param2                # Default parameters for the DAG that can be referred to by $1, $2, and so on
preconditions:                       # Preconditions for whether the DAG is allowed to run
  - condition: "`echo 1`"            # Command or variables to evaluate
    expected: "1"                    # Expected value for the condition
mailOn:
  failure: true                      # Send a mail when the DAG failed
  success: true                      # Send a mail when the DAG finished
MaxCleanUpTimeSec: 300               # The maximum amount of time to wait after sending a TERM signal to running steps before killing them
handlerOn:                           # Handler on Success, Failure, Cancel, Exit
  success:                           
    command: "echo succeed"          # Command to execute when the DAG execution succeeds
  failure:                           
    command: "echo failed"           # Command to execute when the DAG execution fails
  cancel:                            
    command: "echo canceled"         # Command to execute when the DAG execution is canceled
  exit:                              
    command: "echo finished"         # Command to execute when the DAG execution finishes
steps:
  - name: some task                  # Step's name
    description: some task           # Step's description
    dir: ${HOME}/logs                # Working directory
    command: python main.py $1       # Command and parameters
    mailOn:
      failure: true                  # Send a mail when the step failed
      success: true                  # Send a mail when the step finished
    continueOn:
      failed: true                   # Continue to the next step even if this step fails
      skipped: true                  # Continue to the next step whether or not the preconditions are met
    retryPolicy:                     # Retry policy for the step
      limit: 2                       # Retry up to 2 times when the step failed
    repeatPolicy:                    # Repeat policy for the step
      repeat: true                   # Boolean whether to repeat this step
      intervalSec: 60                # Interval time to repeat the step in seconds
    preconditions:                   # Preconditions for whether the step is allowed to run
      - condition: "`echo 1`"        # Command or variables to evaluate
        expected: "1"                # Expected Value for the condition
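
To illustrate what a retryPolicy with limit: 2 means in practice, here is a minimal Python sketch. It assumes that "retry up to 2 times" means one initial attempt plus at most two retries; that reading, and the helper name run_with_retry, are assumptions rather than a description of Dagu's internals.

```python
from typing import Callable, Tuple


def run_with_retry(step: Callable[[], int], limit: int) -> Tuple[bool, int]:
    """Run step until it returns exit code 0, retrying at most limit times.

    Returns (succeeded, attempts).
    """
    attempts = 0
    while attempts <= limit:  # one initial attempt plus up to limit retries
        attempts += 1
        if step() == 0:
            return True, attempts
    return False, attempts


# Usage: a step that fails twice and then succeeds.
exit_codes = iter([1, 1, 0])
print(run_with_retry(lambda: next(exit_codes), limit=2))
```

With limit: 2, a step that keeps failing is attempted three times in total before it is reported as failed.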



The global configuration file ~/.dagu/config.yaml is useful for gathering common settings, such as the directory to write log files.

Examples

A simple DAG




name: A sample job             
steps:                         
  - name: "1"                  
    command: echo hello world  
  - name: "2"
    command: sleep 10
    depends:
      - "1"                    
  - name: "3"
    command: echo done!
    depends:
      - "2"



The execution result will look something like this:

[Screenshot: sample_job_1]

More complex example

You can define a more complex DAG like this:



name: "multiple steps"
steps:
  - name: "Initialize"
    command: "sleep 2"
  - name: "Copy TAB_1"
    description: "Extract data from TAB_1 to TAB_2"
    command: "sleep 2"
    depends:
      - "Initialize"
  - name: "Update TAB_2"
    description: "Update TAB_2"
    command: "sleep 2"
    depends:
      - Copy TAB_1
  - name: Validate TAB_2
    command: "sleep 2"
    depends:
      - "Update TAB_2"
  - name: "Load TAB_3"
    description: "Read data from files"
    command: "sleep 2"
    depends:
      - Initialize
  - name: "Update TAB_3"
    command: "sleep 2"
    depends:
      - "Load TAB_3"
  - name: Merge
    command: "sleep 2"
    depends:
      - Update TAB_3
      - Validate TAB_2
      - Validate File
  - name: "Check File"
    command: "sleep 2"
  - name: "Copy File"
    command: "sleep 2"
    depends:
      - Check File
  - name: "Validate File"
    command: "sleep 2"
    depends:
      - Copy File
  - name: Calc Result
    command: "sleep 2"
    depends:
      - Merge
  - name: "Report"
    command: "sleep 2"
    depends:
      - Calc Result
  - name: Reconcile
    command: "sleep 2"
    depends:
      - Calc Result
  - name: "Cleaning"
    command: "sleep 2"
    depends:
      - Reconcile
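
To see how the depends lists in this example translate into an execution order, the Python sketch below groups the steps into "waves" that could run in parallel, using the standard library's graphlib module (Python 3.9+). It only illustrates topological ordering in general. It is not Dagu's scheduler, it ignores settings such as maxActiveRuns and delaySec, and the function name execution_waves is hypothetical.

```python
from graphlib import TopologicalSorter

# Step name -> names of the steps it depends on (copied from the YAML above).
DEPENDS = {
    "Initialize": [],
    "Copy TAB_1": ["Initialize"],
    "Update TAB_2": ["Copy TAB_1"],
    "Validate TAB_2": ["Update TAB_2"],
    "Load TAB_3": ["Initialize"],
    "Update TAB_3": ["Load TAB_3"],
    "Merge": ["Update TAB_3", "Validate TAB_2", "Validate File"],
    "Check File": [],
    "Copy File": ["Check File"],
    "Validate File": ["Copy File"],
    "Calc Result": ["Merge"],
    "Report": ["Calc Result"],
    "Reconcile": ["Calc Result"],
    "Cleaning": ["Reconcile"],
}


def execution_waves(depends: dict) -> list:
    """Group steps into waves; each wave only needs the waves before it."""
    sorter = TopologicalSorter(depends)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


for number, wave in enumerate(execution_waves(DEPENDS), start=1):
    print(number, wave)
```

Steps in the same wave have no dependencies on each other, which is why a step such as Check File can start alongside Initialize even though it appears later in the file.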





About the future development

We are actually using this tool to improve our ETL pipeline. We will continue to add features and improvements as development continues.

Please give it a star if you like! Also, feel free to contribute in any way you want. Share ideas, submit issues, create pull requests. Thank you!

https://github.com/yohamta/dagu
