From f5548c3acaa4e1123eb9dc45c1421e633813af6f Mon Sep 17 00:00:00 2001
From: waleed
Date: Mon, 5 Jan 2026 00:23:38 -0800
Subject: [PATCH] improvement(kb): add configurable concurrency to chunk processing, sped up 22x for large docs

---
 apps/sim/lib/core/config/env.ts             |  6 +-
 apps/sim/lib/knowledge/documents/service.ts |  8 +--
 apps/sim/lib/knowledge/embeddings.ts        | 68 ++++++++++++++-------
 3 files changed, 52 insertions(+), 30 deletions(-)

diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts
index b42ae407a2..5cca8759be 100644
--- a/apps/sim/lib/core/config/env.ts
+++ b/apps/sim/lib/core/config/env.ts
@@ -174,9 +174,9 @@ export const env = createEnv({
   KB_CONFIG_RETRY_FACTOR: z.number().optional().default(2), // Retry backoff factor
   KB_CONFIG_MIN_TIMEOUT: z.number().optional().default(1000), // Min timeout in ms
   KB_CONFIG_MAX_TIMEOUT: z.number().optional().default(10000), // Max timeout in ms
-  KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(20), // Queue concurrency limit
-  KB_CONFIG_BATCH_SIZE: z.number().optional().default(20), // Processing batch size
-  KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(100), // Delay between batches in ms
+  KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(50), // Concurrent embedding API calls
+  KB_CONFIG_BATCH_SIZE: z.number().optional().default(2000), // Chunks to process per embedding batch
+  KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(0), // Delay between batches in ms (0 for max speed)
   KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms
 
   // Real-time Communication
diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts
index 19419fccf4..313ea8d395 100644
--- a/apps/sim/lib/knowledge/documents/service.ts
+++ b/apps/sim/lib/knowledge/documents/service.ts
@@ -29,10 +29,10 @@ const TIMEOUTS = {
 
 // Configuration for handling large documents
 const LARGE_DOC_CONFIG = {
-  MAX_CHUNKS_PER_BATCH: 500, // Insert embeddings in batches of 500
-  MAX_EMBEDDING_BATCH: 500, // Generate embeddings in batches of 500
-  MAX_FILE_SIZE: 100 * 1024 * 1024, // 100MB max file size
-  MAX_CHUNKS_PER_DOCUMENT: 100000, // Maximum chunks allowed per document
+  MAX_CHUNKS_PER_BATCH: 500,
+  MAX_EMBEDDING_BATCH: env.KB_CONFIG_BATCH_SIZE || 2000,
+  MAX_FILE_SIZE: 100 * 1024 * 1024,
+  MAX_CHUNKS_PER_DOCUMENT: 100000,
 }
 
 /**
diff --git a/apps/sim/lib/knowledge/embeddings.ts b/apps/sim/lib/knowledge/embeddings.ts
index 785d8347d6..7a736688be 100644
--- a/apps/sim/lib/knowledge/embeddings.ts
+++ b/apps/sim/lib/knowledge/embeddings.ts
@@ -7,6 +7,7 @@ import { batchByTokenLimit, getTotalTokenCount } from '@/lib/tokenization'
 const logger = createLogger('EmbeddingUtils')
 
 const MAX_TOKENS_PER_REQUEST = 8000
+const MAX_CONCURRENT_BATCHES = env.KB_CONFIG_CONCURRENCY_LIMIT || 50
 
 export class EmbeddingAPIError extends Error {
   public status: number
@@ -121,8 +122,29 @@
 }
 
 /**
- * Generate embeddings for multiple texts with token-aware batching
- * Uses tiktoken for token counting
+ * Process batches with controlled concurrency
+ */
+async function processWithConcurrency<T, R>(
+  items: T[],
+  concurrency: number,
+  processor: (item: T, index: number) => Promise<R>
+): Promise<R[]> {
+  const results: R[] = new Array(items.length)
+  let currentIndex = 0
+
+  const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
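+    // Each worker repeatedly claims the next unprocessed index; the claim is a
+    // synchronous increment, so on the single-threaded JS event loop two workers
+    // never pick up the same item even while others are awaiting the processor.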
+    while (currentIndex < items.length) {
+      const index = currentIndex++
+      results[index] = await processor(items[index], index)
+    }
+  })
+
+  await Promise.all(workers)
+  return results
+}
+
+/**
+ * Generate embeddings for multiple texts with token-aware batching and parallel processing
  */
 export async function generateEmbeddings(
   texts: string[],
@@ -138,35 +160,35 @@
   const batches = batchByTokenLimit(texts, MAX_TOKENS_PER_REQUEST, embeddingModel)
 
   logger.info(
-    `Split ${texts.length} texts into ${batches.length} batches (max ${MAX_TOKENS_PER_REQUEST} tokens per batch)`
+    `Split ${texts.length} texts into ${batches.length} batches (max ${MAX_TOKENS_PER_REQUEST} tokens per batch, ${MAX_CONCURRENT_BATCHES} concurrent)`
   )
 
-  const allEmbeddings: number[][] = []
+  const batchResults = await processWithConcurrency(
+    batches,
+    MAX_CONCURRENT_BATCHES,
+    async (batch, i) => {
+      const batchTokenCount = getTotalTokenCount(batch, embeddingModel)
 
-  for (let i = 0; i < batches.length; i++) {
-    const batch = batches[i]
-    const batchTokenCount = getTotalTokenCount(batch, embeddingModel)
+      logger.info(
+        `Processing batch ${i + 1}/${batches.length}: ${batch.length} texts, ${batchTokenCount} tokens`
+      )
 
-    logger.info(
-      `Processing batch ${i + 1}/${batches.length}: ${batch.length} texts, ${batchTokenCount} tokens`
-    )
+      try {
+        const batchEmbeddings = await callEmbeddingAPI(batch, config)
 
-    try {
-      const batchEmbeddings = await callEmbeddingAPI(batch, config)
-      allEmbeddings.push(...batchEmbeddings)
+        logger.info(
+          `Generated ${batchEmbeddings.length} embeddings for batch ${i + 1}/${batches.length}`
+        )
 
-      logger.info(
-        `Generated ${batchEmbeddings.length} embeddings for batch ${i + 1}/${batches.length}`
-      )
-    } catch (error) {
-      logger.error(`Failed to generate embeddings for batch ${i + 1}:`, error)
-      throw error
+        return batchEmbeddings
+      } catch (error) {
+        logger.error(`Failed to generate embeddings for batch ${i + 1}:`, error)
+        throw error
+      }
     }
+  )
 
-    if (i + 1 < batches.length) {
-      await new Promise((resolve) => setTimeout(resolve, 100))
-    }
-  }
+  const allEmbeddings = batchResults.flat()
 
   logger.info(`Successfully generated ${allEmbeddings.length} embeddings total`)