Trigger a rerun of a failed Airflow task
Developer213
Posted on November 12, 2024
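# Install necessary libraries if not already installed (the leading "!" is Jupyter/IPython shell syntax; in a terminal run `pip install requests` instead)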
!pip install requests
# Import libraries
import requests
import json
# Connection settings for the Astronomer-hosted Airflow REST API.
# All three values are placeholders and must be replaced before running.
BASE_URL = "https://<your-astronomer-airflow-instance>/api/v1"
DAG_ID = "your_dag_id"  # ID of the DAG whose runs are queried
TOKEN = "your_token_key"  # API token sent as a bearer credential

# Headers attached to every request: bearer-token auth plus a JSON content type.
headers = {}
headers["Authorization"] = f"Bearer {TOKEN}"
headers["Content-Type"] = "application/json"
# Step 1: Retrieve the latest DAG run
dag_run_url = f"{BASE_URL}/dags/{DAG_ID}/dagRuns"

# The list endpoint does not return newest-first by default, so taking element
# [0] of an unsorted response would not be the latest run. Ask the API to sort
# by execution date descending and return only the first entry.
response = requests.get(
    dag_run_url,
    headers=headers,
    params={"order_by": "-execution_date", "limit": 1},
    timeout=30,  # avoid hanging forever if the Airflow instance is unreachable
)

# Stays None unless a run is found; Step 2 is skipped in that case.
DAG_RUN_ID = None
if response.status_code == 200:
    dag_runs = response.json().get("dag_runs", [])
    if dag_runs:
        # Retrieve the most recent DAG run ID
        DAG_RUN_ID = dag_runs[0]["dag_run_id"]
        print("Latest DAG Run ID:", DAG_RUN_ID)
    else:
        # A DAG that has never run returns an empty list; indexing it would
        # raise IndexError.
        print("No DAG runs found for DAG:", DAG_ID)
else:
    print("Failed to retrieve DAG runs:", response.text)
# Step 2: Trigger a rerun for the failed task if a valid DAG run ID was retrieved
if DAG_RUN_ID:
    TASK_ID = "your_failed_task_id"  # Replace with the specific failed task ID

    # The stable REST API (v1) clears task instances through a DAG-level
    # endpoint; the task and DAG run to clear are selected in the request body.
    task_url = f"{BASE_URL}/dags/{DAG_ID}/clearTaskInstances"
    data = {
        # dry_run defaults to true on this endpoint, which only reports what
        # would be cleared. It must be false for the task to actually rerun.
        "dry_run": False,
        "task_ids": [TASK_ID],  # Restrict clearing to the chosen task
        "dag_run_id": DAG_RUN_ID,  # Restrict clearing to the run found in Step 1
        "only_failed": True,  # Clears only failed task instances
        # Puts the DAG run back into a running state so the scheduler picks up
        # the cleared task. NOTE(review): a run left in "failed" state is
        # presumably not rescheduled -- confirm against your Airflow version.
        "reset_dag_runs": True,
        "include_subdags": False,
        "include_parentdag": False,
    }

    # Send POST request to rerun the failed task
    response = requests.post(task_url, headers=headers, json=data, timeout=30)
    if response.status_code == 200:
        print("Task rerun triggered successfully.")
    else:
        print("Failed to rerun task:", response.text)
💖 💪 🙅 🚩
Developer213
Posted on November 12, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest updates from our blog.