Fix: credit deduction for document type

This commit is contained in:
Manoj 2025-10-10 08:47:55 +05:30
parent a3798db1e6
commit 3cec513421
4 changed files with 94 additions and 34 deletions

View File

@ -12,6 +12,7 @@ import { DocumentDifferentialService } from "~/services/documentDiffer.server";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";
import { prisma } from "../utils/prisma";
import { ingestTask } from "./ingest";
import type { DocumentIngestionOutput } from "./types";
const documentIngestionQueue = queue({
name: "document-ingestion-queue",
@ -219,28 +220,30 @@ export const ingestDocumentTask = task({
chunksToProcess.length,
);
const documentOutput: DocumentIngestionOutput = {
documentUuid: document.uuid,
version: versionInfo.newVersion,
totalChunks: chunkedDocument.chunks.length,
chunksProcessed: chunksToProcess.length,
chunksSkipped: costSavings.chunksSkipped,
processingMode,
differentialStrategy: differentialDecision.strategy,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults
? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
}
: null,
episodes: [],
episodeHandlers,
};
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
output: {
documentUuid: document.uuid,
version: versionInfo.newVersion,
totalChunks: chunkedDocument.chunks.length,
chunksProcessed: chunksToProcess.length,
chunksSkipped: costSavings.chunksSkipped,
processingMode,
differentialStrategy: differentialDecision.strategy,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults
? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
}
: null,
episodes: [],
episodeHandlers,
},
output: documentOutput as any,
status: IngestionStatus.PROCESSING,
},
});

View File

