From acfc0540ac81e1b871bead561468c42a5e18aaf8 Mon Sep 17 00:00:00 2001 From: Manoj K Date: Wed, 3 Sep 2025 11:34:48 +0530 Subject: [PATCH] refactor: consolidate document versioning around sessionId instead of documentId --- apps/webapp/app/lib/ingest.server.ts | 13 +-- .../app/services/documentVersioning.server.ts | 61 ++++++------ .../app/services/graphModels/document.ts | 20 ++-- .../app/services/graphModels/episode.ts | 19 ++-- .../app/services/knowledgeGraph.server.ts | 2 - .../app/trigger/ingest/ingest-document.ts | 96 +++++++++++-------- apps/webapp/app/trigger/ingest/ingest.ts | 22 +++-- packages/types/src/graph/graph.entity.ts | 6 -- 8 files changed, 124 insertions(+), 115 deletions(-) diff --git a/apps/webapp/app/lib/ingest.server.ts b/apps/webapp/app/lib/ingest.server.ts index f69f2db..2b791a2 100644 --- a/apps/webapp/app/lib/ingest.server.ts +++ b/apps/webapp/app/lib/ingest.server.ts @@ -37,16 +37,11 @@ export const addToQueue = async ( }, }); - let ingestionType = EpisodeType.CONVERSATION; - if (body.documentId) { - ingestionType = EpisodeType.DOCUMENT; - } - let handler; - if (ingestionType === EpisodeType.DOCUMENT) { + if (body.type === EpisodeType.DOCUMENT) { handler = await ingestDocumentTask.trigger( { - body: { ...body, type: ingestionType }, + body, userId, workspaceId: user.Workspace.id, queueId: queuePersist.id, @@ -57,10 +52,10 @@ export const addToQueue = async ( tags: [user.id, queuePersist.id], }, ); - } else if (ingestionType === EpisodeType.CONVERSATION) { + } else if (body.type === EpisodeType.CONVERSATION) { handler = await ingestTask.trigger( { - body: { ...body, type: ingestionType }, + body, userId, workspaceId: user.Workspace.id, queueId: queuePersist.id, diff --git a/apps/webapp/app/services/documentVersioning.server.ts b/apps/webapp/app/services/documentVersioning.server.ts index c158763..14d428f 100644 --- a/apps/webapp/app/services/documentVersioning.server.ts +++ b/apps/webapp/app/services/documentVersioning.server.ts @@ -1,10 +1,13 @@ import crypto from "crypto"; import type { DocumentNode } from "@core/types"; -import { - findExistingDocument, - getDocumentVersions +import { + findExistingDocument, + getDocumentVersions, } from "./graphModels/document"; -import { DocumentChunker, type ChunkedDocument } from "./documentChunker.server"; +import { + DocumentChunker, + type ChunkedDocument, +} from "./documentChunker.server"; import { KnowledgeGraphService } from "./knowledgeGraph.server"; export interface DocumentVersion { @@ -46,37 +49,38 @@ export class DocumentVersioningService { * Prepare a new document version with proper versioning information */ async prepareDocumentVersion( - documentId: string, + sessionId: string, userId: string, title: string, content: string, source: string, metadata: Record = {}, - sessionId?: string, ): Promise<{ documentNode: DocumentNode; versionInfo: VersionedDocumentInfo; chunkedDocument: ChunkedDocument; }> { // Find existing document for version comparison - const existingDocument = await findExistingDocument(documentId, userId); - + const existingDocument = await findExistingDocument(sessionId, userId); + // Chunk the new document content const documentChunker = new DocumentChunker(); const chunkedDocument = await documentChunker.chunkDocument(content, title); - + // Determine version information - const versionInfo = this.analyzeVersionChanges(existingDocument, chunkedDocument); - + const versionInfo = this.analyzeVersionChanges( + existingDocument, + chunkedDocument, + ); + // Create new document node const documentNode = this.createVersionedDocumentNode( - documentId, + sessionId, userId, title, content, source, metadata, - sessionId, versionInfo, chunkedDocument, ); @@ -111,8 +115,9 @@ export class DocumentVersioningService { } // Check if content has actually changed - const hasContentChanged = existingDocument.contentHash !== newChunkedDocument.contentHash; - + const hasContentChanged = + existingDocument.contentHash !== newChunkedDocument.contentHash; + if (!hasContentChanged) { return { isNewDocument: false, @@ -152,13 +157,12 @@ export class DocumentVersioningService { * Create a new versioned document node */ private createVersionedDocumentNode( - documentId: string, + sessionId: string, userId: string, title: string, content: string, source: string, metadata: Record, - sessionId: string | undefined, versionInfo: VersionedDocumentInfo, chunkedDocument: ChunkedDocument, ): DocumentNode { @@ -177,12 +181,11 @@ export class DocumentVersioningService { createdAt: new Date(), validAt: new Date(), totalChunks: chunkedDocument.totalChunks, - documentId, - sessionId, version: versionInfo.newVersion, contentHash: chunkedDocument.contentHash, previousVersionUuid: versionInfo.previousVersionUuid || undefined, chunkHashes: chunkedDocument.chunkHashes, + sessionId, }; } @@ -195,8 +198,8 @@ export class DocumentVersioningService { limit: number = 10, ): Promise { const versions = await getDocumentVersions(documentId, userId, limit); - - return versions.map(doc => ({ + + return versions.map((doc) => ({ uuid: doc.uuid, version: doc.version, contentHash: doc.contentHash, @@ -228,11 +231,14 @@ export class DocumentVersioningService { ]); // Calculate cosine similarity - const similarity = this.calculateCosineSimilarity(oldEmbedding, newEmbedding); - + const similarity = this.calculateCosineSimilarity( + oldEmbedding, + newEmbedding, + ); + // If similarity is below threshold, invalidate old statements const shouldInvalidate = similarity < threshold; - + return { shouldInvalidate, semanticSimilarity: similarity, @@ -293,9 +299,8 @@ export class DocumentVersioningService { } { const totalChunks = versionInfo.chunkLevelChanges.totalChunks; const changePercentage = versionInfo.chunkLevelChanges.changePercentage; - const savingsPercentage = totalChunks > 0 - ? (processingStats.chunksSkipped / totalChunks) * 100 - : 0; + const savingsPercentage = + totalChunks > 0 ? (processingStats.chunksSkipped / totalChunks) * 100 : 0; return { summary: `Document v${versionInfo.newVersion}: ${changePercentage.toFixed(1)}% changed, ${savingsPercentage.toFixed(1)}% processing saved`, @@ -313,4 +318,4 @@ export class DocumentVersioningService { }, }; } -} \ No newline at end of file +} diff --git a/apps/webapp/app/services/graphModels/document.ts b/apps/webapp/app/services/graphModels/document.ts index a173f17..cdfbf38 100644 --- a/apps/webapp/app/services/graphModels/document.ts +++ b/apps/webapp/app/services/graphModels/document.ts @@ -14,7 +14,6 @@ export async function saveDocument(document: DocumentNode): Promise { d.createdAt = $createdAt, d.validAt = $validAt, d.totalChunks = $totalChunks, - d.documentId = $documentId, d.sessionId = $sessionId, d.version = $version, d.contentHash = $contentHash, @@ -27,7 +26,6 @@ export async function saveDocument(document: DocumentNode): Promise { d.source = $source, d.validAt = $validAt, d.totalChunks = $totalChunks, - d.documentId = $documentId, d.sessionId = $sessionId, d.version = $version, d.contentHash = $contentHash, @@ -46,7 +44,6 @@ export async function saveDocument(document: DocumentNode): Promise { createdAt: document.createdAt.toISOString(), validAt: document.validAt.toISOString(), totalChunks: document.totalChunks || 0, - documentId: document.documentId || null, sessionId: document.sessionId || null, version: document.version || 1, contentHash: document.contentHash, @@ -67,8 +64,7 @@ export async function linkEpisodeToDocument( MATCH (e:Episode {uuid: $episodeUuid}) MATCH (d:Document {uuid: $documentUuid}) MERGE (d)-[r:CONTAINS_CHUNK {chunkIndex: $chunkIndex}]->(e) - SET e.documentId = $documentUuid, - e.chunkIndex = $chunkIndex + SET e.chunkIndex = $chunkIndex RETURN r `; @@ -175,24 +171,24 @@ export async function getUserDocuments( * Generate content hash for document versioning */ export function generateContentHash(content: string): string { - return crypto.createHash('sha256').update(content, 'utf8').digest('hex'); + return crypto.createHash("sha256").update(content, "utf8").digest("hex"); } /** * Find existing document by documentId and userId for version comparison */ export async function findExistingDocument( - documentId: string, + sessionId: string, userId: string, ): Promise { const query = ` - MATCH (d:Document {documentId: $documentId, userId: $userId}) + MATCH (d:Document {sessionId: $sessionId, userId: $userId}) RETURN d ORDER BY d.version DESC LIMIT 1 `; - const params = { documentId, userId }; + const params = { sessionId, userId }; const result = await runQuery(query, params); if (result.length === 0) return null; @@ -219,18 +215,18 @@ export async function findExistingDocument( * Get document version history */ export async function getDocumentVersions( - documentId: string, + sessionId: string, userId: string, limit: number = 10, ): Promise { const query = ` - MATCH (d:Document {documentId: $documentId, userId: $userId}) + MATCH (d:Document {sessionId: $sessionId, userId: $userId}) RETURN d ORDER BY d.version DESC LIMIT $limit `; - const params = { documentId, userId, limit }; + const params = { sessionId, userId, limit }; const result = await runQuery(query, params); return result.map((record) => { diff --git a/apps/webapp/app/services/graphModels/episode.ts b/apps/webapp/app/services/graphModels/episode.ts index 90852fa..89d490f 100644 --- a/apps/webapp/app/services/graphModels/episode.ts +++ b/apps/webapp/app/services/graphModels/episode.ts @@ -82,28 +82,23 @@ export async function getRecentEpisodes(params: { userId: string; source?: string; sessionId?: string; - type?: EpisodeType; - documentId?: string; }): Promise { - let filters = `WHERE e.validAt <= $referenceTime - AND e.userId = $userId`; + let filters = `WHERE e.validAt <= $referenceTime`; if (params.source) { filters += `\nAND e.source = $source`; } - if (params.type === EpisodeType.CONVERSATION && params.sessionId) { + if (params.sessionId) { filters += `\nAND e.sessionId = $sessionId`; } - if (params.type === EpisodeType.DOCUMENT && params.documentId) { - filters += `\nAND e.documentId = $documentId`; - } - const query = ` - MATCH (e:Episode) + MATCH (e:Episode{userId: $userId}) ${filters} - RETURN e + MATCH (e)-[:HAS_PROVENANCE]->(s:Statement) + WHERE s.invalidAt IS NULL + RETURN DISTINCT e ORDER BY e.validAt DESC LIMIT ${params.limit} `; @@ -113,7 +108,6 @@ export async function getRecentEpisodes(params: { userId: params.userId, source: params.source || null, sessionId: params.sessionId || null, - documentId: params.documentId || null, }; const result = await runQuery(query, queryParams); @@ -316,6 +310,7 @@ export async function getEpisodeStatements(params: { }) { const query = ` MATCH (episode:Episode {uuid: $episodeUuid, userId: $userId})-[:HAS_PROVENANCE]->(stmt:Statement) + WHERE stmt.invalidAt IS NULL RETURN stmt `; diff --git a/apps/webapp/app/services/knowledgeGraph.server.ts b/apps/webapp/app/services/knowledgeGraph.server.ts index 1dbab6c..ac69c79 100644 --- a/apps/webapp/app/services/knowledgeGraph.server.ts +++ b/apps/webapp/app/services/knowledgeGraph.server.ts @@ -249,8 +249,6 @@ export class KnowledgeGraphService { userId: params.userId, source: params.source, sessionId: params.sessionId, - type: params.type, - documentId: params.documentId, }); // Format session context from previous episodes diff --git a/apps/webapp/app/trigger/ingest/ingest-document.ts b/apps/webapp/app/trigger/ingest/ingest-document.ts index 1a4b279..5e6ef50 100644 --- a/apps/webapp/app/trigger/ingest/ingest-document.ts +++ b/apps/webapp/app/trigger/ingest/ingest-document.ts @@ -33,7 +33,6 @@ export const ingestDocumentTask = task({ try { logger.log(`Processing document for user ${payload.userId}`, { - documentTitle: payload.body.documentTitle, contentLength: payload.body.episodeBody.length, }); @@ -44,7 +43,7 @@ export const ingestDocumentTask = task({ }, }); - const documentBody = payload.body as any; + const documentBody = payload.body; // Step 1: Initialize services and prepare document version const versioningService = new DocumentVersioningService(); @@ -56,13 +55,12 @@ export const ingestDocumentTask = task({ versionInfo, chunkedDocument, } = await versioningService.prepareDocumentVersion( - documentBody.documentId || crypto.randomUUID(), + documentBody.sessionId!, payload.userId, - documentBody.documentTitle || "Untitled Document", + documentBody.metadata?.documentTitle?.toString() || "Untitled Document", documentBody.episodeBody, documentBody.source, documentBody.metadata || {}, - documentBody.sessionId, ); logger.log(`Document version analysis:`, { @@ -75,11 +73,12 @@ export const ingestDocumentTask = task({ }); // Step 2: Determine processing strategy - const differentialDecision = await differentialService.analyzeDifferentialNeed( - documentBody.episodeBody, - versionInfo.existingDocument, - chunkedDocument, - ); + const differentialDecision = + await differentialService.analyzeDifferentialNeed( + documentBody.episodeBody, + versionInfo.existingDocument, + chunkedDocument, + ); logger.log(`Differential analysis:`, { shouldUseDifferential: differentialDecision.shouldUseDifferential, @@ -94,15 +93,20 @@ export const ingestDocumentTask = task({ // Step 3.1: Invalidate statements from previous document version if it exists let invalidationResults = null; if (versionInfo.existingDocument && versionInfo.hasContentChanged) { - logger.log(`Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`); - - invalidationResults = await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion({ - previousDocumentUuid: versionInfo.existingDocument.uuid, - newDocumentContent: documentBody.episodeBody, - userId: payload.userId, - invalidatedBy: document.uuid, - semanticSimilarityThreshold: 0.75, // Configurable threshold - }); + logger.log( + `Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`, + ); + + invalidationResults = + await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion( + { + previousDocumentUuid: versionInfo.existingDocument.uuid, + newDocumentContent: documentBody.episodeBody, + userId: payload.userId, + invalidatedBy: document.uuid, + semanticSimilarityThreshold: 0.75, // Configurable threshold + }, + ); logger.log(`Statement invalidation completed:`, { totalAnalyzed: invalidationResults.totalStatementsAnalyzed, @@ -119,24 +123,32 @@ export const ingestDocumentTask = task({ let chunksToProcess = chunkedDocument.chunks; let processingMode = "full"; - if (differentialDecision.shouldUseDifferential && differentialDecision.strategy === "chunk_level_diff") { + if ( + differentialDecision.shouldUseDifferential && + differentialDecision.strategy === "chunk_level_diff" + ) { // Only process changed chunks const chunkComparisons = differentialService.getChunkComparisons( versionInfo.existingDocument!, chunkedDocument, ); - - const changedIndices = differentialService.getChunksNeedingReprocessing(chunkComparisons); - chunksToProcess = chunkedDocument.chunks.filter(chunk => - changedIndices.includes(chunk.chunkIndex) + + const changedIndices = + differentialService.getChunksNeedingReprocessing(chunkComparisons); + chunksToProcess = chunkedDocument.chunks.filter((chunk) => + changedIndices.includes(chunk.chunkIndex), ); processingMode = "differential"; - logger.log(`Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`); + logger.log( + `Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`, + ); } else if (differentialDecision.strategy === "full_reingest") { // Process all chunks processingMode = "full"; - logger.log(`Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`); + logger.log( + `Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`, + ); } // Step 5: Queue chunks for processing @@ -150,14 +162,16 @@ export const ingestDocumentTask = task({ processingMode, differentialStrategy: differentialDecision.strategy, chunkHash: chunk.contentHash, + documentTitle: + documentBody.metadata?.documentTitle?.toString() || + "Untitled Document", + chunkIndex: chunk.chunkIndex, + documentUuid: document.uuid, }, source: documentBody.source, spaceId: documentBody.spaceId, sessionId: documentBody.sessionId, type: EpisodeTypeEnum.DOCUMENT, - documentTitle: documentBody.documentTitle, - documentId: document.uuid, // Use the new document UUID - chunkIndex: chunk.chunkIndex, }; const episodeHandler = await ingestTask.trigger( @@ -205,11 +219,13 @@ export const ingestDocumentTask = task({ processingMode, differentialStrategy: differentialDecision.strategy, estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, - statementInvalidation: invalidationResults ? { - totalAnalyzed: invalidationResults.totalStatementsAnalyzed, - invalidated: invalidationResults.invalidatedStatements.length, - preserved: invalidationResults.preservedStatements.length, - } : null, + statementInvalidation: invalidationResults + ? { + totalAnalyzed: invalidationResults.totalStatementsAnalyzed, + invalidated: invalidationResults.invalidatedStatements.length, + preserved: invalidationResults.preservedStatements.length, + } + : null, episodes: [], episodeHandlers, }, @@ -230,11 +246,13 @@ export const ingestDocumentTask = task({ chunksSkipped: costSavings.chunksSkipped, estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, changePercentage: `${differentialDecision.changePercentage.toFixed(1)}%`, - statementInvalidation: invalidationResults ? { - totalAnalyzed: invalidationResults.totalStatementsAnalyzed, - invalidated: invalidationResults.invalidatedStatements.length, - preserved: invalidationResults.preservedStatements.length, - } : "No previous version", + statementInvalidation: invalidationResults + ? { + totalAnalyzed: invalidationResults.totalStatementsAnalyzed, + invalidated: invalidationResults.invalidatedStatements.length, + preserved: invalidationResults.preservedStatements.length, + } + : "No previous version", }, ); diff --git a/apps/webapp/app/trigger/ingest/ingest.ts b/apps/webapp/app/trigger/ingest/ingest.ts index cd23aef..4fa8479 100644 --- a/apps/webapp/app/trigger/ingest/ingest.ts +++ b/apps/webapp/app/trigger/ingest/ingest.ts @@ -19,8 +19,6 @@ export const IngestBodyRequest = z.object({ type: z .enum([EpisodeType.CONVERSATION, EpisodeType.DOCUMENT]) .default(EpisodeType.CONVERSATION), - documentTitle: z.string().optional(), - documentId: z.string().optional(), }); const ingestionQueue = queue({ @@ -62,16 +60,26 @@ export const ingestTask = task({ ); // Link episode to document if it's a document chunk - if (episodeBody.type === EpisodeType.DOCUMENT && episodeBody.documentId && episodeDetails.episodeUuid) { + if ( + episodeBody.type === EpisodeType.DOCUMENT && + episodeBody.metadata.documentUuid && + episodeDetails.episodeUuid + ) { try { await linkEpisodeToDocument( episodeDetails.episodeUuid, - episodeBody.documentId, - episodeBody.chunkIndex || 0, + episodeBody.metadata.documentUuid, + episodeBody.metadata.chunkIndex || 0, + ); + logger.log( + `Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.metadata.documentUuid} at chunk ${episodeBody.metadata.chunkIndex || 0}`, ); - logger.log(`Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.documentId} at chunk ${episodeBody.chunkIndex || 0}`); } catch (error) { - logger.error(`Failed to link episode to document:`, { error, episodeUuid: episodeDetails.episodeUuid, documentId: episodeBody.documentId }); + logger.error(`Failed to link episode to document:`, { + error, + episodeUuid: episodeDetails.episodeUuid, + documentUuid: episodeBody.metadata.documentUuid, + }); } } diff --git a/packages/types/src/graph/graph.entity.ts b/packages/types/src/graph/graph.entity.ts index c5c986b..8838d3f 100644 --- a/packages/types/src/graph/graph.entity.ts +++ b/packages/types/src/graph/graph.entity.ts @@ -12,7 +12,6 @@ export interface DocumentNode { createdAt: Date; validAt: Date; totalChunks: number; - documentId?: string; sessionId?: string; // Version tracking for differential ingestion version: number; @@ -39,7 +38,6 @@ export interface EpisodicNode { space?: string; sessionId?: string; recallCount?: number; - documentId?: string; chunkIndex?: number; // Index of this chunk within the document } @@ -113,10 +111,6 @@ export type AddEpisodeParams = { spaceId?: string; sessionId?: string; type?: EpisodeType; - documentTitle?: string; - documentId?: string; - chunkIndex?: number; - chunkContext?: string; }; export type AddEpisodeResult = {