This workflow follows the Google Sheets → HTTP Request recipe pattern — see all workflows that pair these two integrations.
The workflow JSON
Copy or download the full n8n JSON below. Paste it into a new n8n workflow, add your credentials, and activate it. Full import guide →
{
"name": "DAta lake 1",
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "kommo-smart-handler",
"responseMode": "lastNode",
"options": {}
},
"id": "webhook_1",
"name": "Kommo Webhook Receiver",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2.1,
"position": [
-2944,
-144
]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict",
"version": 1
},
"conditions": [
{
"leftValue": "={{$json.should_respond}}",
"rightValue": true,
"operator": {
"type": "boolean",
"operation": "equals"
},
"id": "220aed3a-ddbf-42e4-8312-2454bc2c38cf"
},
{
"leftValue": "={{$json.message_text}}",
"rightValue": "",
"operator": {
"type": "string",
"operation": "notEmpty"
},
"id": "5af1d10d-3ebe-42fc-8e53-b276b7445db1"
},
{
"id": "db075dcf-79a0-4bba-9602-22912694bc27",
"leftValue": "={{($json.confidence_level || \"medium\").toString()}}",
"rightValue": "low",
"operator": {
"type": "string",
"operation": "notEquals"
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "check_response_needed",
"name": "Need AI Response?",
"type": "n8n-nodes-base.if",
"typeVersion": 2,
"position": [
112,
-480
]
},
{
"parameters": {
"resource": "chat",
"prompt": {
"messages": [
{
"role": "assistant",
"content": "=\u0623\u0646\u062a \u0645\u0633\u0627\u0639\u062f \u062e\u062f\u0645\u0629 \u0639\u0645\u0644\u0627\u0621 \u0645\u062d\u062a\u0631\u0641 \u0644\u0634\u0631\u0643\u0629 \u0627\u0644\u062f\u064a\u0648\u0627\u0646. \u0642\u0648\u0627\u0639\u062f \u0645\u0647\u0645\u0629: - \u0631\u062f \u0628\u0627\u0644\u0644\u063a\u0629 \u0627\u0644\u0639\u0631\u0628\u064a\u0629 \u0625\u0630\u0627 \u0643\u0627\u0646 \u0627\u0644\u0633\u0624\u0627\u0644 \u0628\u0627\u0644\u0639\u0631\u0628\u064a\u0629\u060c \u0648\u0628\u0627\u0644\u0625\u0646\u062c\u0644\u064a\u0632\u064a\u0629 \u0625\u0630\u0627 \u0643\u0627\u0646 \u0628\u0627\u0644\u0625\u0646\u062c\u0644\u064a\u0632\u064a\u0629 - \u0643\u0646 \u0645\u062e\u062a\u0635\u0631\u0627\u064b \u0648\u0648\u0627\u0636\u062d\u0627\u064b (\u0644\u0627 \u062a\u062a\u062c\u0627\u0648\u0632 3 \u062c\u0645\u0644) - \u0623\u0638\u0647\u0631 \u0627\u0644\u0648\u062f \u0648\u0627\u0644\u0627\u062d\u062a\u0631\u0627\u0645 - \u0625\u0630\u0627 \u0633\u064f\u0626\u0644\u062a \u0639\u0646 \u0627\u0644\u0623\u0633\u0639\u0627\u0631\u060c \u0627\u0637\u0644\u0628 \u0645\u0646 \u0627\u0644\u0639\u0645\u064a\u0644 \u0627\u0644\u062a\u0648\u0627\u0635\u0644 \u0645\u0639 \u0627\u0644\u0645\u0628\u064a\u0639\u0627\u062a \u0645\u0639\u0644\u0648\u0645\u0627\u062a \u0627\u0644\u0639\u0645\u064a\u0644: \u0627\u0644\u0627\u0633\u0645: {{$json.client_name || \"\u0639\u0645\u064a\u0644\"}} \u0627\u0644\u0642\u0646\u0627\u0629: {{$json.message_origin || \"\u063a\u064a\u0631 \u0645\u062d\u062f\u062f\"}}"
},
{
"content": "={{$json.message_text}}"
}
]
},
"options": {
"maxTokens": 300,
"temperature": 0.7
},
"requestOptions": {}
},
"id": "openai_response",
"name": "Generate AI Response",
"type": "n8n-nodes-base.openAi",
"typeVersion": 1.1,
"position": [
336,
-480
],
"credentials": {
"openAiApi": {
"name": "<your credential>"
}
}
},
{
"parameters": {
"method": "POST",
"url": "https://infodvnlogcom.kommo.com/api/v4/leads/{{$json.lead_id}}/notes",
"authentication": "predefinedCredentialType",
"nodeCredentialType": "kommoApi",
"sendHeaders": true,
"headerParameters": {
"parameters": [
{
"name": "Authorization",
"value": "=<redacted-credential>"
}
]
},
"sendBody": true,
"specifyBody": "json",
"jsonBody": "{\n \"note_type\": \"common\",\n \"params\": {\n \"text\": \"\ud83e\udd16 AI Response:\\n{{$json.choices[0].message.content}}\",\n \"service\": \"AI Assistant\"\n }\n}",
"options": {}
},
"id": "send_to_kommo",
"name": "Send to Kommo",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.1,
"position": [
112,
-32
]
},
{
"parameters": {
"operation": "append",
"documentId": {
"__rl": true,
"value": "1gEBF0y9g9UwslW6SWiO5aeyJ6hYzFOY4I5sQI5UVC44",
"mode": "list",
"cachedResultName": "kommo",
"cachedResultUrl": "https://docs.google.com/spreadsheets/d/1gEBF0y9g9UwslW6SWiO5aeyJ6hYzFOY4I5sQI5UVC44/edit?usp=drivesdk"
},
"sheetName": {
"__rl": true,
"value": "gid=0",
"mode": "list",
"cachedResultName": "Update Conversation Context",
"cachedResultUrl": "https://docs.google.com/spreadsheets/d/1gEBF0y9g9UwslW6SWiO5aeyJ6hYzFOY4I5sQI5UVC44/edit#gid=0"
},
"columns": {
"mappingMode": "defineBelow",
"value": {
"chat_id": "=={{$json.chat_id}}",
"lead_id": "=={{$json.lead_id}}",
"client_name": "={{$json.client_name}}",
"last_message": "={{$json.message_text}}",
"last_response": "={{$json.ai_response || $json.choices[0].message.content}}",
"last_updated": "={{$now.toISO()}}",
"message_count": "={($json.message_count || 0) + 1}}",
"conversation_history": "={{$json.conversation_history}}",
"language": "={{$json.message_text.match(/[\\u0600-\\u06FF]/) ? 'ar' : 'en'}}",
"status": "={{\"active\"}}",
"tags": "={{$json.event_types}}",
"message_fingerprint": "={{$json.message_fingerprint || \"\"}}",
"processing_time_ms": "={{$json.processing_time_ms || 0}}",
"completeness": "={{$json.completeness_percentage || 100}}%",
"total_pulses": "={{$json.final_pulse_count || 1}}",
"confidence_level": "={{$json.confidence_level || \"medium\"}}",
"service_type": "={{$json.service_type || \"general\"}}",
"status_id": "={{$json.status_id || \"\"}}",
"pipeline_id": "={{$json.pipeline_id || \"\"}}"
},
"matchingColumns": [],
"schema": [
{
"id": "chat_id",
"displayName": "chat_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "lead_id",
"displayName": "lead_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "client_name",
"displayName": "client_name",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "last_message",
"displayName": "last_message",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "last_response",
"displayName": "last_response",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "last_updated",
"displayName": "last_updated",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "message_count",
"displayName": "message_count",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "conversation_history",
"displayName": "conversation_history",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "language",
"displayName": "language",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "status",
"displayName": "status",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "tags",
"displayName": "tags",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "pipeline_id",
"displayName": "pipeline_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "status_id",
"displayName": "status_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "service_type",
"displayName": "service_type",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "confidence_level",
"displayName": "confidence_level",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "total_pulses",
"displayName": "total_pulses",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "completeness",
"displayName": "completeness",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "processing_time_ms",
"displayName": "processing_time_ms",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "message_fingerprint",
"displayName": "message_fingerprint",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
}
],
"attemptToConvertTypes": false,
"convertFieldsToString": false
},
"options": {}
},
"id": "1e60c753-012c-4a9b-8912-34bf199914ce",
"name": "Update Conversation Context",
"type": "n8n-nodes-base.googleSheets",
"typeVersion": 4.6,
"position": [
336,
-32
],
"credentials": {
"googleSheetsOAuth2Api": {
"name": "<your credential>"
}
}
},
{
"parameters": {
"url": "=https://infodvnlogcom.kommo.com/api/v4/leads?query={{ $json.primary_query_value }}\n",
"sendHeaders": true,
"headerParameters": {
"parameters": [
{
"name": "=Authorization",
"value": "=<redacted-credential>"
},
{
"name": "=Content-Type",
"value": "=application/json"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.2,
"position": [
368,
400
],
"id": "925e384b-5a8a-4089-857c-8938fcdecd52",
"name": "search Leads",
"executeOnce": false
},
{
"parameters": {
"jsCode": "// Loop over input items and add a new field called 'myNewField' to the JSON of each one\nfor (const item of $input.all()) {\n item.json.myNewField = 1;\n}\n\nreturn $input.all();"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
112,
192
],
"id": "2f86ff1d-8acd-4153-ae93-5be17c10c375",
"name": "Code in JavaScript"
},
{
"parameters": {
"jsCode": "// In: items[].json.body (Kommo webhook) or whole item.json\n// Out: { raw, received_at }\nreturn items.map(it => {\n const raw = it.json?.body ? it.json.body : (it.json || {});\n return { json: { raw, received_at: Date.now() } };\n});"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
112,
-256
],
"id": "b4bfc5b1-4a06-4195-9235-ddc0dea2a955",
"name": "Normalize Kommo Payload"
},
{
"parameters": {
"jsCode": "// Helpers (\u0635\u063a\u064a\u0631\u0629 \u0648\u0645\u0643\u0631\u0631\u0629 \u0647\u0646\u0627 \u0644\u0627\u0633\u062a\u0642\u0644\u0627\u0644 \u0627\u0644\u0646\u0648\u062f)\nconst CONFIG = { MAX_EVENTS_PER_MESSAGE: 10 };\nfunction g(b,p,f=''){ if(!b||!p) return f; if(p.includes('[')) return b[p] ?? f;\n try{ return p.split('.').reduce((o,k)=>(o&&k in o)?o[k]:undefined,b) ?? f; }catch{ return f; } }\nfunction extractFlatValue(o,...keys){ for(const k of keys){ const v=g(o,k);\n if(v!==undefined&&v!==null&&v!==''){ if(typeof v==='string'){ const s=v.trim();\n if(s && !s.includes('infodvnlog') && !s.includes('.com')) return s; } else return v; } } return ''; }\nfunction classifyEvent(body){ const ks=Object.keys(body||{}); const has=p=>ks.some(k=>k.startsWith(p));\n const T=[ ['message[add]',10,'message',9], ['unsorted[add]',8,'lead_creation',6], ['talk[add]',6,'conversation',4],\n ['talk[update]',5,'conversation',4], ['leads[update]',4,'lead_management',3], ['leads[delete]',1,'deletion',1],\n ['contacts[update]',3,'contact_management',2], ['contacts[add]',2,'contact_management',2] ];\n for(const [c,p,cat,puls] of T) if(has(c)) return {event_type:c,priority:p,category:cat,expected_pulses:Math.min(puls,CONFIG.MAX_EVENTS_PER_MESSAGE)};\n return {event_type:'unknown',priority:0,category:'other',expected_pulses:1}; }\nfunction detectPlatform(origin='', body={}){ const s=String(origin).toLowerCase();\n if(s.includes('waba')||s.includes('whatsapp')) return 'whatsapp';\n if(s.includes('instagram')||s.includes('ig')) return 'instagram';\n if(s.includes('facebook')||s.includes('fb')) return 'facebook';\n const raw=JSON.stringify(body).toLowerCase();\n if(raw.includes('whatsapp')) return 'whatsapp';\n if(raw.includes('instagram')) return 'instagram';\n if(raw.includes('facebook')) return 'facebook';\n return 'unknown'; }\n\nreturn items.map(item=>{\n const raw = item.json.raw;\n const event = classifyEvent(raw);\n const lead_id = 
extractFlatValue(raw,'leads[update][0][id]','leads[add][0][id]','unsorted[add][0][lead_id]','talk[update][0][lead_id]');\n const entity_id = lead_id || extractFlatValue(raw,'message[add][0][entity_id]','message[add][0][element_id]','talk[update][0][entity_id]',\n 'talk[add][0][entity_id]','unsorted[add][0][source_data][contact_id]','unsorted[add][0][source_data][from_id]');\n const conversation_id = extractFlatValue(raw,'message[add][0][chat_id]','talk[update][0][chat_id]','talk[add][0][chat_id]',\n 'unsorted[add][0][source_data][conversation_id]','unsorted[add][0][source_data][origin][chat_id]',\n 'unsorted[add][0][source_data][thread_id]','leads[update][0][chat_id]');\n const message_id = extractFlatValue(raw,'message[add][0][id]','unsorted[add][0][source_data][data][0][id]','leads[update][0][message_id]','talk[update][0][message_id]');\n const talk_id = extractFlatValue(raw,'talk[update][0][talk_id]','talk[add][0][talk_id]','message[add][0][talk_id]','talk[update][0][id]');\n const origin = extractFlatValue(raw,'message[add][0][origin]','unsorted[add][0][source]','talk[update][0][origin]','talk[add][0][origin]');\n const platform = detectPlatform(origin, raw);\n const created_at = extractFlatValue(raw,'message[add][0][created_at]','unsorted[add][0][created_at]','talk[update][0][updated_at]',\n 'talk[add][0][created_at]','unsorted[add][0][source_data][date]','leads[update][0][updated_at]') || Math.floor(Date.now()/1000);\n return { json: {\n raw, event,\n ids:{lead_id,entity_id,conversation_id,message_id,talk_id},\n source:{origin,platform},\n timestamps:{created_at, received_at: item.json.received_at}\n }};\n});"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
336,
-256
],
"id": "ce9ac841-e331-4f36-854a-f1989a4a4ad8",
"name": "Extract IDs & Event Type"
},
{
"parameters": {
"jsCode": "// Helpers\nfunction g(b,p,f=''){ if(!b||!p) return f; if(p.includes('[')) return b[p] ?? f;\n try{ return p.split('.').reduce((o,k)=>(o&&k in o)?o[k]:undefined,b) ?? f; }catch{ return f; } }\nfunction extractFlatValue(o,...keys){ for(const k of keys){ const v=g(o,k);\n if(v!==undefined&&v!==null&&v!==''){ if(typeof v==='string'){ const s=v.trim();\n if(s && !s.includes('infodvnlog') && !s.includes('.com')) return s; } else return v; } } return ''; }\nfunction extractTags(body){ const tags=new Set();\n const paths=['leads[update][0][tags]','leads[add][0][tags]','unsorted[add][0][tags]','contacts[update][0][tags]',\n 'leads[update][0][_embedded][tags]','talk[update][0][tags]','talk[add][0][tags]'];\n for(const p of paths){ const v=g(body,p); if(!v) continue;\n if(typeof v==='string') v.split(',').map(t=>t.trim().toLowerCase()).forEach(t=>t&&tags.add(t));\n else if(Array.isArray(v)) v.forEach(x=> typeof x==='string'? tags.add(x.toLowerCase()): (x?.name && tags.add(String(x.name).toLowerCase()))); }\n Object.keys(body).forEach(k=>{ if(k.includes('[tags][')||k.includes('[tag][')){ const v=body[k]; if(typeof v==='string') tags.add(v.toLowerCase()); }});\n return Array.from(tags); }\nfunction extractPipelineStageInfo(body){\n const info={pipeline_id:'',pipeline_name:'',stage_id:'',stage_name:'',previous_stage_id:'',stage_changed:false};\n info.pipeline_id=extractFlatValue(body,'leads[update][0][pipeline_id]','leads[add][0][pipeline_id]','unsorted[add][0][pipeline_id]','leads[status][0][pipeline_id]','talk[update][0][pipeline_id]','leads[update][0][_embedded][pipeline_id]');\n info.stage_id=extractFlatValue(body,'leads[update][0][status_id]','leads[add][0][status_id]','unsorted[add][0][status_id]','leads[status][0][status_id]','leads[update][0][new_status_id]','talk[update][0][status_id]');\n info.previous_stage_id=extractFlatValue(body,'leads[update][0][old_status_id]','leads[status][0][old_status_id]','leads[update][0][previous_status_id]');\n 
info.pipeline_name=extractFlatValue(body,'leads[update][0][pipeline][name]','leads[update][0][pipeline_name]','leads[update][0][_embedded][pipeline][name]');\n info.stage_name=extractFlatValue(body,'leads[update][0][status][name]','leads[update][0][status_name]','leads[update][0][_embedded][status][name]');\n if(info.previous_stage_id && info.stage_id && info.previous_stage_id!==info.stage_id) info.stage_changed=true;\n return info; }\nfunction extractMessageText(body,eventType){\n let text=''; const paths=[\n 'talk[update][0][last_message_text]','talk[update][0][messages][0][text]','talk[update][0][last_message][text]','talk[update][0][text]','talk[add][0][text]',\n 'message[add][0][text]','message[add][0][message]','message[add][0][body]','message[add][0][content]',\n 'unsorted[add][0][source_data][data][0][text]','unsorted[add][0][text]',\n 'leads[update][0][last_message]','leads[update][0][notes][0][text]','leads[update][0][custom_fields_values][0][values][0][value]',\n 'contacts[update][0][last_message]','contacts[add][0][note]'\n ];\n for(const p of paths){ const v=g(body,p); if(typeof v==='string' && v.trim() && !v.includes('http') && !v.includes('infodvnlog') && !v.includes('.com')){ text=v.trim(); break; } }\n if(!text){\n // sweep notes/messages\n for(let i=0;i<20 && !text;i++){\n for(const p of [\n `leads[update][0][notes][${i}][text]`,\n `talk[update][0][messages][${i}][text]`,\n `talk[update][0][messages][${i}][content]`\n ]){ const v=g(body,p); if(typeof v==='string' && v.trim()){ text=v.trim(); break; } }\n }\n }\n if(!text){\n // deep JSON\n const search=(o,d=0)=>{ if(d>7||!o||typeof o!=='object') return '';\n for(const [k,v] of Object.entries(o)){\n const kk=String(k).toLowerCase();\n if(['text','message','body','content','note','last_message','last_message_text','value'].includes(kk) && typeof v==='string' && v.trim() && !v.includes('http') && !v.includes('infodvnlog') && !v.includes('.com')) return v.trim();\n if(typeof v==='object'){ const 
n=search(v,d+1); if(n) return n; }\n } return ''; };\n text = search(body);\n }\n return text;\n}\n\nreturn items.map(item=>{\n const { raw, event, ids, source, timestamps } = item.json;\n const text = extractMessageText(raw, event.event_type);\n const tags = extractTags(raw);\n const pipe = extractPipelineStageInfo(raw);\n const client_name = extractFlatValue(raw,'message[add][0][author][name]','unsorted[add][0][source_data][client][name]','unsorted[add][0][source_data][from]','unsorted[add][0][source_data][name]','contacts[add][0][name]','contacts[update][0][name]','leads[update][0][contact_name]','talk[update][0][contact_name]');\n return { json: { raw, event, ids, source, timestamps, content:{ text }, tags, pipeline: pipe, client:{ name: client_name } } };\n});"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
560,
-256
],
"id": "6cad9194-4e29-4ec3-a807-c4877de74722",
"name": "Extract Text, Tags, Pipeline"
},
{
"parameters": {
"jsCode": "const CONFIG = { TIME_WINDOW:5, MESSAGE_CACHE_TTL:120000, MAX_EVENTS_PER_MESSAGE:10, MAX_CACHE_SIZE:5000 };\nfunction generateFingerprint({ entity_id, conversation_id, lead_id, chat_id, platform, created_at }){\n const t = created_at ? Number(created_at) : Date.now()/1000;\n const w = Math.floor(t / CONFIG.TIME_WINDOW);\n if(lead_id) return `${platform}_lead_${lead_id}_${w}`;\n if(entity_id && conversation_id) return `${platform}_${entity_id}_${conversation_id}_${w}`;\n if(chat_id) return `${platform}_chat_${chat_id}_${w}`;\n if(entity_id) return `${platform}_${entity_id}_${w}`;\n if(conversation_id) return `${platform}_${conversation_id}_${w}`;\n return `${platform}_unknown_${w}`;\n}\nconst sd = $getWorkflowStaticData('global');\nif(!sd.messageCache) sd.messageCache = {};\nif(!sd.processedGroups) sd.processedGroups = {};\nif(!sd.lastCleanup) sd.lastCleanup = Date.now();\n\nfunction cleanup(){\n const now=Date.now(); if(now - sd.lastCleanup <= 60000) return;\n for(const k in sd.messageCache) if(now - sd.messageCache[k].timestamp > CONFIG.MESSAGE_CACHE_TTL) delete sd.messageCache[k];\n for(const k in sd.processedGroups) if(now - sd.processedGroups[k] > CONFIG.MESSAGE_CACHE_TTL) delete sd.processedGroups[k];\n const size = Object.keys(sd.messageCache).length;\n if(size > CONFIG.MAX_CACHE_SIZE){\n const arr = Object.entries(sd.messageCache).sort((a,b)=>a[1].timestamp - b[1].timestamp);\n arr.slice(0, size - CONFIG.MAX_CACHE_SIZE).forEach(([k])=> delete sd.messageCache[k]);\n }\n sd.lastCleanup = now;\n}\ncleanup();\n\nreturn items.map(it=>{\n const { event, ids, source, timestamps, content, tags, pipeline, client } = it.json;\n const fp = generateFingerprint({ entity_id: ids.entity_id, conversation_id: ids.conversation_id, lead_id: ids.lead_id, chat_id: ids.conversation_id, platform: source.platform, created_at: timestamps.created_at });\n const now = Date.now();\n const prev = sd.messageCache[fp];\n const g = prev || {\n fingerprint: fp, entity_id:'', 
lead_id:'', conversation_id:'', message_id:'', talk_id:'',\n message_texts:[], client_name:'', pipeline_id:'', pipeline_name:'', stage_id:'', stage_name:'', previous_stage_id:'', stage_changed:false,\n tags:new Set(), source: source.origin || 'waba', platform: source.platform,\n pulses_received:0, expected_pulses: event.expected_pulses, has_message:false, is_new_lead:false, is_complete:false,\n created_at: timestamps.created_at, first_pulse_time: now, last_pulse_time: now,\n unique_events:new Set(), event_priorities:[], all_event_types:[]\n };\n g.pulses_received++; g.last_pulse_time = now;\n g.unique_events.add(event.event_type); g.event_priorities.push(event.priority); g.all_event_types.push(event.event_type);\n if(ids.lead_id && !g.lead_id) g.lead_id = ids.lead_id;\n if(ids.entity_id && !g.entity_id) g.entity_id = ids.entity_id;\n if(ids.conversation_id && !g.conversation_id) g.conversation_id = ids.conversation_id;\n if(ids.message_id && !g.message_id) g.message_id = ids.message_id;\n if(ids.talk_id && !g.talk_id) g.talk_id = ids.talk_id;\n if(content?.text){ g.message_texts.push({ text: content.text, event_type: event.event_type, priority: event.priority, timestamp: timestamps.created_at }); g.has_message = true; }\n if(client?.name && !g.client_name) g.client_name = client.name;\n if(pipeline?.pipeline_id){ g.pipeline_id = pipeline.pipeline_id; g.pipeline_name = pipeline.pipeline_name; }\n if(pipeline?.stage_id){ g.stage_id = pipeline.stage_id; g.stage_name = pipeline.stage_name; }\n if(pipeline?.previous_stage_id){ g.previous_stage_id = pipeline.previous_stage_id; g.stage_changed = pipeline.stage_changed; }\n (tags||[]).forEach(t=> g.tags.add(t));\n if(event.event_type==='message[add]' && content?.text) g.has_message = true;\n if(event.event_type==='unsorted[add]') g.is_new_lead = true;\n const elapsed = now - g.first_pulse_time;\n if(g.pulses_received>=g.expected_pulses || g.pulses_received>=CONFIG.MAX_EVENTS_PER_MESSAGE || elapsed > 
CONFIG.TIME_WINDOW*1000) g.is_complete = true;\n sd.messageCache[fp] = { ...g, unique_events_array: Array.from(g.unique_events), timestamp: now };\n return { json: { group_key: fp, is_complete: g.is_complete, expected_pulses: g.expected_pulses, pulses_received: g.pulses_received } };\n});"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
784,
-256
],
"id": "883abb1a-43b4-4897-ada6-20f0c0484e59",
"name": "Upsert Group Cache"
},
{
"parameters": {
"jsCode": "// In: { group_key } \u2192 Out: { group_key, is_complete }\nconst sd = $getWorkflowStaticData('global');\nreturn items.map(it=>{\n const k = it.json.group_key;\n const g = sd.messageCache?.[k];\n return { json: { group_key: k, is_complete: !!g?.is_complete } };\n});"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1008,
-256
],
"id": "90a513cf-1ac8-4605-980a-1acf429d1958",
"name": "Sleep"
},
{
"parameters": {
"jsCode": "const PIPELINE_CONFIG = {\n '10794139': { name:'Sales Pipeline', service:'sales', critical_stages:['10794144'], ai_stages:['10794142','10794143'], bot_stages:['10794140','10794141'] },\n '10794150': { name:'Support Pipeline', service:'support', critical_stages:['10794154'], ai_stages:['10794152','10794153'], bot_stages:['10794151'] },\n '10794160': { name:'Onboarding Pipeline', service:'onboarding', critical_stages:[], ai_stages:['10794162'], bot_stages:['10794161','10794163'] },\n};\nconst TAG_RULES = {\n ai_required:['vip','urgent','complex','ai-assist','premium','enterprise','priority'],\n bot_only:['simple','faq','auto-reply','bot-handled','no-ai','standard','routine'],\n human_required:['complaint','escalation','legal','refund','critical','emergency','manual'],\n language:{ ar:'arabic', en:'english', es:'spanish', fr:'french' }\n};\nfunction determineInterventionType(pipeline_id, stage_id, tags){\n const r={ type:'bot', reason:[], priority:'normal', service:'general', language:'ar', requires_human:false, use_ai_agent:false, confidence:'high' };\n const pc = PIPELINE_CONFIG[pipeline_id];\n if(pc){ r.service=pc.service;\n if(pc.critical_stages?.includes(stage_id)){ Object.assign(r,{type:'human',requires_human:true,priority:'critical'}); r.reason.push('critical_stage'); }\n else if(pc.ai_stages?.includes(stage_id)){ Object.assign(r,{type:'ai_agent',use_ai_agent:true,priority:'high'}); r.reason.push('ai_stage'); }\n else if(pc.bot_stages?.includes(stage_id)){ r.type='bot'; r.reason.push('bot_stage'); }\n } else { r.confidence='medium'; r.reason.push('unknown_pipeline'); }\n const human = (tags||[]).filter(t=>TAG_RULES.human_required.includes(t));\n if(human.length){ Object.assign(r,{type:'human',requires_human:true,priority:'critical'}); r.reason.push(`human_tags: ${human.join(', ')}`); }\n const ai = (tags||[]).filter(t=>TAG_RULES.ai_required.includes(t));\n if(ai.length && !r.requires_human){ 
Object.assign(r,{type:'ai_agent',use_ai_agent:true,priority:r.priority==='critical'?'critical':'high'}); r.reason.push(`ai_tags: ${ai.join(', ')}`); }\n const bot = (tags||[]).filter(t=>TAG_RULES.bot_only.includes(t));\n if(bot.length && !r.requires_human && !r.use_ai_agent){ Object.assign(r,{type:'bot',use_ai_agent:false,priority:'normal'}); r.reason.push(`bot_tags: ${bot.join(', ')}`); }\n for(const t of (tags||[])) if(TAG_RULES.language[t]){ r.language=t; break; }\n return r;\n}\nfunction pickFinalText(arr=[]){ if(!arr.length) return '';\n const s=[...arr].sort((a,b)=> b.priority-a.priority || b.timestamp-a.timestamp);\n const top=s[0].priority; const same=s.filter(x=>x.priority===top);\n return same.reduce((L,c)=> c.text && c.text.length>(L?.length||0)?c.text:L,'') || s[0].text || ''; }\n\nconst sd = $getWorkflowStaticData('global');\n\nreturn items.map(it=>{\n const k = it.json.group_key;\n const g = sd.messageCache?.[k];\n if(!g) return { json: { error:'group_not_found', group_key:k } };\n\n // cross-ref \u0628\u0633\u064a\u0637: \u062f\u0645\u062c \u0645\u0639\u0644\u0648\u0645\u0627\u062a \u0646\u0627\u0642\u0635\u0629 \u0645\u0646 \u0646\u0641\u0633 entity_id\n if(g.entity_id){\n for(const [ck,cg] of Object.entries(sd.messageCache)){\n if(ck===k) continue;\n if(cg.entity_id===g.entity_id){\n if(!g.pipeline_id && cg.pipeline_id){ g.pipeline_id=cg.pipeline_id; g.pipeline_name=cg.pipeline_name; }\n if(!g.stage_id && cg.stage_id){ g.stage_id=cg.stage_id; g.stage_name=cg.stage_name; }\n if(!g.message_texts?.length && cg.message_texts?.length) g.message_texts=cg.message_texts;\n }\n }\n }\n\n const finalText = pickFinalText(g.message_texts);\n const tags = Array.from(g.tags||[]);\n const intervention = determineInterventionType(g.pipeline_id, g.stage_id, tags);\n\n let action_required='log_only', should_respond=false;\n if(g.has_message && finalText){ action_required='process_message'; should_respond=true; }\n else if(g.is_new_lead && finalText){ 
action_required='process_new_lead_message'; should_respond=true; }\n else if(g.is_new_lead){ action_required='welcome_new_lead'; should_respond=true; }\n else if(g.stage_changed){ action_required='stage_transition'; should_respond=intervention.use_ai_agent; }\n else if(!finalText && !g.is_new_lead){ action_required='ignore'; should_respond=false; }\n\n const completeness_percentage = Math.min(100, Math.round((g.pulses_received / g.expected_pulses) * 100));\n let confidence_level='low';\n if(completeness_percentage>=90 && finalText) confidence_level='high';\n else if(completeness_percentage>=70 || finalText) confidence_level='medium';\n\n const out = {\n group_key: k,\n lead_id: g.lead_id || g.entity_id || '',\n chat_id: g.conversation_id || '',\n message_text: finalText || '',\n client_name: g.client_name || '',\n event_types: Array.from(g.unique_events||[]).join(', '),\n events_count: g.pulses_received,\n action_required, should_respond,\n event_time_utc: new Date(Number(g.created_at)*1000).toISOString(),\n received_at_utc: new Date().toISOString(),\n unique_keys: [g.message_id||'', g.talk_id||'', g.lead_id||'', g.entity_id||''].filter(Boolean).join('_'),\n pipeline_id: g.pipeline_id || '', pipeline_name: g.pipeline_name || 'Unknown Pipeline',\n pipeline_service: PIPELINE_CONFIG[g.pipeline_id]?.service || 'general',\n stage_id: g.stage_id || '', stage_name: g.stage_name || 'Initial',\n previous_stage_id: g.previous_stage_id || '', stage_changed: g.stage_changed,\n message_fingerprint: k, message_id: g.message_id || '', talk_id: g.talk_id || '', entity_id: g.entity_id || '',\n pipeline: { id:g.pipeline_id||'', name:g.pipeline_name||'Unknown Pipeline', service: PIPELINE_CONFIG[g.pipeline_id]?.service || 'general' },\n stage: { id:g.stage_id||'', name:g.stage_name||'Initial', previous_id:g.previous_stage_id||'', changed:g.stage_changed },\n tags, has_tags: tags.length>0,\n intervention: { type: intervention.type, use_ai_agent: intervention.use_ai_agent, 
requires_human: intervention.requires_human, priority: intervention.priority, reason: intervention.reason.join('; '), language: intervention.language, confidence: intervention.confidence },\n has_message: g.has_message, is_new_lead: g.is_new_lead,\n confidence_level, pulses_received: g.pulses_received, expected_pulses: g.expected_pulses, completeness_percentage,\n completeness_status: g.is_complete? 'complete':'partial', is_complete: g.is_complete,\n source: g.source || 'waba', platform: g.platform,\n service_type: intervention.service,\n response_channel: g.platform==='whatsapp'?'whatsapp': (g.platform==='instagram'?'instagram': (g.platform==='facebook'?'messenger':'unknown')),\n aggregation_time_ms: g.last_pulse_time - g.first_pulse_time,\n processing_time_ms: Date.now() - g.first_pulse_time,\n is_primary_record: true, processed: true, ready_for_processing: true, deduplication_count: g.pulses_received,\n cache_size: Object.keys(sd.messageCache).length, has_valid_content: !!finalText || g.is_new_lead\n };\n\n // \u0639\u0644\u0627\u0645\u0629 \u201c\u062a\u0645 \u0645\u0639\u0627\u0644\u062c\u062a\u0647\u201d + \u062a\u0646\u0638\u064a\u0641 \u0645\u0646 \u0627\u0644\u0643\u0627\u0634 \u0644\u0648 \u0645\u0643\u062a\u0645\u0644\n const processed = $getWorkflowStaticData('global').processedGroups; processed[k]=Date.now();\n if(g.is_complete) delete sd.messageCache[k];\n\n return { json: out };\n});"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1232,
-256
],
"id": "84603255-c7d5-4636-8598-aa4ea92641f3",
"name": "Finalize Group"
},
{
"parameters": {
"jsCode": "// Output Builder: projects every incoming item onto a fixed set of fields.\n// intervention.type / .priority / .language are flattened into top-level keys.\nconst pickFields = (src) => {\n  const details = src.intervention;\n  return {\n    group_key: src.group_key,\n    lead_id: src.lead_id,\n    chat_id: src.chat_id,\n    message_text: src.message_text,\n    client_name: src.client_name,\n    event_types: src.event_types,\n    events_count: src.events_count,\n    action_required: src.action_required,\n    should_respond: src.should_respond,\n    event_time_utc: src.event_time_utc,\n    received_at_utc: src.received_at_utc,\n    unique_keys: src.unique_keys,\n    pipeline_id: src.pipeline_id,\n    pipeline_name: src.pipeline_name,\n    pipeline_service: src.pipeline_service,\n    stage_id: src.stage_id,\n    stage_name: src.stage_name,\n    previous_stage_id: src.previous_stage_id,\n    stage_changed: src.stage_changed,\n    tags: src.tags,\n    intervention_type: details?.type,\n    intervention_priority: details?.priority,\n    language: details?.language,\n    platform: src.platform,\n    response_channel: src.response_channel,\n    completeness_percentage: src.completeness_percentage,\n    confidence_level: src.confidence_level\n  };\n};\n\nconst results = [];\nfor (const entry of items) {\n  results.push({ json: pickFields(entry.json) });\n}\nreturn results;"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1456,
-256
],
"id": "3872aeeb-f3d8-49db-a8a1-a9da6c3c47c0",
"name": "Output Builder"
},
{
"parameters": {
"jsCode": "// Cache Cleanup: removes messageCache entries and processedGroups marks older than the TTL\n// from the workflow's global static data, then reports how many cache entries were removed.\nconst TTL_MS = 120000;\nconst store = $getWorkflowStaticData('global');\n\n// Nothing to prune when the message cache was never created.\nif (!store.messageCache) {\n  return [{ json: { status: 'no_cache' } }];\n}\n\nconst sizeBefore = Object.keys(store.messageCache).length;\nconst startedAt = Date.now();\nconst isStale = (stamp) => startedAt - stamp > TTL_MS;\n\n// Cache entries are aged by their own timestamp field.\nfor (const key in store.messageCache) {\n  if (isStale(store.messageCache[key].timestamp)) delete store.messageCache[key];\n}\n\n// processedGroups maps a group key directly to the time it was marked.\nfor (const key in store.processedGroups || {}) {\n  if (isStale(store.processedGroups[key])) delete store.processedGroups[key];\n}\n\nconst sizeAfter = Object.keys(store.messageCache).length;\nreturn [{ json: { status: 'ok', removed: sizeBefore - sizeAfter, remaining: sizeAfter } }];"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1680,
-256
],
"id": "dfd08269-8e30-447e-8fae-cc7e85531f3a",
"name": "Cache Cleanup"
},
{
"parameters": {
"jsCode": "// ==========================================\n// NODE 2: Extract Basic Info - minimal version\n// ==========================================\n// Reads the first incoming item and flattens its webhook body/headers into\n// one string-only record (row_id ... processing_notes).\n\nconst inputData = $input.all()[0];\nconst body = inputData?.json?.body || {};\nconst headers = inputData?.json?.headers || {};\n\n// Returns obj[key] as a string: '' for null/undefined, JSON text for objects.\nfunction getValue(obj, key) {\n  const val = obj[key];\n  if (val === undefined || val === null) return '';\n  if (typeof val === 'object') return JSON.stringify(val);\n  return String(val);\n}\n\n// Event type = first two segments of the first body key containing [add] or\n// [update], e.g. 'message[add][0][text]' -> 'message[add]'.\nlet eventType = 'unknown';\nfor (const key of Object.keys(body)) {\n  if (key.includes('[add]') || key.includes('[update]')) {\n    const segments = key.split('[');\n    eventType = segments[0] + '[' + segments[1];\n    break;\n  }\n}\n\n// Identifiers default to '' and are filled in according to the event type.\nlet lead_id = '';\nlet chat_id = '';\nlet message_id = '';\nlet message_text = '';\nlet client_name = '';\n\nif (eventType === 'unsorted[add]') {\n  lead_id = getValue(body, 'unsorted[add][0][lead_id]');\n  message_text = getValue(body, 'unsorted[add][0][source_data][data][0][text]');\n  client_name = getValue(body, 'unsorted[add][0][source_data][client][name]');\n} else if (eventType === 'message[add]') {\n  lead_id = getValue(body, 'message[add][0][entity_id]');\n  chat_id = getValue(body, 'message[add][0][chat_id]');\n  message_id = getValue(body, 'message[add][0][id]');\n  message_text = getValue(body, 'message[add][0][text]');\n  client_name = getValue(body, 'message[add][0][author][name]');\n} else if (eventType === 'talk[update]') {\n  lead_id = getValue(body, 'talk[update][0][entity_id]');\n  chat_id = getValue(body, 'talk[update][0][chat_id]');\n} else if (eventType === 'leads[update]') {\n  lead_id = getValue(body, 'leads[update][0][id]');\n}\n\n// Return a single flat record; every value is a string.\nreturn [{\n  json: {\n    row_id: Date.now() + '_' + Math.random().toString(36).substring(7),\n    webhook_id: getValue(headers, 'x-amocrm-requestid'),\n    received_at: new Date().toISOString(),\n    // Id of the current n8n execution (replaces a hard-coded placeholder value).\n    execution_id: String($execution.id),\n    event_type: eventType,\n    event_category: eventType.includes('message') ? 'message' : 'other',\n    lead_id: lead_id,\n    chat_id: chat_id,\n    message_id: message_id,\n    talk_id: '',\n    message_text: message_text,\n    client_name: client_name,\n    pipeline_id: '',\n    status_id: '',\n    event_time: '',\n    // Raw payload is capped at 5000 characters.\n    raw_body: JSON.stringify(body).substring(0, 5000),\n    processed: 'FALSE',\n    processed_at: '',\n    processing_notes: ''\n  }\n}];"
},
"id": "4e982a41-703f-4bf7-93b6-70d3983d5600",
"name": "Extract Basic Info",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
-1968,
-512
]
},
{
"parameters": {
"jsCode": "// Return Success: builds the success payload for the event handled by this run.\n// A Code node must contain JavaScript that returns items; a bare JSON literal\n// does not parse, and ={{...}} expressions are not evaluated inside jsCode,\n// so the fields are read from the first incoming item instead.\nconst saved = $input.all()[0]?.json || {};\n\nreturn [{\n  json: {\n    responseCode: 200,\n    responseHeaders: {\n      'Content-Type': 'application/json'\n    },\n    responseBody: {\n      status: 'success',\n      row_id: saved.row_id,\n      event_type: saved.event_type,\n      message: 'Event saved successfully'\n    }\n  }\n}];"
},
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
-1264,
-176
],
"id": "ec153676-6efd-4ef7-bcde-669d1508a31d",
"name": "Return Success"
},
{
"parameters": {
"operation": "append",
"documentId": {
"__rl": true,
"value": "1gEBF0y9g9UwslW6SWiO5aeyJ6hYzFOY4I5sQI5UVC44",
"mode": "list",
"cachedResultName": "kommo",
"cachedResultUrl": "https://docs.google.com/spreadsheets/d/1gEBF0y9g9UwslW6SWiO5aeyJ6hYzFOY4I5sQI5UVC44/edit?usp=drivesdk"
},
"sheetName": {
"__rl": true,
"value": 241084562,
"mode": "list",
"cachedResultName": " Raw_Events",
"cachedResultUrl": "https://docs.google.com/spreadsheets/d/1gEBF0y9g9UwslW6SWiO5aeyJ6hYzFOY4I5sQI5UVC44/edit#gid=241084562"
},
"columns": {
"mappingMode": "defineBelow",
"value": {
"lead_id": "={{$json.lead_id}}",
"chat_id": "={{$json.chat_id || \"\"}}",
"message_text": "={{$json.message_text}}",
"client_name": "={{$json.client_name}}",
"pipeline_id": "={{$json.pipeline_id}}",
"row_id": "={{$json.row_id}}",
"processing_notes": "={{$json.processing_notes}}",
"processed_at": "={{$json.processed_at}}",
"processed": "={{$json.processed}}",
"raw_body": "={{$json.raw_body}}",
"event_time": "={{$json.event_time}}",
"status_id": "={{$json.status_id}}",
"talk_id": "={{$json.talk_id}}",
"message_id": "={{$json.message_id}}",
"event_category": "={{$json.event_category}}",
"event_type": "={{$json.event_type}}",
"execution_id": "={{$json.execution_id}}",
"received_at": "={{$json.received_at}}",
"webhook_id": "={{$json.webhook_id}}"
},
"matchingColumns": [],
"schema": [
{
"id": "row_id",
"displayName": "row_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "webhook_id",
"displayName": "webhook_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "received_at",
"displayName": "received_at",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "execution_id",
"displayName": "execution_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "event_type",
"displayName": "event_type",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "event_category",
"displayName": "event_category",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "lead_id",
"displayName": "lead_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "chat_id",
"displayName": "chat_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "message_id",
"displayName": "message_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "talk_id",
"displayName": "talk_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "message_text",
"displayName": "message_text",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true
},
{
"id": "client_name",
"displayName": "client_name",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "pipeline_id",
"displayName": "pipeline_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "status_id",
"displayName": "status_id",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "event_time",
"displayName": "event_time",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "raw_body",
"displayName": "raw_body",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "processed",
"displayName": "processed",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "processed_at",
"displayName": "processed_at",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
},
{
"id": "processing_notes",
"displayName": "processing_notes",
"required": false,
"defaultMatch": false,
"display": true,
"type": "string",
"canBeUsedToMatch": true,
"removed": false
}
],
"attemptToConvertTypes": false,
"convertFieldsToString": false
},
"options": {}
},
"id": "d17c373d-3b9d-4928-be0a-514d6c4ce33f",
"name": "Save Events to Google Sheets1",
"type": "n8n-nodes-base.googleSheets",
"typeVersion": 4.6,
"position": [
-1552,
-496
],
"credentials": {
"googleSheetsOAuth2Api": {
"name": "<your credential>"
}
}
},
{
"parameters": {
"jsCode": "// ==========================================\n// NODE: Queue Manager - \u0645\u0646\u0638\u0645 \u0627\u0644\u062a\u062f\u0641\u0642\n// ==========================================\n// \u0636\u0639 \u0647\u0630\u0627 \u0627\u0644\u0640 Node \u0628\u064a\u0646 Extract Basic Info \u0648 Save to Google Sheets\n// \u0627\u0644\u0647\u062f\u0641: \u062a\u0646\u0638\u064a\u0645 \u0648\u062a\u0647\u062f\u0626\u0629 \u062a\u062f\u0641\u0642 \u0627\u0644\u0628\u064a\u0627\u0646\u0627\u062a \u0644\u062a\u062c\u0646\u0628 \u0627\u0644\u062a\u0635\u0627\u062f\u0645\n\n// \u0627\u0644\u062d\u0635\u0648\u0644 \u0639\u0644\u0649 \u0627\u0644\u0630\u0627\u0643\u0631\u0629 \u0627\u0644\u0645\u0624\u0642\u062a\u0629 \u0627\u0644\u0645\u0634\u062a\u0631\u0643\u0629\nconst staticData = $getWorkflowStaticData('global');\n\n// \u062a\u0647\u064a\u0626\u0629 \u0627\u0644\u0642\u0627\u0626\u0645\u0629 \u0625\u0630\u0627 \u0644\u0645 \u062a\u0643\u0646 \u0645\u0648\u062c\u0648\u062f\u0629\nif (!staticData.queue) {\n staticData.queue = [];\n staticData.lastProcessTime = 0;\n staticData.isProcessing = false;\n staticData.queueId = 0;\n}\n\n// \u0625\u0639\u062f\u0627\u062f\u0627\u062a \u0627\u0644\u062a\u062d\u0643\u0645\nconst CONFIG = {\n MIN_DELAY_MS: 2000, // \u0623\u0642\u0644 \u062a\u0623\u062e\u064a\u0631 \u0628\u064a\u0646 \u0627\u0644\u0643\u062a\u0627\u0628\u0627\u062a (2 \u062b\u0627\u0646\u064a\u0629)\n MAX_QUEUE_SIZE: 10, // \u0623\u0642\u0635\u0649 \u062d\u062c\u0645 \u0644\u0644\u0642\u0627\u0626\u0645\u0629 \u0642\u0628\u0644 \u0627\u0644\u0625\u062c\u0628\u0627\u0631 \u0639\u0644\u0649 \u0627\u0644\u0645\u0639\u0627\u0644\u062c\u0629\n MAX_WAIT_TIME: 5000, // \u0623\u0642\u0635\u0649 \u0648\u0642\u062a \u0627\u0646\u062a\u0638\u0627\u0631 (5 \u062b\u0648\u0627\u0646\u064a)\n BATCH_SIZE: 1, // \u0639\u062f\u062f \u0627\u0644\u0633\u062c\u0644\u0627\u062a \u0641\u064a \u0643\u0644 \u062f\u0641\u0639\u0629\n DEBUG: true\n};\n\n// \u062f\u0627\u0644\u0629 
\u0644\u0644\u0637\u0628\u0627\u0639\u0629\nfunction log(...args) {\n if (CONFIG.DEBUG) {\n console.log(`[Queue Manager ${new Date().toISOString()}]`, ...args);\n }\n}\n\n// \u062f\u0627\u0644\u0629 \u0644\u0644\u0627\u0646\u062a\u0638\u0627\u0631\nfunction sleep(ms) {\n return new Promise(resolve => setTimeout(resolve, ms));\n}\n\n// \u0627\u0633\u062a\u0644\u0627\u0645 \u0627\u0644\u0628\u064a\u0627\u0646\u0627\u062a \u0627\u0644\u0648\u0627\u0631\u062f\u0629\nconst incomingData = $json;\nconst currentTime = Date.now();\n\n// \u0625\u0636\u0627\u0641\u0629 \u0645\u0639\u0631\u0641 \u0641\u0631\u064a\u062f \u0644\u0644\u0633\u062c\u0644\nincomingData.queue_id = ++staticData.queueId;\nincomingData.queued_at = new Date().toISOString();\n\nlog(`Incoming record #${incomingData.queue_id}:`, {\n event_type: incomingData.event_type,\n lead_id: incomingData.lead_id,\n message: incomingData.message_text ? incomingData.message_text.substring(0, 30) : \"EMPTY\"\n});\n\n// \u0625\u0636\u0627\u0641\u0629 \u0644\u0644\u0642\u0627\u0626\u0645\u0629\nstaticData.queue.push(incomingData);\nlog(`Queue size: ${staticData.queue.length}`);\n\n// \u062d\u0633\u0627\u0628 \u0627\u0644\u0648\u0642\u062a \u0645\u0646\u0630 \u0622\u062e\u0631 \u0645\u0639\u0627\u0644\u062c\u0629\nconst timeSinceLastProcess = currentTime - staticData.lastProcessTime;\nlog(`Time since last process: ${timeSinceLastProcess}ms`);\n\n// \u062a\u062d\u062f\u064a\u062f \u0647\u0644 \u0646\u062d\u062a\u0627\u062c \u0644\u0644\u0645\u0639\u0627\u0644\u062c\u0629 \u0627\u0644\u0622\u0646\nlet shouldProcess = false;\nlet processReason = \"\";\n\n// \u0634\u0631\u0648\u0637 \u0627\u0644\u0645\u0639\u0627\u0644\u062c\u0629\nif (staticData.queue.length >= CONFIG.MAX_QUEUE_SIZE) {\n shouldProcess = true;\n processReason = \"Queue full\";\n} else if (timeSinceLastProcess >= CONFIG.MAX_WAIT_TIME && staticData.queue.length > 0) {\n shouldProcess = true;\n processReason = \"Max wait time reached\";\n} else if 
(timeSinceLastProcess >= CONFIG.MIN_DELAY_MS && !staticData.isProcessing) {\n shouldProcess = true;\n processReason = \"Min delay passed\";\n}\n\n// \u0625\u0630\u0627 \u0642\u0631\u0631\u0646\u0627 \u0627\u0644\u0645\u0639\u0627\u0644\u062c\u0629\nif (shouldProcess) {\n log(`Processing triggered: ${processReason}`);\n \n // \u0645\u0646\u0639 \u0627\u0644\u0645\u0639\u0627\u0644\u062c\u0629 \u0627\u0644\u0645\u062a\u0632\u0627\u0645\u0646\u0629\n if (staticData.isProcessing) {\n log(\"Another process is running, waiting...\");\n \n // \u0627\u0646\u062a\u0638\u0631 \u062d\u062a\u0649 \u0627\u0646\u062a\u0647\u0627\u0621 \u0627\u0644\u0645\u0639\u0627\u0644\u062c\u0629 \u0627\u0644\u0633\u0627\u0628\u0642\u0629\n let waitCount = 0;\n while (staticData.isProcessing && waitCount < 50) {\n await sleep(100);\n waitCount++;\n }\n \n if (waitCount >= 50) {\n log(\"WARNING: Timeout waiting for previous process\");\n staticData.isProcessing = false; // \u0625\u0639\u0627\u062f\u0629 \u062a\u0639\u064a\u064a\u0646 \u0641\u064a \u062d\u0627\u0644\u0629 \u0627\u0644\u062a\u0639\u0644\u064a\u0642\n }\n }\n \n // \u0628\u062f\u0621 \u0627\u0644\u0645\u0639\u0627\u0644\u062c\u0629\n staticData.isProcessing = true;\n \n try {\n // \u0623\u062e\u0630 \u0627\u0644\u0639\u062f\u062f \u0627\u0644\u0645\u0637\u0644\u0648\u0628 \u0645\u0646 \u0627\u0644\u0642\u0627\u0626\u0645\u0629\n const toProcess = staticData.queue.splice(0, CONFIG.BATCH_SIZE);\n \n if (toProcess.length > 0) {\n log(`Processing ${toProcess.length} record(s)`);\n \n // \u062a\u062d\u062f\u064a\u062b \u0648\u0642\u062a \u0622\u062e\u0631 \u0645\u0639\u0627\u0644\u062c\u0629\n staticData.lastProcessTime = Date.now();\n \n // \u0625\u0636\u0627\u0641\u0629 \u0645\u0639\u0644\u0648\u0645\u0627\u062a \u0627\u0644\u0645\u0639\u0627\u0644\u062c\u0629\n const processedRecords = toProcess.map(record => {\n record.processed_from_queue = true;\n record.queue_wait_time = Date.now() - new Date(record.queued_at).getTime();\n 
record.queue_process_batch = staticData.queueId;\n \n // \u062a\u0646\u0638\u064a\u0641 \u0627\u0644\u0642\u064a\u0645 \u0627\u0644\u0641\u0627\u0631\u063a\u0629\n Object.keys(record).forEach(key => {\n if (record[key] === undefined || record[key] === null) {\n record[key] = \"\";\n }\n });\n \n return { json: record };\n });\n \n log(`Sending to Google Sheets:`, {\n count: processedRecords.length,\n remaining_in_queue: staticData.queue.length\n });\n \n // \u0625\u0636\u0627\u0641\u0629 \u062a\u0623\u062e\u064a\u0631 \u0635\u063a\u064a\u0631 \u0644\u0644\u0623\u0645\u0627\u0646\n await sleep(500);\n \n // \u0625\u0646\u0647\u0627\u0621 \u0627\u0644\u0645\u0639\u0627\u
Credentials you'll need
Each integration node will prompt for credentials when you import. We strip credential IDs before publishing — you'll add your own.
googleSheetsOAuth2Api, mongoDb, openAiApi
For the full experience, including quality scoring and batch install features for each workflow, upgrade to Pro.
About this workflow
DAta lake 1. Uses openAi, httpRequest, googleSheets, mongoDb. Webhook trigger; 23 nodes.
Source: https://github.com/divantrade/n8n-aldewan/blob/2b5767c553a165bc411b0865cb031dd4e76c8d91/workflows/data-lake-1.json — original creator credit. Request a take-down →
Related workflows
Workflows that share integrations, category, or trigger type with this one. All free to copy and import.
This n8n workflow automates the transformation of spreadsheet data into professional charts and graphs using AI-driven analysis. Triggered via Slack, it processes uploaded files (Excel, CSV, Google Sh
Scheduled processes retrieve customer feedback from multiple channels. The system performs sentiment analysis to classify tone, then uses OpenAI models to extract themes, topics, and urgency indicator
This workflow is designed for Customer Success Managers, Growth Teams, and SaaS Business Owners who want to proactively reduce churn using AI. It automates the analysis of customer health and the deli
This workflow functions as an automated "Chief Wellness Officer," helping HR teams and managers prevent employee burnout before it happens. It aggregates data from communication channels and work tool
Postgres. Uses openAi, postgres, postgresTool, httpRequest. Webhook trigger; 19 nodes.