Feat: add versioning to documents

This commit is contained in:
Manoj K 2025-09-02 17:18:25 +05:30
parent 5b0dd7d4a7
commit ec80667c24
9 changed files with 983 additions and 31 deletions

View File

@ -8,6 +8,7 @@ export interface DocumentChunk {
context?: string;
startPosition: number;
endPosition: number;
contentHash: string; // Hash for change detection
}
export interface ChunkedDocument {
@ -16,6 +17,8 @@ export interface ChunkedDocument {
originalContent: string;
chunks: DocumentChunk[];
totalChunks: number;
contentHash: string; // Hash of the entire document
chunkHashes: string[]; // Array of chunk hashes for change detection
}
/**
@ -36,6 +39,7 @@ export class DocumentChunker {
title: string,
): Promise<ChunkedDocument> {
const documentId = crypto.randomUUID();
const contentHash = this.generateContentHash(originalContent);
// First, split by major section headers (markdown style)
const majorSections = this.splitByMajorSections(originalContent);
@ -107,12 +111,17 @@ export class DocumentChunker {
));
}
// Generate chunk hashes array
const chunkHashes = chunks.map(chunk => chunk.contentHash);
return {
documentId,
title,
originalContent,
chunks,
totalChunks: chunks.length,
contentHash,
chunkHashes,
};
}
@ -235,6 +244,7 @@ export class DocumentChunker {
): DocumentChunk {
// Generate a concise context/title if not provided
const context = title || this.generateChunkContext(content);
const contentHash = this.generateContentHash(content.trim());
return {
content: content.trim(),
@ -243,6 +253,7 @@ export class DocumentChunker {
context: `Chunk ${chunkIndex + 1}${context ? `: ${context}` : ""}`,
startPosition,
endPosition,
contentHash,
};
}
@ -259,4 +270,46 @@ export class DocumentChunker {
return "Document content";
}
/**
 * Produce the short fingerprint used for change detection.
 *
 * @param content Text to fingerprint; it is hashed as UTF-8.
 * @returns The first 16 hexadecimal characters of the SHA-256 digest.
 */
private generateContentHash(content: string): string {
  const hasher = crypto.createHash("sha256");
  hasher.update(content, "utf8");
  const fullDigest = hasher.digest("hex");
  // Only a 16-character prefix of the digest is kept.
  return fullDigest.slice(0, 16);
}
/**
 * Position-by-position comparison of two chunk-hash lists.
 *
 * An index counts as changed when the hashes at that position differ, which
 * includes positions that exist in only one of the two lists.
 *
 * @param oldHashes Chunk hashes of the previous version.
 * @param newHashes Chunk hashes of the new version.
 * @returns The changed indices and the share of changed positions (0-100),
 *          measured against the longer of the two lists; 0 when both are empty.
 */
static compareChunkHashes(oldHashes: string[], newHashes: string[]): {
  changedIndices: number[];
  changePercentage: number;
} {
  const slotCount = Math.max(oldHashes.length, newHashes.length);

  // Walk every position of the longer list and keep those whose hashes differ.
  const changedIndices = Array.from({ length: slotCount }, (_, slot) => slot).filter(
    (slot) => oldHashes[slot] !== newHashes[slot],
  );

  let changePercentage = 0;
  if (slotCount > 0) {
    changePercentage = (changedIndices.length / slotCount) * 100;
  }

  return { changedIndices, changePercentage };
}
/**
 * Measure a document in tokens, for size-threshold decisions.
 *
 * @param content Document text.
 * @returns Number of tokens produced by `encode` for the text.
 */
static getDocumentSizeInTokens(content: string): number {
  const tokens = encode(content);
  return tokens.length;
}
}

View File

@ -0,0 +1,204 @@
import { encode } from "gpt-tokenizer";
import { DocumentChunker, type ChunkedDocument } from "./documentChunker.server";
import type { DocumentNode } from "@core/types";
/**
 * Result of deciding how a document update should be processed.
 */
export interface DifferentialDecision {
// True when only the changed chunks should be processed.
shouldUseDifferential: boolean;
// Selected processing strategy.
strategy: "full_reingest" | "chunk_level_diff" | "new_document";
// Human-readable explanation of why the strategy was chosen.
reason: string;
// Positions of chunks whose hash differs from the previous version.
changedChunkIndices: number[];
// Share of changed chunk positions, expressed as a percentage (0-100).
changePercentage: number;
// Size of the new content in tokens.
documentSizeTokens: number;
}
/**
 * Comparison of a single chunk position between the previous and the new
 * version of a document.
 */
export interface ChunkComparison {
// Zero-based position of the chunk.
chunkIndex: number;
// True when the old and new hashes at this position differ.
hasChanged: boolean;
// Hash from the previous version; absent when that version had no chunk here.
oldHash?: string;
// Hash from the new version.
newHash: string;
// Optional similarity score for the old/new chunk pair.
semanticSimilarity?: number;
}
/**
 * Decides how a document update should be processed: chunk-by-chunk
 * ("differential") or by ingesting the whole document again.
 *
 * The decision combines the size of the new content (in tokens) with the share
 * of chunk positions whose hash changed since the previous version.
 */
export class DocumentDifferentialService {
  // Document-size boundaries, in tokens.
  private readonly SMALL_DOC_THRESHOLD = 5 * 1000; // 5K tokens
  private readonly MEDIUM_DOC_THRESHOLD = 50 * 1000; // 50K tokens

  // Change-percentage limits below which differential processing is chosen.
  private readonly SMALL_CHANGE_THRESHOLD = 20; // 20%, applied to medium documents
  private readonly MEDIUM_CHANGE_THRESHOLD = 30; // 30%, applied to large documents

