Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions apps/api/src/vector-store/lib/sync/sync-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,24 @@ export async function fetchContextEntries(
});
}

/**
 * Outcome of syncing one context entry, as returned by syncSingleContext.
 */
interface SyncSingleResult {
// 'skipped' when nothing was upserted (no update needed, empty text, or no chunks);
// otherwise 'created' if the entry had no existing embeddings, else 'updated'.
status: 'created' | 'updated' | 'skipped';
// ID of the last chunk handed to upsertChunks; null whenever the entry was skipped.
lastEmbeddingId: string | null;
}

/**
* Sync a single context entry's embeddings
*/
async function syncSingleContext(
context: ContextData,
existingEmbeddings: ExistingEmbedding[],
organizationId: string,
): Promise<'created' | 'updated' | 'skipped'> {
): Promise<SyncSingleResult> {
const contextUpdatedAt = context.updatedAt.toISOString();

// Check if context needs update
if (!needsUpdate(existingEmbeddings, contextUpdatedAt)) {
return 'skipped';
return { status: 'skipped', lastEmbeddingId: null };
}

// Delete old embeddings if they exist
Expand All @@ -60,7 +65,7 @@ async function syncSingleContext(
const contextText = `Question: ${context.question}\n\nAnswer: ${context.answer}`;

if (!contextText || contextText.trim().length === 0) {
return 'skipped';
return { status: 'skipped', lastEmbeddingId: null };
}

// Use larger chunk size for context entries
Expand All @@ -77,12 +82,15 @@ async function syncSingleContext(
);

if (chunkItems.length === 0) {
return 'skipped';
return { status: 'skipped', lastEmbeddingId: null };
}

await upsertChunks(chunkItems);

return existingEmbeddings.length === 0 ? 'created' : 'updated';
const lastEmbeddingId = chunkItems[chunkItems.length - 1]?.id ?? null;
const status = existingEmbeddings.length === 0 ? 'created' : 'updated';

return { status, lastEmbeddingId };
}

/**
Expand Down Expand Up @@ -115,9 +123,14 @@ export async function syncContextEntries(
organizationId,
);

if (result === 'created') stats.created++;
else if (result === 'updated') stats.updated++;
if (result.status === 'created') stats.created++;
else if (result.status === 'updated') stats.updated++;
else stats.skipped++;

// Track the last upserted embedding ID
if (result.lastEmbeddingId) {
stats.lastUpsertedEmbeddingId = result.lastEmbeddingId;
}
} catch (error) {
logger.error('Failed to sync context', {
contextId: context.id,
Expand Down
26 changes: 20 additions & 6 deletions apps/api/src/vector-store/lib/sync/sync-knowledge-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,18 @@ async function deleteExistingDocumentEmbeddings(
}
}

/**
 * Outcome of processing one knowledge base document, as returned by
 * processSingleDocument.
 */
interface ProcessDocumentResult {
// 'processed' once chunks were upserted and the document was marked 'completed';
// 'failed' on the early-exit paths that mark the document 'failed'.
status: 'processed' | 'failed';
// ID of the last chunk handed to upsertChunks; null on the 'failed' paths.
lastEmbeddingId: string | null;
}

/**
* Process a single knowledge base document
*/
async function processSingleDocument(
document: KnowledgeBaseDocumentData,
organizationId: string,
): Promise<'processed' | 'failed'> {
): Promise<ProcessDocumentResult> {
const documentUpdatedAt = document.updatedAt.toISOString();

logger.info('Processing Knowledge Base document', {
Expand All @@ -128,7 +133,7 @@ async function processSingleDocument(
documentId: document.id,
});
await updateDocumentStatus(document.id, 'failed');
return 'failed';
return { status: 'failed', lastEmbeddingId: null };
}

// Delete existing embeddings
Expand All @@ -148,7 +153,7 @@ async function processSingleDocument(
if (chunkItems.length === 0) {
logger.warn('No chunks created from content', { documentId: document.id });
await updateDocumentStatus(document.id, 'failed');
return 'failed';
return { status: 'failed', lastEmbeddingId: null };
}

await upsertChunks(chunkItems);
Expand All @@ -158,7 +163,9 @@ async function processSingleDocument(
});

await updateDocumentStatus(document.id, 'completed');
return 'processed';

const lastEmbeddingId = chunkItems[chunkItems.length - 1]?.id ?? null;
return { status: 'processed', lastEmbeddingId };
}

/**
Expand Down Expand Up @@ -211,8 +218,15 @@ export async function syncKnowledgeBaseDocuments(
try {
const result = await processSingleDocument(document, organizationId);

if (result === 'processed') stats.created++;
else stats.failed++;
if (result.status === 'processed') {
stats.created++;
// Track the last upserted embedding ID
if (result.lastEmbeddingId) {
stats.lastUpsertedEmbeddingId = result.lastEmbeddingId;
}
} else {
stats.failed++;
}
} catch (error) {
logger.error('Failed to process Knowledge Base document', {
documentId: document.id,
Expand Down
187 changes: 178 additions & 9 deletions apps/api/src/vector-store/lib/sync/sync-organization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
syncKnowledgeBaseDocuments,
fetchKnowledgeBaseDocuments,
} from './sync-knowledge-base';
import type { SyncStats } from './sync-utils';

/**
* Lock map to prevent concurrent syncs for the same organization
Expand Down Expand Up @@ -100,6 +101,51 @@ async function performSync(organizationId: string): Promise<void> {
existingEmbeddings,
);

// Step 7: Verify embeddings are queryable (handles Upstash eventual consistency)
// Only verify if we actually created or updated any embeddings
const totalCreated =
policyStats.created +
contextStats.created +
manualAnswerStats.created +
kbDocStats.created;
const totalUpdated =
policyStats.updated +
contextStats.updated +
manualAnswerStats.updated +
kbDocStats.updated;

// Find the last upserted embedding ID (prioritize knowledge base, then manual answers, then context, then policies)
// This ensures we verify the most recently upserted embedding
const lastUpsertedEmbeddingId =
kbDocStats.lastUpsertedEmbeddingId ??
manualAnswerStats.lastUpsertedEmbeddingId ??
contextStats.lastUpsertedEmbeddingId ??
policyStats.lastUpsertedEmbeddingId ??
null;

if ((totalCreated > 0 || totalUpdated > 0) && lastUpsertedEmbeddingId) {
const verificationResult = await verifyEmbeddingIsReady(
lastUpsertedEmbeddingId,
organizationId,
);
logger.info('Embeddings verification completed', {
organizationId,
embeddingId: lastUpsertedEmbeddingId,
verified: verificationResult.success,
attempts: verificationResult.attempts,
totalWaitMs: verificationResult.totalWaitMs,
});
} else if (totalCreated > 0 || totalUpdated > 0) {
logger.warn(
'Embeddings were created/updated but no embedding ID tracked for verification',
{ organizationId, totalCreated, totalUpdated },
);
} else {
logger.info('Skipping verification - no new embeddings created/updated', {
organizationId,
});
}

logger.info('Incremental organization embeddings sync completed', {
organizationId,
policies: policyStats,
Expand All @@ -118,18 +164,125 @@ async function performSync(organizationId: string): Promise<void> {
}
}

/**
 * Verifies that an embedding is indexed and QUERYABLE after sync.
 * Retries with exponential backoff to handle Upstash Vector's eventual consistency.
 *
 * IMPORTANT: We use vectorIndex.query() instead of relying on fetch() alone because:
 * - fetch() checks if data is stored (works immediately)
 * - query() checks if data is INDEXED for semantic search (may have delay)
 *
 * The embedding must be queryable, not just fetchable, for RAG to work.
 *
 * Each attempt fetches the embedding (to obtain its vector), then queries the
 * index with that same vector, restricted to the organization, and looks for
 * the embedding's own ID among the results.
 *
 * @param embeddingId - ID of the embedding to look for.
 * @param organizationId - Organization used in the query filter and in log context.
 * @returns `success` is true once the embedding shows up in query results;
 *   `attempts` is the number of attempts made (0 when the vector index is not
 *   configured, `maxRetries` when every attempt failed); `totalWaitMs` is the
 *   accumulated backoff sleep time. Never throws: fetch/query errors are
 *   logged and treated as a failed attempt.
 */
async function verifyEmbeddingIsReady(
  embeddingId: string,
  organizationId: string,
): Promise<{
  success: boolean;
  attempts: number;
  totalWaitMs: number;
}> {
  if (!vectorIndex) {
    logger.warn('Vector index not configured, skipping verification');
    return { success: false, attempts: 0, totalWaitMs: 0 };
  }

  const maxRetries = 8;
  const initialDelay = 300; // 300ms
  // Ask for a few neighbours rather than exactly one: if another vector in the
  // same organization is identical (duplicate chunk text) or the approximate
  // search ranks a near-duplicate first, topK = 1 could return that other ID
  // on every attempt and the verification would time out even though the
  // embedding is indexed. The ID can only appear in results once it is
  // indexed, so a wider window cannot produce a false positive.
  const verificationTopK = 5;
  let totalWaitMs = 0;

  // Backoff schedule: 300ms, 600ms, 1200ms, 2400ms...
  const backoffDelayMs = (attempt: number): number =>
    initialDelay * 2 ** (attempt - 1);

  // Sleeps for `delay` ms and records it in the running total.
  const waitBeforeRetry = async (delay: number): Promise<void> => {
    await new Promise((resolve) => setTimeout(resolve, delay));
    totalWaitMs += delay;
  };

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    // No point sleeping after the final attempt; we fall through to the
    // failure return below instead.
    const hasRetriesLeft = attempt < maxRetries;
    const delay = backoffDelayMs(attempt);

    try {
      // First, fetch the embedding to get its vector
      const fetchedEmbeddings = await vectorIndex.fetch([embeddingId], {
        includeVectors: true,
      });

      const fetchedEmbedding = fetchedEmbeddings?.[0];
      if (!fetchedEmbedding || !fetchedEmbedding.vector) {
        // Embedding not even stored yet, wait and retry
        if (hasRetriesLeft) {
          logger.info('Embedding not yet stored, waiting before retry', {
            organizationId,
            embeddingId,
            attempt,
            nextDelayMs: delay,
          });
          await waitBeforeRetry(delay);
        }
        continue;
      }

      // Now query using the embedding's own vector to verify it's INDEXED.
      // If the embedding is indexed, querying with its own vector should
      // return itself among the nearest neighbours. Only result IDs are
      // inspected, so metadata is not requested.
      const queryResults = await vectorIndex.query({
        vector: fetchedEmbedding.vector as number[],
        topK: verificationTopK,
        filter: `organizationId = "${organizationId}"`,
      });

      // Check if our embedding appears in the query results
      const isIndexed = queryResults.some((result) => result.id === embeddingId);

      if (isIndexed) {
        logger.info('Embedding verification succeeded - indexed and queryable', {
          organizationId,
          embeddingId,
          attempt,
          totalWaitMs,
        });
        return { success: true, attempts: attempt, totalWaitMs };
      }

      // Embedding is stored but not yet indexed for search
      if (hasRetriesLeft) {
        logger.info('Embedding stored but not yet indexed, waiting before retry', {
          organizationId,
          embeddingId,
          attempt,
          nextDelayMs: delay,
          queryResultCount: queryResults.length,
        });
        await waitBeforeRetry(delay);
      }
    } catch (error) {
      // Best-effort verification: a failing fetch/query counts as a failed
      // attempt and is retried on the same backoff schedule.
      logger.warn('Verification query failed', {
        organizationId,
        embeddingId,
        attempt,
        error: error instanceof Error ? error.message : 'Unknown error',
      });
      if (hasRetriesLeft) {
        await waitBeforeRetry(delay);
      }
    }
  }

  // All retries exhausted
  logger.warn('Embedding verification failed after all retries', {
    organizationId,
    embeddingId,
    attempts: maxRetries,
    totalWaitMs,
  });
  return { success: false, attempts: maxRetries, totalWaitMs };
}

