diff --git a/.env.example b/.env.example
index 3e421f4..be8502f 100644
--- a/.env.example
+++ b/.env.example
@@ -1,4 +1,4 @@
-VERSION=0.1.19
+VERSION=0.1.20
# Nest run in docker, change host to database container name
DB_HOST=localhost
diff --git a/apps/webapp/app/components/graph/graph-clustering.tsx b/apps/webapp/app/components/graph/graph-clustering.tsx
index 418742c..cb9dafb 100644
--- a/apps/webapp/app/components/graph/graph-clustering.tsx
+++ b/apps/webapp/app/components/graph/graph-clustering.tsx
@@ -513,7 +513,7 @@ export const GraphClustering = forwardRef<
} else if (complexity < 500) {
durationSeconds = 4.0;
} else {
- durationSeconds = Math.min(8, 5 + (complexity - 500) * 0.006);
+ durationSeconds = Math.min(20, 5 + (complexity - 500) * 0.006);
}
return {
diff --git a/apps/webapp/app/components/graph/graph-popover.tsx b/apps/webapp/app/components/graph/graph-popover.tsx
index 1a3cc01..a46d3c3 100644
--- a/apps/webapp/app/components/graph/graph-popover.tsx
+++ b/apps/webapp/app/components/graph/graph-popover.tsx
@@ -82,10 +82,12 @@ export function GraphPopovers({
}),
);
- return Object.entries(entityProperties).map(([key, value]) => ({
- key,
- value,
- }));
+ return Object.entries(entityProperties)
+ .map(([key, value]) => ({
+ key,
+ value,
+ }))
+ .filter(({ value }) => value);
}, [nodePopupContent]);
return (
diff --git a/apps/webapp/app/components/logs/log-details.tsx b/apps/webapp/app/components/logs/log-details.tsx
index 5e24d80..bdce7ac 100644
--- a/apps/webapp/app/components/logs/log-details.tsx
+++ b/apps/webapp/app/components/logs/log-details.tsx
@@ -4,6 +4,7 @@ import { AlertCircle, Loader2 } from "lucide-react";
import { Dialog, DialogContent, DialogHeader, DialogTitle } from "../ui/dialog";
import { Badge } from "../ui/badge";
import { type LogItem } from "~/hooks/use-logs";
+import Markdown from "react-markdown";
interface LogDetailsProps {
open: boolean;
@@ -79,13 +80,9 @@ export function LogDetails({
{/* Log Content */}
-
+
diff --git a/apps/webapp/app/hooks/usePostHog.ts b/apps/webapp/app/hooks/usePostHog.ts
index d6aa7fc..b09979d 100644
--- a/apps/webapp/app/hooks/usePostHog.ts
+++ b/apps/webapp/app/hooks/usePostHog.ts
@@ -4,7 +4,11 @@ import { useEffect, useRef } from "react";
import { useOptionalUser, useUserChanged } from "./useUser";
-export const usePostHog = (apiKey?: string, logging = false, debug = false): void => {
+export const usePostHog = (
+ apiKey?: string,
+ logging = false,
+ debug = false,
+): void => {
const postHogInitialized = useRef(false);
const location = useLocation();
const user = useOptionalUser();
@@ -15,7 +19,7 @@ export const usePostHog = (apiKey?: string, logging = false, debug = false): voi
if (postHogInitialized.current === true) return;
if (logging) console.log("Initializing PostHog");
posthog.init(apiKey, {
- api_host: "https://eu.posthog.com",
+ api_host: "https://us.i.posthog.com",
opt_in_site_apps: true,
debug,
loaded: function (posthog) {
diff --git a/apps/webapp/app/lib/ingest.server.ts b/apps/webapp/app/lib/ingest.server.ts
index a578875..2b791a2 100644
--- a/apps/webapp/app/lib/ingest.server.ts
+++ b/apps/webapp/app/lib/ingest.server.ts
@@ -1,8 +1,10 @@
// lib/ingest.queue.ts
import { IngestionStatus } from "@core/database";
+import { EpisodeType } from "@core/types";
import { type z } from "zod";
import { prisma } from "~/db.server";
import { type IngestBodyRequest, ingestTask } from "~/trigger/ingest/ingest";
+import { ingestDocumentTask } from "~/trigger/ingest/ingest-document";
export const addToQueue = async (
body: z.infer<typeof IngestBodyRequest>,
@@ -35,16 +37,38 @@ export const addToQueue = async (
},
});
- const handler = await ingestTask.trigger(
- { body, userId, workspaceId: user.Workspace.id, queueId: queuePersist.id },
- {
- queue: "ingestion-queue",
- concurrencyKey: userId,
- tags: [user.id, queuePersist.id],
- },
- );
+ let handler;
+ if (body.type === EpisodeType.DOCUMENT) {
+ handler = await ingestDocumentTask.trigger(
+ {
+ body,
+ userId,
+ workspaceId: user.Workspace.id,
+ queueId: queuePersist.id,
+ },
+ {
+ queue: "document-ingestion-queue",
+ concurrencyKey: userId,
+ tags: [user.id, queuePersist.id],
+ },
+ );
+ } else if (body.type === EpisodeType.CONVERSATION) {
+ handler = await ingestTask.trigger(
+ {
+ body,
+ userId,
+ workspaceId: user.Workspace.id,
+ queueId: queuePersist.id,
+ },
+ {
+ queue: "ingestion-queue",
+ concurrencyKey: userId,
+ tags: [user.id, queuePersist.id],
+ },
+ );
+ }
- return { id: handler.id, token: handler.publicAccessToken };
+ return { id: handler?.id, token: handler?.publicAccessToken };
};
export { IngestBodyRequest };
diff --git a/apps/webapp/app/lib/neo4j.server.ts b/apps/webapp/app/lib/neo4j.server.ts
index 8c067a5..0870783 100644
--- a/apps/webapp/app/lib/neo4j.server.ts
+++ b/apps/webapp/app/lib/neo4j.server.ts
@@ -148,6 +148,8 @@ export const getClusteredGraphData = async (userId: string) => {
s.uuid as statementUuid,
s.spaceIds as spaceIds,
s.fact as fact,
+ s.invalidAt as invalidAt,
+ s.validAt as validAt,
s.createdAt as createdAt,
rel.isEntityToStatement as isEntityToStatement,
rel.isStatementToEntity as isStatementToEntity`,
@@ -175,6 +177,8 @@ export const getClusteredGraphData = async (userId: string) => {
const clusterIds = record.get("spaceIds");
const clusterId = clusterIds ? clusterIds[0] : undefined;
const fact = record.get("fact");
+ const invalidAt = record.get("invalidAt");
+ const validAt = record.get("validAt");
const createdAt = record.get("createdAt");
// Create unique edge identifier to avoid duplicates
@@ -195,6 +199,8 @@ export const getClusteredGraphData = async (userId: string) => {
clusterId,
nodeType: "Statement",
fact,
+ invalidAt,
+ validAt,
}
: {
...sourceProperties,
@@ -209,6 +215,8 @@ export const getClusteredGraphData = async (userId: string) => {
clusterId,
nodeType: "Statement",
fact,
+ invalidAt,
+ validAt,
}
: {
...targetProperties,
@@ -355,6 +363,12 @@ const initializeSchema = async () => {
await runQuery(
"CREATE INDEX entity_user_uuid IF NOT EXISTS FOR (n:Entity) ON (n.userId, n.uuid)",
);
+ await runQuery(
+ "CREATE INDEX episode_user_uuid IF NOT EXISTS FOR (n:Episode) ON (n.userId, n.uuid)",
+ );
+ await runQuery(
+ "CREATE INDEX episode_user_id IF NOT EXISTS FOR (n:Episode) ON (n.userId)",
+ );
// Create vector indexes for semantic search (if using Neo4j 5.0+)
await runQuery(`
diff --git a/apps/webapp/app/routes/api.v1.activity.tsx b/apps/webapp/app/routes/api.v1.activity.tsx
index a741bdf..a8a6e86 100644
--- a/apps/webapp/app/routes/api.v1.activity.tsx
+++ b/apps/webapp/app/routes/api.v1.activity.tsx
@@ -6,6 +6,7 @@ import { addToQueue } from "~/lib/ingest.server";
import { prisma } from "~/db.server";
import { logger } from "~/services/logger.service";
import { triggerWebhookDelivery } from "~/trigger/webhooks/webhook-delivery";
+import { EpisodeTypeEnum } from "@core/types";
const ActivityCreateSchema = z.object({
text: z.string().min(1, "Text is required"),
@@ -56,6 +57,7 @@ const { action, loader } = createActionApiRoute(
episodeBody: body.text,
referenceTime: new Date().toISOString(),
source: body.source,
+ type: EpisodeTypeEnum.CONVERSATION,
};
const queueResponse = await addToQueue(
diff --git a/apps/webapp/app/routes/api.v1.mcp.memory.tsx b/apps/webapp/app/routes/api.v1.mcp.memory.tsx
index 24d4858..50aeac4 100644
--- a/apps/webapp/app/routes/api.v1.mcp.memory.tsx
+++ b/apps/webapp/app/routes/api.v1.mcp.memory.tsx
@@ -9,6 +9,7 @@ import { addToQueue } from "~/lib/ingest.server";
import { SearchService } from "~/services/search.server";
import { handleTransport } from "~/utils/mcp";
import { SpaceService } from "~/services/space.server";
+import { EpisodeTypeEnum } from "@core/types";
// Map to store transports by session ID with cleanup tracking
const transports: {
@@ -124,6 +125,7 @@ const handleMCPRequest = async (
episodeBody: args.message,
referenceTime: new Date().toISOString(),
source,
+ type: EpisodeTypeEnum.CONVERSATION,
},
userId,
);
diff --git a/apps/webapp/app/routes/home.space.$spaceId.patterns.tsx b/apps/webapp/app/routes/home.space.$spaceId.patterns.tsx
index c1da349..31d511b 100644
--- a/apps/webapp/app/routes/home.space.$spaceId.patterns.tsx
+++ b/apps/webapp/app/routes/home.space.$spaceId.patterns.tsx
@@ -11,6 +11,7 @@ import { SpacePattern } from "~/services/spacePattern.server";
import { addToQueue } from "~/lib/ingest.server";
import { redirect } from "@remix-run/node";
import { SpaceService } from "~/services/space.server";
+import { EpisodeTypeEnum } from "@core/types";
export async function loader({ request, params }: LoaderFunctionArgs) {
const workspace = await requireWorkpace(request);
@@ -68,6 +69,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
},
source: space.name,
spaceId: space.id,
+ type: EpisodeTypeEnum.CONVERSATION,
},
userId,
);
diff --git a/apps/webapp/app/routes/onboarding.tsx b/apps/webapp/app/routes/onboarding.tsx
index 7b8caa9..63ce074 100644
--- a/apps/webapp/app/routes/onboarding.tsx
+++ b/apps/webapp/app/routes/onboarding.tsx
@@ -26,6 +26,7 @@ import { updateUser } from "~/models/user.server";
import { Copy, Check } from "lucide-react";
import { addToQueue } from "~/lib/ingest.server";
import { cn } from "~/lib/utils";
+import { EpisodeTypeEnum } from "@core/types";
const ONBOARDING_STEP_COOKIE = "onboardingStep";
const onboardingStepCookie = createCookie(ONBOARDING_STEP_COOKIE, {
@@ -75,6 +76,7 @@ export async function action({ request }: ActionFunctionArgs) {
source: "Core",
episodeBody: aboutUser,
referenceTime: new Date().toISOString(),
+ type: EpisodeTypeEnum.CONVERSATION,
},
userId,
);
diff --git a/apps/webapp/app/services/documentChunker.server.ts b/apps/webapp/app/services/documentChunker.server.ts
new file mode 100644
index 0000000..63cc99f
--- /dev/null
+++ b/apps/webapp/app/services/documentChunker.server.ts
@@ -0,0 +1,315 @@
+import { encode } from "gpt-tokenizer";
+import crypto from "crypto";
+
+export interface DocumentChunk {
+ content: string;
+ chunkIndex: number;
+ title?: string;
+ context?: string;
+ startPosition: number;
+ endPosition: number;
+ contentHash: string; // Hash for change detection
+}
+
+export interface ChunkedDocument {
+ documentId: string;
+ title: string;
+ originalContent: string;
+ chunks: DocumentChunk[];
+ totalChunks: number;
+ contentHash: string; // Hash of the entire document
+ chunkHashes: string[]; // Array of chunk hashes for change detection
+}
+
+/**
+ * Document chunking service that splits large documents into semantic chunks
+ * Targets 10-15k tokens per chunk with natural paragraph boundaries
+ */
+export class DocumentChunker {
+ private readonly TARGET_CHUNK_SIZE = 12500; // Middle of 10-15k range
+ private readonly MIN_CHUNK_SIZE = 10000;
+ private readonly MAX_CHUNK_SIZE = 15000;
+ private readonly MIN_PARAGRAPH_SIZE = 100; // Minimum tokens for a paragraph to be considered
+
+ /**
+ * Chunk a document into semantic sections with natural boundaries
+ */
+ async chunkDocument(
+ originalContent: string,
+ title: string,
+ ): Promise<ChunkedDocument> {
+ const documentId = crypto.randomUUID();
+ const contentHash = this.generateContentHash(originalContent);
+
+ // First, split by major section headers (markdown style)
+ const majorSections = this.splitByMajorSections(originalContent);
+
+ const chunks: DocumentChunk[] = [];
+ let currentChunk = "";
+ let currentChunkStart = 0;
+ let chunkIndex = 0;
+
+ for (const section of majorSections) {
+ const sectionTokens = encode(section.content).length;
+ const currentChunkTokens = encode(currentChunk).length;
+
+ // If adding this section would exceed max size, finalize current chunk
+ if (currentChunkTokens > 0 && currentChunkTokens + sectionTokens > this.MAX_CHUNK_SIZE) {
+ if (currentChunkTokens >= this.MIN_CHUNK_SIZE) {
+ chunks.push(this.createChunk(
+ currentChunk,
+ chunkIndex,
+ currentChunkStart,
+ currentChunkStart + currentChunk.length,
+ section.title
+ ));
+ chunkIndex++;
+ currentChunk = "";
+ currentChunkStart = section.startPosition;
+ }
+ }
+
+ // Add section to current chunk
+ if (currentChunk) {
+ currentChunk += "\n\n" + section.content;
+ } else {
+ currentChunk = section.content;
+ currentChunkStart = section.startPosition;
+ }
+
+ // If current chunk is large enough and we have a natural break, consider chunking
+ const updatedChunkTokens = encode(currentChunk).length;
+ if (updatedChunkTokens >= this.TARGET_CHUNK_SIZE) {
+ // Try to find a good breaking point within the section
+ const paragraphs = this.splitIntoParagraphs(section.content);
+ if (paragraphs.length > 1) {
+ // Split at paragraph boundary if beneficial
+ const optimalSplit = this.findOptimalParagraphSplit(currentChunk);
+ if (optimalSplit) {
+ chunks.push(this.createChunk(
+ optimalSplit.beforeSplit,
+ chunkIndex,
+ currentChunkStart,
+ currentChunkStart + optimalSplit.beforeSplit.length,
+ section.title
+ ));
+ chunkIndex++;
+ currentChunk = optimalSplit.afterSplit;
+ currentChunkStart = currentChunkStart + optimalSplit.beforeSplit.length;
+ }
+ }
+ }
+ }
+
+ // Add remaining content as final chunk
+ if (currentChunk.trim() && encode(currentChunk).length >= this.MIN_PARAGRAPH_SIZE) {
+ chunks.push(this.createChunk(
+ currentChunk,
+ chunkIndex,
+ currentChunkStart,
+ originalContent.length
+ ));
+ }
+
+ // Generate chunk hashes array
+ const chunkHashes = chunks.map(chunk => chunk.contentHash);
+
+ return {
+ documentId,
+ title,
+ originalContent,
+ chunks,
+ totalChunks: chunks.length,
+ contentHash,
+ chunkHashes,
+ };
+ }
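+
+ // Illustrative usage (a sketch; `markdownText` is an assumed input, not defined in this patch):
+ //   const chunker = new DocumentChunker();
+ //   const chunked = await chunker.chunkDocument(markdownText, "Design Doc");
+ //   // chunked.chunks target ~12.5k tokens each; chunked.chunkHashes feeds compareChunkHashes below.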
+
+ private splitByMajorSections(content: string): Array<{
+ content: string;
+ title?: string;
+ startPosition: number;
+ endPosition: number;
+ }> {
+ const sections: Array<{
+ content: string;
+ title?: string;
+ startPosition: number;
+ endPosition: number;
+ }> = [];
+
+ // Split by markdown headers (# ## ### etc.) or common document patterns
+ const headerRegex = /^(#{1,6}\s+.*$|={3,}$|-{3,}$)/gm;
+ const matches = Array.from(content.matchAll(headerRegex));
+
+ if (matches.length === 0) {
+ // No headers found, treat as single section
+ sections.push({
+ content: content.trim(),
+ startPosition: 0,
+ endPosition: content.length,
+ });
+ return sections;
+ }
+
+ let lastIndex = 0;
+
+ for (let i = 0; i < matches.length; i++) {
+ const match = matches[i];
+ const nextMatch = matches[i + 1];
+
+ const sectionStart = lastIndex;
+ const sectionEnd = nextMatch ? nextMatch.index! : content.length;
+
+ const sectionContent = content.slice(sectionStart, sectionEnd).trim();
+
+ if (sectionContent) {
+ sections.push({
+ content: sectionContent,
+ title: this.extractSectionTitle(match[0]),
+ startPosition: sectionStart,
+ endPosition: sectionEnd,
+ });
+ }
+
+ lastIndex = match.index! + match[0].length;
+ }
+
+ return sections;
+ }
+
+ private extractSectionTitle(header: string): string | undefined {
+ // Extract title from markdown header
+ const markdownMatch = header.match(/^#{1,6}\s+(.+)$/);
+ if (markdownMatch) {
+ return markdownMatch[1].trim();
+ }
+ return undefined;
+ }
+
+ private splitIntoParagraphs(content: string): string[] {
+ // Split by double newlines (paragraph breaks) and filter out empty strings
+ return content
+ .split(/\n\s*\n/)
+ .map(p => p.trim())
+ .filter(p => p.length > 0);
+ }
+
+ private findOptimalParagraphSplit(content: string): {
+ beforeSplit: string;
+ afterSplit: string;
+ } | null {
+ const paragraphs = this.splitIntoParagraphs(content);
+ if (paragraphs.length < 2) return null;
+
+ let bestSplitIndex = -1;
+ let bestScore = 0;
+
+ // Find the split that gets us closest to target size
+ for (let i = 1; i < paragraphs.length; i++) {
+ const beforeSplit = paragraphs.slice(0, i).join("\n\n");
+ const afterSplit = paragraphs.slice(i).join("\n\n");
+
+ const beforeTokens = encode(beforeSplit).length;
+ const afterTokens = encode(afterSplit).length;
+
+ // Score based on how close we get to target, avoiding too small chunks
+ if (beforeTokens >= this.MIN_CHUNK_SIZE && afterTokens >= this.MIN_PARAGRAPH_SIZE) {
+ const beforeDistance = Math.abs(beforeTokens - this.TARGET_CHUNK_SIZE);
+ const score = 1 / (1 + beforeDistance); // Higher score for closer to target
+
+ if (score > bestScore) {
+ bestScore = score;
+ bestSplitIndex = i;
+ }
+ }
+ }
+
+ if (bestSplitIndex > 0) {
+ return {
+ beforeSplit: paragraphs.slice(0, bestSplitIndex).join("\n\n"),
+ afterSplit: paragraphs.slice(bestSplitIndex).join("\n\n"),
+ };
+ }
+
+ return null;
+ }
+
+ private createChunk(
+ content: string,
+ chunkIndex: number,
+ startPosition: number,
+ endPosition: number,
+ title?: string
+ ): DocumentChunk {
+ // Generate a concise context/title if not provided
+ const context = title || this.generateChunkContext(content);
+ const contentHash = this.generateContentHash(content.trim());
+
+ return {
+ content: content.trim(),
+ chunkIndex,
+ title: context,
+ context: `Chunk ${chunkIndex + 1}${context ? `: ${context}` : ""}`,
+ startPosition,
+ endPosition,
+ contentHash,
+ };
+ }
+
+ private generateChunkContext(content: string): string {
+ // Extract first meaningful line as context (avoiding markdown syntax)
+ const lines = content.split('\n').map(line => line.trim()).filter(Boolean);
+
+ for (const line of lines.slice(0, 3)) {
+ // Skip markdown headers and find first substantial content
+ if (!line.match(/^#{1,6}\s/) && !line.match(/^[=-]{3,}$/) && line.length > 10) {
+ return line.substring(0, 100) + (line.length > 100 ? "..." : "");
+ }
+ }
+
+ return "Document content";
+ }
+
+ /**
+ * Generate content hash for change detection
+ */
+ private generateContentHash(content: string): string {
+ return crypto.createHash('sha256').update(content, 'utf8').digest('hex').substring(0, 16);
+ }
+
+ /**
+ * Compare chunk hashes to detect changes
+ */
+ static compareChunkHashes(oldHashes: string[], newHashes: string[]): {
+ changedIndices: number[];
+ changePercentage: number;
+ } {
+ const maxLength = Math.max(oldHashes.length, newHashes.length);
+ const changedIndices: number[] = [];
+
+ for (let i = 0; i < maxLength; i++) {
+ const oldHash = oldHashes[i];
+ const newHash = newHashes[i];
+
+ // Mark as changed if hash is different or chunk added/removed
+ if (oldHash !== newHash) {
+ changedIndices.push(i);
+ }
+ }
+
+ const changePercentage = maxLength > 0 ? (changedIndices.length / maxLength) * 100 : 0;
+
+ return {
+ changedIndices,
+ changePercentage,
+ };
+ }
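+
+ // Example (illustrative): comparing ["a1", "b2", "c3"] against ["a1", "XX", "c3", "d4"]
+ // yields changedIndices [1, 3] and changePercentage 50, since 2 of the 4 aligned
+ // positions (including the added chunk) differ.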
+
+ /**
+ * Calculate document size in tokens for threshold decisions
+ */
+ static getDocumentSizeInTokens(content: string): number {
+ return encode(content).length;
+ }
+}
\ No newline at end of file
diff --git a/apps/webapp/app/services/documentDiffer.server.ts b/apps/webapp/app/services/documentDiffer.server.ts
new file mode 100644
index 0000000..c970c70
--- /dev/null
+++ b/apps/webapp/app/services/documentDiffer.server.ts
@@ -0,0 +1,204 @@
+import { encode } from "gpt-tokenizer";
+import { DocumentChunker, type ChunkedDocument } from "./documentChunker.server";
+import type { DocumentNode } from "@core/types";
+
+export interface DifferentialDecision {
+ shouldUseDifferential: boolean;
+ strategy: "full_reingest" | "chunk_level_diff" | "new_document";
+ reason: string;
+ changedChunkIndices: number[];
+ changePercentage: number;
+ documentSizeTokens: number;
+}
+
+export interface ChunkComparison {
+ chunkIndex: number;
+ hasChanged: boolean;
+ oldHash?: string;
+ newHash: string;
+ semanticSimilarity?: number;
+}
+
+/**
+ * Service for implementing differential document processing logic
+ * Determines when to use differential vs full re-ingestion based on
+ * document size and change percentage thresholds
+ */
+export class DocumentDifferentialService {
+ // Threshold constants based on our enhanced approach
+ private readonly SMALL_DOC_THRESHOLD = 5 * 1000; // 5K tokens
+ private readonly MEDIUM_DOC_THRESHOLD = 50 * 1000; // 50K tokens
+
+ // Change percentage thresholds
+ private readonly SMALL_CHANGE_THRESHOLD = 20; // 20%
+ private readonly MEDIUM_CHANGE_THRESHOLD = 30; // 30%
+
+ /**
+ * Analyze whether to use differential processing for a document update
+ */
+ async analyzeDifferentialNeed(
+ newContent: string,
+ existingDocument: DocumentNode | null,
+ newChunkedDocument: ChunkedDocument,
+ ): Promise<DifferentialDecision> {
+ // If no existing document, it's a new document
+ if (!existingDocument) {
+ return {
+ shouldUseDifferential: false,
+ strategy: "new_document",
+ reason: "No existing document found",
+ changedChunkIndices: [],
+ changePercentage: 100,
+ documentSizeTokens: encode(newContent).length,
+ };
+ }
+
+ const documentSizeTokens = encode(newContent).length;
+
+ // Quick content hash comparison
+ if (existingDocument.contentHash === newChunkedDocument.contentHash) {
+ return {
+ shouldUseDifferential: false,
+ strategy: "full_reingest", // No changes detected
+ reason: "Document content unchanged",
+ changedChunkIndices: [],
+ changePercentage: 0,
+ documentSizeTokens,
+ };
+ }
+
+ // Compare chunk hashes to identify changes
+ const chunkComparison = DocumentChunker.compareChunkHashes(
+ existingDocument.chunkHashes || [],
+ newChunkedDocument.chunkHashes,
+ );
+
+ const { changedIndices, changePercentage } = chunkComparison;
+
+ // Apply threshold-based decision matrix
+ const decision = this.applyThresholdDecision(
+ documentSizeTokens,
+ changePercentage,
+ changedIndices,
+ );
+
+ return {
+ ...decision,
+ changedChunkIndices: changedIndices,
+ changePercentage,
+ documentSizeTokens,
+ };
+ }
+
+ /**
+ * Apply threshold-based decision matrix
+ */
+ private applyThresholdDecision(
+ documentSizeTokens: number,
+ changePercentage: number,
+ changedIndices: number[],
+ ): Pick<DifferentialDecision, "shouldUseDifferential" | "strategy" | "reason"> {
+ // Small documents: always full re-ingest (cheap)
+ if (documentSizeTokens < this.SMALL_DOC_THRESHOLD) {
+ return {
+ shouldUseDifferential: false,
+ strategy: "full_reingest",
+ reason: `Document too small (${documentSizeTokens} tokens < ${this.SMALL_DOC_THRESHOLD})`,
+ };
+ }
+
+ // Medium documents (5-50K tokens)
+ if (documentSizeTokens < this.MEDIUM_DOC_THRESHOLD) {
+ if (changePercentage < this.SMALL_CHANGE_THRESHOLD) {
+ return {
+ shouldUseDifferential: true,
+ strategy: "chunk_level_diff",
+ reason: `Medium document with small changes (${changePercentage.toFixed(1)}% < ${this.SMALL_CHANGE_THRESHOLD}%)`,
+ };
+ } else {
+ return {
+ shouldUseDifferential: false,
+ strategy: "full_reingest",
+ reason: `Medium document with large changes (${changePercentage.toFixed(1)}% >= ${this.SMALL_CHANGE_THRESHOLD}%)`,
+ };
+ }
+ }
+
+ // Large documents (>50K tokens)
+ if (changePercentage < this.MEDIUM_CHANGE_THRESHOLD) {
+ return {
+ shouldUseDifferential: true,
+ strategy: "chunk_level_diff",
+ reason: `Large document with moderate changes (${changePercentage.toFixed(1)}% < ${this.MEDIUM_CHANGE_THRESHOLD}%)`,
+ };
+ } else {
+ return {
+ shouldUseDifferential: false,
+ strategy: "full_reingest",
+ reason: `Large document with extensive changes (${changePercentage.toFixed(1)}% >= ${this.MEDIUM_CHANGE_THRESHOLD}%)`,
+ };
+ }
+ }
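+
+ // Worked example of the matrix above (illustrative): a 30,000-token document with 15% of
+ // chunks changed sits in the 5K-50K band and 15% < 20%, so the decision is
+ // { shouldUseDifferential: true, strategy: "chunk_level_diff" }; at 35% changed the same
+ // document would fall back to "full_reingest".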
+
+ /**
+ * Get detailed chunk comparison for differential processing
+ */
+ getChunkComparisons(
+ existingDocument: DocumentNode,
+ newChunkedDocument: ChunkedDocument,
+ ): ChunkComparison[] {
+ const oldHashes = existingDocument.chunkHashes || [];
+ const newHashes = newChunkedDocument.chunkHashes;
+ const maxLength = Math.max(oldHashes.length, newHashes.length);
+
+ const comparisons: ChunkComparison[] = [];
+
+ for (let i = 0; i < maxLength; i++) {
+ const oldHash = oldHashes[i];
+ const newHash = newHashes[i];
+
+ comparisons.push({
+ chunkIndex: i,
+ hasChanged: oldHash !== newHash,
+ oldHash,
+ newHash: newHash || "", // Handle case where new doc has fewer chunks
+ });
+ }
+
+ return comparisons;
+ }
+
+ /**
+ * Filter chunks that need re-processing
+ */
+ getChunksNeedingReprocessing(
+ chunkComparisons: ChunkComparison[],
+ ): number[] {
+ return chunkComparisons
+ .filter(comparison => comparison.hasChanged)
+ .map(comparison => comparison.chunkIndex);
+ }
+
+ /**
+ * Calculate processing cost savings estimate
+ */
+ calculateCostSavings(
+ totalChunks: number,
+ changedChunks: number,
+ ): {
+ chunksToProcess: number;
+ chunksSkipped: number;
+ estimatedSavingsPercentage: number;
+ } {
+ const chunksSkipped = totalChunks - changedChunks;
+ const estimatedSavingsPercentage = totalChunks > 0
+ ? (chunksSkipped / totalChunks) * 100
+ : 0;
+
+ return {
+ chunksToProcess: changedChunks,
+ chunksSkipped,
+ estimatedSavingsPercentage,
+ };
+ }
+}
\ No newline at end of file
diff --git a/apps/webapp/app/services/documentVersioning.server.ts b/apps/webapp/app/services/documentVersioning.server.ts
new file mode 100644
index 0000000..14d428f
--- /dev/null
+++ b/apps/webapp/app/services/documentVersioning.server.ts
@@ -0,0 +1,321 @@
+import crypto from "crypto";
+import type { DocumentNode } from "@core/types";
+import {
+ findExistingDocument,
+ getDocumentVersions,
+} from "./graphModels/document";
+import {
+ DocumentChunker,
+ type ChunkedDocument,
+} from "./documentChunker.server";
+import { KnowledgeGraphService } from "./knowledgeGraph.server";
+
+export interface DocumentVersion {
+ uuid: string;
+ version: number;
+ contentHash: string;
+ chunkHashes: string[];
+ createdAt: Date;
+ validAt: Date;
+ title: string;
+ metadata: Record<string, any>;
+}
+
+export interface VersionedDocumentInfo {
+ isNewDocument: boolean;
+ existingDocument: DocumentNode | null;
+ newVersion: number;
+ previousVersionUuid: string | null;
+ hasContentChanged: boolean;
+ chunkLevelChanges: {
+ changedChunkIndices: number[];
+ changePercentage: number;
+ totalChunks: number;
+ };
+}
+
+/**
+ * Service for managing document versions and coordinating differential ingestion
+ * Integrates with the knowledge graph for semantic similarity checks
+ */
+export class DocumentVersioningService {
+ private knowledgeGraphService: KnowledgeGraphService;
+
+ constructor() {
+ this.knowledgeGraphService = new KnowledgeGraphService();
+ }
+
+ /**
+ * Prepare a new document version with proper versioning information
+ */
+ async prepareDocumentVersion(
+ sessionId: string,
+ userId: string,
+ title: string,
+ content: string,
+ source: string,
+ metadata: Record<string, any> = {},
+ ): Promise<{
+ documentNode: DocumentNode;
+ versionInfo: VersionedDocumentInfo;
+ chunkedDocument: ChunkedDocument;
+ }> {
+ // Find existing document for version comparison
+ const existingDocument = await findExistingDocument(sessionId, userId);
+
+ // Chunk the new document content
+ const documentChunker = new DocumentChunker();
+ const chunkedDocument = await documentChunker.chunkDocument(content, title);
+
+ // Determine version information
+ const versionInfo = this.analyzeVersionChanges(
+ existingDocument,
+ chunkedDocument,
+ );
+
+ // Create new document node
+ const documentNode = this.createVersionedDocumentNode(
+ sessionId,
+ userId,
+ title,
+ content,
+ source,
+ metadata,
+ versionInfo,
+ chunkedDocument,
+ );
+
+ return {
+ documentNode,
+ versionInfo,
+ chunkedDocument,
+ };
+ }
+
+ /**
+ * Analyze changes between existing and new document versions
+ */
+ private analyzeVersionChanges(
+ existingDocument: DocumentNode | null,
+ newChunkedDocument: ChunkedDocument,
+ ): VersionedDocumentInfo {
+ if (!existingDocument) {
+ return {
+ isNewDocument: true,
+ existingDocument: null,
+ newVersion: 1,
+ previousVersionUuid: null,
+ hasContentChanged: true,
+ chunkLevelChanges: {
+ changedChunkIndices: [],
+ changePercentage: 100,
+ totalChunks: newChunkedDocument.totalChunks,
+ },
+ };
+ }
+
+ // Check if content has actually changed
+ const hasContentChanged =
+ existingDocument.contentHash !== newChunkedDocument.contentHash;
+
+ if (!hasContentChanged) {
+ return {
+ isNewDocument: false,
+ existingDocument,
+ newVersion: existingDocument.version,
+ previousVersionUuid: existingDocument.uuid,
+ hasContentChanged: false,
+ chunkLevelChanges: {
+ changedChunkIndices: [],
+ changePercentage: 0,
+ totalChunks: newChunkedDocument.totalChunks,
+ },
+ };
+ }
+
+ // Analyze chunk-level changes
+ const chunkComparison = DocumentChunker.compareChunkHashes(
+ existingDocument.chunkHashes || [],
+ newChunkedDocument.chunkHashes,
+ );
+
+ return {
+ isNewDocument: false,
+ existingDocument,
+ newVersion: existingDocument.version + 1,
+ previousVersionUuid: existingDocument.uuid,
+ hasContentChanged: true,
+ chunkLevelChanges: {
+ changedChunkIndices: chunkComparison.changedIndices,
+ changePercentage: chunkComparison.changePercentage,
+ totalChunks: newChunkedDocument.totalChunks,
+ },
+ };
+ }
+
+ /**
+ * Create a new versioned document node
+ */
+ private createVersionedDocumentNode(
+ sessionId: string,
+ userId: string,
+ title: string,
+ content: string,
+ source: string,
+ metadata: Record<string, any>,
+ versionInfo: VersionedDocumentInfo,
+ chunkedDocument: ChunkedDocument,
+ ): DocumentNode {
+ return {
+ uuid: crypto.randomUUID(),
+ title,
+ originalContent: content,
+ metadata: {
+ ...metadata,
+ chunkingStrategy: "semantic_sections",
+ targetChunkSize: 12500,
+ actualChunks: chunkedDocument.totalChunks,
+ },
+ source,
+ userId,
+ createdAt: new Date(),
+ validAt: new Date(),
+ totalChunks: chunkedDocument.totalChunks,
+ version: versionInfo.newVersion,
+ contentHash: chunkedDocument.contentHash,
+ previousVersionUuid: versionInfo.previousVersionUuid || undefined,
+ chunkHashes: chunkedDocument.chunkHashes,
+ sessionId,
+ };
+ }
+
+ /**
+ * Get version history for a document
+ */
+ async getDocumentHistory(
+ documentId: string,
+ userId: string,
+ limit: number = 10,
+ ): Promise<DocumentVersion[]> {
+ const versions = await getDocumentVersions(documentId, userId, limit);
+
+ return versions.map((doc) => ({
+ uuid: doc.uuid,
+ version: doc.version,
+ contentHash: doc.contentHash,
+ chunkHashes: doc.chunkHashes || [],
+ createdAt: doc.createdAt,
+ validAt: doc.validAt,
+ title: doc.title,
+ metadata: doc.metadata,
+ }));
+ }
+
+ /**
+ * Check if statements should be invalidated based on semantic similarity
+ * This implements the semantic similarity gate (>0.85 threshold)
+ */
+ async checkStatementInvalidation(
+ oldChunkContent: string,
+ newChunkContent: string,
+ threshold: number = 0.85,
+ ): Promise<{
+ shouldInvalidate: boolean;
+ semanticSimilarity: number;
+ }> {
+ try {
+ // Generate embeddings for both chunks
+ const [oldEmbedding, newEmbedding] = await Promise.all([
+ this.knowledgeGraphService.getEmbedding(oldChunkContent),
+ this.knowledgeGraphService.getEmbedding(newChunkContent),
+ ]);
+
+ // Calculate cosine similarity
+ const similarity = this.calculateCosineSimilarity(
+ oldEmbedding,
+ newEmbedding,
+ );
+
+ // If similarity is below threshold, invalidate old statements
+ const shouldInvalidate = similarity < threshold;
+
+ return {
+ shouldInvalidate,
+ semanticSimilarity: similarity,
+ };
+ } catch (error) {
+ console.error("Error checking statement invalidation:", error);
+ // On error, be conservative and invalidate
+ return {
+ shouldInvalidate: true,
+ semanticSimilarity: 0,
+ };
+ }
+ }
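+
+ // Sketch of how this gate might be used during chunk-level diffing (illustrative;
+ // `versioningService`, `oldChunk` and `newChunk` are assumed names, not defined in this patch):
+ //   const { shouldInvalidate, semanticSimilarity } =
+ //     await versioningService.checkStatementInvalidation(oldChunk.content, newChunk.content);
+ //   // a similarity below the 0.85 default marks the old chunk's statements for invalidation.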
+
+ /**
+ * Calculate cosine similarity between two embedding vectors
+ */
+ private calculateCosineSimilarity(vecA: number[], vecB: number[]): number {
+ if (vecA.length !== vecB.length) {
+ throw new Error("Vector dimensions must match");
+ }
+
+ let dotProduct = 0;
+ let normA = 0;
+ let normB = 0;
+
+ for (let i = 0; i < vecA.length; i++) {
+ dotProduct += vecA[i] * vecB[i];
+ normA += vecA[i] * vecA[i];
+ normB += vecB[i] * vecB[i];
+ }
+
+ normA = Math.sqrt(normA);
+ normB = Math.sqrt(normB);
+
+ if (normA === 0 || normB === 0) {
+ return 0;
+ }
+
+ return dotProduct / (normA * normB);
+ }
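+
+ // Sanity check (illustrative): parallel vectors such as [1, 2, 3] and [2, 4, 6] score 1,
+ // while orthogonal vectors such as [1, 0] and [0, 1] score 0.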
+
+ /**
+ * Generate a differential processing report
+ */
+ generateDifferentialReport(
+ versionInfo: VersionedDocumentInfo,
+ processingStats: {
+ chunksProcessed: number;
+ chunksSkipped: number;
+ statementsCreated: number;
+ statementsInvalidated: number;
+ processingTimeMs: number;
+ },
+ ): {
+ summary: string;
+ metrics: Record<string, any>;
+ } {
+ const totalChunks = versionInfo.chunkLevelChanges.totalChunks;
+ const changePercentage = versionInfo.chunkLevelChanges.changePercentage;
+ const savingsPercentage =
+ totalChunks > 0 ? (processingStats.chunksSkipped / totalChunks) * 100 : 0;
+
+ return {
+ summary: `Document v${versionInfo.newVersion}: ${changePercentage.toFixed(1)}% changed, ${savingsPercentage.toFixed(1)}% processing saved`,
+ metrics: {
+ version: versionInfo.newVersion,
+ isNewDocument: versionInfo.isNewDocument,
+ totalChunks,
+ chunksChanged: processingStats.chunksProcessed,
+ chunksSkipped: processingStats.chunksSkipped,
+ changePercentage: changePercentage,
+ processingTimeMs: processingStats.processingTimeMs,
+ statementsCreated: processingStats.statementsCreated,
+ statementsInvalidated: processingStats.statementsInvalidated,
+ estimatedCostSavings: savingsPercentage,
+ },
+ };
+ }
+}
diff --git a/apps/webapp/app/services/graphModels/document.ts b/apps/webapp/app/services/graphModels/document.ts
new file mode 100644
index 0000000..cdfbf38
--- /dev/null
+++ b/apps/webapp/app/services/graphModels/document.ts
@@ -0,0 +1,250 @@
+import { runQuery } from "~/lib/neo4j.server";
+import type { DocumentNode } from "@core/types";
+import crypto from "crypto";
+
+export async function saveDocument(document: DocumentNode): Promise<string> {
+ const query = `
+ MERGE (d:Document {uuid: $uuid})
+ ON CREATE SET
+ d.title = $title,
+ d.originalContent = $originalContent,
+ d.metadata = $metadata,
+ d.source = $source,
+ d.userId = $userId,
+ d.createdAt = $createdAt,
+ d.validAt = $validAt,
+ d.totalChunks = $totalChunks,
+ d.sessionId = $sessionId,
+ d.version = $version,
+ d.contentHash = $contentHash,
+ d.previousVersionUuid = $previousVersionUuid,
+ d.chunkHashes = $chunkHashes
+ ON MATCH SET
+ d.title = $title,
+ d.originalContent = $originalContent,
+ d.metadata = $metadata,
+ d.source = $source,
+ d.validAt = $validAt,
+ d.totalChunks = $totalChunks,
+ d.sessionId = $sessionId,
+ d.version = $version,
+ d.contentHash = $contentHash,
+ d.previousVersionUuid = $previousVersionUuid,
+ d.chunkHashes = $chunkHashes
+ RETURN d.uuid as uuid
+ `;
+
+ const params = {
+ uuid: document.uuid,
+ title: document.title,
+ originalContent: document.originalContent,
+ metadata: JSON.stringify(document.metadata || {}),
+ source: document.source,
+ userId: document.userId || null,
+ createdAt: document.createdAt.toISOString(),
+ validAt: document.validAt.toISOString(),
+ totalChunks: document.totalChunks || 0,
+ sessionId: document.sessionId || null,
+ version: document.version || 1,
+ contentHash: document.contentHash,
+ previousVersionUuid: document.previousVersionUuid || null,
+ chunkHashes: document.chunkHashes || [],
+ };
+
+ const result = await runQuery(query, params);
+ return result[0].get("uuid");
+}
+
+export async function linkEpisodeToDocument(
+ episodeUuid: string,
+ documentUuid: string,
+ chunkIndex: number,
+): Promise<void> {
+ const query = `
+ MATCH (e:Episode {uuid: $episodeUuid})
+ MATCH (d:Document {uuid: $documentUuid})
+ MERGE (d)-[r:CONTAINS_CHUNK {chunkIndex: $chunkIndex}]->(e)
+ SET e.chunkIndex = $chunkIndex
+ RETURN r
+ `;
+
+ const params = {
+ episodeUuid,
+ documentUuid,
+ chunkIndex,
+ };
+
+ await runQuery(query, params);
+}
+
+export async function getDocument(
+ documentUuid: string,
+): Promise<DocumentNode | null> {
+ const query = `
+ MATCH (d:Document {uuid: $uuid})
+ RETURN d
+ `;
+
+ const params = { uuid: documentUuid };
+ const result = await runQuery(query, params);
+
+ if (result.length === 0) return null;
+
+ const record = result[0];
+ const documentNode = record.get("d");
+
+ return {
+ uuid: documentNode.properties.uuid,
+ title: documentNode.properties.title,
+ originalContent: documentNode.properties.originalContent,
+ metadata: JSON.parse(documentNode.properties.metadata || "{}"),
+ source: documentNode.properties.source,
+ userId: documentNode.properties.userId,
+ createdAt: new Date(documentNode.properties.createdAt),
+ validAt: new Date(documentNode.properties.validAt),
+ totalChunks: documentNode.properties.totalChunks,
+ version: documentNode.properties.version || 1,
+ contentHash: documentNode.properties.contentHash || "",
+ previousVersionUuid: documentNode.properties.previousVersionUuid || null,
+ chunkHashes: documentNode.properties.chunkHashes || [],
+ };
+}
+
+export async function getDocumentEpisodes(documentUuid: string): Promise<
+ Array<{
+ episodeUuid: string;
+ chunkIndex: number;
+ content: string;
+ }>
+> {
+ const query = `
+ MATCH (d:Document {uuid: $uuid})-[r:CONTAINS_CHUNK]->(e:Episode)
+ RETURN e.uuid as episodeUuid, r.chunkIndex as chunkIndex, e.content as content
+ ORDER BY r.chunkIndex ASC
+ `;
+
+ const params = { uuid: documentUuid };
+ const result = await runQuery(query, params);
+
+ return result.map((record) => ({
+ episodeUuid: record.get("episodeUuid"),
+ chunkIndex: record.get("chunkIndex"),
+ content: record.get("content"),
+ }));
+}
+
+export async function getUserDocuments(
+ userId: string,
+ limit: number = 50,
+): Promise<DocumentNode[]> {
+ const query = `
+ MATCH (d:Document {userId: $userId})
+ RETURN d
+ ORDER BY d.createdAt DESC
+ LIMIT $limit
+ `;
+
+ const params = { userId, limit };
+ const result = await runQuery(query, params);
+
+ return result.map((record) => {
+ const documentNode = record.get("d");
+ return {
+ uuid: documentNode.properties.uuid,
+ title: documentNode.properties.title,
+ originalContent: documentNode.properties.originalContent,
+ metadata: JSON.parse(documentNode.properties.metadata || "{}"),
+ source: documentNode.properties.source,
+ userId: documentNode.properties.userId,
+ createdAt: new Date(documentNode.properties.createdAt),
+ validAt: new Date(documentNode.properties.validAt),
+ totalChunks: documentNode.properties.totalChunks,
+ version: documentNode.properties.version || 1,
+ contentHash: documentNode.properties.contentHash || "",
+ previousVersionUuid: documentNode.properties.previousVersionUuid || null,
+ chunkHashes: documentNode.properties.chunkHashes || [],
+ };
+ });
+}
+
+/**
+ * Generate content hash for document versioning
+ */
+export function generateContentHash(content: string): string {
+ return crypto.createHash("sha256").update(content, "utf8").digest("hex");
+}
+
+/**
+ * Find existing document by sessionId and userId for version comparison
+ */
+export async function findExistingDocument(
+ sessionId: string,
+ userId: string,
+): Promise<DocumentNode | null> {
+ const query = `
+ MATCH (d:Document {sessionId: $sessionId, userId: $userId})
+ RETURN d
+ ORDER BY d.version DESC
+ LIMIT 1
+ `;
+
+ const params = { sessionId, userId };
+ const result = await runQuery(query, params);
+
+ if (result.length === 0) return null;
+
+ const documentNode = result[0].get("d");
+ return {
+ uuid: documentNode.properties.uuid,
+ title: documentNode.properties.title,
+ originalContent: documentNode.properties.originalContent,
+ metadata: JSON.parse(documentNode.properties.metadata || "{}"),
+ source: documentNode.properties.source,
+ userId: documentNode.properties.userId,
+ createdAt: new Date(documentNode.properties.createdAt),
+ validAt: new Date(documentNode.properties.validAt),
+ totalChunks: documentNode.properties.totalChunks,
+ version: documentNode.properties.version || 1,
+ contentHash: documentNode.properties.contentHash || "",
+ previousVersionUuid: documentNode.properties.previousVersionUuid || null,
+ chunkHashes: documentNode.properties.chunkHashes || [],
+ };
+}
+
+/**
+ * Get document version history
+ */
+export async function getDocumentVersions(
+ sessionId: string,
+ userId: string,
+ limit: number = 10,
+): Promise<DocumentNode[]> {
+ const query = `
+ MATCH (d:Document {sessionId: $sessionId, userId: $userId})
+ RETURN d
+ ORDER BY d.version DESC
+ LIMIT $limit
+ `;
+
+ const params = { sessionId, userId, limit };
+ const result = await runQuery(query, params);
+
+ return result.map((record) => {
+ const documentNode = record.get("d");
+ return {
+ uuid: documentNode.properties.uuid,
+ title: documentNode.properties.title,
+ originalContent: documentNode.properties.originalContent,
+ metadata: JSON.parse(documentNode.properties.metadata || "{}"),
+ source: documentNode.properties.source,
+ userId: documentNode.properties.userId,
+ createdAt: new Date(documentNode.properties.createdAt),
+ validAt: new Date(documentNode.properties.validAt),
+ totalChunks: documentNode.properties.totalChunks,
+ version: documentNode.properties.version || 1,
+ contentHash: documentNode.properties.contentHash || "",
+ previousVersionUuid: documentNode.properties.previousVersionUuid || null,
+ chunkHashes: documentNode.properties.chunkHashes || [],
+ };
+ });
+}
diff --git a/apps/webapp/app/services/graphModels/episode.ts b/apps/webapp/app/services/graphModels/episode.ts
index 38bac51..8ff53e3 100644
--- a/apps/webapp/app/services/graphModels/episode.ts
+++ b/apps/webapp/app/services/graphModels/episode.ts
@@ -1,5 +1,5 @@
import { runQuery } from "~/lib/neo4j.server";
-import type { EntityNode, EpisodicNode } from "@core/types";
+import { type EntityNode, EpisodeType, type EpisodicNode } from "@core/types";
export async function saveEpisode(episode: EpisodicNode): Promise<string> {
const query = `
@@ -83,8 +83,7 @@ export async function getRecentEpisodes(params: {
source?: string;
sessionId?: string;
}): Promise {
- let filters = `WHERE e.validAt <= $referenceTime
- AND e.userId = $userId`;
+ let filters = `WHERE e.validAt <= $referenceTime`;
if (params.source) {
filters += `\nAND e.source = $source`;
@@ -95,9 +94,11 @@ export async function getRecentEpisodes(params: {
}
const query = `
- MATCH (e:Episode)
+ MATCH (e:Episode{userId: $userId})
${filters}
- RETURN e
+ MATCH (e)-[:HAS_PROVENANCE]->(s:Statement)
+ WHERE s.invalidAt IS NULL
+ RETURN DISTINCT e
ORDER BY e.validAt DESC
LIMIT ${params.limit}
`;
@@ -126,6 +127,7 @@ export async function getRecentEpisodes(params: {
userId: episode.userId,
space: episode.space,
sessionId: episode.sessionId,
+ documentId: episode.documentId,
};
});
}
@@ -170,6 +172,7 @@ export async function searchEpisodesByEmbedding(params: {
? JSON.parse(episode.attributesJson)
: {},
userId: episode.userId,
+ documentId: episode.documentId,
};
});
}
@@ -307,6 +310,7 @@ export async function getEpisodeStatements(params: {
}) {
const query = `
MATCH (episode:Episode {uuid: $episodeUuid, userId: $userId})-[:HAS_PROVENANCE]->(stmt:Statement)
+ WHERE stmt.invalidAt IS NULL
RETURN stmt
`;
diff --git a/apps/webapp/app/services/knowledgeGraph.server.ts b/apps/webapp/app/services/knowledgeGraph.server.ts
index d7019b6..ac69c79 100644
--- a/apps/webapp/app/services/knowledgeGraph.server.ts
+++ b/apps/webapp/app/services/knowledgeGraph.server.ts
@@ -6,6 +6,8 @@ import {
type EpisodicNode,
type StatementNode,
type Triple,
+ EpisodeTypeEnum,
+ type EpisodeType,
} from "@core/types";
import { logger } from "./logger.service";
import { ClusteringService } from "./clustering.server";
@@ -42,13 +44,14 @@ import {
searchStatementsByEmbedding,
} from "./graphModels/statement";
import { getEmbedding, makeModelCall } from "~/lib/model.server";
+import { runQuery } from "~/lib/neo4j.server";
import {
Apps,
getNodeTypes,
getNodeTypesString,
isPresetType,
} from "~/utils/presets/nodes";
-import { normalizePrompt } from "./prompts";
+import { normalizePrompt, normalizeDocumentPrompt } from "./prompts";
import { type PrismaClient } from "@prisma/client";
// Default number of previous episodes to retrieve for context
@@ -65,6 +68,162 @@ export class KnowledgeGraphService {
return getEmbedding(text);
}
+ /**
+ * Invalidate statements from a previous document version that are no longer supported
+ * by the new document content using semantic similarity analysis
+ */
+ async invalidateStatementsFromPreviousDocumentVersion(params: {
+ previousDocumentUuid: string;
+ newDocumentContent: string;
+ userId: string;
+ invalidatedBy: string;
+ semanticSimilarityThreshold?: number;
+ }): Promise<{
+ invalidatedStatements: string[];
+ preservedStatements: string[];
+ totalStatementsAnalyzed: number;
+ }> {
+ const threshold = params.semanticSimilarityThreshold || 0.75; // Lower threshold for document-level analysis
+ const invalidatedStatements: string[] = [];
+ const preservedStatements: string[] = [];
+
+ // Step 1: Get all statements from the previous document version
+ const previousStatements = await this.getStatementsFromDocument(
+ params.previousDocumentUuid,
+ params.userId,
+ );
+
+ if (previousStatements.length === 0) {
+ return {
+ invalidatedStatements: [],
+ preservedStatements: [],
+ totalStatementsAnalyzed: 0,
+ };
+ }
+
+ logger.log(
+ `Analyzing ${previousStatements.length} statements from previous document version`,
+ );
+
+ // Step 2: Generate embedding for new document content
+ const newDocumentEmbedding = await this.getEmbedding(
+ params.newDocumentContent,
+ );
+
+ // Step 3: For each statement, check if it's still semantically supported by new content
+ for (const statement of previousStatements) {
+ try {
+ // Generate embedding for the statement fact
+ const statementEmbedding = await this.getEmbedding(statement.fact);
+
+ // Calculate semantic similarity between statement and new document
+ const semanticSimilarity = this.calculateCosineSimilarity(
+ statementEmbedding,
+ newDocumentEmbedding,
+ );
+
+ if (semanticSimilarity < threshold) {
+ invalidatedStatements.push(statement.uuid);
+ logger.log(
+ `Invalidating statement: "${statement.fact}" (similarity: ${semanticSimilarity.toFixed(3)})`,
+ );
+ } else {
+ preservedStatements.push(statement.uuid);
+ logger.log(
+ `Preserving statement: "${statement.fact}" (similarity: ${semanticSimilarity.toFixed(3)})`,
+ );
+ }
+ } catch (error) {
+ logger.error(`Error analyzing statement ${statement.uuid}:`, { error });
+ // On error, be conservative and invalidate
+ invalidatedStatements.push(statement.uuid);
+ }
+ }
+
+ // Step 4: Bulk invalidate the selected statements
+ if (invalidatedStatements.length > 0) {
+ await invalidateStatements({
+ statementIds: invalidatedStatements,
+ invalidatedBy: params.invalidatedBy,
+ });
+
+ logger.log(`Document-level invalidation completed`, {
+ previousDocumentUuid: params.previousDocumentUuid,
+ totalAnalyzed: previousStatements.length,
+ invalidated: invalidatedStatements.length,
+ preserved: preservedStatements.length,
+ threshold,
+ });
+ }
+
+ return {
+ invalidatedStatements,
+ preservedStatements,
+ totalStatementsAnalyzed: previousStatements.length,
+ };
+ }
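+
+ // Hedged usage sketch for a document re-ingest flow (identifiers such as `previousDoc`,
+ // `newContent` and `newEpisodeUuid` are assumptions for illustration only):
+ //   const result = await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion({
+ //     previousDocumentUuid: previousDoc.uuid,
+ //     newDocumentContent: newContent,
+ //     userId,
+ //     invalidatedBy: newEpisodeUuid,
+ //   });
+ //   logger.log(`Preserved ${result.preservedStatements.length} of ${result.totalStatementsAnalyzed} statements`);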
+
+ /**
+ * Get all statements that were created from episodes linked to a specific document
+ */
+ private async getStatementsFromDocument(
+ documentUuid: string,
+ userId: string,
+ ): Promise<StatementNode[]> {
+ const query = `
+ MATCH (doc:Document {uuid: $documentUuid, userId: $userId})-[:CONTAINS_CHUNK]->(episode:Episode)
+ MATCH (episode)-[:HAS_PROVENANCE]->(stmt:Statement)
+ RETURN stmt
+ `;
+
+ const result = await runQuery(query, {
+ documentUuid,
+ userId,
+ });
+
+ return result.map((record) => {
+ const stmt = record.get("stmt").properties;
+ return {
+ uuid: stmt.uuid,
+ fact: stmt.fact,
+ factEmbedding: stmt.factEmbedding || [],
+ createdAt: new Date(stmt.createdAt),
+ validAt: new Date(stmt.validAt),
+ invalidAt: stmt.invalidAt ? new Date(stmt.invalidAt) : null,
+ attributes: stmt.attributesJson ? JSON.parse(stmt.attributesJson) : {},
+ userId: stmt.userId,
+ };
+ });
+ }
+
+ /**
+ * Calculate cosine similarity between two embedding vectors
+ */
+ private calculateCosineSimilarity(vecA: number[], vecB: number[]): number {
+ if (vecA.length !== vecB.length) {
+ throw new Error("Vector dimensions must match");
+ }
+
+ let dotProduct = 0;
+ let normA = 0;
+ let normB = 0;
+
+ for (let i = 0; i < vecA.length; i++) {
+ dotProduct += vecA[i] * vecB[i];
+ normA += vecA[i] * vecA[i];
+ normB += vecB[i] * vecB[i];
+ }
+
+ normA = Math.sqrt(normA);
+ normB = Math.sqrt(normB);
+
+ if (normA === 0 || normB === 0) {
+ return 0;
+ }
+
+ return dotProduct / (normA * normB);
+ }
+
/**
* Process an episode and update the knowledge graph.
*
@@ -110,6 +269,7 @@ export class KnowledgeGraphService {
prisma,
new Date(params.referenceTime),
sessionContext,
+ params.type,
);
const normalizedTime = Date.now() - startTime;
@@ -251,9 +411,9 @@ export class KnowledgeGraphService {
logger.log(`Saved triples in ${saveTriplesTime - updatedTriplesTime} ms`);
// Invalidate invalidated statements
- await invalidateStatements({
- statementIds: invalidatedStatements,
- invalidatedBy: episode.uuid
+ await invalidateStatements({
+ statementIds: invalidatedStatements,
+ invalidatedBy: episode.uuid,
});
const endTime = Date.now();
@@ -1146,6 +1306,7 @@ export class KnowledgeGraphService {
prisma: PrismaClient,
episodeTimestamp?: Date,
sessionContext?: string,
+ contentType?: EpisodeType,
) {
let appEnumValues: Apps[] = [];
if (Apps[source.toUpperCase() as keyof typeof Apps]) {
@@ -1171,7 +1332,12 @@ export class KnowledgeGraphService {
episodeTimestamp?.toISOString() || new Date().toISOString(),
sessionContext,
};
- const messages = normalizePrompt(context);
+
+ // Route to appropriate normalization prompt based on content type
+ const messages =
+ contentType === EpisodeTypeEnum.DOCUMENT
+ ? normalizeDocumentPrompt(context)
+ : normalizePrompt(context);
let responseText = "";
await makeModelCall(false, messages, (text) => {
responseText = text;
diff --git a/apps/webapp/app/services/postAuth.server.ts b/apps/webapp/app/services/postAuth.server.ts
index 7c91726..d95026c 100644
--- a/apps/webapp/app/services/postAuth.server.ts
+++ b/apps/webapp/app/services/postAuth.server.ts
@@ -1,6 +1,5 @@
import type { User } from "~/models/user.server";
import { createWorkspace } from "~/models/workspace.server";
-import { singleton } from "~/utils/singleton";
export async function postAuthentication({
user,
diff --git a/apps/webapp/app/services/prompts/normalize.ts b/apps/webapp/app/services/prompts/normalize.ts
index babe6bf..6b92ee8 100644
--- a/apps/webapp/app/services/prompts/normalize.ts
+++ b/apps/webapp/app/services/prompts/normalize.ts
@@ -262,3 +262,139 @@ ${context.relatedMemories}
{ role: "user", content: userPrompt },
];
};
+
+export const normalizeDocumentPrompt = (
+ context: Record,
+): CoreMessage[] => {
+ const sysPrompt = `You are C.O.R.E. (Contextual Observation & Recall Engine), a document memory processing system.
+
+Transform this document content into enriched factual statements for knowledge graph storage.
+
+
+Focus on STRUCTURED CONTENT EXTRACTION optimized for documents:
+
+1. FACTUAL PRESERVATION - Extract concrete facts, data, and information
+2. STRUCTURAL AWARENESS - Preserve document hierarchy, lists, tables, code blocks
+3. CROSS-REFERENCE HANDLING - Maintain internal document references and connections
+4. TECHNICAL CONTENT - Handle specialized terminology, code, formulas, diagrams
+5. CONTEXTUAL CHUNKING - This content is part of a larger document, maintain coherence
+
+DOCUMENT-SPECIFIC ENRICHMENT:
+- Preserve technical accuracy and specialized vocabulary
+- Extract structured data (lists, tables, procedures, specifications)
+- Maintain hierarchical relationships (sections, subsections, bullet points)
+- Handle code blocks, formulas, and technical diagrams
+- Capture cross-references and internal document links
+- Preserve authorship, citations, and source attributions
+
+
+
+Handle various document formats:
+- Technical documentation and specifications
+- Research papers and academic content
+- Code documentation and API references
+- Business documents and reports
+- Notes and knowledge base articles
+- Structured content (wikis, blogs, guides)
+
+
+
+For document content, convert relative time references using document timestamp:
+- Publication dates, modification dates, version information
+- Time-sensitive information within the document content
+- Historical context and chronological information
+
+
+
+${context.entityTypes}
+
+
+
+${
+ context.ingestionRules
+ ? `Apply these rules for content from ${context.source}:
+${context.ingestionRules}
+
+CRITICAL: If content does NOT satisfy these rules, respond with "NOTHING_TO_REMEMBER" regardless of other criteria.`
+ : "No specific ingestion rules defined for this source."
+}
+
+
+
+RETURN "NOTHING_TO_REMEMBER" if content consists ONLY of:
+- Navigation elements or UI text
+- Copyright notices and boilerplate
+- Empty sections or placeholder text
+- Pure formatting markup without content
+- Table of contents without substance
+- Repetitive headers without content
+
+STORE IN MEMORY for document content containing:
+- Factual information and data
+- Technical specifications and procedures
+- Structured knowledge and explanations
+- Code examples and implementations
+- Research findings and conclusions
+- Process descriptions and workflows
+- Reference information and definitions
+- Analysis, insights, and documented decisions
+
+
+
+TECHNICAL CONTENT:
+- Original: "The API returns a 200 status code on success"
+- Enriched: "On June 15, 2024, the REST API documentation specifies that successful requests return HTTP status code 200."
+
+STRUCTURED CONTENT:
+- Original: "Step 1: Initialize the database\nStep 2: Run migrations"
+- Enriched: "On June 15, 2024, the deployment guide outlines a two-step process: first initialize the database, then run migrations."
+
+CROSS-REFERENCE:
+- Original: "As mentioned in Section 3, the algorithm complexity is O(n)"
+- Enriched: "On June 15, 2024, the algorithm analysis document confirms O(n) time complexity, referencing the detailed explanation in Section 3."
+
+
+CRITICAL OUTPUT FORMAT REQUIREMENT:
+You MUST wrap your response in