From eacd0f0801d03fb87bfdb337b0767af93e99b7a3 Mon Sep 17 00:00:00 2001 From: waleedlatif1 Date: Wed, 17 Sep 2025 17:04:13 -0700 Subject: [PATCH 1/2] improvement(idempotency): added atomic claims to prevent duplicate processing for long-running workflows --- apps/sim/lib/idempotency/service.ts | 475 +++++++++++++++++++--------- 1 file changed, 321 insertions(+), 154 deletions(-) diff --git a/apps/sim/lib/idempotency/service.ts b/apps/sim/lib/idempotency/service.ts index 7098990840..ac9b82d5ec 100644 --- a/apps/sim/lib/idempotency/service.ts +++ b/apps/sim/lib/idempotency/service.ts @@ -19,12 +19,6 @@ export interface IdempotencyConfig { * Default: 'default' */ namespace?: string - - /** - * Enable database fallback when Redis is not available - * Default: true - */ - enableDatabaseFallback?: boolean } export interface IdempotencyResult { @@ -44,29 +38,30 @@ export interface IdempotencyResult { previousResult?: any /** - * Storage method used ('redis', 'database', 'memory') + * Storage method used ('redis', 'database') */ - storageMethod: 'redis' | 'database' | 'memory' + storageMethod: 'redis' | 'database' } export interface ProcessingResult { success: boolean result?: any error?: string + status?: 'in-progress' | 'completed' | 'failed' + startedAt?: number +} + +export interface AtomicClaimResult { + claimed: boolean + existingResult?: ProcessingResult + normalizedKey: string + storageMethod: 'redis' | 'database' } const DEFAULT_TTL = 60 * 60 * 24 * 7 // 7 days const REDIS_KEY_PREFIX = 'idempotency:' -const MEMORY_CACHE_SIZE = 1000 - -const memoryCache = new Map< - string, - { - result: any - timestamp: number - ttl: number - } ->() +const MAX_WAIT_TIME_MS = 300000 // 5 minutes max wait for in-progress operations +const POLL_INTERVAL_MS = 1000 // Check every 1 second for completion /** * Universal idempotency service for webhooks, triggers, and any other operations @@ -79,7 +74,6 @@ export class IdempotencyService { this.config = { ttlSeconds: config.ttlSeconds ?? DEFAULT_TTL, namespace: config.namespace ?? 'default', - enableDatabaseFallback: config.enableDatabaseFallback ?? true, } } @@ -139,70 +133,202 @@ export class IdempotencyService { logger.warn(`Redis idempotency check failed for ${normalizedKey}:`, error) } - if (this.config.enableDatabaseFallback) { - try { - const existing = await db - .select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt }) - .from(idempotencyKey) - .where( - and( - eq(idempotencyKey.key, normalizedKey), - eq(idempotencyKey.namespace, this.config.namespace) - ) + // Always fallback to database when Redis is not available + try { + const existing = await db + .select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt }) + .from(idempotencyKey) + .where( + and( + eq(idempotencyKey.key, normalizedKey), + eq(idempotencyKey.namespace, this.config.namespace) ) - .limit(1) - - if (existing.length > 0) { - const item = existing[0] - const isExpired = Date.now() - item.createdAt.getTime() > this.config.ttlSeconds * 1000 - - if (!isExpired) { - logger.debug(`Idempotency hit in database: ${normalizedKey}`) - return { - isFirstTime: false, - normalizedKey, - previousResult: item.result, - storageMethod: 'database', - } + ) + .limit(1) + + if (existing.length > 0) { + const item = existing[0] + const isExpired = Date.now() - item.createdAt.getTime() > this.config.ttlSeconds * 1000 + + if (!isExpired) { + logger.debug(`Idempotency hit in database: ${normalizedKey}`) + return { + isFirstTime: false, + normalizedKey, + previousResult: item.result, + storageMethod: 'database', } - await db - .delete(idempotencyKey) - .where(eq(idempotencyKey.key, normalizedKey)) - .catch((err) => logger.warn(`Failed to clean up expired key ${normalizedKey}:`, err)) } + await db + .delete(idempotencyKey) + .where(eq(idempotencyKey.key, normalizedKey)) + .catch((err) => logger.warn(`Failed to clean up expired key ${normalizedKey}:`, err)) + } + + logger.debug(`Idempotency miss in database: ${normalizedKey}`) + return { + isFirstTime: true, + normalizedKey, + storageMethod: 'database', + } + } catch (error) { + logger.error(`Database idempotency check failed for ${normalizedKey}:`, error) + throw new Error(`Failed to check idempotency: database unavailable`) + } + } - logger.debug(`Idempotency miss in database: ${normalizedKey}`) + /** + * Atomically claim an idempotency key for processing + * Returns true if successfully claimed, false if already exists + */ + async atomicallyClaim( + provider: string, + identifier: string, + additionalContext?: Record + ): Promise { + const normalizedKey = this.normalizeKey(provider, identifier, additionalContext) + const redisKey = `${REDIS_KEY_PREFIX}${normalizedKey}` + const inProgressResult: ProcessingResult = { + success: false, + status: 'in-progress', + startedAt: Date.now(), + } + + try { + const redis = getRedisClient() + if (redis) { + const claimed = await redis.set( + redisKey, + JSON.stringify(inProgressResult), + 'EX', + this.config.ttlSeconds, + 'NX' + ) + + if (claimed === 'OK') { + logger.debug(`Atomically claimed idempotency key in Redis: ${normalizedKey}`) + return { + claimed: true, + normalizedKey, + storageMethod: 'redis', + } + } + const existingData = await redis.get(redisKey) + const existingResult = existingData ? JSON.parse(existingData) : null + logger.debug(`Idempotency key already claimed in Redis: ${normalizedKey}`) return { - isFirstTime: true, + claimed: false, + existingResult, normalizedKey, - storageMethod: 'database', + storageMethod: 'redis', } - } catch (error) { - logger.warn(`Database idempotency check failed for ${normalizedKey}:`, error) } + } catch (error) { + logger.warn(`Redis atomic claim failed for ${normalizedKey}:`, error) } - const memoryEntry = memoryCache.get(normalizedKey) - if (memoryEntry) { - const isExpired = Date.now() - memoryEntry.timestamp > memoryEntry.ttl * 1000 - if (!isExpired) { - logger.debug(`Idempotency hit in memory: ${normalizedKey}`) + // Always fallback to database when Redis is not available + try { + const insertResult = await db + .insert(idempotencyKey) + .values({ + key: normalizedKey, + namespace: this.config.namespace, + result: inProgressResult, + createdAt: new Date(), + }) + .onConflictDoNothing() + .returning({ key: idempotencyKey.key }) + + if (insertResult.length > 0) { + logger.debug(`Atomically claimed idempotency key in database: ${normalizedKey}`) return { - isFirstTime: false, + claimed: true, normalizedKey, - previousResult: memoryEntry.result, - storageMethod: 'memory', + storageMethod: 'database', } } - memoryCache.delete(normalizedKey) + const existing = await db + .select({ result: idempotencyKey.result }) + .from(idempotencyKey) + .where( + and( + eq(idempotencyKey.key, normalizedKey), + eq(idempotencyKey.namespace, this.config.namespace) + ) + ) + .limit(1) + + const existingResult = + existing.length > 0 ? (existing[0].result as ProcessingResult) : undefined + logger.debug(`Idempotency key already claimed in database: ${normalizedKey}`) + return { + claimed: false, + existingResult, + normalizedKey, + storageMethod: 'database', + } + } catch (error) { + logger.error(`Database atomic claim failed for ${normalizedKey}:`, error) + throw new Error(`Failed to claim idempotency key: database unavailable`) } + } + + /** + * Wait for an in-progress operation to complete and return its result + */ + async waitForResult(normalizedKey: string, storageMethod: 'redis' | 'database'): Promise { + const startTime = Date.now() + const redisKey = `${REDIS_KEY_PREFIX}${normalizedKey}` - logger.debug(`Idempotency miss in memory: ${normalizedKey}`) - return { - isFirstTime: true, - normalizedKey, - storageMethod: 'memory', + while (Date.now() - startTime < MAX_WAIT_TIME_MS) { + try { + let currentResult: ProcessingResult | null = null + + if (storageMethod === 'redis') { + const redis = getRedisClient() + if (redis) { + const data = await redis.get(redisKey) + currentResult = data ? JSON.parse(data) : null + } + } else if (storageMethod === 'database') { + const existing = await db + .select({ result: idempotencyKey.result }) + .from(idempotencyKey) + .where( + and( + eq(idempotencyKey.key, normalizedKey), + eq(idempotencyKey.namespace, this.config.namespace) + ) + ) + .limit(1) + currentResult = existing.length > 0 ? (existing[0].result as ProcessingResult) : null + } + + if (currentResult?.status === 'completed') { + logger.debug(`Operation completed, returning result: ${normalizedKey}`) + if (currentResult.success === false) { + throw new Error(currentResult.error || 'Previous operation failed') + } + return currentResult.result as T + } + + if (currentResult?.status === 'failed') { + logger.debug(`Operation failed, throwing error: ${normalizedKey}`) + throw new Error(currentResult.error || 'Previous operation failed') + } + + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)) + } catch (error) { + if (error instanceof Error && error.message.includes('operation failed')) { + throw error + } + logger.warn(`Error while waiting for result ${normalizedKey}:`, error) + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)) + } } + + throw new Error(`Timeout waiting for idempotency operation to complete: ${normalizedKey}`) } /** @@ -211,7 +337,7 @@ export class IdempotencyService { async storeResult( normalizedKey: string, result: ProcessingResult, - storageMethod: 'redis' | 'database' | 'memory' + storageMethod: 'redis' | 'database' ): Promise { const serializedResult = JSON.stringify(result) @@ -232,62 +358,34 @@ export class IdempotencyService { logger.warn(`Failed to store result in Redis for ${normalizedKey}:`, error) } - if (this.config.enableDatabaseFallback && storageMethod !== 'memory') { - try { - await db - .insert(idempotencyKey) - .values({ - key: normalizedKey, - namespace: this.config.namespace, + // Always fallback to database when Redis is not available + try { + await db + .insert(idempotencyKey) + .values({ + key: normalizedKey, + namespace: this.config.namespace, + result: result, + createdAt: new Date(), + }) + .onConflictDoUpdate({ + target: [idempotencyKey.key, idempotencyKey.namespace], + set: { result: result, createdAt: new Date(), - }) - .onConflictDoUpdate({ - target: [idempotencyKey.key, idempotencyKey.namespace], - set: { - result: result, - createdAt: new Date(), - }, - }) - - logger.debug(`Stored idempotency result in database: ${normalizedKey}`) - return - } catch (error) { - logger.warn(`Failed to store result in database for ${normalizedKey}:`, error) - } - } - - memoryCache.set(normalizedKey, { - result, - timestamp: Date.now(), - ttl: this.config.ttlSeconds, - }) + }, + }) - if (memoryCache.size > MEMORY_CACHE_SIZE) { - const entries = Array.from(memoryCache.entries()) - const now = Date.now() - - entries.forEach(([key, entry]) => { - if (now - entry.timestamp > entry.ttl * 1000) { - memoryCache.delete(key) - } - }) - - if (memoryCache.size > MEMORY_CACHE_SIZE) { - const sortedEntries = entries - .filter(([key]) => memoryCache.has(key)) - .sort((a, b) => a[1].timestamp - b[1].timestamp) - - const toRemove = sortedEntries.slice(0, memoryCache.size - MEMORY_CACHE_SIZE) - toRemove.forEach(([key]) => memoryCache.delete(key)) - } + logger.debug(`Stored idempotency result in database: ${normalizedKey}`) + } catch (error) { + logger.error(`Failed to store result in database for ${normalizedKey}:`, error) + throw new Error(`Failed to store idempotency result: database unavailable`) } - - logger.debug(`Stored idempotency result in memory: ${normalizedKey}`) } /** - * Execute an operation with idempotency protection + * Execute an operation with idempotency protection using atomic claims + * Eliminates race conditions by claiming the key before execution */ async executeWithIdempotency( provider: string, @@ -295,68 +393,104 @@ export class IdempotencyService { operation: () => Promise, additionalContext?: Record ): Promise { - const idempotencyCheck = await this.checkIdempotency(provider, identifier, additionalContext) + const claimResult = await this.atomicallyClaim(provider, identifier, additionalContext) + + if (!claimResult.claimed) { + const existingResult = claimResult.existingResult - if (!idempotencyCheck.isFirstTime) { - logger.info(`Skipping duplicate operation: ${idempotencyCheck.normalizedKey}`) + if (existingResult?.status === 'completed') { + logger.info(`Returning cached result for: ${claimResult.normalizedKey}`) + if (existingResult.success === false) { + throw new Error(existingResult.error || 'Previous operation failed') + } + return existingResult.result as T + } - if (idempotencyCheck.previousResult?.success === false) { - throw new Error(idempotencyCheck.previousResult?.error || 'Previous operation failed') + if (existingResult?.status === 'failed') { + logger.info(`Previous operation failed for: ${claimResult.normalizedKey}`) + throw new Error(existingResult.error || 'Previous operation failed') } - return idempotencyCheck.previousResult?.result as T + if (existingResult?.status === 'in-progress') { + logger.info(`Waiting for in-progress operation: ${claimResult.normalizedKey}`) + return await this.waitForResult(claimResult.normalizedKey, claimResult.storageMethod) + } + + if (existingResult?.success === false) { + throw new Error(existingResult.error || 'Previous operation failed') + } + return existingResult?.result as T } try { - logger.debug(`Executing new operation: ${idempotencyCheck.normalizedKey}`) + logger.info(`Executing new operation: ${claimResult.normalizedKey}`) const result = await operation() await this.storeResult( - idempotencyCheck.normalizedKey, - { success: true, result }, - idempotencyCheck.storageMethod + claimResult.normalizedKey, + { success: true, result, status: 'completed' }, + claimResult.storageMethod ) + logger.debug(`Successfully completed operation: ${claimResult.normalizedKey}`) return result } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' + await this.storeResult( - idempotencyCheck.normalizedKey, - { success: false, error: errorMessage }, - idempotencyCheck.storageMethod + claimResult.normalizedKey, + { success: false, error: errorMessage, status: 'failed' }, + claimResult.storageMethod ) + logger.warn(`Operation failed: ${claimResult.normalizedKey} - ${errorMessage}`) throw error } } /** - * Create an idempotency key from a webhook payload + * Create an idempotency key from a webhook payload following RFC best practices + * Priority order: + * 1. Standard webhook headers (webhook-id, x-webhook-id, etc.) + * 2. Event/message IDs from payload + * 3. Deterministic hash of stable payload fields (excluding timestamps) */ static createWebhookIdempotencyKey( webhookId: string, payload: any, headers?: Record ): string { + // 1. Check for standard webhook headers (RFC compliant) const webhookIdHeader = - headers?.['x-webhook-id'] || + headers?.['webhook-id'] || // Standard Webhooks spec + headers?.['x-webhook-id'] || // Legacy standard headers?.['x-shopify-webhook-id'] || headers?.['x-github-delivery'] || - headers?.['stripe-signature']?.split(',')[0] + headers?.['x-event-id'] // Generic event ID header if (webhookIdHeader) { return `${webhookId}:${webhookIdHeader}` } - const payloadId = payload?.id || payload?.event_id || payload?.message?.id || payload?.data?.id + // 2. Extract event/message IDs from payload (most reliable) + const payloadId = + payload?.id || + payload?.event_id || + payload?.eventId || + payload?.message?.id || + payload?.data?.id || + payload?.object?.id || + payload?.event?.id if (payloadId) { return `${webhookId}:${payloadId}` } + // 3. Create deterministic hash from stable payload fields (excluding timestamps) + const stablePayload = IdempotencyService.createStablePayloadForHashing(payload) const payloadHash = crypto .createHash('sha256') - .update(JSON.stringify(payload)) + .update(JSON.stringify(stablePayload)) .digest('hex') .substring(0, 16) @@ -364,29 +498,62 @@ export class IdempotencyService { } /** - * Create an idempotency key for Gmail polling + * Create a stable representation of the payload for hashing by removing + * timestamp and other volatile fields that change between requests */ - static createGmailIdempotencyKey(webhookId: string, emailId: string): string { - return `${webhookId}:${emailId}` - } + private static createStablePayloadForHashing(payload: any): any { + if (!payload || typeof payload !== 'object') { + return payload + } - /** - * Create an idempotency key for generic triggers - */ - static createTriggerIdempotencyKey( - triggerId: string, - eventId: string, - additionalContext?: Record - ): string { - const base = `${triggerId}:${eventId}` - if (additionalContext && Object.keys(additionalContext).length > 0) { - const contextStr = Object.keys(additionalContext) - .sort() - .map((key) => `${key}=${additionalContext[key]}`) - .join('&') - return `${base}:${contextStr}` + const volatileFields = [ + 'timestamp', + 'created_at', + 'updated_at', + 'sent_at', + 'received_at', + 'processed_at', + 'delivered_at', + 'attempt', + 'retry_count', + 'request_id', + 'trace_id', + 'span_id', + 'delivery_id', + 'webhook_timestamp', + ] + + const cleanPayload = { ...payload } + + const removeVolatileFields = (obj: any): any => { + if (!obj || typeof obj !== 'object') return obj + + if (Array.isArray(obj)) { + return obj.map(removeVolatileFields) + } + + const cleaned: any = {} + for (const [key, value] of Object.entries(obj)) { + const lowerKey = key.toLowerCase() + + if (volatileFields.some((field) => lowerKey.includes(field))) { + continue + } + + if (typeof value === 'string' && /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/.test(value)) { + continue + } + if (typeof value === 'number' && value > 1000000000 && value < 9999999999) { + continue + } + + cleaned[key] = removeVolatileFields(value) + } + + return cleaned } - return base + + return removeVolatileFields(cleanPayload) } } From 2c114653b9a70aa894f902b13409d55e4a3414bb Mon Sep 17 00:00:00 2001 From: waleedlatif1 Date: Wed, 17 Sep 2025 17:11:25 -0700 Subject: [PATCH 2/2] ack PR comments --- apps/sim/lib/idempotency/service.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/sim/lib/idempotency/service.ts b/apps/sim/lib/idempotency/service.ts index ac9b82d5ec..48fb4ce8e1 100644 --- a/apps/sim/lib/idempotency/service.ts +++ b/apps/sim/lib/idempotency/service.ts @@ -416,10 +416,11 @@ export class IdempotencyService { return await this.waitForResult(claimResult.normalizedKey, claimResult.storageMethod) } - if (existingResult?.success === false) { - throw new Error(existingResult.error || 'Previous operation failed') + if (existingResult) { + return existingResult.result as T } - return existingResult?.result as T + + throw new Error(`Unexpected state: key claimed but no existing result found`) } try {