This workflow corresponds to n8n.io template #14997 — we link there as the canonical source.
This workflow follows the Execute Workflow Trigger → HTTP Request recipe pattern — see all workflows that pair these two integrations.
The workflow JSON
Copy or download the full n8n JSON below. Paste it into a new n8n workflow, add your credentials, activate. Full import guide →
{
"meta": {
"templateCredsSetupCompleted": true
},
"nodes": [
{
"id": "5d3c7648-aec9-487d-874e-76270c64ea84",
"name": "Sticky Note",
"type": "n8n-nodes-base.stickyNote",
"position": [
1200,
-1488
],
"parameters": {
"width": 480,
"height": 896,
"content": "## SpendBoss\n\n### How it works\n\n1. Receives email attachments via a webhook for processing.\n2. Verifies the type of document and evaluates the next steps.\n3. Routes the document for parsing and storing the results.\n4. Handles successful completion and logging of operations.\n5. Includes error handling and waits before retrying failed tasks.\n\n### Setup steps\n\n- [ ] Configure AgentMail Webhook URL.\n- [ ] Set up access to JigsawStack API for document checking.\n- [ ] Ensure Postgres database credentials are provided for data storage.\n- [ ] Make sure you have the necessary permissions for the sub-workflow 'parseInvoice'.\n- [ ] Set appropriate batch sizes and wait durations.\n\n### Customization\n\nBatch sizes and waiting times can be adjusted to optimize processing speed and resource utilization."
},
"typeVersion": 1
},
{
"id": "58a39e77-7c38-4fe1-bc26-4904675e28f9",
"name": "Sticky Note1",
"type": "n8n-nodes-base.stickyNote",
"position": [
1760,
-1120
],
"parameters": {
"color": 7,
"width": 416,
"height": 304,
"content": "## Initial email handling\n\nStarts with an AgentMail webhook and checks attachments."
},
"typeVersion": 1
},
{
"id": "d69a4445-b77d-477d-92fb-d8954d54f9c3",
"name": "Sticky Note2",
"type": "n8n-nodes-base.stickyNote",
"position": [
2208,
-1104
],
"parameters": {
"color": 7,
"width": 640,
"height": 272,
"content": "## Pipeline job acquisition\n\nHandles acquiring and verifying pipeline jobs from the attachment."
},
"typeVersion": 1
},
{
"id": "4c2a404b-89e0-4ff0-b666-ee642e020d7f",
"name": "Sticky Note3",
"type": "n8n-nodes-base.stickyNote",
"position": [
2880,
-1216
],
"parameters": {
"color": 7,
"width": 416,
"height": 640,
"content": "## Job processing decision\n\nSplit items based on acquisition results and prepare skipped logs."
},
"typeVersion": 1
},
{
"id": "ea3da675-fd59-428e-929e-443cde7e8a4d",
"name": "Sticky Note4",
"type": "n8n-nodes-base.stickyNote",
"position": [
3328,
-1264
],
"parameters": {
"color": 7,
"width": 864,
"height": 272,
"content": "## Document verification and processing\n\nChecks if the document is an invoice and evaluates the next steps."
},
"typeVersion": 1
},
{
"id": "fa1ad5f3-1e77-4615-aa5f-416b06873196",
"name": "Sticky Note5",
"type": "n8n-nodes-base.stickyNote",
"position": [
4224,
-1280
],
"parameters": {
"color": 7,
"width": 416,
"height": 304,
"content": "## Invoice parsing and database operation\n\nParses invoices and upserts data into the database."
},
"typeVersion": 1
},
{
"id": "145616ee-52d1-4d45-bc9a-4b62fd2e6a8f",
"name": "Sticky Note6",
"type": "n8n-nodes-base.stickyNote",
"position": [
5120,
-1184
],
"parameters": {
"color": 7,
"width": 416,
"height": 304,
"content": "## SQL error handling\n\nHandles SQL errors and sets events to wait before retries."
},
"typeVersion": 1
},
{
"id": "1ae9bb7c-7436-497c-9215-0e115ea5e8cc",
"name": "Sticky Note7",
"type": "n8n-nodes-base.stickyNote",
"position": [
4672,
-1488
],
"parameters": {
"color": 7,
"width": 864,
"height": 272,
"content": "## Completion and logging\n\nCompletes the job, logs success, and waits if needed."
},
"typeVersion": 1
},
{
"id": "b954f89d-e1e5-4b1c-8354-b44f91e08eda",
"name": "Post to JigsawStack API",
"type": "n8n-nodes-base.httpRequest",
"position": [
3600,
-1152
],
"parameters": {
"url": "https://api.jigsawstack.com/v1/vocr",
"method": "POST",
"options": {},
"jsonBody": "={\n \"url\": \"{{ $json.download_url}}\",\n \"prompt\": {\n \"isInvoice\": \"Is this document an invoice or receipt? Answer only 'true' or 'false'\",\n \"isPaid\": \"Does this document indicate it has already been paid? Answer only 'true' or 'false'\"\n }\n}",
"sendBody": true,
"specifyBody": "json",
"authentication": "predefinedCredentialType",
"nodeCredentialType": "jigsawStackApi"
},
"credentials": {
"jigsawStackApi": {
"name": "<your credential>"
}
},
"typeVersion": 4.4
},
{
"id": "b04697bd-e97a-44bd-949c-3be3e5c64c3a",
"name": "Route by Invoice Type",
"type": "n8n-nodes-base.switch",
"position": [
4048,
-1152
],
"parameters": {
"rules": {
"values": [
{
"outputKey": "proceedExtract",
"conditions": {
"options": {
"version": 3,
"leftValue": "",
"caseSensitive": true,
"typeValidation": "strict"
},
"combinator": "and",
"conditions": [
{
"id": "09993f24-5c63-485f-947f-13be9720e8f2",
"operator": {
"type": "boolean",
"operation": "equals"
},
"leftValue": "={{ $json.shouldProceed }}",
"rightValue": true
}
]
},
"renameOutput": true
},
{
"outputKey": "drop",
"conditions": {
"options": {
"version": 3,
"leftValue": "",
"caseSensitive": true,
"typeValidation": "strict"
},
"combinator": "and",
"conditions": [
{
"id": "d55dce85-70e0-45a2-996a-ba420e362e55",
"operator": {
"type": "boolean",
"operation": "equals"
},
"leftValue": "={{ $json.shouldProceed }}",
"rightValue": false
}
]
},
"renameOutput": true
}
]
},
"options": {}
},
"typeVersion": 3.4
},
{
"id": "693ca664-21c5-4dc7-9e52-ccfb035490c1",
"name": "Upsert Invoice Record",
"type": "n8n-nodes-base.postgres",
"onError": "continueErrorOutput",
"position": [
4496,
-1152
],
"parameters": {
"query": "SELECT process_full_invoice($1::jsonb)\n",
"options": {
"queryReplacement": "={{ JSON.stringify($json) }}"
},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"notesInFlow": true,
"typeVersion": 2.6
},
{
"id": "04fdfae2-9bd7-4033-a2dd-55a0e1e94c71",
"name": "AgentMail Webhook Trigger",
"type": "n8n-nodes-base.webhook",
"position": [
1808,
-992
],
"parameters": {
"path": "spendbase-inbound",
"options": {},
"httpMethod": "POST"
},
"typeVersion": 2
},
{
"id": "46665311-dcac-446d-a234-07bfba21a52e",
"name": "Fetch Download URL",
"type": "n8n-nodes-base.httpRequest",
"onError": "continueErrorOutput",
"position": [
3376,
-1152
],
"parameters": {
"url": "=https://api.agentmail.to/v0/threads/{{ $json.thread_id }}/attachments/{{ $json.attachment_id }}",
"options": {
"response": {
"response": {
"responseFormat": "json"
}
}
},
"authentication": "genericCredentialType",
"genericAuthType": "httpBearerAuth"
},
"credentials": {
"httpBearerAuth": {
"name": "<your credential>"
},
"httpHeaderAuth": {
"name": "<your credential>"
}
},
"retryOnFail": true,
"typeVersion": 4.2
},
{
"id": "e5043788-7a90-42f8-b10f-dcfe8cc04022",
"name": "Acquire Pipeline Job",
"type": "n8n-nodes-base.postgres",
"position": [
2256,
-992
],
"parameters": {
"query": "SELECT public.acquire_pipeline_job(\n ($1::jsonb->>'job_key'),\n ($1::jsonb->>'attachment_id'),\n ($1::jsonb->'payload'),\n ($1::jsonb->>'workflow_name'),\n ($1::jsonb->>'stage')\n)",
"options": {
"queryReplacement": "={{ JSON.stringify({ job_key: \"attachment:\" + ($json.attachment_id || \"unknown\"), attachment_id: $json.attachment_id || null, payload: $json, workflow_name: \"SpendBase Inbound\", stage: \"attachment_received\" }) }}"
},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.6
},
{
"id": "8ed2b889-a5dc-4005-85b1-9c67c8ef0288",
"name": "Parse Acquire Result",
"type": "n8n-nodes-base.code",
"position": [
2480,
-992
],
"parameters": {
"jsCode": "return $input.all().map(item => {\n const raw = item.json || {};\n\n let result = raw.acquire_pipeline_job || raw;\n if (typeof result === 'string') {\n try { result = JSON.parse(result); } catch(e) { result = {}; }\n }\n\n let original = {};\n try {\n original = $('Extract Attachment Payload').item.json;\n } catch(e) {}\n\n return {\n json: {\n ...original,\n acquired: result.acquired === true,\n job_id: result.job_id || null,\n job_key: result.job_key || ('attachment:' + (original.attachment_id || 'unknown')),\n acquire_reason: result.reason || null,\n acquire_status: result.status || null,\n acquire_attempt_count: result.attempt_count || null\n }\n };\n});"
},
"typeVersion": 2
},
{
"id": "8263119a-020c-46c1-8ae1-a8dd9976ca4a",
"name": "If Job Acquired",
"type": "n8n-nodes-base.if",
"position": [
2704,
-992
],
"parameters": {
"options": {},
"conditions": {
"options": {
"version": 2,
"leftValue": "",
"caseSensitive": true,
"typeValidation": "loose"
},
"combinator": "and",
"conditions": [
{
"id": "a0000003-cond-0000-0000-000000000000",
"operator": {
"type": "boolean",
"operation": "true"
},
"leftValue": "={{ $json.acquired }}",
"rightValue": ""
}
]
},
"looseTypeValidation": true
},
"typeVersion": 2.2
},
{
"id": "389ef3fa-2450-4931-95c6-77f02d21d704",
"name": "Prepare Skipped Log",
"type": "n8n-nodes-base.code",
"position": [
2928,
-752
],
"parameters": {
"jsCode": "return $input.all().map(item => {\n const data = item.json || {};\n let executionId = 'manual-run';\n try { executionId = String($execution.id); } catch(e) {}\n\n return {\n json: {\n source: 'n8n',\n workflowName: 'SpendBase Inbound',\n nodeName: 'Job Acquired Gate',\n executionId,\n fingerprint: data.fingerprint || null,\n attachmentId: data.attachment_id || null,\n stage: 'job_skipped_duplicate',\n status: 'info',\n message: 'Job already exists: ' + (data.acquire_reason || 'unknown') + ' (status: ' + (data.acquire_status || 'unknown') + ')',\n payload: {},\n context: {\n job_key: data.job_key || null,\n existing_status: data.acquire_status || null,\n reason: data.acquire_reason || null,\n attempt_count: data.acquire_attempt_count || null,\n attachment_id: data.attachment_id || null,\n filename: data.filename || null\n }\n }\n };\n});"
},
"typeVersion": 2
},
{
"id": "fcac62b4-ef8f-46ad-9ba8-2723ffb6a709",
"name": "Log Skipped Event",
"type": "n8n-nodes-base.postgres",
"position": [
3152,
-752
],
"parameters": {
"query": "SELECT public.log_pipeline_event($1::jsonb)",
"options": {
"queryReplacement": "={{ JSON.stringify($json) }}"
},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.6
},
{
"id": "ac4020ee-1ab9-4efd-8de2-6dde94e49a89",
"name": "Handle SQL Error Details",
"type": "n8n-nodes-base.code",
"position": [
5152,
-1056
],
"parameters": {
"jsCode": "return $input.all().map(item => {\n const errorObj = item.json || {};\n const errorMsg = errorObj.message || errorObj.error?.description || \"Unknown Database Error\";\n let severity = \"critical\";\n let promptContext = \"\";\n\n if (errorMsg.includes(\"Duplicate document\") || errorMsg.includes(\"unique constraint\")) {\n severity = \"info\";\n promptContext = \"Idempotency Guard: This exact file was already processed.\";\n } else if (errorMsg.includes(\"Business Duplicate\")) {\n severity = \"info\";\n promptContext = \"Business Guard: Invoice number exists for this vendor.\";\n } else if (errorMsg.includes(\"No fingerprint provided\")) {\n severity = \"warning\";\n promptContext = \"The workflow failed to generate a fingerprint hash.\";\n } else {\n severity = \"critical\";\n promptContext = \"A severe database error occurred.\";\n }\n\n return {\n json: {\n severity,\n raw_error: errorMsg,\n context_for_llm: promptContext,\n timestamp: new Date().toISOString()\n }\n };\n});"
},
"typeVersion": 2
},
{
"id": "e814cafb-ef5b-47ed-84ac-519539d61441",
"name": "Extract Attachment Payload",
"type": "n8n-nodes-base.code",
"position": [
2032,
-992
],
"parameters": {
"jsCode": "// 1) Safely extract payload pieces\nconst input = $input.first().json;\nconst body = input.body || input;\nconst message = body.message || {};\nconst thread = body.thread || {};\n\nconst attachments = Array.isArray(message.attachments) ? message.attachments : [];\nconst messageLabels = Array.isArray(message.labels) ? message.labels : [];\nconst threadLabels = Array.isArray(thread.labels) ? thread.labels : [];\n\nconst allLabels = [...new Set([...messageLabels, ...threadLabels])];\n\nif (attachments.length === 0) {\n return [];\n}\n\nreturn attachments.map((att, index) => {\n const filename = att.filename || '';\n const contentType = att.content_type || '';\n const isPdf =\n contentType.toLowerCase() === 'application/pdf' ||\n contentType.toLowerCase().includes('pdf') ||\n filename.toLowerCase().endsWith('.pdf');\n\n return {\n json: {\n inbox_id: message.inbox_id || null,\n message_id: message.message_id || null,\n thread_id: message.thread_id || thread.thread_id || null,\n sender: message.from || message.from_ || null,\n subject: message.subject || null,\n preview: message.preview || null,\n created_at: message.created_at || null,\n message_labels: messageLabels,\n thread_labels: threadLabels,\n all_labels: allLabels,\n attachment_id: att.attachment_id || null,\n filename,\n content_type: contentType,\n size: att.size || null,\n content_disposition: att.content_disposition || null,\n content_id: att.content_id || null,\n is_pdf: isPdf,\n attachment_count: attachments.length,\n has_multiple_attachments: attachments.length > 1,\n attachment_index: index,\n attachment_number: index + 1,\n fingerprint: `${message.message_id || 'no_message_id'}_${att.attachment_id || index}`\n }\n };\n});"
},
"typeVersion": 2
},
{
"id": "c456f930-3c83-49ad-8241-0e3262d77422",
"name": "Prepare Complete Job",
"type": "n8n-nodes-base.code",
"position": [
4720,
-1376
],
"parameters": {
"jsCode": "return $input.all().map(item => {\n const dbRow = item.json || {};\n let dbResult = dbRow.process_full_invoice || dbRow;\n\n if (typeof dbResult === \"string\") {\n try {\n dbResult = JSON.parse(dbResult);\n } catch (e) {\n dbResult = {};\n }\n }\n\n let attachmentId = null;\n let fingerprint = null;\n\n try {\n attachmentId = $('Extract Attachment Payload').first().json.attachment_id;\n } catch (e) {}\n\n try {\n fingerprint = $(\"Prepare Database Data\").first().json.fingerprint;\n } catch (e) {}\n\n if (!fingerprint) {\n try {\n fingerprint = $('Extract Attachment Payload').first().json.fingerprint;\n } catch (e) {}\n }\n\n const jobKey = \"attachment:\" + (attachmentId || \"unknown\");\n\n return {\n json: {\n job_key: jobKey,\n fingerprint: fingerprint || dbResult.fingerprint || null,\n result: dbResult,\n stage: \"db_upsert_succeeded\"\n }\n };\n});"
},
"typeVersion": 2
},
{
"id": "bd51a839-9e07-40bb-8aed-51457f84fa4d",
"name": "Complete Pipeline Job",
"type": "n8n-nodes-base.postgres",
"position": [
4944,
-1376
],
"parameters": {
"query": "SELECT public.complete_pipeline_job(\n ($1::jsonb->>'job_key'),\n ($1::jsonb->>'fingerprint'),\n ($1::jsonb->'result'),\n ($1::jsonb->>'stage')\n)",
"options": {
"queryReplacement": "={{ JSON.stringify($json) }}"
},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.6
},
{
"id": "f405272e-93db-4516-98cd-b6d652d6bc8d",
"name": "Prepare Success Log",
"type": "n8n-nodes-base.code",
"position": [
5168,
-1376
],
"parameters": {
"jsCode": "return $input.all().map(item => {\n const data = item.json || {};\n let executionId = \"manual-run\";\n try {\n executionId = String($execution.id);\n } catch (e) {}\n\n let attachmentId = null;\n let preparedPayload = {};\n let completeResult = data.complete_pipeline_job || data;\n\n if (typeof completeResult === \"string\") {\n try {\n completeResult = JSON.parse(completeResult);\n } catch (e) {\n completeResult = {};\n }\n }\n\n try {\n attachmentId = $('Extract Attachment Payload').first().json.attachment_id;\n } catch (e) {}\n\n try {\n preparedPayload = $(\"Prepare Database Data\").first().json;\n } catch (e) {}\n\n return {\n json: {\n source: \"n8n\",\n workflowName: \"SpendBase Inbound\",\n nodeName: \"\u2705 Complete Pipeline Job\",\n executionId,\n attachmentId,\n fingerprint: preparedPayload.fingerprint || null,\n stage: \"db_upsert_succeeded\",\n status: \"success\",\n message: \"Invoice processed successfully\",\n payload: {\n invoiceNumber: preparedPayload.invoiceNumber || null,\n vendorName: preparedPayload.vendorName || null,\n vendorDomain: preparedPayload.vendorDomain || null,\n totalAmount: preparedPayload.totalAmount || null,\n currency: preparedPayload.currency || null\n },\n context: {\n job_id: completeResult.job_id || null,\n job_key: completeResult.job_key || null,\n final_status: completeResult.final_status || null\n }\n }\n };\n});"
},
"typeVersion": 2
},
{
"id": "dc5062a6-a6f0-46db-ac25-e0d4fb7577f9",
"name": "Log Success Event",
"type": "n8n-nodes-base.postgres",
"position": [
5392,
-1376
],
"parameters": {
"query": "SELECT public.log_pipeline_event($1::jsonb)",
"options": {
"queryReplacement": "={{ JSON.stringify($json) }}"
},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.6
},
{
"id": "bb13b686-9f86-4896-8340-8c10c2c13a12",
"name": "Execute Parse Invoice Workflow",
"type": "n8n-nodes-base.executeWorkflow",
"position": [
4272,
-1152
],
"parameters": {
"options": {
"waitForSubWorkflow": true
},
"workflowId": {
"__rl": true,
"mode": "list",
"value": "NKxV09Ds6f7jAswa",
"cachedResultUrl": "/workflow/NKxV09Ds6f7jAswa",
"cachedResultName": "parseInvoice"
},
"workflowInputs": {
"value": {
"isPaid": "={{ $json.isPaid }}",
"job_id": "={{ $json.job_id }}",
"sender": "={{ $json.sender }}",
"job_key": "={{ $json.job_key }}",
"subject": "={{ $json.subject }}",
"acquired": "={{ $json.acquired }}",
"filename": "={{ $json.filename }}",
"fullText": "={{ $json.fullText }}",
"inbox_id": "={{ $json.inbox_id }}",
"sections": "={{ $json.sections }}",
"isInvoice": "={{ $json.isInvoice }}",
"thread_id": "={{ $json.thread_id }}",
"all_labels": "={{ $json.all_labels }}",
"message_id": "={{ $json.message_id }}",
"fingerprint": "={{ $json.fingerprint }}",
"summaryText": "={{ $json.summaryText }}",
"content_type": "={{ $json.content_type }}",
"download_url": "={{ $json.download_url }}",
"attachment_id": "={{ $json.attachment_id }}",
"shouldProceed": "={{ $json.shouldProceed }}",
"_originalLogId": "={{ $json._originalLogId }}",
"acquire_status": "={{ $json.acquire_status }}",
"confidenceScore": "={{ $json.confidenceScore }}",
"isLowConfidence": "={{ $json.isLowConfidence }}"
},
"schema": [
{
"id": "job_id",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "job_id",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "job_key",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "job_key",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "acquired",
"type": "boolean",
"display": true,
"removed": false,
"required": false,
"displayName": "acquired",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "acquire_status",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "acquire_status",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "attachment_id",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "attachment_id",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "filename",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "filename",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "content_type",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "content_type",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "message_id",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "message_id",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "thread_id",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "thread_id",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "inbox_id",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "inbox_id",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "sender",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "sender",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "subject",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "subject",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "fingerprint",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "fingerprint",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "download_url",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "download_url",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "all_labels",
"type": "array",
"display": true,
"removed": false,
"required": false,
"displayName": "all_labels",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "shouldProceed",
"type": "boolean",
"display": true,
"removed": false,
"required": false,
"displayName": "shouldProceed",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "isInvoice",
"type": "boolean",
"display": true,
"removed": false,
"required": false,
"displayName": "isInvoice",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "isPaid",
"type": "boolean",
"display": true,
"removed": false,
"required": false,
"displayName": "isPaid",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "confidenceScore",
"type": "number",
"display": true,
"removed": false,
"required": false,
"displayName": "confidenceScore",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "isLowConfidence",
"type": "boolean",
"display": true,
"removed": false,
"required": false,
"displayName": "isLowConfidence",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "summaryText",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "summaryText",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "fullText",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "fullText",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "sections",
"type": "array",
"display": true,
"removed": false,
"required": false,
"displayName": "sections",
"defaultMatch": false,
"canBeUsedToMatch": true
},
{
"id": "_originalLogId",
"type": "string",
"display": true,
"removed": false,
"required": false,
"displayName": "_originalLogId",
"defaultMatch": false,
"canBeUsedToMatch": true
}
],
"mappingMode": "defineBelow",
"matchingColumns": [],
"attemptToConvertTypes": false,
"convertFieldsToString": true
}
},
"typeVersion": 1.3
},
{
"id": "b84d5d37-82aa-427d-b6ad-ae404b36e26d",
"name": "Evaluate Processing Steps",
"type": "n8n-nodes-base.code",
"position": [
3824,
-1152
],
"parameters": {
"jsCode": "// 1. Pull absolute arrays from upstream nodes to bypass broken item-linking\nconst allAcquireResults = $('Parse Acquire Result').all();\n\n// 2. Safely pull current item lineages where available\nlet guardData = {};\nlet downloadData = {};\ntry { guardData = $('Extract Attachment Payload').item.json; } catch(e) {}\ntry { downloadData = $('Fetch Download URL').item.json; } catch(e) {}\n\nconst jigsawData = $input.item.json || {};\n\n// 3. Find the specific Acquire Result for THIS exact attachment\nconst myAcquireResult = allAcquireResults.find(item => \n item.json.attachment_id === guardData.attachment_id\n)?.json || {};\n\n// 4. Extract Text and Stats from Jigsaw\nconst cleanSections = (jigsawData.sections || []).map(section => ({\n text: section.text,\n lines: (section.lines || []).map(line => typeof line === 'object' ? (line.text || '') : line).filter(Boolean)\n}));\nconst fullText = cleanSections.map(s => s.text).join('\\n');\nconst confidence = jigsawData.sections?.[0]?.lines?.[0]?.average_confidence || 0;\n\n// 5. Evaluate Booleans\nconst isInvoiceFlag = jigsawData.context?.isInvoice?.[0] === 'true';\nconst isPaidFlag = jigsawData.context?.isPaid?.[0] === 'true';\n\nconst textLower = fullText.toLowerCase();\nconst hasInvoiceKeywords = textLower.includes('invoice') || textLower.includes('tax invoice') || textLower.includes('bill to');\n\n// Fallback URL extraction depending on HTTP node output shape\nconst downloadUrl = downloadData.download_url || downloadData.url || downloadData.body?.download_url || '';\n\n// 6. Return Unified Production Payload\nreturn {\n json: {\n // --- Routing & Decision Logic ---\n shouldProceed: (isInvoiceFlag || hasInvoiceKeywords) && confidence > 0.6,\n isInvoice: isInvoiceFlag || hasInvoiceKeywords,\n isPaid: isPaidFlag,\n confidenceScore: parseFloat(confidence.toFixed(4)),\n isLowConfidence: confidence < 0.85,\n\n // --- Task & Job Management (FIXED) ---\n acquired: myAcquireResult.acquired === true,\n job_id: myAcquireResult.job_id || null,\n job_key: myAcquireResult.job_key || null,\n acquire_status: myAcquireResult.acquire_status || null,\n acquire_reason: myAcquireResult.acquire_reason || null,\n acquire_attempt_count: myAcquireResult.acquire_attempt_count || null,\n\n // --- Metadata & Identity ---\n ...guardData, \n download_url: downloadUrl,\n summaryText: fullText.substring(0, 500),\n fullText: fullText.trim(),\n sections: cleanSections,\n _originalLogId: jigsawData.log_id || guardData.fingerprint || null\n }\n};"
},
"typeVersion": 2
},
{
"id": "49927508-dfe5-4cf2-8a7c-bb721d2ddb05",
"name": "Loop Over Batch Items",
"type": "n8n-nodes-base.splitInBatches",
"position": [
2928,
-1088
],
"parameters": {
"options": {}
},
"typeVersion": 3
},
{
"id": "0bd7765c-eebf-46ca-ad28-35745327b172",
"name": "Wait for Event Triggered",
"type": "n8n-nodes-base.wait",
"position": [
5376,
-1056
],
"parameters": {},
"typeVersion": 1.1
},
{
"id": "35265376-3f18-4549-8a43-b72ca813e953",
"name": "Manual Setup Trigger",
"type": "n8n-nodes-base.manualTrigger",
"position": [
2432,
-1920
],
"parameters": {},
"typeVersion": 1
},
{
"id": "b7003cd2-78e9-4189-9560-18505c927c06",
"name": "Configure Extensions & Triggers",
"type": "n8n-nodes-base.postgres",
"position": [
2688,
-1920
],
"parameters": {
"query": "CREATE EXTENSION IF NOT EXISTS pgcrypto;\n\nCREATE OR REPLACE FUNCTION public.set_updated_at()\nRETURNS trigger LANGUAGE plpgsql AS\n$body$\nBEGIN\n NEW.updated_at = now();\n RETURN NEW;\nEND;\n$body$;",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "4c69b495-9c67-4659-9fd8-d9681406337a",
"name": "Configure Enumerations",
"type": "n8n-nodes-base.postgres",
"position": [
2912,
-1920
],
"parameters": {
"query": "DO\n$body$\nBEGIN\n IF NOT EXISTS (\n SELECT 1 FROM pg_type t\n JOIN pg_namespace n ON n.oid = t.typnamespace\n WHERE t.typname = 'billing_type_enum' AND n.nspname = 'public'\n ) THEN\n CREATE TYPE public.billing_type_enum AS ENUM (\n 'recurring',\n 'one_time',\n 'usage_based',\n 'penalty'\n );\n END IF;\nEND\n$body$;",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "2605a6b6-de84-4581-a6bb-d44d7da11c21",
"name": "Set Up Vendors Table",
"type": "n8n-nodes-base.postgres",
"position": [
3136,
-1920
],
"parameters": {
"query": "CREATE TABLE IF NOT EXISTS public.vendors (\n id uuid PRIMARY KEY DEFAULT gen_random_uuid(),\n name text NOT NULL,\n domain text NULL,\n support_email text NULL,\n created_at timestamptz NOT NULL DEFAULT now(),\n updated_at timestamptz NOT NULL DEFAULT now()\n);\n\nCREATE UNIQUE INDEX IF NOT EXISTS vendors_domain_unique_idx\n ON public.vendors (lower(domain)) WHERE domain IS NOT NULL;\n\nCREATE INDEX IF NOT EXISTS vendors_name_idx\n ON public.vendors (name);\n\nDROP TRIGGER IF EXISTS trg_vendors_updated_at ON public.vendors;\nCREATE TRIGGER trg_vendors_updated_at\n BEFORE UPDATE ON public.vendors\n FOR EACH ROW EXECUTE FUNCTION public.set_updated_at();",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "a1138ab8-1044-4c42-a7d8-aaf3a65a9520",
"name": "Set Up Documents Table",
"type": "n8n-nodes-base.postgres",
"position": [
3344,
-1920
],
"parameters": {
"query": "CREATE TABLE IF NOT EXISTS public.documents (\n id uuid PRIMARY KEY DEFAULT gen_random_uuid(),\n vendor_id uuid NOT NULL REFERENCES public.vendors(id) ON DELETE RESTRICT,\n fingerprint text NOT NULL,\n n8n_execution_id text NULL,\n invoice_number text NULL,\n total_amount numeric(18,6) NOT NULL DEFAULT 0,\n currency char(3) NOT NULL,\n received_at timestamptz NOT NULL,\n raw_payload jsonb NOT NULL DEFAULT '{}'::jsonb,\n llm_extraction_payload jsonb NULL,\n created_at timestamptz NOT NULL DEFAULT now(),\n updated_at timestamptz NOT NULL DEFAULT now()\n);\n\nCREATE UNIQUE INDEX IF NOT EXISTS documents_fingerprint_unique_idx\n ON public.documents (fingerprint);\n\nCREATE INDEX IF NOT EXISTS documents_vendor_id_idx ON public.documents (vendor_id);\nCREATE INDEX IF NOT EXISTS documents_received_at_idx ON public.documents (received_at DESC);\nCREATE INDEX IF NOT EXISTS documents_invoice_number_idx ON public.documents (invoice_number);\nCREATE INDEX IF NOT EXISTS documents_currency_idx ON public.documents (currency);\n\nALTER TABLE public.documents\n DROP CONSTRAINT IF EXISTS documents_currency_check;\nALTER TABLE public.documents\n ADD CONSTRAINT documents_currency_check CHECK (currency ~ '^[A-Z]{3}$');\n\nDROP TRIGGER IF EXISTS trg_documents_updated_at ON public.documents;\nCREATE TRIGGER trg_documents_updated_at\n BEFORE UPDATE ON public.documents\n FOR EACH ROW EXECUTE FUNCTION public.set_updated_at();",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "a49ee118-0e5f-4b6e-80a7-325c1af918e4",
"name": "Set Up Line Items Table",
"type": "n8n-nodes-base.postgres",
"position": [
3568,
-1920
],
"parameters": {
"query": "CREATE TABLE IF NOT EXISTS public.line_items (\n id uuid PRIMARY KEY DEFAULT gen_random_uuid(),\n document_id uuid NOT NULL REFERENCES public.documents(id) ON DELETE CASCADE,\n vendor_id uuid NOT NULL REFERENCES public.vendors(id) ON DELETE RESTRICT,\n position integer NOT NULL,\n description text NOT NULL,\n clean_name text NOT NULL,\n billing_type public.billing_type_enum NOT NULL,\n billing_cycle text NULL,\n quantity numeric(18,6) NOT NULL DEFAULT 1,\n unit_price numeric(18,6) NOT NULL DEFAULT 0,\n total_price numeric(18,6) NOT NULL DEFAULT 0,\n currency char(3) NOT NULL,\n period_end date NULL,\n category text NULL,\n original_description text NULL,\n seat_count integer NULL,\n tier text NULL,\n raw_metadata jsonb NOT NULL DEFAULT '{}'::jsonb,\n created_at timestamptz NOT NULL DEFAULT now(),\n updated_at timestamptz NOT NULL DEFAULT now()\n);\n\nCREATE UNIQUE INDEX IF NOT EXISTS line_items_document_position_unique_idx\n ON public.line_items (document_id, position);\n\nCREATE INDEX IF NOT EXISTS line_items_document_id_idx ON public.line_items (document_id);\nCREATE INDEX IF NOT EXISTS line_items_vendor_id_idx ON public.line_items (vendor_id);\nCREATE INDEX IF NOT EXISTS line_items_billing_type_idx ON public.line_items (billing_type);\nCREATE INDEX IF NOT EXISTS line_items_billing_cycle_idx ON public.line_items (billing_cycle);\nCREATE INDEX IF NOT EXISTS line_items_category_idx ON public.line_items (category);\nCREATE INDEX IF NOT EXISTS line_items_period_end_idx ON public.line_items (period_end);\n\nALTER TABLE public.line_items\n DROP CONSTRAINT IF EXISTS line_items_currency_check;\nALTER TABLE public.line_items\n ADD CONSTRAINT line_items_currency_check CHECK (currency ~ '^[A-Z]{3}$');\n\nALTER TABLE public.line_items\n DROP CONSTRAINT IF EXISTS line_items_position_check;\nALTER TABLE public.line_items\n ADD CONSTRAINT line_items_position_check CHECK (position >= 0);\n\nDROP TRIGGER IF EXISTS trg_line_items_updated_at ON public.line_items;\nCREATE TRIGGER trg_line_items_updated_at\n BEFORE UPDATE ON public.line_items\n FOR EACH ROW EXECUTE FUNCTION public.set_updated_at();",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "65676e1e-6dcd-419c-ac44-c70d021a3012",
"name": "Set Up Pipeline Jobs Table",
"type": "n8n-nodes-base.postgres",
"position": [
3792,
-1920
],
"parameters": {
"query": "CREATE TABLE IF NOT EXISTS public.pipeline_jobs (\n id uuid PRIMARY KEY DEFAULT gen_random_uuid(),\n job_key text NOT NULL,\n attachment_id text NULL,\n fingerprint text NULL,\n status text NOT NULL DEFAULT 'pending',\n attempt_count integer NOT NULL DEFAULT 0,\n workflow_name text NULL,\n current_stage text NULL,\n locked_at timestamptz NULL,\n started_at timestamptz NULL,\n finished_at timestamptz NULL,\n last_error text NULL,\n payload jsonb NOT NULL DEFAULT '{}'::jsonb,\n result jsonb NULL,\n created_at timestamptz NOT NULL DEFAULT now(),\n updated_at timestamptz NOT NULL DEFAULT now()\n);\n\nDO\n$body$\nBEGIN\n IF NOT EXISTS (\n SELECT 1 FROM pg_constraint WHERE conname = 'pipeline_jobs_status_check'\n ) THEN\n ALTER TABLE public.pipeline_jobs ADD CONSTRAINT pipeline_jobs_status_check\n CHECK (status IN ('pending','processing','succeeded','failed','manual_review','skipped'));\n END IF;\nEND\n$body$;\n\nCREATE UNIQUE INDEX IF NOT EXISTS pipeline_jobs_job_key_unique_idx ON public.pipeline_jobs (job_key);\nCREATE INDEX IF NOT EXISTS pipeline_jobs_status_idx ON public.pipeline_jobs (status);\nCREATE INDEX IF NOT EXISTS pipeline_jobs_attachment_id_idx ON public.pipeline_jobs (attachment_id);\nCREATE INDEX IF NOT EXISTS pipeline_jobs_fingerprint_idx ON public.pipeline_jobs (fingerprint);\nCREATE INDEX IF NOT EXISTS pipeline_jobs_created_at_idx ON public.pipeline_jobs (created_at DESC);\nCREATE INDEX IF NOT EXISTS pipeline_jobs_updated_at_idx ON public.pipeline_jobs (updated_at DESC);\nCREATE INDEX IF NOT EXISTS pipeline_jobs_workflow_name_idx ON public.pipeline_jobs (workflow_name);\n\nDROP TRIGGER IF EXISTS trg_pipeline_jobs_updated_at ON public.pipeline_jobs;\nCREATE TRIGGER trg_pipeline_jobs_updated_at\n BEFORE UPDATE ON public.pipeline_jobs\n FOR EACH ROW EXECUTE FUNCTION public.set_updated_at();",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "b1ac576c-93cf-47f5-b834-237121d63fd2",
"name": "Set Up Audit Logs Table",
"type": "n8n-nodes-base.postgres",
"position": [
4016,
-1920
],
"parameters": {
"query": "-- 1. Create the Table\nCREATE TABLE IF NOT EXISTS public.pipeline_audit_logs (\n id uuid PRIMARY KEY DEFAULT gen_random_uuid(),\n created_at timestamptz NOT NULL DEFAULT now(),\n source text NOT NULL DEFAULT 'n8n',\n workflow_name text NULL,\n node_name text NULL,\n execution_id text NULL,\n fingerprint text NULL,\n attachment_id text NULL,\n document_id uuid NULL REFERENCES public.documents(id),\n vendor_id uuid NULL REFERENCES public.vendors(id),\n attempt integer NOT NULL DEFAULT 1,\n stage text NOT NULL CHECK (length(TRIM(stage)) > 0),\n status text NOT NULL DEFAULT 'info' CHECK (status = ANY (ARRAY['info','success','warning','error'])),\n message text NULL,\n payload jsonb NOT NULL DEFAULT '{}'::jsonb,\n error jsonb NULL,\n context jsonb NOT NULL DEFAULT '{}'::jsonb\n);\n\n-- 2. Create Indexes\nCREATE INDEX IF NOT EXISTS audit_logs_fingerprint_idx ON public.pipeline_audit_logs (fingerprint);\nCREATE INDEX IF NOT EXISTS audit_logs_execution_id_idx ON public.pipeline_audit_logs (execution_id);\nCREATE INDEX IF NOT EXISTS audit_logs_created_at_idx ON public.pipeline_audit_logs (created_at DESC);\n\n-- 3. Create the Function (The part that was missing!)\nCREATE OR REPLACE FUNCTION public.log_pipeline_event(p_event jsonb)\nRETURNS uuid LANGUAGE plpgsql SECURITY DEFINER AS $body$\nDECLARE v_log_id uuid;\nBEGIN\n INSERT INTO public.pipeline_audit_logs (\n source, workflow_name, node_name, execution_id, fingerprint, \n attachment_id, document_id, vendor_id, stage, status, \n message, payload, error, context\n ) VALUES (\n coalesce(p_event->>'source', 'n8n'),\n p_event->>'workflowName',\n p_event->>'nodeName',\n p_event->>'executionId',\n p_event->>'fingerprint',\n p_event->>'attachmentId',\n (p_event->>'documentId')::uuid,\n (p_event->>'vendorId')::uuid,\n p_event->>'stage',\n coalesce(p_event->>'status', 'info'),\n p_event->>'message',\n coalesce(p_event->'payload', '{}'::jsonb),\n p_event->'error',\n coalesce(p_event->'context', '{}'::jsonb)\n ) RETURNING id INTO v_log_id;\n RETURN v_log_id;\nEND; $body$;",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "d3964780-879c-4960-ab75-3d2d6d997efb",
"name": "Set Up Spend Summary View",
"type": "n8n-nodes-base.postgres",
"position": [
4224,
-1920
],
"parameters": {
"query": "CREATE OR REPLACE VIEW public.document_spend_summary AS\nSELECT\n d.id AS document_id,\n d.fingerprint,\n d.invoice_number,\n d.total_amount,\n d.currency,\n d.received_at,\n v.id AS vendor_id,\n v.name AS vendor_name,\n v.domain AS vendor_domain,\n COUNT(li.id) AS line_item_count,\n COALESCE(SUM(CASE WHEN li.total_price > 0 THEN li.total_price ELSE 0 END), 0) AS positive_spend,\n COALESCE(SUM(CASE WHEN li.total_price < 0 THEN li.total_price ELSE 0 END), 0) AS negative_offsets\nFROM public.documents d\nJOIN public.vendors v ON v.id = d.vendor_id\nLEFT JOIN public.line_items li ON li.document_id = d.id\nGROUP BY d.id, v.id;",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "c9559039-286d-4685-b087-50a12216ae25",
"name": "Execute Full Invoice Process",
"type": "n8n-nodes-base.postgres",
"position": [
4448,
-1920
],
"parameters": {
"query": "CREATE OR REPLACE FUNCTION public.process_full_invoice(payload jsonb)\nRETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS\n$body$\nDECLARE\n v_vendor_id uuid;\n v_document_id uuid;\n v_existing_document_id uuid;\n v_vendor_name text;\n v_vendor_domain text;\n v_support_email text;\n v_fingerprint text;\n v_n8n_execution_id text;\n v_invoice_number text;\n v_total_amount numeric;\n v_currency text;\n v_received_at timestamptz;\n v_inserted_items integer := 0;\nBEGIN\n IF payload IS NULL THEN RAISE EXCEPTION 'payload is null'; END IF;\n\n v_fingerprint := nullif(trim(payload->>'fingerprint'), '');\n IF v_fingerprint IS NULL THEN RAISE EXCEPTION 'fingerprint is required'; END IF;\n\n v_vendor_name := COALESCE(nullif(trim(payload->>'vendorName'), ''), 'Unknown Vendor');\n v_vendor_domain := nullif(lower(trim(payload->>'vendorDomain')), '');\n v_support_email := nullif(trim(payload->>'supportEmail'), '');\n v_n8n_execution_id := nullif(trim(payload->>'n8nExecutionId'), '');\n v_invoice_number := nullif(trim(payload->>'invoiceNumber'), '');\n v_total_amount := COALESCE((payload->>'totalAmount')::numeric, 0);\n v_currency := upper(COALESCE(nullif(trim(payload->>'currency'), ''), 'USD'));\n IF v_currency !~ '^[A-Z]{3}$' THEN v_currency := 'USD'; END IF;\n v_received_at := COALESCE((payload->>'receivedAt')::timestamptz, now());\n\n IF v_vendor_domain IS NOT NULL THEN\n SELECT id INTO v_vendor_id FROM public.vendors WHERE lower(domain) = v_vendor_domain LIMIT 1;\n END IF;\n IF v_vendor_id IS NULL THEN\n SELECT id INTO v_vendor_id FROM public.vendors\n WHERE domain IS NULL AND lower(name) = lower(v_vendor_name) LIMIT 1;\n END IF;\n IF v_vendor_id IS NULL THEN\n INSERT INTO public.vendors (name, domain, support_email)\n VALUES (v_vendor_name, v_vendor_domain, v_support_email)\n RETURNING id INTO v_vendor_id;\n ELSE\n UPDATE public.vendors SET\n name = COALESCE(v_vendor_name, name),\n domain = COALESCE(v_vendor_domain, domain),\n support_email = COALESCE(v_support_email, support_email)\n WHERE id = v_vendor_id;\n END IF;\n\n SELECT id INTO v_existing_document_id FROM public.documents WHERE fingerprint = v_fingerprint LIMIT 1;\n IF v_existing_document_id IS NULL THEN\n INSERT INTO public.documents (\n vendor_id, fingerprint, n8n_execution_id, invoice_number,\n total_amount, currency, received_at, raw_payload\n ) VALUES (\n v_vendor_id, v_fingerprint, v_n8n_execution_id, v_invoice_number,\n v_total_amount, v_currency, v_received_at, payload\n ) RETURNING id INTO v_document_id;\n ELSE\n v_document_id := v_existing_document_id;\n UPDATE public.documents SET\n vendor_id = v_vendor_id,\n n8n_execution_id = COALESCE(v_n8n_execution_id, n8n_execution_id),\n invoice_number = COALESCE(v_invoice_number, invoice_number),\n total_amount = v_total_amount,\n currency = v_currency,\n received_at = v_received_at,\n raw_payload = payload\n WHERE id = v_document_id;\n END IF;\n\n DELETE FROM public.line_items WHERE document_id = v_document_id;\n\n INSERT INTO public.line_items (\n document_id, vendor_id, position, description, clean_name,\n billing_type, billing_cycle, quantity, unit_price, total_price,\n currency, period_end, category, original_description, seat_count, tier, raw_metadata\n )\n SELECT\n v_document_id, v_vendor_id, (i.ordinality - 1), \n COALESCE(nullif(trim(i.value->>'description'), ''), 'Item'),\n COALESCE(nullif(trim(i.value->>'cleanName'), ''), nullif(trim(i.value->>'description'), ''), 'Item'),\n CASE lower(COALESCE(i.value->>'billingType', 'one_time'))\n WHEN 'recurring' THEN 'recurring'::public.billing_type_enum\n WHEN 'usage_based' THEN 'usage_based'::public.billing_type_enum\n WHEN 'penalty' THEN 'penalty'::public.billing_type_enum\n ELSE 'one_time'::public.billing_type_enum\n END,\n nullif(trim(i.value->>'billingCycle'), ''),\n COALESCE((i.value->>'quantity')::numeric, 1),\n COALESCE((i.value->>'unitPrice')::numeric, 0),\n COALESCE((i.value->>'totalPrice')::numeric, 0),\n upper(COALESCE(nullif(trim(i.value->>'currency'), ''), v_currency)),\n nullif(i.value->>'periodEnd', '')::date,\n nullif(trim(i.value #>> '{metadata,category}'), ''),\n nullif(trim(i.value #>> '{metadata,originalDescription}'), ''),\n nullif(i.value #>> '{metadata,seatCount}', '')::integer,\n nullif(trim(i.value #>> '{metadata,tier}'), ''),\n COALESCE(i.value->'metadata', '{}'::jsonb)\n FROM jsonb_array_elements(COALESCE(payload->'items', '[]'::jsonb)) WITH ORDINALITY AS i;\n\n GET DIAGNOSTICS v_inserted_items = ROW_COUNT;\n\n RETURN jsonb_build_object(\n 'status', 'ok',\n 'vendor_id', v_vendor_id,\n 'document_id', v_document_id,\n 'fingerprint', v_fingerprint,\n 'inserted_line_items', v_inserted_items,\n 'is_new_document', v_existing_document_id IS NULL\n );\nEND;\n$body$;",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "f925d5bb-1453-46cc-943d-f76c6c4c6de7",
"name": "Run Pipeline Job Acquisition",
"type": "n8n-nodes-base.postgres",
"position": [
4672,
-1920
],
"parameters": {
"query": "CREATE OR REPLACE FUNCTION public.acquire_pipeline_job(\n p_job_key text,\n p_attachment_id text DEFAULT NULL,\n p_payload jsonb DEFAULT '{}'::jsonb,\n p_workflow_name text DEFAULT 'n8n',\n p_stage text DEFAULT 'started'\n)\nRETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS\n$body$\nDECLARE\n v_job_id uuid;\n v_existing public.pipeline_jobs%rowtype;\nBEGIN\n IF p_job_key IS NULL OR length(trim(p_job_key)) = 0 THEN\n RAISE EXCEPTION 'p_job_key is required';\n END IF;\n\n INSERT INTO public.pipeline_jobs (\n job_key, attachment_id, status, attempt_count,\n workflow_name, current_stage, locked_at, started_at, payload\n ) VALUES (\n trim(p_job_key), nullif(trim(p_attachment_id), ''), 'processing', 1,\n nullif(trim(p_workflow_name), ''), nullif(trim(p_stage), ''),\n now(), now(), COALESCE(p_payload, '{}'::jsonb)\n )\n ON CONFLICT (job_key) DO NOTHING\n RETURNING id INTO v_job_id;\n\n IF v_job_id IS NOT NULL THEN\n RETURN jsonb_build_object(\n 'acquired', true, 'job_id', v_job_id,\n 'job_key', trim(p_job_key), 'status', 'processing', 'reason', 'new_job_created'\n );\n END IF;\n\n SELECT * INTO v_existing FROM public.pipeline_jobs WHERE job_key = trim(p_job_key) LIMIT 1;\n\n RETURN jsonb_build_object(\n 'acquired', false, 'job_id', v_existing.id,\n 'job_key', v_existing.job_key, 'status', v_existing.status,\n 'reason', 'job_already_exists', 'attempt_count', v_existing.attempt_count,\n 'current_stage', v_existing.current_stage, 'locked_at', v_existing.locked_at,\n 'finished_at', v_existing.finished_at\n );\nEND;\n$body$;",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "51a6558f-f365-45bd-bd42-373f1e1a0fdb",
"name": "Finalize Pipeline Job",
"type": "n8n-nodes-base.postgres",
"position": [
4896,
-1920
],
"parameters": {
"query": "CREATE OR REPLACE FUNCTION public.complete_pipeline_job(\n p_job_key text,\n p_fingerprint text DEFAULT NULL,\n p_result jsonb DEFAULT '{}'::jsonb,\n p_stage text DEFAULT 'completed'\n)\nRETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS\n$body$\nDECLARE v_job_id uuid;\nBEGIN\n IF p_job_key IS NULL OR length(trim(p_job_key)) = 0 THEN\n RAISE EXCEPTION 'p_job_key is required';\n END IF;\n\n UPDATE public.pipeline_jobs SET\n status = 'succeeded',\n fingerprint = COALESCE(nullif(trim(p_fingerprint), ''), fingerprint),\n result = COALESCE(p_result, '{}'::jsonb),\n finished_at = now(),\n current_stage = nullif(trim(p_stage), ''),\n updated_at = now()\n WHERE job_key = trim(p_job_key)\n RETURNING id INTO v_job_id;\n\n IF v_job_id IS NULL THEN\n RAISE EXCEPTION 'No pipeline_jobs row found for job_key: %', p_job_key;\n END IF;\n\n RETURN jsonb_build_object(\n 'status', 'ok', 'job_id', v_job_id,\n 'job_key', trim(p_job_key), 'final_status', 'succeeded'\n );\nEND;\n$body$;",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "0dea7d57-6ba7-4c19-887c-c39b8b223c68",
"name": "Process Pipeline Job Failure",
"type": "n8n-nodes-base.postgres",
"position": [
5104,
-1920
],
"parameters": {
"query": "CREATE OR REPLACE FUNCTION public.fail_pipeline_job(\n p_job_key text,\n p_error text,\n p_result jsonb DEFAULT '{}'::jsonb,\n p_stage text DEFAULT 'failed'\n)\nRETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER SET search_path = public AS\n$body$\nDECLARE v_job_id uuid;\nBEGIN\n IF p_job_key IS NULL OR length(trim(p_job_key)) = 0 THEN\n RAISE EXCEPTION 'p_job_key is required';\n END IF;\n\n UPDATE public.pipeline_jobs SET\n status = 'failed',\n last_error = nullif(trim(p_error), ''),\n result = COALESCE(p_result, '{}'::jsonb),\n finished_at = now(),\n current_stage = nullif(trim(p_stage), ''),\n updated_at = now()\n WHERE job_key = trim(p_job_key)\n RETURNING id INTO v_job_id;\n\n IF v_job_id IS NULL THEN\n RAISE EXCEPTION 'No pipeline_jobs row found for job_key: %', p_job_key;\n END IF;\n\n RETURN jsonb_build_object(\n 'status', 'ok', 'job_id', v_job_id,\n 'job_key', trim(p_job_key), 'final_status', 'failed'\n );\nEND;\n$body$;",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"typeVersion": 2.5
},
{
"id": "2c9f3bd1-10d9-4a48-9b88-75609e0af8b6",
"name": "Complete Setup with Code Logic",
"type": "n8n-nodes-base.code",
"position": [
5328,
-1920
],
"parameters": {
"jsCode": "return [{\n json: {\n status: '\u2705 SpendBase DB setup complete',\n steps: [\n '1) pgcrypto + set_updated_at trigger fn',\n '2) billing_type_enum',\n '3) vendors table + indexes + trigger',\n '4) documents table + indexes + trigger',\n '5) line_items table + indexes + trigger',\n '6) pipeline_jobs table + indexes + trigger',\n '7) pipeline_audit_logs table + indexes',\n '8) document_spend_summary view',\n '9) process_full_invoice()',\n '10) acquire_pipeline_job()',\n '11) complete_pipeline_job()',\n '12) fail_pipeline_job()'\n ],\n completedAt: new Date().toISOString()\n }\n}];"
},
"typeVersion": 2
},
{
"id": "6a80523a-1f9e-4db6-8504-4a1c32af1970",
"name": "Sticky Note8",
"type": "n8n-nodes-base.stickyNote",
"position": [
4400,
-2032
],
"parameters": {
"color": 7,
"width": 848,
"height": 272,
"content": "## Process invoices and jobs\n\nHandles full invoice processing and manages pipeline jobs functions."
},
"typeVersion": 1
},
{
"id": "57706905-4223-4078-bf24-01d6d02e679a",
"name": "Sticky Note9",
"type": "n8n-nodes-base.stickyNote",
"position": [
1808,
-2048
],
"parameters": {
"width": 480,
"height": 272,
"content": "### How it works\n\n1. Manually trigger the setup process to start the workflow.\n2. Set up necessary extensions and triggers using SQL queries.\n3. Initialize various database tables and views required for the setup.\n4. Execute functions to process invoices and manage pipeline jobs.\n5. Finalize the setup with custom code execution to complete the workflow.\n"
},
"typeVersion": 1
},
{
"id": "bf443087-bcc8-4b5f-b838-a6127bdb4fff",
"name": "Sticky Note10",
"type": "n8n-nodes-base.stickyNote",
"posit
Credentials you'll need
Each integration node will prompt for credentials when you import. We strip credential IDs before publishing — you'll add your own.
httpBearerAuthhttpHeaderAuthjigsawStackApipostgresredis
For the full experience including quality scoring and batch install features for each workflow upgrade to Pro
About this workflow
Jigsaw API key for image processing, I use this as a gatekeeper/second pair of eyes. LINK to their website https://jigsawstack.com/ SECOND A postgress DATABASE (I use Supabase) LlamaCloud for the parsing (you can replace this with Jigsaw if you like) Redis (Optional , but I…
Source: https://n8n.io/workflows/14997/ — original creator credit. Request a take-down →
Related workflows
Workflows that share integrations, category, or trigger type with this one. All free to copy and import.
The "Short Content" automation is a powerful, all-in-one solution designed to streamline the creation of short videos for social media, marketing, or personal projects. Leveraging cutting-edge AI tool
🔥 LIMITED-TIME OFFER: AI Video Automation (Previously \$59) Previously Template
Content Review Loop Workflow. Uses postgres, httpRequest. Webhook trigger; 20 nodes.
Creates an AI-powered sales and support agent connected to live store data from Shopify/WooCommerce. MCP ensures controlled access to inventory and order systems. Automatically handles customer querie
Content Generation Workflow. Uses postgres, httpRequest. Webhook trigger; 13 nodes.