  /**
   * Analyze whether to use differential processing for a document update.
   *
   * @param newContent Full text of the new version; used for the token count.
   * @param existingDocument Previous version, or `null` when there is none.
   * @param newChunkedDocument Chunked form of the new version.
   * @returns The chosen strategy together with the figures it was based on.
   */
  async analyzeDifferentialNeed(
    newContent: string,
    existingDocument: DocumentNode | null,
    newChunkedDocument: ChunkedDocument,
  ): Promise<DifferentialDecision> {
    // The token count is needed on every path, so compute it once.
    const documentSizeTokens = encode(newContent).length;

    // No previous version: there is nothing to diff against.
    if (!existingDocument) {
      return {
        shouldUseDifferential: false,
        strategy: "new_document",
        reason: "No existing document found",
        changedChunkIndices: [],
        changePercentage: 100,
        documentSizeTokens,
      };
    }

    // Identical whole-document hash: no chunk comparison is necessary.
    // NOTE(review): this is reported as "full_reingest"; a caller that acts on
    // the strategy alone will process every chunk of an unchanged document.
    // Confirm that this is intended.
    if (existingDocument.contentHash === newChunkedDocument.contentHash) {
      return {
        shouldUseDifferential: false,
        strategy: "full_reingest", // No changes detected
        reason: "Document content unchanged",
        changedChunkIndices: [],
        changePercentage: 0,
        documentSizeTokens,
      };
    }

    // Compare chunk hashes position by position to find what changed.
    const { changedIndices, changePercentage } = DocumentChunker.compareChunkHashes(
      existingDocument.chunkHashes || [],
      newChunkedDocument.chunkHashes,
    );

    const decision = this.applyThresholdDecision(
      documentSizeTokens,
      changePercentage,
      changedIndices,
    );

    return {
      ...decision,
      changedChunkIndices: changedIndices,
      changePercentage,
      documentSizeTokens,
    };
  }

  /**
   * Apply the size / change-percentage decision matrix.
   *
   * @param documentSizeTokens Size of the new content in tokens.
   * @param changePercentage Share of changed chunk positions (0-100).
   * @param _changedIndices Accepted for symmetry with the caller; not used by
   *        the current matrix.
   */
  private applyThresholdDecision(
    documentSizeTokens: number,
    changePercentage: number,
    _changedIndices: number[],
  ): Pick<DifferentialDecision, "shouldUseDifferential" | "strategy" | "reason"> {
    // Small documents: always full re-ingest (cheap).
    if (documentSizeTokens < this.SMALL_DOC_THRESHOLD) {
      return {
        shouldUseDifferential: false,
        strategy: "full_reingest",
        reason: `Document too small (${documentSizeTokens} tokens < ${this.SMALL_DOC_THRESHOLD})`,
      };
    }

    // Medium documents (5-50K tokens).
    if (documentSizeTokens < this.MEDIUM_DOC_THRESHOLD) {
      if (changePercentage < this.SMALL_CHANGE_THRESHOLD) {
        return {
          shouldUseDifferential: true,
          strategy: "chunk_level_diff",
          reason: `Medium document with small changes (${changePercentage.toFixed(1)}% < ${this.SMALL_CHANGE_THRESHOLD}%)`,
        };
      }
      return {
        shouldUseDifferential: false,
        strategy: "full_reingest",
        reason: `Medium document with large changes (${changePercentage.toFixed(1)}% >= ${this.SMALL_CHANGE_THRESHOLD}%)`,
      };
    }

    // Large documents (>50K tokens).
    if (changePercentage < this.MEDIUM_CHANGE_THRESHOLD) {
      return {
        shouldUseDifferential: true,
        strategy: "chunk_level_diff",
        reason: `Large document with moderate changes (${changePercentage.toFixed(1)}% < ${this.MEDIUM_CHANGE_THRESHOLD}%)`,
      };
    }
    return {
      shouldUseDifferential: false,
      strategy: "full_reingest",
      reason: `Large document with extensive changes (${changePercentage.toFixed(1)}% >= ${this.MEDIUM_CHANGE_THRESHOLD}%)`,
    };
  }

  /**
   * Get a per-position comparison of the old and new chunk hashes.
   *
   * One entry is produced for every position of the longer hash list. When the
   * new document has fewer chunks, the surplus entries carry an empty `newHash`.
   */
  getChunkComparisons(
    existingDocument: DocumentNode,
    newChunkedDocument: ChunkedDocument,
  ): ChunkComparison[] {
    const oldHashes = existingDocument.chunkHashes || [];
    const newHashes = newChunkedDocument.chunkHashes;
    const maxLength = Math.max(oldHashes.length, newHashes.length);

    const comparisons: ChunkComparison[] = [];
    for (let i = 0; i < maxLength; i++) {
      const oldHash = oldHashes[i];
      const newHash = newHashes[i];
      comparisons.push({
        chunkIndex: i,
        hasChanged: oldHash !== newHash,
        oldHash,
        newHash: newHash || "", // Handle case where new doc has fewer chunks
      });
    }
    return comparisons;
  }

  /**
   * Filter chunks that need re-processing.
   *
   * @returns The `chunkIndex` of every comparison flagged as changed.
   */
  getChunksNeedingReprocessing(
    chunkComparisons: ChunkComparison[],
  ): number[] {
    return chunkComparisons
      .filter((comparison) => comparison.hasChanged)
      .map((comparison) => comparison.chunkIndex);
  }

