The workflow JSON
Copy or download the full n8n JSON below. Paste it into a new n8n workflow, add your credentials, and activate it. Full import guide →
{
"name": "W8 - OPS (Feedback Cron + Errors + Alerts)",
"active": false,
"settings": {
"executionTimeout": 300,
"saveExecutionProgress": true,
"saveManualExecutions": true
},
"nodes": [
{
"parameters": {
"rule": {
"interval": [
{
"field": "minutes",
"minutesInterval": 5
}
]
}
},
"id": "dc4d7f7d-2708-45da-827c-9930815741ab",
"name": "S1 - Every 5 min",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [
-2400,
-200
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT COALESCE(jsonb_agg(row_to_json(t)), '[]'::jsonb) AS due\n FROM (\n SELECT id, channel, user_id, restaurant_id, message_text\n FROM feedback_jobs\n WHERE status='PENDING' AND scheduled_at <= now()\n ORDER BY scheduled_at ASC\n LIMIT 50\n ) t;"
},
"id": "c0fedec7-27a7-4fe1-b5d6-21e5e79a59e9",
"name": "S2 - Load Due Feedback",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
-2150,
-200
]
},
{
"parameters": {
"language": "javascript",
"jsCode": "const row = $json;\nconst due = Array.isArray(row.due) ? row.due : [];\nconst results = [];\n\nconst cfg = {\n whatsapp: { url: $env.WA_SEND_URL, token: $env.WA_API_TOKEN },\n instagram:{ url: $env.IG_SEND_URL, token: $env.IG_API_TOKEN },\n messenger:{ url: $env.MSG_SEND_URL, token: $env.MSG_API_TOKEN }\n};\n\nfor (const job of due) {\n const ch = (job.channel || '').toString();\n const url = (cfg[ch]?.url || '').toString();\n const token = (cfg[ch]?.token || '').toString();\n\n if (!url) {\n results.push({id: job.id, ok:false, reason:'send_url_missing'});\n continue;\n }\n\n try {\n await $httpRequest({\n method:'POST',\n url,\n headers: token ? {Authorization:'Bearer '+token} : {},\n body: { channel: ch, to: job.user_id, restaurantId: job.restaurant_id, text: job.message_text, buttons: [{id:'FEEDBACK_5',title:'\u2b50 5'},{id:'FEEDBACK_4',title:'\u2b50 4'},{id:'FEEDBACK_3',title:'\u2b50 3'}]},\n json:true,\n timeout: 30000\n });\n results.push({id: job.id, ok:true});\n } catch (err) {\n results.push({id: job.id, ok:false, error: err?.message || 'send_failed'});\n }\n}\n\nreturn results.map(r => ({json: r}));"
},
"id": "4300d758-522b-4802-90c3-6a7c22f3f116",
"name": "S3 - Send Feedback Batch",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
-1920,
-200
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "UPDATE feedback_jobs\n SET status = CASE WHEN $2::boolean THEN 'SENT' ELSE 'PENDING' END,\n sent_at = CASE WHEN $2::boolean THEN now() ELSE sent_at END,\n last_error = CASE WHEN $2::boolean THEN NULL ELSE COALESCE($3,'send_failed') END\n WHERE id = $1::uuid\n RETURNING id, status;",
"additionalFields": {
"queryParams": "={{[$json.id, $json.ok, $json.error]}}"
}
},
"id": "dbc6b5fd-98da-4efa-a0e1-f6110ec46513",
"name": "S4 - Mark Feedback Status",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
-1700,
-200
]
},
{
"parameters": {},
"id": "c8c1f04e-61c6-4199-88ab-7931e9c2cb33",
"name": "E1 - On Error",
"type": "n8n-nodes-base.errorTrigger",
"typeVersion": 1,
"position": [
-2400,
200
]
},
{
"parameters": {
"language": "javascript",
"jsCode": "const e = $json;\nconst out = {\n workflow: e.workflow?.name || e.workflow || '',\n node: e.node?.name || e.node || '',\n message: e.error?.message || e.message || '',\n stack: e.error?.stack || e.stack || '',\n time: new Date().toISOString(),\n executionId: e.execution?.id || e.executionId || ''\n};\nreturn [{json: out}];"
},
"id": "28f89712-2a5f-436b-9aa7-758e439a5d5d",
"name": "E2 - Normalize Error",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
-2150,
200
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO workflow_errors (workflow_name, node_name, error_message, stack, execution_id)\n VALUES ($1,$2,$3,$4,$5)\n RETURNING id;",
"additionalFields": {
"queryParams": "={{[$json.workflow,$json.node,$json.message,$json.stack,$json.executionId]}}"
}
},
"id": "e8d4e616-713d-4964-acb9-8695782a2822",
"name": "E3 - Save Error (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
-1920,
200
]
},
{
"parameters": {
"language": "javascript",
"jsCode": "const url = ($env.ALERT_WEBHOOK_URL || '').toString();\nif (!url) return [{json:{ok:true, alerted:false, reason:'no_webhook_url'}}];\n\n// For SLO checks, respect computed gating to avoid spam/cooldown\nif ($json && $json.kind === 'slo_check' && $json.external_alert === false) {\n return [{json:{ok:true, alerted:false, reason:'slo_gated'}}];\n}\n\ntry{\n await $httpRequest({method:'POST',url,body:$json,json:true,timeout:15000});\n return [{json:{ok:true, alerted:true}}];\n}catch(err){\n return [{json:{ok:false, alerted:false, error: err?.message || 'alert_failed'}}];\n}\n"
},
"id": "01efeee8-e5ea-4aef-b710-8103e4477748",
"name": "E4 - Optional Alert Webhook",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
-1700,
200
]
},
{
"parameters": {
"rule": {
"interval": [
{
"field": "minutes",
"minutesInterval": 1
}
]
}
},
"id": "962294af-aadb-462e-8cef-1604beabf400",
"name": "O1 - Every 1 min",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [
-360,
260
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "WITH picked AS (\n SELECT outbound_id\n FROM outbound_messages\n WHERE status IN ('PENDING','RETRY')\n AND next_retry_at <= now()\n ORDER BY next_retry_at ASC\n LIMIT 20\n FOR UPDATE SKIP LOCKED\n)\nUPDATE outbound_messages m\n SET status='RETRY',\n updated_at=now()\n FROM picked\n WHERE m.outbound_id = picked.outbound_id\nRETURNING m.*;",
"additionalFields": {}
},
"id": "215eed60-e2e2-4887-bb5e-4e3a9d7ec0d3",
"name": "O2 - Pick Due Outbox (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
-140,
260
]
},
{
"parameters": {
"language": "javascript",
"jsCode": "const row = $json;\n\nconst maxAttempts = Number($env.OUTBOX_MAX_ATTEMPTS || 7);\nconst baseDelay = Number($env.OUTBOX_BASE_DELAY_SEC || 30);\nconst maxDelay = Number($env.OUTBOX_MAX_DELAY_SEC || 3600);\n\nconst ch = (row.channel || '').toString();\n\nconst cfg = {\n whatsapp: { url: $env.WA_SEND_URL, token: $env.WA_API_TOKEN },\n instagram:{ url: $env.IG_SEND_URL, token: $env.IG_API_TOKEN },\n messenger:{ url: $env.MSG_SEND_URL, token: $env.MSG_API_TOKEN }\n};\n\nconst url = (cfg[ch]?.url || '').toString();\nconst token = (cfg[ch]?.token || '').toString();\n\nfunction nextDelaySec(attempt) {\n // attempt starts at 1\n const d = baseDelay * Math.pow(2, Math.max(0, attempt - 1));\n return Math.min(maxDelay, Math.floor(d));\n}\n\nlet newAttempts = Number(row.attempts || 0) + 1;\nlet status = 'RETRY';\nlet lastError = null;\nlet providerMessageId = null;\nlet nextRetryAt = new Date(Date.now() + nextDelaySec(newAttempts) * 1000).toISOString();\n\nif (!url) {\n status = 'DLQ';\n lastError = 'send_url_missing';\n nextRetryAt = new Date().toISOString();\n return [{json:{...row, _outbox_status:status, _outbox_attempts:newAttempts, _outbox_next_retry_at:nextRetryAt, _outbox_last_error:lastError, _outbox_provider_message_id:providerMessageId}}];\n}\n\ntry {\n const res = await $httpRequest({\n method:'POST',\n url,\n headers: (() => {\n const idem = (row.dedupe_key || row.outbound_id || '').toString();\n const h = token ? {Authorization:'Bearer '+token} : {};\n if (idem) {\n h['Idempotency-Key'] = idem;\n h['X-Idempotency-Key'] = idem;\n h['X-Client-Message-Id'] = idem;\n }\n return h;\n})(),\n body: (() => {\n const idem = (row.dedupe_key || row.outbound_id || '').toString();\n const b = (row.payload_json && typeof row.payload_json === 'object') ? row.payload_json : {};\n // attach client idempotency for providers that support it\n return idem ? {...b, client_message_id: idem, dedupe_key: idem} : b;\n})(),\n json:true,\n timeout: 30000\n });\n\n status = 'SENT';\n providerMessageId = (res && (res.message_id || res.id || res.provider_message_id)) ? (res.message_id || res.id || res.provider_message_id).toString() : null;\n nextRetryAt = new Date().toISOString();\n\n return [{json:{...row, _outbox_status:status, _outbox_attempts:newAttempts, _outbox_next_retry_at:nextRetryAt, _outbox_last_error:null, _outbox_provider_message_id:providerMessageId}}];\n} catch (err) {\n const statusCode = err?.statusCode || err?.response?.statusCode || err?.response?.status;\n if (statusCode === 409) {\n // Provider indicates duplicate/idempotent conflict => treat as SENT\n status = 'SENT';\n lastError = 'duplicate_conflict_409';\n nextRetryAt = new Date().toISOString();\n return [{json:{...row, _outbox_status:status, _outbox_attempts:newAttempts, _outbox_next_retry_at:nextRetryAt, _outbox_last_error:lastError, _outbox_provider_message_id:providerMessageId}}];\n }\n\n lastError = (err && err.message) ? err.message : 'send_failed';\n if (newAttempts >= maxAttempts) {\n status = 'DLQ';\n nextRetryAt = new Date().toISOString();\n }\n return [{json:{...row, _outbox_status:status, _outbox_attempts:newAttempts, _outbox_next_retry_at:nextRetryAt, _outbox_last_error:lastError, _outbox_provider_message_id:providerMessageId}}];\n}"
},
"id": "88c4f068-c7dc-4d9c-b985-c99ee26e7f3b",
"name": "O3 - Send Outbox",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
80,
260
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "UPDATE outbound_messages\n SET status = $2,\n attempts = $3,\n next_retry_at = $4::timestamptz,\n last_error = $5,\n provider_message_id = COALESCE($6, provider_message_id),\n sent_at = CASE WHEN $2='SENT' THEN now() ELSE sent_at END,\n updated_at = now()\n WHERE outbound_id = $1\nRETURNING outbound_id, status;",
"additionalFields": {
"queryParams": "={{[$json.outbound_id, $json._outbox_status, $json._outbox_attempts, $json._outbox_next_retry_at, $json._outbox_last_error, $json._outbox_provider_message_id]}}"
}
},
"id": "0c4780ad-5212-47d8-b57a-49274602fc82",
"name": "O4 - Update Outbox (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
300,
260
]
},
{
"parameters": {
"rule": {
"interval": [
{
"field": "cronExpression",
"expression": "30 3 * * *"
}
]
}
},
"id": "d1bdf929-592e-4912-93a3-156bde227d7a",
"name": "R1 - Retention Purge (Daily 03:30)",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [
-360,
-220
]
},
{
"parameters": {
"language": "javascript",
"jsCode": "const asBool = (v) => ['1','true','yes','y','on'].includes(String(v||'').toLowerCase());\nconst asInt = (v, d) => { const n = parseInt(v, 10); return Number.isFinite(n) ? n : d; };\n\nconst now = Date.now();\nconst dryRun = asBool($env.RETENTION_DRY_RUN ?? 'false');\nconst batchSize = asInt($env.RETENTION_BATCH_SIZE, 5000);\nconst maxIter = asInt($env.RETENTION_MAX_ITERATIONS, 200);\n\nconst tables = [\n { kind: 'inbound', table_name: 'public.inbound_messages', retention_days: asInt($env.RETENTION_DAYS_INBOUND, 30) },\n { kind: 'security', table_name: 'public.security_events', retention_days: asInt($env.RETENTION_DAYS_SECURITY, 90) },\n { kind: 'outbox_sent', table_name: 'public.outbound_messages', retention_days: asInt($env.RETENTION_DAYS_OUTBOX_SENT, 30) },\n { kind: 'workflow_errors', table_name: 'public.workflow_errors', retention_days: asInt($env.RETENTION_DAYS_WORKFLOW_ERRORS, 30) },\n];\n\nconst items = tables.map(t => {\n const cutoffTs = new Date(now - t.retention_days * 24*3600*1000).toISOString();\n return { json: {\n kind: t.kind,\n table_name: t.table_name,\n retention_days: t.retention_days,\n cutoff_ts: cutoffTs,\n batch_size: batchSize,\n dry_run: dryRun,\n max_iterations: maxIter,\n }};\n});\n\nreturn items;"
},
"id": "185a5fff-ce80-41ed-ab3e-8eccda9997d1",
"name": "R2 - Build Retention Plan",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
-140,
-220
]
},
{
"parameters": {
"batchSize": 1,
"options": {}
},
"id": "7c3ab0f2-6d0e-47a7-9dfd-8db87a387fbe",
"name": "R3 - Split Tables",
"type": "n8n-nodes-base.splitInBatches",
"typeVersion": 1,
"position": [
80,
-220
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO ops.retention_runs (dry_run, table_name, cutoff_ts, batch_size, details_json, status)\nVALUES ($1,$2,$3::timestamptz,$4, jsonb_build_object('retention_days',$5,'max_iterations',$6), 'STARTED')\nRETURNING run_id;",
"additionalFields": {
"queryParams": "={{[$json.dry_run, $json.table_name, $json.cutoff_ts, $json.batch_size, $json.retention_days, $json.max_iterations]}}"
}
},
"id": "c4b6ebe3-e364-47d1-af8b-9f1bc34d1326",
"name": "R4 - Retention Start Run (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
300,
-220
]
},
{
"parameters": {
"language": "javascript",
"jsCode": "const runId = $json.run_id ?? $json[0]?.run_id;\nreturn [{json:{...$json, run_id: runId, _total_deleted:0, _iteration:0, _time_column:null, _deleted_last:0}}];"
},
"id": "fbb2578e-7b32-49a5-998a-c89723638071",
"name": "R5 - Init Counters",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
520,
-220
]
},
{
"parameters": {
"conditions": {
"string": [
{
"value1": "={{$json.kind}}",
"operation": "equal",
"value2": "outbox_sent"
}
]
}
},
"id": "c7b54184-8b55-47b3-be1e-9bf9aba8a101",
"name": "R6 - Is Outbox SENT?",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [
740,
-220
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT deleted_count, time_column FROM ops.purge_table_batch($1, $2::timestamptz, $3::int, $4::boolean);",
"additionalFields": {
"queryParams": "={{[$json.table_name, $json.cutoff_ts, $json.batch_size, $json.dry_run]}}"
}
},
"id": "d59c17b0-9ed1-42ac-b08d-0f4efd15217c",
"name": "R7a - Purge Generic Batch (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
980,
-320
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT deleted_count, 'sent_at'::text AS time_column FROM ops.purge_outbound_sent_batch($1::timestamptz, $2::int, $3::boolean);",
"additionalFields": {
"queryParams": "={{[$json.cutoff_ts, $json.batch_size, $json.dry_run]}}"
}
},
"id": "3591b127-8452-46a3-bc23-b468742022f7",
"name": "R7b - Purge Outbox SENT Batch (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
980,
-120
]
},
{
"parameters": {
"language": "javascript",
"jsCode": "const row = Array.isArray($json) ? ($json[0]||{}) : $json;\n// Postgres node returns object with fields. In batch mode, n8n may nest results.\nconst deleted = Number(row.deleted_count ?? row[0]?.deleted_count ?? 0);\nconst timeCol = row.time_column ?? row[0]?.time_column ?? null;\nconst total = Number($json._total_deleted ?? 0) + deleted;\nconst iter = Number($json._iteration ?? 0) + 1;\nreturn [{json:{...$json, _deleted_last: deleted, _total_deleted: total, _iteration: iter, _time_column: timeCol}}];"
},
"id": "5f076331-d2e3-4139-8afb-cbb46709dc81",
"name": "R8 - Accumulate",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1220,
-220
]
},
{
"parameters": {
"conditions": {
"boolean": [
{
"value1": "={{$json.dry_run}}",
"operation": "equal",
"value2": false
}
],
"number": [
{
"value1": "={{$json._deleted_last}}",
"operation": "equal",
"value2": "={{$json.batch_size}}"
},
{
"value1": "={{$json._iteration}}",
"operation": "smaller",
"value2": "={{$json.max_iterations}}"
}
]
}
},
"id": "4f89f9eb-e83b-4b9d-adda-65e1d6a67d90",
"name": "R9 - Continue Purge?",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [
1440,
-220
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "UPDATE ops.retention_runs\nSET run_finished_at = now(),\n deleted_rows = $2::bigint,\n details_json = jsonb_strip_nulls(jsonb_build_object(\n 'time_column',$3::text,\n 'iterations',$4::int,\n 'cutoff_ts',$5::timestamptz,\n 'dry_run',$6::boolean\n )),\n status = 'DONE'\nWHERE run_id = $1::bigint\nRETURNING run_id;",
"additionalFields": {
"queryParams": "={{[$json.run_id, $json._total_deleted, $json._time_column, $json._iteration, $json.cutoff_ts, $json.dry_run]}}"
}
},
"id": "58db7131-2a21-4e1d-a901-3bb0232da98d",
"name": "R10 - Finalize Run (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
1680,
-320
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO security_events (tenant_id, restaurant_id, conversation_key, channel, user_id, event_type, severity, payload_json)\nVALUES (NULL,NULL,NULL,NULL,NULL,'RETENTION_RUN','LOW',\n jsonb_build_object('table',$1,'cutoff_ts',$2::timestamptz,'dry_run',$3::boolean,'deleted_rows',$4::bigint,'iterations',$5::int)\n) RETURNING id;",
"additionalFields": {
"queryParams": "={{[$json.table_name, $json.cutoff_ts, $json.dry_run, $json._total_deleted, $json._iteration]}}"
}
},
"id": "06ae02d6-05ff-417e-ba08-57b42feea3ea",
"name": "R11 - Log Security Event (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
1680,
-120
]
},
{
"parameters": {
"language": "javascript",
"jsCode": "// pass-through end-of-table marker\nreturn [{json:{...$json, _retention_done:true}}];"
},
"id": "1dca8a8f-3851-49cf-91d4-5405d34d753a",
"name": "R12 - Next Table",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1900,
-220
]
},
{
"parameters": {
"rule": {
"interval": [
{
"field": "minutes",
"minutesInterval": 5
}
]
}
},
"id": "6fb5b75f-980b-4aec-ab53-41cb2307d7e9",
"name": "M1 - Every 5 min (SLO Checks)",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [
800,
450
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "WITH params AS (SELECT GREATEST($1::int, 1) AS window_min),\nlat AS (\n SELECT COALESCE(percentile_cont(0.95) within group (order by ms),0) AS p95_ms,\n COUNT(*)::int AS n\n FROM (\n SELECT EXTRACT(epoch FROM (o.created_at - i.received_at))*1000 AS ms\n FROM outbound_messages o\n JOIN inbound_messages i\n ON i.channel = o.channel\n AND i.msg_id = split_part(o.dedupe_key,':',3)\n WHERE o.dedupe_key LIKE 'msg:%:%'\n AND o.created_at > now() - make_interval(mins => (SELECT window_min FROM params))\n ) t\n),\npend AS (\n SELECT COALESCE(MAX(EXTRACT(epoch FROM (now()-created_at))),0) AS max_age_sec,\n COUNT(*)::int AS pending_cnt\n FROM outbound_messages\n WHERE status IN ('PENDING','RETRY')\n),\ndlq AS (\n SELECT \n SUM(CASE WHEN status='DLQ' THEN 1 ELSE 0 END)::int AS dlq_cnt,\n SUM(CASE WHEN status='SENT' THEN 1 ELSE 0 END)::int AS sent_cnt\n FROM outbound_messages\n WHERE updated_at > now() - make_interval(mins => (SELECT window_min FROM params))\n AND status IN ('SENT','DLQ')\n)\nSELECT (SELECT window_min FROM params) AS window_min,\n lat.p95_ms, lat.n,\n pend.max_age_sec, pend.pending_cnt,\n dlq.dlq_cnt, dlq.sent_cnt,\n CASE WHEN (dlq.dlq_cnt + dlq.sent_cnt) > 0 THEN (dlq.dlq_cnt::float / (dlq.dlq_cnt + dlq.sent_cnt)) ELSE 0 END AS dlq_rate,\n (SELECT (value_json->>'last_sent_at')::timestamptz FROM ops_kv WHERE key='alert:slo') AS last_sent_at,\n (SELECT (value_json->>'last_hash')::text FROM ops_kv WHERE key='alert:slo') AS last_hash\nFROM lat, pend, dlq;",
"additionalFields": {
"queryParams": "={{[Number($env.SLO_WINDOW_MIN || 15)]}}"
}
},
"id": "4e22128c-7f69-4bdd-baf6-4e54b054cb8d",
"name": "M2 - Compute SLO Metrics (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
1060,
450
]
},
{
"parameters": {
"language": "javascript",
"jsCode": "const m = $json;\nconst windowMin = Number($env.SLO_WINDOW_MIN || m.window_min || 15);\nconst tP95 = Number($env.SLO_INBOUND_TO_OUTBOX_P95_MS || 2000);\nconst tPending = Number($env.SLO_OUTBOX_PENDING_AGE_MAX_SEC || 600);\nconst tDlqRate = Number($env.SLO_DLQ_RATE_MAX || 0.05);\nconst tDlqCount = Number($env.SLO_DLQ_COUNT_MAX || 5);\n\nconst p95 = Number(m.p95_ms || 0);\nconst pendingAge = Number(m.max_age_sec || 0);\nconst pendingCnt = Number(m.pending_cnt || 0);\nconst dlqRate = Number(m.dlq_rate || 0);\nconst dlqCnt = Number(m.dlq_cnt || 0);\nconst sentCnt = Number(m.sent_cnt || 0);\n\nconst breaches = [];\nif (p95 > tP95) breaches.push({slo:'inbound_to_outbox_ms_p95', value:p95, threshold:tP95, window_min:windowMin, n:Number(m.n||0)});\nif (pendingAge > tPending) breaches.push({slo:'outbox_pending_age_max_sec', value:pendingAge, threshold:tPending, pending_cnt:pendingCnt});\nif (dlqRate > tDlqRate) breaches.push({slo:'dlq_rate', value:dlqRate, threshold:tDlqRate, dlq_cnt:dlqCnt, sent_cnt:sentCnt, window_min:windowMin});\nif (dlqCnt > tDlqCount) breaches.push({slo:'dlq_count', value:dlqCnt, threshold:tDlqCount, window_min:windowMin});\n\nconst alert = breaches.length > 0;\nconst severity = breaches.some(b => b.slo === 'outbox_pending_age_max_sec' && b.value > (tPending*6))\n ? 'CRITICAL'\n : (alert ? 'HIGH' : 'LOW');\n\nconst alertWebhookUrl = ($env.ALERT_WEBHOOK_URL || '').toString().trim();\nconst alertEnabled = (($env.ALERT_SLO_ENABLED || 'true').toString().toLowerCase() === 'true');\nconst cooldownSec = Number($env.ALERT_COOLDOWN_SEC || 300);\n\n// Compute a stable hash for cooldown (breaches content)\nconst breachHash = crypto.createHash('sha256').update(JSON.stringify(breaches)).digest('hex');\nconst lastSentAtRaw = m.last_sent_at || null;\nconst lastHash = (m.last_hash || '').toString();\nlet suppressed = false;\nlet suppressReason = '';\n\nif (alert && alertEnabled && lastSentAtRaw) {\n const lastSentAt = new Date(lastSentAtRaw).getTime();\n const nowTs = Date.now();\n const ageSec = Math.floor((nowTs - lastSentAt)/1000);\n if (ageSec >= 0 && ageSec < cooldownSec && lastHash && lastHash === breachHash) {\n suppressed = true;\n suppressReason = `cooldown_${ageSec}s`;\n }\n}\n\n// Never alert externally if webhook URL missing or disabled; still log breach in DB for visibility.\nconst externalAlert = alertEnabled && !!alertWebhookUrl && !suppressed && alert;\n\nreturn [{json:{\n kind:'slo_check',\n service:'resto-bot',\n timestamp: new Date().toISOString(),\n window_min: windowMin,\n metrics:{\n inbound_to_outbox_ms_p95: p95,\n inbound_to_outbox_samples: Number(m.n||0),\n outbox_pending_age_max_sec: pendingAge,\n outbox_pending_cnt: pendingCnt,\n dlq_rate: dlqRate,\n dlq_cnt: dlqCnt,\n sent_cnt: sentCnt\n },\n thresholds:{\n SLO_INBOUND_TO_OUTBOX_P95_MS: tP95,\n SLO_OUTBOX_PENDING_AGE_MAX_SEC: tPending,\n SLO_DLQ_RATE_MAX: tDlqRate,\n SLO_DLQ_COUNT_MAX: tDlqCount\n },\n alert,\n severity,\n breaches,\n runbook:'docs/SLO.md',\n breach_hash: breachHash,\n external_alert: externalAlert,\n suppressed,\n suppress_reason: suppressReason,\n alert_webhook_configured: !!alertWebhookUrl,\n alert_enabled: alertEnabled\n}}];\n"
},
"id": "3afed419-e07e-42e3-bf6e-a160309ebba3",
"name": "M3 - Evaluate SLO Thresholds",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1320,
450
]
},
{
"parameters": {
"conditions": {
"boolean": [
{
"value1": "={{$json.alert}}",
"operation": "equal",
"value2": true
}
]
}
},
"id": "f8b0ce61-2338-4c29-a163-8063916e0ce8",
"name": "M4 - Alert Needed?",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [
1560,
450
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO security_events(tenant_id, restaurant_id, conversation_key, channel, user_id, event_type, severity, payload_json) VALUES (NULL,NULL,NULL,NULL,NULL,'SLO_BREACH',$1, $2::jsonb) RETURNING 1;",
"additionalFields": {
"queryParams": "={{[$json.severity, JSON.stringify($json)]}}"
}
},
"id": "6eb983bd-bb86-4136-9f28-73ebbefe28ff",
"name": "M6 - Log SLO Breach (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
1820,
330
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO ops_kv(key, value_json, updated_at)\nVALUES ('alert:slo', jsonb_build_object('last_sent_at', now(), 'last_hash', $1), now())\nON CONFLICT (key) DO UPDATE SET value_json=EXCLUDED.value_json, updated_at=now()\nRETURNING key;",
"additionalFields": {
"queryParams": "={{[$json.breach_hash]}}"
}
},
"id": "M5_update_alert_state_3f44a84e",
"name": "M5 - Update Alert State (DB)",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
-450,
880
]
}
],
"connections": {
"S1 - Every 5 min": {
"main": [
[
{
"node": "S2 - Load Due Feedback",
"type": "main",
"index": 0
}
]
]
},
"S2 - Load Due Feedback": {
"main": [
[
{
"node": "S3 - Send Feedback Batch",
"type": "main",
"index": 0
}
]
]
},
"S3 - Send Feedback Batch": {
"main": [
[
{
"node": "S4 - Mark Feedback Status",
"type": "main",
"index": 0
}
]
]
},
"E1 - On Error": {
"main": [
[
{
"node": "E2 - Normalize Error",
"type": "main",
"index": 0
}
]
]
},
"E2 - Normalize Error": {
"main": [
[
{
"node": "E3 - Save Error (DB)",
"type": "main",
"index": 0
}
]
]
},
"E3 - Save Error (DB)": {
"main": [
[
{
"node": "E4 - Optional Alert Webhook",
"type": "main",
"index": 0
}
]
]
},
"O1 - Every 1 min": {
"main": [
[
{
"node": "O2 - Pick Due Outbox (DB)",
"type": "main",
"index": 0
}
]
]
},
"O2 - Pick Due Outbox (DB)": {
"main": [
[
{
"node": "O3 - Send Outbox",
"type": "main",
"index": 0
}
]
]
},
"O3 - Send Outbox": {
"main": [
[
{
"node": "O4 - Update Outbox (DB)",
"type": "main",
"index": 0
}
]
]
},
"R1 - Retention Purge (Daily 03:30)": {
"main": [
[
{
"node": "R2 - Build Retention Plan",
"type": "main",
"index": 0
}
]
]
},
"R2 - Build Retention Plan": {
"main": [
[
{
"node": "R3 - Split Tables",
"type": "main",
"index": 0
}
]
]
},
"R3 - Split Tables": {
"main": [
[
{
"node": "R4 - Retention Start Run (DB)",
"type": "main",
"index": 0
}
]
]
},
"R4 - Retention Start Run (DB)": {
"main": [
[
{
"node": "R5 - Init Counters",
"type": "main",
"index": 0
}
]
]
},
"R5 - Init Counters": {
"main": [
[
{
"node": "R6 - Is Outbox SENT?",
"type": "main",
"index": 0
}
]
]
},
"R6 - Is Outbox SENT?": {
"main": [
[
{
"node": "R7b - Purge Outbox SENT Batch (DB)",
"type": "main",
"index": 0
}
],
[
{
"node": "R7a - Purge Generic Batch (DB)",
"type": "main",
"index": 0
}
]
]
},
"R7a - Purge Generic Batch (DB)": {
"main": [
[
{
"node": "R8 - Accumulate",
"type": "main",
"index": 0
}
]
]
},
"R7b - Purge Outbox SENT Batch (DB)": {
"main": [
[
{
"node": "R8 - Accumulate",
"type": "main",
"index": 0
}
]
]
},
"R8 - Accumulate": {
"main": [
[
{
"node": "R9 - Continue Purge?",
"type": "main",
"index": 0
}
]
]
},
"R9 - Continue Purge?": {
"main": [
[
{
"node": "R6 - Is Outbox SENT?",
"type": "main",
"index": 0
}
],
[
{
"node": "R10 - Finalize Run (DB)",
"type": "main",
"index": 0
}
]
]
},
"R10 - Finalize Run (DB)": {
"main": [
[
{
"node": "R11 - Log Security Event (DB)",
"type": "main",
"index": 0
}
]
]
},
"R11 - Log Security Event (DB)": {
"main": [
[
{
"node": "R12 - Next Table",
"type": "main",
"index": 0
}
]
]
},
"R12 - Next Table": {
"main": [
[
{
"node": "R3 - Split Tables",
"type": "main",
"index": 0
}
]
]
},
"M1 - Every 5 min (SLO Checks)": {
"main": [
[
{
"node": "M2 - Compute SLO Metrics (DB)",
"type": "main",
"index": 0
}
]
]
},
"M2 - Compute SLO Metrics (DB)": {
"main": [
[
{
"node": "M3 - Evaluate SLO Thresholds",
"type": "main",
"index": 0
}
]
]
},
"M3 - Evaluate SLO Thresholds": {
"main": [
[
{
"node": "M4 - Alert Needed?",
"type": "main",
"index": 0
}
]
]
},
"M4 - Alert Needed?": {
"main": [
[
{
"node": "M5 - Update Alert State (DB)",
"type": "main",
"index": 0
},
{
"node": "M6 - Log SLO Breach (DB)",
"type": "main",
"index": 0
},
{
"node": "E4 - Optional Alert Webhook",
"type": "main",
"index": 0
}
],
[]
]
},
"M6 - Log SLO Breach (DB)": {
"main": [
[]
]
},
"E4 - Optional Alert Webhook": {
"main": [
[]
]
}
}
}
About this workflow
W8 - OPS (Feedback Cron + Errors + Alerts). Uses scheduleTrigger, postgres, errorTrigger, splitInBatches. Scheduled trigger; 31 nodes.
Source: https://github.com/zerAda/RestaurantAgentAutomation/blob/41a4d42dcd66e57b1e87b4750c0fd5fbf7058f68/workflows/W8_OPS.json — original creator credit. Request a take-down →