This workflow corresponds to n8n.io template #15833 — we link there as the canonical source.
This workflow follows the HTTP Request → Postgres recipe pattern — see all workflows that pair these two integrations.
The workflow JSON
Copy or download the full n8n JSON below. Paste it into a new n8n workflow, add your credentials, and activate it. Full import guide →
{
"id": "jHSkDPR7DuzOccmC",
"meta": {
"templateCredsSetupCompleted": true
},
"name": "PostgreSQL Auto-Discovery to Pinecone Vector Knowledge Base (Gemini)",
"tags": [],
"nodes": [
{
"id": "d10aa602-ade3-4e79-b868-e8a445cb3a34",
"name": "Manual Trigger",
"type": "n8n-nodes-base.manualTrigger",
"position": [
7824,
6768
],
"parameters": {},
"typeVersion": 1
},
{
"id": "5ec270d6-9ef8-4d5e-b9eb-409cda60955e",
"name": "Cron Trigger (Optional)",
"type": "n8n-nodes-base.scheduleTrigger",
"disabled": true,
"position": [
7824,
6944
],
"parameters": {
"rule": {
"interval": [
{
"field": "hours",
"hoursInterval": 6
}
]
}
},
"typeVersion": 1.2
},
{
"id": "0c1bff6b-1592-409e-af48-7db87d46f227",
"name": "Workflow Configuration",
"type": "n8n-nodes-base.set",
"position": [
8096,
6848
],
"parameters": {
"options": {},
"assignments": {
"assignments": [
{
"id": "sourceName",
"name": "sourceName",
"type": "string",
"value": "postgres"
},
{
"id": "allowedSchemas",
"name": "allowedSchemas",
"type": "string",
"value": "public"
},
{
"id": "allowedTables",
"name": "allowedTables",
"type": "string",
"value": ""
},
{
"id": "excludedTables",
"name": "excludedTables",
"type": "string",
"value": "schema_migrations, migrations, audit_logs"
},
{
"id": "excludedColumns",
"name": "excludedColumns",
"type": "string",
"value": ""
},
{
"id": "maxRowsPerTable",
"name": "maxRowsPerTable",
"type": "number",
"value": 10
},
{
"id": "maxDocumentsPerRun",
"name": "maxDocumentsPerRun",
"type": "number",
"value": 10
},
{
"id": "minTextColumns",
"name": "minTextColumns",
"type": "number",
"value": 1
},
{
"id": "fullSyncTablesWithoutTimestamp",
"name": "fullSyncTablesWithoutTimestamp",
"type": "boolean",
"value": false
},
{
"id": "skipTablesWithoutPrimaryKey",
"name": "skipTablesWithoutPrimaryKey",
"type": "boolean",
"value": true
},
{
"id": "includeScalarContextColumns",
"name": "includeScalarContextColumns",
"type": "boolean",
"value": true
},
{
"id": "defaultLastSync",
"name": "defaultLastSync",
"type": "string",
"value": "1970-01-01T00:00:00.000Z"
},
{
"id": "maxDocumentCharacters",
"name": "maxDocumentCharacters",
"type": "number",
"value": 2000
},
{
"id": "geminiEmbeddingModel",
"name": "geminiEmbeddingModel",
"type": "string",
"value": "gemini-embedding-001"
},
{
"id": "geminiOutputDimensionality",
"name": "geminiOutputDimensionality",
"type": "number",
"value": 1024
},
{
"id": "geminiBatchSize",
"name": "geminiBatchSize",
"type": "number",
"value": 1
},
{
"id": "geminiSmokeTestMode",
"name": "geminiSmokeTestMode",
"type": "boolean",
"value": true
},
{
"id": "pineconeIndexHost",
"name": "pineconeIndexHost",
"type": "string",
"value": "first-vector.pinecone.io"
},
{
"id": "pineconeNamespacePrefix",
"name": "pineconeNamespacePrefix",
"type": "string",
"value": "postgres-"
},
{
"id": "pineconeBatchSize",
"name": "pineconeBatchSize",
"type": "number",
"value": 50
}
]
}
},
"typeVersion": 3.4
},
{
"id": "5f8820bf-ad28-42ab-a0a5-b48e3d907157",
"name": "Discover PostgreSQL Metadata",
"type": "n8n-nodes-base.postgres",
"maxTries": 3,
"position": [
8352,
6848
],
"parameters": {
"query": "SELECT\n c.table_schema,\n c.table_name,\n c.column_name,\n c.ordinal_position,\n c.data_type,\n c.udt_name,\n c.character_maximum_length,\n CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END AS is_primary_key\nFROM information_schema.columns c\nLEFT JOIN (\n SELECT\n kcu.table_schema,\n kcu.table_name,\n kcu.column_name\n FROM information_schema.table_constraints tc\n JOIN information_schema.key_column_usage kcu\n ON tc.constraint_name = kcu.constraint_name\n AND tc.table_schema = kcu.table_schema\n AND tc.table_name = kcu.table_name\n WHERE tc.constraint_type = 'PRIMARY KEY'\n) pk\n ON c.table_schema = pk.table_schema\n AND c.table_name = pk.table_name\n AND c.column_name = pk.column_name\nWHERE c.table_schema NOT IN ('pg_catalog', 'information_schema')\nORDER BY c.table_schema, c.table_name, c.ordinal_position;",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"retryOnFail": true,
"typeVersion": 2.6,
"waitBetweenTries": 5000
},
{
"id": "9a8f92ad-b027-4249-9094-8c46f4ab1194",
"name": "Filter Indexable Tables",
"type": "n8n-nodes-base.code",
"position": [
8624,
6848
],
"parameters": {
"jsCode": "// Groups the discovered information_schema columns by table, applies the allow/deny settings from Workflow Configuration,\n// and emits one item per indexable table together with the SQL used to fetch its rows.\nconst config = $('Workflow Configuration').first().json;\nconst rows = $input.all().map(item => item.json);\nconst staticData = $getWorkflowStaticData('global');\n\nstaticData.postgresPineconeSync = staticData.postgresPineconeSync || {};\nstaticData.postgresPineconeSync.tableState = staticData.postgresPineconeSync.tableState || {};\nstaticData.postgresPineconeSync.currentRun = {\n startedAt: new Date().toISOString(),\n discoveredColumns: rows.length,\n discoveredTables: 0,\n candidateTables: 0,\n skippedTables: [],\n};\n\nconst parseList = (value) => String(value || '').split(',').map(v => v.trim()).filter(Boolean);\nconst allowedSchemas = parseList(config.allowedSchemas);\nconst allowedTables = parseList(config.allowedTables).map(v => v.toLowerCase());\nconst excludedTables = parseList(config.excludedTables).map(v => v.toLowerCase());\nconst configuredExcludedColumns = parseList(config.excludedColumns).map(v => v.toLowerCase());\nconst sensitivePatterns = ['password', 'passwd', 'pwd', 'token', 'secret', 'api_key', 'apikey', 'access_key', 'refresh_token', 'reset_token', 'otp', 'pin', 'session', 'cookie', 'hash', 'salt', 'private_key', 'client_secret'];\nconst timestampCandidates = ['updated_at', 'modified_at', 'last_modified', 'created_at'];\nconst textTypes = new Set(['text', 'character varying', 'varchar', 'character', 'char', 'json', 'jsonb']);\nconst scalarTypes = new Set(['integer', 'bigint', 'smallint', 'numeric', 'decimal', 'real', 'double precision', 'boolean', 'date', 'timestamp without time zone', 'timestamp with time zone', 'uuid']);\nconst maxRows = Math.max(1, Number(config.maxRowsPerTable || 500));\nconst minTextColumns = Math.max(1, Number(config.minTextColumns || 1));\nconst namespacePrefix = String(config.pineconeNamespacePrefix || 'postgres-');\n\nfunction tableKey(schema, table) { return `${schema}.${table}`; }\nfunction quoteIdent(value) { return '\"' + String(value).replace(/\"/g, '\"\"') + '\"'; }\nfunction quoteLiteral(value) { return \"'\" + String(value).replace(/'/g, \"''\") + \"'\"; }\nfunction isSensitiveColumn(columnName) {\n const lower = String(columnName || '').toLowerCase();\n return sensitivePatterns.some(pattern => lower.includes(pattern)) || configuredExcludedColumns.includes(lower);\n}\nfunction isAllowedTable(schema, table) {\n const key = tableKey(schema, table).toLowerCase();\n const tableOnly = String(table).toLowerCase();\n if (allowedSchemas.length && !allowedSchemas.includes(schema)) return false;\n if (allowedTables.length && !allowedTables.includes(key) && !allowedTables.includes(tableOnly)) return false;\n if (excludedTables.includes(key) || excludedTables.includes(tableOnly)) return false;\n return true;\n}\n\nconst tables = new Map();\nfor (const row of rows) {\n const schema = row.table_schema;\n const table = row.table_name;\n if (!schema || !table) continue;\n const key = tableKey(schema, table);\n if (!tables.has(key)) tables.set(key, { schema, table, columns: [] });\n tables.get(key).columns.push(row);\n}\n\nstaticData.postgresPineconeSync.currentRun.discoveredTables = tables.size;\nconst outputs = [];\nfor (const [key, table] of tables.entries()) {\n if (!isAllowedTable(table.schema, table.table)) {\n staticData.postgresPineconeSync.currentRun.skippedTables.push({ table: key, reason: 'Filtered by allowed/excluded table settings' });\n continue;\n }\n const safeColumns = table.columns.filter(col => !isSensitiveColumn(col.column_name));\n const primaryKeys = safeColumns.filter(col => col.is_primary_key === true || col.is_primary_key === 'true').map(col => col.column_name);\n const primaryKey = primaryKeys[0];\n if (!primaryKey && config.skipTablesWithoutPrimaryKey) {\n staticData.postgresPineconeSync.currentRun.skippedTables.push({ table: key, reason: 'No primary key detected' });\n continue;\n }\n // Use the column's real name (not the lower-cased candidate) so the quoted identifier also matches mixed-case columns.\n const timestampColumn = timestampCandidates.map(candidate => safeColumns.find(col => String(col.column_name).toLowerCase() === candidate)).find(Boolean)?.column_name || null;\n const textColumns = safeColumns.filter(col => textTypes.has(String(col.data_type).toLowerCase()) || textTypes.has(String(col.udt_name).toLowerCase())).map(col => col.column_name).filter(col => col !== primaryKey);\n if (textColumns.length < minTextColumns) {\n staticData.postgresPineconeSync.currentRun.skippedTables.push({ table: key, reason: 'No useful text/json columns detected' });\n continue;\n }\n if (!timestampColumn && !config.fullSyncTablesWithoutTimestamp) {\n staticData.postgresPineconeSync.currentRun.skippedTables.push({ table: key, reason: 'No timestamp column and full sync fallback is disabled' });\n continue;\n }\n const scalarColumns = safeColumns.filter(col => scalarTypes.has(String(col.data_type).toLowerCase())).map(col => col.column_name);\n const selectedColumns = [...new Set([primaryKey, timestampColumn, ...textColumns, ...(config.includeScalarContextColumns ? scalarColumns : [])].filter(Boolean))];\n const state = staticData.postgresPineconeSync.tableState[key] || {};\n const lastSync = state.lastSync || config.defaultLastSync || '1970-01-01T00:00:00.000Z';\n const schemaSql = quoteIdent(table.schema);\n const tableSql = quoteIdent(table.table);\n // A table with neither a timestamp column nor a primary key gets no ORDER BY rather than ordering by a non-existent column.\n const orderColumn = timestampColumn || primaryKey;\n const orderSql = orderColumn ? `ORDER BY ${quoteIdent(orderColumn)} ASC` : '';\n const whereSql = timestampColumn ? `WHERE ${quoteIdent(timestampColumn)} > ${quoteLiteral(lastSync)}` : '';\n const namespace = `${namespacePrefix}${table.schema}.${table.table}`;\n const fetchSql = [\n 'SELECT',\n ` ${quoteLiteral(table.schema)} AS \"__n8n_schema\",`,\n ` ${quoteLiteral(table.table)} AS \"__n8n_table\",`,\n ` ${quoteLiteral(primaryKey || '')} AS \"__n8n_primary_key\",`,\n ` ${quoteLiteral(timestampColumn || '')} AS \"__n8n_timestamp_column\",`,\n ` ${quoteLiteral(JSON.stringify(textColumns))} AS \"__n8n_text_columns\",`,\n ` ${quoteLiteral(JSON.stringify(selectedColumns))} AS \"__n8n_selected_columns\",`,\n ` ${quoteLiteral(namespace)} AS \"__n8n_namespace\",`,\n ` ${quoteLiteral(key)} AS \"__n8n_table_key\",`,\n ' row_to_json(src)::jsonb AS \"__n8n_row\"',\n `FROM (SELECT * FROM ${schemaSql}.${tableSql} ${whereSql} ${orderSql} LIMIT ${maxRows}) src;`,\n ].join('\\n');\n outputs.push({ json: { tableKey: key, schema: table.schema, table: table.table, primaryKey, timestampColumn, textColumns, selectedColumns, namespace, lastSync, fetchSql } });\n}\nstaticData.postgresPineconeSync.currentRun.candidateTables = outputs.length;\nreturn outputs;"
},
"typeVersion": 2
},
{
"id": "f63663eb-db24-479e-ac10-7dce65f6f130",
"name": "Fetch Table Rows",
"type": "n8n-nodes-base.postgres",
"maxTries": 3,
"position": [
8912,
6848
],
"parameters": {
"query": "{{ $json.fetchSql }}",
"options": {},
"operation": "executeQuery"
},
"credentials": {
"postgres": {
"name": "<your credential>"
}
},
"retryOnFail": true,
"typeVersion": 2.6,
"waitBetweenTries": 5000
},
{
"id": "1555fc72-afae-4015-bc58-991e800e8e87",
"name": "Build Embedding Documents",
"type": "n8n-nodes-base.code",
"position": [
9184,
6848
],
"parameters": {
"jsCode": "// Converts each fetched row into a readable text document plus the vector ID and metadata used for the Pinecone upsert.\nconst config = $('Workflow Configuration').first().json;\nconst maxChars = Math.max(1000, Number(config.maxDocumentCharacters || 12000));\nconst sourceName = String(config.sourceName || 'postgres');\nfunction parseJson(value, fallback) { if (Array.isArray(value)) return value; if (value && typeof value === 'object') return value; try { return JSON.parse(value); } catch (error) { return fallback; } }\nfunction titleCase(value) { return String(value).replace(/_/g, ' ').replace(/\\w\\S*/g, word => word.charAt(0).toUpperCase() + word.slice(1)); }\nfunction normalizeValue(value) { if (value === null || value === undefined) return ''; if (value instanceof Date) return value.toISOString(); if (typeof value === 'object') return JSON.stringify(value); return String(value); }\n// Deterministic 16-hex-digit hash (two independent 32-bit multiplicative lanes) used to build an ID for rows without a primary key.\nfunction contentHash(text) { let h1 = 0x811c9dc5; let h2 = 0x01000193; for (let i = 0; i < text.length; i++) { const code = text.charCodeAt(i); h1 = Math.imul(h1 ^ code, 0x01000193); h2 = Math.imul(h2 ^ code, 0x85ebca6b); } return (h1 >>> 0).toString(16).padStart(8, '0') + (h2 >>> 0).toString(16).padStart(8, '0'); }\nconst documents = [];\nfor (const item of $input.all()) {\n const row = parseJson(item.json.__n8n_row, {});\n const schema = item.json.__n8n_schema;\n const table = item.json.__n8n_table;\n const primaryKey = item.json.__n8n_primary_key;\n const timestampColumn = item.json.__n8n_timestamp_column || null;\n const selectedColumns = parseJson(item.json.__n8n_selected_columns, []);\n const textColumns = parseJson(item.json.__n8n_text_columns, []);\n const namespace = item.json.__n8n_namespace;\n const tableKey = item.json.__n8n_table_key;\n const primaryKeyValue = primaryKey ? row[primaryKey] : undefined;\n if (primaryKey && (primaryKeyValue === undefined || primaryKeyValue === null || primaryKeyValue === '')) continue;\n const lines = [`Database: ${sourceName}`, `Schema: ${schema}`, `Table: ${table}`];\n for (const column of selectedColumns) {\n if (!Object.prototype.hasOwnProperty.call(row, column)) continue;\n const value = normalizeValue(row[column]);\n if (!value) continue;\n lines.push(`${titleCase(column)}: ${value}`);\n }\n let document = lines.join('\\n');\n if (document.length > maxChars) document = document.slice(0, maxChars) + '\\n[Document truncated to configured maximum length]';\n const updatedAt = timestampColumn && row[timestampColumn] ? normalizeValue(row[timestampColumn]) : null;\n // Without a primary key every row would share the ID suffix 'undefined' and overwrite the same vector, so fall back to a hash of the row content.\n const idSuffix = primaryKey ? primaryKeyValue : `rowhash-${contentHash(JSON.stringify(row))}`;\n const vectorId = `${sourceName}::${schema}::${table}::${idSuffix}`;\n const metadata = { source: sourceName, schema, table, primary_key: primaryKey, primary_key_value: normalizeValue(primaryKeyValue), text_columns: textColumns.join(','), document_columns: selectedColumns.join(',') };\n // Pinecone does not accept null metadata values, so updated_at is attached only when the row has a timestamp.\n if (updatedAt !== null) metadata.updated_at = updatedAt;\n documents.push({ json: { id: vectorId, document, namespace, tableKey, timestampColumn, updatedAt, metadata } });\n}\nreturn documents;"
},
"typeVersion": 2
},
{
"id": "9c8a8742-300e-477b-b177-cc2f7e3087a7",
"name": "Prepare Gemini Embedding Batches",
"type": "n8n-nodes-base.code",
"position": [
9472,
6848
],
"parameters": {
"jsCode": "const config = $('Workflow Configuration').first().json;\nconst allDocs = $input.all().map(item => item.json).filter(doc => doc.document);\nconst smokeTestMode = config.geminiSmokeTestMode !== false;\nconst maxDocuments = smokeTestMode ? 1 : Math.max(1, Number(config.maxDocumentsPerRun || 10));\nconst docs = allDocs.slice(0, maxDocuments);\nconst batchSize = Math.max(1, Math.min(100, Number(config.geminiBatchSize || 1)));\nconst staticData = $getWorkflowStaticData('global');\nstaticData.postgresPineconeSync = staticData.postgresPineconeSync || {};\nstaticData.postgresPineconeSync.currentRun = staticData.postgresPineconeSync.currentRun || {};\nstaticData.postgresPineconeSync.currentRun.geminiSmokeTestMode = smokeTestMode;\nstaticData.postgresPineconeSync.currentRun.embeddingDocumentsPrepared = docs.length;\nstaticData.postgresPineconeSync.currentRun.embeddingDocumentsSkippedByRunLimit = Math.max(0, allDocs.length - docs.length);\nconst batches = [];\n\nfor (let i = 0; i < docs.length; i += batchSize) {\n const documents = docs.slice(i, i + batchSize);\n batches.push({\n json: {\n input: documents.map(doc => doc.document),\n documents,\n documentCount: documents.length,\n },\n });\n}\n\nreturn batches;"
},
"typeVersion": 2
},
{
"id": "b79df4e2-05f9-415c-b673-cbad457b0b00",
"name": "Generate Gemini Embeddings",
"type": "n8n-nodes-base.httpRequest",
"position": [
9744,
6848
],
"parameters": {
"url": "={{ \"https://generativelanguage.googleapis.com/v1beta/models/\" + $(\"Workflow Configuration\").first().json.geminiEmbeddingModel.replace(/^models\\//, \"\") + \":batchEmbedContents\" }}",
"method": "POST",
"options": {
"timeout": 45000,
"batching": {
"batch": {
"batchSize": 1,
"batchInterval": 5000
}
}
},
"jsonBody": "={{ { requests: $json.input.map(text => ({ model: \"models/\" + $(\"Workflow Configuration\").first().json.geminiEmbeddingModel.replace(/^models\\//, \"\"), content: { parts: [{ text }] }, taskType: \"RETRIEVAL_DOCUMENT\", outputDimensionality: $(\"Workflow Configuration\").first().json.geminiOutputDimensionality })) } }}",
"sendBody": true,
"specifyBody": "json",
"authentication": "genericCredentialType",
"genericAuthType": "httpHeaderAuth"
},
"credentials": {
"httpHeaderAuth": {
"name": "<your credential>"
}
},
"retryOnFail": false,
"typeVersion": 4.2
},
{
"id": "499ba30e-a8f7-4349-a435-67e75c0f8978",
"name": "Prepare Pinecone Upsert Batches",
"type": "n8n-nodes-base.code",
"position": [
10032,
6848
],
"parameters": {
"jsCode": "// Pairs each Gemini response with the documents of the batch that produced it, groups the vectors by namespace and table,\n// and emits Pinecone upsert payloads of at most pineconeBatchSize vectors each.\nconst config = $('Workflow Configuration').first().json;\nconst embeddingBatches = $items('Prepare Gemini Embedding Batches');\nconst responses = $input.all();\nconst batchSize = Math.max(1, Math.min(100, Number(config.pineconeBatchSize || 50)));\n// The upsert node prepends https:// itself, so strip a scheme or trailing slash pasted along with the host.\nconst pineconeIndexHost = String(config.pineconeIndexHost || '').trim().replace(/^https?:\\/\\//i, '').replace(/\\/+$/, '');\nconst groups = new Map();\n\nfunction isEmbedding(value) {\n return Array.isArray(value) && value.length > 0 && value.every(v => typeof v === 'number');\n}\n\nfor (let batchIndex = 0; batchIndex < responses.length; batchIndex++) {\n const response = responses[batchIndex].json;\n const sourceBatch = embeddingBatches[batchIndex]?.json;\n const documents = sourceBatch?.documents || [];\n const embeddings = Array.isArray(response?.embeddings) ? response.embeddings : [];\n\n if (documents.length && embeddings.length !== documents.length) {\n throw new Error(`Gemini returned ${embeddings.length} embeddings for ${documents.length} documents in batch ${batchIndex}. Stopping before Pinecone upsert.`);\n }\n if (!documents.length || !embeddings.length) continue;\n\n for (let docIndex = 0; docIndex < documents.length; docIndex++) {\n const doc = documents[docIndex];\n const embedding = embeddings[docIndex]?.values;\n if (!doc || !isEmbedding(embedding)) continue;\n\n const groupKey = `${doc.namespace}::${doc.tableKey}`;\n if (!groups.has(groupKey)) {\n groups.set(groupKey, {\n namespace: doc.namespace,\n tableKey: doc.tableKey,\n timestampColumn: doc.timestampColumn,\n pineconeIndexHost,\n vectors: [],\n maxTimestamp: null,\n });\n }\n\n const group = groups.get(groupKey);\n group.vectors.push({ id: doc.id, values: embedding, metadata: doc.metadata });\n\n if (doc.updatedAt) {\n const parsed = new Date(doc.updatedAt);\n // toISOString() throws a RangeError on an invalid Date; skip unparsable timestamps instead of aborting the whole run.\n if (!Number.isNaN(parsed.getTime())) {\n const next = parsed.toISOString();\n if (!group.maxTimestamp || next > group.maxTimestamp) group.maxTimestamp = next;\n }\n }\n }\n}\n\nconst output = [];\nfor (const group of groups.values()) {\n for (let i = 0; i < group.vectors.length; i += batchSize) {\n const vectors = group.vectors.slice(i, i + batchSize);\n output.push({\n json: {\n namespace: group.namespace,\n tableKey: group.tableKey,\n timestampColumn: group.timestampColumn,\n maxTimestamp: group.maxTimestamp,\n pineconeIndexHost: group.pineconeIndexHost,\n vectorCount: vectors.length,\n vectors,\n },\n });\n }\n}\n\nreturn output;"
},
"typeVersion": 2
},
{
"id": "5628f430-3256-4524-b143-d5e8db485caf",
"name": "Upsert Vectors to Pinecone",
"type": "n8n-nodes-base.httpRequest",
"position": [
10304,
6848
],
"parameters": {
"url": "={{ \"https://\" + $json.pineconeIndexHost + \"/vectors/upsert\" }}",
"method": "POST",
"options": {},
"jsonBody": "={{ { namespace: $json.namespace, vectors: $json.vectors } }}",
"sendBody": true,
"specifyBody": "json",
"authentication": "genericCredentialType",
"genericAuthType": "httpHeaderAuth"
},
"credentials": {
"httpHeaderAuth": {
"name": "<your credential>"
}
},
"typeVersion": 4.2
},
{
"id": "fea565c7-d4ba-4ba1-b331-ff1d66f64946",
"name": "Return Sync Summary",
"type": "n8n-nodes-base.code",
"position": [
10592,
6848
],
"parameters": {
"jsCode": "// Matches each Pinecone response to the upsert batch at the same index, updates the per-table sync state in workflow static data,\n// and returns a single summary item for the run.\nconst staticData = $getWorkflowStaticData('global');\nstaticData.postgresPineconeSync = staticData.postgresPineconeSync || {};\nstaticData.postgresPineconeSync.tableState = staticData.postgresPineconeSync.tableState || {};\nconst batches = $items('Prepare Pinecone Upsert Batches');\nconst responses = $input.all();\nconst currentRun = staticData.postgresPineconeSync.currentRun || {};\nconst summary = { startedAt: currentRun.startedAt, finishedAt: new Date().toISOString(), discoveredColumns: currentRun.discoveredColumns || 0, discoveredTables: currentRun.discoveredTables || 0, candidateTables: currentRun.candidateTables || 0, skippedTables: currentRun.skippedTables || [], geminiSmokeTestMode: currentRun.geminiSmokeTestMode || false, embeddingDocumentsPrepared: currentRun.embeddingDocumentsPrepared || 0, embeddingDocumentsSkippedByRunLimit: currentRun.embeddingDocumentsSkippedByRunLimit || 0, successfulBatches: 0, failedBatches: 0, vectorsUpserted: 0, updatedSyncState: {}, failedBatchDetails: [] };\nfunction responseFailed(json) { if (!json) return true; if (json.error) return true; if (json.message && String(json.message).toLowerCase().includes('error')) return true; if (json.statusCode && Number(json.statusCode) >= 400) return true; return false; }\n// Every batch of a table carries that table's overall max timestamp, so the cursor must stay put if any batch of the table failed;\n// otherwise the rows of the failed batch would never be fetched again. Re-upserting the successful rows is harmless because vector IDs are stable.\nconst failedTables = new Set();\nfor (let index = 0; index < responses.length; index++) {\n const batch = batches[index]?.json;\n if (batch && responseFailed(responses[index].json)) failedTables.add(batch.tableKey);\n}\nfor (let index = 0; index < responses.length; index++) {\n const response = responses[index].json;\n const batch = batches[index]?.json;\n if (!batch) continue;\n if (responseFailed(response)) {\n summary.failedBatches += 1;\n summary.failedBatchDetails.push({ table: batch.tableKey, namespace: batch.namespace, error: response });\n continue;\n }\n summary.successfulBatches += 1;\n summary.vectorsUpserted += batch.vectorCount || batch.vectors?.length || 0;\n if (failedTables.has(batch.tableKey)) continue;\n staticData.postgresPineconeSync.tableState[batch.tableKey] = staticData.postgresPineconeSync.tableState[batch.tableKey] || {};\n if (batch.timestampColumn && batch.maxTimestamp) {\n const previous = staticData.postgresPineconeSync.tableState[batch.tableKey].lastSync;\n if (!previous || batch.maxTimestamp > previous) staticData.postgresPineconeSync.tableState[batch.tableKey].lastSync = batch.maxTimestamp;\n } else {\n staticData.postgresPineconeSync.tableState[batch.tableKey].lastFullSync = new Date().toISOString();\n }\n staticData.postgresPineconeSync.tableState[batch.tableKey].lastNamespace = batch.namespace;\n staticData.postgresPineconeSync.tableState[batch.tableKey].lastSuccessfulRun = summary.finishedAt;\n summary.updatedSyncState[batch.tableKey] = staticData.postgresPineconeSync.tableState[batch.tableKey];\n}\nstaticData.postgresPineconeSync.lastSummary = summary;\nreturn [{ json: summary }];"
},
"typeVersion": 2
},
{
"id": "6626293a-bddb-4a93-85aa-ef80ee13b6e5",
"name": "Sticky Note",
"type": "n8n-nodes-base.stickyNote",
"position": [
7280,
6512
],
"parameters": {
"width": 368,
"height": 720,
"content": "## PostgreSQL Auto-Discovery to Pinecone\n\nThis workflow turns existing PostgreSQL data into a Pinecone vector knowledge base for semantic search and RAG. It discovers schemas, tables, primary keys, timestamp columns, and useful text or JSON fields automatically, then converts selected rows into readable embedding documents, creates Gemini embeddings, and upserts vectors with source metadata.\n\nTo set it up, add PostgreSQL, Gemini, and Pinecone credentials, then open **Workflow Configuration** and set your allowed schemas, table filters, Pinecone index host, row limits, and namespace prefix. Run manually first with `geminiSmokeTestMode = true`, `maxRowsPerTable = 1`, and `maxDocumentsPerRun = 1`. After a successful test, turn off smoke test mode and increase limits gradually.\n\nCustomize the workflow by changing allowed/excluded schemas, excluded columns, document size, sync behavior for tables without timestamps, and Pinecone namespace naming."
},
"typeVersion": 1
},
{
"id": "1838bbe5-530b-4462-8d02-70fdda5f530e",
"name": "Sticky Note1",
"type": "n8n-nodes-base.stickyNote",
"position": [
7744,
6560
],
"parameters": {
"color": "#5D6569",
"width": 512,
"height": 608,
"content": "## Trigger and configuration\n\nStart manually while testing, then enable the schedule when the configuration is proven. The configuration node controls schema filters, row limits, batching, Gemini settings, and Pinecone namespace behavior."
},
"typeVersion": 1
},
{
"id": "33b46153-f483-455f-bad2-34f2a1b85775",
"name": "Sticky Note2",
"type": "n8n-nodes-base.stickyNote",
"position": [
8304,
6560
],
"parameters": {
"color": "#5D6569",
"width": 496,
"height": 608,
"content": "## Database discovery\n\nThese nodes inspect PostgreSQL metadata and choose indexable tables dynamically. They avoid hard-coded table or column names and skip tables that do not match the configured rules."
},
"typeVersion": 1
},
{
"id": "3d6aff7f-7964-4ac6-894b-0fe2025120ce",
"name": "Sticky Note3",
"type": "n8n-nodes-base.stickyNote",
"position": [
8864,
6560
],
"parameters": {
"color": "#5D6569",
"width": 752,
"height": 608,
"content": "## Document preparation\n\nRows are converted into readable text documents that preserve business context for embeddings. The batching node limits how many documents are sent to Gemini during tests and larger sync runs."
},
"typeVersion": 1
},
{
"id": "431fbe85-481a-4715-912d-5d4dd6af54b1",
"name": "Sticky Note4",
"type": "n8n-nodes-base.stickyNote",
"position": [
9664,
6560
],
"parameters": {
"color": "#5D6569",
"width": 256,
"height": 608,
"content": "## Gemini embeddings\n\nThis section sends document batches to Gemini using `RETRIEVAL_DOCUMENT`. Keep smoke test mode on until one small embedding request succeeds, then scale document limits carefully."
},
"typeVersion": 1
},
{
"id": "d203133c-5a07-44a6-b87a-fb4ba96e7d9e",
"name": "Sticky Note5",
"type": "n8n-nodes-base.stickyNote",
"position": [
9952,
6560
],
"parameters": {
"color": "#5D6569",
"width": 528,
"height": 608,
"content": "## Pinecone upsert\n\nEmbeddings are matched back to their original row metadata, grouped into Pinecone batches, and upserted with stable vector IDs. Metadata keeps each result traceable to schema, table, and primary key."
},
"typeVersion": 1
}
],
"active": false,
"settings": {
"binaryMode": "separate",
"executionOrder": "v1"
},
"versionId": "2a55ca7f-b725-445d-a70f-9946cff4c369",
"connections": {
"Manual Trigger": {
"main": [
[
{
"node": "Workflow Configuration",
"type": "main",
"index": 0
}
]
]
},
"Fetch Table Rows": {
"main": [
[
{
"node": "Build Embedding Documents",
"type": "main",
"index": 0
}
]
]
},
"\u2753 Qty Cancelled?": {
"main": [
[],
[]
]
},
"\u2753 Addr Cancelled?": {
"main": [
[],
[
{
"node": "\ud83d\udcbe Save Payment Link",
"type": "main",
"index": 0
}
]
]
},
"\u2753 Email Cancelled?": {
"main": [
[],
[]
]
},
"\u2753 Search Cancelled?": {
"main": [
[],
[]
]
},
"Workflow Configuration": {
"main": [
[
{
"node": "Discover PostgreSQL Metadata",
"type": "main",
"index": 0
}
]
]
},
"\ud83d\udcbe Save Payment Link": {
"main": [
[]
]
},
"Cron Trigger (Optional)": {
"main": [
[
{
"node": "Workflow Configuration",
"type": "main",
"index": 0
}
]
]
},
"Filter Indexable Tables": {
"main": [
[
{
"node": "Fetch Table Rows",
"type": "main",
"index": 0
}
]
]
},
"Build Embedding Documents": {
"main": [
[
{
"node": "Prepare Gemini Embedding Batches",
"type": "main",
"index": 0
}
]
]
},
"Generate Gemini Embeddings": {
"main": [
[
{
"node": "Prepare Pinecone Upsert Batches",
"type": "main",
"index": 0
}
]
]
},
"Upsert Vectors to Pinecone": {
"main": [
[
{
"node": "Return Sync Summary",
"type": "main",
"index": 0
}
]
]
},
"Discover PostgreSQL Metadata": {
"main": [
[
{
"node": "Filter Indexable Tables",
"type": "main",
"index": 0
}
]
]
},
"Prepare Pinecone Upsert Batches": {
"main": [
[
{
"node": "Upsert Vectors to Pinecone",
"type": "main",
"index": 0
}
]
]
},
"Prepare Gemini Embedding Batches": {
"main": [
[
{
"node": "Generate Gemini Embeddings",
"type": "main",
"index": 0
}
]
]
}
}
}
Credentials you'll need
Each integration node will prompt for credentials when you import. We strip credential IDs before publishing — you'll add your own.
httpHeaderAuth, postgres
For the full experience, including quality scoring and batch-install features for each workflow, upgrade to Pro.
About this workflow
Turn existing PostgreSQL databases into AI-searchable Pinecone vector knowledge bases without manually defining every table and column.
Source: https://n8n.io/workflows/15833/ — original creator credit. Request a take-down →
Related workflows
Workflows that share integrations, category, or trigger type with this one. All free to copy and import.
Document Ingestion Pipeline. Uses executeCommand, readBinaryFile, readPDF, httpRequest. Event-driven trigger; 12 nodes.
This simple philosophy changes the way we think about automated sales agents. Context changes everything. In this 4-part workflow, we start by creating a knowledge base that will act as context across
RAG AI Agent Template V5. Uses lmChatOpenAi, documentDefaultDataLoader, embeddingsOpenAi, googleDrive. Event-driven trigger; 56 nodes.
My workflow 2529. Uses lmChatOpenAi, documentDefaultDataLoader, embeddingsOpenAi, googleDrive. Event-driven trigger; 54 nodes.
05. Base_To_Copy. Uses lmChatOpenAi, documentDefaultDataLoader, embeddingsOpenAi, googleDrive. Event-driven trigger; 54 nodes.