  /**
   * Estimate the processing saved by skipping unchanged chunks.
   *
   * @param totalChunks Number of chunks in the new document.
   * @param changedChunks Number of chunks that will be processed.
   * @returns Counts and the skipped share as a percentage. The skipped count is
   *          never negative, so the percentage stays within 0-100 even if
   *          `changedChunks` exceeds `totalChunks`.
   */
  calculateCostSavings(
    totalChunks: number,
    changedChunks: number,
  ): {
    chunksToProcess: number;
    chunksSkipped: number;
    estimatedSavingsPercentage: number;
  } {
    // Clamp at zero: more "changed" than "total" must not yield negative savings.
    const chunksSkipped = Math.max(0, totalChunks - changedChunks);
    const estimatedSavingsPercentage = totalChunks > 0
      ? (chunksSkipped / totalChunks) * 100
      : 0;

    return {
      chunksToProcess: changedChunks,
      chunksSkipped,
      estimatedSavingsPercentage,
    };
  }
}

View File

@ -0,0 +1,316 @@
import crypto from "crypto";
import type { DocumentNode } from "@core/types";
import {
findExistingDocument,
getDocumentVersions
} from "./graphModels/document";
import { DocumentChunker, type ChunkedDocument } from "./documentChunker.server";
import { KnowledgeGraphService } from "./knowledgeGraph.server";
/**
 * Summary of one stored version of a document, as listed in its history.
 */
export interface DocumentVersion {
// Unique id of the document node that holds this version.
uuid: string;
// Version number of the document.
version: number;
// Hash of the whole document content.
contentHash: string;
// Hash of each chunk, in chunk order.
chunkHashes: string[];
// When this version's node was created.
createdAt: Date;
// Validity timestamp recorded on this version's node.
validAt: Date;
// Document title for this version.
title: string;
// Free-form metadata stored with the version.
metadata: Record<string, any>;
}
/**
 * Describes how a newly submitted document relates to the latest stored
 * version of the same document.
 */
export interface VersionedDocumentInfo {
// True when no previous version was found.
isNewDocument: boolean;
// The previous version that was compared against, or null for a new document.
existingDocument: DocumentNode | null;
// Version number to assign to the submitted document.
newVersion: number;
// Uuid of the previous version's node, or null for a new document.
previousVersionUuid: string | null;
// True when the whole-document hash differs from the previous version.
hasContentChanged: boolean;
// Chunk-level comparison against the previous version.
chunkLevelChanges: {
// Positions of chunks whose hash differs.
changedChunkIndices: number[];
// Share of changed chunk positions, as a percentage (0-100).
changePercentage: number;
// Number of chunks in the submitted document.
totalChunks: number;
};
}
/**
 * Service for managing document versions and coordinating differential
 * ingestion.
 *
 * It looks up the latest stored version of a document, chunks the new content,
 * works out the next version number and chunk-level changes, and builds the
 * document node for the new version. It also offers an embedding-based
 * similarity check and a small reporting helper.
 */
export class DocumentVersioningService {
  private knowledgeGraphService: KnowledgeGraphService;

  constructor() {
    this.knowledgeGraphService = new KnowledgeGraphService();
  }

  /**
   * Prepare a new document version with proper versioning information.
   *
   * @param documentId Stable identifier shared by all versions of the document.
   * @param userId Owner of the document.
   * @param title Document title.
   * @param content Full text of the new version.
   * @param source Origin of the document.
   * @param metadata Extra metadata to store on the node.
   * @param sessionId Optional session the upload belongs to.
   * @param validAt Optional validity time for the new version (for example the
   *        caller's reference time). When omitted or not a valid date, the
   *        creation time is used.
   * @returns The node to persist, the version analysis and the chunked document.
   */
  async prepareDocumentVersion(
    documentId: string,
    userId: string,
    title: string,
    content: string,
    source: string,
    metadata: Record<string, any> = {},
    sessionId?: string,
    validAt?: Date | string | number,
  ): Promise<{
    documentNode: DocumentNode;
    versionInfo: VersionedDocumentInfo;
    chunkedDocument: ChunkedDocument;
  }> {
    // Latest stored version, used as the comparison baseline.
    const existingDocument = await findExistingDocument(documentId, userId);

    // Chunk the new content; this also yields document and chunk hashes.
    const documentChunker = new DocumentChunker();
    const chunkedDocument = await documentChunker.chunkDocument(content, title);

    const versionInfo = this.analyzeVersionChanges(existingDocument, chunkedDocument);

    const documentNode = this.createVersionedDocumentNode(
      documentId,
      userId,
      title,
      content,
      source,
      metadata,
      sessionId,
      versionInfo,
      chunkedDocument,
      validAt,
    );

    return {
      documentNode,
      versionInfo,
      chunkedDocument,
    };
  }

  /**
   * Analyze changes between the existing and the new document version.
   *
   * - No existing document: version 1, reported as 100% changed.
   * - Same content hash: the existing version number is kept, 0% changed.
   * - Otherwise: version is incremented and chunk hashes are compared.
   */
  private analyzeVersionChanges(
    existingDocument: DocumentNode | null,
    newChunkedDocument: ChunkedDocument,
  ): VersionedDocumentInfo {
    const totalChunks = newChunkedDocument.totalChunks;

    if (!existingDocument) {
      return {
        isNewDocument: true,
        existingDocument: null,
        newVersion: 1,
        previousVersionUuid: null,
        hasContentChanged: true,
        chunkLevelChanges: {
          changedChunkIndices: [],
          changePercentage: 100,
          totalChunks,
        },
      };
    }

    const hasContentChanged = existingDocument.contentHash !== newChunkedDocument.contentHash;
    if (!hasContentChanged) {
      return {
        isNewDocument: false,
        existingDocument,
        newVersion: existingDocument.version,
        previousVersionUuid: existingDocument.uuid,
        hasContentChanged: false,
        chunkLevelChanges: {
          changedChunkIndices: [],
          changePercentage: 0,
          totalChunks,
        },
      };
    }

    const chunkComparison = DocumentChunker.compareChunkHashes(
      existingDocument.chunkHashes || [],
      newChunkedDocument.chunkHashes,
    );

    return {
      isNewDocument: false,
      existingDocument,
      newVersion: existingDocument.version + 1,
      previousVersionUuid: existingDocument.uuid,
      hasContentChanged: true,
      chunkLevelChanges: {
        changedChunkIndices: chunkComparison.changedIndices,
        changePercentage: chunkComparison.changePercentage,
        totalChunks,
      },
    };
  }

