AutomationFlowsAI & RAG › Replay Postgres DLQ Messages Workflow

Replay Postgres DLQ Messages Workflow

Original n8n title: W8 - Dlq Replay (manual Trigger)

W8 - DLQ Replay (Manual Trigger). Uses postgres. Webhook trigger; 13 nodes.

Webhook trigger★★★★☆ complexity13 nodesPostgres
AI & RAG Trigger: Webhook Nodes: 13 Complexity: ★★★★☆ Added:

The workflow JSON

Copy or download the full n8n JSON below. Paste it into a new n8n workflow, add your credentials, activate. Full import guide →

Download .json
{
  "name": "W8 - DLQ Replay (Manual Trigger)",
  "active": false,
  "settings": {
    "executionTimeout": 600,
    "saveExecutionProgress": true,
    "saveManualExecutions": true
  },
  "nodes": [
    {
      "parameters": {
        "httpMethod": "POST",
        "path": "v1/admin/dlq/replay",
        "responseMode": "responseNode",
        "options": {}
      },
      "id": "webhook-trigger",
      "name": "Replay Trigger",
      "type": "n8n-nodes-base.webhook",
      "typeVersion": 1,
      "position": [
        0,
        0
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "// Parse replay request\nconst body = $json.body || $json;\nconst headers = $json.headers || {};\n\n// Auth check - require admin scope\nconst token = (headers['x-api-token'] || headers['X-Api-Token'] || '').toString().trim();\nconst tokenHash = token ? require('crypto').createHash('sha256').update(token).digest('hex') : '';\n\n// Replay options\nconst options = {\n  msgIds: Array.isArray(body.msg_ids) ? body.msg_ids : [],\n  channel: (body.channel || '').toString(),\n  maxMessages: Math.min(Number(body.max_messages) || 10, 100),\n  dryRun: body.dry_run === true,\n  replayAll: body.replay_all === true\n};\n\nreturn [{\n  json: {\n    tokenHash,\n    options,\n    requestedAt: new Date().toISOString()\n  }\n}];"
      },
      "id": "parse-request",
      "name": "Parse Request",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        250,
        0
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "SELECT client_id, scopes FROM api_clients WHERE is_active = true AND token_hash = $1 LIMIT 1;",
        "additionalFields": {
          "queryParams": "={{[$json.tokenHash]}}"
        }
      },
      "id": "check-admin",
      "name": "Check Admin Scope",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        500,
        0
      ],
      "credentials": {
        "postgres": {
          "name": "<your credential>"
        }
      }
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "// Verify admin scope\nconst result = $json;\nconst input = $('Parse Request').first().json;\n\nlet isAdmin = false;\nlet scopes = [];\n\nif (result.client_id) {\n  try {\n    scopes = Array.isArray(result.scopes) ? result.scopes : JSON.parse(result.scopes || '[]');\n  } catch { scopes = []; }\n  \n  isAdmin = scopes.includes('admin:*') || scopes.includes('dlq:replay') || scopes.includes('*');\n}\n\nreturn [{\n  json: {\n    ...input,\n    isAdmin,\n    scopes,\n    clientId: result.client_id || null\n  }\n}];"
      },
      "id": "verify-admin",
      "name": "Verify Admin",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        750,
        0
      ]
    },
    {
      "parameters": {
        "conditions": {
          "boolean": [
            {
              "value1": "={{$json.isAdmin}}",
              "operation": "isTrue"
            }
          ]
        }
      },
      "id": "is-admin-check",
      "name": "Is Admin?",
      "type": "n8n-nodes-base.if",
      "typeVersion": 2,
      "position": [
        1000,
        0
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "SELECT \n  id,\n  conversation_key,\n  msg_id,\n  channel,\n  payload_json,\n  error_message,\n  retry_count\nFROM outbound_messages\nWHERE status = 'DLQ'\n  AND ($1::text[] IS NULL OR msg_id = ANY($1::text[]))\n  AND ($2::text = '' OR channel = $2::text)\nORDER BY created_at ASC\nLIMIT $3;",
        "additionalFields": {
          "queryParams": "={{[$json.options.msgIds.length > 0 ? $json.options.msgIds : null, $json.options.channel, $json.options.maxMessages]}}"
        }
      },
      "id": "fetch-dlq-for-replay",
      "name": "Fetch DLQ Messages",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        1250,
        -100
      ],
      "credentials": {
        "postgres": {
          "name": "<your credential>"
        }
      }
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const messages = $input.all();\nconst input = $('Verify Admin').first().json;\nconst isDryRun = input.options?.dryRun === true;\n\nif (messages.length === 0) {\n  return [{\n    json: {\n      success: true,\n      replayed: 0,\n      message: 'No DLQ messages found matching criteria',\n      dryRun: isDryRun\n    }\n  }];\n}\n\n// Prepare messages for replay\nconst toReplay = messages.map(m => ({\n  id: m.json.id,\n  msg_id: m.json.msg_id,\n  channel: m.json.channel,\n  payload: m.json.payload_json,\n  previousError: m.json.error_message,\n  retryCount: m.json.retry_count\n}));\n\nreturn [{\n  json: {\n    toReplay,\n    count: toReplay.length,\n    dryRun: isDryRun,\n    options: input.options\n  }\n}];"
      },
      "id": "prepare-replay",
      "name": "Prepare Replay",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        1500,
        -100
      ]
    },
    {
      "parameters": {
        "conditions": {
          "boolean": [
            {
              "value1": "={{$json.dryRun}}",
              "operation": "isFalse"
            }
          ]
        }
      },
      "id": "not-dry-run",
      "name": "Not Dry Run?",
      "type": "n8n-nodes-base.if",
      "typeVersion": 2,
      "position": [
        1750,
        -100
      ]
    },
    {
      "parameters": {
        "operation": "executeQuery",
        "query": "UPDATE outbound_messages \nSET \n  status = 'RETRY',\n  retry_count = retry_count + 1,\n  last_retry_at = NOW(),\n  next_retry_at = NOW()\nWHERE status = 'DLQ'\n  AND id = ANY($1::int[])\nRETURNING id, msg_id;",
        "additionalFields": {
          "queryParams": "={{[$json.toReplay.map(m => m.id)]}}"
        }
      },
      "id": "update-status",
      "name": "Set to RETRY",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [
        2000,
        -200
      ],
      "credentials": {
        "postgres": {
          "name": "<your credential>"
        }
      }
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const updated = $input.all();\nconst input = $('Prepare Replay').first().json;\n\nreturn [{\n  json: {\n    success: true,\n    replayed: updated.length,\n    dryRun: false,\n    messages: updated.map(m => m.json),\n    note: 'Messages moved to RETRY status. Outbound workers will process them.'\n  }\n}];"
      },
      "id": "replay-result",
      "name": "Replay Result",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        2250,
        -200
      ]
    },
    {
      "parameters": {
        "language": "javascript",
        "jsCode": "const input = $json;\n\nreturn [{\n  json: {\n    success: true,\n    dryRun: true,\n    wouldReplay: input.count,\n    messages: input.toReplay,\n    note: 'Dry run - no changes made. Remove dry_run:true to execute.'\n  }\n}];"
      },
      "id": "dry-run-result",
      "name": "Dry Run Result",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        2000,
        0
      ]
    },
    {
      "parameters": {
        "responseCode": 403,
        "responseBody": "={{JSON.stringify({error: 'forbidden', message: 'Admin scope required for DLQ replay'})}}",
        "options": {
          "responseHeaders": {
            "entries": [
              {
                "name": "Content-Type",
                "value": "application/json"
              }
            ]
          }
        }
      },
      "id": "forbidden-response",
      "name": "403 Forbidden",
      "type": "n8n-nodes-base.respondToWebhook",
      "typeVersion": 1,
      "position": [
        1250,
        100
      ]
    },
    {
      "parameters": {
        "responseCode": 200,
        "responseBody": "={{JSON.stringify($json)}}",
        "options": {
          "responseHeaders": {
            "entries": [
              {
                "name": "Content-Type",
                "value": "application/json"
              }
            ]
          }
        }
      },
      "id": "success-response",
      "name": "200 OK",
      "type": "n8n-nodes-base.respondToWebhook",
      "typeVersion": 1,
      "position": [
        2500,
        -100
      ]
    }
  ],
  "connections": {
    "Replay Trigger": {
      "main": [
        [
          {
            "node": "Parse Request",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Parse Request": {
      "main": [
        [
          {
            "node": "Check Admin Scope",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Check Admin Scope": {
      "main": [
        [
          {
            "node": "Verify Admin",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Verify Admin": {
      "main": [
        [
          {
            "node": "Is Admin?",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Is Admin?": {
      "main": [
        [
          {
            "node": "Fetch DLQ Messages",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "403 Forbidden",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Fetch DLQ Messages": {
      "main": [
        [
          {
            "node": "Prepare Replay",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Prepare Replay": {
      "main": [
        [
          {
            "node": "Not Dry Run?",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Not Dry Run?": {
      "main": [
        [
          {
            "node": "Set to RETRY",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Dry Run Result",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Set to RETRY": {
      "main": [
        [
          {
            "node": "Replay Result",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Replay Result": {
      "main": [
        [
          {
            "node": "200 OK",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Dry Run Result": {
      "main": [
        [
          {
            "node": "200 OK",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  }
}

Credentials you'll need

Each integration node will prompt for credentials when you import. We strip credential IDs before publishing — you'll add your own.

Pro

For the full experience including quality scoring and batch install features for each workflow upgrade to Pro

About this workflow

W8 - DLQ Replay (Manual Trigger). Uses postgres. Webhook trigger; 13 nodes.

Source: https://github.com/zerAda/RestaurantAgentAutomation/blob/41a4d42dcd66e57b1e87b4750c0fd5fbf7058f68/workflows/W8_DLQ_REPLAY.json — original creator credit. Request a take-down →

More AI & RAG workflows → · Browse all categories →

Related workflows

Workflows that share integrations, category, or trigger type with this one. All free to copy and import.

AI & RAG

Jigsaw API key for image processing, I use this as a gatekeeper/second pair of eyes. LINK to their website https://jigsawstack.com/ SECOND A postgress DATABASE (I use Supabase) LlamaCloud for the pars

HTTP Request, Postgres, Stop And Error +2
AI & RAG

Content Review Loop Workflow. Uses postgres, httpRequest. Webhook trigger; 20 nodes.

Postgres, HTTP Request
AI & RAG

Creates an AI-powered sales and support agent connected to live store data from Shopify/WooCommerce. MCP ensures controlled access to inventory and order systems. Automatically handles customer querie

HTTP Request, Postgres, Email Send
AI & RAG

HVAC Event Generator v1.0 - INSERT OPERATIONS. Uses postgres, crypto. Webhook trigger; 16 nodes.

Postgres, Crypto
AI & RAG

Image Generation Workflow. Uses postgres, httpRequest, editImage. Webhook trigger; 15 nodes.

Postgres, HTTP Request, Edit Image