From 5b0dd7d4a78251807a53a448c7437e36d12e8aa2 Mon Sep 17 00:00:00 2001
From: Manoj K
Date: Sun, 31 Aug 2025 08:52:17 +0530
Subject: [PATCH] Feat: add documents to the kg

---
 apps/webapp/app/lib/ingest.server.ts | 42 ++-
 apps/webapp/app/lib/neo4j.server.ts | 6 +
 apps/webapp/app/routes/api.v1.activity.tsx | 2 +
 apps/webapp/app/routes/api.v1.mcp.memory.tsx | 2 +
 .../routes/home.space.$spaceId.patterns.tsx | 2 +
 apps/webapp/app/routes/onboarding.tsx | 2 +
 .../app/services/documentChunker.server.ts | 262 ++++++++++++++++++
 .../app/services/graphModels/document.ts | 151 ++++++++++
 .../app/services/graphModels/episode.ts | 10 +-
 .../app/services/knowledgeGraph.server.ts | 21 +-
 apps/webapp/app/services/prompts/normalize.ts | 136 +++++++++
 .../app/trigger/ingest/ingest-document.ts | 154 ++++++++++
 apps/webapp/app/trigger/ingest/ingest.ts | 40 ++-
 .../app/trigger/spaces/space-assignment.ts | 23 +-
 .../webapp/app/trigger/utils/message-utils.ts | 3 +-
 apps/webapp/app/utils/mcp/memory.ts | 2 +
 apps/webapp/package.json | 5 +-
 apps/webapp/server.js | 4 +-
 packages/types/src/graph/graph.entity.ts | 40 ++-
 pnpm-lock.yaml | 8 +
 20 files changed, 876 insertions(+), 39 deletions(-)
 create mode 100644 apps/webapp/app/services/documentChunker.server.ts
 create mode 100644 apps/webapp/app/services/graphModels/document.ts
 create mode 100644 apps/webapp/app/trigger/ingest/ingest-document.ts

diff --git a/apps/webapp/app/lib/ingest.server.ts b/apps/webapp/app/lib/ingest.server.ts
index a578875..2b791a2 100644
--- a/apps/webapp/app/lib/ingest.server.ts
+++ b/apps/webapp/app/lib/ingest.server.ts
@@ -1,8 +1,10 @@
 // lib/ingest.queue.ts
 import { IngestionStatus } from "@core/database";
+import { EpisodeType } from "@core/types";
 import { type z } from "zod";
 import { prisma } from "~/db.server";
 import { type IngestBodyRequest, ingestTask } from "~/trigger/ingest/ingest";
+import { ingestDocumentTask } from "~/trigger/ingest/ingest-document";
 
 export const addToQueue = async (
   body: z.infer<typeof IngestBodyRequest>,
@@ -35,16 +37,38 @@ export const addToQueue = async (
     },
   });
 
-  const handler = await ingestTask.trigger(
-    { body, userId, workspaceId: user.Workspace.id, queueId: queuePersist.id },
-    {
-      queue: "ingestion-queue",
-      concurrencyKey: userId,
-      tags: [user.id, queuePersist.id],
-    },
-  );
+  let handler;
+  if (body.type === EpisodeType.DOCUMENT) {
+    handler = await ingestDocumentTask.trigger(
+      {
+        body,
+        userId,
+        workspaceId: user.Workspace.id,
+        queueId: queuePersist.id,
+      },
+      {
+        queue: "document-ingestion-queue",
+        concurrencyKey: userId,
+        tags: [user.id, queuePersist.id],
+      },
+    );
+  } else if (body.type === EpisodeType.CONVERSATION) {
+    handler = await ingestTask.trigger(
+      {
+        body,
+        userId,
+        workspaceId: user.Workspace.id,
+        queueId: queuePersist.id,
+      },
+      {
+        queue: "ingestion-queue",
+        concurrencyKey: userId,
+        tags: [user.id, queuePersist.id],
+      },
+    );
+  }
 
-  return { id: handler.id, token: handler.publicAccessToken };
+  return { id: handler?.id, token: handler?.publicAccessToken };
 };
 
 export { IngestBodyRequest };
