Comparing results between Airflow runs
Dani Hodovic
Posted on January 10, 2022
The Airbnb engineering team built Airflow to automate workflows related to data engineering problems.
Given how generic the tool is, it can be used for just about anything that needs to run on a regular schedule. Think of it as cron on steroids, with better interoperability with the outside world (integrations with third-party providers) and a better debugging experience (built-in UI, metadata and logs).
I use it to automate various tasks, for example:
- I track stock and BTC prices and notify myself when the price moves by more than a certain percentage
- I send myself a notification when one of my GitHub repositories gains a star
- I monitor the air pollution in the city during winter
Airflow doesn't automatically store results between DAG runs (DAG is a fancy word for a workflow defined in a Python script). It stores logs, success status and other metadata, but doesn't provide a trivial way to store custom data between script executions.
Airflow does, however, record the metadata of every DAG run in chronological order, and that metadata includes the execution date and the run ID. We can use it to construct a key ($DAG_ID__$DATE) that points to custom data. I use Redis in the examples below, but any key-value store will do.
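The idea can be sketched without Airflow or Redis. The snippet below is a minimal illustration that uses a plain dict as a stand-in for the key-value store. The names `make_key`, `save_result` and `load_result`, as well as the values, are assumptions made up for this example.

```python
from typing import Dict, Optional

# A plain dict stands in for Redis (or any other key-value store).
store: Dict[str, int] = {}


def make_key(dag_id: str, run_id: str) -> str:
    """Build a key that is unique per DAG and per run."""
    return f"airflow:{dag_id}:{run_id}"


def save_result(dag_id: str, run_id: str, value: int) -> None:
    store[make_key(dag_id, run_id)] = value


def load_result(dag_id: str, run_id: str) -> Optional[int]:
    # Returns None when nothing was stored for that run.
    return store.get(make_key(dag_id, run_id))


# Run 1 stores its result under its own run ID.
save_result("github_stars", "scheduled__2022-01-01T08:00:00+00:00", 41)

# Run 2 looks up the value from run 1 and compares it with a fresh one.
previous = load_result("github_stars", "scheduled__2022-01-01T08:00:00+00:00")
current = 43
delta = current - previous if previous is not None else None
print(delta)  # 2
```

Because every run writes under its own key, a run never overwrites the value that the next run needs for its comparison.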
To get historical DAG runs we use the Airflow internals:
def get_previous_dagrun():
    from airflow.models.dagrun import DagRun, DagRunState
    from airflow.operators.python import get_current_context

    # Use the context to dynamically find the dag name. Otherwise you can just
    # use the name of the dag as it's displayed in the Airflow UI.
    ctx = get_current_context()
    dag_id = ctx["dag"].dag_id
    # DagRun.find returns dag metadata ordered by execution date. The last
    # entry corresponds to the last successful dag run.
    dag_runs = DagRun.find(dag_id=dag_id, state=DagRunState.SUCCESS)
    if len(dag_runs) >= 1:
        return dag_runs[-1]
    # No successful run yet, e.g. on the very first execution
    return None
To construct a key based on the DAG name and the run ID of the previous run:
def create_key(run_time):
    return f"airflow:github_stars:{run_time}"


last_run = get_previous_dagrun()
last_run_id = last_run.run_id if last_run else None
old_key = create_key(last_run_id)
# old_key == "airflow:github_stars:scheduled__2022-01-01T08:00:00+00:00"
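On the very first run there is no previous DAG run, so the lookup has to cope with a missing run ID and a missing value. The following is a rough sketch of that edge case. `FakeRedis` and `read_previous_value` are hypothetical helpers written for this illustration; the fake client only mimics the fact that redis-py returns bytes from `get()` by default.

```python
from typing import Dict, Optional


class FakeRedis:
    """In-memory stand-in that returns bytes from get(), like redis-py."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    def set(self, key: str, value: int) -> None:
        self._data[key] = str(value).encode()

    def get(self, key: str) -> Optional[bytes]:
        return self._data.get(key)


def create_key(run_id: Optional[str]) -> str:
    return f"airflow:github_stars:{run_id}"


def read_previous_value(client: FakeRedis, last_run_id: Optional[str]) -> Optional[int]:
    # No previous run means there is nothing to compare against.
    if last_run_id is None:
        return None
    raw = client.get(create_key(last_run_id))
    # The key may be missing, for example because it has expired.
    return int(raw.decode()) if raw is not None else None


client = FakeRedis()
client.set(create_key("scheduled__2022-01-01T08:00:00+00:00"), 41)

print(read_previous_value(client, None))  # None
print(read_previous_value(client, "scheduled__2022-01-01T08:00:00+00:00"))  # 41
```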
Below is a full example of a script I use to check for new GitHub stars on my repositories. It uses the GitHub API to read the star counts and Redis to store them for each run. If a repository gains or loses a star, a notification is sent to my Pushbullet (mobile) device.
import logging
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.providers.redis.hooks.redis import RedisHook


@dag(
    schedule_interval="0 8 * * *",
    start_date=datetime(2021, 1, 1),
    catchup=False,
)
def github_stars_notifier():
    import requests
    from pushbullet import Pushbullet

    from airflow.models import Variable

    http = requests.Session()

    def read_most_popular_repos():
        res = http.get(
            "https://api.github.com/users/danihodovic/repos?per_page=100&sort=pushed"
        )
        res.raise_for_status()
        by_stars = sorted(res.json(), key=lambda d: d["stargazers_count"], reverse=True)
        return by_stars

    def notify_pushbullet(repo_name, delta, stargazers_count):
        pb_token = Variable.get("pushbullet_token")
        verb = "gained" if delta > 0 else "lost"
        # Report the magnitude so a loss reads "lost 2 stars", not "lost -2 stars"
        message = (
            f"{repo_name} {verb} {abs(delta)} stars ★. It now has {stargazers_count} stars"
        )
        pb = Pushbullet(pb_token)
        pb.push_note("Airflow notification ★", message)

    def create_key(repo_name, run_time):
        return f"airflow:github_stars:{repo_name}:{run_time}"

    def get_previous_dagrun():
        from airflow.models.dagrun import DagRun, DagRunState

        ctx = get_current_context()
        dag_runs = DagRun.find(dag_id=ctx["dag"].dag_id, state=DagRunState.SUCCESS)
        if len(dag_runs) >= 1:
            return dag_runs[-1]
        return None

    @task()
    def check_for_new_stars():
        ctx = get_current_context()
        redis = RedisHook("redis_github_stars_dag").get_conn()
        by_stars = read_most_popular_repos()[0:10]
        # The previous run is the same for every repo, so look it up once
        last_run = get_previous_dagrun()
        last_run_id = last_run.run_id if last_run else None
        for repo in by_stars:
            repo_name = repo["full_name"]
            old_key = create_key(repo_name, last_run_id)
            stargazers_count = repo["stargazers_count"]
            cached_value_str = redis.get(old_key)
            if cached_value_str:
                cached_value = int(cached_value_str.decode())
                delta = stargazers_count - cached_value
                logging.info(
                    f"{repo_name=} {cached_value=} {stargazers_count=} {delta=}"
                )
                if delta != 0:
                    notify_pushbullet(repo_name, delta, stargazers_count)

            # Update the new value
            new_key = create_key(repo_name, ctx["run_id"])
            # Expire the key after 7 days (604800 seconds) to avoid flooding
            # the Redis DB. `ex` sets the TTL; `keepttl` is only a boolean flag
            # that preserves an existing TTL.
            redis.set(new_key, stargazers_count, ex=604800)

    check_for_new_stars()


instance = github_stars_notifier()
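The comparison inside check_for_new_stars is easier to test when it lives in plain functions that need neither Airflow nor Redis. The sketch below is one possible way to factor it out, not the way the DAG above is written. `star_delta` and `format_message` are hypothetical helper names, "owner/repo" is a placeholder repository, and the singular/plural handling is an addition of this example.

```python
from typing import Optional


def star_delta(cached: Optional[bytes], current: int) -> Optional[int]:
    """Return the change in stars, or None when there is no cached value."""
    if cached is None:
        return None
    return current - int(cached.decode())


def format_message(repo_name: str, delta: int, total: int) -> str:
    verb = "gained" if delta > 0 else "lost"
    noun = "star" if abs(delta) == 1 else "stars"
    return f"{repo_name} {verb} {abs(delta)} {noun}. It now has {total} stars"


print(star_delta(b"41", 43))  # 2
print(star_delta(None, 43))   # None
print(format_message("owner/repo", -1, 40))
# owner/repo lost 1 star. It now has 40 stars
```

With the logic isolated like this, the task body only has to fetch the values, call the helpers and send the notification when the delta is non-zero.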