This commit is contained in:
Harshith Mullapudi 2025-10-29 23:33:33 +05:30
parent bc2deea998
commit a57996a510
13 changed files with 40 additions and 2409 deletions

View File

@ -66,7 +66,6 @@ export async function initWorkers(): Promise<void> {
queue: conversationTitleQueue, queue: conversationTitleQueue,
name: "conversation-title", name: "conversation-title",
}, },
{ {
worker: sessionCompactionWorker, worker: sessionCompactionWorker,
queue: sessionCompactionQueue, queue: sessionCompactionQueue,

View File

@ -152,15 +152,12 @@ export async function processTopicAnalysis(
// Create new space (creates in both PostgreSQL and Neo4j) // Create new space (creates in both PostgreSQL and Neo4j)
// Skip automatic space assignment since we're manually assigning from BERT topics // Skip automatic space assignment since we're manually assigning from BERT topics
const spaceService = new SpaceService(); const spaceService = new SpaceService();
const newSpace = await spaceService.createSpace( const newSpace = await spaceService.createSpace({
{ name: proposal.name,
name: proposal.name, description: proposal.intent,
description: proposal.intent, userId,
userId, workspaceId,
workspaceId, });
},
{ skipAutoAssignment: true },
);
spaceId = newSpace.id; spaceId = newSpace.id;
logger.info("[BERT Topic Analysis] Created new space", { logger.info("[BERT Topic Analysis] Created new space", {
spaceName: proposal.name, spaceName: proposal.name,

View File

@ -29,12 +29,6 @@ Exclude:
Anything not explicitly consented to share Anything not explicitly consented to share
don't store anything the user did not explicitly consent to share.`; don't store anything the user did not explicitly consent to share.`;
const githubDescription = `Everything related to my GitHub work - repos I'm working on, projects I contribute to, code I'm writing, PRs I'm reviewing. Basically my coding life on GitHub.`;
const healthDescription = `My health and wellness stuff - how I'm feeling, what I'm learning about my body, experiments I'm trying, patterns I notice. Whatever matters to me about staying healthy.`;
const fitnessDescription = `My workouts and training - what I'm doing at the gym, runs I'm going on, progress I'm making, goals I'm chasing. Anything related to physical exercise and getting stronger.`;
export async function createWorkspace( export async function createWorkspace(
input: CreateWorkspaceDto, input: CreateWorkspaceDto,
): Promise<Workspace> { ): Promise<Workspace> {
@ -56,32 +50,7 @@ export async function createWorkspace(
await ensureBillingInitialized(workspace.id); await ensureBillingInitialized(workspace.id);
// Create default spaces // Create default spaces
await Promise.all([ await Promise.all([]);
spaceService.createSpace({
name: "Profile",
description: profileRule,
userId: input.userId,
workspaceId: workspace.id,
}),
spaceService.createSpace({
name: "GitHub",
description: githubDescription,
userId: input.userId,
workspaceId: workspace.id,
}),
spaceService.createSpace({
name: "Health",
description: healthDescription,
userId: input.userId,
workspaceId: workspace.id,
}),
spaceService.createSpace({
name: "Fitness",
description: fitnessDescription,
userId: input.userId,
workspaceId: workspace.id,
}),
]);
try { try {
const response = await sendEmail({ email: "welcome", to: user.email }); const response = await sendEmail({ email: "welcome", to: user.email });

View File

@ -7,7 +7,10 @@ import { SpaceService } from "~/services/space.server";
import { json } from "@remix-run/node"; import { json } from "@remix-run/node";
import { prisma } from "~/db.server"; import { prisma } from "~/db.server";
import { apiCors } from "~/utils/apiCors"; import { apiCors } from "~/utils/apiCors";
import { isTriggerDeployment } from "~/lib/queue-adapter.server"; import {
enqueueSpaceAssignment,
isTriggerDeployment,
} from "~/lib/queue-adapter.server";
const spaceService = new SpaceService(); const spaceService = new SpaceService();
@ -74,6 +77,14 @@ const { action } = createHybridActionApiRoute(
workspaceId: user.Workspace.id, workspaceId: user.Workspace.id,
}); });
await enqueueSpaceAssignment({
userId: user.id,
workspaceId: user.Workspace.id,
mode: "new_space",
newSpaceId: space.id,
batchSize: 25, // Analyze recent statements for the new space
});
return json({ space, success: true }); return json({ space, success: true });
} }

View File

@ -1,262 +0,0 @@
import { logger } from "~/services/logger.service";
import {
getCompactedSessionBySessionId,
getCompactionStats,
getSessionEpisodes,
type CompactedSessionNode,
} from "~/services/graphModels/compactedSession";
import { enqueueSessionCompaction } from "~/lib/queue-adapter.server";
/**
* Configuration for session compaction
*/
export const COMPACTION_CONFIG = {
minEpisodesForCompaction: 5, // Minimum episodes to trigger initial compaction
compactionThreshold: 1, // Trigger update after N new episodes
autoCompactionEnabled: true, // Enable automatic compaction
};
/**
* SessionCompactionService - Manages session compaction lifecycle
*/
export class SessionCompactionService {
/**
* Check if a session should be compacted
*/
async shouldCompact(sessionId: string, userId: string): Promise<{
shouldCompact: boolean;
reason: string;
episodeCount?: number;
newEpisodeCount?: number;
}> {
try {
// Get existing compact
const existingCompact = await getCompactedSessionBySessionId(sessionId, userId);
if (!existingCompact) {
// No compact exists, check if we have enough episodes
const episodeCount = await this.getSessionEpisodeCount(sessionId, userId);
if (episodeCount >= COMPACTION_CONFIG.minEpisodesForCompaction) {
return {
shouldCompact: true,
reason: "initial_compaction",
episodeCount,
};
}
return {
shouldCompact: false,
reason: "insufficient_episodes",
episodeCount,
};
}
// Compact exists, check if we have enough new episodes
const newEpisodeCount = await this.getNewEpisodeCount(
sessionId,
userId,
existingCompact.endTime
);
if (newEpisodeCount >= COMPACTION_CONFIG.compactionThreshold) {
return {
shouldCompact: true,
reason: "update_compaction",
newEpisodeCount,
};
}
return {
shouldCompact: false,
reason: "insufficient_new_episodes",
newEpisodeCount,
};
} catch (error) {
logger.error(`Error checking if session should compact`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
return {
shouldCompact: false,
reason: "error",
};
}
}
/**
* Get total episode count for a session
*/
private async getSessionEpisodeCount(
sessionId: string,
userId: string
): Promise<number> {
const episodes = await getSessionEpisodes(sessionId, userId);
return episodes.length;
}
/**
* Get count of new episodes since last compaction
*/
private async getNewEpisodeCount(
sessionId: string,
userId: string,
afterTime: Date
): Promise<number> {
const episodes = await getSessionEpisodes(sessionId, userId, afterTime);
return episodes.length;
}
/**
* Trigger compaction for a session
*/
async triggerCompaction(
sessionId: string,
userId: string,
source: string,
triggerSource: "auto" | "manual" | "threshold" = "auto"
): Promise<{ success: boolean; taskId?: string; error?: string }> {
try {
// Check if compaction should be triggered
const check = await this.shouldCompact(sessionId, userId);
if (!check.shouldCompact) {
logger.info(`Compaction not needed`, {
sessionId,
userId,
reason: check.reason,
});
return {
success: false,
error: `Compaction not needed: ${check.reason}`,
};
}
// Trigger the compaction task
logger.info(`Triggering session compaction`, {
sessionId,
userId,
source,
triggerSource,
reason: check.reason,
});
const handle = await enqueueSessionCompaction({
userId,
sessionId,
source,
triggerSource,
});
logger.info(`Session compaction triggered`, {
sessionId,
userId,
taskId: handle.id,
});
return {
success: true,
taskId: handle.id,
};
} catch (error) {
logger.error(`Failed to trigger compaction`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
return {
success: false,
error: error instanceof Error ? error.message : "Unknown error",
};
}
}
/**
* Get compacted session for recall
*/
async getCompactForRecall(
sessionId: string,
userId: string
): Promise<CompactedSessionNode | null> {
try {
return await getCompactedSessionBySessionId(sessionId, userId);
} catch (error) {
logger.error(`Error fetching compact for recall`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
return null;
}
}
/**
* Get compaction statistics for a user
*/
async getStats(userId: string): Promise<{
totalCompacts: number;
totalEpisodes: number;
averageCompressionRatio: number;
mostRecentCompaction: Date | null;
}> {
try {
return await getCompactionStats(userId);
} catch (error) {
logger.error(`Error fetching compaction stats`, {
userId,
error: error instanceof Error ? error.message : String(error),
});
return {
totalCompacts: 0,
totalEpisodes: 0,
averageCompressionRatio: 0,
mostRecentCompaction: null,
};
}
}
/**
* Auto-trigger compaction after episode ingestion
* Called from ingestion pipeline
*/
async autoTriggerAfterIngestion(
sessionId: string | null | undefined,
userId: string,
source: string
): Promise<void> {
// Skip if no sessionId or auto-compaction disabled
if (!sessionId || !COMPACTION_CONFIG.autoCompactionEnabled) {
return;
}
try {
const check = await this.shouldCompact(sessionId, userId);
if (check.shouldCompact) {
logger.info(`Auto-triggering compaction after ingestion`, {
sessionId,
userId,
reason: check.reason,
});
// Trigger compaction asynchronously (don't wait)
await this.triggerCompaction(sessionId, userId, source, "auto");
}
} catch (error) {
// Log error but don't fail ingestion
logger.error(`Error in auto-trigger compaction`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
}
}
}
// Singleton instance
export const sessionCompactionService = new SessionCompactionService();

View File

@ -17,16 +17,12 @@ import {
} from "./graphModels/space"; } from "./graphModels/space";
import { prisma } from "~/trigger/utils/prisma"; import { prisma } from "~/trigger/utils/prisma";
import { trackFeatureUsage } from "./telemetry.server"; import { trackFeatureUsage } from "./telemetry.server";
import { enqueueSpaceAssignment } from "~/lib/queue-adapter.server";
export class SpaceService { export class SpaceService {
/** /**
* Create a new space for a user * Create a new space for a user
*/ */
async createSpace( async createSpace(params: CreateSpaceParams): Promise<Space> {
params: CreateSpaceParams,
options?: { skipAutoAssignment?: boolean },
): Promise<Space> {
logger.info(`Creating space "${params.name}" for user ${params.userId}`); logger.info(`Creating space "${params.name}" for user ${params.userId}`);
// Validate input // Validate input
@ -70,27 +66,6 @@ export class SpaceService {
// Track space creation // Track space creation
trackFeatureUsage("space_created", params.userId).catch(console.error); trackFeatureUsage("space_created", params.userId).catch(console.error);
// Trigger automatic LLM assignment for the new space (unless skipped)
if (!options?.skipAutoAssignment) {
try {
await enqueueSpaceAssignment({
userId: params.userId,
workspaceId: params.workspaceId,
mode: "new_space",
newSpaceId: space.id,
batchSize: 25, // Analyze recent statements for the new space
});
logger.info(`Triggered LLM space assignment for new space ${space.id}`);
} catch (error) {
// Don't fail space creation if LLM assignment fails
logger.warn(
`Failed to trigger LLM assignment for space ${space.id}:`,
error as Record<string, unknown>,
);
}
}
return space; return space;
} }

View File

@ -14,13 +14,15 @@ async function runBertWithTriggerPython(
minTopicSize: number, minTopicSize: number,
nrTopics?: number, nrTopics?: number,
): Promise<string> { ): Promise<string> {
const args = [userId, "--json", "--min-topic-size", String(minTopicSize)]; const args = [userId, "--json"];
if (nrTopics) { if (nrTopics) {
args.push("--nr-topics", String(nrTopics)); args.push("--nr-topics", String(nrTopics));
} }
console.log(`[BERT Topic Analysis] Running with Trigger.dev Python: args=${args.join(" ")}`); console.log(
`[BERT Topic Analysis] Running with Trigger.dev Python: args=${args.join(" ")}`,
);
const result = await python.runScript("./apps/webapp/app/bert/main.py", args); const result = await python.runScript("./apps/webapp/app/bert/main.py", args);
return result.stdout; return result.stdout;

View File

@ -80,7 +80,7 @@
"@tiptap/pm": "^2.11.9", "@tiptap/pm": "^2.11.9",
"@tiptap/react": "^2.11.9", "@tiptap/react": "^2.11.9",
"@tiptap/starter-kit": "2.11.9", "@tiptap/starter-kit": "2.11.9",
"@trigger.dev/python": "^4.0.5", "@trigger.dev/python": "4.0.4",
"@trigger.dev/react-hooks": "4.0.4", "@trigger.dev/react-hooks": "4.0.4",
"@trigger.dev/sdk": "4.0.4", "@trigger.dev/sdk": "4.0.4",
"ai": "5.0.78", "ai": "5.0.78",

View File

@ -4,7 +4,7 @@ import { prismaExtension } from "@trigger.dev/build/extensions/prisma";
import { pythonExtension } from "@trigger.dev/python/extension"; import { pythonExtension } from "@trigger.dev/python/extension";
export default defineConfig({ export default defineConfig({
project: process.env.TRIGGER_PROJECT_ID as string, project: "proj_dtctdgjvszcisssppudu",
runtime: "node", runtime: "node",
logLevel: "log", logLevel: "log",
// The max compute seconds a task is allowed to run. If the task run exceeds this duration, it will be stopped. // The max compute seconds a task is allowed to run. If the task run exceeds this duration, it will be stopped.
@ -24,6 +24,9 @@ export default defineConfig({
dirs: ["./app/trigger"], dirs: ["./app/trigger"],
build: { build: {
extensions: [ extensions: [
pythonExtension({
scripts: ["./python/*.py"],
}),
syncEnvVars(() => ({ syncEnvVars(() => ({
// ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY as string, // ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY as string,
// API_BASE_URL: process.env.API_BASE_URL as string, // API_BASE_URL: process.env.API_BASE_URL as string,
@ -39,10 +42,6 @@ export default defineConfig({
prismaExtension({ prismaExtension({
schema: "prisma/schema.prisma", schema: "prisma/schema.prisma",
}), }),
pythonExtension({
scripts: ["./app/bert/**/*.py"],
requirementsFile: "./app/bert/requirements.txt",
}),
], ],
}, },
}); });

2081
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff