Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/sim/app/api/webhooks/cleanup/idempotency/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { cleanupExpiredIdempotencyKeys, getIdempotencyKeyStats } from '@/lib/idempotency/cleanup'
import { cleanupExpiredIdempotencyKeys, getIdempotencyKeyStats } from '@/lib/idempotency'
import { createLogger } from '@/lib/logs/console/logger'
import { generateRequestId } from '@/lib/utils'

Expand Down
47 changes: 24 additions & 23 deletions apps/sim/app/api/webhooks/trigger/[path]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,39 +375,40 @@ export async function POST(

const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
foundWebhook.id,
body,
Object.fromEntries(request.headers.entries())
)

const result = await webhookIdempotency.executeWithIdempotency(
foundWebhook.provider,
idempotencyKey,
async () => {
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
const runOperation = async () => {
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)

if (useTrigger) {
const handle = await tasks.trigger('webhook-execution', payload)
logger.info(
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
return {
method: 'trigger.dev',
taskId: handle.id,
status: 'queued',
}
}
// Fire-and-forget direct execution to avoid blocking webhook response
void executeWebhookJob(payload).catch((error) => {
logger.error(`[${requestId}] Direct webhook execution failed`, error)
})
if (useTrigger) {
const handle = await tasks.trigger('webhook-execution', payload)
logger.info(
`[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
return {
method: 'direct',
method: 'trigger.dev',
taskId: handle.id,
status: 'queued',
}
}
// Fire-and-forget direct execution to avoid blocking webhook response
void executeWebhookJob(payload).catch((error) => {
logger.error(`[${requestId}] Direct webhook execution failed`, error)
})
logger.info(
`[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
)
return {
method: 'direct',
status: 'queued',
}
}

const result = await webhookIdempotency.executeWithIdempotency(
foundWebhook.provider,
idempotencyKey,
runOperation
)

logger.debug(`[${requestId}] Webhook execution result:`, result)
Expand Down
9 changes: 5 additions & 4 deletions apps/sim/background/webhook-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {

const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
payload.webhookId,
payload.body,
payload.headers
)

const runOperation = async () => {
return await executeWebhookJobInternal(payload, executionId, requestId)
}

return await webhookIdempotency.executeWithIdempotency(
payload.provider,
idempotencyKey,
async () => {
return await executeWebhookJobInternal(payload, executionId, requestId)
}
runOperation
)
}

Expand Down
13 changes: 1 addition & 12 deletions apps/sim/blocks/blocks/generic_webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,7 @@ export const GenericWebhookBlock: BlockConfig = {

inputs: {}, // No inputs - webhook triggers receive data externally

outputs: {
// Generic webhook outputs that can be used with any webhook payload
payload: { type: 'json', description: 'Complete webhook payload' },
headers: { type: 'json', description: 'Request headers' },
method: { type: 'string', description: 'HTTP method' },
url: { type: 'string', description: 'Request URL' },
timestamp: { type: 'string', description: 'Webhook received timestamp' },
// Common webhook fields that services often use
event: { type: 'string', description: 'Event type from payload' },
id: { type: 'string', description: 'Event ID from payload' },
data: { type: 'json', description: 'Event data from payload' },
},
outputs: {},

triggers: {
enabled: true,
Expand Down
1 change: 0 additions & 1 deletion apps/sim/lib/idempotency/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ export * from './cleanup'
export * from './service'
export {
pollingIdempotency,
triggerIdempotency,
webhookIdempotency,
} from './service'
117 changes: 14 additions & 103 deletions apps/sim/lib/idempotency/service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as crypto from 'crypto'
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { idempotencyKey } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
Expand Down Expand Up @@ -451,110 +451,26 @@ export class IdempotencyService {

/**
* Create an idempotency key from a webhook payload following RFC best practices
* Priority order:
* 1. Standard webhook headers (webhook-id, x-webhook-id, etc.)
* 2. Event/message IDs from payload
* 3. Deterministic hash of stable payload fields (excluding timestamps)
* Standard webhook headers (webhook-id, x-webhook-id, etc.)
*/
static createWebhookIdempotencyKey(
webhookId: string,
payload: any,
headers?: Record<string, string>
): string {
// 1. Check for standard webhook headers (RFC compliant)
static createWebhookIdempotencyKey(webhookId: string, headers?: Record<string, string>): string {
const normalizedHeaders = headers
? Object.fromEntries(Object.entries(headers).map(([k, v]) => [k.toLowerCase(), v]))
: undefined

const webhookIdHeader =
headers?.['webhook-id'] || // Standard Webhooks spec
headers?.['x-webhook-id'] || // Legacy standard
headers?.['x-shopify-webhook-id'] ||
headers?.['x-github-delivery'] ||
headers?.['x-event-id'] // Generic event ID header
normalizedHeaders?.['webhook-id'] ||
normalizedHeaders?.['x-webhook-id'] ||
normalizedHeaders?.['x-shopify-webhook-id'] ||
normalizedHeaders?.['x-github-delivery'] ||
normalizedHeaders?.['x-event-id']

if (webhookIdHeader) {
return `${webhookId}:${webhookIdHeader}`
}

// 2. Extract event/message IDs from payload (most reliable)
const payloadId =
payload?.id ||
payload?.event_id ||
payload?.eventId ||
payload?.message?.id ||
payload?.data?.id ||
payload?.object?.id ||
payload?.event?.id

if (payloadId) {
return `${webhookId}:${payloadId}`
}

// 3. Create deterministic hash from stable payload fields (excluding timestamps)
const stablePayload = IdempotencyService.createStablePayloadForHashing(payload)
const payloadHash = crypto
.createHash('sha256')
.update(JSON.stringify(stablePayload))
.digest('hex')
.substring(0, 16)

return `${webhookId}:${payloadHash}`
}

/**
* Create a stable representation of the payload for hashing by removing
* timestamp and other volatile fields that change between requests
*/
private static createStablePayloadForHashing(payload: any): any {
if (!payload || typeof payload !== 'object') {
return payload
}

const volatileFields = [
'timestamp',
'created_at',
'updated_at',
'sent_at',
'received_at',
'processed_at',
'delivered_at',
'attempt',
'retry_count',
'request_id',
'trace_id',
'span_id',
'delivery_id',
'webhook_timestamp',
]

const cleanPayload = { ...payload }

const removeVolatileFields = (obj: any): any => {
if (!obj || typeof obj !== 'object') return obj

if (Array.isArray(obj)) {
return obj.map(removeVolatileFields)
}

const cleaned: any = {}
for (const [key, value] of Object.entries(obj)) {
const lowerKey = key.toLowerCase()

if (volatileFields.some((field) => lowerKey.includes(field))) {
continue
}

if (typeof value === 'string' && /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/.test(value)) {
continue
}
if (typeof value === 'number' && value > 1000000000 && value < 9999999999) {
continue
}

cleaned[key] = removeVolatileFields(value)
}

return cleaned
}

return removeVolatileFields(cleanPayload)
const uniqueId = randomUUID()
return `${webhookId}:${uniqueId}`
}
}

Expand All @@ -567,8 +483,3 @@ export const pollingIdempotency = new IdempotencyService({
namespace: 'polling',
ttlSeconds: 60 * 60 * 24 * 3, // 3 days
})

export const triggerIdempotency = new IdempotencyService({
namespace: 'trigger',
ttlSeconds: 60 * 60 * 24 * 1, // 1 day
})
4 changes: 4 additions & 0 deletions apps/sim/lib/webhooks/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,10 @@ export function formatWebhookInput(
}
}

if (foundWebhook.provider === 'generic') {
return body
}

if (foundWebhook.provider === 'google_forms') {
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}

Expand Down
40 changes: 1 addition & 39 deletions apps/sim/triggers/generic/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,45 +34,7 @@ export const genericWebhookTrigger: TriggerConfig = {
},
},

outputs: {
payload: {
type: 'json',
description: 'Complete webhook payload received',
},
headers: {
type: 'json',
description: 'HTTP request headers',
},
method: {
type: 'string',
description: 'HTTP method (GET, POST, PUT, etc.)',
},
url: {
type: 'string',
description: 'Request URL path',
},
query: {
type: 'json',
description: 'URL query parameters',
},
timestamp: {
type: 'string',
description: 'Webhook received timestamp',
},
// Common fields that many services use
event: {
type: 'string',
description: 'Event type (extracted from payload.event, payload.type, or payload.event_type)',
},
id: {
type: 'string',
description: 'Event ID (extracted from payload.id, payload.event_id, or payload.uuid)',
},
data: {
type: 'json',
description: 'Event data (extracted from payload.data or the full payload)',
},
},
outputs: {},

instructions: [
'Copy the webhook URL provided above and use it in your external service or API.',
Expand Down