diff --git a/apps/webapp/app/lib/neo4j.server.ts b/apps/webapp/app/lib/neo4j.server.ts
index 8c067a5..e0595f2 100644
--- a/apps/webapp/app/lib/neo4j.server.ts
+++ b/apps/webapp/app/lib/neo4j.server.ts
@@ -355,6 +355,12 @@ const initializeSchema = async () => {
   await runQuery(
     "CREATE INDEX entity_user_uuid IF NOT EXISTS FOR (n:Entity) ON (n.userId, n.uuid)",
   );
+  await runQuery(
+    "CREATE INDEX episode_user_uuid IF NOT EXISTS FOR (n:Episode) ON (n.userId, n.uuid)",
+  );
+  await runQuery(
+    "CREATE INDEX episode_user_id IF NOT EXISTS FOR (n:Episode) ON (n.userId)",
+  );
 
   // Create vector indexes for semantic search (if using Neo4j 5.0+)
   await runQuery(`
diff --git a/apps/webapp/app/routes/api.v1.activity.tsx b/apps/webapp/app/routes/api.v1.activity.tsx
index a741bdf..a8a6e86 100644
--- a/apps/webapp/app/routes/api.v1.activity.tsx
+++ b/apps/webapp/app/routes/api.v1.activity.tsx
@@ -6,6 +6,7 @@ import { addToQueue } from "~/lib/ingest.server";
 import { prisma } from "~/db.server";
 import { logger } from "~/services/logger.service";
 import { triggerWebhookDelivery } from "~/trigger/webhooks/webhook-delivery";
+import { EpisodeTypeEnum } from "@core/types";
 
 const ActivityCreateSchema = z.object({
   text: z.string().min(1, "Text is required"),
@@ -56,6 +57,7 @@ const { action, loader } = createActionApiRoute(
       episodeBody: body.text,
       referenceTime: new Date().toISOString(),
       source: body.source,
+      type: EpisodeTypeEnum.CONVERSATION,
     };
 
     const queueResponse = await addToQueue(
diff --git a/apps/webapp/app/routes/api.v1.mcp.memory.tsx b/apps/webapp/app/routes/api.v1.mcp.memory.tsx
index 24d4858..50aeac4 100644
--- a/apps/webapp/app/routes/api.v1.mcp.memory.tsx
+++ b/apps/webapp/app/routes/api.v1.mcp.memory.tsx
@@ -9,6 +9,7 @@ import { addToQueue } from "~/lib/ingest.server";
 import { SearchService } from "~/services/search.server";
 import { handleTransport } from "~/utils/mcp";
 import { SpaceService } from "~/services/space.server";
+import { EpisodeTypeEnum } from "@core/types";
 
 // Map to store transports by session ID with cleanup tracking
 const transports: {
@@ -124,6 +125,7 @@ const handleMCPRequest = async (
           episodeBody: args.message,
           referenceTime: new Date().toISOString(),
           source,
+          type: EpisodeTypeEnum.CONVERSATION,
         },
         userId,
       );
diff --git a/apps/webapp/app/routes/home.space.$spaceId.patterns.tsx b/apps/webapp/app/routes/home.space.$spaceId.patterns.tsx
index c1da349..31d511b 100644
--- a/apps/webapp/app/routes/home.space.$spaceId.patterns.tsx
+++ b/apps/webapp/app/routes/home.space.$spaceId.patterns.tsx
@@ -11,6 +11,7 @@ import { SpacePattern } from "~/services/spacePattern.server";
 import { addToQueue } from "~/lib/ingest.server";
 import { redirect } from "@remix-run/node";
 import { SpaceService } from "~/services/space.server";
+import { EpisodeTypeEnum } from "@core/types";
 
 export async function loader({ request, params }: LoaderFunctionArgs) {
   const workspace = await requireWorkpace(request);
@@ -68,6 +69,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
         },
         source: space.name,
         spaceId: space.id,
+        type: EpisodeTypeEnum.CONVERSATION,
       },
       userId,
     );
diff --git a/apps/webapp/app/routes/onboarding.tsx b/apps/webapp/app/routes/onboarding.tsx
index 7b8caa9..63ce074 100644
--- a/apps/webapp/app/routes/onboarding.tsx
+++ b/apps/webapp/app/routes/onboarding.tsx
@@ -26,6 +26,7 @@ import { updateUser } from "~/models/user.server";
 import { Copy, Check } from "lucide-react";
 import { addToQueue } from "~/lib/ingest.server";
 import { cn } from "~/lib/utils";
+import { EpisodeTypeEnum } from "@core/types";
 
 const ONBOARDING_STEP_COOKIE = "onboardingStep";
 const onboardingStepCookie = createCookie(ONBOARDING_STEP_COOKIE, {
@@ -75,6 +76,7 @@ export async function action({ request }: ActionFunctionArgs) {
         source: "Core",
         episodeBody: aboutUser,
         referenceTime: new Date().toISOString(),
+        type: EpisodeTypeEnum.CONVERSATION,
       },
       userId,
     );
diff --git a/apps/webapp/app/services/documentChunker.server.ts b/apps/webapp/app/services/documentChunker.server.ts
new file mode 100644
index 0000000..36e9cf4
--- /dev/null
+++ b/apps/webapp/app/services/documentChunker.server.ts
@@ -0,0 +1,262 @@
+import { encode } from "gpt-tokenizer";
+import crypto from "crypto";
+
+export interface DocumentChunk {
+  content: string;
+  chunkIndex: number;
+  title?: string;
+  context?: string;
+  startPosition: number;
+  endPosition: number;
+}
+
+export interface ChunkedDocument {
+  documentId: string;
+  title: string;
+  originalContent: string;
+  chunks: DocumentChunk[];
+  totalChunks: number;
+}
+
+/**
+ * Document chunking service that splits large documents into semantic chunks
+ * Targets 10-15k tokens per chunk with natural paragraph boundaries
+ */
+export class DocumentChunker {
+  private readonly TARGET_CHUNK_SIZE = 12500; // Middle of 10-15k range
+  private readonly MIN_CHUNK_SIZE = 10000;
+  private readonly MAX_CHUNK_SIZE = 15000;
+  private readonly MIN_PARAGRAPH_SIZE = 100; // Minimum tokens for a paragraph to be considered
+
+  /**
+   * Chunk a document into semantic sections with natural boundaries
+   */
+  async chunkDocument(
+    originalContent: string,
+    title: string,
+  ): Promise<ChunkedDocument> {
+    const documentId = crypto.randomUUID();
+
+    // First, split by major section headers (markdown style)
+    const majorSections = this.splitByMajorSections(originalContent);
+
+    const chunks: DocumentChunk[] = [];
+    let currentChunk = "";
+    let currentChunkStart = 0;
+    let chunkIndex = 0;
+
+    for (const section of majorSections) {
+      const sectionTokens = encode(section.content).length;
+      const currentChunkTokens = encode(currentChunk).length;
+
+      // If adding this section would exceed max size, finalize current chunk
+      if (currentChunkTokens > 0 && currentChunkTokens + sectionTokens > this.MAX_CHUNK_SIZE) {
+        if (currentChunkTokens >= this.MIN_CHUNK_SIZE) {
+          chunks.push(this.createChunk(
+            currentChunk,
+            chunkIndex,
+            currentChunkStart,
+            currentChunkStart + currentChunk.length,
+            section.title
+          ));
+          chunkIndex++;
+          currentChunk = "";
+          currentChunkStart = section.startPosition;
+        }
+      }
+
+      // Add section to current chunk
+      if (currentChunk) {
+        currentChunk += "\n\n" + section.content;
+      } else {
+        currentChunk = section.content;
+        currentChunkStart = section.startPosition;
+      }
+
+      // If current chunk is large enough and we have a natural break, consider chunking
+      const updatedChunkTokens = encode(currentChunk).length;
+      if (updatedChunkTokens >= this.TARGET_CHUNK_SIZE) {
+        // Try to find a good breaking point within the section
+        const paragraphs = this.splitIntoParagraphs(section.content);
+        if (paragraphs.length > 1) {
+          // Split at paragraph boundary if beneficial
+          const optimalSplit = this.findOptimalParagraphSplit(currentChunk);
+          if (optimalSplit) {
+            chunks.push(this.createChunk(
+              optimalSplit.beforeSplit,
+              chunkIndex,
+              currentChunkStart,
+              currentChunkStart + optimalSplit.beforeSplit.length,
+              section.title
+            ));
+            chunkIndex++;
+            currentChunk = optimalSplit.afterSplit;
+            currentChunkStart = currentChunkStart + optimalSplit.beforeSplit.length;
+          }
+        }
+      }
+    }
+
+    // Add remaining content as final chunk
+    if (currentChunk.trim() && encode(currentChunk).length >= this.MIN_PARAGRAPH_SIZE) {
+      chunks.push(this.createChunk(
+        currentChunk,
+        chunkIndex,
+        currentChunkStart,
+        originalContent.length
+      ));
+    }
+
+    return {
+      documentId,
+      title,
+      originalContent,
+      chunks,
+      totalChunks: chunks.length,
+    };
+  }
+
+  private splitByMajorSections(content: string): Array<{
+    content: string;
+    title?: string;
+    startPosition: number;
+    endPosition: number;
+  }> {
+    const sections: Array<{
+      content: string;
+      title?: string;
+      startPosition: number;
+      endPosition: number;
+    }> = [];
+
+    // Split by markdown headers (# ## ### etc.) or common document patterns
+    const headerRegex = /^(#{1,6}\s+.*$|={3,}$|-{3,}$)/gm;
+    const matches = Array.from(content.matchAll(headerRegex));
+
+    if (matches.length === 0) {
+      // No headers found, treat as single section
+      sections.push({
+        content: content.trim(),
+        startPosition: 0,
+        endPosition: content.length,
+      });
+      return sections;
+    }
+
+    let lastIndex = 0;
+
+    for (let i = 0; i < matches.length; i++) {
+      const match = matches[i];
+      const nextMatch = matches[i + 1];
+
+      const sectionStart = lastIndex;
+      const sectionEnd = nextMatch ? nextMatch.index! : content.length;
+
+      const sectionContent = content.slice(sectionStart, sectionEnd).trim();
+
+      if (sectionContent) {
+        sections.push({
+          content: sectionContent,
+          title: this.extractSectionTitle(match[0]),
+          startPosition: sectionStart,
+          endPosition: sectionEnd,
+        });
+      }
+
+      lastIndex = match.index! + match[0].length;
+    }
+
+    return sections;
+  }
+
+  private extractSectionTitle(header: string): string | undefined {
+    // Extract title from markdown header
+    const markdownMatch = header.match(/^#{1,6}\s+(.+)$/);
+    if (markdownMatch) {
+      return markdownMatch[1].trim();
+    }
+    return undefined;
+  }
+
+  private splitIntoParagraphs(content: string): string[] {
+    // Split by double newlines (paragraph breaks) and filter out empty strings
+    return content
+      .split(/\n\s*\n/)
+      .map(p => p.trim())
+      .filter(p => p.length > 0);
+  }
+
+  private findOptimalParagraphSplit(content: string): {
+    beforeSplit: string;
+    afterSplit: string;
+  } | null {
+    const paragraphs = this.splitIntoParagraphs(content);
+    if (paragraphs.length < 2) return null;
+
+    let bestSplitIndex = -1;
+    let bestScore = 0;
+
+    // Find the split that gets us closest to target size
+    for (let i = 1; i < paragraphs.length; i++) {
+      const beforeSplit = paragraphs.slice(0, i).join("\n\n");
+      const afterSplit = paragraphs.slice(i).join("\n\n");
+
+      const beforeTokens = encode(beforeSplit).length;
+      const afterTokens = encode(afterSplit).length;
+
+      // Score based on how close we get to target, avoiding too small chunks
+      if (beforeTokens >= this.MIN_CHUNK_SIZE && afterTokens >= this.MIN_PARAGRAPH_SIZE) {
+        const beforeDistance = Math.abs(beforeTokens - this.TARGET_CHUNK_SIZE);
+        const score = 1 / (1 + beforeDistance); // Higher score for closer to target
+
+        if (score > bestScore) {
+          bestScore = score;
+          bestSplitIndex = i;
+        }
+      }
+    }
+
+    if (bestSplitIndex > 0) {
+      return {
+        beforeSplit: paragraphs.slice(0, bestSplitIndex).join("\n\n"),
+        afterSplit: paragraphs.slice(bestSplitIndex).join("\n\n"),
+      };
+    }
+
+    return null;
+  }
+
+  private createChunk(
+    content: string,
+    chunkIndex: number,
+    startPosition: number,
+    endPosition: number,
+    title?: string
+  ): DocumentChunk {
+    // Generate a concise context/title if not provided
+    const context = title || this.generateChunkContext(content);
+
+    return {
+      content: content.trim(),
+      chunkIndex,
+      title: context,
+      context: `Chunk ${chunkIndex + 1}${context ? `: ${context}` : ""}`,
+      startPosition,
+      endPosition,
+    };
+  }
+
+  private generateChunkContext(content: string): string {
+    // Extract first meaningful line as context (avoiding markdown syntax)
+    const lines = content.split('\n').map(line => line.trim()).filter(Boolean);
+
+    for (const line of lines.slice(0, 3)) {
+      // Skip markdown headers and find first substantial content
+      if (!line.match(/^#{1,6}\s/) && !line.match(/^[=-]{3,}$/) && line.length > 10) {
+        return line.substring(0, 100) + (line.length > 100 ? "..." : "");
+      }
+    }
+
+    return "Document content";
+  }
+}
\ No newline at end of file
diff --git a/apps/webapp/app/services/graphModels/document.ts b/apps/webapp/app/services/graphModels/document.ts
new file mode 100644
index 0000000..a35a9fc
--- /dev/null
+++ b/apps/webapp/app/services/graphModels/document.ts
@@ -0,0 +1,151 @@
+import { runQuery } from "~/lib/neo4j.server";
+import type { DocumentNode } from "@core/types";
+
+export async function saveDocument(document: DocumentNode): Promise<string> {
+  const query = `
+    MERGE (d:Document {uuid: $uuid})
+    ON CREATE SET
+      d.title = $title,
+      d.originalContent = $originalContent,
+      d.metadata = $metadata,
+      d.source = $source,
+      d.userId = $userId,
+      d.createdAt = $createdAt,
+      d.validAt = $validAt,
+      d.totalChunks = $totalChunks,
+      d.documentId = $documentId,
+      d.sessionId = $sessionId
+    ON MATCH SET
+      d.title = $title,
+      d.originalContent = $originalContent,
+      d.metadata = $metadata,
+      d.source = $source,
+      d.validAt = $validAt,
+      d.totalChunks = $totalChunks,
+      d.documentId = $documentId,
+      d.sessionId = $sessionId
+    RETURN d.uuid as uuid
+  `;
+
+  const params = {
+    uuid: document.uuid,
+    title: document.title,
+    originalContent: document.originalContent,
+    metadata: JSON.stringify(document.metadata || {}),
+    source: document.source,
+    userId: document.userId || null,
+    createdAt: document.createdAt.toISOString(),
+    validAt: document.validAt.toISOString(),
+    totalChunks: document.totalChunks || 0,
+    documentId: document.documentId || null,
+    sessionId: document.sessionId || null,
+  };
+
+  const result = await runQuery(query, params);
+  return result[0].get("uuid");
+}
+
+export async function linkEpisodeToDocument(
+  episodeUuid: string,
+  documentUuid: string,
+  chunkIndex: number,
+): Promise<void> {
+  const query = `
+    MATCH (e:Episode {uuid: $episodeUuid})
+    MATCH (d:Document {uuid: $documentUuid})
+    MERGE (d)-[r:CONTAINS_CHUNK {chunkIndex: $chunkIndex}]->(e)
+    SET e.documentId = $documentUuid,
+        e.chunkIndex = $chunkIndex
+    RETURN r
+  `;
+
+  const params = {
+    episodeUuid,
+    documentUuid,
+    chunkIndex,
+  };
+
+  await runQuery(query, params);
+}
+
+export async function getDocument(
+  documentUuid: string,
+): Promise<DocumentNode | null> {
+  const query = `
+    MATCH (d:Document {uuid: $uuid})
+    RETURN d
+  `;
+
+  const params = { uuid: documentUuid };
+  const result = await runQuery(query, params);
+
+  if (result.length === 0) return null;
+
+  const record = result[0];
+  const documentNode = record.get("d");
+
+  return {
+    uuid: documentNode.properties.uuid,
+    title: documentNode.properties.title,
+    originalContent: documentNode.properties.originalContent,
+    metadata: JSON.parse(documentNode.properties.metadata || "{}"),
+    source: documentNode.properties.source,
+    userId: documentNode.properties.userId,
+    createdAt: new Date(documentNode.properties.createdAt),
+    validAt: new Date(documentNode.properties.validAt),
+    totalChunks: documentNode.properties.totalChunks,
+  };
+}
+
+export async function getDocumentEpisodes(documentUuid: string): Promise<
+  Array<{
+    episodeUuid: string;
+    chunkIndex: number;
+    content: string;
+  }>
+> {
+  const query = `
+    MATCH (d:Document {uuid: $uuid})-[r:CONTAINS_CHUNK]->(e:Episode)
+    RETURN e.uuid as episodeUuid, r.chunkIndex as chunkIndex, e.content as content
+    ORDER BY r.chunkIndex ASC
+  `;
+
+  const params = { uuid: documentUuid };
+  const result = await runQuery(query, params);
+
+  return result.map((record) => ({
+    episodeUuid: record.get("episodeUuid"),
+    chunkIndex: record.get("chunkIndex"),
+    content: record.get("content"),
+  }));
+}
+
+export async function getUserDocuments(
+  userId: string,
+  limit: number = 50,
+): Promise<DocumentNode[]> {
+  const query = `
+    MATCH (d:Document {userId: $userId})
+    RETURN d
+    ORDER BY d.createdAt DESC
+    LIMIT $limit
+  `;
+
+  const params = { userId, limit };
+  const result = await runQuery(query, params);
+
+  return result.map((record) => {
+    const documentNode = record.get("d");
+    return {
+      uuid: documentNode.properties.uuid,
+      title: documentNode.properties.title,
+      originalContent: documentNode.properties.originalContent,
+      metadata: JSON.parse(documentNode.properties.metadata || "{}"),
+      source: documentNode.properties.source,
+      userId: documentNode.properties.userId,
+      createdAt: new Date(documentNode.properties.createdAt),
+      validAt: new Date(documentNode.properties.validAt),
+      totalChunks: documentNode.properties.totalChunks,
+    };
+  });
+}
diff --git a/apps/webapp/app/services/graphModels/episode.ts b/apps/webapp/app/services/graphModels/episode.ts
index 634ae85..b721d99 100644
--- a/apps/webapp/app/services/graphModels/episode.ts
+++ b/apps/webapp/app/services/graphModels/episode.ts
@@ -1,5 +1,5 @@
 import { runQuery } from "~/lib/neo4j.server";
-import type { EntityNode, EpisodicNode } from "@core/types";
+import { type EntityNode, EpisodeType, type EpisodicNode } from "@core/types";
 
 export async function saveEpisode(episode: EpisodicNode): Promise<string> {
   const query = `
@@ -82,6 +82,8 @@ export async function getRecentEpisodes(params: {
   userId: string;
   source?: string;
   sessionId?: string;
+  type?: EpisodeType;
+  documentId?: string;
 }): Promise<EpisodicNode[]> {
   let filters = `WHERE e.validAt <= $referenceTime AND e.userId = $userId`;
 
@@ -90,10 +92,14 @@ export async function getRecentEpisodes(params: {
     filters += `\nAND e.source = $source`;
   }
 
-  if (params.sessionId) {
+  if (params.type === EpisodeType.CONVERSATION && params.sessionId) {
     filters += `\nAND e.sessionId = $sessionId`;
   }
 
+  if (params.type === EpisodeType.DOCUMENT && params.documentId) {
+    filters += `\nAND e.documentId = $documentId`;
+  }
+
   const query = `
     MATCH (e:Episode)
     ${filters}
diff --git a/apps/webapp/app/services/knowledgeGraph.server.ts b/apps/webapp/app/services/knowledgeGraph.server.ts
index d7019b6..aa05d3d 100644
--- a/apps/webapp/app/services/knowledgeGraph.server.ts
+++ b/apps/webapp/app/services/knowledgeGraph.server.ts
@@ -6,6 +6,8 @@ import {
   type EpisodicNode,
   type StatementNode,
   type Triple,
+  EpisodeTypeEnum,
+  type EpisodeType,
 } from "@core/types";
 import { logger } from "./logger.service";
 import { ClusteringService } from "./clustering.server";
@@ -48,7 +50,7 @@ import {
   getNodeTypesString,
   isPresetType,
 } from "~/utils/presets/nodes";
-import { normalizePrompt } from "./prompts";
+import { normalizePrompt, normalizeDocumentPrompt } from "./prompts";
 import { type PrismaClient } from "@prisma/client";
 
 // Default number of previous episodes to retrieve for context
@@ -90,6 +92,8 @@ export class KnowledgeGraphService {
         userId: params.userId,
         source: params.source,
         sessionId: params.sessionId,
+        type: params.type,
+        documentId: params.documentId,
       });
 
       // Format session context from previous episodes
@@ -110,6 +114,7 @@ export class KnowledgeGraphService {
         prisma,
         new Date(params.referenceTime),
         sessionContext,
+        params.type,
       );
 
       const normalizedTime = Date.now() - startTime;
@@ -251,9 +256,9 @@ export class KnowledgeGraphService {
       logger.log(`Saved triples in ${saveTriplesTime - updatedTriplesTime} ms`);
 
       // Invalidate invalidated statements
-      await invalidateStatements({
-        statementIds: invalidatedStatements,
-        invalidatedBy: episode.uuid
+      await invalidateStatements({
+        statementIds: invalidatedStatements,
+        invalidatedBy: episode.uuid,
       });
 
       const endTime = Date.now();
@@ -1146,6 +1151,7 @@ export class KnowledgeGraphService {
     prisma: PrismaClient,
     episodeTimestamp?: Date,
     sessionContext?: string,
+    contentType?: EpisodeType,
   ) {
     let appEnumValues: Apps[] = [];
     if (Apps[source.toUpperCase() as keyof typeof Apps]) {
@@ -1171,7 +1177,12 @@ export class KnowledgeGraphService {
         episodeTimestamp?.toISOString() || new Date().toISOString(),
       sessionContext,
     };
-    const messages = normalizePrompt(context);
+
+    // Route to appropriate normalization prompt based on content type
+    const messages =
+      contentType === EpisodeTypeEnum.DOCUMENT
+        ? normalizeDocumentPrompt(context)
+        : normalizePrompt(context);
     let responseText = "";
     await makeModelCall(false, messages, (text) => {
       responseText = text;
diff --git a/apps/webapp/app/services/prompts/normalize.ts b/apps/webapp/app/services/prompts/normalize.ts
index babe6bf..6b92ee8 100644
--- a/apps/webapp/app/services/prompts/normalize.ts
+++ b/apps/webapp/app/services/prompts/normalize.ts
@@ -262,3 +262,139 @@ ${context.relatedMemories}
     { role: "user", content: userPrompt },
   ];
 };
+
+export const normalizeDocumentPrompt = (
+  context: Record<string, any>,
+): CoreMessage[] => {
+  const sysPrompt = `You are C.O.R.E. (Contextual Observation & Recall Engine), a document memory processing system.
+
+Transform this document content into enriched factual statements for knowledge graph storage.
+
+
+Focus on STRUCTURED CONTENT EXTRACTION optimized for documents:
+
+1. FACTUAL PRESERVATION - Extract concrete facts, data, and information
+2. STRUCTURAL AWARENESS - Preserve document hierarchy, lists, tables, code blocks
+3. CROSS-REFERENCE HANDLING - Maintain internal document references and connections
+4. TECHNICAL CONTENT - Handle specialized terminology, code, formulas, diagrams
+5. CONTEXTUAL CHUNKING - This content is part of a larger document, maintain coherence
+
+DOCUMENT-SPECIFIC ENRICHMENT:
+- Preserve technical accuracy and specialized vocabulary
+- Extract structured data (lists, tables, procedures, specifications)
+- Maintain hierarchical relationships (sections, subsections, bullet points)
+- Handle code blocks, formulas, and technical diagrams
+- Capture cross-references and internal document links
+- Preserve authorship, citations, and source attributions
+
+
+Handle various document formats:
+- Technical documentation and specifications
+- Research papers and academic content
+- Code documentation and API references
+- Business documents and reports
+- Notes and knowledge base articles
+- Structured content (wikis, blogs, guides)
+
+
+For document content, convert relative time references using document timestamp:
+- Publication dates, modification dates, version information
+- Time-sensitive information within the document content
+- Historical context and chronological information
+
+
+
+${context.entityTypes}
+
+
+
+${
+  context.ingestionRules
+    ? `Apply these rules for content from ${context.source}:
+${context.ingestionRules}
+
+CRITICAL: If content does NOT satisfy these rules, respond with "NOTHING_TO_REMEMBER" regardless of other criteria.`
+    : "No specific ingestion rules defined for this source."
+}
+
+
+
+RETURN "NOTHING_TO_REMEMBER" if content consists ONLY of:
+- Navigation elements or UI text
+- Copyright notices and boilerplate
+- Empty sections or placeholder text
+- Pure formatting markup without content
+- Table of contents without substance
+- Repetitive headers without content
+
+STORE IN MEMORY for document content containing:
+- Factual information and data
+- Technical specifications and procedures
+- Structured knowledge and explanations
+- Code examples and implementations
+- Research findings and conclusions
+- Process descriptions and workflows
+- Reference information and definitions
+- Analysis, insights, and documented decisions
+
+
+
+TECHNICAL CONTENT:
+- Original: "The API returns a 200 status code on success"
+- Enriched: "On June 15, 2024, the REST API documentation specifies that successful requests return HTTP status code 200."
+
+STRUCTURED CONTENT:
+- Original: "Step 1: Initialize the database\nStep 2: Run migrations"
+- Enriched: "On June 15, 2024, the deployment guide outlines a two-step process: first initialize the database, then run migrations."
+
+CROSS-REFERENCE:
+- Original: "As mentioned in Section 3, the algorithm complexity is O(n)"
+- Enriched: "On June 15, 2024, the algorithm analysis document confirms O(n) time complexity, referencing the detailed explanation in Section 3."
+
+
+CRITICAL OUTPUT FORMAT REQUIREMENT:
+You MUST wrap your response in tags. This is MANDATORY - no exceptions.
+
+If the document content should be stored in memory:
+
+{{your_enriched_statement_here}}
+
+
+If there is nothing worth remembering:
+
+NOTHING_TO_REMEMBER
+
+
+ALWAYS include opening and closing tags around your entire response.
+`;
+
+  const userPrompt = `
+
+${context.episodeContent}
+
+
+
+${context.source}
+
+
+
+${context.episodeTimestamp || "Not provided"}
+
+
+
+${context.sessionContext || "No previous chunks in this document session"}
+
+
+
+${context.relatedMemories}
+
+
+`;
+
+  return [
+    { role: "system", content: sysPrompt },
+    { role: "user", content: userPrompt },
+  ];
+};
diff --git a/apps/webapp/app/trigger/ingest/ingest-document.ts b/apps/webapp/app/trigger/ingest/ingest-document.ts
new file mode 100644
index 0000000..1ca7a75
--- /dev/null
+++ b/apps/webapp/app/trigger/ingest/ingest-document.ts
@@ -0,0 +1,154 @@
+import { queue, task } from "@trigger.dev/sdk";
+import { type z } from "zod";
+import crypto from "crypto";
+
+import { IngestionStatus } from "@core/database";
+import { EpisodeTypeEnum, type DocumentNode } from "@core/types";
+import { logger } from "~/services/logger.service";
+import { DocumentChunker } from "~/services/documentChunker.server";
+import { saveDocument } from "~/services/graphModels/document";
+import { type IngestBodyRequest } from "~/lib/ingest.server";
+import { prisma } from "../utils/prisma";
+import { ingestTask } from "./ingest";
+
+const documentIngestionQueue = queue({
+  name: "document-ingestion-queue",
+  concurrencyLimit: 5,
+});
+
+// Register the Document Ingestion Trigger.dev task
+export const ingestDocumentTask = task({
+  id: "ingest-document",
+  queue: documentIngestionQueue,
+  machine: "medium-2x",
+  run: async (payload: {
+    body: z.infer<typeof IngestBodyRequest>;
+    userId: string;
+    workspaceId: string;
+    queueId: string;
+  }) => {
+    const startTime = Date.now();
+
+    try {
+      logger.log(`Processing document for user ${payload.userId}`, {
+        documentTitle: payload.body.documentTitle,
+        contentLength: payload.body.episodeBody.length,
+      });
+
+      await prisma.ingestionQueue.update({
+        where: { id: payload.queueId },
+        data: {
+          status: IngestionStatus.PROCESSING,
+        },
+      });
+
+      const documentBody = payload.body as any;
+
+      // Step 1: Create document node
+      const document: DocumentNode = {
+        uuid: crypto.randomUUID(),
+        title: documentBody.documentTitle || "Untitled Document",
+        originalContent: documentBody.episodeBody,
+        metadata: documentBody.metadata || {},
+        source: documentBody.source,
+        userId: payload.userId,
+        createdAt: new Date(),
+        validAt: new Date(documentBody.referenceTime),
+        totalChunks: 0,
+        documentId: documentBody.documentId,
+        sessionId: documentBody.sessionId,
+      };
+
+      await saveDocument(document);
+
+      // Step 2: Chunk the document
+      const documentChunker = new DocumentChunker();
+      const chunkedDocument = await documentChunker.chunkDocument(
+        documentBody.episodeBody,
+        documentBody.documentTitle,
+      );
+
+      logger.log(
+        `Document chunked into ${chunkedDocument.chunks.length} chunks`,
+      );
+
+      // Step 3: Queue each chunk as a separate episode
+      for (const chunk of chunkedDocument.chunks) {
+        const chunkEpisodeData = {
+          episodeBody: chunk.content,
+          referenceTime: documentBody.referenceTime,
+          metadata: documentBody.metadata,
+          source: documentBody.source,
+          spaceId: documentBody.spaceId,
+          sessionId: documentBody.sessionId,
+          type: EpisodeTypeEnum.DOCUMENT,
+          documentTitle: documentBody.documentTitle,
+          documentId: documentBody.documentId,
+          chunkIndex: chunk.chunkIndex,
+        };
+
+        const episodeHandler = await ingestTask.trigger(
+          {
+            body: chunkEpisodeData,
+            userId: payload.userId,
+            workspaceId: payload.workspaceId,
+            queueId: payload.queueId,
+          },
+          {
+            queue: "ingestion-queue",
+            concurrencyKey: payload.userId,
+            tags: [payload.userId, payload.queueId],
+          },
+        );
+
+        if (episodeHandler.id) {
+          logger.log(
+            `Queued chunk ${chunk.chunkIndex + 1}/${chunkedDocument.chunks.length} for processing`,
+            {
+              handlerId: episodeHandler.id,
+              chunkSize: chunk.content.length,
+            },
+          );
+        }
+      }
+
+      await prisma.ingestionQueue.update({
+        where: { id: payload.queueId },
+        data: {
+          output: {
+            documentUuid: document.uuid,
+            totalChunks: chunkedDocument.chunks.length,
+            episodes: [],
+          },
+          status: IngestionStatus.PROCESSING,
+        },
+      });
+
+      const processingTimeMs = Date.now() - startTime;
+
+      logger.log(
+        `Document chunking processing completed in ${processingTimeMs}ms`,
+        {
+          documentUuid: document.uuid,
+          totalChunks: chunkedDocument.chunks.length,
+        },
+      );
+
+      return { success: true };
+    } catch (err: any) {
+      await prisma.ingestionQueue.update({
+        where: { id: payload.queueId },
+        data: {
+          error: err.message,
+          status: IngestionStatus.FAILED,
+        },
+      });
+
+      logger.error(
+        `Error processing document for user ${payload.userId}:`,
+        err,
+      );
+      return { success: false, error: err.message };
+    }
+  },
+});
diff --git a/apps/webapp/app/trigger/ingest/ingest.ts b/apps/webapp/app/trigger/ingest/ingest.ts
index 79d635c..f134b10 100644
--- a/apps/webapp/app/trigger/ingest/ingest.ts
+++ b/apps/webapp/app/trigger/ingest/ingest.ts
@@ -6,6 +6,7 @@ import { IngestionStatus } from "@core/database";
 import { logger } from "~/services/logger.service";
 import { triggerSpaceAssignment } from "../spaces/space-assignment";
 import { prisma } from "../utils/prisma";
+import { EpisodeType } from "@core/types";
 
 export const IngestBodyRequest = z.object({
   episodeBody: z.string(),
@@ -14,6 +15,11 @@ export const IngestBodyRequest = z.object({
   source: z.string(),
   spaceId: z.string().optional(),
   sessionId: z.string().optional(),
+  type: z
+    .enum([EpisodeType.CONVERSATION, EpisodeType.DOCUMENT])
+    .default(EpisodeType.CONVERSATION),
+  documentTitle: z.string().optional(),
+  documentId: z.string().optional(),
 });
 
 const ingestionQueue = queue({
@@ -35,7 +41,7 @@ export const ingestTask = task({
     try {
       logger.log(`Processing job for user ${payload.userId}`);
 
-      await prisma.ingestionQueue.update({
+      const ingestionQueue = await prisma.ingestionQueue.update({
         where: { id: payload.queueId },
         data: {
           status: IngestionStatus.PROCESSING,
@@ -54,11 +60,32 @@ export const ingestTask = task({
         prisma,
       );
 
+      let finalOutput = episodeDetails;
+      let episodeUuids: string[] = episodeDetails.episodeUuid
+        ? [episodeDetails.episodeUuid]
+        : [];
+      let currentStatus: IngestionStatus = IngestionStatus.COMPLETED;
+      if (episodeBody.type === EpisodeType.DOCUMENT) {
+        const currentOutput = ingestionQueue.output as any;
+        currentOutput.episodes.push(episodeDetails);
+        episodeUuids = currentOutput.episodes.map(
+          (episode: any) => episode.episodeUuid,
+        );
+
+        finalOutput = {
+          ...currentOutput,
+        };
+
+        if (currentOutput.episodes.length !== currentOutput.totalChunks) {
+          currentStatus = IngestionStatus.PROCESSING;
+        }
+      }
+
       await prisma.ingestionQueue.update({
         where: { id: payload.queueId },
         data: {
-          output: episodeDetails,
-          status: IngestionStatus.COMPLETED,
+          output: finalOutput,
+          status: currentStatus,
         },
       });
 
@@ -69,12 +96,15 @@ export const ingestTask = task({
           workspaceId: payload.workspaceId,
           episodeId: episodeDetails?.episodeUuid,
         });
-        if (episodeDetails.episodeUuid) {
+        if (
+          episodeDetails.episodeUuid &&
+          currentStatus === IngestionStatus.COMPLETED
+        ) {
           await triggerSpaceAssignment({
             userId: payload.userId,
             workspaceId: payload.workspaceId,
             mode: "episode",
-            episodeId: episodeDetails.episodeUuid,
+            episodeIds: episodeUuids,
           });
         }
       } catch (assignmentError) {
diff --git a/apps/webapp/app/trigger/spaces/space-assignment.ts b/apps/webapp/app/trigger/spaces/space-assignment.ts
index 7f80b6a..13970a7 100644
--- a/apps/webapp/app/trigger/spaces/space-assignment.ts
+++ b/apps/webapp/app/trigger/spaces/space-assignment.ts
@@ -25,7 +25,7 @@ interface SpaceAssignmentPayload {
   workspaceId: string;
   mode: "new_space" | "episode";
   newSpaceId?: string; // For new_space mode
-  episodeId?: string; // For daily_batch mode (default: 1)
+  episodeIds?: string[]; // For daily_batch mode (default: 1)
   batchSize?: number; // Processing batch size
 }
 
@@ -181,7 +181,7 @@ export const spaceAssignmentTask = task({
       workspaceId,
       mode,
       newSpaceId,
-      episodeId,
+      episodeIds,
       batchSize = mode === "new_space"
         ? CONFIG.newSpaceMode.batchSize
         : CONFIG.episodeMode.batchSize,
@@ -191,7 +191,7 @@ export const spaceAssignmentTask = task({
       userId,
       mode,
       newSpaceId,
-      episodeId,
+      episodeIds,
       batchSize,
     });
 
@@ -213,7 +213,7 @@ export const spaceAssignmentTask = task({
       // 2. Get statements to analyze based on mode
       const statements = await getStatementsToAnalyze(userId, mode, {
         newSpaceId,
-        episodeId,
+        episodeIds,
       });
 
       if (statements.length === 0) {
@@ -454,7 +454,7 @@ export const spaceAssignmentTask = task({
 async function getStatementsToAnalyze(
   userId: string,
   mode: "new_space" | "episode",
-  options: { newSpaceId?: string; episodeId?: string },
+  options: { newSpaceId?: string; episodeIds?: string[] },
 ): Promise {
   let query: string;
   let params: any = { userId };
@@ -471,16 +471,19 @@ async function getStatementsToAnalyze(
       ORDER BY s.createdAt DESC
     `;
   } else {
+    // Optimized query: Use UNWIND for better performance with IN clause
+    // and combine entity lookups in single pattern
     query = `
-      MATCH (e:Episode {uuid: $episodeId, userId: $userId})-[:HAS_PROVENANCE]->(s:Statement)
+      UNWIND $episodeIds AS episodeId
+      MATCH (e:Episode {uuid: episodeId, userId: $userId})-[:HAS_PROVENANCE]->(s:Statement)
       WHERE s.invalidAt IS NULL
-      MATCH (s)-[:HAS_SUBJECT]->(subj:Entity)
-      MATCH (s)-[:HAS_PREDICATE]->(pred:Entity)
-      MATCH (s)-[:HAS_OBJECT]->(obj:Entity)
+      MATCH (s)-[:HAS_SUBJECT]->(subj:Entity),
+            (s)-[:HAS_PREDICATE]->(pred:Entity),
+            (s)-[:HAS_OBJECT]->(obj:Entity)
       RETURN s, subj.name as subject, pred.name as predicate, obj.name as object
       ORDER BY s.createdAt DESC
     `;
-    params.episodeId = options.episodeId;
+    params.episodeIds = options.episodeIds;
   }
 
   const result = await runQuery(query, params);
diff --git a/apps/webapp/app/trigger/utils/message-utils.ts b/apps/webapp/app/trigger/utils/message-utils.ts
index bebddc4..83b84ef 100644
--- a/apps/webapp/app/trigger/utils/message-utils.ts
+++ b/apps/webapp/app/trigger/utils/message-utils.ts
@@ -1,4 +1,4 @@
-import { type Message } from "@core/types";
+import { EpisodeTypeEnum, type Message } from "@core/types";
 import { addToQueue } from "./queue";
 import { triggerWebhookDelivery } from "../webhooks/webhook-delivery";
 import { logger } from "@trigger.dev/sdk";
@@ -149,6 +149,7 @@ export const createActivities = async ({
       episodeBody: message.data.text,
       referenceTime: new Date().toISOString(),
       source: integrationAccount?.integrationDefinition.slug,
+      type: EpisodeTypeEnum.CONVERSATION,
     };
 
     const queueResponse = await addToQueue(
diff --git a/apps/webapp/app/utils/mcp/memory.ts b/apps/webapp/app/utils/mcp/memory.ts
index 594b1a6..c6bd5a9 100644
--- a/apps/webapp/app/utils/mcp/memory.ts
+++ b/apps/webapp/app/utils/mcp/memory.ts
@@ -1,3 +1,4 @@
+import { EpisodeTypeEnum } from "@core/types";
 import { addToQueue } from "~/lib/ingest.server";
 import { logger } from "~/services/logger.service";
 import { SearchService } from "~/services/search.server";
@@ -115,6 +116,7 @@ async function handleMemoryIngest(args: any) {
       episodeBody: args.message,
       referenceTime: new Date().toISOString(),
       source: args.source,
+      type: EpisodeTypeEnum.CONVERSATION,
     },
     args.userId,
   );
diff --git a/apps/webapp/package.json b/apps/webapp/package.json
index 699bcf8..337b986 100644
--- a/apps/webapp/package.json
+++ b/apps/webapp/package.json
@@ -97,6 +97,7 @@
     "execa": "^9.6.0",
     "express": "^4.18.1",
     "fast-sort": "^3.4.0",
+    "gpt-tokenizer": "^3.0.1",
     "graphology": "^0.26.0",
     "graphology-layout-force": "^0.2.4",
     "graphology-layout-forceatlas2": "^0.10.1",
@@ -174,10 +175,10 @@
     "prettier-plugin-tailwindcss": "^0.6.11",
     "tailwind-scrollbar": "^4.0.2",
     "tailwindcss": "4.1.7",
+    "tsx": "4.20.4",
     "typescript": "5.8.3",
     "vite": "^6.0.0",
-    "vite-tsconfig-paths": "^4.2.1",
-    "tsx": "4.20.4"
+    "vite-tsconfig-paths": "^4.2.1"
   },
   "engines": {
     "node": ">=20.0.0"
diff --git a/apps/webapp/server.js b/apps/webapp/server.js
index 909dc3f..ae58f2c 100644
--- a/apps/webapp/server.js
+++ b/apps/webapp/server.js
@@ -16,7 +16,9 @@ async function init() {
   const build = viteDevServer
     ? () => viteDevServer.ssrLoadModule("virtual:remix/server-build")
     : await import("./build/server/index.js");
-  const module = build.entry?.module;
+  const module = viteDevServer
+    ? (await build()).entry.module
+    : build.entry?.module;
   remixHandler = createRequestHandler({ build });
   const app = express();
   app.use(compression());
diff --git a/packages/types/src/graph/graph.entity.ts b/packages/types/src/graph/graph.entity.ts
index 45d4684..5bc7e99 100644
--- a/packages/types/src/graph/graph.entity.ts
+++ b/packages/types/src/graph/graph.entity.ts
@@ -1,6 +1,19 @@
-export enum EpisodeType {
-  Conversation = "CONVERSATION",
-  Text = "TEXT",
+/**
+ * Interface for document node in the reified knowledge graph
+ * Documents are parent containers for episodic chunks
+ */
+export interface DocumentNode {
+  uuid: string;
+  title: string;
+  originalContent: string;
+  metadata: Record<string, any>;
+  source: string;
+  userId: string;
+  createdAt: Date;
+  validAt: Date;
+  totalChunks: number;
+  documentId?: string;
+  sessionId?: string;
 }
 
 /**
@@ -21,6 +34,8 @@ export interface EpisodicNode {
   space?: string;
   sessionId?: string;
   recallCount?: number;
+  documentId?: string;
+  chunkIndex?: number; // Index of this chunk within the document
 }
 
 /**
@@ -72,14 +87,31 @@ export interface Triple {
   provenance: EpisodicNode;
 }
 
+export enum EpisodeTypeEnum {
+  CONVERSATION = "CONVERSATION",
+  DOCUMENT = "DOCUMENT",
+}
+
+export const EpisodeType = {
+  CONVERSATION: "CONVERSATION",
+  DOCUMENT: "DOCUMENT",
+};
+
+export type EpisodeType = (typeof EpisodeType)[keyof typeof EpisodeType];
+
 export type AddEpisodeParams = {
   episodeBody: string;
   referenceTime: Date;
-  metadata: Record<string, any>;
+  metadata?: Record<string, any>;
   source: string;
   userId: string;
   spaceId?: string;
   sessionId?: string;
+  type?: EpisodeType;
+  documentTitle?: string;
+  documentId?: string;
+  chunkIndex?: number;
+  chunkContext?: string;
 };
 
 export type AddEpisodeResult = {
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index ed90497..ab0e89b 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -529,6 +529,9 @@ importers:
       fast-sort:
        specifier: ^3.4.0
        version: 3.4.1
+      gpt-tokenizer:
+        specifier: ^3.0.1
+        version: 3.0.1
       graphology:
        specifier: ^0.26.0
        version: 0.26.0(graphology-types@0.24.8)
@@ -7634,6 +7637,9 @@ packages:
     resolution: {integrity: sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg==}
     engines: {node: '>= 0.4'}
 
+  gpt-tokenizer@3.0.1:
+    resolution: {integrity: sha512-5jdaspBq/w4sWw322SvQj1Fku+CN4OAfYZeeEg8U7CWtxBz+zkxZ3h0YOHD43ee+nZYZ5Ud70HRN0ANcdIj4qg==}
+
   graceful-fs@4.2.11:
     resolution: {integrity: sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==}
 
@@ -20045,6 +20051,8 @@ snapshots:
 
   gopd@1.2.0: {}
 
+  gpt-tokenizer@3.0.1: {}
+
   graceful-fs@4.2.11: {}
 
   gradient-string@2.0.2: