AutomationFlowsAI & RAG › Iot Sensor Monitoring with Gpt-4o Anomaly Detection, Mqtt & Multi-channel Alerts

Iot Sensor Monitoring with Gpt-4o Anomaly Detection, Mqtt & Multi-channel Alerts

ByTOMOMITSU ASANO @tomo-0310 on n8n.io

{ "name": "IoT Sensor Data Aggregation with AI-Powered Anomaly Detection", "nodes": [ { "parameters": { "content": "## How it works\nThis workflow monitors IoT sensors in real-time. It ingests data via MQTT or a schedule, normalizes the format, and removes duplicates using data…

Event trigger★★★★☆ complexityAI-powered21 nodesMqtt TriggerCryptoAgentGmailSlackGoogle SheetsOpenAI Chat
AI & RAG Trigger: Event Nodes: 21 Complexity: ★★★★☆ AI nodes: yes Added:

This workflow corresponds to n8n.io template #11909 — we link there as the canonical source.

This workflow follows the Agent → Gmail recipe pattern — see all workflows that pair these two integrations.

The workflow JSON

Copy or download the full n8n JSON below. Paste it into a new n8n workflow, add your credentials, activate. Full import guide →

Download .json
{
  "name": "IoT Sensor Data Aggregation with AI-Powered Anomaly Detection",
  "nodes": [
    {
      "id": "sticky-overview",
      "name": "Sticky Note",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        -352,
        96
      ],
      "parameters": {
        "width": 400,
        "height": 664,
        "content": "## IoT Sensor Data Aggregation with AI-Powered Anomaly Detection\n\n### Overview\nReal-time IoT sensor monitoring system that collects data via MQTT, processes with AI for anomaly detection, and triggers smart alerts.\n\n### Key Features\n- **MQTT Integration**: Real-time sensor data ingestion\n- **AI Agent Analysis**: Dynamic anomaly detection with reasoning\n- **Multi-Threshold Alerts**: Critical/Warning/Info levels\n- **Data Fingerprinting**: SHA256 deduplication\n- **Historical Logging**: Google Sheets archiving\n\n### Required Credentials\n- MQTT Broker credentials\n- OpenAI API key\n- Slack Bot Token\n- Google Sheets OAuth\n- Gmail OAuth\n\n### Trigger Options\n1. MQTT real-time sensor data\n2. Scheduled batch processing\n3. Manual webhook trigger"
      },
      "typeVersion": 1
    },
    {
      "id": "sticky-step1",
      "name": "Sticky Note1",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        112,
        624
      ],
      "parameters": {
        "color": 7,
        "width": 280,
        "height": 140,
        "content": "### Step 1: Data Ingestion\nMultiple trigger sources for flexibility:\n- MQTT for real-time IoT data\n- Schedule for batch processing\n- Webhook for manual triggers"
      },
      "typeVersion": 1
    },
    {
      "id": "sticky-step2",
      "name": "Sticky Note2",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        608,
        608
      ],
      "parameters": {
        "color": 7,
        "width": 280,
        "height": 156,
        "content": "### Step 2: Data Processing\n- Parse sensor payload\n- Add timestamps & metadata\n- Generate content hash for deduplication\n- Filter duplicate readings"
      },
      "typeVersion": 1
    },
    {
      "id": "sticky-step3",
      "name": "Sticky Note3",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        1312,
        688
      ],
      "parameters": {
        "color": 7,
        "width": 280,
        "height": 172,
        "content": "### Step 3: AI Analysis\nAI Agent analyzes sensor data to:\n- Detect anomalies dynamically\n- Provide reasoning for decisions\n- Classify severity levels\n- Suggest corrective actions"
      },
      "typeVersion": 1
    },
    {
      "id": "sticky-step4",
      "name": "Sticky Note4",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        2000,
        736
      ],
      "parameters": {
        "color": 7,
        "width": 280,
        "height": 140,
        "content": "### Step 4: Alert & Archive\n- Route alerts by severity\n- Critical \u2192 Email + Slack\n- Warning \u2192 Slack only\n- Log all data to Sheets"
      },
      "typeVersion": 1
    },
    {
      "id": "mqtt-trigger",
      "name": "MQTT Sensor Trigger",
      "type": "n8n-nodes-base.mqttTrigger",
      "position": [
        112,
        304
      ],
      "parameters": {
        "topics": "sensors/+/data",
        "options": {}
      },
      "typeVersion": 1
    },
    {
      "id": "schedule-trigger",
      "name": "Batch Process Schedule",
      "type": "n8n-nodes-base.scheduleTrigger",
      "position": [
        112,
        480
      ],
      "parameters": {
        "rule": {
          "interval": [
            {
              "field": "minutes",
              "minutesInterval": 15
            }
          ]
        }
      },
      "typeVersion": 1.2
    },
    {
      "id": "merge-triggers",
      "name": "Merge Triggers",
      "type": "n8n-nodes-base.merge",
      "position": [
        352,
        384
      ],
      "parameters": {
        "mode": "chooseBranch"
      },
      "typeVersion": 3
    },
    {
      "id": "set-sensor-config",
      "name": "Define Sensor Thresholds",
      "type": "n8n-nodes-base.set",
      "position": [
        560,
        384
      ],
      "parameters": {
        "mode": "raw",
        "options": {},
        "jsonOutput": "{\n  \"thresholds\": {\n    \"temperature\": {\"min\": -10, \"max\": 50, \"unit\": \"C\"},\n    \"humidity\": {\"min\": 20, \"max\": 90, \"unit\": \"%\"},\n    \"pressure\": {\"min\": 950, \"max\": 1050, \"unit\": \"hPa\"},\n    \"co2\": {\"min\": 400, \"max\": 2000, \"unit\": \"ppm\"}\n  },\n  \"alertConfig\": {\n    \"criticalChannel\": \"#iot-critical\",\n    \"warningChannel\": \"#iot-alerts\",\n    \"emailRecipients\": \"user@example.com\"\n  }\n}"
      },
      "typeVersion": 3.4
    },
    {
      "id": "parse-sensor-data",
      "name": "Parse Sensor Payload",
      "type": "n8n-nodes-base.code",
      "position": [
        752,
        384
      ],
      "parameters": {
        "jsCode": "const items = $input.all();\nconst thresholds = $('Define Sensor Thresholds').first().json.thresholds;\nconst results = [];\n\nfor (const item of items) {\n  let sensorData;\n  try {\n    sensorData = typeof item.json.message === 'string' \n      ? JSON.parse(item.json.message) \n      : item.json;\n  } catch (e) {\n    sensorData = item.json;\n  }\n  \n  const now = new Date();\n  const reading = {\n    sensorId: sensorData.sensorId || sensorData.topic?.split('/')[1] || 'unknown',\n    location: sensorData.location || 'Main Facility',\n    timestamp: now.toISOString(),\n    readings: {\n      temperature: sensorData.temperature ?? null,\n      humidity: sensorData.humidity ?? null,\n      pressure: sensorData.pressure ?? null,\n      co2: sensorData.co2 ?? null\n    },\n    metadata: {\n      receivedAt: now.toISOString(),\n      source: item.json.topic || 'batch',\n      thresholds: thresholds\n    }\n  };\n  \n  results.push({ json: reading });\n}\n\nreturn results;"
      },
      "typeVersion": 2
    },
    {
      "id": "generate-hash",
      "name": "Generate Data Fingerprint",
      "type": "n8n-nodes-base.crypto",
      "position": [
        960,
        384
      ],
      "parameters": {
        "type": "SHA256",
        "value": "={{ $json.sensorId + '-' + $json.timestamp + '-' + JSON.stringify($json.readings) }}",
        "dataPropertyName": "dataHash"
      },
      "typeVersion": 1
    },
    {
      "id": "remove-duplicates",
      "name": "Remove Duplicate Readings",
      "type": "n8n-nodes-base.removeDuplicates",
      "position": [
        1152,
        384
      ],
      "parameters": {
        "compare": "selectedFields",
        "options": {},
        "fieldsToCompare": "dataHash"
      },
      "typeVersion": 1
    },
    {
      "id": "ai-agent",
      "name": "AI Anomaly Detector",
      "type": "@n8n/n8n-nodes-langchain.agent",
      "position": [
        1344,
        384
      ],
      "parameters": {
        "text": "=Analyze this IoT sensor reading and determine if there are any anomalies:\n\nSensor ID: {{ $json.sensorId }}\nLocation: {{ $json.location }}\nTimestamp: {{ $json.timestamp }}\n\nReadings:\n- Temperature: {{ $json.readings.temperature }}\u00b0C (Normal: {{ $json.metadata.thresholds.temperature.min }} to {{ $json.metadata.thresholds.temperature.max }})\n- Humidity: {{ $json.readings.humidity }}% (Normal: {{ $json.metadata.thresholds.humidity.min }} to {{ $json.metadata.thresholds.humidity.max }})\n- CO2: {{ $json.readings.co2 }} ppm (Normal: {{ $json.metadata.thresholds.co2.min }} to {{ $json.metadata.thresholds.co2.max }})\n\nProvide your analysis in this exact JSON format:\n{\n  \"hasAnomaly\": true/false,\n  \"severity\": \"critical\"/\"warning\"/\"normal\",\n  \"anomalies\": [\"list of detected issues\"],\n  \"reasoning\": \"explanation of your analysis\",\n  \"recommendation\": \"suggested action\"\n}",
        "options": {
          "systemMessage": "You are an IoT monitoring expert. Analyze sensor data and detect anomalies based on the provided thresholds. Be precise and provide actionable recommendations. Always respond in valid JSON format."
        }
      },
      "typeVersion": 1.7
    },
    {
      "id": "parse-ai-response",
      "name": "Parse AI Analysis",
      "type": "n8n-nodes-base.code",
      "position": [
        1600,
        384
      ],
      "parameters": {
        "jsCode": "const item = $input.first();\nconst originalData = $('Remove Duplicate Readings').first().json;\n\nlet aiAnalysis;\ntry {\n  const responseText = item.json.output || item.json.text || '';\n  const jsonMatch = responseText.match(/\\{[\\s\\S]*\\}/);\n  aiAnalysis = jsonMatch ? JSON.parse(jsonMatch[0]) : {\n    hasAnomaly: false,\n    severity: 'normal',\n    anomalies: [],\n    reasoning: 'Unable to parse AI response',\n    recommendation: 'Manual review required'\n  };\n} catch (e) {\n  aiAnalysis = {\n    hasAnomaly: false,\n    severity: 'normal',\n    anomalies: [],\n    reasoning: 'Parse error: ' + e.message,\n    recommendation: 'Manual review required'\n  };\n}\n\nreturn [{\n  json: {\n    ...originalData,\n    analysis: aiAnalysis,\n    alertLevel: aiAnalysis.severity,\n    requiresAlert: aiAnalysis.hasAnomaly && aiAnalysis.severity !== 'normal'\n  }\n}];"
      },
      "typeVersion": 2
    },
    {
      "id": "switch-severity",
      "name": "Route by Severity",
      "type": "n8n-nodes-base.switch",
      "position": [
        1808,
        384
      ],
      "parameters": {
        "rules": {
          "values": [
            {
              "outputKey": "Critical",
              "conditions": {
                "options": {
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "critical",
                    "operator": {
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.alertLevel }}",
                    "rightValue": "critical"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "Warning",
              "conditions": {
                "options": {
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "warning",
                    "operator": {
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.alertLevel }}",
                    "rightValue": "warning"
                  }
                ]
              },
              "renameOutput": true
            }
          ]
        },
        "options": {
          "fallbackOutput": "extra"
        }
      },
      "typeVersion": 3.2
    },
    {
      "id": "send-critical-email",
      "name": "Send Critical Email",
      "type": "n8n-nodes-base.gmail",
      "position": [
        2080,
        208
      ],
      "parameters": {
        "sendTo": "={{ $('Define Sensor Thresholds').first().json.alertConfig.emailRecipients }}",
        "message": "=CRITICAL IoT SENSOR ALERT\n\nSensor: {{ $json.sensorId }}\nLocation: {{ $json.location }}\nTime: {{ $json.timestamp }}\n\nReadings:\n- Temperature: {{ $json.readings.temperature }}\u00b0C\n- Humidity: {{ $json.readings.humidity }}%\n- CO2: {{ $json.readings.co2 }} ppm\n\nAI Analysis:\n{{ $json.analysis.reasoning }}\n\nDetected Issues:\n{{ $json.analysis.anomalies.join('\\n- ') }}\n\nRecommendation:\n{{ $json.analysis.recommendation }}",
        "options": {},
        "subject": "=CRITICAL IoT Alert: {{ $json.sensorId }} - {{ $json.analysis.anomalies[0] || 'Anomaly Detected' }}"
      },
      "typeVersion": 2.1
    },
    {
      "id": "send-critical-slack",
      "name": "Slack Critical Alert",
      "type": "n8n-nodes-base.slack",
      "position": [
        2080,
        368
      ],
      "parameters": {
        "text": "=\ud83d\udea8 *CRITICAL IoT ALERT*\n\n*Sensor:* {{ $json.sensorId }}\n*Location:* {{ $json.location }}\n\n*Readings:*\n\u2022 Temperature: {{ $json.readings.temperature }}\u00b0C\n\u2022 Humidity: {{ $json.readings.humidity }}%\n\u2022 CO2: {{ $json.readings.co2 }} ppm\n\n*AI Analysis:* {{ $json.analysis.reasoning }}\n*Recommendation:* {{ $json.analysis.recommendation }}",
        "select": "channel",
        "channelId": {
          "__rl": true,
          "mode": "name",
          "value": "#iot-critical"
        },
        "otherOptions": {}
      },
      "typeVersion": 2.2
    },
    {
      "id": "send-warning-slack",
      "name": "Slack Warning Alert",
      "type": "n8n-nodes-base.slack",
      "position": [
        2080,
        528
      ],
      "parameters": {
        "text": "=\u26a0\ufe0f *IoT Warning*\n\n*Sensor:* {{ $json.sensorId }} | *Location:* {{ $json.location }}\n*Issue:* {{ $json.analysis.anomalies[0] || 'Threshold approaching' }}\n*Recommendation:* {{ $json.analysis.recommendation }}",
        "select": "channel",
        "channelId": {
          "__rl": true,
          "mode": "name",
          "value": "#iot-alerts"
        },
        "otherOptions": {}
      },
      "typeVersion": 2.2
    },
    {
      "id": "merge-alerts",
      "name": "Merge Alert Outputs",
      "type": "n8n-nodes-base.merge",
      "position": [
        2336,
        384
      ],
      "parameters": {},
      "typeVersion": 3
    },
    {
      "id": "log-to-sheets",
      "name": "Archive to Google Sheets",
      "type": "n8n-nodes-base.googleSheets",
      "position": [
        2512,
        384
      ],
      "parameters": {
        "operation": "append",
        "sheetName": {
          "__rl": true,
          "mode": "list",
          "value": ""
        },
        "documentId": {
          "__rl": true,
          "mode": "list",
          "value": ""
        }
      },
      "typeVersion": 4.5
    },
    {
      "id": "openai-model",
      "name": "OpenAI Chat Model",
      "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
      "position": [
        1344,
        544
      ],
      "parameters": {
        "model": "gpt-4o-mini",
        "options": {
          "temperature": 0.3
        }
      },
      "typeVersion": 1.2
    }
  ],
  "settings": {
    "executionOrder": "v1"
  },
  "connections": {
    "Merge Triggers": {
      "main": [
        [
          {
            "node": "Define Sensor Thresholds",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "OpenAI Chat Model": {
      "ai_languageModel": [
        [
          {
            "node": "AI Anomaly Detector",
            "type": "ai_languageModel",
            "index": 0
          }
        ]
      ]
    },
    "Parse AI Analysis": {
      "main": [
        [
          {
            "node": "Route by Severity",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Route by Severity": {
      "main": [
        [
          {
            "node": "Send Critical Email",
            "type": "main",
            "index": 0
          },
          {
            "node": "Slack Critical Alert",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Slack Warning Alert",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Merge Alert Outputs",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "AI Anomaly Detector": {
      "main": [
        [
          {
            "node": "Parse AI Analysis",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "MQTT Sensor Trigger": {
      "main": [
        [
          {
            "node": "Merge Triggers",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Merge Alert Outputs": {
      "main": [
        [
          {
            "node": "Archive to Google Sheets",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Send Critical Email": {
      "main": [
        [
          {
            "node": "Merge Alert Outputs",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Slack Warning Alert": {
      "main": [
        [
          {
            "node": "Merge Alert Outputs",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Parse Sensor Payload": {
      "main": [
        [
          {
            "node": "Generate Data Fingerprint",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Slack Critical Alert": {
      "main": [
        [
          {
            "node": "Merge Alert Outputs",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Batch Process Schedule": {
      "main": [
        [
          {
            "node": "Merge Triggers",
            "type": "main",
            "index": 1
          }
        ]
      ]
    },
    "Define Sensor Thresholds": {
      "main": [
        [
          {
            "node": "Parse Sensor Payload",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Generate Data Fingerprint": {
      "main": [
        [
          {
            "node": "Remove Duplicate Readings",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Remove Duplicate Readings": {
      "main": [
        [
          {
            "node": "AI Anomaly Detector",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  }
}
Pro

For the full experience including quality scoring and batch install features for each workflow upgrade to Pro

About this workflow

{ "name": "IoT Sensor Data Aggregation with AI-Powered Anomaly Detection", "nodes": [ { "parameters": { "content": "## How it works\nThis workflow monitors IoT sensors in real-time. It ingests data via MQTT or a schedule, normalizes the format, and removes duplicates using data…

Source: https://n8n.io/workflows/11909/ — original creator credit. Request a take-down →

More AI & RAG workflows → · Browse all categories →

Related workflows

Workflows that share integrations, category, or trigger type with this one. All free to copy and import.

AI & RAG

This workflow contains community nodes that are only compatible with the self-hosted version of n8n.

Google Sheets, Form Trigger, Output Parser Structured +7
AI & RAG

Gmail users report spending significant time manually sorting email, so this tool helps alleviate that burden. Gmail Trigger monitors unread emails every 2 minutes Once an email arrives, the content i

OpenAI Chat, Output Parser Structured, Agent +4
AI & RAG

Streamline your HR recruitment process with this intelligent automation that reads candidate emails and resumes, analyzes them using GPT-4, and automatically shortlists or rejects applicants based on

Gmail, Gmail Trigger, HTTP Request +7
AI & RAG

This workflow automates market-driven content ideation by continuously discovering real user pain points from public discussions and converting them into execution-ready content ideas. It is designed

Error Trigger, Gmail, Agent +5
AI & RAG

📘 Description This workflow automates keyword-level SEO research and content opportunity discovery using live Google SERP data and AI-driven analysis. It takes a single keyword request, pulls real-tim

OpenAI Chat, Output Parser Structured, Error Trigger +5