This workflow follows the HTTP Request → Postgres recipe pattern — see all workflows that pair these two integrations.
The workflow JSON
Copy or download the full n8n JSON below. Paste it into a new n8n workflow, add your credentials, activate. Full import guide →
{
"name": "ingest",
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "ingest",
"responseMode": "lastNode",
"responseData": "allEntries",
"options": {}
},
"id": "9f32ff2b-9532-47a5-86f7-312c84f243d8",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [
0,
0
]
},
{
"parameters": {
"jsCode": "const payload = ($json.body && typeof $json.body === 'object') ? $json.body : $json;\nconst filepath = typeof payload.filepath === 'string' ? payload.filepath.trim() : '';\nconst filename = typeof payload.filename === 'string' && payload.filename.trim() ? payload.filename.trim() : (filepath ? filepath.split('/').pop() : '');\nconst rawContent = typeof payload.content === 'string' ? payload.content : '';\nconst content = rawContent.replace(/\\r\\n/g, '\\n');\nconst modifiedEpoch = payload.modifiedEpoch !== undefined && payload.modifiedEpoch !== null ? Number(payload.modifiedEpoch) : null;\nconst rawProjectSlug = payload.project_slug;\nconst sourceType = typeof payload.source_type === 'string' && payload.source_type.trim() ? payload.source_type.trim() : 'file_ingest';\nconst runId = typeof payload.run_id === 'string' && payload.run_id.trim() !== '' ? payload.run_id.trim() : 'ingest-' + Date.now() + '-' + Math.random().toString(36).slice(2, 8);\nconst correlationId = typeof payload.correlation_id === 'string' && payload.correlation_id.trim() !== '' ? payload.correlation_id.trim() : runId;\nconst slugPattern = /^[a-z0-9]+(?:[-_][a-z0-9]+)*$/;\nconst hashText = (value) => {\n let hash = 2166136261;\n for (let index = 0; index < value.length; index += 1) {\n hash ^= value.charCodeAt(index);\n hash = Math.imul(hash, 16777619);\n }\n return (hash >>> 0).toString(16).padStart(8, '0');\n};\nconst withStage = (trace, stage, status, extra = {}) => {\n const timestamp = new Date().toISOString();\n const errorCode = Object.prototype.hasOwnProperty.call(extra, 'error_code') ? extra.error_code : (trace.error_code ?? null);\n const errorMessage = Object.prototype.hasOwnProperty.call(extra, 'error_message') ? extra.error_message : (trace.error_message ?? null);\n return {\n ...trace,\n ...extra,\n stage,\n status,\n error_code: errorCode,\n error_message: errorMessage,\n timestamp,\n stage_history: [\n ...(Array.isArray(trace.stage_history) ? trace.stage_history : []),\n { stage, status, timestamp, error_code: errorCode, error_message: errorMessage },\n ],\n };\n};\nconst baseTrace = {\n run_id: runId,\n correlation_id: correlationId,\n workflow_name: 'ingest',\n project_slug: null,\n source_type: sourceType,\n filename: filename || null,\n filepath: filepath || null,\n stage: 'received',\n status: 'received',\n error_code: null,\n error_message: null,\n timestamp: new Date().toISOString(),\n workflow_chain: ['ingest'],\n stage_history: [],\n};\nconst buildError = (code, message) => [{\n json: {\n request_ok: false,\n success: false,\n filepath: filepath || null,\n filename: filename || null,\n modifiedEpoch: Number.isFinite(modifiedEpoch) ? modifiedEpoch : null,\n projectSlug: typeof rawProjectSlug === 'string' && rawProjectSlug.trim() ? rawProjectSlug.trim() : null,\n sourceType,\n error: { code, message, classification: 'validation', retryable: false },\n trace: withStage(baseTrace, 'validation', 'rejected', { error_code: code, error_message: message }),\n },\n}];\nif (!filepath) {\n return buildError('MISSING_FILEPATH', 'Missing filepath in webhook payload');\n}\nif (!filepath.startsWith('/')) {\n return buildError('INVALID_FILEPATH', 'filepath must be an absolute path');\n}\nif (!filename) {\n return buildError('MISSING_FILENAME', 'Missing filename in webhook payload');\n}\nif (!content.trim()) {\n return buildError('EMPTY_CONTENT', 'Missing or empty content in webhook payload');\n}\nif (payload.modifiedEpoch !== undefined && payload.modifiedEpoch !== null && !Number.isFinite(modifiedEpoch)) {\n return buildError('INVALID_MODIFIED_EPOCH', 'modifiedEpoch must be numeric when provided');\n}\nlet projectSlug = null;\nif (rawProjectSlug !== undefined && rawProjectSlug !== null) {\n if (typeof rawProjectSlug !== 'string') {\n return buildError('INVALID_PROJECT_SLUG', 'project_slug must be a string when provided');\n }\n const trimmed = rawProjectSlug.trim();\n if (trimmed && !slugPattern.test(trimmed)) {\n return buildError('INVALID_PROJECT_SLUG', 'project_slug must use lowercase slug characters only');\n }\n projectSlug = trimmed || null;\n}\nreturn [{\n json: {\n request_ok: true,\n filepath,\n filename,\n content,\n modifiedEpoch: Number.isFinite(modifiedEpoch) ? modifiedEpoch : null,\n projectSlug,\n sourceType,\n category: 'project_note',\n ingestedAt: new Date().toISOString(),\n content_hash: hashText(content.trim()),\n trace: withStage({ ...baseTrace, project_slug: projectSlug }, 'normalized', 'accepted', { project_slug: projectSlug }),\n },\n}];"
},
"id": "ae13317e-1315-41f0-b157-d63a32a70136",
"name": "Normalize Input",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
260,
0
]
},
{
"parameters": {
"jsCode": "const item = $input.first();\nconst data = item.json;\nconst chunkSize = 1200;\nconst overlap = 200;\nconst withStage = (trace, stage, status, extra = {}) => {\n const timestamp = new Date().toISOString();\n const errorCode = Object.prototype.hasOwnProperty.call(extra, 'error_code') ? extra.error_code : (trace.error_code ?? null);\n const errorMessage = Object.prototype.hasOwnProperty.call(extra, 'error_message') ? extra.error_message : (trace.error_message ?? null);\n return {\n ...trace,\n ...extra,\n stage,\n status,\n error_code: errorCode,\n error_message: errorMessage,\n timestamp,\n stage_history: [\n ...(Array.isArray(trace.stage_history) ? trace.stage_history : []),\n { stage, status, timestamp, error_code: errorCode, error_message: errorMessage },\n ],\n };\n};\nconst normalize = (text) => text.replace(/\\r\\n/g, '\\n').replace(/\\t/g, ' ').trim();\nconst text = normalize(data.content || '');\nif (!text) {\n return [{ json: { ...data, chunking_ok: false, error: { code: 'EMPTY_CONTENT', message: 'Content is empty after normalization', classification: 'validation', retryable: false }, trace: withStage(data.trace, 'chunking_rejected', 'rejected', { error_code: 'EMPTY_CONTENT', error_message: 'Content is empty after normalization' }) } }];\n}\nconst chunks = [];\nlet start = 0;\nwhile (start < text.length) {\n let end = Math.min(start + chunkSize, text.length);\n if (end < text.length) {\n const windowStart = Math.max(start + Math.floor(chunkSize * 0.6), start);\n const boundarySlice = text.slice(windowStart, end);\n const breakCandidates = [\n boundarySlice.lastIndexOf('\\n\\n'),\n boundarySlice.lastIndexOf('\\n'),\n boundarySlice.lastIndexOf('. '),\n boundarySlice.lastIndexOf(' '),\n ].filter((idx) => idx > 0);\n if (breakCandidates.length > 0) {\n end = windowStart + Math.max(...breakCandidates) + 1;\n }\n }\n const chunkText = text.slice(start, end).trim();\n if (chunkText) {\n chunks.push(chunkText);\n }\n if (end >= text.length) break;\n start = Math.max(end - overlap, start + 1);\n}\nif (chunks.length === 0) {\n return [{ json: { ...data, chunking_ok: false, error: { code: 'ZERO_CHUNKS', message: 'Chunking produced zero chunks', classification: 'validation', retryable: false }, trace: withStage(data.trace, 'chunking_rejected', 'rejected', { error_code: 'ZERO_CHUNKS', error_message: 'Chunking produced zero chunks' }) } }];\n}\nif (chunks.length > 200) {\n return [{ json: { ...data, chunking_ok: false, error: { code: 'TOO_MANY_CHUNKS', message: 'Refusing to embed more than 200 chunks in one run', classification: 'validation', retryable: false }, trace: withStage(data.trace, 'chunking_rejected', 'rejected', { error_code: 'TOO_MANY_CHUNKS', error_message: 'Refusing to embed more than 200 chunks in one run' }) } }];\n}\nreturn [{\n json: {\n ...data,\n chunking_ok: true,\n totalChunks: chunks.length,\n chunks: chunks.map((chunkText, index) => ({\n chunkIndex: index + 1,\n totalChunks: chunks.length,\n title: data.filename + ' :: chunk ' + String(index + 1).padStart(2, '0'),\n content: chunkText,\n })),\n trace: withStage(data.trace, 'chunked', 'accepted'),\n },\n}];"
},
"id": "7f2b1aac-b39c-4579-bbdf-a152c0303fe3",
"name": "Chunk Content",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
520,
0
]
},
{
"parameters": {
"jsCode": "const item = $input.first();\nconst data = item.json;\nconst withStage = (trace, stage, status, extra = {}) => {\n const timestamp = new Date().toISOString();\n const errorCode = Object.prototype.hasOwnProperty.call(extra, 'error_code') ? extra.error_code : (trace.error_code ?? null);\n const errorMessage = Object.prototype.hasOwnProperty.call(extra, 'error_message') ? extra.error_message : (trace.error_message ?? null);\n return {\n ...trace,\n ...extra,\n stage,\n status,\n error_code: errorCode,\n error_message: errorMessage,\n timestamp,\n stage_history: [\n ...(Array.isArray(trace.stage_history) ? trace.stage_history : []),\n { stage, status, timestamp, error_code: errorCode, error_message: errorMessage },\n ],\n };\n};\nconst { chunks, filepath, filename, modifiedEpoch, projectSlug, sourceType, category, ingestedAt, totalChunks, content_hash } = data;\nif (!Array.isArray(chunks) || chunks.length === 0) {\n return [{ json: { ...data, embedding_request_ok: false, error: { code: 'MISSING_CHUNKS', message: 'Missing chunks array', classification: 'validation', retryable: false }, trace: withStage(data.trace, 'embedding_request_rejected', 'rejected', { error_code: 'MISSING_CHUNKS', error_message: 'Missing chunks array' }) } }];\n}\nreturn [{\n json: {\n filepath,\n filename,\n modifiedEpoch,\n projectSlug,\n sourceType,\n category,\n ingestedAt,\n totalChunks,\n chunks,\n content_hash,\n inputs: chunks.map((chunk) => chunk.content),\n embedding_request_ok: true,\n trace: withStage(data.trace, 'embedding_request_ready', 'accepted'),\n },\n}];"
},
"id": "fb146680-c4b8-4d9e-a9ed-e136ef425e2f",
"name": "Build Embedding Batch Request",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
780,
0
]
},
{
"parameters": {
"method": "POST",
"url": "http://host.docker.internal:11434/api/embed",
"sendBody": true,
"specifyBody": "json",
"jsonBody": "={{ JSON.stringify({\n model: \"nomic-embed-text\",\n input: $json.inputs\n}) }}",
"options": {
"response": {
"response": {
"fullResponse": true,
"neverError": true,
"responseFormat": "json"
}
},
"timeout": 300000
}
},
"id": "24d95374-54a8-4bf9-a22f-928d6ad2e3a2",
"name": "Embed Chunks",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.2,
"position": [
1060,
0
],
"retryOnFail": true,
"maxTries": 3,
"waitBetweenTries": 2000
},
{
"parameters": {
"jsCode": "const base = $('Classify Existing Ingest State').first().json;\nconst response = $json;\nconst statusCode = response.statusCode ?? 0;\nconst body = response.body ?? {};\nconst embeddings = Array.isArray(body.embeddings)\n ? body.embeddings\n : (Array.isArray(body.embedding) ? [body.embedding] : null);\nconst trustedHistoryPrefixes = [\n '/home/node/.n8n-files/crispybrain/inbox/openbrain-history/',\n '/Users/elric/repos/crispybrain/inbox/openbrain-history/',\n];\nconst autoReviewHistoryPack = base.projectSlug === 'openbrain-history'\n && trustedHistoryPrefixes.some((prefix) => typeof base.filepath === 'string' && base.filepath.startsWith(prefix));\nconst withStage = (trace, stage, status, extra = {}) => {\n const timestamp = new Date().toISOString();\n const errorCode = Object.prototype.hasOwnProperty.call(extra, 'error_code') ? extra.error_code : (trace.error_code ?? null);\n const errorMessage = Object.prototype.hasOwnProperty.call(extra, 'error_message') ? extra.error_message : (trace.error_message ?? null);\n return {\n ...trace,\n ...extra,\n stage,\n status,\n error_code: errorCode,\n error_message: errorMessage,\n timestamp,\n stage_history: [\n ...(Array.isArray(trace.stage_history) ? trace.stage_history : []),\n { stage, status, timestamp, error_code: errorCode, error_message: errorMessage },\n ],\n };\n};\nif (statusCode < 200 || statusCode >= 300 || !Array.isArray(embeddings) || embeddings.length === 0) {\n return [{ json: { ...base, insert_ready: false, error: { code: 'OLLAMA_EMBEDDING_FAILED', message: 'Ollama embedding request failed during ingest', classification: 'transient', retryable: true, status: statusCode || null, details: body.error ?? null }, trace: withStage(base.trace, 'embedding_failed', 'failed', { error_code: 'OLLAMA_EMBEDDING_FAILED', error_message: 'Ollama embedding request failed during ingest' }) } }];\n}\nif (embeddings.length !== base.chunks.length) {\n return [{ json: { ...base, insert_ready: false, error: { code: 'EMBEDDING_COUNT_MISMATCH', message: 'Embedding count mismatch between request and response', classification: 'external', retryable: false }, trace: withStage(base.trace, 'embedding_failed', 'failed', { error_code: 'EMBEDDING_COUNT_MISMATCH', error_message: 'Embedding count mismatch between request and response' }) } }];\n}\nif (embeddings.some((embedding) => !Array.isArray(embedding) || embedding.length === 0)) {\n return [{ json: { ...base, insert_ready: false, error: { code: 'INVALID_EMBEDDING_VECTOR', message: 'One or more embedding vectors were empty or malformed', classification: 'external', retryable: false }, trace: withStage(base.trace, 'embedding_failed', 'failed', { error_code: 'INVALID_EMBEDDING_VECTOR', error_message: 'One or more embedding vectors were empty or malformed' }) } }];\n}\nconst escapeSql = (value) => String(value ?? '').replace(/'/g, \"''\");\nconst values = base.chunks.map((chunk, index) => {\n const embedding = embeddings[index];\n const metadata = {\n filename: base.filename,\n filepath: base.filepath,\n modified_epoch: base.modifiedEpoch,\n ingested_at: base.ingestedAt,\n project_slug: base.projectSlug,\n chunk_index: chunk.chunkIndex,\n total_chunks: chunk.totalChunks,\n run_id: base.trace?.run_id ?? null,\n correlation_id: base.trace?.correlation_id ?? null,\n workflow_name: 'ingest',\n source_type: base.sourceType,\n content_hash: base.content_hash,\n review_status: autoReviewHistoryPack ? 'reviewed' : 'unreviewed',\n review_note: autoReviewHistoryPack ? 'Auto-reviewed trusted repo-controlled openbrain-history ingest.' : null,\n review_scope: autoReviewHistoryPack ? 'repo_controlled_history_pack' : null,\n reviewed_at: autoReviewHistoryPack ? base.ingestedAt : null,\n };\n const title = escapeSql(chunk.title);\n const content = escapeSql(chunk.content);\n const source = escapeSql(base.sourceType);\n const category = escapeSql(base.category);\n const metadataSql = \"'\" + escapeSql(JSON.stringify(metadata)) + \"'::jsonb\";\n const embeddingSql = \"'[\" + embedding.join(',') + \"]'::vector\";\n return \"('\" + source + \"', '\" + category + \"', '\" + title + \"', '\" + content + \"', ARRAY['file_ingest','chunked']::text[], \" + metadataSql + \", \" + embeddingSql + \")\";\n});\nconst query = \"INSERT INTO memories (source, category, title, content, tags, metadata_json, embedding) VALUES \" + values.join(', ') + \" RETURNING id, title, source, category, created_at::text AS created_at;\";\nreturn [{ json: { filepath: base.filepath, filename: base.filename, modifiedEpoch: base.modifiedEpoch, projectSlug: base.projectSlug, sourceType: base.sourceType, totalChunks: base.totalChunks, insert_ready: true, query, trace: withStage(base.trace, 'insert_prepared', 'accepted', { auto_review_history_pack: autoReviewHistoryPack }), content_hash: base.content_hash } }];"
},
"id": "807441bd-b827-4206-b1f9-73aca8586b08",
"name": "Prepare Insert Query",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1320,
0
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "={{$json.query}}",
"options": {}
},
"id": "08d4ebf5-a805-4031-a853-b5c78530dc6d",
"name": "Insert Memory Rows",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2.6,
"position": [
1580,
0
],
"credentials": {
"postgres": {
"name": "<your credential>"
}
}
},
{
"parameters": {
"jsCode": "const inserted = $input.all();\nconst base = $('Prepare Insert Query').first().json;\nconst withStage = (trace, stage, status, extra = {}) => {\n const timestamp = new Date().toISOString();\n const errorCode = Object.prototype.hasOwnProperty.call(extra, 'error_code') ? extra.error_code : (trace.error_code ?? null);\n const errorMessage = Object.prototype.hasOwnProperty.call(extra, 'error_message') ? extra.error_message : (trace.error_message ?? null);\n return {\n ...trace,\n ...extra,\n stage,\n status,\n error_code: errorCode,\n error_message: errorMessage,\n timestamp,\n stage_history: [\n ...(Array.isArray(trace.stage_history) ? trace.stage_history : []),\n { stage, status, timestamp, error_code: errorCode, error_message: errorMessage },\n ],\n };\n};\nreturn [{\n json: {\n ok: true,\n success: true,\n status: 'succeeded',\n filepath: base.filepath,\n filename: base.filename,\n modifiedEpoch: base.modifiedEpoch,\n projectSlug: base.projectSlug,\n sourceType: base.sourceType,\n insertedCount: inserted.length,\n totalChunks: base.totalChunks,\n duplicate_detected: false,\n partial_ingest_detected: false,\n trace: withStage(base.trace, 'completed', 'succeeded'),\n insertedRows: inserted.map((item) => ({\n id: item.json.id,\n title: item.json.title,\n source: item.json.source,\n category: item.json.category,\n created_at: item.json.created_at,\n })),\n },\n}];"
},
"id": "c924a315-63d4-4719-b61a-4120e5f53c54",
"name": "Return Success Payload",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1840,
0
]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict",
"version": 2
},
"conditions": [
{
"id": "if-ingest-request-ok-condition",
"leftValue": "={{ $json.request_ok === true }}",
"rightValue": true,
"operator": {
"type": "boolean",
"operation": "true",
"singleValue": true
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "if-ingest-request-ok",
"name": "Input Is Valid?",
"type": "n8n-nodes-base.if",
"typeVersion": 2.2,
"position": [
390,
0
]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict",
"version": 2
},
"conditions": [
{
"id": "if-ingest-chunking-ok-condition",
"leftValue": "={{ $json.chunking_ok === true }}",
"rightValue": true,
"operator": {
"type": "boolean",
"operation": "true",
"singleValue": true
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "if-ingest-chunking-ok",
"name": "Chunking Ready?",
"type": "n8n-nodes-base.if",
"typeVersion": 2.2,
"position": [
650,
0
]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict",
"version": 2
},
"conditions": [
{
"id": "if-ingest-embedding-request-ok-condition",
"leftValue": "={{ $json.embedding_request_ok === true }}",
"rightValue": true,
"operator": {
"type": "boolean",
"operation": "true",
"singleValue": true
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "if-ingest-embedding-request-ok",
"name": "Embedding Request Ready?",
"type": "n8n-nodes-base.if",
"typeVersion": 2.2,
"position": [
910,
0
]
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT\n COUNT(*)::int AS existing_chunk_count,\n COALESCE(\n jsonb_agg(DISTINCT NULLIF(metadata_json->>'run_id', '')) FILTER (WHERE COALESCE(metadata_json->>'run_id', '') <> ''),\n '[]'::jsonb\n ) AS existing_run_ids\nFROM memories\nWHERE COALESCE(metadata_json->>'filepath', '') = $1::text\n AND (\n ($2::text <> '' AND COALESCE(metadata_json->>'modified_epoch', '') = $2::text)\n OR ($3::text <> '' AND COALESCE(metadata_json->>'content_hash', '') = $3::text)\n );",
"options": {
"queryReplacement": "={{ [$json.filepath, $json.modifiedEpoch ? String($json.modifiedEpoch) : '', $json.content_hash || ''] }}"
}
},
"id": "postgres-detect-existing-ingest-state",
"name": "Detect Existing Ingest State",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2.6,
"position": [
1160,
0
],
"credentials": {
"postgres": {
"name": "<your credential>"
}
}
},
{
"parameters": {
"jsCode": "const base = $('Build Embedding Batch Request').first().json;\nconst row = ($input.all()[0] && $input.all()[0].json) ? $input.all()[0].json : {};\nconst existingChunkCount = Number(row.existing_chunk_count ?? 0);\nconst existingRunIds = Array.isArray(row.existing_run_ids) ? row.existing_run_ids : [];\nconst withStage = (trace, stage, status, extra = {}) => {\n const timestamp = new Date().toISOString();\n const errorCode = Object.prototype.hasOwnProperty.call(extra, 'error_code') ? extra.error_code : (trace.error_code ?? null);\n const errorMessage = Object.prototype.hasOwnProperty.call(extra, 'error_message') ? extra.error_message : (trace.error_message ?? null);\n return {\n ...trace,\n ...extra,\n stage,\n status,\n error_code: errorCode,\n error_message: errorMessage,\n timestamp,\n stage_history: [\n ...(Array.isArray(trace.stage_history) ? trace.stage_history : []),\n { stage, status, timestamp, error_code: errorCode, error_message: errorMessage },\n ],\n };\n};\nif (existingChunkCount === 0) {\n return [{ json: { ...base, ingest_ready: true, existing_chunk_count: 0, existing_run_ids: [], trace: withStage(base.trace, 'duplicate_check', 'accepted') } }];\n}\nif (existingChunkCount >= Number(base.totalChunks ?? 0)) {\n return [{ json: { ...base, ingest_ready: false, existing_chunk_count: existingChunkCount, existing_run_ids: existingRunIds, error: { code: 'DUPLICATE_INGEST', message: 'A matching ingest already exists for this filepath and content fingerprint', classification: 'duplicate', retryable: false }, trace: withStage(base.trace, 'duplicate_detected', 'rejected', { error_code: 'DUPLICATE_INGEST', error_message: 'A matching ingest already exists for this filepath and content fingerprint' }) } }];\n}\nreturn [{ json: { ...base, ingest_ready: false, existing_chunk_count: existingChunkCount, existing_run_ids: existingRunIds, error: { code: 'PARTIAL_INGEST_DETECTED', message: 'A partial ingest was detected for this filepath and content fingerprint', classification: 'state_conflict', retryable: false }, trace: withStage(base.trace, 'partial_ingest_detected', 'failed', { error_code: 'PARTIAL_INGEST_DETECTED', error_message: 'A partial ingest was detected for this filepath and content fingerprint' }) } }];"
},
"id": "code-classify-existing-ingest-state",
"name": "Classify Existing Ingest State",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
1420,
0
]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict",
"version": 2
},
"conditions": [
{
"id": "if-ingest-state-acceptable-condition",
"leftValue": "={{ $json.ingest_ready === true }}",
"rightValue": true,
"operator": {
"type": "boolean",
"operation": "true",
"singleValue": true
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "if-ingest-state-acceptable",
"name": "Existing State Acceptable?",
"type": "n8n-nodes-base.if",
"typeVersion": 2.2,
"position": [
1680,
0
]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict",
"version": 2
},
"conditions": [
{
"id": "if-ingest-insert-ready-condition",
"leftValue": "={{ $json.insert_ready === true }}",
"rightValue": true,
"operator": {
"type": "boolean",
"operation": "true",
"singleValue": true
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "if-ingest-insert-ready",
"name": "Insert Ready?",
"type": "n8n-nodes-base.if",
"typeVersion": 2.2,
"position": [
2460,
0
]
},
{
"parameters": {
"jsCode": "const error = $json.error ?? {\n code: 'UNKNOWN_FAILURE',\n message: 'Workflow failed without a structured error payload',\n classification: 'unknown',\n retryable: false,\n};\nconst trace = $json.trace ?? null;\nconst withStage = (existingTrace, stage, status, extra = {}) => {\n if (!existingTrace) {\n return null;\n }\n const timestamp = new Date().toISOString();\n const errorCode = Object.prototype.hasOwnProperty.call(extra, 'error_code') ? extra.error_code : (existingTrace.error_code ?? null);\n const errorMessage = Object.prototype.hasOwnProperty.call(extra, 'error_message') ? extra.error_message : (existingTrace.error_message ?? null);\n return {\n ...existingTrace,\n ...extra,\n stage,\n status,\n error_code: errorCode,\n error_message: errorMessage,\n timestamp,\n stage_history: [\n ...(Array.isArray(existingTrace.stage_history) ? existingTrace.stage_history : []),\n { stage, status, timestamp, error_code: errorCode, error_message: errorMessage },\n ],\n };\n};\nreturn [{\n json: {\n ok: false,\n success: false,\n status: trace?.status ?? 'failed',\n filepath: $json.filepath ?? null,\n filename: $json.filename ?? null,\n modifiedEpoch: $json.modifiedEpoch ?? null,\n projectSlug: $json.projectSlug ?? null,\n sourceType: $json.sourceType ?? null,\n totalChunks: $json.totalChunks ?? 0,\n duplicate_detected: error.code === 'DUPLICATE_INGEST',\n partial_ingest_detected: error.code === 'PARTIAL_INGEST_DETECTED',\n existing_chunk_count: $json.existing_chunk_count ?? 0,\n existing_run_ids: $json.existing_run_ids ?? [],\n error,\n trace: withStage(trace, trace?.stage ?? 'failed', trace?.status ?? 'failed', { error_code: error.code, error_message: error.message }),\n },\n}];"
},
"id": "code-return-ingest-failure",
"name": "Return Failure Payload",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [
2720,
220
]
}
],
"connections": {
"Webhook": {
"main": [
[
{
"node": "Normalize Input",
"type": "main",
"index": 0
}
]
]
},
"Normalize Input": {
"main": [
[
{
"node": "Input Is Valid?",
"type": "main",
"index": 0
}
]
]
},
"Input Is Valid?": {
"main": [
[
{
"node": "Chunk Content",
"type": "main",
"index": 0
}
],
[
{
"node": "Return Failure Payload",
"type": "main",
"index": 0
}
]
]
},
"Chunk Content": {
"main": [
[
{
"node": "Chunking Ready?",
"type": "main",
"index": 0
}
]
]
},
"Chunking Ready?": {
"main": [
[
{
"node": "Build Embedding Batch Request",
"type": "main",
"index": 0
}
],
[
{
"node": "Return Failure Payload",
"type": "main",
"index": 0
}
]
]
},
"Build Embedding Batch Request": {
"main": [
[
{
"node": "Embedding Request Ready?",
"type": "main",
"index": 0
}
]
]
},
"Embedding Request Ready?": {
"main": [
[
{
"node": "Detect Existing Ingest State",
"type": "main",
"index": 0
}
],
[
{
"node": "Return Failure Payload",
"type": "main",
"index": 0
}
]
]
},
"Detect Existing Ingest State": {
"main": [
[
{
"node": "Classify Existing Ingest State",
"type": "main",
"index": 0
}
]
]
},
"Classify Existing Ingest State": {
"main": [
[
{
"node": "Existing State Acceptable?",
"type": "main",
"index": 0
}
]
]
},
"Existing State Acceptable?": {
"main": [
[
{
"node": "Embed Chunks",
"type": "main",
"index": 0
}
],
[
{
"node": "Return Failure Payload",
"type": "main",
"index": 0
}
]
]
},
"Embed Chunks": {
"main": [
[
{
"node": "Prepare Insert Query",
"type": "main",
"index": 0
}
]
]
},
"Prepare Insert Query": {
"main": [
[
{
"node": "Insert Ready?",
"type": "main",
"index": 0
}
]
]
},
"Insert Ready?": {
"main": [
[
{
"node": "Insert Memory Rows",
"type": "main",
"index": 0
}
],
[
{
"node": "Return Failure Payload",
"type": "main",
"index": 0
}
]
]
},
"Insert Memory Rows": {
"main": [
[
{
"node": "Return Success Payload",
"type": "main",
"index": 0
}
]
]
}
},
"settings": {},
"staticData": null,
"tags": [],
"active": false,
"versionId": "00000000-0000-0000-0000-000000000002",
"meta": {
"templateCredsSetupCompleted": false
},
"id": "ingest"
}
Credentials you'll need
Each integration node will prompt for credentials when you import. We strip credential IDs before publishing — you'll add your own.
postgres
For the full experience including quality scoring and batch install features for each workflow upgrade to Pro
About this workflow
ingest. Uses httpRequest, postgres. Webhook trigger; 16 nodes.
Source: https://github.com/Crispy-Biscuits-AI/crispybrain/blob/f6bfa58df50e78c236ebde113422cc01522b88c7/workflows/ingest.json — original creator credit. Request a take-down →
Related workflows
Workflows that share integrations, category, or trigger type with this one. All free to copy and import.
Scraping. Uses httpRequest, postgres, @apify/n8n-nodes-apify, respondToWebhook. Webhook trigger; 61 nodes.
Workflow B — AI Listing Engine. Uses httpRequest, postgres, errorTrigger. Webhook trigger; 47 nodes.
Fluxo de voluntárias ZendeskXANXBD. Uses functionItem, zendesk, httpRequest, postgres. Webhook trigger; 25 nodes.
Fluxo de voluntárias ZendeskXANXBD. Uses functionItem, zendesk, httpRequest, postgres. Webhook trigger; 25 nodes.
Fluxo de voluntárias ZendeskXANXBD. Uses functionItem, zendesk, httpRequest, postgres. Webhook trigger; 25 nodes.