Monitor Airflow DAG Runs

Original n8n title: Airflow Dag_run

airflow dag_run. Uses httpRequest, stopAndError, executeWorkflowTrigger. Event-driven trigger; 12 nodes.

Category: General · Trigger: Event · Nodes: 12 · Complexity: ★★★★☆ · Integrations: HTTP Request, Stop And Error, Execute Workflow Trigger

This workflow follows the Execute Workflow Trigger → HTTP Request recipe pattern.

The workflow JSON

Copy the full n8n JSON below, paste it into a new n8n workflow, add your credentials, and activate it.

{
  "id": "Y5URlIlbX4HDzWKA",
  "meta": {
    "templateCredsSetupCompleted": true
  },
  "name": "airflow dag_run",
  "tags": [
    {
      "id": "YSelDQ3zfWB0LeVn",
      "name": "airflow",
      "createdAt": "2025-02-25T04:17:21.943Z",
      "updatedAt": "2025-02-25T04:17:21.943Z"
    }
  ],
  "nodes": [
    {
      "id": "0d4457ef-7a88-4755-8bd2-b0e8148f86f4",
      "name": "Airflow: dag_run",
      "type": "n8n-nodes-base.httpRequest",
      "position": [
        380,
        -40
      ],
      "parameters": {
        "url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns",
        "method": "POST",
        "options": {},
        "jsonBody": "={\n  \"conf\": {{ $('in data').item.json.conf }}\n}",
        "sendBody": true,
        "specifyBody": "json",
        "authentication": "genericCredentialType",
        "genericAuthType": "httpBasicAuth"
      },
      "credentials": {
        "httpBasicAuth": {
          "name": "<your credential>"
        }
      },
      "typeVersion": 4.2
    },
    {
      "id": "acf477a0-aad5-402a-a5a0-543f3e277333",
      "name": "Airflow: dag_run - state",
      "type": "n8n-nodes-base.httpRequest",
      "position": [
        840,
        60
      ],
      "parameters": {
        "url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns/{{ $('Airflow: dag_run').item.json.dag_run_id }}",
        "options": {},
        "authentication": "genericCredentialType",
        "genericAuthType": "httpBasicAuth"
      },
      "credentials": {
        "httpBasicAuth": {
          "name": "<your credential>"
        }
      },
      "typeVersion": 4.2
    },
    {
      "id": "26982a6f-6281-4140-a05c-ea6f3f2c0cb2",
      "name": "count",
      "type": "n8n-nodes-base.code",
      "position": [
        1180,
        40
      ],
      "parameters": {
        "jsCode": "try {\n  $('count').first().json.count += 1\n  return {count:$('count').first().json.count};\n}\ncatch (error) {\n  return {count:1};\n}"
      },
      "typeVersion": 2
    },
    {
      "id": "613718f6-ba7e-415c-8e07-0123224e1ac6",
      "name": "dag run fail",
      "type": "n8n-nodes-base.stopAndError",
      "position": [
        1180,
        400
      ],
      "parameters": {
        "errorMessage": "dag run fail"
      },
      "typeVersion": 1
    },
    {
      "id": "66ba0e85-4608-418b-b27b-8cbc50f78319",
      "name": "if state == queued",
      "type": "n8n-nodes-base.if",
      "position": [
        520,
        60
      ],
      "parameters": {
        "options": {},
        "conditions": {
          "options": {
            "version": 2,
            "leftValue": "",
            "caseSensitive": true,
            "typeValidation": "strict"
          },
          "combinator": "and",
          "conditions": [
            {
              "id": "0ae67986-09c0-443d-9262-075bfe94ca2e",
              "operator": {
                "name": "filter.operator.equals",
                "type": "string",
                "operation": "equals"
              },
              "leftValue": "={{ $json.state }}",
              "rightValue": "queued"
            }
          ]
        }
      },
      "typeVersion": 2.2
    },
    {
      "id": "92176e9a-0ea7-48b0-9ca0-ea4ea8442cee",
      "name": "dag run wait too long",
      "type": "n8n-nodes-base.stopAndError",
      "position": [
        1500,
        40
      ],
      "parameters": {
        "errorMessage": "dag run wait too long"
      },
      "typeVersion": 1
    },
    {
      "id": "6a05471f-232e-44d6-9dbb-583400537507",
      "name": "Airflow: dag_run - get result",
      "type": "n8n-nodes-base.httpRequest",
      "position": [
        1180,
        -140
      ],
      "parameters": {
        "url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns/{{ $('Airflow: dag_run').item.json.dag_run_id }}/taskInstances/{{ $('in data').item.json.task_id }}/xcomEntries/return_value",
        "options": {},
        "authentication": "genericCredentialType",
        "genericAuthType": "httpBasicAuth"
      },
      "credentials": {
        "httpBasicAuth": {
          "name": "<your credential>"
        }
      },
      "typeVersion": 4.2
    },
    {
      "id": "fb2211d5-cef2-4be2-b281-de315aa07093",
      "name": "Switch: state",
      "type": "n8n-nodes-base.switch",
      "position": [
        980,
        -40
      ],
      "parameters": {
        "rules": {
          "values": [
            {
              "outputKey": "success",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "4d4ecf8a-c02b-4722-9528-1919cdf9b83e",
                    "operator": {
                      "name": "filter.operator.equals",
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "success"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "queued",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "operator": {
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "queued"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "running",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "fa5d8524-334a-4ab1-b543-bba75cafd0ec",
                    "operator": {
                      "name": "filter.operator.equals",
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "running"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "failed",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "dd853677-c51c-4c06-8680-3c9d1829d6bd",
                    "operator": {
                      "name": "filter.operator.equals",
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "failed"
                  }
                ]
              },
              "renameOutput": true
            }
          ]
        },
        "options": {
          "fallbackOutput": 3
        }
      },
      "typeVersion": 3.2
    },
    {
      "id": "5941496a-a64d-432c-9103-e7bcae4b8d67",
      "name": "in data",
      "type": "n8n-nodes-base.executeWorkflowTrigger",
      "position": [
        100,
        -40
      ],
      "parameters": {
        "workflowInputs": {
          "values": [
            {
              "name": "dag_id"
            },
            {
              "name": "task_id"
            },
            {
              "name": "conf"
            },
            {
              "name": "wait",
              "type": "number"
            },
            {
              "name": "wait_time",
              "type": "number"
            }
          ]
        }
      },
      "typeVersion": 1.1
    },
    {
      "id": "e77fed4a-b61a-4126-8be3-43ef8a832cfe",
      "name": "Wait",
      "type": "n8n-nodes-base.wait",
      "position": [
        700,
        -40
      ],
      "parameters": {
        "amount": "={{ $('in data').item.json.hasOwnProperty('wait') ? $('in data').item.json.wait : 10 }}"
      },
      "typeVersion": 1.1
    },
    {
      "id": "8ffae842-4400-4667-85bb-50644ef10fc0",
      "name": "If count > wait_time",
      "type": "n8n-nodes-base.if",
      "position": [
        1320,
        140
      ],
      "parameters": {
        "options": {},
        "conditions": {
          "options": {
            "version": 2,
            "leftValue": "",
            "caseSensitive": true,
            "typeValidation": "strict"
          },
          "combinator": "and",
          "conditions": [
            {
              "id": "1829d538-5633-4f5c-ad1b-285b084b35ee",
              "operator": {
                "type": "number",
                "operation": "gt"
              },
              "leftValue": "={{ $json.count }}",
              "rightValue": "={{ $('in data').item.json.hasOwnProperty('wait_time') ? $('in data').item.json.wait_time : 12 }}"
            }
          ]
        }
      },
      "typeVersion": 2.2
    },
    {
      "id": "c49d4a1b-6f25-471b-9c21-d3defb709dda",
      "name": "airflow-api",
      "type": "n8n-nodes-base.set",
      "position": [
        240,
        60
      ],
      "parameters": {
        "options": {},
        "assignments": {
          "assignments": [
            {
              "id": "40a5ab5b-dee1-44c4-910a-d6b85af75aed",
              "name": "prefix",
              "type": "string",
              "value": "https://airflow.example.com"
            }
          ]
        }
      },
      "typeVersion": 3.4
    }
  ],
  "active": false,
  "settings": {
    "executionOrder": "v1"
  },
  "versionId": "57fdbcfc-7950-4aff-9ac7-3c2a99a2c8c8",
  "connections": {
    "Wait": {
      "main": [
        [
          {
            "node": "Airflow: dag_run - state",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "count": {
      "main": [
        [
          {
            "node": "If count > wait_time",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "in data": {
      "main": [
        [
          {
            "node": "airflow-api",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "airflow-api": {
      "main": [
        [
          {
            "node": "Airflow: dag_run",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Switch: state": {
      "main": [
        [
          {
            "node": "Airflow: dag_run - get result",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "count",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "count",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "dag run fail",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Airflow: dag_run": {
      "main": [
        [
          {
            "node": "if state == queued",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "if state == queued": {
      "main": [
        [
          {
            "node": "Wait",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "dag run fail",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "If count > wait_time": {
      "main": [
        [
          {
            "node": "dag run wait too long",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Wait",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Airflow: dag_run - state": {
      "main": [
        [
          {
            "node": "Switch: state",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Airflow: dag_run - get result": {
      "main": [
        []
      ]
    }
  }
}
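
The three HTTP Request nodes above call Airflow REST API paths built from the prefix, dag_id, task_id, and dag_run_id values. As a rough sketch for checking those URLs outside n8n, the helper below rebuilds them in Python. The function name airflow_requests and the percent-encoding of path segments are assumptions added for illustration; the workflow itself interpolates the values without encoding them.

```python
import json
from urllib.parse import quote


def airflow_requests(prefix, dag_id, task_id, dag_run_id, conf=None):
    """Rebuild the requests made by the workflow's three HTTP Request nodes.

    Hypothetical helper: path segments are percent-encoded here, which the
    n8n expressions in the workflow do not do.
    """
    base = f"{prefix.rstrip('/')}/api/v1/dags/{quote(dag_id, safe='')}/dagRuns"
    run = f"{base}/{quote(dag_run_id, safe='')}"
    return {
        # "Airflow: dag_run": POST with a {"conf": ...} JSON body
        "trigger": {"method": "POST", "url": base,
                    "body": json.dumps({"conf": conf or {}})},
        # "Airflow: dag_run - state": GET the run to read its state
        "state": {"method": "GET", "url": run},
        # "Airflow: dag_run - get result": GET the task's return_value XCom
        "result": {"method": "GET",
                   "url": f"{run}/taskInstances/{quote(task_id, safe='')}"
                          "/xcomEntries/return_value"},
    }
```

Calling airflow_requests("https://airflow.example.com", "my_dag", "my_task", "run_1") returns the URLs you would expect to see in the n8n execution log for the same inputs.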

Credentials you'll need

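The three HTTP Request nodes authenticate with HTTP Basic Auth, so n8n will prompt for a Basic Auth credential (your Airflow username and password) when you import. We strip credential IDs before publishing, so you will add your own.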
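
For reference, HTTP Basic Auth sends the username and password as a base64-encoded Authorization header. The sketch below shows that encoding so you can reproduce a request by hand when debugging; the function name basic_auth_header and the sample credentials are illustrative only, and n8n builds this header for you from the stored credential.

```python
import base64


def basic_auth_header(username, password):
    """Build the Authorization header that HTTP Basic Auth sends.

    Hypothetical helper for manual debugging; not part of the workflow.
    """
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}
```

Because base64 is an encoding and not encryption, the Airflow prefix should use HTTPS, as the example value in the workflow does.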

How this works

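This workflow starts an Apache Airflow DAG run and waits for it to finish, so a calling workflow can depend on the result without manual oversight. It is aimed at data engineers and DevOps teams running scheduled tasks or ETL processes. It is a sub-workflow: the Execute Workflow Trigger accepts dag_id, task_id, conf, wait, and wait_time, and an HTTP Request node sends a POST to the Airflow REST API (/api/v1/dags/{dag_id}/dagRuns) to create the run. If the new run is not reported as queued, a Stop And Error node halts with "dag run fail". Otherwise the workflow waits (the wait input, default 10), requests the run's state, and branches on it: success fetches the task's return_value XCom entry, failed or any unrecognized state halts with "dag run fail", and queued or running increments a counter and loops back to the Wait node. Once the counter exceeds wait_time (default 12), the workflow halts with "dag run wait too long".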
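
The status checks and Stop And Error branches in the workflow JSON amount to a small polling loop. The following is a minimal Python sketch of that control flow, not the n8n implementation: wait_for_dag_run, DagRunFailed, and DagRunTimeout are hypothetical names, and fetch_state stands in for the "Airflow: dag_run - state" request so the loop can run without a live Airflow server.

```python
import time


class DagRunFailed(Exception):
    """Stands in for the 'dag run fail' Stop And Error node."""


class DagRunTimeout(Exception):
    """Stands in for the 'dag run wait too long' Stop And Error node."""


def wait_for_dag_run(initial_state, fetch_state, wait=10, wait_time=12,
                     sleep=time.sleep):
    """Poll until the run succeeds, fails, or exceeds wait_time polls."""
    # 'if state == queued': any other state right after the trigger is an error.
    if initial_state != "queued":
        raise DagRunFailed("dag run fail")
    count = 0
    while True:
        sleep(wait)                      # Wait node
        state = fetch_state()            # 'Airflow: dag_run - state'
        if state == "success":           # Switch output 0: go fetch the XCom result
            return state
        if state in ("queued", "running"):
            count += 1                   # 'count' Code node
            if count > wait_time:        # 'If count > wait_time'
                raise DagRunTimeout("dag run wait too long")
            continue
        # 'failed', plus any unmatched state routed by fallbackOutput 3
        raise DagRunFailed("dag run fail")
```

Passing sleep and fetch_state as parameters is a design choice made for this sketch so the loop can be exercised with canned states in a test.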

Use this workflow when integrating Airflow into event-driven automations, such as triggering downstream tasks only after a DAG run succeeds, or for pipeline health checks in production environments. Avoid it when the Airflow REST API is not reachable from n8n, or when Airflow's own scheduling is enough and you do not need custom error handling. Common variations include adding a Send Email node ahead of the Stop And Error nodes for alerts, or raising the wait and wait_time inputs for longer-running DAGs.
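
When adjusting the timeout for longer-running DAGs, it helps to see how the wait and wait_time inputs combine. The sketch below assumes the counter increments once per queued or running poll, as the workflow intends, so the timeout fires on the poll where the count first exceeds wait_time. The function names are hypothetical, and the result is in whatever unit the Wait node is configured to use (the JSON does not set one explicitly).

```python
import math


def max_wait(wait=10, wait_time=12):
    """Total Wait-node time elapsed when 'dag run wait too long' fires."""
    # One Wait runs before every poll; the poll that pushes the count past
    # wait_time is poll number wait_time + 1.
    return (wait_time + 1) * wait


def wait_time_for(budget, wait=10):
    """Smallest wait_time whose max_wait is at least `budget`."""
    return max(math.ceil(budget / wait) - 1, 0)
```

Under these assumptions the defaults give max_wait() == 130, and a DAG that may take up to 3600 units with a 30-unit poll interval needs wait_time_for(3600, 30) == 119.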

About this workflow

Source: https://github.com/Zie619/n8n-workflows (original creator credit).

Related workflows

Workflows that share integrations, category, or trigger type with this one. All free to copy and import.

General: Generate Leads with Google Maps - AlexK1919. Uses manualTrigger, scheduleTrigger, executeWorkflowTrigger, stopAndError. Event-driven trigger; 42 nodes. Integrations: Execute Workflow Trigger, Stop And Error, HTTP Request +1

General: Stopanderror. Uses manualTrigger, stickyNote, executeWorkflowTrigger, nextCloud. Event-driven trigger; 32 nodes. Integrations: Execute Workflow Trigger, Next Cloud, Stop And Error

General: Http Stickynote. Uses httpRequest, executeWorkflowTrigger, stickyNote, sendInBlue. Event-driven trigger; 22 nodes. Integrations: HTTP Request, Execute Workflow Trigger, Sendinblue

General: Manipulate Pdf With Adobe Developer Api. Uses manualTrigger, httpRequest, executeWorkflowTrigger, stickyNote. Event-driven trigger; 20 nodes. Integrations: HTTP Request, Execute Workflow Trigger, Dropbox

General: [2/2] KNN classifier (lands dataset). Uses httpRequest, stickyNote, executeWorkflowTrigger. Event-driven trigger; 18 nodes. Integrations: HTTP Request, Execute Workflow Trigger