The workflow JSON
Copy or download the full n8n JSON below. Paste it into a new n8n workflow, add your credentials, activate. Full import guide →
{
"name": "X (Twitter) CSV Analytics Ingest (x-csv-ingest)",
"nodes": [
{
"parameters": {
"path": "={{ $env.X_CSV_DROP_FOLDER || '/files/x-analytics-exports' }}",
"events": [
"add"
],
"options": {}
},
"id": "local-file-trigger-x",
"name": "Watch CSV Folder",
"type": "n8n-nodes-base.localFileTrigger",
"typeVersion": 1,
"position": [
240,
300
],
"notes": "Watches ~/sync/devrel/x-analytics-exports/ for new CSV files dropped by operator. Operator exports from X Analytics dashboard weekly (manual 5min task). The n8n files/ mount maps to this directory."
},
{
"parameters": {
"jsCode": "// Filter: only process .csv files\nconst file = $json;\nif (!file.name || !file.name.toLowerCase().endsWith('.csv')) {\n return [];\n}\nreturn [{ json: file }];"
},
"id": "filter-csv-only",
"name": "Filter CSV Only",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
460,
300
]
},
{
"parameters": {
"filePath": "={{ $json.path }}"
},
"id": "read-csv-file",
"name": "Read CSV File",
"type": "n8n-nodes-base.readBinaryFile",
"typeVersion": 1,
"position": [
680,
300
]
},
{
"parameters": {
"jsCode": "// Parse X Analytics CSV \u2192 analytics_events rows\n// X Analytics CSV columns (typical export):\n// Tweet id, Tweet permalink, Tweet text, Time, Impressions, Engagements,\n// Engagement rate, Retweets, Replies, Likes, Profile clicks, ...\n//\n// Refs: docs/specs/devrel-analytics-stack.md (AC-8)\nconst content = Buffer.from($json.data, 'base64').toString('utf-8');\nconst lines = content.split('\\n').filter(l => l.trim());\nif (lines.length < 2) return [];\n\n// Parse header\nconst header = lines[0].split(',').map(h => h.trim().replace(/^\"|\"$/g, '').toLowerCase());\n\nconst findCol = (candidates) => {\n for (const c of candidates) {\n const idx = header.findIndex(h => h.includes(c));\n if (idx >= 0) return idx;\n }\n return -1;\n};\n\nconst idxTweetId = findCol(['tweet id', 'id']);\nconst idxTime = findCol(['time', 'date']);\nconst idxImpressions = findCol(['impressions']);\nconst idxEngagements = findCol(['engagements']);\nconst idxLikes = findCol(['likes', 'favorites']);\nconst idxRetweets = findCol(['retweets']);\nconst idxReplies = findCol(['replies']);\n\nconst rows = [];\nfor (let i = 1; i < lines.length; i++) {\n const cols = lines[i].split(',').map(c => c.trim().replace(/^\"|\"$/g, ''));\n if (cols.length < 3) continue;\n\n const tweetId = idxTweetId >= 0 ? cols[idxTweetId] : `row-${i}`;\n const contentId = `twitter:${tweetId}`;\n const occurredAt = idxTime >= 0 && cols[idxTime]\n ? new Date(cols[idxTime]).toISOString()\n : new Date().toISOString();\n\n const addRow = (eventType, colIdx) => {\n if (colIdx < 0) return;\n const val = parseFloat(cols[colIdx]);\n if (isNaN(val)) return;\n rows.push({\n platform: 'twitter',\n content_id: contentId,\n event_type: eventType,\n occurred_at: occurredAt,\n metric_value: val,\n metadata: JSON.stringify({ source: 'x_csv_export', tweet_id: tweetId })\n });\n };\n\n addRow('impression', idxImpressions);\n addRow('engagement', idxEngagements);\n addRow('like', idxLikes);\n addRow('retweet', idxRetweets);\n addRow('reply', idxReplies);\n}\n\nreturn rows.map(r => ({ json: r }));"
},
"id": "parse-x-csv",
"name": "Parse X CSV \u2192 analytics_events",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
900,
300
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO analytics_events (occurred_at, platform, content_id, event_type, metric_value, metadata) VALUES ($1::timestamptz, $2, $3, $4, $5, $6::jsonb) ON CONFLICT (platform, content_id, event_type, occurred_at) DO UPDATE SET metric_value = EXCLUDED.metric_value, ingested_at = NOW()",
"additionalFields": {
"queryParams": "={{ [$json.occurred_at, $json.platform, $json.content_id, $json.event_type, $json.metric_value, $json.metadata] }}"
}
},
"id": "upsert-x-events",
"name": "Upsert analytics_events",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [
1120,
300
],
"credentials": {
"postgres": {
"name": "<your credential>"
}
}
},
{
"parameters": {
"jsCode": "// Move processed CSV to archive folder to avoid re-processing\nconst path = require('path');\nconst fs = require('fs');\nconst src = $('Read CSV File').first().json.path;\nif (!src) return [{ json: { archived: false } }];\nconst archiveDir = path.join(path.dirname(src), 'processed');\nif (!fs.existsSync(archiveDir)) fs.mkdirSync(archiveDir, { recursive: true });\nconst dst = path.join(archiveDir, path.basename(src));\ntry { fs.renameSync(src, dst); } catch(e) { /* ignore \u2014 file may already be moved */ }\nreturn [{ json: { archived: true, src, dst } }];"
},
"id": "archive-processed-csv",
"name": "Archive Processed CSV",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1340,
300
]
}
],
"connections": {
"Watch CSV Folder": {
"main": [
[
{
"node": "Filter CSV Only",
"type": "main",
"index": 0
}
]
]
},
"Filter CSV Only": {
"main": [
[
{
"node": "Read CSV File",
"type": "main",
"index": 0
}
]
]
},
"Read CSV File": {
"main": [
[
{
"node": "Parse X CSV \u2192 analytics_events",
"type": "main",
"index": 0
}
]
]
},
"Parse X CSV \u2192 analytics_events": {
"main": [
[
{
"node": "Upsert analytics_events",
"type": "main",
"index": 0
}
]
]
},
"Upsert analytics_events": {
"main": [
[
{
"node": "Archive Processed CSV",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1",
"saveManualExecutions": true
},
"tags": [
"devrel-analytics",
"tier-2",
"twitter",
"x",
"csv"
],
"versionId": "v1",
"notes": "Phase V Part B \u2014 AC-8. Watches /files/x-analytics-exports/ for CSV drops. X API v2 costs $200/mo for analytics; operator exports manually from X Analytics dashboard weekly (~5 min). Drop new CSV \u2192 workflow fires \u2192 ingests \u2192 archives to processed/. Map /files/ in n8n compose to ~/sync/devrel/x-analytics-exports/ on host."
}
Credentials you'll need
Each integration node will prompt for credentials when you import. We strip credential IDs before publishing — you'll add your own.
postgres
For the full experience including quality scoring and batch install features for each workflow upgrade to Pro
About this workflow
X (Twitter) CSV Analytics Ingest (x-csv-ingest). Uses localFileTrigger, readBinaryFile, postgres. Event-driven trigger; 6 nodes.
Source: https://github.com/Xipher-Labs/walter-os/blob/main/setup/walter-host/services/n8n/workflows/x-csv-ingest.json — original creator credit. Request a take-down →
Related workflows
Workflows that share integrations, category, or trigger type with this one. All free to copy and import.
How to automatically import CSV files into postgres. Uses manualTrigger, readBinaryFile, spreadsheetFile, postgres. Event-driven trigger; 4 nodes.
-- Disclaimer: This template is mainly made for self-hosted users who can reach CSV files in their file system. For Cloud users, just replace the first few nodes with your file system of choice, like
This workflow acts as a junior finance research analyst for a UK boutique M&A or corporate finance team. It listens for Slack messages, classifies the request, gathers company or market data, and prod
Agendamiento_v2. Uses n8n-nodes-evolution-api, redis, httpRequest, executeWorkflowTrigger. Event-driven trigger; 59 nodes.
Cancelacion_v2. Uses executeWorkflowTrigger, redis, httpRequest, n8n-nodes-evolution-api. Event-driven trigger; 46 nodes.