AutomationFlowsAI & RAG › Automated Feedback & Error Alerts

Automated Feedback & Error Alerts

Original n8n title: W8 - Ops (feedback Cron + Errors + Alerts)

W8 - OPS (Feedback Cron + Errors + Alerts). Uses scheduleTrigger, postgres, errorTrigger, splitInBatches. Scheduled trigger; 31 nodes.

Cron / scheduled trigger★★★★★ complexity31 nodesPostgresError Trigger
AI & RAG Trigger: Cron / scheduled Nodes: 31 Complexity: ★★★★★ Added:

The workflow JSON

Copy or download the full n8n JSON below. Paste it into a new n8n workflow, add your credentials, activate. Full import guide →

Download .json
{
  "name": "W8 - OPS (Feedback Cron + Errors + Alerts)",
  "active": false,
  "settings": {
    "executionTimeout": 300,
    "saveExecutionProgress": true,
    "saveManualExecutions": true
  },
  "nodes": [
    {
      "parameters": {
        "rule": {
          "interval": [
            {
              "field": "minutes",
              "minutesInterval": 5
            }
          ]
        }
      },
      "id": "dc4d7f7d-2708-45da-827c-9930815741ab",
      "name": "S1 - Every 5 min",
      "type": "n8n-nodes-base.scheduleTrigger",
      "typeVersion": 1,
      "position": [
        -2400,
        -200
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "SELECT COALESCE(jsonb_agg(row_to_json(t)), '[]'::jsonb) AS due\n    FROM (\n      SELECT id, channel, user_id, restaurant_id, message_text\n      FROM feedback_jobs\n      WHERE status='PENDING' AND scheduled_at <= now()\n      ORDER BY scheduled_at ASC\n      LIMIT 50\n    ) t;"
      },
      "id": "c0fedec7-27a7-4fe1-b5d6-21e5e79a59e9",
      "name": "S2 - Load Due Feedback",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        -2150,
        -200
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const row = $json;\nconst due = Array.isArray(row.due) ? row.due : [];\nconst results = [];\n\nconst cfg = {\n  whatsapp: { url: $env.WA_SEND_URL, token: $env.WA_API_TOKEN },\n  instagram:{ url: $env.IG_SEND_URL, token: $env.IG_API_TOKEN },\n  messenger:{ url: $env.MSG_SEND_URL, token: $env.MSG_API_TOKEN }\n};\n\nfor (const job of due) {\n  const ch = (job.channel || '').toString();\n  const url = (cfg[ch]?.url || '').toString();\n  const token = (cfg[ch]?.token || '').toString();\n\n  if (!url) {\n    results.push({id: job.id, ok:false, reason:'send_url_missing'});\n    continue;\n  }\n\n  try {\n    await $httpRequest({\n      method:'POST',\n      url,\n      headers: token ? {Authorization:'Bearer '+token} : {},\n      body: { channel: ch, to: job.user_id, restaurantId: job.restaurant_id, text: job.message_text, buttons: [{id:'FEEDBACK_5',title:'\u2b50 5'},{id:'FEEDBACK_4',title:'\u2b50 4'},{id:'FEEDBACK_3',title:'\u2b50 3'}]},\n      json:true,\n      timeout: 30000\n    });\n    results.push({id: job.id, ok:true});\n  } catch (err) {\n    results.push({id: job.id, ok:false, error: err?.message || 'send_failed'});\n  }\n}\n\nreturn results.map(r => ({json: r}));"
      },
      "id": "4300d758-522b-4802-90c3-6a7c22f3f116",
      "name": "S3 - Send Feedback Batch",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        -1920,
        -200
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "UPDATE feedback_jobs\n    SET status = CASE WHEN $2::boolean THEN 'SENT' ELSE 'PENDING' END,\n        sent_at = CASE WHEN $2::boolean THEN now() ELSE sent_at END,\n        last_error = CASE WHEN $2::boolean THEN NULL ELSE COALESCE($3,'send_failed') END\n    WHERE id = $1::uuid\n    RETURNING id, status;",
        "additionalFields": {
          "queryParams": "={{[$json.id, $json.ok, $json.error]}}"
        }
      },
      "id": "dbc6b5fd-98da-4efa-a0e1-f6110ec46513",
      "name": "S4 - Mark Feedback Status",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        -1700,
        -200
      ]
    },
    {
      "parameters": {},
      "id": "c8c1f04e-61c6-4199-88ab-7931e9c2cb33",
      "name": "E1 - On Error",
      "type": "n8n-nodes-base.errorTrigger",
      "typeVersion": 1,
      "position": [
        -2400,
        200
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const e = $json;\nconst out = {\n  workflow: e.workflow?.name || e.workflow || '',\n  node: e.node?.name || e.node || '',\n  message: e.error?.message || e.message || '',\n  stack: e.error?.stack || e.stack || '',\n  time: new Date().toISOString(),\n  executionId: e.execution?.id || e.executionId || ''\n};\nreturn [{json: out}];"
      },
      "id": "28f89712-2a5f-436b-9aa7-758e439a5d5d",
      "name": "E2 - Normalize Error",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        -2150,
        200
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "INSERT INTO workflow_errors (workflow_name, node_name, error_message, stack, execution_id)\n    VALUES ($1,$2,$3,$4,$5)\n    RETURNING id;",
        "additionalFields": {
          "queryParams": "={{[$json.workflow,$json.node,$json.message,$json.stack,$json.executionId]}}"
        }
      },
      "id": "e8d4e616-713d-4964-acb9-8695782a2822",
      "name": "E3 - Save Error (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        -1920,
        200
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const url = ($env.ALERT_WEBHOOK_URL || '').toString();\nif (!url) return [{json:{ok:true, alerted:false, reason:'no_webhook_url'}}];\n\n// For SLO checks, respect computed gating to avoid spam/cooldown\nif ($json && $json.kind === 'slo_check' && $json.external_alert === false) {\n  return [{json:{ok:true, alerted:false, reason:'slo_gated'}}];\n}\n\ntry{\n  await $httpRequest({method:'POST',url,body:$json,json:true,timeout:15000});\n  return [{json:{ok:true, alerted:true}}];\n}catch(err){\n  return [{json:{ok:false, alerted:false, error: err?.message || 'alert_failed'}}];\n}\n"
      },
      "id": "01efeee8-e5ea-4aef-b710-8103e4477748",
      "name": "E4 - Optional Alert Webhook",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        -1700,
        200
      ]
    },
    {
      "parameters": {
        "rule": {
          "interval": [
            {
              "field": "minutes",
              "value": 1
            }
          ]
        }
      },
      "id": "962294af-aadb-462e-8cef-1604beabf400",
      "name": "O1 - Every 1 min",
      "type": "n8n-nodes-base.scheduleTrigger",
      "typeVersion": 1,
      "position": [
        -360,
        260
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "WITH picked AS (\n  SELECT outbound_id\n  FROM outbound_messages\n  WHERE status IN ('PENDING','RETRY')\n    AND next_retry_at <= now()\n  ORDER BY next_retry_at ASC\n  LIMIT 20\n  FOR UPDATE SKIP LOCKED\n)\nUPDATE outbound_messages m\n   SET status='RETRY',\n       updated_at=now()\n  FROM picked\n WHERE m.outbound_id = picked.outbound_id\nRETURNING m.*;",
        "additionalFields": {}
      },
      "id": "215eed60-e2e2-4887-bb5e-4e3a9d7ec0d3",
      "name": "O2 - Pick Due Outbox (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        -140,
        260
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const row = $json;\n\nconst maxAttempts = Number($env.OUTBOX_MAX_ATTEMPTS || 7);\nconst baseDelay = Number($env.OUTBOX_BASE_DELAY_SEC || 30);\nconst maxDelay = Number($env.OUTBOX_MAX_DELAY_SEC || 3600);\n\nconst ch = (row.channel || '').toString();\n\nconst cfg = {\n  whatsapp: { url: $env.WA_SEND_URL, token: $env.WA_API_TOKEN },\n  instagram:{ url: $env.IG_SEND_URL, token: $env.IG_API_TOKEN },\n  messenger:{ url: $env.MSG_SEND_URL, token: $env.MSG_API_TOKEN }\n};\n\nconst url = (cfg[ch]?.url || '').toString();\nconst token = (cfg[ch]?.token || '').toString();\n\nfunction nextDelaySec(attempt) {\n  // attempt starts at 1\n  const d = baseDelay * Math.pow(2, Math.max(0, attempt - 1));\n  return Math.min(maxDelay, Math.floor(d));\n}\n\nlet newAttempts = Number(row.attempts || 0) + 1;\nlet status = 'RETRY';\nlet lastError = null;\nlet providerMessageId = null;\nlet nextRetryAt = new Date(Date.now() + nextDelaySec(newAttempts) * 1000).toISOString();\n\nif (!url) {\n  status = 'DLQ';\n  lastError = 'send_url_missing';\n  nextRetryAt = new Date().toISOString();\n  return [{json:{...row, _outbox_status:status, _outbox_attempts:newAttempts, _outbox_next_retry_at:nextRetryAt, _outbox_last_error:lastError, _outbox_provider_message_id:providerMessageId}}];\n}\n\ntry {\n  const res = await $httpRequest({\n    method:'POST',\n    url,\n    headers: (() => {\n  const idem = (row.dedupe_key || row.outbound_id || '').toString();\n  const h = token ? {Authorization:'Bearer '+token} : {};\n  if (idem) {\n    h['Idempotency-Key'] = idem;\n    h['X-Idempotency-Key'] = idem;\n    h['X-Client-Message-Id'] = idem;\n  }\n  return h;\n})(),\n    body: (() => {\n  const idem = (row.dedupe_key || row.outbound_id || '').toString();\n  const b = (row.payload_json && typeof row.payload_json === 'object') ? row.payload_json : {};\n  // attach client idempotency for providers that support it\n  return idem ? {...b, client_message_id: idem, dedupe_key: idem} : b;\n})(),\n    json:true,\n    timeout: 30000\n  });\n\n  status = 'SENT';\n  providerMessageId = (res && (res.message_id || res.id || res.provider_message_id)) ? (res.message_id || res.id || res.provider_message_id).toString() : null;\n  nextRetryAt = new Date().toISOString();\n\n  return [{json:{...row, _outbox_status:status, _outbox_attempts:newAttempts, _outbox_next_retry_at:nextRetryAt, _outbox_last_error:null, _outbox_provider_message_id:providerMessageId}}];\n} catch (err) {\n  const statusCode = err?.statusCode || err?.response?.statusCode || err?.response?.status;\n  if (statusCode === 409) {\n    // Provider indicates duplicate/idempotent conflict => treat as SENT\n    status = 'SENT';\n    lastError = 'duplicate_conflict_409';\n    nextRetryAt = new Date().toISOString();\n    return [{json:{...row, _outbox_status:status, _outbox_attempts:newAttempts, _outbox_next_retry_at:nextRetryAt, _outbox_last_error:lastError, _outbox_provider_message_id:providerMessageId}}];\n  }\n\n  lastError = (err && err.message) ? err.message : 'send_failed';\n  if (newAttempts >= maxAttempts) {\n    status = 'DLQ';\n    nextRetryAt = new Date().toISOString();\n  }\n  return [{json:{...row, _outbox_status:status, _outbox_attempts:newAttempts, _outbox_next_retry_at:nextRetryAt, _outbox_last_error:lastError, _outbox_provider_message_id:providerMessageId}}];\n}"
      },
      "id": "88c4f068-c7dc-4d9c-b985-c99ee26e7f3b",
      "name": "O3 - Send Outbox",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        80,
        260
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "UPDATE outbound_messages\n   SET status = $2,\n       attempts = $3,\n       next_retry_at = $4::timestamptz,\n       last_error = $5,\n       provider_message_id = COALESCE($6, provider_message_id),\n       sent_at = CASE WHEN $2='SENT' THEN now() ELSE sent_at END,\n       updated_at = now()\n WHERE outbound_id = $1\nRETURNING outbound_id, status;",
        "additionalFields": {
          "queryParams": "={{[$json.outbound_id, $json._outbox_status, $json._outbox_attempts, $json._outbox_next_retry_at, $json._outbox_last_error, $json._outbox_provider_message_id]}}"
        }
      },
      "id": "0c4780ad-5212-47d8-b57a-49274602fc82",
      "name": "O4 - Update Outbox (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        300,
        260
      ]
    },
    {
      "parameters": {
        "rule": {
          "cronExpression": "30 3 * * *"
        }
      },
      "id": "d1bdf929-592e-4912-93a3-156bde227d7a",
      "name": "R1 - Retention Purge (Daily 03:30)",
      "type": "n8n-nodes-base.scheduleTrigger",
      "typeVersion": 1,
      "position": [
        -360,
        -220
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const asBool = (v) => ['1','true','yes','y','on'].includes(String(v||'').toLowerCase());\nconst asInt = (v, d) => { const n = parseInt(v, 10); return Number.isFinite(n) ? n : d; };\n\nconst now = Date.now();\nconst dryRun = asBool($env.RETENTION_DRY_RUN ?? 'false');\nconst batchSize = asInt($env.RETENTION_BATCH_SIZE, 5000);\nconst maxIter = asInt($env.RETENTION_MAX_ITERATIONS, 200);\n\nconst tables = [\n  { kind: 'inbound', table_name: 'public.inbound_messages', retention_days: asInt($env.RETENTION_DAYS_INBOUND, 30) },\n  { kind: 'security', table_name: 'public.security_events', retention_days: asInt($env.RETENTION_DAYS_SECURITY, 90) },\n  { kind: 'outbox_sent', table_name: 'public.outbound_messages', retention_days: asInt($env.RETENTION_DAYS_OUTBOX_SENT, 30) },\n  { kind: 'workflow_errors', table_name: 'public.workflow_errors', retention_days: asInt($env.RETENTION_DAYS_WORKFLOW_ERRORS, 30) },\n];\n\nconst items = tables.map(t => {\n  const cutoffTs = new Date(now - t.retention_days * 24*3600*1000).toISOString();\n  return { json: {\n    kind: t.kind,\n    table_name: t.table_name,\n    retention_days: t.retention_days,\n    cutoff_ts: cutoffTs,\n    batch_size: batchSize,\n    dry_run: dryRun,\n    max_iterations: maxIter,\n  }};\n});\n\nreturn items;"
      },
      "id": "185a5fff-ce80-41ed-ab3e-8eccda9997d1",
      "name": "R2 - Build Retention Plan",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        -140,
        -220
      ]
    },
    {
      "parameters": {
        "batchSize": 1,
        "options": {}
      },
      "id": "7c3ab0f2-6d0e-47a7-9dfd-8db87a387fbe",
      "name": "R3 - Split Tables",
      "type": "n8n-nodes-base.splitInBatches",
      "typeVersion": 1,
      "position": [
        80,
        -220
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "INSERT INTO ops.retention_runs (dry_run, table_name, cutoff_ts, batch_size, details_json, status)\nVALUES ($1,$2,$3::timestamptz,$4, jsonb_build_object('retention_days',$5,'max_iterations',$6), 'STARTED')\nRETURNING run_id;",
        "additionalFields": {
          "queryParams": "={{[$json.dry_run, $json.table_name, $json.cutoff_ts, $json.batch_size, $json.retention_days, $json.max_iterations]}}"
        }
      },
      "id": "c4b6ebe3-e364-47d1-af8b-9f1bc34d1326",
      "name": "R4 - Retention Start Run (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        300,
        -220
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const runId = $json.run_id ?? $json[0]?.run_id;\nreturn [{json:{...$json, run_id: runId, _total_deleted:0, _iteration:0, _time_column:null, _deleted_last:0}}];"
      },
      "id": "fbb2578e-7b32-49a5-998a-c89723638071",
      "name": "R5 - Init Counters",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        520,
        -220
      ]
    },
    {
      "parameters": {
        "conditions": {
          "string": [
            {
              "value1": "={{$json.kind}}",
              "operation": "equal",
              "value2": "outbox_sent"
            }
          ]
        }
      },
      "id": "c7b54184-8b55-47b3-be1e-9bf9aba8a101",
      "name": "R6 - Is Outbox SENT?",
      "type": "n8n-nodes-base.if",
      "typeVersion": 2,
      "position": [
        740,
        -220
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "SELECT deleted_count, time_column FROM ops.purge_table_batch($1, $2::timestamptz, $3::int, $4::boolean);",
        "additionalFields": {
          "queryParams": "={{[$json.table_name, $json.cutoff_ts, $json.batch_size, $json.dry_run]}}"
        }
      },
      "id": "d59c17b0-9ed1-42ac-b08d-0f4efd15217c",
      "name": "R7a - Purge Generic Batch (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        980,
        -320
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "SELECT deleted_count, 'sent_at'::text AS time_column FROM ops.purge_outbound_sent_batch($1::timestamptz, $2::int, $3::boolean);",
        "additionalFields": {
          "queryParams": "={{[$json.cutoff_ts, $json.batch_size, $json.dry_run]}}"
        }
      },
      "id": "3591b127-8452-46a3-bc23-b468742022f7",
      "name": "R7b - Purge Outbox SENT Batch (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        980,
        -120
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const row = Array.isArray($json) ? ($json[0]||{}) : $json;\n// Postgres node returns object with fields. In batch mode, n8n may nest results.\nconst deleted = Number(row.deleted_count ?? row[0]?.deleted_count ?? 0);\nconst timeCol = row.time_column ?? row[0]?.time_column ?? null;\nconst total = Number($json._total_deleted ?? 0) + deleted;\nconst iter = Number($json._iteration ?? 0) + 1;\nreturn [{json:{...$json, _deleted_last: deleted, _total_deleted: total, _iteration: iter, _time_column: timeCol}}];"
      },
      "id": "5f076331-d2e3-4139-8afb-cbb46709dc81",
      "name": "R8 - Accumulate",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        1220,
        -220
      ]
    },
    {
      "parameters": {
        "conditions": {
          "boolean": [
            {
              "value1": "={{$json.dry_run}}",
              "operation": "isFalse"
            }
          ],
          "number": [
            {
              "value1": "={{$json._deleted_last}}",
              "operation": "equal",
              "value2": "={{$json.batch_size}}"
            },
            {
              "value1": "={{$json._iteration}}",
              "operation": "smaller",
              "value2": "={{$json.max_iterations}}"
            }
          ]
        }
      },
      "id": "4f89f9eb-e83b-4b9d-adda-65e1d6a67d90",
      "name": "R9 - Continue Purge?",
      "type": "n8n-nodes-base.if",
      "typeVersion": 2,
      "position": [
        1440,
        -220
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "UPDATE ops.retention_runs\nSET run_finished_at = now(),\n    deleted_rows = $2::bigint,\n    details_json = jsonb_strip_nulls(jsonb_build_object(\n      'time_column',$3::text,\n      'iterations',$4::int,\n      'cutoff_ts',$5::timestamptz,\n      'dry_run',$6::boolean\n    )),\n    status = 'DONE'\nWHERE run_id = $1::bigint\nRETURNING run_id;",
        "additionalFields": {
          "queryParams": "={{[$json.run_id, $json._total_deleted, $json._time_column, $json._iteration, $json.cutoff_ts, $json.dry_run]}}"
        }
      },
      "id": "58db7131-2a21-4e1d-a901-3bb0232da98d",
      "name": "R10 - Finalize Run (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        1680,
        -320
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "INSERT INTO security_events (tenant_id, restaurant_id, conversation_key, channel, user_id, event_type, severity, payload_json)\nVALUES (NULL,NULL,NULL,NULL,NULL,'RETENTION_RUN','LOW',\n  jsonb_build_object('table',$1,'cutoff_ts',$2::timestamptz,'dry_run',$3::boolean,'deleted_rows',$4::bigint,'iterations',$5::int)\n) RETURNING id;",
        "additionalFields": {
          "queryParams": "={{[$json.table_name, $json.cutoff_ts, $json.dry_run, $json._total_deleted, $json._iteration]}}"
        }
      },
      "id": "06ae02d6-05ff-417e-ba08-57b42feea3ea",
      "name": "R11 - Log Security Event (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        1680,
        -120
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "// pass-through end-of-table marker\nreturn [{json:{...$json, _retention_done:true}}];"
      },
      "id": "1dca8a8f-3851-49cf-91d4-5405d34d753a",
      "name": "R12 - Next Table",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        1900,
        -220
      ]
    },
    {
      "parameters": {
        "rule": {
          "interval": [
            {
              "field": "minutes",
              "minutesInterval": 5
            }
          ]
        }
      },
      "id": "6fb5b75f-980b-4aec-ab53-41cb2307d7e9",
      "name": "M1 - Every 5 min (SLO Checks)",
      "type": "n8n-nodes-base.scheduleTrigger",
      "typeVersion": 1,
      "position": [
        800,
        450
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "WITH params AS (SELECT GREATEST($1::int, 1) AS window_min),\nlat AS (\n  SELECT COALESCE(percentile_cont(0.95) within group (order by ms),0) AS p95_ms,\n         COUNT(*)::int AS n\n  FROM (\n    SELECT EXTRACT(epoch FROM (o.created_at - i.received_at))*1000 AS ms\n    FROM outbound_messages o\n    JOIN inbound_messages i\n      ON i.channel = o.channel\n     AND i.msg_id = split_part(o.dedupe_key,':',3)\n    WHERE o.dedupe_key LIKE 'msg:%:%'\n      AND o.created_at > now() - make_interval(mins => (SELECT window_min FROM params))\n  ) t\n),\npend AS (\n  SELECT COALESCE(MAX(EXTRACT(epoch FROM (now()-created_at))),0) AS max_age_sec,\n         COUNT(*)::int AS pending_cnt\n  FROM outbound_messages\n  WHERE status IN ('PENDING','RETRY')\n),\ndlq AS (\n  SELECT \n    SUM(CASE WHEN status='DLQ' THEN 1 ELSE 0 END)::int AS dlq_cnt,\n    SUM(CASE WHEN status='SENT' THEN 1 ELSE 0 END)::int AS sent_cnt\n  FROM outbound_messages\n  WHERE updated_at > now() - make_interval(mins => (SELECT window_min FROM params))\n    AND status IN ('SENT','DLQ')\n)\nSELECT (SELECT window_min FROM params) AS window_min,\n       lat.p95_ms, lat.n,\n       pend.max_age_sec, pend.pending_cnt,\n       dlq.dlq_cnt, dlq.sent_cnt,\n       CASE WHEN (dlq.dlq_cnt + dlq.sent_cnt) > 0 THEN (dlq.dlq_cnt::float / (dlq.dlq_cnt + dlq.sent_cnt)) ELSE 0 END AS dlq_rate,\n       (SELECT (value_json->>'last_sent_at')::timestamptz FROM ops_kv WHERE key='alert:slo') AS last_sent_at,\n       (SELECT (value_json->>'last_hash')::text FROM ops_kv WHERE key='alert:slo') AS last_hash\nFROM lat, pend, dlq;",
        "additionalFields": {
          "queryParams": "={{[Number($env.SLO_WINDOW_MIN || 15)]}}"
        }
      },
      "id": "4e22128c-7f69-4bdd-baf6-4e54b054cb8d",
      "name": "M2 - Compute SLO Metrics (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        1060,
        450
      ]
    },
    {
      "parameters": {
        "language": "JavaScript",
        "jsCode": "const m = $json;\nconst windowMin = Number($env.SLO_WINDOW_MIN || m.window_min || 15);\nconst tP95 = Number($env.SLO_INBOUND_TO_OUTBOX_P95_MS || 2000);\nconst tPending = Number($env.SLO_OUTBOX_PENDING_AGE_MAX_SEC || 600);\nconst tDlqRate = Number($env.SLO_DLQ_RATE_MAX || 0.05);\nconst tDlqCount = Number($env.SLO_DLQ_COUNT_MAX || 5);\n\nconst p95 = Number(m.p95_ms || 0);\nconst pendingAge = Number(m.max_age_sec || 0);\nconst pendingCnt = Number(m.pending_cnt || 0);\nconst dlqRate = Number(m.dlq_rate || 0);\nconst dlqCnt = Number(m.dlq_cnt || 0);\nconst sentCnt = Number(m.sent_cnt || 0);\n\nconst breaches = [];\nif (p95 > tP95) breaches.push({slo:'inbound_to_outbox_ms_p95', value:p95, threshold:tP95, window_min:windowMin, n:Number(m.n||0)});\nif (pendingAge > tPending) breaches.push({slo:'outbox_pending_age_max_sec', value:pendingAge, threshold:tPending, pending_cnt:pendingCnt});\nif (dlqRate > tDlqRate) breaches.push({slo:'dlq_rate', value:dlqRate, threshold:tDlqRate, dlq_cnt:dlqCnt, sent_cnt:sentCnt, window_min:windowMin});\nif (dlqCnt > tDlqCount) breaches.push({slo:'dlq_count', value:dlqCnt, threshold:tDlqCount, window_min:windowMin});\n\nconst alert = breaches.length > 0;\nconst severity = breaches.some(b => b.slo === 'outbox_pending_age_max_sec' && b.value > (tPending*6))\n  ? 'CRITICAL'\n  : (alert ? 'HIGH' : 'LOW');\n\nconst alertWebhookUrl = ($env.ALERT_WEBHOOK_URL || '').toString().trim();\nconst alertEnabled = (($env.ALERT_SLO_ENABLED || 'true').toString().toLowerCase() === 'true');\nconst cooldownSec = Number($env.ALERT_COOLDOWN_SEC || 300);\n\n// Compute a stable hash for cooldown (breaches content)\nconst breachHash = crypto.createHash('sha256').update(JSON.stringify(breaches)).digest('hex');\nconst lastSentAtRaw = m.last_sent_at || null;\nconst lastHash = (m.last_hash || '').toString();\nlet suppressed = false;\nlet suppressReason = '';\n\nif (alert && alertEnabled && lastSentAtRaw) {\n  const lastSentAt = new Date(lastSentAtRaw).getTime();\n  const nowTs = Date.now();\n  const ageSec = Math.floor((nowTs - lastSentAt)/1000);\n  if (ageSec >= 0 && ageSec < cooldownSec && lastHash && lastHash === breachHash) {\n    suppressed = true;\n    suppressReason = `cooldown_${ageSec}s`;\n  }\n}\n\n// Never alert externally if webhook URL missing or disabled; still log breach in DB for visibility.\nconst externalAlert = alertEnabled && !!alertWebhookUrl && !suppressed && alert;\n\nreturn [{json:{\n  kind:'slo_check',\n  service:'resto-bot',\n  timestamp: new Date().toISOString(),\n  window_min: windowMin,\n  metrics:{\n    inbound_to_outbox_ms_p95: p95,\n    inbound_to_outbox_samples: Number(m.n||0),\n    outbox_pending_age_max_sec: pendingAge,\n    outbox_pending_cnt: pendingCnt,\n    dlq_rate: dlqRate,\n    dlq_cnt: dlqCnt,\n    sent_cnt: sentCnt\n  },\n  thresholds:{\n    SLO_INBOUND_TO_OUTBOX_P95_MS: tP95,\n    SLO_OUTBOX_PENDING_AGE_MAX_SEC: tPending,\n    SLO_DLQ_RATE_MAX: tDlqRate,\n    SLO_DLQ_COUNT_MAX: tDlqCount\n  },\n  alert,\n  severity,\n  breaches,\n  runbook:'docs/SLO.md',\n  breach_hash: breachHash,\n  external_alert: externalAlert,\n  suppressed,\n  suppress_reason: suppressReason,\n  alert_webhook_configured: !!alertWebhookUrl,\n  alert_enabled: alertEnabled\n}}];\n"
      },
      "id": "3afed419-e07e-42e3-bf6e-a160309ebba3",
      "name": "M3 - Evaluate SLO Thresholds",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        1320,
        450
      ]
    },
    {
      "parameters": {
        "conditions": {
          "boolean": [
            {
              "value1": "={{$json.alert}}",
              "operation": "isTrue"
            }
          ]
        }
      },
      "id": "f8b0ce61-2338-4c29-a163-8063916e0ce8",
      "name": "M4 - Alert Needed?",
      "type": "n8n-nodes-base.if",
      "typeVersion": 1,
      "position": [
        1560,
        450
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "INSERT INTO security_events(tenant_id, restaurant_id, conversation_key, channel, user_id, event_type, severity, payload_json) VALUES (NULL,NULL,NULL,NULL,NULL,'SLO_BREACH',$1, $2::jsonb) RETURNING 1;",
        "additionalFields": {
          "queryParams": "={{[$json.severity, JSON.stringify($json)]}}"
        }
      },
      "id": "6eb983bd-bb86-4136-9f28-73ebbefe28ff",
      "name": "M6 - Log SLO Breach (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        1820,
        330
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "INSERT INTO ops_kv(key, value_json, updated_at)\nVALUES ('alert:slo', jsonb_build_object('last_sent_at', now(), 'last_hash', $1), now())\nON CONFLICT (key) DO UPDATE SET value_json=EXCLUDED.value_json, updated_at=now()\nRETURNING key;",
        "additionalFields": {
          "queryParams": "={{[$json.breach_hash]}}"
        }
      },
      "id": "M5_update_alert_state_3f44a84e",
      "name": "M5 - Update Alert State (DB)",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        -450,
        880
      ]
    }
  ],
  "connections": {
    "S1 - Every 5 min": {
      "main": [
        [
          {
            "node": "S2 - Load Due Feedback",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "S2 - Load Due Feedback": {
      "main": [
        [
          {
            "node": "S3 - Send Feedback Batch",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "S3 - Send Feedback Batch": {
      "main": [
        [
          {
            "node": "S4 - Mark Feedback Status",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "E1 - On Error": {
      "main": [
        [
          {
            "node": "E2 - Normalize Error",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "E2 - Normalize Error": {
      "main": [
        [
          {
            "node": "E3 - Save Error (DB)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "E3 - Save Error (DB)": {
      "main": [
        [
          {
            "node": "E4 - Optional Alert Webhook",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "O1 - Every 1 min": {
      "main": [
        [
          {
            "node": "O2 - Pick Due Outbox (DB)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "O2 - Pick Due Outbox (DB)": {
      "main": [
        [
          {
            "node": "O3 - Send Outbox",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "O3 - Send Outbox": {
      "main": [
        [
          {
            "node": "O4 - Update Outbox (DB)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R1 - Retention Purge (Daily 03:30)": {
      "main": [
        [
          {
            "node": "R2 - Build Retention Plan",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R2 - Build Retention Plan": {
      "main": [
        [
          {
            "node": "R3 - Split Tables",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R3 - Split Tables": {
      "main": [
        [
          {
            "node": "R4 - Retention Start Run (DB)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R4 - Retention Start Run (DB)": {
      "main": [
        [
          {
            "node": "R5 - Init Counters",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R5 - Init Counters": {
      "main": [
        [
          {
            "node": "R6 - Is Outbox SENT?",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R6 - Is Outbox SENT?": {
      "main": [
        [
          {
            "node": "R7b - Purge Outbox SENT Batch (DB)",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "R7a - Purge Generic Batch (DB)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R7a - Purge Generic Batch (DB)": {
      "main": [
        [
          {
            "node": "R8 - Accumulate",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R7b - Purge Outbox SENT Batch (DB)": {
      "main": [
        [
          {
            "node": "R8 - Accumulate",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R8 - Accumulate": {
      "main": [
        [
          {
            "node": "R9 - Continue Purge?",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R9 - Continue Purge?": {
      "main": [
        [
          {
            "node": "R6 - Is Outbox SENT?",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "R10 - Finalize Run (DB)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R10 - Finalize Run (DB)": {
      "main": [
        [
          {
            "node": "R11 - Log Security Event (DB)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R11 - Log Security Event (DB)": {
      "main": [
        [
          {
            "node": "R12 - Next Table",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "R12 - Next Table": {
      "main": [
        [
          {
            "node": "R3 - Split Tables",
            "type": "main",
            "index": 1
          }
        ]
      ]
    },
    "M1 - Every 5 min (SLO Checks)": {
      "main": [
        [
          {
            "node": "M2 - Compute SLO Metrics (DB)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "M2 - Compute SLO Metrics (DB)": {
      "main": [
        [
          {
            "node": "M3 - Evaluate SLO Thresholds",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "M3 - Evaluate SLO Thresholds": {
      "main": [
        [
          {
            "node": "M4 - Alert Needed?",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "M4 - Alert Needed?": {
      "main": [
        [
          {
            "node": "M5 - Update Alert State (DB)",
            "type": "main",
            "index": 0
          },
          {
            "node": "M6 - Log SLO Breach (DB)",
            "type": "main",
            "index": 0
          },
          {
            "node": "E4 - Optional Alert Webhook",
            "type": "main",
            "index": 0
          }
        ],
        []
      ]
    },
    "M6 - Log SLO Breach (DB)": {
      "main": [
        []
      ]
    },
    "E4 - Optional Alert Webhook": {
      "main": [
        []
      ]
    }
  }
}
Pro

For the full experience including quality scoring and batch install features for each workflow upgrade to Pro

How this works

This workflow automates operational monitoring by running feedback checks every five minutes and capturing errors for swift resolution, ensuring your systems stay reliable without constant manual oversight. It's ideal for operations teams managing databases and automated processes who need proactive alerts to prevent downtime. The key step involves querying PostgreSQL to load due feedback tasks, processing them in batches, and updating statuses, while an error trigger normalises and logs any issues to the database for review.

Use this when you handle recurring tasks like customer feedback collection alongside error-prone automations, such as in SaaS platforms or internal tools requiring real-time ops vigilance. Avoid it for one-off processes or environments without PostgreSQL integration, as it relies on scheduled cron jobs. Common variations include adjusting the interval for less frequent checks or adding Slack notifications via the optional webhook for immediate team alerts.

About this workflow

W8 - OPS (Feedback Cron + Errors + Alerts). Uses scheduleTrigger, postgres, errorTrigger, splitInBatches. Scheduled trigger; 31 nodes.

Source: https://github.com/zerAda/RestaurantAgentAutomation/blob/41a4d42dcd66e57b1e87b4750c0fd5fbf7058f68/workflows/W8_OPS.json — original creator credit. Request a take-down →

More AI & RAG workflows → · Browse all categories →

Related workflows

Workflows that share integrations, category, or trigger type with this one. All free to copy and import.

AI & RAG

Automated Gmail Email Processing System

Postgres, S3, Gmail +1
AI & RAG

This n8n workflow automatically fetches monthly financial statements, normalizes the data, performs KPI calculations and trend analysis, detects anomalies, generates AI-powered executive insights and

HTTP Request, Postgres, Email Send
AI & RAG

Use cases are many: Automate your personal trading strategy, monitor a portfolio for significant trend changes, or provide automated analysis highlights for a trading community or client group.

HTTP Request, Postgres
AI & RAG

LRAC-031 · Reporte semanal ejecutivo Gemini Pro. Uses postgres, httpRequest, googleDrive, gmail. Scheduled trigger; 8 nodes.

Postgres, HTTP Request, Google Drive +1
AI & RAG

Supabase Data Analyst — Morning Briefing. Uses stickyNote, scheduleTrigger, splitOut, postgres. Scheduled trigger; 7 nodes.

Postgres