  /**
   * Create the document node for the new version.
   *
   * A fresh uuid is generated for every call. `createdAt` is the current time;
   * `validAt` is the supplied value when it parses to a valid date, otherwise
   * it equals the creation time.
   */
  private createVersionedDocumentNode(
    documentId: string,
    userId: string,
    title: string,
    content: string,
    source: string,
    metadata: Record<string, any>,
    sessionId: string | undefined,
    versionInfo: VersionedDocumentInfo,
    chunkedDocument: ChunkedDocument,
    validAt?: Date | string | number,
  ): DocumentNode {
    // One clock reading so createdAt and the default validAt cannot drift apart.
    const createdAt = new Date();
    const requestedValidAt = validAt === undefined ? null : new Date(validAt);
    const effectiveValidAt =
      requestedValidAt !== null && !Number.isNaN(requestedValidAt.getTime())
        ? requestedValidAt
        : new Date(createdAt.getTime());

    return {
      uuid: crypto.randomUUID(),
      title,
      originalContent: content,
      metadata: {
        ...metadata,
        chunkingStrategy: "semantic_sections",
        targetChunkSize: 12500,
        actualChunks: chunkedDocument.totalChunks,
      },
      source,
      userId,
      createdAt,
      validAt: effectiveValidAt,
      totalChunks: chunkedDocument.totalChunks,
      documentId,
      sessionId,
      version: versionInfo.newVersion,
      contentHash: chunkedDocument.contentHash,
      previousVersionUuid: versionInfo.previousVersionUuid || undefined,
      chunkHashes: chunkedDocument.chunkHashes,
    };
  }

  /**
   * Get version history for a document.
   *
   * @param documentId Stable identifier of the document.
   * @param userId Owner of the document.
   * @param limit Maximum number of versions to return (default 10).
   */
  async getDocumentHistory(
    documentId: string,
    userId: string,
    limit: number = 10,
  ): Promise<DocumentVersion[]> {
    const versions = await getDocumentVersions(documentId, userId, limit);

    return versions.map((doc) => ({
      uuid: doc.uuid,
      version: doc.version,
      contentHash: doc.contentHash,
      chunkHashes: doc.chunkHashes || [],
      createdAt: doc.createdAt,
      validAt: doc.validAt,
      title: doc.title,
      metadata: doc.metadata,
    }));
  }

  /**
   * Check whether statements should be invalidated, based on the semantic
   * similarity of the old and new chunk text.
   *
   * @param oldChunkContent Text of the chunk in the previous version.
   * @param newChunkContent Text of the chunk in the new version.
   * @param threshold Similarity below which statements are invalidated
   *        (default 0.85).
   * @returns The decision and the similarity. If the similarity cannot be
   *          computed, the result is `shouldInvalidate: true` with similarity 0.
   */
  async checkStatementInvalidation(
    oldChunkContent: string,
    newChunkContent: string,
    threshold: number = 0.85,
  ): Promise<{
    shouldInvalidate: boolean;
    semanticSimilarity: number;
  }> {
    try {
      // The two embeddings are independent, so request them together.
      const [oldEmbedding, newEmbedding] = await Promise.all([
        this.knowledgeGraphService.getEmbedding(oldChunkContent),
        this.knowledgeGraphService.getEmbedding(newChunkContent),
      ]);

      const similarity = this.calculateCosineSimilarity(oldEmbedding, newEmbedding);

      return {
        shouldInvalidate: similarity < threshold,
        semanticSimilarity: similarity,
      };
    } catch (error) {
      console.error("Error checking statement invalidation:", error);
      // On error, be conservative and invalidate.
      return {
        shouldInvalidate: true,
        semanticSimilarity: 0,
      };
    }
  }

  /**
   * Calculate cosine similarity between two embedding vectors.
   *
   * @throws Error when the vectors differ in length.
   * @returns The cosine similarity, or 0 when either vector has zero magnitude.
   */
  private calculateCosineSimilarity(vecA: number[], vecB: number[]): number {
    if (vecA.length !== vecB.length) {
      throw new Error("Vector dimensions must match");
    }

    let dotProduct = 0;
    let normA = 0;
    let normB = 0;
    for (let i = 0; i < vecA.length; i++) {
      dotProduct += vecA[i] * vecB[i];
      normA += vecA[i] * vecA[i];
      normB += vecB[i] * vecB[i];
    }
    normA = Math.sqrt(normA);
    normB = Math.sqrt(normB);

    // A zero vector has no direction; report no similarity instead of NaN.
    if (normA === 0 || normB === 0) {
      return 0;
    }
    return dotProduct / (normA * normB);
  }

