diff --git a/apps/api/src/vector-store/lib/sync/sync-context.ts b/apps/api/src/vector-store/lib/sync/sync-context.ts index 375723a30..9fe2d9c3a 100644 --- a/apps/api/src/vector-store/lib/sync/sync-context.ts +++ b/apps/api/src/vector-store/lib/sync/sync-context.ts @@ -38,6 +38,11 @@ export async function fetchContextEntries( }); } +interface SyncSingleResult { + status: 'created' | 'updated' | 'skipped'; + lastEmbeddingId: string | null; +} + /** * Sync a single context entry's embeddings */ @@ -45,12 +50,12 @@ async function syncSingleContext( context: ContextData, existingEmbeddings: ExistingEmbedding[], organizationId: string, -): Promise<'created' | 'updated' | 'skipped'> { +): Promise { const contextUpdatedAt = context.updatedAt.toISOString(); // Check if context needs update if (!needsUpdate(existingEmbeddings, contextUpdatedAt)) { - return 'skipped'; + return { status: 'skipped', lastEmbeddingId: null }; } // Delete old embeddings if they exist @@ -60,7 +65,7 @@ async function syncSingleContext( const contextText = `Question: ${context.question}\n\nAnswer: ${context.answer}`; if (!contextText || contextText.trim().length === 0) { - return 'skipped'; + return { status: 'skipped', lastEmbeddingId: null }; } // Use larger chunk size for context entries @@ -77,12 +82,15 @@ async function syncSingleContext( ); if (chunkItems.length === 0) { - return 'skipped'; + return { status: 'skipped', lastEmbeddingId: null }; } await upsertChunks(chunkItems); - return existingEmbeddings.length === 0 ? 'created' : 'updated'; + const lastEmbeddingId = chunkItems[chunkItems.length - 1]?.id ?? null; + const status = existingEmbeddings.length === 0 ? 'created' : 'updated'; + + return { status, lastEmbeddingId }; } /** @@ -115,9 +123,14 @@ export async function syncContextEntries( organizationId, ); - if (result === 'created') stats.created++; - else if (result === 'updated') stats.updated++; + if (result.status === 'created') stats.created++; + else if (result.status === 'updated') stats.updated++; else stats.skipped++; + + // Track the last upserted embedding ID + if (result.lastEmbeddingId) { + stats.lastUpsertedEmbeddingId = result.lastEmbeddingId; + } } catch (error) { logger.error('Failed to sync context', { contextId: context.id, diff --git a/apps/api/src/vector-store/lib/sync/sync-knowledge-base.ts b/apps/api/src/vector-store/lib/sync/sync-knowledge-base.ts index 9284fbcbb..127f990d0 100644 --- a/apps/api/src/vector-store/lib/sync/sync-knowledge-base.ts +++ b/apps/api/src/vector-store/lib/sync/sync-knowledge-base.ts @@ -100,13 +100,18 @@ async function deleteExistingDocumentEmbeddings( } } +interface ProcessDocumentResult { + status: 'processed' | 'failed'; + lastEmbeddingId: string | null; +} + /** * Process a single knowledge base document */ async function processSingleDocument( document: KnowledgeBaseDocumentData, organizationId: string, -): Promise<'processed' | 'failed'> { +): Promise { const documentUpdatedAt = document.updatedAt.toISOString(); logger.info('Processing Knowledge Base document', { @@ -128,7 +133,7 @@ async function processSingleDocument( documentId: document.id, }); await updateDocumentStatus(document.id, 'failed'); - return 'failed'; + return { status: 'failed', lastEmbeddingId: null }; } // Delete existing embeddings @@ -148,7 +153,7 @@ async function processSingleDocument( if (chunkItems.length === 0) { logger.warn('No chunks created from content', { documentId: document.id }); await updateDocumentStatus(document.id, 'failed'); - return 'failed'; + return { status: 'failed', lastEmbeddingId: null }; } await upsertChunks(chunkItems); @@ -158,7 +163,9 @@ async function processSingleDocument( }); await updateDocumentStatus(document.id, 'completed'); - return 'processed'; + + const lastEmbeddingId = chunkItems[chunkItems.length - 1]?.id ?? null; + return { status: 'processed', lastEmbeddingId }; } /** @@ -211,8 +218,15 @@ export async function syncKnowledgeBaseDocuments( try { const result = await processSingleDocument(document, organizationId); - if (result === 'processed') stats.created++; - else stats.failed++; + if (result.status === 'processed') { + stats.created++; + // Track the last upserted embedding ID + if (result.lastEmbeddingId) { + stats.lastUpsertedEmbeddingId = result.lastEmbeddingId; + } + } else { + stats.failed++; + } } catch (error) { logger.error('Failed to process Knowledge Base document', { documentId: document.id, diff --git a/apps/api/src/vector-store/lib/sync/sync-organization.ts b/apps/api/src/vector-store/lib/sync/sync-organization.ts index fd6521470..28dfbd8fa 100644 --- a/apps/api/src/vector-store/lib/sync/sync-organization.ts +++ b/apps/api/src/vector-store/lib/sync/sync-organization.ts @@ -12,6 +12,7 @@ import { syncKnowledgeBaseDocuments, fetchKnowledgeBaseDocuments, } from './sync-knowledge-base'; +import type { SyncStats } from './sync-utils'; /** * Lock map to prevent concurrent syncs for the same organization @@ -100,6 +101,51 @@ async function performSync(organizationId: string): Promise { existingEmbeddings, ); + // Step 7: Verify embeddings are queryable (handles Upstash eventual consistency) + // Only verify if we actually created or updated any embeddings + const totalCreated = + policyStats.created + + contextStats.created + + manualAnswerStats.created + + kbDocStats.created; + const totalUpdated = + policyStats.updated + + contextStats.updated + + manualAnswerStats.updated + + kbDocStats.updated; + + // Find the last upserted embedding ID (prioritize knowledge base, then manual answers, then context, then policies) + // This ensures we verify the most recently upserted embedding + const lastUpsertedEmbeddingId = + kbDocStats.lastUpsertedEmbeddingId ?? + manualAnswerStats.lastUpsertedEmbeddingId ?? + contextStats.lastUpsertedEmbeddingId ?? + policyStats.lastUpsertedEmbeddingId ?? + null; + + if ((totalCreated > 0 || totalUpdated > 0) && lastUpsertedEmbeddingId) { + const verificationResult = await verifyEmbeddingIsReady( + lastUpsertedEmbeddingId, + organizationId, + ); + logger.info('Embeddings verification completed', { + organizationId, + embeddingId: lastUpsertedEmbeddingId, + verified: verificationResult.success, + attempts: verificationResult.attempts, + totalWaitMs: verificationResult.totalWaitMs, + }); + } else if (totalCreated > 0 || totalUpdated > 0) { + logger.warn( + 'Embeddings were created/updated but no embedding ID tracked for verification', + { organizationId, totalCreated, totalUpdated }, + ); + } else { + logger.info('Skipping verification - no new embeddings created/updated', { + organizationId, + }); + } + logger.info('Incremental organization embeddings sync completed', { organizationId, policies: policyStats, @@ -118,18 +164,125 @@ async function performSync(organizationId: string): Promise { } } +/** + * Verifies that embeddings are indexed and QUERYABLE after sync. + * Uses retry with exponential backoff to handle Upstash Vector's eventual consistency. + * + * IMPORTANT: We use vectorIndex.query() instead of fetch() because: + * - fetch() checks if data is stored (works immediately) + * - query() checks if data is INDEXED for semantic search (may have delay) + * + * The embedding must be queryable, not just fetchable, for RAG to work. + */ +async function verifyEmbeddingIsReady( + embeddingId: string, + organizationId: string, +): Promise<{ + success: boolean; + attempts: number; + totalWaitMs: number; +}> { + if (!vectorIndex) { + logger.warn('Vector index not configured, skipping verification'); + return { success: false, attempts: 0, totalWaitMs: 0 }; + } + + const maxRetries = 8; + const initialDelay = 300; // 300ms + let totalWaitMs = 0; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + // First, fetch the embedding to get its vector + const fetchedEmbeddings = await vectorIndex.fetch([embeddingId], { + includeVectors: true, + }); + + const fetchedEmbedding = fetchedEmbeddings?.[0]; + if (!fetchedEmbedding || !fetchedEmbedding.vector) { + // Embedding not even stored yet, wait and retry + if (attempt < maxRetries) { + const delay = initialDelay * Math.pow(2, attempt - 1); + logger.info('Embedding not yet stored, waiting before retry', { + organizationId, + embeddingId, + attempt, + nextDelayMs: delay, + }); + await new Promise((resolve) => setTimeout(resolve, delay)); + totalWaitMs += delay; + } + continue; + } + + // Now query using the embedding's own vector to verify it's INDEXED + // If the embedding is indexed, querying with its own vector should return itself + const queryResults = await vectorIndex.query({ + vector: fetchedEmbedding.vector as number[], + topK: 1, + filter: `organizationId = "${organizationId}"`, + includeMetadata: true, + }); + + // Check if our embedding appears in the query results + const isIndexed = queryResults.some((result) => result.id === embeddingId); + + if (isIndexed) { + logger.info('Embedding verification succeeded - indexed and queryable', { + organizationId, + embeddingId, + attempt, + totalWaitMs, + }); + return { success: true, attempts: attempt, totalWaitMs }; + } + + // Embedding is stored but not yet indexed for search + if (attempt < maxRetries) { + const delay = initialDelay * Math.pow(2, attempt - 1); // 300ms, 600ms, 1200ms, 2400ms... + logger.info('Embedding stored but not yet indexed, waiting before retry', { + organizationId, + embeddingId, + attempt, + nextDelayMs: delay, + queryResultCount: queryResults.length, + }); + await new Promise((resolve) => setTimeout(resolve, delay)); + totalWaitMs += delay; + } + } catch (error) { + logger.warn('Verification query failed', { + organizationId, + embeddingId, + attempt, + error: error instanceof Error ? error.message : 'Unknown error', + }); + // Continue to next retry + if (attempt < maxRetries) { + const delay = initialDelay * Math.pow(2, attempt - 1); + await new Promise((resolve) => setTimeout(resolve, delay)); + totalWaitMs += delay; + } + } + } + + // All retries exhausted + logger.warn('Embedding verification failed after all retries', { + organizationId, + embeddingId, + attempts: maxRetries, + totalWaitMs, + }); + return { success: false, attempts: maxRetries, totalWaitMs }; +} + /** * Sync manual answers for an organization */ async function syncManualAnswers( organizationId: string, existingEmbeddings: Map, -): Promise<{ - created: number; - updated: number; - skipped: number; - total: number; -}> { +): Promise { const manualAnswers = await db.securityQuestionnaireManualAnswer.findMany({ where: { organizationId }, select: { @@ -148,6 +301,7 @@ async function syncManualAnswers( let created = 0; let updated = 0; let skipped = 0; + let lastUpsertedEmbeddingId: string | null = null; if (manualAnswers.length > 0) { const itemsToUpsert = manualAnswers @@ -183,8 +337,16 @@ async function syncManualAnswers( }) .filter((item): item is NonNullable => item !== null); - if (itemsToUpsert.length > 0) { - await batchUpsertEmbeddings(itemsToUpsert); + // Filter out items with empty text (same filter as batchUpsertEmbeddings) + // This ensures we only track IDs that will actually be upserted + const validItems = itemsToUpsert.filter( + (item) => item.text && item.text.trim().length > 0, + ); + + if (validItems.length > 0) { + await batchUpsertEmbeddings(validItems); + // Track the last ACTUALLY upserted embedding ID + lastUpsertedEmbeddingId = validItems[validItems.length - 1]?.id ?? null; } } @@ -196,7 +358,14 @@ async function syncManualAnswers( total: manualAnswers.length, }); - return { created, updated, skipped, total: manualAnswers.length }; + return { + created, + updated, + skipped, + failed: 0, + total: manualAnswers.length, + lastUpsertedEmbeddingId, + }; } /** diff --git a/apps/api/src/vector-store/lib/sync/sync-policies.ts b/apps/api/src/vector-store/lib/sync/sync-policies.ts index b3ec15169..25205f9db 100644 --- a/apps/api/src/vector-store/lib/sync/sync-policies.ts +++ b/apps/api/src/vector-store/lib/sync/sync-policies.ts @@ -44,6 +44,11 @@ export async function fetchPolicies( }); } +interface SyncSingleResult { + status: 'created' | 'updated' | 'skipped'; + lastEmbeddingId: string | null; +} + /** * Sync a single policy's embeddings */ @@ -51,12 +56,12 @@ async function syncSinglePolicy( policy: PolicyData, existingEmbeddings: ExistingEmbedding[], organizationId: string, -): Promise<'created' | 'updated' | 'skipped'> { +): Promise { const policyUpdatedAt = policy.updatedAt.toISOString(); // Check if policy needs update if (!needsUpdate(existingEmbeddings, policyUpdatedAt)) { - return 'skipped'; + return { status: 'skipped', lastEmbeddingId: null }; } // Delete old embeddings if they exist @@ -68,7 +73,7 @@ async function syncSinglePolicy( ); if (!policyText || policyText.trim().length === 0) { - return 'skipped'; + return { status: 'skipped', lastEmbeddingId: null }; } const chunkItems = createChunkItems( @@ -82,12 +87,16 @@ async function syncSinglePolicy( ); if (chunkItems.length === 0) { - return 'skipped'; + return { status: 'skipped', lastEmbeddingId: null }; } await upsertChunks(chunkItems); - return existingEmbeddings.length === 0 ? 'created' : 'updated'; + // Return the last chunk's ID for verification + const lastEmbeddingId = chunkItems[chunkItems.length - 1]?.id ?? null; + const status = existingEmbeddings.length === 0 ? 'created' : 'updated'; + + return { status, lastEmbeddingId }; } /** @@ -120,9 +129,14 @@ export async function syncPolicies( organizationId, ); - if (result === 'created') stats.created++; - else if (result === 'updated') stats.updated++; + if (result.status === 'created') stats.created++; + else if (result.status === 'updated') stats.updated++; else stats.skipped++; + + // Track the last upserted embedding ID + if (result.lastEmbeddingId) { + stats.lastUpsertedEmbeddingId = result.lastEmbeddingId; + } } catch (error) { logger.error('Failed to sync policy', { policyId: policy.id, diff --git a/apps/api/src/vector-store/lib/sync/sync-utils.ts b/apps/api/src/vector-store/lib/sync/sync-utils.ts index c04265fc0..0f94b1271 100644 --- a/apps/api/src/vector-store/lib/sync/sync-utils.ts +++ b/apps/api/src/vector-store/lib/sync/sync-utils.ts @@ -18,6 +18,8 @@ export interface SyncStats { skipped: number; failed: number; total: number; + /** The last embedding ID that was upserted (for verification) */ + lastUpsertedEmbeddingId: string | null; } export interface ChunkItem { @@ -183,5 +185,6 @@ export function initSyncStats(total: number): SyncStats { skipped: 0, failed: 0, total, + lastUpsertedEmbeddingId: null, }; } diff --git a/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/actions/sanitize-error.ts b/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/actions/sanitize-error.ts new file mode 100644 index 000000000..37aac0164 --- /dev/null +++ b/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/actions/sanitize-error.ts @@ -0,0 +1,125 @@ +'use server'; + +import { groq } from '@ai-sdk/groq'; +import { generateText } from 'ai'; + +const ERROR_SANITIZATION_SYSTEM_PROMPT = `Transform error messages into friendly, helpful guidance. Hide any sensitive data. + +RULES: +1. Write in simple, friendly language (not technical jargon) +2. Explain what went wrong and HOW TO FIX it +3. NEVER show: API keys, tokens, passwords, secrets, connection strings, internal paths, IPs +4. Keep error types (TypeError, SyntaxError) - they help debugging +5. If error is already clear and safe, return it unchanged and add some helpful tips to fix it (it should usefull for the user to fix the error) + +EXAMPLES: + +INPUT: "Failed to authenticate with key: sk_live_abc123xyz" +OUTPUT: "Authentication failed. Please check that your API key is correct and hasn't expired." + +INPUT: "Connection to postgres://user:pass@db.example.com failed: ETIMEDOUT" +OUTPUT: "Unable to connect to the database. Please verify your database credentials are correct and the database server is accessible." + +INPUT: "TypeError: Cannot read property 'data' of undefined" +OUTPUT: "The API response was missing expected data. Please check that the API endpoint is correct and returning the expected format." + +INPUT: "Invalid regular expression: missing /" +OUTPUT: "Invalid regular expression: missing /. Please check that all regex patterns have matching opening and closing slashes." + +INPUT: "Internal Server Error" +OUTPUT: "Something went wrong while running your automation. Please check your script for any syntax errors or incorrect API configurations." + +INPUT: "Request failed with status 401" +OUTPUT: "Access denied (401). Please verify your credentials or API key have the required permissions." + +INPUT: "Request failed with status 404" +OUTPUT: "The requested resource was not found (404). Please check that the URL or endpoint in your script is correct." + +INPUT: "Request failed with status 429" +OUTPUT: "Too many requests (429). The API rate limit was exceeded. Please wait a moment and try again, or reduce the frequency of requests." + +INPUT: "ENOTFOUND api.example.com" +OUTPUT: "Could not reach the server. Please check your internet connection and verify the API URL is correct." + +Return ONLY the friendly error message.`; + +/** + * Convert any error format to a string for AI processing. + * Handles: strings, Error objects, plain objects, and edge cases. + */ +const extractRawError = (err: unknown): string => { + if (!err) return ''; + + // String - use directly + if (typeof err === 'string') return err; + + // Error object - include name and message for context + if (err instanceof Error) { + const parts = [err.name !== 'Error' ? err.name : '', err.message].filter(Boolean); + return parts.join(': ') || ''; + } + + // Object - stringify and let AI parse the details + if (typeof err === 'object') { + try { + return JSON.stringify(err); + } catch { + // Circular reference or other stringify error + return String(err); + } + } + + // Fallback for other types (number, boolean, etc.) + return String(err); +}; + +/** + * Sanitize an error message using AI to make it user-friendly + * and remove any sensitive information. + * + * Uses deterministic settings (temperature: 0) for consistent results. + */ +export const sanitizeErrorMessage = async (rawError: unknown): Promise => { + const errorString = extractRawError(rawError); + + // If we couldn't extract any error, return a generic message + if (!errorString) { + return 'The automation encountered an unexpected error. Please check your script and try again.'; + } + + // Always use AI to make errors user-friendly and hide sensitive data + try { + const { text } = await generateText({ + model: groq('meta-llama/llama-4-scout-17b-16e-instruct'), + system: ERROR_SANITIZATION_SYSTEM_PROMPT, + prompt: errorString, + temperature: 0, // Deterministic output + maxRetries: 2, + }); + + const result = text.trim() || 'The automation encountered an error. Please check your script and try again.'; + console.log('[sanitizeErrorMessage] SYSTEM AI response:', result); + + return result; + } catch (aiError) { + // If AI fails, fall back to basic sanitization + console.error('[sanitizeErrorMessage] AI sanitization failed:', aiError); + + // Basic regex-based sanitization as fallback + let sanitized = errorString + // Remove potential API keys and tokens + .replace(/([a-zA-Z_]*(?:key|token|secret|password|api_key|apikey|authorization)[a-zA-Z_]*[=:\s]+)['"]?[a-zA-Z0-9_\-]{16,}['"]?/gi, '$1[REDACTED]') + // Remove Bearer tokens + .replace(/Bearer\s+[a-zA-Z0-9_\-\.]+/gi, 'Bearer [REDACTED]') + // Remove connection strings + .replace(/(mongodb|postgres|mysql|redis|amqp):\/\/[^\s]+/gi, '$1://[REDACTED]') + // Remove AWS keys + .replace(/AKIA[A-Z0-9]{16}/g, '[AWS_KEY_REDACTED]') + // Remove long hex strings that look like secrets + .replace(/['"][a-f0-9]{32,}['"]/gi, '"[REDACTED]"') + // Remove URLs with credentials + .replace(/:\/\/[^:]+:[^@]+@/g, '://[CREDENTIALS_REDACTED]@'); + + return sanitized || 'The automation encountered an error. Please check your script and try again.'; + } +}; diff --git a/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/hooks/use-task-automation-execution.ts b/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/hooks/use-task-automation-execution.ts index a356701ed..d5fad2c4e 100644 --- a/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/hooks/use-task-automation-execution.ts +++ b/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/hooks/use-task-automation-execution.ts @@ -20,6 +20,7 @@ import { useParams } from 'next/navigation'; import { useCallback, useEffect, useRef, useState } from 'react'; +import { sanitizeErrorMessage } from '../actions/sanitize-error'; import { useSharedChatContext } from '../lib/chat-context'; import { taskAutomationApi } from '../lib/task-automation-api'; import type { @@ -60,11 +61,21 @@ export function useTaskAutomationExecution({ } if (data.status === 'COMPLETED' && data.output) { + // Check both possible error locations: + // - data.output.error (direct error) + // - data.output.output.error (nested error from user's script returning {ok: false, error: "..."}) + const rawError = data.output.error || data.output.output?.error; + + // Sanitize if there's an error - makes it user-friendly and removes sensitive data + const sanitizedError = rawError + ? await sanitizeErrorMessage(rawError).catch(() => String(rawError)) + : undefined; + const executionResult: TaskAutomationExecutionResult = { success: data.output.success, data: data.output.output, - error: data.output.error, - logs: data.output.logs, + error: sanitizedError, + logs: [], // Don't expose internal execution logs to users summary: data.output.summary, evaluationStatus: data.output.evaluationStatus, evaluationReason: data.output.evaluationReason, @@ -75,8 +86,18 @@ export function useTaskAutomationExecution({ setIsExecuting(false); onSuccess?.(executionResult); } else if (data.status === 'FAILED') { - const error = new Error(data.error?.message || 'Task execution failed'); - console.error('[Automation Execution] Client received error:', data.error); + // Log raw error for debugging (internal only) + console.error('[Automation Execution] Raw error:', data.error); + + // Use AI to sanitize error message with fallback if AI fails + // Fallback extracts message from error object if available + const sanitizedMessage = await sanitizeErrorMessage(data.error).catch(() => { + if (typeof data.error === 'string') return data.error; + if (data.error?.message) return String(data.error.message); + return 'The automation failed to execute'; + }); + const error = new Error(sanitizedMessage); + setError(error); setIsExecuting(false); onError?.(error); @@ -85,7 +106,11 @@ export function useTaskAutomationExecution({ pollingIntervalRef.current = setTimeout(pollRunStatus, 1000); } } catch (err) { - const error = err instanceof Error ? err : new Error('Failed to poll run status'); + // Sanitize with fallback to ensure state cleanup always happens + const sanitizedMessage = await sanitizeErrorMessage(err).catch( + () => (err instanceof Error ? err.message : 'An unexpected error occurred'), + ); + const error = new Error(sanitizedMessage); setError(error); setIsExecuting(false); onError?.(error); @@ -135,7 +160,11 @@ export function useTaskAutomationExecution({ return response; } } catch (err) { - const error = err instanceof Error ? err : new Error('Unknown error'); + // Sanitize with fallback to ensure state cleanup always happens + const sanitizedMessage = await sanitizeErrorMessage(err).catch( + () => (err instanceof Error ? err.message : 'An unexpected error occurred'), + ); + const error = new Error(sanitizedMessage); setError(error); setIsExecuting(false); onError?.(error); diff --git a/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/hooks/use-task-automation.ts b/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/hooks/use-task-automation.ts index 55e47c508..aad2edd38 100644 --- a/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/hooks/use-task-automation.ts +++ b/apps/app/src/app/(app)/[orgId]/tasks/[taskId]/automation/[automationId]/hooks/use-task-automation.ts @@ -58,8 +58,6 @@ export function useTaskAutomation(overrideAutomationId?: string): UseTaskAutomat throw new Error('Failed to fetch automation'); } - console.log('response.data.automation', response.data.automation); - return response.data.automation; }, {