refactor: consolidate document versioning around sessionId instead of documentId

Manoj K 2025-09-03 11:34:48 +05:30
parent b701a11016
commit acfc0540ac
8 changed files with 124 additions and 115 deletions

View File

@@ -37,16 +37,11 @@ export const addToQueue = async (
},
});
let ingestionType = EpisodeType.CONVERSATION;
if (body.documentId) {
ingestionType = EpisodeType.DOCUMENT;
}
let handler;
if (ingestionType === EpisodeType.DOCUMENT) {
if (body.type === EpisodeType.DOCUMENT) {
handler = await ingestDocumentTask.trigger(
{
body: { ...body, type: ingestionType },
body,
userId,
workspaceId: user.Workspace.id,
queueId: queuePersist.id,
@@ -57,10 +52,10 @@ export const addToQueue = async (
tags: [user.id, queuePersist.id],
},
);
} else if (ingestionType === EpisodeType.CONVERSATION) {
} else if (body.type === EpisodeType.CONVERSATION) {
handler = await ingestTask.trigger(
{
body: { ...body, type: ingestionType },
body,
userId,
workspaceId: user.Workspace.id,
queueId: queuePersist.id,
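
A note on the routing change above: the queue no longer infers the episode type from the presence of body.documentId; callers must set body.type explicitly, and the body is forwarded untouched instead of being re-spread with a computed type. A minimal sketch of the resulting dispatch (task and enum names as in the diff; IngestBody and TriggerContext are illustrative):

async function dispatch(body: IngestBody, ctx: TriggerContext) {
  // body.type is now the single source of truth for routing
  if (body.type === EpisodeType.DOCUMENT) {
    return ingestDocumentTask.trigger({ body, ...ctx });
  }
  // CONVERSATION is the schema default, so untyped callers route here
  return ingestTask.trigger({ body, ...ctx });
}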

View File

@@ -1,10 +1,13 @@
import crypto from "crypto";
import type { DocumentNode } from "@core/types";
import {
findExistingDocument,
getDocumentVersions
import {
findExistingDocument,
getDocumentVersions,
} from "./graphModels/document";
import { DocumentChunker, type ChunkedDocument } from "./documentChunker.server";
import {
DocumentChunker,
type ChunkedDocument,
} from "./documentChunker.server";
import { KnowledgeGraphService } from "./knowledgeGraph.server";
export interface DocumentVersion {
@@ -46,37 +49,38 @@ export class DocumentVersioningService {
* Prepare a new document version with proper versioning information
*/
async prepareDocumentVersion(
documentId: string,
sessionId: string,
userId: string,
title: string,
content: string,
source: string,
metadata: Record<string, any> = {},
sessionId?: string,
): Promise<{
documentNode: DocumentNode;
versionInfo: VersionedDocumentInfo;
chunkedDocument: ChunkedDocument;
}> {
// Find existing document for version comparison
const existingDocument = await findExistingDocument(documentId, userId);
const existingDocument = await findExistingDocument(sessionId, userId);
// Chunk the new document content
const documentChunker = new DocumentChunker();
const chunkedDocument = await documentChunker.chunkDocument(content, title);
// Determine version information
const versionInfo = this.analyzeVersionChanges(existingDocument, chunkedDocument);
const versionInfo = this.analyzeVersionChanges(
existingDocument,
chunkedDocument,
);
// Create new document node
const documentNode = this.createVersionedDocumentNode(
documentId,
sessionId,
userId,
title,
content,
source,
metadata,
sessionId,
versionInfo,
chunkedDocument,
);
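
With sessionId promoted to the first, required parameter, the optional trailing sessionId argument disappears from prepareDocumentVersion. A hedged call-site sketch matching the new signature (values illustrative):

const versioning = new DocumentVersioningService();
const { documentNode, versionInfo, chunkedDocument } =
  await versioning.prepareDocumentVersion(
    sessionId, // versioning key, formerly documentId
    userId,
    "Quarterly Report", // title
    rawText, // content
    "upload", // source
    {}, // metadata
  );
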
@@ -111,8 +115,9 @@ export class DocumentVersioningService {
}
// Check if content has actually changed
const hasContentChanged = existingDocument.contentHash !== newChunkedDocument.contentHash;
const hasContentChanged =
existingDocument.contentHash !== newChunkedDocument.contentHash;
if (!hasContentChanged) {
return {
isNewDocument: false,
@@ -152,13 +157,12 @@ export class DocumentVersioningService {
* Create a new versioned document node
*/
private createVersionedDocumentNode(
documentId: string,
sessionId: string,
userId: string,
title: string,
content: string,
source: string,
metadata: Record<string, any>,
sessionId: string | undefined,
versionInfo: VersionedDocumentInfo,
chunkedDocument: ChunkedDocument,
): DocumentNode {
@@ -177,12 +181,11 @@ export class DocumentVersioningService {
createdAt: new Date(),
validAt: new Date(),
totalChunks: chunkedDocument.totalChunks,
documentId,
sessionId,
version: versionInfo.newVersion,
contentHash: chunkedDocument.contentHash,
previousVersionUuid: versionInfo.previousVersionUuid || undefined,
chunkHashes: chunkedDocument.chunkHashes,
sessionId,
};
}
@@ -195,8 +198,8 @@ export class DocumentVersioningService {
limit: number = 10,
): Promise<DocumentVersion[]> {
const versions = await getDocumentVersions(documentId, userId, limit);
return versions.map(doc => ({
return versions.map((doc) => ({
uuid: doc.uuid,
version: doc.version,
contentHash: doc.contentHash,
@@ -228,11 +231,14 @@ export class DocumentVersioningService {
]);
// Calculate cosine similarity
const similarity = this.calculateCosineSimilarity(oldEmbedding, newEmbedding);
const similarity = this.calculateCosineSimilarity(
oldEmbedding,
newEmbedding,
);
// If similarity is below threshold, invalidate old statements
const shouldInvalidate = similarity < threshold;
return {
shouldInvalidate,
semanticSimilarity: similarity,
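
The invalidation check above embeds the previous and the new content and compares the vectors with cosine similarity. A self-contained sketch of what calculateCosineSimilarity computes, assuming dense number[] embeddings of equal length:

function calculateCosineSimilarity(a: number[], b: number[]): number {
  // cos(a, b) = (a · b) / (|a| * |b|)
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0; // degenerate embedding
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

shouldInvalidate then reduces to similarity < threshold: semantically close versions keep their statements, drifted ones invalidate them.
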
@@ -293,9 +299,8 @@ export class DocumentVersioningService {
} {
const totalChunks = versionInfo.chunkLevelChanges.totalChunks;
const changePercentage = versionInfo.chunkLevelChanges.changePercentage;
const savingsPercentage = totalChunks > 0
? (processingStats.chunksSkipped / totalChunks) * 100
: 0;
const savingsPercentage =
totalChunks > 0 ? (processingStats.chunksSkipped / totalChunks) * 100 : 0;
return {
summary: `Document v${versionInfo.newVersion}: ${changePercentage.toFixed(1)}% changed, ${savingsPercentage.toFixed(1)}% processing saved`,
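
Worked example of the savings arithmetic: skipping 40 of 50 chunks gives savingsPercentage = (40 / 50) * 100 = 80, so a document where 20% of chunks changed reports "20.0% changed, 80.0% processing saved".
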
@@ -313,4 +318,4 @@ export class DocumentVersioningService {
},
};
}
}
}

View File

@@ -14,7 +14,6 @@ export async function saveDocument(document: DocumentNode): Promise<string> {
d.createdAt = $createdAt,
d.validAt = $validAt,
d.totalChunks = $totalChunks,
d.documentId = $documentId,
d.sessionId = $sessionId,
d.version = $version,
d.contentHash = $contentHash,
@@ -27,7 +26,6 @@ export async function saveDocument(document: DocumentNode): Promise<string> {
d.source = $source,
d.validAt = $validAt,
d.totalChunks = $totalChunks,
d.documentId = $documentId,
d.sessionId = $sessionId,
d.version = $version,
d.contentHash = $contentHash,
@@ -46,7 +44,6 @@ export async function saveDocument(document: DocumentNode): Promise<string> {
createdAt: document.createdAt.toISOString(),
validAt: document.validAt.toISOString(),
totalChunks: document.totalChunks || 0,
documentId: document.documentId || null,
sessionId: document.sessionId || null,
version: document.version || 1,
contentHash: document.contentHash,
@@ -67,8 +64,7 @@ export async function linkEpisodeToDocument(
MATCH (e:Episode {uuid: $episodeUuid})
MATCH (d:Document {uuid: $documentUuid})
MERGE (d)-[r:CONTAINS_CHUNK {chunkIndex: $chunkIndex}]->(e)
SET e.documentId = $documentUuid,
e.chunkIndex = $chunkIndex
SET e.chunkIndex = $chunkIndex
RETURN r
`;
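
Provenance now flows through the CONTAINS_CHUNK relationship instead of a duplicated e.documentId property, so an episode's parent document is found by traversal. A sketch of that lookup (query shape inferred from the MERGE above, not taken from the codebase):

const parentDocumentQuery = `
  MATCH (d:Document)-[r:CONTAINS_CHUNK]->(e:Episode {uuid: $episodeUuid})
  RETURN d.uuid AS documentUuid, r.chunkIndex AS chunkIndex
`;
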
@@ -175,24 +171,24 @@ export async function getUserDocuments(
* Generate content hash for document versioning
*/
export function generateContentHash(content: string): string {
return crypto.createHash('sha256').update(content, 'utf8').digest('hex');
return crypto.createHash("sha256").update(content, "utf8").digest("hex");
}
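
generateContentHash is what keeps the hasContentChanged comparison cheap: byte-identical content always hashes equal, so an unchanged re-ingest is detected before any embedding work. A quick usage sketch:

// SHA-256 over UTF-8 bytes; the hex digest is always 64 characters.
const a = generateContentHash("hello world");
const b = generateContentHash("hello world");
console.assert(a === b && a.length === 64);
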
/**
* Find existing document by documentId and userId for version comparison
*/
export async function findExistingDocument(
documentId: string,
sessionId: string,
userId: string,
): Promise<DocumentNode | null> {
const query = `
MATCH (d:Document {documentId: $documentId, userId: $userId})
MATCH (d:Document {sessionId: $sessionId, userId: $userId})
RETURN d
ORDER BY d.version DESC
LIMIT 1
`;
const params = { documentId, userId };
const params = { sessionId, userId };
const result = await runQuery(query, params);
if (result.length === 0) return null;
@@ -219,18 +215,18 @@ export async function findExistingDocument(
* Get document version history
*/
export async function getDocumentVersions(
documentId: string,
sessionId: string,
userId: string,
limit: number = 10,
): Promise<DocumentNode[]> {
const query = `
MATCH (d:Document {documentId: $documentId, userId: $userId})
MATCH (d:Document {sessionId: $sessionId, userId: $userId})
RETURN d
ORDER BY d.version DESC
LIMIT $limit
`;
const params = { documentId, userId, limit };
const params = { sessionId, userId, limit };
const result = await runQuery(query, params);
return result.map((record) => {
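
With sessionId as the versioning key, a session accumulates a chain of Document nodes, and both lookups above rely on ORDER BY d.version DESC to surface the newest one. A hypothetical call site:

// Latest version, or null when this session has never ingested a document
const latest = await findExistingDocument(sessionId, userId);

// Up to five most recent versions, newest first
const history = await getDocumentVersions(sessionId, userId, 5);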

View File

@@ -82,28 +82,23 @@ export async function getRecentEpisodes(params: {
userId: string;
source?: string;
sessionId?: string;
type?: EpisodeType;
documentId?: string;
}): Promise<EpisodicNode[]> {
let filters = `WHERE e.validAt <= $referenceTime
AND e.userId = $userId`;
let filters = `WHERE e.validAt <= $referenceTime`;
if (params.source) {
filters += `\nAND e.source = $source`;
}
if (params.type === EpisodeType.CONVERSATION && params.sessionId) {
if (params.sessionId) {
filters += `\nAND e.sessionId = $sessionId`;
}
if (params.type === EpisodeType.DOCUMENT && params.documentId) {
filters += `\nAND e.documentId = $documentId`;
}
const query = `
MATCH (e:Episode)
MATCH (e:Episode{userId: $userId})
${filters}
RETURN e
MATCH (e)-[:HAS_PROVENANCE]->(s:Statement)
WHERE s.invalidAt IS NULL
RETURN DISTINCT e
ORDER BY e.validAt DESC
LIMIT ${params.limit}
`;
@@ -113,7 +108,6 @@ export async function getRecentEpisodes(params: {
userId: params.userId,
source: params.source || null,
sessionId: params.sessionId || null,
documentId: params.documentId || null,
};
const result = await runQuery(query, queryParams);
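
Two behavioural consequences of the query rewrite above are worth spelling out: episodes with no still-valid statements are now excluded entirely (the second MATCH is not OPTIONAL), and DISTINCT prevents an episode from being returned once per matching statement. With source and sessionId both set, the template assembles to roughly the following (limit value illustrative):

const assembled = `
  MATCH (e:Episode{userId: $userId})
  WHERE e.validAt <= $referenceTime
  AND e.source = $source
  AND e.sessionId = $sessionId
  MATCH (e)-[:HAS_PROVENANCE]->(s:Statement)
  WHERE s.invalidAt IS NULL
  RETURN DISTINCT e
  ORDER BY e.validAt DESC
  LIMIT 20
`;
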
@@ -316,6 +310,7 @@ export async function getEpisodeStatements(params: {
}) {
const query = `
MATCH (episode:Episode {uuid: $episodeUuid, userId: $userId})-[:HAS_PROVENANCE]->(stmt:Statement)
WHERE stmt.invalidAt IS NULL
RETURN stmt
`;

View File

@@ -249,8 +249,6 @@ export class KnowledgeGraphService {
userId: params.userId,
source: params.source,
sessionId: params.sessionId,
type: params.type,
documentId: params.documentId,
});
// Format session context from previous episodes

View File

@@ -33,7 +33,6 @@ export const ingestDocumentTask = task({
try {
logger.log(`Processing document for user ${payload.userId}`, {
documentTitle: payload.body.documentTitle,
contentLength: payload.body.episodeBody.length,
});
@@ -44,7 +43,7 @@ export const ingestDocumentTask = task({
},
});
const documentBody = payload.body as any;
const documentBody = payload.body;
// Step 1: Initialize services and prepare document version
const versioningService = new DocumentVersioningService();
@@ -56,13 +55,12 @@ export const ingestDocumentTask = task({
versionInfo,
chunkedDocument,
} = await versioningService.prepareDocumentVersion(
documentBody.documentId || crypto.randomUUID(),
documentBody.sessionId!,
payload.userId,
documentBody.documentTitle || "Untitled Document",
documentBody.metadata?.documentTitle?.toString() || "Untitled Document",
documentBody.episodeBody,
documentBody.source,
documentBody.metadata || {},
documentBody.sessionId,
);
logger.log(`Document version analysis:`, {
@@ -75,11 +73,12 @@ export const ingestDocumentTask = task({
});
// Step 2: Determine processing strategy
const differentialDecision = await differentialService.analyzeDifferentialNeed(
documentBody.episodeBody,
versionInfo.existingDocument,
chunkedDocument,
);
const differentialDecision =
await differentialService.analyzeDifferentialNeed(
documentBody.episodeBody,
versionInfo.existingDocument,
chunkedDocument,
);
logger.log(`Differential analysis:`, {
shouldUseDifferential: differentialDecision.shouldUseDifferential,
@@ -94,15 +93,20 @@ export const ingestDocumentTask = task({
// Step 3.1: Invalidate statements from previous document version if it exists
let invalidationResults = null;
if (versionInfo.existingDocument && versionInfo.hasContentChanged) {
logger.log(`Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`);
invalidationResults = await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion({
previousDocumentUuid: versionInfo.existingDocument.uuid,
newDocumentContent: documentBody.episodeBody,
userId: payload.userId,
invalidatedBy: document.uuid,
semanticSimilarityThreshold: 0.75, // Configurable threshold
});
logger.log(
`Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`,
);
invalidationResults =
await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion(
{
previousDocumentUuid: versionInfo.existingDocument.uuid,
newDocumentContent: documentBody.episodeBody,
userId: payload.userId,
invalidatedBy: document.uuid,
semanticSimilarityThreshold: 0.75, // Configurable threshold
},
);
logger.log(`Statement invalidation completed:`, {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
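
Concretely, with the 0.75 threshold: a statement from the previous version whose embedding scores 0.9 against the new content is preserved, while one scoring 0.6 is invalidated and stamped with invalidatedBy = document.uuid, so the graph records which document version superseded it.
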
@@ -119,24 +123,32 @@ export const ingestDocumentTask = task({
let chunksToProcess = chunkedDocument.chunks;
let processingMode = "full";
if (differentialDecision.shouldUseDifferential && differentialDecision.strategy === "chunk_level_diff") {
if (
differentialDecision.shouldUseDifferential &&
differentialDecision.strategy === "chunk_level_diff"
) {
// Only process changed chunks
const chunkComparisons = differentialService.getChunkComparisons(
versionInfo.existingDocument!,
chunkedDocument,
);
const changedIndices = differentialService.getChunksNeedingReprocessing(chunkComparisons);
chunksToProcess = chunkedDocument.chunks.filter(chunk =>
changedIndices.includes(chunk.chunkIndex)
const changedIndices =
differentialService.getChunksNeedingReprocessing(chunkComparisons);
chunksToProcess = chunkedDocument.chunks.filter((chunk) =>
changedIndices.includes(chunk.chunkIndex),
);
processingMode = "differential";
logger.log(`Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`);
logger.log(
`Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`,
);
} else if (differentialDecision.strategy === "full_reingest") {
// Process all chunks
processingMode = "full";
logger.log(`Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`);
logger.log(
`Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`,
);
}
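
Chunk-level differential processing re-runs extraction only for chunks whose content hash changed. A minimal sketch of that selection, assuming each chunk carries chunkIndex and contentHash as in the diff (the helper itself is illustrative):

interface Chunk {
  chunkIndex: number;
  content: string;
  contentHash: string;
}

// Keep chunks whose hash differs from the stored hash at the same index;
// a missing old hash means the chunk is new and must be processed.
function chunksNeedingReprocessing(
  oldChunkHashes: string[],
  chunks: Chunk[],
): Chunk[] {
  return chunks.filter((c) => oldChunkHashes[c.chunkIndex] !== c.contentHash);
}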
// Step 5: Queue chunks for processing
@@ -150,14 +162,16 @@ export const ingestDocumentTask = task({
processingMode,
differentialStrategy: differentialDecision.strategy,
chunkHash: chunk.contentHash,
documentTitle:
documentBody.metadata?.documentTitle?.toString() ||
"Untitled Document",
chunkIndex: chunk.chunkIndex,
documentUuid: document.uuid,
},
source: documentBody.source,
spaceId: documentBody.spaceId,
sessionId: documentBody.sessionId,
type: EpisodeTypeEnum.DOCUMENT,
documentTitle: documentBody.documentTitle,
documentId: document.uuid, // Use the new document UUID
chunkIndex: chunk.chunkIndex,
};
const episodeHandler = await ingestTask.trigger(
@@ -205,11 +219,13 @@ export const ingestDocumentTask = task({
processingMode,
differentialStrategy: differentialDecision.strategy,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults ? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
} : null,
statementInvalidation: invalidationResults
? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
}
: null,
episodes: [],
episodeHandlers,
},
@@ -230,11 +246,13 @@ export const ingestDocumentTask = task({
chunksSkipped: costSavings.chunksSkipped,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
changePercentage: `${differentialDecision.changePercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults ? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
} : "No previous version",
statementInvalidation: invalidationResults
? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
}
: "No previous version",
},
);

View File

@@ -19,8 +19,6 @@ export const IngestBodyRequest = z.object({
type: z
.enum([EpisodeType.CONVERSATION, EpisodeType.DOCUMENT])
.default(EpisodeType.CONVERSATION),
documentTitle: z.string().optional(),
documentId: z.string().optional(),
});
const ingestionQueue = queue({
@@ -62,16 +60,26 @@ export const ingestTask = task({
);
// Link episode to document if it's a document chunk
if (episodeBody.type === EpisodeType.DOCUMENT && episodeBody.documentId && episodeDetails.episodeUuid) {
if (
episodeBody.type === EpisodeType.DOCUMENT &&
episodeBody.metadata.documentUuid &&
episodeDetails.episodeUuid
) {
try {
await linkEpisodeToDocument(
episodeDetails.episodeUuid,
episodeBody.documentId,
episodeBody.chunkIndex || 0,
episodeBody.metadata.documentUuid,
episodeBody.metadata.chunkIndex || 0,
);
logger.log(
`Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.metadata.documentUuid} at chunk ${episodeBody.metadata.chunkIndex || 0}`,
);
logger.log(`Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.documentId} at chunk ${episodeBody.chunkIndex || 0}`);
} catch (error) {
logger.error(`Failed to link episode to document:`, { error, episodeUuid: episodeDetails.episodeUuid, documentId: episodeBody.documentId });
logger.error(`Failed to link episode to document:`, {
error,
episodeUuid: episodeDetails.episodeUuid,
documentUuid: episodeBody.metadata.documentUuid,
});
}
}
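
After this change a document chunk's identity travels inside metadata rather than as top-level payload fields. An illustrative chunk payload (values are made up; field names match the diff):

const chunkPayload = {
  episodeBody: "…chunk text…",
  source: "upload",
  sessionId: "session-123", // hypothetical id
  type: EpisodeType.DOCUMENT,
  metadata: {
    documentUuid: document.uuid, // links the chunk back to its Document node
    documentTitle: "Quarterly Report",
    chunkIndex: 3,
    chunkHash: "…sha256 of this chunk…",
    processingMode: "differential",
  },
};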

View File

@@ -12,7 +12,6 @@ export interface DocumentNode {
createdAt: Date;
validAt: Date;
totalChunks: number;
documentId?: string;
sessionId?: string;
// Version tracking for differential ingestion
version: number;
@@ -39,7 +38,6 @@ export interface EpisodicNode {
space?: string;
sessionId?: string;
recallCount?: number;
documentId?: string;
chunkIndex?: number; // Index of this chunk within the document
}
@@ -113,10 +111,6 @@ export type AddEpisodeParams = {
spaceId?: string;
sessionId?: string;
type?: EpisodeType;
documentTitle?: string;
documentId?: string;
chunkIndex?: number;
chunkContext?: string;
};
export type AddEpisodeResult = {
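
For reference, the Document node persisted after this commit is keyed by sessionId alone; the version-tracking fields visible in the diff give it roughly this shape (values illustrative, fields outside the diff omitted):

const doc: Partial<DocumentNode> = {
  totalChunks: 12,
  sessionId: "session-123", // replaces the removed documentId
  version: 2,
  contentHash: "…sha256 of the full content…",
  previousVersionUuid: "…uuid of version 1…", // undefined for a first version
  chunkHashes: ["…", "…"],
};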