  /**
   * Generate a differential processing report.
   *
   * @param versionInfo Version analysis of the processed document.
   * @param processingStats Counters gathered while processing.
   * @returns A one-line summary and a metrics object. The savings figure is the
   *          share of skipped chunks among all chunks (0 when there are none).
   */
  generateDifferentialReport(
    versionInfo: VersionedDocumentInfo,
    processingStats: {
      chunksProcessed: number;
      chunksSkipped: number;
      statementsCreated: number;
      statementsInvalidated: number;
      processingTimeMs: number;
    },
  ): {
    summary: string;
    metrics: Record<string, any>;
  } {
    const totalChunks = versionInfo.chunkLevelChanges.totalChunks;
    const changePercentage = versionInfo.chunkLevelChanges.changePercentage;
    const savingsPercentage = totalChunks > 0
      ? (processingStats.chunksSkipped / totalChunks) * 100
      : 0;

    return {
      summary: `Document v${versionInfo.newVersion}: ${changePercentage.toFixed(1)}% changed, ${savingsPercentage.toFixed(1)}% processing saved`,
      metrics: {
        version: versionInfo.newVersion,
        isNewDocument: versionInfo.isNewDocument,
        totalChunks,
        chunksChanged: processingStats.chunksProcessed,
        chunksSkipped: processingStats.chunksSkipped,
        changePercentage: changePercentage,
        processingTimeMs: processingStats.processingTimeMs,
        statementsCreated: processingStats.statementsCreated,
        statementsInvalidated: processingStats.statementsInvalidated,
        estimatedCostSavings: savingsPercentage,
      },
    };
  }
}

View File

@ -1,5 +1,6 @@
import { runQuery } from "~/lib/neo4j.server";
import type { DocumentNode } from "@core/types";
import crypto from "crypto";
export async function saveDocument(document: DocumentNode): Promise<string> {
const query = `
@ -14,7 +15,11 @@ export async function saveDocument(document: DocumentNode): Promise<string> {
d.validAt = $validAt,
d.totalChunks = $totalChunks,
d.documentId = $documentId,
d.sessionId = $sessionId
d.sessionId = $sessionId,
d.version = $version,
d.contentHash = $contentHash,
d.previousVersionUuid = $previousVersionUuid,
d.chunkHashes = $chunkHashes
ON MATCH SET
d.title = $title,
d.originalContent = $originalContent,
@ -23,7 +28,11 @@ export async function saveDocument(document: DocumentNode): Promise<string> {
d.validAt = $validAt,
d.totalChunks = $totalChunks,
d.documentId = $documentId,
d.sessionId = $sessionId
d.sessionId = $sessionId,
d.version = $version,
d.contentHash = $contentHash,
d.previousVersionUuid = $previousVersionUuid,
d.chunkHashes = $chunkHashes
RETURN d.uuid as uuid
`;
@ -39,6 +48,10 @@ export async function saveDocument(document: DocumentNode): Promise<string> {
totalChunks: document.totalChunks || 0,
documentId: document.documentId || null,
sessionId: document.sessionId || null,
version: document.version || 1,
contentHash: document.contentHash,
previousVersionUuid: document.previousVersionUuid || null,
chunkHashes: document.chunkHashes || [],
};
const result = await runQuery(query, params);
@ -94,6 +107,10 @@ export async function getDocument(
createdAt: new Date(documentNode.properties.createdAt),
validAt: new Date(documentNode.properties.validAt),
totalChunks: documentNode.properties.totalChunks,
version: documentNode.properties.version || 1,
contentHash: documentNode.properties.contentHash || "",
previousVersionUuid: documentNode.properties.previousVersionUuid || null,
chunkHashes: documentNode.properties.chunkHashes || [],
};
}
@ -146,6 +163,92 @@ export async function getUserDocuments(
createdAt: new Date(documentNode.properties.createdAt),
validAt: new Date(documentNode.properties.validAt),
totalChunks: documentNode.properties.totalChunks,
version: documentNode.properties.version || 1,
contentHash: documentNode.properties.contentHash || "",
previousVersionUuid: documentNode.properties.previousVersionUuid || null,
chunkHashes: documentNode.properties.chunkHashes || [],
};
});
}
/**
 * Compute the content fingerprint used for document versioning.
 *
 * @param content Text to hash; it is read as UTF-8.
 * @returns The full SHA-256 digest as a lowercase hexadecimal string.
 */
export function generateContentHash(content: string): string {
  const hasher = crypto.createHash("sha256");
  hasher.update(content, "utf8");
  return hasher.digest("hex");
}
/**
 * Look up the highest-versioned stored document for a documentId / userId
 * pair, for use as the baseline in version comparison.
 *
 * @param documentId Stable identifier shared by all versions of the document.
 * @param userId Owner of the document.
 * @returns The matching document, or `null` when none is stored. Missing
 *          version fields fall back to version 1, an empty content hash, no
 *          previous version and no chunk hashes.
 */
export async function findExistingDocument(
  documentId: string,
  userId: string,
): Promise<DocumentNode | null> {
  const query = `
MATCH (d:Document {documentId: $documentId, userId: $userId})
RETURN d
ORDER BY d.version DESC
LIMIT 1
`;

  const records = await runQuery(query, { documentId, userId });
  if (records.length === 0) {
    return null;
  }

  const props = records[0].get("d").properties;
  const createdAt = new Date(props.createdAt);
  const validAt = new Date(props.validAt);

  return {
    uuid: props.uuid,
    title: props.title,
    originalContent: props.originalContent,
    // Metadata is parsed from a JSON string; an absent value becomes {}.
    metadata: JSON.parse(props.metadata || "{}"),
    source: props.source,
    userId: props.userId,
    createdAt,
    validAt,
    totalChunks: props.totalChunks,
    version: props.version || 1,
    contentHash: props.contentHash || "",
    previousVersionUuid: props.previousVersionUuid || null,
    chunkHashes: props.chunkHashes || [],
  };
}
/**
 * Get the version history of a document, newest version first.
 *
 * @param documentId Stable identifier shared by all versions of the document.
 * @param userId Owner of the document.
 * @param limit Maximum number of versions to return (default 10).
 * @returns Stored versions ordered by descending version number. Missing
 *          version fields fall back to version 1, an empty content hash, no
 *          previous version and no chunk hashes.
 */
