feat: add session compaction models and search integration with Neo4j

Authored by Manoj on 2025-10-20 19:11:55 +05:30; committed by Harshith Mullapudi
parent 60dd4bfa6f
commit 8a6b06383e
6 changed files with 1181 additions and 0 deletions


@@ -66,6 +66,7 @@ const EnvironmentSchema = z.object({
//OpenAI
OPENAI_API_KEY: z.string(),
ANTHROPIC_API_KEY: z.string().optional(),
EMAIL_TRANSPORT: z.string().optional(),
FROM_EMAIL: z.string().optional(),


@@ -0,0 +1,319 @@
import { runQuery } from "~/lib/neo4j.server";
export interface SessionEpisodeData {
uuid: string;
content: string;
originalContent: string;
source: string;
createdAt: Date;
validAt: Date;
metadata: any;
sessionId: string;
}
export interface CompactedSessionNode {
uuid: string;
sessionId: string;
summary: string;
summaryEmbedding: number[];
episodeCount: number;
startTime: Date;
endTime: Date;
createdAt: Date;
updatedAt?: Date;
confidence: number;
userId: string;
source: string;
compressionRatio: number;
metadata: Record<string, any>;
}
/**
* Save or update a compacted session
*/
export async function saveCompactedSession(
compact: CompactedSessionNode
): Promise<string> {
const query = `
MERGE (cs:CompactedSession {uuid: $uuid})
ON CREATE SET
cs.sessionId = $sessionId,
cs.summary = $summary,
cs.summaryEmbedding = $summaryEmbedding,
cs.episodeCount = $episodeCount,
cs.startTime = $startTime,
cs.endTime = $endTime,
cs.createdAt = $createdAt,
cs.confidence = $confidence,
cs.userId = $userId,
cs.source = $source,
cs.compressionRatio = $compressionRatio,
cs.metadata = $metadata
ON MATCH SET
cs.summary = $summary,
cs.summaryEmbedding = $summaryEmbedding,
cs.episodeCount = $episodeCount,
cs.endTime = $endTime,
cs.updatedAt = $updatedAt,
cs.confidence = $confidence,
cs.compressionRatio = $compressionRatio,
cs.metadata = $metadata
RETURN cs.uuid as uuid
`;
const params = {
uuid: compact.uuid,
sessionId: compact.sessionId,
summary: compact.summary,
summaryEmbedding: compact.summaryEmbedding,
episodeCount: compact.episodeCount,
startTime: compact.startTime.toISOString(),
endTime: compact.endTime.toISOString(),
createdAt: compact.createdAt.toISOString(),
updatedAt: compact.updatedAt?.toISOString() || null,
confidence: compact.confidence,
userId: compact.userId,
source: compact.source,
compressionRatio: compact.compressionRatio,
metadata: JSON.stringify(compact.metadata || {}),
};
const result = await runQuery(query, params);
return result[0].get("uuid");
}
/**
* Get a compacted session by UUID
*/
export async function getCompactedSession(
uuid: string
): Promise<CompactedSessionNode | null> {
const query = `
MATCH (cs:CompactedSession {uuid: $uuid})
RETURN cs
`;
const result = await runQuery(query, { uuid });
if (result.length === 0) return null;
const compact = result[0].get("cs").properties;
return parseCompactedSessionNode(compact);
}
/**
* Get compacted session by sessionId
*/
export async function getCompactedSessionBySessionId(
sessionId: string,
userId: string
): Promise<CompactedSessionNode | null> {
const query = `
MATCH (cs:CompactedSession {sessionId: $sessionId, userId: $userId})
RETURN cs
ORDER BY cs.endTime DESC
LIMIT 1
`;
const result = await runQuery(query, { sessionId, userId });
if (result.length === 0) return null;
const compact = result[0].get("cs").properties;
return parseCompactedSessionNode(compact);
}
/**
* Get all episodes linked to a compacted session
*/
export async function getCompactedSessionEpisodes(
compactUuid: string
): Promise<string[]> {
const query = `
MATCH (cs:CompactedSession {uuid: $compactUuid})-[:COMPACTS]->(e:Episode)
RETURN e.uuid as episodeUuid
ORDER BY e.createdAt ASC
`;
const result = await runQuery(query, { compactUuid });
return result.map((r) => r.get("episodeUuid"));
}
/**
* Link episodes to compacted session
*/
export async function linkEpisodesToCompact(
compactUuid: string,
episodeUuids: string[],
userId: string
): Promise<void> {
const query = `
MATCH (cs:CompactedSession {uuid: $compactUuid, userId: $userId})
UNWIND $episodeUuids as episodeUuid
MATCH (e:Episode {uuid: episodeUuid, userId: $userId})
MERGE (cs)-[:COMPACTS {createdAt: datetime()}]->(e)
MERGE (e)-[:COMPACTED_INTO {createdAt: datetime()}]->(cs)
`;
await runQuery(query, { compactUuid, episodeUuids, userId });
}
/**
* Search compacted sessions by embedding similarity
*/
export async function searchCompactedSessionsByEmbedding(
embedding: number[],
userId: string,
limit: number = 10,
minScore: number = 0.7
): Promise<Array<{ compact: CompactedSessionNode; score: number }>> {
const query = `
MATCH (cs:CompactedSession {userId: $userId})
WHERE cs.summaryEmbedding IS NOT NULL
WITH cs,
gds.similarity.cosine(cs.summaryEmbedding, $embedding) AS score
WHERE score >= $minScore
RETURN cs, score
ORDER BY score DESC
LIMIT toInteger($limit)
`;
const result = await runQuery(query, {
embedding,
userId,
limit,
minScore,
});
return result.map((r) => ({
compact: parseCompactedSessionNode(r.get("cs").properties),
score: r.get("score"),
}));
}
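// Note: gds.similarity.cosine assumes the Neo4j Graph Data Science plugin is installed.
// On Neo4j 5.13+, a native vector index could answer the same query without GDS, e.g.
//   CALL db.index.vector.queryNodes('compact_summary_index', toInteger($limit), $embedding)
// assuming an index with that (hypothetical) name exists over CompactedSession.summaryEmbedding.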
/**
* Get compacted sessions for a user
*/
export async function getUserCompactedSessions(
userId: string,
limit: number = 50
): Promise<CompactedSessionNode[]> {
const query = `
MATCH (cs:CompactedSession {userId: $userId})
RETURN cs
ORDER BY cs.endTime DESC
LIMIT toInteger($limit)
`;
const result = await runQuery(query, { userId, limit });
return result.map((r) => parseCompactedSessionNode(r.get("cs").properties));
}
/**
* Delete a compacted session
*/
export async function deleteCompactedSession(uuid: string): Promise<void> {
const query = `
MATCH (cs:CompactedSession {uuid: $uuid})
DETACH DELETE cs
`;
await runQuery(query, { uuid });
}
/**
* Get compaction statistics for a user
*/
export async function getCompactionStats(userId: string): Promise<{
totalCompacts: number;
totalEpisodes: number;
averageCompressionRatio: number;
mostRecentCompaction: Date | null;
}> {
const query = `
MATCH (cs:CompactedSession {userId: $userId})
RETURN
count(cs) as totalCompacts,
sum(cs.episodeCount) as totalEpisodes,
avg(cs.compressionRatio) as avgCompressionRatio,
max(cs.endTime) as mostRecent
`;
const result = await runQuery(query, { userId });
if (result.length === 0) {
return {
totalCompacts: 0,
totalEpisodes: 0,
averageCompressionRatio: 0,
mostRecentCompaction: null,
};
}
const stats = result[0];
return {
totalCompacts: stats.get("totalCompacts")?.toNumber() || 0,
totalEpisodes: stats.get("totalEpisodes")?.toNumber() || 0,
averageCompressionRatio: stats.get("avgCompressionRatio") || 0,
mostRecentCompaction: stats.get("mostRecent")
? new Date(stats.get("mostRecent"))
: null,
};
}
/**
* Get all episodes for a session
*/
export async function getSessionEpisodes(
sessionId: string,
userId: string,
afterTime?: Date
): Promise<SessionEpisodeData[]> {
const query = `
MATCH (e:Episode {sessionId: $sessionId, userId: $userId})
${afterTime ? "WHERE e.createdAt > datetime($afterTime)" : ""}
RETURN e
ORDER BY e.createdAt ASC
`;
const result = await runQuery(query, {
sessionId,
userId,
afterTime: afterTime?.toISOString(),
});
return result.map((r) => r.get("e").properties);
}
/**
* Get episode count for a session
*/
export async function getSessionEpisodeCount(
sessionId: string,
userId: string,
afterTime?: Date
): Promise<number> {
const episodes = await getSessionEpisodes(sessionId, userId, afterTime);
return episodes.length;
}
/**
* Helper to parse raw compact node from Neo4j
*/
function parseCompactedSessionNode(raw: any): CompactedSessionNode {
return {
uuid: raw.uuid,
sessionId: raw.sessionId,
summary: raw.summary,
summaryEmbedding: raw.summaryEmbedding || [],
episodeCount: raw.episodeCount || 0,
startTime: new Date(raw.startTime),
endTime: new Date(raw.endTime),
createdAt: new Date(raw.createdAt),
updatedAt: raw.updatedAt ? new Date(raw.updatedAt) : undefined,
confidence: raw.confidence || 0,
userId: raw.userId,
source: raw.source,
compressionRatio: raw.compressionRatio || 1,
metadata: typeof raw.metadata === "string"
? JSON.parse(raw.metadata)
: raw.metadata || {},
};
}
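
Taken together, these helpers form a small persistence API for compaction. A minimal usage sketch (illustrative only: the summary below is a naive concatenation standing in for the LLM-generated one, and getEmbedding is assumed to come from ~/lib/model.server as in the task file further down):

async function compactSessionSketch(sessionId: string, userId: string) {
  const episodes = await getSessionEpisodes(sessionId, userId);
  if (episodes.length === 0) return null;
  // Placeholder summary; the real pipeline asks an LLM for this (see the session-compaction task below)
  const summary = episodes.map((e) => e.content).join("\n");
  const node: CompactedSessionNode = {
    uuid: crypto.randomUUID(),
    sessionId,
    summary,
    summaryEmbedding: await getEmbedding(summary),
    episodeCount: episodes.length,
    startTime: new Date(episodes[0].createdAt),
    endTime: new Date(episodes[episodes.length - 1].createdAt),
    createdAt: new Date(),
    confidence: 1,
    userId,
    source: "sketch",
    compressionRatio: episodes.length,
    metadata: {},
  };
  await saveCompactedSession(node);
  await linkEpisodesToCompact(node.uuid, episodes.map((e) => e.uuid), userId);
  return node;
}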


@@ -361,6 +361,116 @@ export class SearchService {
`;
await runQuery(cypher2, { statementUuids: statementIds, userId });
}
/**
* Enhanced search that includes compacted sessions
* This method searches both regular episodes/statements AND compacted sessions
* Compacted sessions are preferred when they have high confidence matches
*/
public async searchWithCompacts(
query: string,
userId: string,
options: SearchOptions = {},
source?: string,
): Promise<ExtendedSearchResult> {
const startTime = Date.now();
// First, run the standard search for episodes and facts
const standardResults = await this.search(query, userId, options, source);
// Search compacted sessions
try {
const queryVector = await this.getEmbedding(query);
const { searchCompactedSessionsByEmbedding } = await import(
"~/services/graphModels/compactedSession"
);
const compactResults = await searchCompactedSessionsByEmbedding(
queryVector,
userId,
options.limit || 10,
options.scoreThreshold || 0.7,
);
logger.info(`Found ${compactResults.length} matching compacted sessions`, {
query,
userId,
});
// Format compact results
const formattedCompacts = compactResults.map((result) => ({
summary: result.compact.summary,
sessionId: result.compact.sessionId,
episodeCount: result.compact.episodeCount,
confidence: result.compact.confidence,
relevantScore: result.score,
}));
// Log compact recall
await this.logCompactRecallAsync(
query,
userId,
compactResults,
Date.now() - startTime,
source,
).catch((error) => {
logger.error("Failed to log compact recall event:", error);
});
return {
...standardResults,
compacts: formattedCompacts,
};
} catch (error) {
logger.error("Error searching compacted sessions:", { error });
// Return standard results if compact search fails
return {
...standardResults,
compacts: [],
};
}
}
/**
* Log recall event for compacted sessions
*/
private async logCompactRecallAsync(
query: string,
userId: string,
compacts: Array<{ compact: any; score: number }>,
responseTime: number,
source?: string,
): Promise<void> {
try {
const averageScore =
compacts.length > 0
? compacts.reduce((sum, c) => sum + c.score, 0) / compacts.length
: 0;
await prisma.recallLog.create({
data: {
accessType: "search",
query,
targetType: "compacted_session",
searchMethod: "vector_similarity",
resultCount: compacts.length,
similarityScore: averageScore,
context: JSON.stringify({
compactedSessionSearch: true,
}),
source: source ?? "search_with_compacts",
responseTimeMs: responseTime,
userId,
},
});
logger.debug(
`Logged compact recall event for user ${userId}: ${compacts.length} compacts in ${responseTime}ms`,
);
} catch (error) {
logger.error("Error creating compact recall log entry:", { error });
}
}
}
/**
@@ -380,3 +490,23 @@ export interface SearchOptions {
spaceIds?: string[]; // Filter results by specific spaces
adaptiveFiltering?: boolean;
}
/**
* Extended search result that includes compacted sessions
*/
export interface ExtendedSearchResult {
episodes: { content: string; createdAt: Date; spaceIds: string[] }[];
facts: {
fact: string;
validAt: Date;
invalidAt: Date | null;
relevantScore: number;
}[];
compacts?: {
summary: string;
sessionId: string;
episodeCount: number;
confidence: number;
relevantScore: number;
}[];
}
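
A hedged example of consuming ExtendedSearchResult (the service instance is passed in, since this diff doesn't show how SearchService is constructed):

async function recallWithCompacts(
  service: SearchService,
  query: string,
  userId: string,
): Promise<string> {
  const result = await service.searchWithCompacts(query, userId, {
    limit: 5,
    scoreThreshold: 0.75, // assumed to be part of SearchOptions, as used above
  });
  // Prefer high-confidence compacted sessions; fall back to raw facts
  const strong = (result.compacts ?? []).filter((c) => c.confidence >= 0.8);
  if (strong.length > 0) {
    return strong.map((c) => `[session ${c.sessionId}] ${c.summary}`).join("\n\n");
  }
  return result.facts.map((f) => f.fact).join("\n");
}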


@@ -0,0 +1,262 @@
import { logger } from "~/services/logger.service";
import {
getCompactedSessionBySessionId,
getCompactionStats,
getSessionEpisodes,
type CompactedSessionNode,
} from "~/services/graphModels/compactedSession";
import { tasks } from "@trigger.dev/sdk/v3";
/**
* Configuration for session compaction
*/
export const COMPACTION_CONFIG = {
minEpisodesForCompaction: 5, // Minimum episodes to trigger initial compaction
compactionThreshold: 1, // Trigger update after N new episodes
autoCompactionEnabled: true, // Enable automatic compaction
};
/**
* SessionCompactionService - Manages session compaction lifecycle
*/
export class SessionCompactionService {
/**
* Check if a session should be compacted
*/
async shouldCompact(sessionId: string, userId: string): Promise<{
shouldCompact: boolean;
reason: string;
episodeCount?: number;
newEpisodeCount?: number;
}> {
try {
// Get existing compact
const existingCompact = await getCompactedSessionBySessionId(sessionId, userId);
if (!existingCompact) {
// No compact exists, check if we have enough episodes
const episodeCount = await this.getSessionEpisodeCount(sessionId, userId);
if (episodeCount >= COMPACTION_CONFIG.minEpisodesForCompaction) {
return {
shouldCompact: true,
reason: "initial_compaction",
episodeCount,
};
}
return {
shouldCompact: false,
reason: "insufficient_episodes",
episodeCount,
};
}
// Compact exists, check if we have enough new episodes
const newEpisodeCount = await this.getNewEpisodeCount(
sessionId,
userId,
existingCompact.endTime
);
if (newEpisodeCount >= COMPACTION_CONFIG.compactionThreshold) {
return {
shouldCompact: true,
reason: "update_compaction",
newEpisodeCount,
};
}
return {
shouldCompact: false,
reason: "insufficient_new_episodes",
newEpisodeCount,
};
} catch (error) {
logger.error(`Error checking if session should compact`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
return {
shouldCompact: false,
reason: "error",
};
}
}
/**
* Get total episode count for a session
*/
private async getSessionEpisodeCount(
sessionId: string,
userId: string
): Promise<number> {
const episodes = await getSessionEpisodes(sessionId, userId);
return episodes.length;
}
/**
* Get count of new episodes since last compaction
*/
private async getNewEpisodeCount(
sessionId: string,
userId: string,
afterTime: Date
): Promise<number> {
const episodes = await getSessionEpisodes(sessionId, userId, afterTime);
return episodes.length;
}
/**
* Trigger compaction for a session
*/
async triggerCompaction(
sessionId: string,
userId: string,
source: string,
triggerSource: "auto" | "manual" | "threshold" = "auto"
): Promise<{ success: boolean; taskId?: string; error?: string }> {
try {
// Check if compaction should be triggered
const check = await this.shouldCompact(sessionId, userId);
if (!check.shouldCompact) {
logger.info(`Compaction not needed`, {
sessionId,
userId,
reason: check.reason,
});
return {
success: false,
error: `Compaction not needed: ${check.reason}`,
};
}
// Trigger the compaction task
logger.info(`Triggering session compaction`, {
sessionId,
userId,
source,
triggerSource,
reason: check.reason,
});
const handle = await tasks.trigger("session-compaction", {
userId,
sessionId,
source,
triggerSource,
});
logger.info(`Session compaction triggered`, {
sessionId,
userId,
taskId: handle.id,
});
return {
success: true,
taskId: handle.id,
};
} catch (error) {
logger.error(`Failed to trigger compaction`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
return {
success: false,
error: error instanceof Error ? error.message : "Unknown error",
};
}
}
/**
* Get compacted session for recall
*/
async getCompactForRecall(
sessionId: string,
userId: string
): Promise<CompactedSessionNode | null> {
try {
return await getCompactedSessionBySessionId(sessionId, userId);
} catch (error) {
logger.error(`Error fetching compact for recall`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
return null;
}
}
/**
* Get compaction statistics for a user
*/
async getStats(userId: string): Promise<{
totalCompacts: number;
totalEpisodes: number;
averageCompressionRatio: number;
mostRecentCompaction: Date | null;
}> {
try {
return await getCompactionStats(userId);
} catch (error) {
logger.error(`Error fetching compaction stats`, {
userId,
error: error instanceof Error ? error.message : String(error),
});
return {
totalCompacts: 0,
totalEpisodes: 0,
averageCompressionRatio: 0,
mostRecentCompaction: null,
};
}
}
/**
* Auto-trigger compaction after episode ingestion
* Called from ingestion pipeline
*/
async autoTriggerAfterIngestion(
sessionId: string | null | undefined,
userId: string,
source: string
): Promise<void> {
// Skip if no sessionId or auto-compaction disabled
if (!sessionId || !COMPACTION_CONFIG.autoCompactionEnabled) {
return;
}
try {
const check = await this.shouldCompact(sessionId, userId);
if (check.shouldCompact) {
logger.info(`Auto-triggering compaction after ingestion`, {
sessionId,
userId,
reason: check.reason,
});
// Trigger compaction; this only enqueues the background task, so ingestion isn't blocked by the heavy work
await this.triggerCompaction(sessionId, userId, source, "auto");
}
} catch (error) {
// Log error but don't fail ingestion
logger.error(`Error in auto-trigger compaction`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
}
}
}
// Singleton instance
export const sessionCompactionService = new SessionCompactionService();
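
A minimal sketch of wiring the singleton into an ingestion hook (the module path is hypothetical; the diff doesn't show this file's name):

import { sessionCompactionService } from "~/services/session/sessionCompaction.service"; // hypothetical path

export async function afterEpisodeIngested(
  sessionId: string | null,
  userId: string,
): Promise<void> {
  // Errors are caught and logged inside autoTriggerAfterIngestion, so this cannot break ingestion
  await sessionCompactionService.autoTriggerAfterIngestion(sessionId, userId, "api");
}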


@@ -10,6 +10,7 @@ import { prisma } from "../utils/prisma";
import { EpisodeType } from "@core/types";
import { deductCredits, hasCredits } from "../utils/utils";
import { assignEpisodesToSpace } from "~/services/graphModels/space";
import { triggerSessionCompaction } from "../session/session-compaction";
export const IngestBodyRequest = z.object({
episodeBody: z.string(),
@@ -202,6 +203,30 @@ export const ingestTask = task({
});
}
// Auto-trigger session compaction if episode has sessionId
try {
if (episodeBody.sessionId && currentStatus === IngestionStatus.COMPLETED) {
logger.info(`Checking if session compaction should be triggered`, {
userId: payload.userId,
sessionId: episodeBody.sessionId,
source: episodeBody.source,
});
await triggerSessionCompaction({
userId: payload.userId,
sessionId: episodeBody.sessionId,
source: episodeBody.source,
});
}
} catch (compactionError) {
// Don't fail the ingestion if compaction fails
logger.warn(`Failed to trigger session compaction after ingestion:`, {
error: compactionError,
userId: payload.userId,
sessionId: episodeBody.sessionId,
});
}
return { success: true, episodeDetails };
} catch (err: any) {
await prisma.ingestionQueue.update({


@@ -0,0 +1,444 @@
import { queue, task } from "@trigger.dev/sdk/v3";
import { logger } from "~/services/logger.service";
import { runQuery } from "~/lib/neo4j.server";
import type { CoreMessage } from "ai";
import { z } from "zod";
import { getEmbedding, makeModelCall } from "~/lib/model.server";
import {
getCompactedSessionBySessionId,
linkEpisodesToCompact,
getSessionEpisodes,
type CompactedSessionNode,
type SessionEpisodeData,
saveCompactedSession,
} from "~/services/graphModels/compactedSession";
interface SessionCompactionPayload {
userId: string;
sessionId: string;
source: string;
triggerSource?: "auto" | "manual" | "threshold";
}
// Zod schema for LLM response validation
const CompactionResultSchema = z.object({
summary: z.string().describe("Consolidated narrative of the entire session"),
confidence: z.number().min(0).max(1).describe("Confidence score of the compaction quality"),
});
const CONFIG = {
minEpisodesForCompaction: 3, // Minimum episodes to trigger compaction
compactionThreshold: 1, // Trigger after N new episodes
maxEpisodesPerBatch: 50, // Process in batches if needed
};
export const sessionCompactionQueue = queue({
name: "session-compaction-queue",
concurrencyLimit: 1,
});
export const sessionCompactionTask = task({
id: "session-compaction",
queue: sessionCompactionQueue,
run: async (payload: SessionCompactionPayload) => {
const { userId, sessionId, source, triggerSource = "auto" } = payload;
logger.info(`Starting session compaction`, {
userId,
sessionId,
source,
triggerSource,
});
try {
// Load any existing compaction for this session
const existingCompact = await getCompactedSessionBySessionId(sessionId, userId);
// Fetch the episodes to compact (only those newer than the last compaction, if one exists)
const episodes = await getSessionEpisodes(sessionId, userId, existingCompact?.endTime);
logger.debug(`Fetched ${episodes.length} episodes for compaction`, { sessionId });
// Check if we have enough episodes
if (!existingCompact && episodes.length < CONFIG.minEpisodesForCompaction) {
logger.info(`Not enough episodes for compaction`, {
sessionId,
episodeCount: episodes.length,
minRequired: CONFIG.minEpisodesForCompaction,
});
return {
success: false,
reason: "insufficient_episodes",
episodeCount: episodes.length,
};
} else if (existingCompact && episodes.length < CONFIG.compactionThreshold) {
logger.info(`Not enough new episodes for compaction`, {
sessionId,
episodeCount: episodes.length,
minRequired: CONFIG.compactionThreshold,
});
return {
success: false,
reason: "insufficient_new_episodes",
episodeCount: episodes.length,
};
}
// Generate a new compaction, or merge new episodes into the existing one
const compactionResult = existingCompact
? await updateCompaction(existingCompact, episodes, userId)
: await createCompaction(sessionId, episodes, userId, source);
logger.info(`Session compaction completed`, {
sessionId,
compactUuid: compactionResult.uuid,
episodeCount: compactionResult.episodeCount,
compressionRatio: compactionResult.compressionRatio,
});
return {
success: true,
compactionResult: {
compactUuid: compactionResult.uuid,
sessionId: compactionResult.sessionId,
summary: compactionResult.summary,
episodeCount: compactionResult.episodeCount,
startTime: compactionResult.startTime,
endTime: compactionResult.endTime,
confidence: compactionResult.confidence,
compressionRatio: compactionResult.compressionRatio,
},
};
} catch (error) {
logger.error(`Session compaction failed`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
},
});
/**
* Create new compaction
*/
async function createCompaction(
sessionId: string,
episodes: SessionEpisodeData[],
userId: string,
source: string
): Promise<CompactedSessionNode> {
logger.info(`Creating new compaction`, { sessionId, episodeCount: episodes.length });
// Generate compaction using LLM
const compactionData = await generateCompaction(episodes, null);
// Generate embedding for summary
const summaryEmbedding = await getEmbedding(compactionData.summary);
// Create CompactedSession node using graph model
const compactUuid = crypto.randomUUID();
const now = new Date();
const startTime = new Date(episodes[0].createdAt);
const endTime = new Date(episodes[episodes.length - 1].createdAt);
const episodeUuids = episodes.map((e) => e.uuid);
const compressionRatio = episodes.length; // N episodes compressed into one summary
const compactNode: CompactedSessionNode = {
uuid: compactUuid,
sessionId,
summary: compactionData.summary,
summaryEmbedding,
episodeCount: episodes.length,
startTime,
endTime,
createdAt: now,
confidence: compactionData.confidence,
userId,
source,
compressionRatio,
metadata: { triggerType: "create" },
};
console.log("compactNode", compactNode);
// Use graph model functions
await saveCompactedSession(compactNode);
await linkEpisodesToCompact(compactUuid, episodeUuids, userId);
logger.info(`Compaction created`, { compactUuid, episodeCount: episodes.length });
return compactNode;
}
/**
* Update existing compaction with new episodes
*/
async function updateCompaction(
existingCompact: CompactedSessionNode,
newEpisodes: SessionEpisodeData[],
userId: string
): Promise<CompactedSessionNode> {
logger.info(`Updating existing compaction`, {
compactUuid: existingCompact.uuid,
newEpisodeCount: newEpisodes.length,
});
// Generate updated compaction using LLM (merging)
const compactionData = await generateCompaction(newEpisodes, existingCompact.summary);
// Generate new embedding for updated summary
const summaryEmbedding = await getEmbedding(compactionData.summary);
// Update CompactedSession node using graph model
const now = new Date();
const endTime = new Date(newEpisodes[newEpisodes.length - 1].createdAt);
const totalEpisodeCount = existingCompact.episodeCount + newEpisodes.length;
const compressionRatio = totalEpisodeCount; // all compacted episodes into one summary
const episodeUuids = newEpisodes.map((e) => e.uuid);
const updatedNode: CompactedSessionNode = {
...existingCompact,
summary: compactionData.summary,
summaryEmbedding,
episodeCount: totalEpisodeCount,
endTime,
updatedAt: now,
confidence: compactionData.confidence,
compressionRatio,
metadata: { triggerType: "update", newEpisodesAdded: newEpisodes.length },
};
// Persist via the graph model (saveCompactedSession is already imported at the top of this file)
await saveCompactedSession(updatedNode);
await linkEpisodesToCompact(existingCompact.uuid, episodeUuids, userId);
logger.info(`Compaction updated`, {
compactUuid: existingCompact.uuid,
totalEpisodeCount,
});
return updatedNode;
}
/**
* Generate compaction using LLM (similar to Claude Code's compact approach)
*/
async function generateCompaction(
episodes: SessionEpisodeData[],
existingSummary: string | null
): Promise<z.infer<typeof CompactionResultSchema>> {
const systemPrompt = createCompactionSystemPrompt();
const userPrompt = createCompactionUserPrompt(episodes, existingSummary);
const messages: CoreMessage[] = [
{ role: "system", content: systemPrompt },
{ role: "user", content: userPrompt },
];
logger.info(`Generating compaction with LLM`, {
episodeCount: episodes.length,
hasExistingSummary: !!existingSummary,
});
try {
let responseText = "";
await makeModelCall(
false,
messages,
(text: string) => {
responseText = text;
},
undefined,
"high",
);
return parseCompactionResponse(responseText);
} catch (error) {
logger.error(`Failed to generate compaction`, {
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
/**
* System prompt for compaction (for agent recall/context retrieval)
*/
function createCompactionSystemPrompt(): string {
return `You are a session compaction specialist. Your task is to create a rich, informative summary that will help AI agents understand what happened in this conversation session when they need context for future interactions.
## PURPOSE
This summary will be retrieved by AI agents when the user references this session in future conversations. The agent needs enough context to:
- Understand what was discussed and why
- Know what decisions were made and their rationale
- Grasp the outcome and current state
- Have relevant technical details to provide informed responses
## COMPACTION GOALS
1. **Comprehensive Context**: Capture all important information that might be referenced later
2. **Decision Documentation**: Clearly state what was decided, why, and what alternatives were considered
3. **Technical Details**: Include specific implementations, tools, configurations, and technical choices
4. **Outcome Clarity**: Make it clear what was accomplished and what the final state is
5. **Evolution Tracking**: Show how thinking or decisions evolved during the session
## COMPACTION RULES
1. **Be Information-Dense**: Pack useful details without fluff or repetition
2. **Structure Chronologically**: Start with problem/question, show progression, end with outcome
3. **Highlight Key Points**: Emphasize decisions, implementations, results, and learnings
4. **Include Specifics**: Names of libraries, specific configurations, metrics, numbers matter
5. **Resolve Contradictions**: Always use the most recent/final version when information conflicts
## OUTPUT REQUIREMENTS
- **summary**: A detailed, information-rich narrative that tells the complete story
- Structure naturally based on content - use as many paragraphs as needed
- Each distinct topic, decision, or phase should get its own paragraph(s)
- Start with context and initial problem/question
- Progress chronologically through discussions, decisions, and implementations
- **Final paragraph MUST**: State the outcome, results, and current state
- Don't artificially limit length - capture everything important
- **confidence**: Score (0-1) reflecting how well this summary captures the session's essence
Your response MUST be valid JSON wrapped in <output></output> tags.
## EXAMPLE TRANSFORMATION
Input Episodes:
1. "Need to implement authentication for web app. Considering JWT vs session-based auth."
2. "JWT is stateless, good for distributed systems. Session-based simpler but needs server storage."
3. "Going with JWT for microservices. Deciding on token lifetime strategy."
4. "Chose 15-minute access tokens + 7-day refresh tokens for security."
5. "Implemented using jsonwebtoken library, storing refresh tokens in Redis."
6. "Testing complete: login, protected routes, token refresh, logout all working."
Output:
<output>
{
"summary": "Session focused on implementing authentication for a web application. Initial consideration was between JWT and session-based authentication approaches. JWT was selected over session-based auth due to its stateless nature and better fit for distributed microservices architecture, despite session-based being simpler to implement.
For token strategy, decided on a dual-token approach: 15-minute access tokens paired with 7-day refresh tokens to balance security and user experience. This prevents long-lived token exposure while minimizing re-authentication friction.
Implementation used the jsonwebtoken library for token generation and validation. Refresh tokens are stored in Redis with user association for token revocation capability. Access tokens contain userId and role claims for authorization. Tokens delivered via httpOnly cookies to prevent XSS attacks.
Testing validated all authentication flows: user login, accessing protected routes with access tokens, automatic token refresh using refresh tokens, and proper logout with token cleanup. System is production-ready with complete authentication lifecycle working correctly.",
"confidence": 0.95
}
</output>
## KEY PRINCIPLES
- Write for an AI agent that needs to help the user in future conversations
- Include technical specifics that might be referenced (library names, configurations, metrics)
- Make outcomes and current state crystal clear in the final paragraph
- Show the reasoning behind decisions, not just the decisions themselves
- Be comprehensive but concise - every sentence should add value
- Each major topic or phase deserves its own paragraph(s)
- Don't compress too much - agents need the details
`;
}
/**
* User prompt for compaction
*/
function createCompactionUserPrompt(
episodes: SessionEpisodeData[],
existingSummary: string | null
): string {
let prompt = "";
if (existingSummary) {
prompt += `## EXISTING SUMMARY (from previous compaction)\n\n${existingSummary}\n\n`;
prompt += `## NEW EPISODES (to merge into existing summary)\n\n`;
} else {
prompt += `## SESSION EPISODES (to compact)\n\n`;
}
episodes.forEach((episode, index) => {
const timestamp = new Date(episode.validAt).toISOString();
prompt += `### Episode ${index + 1} (${timestamp})\n`;
prompt += `Source: ${episode.source}\n`;
prompt += `Content:\n${episode.originalContent}\n\n`;
});
if (existingSummary) {
prompt += `\n## INSTRUCTIONS\n\n`;
prompt += `Merge the new episodes into the existing summary. Update facts, add new information, and maintain narrative coherence. Ensure the consolidated summary reflects the complete session including both old and new content.\n`;
} else {
prompt += `\n## INSTRUCTIONS\n\n`;
prompt += `Create a compact summary of this entire session. Consolidate all information into a coherent narrative with deduplicated key facts.\n`;
}
return prompt;
}
/**
* Parse LLM response for compaction
*/
function parseCompactionResponse(response: string): z.infer<typeof CompactionResultSchema> {
try {
// Extract content from <output> tags
const outputMatch = response.match(/<output>([\s\S]*?)<\/output>/);
if (!outputMatch) {
logger.warn("No <output> tags found in LLM compaction response");
logger.debug("Full LLM response:", { response });
throw new Error("Invalid LLM response format - missing <output> tags");
}
let jsonContent = outputMatch[1].trim();
// Remove markdown code blocks if present
jsonContent = jsonContent.replace(/```json\n?/g, "").replace(/```\n?/g, "");
const parsed = JSON.parse(jsonContent);
// Validate with schema
const validated = CompactionResultSchema.parse(parsed);
return validated;
} catch (error) {
logger.error("Failed to parse compaction response", {
error: error instanceof Error ? error.message : String(error),
response: response.substring(0, 500),
});
throw new Error(`Failed to parse compaction response: ${error}`);
}
}
/**
* Helper function to check if compaction should be triggered
*/
export async function shouldTriggerCompaction(
sessionId: string,
userId: string
): Promise<boolean> {
const existingCompact = await getCompactedSessionBySessionId(sessionId, userId);
if (!existingCompact) {
// Check if we have enough episodes for initial compaction
const episodes = await getSessionEpisodes(sessionId, userId);
return episodes.length >= CONFIG.minEpisodesForCompaction;
}
// Check if we have enough new episodes to update
const newEpisodes = await getSessionEpisodes(sessionId, userId, existingCompact.endTime);
return newEpisodes.length >= CONFIG.compactionThreshold;
}
/**
* Trigger compaction for a session
*/
export async function triggerSessionCompaction(payload: SessionCompactionPayload) {
return await sessionCompactionTask.trigger(payload);
}
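
As a closing illustration, the exported helpers compose into a manual trigger path, e.g. from an admin endpoint (a sketch; the import path is assumed from the relative import in the ingest task above):

import {
  shouldTriggerCompaction,
  triggerSessionCompaction,
} from "../session/session-compaction"; // path as imported by the ingest task

export async function manualCompact(sessionId: string, userId: string) {
  if (!(await shouldTriggerCompaction(sessionId, userId))) {
    return { triggered: false as const };
  }
  const handle = await triggerSessionCompaction({
    userId,
    sessionId,
    source: "manual",
    triggerSource: "manual",
  });
  return { triggered: true as const, taskId: handle.id };
}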