From ec80667c24d2f1c1ce082f25a684655bd1811eef Mon Sep 17 00:00:00 2001 From: Manoj K Date: Tue, 2 Sep 2025 17:18:25 +0530 Subject: [PATCH] Feat: add versioning to documents --- .../app/services/documentChunker.server.ts | 53 +++ .../app/services/documentDiffer.server.ts | 204 +++++++++++ .../app/services/documentVersioning.server.ts | 316 ++++++++++++++++++ .../app/services/graphModels/document.ts | 107 +++++- .../app/services/graphModels/episode.ts | 3 + .../app/services/knowledgeGraph.server.ts | 149 +++++++++ .../app/trigger/ingest/ingest-document.ts | 162 +++++++-- apps/webapp/app/trigger/ingest/ingest.ts | 15 + packages/types/src/graph/graph.entity.ts | 5 + 9 files changed, 983 insertions(+), 31 deletions(-) create mode 100644 apps/webapp/app/services/documentDiffer.server.ts create mode 100644 apps/webapp/app/services/documentVersioning.server.ts diff --git a/apps/webapp/app/services/documentChunker.server.ts b/apps/webapp/app/services/documentChunker.server.ts index 36e9cf4..63cc99f 100644 --- a/apps/webapp/app/services/documentChunker.server.ts +++ b/apps/webapp/app/services/documentChunker.server.ts @@ -8,6 +8,7 @@ export interface DocumentChunk { context?: string; startPosition: number; endPosition: number; + contentHash: string; // Hash for change detection } export interface ChunkedDocument { @@ -16,6 +17,8 @@ export interface ChunkedDocument { originalContent: string; chunks: DocumentChunk[]; totalChunks: number; + contentHash: string; // Hash of the entire document + chunkHashes: string[]; // Array of chunk hashes for change detection } /** @@ -36,6 +39,7 @@ export class DocumentChunker { title: string, ): Promise { const documentId = crypto.randomUUID(); + const contentHash = this.generateContentHash(originalContent); // First, split by major section headers (markdown style) const majorSections = this.splitByMajorSections(originalContent); @@ -107,12 +111,17 @@ export class DocumentChunker { )); } + // Generate chunk hashes array + const chunkHashes = chunks.map(chunk => chunk.contentHash); + return { documentId, title, originalContent, chunks, totalChunks: chunks.length, + contentHash, + chunkHashes, }; } @@ -235,6 +244,7 @@ export class DocumentChunker { ): DocumentChunk { // Generate a concise context/title if not provided const context = title || this.generateChunkContext(content); + const contentHash = this.generateContentHash(content.trim()); return { content: content.trim(), @@ -243,6 +253,7 @@ export class DocumentChunker { context: `Chunk ${chunkIndex + 1}${context ? `: ${context}` : ""}`, startPosition, endPosition, + contentHash, }; } @@ -259,4 +270,46 @@ export class DocumentChunker { return "Document content"; } + + /** + * Generate content hash for change detection + */ + private generateContentHash(content: string): string { + return crypto.createHash('sha256').update(content, 'utf8').digest('hex').substring(0, 16); + } + + /** + * Compare chunk hashes to detect changes + */ + static compareChunkHashes(oldHashes: string[], newHashes: string[]): { + changedIndices: number[]; + changePercentage: number; + } { + const maxLength = Math.max(oldHashes.length, newHashes.length); + const changedIndices: number[] = []; + + for (let i = 0; i < maxLength; i++) { + const oldHash = oldHashes[i]; + const newHash = newHashes[i]; + + // Mark as changed if hash is different or chunk added/removed + if (oldHash !== newHash) { + changedIndices.push(i); + } + } + + const changePercentage = maxLength > 0 ? 
(changedIndices.length / maxLength) * 100 : 0; + + return { + changedIndices, + changePercentage, + }; + } + + /** + * Calculate document size in tokens for threshold decisions + */ + static getDocumentSizeInTokens(content: string): number { + return encode(content).length; + } } \ No newline at end of file diff --git a/apps/webapp/app/services/documentDiffer.server.ts b/apps/webapp/app/services/documentDiffer.server.ts new file mode 100644 index 0000000..c970c70 --- /dev/null +++ b/apps/webapp/app/services/documentDiffer.server.ts @@ -0,0 +1,204 @@ +import { encode } from "gpt-tokenizer"; +import { DocumentChunker, type ChunkedDocument } from "./documentChunker.server"; +import type { DocumentNode } from "@core/types"; + +export interface DifferentialDecision { + shouldUseDifferential: boolean; + strategy: "full_reingest" | "chunk_level_diff" | "new_document"; + reason: string; + changedChunkIndices: number[]; + changePercentage: number; + documentSizeTokens: number; +} + +export interface ChunkComparison { + chunkIndex: number; + hasChanged: boolean; + oldHash?: string; + newHash: string; + semanticSimilarity?: number; +} + +/** + * Service for implementing differential document processing logic + * Determines when to use differential vs full re-ingestion based on + * document size and change percentage thresholds + */ +export class DocumentDifferentialService { + // Threshold constants based on our enhanced approach + private readonly SMALL_DOC_THRESHOLD = 5 * 1000; // 5K tokens + private readonly MEDIUM_DOC_THRESHOLD = 50 * 1000; // 50K tokens + + // Change percentage thresholds + private readonly SMALL_CHANGE_THRESHOLD = 20; // 20% + private readonly MEDIUM_CHANGE_THRESHOLD = 30; // 30% + + /** + * Analyze whether to use differential processing for a document update + */ + async analyzeDifferentialNeed( + newContent: string, + existingDocument: DocumentNode | null, + newChunkedDocument: ChunkedDocument, + ): Promise { + // If no existing document, it's a new document + if (!existingDocument) { + return { + shouldUseDifferential: false, + strategy: "new_document", + reason: "No existing document found", + changedChunkIndices: [], + changePercentage: 100, + documentSizeTokens: encode(newContent).length, + }; + } + + const documentSizeTokens = encode(newContent).length; + + // Quick content hash comparison + if (existingDocument.contentHash === newChunkedDocument.contentHash) { + return { + shouldUseDifferential: false, + strategy: "full_reingest", // No changes detected + reason: "Document content unchanged", + changedChunkIndices: [], + changePercentage: 0, + documentSizeTokens, + }; + } + + // Compare chunk hashes to identify changes + const chunkComparison = DocumentChunker.compareChunkHashes( + existingDocument.chunkHashes || [], + newChunkedDocument.chunkHashes, + ); + + const { changedIndices, changePercentage } = chunkComparison; + + // Apply threshold-based decision matrix + const decision = this.applyThresholdDecision( + documentSizeTokens, + changePercentage, + changedIndices, + ); + + return { + ...decision, + changedChunkIndices: changedIndices, + changePercentage, + documentSizeTokens, + }; + } + + /** + * Apply threshold-based decision matrix + */ + private applyThresholdDecision( + documentSizeTokens: number, + changePercentage: number, + changedIndices: number[], + ): Pick { + // Small documents: always full re-ingest (cheap) + if (documentSizeTokens < this.SMALL_DOC_THRESHOLD) { + return { + shouldUseDifferential: false, + strategy: "full_reingest", + reason: `Document 
too small (${documentSizeTokens} tokens < ${this.SMALL_DOC_THRESHOLD})`, + }; + } + + // Medium documents (5-50K tokens) + if (documentSizeTokens < this.MEDIUM_DOC_THRESHOLD) { + if (changePercentage < this.SMALL_CHANGE_THRESHOLD) { + return { + shouldUseDifferential: true, + strategy: "chunk_level_diff", + reason: `Medium document with small changes (${changePercentage.toFixed(1)}% < ${this.SMALL_CHANGE_THRESHOLD}%)`, + }; + } else { + return { + shouldUseDifferential: false, + strategy: "full_reingest", + reason: `Medium document with large changes (${changePercentage.toFixed(1)}% >= ${this.SMALL_CHANGE_THRESHOLD}%)`, + }; + } + } + + // Large documents (>50K tokens) + if (changePercentage < this.MEDIUM_CHANGE_THRESHOLD) { + return { + shouldUseDifferential: true, + strategy: "chunk_level_diff", + reason: `Large document with moderate changes (${changePercentage.toFixed(1)}% < ${this.MEDIUM_CHANGE_THRESHOLD}%)`, + }; + } else { + return { + shouldUseDifferential: false, + strategy: "full_reingest", + reason: `Large document with extensive changes (${changePercentage.toFixed(1)}% >= ${this.MEDIUM_CHANGE_THRESHOLD}%)`, + }; + } + } + + /** + * Get detailed chunk comparison for differential processing + */ + getChunkComparisons( + existingDocument: DocumentNode, + newChunkedDocument: ChunkedDocument, + ): ChunkComparison[] { + const oldHashes = existingDocument.chunkHashes || []; + const newHashes = newChunkedDocument.chunkHashes; + const maxLength = Math.max(oldHashes.length, newHashes.length); + + const comparisons: ChunkComparison[] = []; + + for (let i = 0; i < maxLength; i++) { + const oldHash = oldHashes[i]; + const newHash = newHashes[i]; + + comparisons.push({ + chunkIndex: i, + hasChanged: oldHash !== newHash, + oldHash, + newHash: newHash || "", // Handle case where new doc has fewer chunks + }); + } + + return comparisons; + } + + /** + * Filter chunks that need re-processing + */ + getChunksNeedingReprocessing( + chunkComparisons: ChunkComparison[], + ): number[] { + return chunkComparisons + .filter(comparison => comparison.hasChanged) + .map(comparison => comparison.chunkIndex); + } + + /** + * Calculate processing cost savings estimate + */ + calculateCostSavings( + totalChunks: number, + changedChunks: number, + ): { + chunksToProcess: number; + chunksSkipped: number; + estimatedSavingsPercentage: number; + } { + const chunksSkipped = totalChunks - changedChunks; + const estimatedSavingsPercentage = totalChunks > 0 + ? 
(chunksSkipped / totalChunks) * 100 + : 0; + + return { + chunksToProcess: changedChunks, + chunksSkipped, + estimatedSavingsPercentage, + }; + } +} \ No newline at end of file diff --git a/apps/webapp/app/services/documentVersioning.server.ts b/apps/webapp/app/services/documentVersioning.server.ts new file mode 100644 index 0000000..c158763 --- /dev/null +++ b/apps/webapp/app/services/documentVersioning.server.ts @@ -0,0 +1,316 @@ +import crypto from "crypto"; +import type { DocumentNode } from "@core/types"; +import { + findExistingDocument, + getDocumentVersions +} from "./graphModels/document"; +import { DocumentChunker, type ChunkedDocument } from "./documentChunker.server"; +import { KnowledgeGraphService } from "./knowledgeGraph.server"; + +export interface DocumentVersion { + uuid: string; + version: number; + contentHash: string; + chunkHashes: string[]; + createdAt: Date; + validAt: Date; + title: string; + metadata: Record; +} + +export interface VersionedDocumentInfo { + isNewDocument: boolean; + existingDocument: DocumentNode | null; + newVersion: number; + previousVersionUuid: string | null; + hasContentChanged: boolean; + chunkLevelChanges: { + changedChunkIndices: number[]; + changePercentage: number; + totalChunks: number; + }; +} + +/** + * Service for managing document versions and coordinating differential ingestion + * Integrates with the knowledge graph for semantic similarity checks + */ +export class DocumentVersioningService { + private knowledgeGraphService: KnowledgeGraphService; + + constructor() { + this.knowledgeGraphService = new KnowledgeGraphService(); + } + + /** + * Prepare a new document version with proper versioning information + */ + async prepareDocumentVersion( + documentId: string, + userId: string, + title: string, + content: string, + source: string, + metadata: Record = {}, + sessionId?: string, + ): Promise<{ + documentNode: DocumentNode; + versionInfo: VersionedDocumentInfo; + chunkedDocument: ChunkedDocument; + }> { + // Find existing document for version comparison + const existingDocument = await findExistingDocument(documentId, userId); + + // Chunk the new document content + const documentChunker = new DocumentChunker(); + const chunkedDocument = await documentChunker.chunkDocument(content, title); + + // Determine version information + const versionInfo = this.analyzeVersionChanges(existingDocument, chunkedDocument); + + // Create new document node + const documentNode = this.createVersionedDocumentNode( + documentId, + userId, + title, + content, + source, + metadata, + sessionId, + versionInfo, + chunkedDocument, + ); + + return { + documentNode, + versionInfo, + chunkedDocument, + }; + } + + /** + * Analyze changes between existing and new document versions + */ + private analyzeVersionChanges( + existingDocument: DocumentNode | null, + newChunkedDocument: ChunkedDocument, + ): VersionedDocumentInfo { + if (!existingDocument) { + return { + isNewDocument: true, + existingDocument: null, + newVersion: 1, + previousVersionUuid: null, + hasContentChanged: true, + chunkLevelChanges: { + changedChunkIndices: [], + changePercentage: 100, + totalChunks: newChunkedDocument.totalChunks, + }, + }; + } + + // Check if content has actually changed + const hasContentChanged = existingDocument.contentHash !== newChunkedDocument.contentHash; + + if (!hasContentChanged) { + return { + isNewDocument: false, + existingDocument, + newVersion: existingDocument.version, + previousVersionUuid: existingDocument.uuid, + hasContentChanged: false, + 
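+        // Content hash matches the stored version, so there are no chunk-level changes to report.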
chunkLevelChanges: { + changedChunkIndices: [], + changePercentage: 0, + totalChunks: newChunkedDocument.totalChunks, + }, + }; + } + + // Analyze chunk-level changes + const chunkComparison = DocumentChunker.compareChunkHashes( + existingDocument.chunkHashes || [], + newChunkedDocument.chunkHashes, + ); + + return { + isNewDocument: false, + existingDocument, + newVersion: existingDocument.version + 1, + previousVersionUuid: existingDocument.uuid, + hasContentChanged: true, + chunkLevelChanges: { + changedChunkIndices: chunkComparison.changedIndices, + changePercentage: chunkComparison.changePercentage, + totalChunks: newChunkedDocument.totalChunks, + }, + }; + } + + /** + * Create a new versioned document node + */ + private createVersionedDocumentNode( + documentId: string, + userId: string, + title: string, + content: string, + source: string, + metadata: Record, + sessionId: string | undefined, + versionInfo: VersionedDocumentInfo, + chunkedDocument: ChunkedDocument, + ): DocumentNode { + return { + uuid: crypto.randomUUID(), + title, + originalContent: content, + metadata: { + ...metadata, + chunkingStrategy: "semantic_sections", + targetChunkSize: 12500, + actualChunks: chunkedDocument.totalChunks, + }, + source, + userId, + createdAt: new Date(), + validAt: new Date(), + totalChunks: chunkedDocument.totalChunks, + documentId, + sessionId, + version: versionInfo.newVersion, + contentHash: chunkedDocument.contentHash, + previousVersionUuid: versionInfo.previousVersionUuid || undefined, + chunkHashes: chunkedDocument.chunkHashes, + }; + } + + /** + * Get version history for a document + */ + async getDocumentHistory( + documentId: string, + userId: string, + limit: number = 10, + ): Promise { + const versions = await getDocumentVersions(documentId, userId, limit); + + return versions.map(doc => ({ + uuid: doc.uuid, + version: doc.version, + contentHash: doc.contentHash, + chunkHashes: doc.chunkHashes || [], + createdAt: doc.createdAt, + validAt: doc.validAt, + title: doc.title, + metadata: doc.metadata, + })); + } + + /** + * Check if statements should be invalidated based on semantic similarity + * This implements the semantic similarity gate (>0.85 threshold) + */ + async checkStatementInvalidation( + oldChunkContent: string, + newChunkContent: string, + threshold: number = 0.85, + ): Promise<{ + shouldInvalidate: boolean; + semanticSimilarity: number; + }> { + try { + // Generate embeddings for both chunks + const [oldEmbedding, newEmbedding] = await Promise.all([ + this.knowledgeGraphService.getEmbedding(oldChunkContent), + this.knowledgeGraphService.getEmbedding(newChunkContent), + ]); + + // Calculate cosine similarity + const similarity = this.calculateCosineSimilarity(oldEmbedding, newEmbedding); + + // If similarity is below threshold, invalidate old statements + const shouldInvalidate = similarity < threshold; + + return { + shouldInvalidate, + semanticSimilarity: similarity, + }; + } catch (error) { + console.error("Error checking statement invalidation:", error); + // On error, be conservative and invalidate + return { + shouldInvalidate: true, + semanticSimilarity: 0, + }; + } + } + + /** + * Calculate cosine similarity between two embedding vectors + */ + private calculateCosineSimilarity(vecA: number[], vecB: number[]): number { + if (vecA.length !== vecB.length) { + throw new Error("Vector dimensions must match"); + } + + let dotProduct = 0; + let normA = 0; + let normB = 0; + + for (let i = 0; i < vecA.length; i++) { + dotProduct += vecA[i] * vecB[i]; + normA += 
vecA[i] * vecA[i]; + normB += vecB[i] * vecB[i]; + } + + normA = Math.sqrt(normA); + normB = Math.sqrt(normB); + + if (normA === 0 || normB === 0) { + return 0; + } + + return dotProduct / (normA * normB); + } + + /** + * Generate a differential processing report + */ + generateDifferentialReport( + versionInfo: VersionedDocumentInfo, + processingStats: { + chunksProcessed: number; + chunksSkipped: number; + statementsCreated: number; + statementsInvalidated: number; + processingTimeMs: number; + }, + ): { + summary: string; + metrics: Record; + } { + const totalChunks = versionInfo.chunkLevelChanges.totalChunks; + const changePercentage = versionInfo.chunkLevelChanges.changePercentage; + const savingsPercentage = totalChunks > 0 + ? (processingStats.chunksSkipped / totalChunks) * 100 + : 0; + + return { + summary: `Document v${versionInfo.newVersion}: ${changePercentage.toFixed(1)}% changed, ${savingsPercentage.toFixed(1)}% processing saved`, + metrics: { + version: versionInfo.newVersion, + isNewDocument: versionInfo.isNewDocument, + totalChunks, + chunksChanged: processingStats.chunksProcessed, + chunksSkipped: processingStats.chunksSkipped, + changePercentage: changePercentage, + processingTimeMs: processingStats.processingTimeMs, + statementsCreated: processingStats.statementsCreated, + statementsInvalidated: processingStats.statementsInvalidated, + estimatedCostSavings: savingsPercentage, + }, + }; + } +} \ No newline at end of file diff --git a/apps/webapp/app/services/graphModels/document.ts b/apps/webapp/app/services/graphModels/document.ts index a35a9fc..a173f17 100644 --- a/apps/webapp/app/services/graphModels/document.ts +++ b/apps/webapp/app/services/graphModels/document.ts @@ -1,5 +1,6 @@ import { runQuery } from "~/lib/neo4j.server"; import type { DocumentNode } from "@core/types"; +import crypto from "crypto"; export async function saveDocument(document: DocumentNode): Promise { const query = ` @@ -14,7 +15,11 @@ export async function saveDocument(document: DocumentNode): Promise { d.validAt = $validAt, d.totalChunks = $totalChunks, d.documentId = $documentId, - d.sessionId = $sessionId + d.sessionId = $sessionId, + d.version = $version, + d.contentHash = $contentHash, + d.previousVersionUuid = $previousVersionUuid, + d.chunkHashes = $chunkHashes ON MATCH SET d.title = $title, d.originalContent = $originalContent, @@ -23,7 +28,11 @@ export async function saveDocument(document: DocumentNode): Promise { d.validAt = $validAt, d.totalChunks = $totalChunks, d.documentId = $documentId, - d.sessionId = $sessionId + d.sessionId = $sessionId, + d.version = $version, + d.contentHash = $contentHash, + d.previousVersionUuid = $previousVersionUuid, + d.chunkHashes = $chunkHashes RETURN d.uuid as uuid `; @@ -39,6 +48,10 @@ export async function saveDocument(document: DocumentNode): Promise { totalChunks: document.totalChunks || 0, documentId: document.documentId || null, sessionId: document.sessionId || null, + version: document.version || 1, + contentHash: document.contentHash, + previousVersionUuid: document.previousVersionUuid || null, + chunkHashes: document.chunkHashes || [], }; const result = await runQuery(query, params); @@ -94,6 +107,10 @@ export async function getDocument( createdAt: new Date(documentNode.properties.createdAt), validAt: new Date(documentNode.properties.validAt), totalChunks: documentNode.properties.totalChunks, + version: documentNode.properties.version || 1, + contentHash: documentNode.properties.contentHash || "", + previousVersionUuid: 
documentNode.properties.previousVersionUuid || null, + chunkHashes: documentNode.properties.chunkHashes || [], }; } @@ -146,6 +163,92 @@ export async function getUserDocuments( createdAt: new Date(documentNode.properties.createdAt), validAt: new Date(documentNode.properties.validAt), totalChunks: documentNode.properties.totalChunks, + version: documentNode.properties.version || 1, + contentHash: documentNode.properties.contentHash || "", + previousVersionUuid: documentNode.properties.previousVersionUuid || null, + chunkHashes: documentNode.properties.chunkHashes || [], + }; + }); +} + +/** + * Generate content hash for document versioning + */ +export function generateContentHash(content: string): string { + return crypto.createHash('sha256').update(content, 'utf8').digest('hex'); +} + +/** + * Find existing document by documentId and userId for version comparison + */ +export async function findExistingDocument( + documentId: string, + userId: string, +): Promise { + const query = ` + MATCH (d:Document {documentId: $documentId, userId: $userId}) + RETURN d + ORDER BY d.version DESC + LIMIT 1 + `; + + const params = { documentId, userId }; + const result = await runQuery(query, params); + + if (result.length === 0) return null; + + const documentNode = result[0].get("d"); + return { + uuid: documentNode.properties.uuid, + title: documentNode.properties.title, + originalContent: documentNode.properties.originalContent, + metadata: JSON.parse(documentNode.properties.metadata || "{}"), + source: documentNode.properties.source, + userId: documentNode.properties.userId, + createdAt: new Date(documentNode.properties.createdAt), + validAt: new Date(documentNode.properties.validAt), + totalChunks: documentNode.properties.totalChunks, + version: documentNode.properties.version || 1, + contentHash: documentNode.properties.contentHash || "", + previousVersionUuid: documentNode.properties.previousVersionUuid || null, + chunkHashes: documentNode.properties.chunkHashes || [], + }; +} + +/** + * Get document version history + */ +export async function getDocumentVersions( + documentId: string, + userId: string, + limit: number = 10, +): Promise { + const query = ` + MATCH (d:Document {documentId: $documentId, userId: $userId}) + RETURN d + ORDER BY d.version DESC + LIMIT $limit + `; + + const params = { documentId, userId, limit }; + const result = await runQuery(query, params); + + return result.map((record) => { + const documentNode = record.get("d"); + return { + uuid: documentNode.properties.uuid, + title: documentNode.properties.title, + originalContent: documentNode.properties.originalContent, + metadata: JSON.parse(documentNode.properties.metadata || "{}"), + source: documentNode.properties.source, + userId: documentNode.properties.userId, + createdAt: new Date(documentNode.properties.createdAt), + validAt: new Date(documentNode.properties.validAt), + totalChunks: documentNode.properties.totalChunks, + version: documentNode.properties.version || 1, + contentHash: documentNode.properties.contentHash || "", + previousVersionUuid: documentNode.properties.previousVersionUuid || null, + chunkHashes: documentNode.properties.chunkHashes || [], }; }); } diff --git a/apps/webapp/app/services/graphModels/episode.ts b/apps/webapp/app/services/graphModels/episode.ts index b721d99..90852fa 100644 --- a/apps/webapp/app/services/graphModels/episode.ts +++ b/apps/webapp/app/services/graphModels/episode.ts @@ -113,6 +113,7 @@ export async function getRecentEpisodes(params: { userId: params.userId, source: 
params.source || null, sessionId: params.sessionId || null, + documentId: params.documentId || null, }; const result = await runQuery(query, queryParams); @@ -132,6 +133,7 @@ export async function getRecentEpisodes(params: { userId: episode.userId, space: episode.space, sessionId: episode.sessionId, + documentId: episode.documentId, }; }); } @@ -176,6 +178,7 @@ export async function searchEpisodesByEmbedding(params: { ? JSON.parse(episode.attributesJson) : {}, userId: episode.userId, + documentId: episode.documentId, }; }); } diff --git a/apps/webapp/app/services/knowledgeGraph.server.ts b/apps/webapp/app/services/knowledgeGraph.server.ts index aa05d3d..425c1f7 100644 --- a/apps/webapp/app/services/knowledgeGraph.server.ts +++ b/apps/webapp/app/services/knowledgeGraph.server.ts @@ -44,6 +44,7 @@ import { searchStatementsByEmbedding, } from "./graphModels/statement"; import { getEmbedding, makeModelCall } from "~/lib/model.server"; +import { runQuery } from "~/lib/neo4j.server"; import { Apps, getNodeTypes, @@ -67,6 +68,154 @@ export class KnowledgeGraphService { return getEmbedding(text); } + /** + * Invalidate statements from a previous document version that are no longer supported + * by the new document content using semantic similarity analysis + */ + async invalidateStatementsFromPreviousDocumentVersion(params: { + previousDocumentUuid: string; + newDocumentContent: string; + userId: string; + invalidatedBy: string; + semanticSimilarityThreshold?: number; + }): Promise<{ + invalidatedStatements: string[]; + preservedStatements: string[]; + totalStatementsAnalyzed: number; + }> { + const threshold = params.semanticSimilarityThreshold || 0.75; // Lower threshold for document-level analysis + const invalidatedStatements: string[] = []; + const preservedStatements: string[] = []; + + // Step 1: Get all statements from the previous document version + const previousStatements = await this.getStatementsFromDocument( + params.previousDocumentUuid, + params.userId, + ); + + if (previousStatements.length === 0) { + return { + invalidatedStatements: [], + preservedStatements: [], + totalStatementsAnalyzed: 0, + }; + } + + logger.log(`Analyzing ${previousStatements.length} statements from previous document version`); + + // Step 2: Generate embedding for new document content + const newDocumentEmbedding = await this.getEmbedding(params.newDocumentContent); + + // Step 3: For each statement, check if it's still semantically supported by new content + for (const statement of previousStatements) { + try { + // Generate embedding for the statement fact + const statementEmbedding = await this.getEmbedding(statement.fact); + + // Calculate semantic similarity between statement and new document + const semanticSimilarity = this.calculateCosineSimilarity( + statementEmbedding, + newDocumentEmbedding, + ); + + if (semanticSimilarity < threshold) { + invalidatedStatements.push(statement.uuid); + logger.log(`Invalidating statement: "${statement.fact}" (similarity: ${semanticSimilarity.toFixed(3)})`); + } else { + preservedStatements.push(statement.uuid); + logger.log(`Preserving statement: "${statement.fact}" (similarity: ${semanticSimilarity.toFixed(3)})`); + } + } catch (error) { + logger.error(`Error analyzing statement ${statement.uuid}:`, { error }); + // On error, be conservative and invalidate + invalidatedStatements.push(statement.uuid); + } + } + + // Step 4: Bulk invalidate the selected statements + if (invalidatedStatements.length > 0) { + await invalidateStatements({ + statementIds: 
invalidatedStatements, + invalidatedBy: params.invalidatedBy, + }); + + logger.log(`Document-level invalidation completed`, { + previousDocumentUuid: params.previousDocumentUuid, + totalAnalyzed: previousStatements.length, + invalidated: invalidatedStatements.length, + preserved: preservedStatements.length, + threshold, + }); + } + + return { + invalidatedStatements, + preservedStatements, + totalStatementsAnalyzed: previousStatements.length, + }; + } + + /** + * Get all statements that were created from episodes linked to a specific document + */ + private async getStatementsFromDocument( + documentUuid: string, + userId: string, + ): Promise { + const query = ` + MATCH (doc:Document {uuid: $documentUuid, userId: $userId})-[:CONTAINS_CHUNK]->(episode:Episode) + MATCH (episode)-[:HAS_PROVENANCE]->(stmt:Statement) + RETURN stmt + `; + + const result = await runQuery(query, { + documentUuid, + userId, + }); + + return result.map((record) => { + const stmt = record.get("stmt").properties; + return { + uuid: stmt.uuid, + fact: stmt.fact, + factEmbedding: stmt.factEmbedding || [], + createdAt: new Date(stmt.createdAt), + validAt: new Date(stmt.validAt), + invalidAt: stmt.invalidAt ? new Date(stmt.invalidAt) : null, + attributes: stmt.attributesJson ? JSON.parse(stmt.attributesJson) : {}, + userId: stmt.userId, + }; + }); + } + + /** + * Calculate cosine similarity between two embedding vectors + */ + private calculateCosineSimilarity(vecA: number[], vecB: number[]): number { + if (vecA.length !== vecB.length) { + throw new Error("Vector dimensions must match"); + } + + let dotProduct = 0; + let normA = 0; + let normB = 0; + + for (let i = 0; i < vecA.length; i++) { + dotProduct += vecA[i] * vecB[i]; + normA += vecA[i] * vecA[i]; + normB += vecB[i] * vecB[i]; + } + + normA = Math.sqrt(normA); + normB = Math.sqrt(normB); + + if (normA === 0 || normB === 0) { + return 0; + } + + return dotProduct / (normA * normB); + } + /** * Process an episode and update the knowledge graph. 
* diff --git a/apps/webapp/app/trigger/ingest/ingest-document.ts b/apps/webapp/app/trigger/ingest/ingest-document.ts index 1ca7a75..1a4b279 100644 --- a/apps/webapp/app/trigger/ingest/ingest-document.ts +++ b/apps/webapp/app/trigger/ingest/ingest-document.ts @@ -3,11 +3,13 @@ import { type z } from "zod"; import crypto from "crypto"; import { IngestionStatus } from "@core/database"; -import { EpisodeTypeEnum, type DocumentNode } from "@core/types"; +import { EpisodeTypeEnum } from "@core/types"; import { logger } from "~/services/logger.service"; -import { DocumentChunker } from "~/services/documentChunker.server"; import { saveDocument } from "~/services/graphModels/document"; import { type IngestBodyRequest } from "~/lib/ingest.server"; +import { DocumentVersioningService } from "~/services/documentVersioning.server"; +import { DocumentDifferentialService } from "~/services/documentDiffer.server"; +import { KnowledgeGraphService } from "~/services/knowledgeGraph.server"; import { prisma } from "../utils/prisma"; import { ingestTask } from "./ingest"; @@ -44,46 +46,117 @@ export const ingestDocumentTask = task({ const documentBody = payload.body as any; - // Step 1: Create document node - const document: DocumentNode = { - uuid: crypto.randomUUID(), - title: documentBody.documentTitle || "Untitled Document", - originalContent: documentBody.episodeBody, - metadata: documentBody.metadata || {}, - source: documentBody.source, - userId: payload.userId, - createdAt: new Date(), - validAt: new Date(documentBody.referenceTime), - totalChunks: 0, - documentId: documentBody.documentId, - sessionId: documentBody.sessionId, - }; + // Step 1: Initialize services and prepare document version + const versioningService = new DocumentVersioningService(); + const differentialService = new DocumentDifferentialService(); + const knowledgeGraphService = new KnowledgeGraphService(); + const { + documentNode: document, + versionInfo, + chunkedDocument, + } = await versioningService.prepareDocumentVersion( + documentBody.documentId || crypto.randomUUID(), + payload.userId, + documentBody.documentTitle || "Untitled Document", + documentBody.episodeBody, + documentBody.source, + documentBody.metadata || {}, + documentBody.sessionId, + ); + + logger.log(`Document version analysis:`, { + version: versionInfo.newVersion, + isNewDocument: versionInfo.isNewDocument, + hasContentChanged: versionInfo.hasContentChanged, + changePercentage: versionInfo.chunkLevelChanges.changePercentage, + changedChunks: versionInfo.chunkLevelChanges.changedChunkIndices.length, + totalChunks: versionInfo.chunkLevelChanges.totalChunks, + }); + + // Step 2: Determine processing strategy + const differentialDecision = await differentialService.analyzeDifferentialNeed( + documentBody.episodeBody, + versionInfo.existingDocument, + chunkedDocument, + ); + + logger.log(`Differential analysis:`, { + shouldUseDifferential: differentialDecision.shouldUseDifferential, + strategy: differentialDecision.strategy, + reason: differentialDecision.reason, + documentSizeTokens: differentialDecision.documentSizeTokens, + }); + + // Step 3: Save the new document version await saveDocument(document); - // Step 2: Chunk the document - const documentChunker = new DocumentChunker(); - const chunkedDocument = await documentChunker.chunkDocument( - documentBody.episodeBody, - documentBody.documentTitle, - ); + // Step 3.1: Invalidate statements from previous document version if it exists + let invalidationResults = null; + if (versionInfo.existingDocument && 
versionInfo.hasContentChanged) { + logger.log(`Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`); + + invalidationResults = await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion({ + previousDocumentUuid: versionInfo.existingDocument.uuid, + newDocumentContent: documentBody.episodeBody, + userId: payload.userId, + invalidatedBy: document.uuid, + semanticSimilarityThreshold: 0.75, // Configurable threshold + }); + + logger.log(`Statement invalidation completed:`, { + totalAnalyzed: invalidationResults.totalStatementsAnalyzed, + invalidated: invalidationResults.invalidatedStatements.length, + preserved: invalidationResults.preservedStatements.length, + }); + } logger.log( `Document chunked into ${chunkedDocument.chunks.length} chunks`, ); - // Step 3: Queue each chunk as a separate episode - for (const chunk of chunkedDocument.chunks) { + // Step 4: Process chunks based on differential strategy + let chunksToProcess = chunkedDocument.chunks; + let processingMode = "full"; + + if (differentialDecision.shouldUseDifferential && differentialDecision.strategy === "chunk_level_diff") { + // Only process changed chunks + const chunkComparisons = differentialService.getChunkComparisons( + versionInfo.existingDocument!, + chunkedDocument, + ); + + const changedIndices = differentialService.getChunksNeedingReprocessing(chunkComparisons); + chunksToProcess = chunkedDocument.chunks.filter(chunk => + changedIndices.includes(chunk.chunkIndex) + ); + processingMode = "differential"; + + logger.log(`Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`); + } else if (differentialDecision.strategy === "full_reingest") { + // Process all chunks + processingMode = "full"; + logger.log(`Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`); + } + + // Step 5: Queue chunks for processing + const episodeHandlers = []; + for (const chunk of chunksToProcess) { const chunkEpisodeData = { episodeBody: chunk.content, referenceTime: documentBody.referenceTime, - metadata: documentBody.metadata, + metadata: { + ...documentBody.metadata, + processingMode, + differentialStrategy: differentialDecision.strategy, + chunkHash: chunk.contentHash, + }, source: documentBody.source, spaceId: documentBody.spaceId, sessionId: documentBody.sessionId, type: EpisodeTypeEnum.DOCUMENT, documentTitle: documentBody.documentTitle, - documentId: documentBody.documentId, + documentId: document.uuid, // Use the new document UUID chunkIndex: chunk.chunkIndex, }; @@ -97,28 +170,48 @@ export const ingestDocumentTask = task({ { queue: "ingestion-queue", concurrencyKey: payload.userId, - tags: [payload.userId, payload.queueId], + tags: [payload.userId, payload.queueId, processingMode], }, ); if (episodeHandler.id) { + episodeHandlers.push(episodeHandler.id); logger.log( - `Queued chunk ${chunk.chunkIndex + 1}/${chunkedDocument.chunks.length} for processing`, + `Queued chunk ${chunk.chunkIndex + 1} for ${processingMode} processing`, { handlerId: episodeHandler.id, chunkSize: chunk.content.length, + chunkHash: chunk.contentHash, }, ); } } + // Calculate cost savings + const costSavings = differentialService.calculateCostSavings( + chunkedDocument.chunks.length, + chunksToProcess.length, + ); + await prisma.ingestionQueue.update({ where: { id: payload.queueId }, data: { output: { documentUuid: document.uuid, + version: versionInfo.newVersion, totalChunks: chunkedDocument.chunks.length, + chunksProcessed: 
chunksToProcess.length, + chunksSkipped: costSavings.chunksSkipped, + processingMode, + differentialStrategy: differentialDecision.strategy, + estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, + statementInvalidation: invalidationResults ? { + totalAnalyzed: invalidationResults.totalStatementsAnalyzed, + invalidated: invalidationResults.invalidatedStatements.length, + preserved: invalidationResults.preservedStatements.length, + } : null, episodes: [], + episodeHandlers, }, status: IngestionStatus.PROCESSING, }, @@ -127,10 +220,21 @@ export const ingestDocumentTask = task({ const processingTimeMs = Date.now() - startTime; logger.log( - `Document chunking processing completed in ${processingTimeMs}ms`, + `Document differential processing completed in ${processingTimeMs}ms`, { documentUuid: document.uuid, + version: versionInfo.newVersion, + processingMode, totalChunks: chunkedDocument.chunks.length, + chunksProcessed: chunksToProcess.length, + chunksSkipped: costSavings.chunksSkipped, + estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, + changePercentage: `${differentialDecision.changePercentage.toFixed(1)}%`, + statementInvalidation: invalidationResults ? { + totalAnalyzed: invalidationResults.totalStatementsAnalyzed, + invalidated: invalidationResults.invalidatedStatements.length, + preserved: invalidationResults.preservedStatements.length, + } : "No previous version", }, ); diff --git a/apps/webapp/app/trigger/ingest/ingest.ts b/apps/webapp/app/trigger/ingest/ingest.ts index f134b10..cd23aef 100644 --- a/apps/webapp/app/trigger/ingest/ingest.ts +++ b/apps/webapp/app/trigger/ingest/ingest.ts @@ -1,6 +1,7 @@ import { queue, task } from "@trigger.dev/sdk"; import { z } from "zod"; import { KnowledgeGraphService } from "~/services/knowledgeGraph.server"; +import { linkEpisodeToDocument } from "~/services/graphModels/document"; import { IngestionStatus } from "@core/database"; import { logger } from "~/services/logger.service"; @@ -60,6 +61,20 @@ export const ingestTask = task({ prisma, ); + // Link episode to document if it's a document chunk + if (episodeBody.type === EpisodeType.DOCUMENT && episodeBody.documentId && episodeDetails.episodeUuid) { + try { + await linkEpisodeToDocument( + episodeDetails.episodeUuid, + episodeBody.documentId, + episodeBody.chunkIndex || 0, + ); + logger.log(`Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.documentId} at chunk ${episodeBody.chunkIndex || 0}`); + } catch (error) { + logger.error(`Failed to link episode to document:`, { error, episodeUuid: episodeDetails.episodeUuid, documentId: episodeBody.documentId }); + } + } + let finalOutput = episodeDetails; let episodeUuids: string[] = episodeDetails.episodeUuid ? [episodeDetails.episodeUuid] diff --git a/packages/types/src/graph/graph.entity.ts b/packages/types/src/graph/graph.entity.ts index 5bc7e99..c5c986b 100644 --- a/packages/types/src/graph/graph.entity.ts +++ b/packages/types/src/graph/graph.entity.ts @@ -14,6 +14,11 @@ export interface DocumentNode { totalChunks: number; documentId?: string; sessionId?: string; + // Version tracking for differential ingestion + version: number; + contentHash: string; + previousVersionUuid?: string; + chunkHashes?: string[]; // Hash of each chunk for change detection } /**
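
Note on linkEpisodeToDocument: ingest.ts above imports linkEpisodeToDocument from ~/services/graphModels/document, but its body falls outside the hunks shown in this patch. A minimal sketch of what it could look like, assuming the same runQuery helper used elsewhere in document.ts, the (Document)-[:CONTAINS_CHUNK]->(Episode) relationship that getStatementsFromDocument matches on, and a chunkIndex relationship property (the property name is an assumption, not confirmed by this patch):

import { runQuery } from "~/lib/neo4j.server";

/**
 * Link a chunk episode to its parent document version.
 * Sketch only: the CONTAINS_CHUNK relationship mirrors the query in
 * KnowledgeGraphService.getStatementsFromDocument; chunkIndex on the
 * relationship is assumed.
 */
export async function linkEpisodeToDocument(
  episodeUuid: string,
  documentUuid: string,
  chunkIndex: number,
): Promise<void> {
  const query = `
    MATCH (d:Document {uuid: $documentUuid})
    MATCH (e:Episode {uuid: $episodeUuid})
    MERGE (d)-[r:CONTAINS_CHUNK]->(e)
    SET r.chunkIndex = $chunkIndex
  `;
  await runQuery(query, { documentUuid, episodeUuid, chunkIndex });
}

Since ingest-document.ts sets documentId to document.uuid on each queued chunk episode, matching the Document node by uuid here lines up with how ingest.ts invokes the helper.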