export async function getDocumentVersions(
  documentId: string,
  userId: string,
  limit: number = 10,
): Promise<DocumentNode[]> {
  // LIMIT requires an integer, but a plain JavaScript number may reach the
  // database as a float; toInteger() makes the clause valid in both cases.
  const query = `
MATCH (d:Document {documentId: $documentId, userId: $userId})
RETURN d
ORDER BY d.version DESC
LIMIT toInteger($limit)
`;
  const params = { documentId, userId, limit };
  const result = await runQuery(query, params);

  return result.map((record) => {
    const documentNode = record.get("d");
    return {
      uuid: documentNode.properties.uuid,
      title: documentNode.properties.title,
      originalContent: documentNode.properties.originalContent,
      // Metadata is parsed from a JSON string; an absent value becomes {}.
      metadata: JSON.parse(documentNode.properties.metadata || "{}"),
      source: documentNode.properties.source,
      userId: documentNode.properties.userId,
      createdAt: new Date(documentNode.properties.createdAt),
      validAt: new Date(documentNode.properties.validAt),
      totalChunks: documentNode.properties.totalChunks,
      version: documentNode.properties.version || 1,
      contentHash: documentNode.properties.contentHash || "",
      previousVersionUuid: documentNode.properties.previousVersionUuid || null,
      chunkHashes: documentNode.properties.chunkHashes || [],
    };
  });
}

View File

@ -113,6 +113,7 @@ export async function getRecentEpisodes(params: {
userId: params.userId,
source: params.source || null,
sessionId: params.sessionId || null,
documentId: params.documentId || null,
};
const result = await runQuery(query, queryParams);
@ -132,6 +133,7 @@ export async function getRecentEpisodes(params: {
userId: episode.userId,
space: episode.space,
sessionId: episode.sessionId,
documentId: episode.documentId,
};
});
}
@ -176,6 +178,7 @@ export async function searchEpisodesByEmbedding(params: {
? JSON.parse(episode.attributesJson)
: {},
userId: episode.userId,
documentId: episode.documentId,
};
});
}

View File