@ -10,6 +10,10 @@ import { prisma } from "../utils/prisma";
import { EpisodeType } from "@core/types";
import { deductCredits, hasCredits } from "../utils/utils";
import { assignEpisodesToSpace } from "~/services/graphModels/space";
import type {
DocumentIngestionOutput,
IngestionQueueOutput,
} from "./types";
export const IngestBodyRequest = z.object({
episodeBody: z.string(),
@ -111,41 +115,53 @@ export const ingestTask = task({
}
}
let finalOutput = episodeDetails;
let finalOutput: IngestionQueueOutput = episodeDetails;
let episodeUuids: string[] = episodeDetails.episodeUuid
? [episodeDetails.episodeUuid]
: [];
let currentStatus: IngestionStatus = IngestionStatus.COMPLETED;
if (episodeBody.type === EpisodeType.DOCUMENT) {
const currentOutput = ingestionQueue.output as any;
currentOutput.episodes.push(episodeDetails);
episodeUuids = currentOutput.episodes.map(
(episode: any) => episode.episodeUuid,
);
// Get current output and ensure it has the document structure
const documentOutput = ingestionQueue.output as DocumentIngestionOutput | null;
if (documentOutput) {
// Add current episode to episodes array
documentOutput?.episodes.push(episodeDetails);
finalOutput = {
...currentOutput,
};
// Collect all episode UUIDs
episodeUuids = documentOutput.episodes
.map((episode) => episode.episodeUuid)
.filter((uuid): uuid is string => uuid !== undefined);
if (currentOutput.episodes.length !== currentOutput.totalChunks) {
currentStatus = IngestionStatus.PROCESSING;
finalOutput = documentOutput;
// Check if all chunks have been processed
if (documentOutput.episodes.length !== documentOutput.totalChunks) {
currentStatus = IngestionStatus.PROCESSING;
}
}
}
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
output: finalOutput,
output: finalOutput as any,
status: currentStatus,
},
});
// Deduct credits for episode creation
if (currentStatus === IngestionStatus.COMPLETED) {
if (episodeDetails.statementsCreated > 0) {
logger.info(`Deducting credits for episode creation`, {
userId: payload.userId,
workspaceId: payload.workspaceId,
statementsCreated: episodeDetails.statementsCreated,
});
await deductCredits(
payload.workspaceId,
"addEpisode",
finalOutput.statementsCreated,
episodeDetails.statementsCreated,
);
}

View File

@ -0,0 +1,40 @@
/**
* Shared type definitions for the ingestion pipeline
*/
/**
 * LLM token accounting split into two model tiers. Each tier records
 * prompt (`input`), completion (`output`), and combined (`total`)
 * token counts.
 */
export interface TokenUsage {
  /** Token counts attributed to the "low" model tier. */
  low: { input: number; output: number; total: number };
  /** Token counts attributed to the "high" model tier. */
  high: { input: number; output: number; total: number };
}
/**
 * Result of ingesting one episode (a standalone episode or a single
 * document chunk). `episodeUuid` is null when no episode was created.
 */
export interface EpisodeIngestionOutput {
  /** UUID of the created episode, or null when none was created. */
  episodeUuid: string | null;
  /** Count of statements created for this episode (drives credit deduction). */
  statementsCreated: number;
  /** Count of entities created, when tracked by the pipeline. */
  entitiesCreated?: number;
  /** Processing duration in milliseconds, when tracked. */
  processingTimeMs?: number;
  /** Token-consumption breakdown, when tracked. */
  tokenUsage?: TokenUsage;
}
/**
 * Aggregate result of ingesting a document that was split into chunks.
 * One `EpisodeIngestionOutput` is appended to `episodes` per processed
 * chunk; the document is considered fully processed once
 * `episodes.length` equals `totalChunks`.
 */
export interface DocumentIngestionOutput {
  /** UUID of the ingested document, or null when unavailable. */
  documentUuid: string | null;
  /** Version number assigned by this ingestion of the document. */
  version: number;
  /** Total number of chunks the document was split into. */
  totalChunks: number;
  /** Number of chunks actually sent through processing. */
  chunksProcessed: number;
  /** Number of chunks skipped by differential processing. */
  chunksSkipped: number;
  /** Processing-mode label reported by the pipeline. */
  processingMode: string;
  /** Strategy selected by the differential-ingestion decision. */
  differentialStrategy: string;
  /** Human-readable estimated savings, formatted like "12.5%". */
  estimatedSavings: string;
  /** Statement-invalidation summary, or null when not performed. */
  statementInvalidation: {
    totalAnalyzed: number;
    invalidated: number;
    preserved: number;
  } | null;
  /** Episode handler names supplied by the ingestion task, if any. */
  episodeHandlers?: string[];
  /** Per-chunk episode results, appended as each chunk completes. */
  episodes: EpisodeIngestionOutput[];
}
/**
 * Output payload persisted on an ingestion-queue row: either a single
 * episode result or a document-level aggregate.
 *
 * NOTE(review): this union carries no discriminant field; callers
 * currently tell the variants apart via the queue item's episode type
 * (e.g. `EpisodeType.DOCUMENT`) rather than the payload itself.
 */
export type IngestionQueueOutput =
  | EpisodeIngestionOutput
  | DocumentIngestionOutput;

View File

@ -439,6 +439,7 @@ CRITICAL RULES:
3. Write in a factual, neutral tone - avoid promotional language ("pivotal", "invaluable", "cutting-edge")
4. Be specific and concrete - reference actual content, patterns, and insights found in the episodes
5. If episodes are insufficient for meaningful insights, state that more data is needed
6. Episodes are presented in chronological order (oldest to newest) - use this to identify evolution, progression, and how understanding/focus changed over time
INTENT-DRIVEN SUMMARIZATION:
Your summary should SERVE the space's intended purpose. Examples:
@ -601,7 +602,7 @@ async function getSpaceEpisodes(
MATCH (space:Space {uuid: $spaceId, userId: $userId})-[:HAS_EPISODE]->(e:Episode {userId: $userId})
WHERE e IS NOT NULL ${dateCondition}
RETURN DISTINCT e
ORDER BY e.createdAt DESC
ORDER BY e.createdAt ASC
`;
const result = await runQuery(query, params);