Ingest Twitter CSV Analytics to Postgres

Original n8n title: X (Twitter) CSV Analytics Ingest (x-csv-ingest)

X (Twitter) CSV Analytics Ingest (x-csv-ingest). Uses localFileTrigger, readBinaryFile, postgres. Event-driven trigger; 6 nodes.

Category: Data & Sheets · Trigger: Event · Nodes: 6 · Complexity: ★★★★☆ · Integrations: Local File Trigger, Read Binary File, Postgres

The workflow JSON

Copy the full n8n JSON below, paste it into a new n8n workflow, add your credentials, and activate it.

{
  "name": "X (Twitter) CSV Analytics Ingest (x-csv-ingest)",
  "nodes": [
    {
      "parameters": {
        "path": "={{ $env.X_CSV_DROP_FOLDER || '/files/x-analytics-exports' }}",
        "events": [
          "add"
        ],
        "options": {}
      },
      "id": "local-file-trigger-x",
      "name": "Watch CSV Folder",
      "type": "n8n-nodes-base.localFileTrigger",
      "typeVersion": 1,
      "position": [
        240,
        300
      ],
      "notes": "Watches ~/sync/devrel/x-analytics-exports/ for new CSV files dropped by operator. Operator exports from X Analytics dashboard weekly (manual 5min task). The n8n files/ mount maps to this directory."
    },
    {
      "parameters": {
        "jsCode": "// Filter: only process .csv files\nconst file = $json;\nif (!file.name || !file.name.toLowerCase().endsWith('.csv')) {\n  return [];\n}\nreturn [{ json: file }];"
      },
      "id": "filter-csv-only",
      "name": "Filter CSV Only",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        460,
        300
      ]
    },
    {
      "parameters": {
        "filePath": "={{ $json.path }}"
      },
      "id": "read-csv-file",
      "name": "Read CSV File",
      "type": "n8n-nodes-base.readBinaryFile",
      "typeVersion": 1,
      "position": [
        680,
        300
      ]
    },
    {
      "parameters": {
        "jsCode": "// Parse X Analytics CSV \u2192 analytics_events rows\n// X Analytics CSV columns (typical export):\n//   Tweet id, Tweet permalink, Tweet text, Time, Impressions, Engagements,\n//   Engagement rate, Retweets, Replies, Likes, Profile clicks, ...\n//\n// Refs: docs/specs/devrel-analytics-stack.md (AC-8)\nconst content = Buffer.from($json.data, 'base64').toString('utf-8');\nconst lines = content.split('\\n').filter(l => l.trim());\nif (lines.length < 2) return [];\n\n// Parse header\nconst header = lines[0].split(',').map(h => h.trim().replace(/^\"|\"$/g, '').toLowerCase());\n\nconst findCol = (candidates) => {\n  for (const c of candidates) {\n    const idx = header.findIndex(h => h.includes(c));\n    if (idx >= 0) return idx;\n  }\n  return -1;\n};\n\nconst idxTweetId     = findCol(['tweet id', 'id']);\nconst idxTime        = findCol(['time', 'date']);\nconst idxImpressions = findCol(['impressions']);\nconst idxEngagements = findCol(['engagements']);\nconst idxLikes       = findCol(['likes', 'favorites']);\nconst idxRetweets    = findCol(['retweets']);\nconst idxReplies     = findCol(['replies']);\n\nconst rows = [];\nfor (let i = 1; i < lines.length; i++) {\n  const cols = lines[i].split(',').map(c => c.trim().replace(/^\"|\"$/g, ''));\n  if (cols.length < 3) continue;\n\n  const tweetId = idxTweetId >= 0 ? cols[idxTweetId] : `row-${i}`;\n  const contentId = `twitter:${tweetId}`;\n  const occurredAt = idxTime >= 0 && cols[idxTime]\n    ? new Date(cols[idxTime]).toISOString()\n    : new Date().toISOString();\n\n  const addRow = (eventType, colIdx) => {\n    if (colIdx < 0) return;\n    const val = parseFloat(cols[colIdx]);\n    if (isNaN(val)) return;\n    rows.push({\n      platform: 'twitter',\n      content_id: contentId,\n      event_type: eventType,\n      occurred_at: occurredAt,\n      metric_value: val,\n      metadata: JSON.stringify({ source: 'x_csv_export', tweet_id: tweetId })\n    });\n  };\n\n  addRow('impression', idxImpressions);\n  addRow('engagement', idxEngagements);\n  addRow('like', idxLikes);\n  addRow('retweet', idxRetweets);\n  addRow('reply', idxReplies);\n}\n\nreturn rows.map(r => ({ json: r }));"
      },
      "id": "parse-x-csv",
      "name": "Parse X CSV \u2192 analytics_events",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        900,
        300
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "INSERT INTO analytics_events (occurred_at, platform, content_id, event_type, metric_value, metadata) VALUES ($1::timestamptz, $2, $3, $4, $5, $6::jsonb) ON CONFLICT (platform, content_id, event_type, occurred_at) DO UPDATE SET metric_value = EXCLUDED.metric_value, ingested_at = NOW()",
        "additionalFields": {
          "queryParams": "={{ [$json.occurred_at, $json.platform, $json.content_id, $json.event_type, $json.metric_value, $json.metadata] }}"
        }
      },
      "id": "upsert-x-events",
      "name": "Upsert analytics_events",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        1120,
        300
      ],
      "credentials": {
        "postgres": {
          "name": "<your credential>"
        }
      }
    },
    {
      "parameters": {
        "jsCode": "// Move processed CSV to archive folder to avoid re-processing\nconst path = require('path');\nconst fs = require('fs');\nconst src = $('Read CSV File').first().json.path;\nif (!src) return [{ json: { archived: false } }];\nconst archiveDir = path.join(path.dirname(src), 'processed');\nif (!fs.existsSync(archiveDir)) fs.mkdirSync(archiveDir, { recursive: true });\nconst dst = path.join(archiveDir, path.basename(src));\ntry { fs.renameSync(src, dst); } catch(e) { /* ignore \u2014 file may already be moved */ }\nreturn [{ json: { archived: true, src, dst } }];"
      },
      "id": "archive-processed-csv",
      "name": "Archive Processed CSV",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        1340,
        300
      ]
    }
  ],
  "connections": {
    "Watch CSV Folder": {
      "main": [
        [
          {
            "node": "Filter CSV Only",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Filter CSV Only": {
      "main": [
        [
          {
            "node": "Read CSV File",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Read CSV File": {
      "main": [
        [
          {
            "node": "Parse X CSV \u2192 analytics_events",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Parse X CSV \u2192 analytics_events": {
      "main": [
        [
          {
            "node": "Upsert analytics_events",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Upsert analytics_events": {
      "main": [
        [
          {
            "node": "Archive Processed CSV",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  },
  "active": false,
  "settings": {
    "executionOrder": "v1",
    "saveManualExecutions": true
  },
  "tags": [
    "devrel-analytics",
    "tier-2",
    "twitter",
    "x",
    "csv"
  ],
  "versionId": "v1",
  "notes": "Phase V Part B \u2014 AC-8. Watches /files/x-analytics-exports/ for CSV drops. X API v2 costs $200/mo for analytics; operator exports manually from X Analytics dashboard weekly (~5 min). Drop new CSV \u2192 workflow fires \u2192 ingests \u2192 archives to processed/. Map /files/ in n8n compose to ~/sync/devrel/x-analytics-exports/ on host."
}
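
The Parse X CSV node in the JSON above splits each line on every comma, so a tweet whose text contains a comma would shift the metric columns. A minimal sketch of a quote-aware splitter follows, assuming the export wraps such fields in double quotes and doubles any embedded quote. `splitCsvLine` is a hypothetical helper name, not part of the original workflow, and the sketch does not handle line breaks inside quoted fields.

```javascript
// Hypothetical helper (not in the original workflow): split one CSV line
// into fields while respecting double-quoted values.
function splitCsvLine(line) {
  const fields = [];
  let current = '';
  let inQuotes = false;
  for (let i = 0; i < line.length; i++) {
    const ch = line[i];
    if (inQuotes) {
      if (ch === '"' && line[i + 1] === '"') {
        current += '"'; // doubled quote inside a quoted field
        i++;
      } else if (ch === '"') {
        inQuotes = false; // closing quote
      } else {
        current += ch; // commas are kept while inside quotes
      }
    } else if (ch === '"') {
      inQuotes = true; // opening quote
    } else if (ch === ',') {
      fields.push(current.trim()); // field boundary
      current = '';
    } else {
      current += ch;
    }
  }
  fields.push(current.trim()); // last field has no trailing comma
  return fields;
}

// Sample line with made-up values: the tweet text contains a comma.
const sampleFields = splitCsvLine('123,"hello, world",45');
console.log(sampleFields.length); // 3
```

If adopted, a helper like this could stand in for both `split(',')` calls in the node, so that the header and the data rows are parsed the same way.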

Credentials you'll need

Only the Postgres node (Upsert analytics_events) needs a credential; n8n will prompt for it when you import. Credential IDs are stripped before publishing, so you'll add your own.

About this workflow

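The workflow watches a drop folder for newly added files, keeps only those ending in .csv, reads each file, and parses the X Analytics export into one analytics_events row per metric (impression, engagement, like, retweet, reply) for each tweet. Rows are upserted into Postgres with a conflict target of (platform, content_id, event_type, occurred_at), so re-importing the same export updates metric_value rather than duplicating rows; the ON CONFLICT clause needs a unique constraint or index on those four columns. The processed file is then moved to a processed/ subfolder. The archive step calls require('fs') and require('path'), which a self-hosted n8n Code node permits only when those modules are listed in the NODE_FUNCTION_ALLOW_BUILTIN environment variable. Event-driven trigger; 6 nodes.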
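
The fan-out performed by the Parse X CSV node, and the way the upsert's conflict target keeps re-imports from duplicating rows, can be sketched as follows. This is an illustration under assumptions: `toEventRows` and `conflictKey` are hypothetical names, the sample values are made up, and the `metadata` column from the original node is omitted for brevity.

```javascript
// Hypothetical helper mirroring the fan-out in the parse node:
// one tweet's metrics become one row per metric.
function toEventRows(tweetId, occurredAt, metrics) {
  const eventTypes = {
    impressions: 'impression',
    engagements: 'engagement',
    likes: 'like',
    retweets: 'retweet',
    replies: 'reply',
  };
  const rows = [];
  for (const [column, eventType] of Object.entries(eventTypes)) {
    const value = parseFloat(metrics[column]);
    if (Number.isNaN(value)) continue; // skip missing or non-numeric cells
    rows.push({
      platform: 'twitter',
      content_id: `twitter:${tweetId}`,
      event_type: eventType,
      occurred_at: occurredAt,
      metric_value: value,
    });
  }
  return rows;
}

// The upsert's conflict target, flattened to a string key for illustration.
const conflictKey = (r) =>
  [r.platform, r.content_id, r.event_type, r.occurred_at].join('|');

// Made-up sample: the retweets cell is empty, so it yields no row.
const sampleRows = toEventRows('1234567890', '2024-01-01T00:00:00.000Z', {
  impressions: '1500',
  engagements: '42',
  likes: '10',
  retweets: '',
  replies: '3',
});
console.log(sampleRows.length); // 4
console.log(conflictKey(sampleRows[0]));
```

Because the key contains no metric value, importing a later export of the same tweet with the same timestamp produces identical keys, so the upsert overwrites metric_value in place. When a row has no usable time column, the original node falls back to the current time; such rows get a fresh key on every import.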

Source: https://github.com/Xipher-Labs/walter-os/blob/main/setup/walter-host/services/n8n/workflows/x-csv-ingest.json (original creator credit).