/**
* Sync manual answers for an organization
*/
async function syncManualAnswers(
organizationId: string,
existingEmbeddings: Map<string, ExistingEmbedding[]>,
): Promise<{
created: number;
updated: number;
skipped: number;
total: number;
}> {
): Promise<SyncStats> {
const manualAnswers = await db.securityQuestionnaireManualAnswer.findMany({
where: { organizationId },
select: {
Expand All @@ -148,6 +301,7 @@ async function syncManualAnswers(
let created = 0;
let updated = 0;
let skipped = 0;
let lastUpsertedEmbeddingId: string | null = null;

if (manualAnswers.length > 0) {
const itemsToUpsert = manualAnswers
Expand Down Expand Up @@ -183,8 +337,16 @@ async function syncManualAnswers(
})
.filter((item): item is NonNullable<typeof item> => item !== null);

if (itemsToUpsert.length > 0) {
await batchUpsertEmbeddings(itemsToUpsert);
// Filter out items with empty text (same filter as batchUpsertEmbeddings)
// This ensures we only track IDs that will actually be upserted
const validItems = itemsToUpsert.filter(
(item) => item.text && item.text.trim().length > 0,
);

if (validItems.length > 0) {
await batchUpsertEmbeddings(validItems);
// Track the last ACTUALLY upserted embedding ID
lastUpsertedEmbeddingId = validItems[validItems.length - 1]?.id ?? null;
}
}

Expand All @@ -196,7 +358,14 @@ async function syncManualAnswers(
total: manualAnswers.length,
});

return { created, updated, skipped, total: manualAnswers.length };
return {
created,
updated,
skipped,
failed: 0,
total: manualAnswers.length,
lastUpsertedEmbeddingId,
};
}

/**
Expand Down
Loading
Loading