diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 6aa373e..afe7281 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -66,6 +66,7 @@ const EnvironmentSchema = z.object({
   //OpenAI
   OPENAI_API_KEY: z.string(),
+  ANTHROPIC_API_KEY: z.string().optional(),
 
   EMAIL_TRANSPORT: z.string().optional(),
   FROM_EMAIL: z.string().optional(),
diff --git a/apps/webapp/app/services/graphModels/compactedSession.ts b/apps/webapp/app/services/graphModels/compactedSession.ts
new file mode 100644
index 0000000..b83b5ad
--- /dev/null
+++ b/apps/webapp/app/services/graphModels/compactedSession.ts
@@ -0,0 +1,319 @@
+import { runQuery } from "~/lib/neo4j.server";
+
+export interface SessionEpisodeData {
+  uuid: string;
+  content: string;
+  originalContent: string;
+  source: string;
+  createdAt: Date;
+  validAt: Date;
+  metadata: any;
+  sessionId: string;
+}
+
+export interface CompactedSessionNode {
+  uuid: string;
+  sessionId: string;
+  summary: string;
+  summaryEmbedding: number[];
+  episodeCount: number;
+  startTime: Date;
+  endTime: Date;
+  createdAt: Date;
+  updatedAt?: Date;
+  confidence: number;
+  userId: string;
+  source: string;
+  compressionRatio: number;
+  metadata: Record<string, any>;
+}
+
+/**
+ * Save or update a compacted session
+ */
+export async function saveCompactedSession(
+  compact: CompactedSessionNode
+): Promise<string> {
+  const query = `
+    MERGE (cs:CompactedSession {uuid: $uuid})
+    ON CREATE SET
+      cs.sessionId = $sessionId,
+      cs.summary = $summary,
+      cs.summaryEmbedding = $summaryEmbedding,
+      cs.episodeCount = $episodeCount,
+      cs.startTime = $startTime,
+      cs.endTime = $endTime,
+      cs.createdAt = $createdAt,
+      cs.confidence = $confidence,
+      cs.userId = $userId,
+      cs.source = $source,
+      cs.compressionRatio = $compressionRatio,
+      cs.metadata = $metadata
+    ON MATCH SET
+      cs.summary = $summary,
+      cs.summaryEmbedding = $summaryEmbedding,
+      cs.episodeCount = $episodeCount,
+      cs.endTime = $endTime,
+      cs.updatedAt = $updatedAt,
+      cs.confidence = $confidence,
+      cs.compressionRatio = $compressionRatio,
+      cs.metadata = $metadata
+    RETURN cs.uuid as uuid
+  `;
+
+  const params = {
+    uuid: compact.uuid,
+    sessionId: compact.sessionId,
+    summary: compact.summary,
+    summaryEmbedding: compact.summaryEmbedding,
+    episodeCount: compact.episodeCount,
+    startTime: compact.startTime.toISOString(),
+    endTime: compact.endTime.toISOString(),
+    createdAt: compact.createdAt.toISOString(),
+    updatedAt: compact.updatedAt?.toISOString() || null,
+    confidence: compact.confidence,
+    userId: compact.userId,
+    source: compact.source,
+    compressionRatio: compact.compressionRatio,
+    metadata: JSON.stringify(compact.metadata || {}),
+  };
+
+  const result = await runQuery(query, params);
+  return result[0].get("uuid");
+}
+
+/**
+ * Get a compacted session by UUID
+ */
+export async function getCompactedSession(
+  uuid: string
+): Promise<CompactedSessionNode | null> {
+  const query = `
+    MATCH (cs:CompactedSession {uuid: $uuid})
+    RETURN cs
+  `;
+
+  const result = await runQuery(query, { uuid });
+  if (result.length === 0) return null;
+
+  const compact = result[0].get("cs").properties;
+  return parseCompactedSessionNode(compact);
+}
+
+/**
+ * Get compacted session by sessionId
+ */
+export async function getCompactedSessionBySessionId(
+  sessionId: string,
+  userId: string
+): Promise<CompactedSessionNode | null> {
+  const query = `
+    MATCH (cs:CompactedSession {sessionId: $sessionId, userId: $userId})
+    RETURN cs
+    ORDER BY cs.endTime DESC
+    LIMIT 1
+  `;
+
+  const result = await runQuery(query, { sessionId, userId });
+  if (result.length === 0) return null;
+
+  const compact = result[0].get("cs").properties;
+  return parseCompactedSessionNode(compact);
+}
+
+/**
+ * Get all episodes linked to a compacted session
+ */
+export async function getCompactedSessionEpisodes(
+  compactUuid: string
+): Promise<string[]> {
+  const query = `
+    MATCH (cs:CompactedSession {uuid: $compactUuid})-[:COMPACTS]->(e:Episode)
+    RETURN e.uuid as episodeUuid
+    ORDER BY e.createdAt ASC
+  `;
+
+  const result = await runQuery(query, { compactUuid });
+  return result.map((r) => r.get("episodeUuid"));
+}
+
+/**
+ * Link episodes to compacted session
+ */
+export async function linkEpisodesToCompact(
+  compactUuid: string,
+  episodeUuids: string[],
+  userId: string
+): Promise<void> {
+  const query = `
+    MATCH (cs:CompactedSession {uuid: $compactUuid, userId: $userId})
+    UNWIND $episodeUuids as episodeUuid
+    MATCH (e:Episode {uuid: episodeUuid, userId: $userId})
+    MERGE (cs)-[:COMPACTS {createdAt: datetime()}]->(e)
+    MERGE (e)-[:COMPACTED_INTO {createdAt: datetime()}]->(cs)
+  `;
+
+  await runQuery(query, { compactUuid, episodeUuids, userId });
+}
+
+/**
+ * Search compacted sessions by embedding similarity
+ */
+export async function searchCompactedSessionsByEmbedding(
+  embedding: number[],
+  userId: string,
+  limit: number = 10,
+  minScore: number = 0.7
+): Promise<Array<{ compact: CompactedSessionNode; score: number }>> {
+  const query = `
+    MATCH (cs:CompactedSession {userId: $userId})
+    WHERE cs.summaryEmbedding IS NOT NULL
+    WITH cs,
+         gds.similarity.cosine(cs.summaryEmbedding, $embedding) AS score
+    WHERE score >= $minScore
+    RETURN cs, score
+    ORDER BY score DESC
+    LIMIT $limit
+  `;
+
+  const result = await runQuery(query, {
+    embedding,
+    userId,
+    limit,
+    minScore,
+  });
+
+  return result.map((r) => ({
+    compact: parseCompactedSessionNode(r.get("cs").properties),
+    score: r.get("score"),
+  }));
+}
+
+/**
+ * Get compacted sessions for a user
+ */
+export async function getUserCompactedSessions(
+  userId: string,
+  limit: number = 50
+): Promise<CompactedSessionNode[]> {
+  const query = `
+    MATCH (cs:CompactedSession {userId: $userId})
+    RETURN cs
+    ORDER BY cs.endTime DESC
+    LIMIT $limit
+  `;
+
+  const result = await runQuery(query, { userId, limit });
+  return result.map((r) => parseCompactedSessionNode(r.get("cs").properties));
+}
+
+/**
+ * Delete a compacted session
+ */
+export async function deleteCompactedSession(uuid: string): Promise<void> {
+  const query = `
+    MATCH (cs:CompactedSession {uuid: $uuid})
+    DETACH DELETE cs
+  `;
+
+  await runQuery(query, { uuid });
+}
+
+/**
+ * Get compaction statistics for a user
+ */
+export async function getCompactionStats(userId: string): Promise<{
+  totalCompacts: number;
+  totalEpisodes: number;
+  averageCompressionRatio: number;
+  mostRecentCompaction: Date | null;
+}> {
+  const query = `
+    MATCH (cs:CompactedSession {userId: $userId})
+    RETURN
+      count(cs) as totalCompacts,
+      sum(cs.episodeCount) as totalEpisodes,
+      avg(cs.compressionRatio) as avgCompressionRatio,
+      max(cs.endTime) as mostRecent
+  `;
+
+  const result = await runQuery(query, { userId });
+  if (result.length === 0) {
+    return {
+      totalCompacts: 0,
+      totalEpisodes: 0,
+      averageCompressionRatio: 0,
+      mostRecentCompaction: null,
+    };
+  }
+
+  const stats = result[0];
+  return {
+    totalCompacts: stats.get("totalCompacts")?.toNumber() || 0,
+    totalEpisodes: stats.get("totalEpisodes")?.toNumber() || 0,
+    averageCompressionRatio: stats.get("avgCompressionRatio") || 0,
+    mostRecentCompaction: stats.get("mostRecent")
+      ? new Date(stats.get("mostRecent"))
+      : null,
+  };
+}
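
// Usage sketch for the stats helper above — a hypothetical dashboard loader,
// illustrative only. Worth noting: Neo4j aggregates like count()/sum() come
// back from the driver as Integer objects, which is why getCompactionStats
// unwraps them with .toNumber() before returning plain JS numbers.
import { getCompactionStats } from "~/services/graphModels/compactedSession";

async function loadCompactionDashboard(userId: string) {
  const stats = await getCompactionStats(userId);
  // All numeric fields are already plain numbers here.
  return {
    headline: `${stats.totalCompacts} compacts covering ${stats.totalEpisodes} episodes`,
    avgCompression: stats.averageCompressionRatio.toFixed(1),
    lastRun: stats.mostRecentCompaction?.toISOString() ?? "never",
  };
}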
+
+/**
+ * Get all episodes for a session
+ */
+export async function getSessionEpisodes(
+  sessionId: string,
+  userId: string,
+  afterTime?: Date
+): Promise<SessionEpisodeData[]> {
+  const query = `
+    MATCH (e:Episode {sessionId: $sessionId, userId: $userId})
+    ${afterTime ? "WHERE e.createdAt > datetime($afterTime)" : ""}
+    RETURN e
+    ORDER BY e.createdAt ASC
+  `;
+
+  const result = await runQuery(query, {
+    sessionId,
+    userId,
+    afterTime: afterTime?.toISOString(),
+  });
+
+  return result.map((r) => r.get("e").properties);
+}
+
+/**
+ * Get episode count for a session
+ */
+export async function getSessionEpisodeCount(
+  sessionId: string,
+  userId: string,
+  afterTime?: Date
+): Promise<number> {
+  const episodes = await getSessionEpisodes(sessionId, userId, afterTime);
+  return episodes.length;
+}
+
+/**
+ * Helper to parse raw compact node from Neo4j
+ */
+function parseCompactedSessionNode(raw: any): CompactedSessionNode {
+  return {
+    uuid: raw.uuid,
+    sessionId: raw.sessionId,
+    summary: raw.summary,
+    summaryEmbedding: raw.summaryEmbedding || [],
+    episodeCount: raw.episodeCount || 0,
+    startTime: new Date(raw.startTime),
+    endTime: new Date(raw.endTime),
+    createdAt: new Date(raw.createdAt),
+    updatedAt: raw.updatedAt ? new Date(raw.updatedAt) : undefined,
+    confidence: raw.confidence || 0,
+    userId: raw.userId,
+    source: raw.source,
+    compressionRatio: raw.compressionRatio || 1,
+    metadata: typeof raw.metadata === "string"
+      ? JSON.parse(raw.metadata)
+      : raw.metadata || {},
+  };
+}
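
// Usage sketch for this module's write path (hypothetical caller; the
// function name is illustrative). The flow is: upsert the node via MERGE,
// link its episodes, then read back through the sessionId lookup.
import {
  saveCompactedSession,
  linkEpisodesToCompact,
  getCompactedSessionBySessionId,
  type CompactedSessionNode,
} from "~/services/graphModels/compactedSession";

async function persistCompact(node: CompactedSessionNode, episodeUuids: string[]) {
  const uuid = await saveCompactedSession(node); // MERGE: creates or updates
  await linkEpisodesToCompact(uuid, episodeUuids, node.userId);
  // Returns the newest compact for the session (ORDER BY endTime DESC LIMIT 1).
  return getCompactedSessionBySessionId(node.sessionId, node.userId);
}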
diff --git a/apps/webapp/app/services/search.server.ts b/apps/webapp/app/services/search.server.ts
index 1945504..edbfc23 100644
--- a/apps/webapp/app/services/search.server.ts
+++ b/apps/webapp/app/services/search.server.ts
@@ -361,6 +361,116 @@ export class SearchService {
     `;
     await runQuery(cypher2, { statementUuids: statementIds, userId });
   }
+
+  /**
+   * Enhanced search that includes compacted sessions
+   * This method searches both regular episodes/statements AND compacted sessions
+   * Compacted sessions are preferred when they have high confidence matches
+   */
+  public async searchWithCompacts(
+    query: string,
+    userId: string,
+    options: SearchOptions = {},
+    source?: string,
+  ): Promise<ExtendedSearchResult> {
+    const startTime = Date.now();
+
+    // First, run the standard search for episodes and facts
+    const standardResults = await this.search(query, userId, options, source);
+
+    // Search compacted sessions
+    try {
+      const queryVector = await this.getEmbedding(query);
+      const { searchCompactedSessionsByEmbedding } = await import(
+        "~/services/graphModels/compactedSession"
+      );
+
+      const compactResults = await searchCompactedSessionsByEmbedding(
+        queryVector,
+        userId,
+        options.limit || 10,
+        options.scoreThreshold || 0.7,
+      );
+
+      logger.info(`Found ${compactResults.length} matching compacted sessions`, {
+        query,
+        userId,
+      });
+
+      // Format compact results
+      const formattedCompacts = compactResults.map((result) => ({
+        summary: result.compact.summary,
+        sessionId: result.compact.sessionId,
+        episodeCount: result.compact.episodeCount,
+        confidence: result.compact.confidence,
+        relevantScore: result.score,
+      }));
+
+      // Log compact recall
+      await this.logCompactRecallAsync(
+        query,
+        userId,
+        compactResults,
+        Date.now() - startTime,
+        source,
+      ).catch((error) => {
+        logger.error("Failed to log compact recall event:", error);
+      });
+
+      return {
+        ...standardResults,
+        compacts: formattedCompacts,
+      };
+    } catch (error) {
+      logger.error("Error searching compacted sessions:", { error });
+      // Return standard results if compact search fails
+      return {
+        ...standardResults,
+        compacts: [],
+      };
+    }
+  }
+
+  /**
+   * Log recall event for compacted sessions
+   */
+  private async logCompactRecallAsync(
+    query: string,
+    userId: string,
+    compacts: Array<{ compact: any; score: number }>,
+    responseTime: number,
+    source?: string,
+  ): Promise<void> {
+    try {
+      const averageScore =
+        compacts.length > 0
+          ? compacts.reduce((sum, c) => sum + c.score, 0) / compacts.length
+          : 0;
+
+      await prisma.recallLog.create({
+        data: {
+          accessType: "search",
+          query,
+          targetType: "compacted_session",
+          searchMethod: "vector_similarity",
+          resultCount: compacts.length,
+          similarityScore: averageScore,
+          context: JSON.stringify({
+            compactedSessionSearch: true,
+          }),
+          source: source ?? "search_with_compacts",
+          responseTimeMs: responseTime,
+          userId,
+        },
+      });
+
+      logger.debug(
+        `Logged compact recall event for user ${userId}: ${compacts.length} compacts in ${responseTime}ms`,
+      );
+    } catch (error) {
+      logger.error("Error creating compact recall log entry:", { error });
+    }
+  }
 }
 
 /**
@@ -380,3 +490,23 @@ export interface SearchOptions {
   spaceIds?: string[]; // Filter results by specific spaces
   adaptiveFiltering?: boolean;
 }
+
+/**
+ * Extended search result that includes compacted sessions
+ */
+export interface ExtendedSearchResult {
+  episodes: { content: string; createdAt: Date; spaceIds: string[] }[];
+  facts: {
+    fact: string;
+    validAt: Date;
+    invalidAt: Date | null;
+    relevantScore: number;
+  }[];
+  compacts?: {
+    summary: string;
+    sessionId: string;
+    episodeCount: number;
+    confidence: number;
+    relevantScore: number;
+  }[];
+}
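
// Consumption sketch for the extended result (hypothetical caller; assumes
// SearchService can be constructed directly here, and the 0.8 confidence
// threshold is illustrative). Compacts are optional on the result:
// searchWithCompacts degrades to an empty array when the compact search fails.
import { SearchService, type ExtendedSearchResult } from "~/services/search.server";

async function recallWithSessionContext(query: string, userId: string) {
  const searchService = new SearchService();
  const result: ExtendedSearchResult = await searchService.searchWithCompacts(
    query,
    userId,
  );

  const sessionContext = (result.compacts ?? [])
    .filter((c) => c.confidence >= 0.8)
    .map((c) => `Session ${c.sessionId} (${c.episodeCount} episodes): ${c.summary}`);

  return { ...result, sessionContext };
}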
diff --git a/apps/webapp/app/services/sessionCompaction.server.ts b/apps/webapp/app/services/sessionCompaction.server.ts
new file mode 100644
index 0000000..1afc8a9
--- /dev/null
+++ b/apps/webapp/app/services/sessionCompaction.server.ts
@@ -0,0 +1,262 @@
+import { logger } from "~/services/logger.service";
+import {
+  getCompactedSessionBySessionId,
+  getCompactionStats,
+  getSessionEpisodes,
+  type CompactedSessionNode,
+} from "~/services/graphModels/compactedSession";
+import { tasks } from "@trigger.dev/sdk/v3";
+
+/**
+ * Configuration for session compaction
+ */
+export const COMPACTION_CONFIG = {
+  minEpisodesForCompaction: 5, // Minimum episodes to trigger initial compaction
+  compactionThreshold: 1, // Trigger update after N new episodes
+  autoCompactionEnabled: true, // Enable automatic compaction
+};
+
+/**
+ * SessionCompactionService - Manages session compaction lifecycle
+ */
+export class SessionCompactionService {
+  /**
+   * Check if a session should be compacted
+   */
+  async shouldCompact(sessionId: string, userId: string): Promise<{
+    shouldCompact: boolean;
+    reason: string;
+    episodeCount?: number;
+    newEpisodeCount?: number;
+  }> {
+    try {
+      // Get existing compact
+      const existingCompact = await getCompactedSessionBySessionId(sessionId, userId);
+
+      if (!existingCompact) {
+        // No compact exists, check if we have enough episodes
+        const episodeCount = await this.getSessionEpisodeCount(sessionId, userId);
+
+        if (episodeCount >= COMPACTION_CONFIG.minEpisodesForCompaction) {
+          return {
+            shouldCompact: true,
+            reason: "initial_compaction",
+            episodeCount,
+          };
+        }
+
+        return {
+          shouldCompact: false,
+          reason: "insufficient_episodes",
+          episodeCount,
+        };
+      }
+
+      // Compact exists, check if we have enough new episodes
+      const newEpisodeCount = await this.getNewEpisodeCount(
+        sessionId,
+        userId,
+        existingCompact.endTime
+      );
+
+      if (newEpisodeCount >= COMPACTION_CONFIG.compactionThreshold) {
+        return {
+          shouldCompact: true,
+          reason: "update_compaction",
+          newEpisodeCount,
+        };
+      }
+
+      return {
+        shouldCompact: false,
+        reason: "insufficient_new_episodes",
+        newEpisodeCount,
+      };
+    } catch (error) {
+      logger.error(`Error checking if session should compact`, {
+        sessionId,
+        userId,
+        error: error instanceof Error ? error.message : String(error),
+      });
+
+      return {
+        shouldCompact: false,
+        reason: "error",
+      };
+    }
+  }
+
+  /**
+   * Get total episode count for a session
+   */
+  private async getSessionEpisodeCount(
+    sessionId: string,
+    userId: string
+  ): Promise<number> {
+    const episodes = await getSessionEpisodes(sessionId, userId);
+    return episodes.length;
+  }
+
+  /**
+   * Get count of new episodes since last compaction
+   */
+  private async getNewEpisodeCount(
+    sessionId: string,
+    userId: string,
+    afterTime: Date
+  ): Promise<number> {
+    const episodes = await getSessionEpisodes(sessionId, userId, afterTime);
+    return episodes.length;
+  }
+
+  /**
+   * Trigger compaction for a session
+   */
+  async triggerCompaction(
+    sessionId: string,
+    userId: string,
+    source: string,
+    triggerSource: "auto" | "manual" | "threshold" = "auto"
+  ): Promise<{ success: boolean; taskId?: string; error?: string }> {
+    try {
+      // Check if compaction should be triggered
+      const check = await this.shouldCompact(sessionId, userId);
+
+      if (!check.shouldCompact) {
+        logger.info(`Compaction not needed`, {
+          sessionId,
+          userId,
+          reason: check.reason,
+        });
+
+        return {
+          success: false,
+          error: `Compaction not needed: ${check.reason}`,
+        };
+      }
+
+      // Trigger the compaction task
+      logger.info(`Triggering session compaction`, {
+        sessionId,
+        userId,
+        source,
+        triggerSource,
+        reason: check.reason,
+      });
+
+      const handle = await tasks.trigger("session-compaction", {
+        userId,
+        sessionId,
+        source,
+        triggerSource,
+      });
+
+      logger.info(`Session compaction triggered`, {
+        sessionId,
+        userId,
+        taskId: handle.id,
+      });
+
+      return {
+        success: true,
+        taskId: handle.id,
+      };
+    } catch (error) {
+      logger.error(`Failed to trigger compaction`, {
+        sessionId,
+        userId,
+        error: error instanceof Error ? error.message : String(error),
+      });
+
+      return {
+        success: false,
+        error: error instanceof Error ? error.message : "Unknown error",
+      };
+    }
+  }
+
+  /**
+   * Get compacted session for recall
+   */
+  async getCompactForRecall(
+    sessionId: string,
+    userId: string
+  ): Promise<CompactedSessionNode | null> {
+    try {
+      return await getCompactedSessionBySessionId(sessionId, userId);
+    } catch (error) {
+      logger.error(`Error fetching compact for recall`, {
+        sessionId,
+        userId,
+        error: error instanceof Error ? error.message : String(error),
+      });
+      return null;
+    }
+  }
+
+  /**
+   * Get compaction statistics for a user
+   */
+  async getStats(userId: string): Promise<{
+    totalCompacts: number;
+    totalEpisodes: number;
+    averageCompressionRatio: number;
+    mostRecentCompaction: Date | null;
+  }> {
+    try {
+      return await getCompactionStats(userId);
+    } catch (error) {
+      logger.error(`Error fetching compaction stats`, {
+        userId,
+        error: error instanceof Error ? error.message : String(error),
+      });
+
+      return {
+        totalCompacts: 0,
+        totalEpisodes: 0,
+        averageCompressionRatio: 0,
+        mostRecentCompaction: null,
+      };
+    }
+  }
+
+  /**
+   * Auto-trigger compaction after episode ingestion
+   * Called from ingestion pipeline
+   */
+  async autoTriggerAfterIngestion(
+    sessionId: string | null | undefined,
+    userId: string,
+    source: string
+  ): Promise<void> {
+    // Skip if no sessionId or auto-compaction disabled
+    if (!sessionId || !COMPACTION_CONFIG.autoCompactionEnabled) {
+      return;
+    }
+
+    try {
+      const check = await this.shouldCompact(sessionId, userId);
+
+      if (check.shouldCompact) {
+        logger.info(`Auto-triggering compaction after ingestion`, {
+          sessionId,
+          userId,
+          reason: check.reason,
+        });
+
+        // Enqueue the compaction task; the run itself executes asynchronously
+        // in trigger.dev, so this await only blocks on queueing
+        await this.triggerCompaction(sessionId, userId, source, "auto");
+      }
+    } catch (error) {
+      // Log error but don't fail ingestion
+      logger.error(`Error in auto-trigger compaction`, {
+        sessionId,
+        userId,
+        error: error instanceof Error ? error.message : String(error),
+      });
+    }
+  }
+}
+
+// Singleton instance
+export const sessionCompactionService = new SessionCompactionService();
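
// Sketch of a manual trigger path through the service (hypothetical route
// action; the status codes and response shape are illustrative, not part of
// this change).
import { sessionCompactionService } from "~/services/sessionCompaction.server";

async function manualCompactAction(sessionId: string, userId: string, source: string) {
  const check = await sessionCompactionService.shouldCompact(sessionId, userId);
  if (!check.shouldCompact) {
    return { status: 409, reason: check.reason }; // e.g. "insufficient_episodes"
  }

  const result = await sessionCompactionService.triggerCompaction(
    sessionId,
    userId,
    source,
    "manual",
  );
  return result.success
    ? { status: 202, taskId: result.taskId } // accepted; runs in trigger.dev
    : { status: 500, error: result.error };
}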
diff --git a/apps/webapp/app/trigger/ingest/ingest.ts b/apps/webapp/app/trigger/ingest/ingest.ts
index 66e382f..79a829d 100644
--- a/apps/webapp/app/trigger/ingest/ingest.ts
+++ b/apps/webapp/app/trigger/ingest/ingest.ts
@@ -10,6 +10,7 @@ import { prisma } from "../utils/prisma";
 import { EpisodeType } from "@core/types";
 import { deductCredits, hasCredits } from "../utils/utils";
 import { assignEpisodesToSpace } from "~/services/graphModels/space";
+import { triggerSessionCompaction } from "../session/session-compaction";
 
 export const IngestBodyRequest = z.object({
   episodeBody: z.string(),
@@ -202,6 +203,30 @@ export const ingestTask = task({
       });
     }
 
+    // Auto-trigger session compaction if episode has sessionId
+    try {
+      if (episodeBody.sessionId && currentStatus === IngestionStatus.COMPLETED) {
+        logger.info(`Checking if session compaction should be triggered`, {
+          userId: payload.userId,
+          sessionId: episodeBody.sessionId,
+          source: episodeBody.source,
+        });
+
+        await triggerSessionCompaction({
+          userId: payload.userId,
+          sessionId: episodeBody.sessionId,
+          source: episodeBody.source,
+        });
+      }
+    } catch (compactionError) {
+      // Don't fail the ingestion if compaction fails
+      logger.warn(`Failed to trigger session compaction after ingestion:`, {
+        error: compactionError,
+        userId: payload.userId,
+        sessionId: episodeBody.sessionId,
+      });
+    }
+
     return { success: true, episodeDetails };
   } catch (err: any) {
     await prisma.ingestionQueue.update({
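
// Note on the ingest hook above: triggerSessionCompaction (defined below)
// wraps sessionCompactionTask.trigger(), which only enqueues a run — the
// await in the ingest task blocks on queueing, not on the LLM summarization.
// Minimal sketch of that contract (only the handle's id field is assumed):
import { triggerSessionCompaction } from "../session/session-compaction";

async function enqueueCompaction(userId: string, sessionId: string, source: string) {
  const handle = await triggerSessionCompaction({ userId, sessionId, source });
  return handle.id; // run executes later on the "session-compaction" queue
}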
"threshold"; +} + +// Zod schema for LLM response validation +const CompactionResultSchema = z.object({ + summary: z.string().describe("Consolidated narrative of the entire session"), + confidence: z.number().min(0).max(1).describe("Confidence score of the compaction quality"), +}); + +const CONFIG = { + minEpisodesForCompaction: 3, // Minimum episodes to trigger compaction + compactionThreshold: 1, // Trigger after N new episodes + maxEpisodesPerBatch: 50, // Process in batches if needed +}; + +export const sessionCompactionQueue = queue({ + name: "session-compaction-queue", + concurrencyLimit: 1, +}); + +export const sessionCompactionTask = task({ + id: "session-compaction", + queue: sessionCompactionQueue, + run: async (payload: SessionCompactionPayload) => { + const { userId, sessionId, source, triggerSource = "auto" } = payload; + + logger.info(`Starting session compaction`, { + userId, + sessionId, + source, + triggerSource, + }); + + try { + // Check if compaction already exists + // const existingCompact = await getCompactedSessionBySessionId(sessionId, userId); + const existingCompact = {} as CompactedSessionNode; + + + + // Fetch all episodes for this session + const episodes = await getSessionEpisodes(sessionId, userId, existingCompact?.endTime); + + console.log("episodes", episodes.length); + // Check if we have enough episodes + if (!existingCompact && episodes.length < CONFIG.minEpisodesForCompaction) { + logger.info(`Not enough episodes for compaction`, { + sessionId, + episodeCount: episodes.length, + minRequired: CONFIG.minEpisodesForCompaction, + }); + return { + success: false, + reason: "insufficient_episodes", + episodeCount: episodes.length, + }; + } else if (existingCompact && episodes.length < CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold) { + logger.info(`Not enough new episodes for compaction`, { + sessionId, + episodeCount: episodes.length, + minRequired: CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold, + }); + return { + success: false, + reason: "insufficient_new_episodes", + episodeCount: episodes.length, + }; + } + + // Generate or update compaction + const compactionResult = await createCompaction(sessionId, episodes, userId, source) + // const compactionResult = existingCompact + // ? await updateCompaction(existingCompact, episodes, userId) + // : await createCompaction(sessionId, episodes, userId, source); + + logger.info(`Session compaction completed`, { + sessionId, + compactUuid: compactionResult.uuid, + episodeCount: compactionResult.episodeCount, + compressionRatio: compactionResult.compressionRatio, + }); + + return { + success: true, + compactionResult: { + compactUuid: compactionResult.uuid, + sessionId: compactionResult.sessionId, + summary: compactionResult.summary, + episodeCount: compactionResult.episodeCount, + startTime: compactionResult.startTime, + endTime: compactionResult.endTime, + confidence: compactionResult.confidence, + compressionRatio: compactionResult.compressionRatio, + }, + }; + } catch (error) { + logger.error(`Session compaction failed`, { + sessionId, + userId, + error: error instanceof Error ? 
+
+/**
+ * Create new compaction
+ */
+async function createCompaction(
+  sessionId: string,
+  episodes: SessionEpisodeData[],
+  userId: string,
+  source: string
+): Promise<CompactedSessionNode> {
+  logger.info(`Creating new compaction`, { sessionId, episodeCount: episodes.length });
+
+  // Generate compaction using LLM
+  const compactionData = await generateCompaction(episodes, null);
+
+  // Generate embedding for summary
+  const summaryEmbedding = await getEmbedding(compactionData.summary);
+
+  // Create CompactedSession node using graph model
+  const compactUuid = crypto.randomUUID();
+  const now = new Date();
+  const startTime = new Date(episodes[0].createdAt);
+  const endTime = new Date(episodes[episodes.length - 1].createdAt);
+  const episodeUuids = episodes.map((e) => e.uuid);
+  const compressionRatio = episodes.length; // N episodes compacted into 1 summary
+
+  const compactNode: CompactedSessionNode = {
+    uuid: compactUuid,
+    sessionId,
+    summary: compactionData.summary,
+    summaryEmbedding,
+    episodeCount: episodes.length,
+    startTime,
+    endTime,
+    createdAt: now,
+    confidence: compactionData.confidence,
+    userId,
+    source,
+    compressionRatio,
+    metadata: { triggerType: "create" },
+  };
+
+  // Use graph model functions
+  await saveCompactedSession(compactNode);
+  await linkEpisodesToCompact(compactUuid, episodeUuids, userId);
+
+  logger.info(`Compaction created`, { compactUuid, episodeCount: episodes.length });
+
+  return compactNode;
+}
+
+/**
+ * Update existing compaction with new episodes
+ */
+async function updateCompaction(
+  existingCompact: CompactedSessionNode,
+  newEpisodes: SessionEpisodeData[],
+  userId: string
+): Promise<CompactedSessionNode> {
+  logger.info(`Updating existing compaction`, {
+    compactUuid: existingCompact.uuid,
+    newEpisodeCount: newEpisodes.length,
+  });
+
+  // Generate updated compaction using LLM (merging)
+  const compactionData = await generateCompaction(newEpisodes, existingCompact.summary);
+
+  // Generate new embedding for updated summary
+  const summaryEmbedding = await getEmbedding(compactionData.summary);
+
+  // Update CompactedSession node using graph model
+  const now = new Date();
+  const endTime = new Date(newEpisodes[newEpisodes.length - 1].createdAt);
+  const totalEpisodeCount = existingCompact.episodeCount + newEpisodes.length;
+  const compressionRatio = totalEpisodeCount; // all episodes compacted into 1 summary
+  const episodeUuids = newEpisodes.map((e) => e.uuid);
+
+  const updatedNode: CompactedSessionNode = {
+    ...existingCompact,
+    summary: compactionData.summary,
+    summaryEmbedding,
+    episodeCount: totalEpisodeCount,
+    endTime,
+    updatedAt: now,
+    confidence: compactionData.confidence,
+    compressionRatio,
+    metadata: { triggerType: "update", newEpisodesAdded: newEpisodes.length },
+  };
+
+  // Use graph model functions (imported at the top of this module)
+  await saveCompactedSession(updatedNode);
+  await linkEpisodesToCompact(existingCompact.uuid, episodeUuids, userId);
+
+  logger.info(`Compaction updated`, {
+    compactUuid: existingCompact.uuid,
+    totalEpisodeCount,
+  });
+
+  return updatedNode;
+}
"system", content: systemPrompt }, + { role: "user", content: userPrompt }, + ]; + + logger.info(`Generating compaction with LLM`, { + episodeCount: episodes.length, + hasExistingSummary: !!existingSummary, + }); + + try { + let responseText = ""; + await makeModelCall( + false, + messages, + (text: string) => { + responseText = text; + }, + undefined, + "high", + ); + + return parseCompactionResponse(responseText); + } catch (error) { + logger.error(`Failed to generate compaction`, { + error: error instanceof Error ? error.message : String(error), + }); + throw error; + } +} + +/** + * System prompt for compaction (for agent recall/context retrieval) + */ +function createCompactionSystemPrompt(): string { + return `You are a session compaction specialist. Your task is to create a rich, informative summary that will help AI agents understand what happened in this conversation session when they need context for future interactions. + +## PURPOSE + +This summary will be retrieved by AI agents when the user references this session in future conversations. The agent needs enough context to: +- Understand what was discussed and why +- Know what decisions were made and their rationale +- Grasp the outcome and current state +- Have relevant technical details to provide informed responses + +## COMPACTION GOALS + +1. **Comprehensive Context**: Capture all important information that might be referenced later +2. **Decision Documentation**: Clearly state what was decided, why, and what alternatives were considered +3. **Technical Details**: Include specific implementations, tools, configurations, and technical choices +4. **Outcome Clarity**: Make it clear what was accomplished and what the final state is +5. **Evolution Tracking**: Show how thinking or decisions evolved during the session + +## COMPACTION RULES + +1. **Be Information-Dense**: Pack useful details without fluff or repetition +2. **Structure Chronologically**: Start with problem/question, show progression, end with outcome +3. **Highlight Key Points**: Emphasize decisions, implementations, results, and learnings +4. **Include Specifics**: Names of libraries, specific configurations, metrics, numbers matter +5. **Resolve Contradictions**: Always use the most recent/final version when information conflicts + +## OUTPUT REQUIREMENTS + +- **summary**: A detailed, information-rich narrative that tells the complete story + - Structure naturally based on content - use as many paragraphs as needed + - Each distinct topic, decision, or phase should get its own paragraph(s) + - Start with context and initial problem/question + - Progress chronologically through discussions, decisions, and implementations + - **Final paragraph MUST**: State the outcome, results, and current state + - Don't artificially limit length - capture everything important + +- **confidence**: Score (0-1) reflecting how well this summary captures the session's essence + +Your response MUST be valid JSON wrapped in tags. + +## EXAMPLE TRANSFORMATION + +Input Episodes: +1. "Need to implement authentication for web app. Considering JWT vs session-based auth." +2. "JWT is stateless, good for distributed systems. Session-based simpler but needs server storage." +3. "Going with JWT for microservices. Deciding on token lifetime strategy." +4. "Chose 15-minute access tokens + 7-day refresh tokens for security." +5. "Implemented using jsonwebtoken library, storing refresh tokens in Redis." +6. "Testing complete: login, protected routes, token refresh, logout all working." 
+
+/**
+ * User prompt for compaction
+ */
+function createCompactionUserPrompt(
+  episodes: SessionEpisodeData[],
+  existingSummary: string | null
+): string {
+  let prompt = "";
+
+  if (existingSummary) {
+    prompt += `## EXISTING SUMMARY (from previous compaction)\n\n${existingSummary}\n\n`;
+    prompt += `## NEW EPISODES (to merge into existing summary)\n\n`;
+  } else {
+    prompt += `## SESSION EPISODES (to compact)\n\n`;
+  }
+
+  episodes.forEach((episode, index) => {
+    const timestamp = new Date(episode.validAt).toISOString();
+    prompt += `### Episode ${index + 1} (${timestamp})\n`;
+    prompt += `Source: ${episode.source}\n`;
+    prompt += `Content:\n${episode.originalContent}\n\n`;
+  });
+
+  if (existingSummary) {
+    prompt += `\n## INSTRUCTIONS\n\n`;
+    prompt += `Merge the new episodes into the existing summary. Update facts, add new information, and maintain narrative coherence. Ensure the consolidated summary reflects the complete session including both old and new content.\n`;
+  } else {
+    prompt += `\n## INSTRUCTIONS\n\n`;
+    prompt += `Create a compact summary of this entire session. Consolidate all information into a coherent narrative with deduplicated key facts.\n`;
+  }
+
+  return prompt;
+}
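
// For reference while reading the parser below: a well-formed model response
// carries the JSON inside an <output> envelope. Illustrative, abridged sample:
const exampleLlmResponse = `<output>
{
  "summary": "Session focused on ...",
  "confidence": 0.9
}
</output>`;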
+
+/**
+ * Parse LLM response for compaction
+ */
+function parseCompactionResponse(response: string): z.infer<typeof CompactionResultSchema> {
+  try {
+    // Extract content from <output> tags
+    const outputMatch = response.match(/<output>([\s\S]*?)<\/output>/);
+    if (!outputMatch) {
+      logger.warn("No <output> tags found in LLM compaction response");
+      logger.debug("Full LLM response:", { response });
+      throw new Error("Invalid LLM response format - missing <output> tags");
+    }
+
+    let jsonContent = outputMatch[1].trim();
+
+    // Remove markdown code blocks if present
+    jsonContent = jsonContent.replace(/```json\n?/g, "").replace(/```\n?/g, "");
+
+    const parsed = JSON.parse(jsonContent);
+
+    // Validate with schema
+    const validated = CompactionResultSchema.parse(parsed);
+
+    return validated;
+  } catch (error) {
+    logger.error("Failed to parse compaction response", {
+      error: error instanceof Error ? error.message : String(error),
+      response: response.substring(0, 500),
+    });
+    throw new Error(`Failed to parse compaction response: ${error}`);
+  }
+}
+
+/**
+ * Helper function to check if compaction should be triggered
+ */
+export async function shouldTriggerCompaction(
+  sessionId: string,
+  userId: string
+): Promise<boolean> {
+  const existingCompact = await getCompactedSessionBySessionId(sessionId, userId);
+
+  if (!existingCompact) {
+    // Check if we have enough episodes for initial compaction
+    const episodes = await getSessionEpisodes(sessionId, userId);
+    return episodes.length >= CONFIG.minEpisodesForCompaction;
+  }
+
+  // Check if we have enough new episodes to update
+  const newEpisodes = await getSessionEpisodes(sessionId, userId, existingCompact.endTime);
+  return newEpisodes.length >= CONFIG.compactionThreshold;
+}
+
+/**
+ * Trigger compaction for a session
+ */
+export async function triggerSessionCompaction(payload: SessionCompactionPayload) {
+  return await sessionCompactionTask.trigger(payload);
+}
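
// Composition sketch for the exported helpers (hypothetical caller): use the
// cheap graph-side check as a guard before enqueueing a run.
import {
  shouldTriggerCompaction,
  triggerSessionCompaction,
} from "~/trigger/session/session-compaction";

async function maybeCompact(sessionId: string, userId: string, source: string) {
  if (await shouldTriggerCompaction(sessionId, userId)) {
    const handle = await triggerSessionCompaction({
      userId,
      sessionId,
      source,
      triggerSource: "threshold",
    });
    return handle.id;
  }
  return null;
}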