diff --git a/apps/webapp/app/trigger/ingest/ingest-document.ts b/apps/webapp/app/trigger/ingest/ingest-document.ts index b461dd0..6934bf6 100644 --- a/apps/webapp/app/trigger/ingest/ingest-document.ts +++ b/apps/webapp/app/trigger/ingest/ingest-document.ts @@ -12,6 +12,7 @@ import { DocumentDifferentialService } from "~/services/documentDiffer.server"; import { KnowledgeGraphService } from "~/services/knowledgeGraph.server"; import { prisma } from "../utils/prisma"; import { ingestTask } from "./ingest"; +import type { DocumentIngestionOutput } from "./types"; const documentIngestionQueue = queue({ name: "document-ingestion-queue", @@ -219,28 +220,30 @@ export const ingestDocumentTask = task({ chunksToProcess.length, ); + const documentOutput: DocumentIngestionOutput = { + documentUuid: document.uuid, + version: versionInfo.newVersion, + totalChunks: chunkedDocument.chunks.length, + chunksProcessed: chunksToProcess.length, + chunksSkipped: costSavings.chunksSkipped, + processingMode, + differentialStrategy: differentialDecision.strategy, + estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, + statementInvalidation: invalidationResults + ? { + totalAnalyzed: invalidationResults.totalStatementsAnalyzed, + invalidated: invalidationResults.invalidatedStatements.length, + preserved: invalidationResults.preservedStatements.length, + } + : null, + episodes: [], + episodeHandlers, + }; + await prisma.ingestionQueue.update({ where: { id: payload.queueId }, data: { - output: { - documentUuid: document.uuid, - version: versionInfo.newVersion, - totalChunks: chunkedDocument.chunks.length, - chunksProcessed: chunksToProcess.length, - chunksSkipped: costSavings.chunksSkipped, - processingMode, - differentialStrategy: differentialDecision.strategy, - estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, - statementInvalidation: invalidationResults - ? { - totalAnalyzed: invalidationResults.totalStatementsAnalyzed, - invalidated: invalidationResults.invalidatedStatements.length, - preserved: invalidationResults.preservedStatements.length, - } - : null, - episodes: [], - episodeHandlers, - }, + output: documentOutput as any, status: IngestionStatus.PROCESSING, }, }); diff --git a/apps/webapp/app/trigger/ingest/ingest.ts b/apps/webapp/app/trigger/ingest/ingest.ts index 66e382f..b302fa4 100644 --- a/apps/webapp/app/trigger/ingest/ingest.ts +++ b/apps/webapp/app/trigger/ingest/ingest.ts @@ -10,6 +10,10 @@ import { prisma } from "../utils/prisma"; import { EpisodeType } from "@core/types"; import { deductCredits, hasCredits } from "../utils/utils"; import { assignEpisodesToSpace } from "~/services/graphModels/space"; +import type { + DocumentIngestionOutput, + IngestionQueueOutput, +} from "./types"; export const IngestBodyRequest = z.object({ episodeBody: z.string(), @@ -111,41 +115,53 @@ export const ingestTask = task({ } } - let finalOutput = episodeDetails; + let finalOutput: IngestionQueueOutput = episodeDetails; let episodeUuids: string[] = episodeDetails.episodeUuid ? [episodeDetails.episodeUuid] : []; let currentStatus: IngestionStatus = IngestionStatus.COMPLETED; + if (episodeBody.type === EpisodeType.DOCUMENT) { - const currentOutput = ingestionQueue.output as any; - currentOutput.episodes.push(episodeDetails); - episodeUuids = currentOutput.episodes.map( - (episode: any) => episode.episodeUuid, - ); + // Get current output and ensure it has the document structure + const documentOutput = ingestionQueue.output as DocumentIngestionOutput | null; + if (documentOutput) { + // Add current episode to episodes array + documentOutput?.episodes.push(episodeDetails); - finalOutput = { - ...currentOutput, - }; + // Collect all episode UUIDs + episodeUuids = documentOutput.episodes + .map((episode) => episode.episodeUuid) + .filter((uuid): uuid is string => uuid !== undefined); - if (currentOutput.episodes.length !== currentOutput.totalChunks) { - currentStatus = IngestionStatus.PROCESSING; + finalOutput = documentOutput; + + // Check if all chunks have been processed + if (documentOutput.episodes.length !== documentOutput.totalChunks) { + currentStatus = IngestionStatus.PROCESSING; + } } } await prisma.ingestionQueue.update({ where: { id: payload.queueId }, data: { - output: finalOutput, + output: finalOutput as any, status: currentStatus, }, }); // Deduct credits for episode creation - if (currentStatus === IngestionStatus.COMPLETED) { + if (episodeDetails.statementsCreated > 0) { + logger.info(`Deducting credits for episode creation`, { + userId: payload.userId, + workspaceId: payload.workspaceId, + statementsCreated: episodeDetails.statementsCreated, + }); + await deductCredits( payload.workspaceId, "addEpisode", - finalOutput.statementsCreated, + episodeDetails.statementsCreated, ); } diff --git a/apps/webapp/app/trigger/ingest/types.ts b/apps/webapp/app/trigger/ingest/types.ts new file mode 100644 index 0000000..225bafc --- /dev/null +++ b/apps/webapp/app/trigger/ingest/types.ts @@ -0,0 +1,40 @@ +/** + * Shared type definitions for the ingestion pipeline + */ + +// Token usage tracking structure +export interface TokenUsage { + high: { input: number; output: number; total: number }; + low: { input: number; output: number; total: number }; +} + +// Output type for episode ingestion +export interface EpisodeIngestionOutput { + episodeUuid: string | null; + statementsCreated: number; + entitiesCreated?: number; + processingTimeMs?: number; + tokenUsage?: TokenUsage; +} + +// Output type for document ingestion (tracks multiple chunks) +export interface DocumentIngestionOutput { + documentUuid: string | null; + version: number; + chunksProcessed: number; + chunksSkipped: number; + processingMode: string; + differentialStrategy: string; + estimatedSavings: string; + statementInvalidation: { + totalAnalyzed: number; + invalidated: number; + preserved: number; + } | null; + episodeHandlers?: string[]; + episodes: EpisodeIngestionOutput[]; + totalChunks: number; +} + +// Union type for ingestion queue output +export type IngestionQueueOutput = EpisodeIngestionOutput | DocumentIngestionOutput; diff --git a/apps/webapp/app/trigger/spaces/space-summary.ts b/apps/webapp/app/trigger/spaces/space-summary.ts index 3f95831..eb47c9c 100644 --- a/apps/webapp/app/trigger/spaces/space-summary.ts +++ b/apps/webapp/app/trigger/spaces/space-summary.ts @@ -439,6 +439,7 @@ CRITICAL RULES: 3. Write in a factual, neutral tone - avoid promotional language ("pivotal", "invaluable", "cutting-edge") 4. Be specific and concrete - reference actual content, patterns, and insights found in the episodes 5. If episodes are insufficient for meaningful insights, state that more data is needed +6. Episodes are presented in chronological order (oldest to newest) - use this to identify evolution, progression, and how understanding/focus changed over time INTENT-DRIVEN SUMMARIZATION: Your summary should SERVE the space's intended purpose. Examples: @@ -601,7 +602,7 @@ async function getSpaceEpisodes( MATCH (space:Space {uuid: $spaceId, userId: $userId})-[:HAS_EPISODE]->(e:Episode {userId: $userId}) WHERE e IS NOT NULL ${dateCondition} RETURN DISTINCT e - ORDER BY e.createdAt DESC + ORDER BY e.createdAt ASC `; const result = await runQuery(query, params);