@ -44,6 +44,7 @@ import {
searchStatementsByEmbedding,
} from "./graphModels/statement";
import { getEmbedding, makeModelCall } from "~/lib/model.server";
import { runQuery } from "~/lib/neo4j.server";
import {
Apps,
getNodeTypes,
@ -67,6 +68,154 @@ export class KnowledgeGraphService {
return getEmbedding(text);
}
/**
 * Invalidate statements from a previous document version that are no longer
 * supported by the new document content, using semantic similarity.
 *
 * Every statement linked to the previous version is embedded and compared with
 * a single embedding of the whole new content. Statements scoring below the
 * threshold, and statements whose analysis fails, are invalidated in one bulk
 * call.
 *
 * @param params.previousDocumentUuid Uuid of the previous version's node.
 * @param params.newDocumentContent Full text of the new version.
 * @param params.userId Owner of the document.
 * @param params.invalidatedBy Identifier recorded as the invalidating source.
 * @param params.semanticSimilarityThreshold Similarity below which a statement
 *        is invalidated. Defaults to 0.75; an explicit 0 is honoured.
 * @returns Uuids of invalidated and preserved statements and the number of
 *          statements analyzed.
 */
async invalidateStatementsFromPreviousDocumentVersion(params: {
  previousDocumentUuid: string;
  newDocumentContent: string;
  userId: string;
  invalidatedBy: string;
  semanticSimilarityThreshold?: number;
}): Promise<{
  invalidatedStatements: string[];
  preservedStatements: string[];
  totalStatementsAnalyzed: number;
}> {
  // `??` rather than `||`, so that a caller-supplied threshold of 0 is kept.
  const threshold = params.semanticSimilarityThreshold ?? 0.75; // Lower threshold for document-level analysis
  const invalidatedStatements: string[] = [];
  const preservedStatements: string[] = [];

  // Step 1: Get all statements from the previous document version
  const previousStatements = await this.getStatementsFromDocument(
    params.previousDocumentUuid,
    params.userId,
  );

  if (previousStatements.length === 0) {
    return {
      invalidatedStatements: [],
      preservedStatements: [],
      totalStatementsAnalyzed: 0,
    };
  }

  logger.log(`Analyzing ${previousStatements.length} statements from previous document version`);

  // Step 2: Generate embedding for new document content (once, reused below)
  const newDocumentEmbedding = await this.getEmbedding(params.newDocumentContent);

  // Step 3: For each statement, check if it's still semantically supported by new content
  for (const statement of previousStatements) {
    try {
      const statementEmbedding = await this.getEmbedding(statement.fact);

      const semanticSimilarity = this.calculateCosineSimilarity(
        statementEmbedding,
        newDocumentEmbedding,
      );

      if (semanticSimilarity < threshold) {
        invalidatedStatements.push(statement.uuid);
        logger.log(`Invalidating statement: "${statement.fact}" (similarity: ${semanticSimilarity.toFixed(3)})`);
      } else {
        preservedStatements.push(statement.uuid);
        logger.log(`Preserving statement: "${statement.fact}" (similarity: ${semanticSimilarity.toFixed(3)})`);
      }
    } catch (error) {
      logger.error(`Error analyzing statement ${statement.uuid}:`, { error });
      // On error, be conservative and invalidate
      invalidatedStatements.push(statement.uuid);
    }
  }

  // Step 4: Bulk invalidate the selected statements
  if (invalidatedStatements.length > 0) {
    await invalidateStatements({
      statementIds: invalidatedStatements,
      invalidatedBy: params.invalidatedBy,
    });

    logger.log(`Document-level invalidation completed`, {
      previousDocumentUuid: params.previousDocumentUuid,
      totalAnalyzed: previousStatements.length,
      invalidated: invalidatedStatements.length,
      preserved: preservedStatements.length,
      threshold,
    });
  }

  return {
    invalidatedStatements,
    preservedStatements,
    totalStatementsAnalyzed: previousStatements.length,
  };
}
/**
 * Fetch the statements reachable from a document through its chunk episodes
 * (Document -CONTAINS_CHUNK-> Episode -HAS_PROVENANCE-> Statement).
 *
 * @param documentUuid Uuid of the document node.
 * @param userId Owner of the document; the document must match both values.
 * @returns One statement node per returned record.
 */
private async getStatementsFromDocument(
  documentUuid: string,
  userId: string,
): Promise<StatementNode[]> {
  const query = `
MATCH (doc:Document {uuid: $documentUuid, userId: $userId})-[:CONTAINS_CHUNK]->(episode:Episode)
MATCH (episode)-[:HAS_PROVENANCE]->(stmt:Statement)
RETURN stmt
`;

  const records = await runQuery(query, { documentUuid, userId });

  return records.map((record) => {
    const props = record.get("stmt").properties;

    // Optional properties get neutral defaults when they are absent.
    const invalidAt = props.invalidAt ? new Date(props.invalidAt) : null;
    const attributes = props.attributesJson ? JSON.parse(props.attributesJson) : {};

    return {
      uuid: props.uuid,
      fact: props.fact,
      factEmbedding: props.factEmbedding || [],
      createdAt: new Date(props.createdAt),
      validAt: new Date(props.validAt),
      invalidAt,
      attributes,
      userId: props.userId,
    };
  });
}
/**
 * Cosine similarity of two embedding vectors.
 *
 * @param vecA First vector.
 * @param vecB Second vector; must have the same length as `vecA`.
 * @throws Error when the vectors differ in length.
 * @returns The cosine similarity, or 0 when either vector has zero magnitude.
 */
private calculateCosineSimilarity(vecA: number[], vecB: number[]): number {
  if (vecA.length !== vecB.length) {
    throw new Error("Vector dimensions must match");
  }

  let dot = 0;
  let squaredA = 0;
  let squaredB = 0;
  for (const [position, a] of vecA.entries()) {
    const b = vecB[position];
    dot += a * b;
    squaredA += a * a;
    squaredB += b * b;
  }

  const magnitudeA = Math.sqrt(squaredA);
  const magnitudeB = Math.sqrt(squaredB);

  // A zero-length vector has no direction; report 0 rather than dividing by 0.
  if (magnitudeA === 0 || magnitudeB === 0) {
    return 0;
  }
  return dot / (magnitudeA * magnitudeB);
}
/**
* Process an episode and update the knowledge graph.
*

View File

@ -3,11 +3,13 @@ import { type z } from "zod";
import crypto from "crypto";
import { IngestionStatus } from "@core/database";
import { EpisodeTypeEnum, type DocumentNode } from "@core/types";
import { EpisodeTypeEnum } from "@core/types";
import { logger } from "~/services/logger.service";
import { DocumentChunker } from "~/services/documentChunker.server";
import { saveDocument } from "~/services/graphModels/document";
import { type IngestBodyRequest } from "~/lib/ingest.server";
import { DocumentVersioningService } from "~/services/documentVersioning.server";
import { DocumentDifferentialService } from "~/services/documentDiffer.server";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";
import { prisma } from "../utils/prisma";
import { ingestTask } from "./ingest";
@ -44,46 +46,117 @@ export const ingestDocumentTask = task({
const documentBody = payload.body as any;
// Step 1: Create document node
const document: DocumentNode = {
uuid: crypto.randomUUID(),
title: documentBody.documentTitle || "Untitled Document",
originalContent: documentBody.episodeBody,
metadata: documentBody.metadata || {},
source: documentBody.source,
userId: payload.userId,
createdAt: new Date(),
validAt: new Date(documentBody.referenceTime),
totalChunks: 0,
documentId: documentBody.documentId,
sessionId: documentBody.sessionId,
};
// Step 1: Initialize services and prepare document version
const versioningService = new DocumentVersioningService();
const differentialService = new DocumentDifferentialService();
const knowledgeGraphService = new KnowledgeGraphService();
const {
documentNode: document,
versionInfo,
chunkedDocument,
} = await versioningService.prepareDocumentVersion(
documentBody.documentId || crypto.randomUUID(),
payload.userId,
documentBody.documentTitle || "Untitled Document",
documentBody.episodeBody,
documentBody.source,
documentBody.metadata || {},
documentBody.sessionId,
);
logger.log(`Document version analysis:`, {
version: versionInfo.newVersion,
isNewDocument: versionInfo.isNewDocument,
hasContentChanged: versionInfo.hasContentChanged,
changePercentage: versionInfo.chunkLevelChanges.changePercentage,
changedChunks: versionInfo.chunkLevelChanges.changedChunkIndices.length,
totalChunks: versionInfo.chunkLevelChanges.totalChunks,
});
// Step 2: Determine processing strategy
const differentialDecision = await differentialService.analyzeDifferentialNeed(
documentBody.episodeBody,
versionInfo.existingDocument,
chunkedDocument,
);
logger.log(`Differential analysis:`, {
shouldUseDifferential: differentialDecision.shouldUseDifferential,
strategy: differentialDecision.strategy,
reason: differentialDecision.reason,
documentSizeTokens: differentialDecision.documentSizeTokens,
});
// Step 3: Save the new document version
await saveDocument(document);
// Step 2: Chunk the document
const documentChunker = new DocumentChunker();
const chunkedDocument = await documentChunker.chunkDocument(
documentBody.episodeBody,
documentBody.documentTitle,
);
// Step 3.1: Invalidate statements from previous document version if it exists
let invalidationResults = null;
if (versionInfo.existingDocument && versionInfo.hasContentChanged) {
logger.log(`Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`);
invalidationResults = await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion({
previousDocumentUuid: versionInfo.existingDocument.uuid,
newDocumentContent: documentBody.episodeBody,
userId: payload.userId,
invalidatedBy: document.uuid,
semanticSimilarityThreshold: 0.75, // Configurable threshold
});
logger.log(`Statement invalidation completed:`, {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
});
}
logger.log(
`Document chunked into ${chunkedDocument.chunks.length} chunks`,
);
// Step 3: Queue each chunk as a separate episode
for (const chunk of chunkedDocument.chunks) {
// Step 4: Process chunks based on differential strategy
let chunksToProcess = chunkedDocument.chunks;
let processingMode = "full";
if (differentialDecision.shouldUseDifferential && differentialDecision.strategy === "chunk_level_diff") {
// Only process changed chunks
const chunkComparisons = differentialService.getChunkComparisons(
versionInfo.existingDocument!,
chunkedDocument,
);
const changedIndices = differentialService.getChunksNeedingReprocessing(chunkComparisons);
chunksToProcess = chunkedDocument.chunks.filter(chunk =>
changedIndices.includes(chunk.chunkIndex)
);
processingMode = "differential";
logger.log(`Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`);
} else if (differentialDecision.strategy === "full_reingest") {
// Process all chunks
processingMode = "full";
logger.log(`Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`);
}
// Step 5: Queue chunks for processing
const episodeHandlers = [];
for (const chunk of chunksToProcess) {
const chunkEpisodeData = {
episodeBody: chunk.content,
referenceTime: documentBody.referenceTime,
metadata: documentBody.metadata,
metadata: {
...documentBody.metadata,
processingMode,
differentialStrategy: differentialDecision.strategy,
chunkHash: chunk.contentHash,
},
source: documentBody.source,
spaceId: documentBody.spaceId,
sessionId: documentBody.sessionId,
type: EpisodeTypeEnum.DOCUMENT,
documentTitle: documentBody.documentTitle,
documentId: documentBody.documentId,
documentId: document.uuid, // Use the new document UUID
chunkIndex: chunk.chunkIndex,
};
@ -97,28 +170,48 @@ export const ingestDocumentTask = task({
{
queue: "ingestion-queue",
concurrencyKey: payload.userId,
tags: [payload.userId, payload.queueId],
tags: [payload.userId, payload.queueId, processingMode],
},
);
if (episodeHandler.id) {
episodeHandlers.push(episodeHandler.id);
logger.log(
`Queued chunk ${chunk.chunkIndex + 1}/${chunkedDocument.chunks.length} for processing`,
`Queued chunk ${chunk.chunkIndex + 1} for ${processingMode} processing`,
{
handlerId: episodeHandler.id,
chunkSize: chunk.content.length,
chunkHash: chunk.contentHash,
},
);
}
}
// Calculate cost savings
const costSavings = differentialService.calculateCostSavings(
chunkedDocument.chunks.length,
chunksToProcess.length,
);
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
output: {
documentUuid: document.uuid,
version: versionInfo.newVersion,
totalChunks: chunkedDocument.chunks.length,
chunksProcessed: chunksToProcess.length,
chunksSkipped: costSavings.chunksSkipped,
processingMode,
differentialStrategy: differentialDecision.strategy,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults ? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
} : null,
episodes: [],
episodeHandlers,
},
status: IngestionStatus.PROCESSING,
},
@ -127,10 +220,21 @@ export const ingestDocumentTask = task({
const processingTimeMs = Date.now() - startTime;
logger.log(
`Document chunking processing completed in ${processingTimeMs}ms`,
`Document differential processing completed in ${processingTimeMs}ms`,
{
documentUuid: document.uuid,
version: versionInfo.newVersion,
processingMode,
totalChunks: chunkedDocument.chunks.length,
chunksProcessed: chunksToProcess.length,
chunksSkipped: costSavings.chunksSkipped,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
changePercentage: `${differentialDecision.changePercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults ? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
} : "No previous version",
},
);

View File

@ -1,6 +1,7 @@
import { queue, task } from "@trigger.dev/sdk";
import { z } from "zod";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";
import { linkEpisodeToDocument } from "~/services/graphModels/document";
import { IngestionStatus } from "@core/database";
import { logger } from "~/services/logger.service";
@ -60,6 +61,20 @@ export const ingestTask = task({
prisma,
);
// Link episode to document if it's a document chunk
if (episodeBody.type === EpisodeType.DOCUMENT && episodeBody.documentId && episodeDetails.episodeUuid) {
try {
await linkEpisodeToDocument(
episodeDetails.episodeUuid,
episodeBody.documentId,
episodeBody.chunkIndex || 0,
);
logger.log(`Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.documentId} at chunk ${episodeBody.chunkIndex || 0}`);
} catch (error) {
logger.error(`Failed to link episode to document:`, { error, episodeUuid: episodeDetails.episodeUuid, documentId: episodeBody.documentId });
}
}
let finalOutput = episodeDetails;
let episodeUuids: string[] = episodeDetails.episodeUuid
? [episodeDetails.episodeUuid]

View File

@ -14,6 +14,11 @@ export interface DocumentNode {
totalChunks: number;
documentId?: string;
sessionId?: string;
// Version tracking for differential ingestion
version: number;
contentHash: string;
previousVersionUuid?: string;
chunkHashes?: string[]; // Hash of each chunk for change detection
}
/**