From 2030cebdc0200a1d7845027062c5c9fc77d2571b Mon Sep 17 00:00:00 2001 From: Harshith Mullapudi Date: Fri, 24 Oct 2025 23:35:46 +0530 Subject: [PATCH] feat: remove trigger and run base on bullmq --- .env.example | 6 +- apps/webapp/app/bullmq/connection.ts | 49 ++ apps/webapp/app/bullmq/queues/index.ts | 115 +++++ apps/webapp/app/bullmq/start-workers.ts | 43 ++ apps/webapp/app/bullmq/utils/job-finder.ts | 134 ++++++ apps/webapp/app/bullmq/workers/index.ts | 181 +++++++ apps/webapp/app/env.server.ts | 3 + .../jobs/conversation/create-title.logic.ts | 96 ++++ .../app/jobs/deep-search/deep-search.logic.ts | 105 ++++ .../app/jobs/ingest/ingest-document.logic.ts | 289 +++++++++++ .../app/jobs/ingest/ingest-episode.logic.ts | 266 ++++++++++ .../jobs/session/session-compaction.logic.ts | 455 ++++++++++++++++++ apps/webapp/app/lib/ingest.server.ts | 47 +- apps/webapp/app/lib/queue-adapter.server.ts | 196 ++++++++ apps/webapp/app/routes/api.v1.deep-search.tsx | 6 +- .../routes/api.v1.ingestion_queue.delete.tsx | 16 +- apps/webapp/app/routes/api.v1.logs.$logId.tsx | 16 +- apps/webapp/app/routes/api.v1.logs.tsx | 2 +- .../app/services/conversation.server.ts | 14 +- apps/webapp/app/services/jobManager.server.ts | 87 ++++ .../app/services/sessionCompaction.server.ts | 4 +- .../conversation/create-conversation-title.ts | 63 +-- .../app/trigger/ingest/ingest-document.ts | 289 +---------- apps/webapp/app/trigger/ingest/ingest.ts | 256 +--------- .../app/trigger/session/session-compaction.ts | 111 +---- .../app/trigger/spaces/space-summary.ts | 2 +- apps/webapp/app/utils/startup.ts | 27 +- docker-compose.aws.yaml | 70 --- hosting/docker/.env | 72 +-- hosting/docker/docker-compose.yaml | 237 --------- 30 files changed, 2129 insertions(+), 1128 deletions(-) create mode 100644 apps/webapp/app/bullmq/connection.ts create mode 100644 apps/webapp/app/bullmq/queues/index.ts create mode 100644 apps/webapp/app/bullmq/start-workers.ts create mode 100644 apps/webapp/app/bullmq/utils/job-finder.ts create mode 100644 apps/webapp/app/bullmq/workers/index.ts create mode 100644 apps/webapp/app/jobs/conversation/create-title.logic.ts create mode 100644 apps/webapp/app/jobs/deep-search/deep-search.logic.ts create mode 100644 apps/webapp/app/jobs/ingest/ingest-document.logic.ts create mode 100644 apps/webapp/app/jobs/ingest/ingest-episode.logic.ts create mode 100644 apps/webapp/app/jobs/session/session-compaction.logic.ts create mode 100644 apps/webapp/app/lib/queue-adapter.server.ts create mode 100644 apps/webapp/app/services/jobManager.server.ts delete mode 100644 docker-compose.aws.yaml diff --git a/.env.example b/.env.example index 289272e..ce6a062 100644 --- a/.env.example +++ b/.env.example @@ -56,7 +56,5 @@ AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= AWS_REGION=us-east-1 -## Trigger ## -TRIGGER_PROJECT_ID= -TRIGGER_SECRET_KEY= -TRIGGER_API_URL=http://host.docker.internal:8030 +QUEUE_PROVIDER=bullmq + diff --git a/apps/webapp/app/bullmq/connection.ts b/apps/webapp/app/bullmq/connection.ts new file mode 100644 index 0000000..560ea4e --- /dev/null +++ b/apps/webapp/app/bullmq/connection.ts @@ -0,0 +1,49 @@ +import Redis, { type RedisOptions } from "ioredis"; + +let redisConnection: Redis | null = null; + +/** + * Get or create a Redis connection for BullMQ + * This connection is shared across all queues and workers + */ +export function getRedisConnection() { + if (redisConnection) { + return redisConnection; + } + + // Dynamically import ioredis only when needed + + const redisConfig: RedisOptions = { + host: 
process.env.REDIS_HOST, + port: parseInt(process.env.REDIS_PORT as string), + maxRetriesPerRequest: null, // Required for BullMQ + enableReadyCheck: false, // Required for BullMQ + }; + + // Add TLS configuration if not disabled + if (!process.env.REDIS_TLS_DISABLED) { + redisConfig.tls = {}; + } + + redisConnection = new Redis(redisConfig); + + redisConnection.on("error", (error) => { + console.error("Redis connection error:", error); + }); + + redisConnection.on("connect", () => { + console.log("Redis connected successfully"); + }); + + return redisConnection; +} + +/** + * Close the Redis connection (useful for graceful shutdown) + */ +export async function closeRedisConnection(): Promise { + if (redisConnection) { + await redisConnection.quit(); + redisConnection = null; + } +} diff --git a/apps/webapp/app/bullmq/queues/index.ts b/apps/webapp/app/bullmq/queues/index.ts new file mode 100644 index 0000000..4fc6964 --- /dev/null +++ b/apps/webapp/app/bullmq/queues/index.ts @@ -0,0 +1,115 @@ +/** + * BullMQ Queues + * + * All queue definitions for the BullMQ implementation + */ + +import { Queue } from "bullmq"; +import { getRedisConnection } from "../connection"; + +/** + * Episode ingestion queue + * Handles individual episode ingestion (including document chunks) + */ +export const ingestQueue = new Queue("ingest-queue", { + connection: getRedisConnection(), + defaultJobOptions: { + attempts: 3, + backoff: { + type: "exponential", + delay: 2000, + }, + removeOnComplete: { + age: 3600, // Keep completed jobs for 1 hour + count: 1000, // Keep last 1000 completed jobs + }, + removeOnFail: { + age: 86400, // Keep failed jobs for 24 hours + }, + }, +}); + +/** + * Document ingestion queue + * Handles document-level ingestion with differential processing + */ +export const documentIngestQueue = new Queue("document-ingest-queue", { + connection: getRedisConnection(), + defaultJobOptions: { + attempts: 3, + backoff: { + type: "exponential", + delay: 2000, + }, + removeOnComplete: { + age: 3600, + count: 1000, + }, + removeOnFail: { + age: 86400, + }, + }, +}); + +/** + * Conversation title creation queue + */ +export const conversationTitleQueue = new Queue("conversation-title-queue", { + connection: getRedisConnection(), + defaultJobOptions: { + attempts: 3, + backoff: { + type: "exponential", + delay: 2000, + }, + removeOnComplete: { + age: 3600, + count: 1000, + }, + removeOnFail: { + age: 86400, + }, + }, +}); + +/** + * Deep search queue + */ +export const deepSearchQueue = new Queue("deep-search-queue", { + connection: getRedisConnection(), + defaultJobOptions: { + attempts: 3, + backoff: { + type: "exponential", + delay: 2000, + }, + removeOnComplete: { + age: 3600, + count: 1000, + }, + removeOnFail: { + age: 86400, + }, + }, +}); + +/** + * Session compaction queue + */ +export const sessionCompactionQueue = new Queue("session-compaction-queue", { + connection: getRedisConnection(), + defaultJobOptions: { + attempts: 3, + backoff: { + type: "exponential", + delay: 2000, + }, + removeOnComplete: { + age: 3600, + count: 1000, + }, + removeOnFail: { + age: 86400, + }, + }, +}); diff --git a/apps/webapp/app/bullmq/start-workers.ts b/apps/webapp/app/bullmq/start-workers.ts new file mode 100644 index 0000000..dace682 --- /dev/null +++ b/apps/webapp/app/bullmq/start-workers.ts @@ -0,0 +1,43 @@ +/** + * BullMQ Worker Startup Script + * + * This script starts all BullMQ workers for processing background jobs. + * Run this as a separate process alongside your main application. 
+ * + * Usage: + * tsx apps/webapp/app/bullmq/start-workers.ts + */ + +import { logger } from "~/services/logger.service"; +import { + ingestWorker, + documentIngestWorker, + conversationTitleWorker, + deepSearchWorker, + sessionCompactionWorker, + closeAllWorkers, +} from "./workers"; + +export async function startWorkers() {} + +// Handle graceful shutdown +process.on("SIGTERM", async () => { + logger.log("SIGTERM received, closing workers gracefully..."); + await closeAllWorkers(); + process.exit(0); +}); + +process.on("SIGINT", async () => { + logger.log("SIGINT received, closing workers gracefully..."); + await closeAllWorkers(); + process.exit(0); +}); + +// Log worker startup +logger.log("Starting BullMQ workers..."); +logger.log(`- Ingest worker: ${ingestWorker.name}`); +logger.log(`- Document ingest worker: ${documentIngestWorker.name}`); +logger.log(`- Conversation title worker: ${conversationTitleWorker.name}`); +logger.log(`- Deep search worker: ${deepSearchWorker.name}`); +logger.log(`- Session compaction worker: ${sessionCompactionWorker.name}`); +logger.log("All BullMQ workers started and listening for jobs"); diff --git a/apps/webapp/app/bullmq/utils/job-finder.ts b/apps/webapp/app/bullmq/utils/job-finder.ts new file mode 100644 index 0000000..5e604c1 --- /dev/null +++ b/apps/webapp/app/bullmq/utils/job-finder.ts @@ -0,0 +1,134 @@ +/** + * BullMQ Job Finder Utilities + * + * Helper functions to find, retrieve, and cancel BullMQ jobs + */ + +interface JobInfo { + id: string; + isCompleted: boolean; + status?: string; +} + +/** + * Get all active queues + */ +async function getAllQueues() { + const { + ingestQueue, + documentIngestQueue, + conversationTitleQueue, + deepSearchQueue, + sessionCompactionQueue, + } = await import("../queues"); + + return [ + ingestQueue, + documentIngestQueue, + conversationTitleQueue, + deepSearchQueue, + sessionCompactionQueue, + ]; +} + +/** + * Find jobs by tags (metadata stored in job data) + * Since BullMQ doesn't have native tag support like Trigger.dev, + * we search through jobs and check if their data contains the required identifiers + */ +export async function getJobsByTags( + tags: string[], + taskIdentifier?: string, +): Promise { + const queues = await getAllQueues(); + const matchingJobs: JobInfo[] = []; + + for (const queue of queues) { + // Skip if taskIdentifier is specified and doesn't match queue name + if (taskIdentifier && !queue.name.includes(taskIdentifier)) { + continue; + } + + // Get all active and waiting jobs + const [active, waiting, delayed] = await Promise.all([ + queue.getActive(), + queue.getWaiting(), + queue.getDelayed(), + ]); + + const allJobs = [...active, ...waiting, ...delayed]; + + for (const job of allJobs) { + // Check if job data contains all required tags + const jobData = job.data as any; + const matchesTags = tags.every( + (tag) => + job.id?.includes(tag) || + jobData.userId === tag || + jobData.workspaceId === tag || + jobData.queueId === tag, + ); + + if (matchesTags) { + const state = await job.getState(); + matchingJobs.push({ + id: job.id!, + isCompleted: state === "completed" || state === "failed", + status: state, + }); + } + } + } + + return matchingJobs; +} + +/** + * Get a specific job by ID across all queues + */ +export async function getJobById(jobId: string): Promise { + const queues = await getAllQueues(); + + for (const queue of queues) { + try { + const job = await queue.getJob(jobId); + if (job) { + const state = await job.getState(); + return { + id: job.id!, + isCompleted: state 
=== "completed" || state === "failed", + status: state, + }; + } + } catch { + // Job not in this queue, continue + continue; + } + } + + return null; +} + +/** + * Cancel a job by ID + */ +export async function cancelJobById(jobId: string): Promise { + const queues = await getAllQueues(); + + for (const queue of queues) { + try { + const job = await queue.getJob(jobId); + if (job) { + const state = await job.getState(); + // Only remove if not already completed + if (state !== "completed" && state !== "failed") { + await job.remove(); + } + return; + } + } catch { + // Job not in this queue, continue + continue; + } + } +} diff --git a/apps/webapp/app/bullmq/workers/index.ts b/apps/webapp/app/bullmq/workers/index.ts new file mode 100644 index 0000000..dd8ceaa --- /dev/null +++ b/apps/webapp/app/bullmq/workers/index.ts @@ -0,0 +1,181 @@ +/** + * BullMQ Workers + * + * All worker definitions for processing background jobs with BullMQ + */ + +import { Worker } from "bullmq"; +import { getRedisConnection } from "../connection"; +import { + processEpisodeIngestion, + type IngestEpisodePayload, +} from "~/jobs/ingest/ingest-episode.logic"; +import { + processDocumentIngestion, + type IngestDocumentPayload, +} from "~/jobs/ingest/ingest-document.logic"; +import { + processConversationTitleCreation, + type CreateConversationTitlePayload, +} from "~/jobs/conversation/create-title.logic"; +import { + processDeepSearch, + type ProcessDeepSearchPayload, +} from "~/jobs/deep-search/deep-search.logic"; +import { + processSessionCompaction, + type SessionCompactionPayload, +} from "~/jobs/session/session-compaction.logic"; +import { + enqueueIngestEpisode, + enqueueSpaceAssignment, + enqueueSessionCompaction, +} from "~/lib/queue-adapter.server"; +import { logger } from "~/services/logger.service"; + +/** + * Episode ingestion worker + * Processes individual episode ingestion jobs with per-user concurrency + * + * Note: Per-user concurrency is achieved by using userId as part of the jobId + * when adding jobs to the queue, ensuring only one job per user runs at a time + */ +export const ingestWorker = new Worker( + "ingest-queue", + async (job) => { + const payload = job.data as IngestEpisodePayload; + + return await processEpisodeIngestion( + payload, + // Callbacks to enqueue follow-up jobs + enqueueSpaceAssignment, + enqueueSessionCompaction, + ); + }, + { + connection: getRedisConnection(), + concurrency: 5, // Process up to 5 jobs in parallel + }, +); + +ingestWorker.on("completed", (job) => { + logger.log(`Job ${job.id} completed`); +}); + +ingestWorker.on("failed", (job, error) => { + logger.error(`Job ${job?.id} failed: ${error}`); +}); + +/** + * Document ingestion worker + * Handles document-level ingestion with differential processing + * + * Note: Per-user concurrency is achieved by using userId as part of the jobId + * when adding jobs to the queue + */ +export const documentIngestWorker = new Worker( + "document-ingest-queue", + async (job) => { + const payload = job.data as IngestDocumentPayload; + return await processDocumentIngestion( + payload, + // Callback to enqueue episode ingestion for each chunk + enqueueIngestEpisode, + ); + }, + { + connection: getRedisConnection(), + concurrency: 3, // Process up to 3 documents in parallel + }, +); + +documentIngestWorker.on("completed", (job) => { + logger.log(`Document job ${job.id} completed`); +}); + +documentIngestWorker.on("failed", (job, error) => { + logger.error(`Document job ${job?.id} failed: ${error}`); +}); + +/** + * Conversation 
title creation worker + */ +export const conversationTitleWorker = new Worker( + "conversation-title-queue", + async (job) => { + const payload = job.data as CreateConversationTitlePayload; + return await processConversationTitleCreation(payload); + }, + { + connection: getRedisConnection(), + concurrency: 10, // Process up to 10 title creations in parallel + }, +); + +conversationTitleWorker.on("completed", (job) => { + logger.log(`Conversation title job ${job.id} completed`); +}); + +conversationTitleWorker.on("failed", (job, error) => { + logger.error(`Conversation title job ${job?.id} failed: ${error}`); +}); + +/** + * Deep search worker (non-streaming version for BullMQ) + */ +export const deepSearchWorker = new Worker( + "deep-search-queue", + async (job) => { + const payload = job.data as ProcessDeepSearchPayload; + return await processDeepSearch(payload); + }, + { + connection: getRedisConnection(), + concurrency: 5, // Process up to 5 searches in parallel + }, +); + +deepSearchWorker.on("completed", (job) => { + logger.log(`Deep search job ${job.id} completed`); +}); + +deepSearchWorker.on("failed", (job, error) => { + logger.error(`Deep search job ${job?.id} failed: ${error}`); +}); + +/** + * Session compaction worker + */ +export const sessionCompactionWorker = new Worker( + "session-compaction-queue", + async (job) => { + const payload = job.data as SessionCompactionPayload; + return await processSessionCompaction(payload); + }, + { + connection: getRedisConnection(), + concurrency: 3, // Process up to 3 compactions in parallel + }, +); + +sessionCompactionWorker.on("completed", (job) => { + logger.log(`Session compaction job ${job.id} completed`); +}); + +sessionCompactionWorker.on("failed", (job, error) => { + logger.error(`Session compaction job ${job?.id} failed: ${error}`); +}); + +/** + * Graceful shutdown handler + */ +export async function closeAllWorkers(): Promise { + await Promise.all([ + ingestWorker.close(), + documentIngestWorker.close(), + conversationTitleWorker.close(), + deepSearchWorker.close(), + sessionCompactionWorker.close(), + ]); + logger.log("All BullMQ workers closed"); +} diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index afe7281..139cffa 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -95,6 +95,9 @@ const EnvironmentSchema = z.object({ AWS_ACCESS_KEY_ID: z.string().optional(), AWS_SECRET_ACCESS_KEY: z.string().optional(), AWS_REGION: z.string().optional(), + + // Queue provider + QUEUE_PROVIDER: z.enum(["trigger", "bullmq"]).default("trigger"), }); export type Environment = z.infer; diff --git a/apps/webapp/app/jobs/conversation/create-title.logic.ts b/apps/webapp/app/jobs/conversation/create-title.logic.ts new file mode 100644 index 0000000..07d3db8 --- /dev/null +++ b/apps/webapp/app/jobs/conversation/create-title.logic.ts @@ -0,0 +1,96 @@ +import { LLMMappings } from "@core/types"; +import { generate } from "~/trigger/chat/stream-utils"; +import { conversationTitlePrompt } from "~/trigger/conversation/prompt"; +import { prisma } from "~/trigger/utils/prisma"; +import { logger } from "~/services/logger.service"; + +export interface CreateConversationTitlePayload { + conversationId: string; + message: string; +} + +export interface CreateConversationTitleResult { + success: boolean; + title?: string; + error?: string; +} + +/** + * Core business logic for creating conversation titles + * This is shared between Trigger.dev and BullMQ implementations + */ +export async function 
processConversationTitleCreation( + payload: CreateConversationTitlePayload, +): Promise { + try { + let conversationTitleResponse = ""; + const gen = generate( + [ + { + role: "user", + content: conversationTitlePrompt.replace( + "{{message}}", + payload.message, + ), + }, + ], + false, + () => {}, + undefined, + "", + LLMMappings.GPT41, + ); + + for await (const chunk of gen) { + if (typeof chunk === "string") { + conversationTitleResponse += chunk; + } else if (chunk && typeof chunk === "object" && chunk.message) { + conversationTitleResponse += chunk.message; + } + } + + const outputMatch = conversationTitleResponse.match( + /(.*?)<\/output>/s, + ); + + logger.info(`Conversation title data: ${JSON.stringify(outputMatch)}`); + + if (!outputMatch) { + logger.error("No output found in recurrence response"); + throw new Error("Invalid response format from AI"); + } + + const jsonStr = outputMatch[1].trim(); + const conversationTitleData = JSON.parse(jsonStr); + + if (conversationTitleData) { + await prisma.conversation.update({ + where: { + id: payload.conversationId, + }, + data: { + title: conversationTitleData.title, + }, + }); + + return { + success: true, + title: conversationTitleData.title, + }; + } + + return { + success: false, + error: "No title generated", + }; + } catch (error: any) { + logger.error( + `Error creating conversation title for ${payload.conversationId}:`, + error, + ); + return { + success: false, + error: error.message, + }; + } +} diff --git a/apps/webapp/app/jobs/deep-search/deep-search.logic.ts b/apps/webapp/app/jobs/deep-search/deep-search.logic.ts new file mode 100644 index 0000000..94ee2b8 --- /dev/null +++ b/apps/webapp/app/jobs/deep-search/deep-search.logic.ts @@ -0,0 +1,105 @@ +import { type CoreMessage } from "ai"; +import { logger } from "~/services/logger.service"; +import { nanoid } from "nanoid"; +import { + deletePersonalAccessToken, + getOrCreatePersonalAccessToken, +} from "~/trigger/utils/utils"; +import { getReActPrompt } from "~/trigger/deep-search/prompt"; +import { type DeepSearchPayload, type DeepSearchResponse } from "~/trigger/deep-search/types"; +import { createSearchMemoryTool } from "~/trigger/deep-search/utils"; +import { run } from "~/trigger/deep-search/deep-search-utils"; +import { AgentMessageType } from "~/trigger/chat/types"; + +export interface ProcessDeepSearchPayload { + content: string; + userId: string; + metadata?: any; + intentOverride?: string; +} + +export interface ProcessDeepSearchResult { + success: boolean; + synthesis?: string; + error?: string; +} + +/** + * Core business logic for deep search (non-streaming version for BullMQ) + * This is shared logic, but the streaming happens in Trigger.dev via metadata.stream + */ +export async function processDeepSearch( + payload: ProcessDeepSearchPayload, +): Promise { + const { content, userId, metadata: meta, intentOverride } = payload; + + const randomKeyName = `deepSearch_${nanoid(10)}`; + + // Get or create token for search API calls + const pat = await getOrCreatePersonalAccessToken({ + name: randomKeyName, + userId: userId as string, + }); + + if (!pat?.token) { + return { + success: false, + error: "Failed to create personal access token", + }; + } + + try { + // Create search tool that agent will use + const searchTool = createSearchMemoryTool(pat.token); + + // Build initial messages with ReAct prompt + const initialMessages: CoreMessage[] = [ + { + role: "system", + content: getReActPrompt(meta, intentOverride), + }, + { + role: "user", + content: `CONTENT TO 
ANALYZE:\n${content}\n\nPlease search my memory for relevant context and synthesize what you find.`, + }, + ]; + + // Run the ReAct loop generator + const llmResponse = run(initialMessages, searchTool); + + let synthesis = ""; + + // For BullMQ: iterate without streaming, just accumulate the final synthesis + for await (const step of llmResponse) { + // MESSAGE_CHUNK: Final synthesis - accumulate + if (step.type === AgentMessageType.MESSAGE_CHUNK) { + synthesis += step.message; + } + + // STREAM_END: Loop completed + if (step.type === AgentMessageType.STREAM_END) { + break; + } + } + + await deletePersonalAccessToken(pat?.id); + + // Clean up any remaining tags + synthesis = synthesis + .replace(//gi, "") + .replace(/<\/final_response>/gi, "") + .trim(); + + return { + success: true, + synthesis, + }; + } catch (error: any) { + await deletePersonalAccessToken(pat?.id); + logger.error(`Deep search error: ${error}`); + return { + success: false, + error: error.message, + }; + } +} diff --git a/apps/webapp/app/jobs/ingest/ingest-document.logic.ts b/apps/webapp/app/jobs/ingest/ingest-document.logic.ts new file mode 100644 index 0000000..20c1866 --- /dev/null +++ b/apps/webapp/app/jobs/ingest/ingest-document.logic.ts @@ -0,0 +1,289 @@ +import { type z } from "zod"; +import crypto from "crypto"; + +import { IngestionStatus } from "@core/database"; +import { EpisodeTypeEnum } from "@core/types"; +import { logger } from "~/services/logger.service"; +import { saveDocument } from "~/services/graphModels/document"; +import { type IngestBodyRequest } from "~/lib/ingest.server"; +import { DocumentVersioningService } from "~/services/documentVersioning.server"; +import { DocumentDifferentialService } from "~/services/documentDiffer.server"; +import { KnowledgeGraphService } from "~/services/knowledgeGraph.server"; +import { prisma } from "~/trigger/utils/prisma"; + +export interface IngestDocumentPayload { + body: z.infer; + userId: string; + workspaceId: string; + queueId: string; +} + +export interface IngestDocumentResult { + success: boolean; + error?: string; +} + +/** + * Core business logic for document ingestion with differential processing + * This is shared between Trigger.dev and BullMQ implementations + * + * Note: This function should NOT call trigger functions directly for chunk processing. + * Instead, use the enqueueEpisodeIngestion callback to queue episode ingestion jobs. 
+ */ +export async function processDocumentIngestion( + payload: IngestDocumentPayload, + // Callback function for enqueueing episode ingestion for each chunk + enqueueEpisodeIngestion?: (params: { + body: any; + userId: string; + workspaceId: string; + queueId: string; + }) => Promise<{ id?: string }>, +): Promise { + const startTime = Date.now(); + + try { + logger.log(`Processing document for user ${payload.userId}`, { + contentLength: payload.body.episodeBody.length, + }); + + await prisma.ingestionQueue.update({ + where: { id: payload.queueId }, + data: { + status: IngestionStatus.PROCESSING, + }, + }); + + const documentBody = payload.body; + + // Step 1: Initialize services and prepare document version + const versioningService = new DocumentVersioningService(); + const differentialService = new DocumentDifferentialService(); + const knowledgeGraphService = new KnowledgeGraphService(); + + const { + documentNode: document, + versionInfo, + chunkedDocument, + } = await versioningService.prepareDocumentVersion( + documentBody.sessionId!, + payload.userId, + documentBody.metadata?.documentTitle?.toString() || "Untitled Document", + documentBody.episodeBody, + documentBody.source, + documentBody.metadata || {}, + ); + + logger.log(`Document version analysis:`, { + version: versionInfo.newVersion, + isNewDocument: versionInfo.isNewDocument, + hasContentChanged: versionInfo.hasContentChanged, + changePercentage: versionInfo.chunkLevelChanges.changePercentage, + changedChunks: versionInfo.chunkLevelChanges.changedChunkIndices.length, + totalChunks: versionInfo.chunkLevelChanges.totalChunks, + }); + + // Step 2: Determine processing strategy + const differentialDecision = + await differentialService.analyzeDifferentialNeed( + documentBody.episodeBody, + versionInfo.existingDocument, + chunkedDocument, + ); + + logger.log(`Differential analysis:`, { + shouldUseDifferential: differentialDecision.shouldUseDifferential, + strategy: differentialDecision.strategy, + reason: differentialDecision.reason, + documentSizeTokens: differentialDecision.documentSizeTokens, + }); + + // Early return for unchanged documents + if (differentialDecision.strategy === "skip_processing") { + logger.log("Document content unchanged, skipping processing"); + return { + success: true, + }; + } + + // Step 3: Save the new document version + await saveDocument(document); + + // Step 3.1: Invalidate statements from previous document version if it exists + let invalidationResults = null; + if (versionInfo.existingDocument && versionInfo.hasContentChanged) { + logger.log( + `Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`, + ); + + invalidationResults = + await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion( + { + previousDocumentUuid: versionInfo.existingDocument.uuid, + newDocumentContent: documentBody.episodeBody, + userId: payload.userId, + invalidatedBy: document.uuid, + semanticSimilarityThreshold: 0.75, // Configurable threshold + }, + ); + + logger.log(`Statement invalidation completed:`, { + totalAnalyzed: invalidationResults.totalStatementsAnalyzed, + invalidated: invalidationResults.invalidatedStatements.length, + preserved: invalidationResults.preservedStatements.length, + }); + } + + logger.log( + `Document chunked into ${chunkedDocument.chunks.length} chunks`, + ); + + // Step 4: Process chunks based on differential strategy + let chunksToProcess = chunkedDocument.chunks; + let processingMode = "full"; + + if ( + 
differentialDecision.shouldUseDifferential && + differentialDecision.strategy === "chunk_level_diff" + ) { + // Only process changed chunks + const chunkComparisons = differentialService.getChunkComparisons( + versionInfo.existingDocument!, + chunkedDocument, + ); + + const changedIndices = + differentialService.getChunksNeedingReprocessing(chunkComparisons); + chunksToProcess = chunkedDocument.chunks.filter((chunk) => + changedIndices.includes(chunk.chunkIndex), + ); + processingMode = "differential"; + + logger.log( + `Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`, + ); + } else if (differentialDecision.strategy === "full_reingest") { + // Process all chunks + processingMode = "full"; + logger.log( + `Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`, + ); + } + + // Step 5: Queue chunks for processing + const episodeHandlers = []; + if (enqueueEpisodeIngestion) { + for (const chunk of chunksToProcess) { + const chunkEpisodeData = { + episodeBody: chunk.content, + referenceTime: documentBody.referenceTime, + metadata: { + ...documentBody.metadata, + processingMode, + differentialStrategy: differentialDecision.strategy, + chunkHash: chunk.contentHash, + documentTitle: + documentBody.metadata?.documentTitle?.toString() || + "Untitled Document", + chunkIndex: chunk.chunkIndex, + documentUuid: document.uuid, + }, + source: documentBody.source, + spaceIds: documentBody.spaceIds, + sessionId: documentBody.sessionId, + type: EpisodeTypeEnum.DOCUMENT, + }; + + const episodeHandler = await enqueueEpisodeIngestion({ + body: chunkEpisodeData, + userId: payload.userId, + workspaceId: payload.workspaceId, + queueId: payload.queueId, + }); + + if (episodeHandler.id) { + episodeHandlers.push(episodeHandler.id); + logger.log( + `Queued chunk ${chunk.chunkIndex + 1} for ${processingMode} processing`, + { + handlerId: episodeHandler.id, + chunkSize: chunk.content.length, + chunkHash: chunk.contentHash, + }, + ); + } + } + } + + // Calculate cost savings + const costSavings = differentialService.calculateCostSavings( + chunkedDocument.chunks.length, + chunksToProcess.length, + ); + + await prisma.ingestionQueue.update({ + where: { id: payload.queueId }, + data: { + output: { + documentUuid: document.uuid, + version: versionInfo.newVersion, + totalChunks: chunkedDocument.chunks.length, + chunksProcessed: chunksToProcess.length, + chunksSkipped: costSavings.chunksSkipped, + processingMode, + differentialStrategy: differentialDecision.strategy, + estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, + statementInvalidation: invalidationResults + ? { + totalAnalyzed: invalidationResults.totalStatementsAnalyzed, + invalidated: invalidationResults.invalidatedStatements.length, + preserved: invalidationResults.preservedStatements.length, + } + : null, + episodes: [], + episodeHandlers, + }, + status: IngestionStatus.PROCESSING, + }, + }); + + const processingTimeMs = Date.now() - startTime; + + logger.log( + `Document differential processing completed in ${processingTimeMs}ms`, + { + documentUuid: document.uuid, + version: versionInfo.newVersion, + processingMode, + totalChunks: chunkedDocument.chunks.length, + chunksProcessed: chunksToProcess.length, + chunksSkipped: costSavings.chunksSkipped, + estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, + changePercentage: `${differentialDecision.changePercentage.toFixed(1)}%`, + statementInvalidation: invalidationResults + ? 
{ + totalAnalyzed: invalidationResults.totalStatementsAnalyzed, + invalidated: invalidationResults.invalidatedStatements.length, + preserved: invalidationResults.preservedStatements.length, + } + : "No previous version", + }, + ); + + return { success: true }; + } catch (err: any) { + await prisma.ingestionQueue.update({ + where: { id: payload.queueId }, + data: { + error: err.message, + status: IngestionStatus.FAILED, + }, + }); + + logger.error( + `Error processing document for user ${payload.userId}:`, + err, + ); + return { success: false, error: err.message }; + } +} diff --git a/apps/webapp/app/jobs/ingest/ingest-episode.logic.ts b/apps/webapp/app/jobs/ingest/ingest-episode.logic.ts new file mode 100644 index 0000000..e1b515a --- /dev/null +++ b/apps/webapp/app/jobs/ingest/ingest-episode.logic.ts @@ -0,0 +1,266 @@ +import { z } from "zod"; +import { KnowledgeGraphService } from "~/services/knowledgeGraph.server"; +import { linkEpisodeToDocument } from "~/services/graphModels/document"; +import { IngestionStatus } from "@core/database"; +import { logger } from "~/services/logger.service"; +import { prisma } from "~/trigger/utils/prisma"; +import { EpisodeType } from "@core/types"; +import { deductCredits, hasCredits } from "~/trigger/utils/utils"; +import { assignEpisodesToSpace } from "~/services/graphModels/space"; + +export const IngestBodyRequest = z.object({ + episodeBody: z.string(), + referenceTime: z.string(), + metadata: z.record(z.union([z.string(), z.number(), z.boolean()])).optional(), + source: z.string(), + spaceIds: z.array(z.string()).optional(), + sessionId: z.string().optional(), + type: z + .enum([EpisodeType.CONVERSATION, EpisodeType.DOCUMENT]) + .default(EpisodeType.CONVERSATION), +}); + +export interface IngestEpisodePayload { + body: z.infer; + userId: string; + workspaceId: string; + queueId: string; +} + +export interface IngestEpisodeResult { + success: boolean; + episodeDetails?: any; + error?: string; +} + +/** + * Core business logic for ingesting a single episode + * This is shared between Trigger.dev and BullMQ implementations + * + * Note: This function should NOT call trigger functions directly. + * Instead, return data that indicates follow-up jobs are needed, + * and let the caller (Trigger task or BullMQ worker) handle job queueing. + */ +export async function processEpisodeIngestion( + payload: IngestEpisodePayload, + // Callback functions for enqueueing follow-up jobs + enqueueSpaceAssignment?: (params: { + userId: string; + workspaceId: string; + mode: "episode"; + episodeIds: string[]; + }) => Promise, + enqueueSessionCompaction?: (params: { + userId: string; + sessionId: string; + source: string; + }) => Promise, +): Promise { + try { + logger.log(`Processing job for user ${payload.userId}`); + + // Check if workspace has sufficient credits before processing + const hasSufficientCredits = await hasCredits( + payload.workspaceId, + "addEpisode", + ); + + if (!hasSufficientCredits) { + logger.warn(`Insufficient credits for workspace ${payload.workspaceId}`); + + await prisma.ingestionQueue.update({ + where: { id: payload.queueId }, + data: { + status: IngestionStatus.NO_CREDITS, + error: + "Insufficient credits. 
Please upgrade your plan or wait for your credits to reset.", + }, + }); + + return { + success: false, + error: "Insufficient credits", + }; + } + + const ingestionQueue = await prisma.ingestionQueue.update({ + where: { id: payload.queueId }, + data: { + status: IngestionStatus.PROCESSING, + }, + }); + + const knowledgeGraphService = new KnowledgeGraphService(); + + const episodeBody = payload.body as any; + + const episodeDetails = await knowledgeGraphService.addEpisode( + { + ...episodeBody, + userId: payload.userId, + }, + prisma, + ); + + // Link episode to document if it's a document chunk + if ( + episodeBody.type === EpisodeType.DOCUMENT && + episodeBody.metadata.documentUuid && + episodeDetails.episodeUuid + ) { + try { + await linkEpisodeToDocument( + episodeDetails.episodeUuid, + episodeBody.metadata.documentUuid, + episodeBody.metadata.chunkIndex || 0, + ); + logger.log( + `Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.metadata.documentUuid} at chunk ${episodeBody.metadata.chunkIndex || 0}`, + ); + } catch (error) { + logger.error(`Failed to link episode to document:`, { + error, + episodeUuid: episodeDetails.episodeUuid, + documentUuid: episodeBody.metadata.documentUuid, + }); + } + } + + let finalOutput = episodeDetails; + let episodeUuids: string[] = episodeDetails.episodeUuid + ? [episodeDetails.episodeUuid] + : []; + let currentStatus: IngestionStatus = IngestionStatus.COMPLETED; + if (episodeBody.type === EpisodeType.DOCUMENT) { + const currentOutput = ingestionQueue.output as any; + currentOutput.episodes.push(episodeDetails); + episodeUuids = currentOutput.episodes.map( + (episode: any) => episode.episodeUuid, + ); + + finalOutput = { + ...currentOutput, + }; + + if (currentOutput.episodes.length !== currentOutput.totalChunks) { + currentStatus = IngestionStatus.PROCESSING; + } + } + + await prisma.ingestionQueue.update({ + where: { id: payload.queueId }, + data: { + output: finalOutput, + status: currentStatus, + }, + }); + + // Deduct credits for episode creation + if (currentStatus === IngestionStatus.COMPLETED) { + await deductCredits( + payload.workspaceId, + "addEpisode", + finalOutput.statementsCreated, + ); + } + + // Handle space assignment after successful ingestion + try { + // If spaceIds were explicitly provided, immediately assign the episode to those spaces + if ( + episodeBody.spaceIds && + episodeBody.spaceIds.length > 0 && + episodeDetails.episodeUuid + ) { + logger.info(`Assigning episode to explicitly provided spaces`, { + userId: payload.userId, + episodeId: episodeDetails.episodeUuid, + spaceIds: episodeBody.spaceIds, + }); + + // Assign episode to each space + for (const spaceId of episodeBody.spaceIds) { + await assignEpisodesToSpace( + [episodeDetails.episodeUuid], + spaceId, + payload.userId, + ); + } + + logger.info( + `Skipping LLM space assignment - episode explicitly assigned to ${episodeBody.spaceIds.length} space(s)`, + ); + } else { + // Only trigger automatic LLM space assignment if no explicit spaceIds were provided + logger.info( + `Triggering LLM space assignment after successful ingestion`, + { + userId: payload.userId, + workspaceId: payload.workspaceId, + episodeId: episodeDetails?.episodeUuid, + }, + ); + if ( + episodeDetails.episodeUuid && + currentStatus === IngestionStatus.COMPLETED && + enqueueSpaceAssignment + ) { + await enqueueSpaceAssignment({ + userId: payload.userId, + workspaceId: payload.workspaceId, + mode: "episode", + episodeIds: episodeUuids, + }); + } + } + } catch (assignmentError) { + 
// Don't fail the ingestion if assignment fails + logger.warn(`Failed to trigger space assignment after ingestion:`, { + error: assignmentError, + userId: payload.userId, + episodeId: episodeDetails?.episodeUuid, + }); + } + + // Auto-trigger session compaction if episode has sessionId + try { + if ( + episodeBody.sessionId && + currentStatus === IngestionStatus.COMPLETED && + enqueueSessionCompaction + ) { + logger.info(`Checking if session compaction should be triggered`, { + userId: payload.userId, + sessionId: episodeBody.sessionId, + source: episodeBody.source, + }); + + await enqueueSessionCompaction({ + userId: payload.userId, + sessionId: episodeBody.sessionId, + source: episodeBody.source, + }); + } + } catch (compactionError) { + // Don't fail the ingestion if compaction fails + logger.warn(`Failed to trigger session compaction after ingestion:`, { + error: compactionError, + userId: payload.userId, + sessionId: episodeBody.sessionId, + }); + } + + return { success: true, episodeDetails }; + } catch (err: any) { + await prisma.ingestionQueue.update({ + where: { id: payload.queueId }, + data: { + error: err.message, + status: IngestionStatus.FAILED, + }, + }); + + logger.error(`Error processing job for user ${payload.userId}:`, err); + return { success: false, error: err.message }; + } +} diff --git a/apps/webapp/app/jobs/session/session-compaction.logic.ts b/apps/webapp/app/jobs/session/session-compaction.logic.ts new file mode 100644 index 0000000..92e242b --- /dev/null +++ b/apps/webapp/app/jobs/session/session-compaction.logic.ts @@ -0,0 +1,455 @@ +import { logger } from "~/services/logger.service"; +import type { CoreMessage } from "ai"; +import { z } from "zod"; +import { getEmbedding, makeModelCall } from "~/lib/model.server"; +import { + getCompactedSessionBySessionId, + linkEpisodesToCompact, + getSessionEpisodes, + type CompactedSessionNode, + type SessionEpisodeData, + saveCompactedSession, +} from "~/services/graphModels/compactedSession"; + +export interface SessionCompactionPayload { + userId: string; + sessionId: string; + source: string; + triggerSource?: "auto" | "manual" | "threshold"; +} + +export interface SessionCompactionResult { + success: boolean; + compactionResult?: { + compactUuid: string; + sessionId: string; + summary: string; + episodeCount: number; + startTime: Date; + endTime: Date; + confidence: number; + compressionRatio: number; + }; + reason?: string; + episodeCount?: number; + error?: string; +} + +// Zod schema for LLM response validation +const CompactionResultSchema = z.object({ + summary: z.string().describe("Consolidated narrative of the entire session"), + confidence: z + .number() + .min(0) + .max(1) + .describe("Confidence score of the compaction quality"), +}); + +const CONFIG = { + minEpisodesForCompaction: 5, // Minimum episodes to trigger compaction + compactionThreshold: 1, // Trigger after N new episodes + maxEpisodesPerBatch: 50, // Process in batches if needed +}; + +/** + * Core business logic for session compaction + * This is shared between Trigger.dev and BullMQ implementations + */ +export async function processSessionCompaction( + payload: SessionCompactionPayload, +): Promise { + const { userId, sessionId, source, triggerSource = "auto" } = payload; + + logger.info(`Starting session compaction`, { + userId, + sessionId, + source, + triggerSource, + }); + + try { + // Check if compaction already exists + const existingCompact = await getCompactedSessionBySessionId( + sessionId, + userId, + ); + + // Fetch all episodes for 
this session + const episodes = await getSessionEpisodes( + sessionId, + userId, + existingCompact?.endTime, + ); + + console.log("episodes", episodes.length); + // Check if we have enough episodes + if (!existingCompact && episodes.length < CONFIG.minEpisodesForCompaction) { + logger.info(`Not enough episodes for compaction`, { + sessionId, + episodeCount: episodes.length, + minRequired: CONFIG.minEpisodesForCompaction, + }); + return { + success: false, + reason: "insufficient_episodes", + episodeCount: episodes.length, + }; + } else if ( + existingCompact && + episodes.length < + CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold + ) { + logger.info(`Not enough new episodes for compaction`, { + sessionId, + episodeCount: episodes.length, + minRequired: + CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold, + }); + return { + success: false, + reason: "insufficient_new_episodes", + episodeCount: episodes.length, + }; + } + + // Generate or update compaction + const compactionResult = existingCompact + ? await updateCompaction(existingCompact, episodes, userId) + : await createCompaction(sessionId, episodes, userId, source); + + logger.info(`Session compaction completed`, { + sessionId, + compactUuid: compactionResult.uuid, + episodeCount: compactionResult.episodeCount, + compressionRatio: compactionResult.compressionRatio, + }); + + return { + success: true, + compactionResult: { + compactUuid: compactionResult.uuid, + sessionId: compactionResult.sessionId, + summary: compactionResult.summary, + episodeCount: compactionResult.episodeCount, + startTime: compactionResult.startTime, + endTime: compactionResult.endTime, + confidence: compactionResult.confidence, + compressionRatio: compactionResult.compressionRatio, + }, + }; + } catch (error) { + logger.error(`Session compaction failed`, { + sessionId, + userId, + error: error instanceof Error ? error.message : String(error), + }); + + return { + success: false, + error: error instanceof Error ? 
error.message : String(error), + }; + } +} + +/** + * Create new compaction + */ +async function createCompaction( + sessionId: string, + episodes: SessionEpisodeData[], + userId: string, + source: string, +): Promise { + logger.info(`Creating new compaction`, { + sessionId, + episodeCount: episodes.length, + }); + + // Generate compaction using LLM + const compactionData = await generateCompaction(episodes, null); + + // Generate embedding for summary + const summaryEmbedding = await getEmbedding(compactionData.summary); + + // Create CompactedSession node using graph model + const compactUuid = crypto.randomUUID(); + const now = new Date(); + const startTime = new Date(episodes[0].createdAt); + const endTime = new Date(episodes[episodes.length - 1].createdAt); + const episodeUuids = episodes.map((e) => e.uuid); + const compressionRatio = episodes.length / 1; + + const compactNode: CompactedSessionNode = { + uuid: compactUuid, + sessionId, + summary: compactionData.summary, + summaryEmbedding, + episodeCount: episodes.length, + startTime, + endTime, + createdAt: now, + confidence: compactionData.confidence, + userId, + source, + compressionRatio, + metadata: { triggerType: "create" }, + }; + + console.log("compactNode", compactNode); + // Use graph model functions + await saveCompactedSession(compactNode); + await linkEpisodesToCompact(compactUuid, episodeUuids, userId); + + logger.info(`Compaction created`, { + compactUuid, + episodeCount: episodes.length, + }); + + return compactNode; +} + +/** + * Update existing compaction with new episodes + */ +async function updateCompaction( + existingCompact: CompactedSessionNode, + newEpisodes: SessionEpisodeData[], + userId: string, +): Promise { + logger.info(`Updating existing compaction`, { + compactUuid: existingCompact.uuid, + newEpisodeCount: newEpisodes.length, + }); + + // Generate updated compaction using LLM (merging) + const compactionData = await generateCompaction( + newEpisodes, + existingCompact.summary, + ); + + // Generate new embedding for updated summary + const summaryEmbedding = await getEmbedding(compactionData.summary); + + // Update CompactedSession node using graph model + const now = new Date(); + const endTime = newEpisodes[newEpisodes.length - 1].createdAt; + const totalEpisodeCount = existingCompact.episodeCount + newEpisodes.length; + const compressionRatio = totalEpisodeCount / 1; + const episodeUuids = newEpisodes.map((e) => e.uuid); + + const updatedNode: CompactedSessionNode = { + ...existingCompact, + summary: compactionData.summary, + summaryEmbedding, + episodeCount: totalEpisodeCount, + endTime, + updatedAt: now, + confidence: compactionData.confidence, + compressionRatio, + metadata: { triggerType: "update", newEpisodesAdded: newEpisodes.length }, + }; + + // Use graph model functions + await saveCompactedSession(updatedNode); + await linkEpisodesToCompact(existingCompact.uuid, episodeUuids, userId); + + logger.info(`Compaction updated`, { + compactUuid: existingCompact.uuid, + totalEpisodeCount, + }); + + return updatedNode; +} + +/** + * Generate compaction using LLM (similar to Claude Code's compact approach) + */ +async function generateCompaction( + episodes: SessionEpisodeData[], + existingSummary: string | null, +): Promise> { + const systemPrompt = createCompactionSystemPrompt(); + const userPrompt = createCompactionUserPrompt(episodes, existingSummary); + + const messages: CoreMessage[] = [ + { role: "system", content: systemPrompt }, + { role: "user", content: userPrompt }, + ]; + + 
logger.info(`Generating compaction with LLM`, { + episodeCount: episodes.length, + hasExistingSummary: !!existingSummary, + }); + + try { + let responseText = ""; + await makeModelCall( + false, + messages, + (text: string) => { + responseText = text; + }, + undefined, + "high", + ); + + return parseCompactionResponse(responseText); + } catch (error) { + logger.error(`Failed to generate compaction`, { + error: error instanceof Error ? error.message : String(error), + }); + throw error; + } +} + +/** + * System prompt for compaction (for agent recall/context retrieval) + */ +function createCompactionSystemPrompt(): string { + return `You are a session compaction specialist. Your task is to create a rich, informative summary that will help AI agents understand what happened in this conversation session when they need context for future interactions. + +## PURPOSE + +This summary will be retrieved by AI agents when the user references this session in future conversations. The agent needs enough context to: +- Understand what was discussed and why +- Know what decisions were made and their rationale +- Grasp the outcome and current state +- Have relevant technical details to provide informed responses + +## COMPACTION GOALS + +1. **Comprehensive Context**: Capture all important information that might be referenced later +2. **Decision Documentation**: Clearly state what was decided, why, and what alternatives were considered +3. **Technical Details**: Include specific implementations, tools, configurations, and technical choices +4. **Outcome Clarity**: Make it clear what was accomplished and what the final state is +5. **Evolution Tracking**: Show how thinking or decisions evolved during the session + +## COMPACTION RULES + +1. **Be Information-Dense**: Pack useful details without fluff or repetition +2. **Structure Chronologically**: Start with problem/question, show progression, end with outcome +3. **Highlight Key Points**: Emphasize decisions, implementations, results, and learnings +4. **Include Specifics**: Names of libraries, specific configurations, metrics, numbers matter +5. **Resolve Contradictions**: Always use the most recent/final version when information conflicts + +## OUTPUT REQUIREMENTS + +- **summary**: A detailed, information-rich narrative that tells the complete story + - Structure naturally based on content - use as many paragraphs as needed + - Each distinct topic, decision, or phase should get its own paragraph(s) + - Start with context and initial problem/question + - Progress chronologically through discussions, decisions, and implementations + - **Final paragraph MUST**: State the outcome, results, and current state + - Don't artificially limit length - capture everything important + +- **confidence**: Score (0-1) reflecting how well this summary captures the session's essence + +Your response MUST be valid JSON wrapped in tags. 
+ +## KEY PRINCIPLES + +- Write for an AI agent that needs to help the user in future conversations +- Include technical specifics that might be referenced (library names, configurations, metrics) +- Make outcomes and current state crystal clear in the final paragraph +- Show the reasoning behind decisions, not just the decisions themselves +- Be comprehensive but concise - every sentence should add value +- Each major topic or phase deserves its own paragraph(s) +- Don't compress too much - agents need the details +`; +} + +/** + * User prompt for compaction + */ +function createCompactionUserPrompt( + episodes: SessionEpisodeData[], + existingSummary: string | null, +): string { + let prompt = ""; + + if (existingSummary) { + prompt += `## EXISTING SUMMARY (from previous compaction)\n\n${existingSummary}\n\n`; + prompt += `## NEW EPISODES (to merge into existing summary)\n\n`; + } else { + prompt += `## SESSION EPISODES (to compact)\n\n`; + } + + episodes.forEach((episode, index) => { + const timestamp = new Date(episode.validAt).toISOString(); + prompt += `### Episode ${index + 1} (${timestamp})\n`; + prompt += `Source: ${episode.source}\n`; + prompt += `Content:\n${episode.originalContent}\n\n`; + }); + + if (existingSummary) { + prompt += `\n## INSTRUCTIONS\n\n`; + prompt += `Merge the new episodes into the existing summary. Update facts, add new information, and maintain narrative coherence. Ensure the consolidated summary reflects the complete session including both old and new content.\n`; + } else { + prompt += `\n## INSTRUCTIONS\n\n`; + prompt += `Create a compact summary of this entire session. Consolidate all information into a coherent narrative with deduplicated key facts.\n`; + } + + return prompt; +} + +/** + * Parse LLM response for compaction + */ +function parseCompactionResponse( + response: string, +): z.infer { + try { + // Extract content from tags + const outputMatch = response.match(/([\s\S]*?)<\/output>/); + if (!outputMatch) { + logger.warn("No tags found in LLM compaction response"); + logger.debug("Full LLM response:", { response }); + throw new Error("Invalid LLM response format - missing tags"); + } + + let jsonContent = outputMatch[1].trim(); + + // Remove markdown code blocks if present + jsonContent = jsonContent.replace(/```json\n?/g, "").replace(/```\n?/g, ""); + + const parsed = JSON.parse(jsonContent); + + // Validate with schema + const validated = CompactionResultSchema.parse(parsed); + + return validated; + } catch (error) { + logger.error("Failed to parse compaction response", { + error: error instanceof Error ? 
error.message : String(error), + response: response.substring(0, 500), + }); + throw new Error(`Failed to parse compaction response: ${error}`); + } +} + +/** + * Helper function to check if compaction should be triggered + */ +export async function shouldTriggerCompaction( + sessionId: string, + userId: string, +): Promise { + const existingCompact = await getCompactedSessionBySessionId( + sessionId, + userId, + ); + + if (!existingCompact) { + // Check if we have enough episodes for initial compaction + const episodes = await getSessionEpisodes(sessionId, userId); + return episodes.length >= CONFIG.minEpisodesForCompaction; + } + + // Check if we have enough new episodes to update + const newEpisodes = await getSessionEpisodes( + sessionId, + userId, + existingCompact.endTime, + ); + return newEpisodes.length >= CONFIG.compactionThreshold; +} diff --git a/apps/webapp/app/lib/ingest.server.ts b/apps/webapp/app/lib/ingest.server.ts index 2bc7557..4788539 100644 --- a/apps/webapp/app/lib/ingest.server.ts +++ b/apps/webapp/app/lib/ingest.server.ts @@ -4,8 +4,11 @@ import { EpisodeType } from "@core/types"; import { type z } from "zod"; import { prisma } from "~/db.server"; import { hasCredits } from "~/services/billing.server"; -import { type IngestBodyRequest, ingestTask } from "~/trigger/ingest/ingest"; -import { ingestDocumentTask } from "~/trigger/ingest/ingest-document"; +import { type IngestBodyRequest } from "~/trigger/ingest/ingest"; +import { + enqueueIngestDocument, + enqueueIngestEpisode, +} from "~/lib/queue-adapter.server"; export const addToQueue = async ( rawBody: z.infer, @@ -51,36 +54,22 @@ export const addToQueue = async ( let handler; if (body.type === EpisodeType.DOCUMENT) { - handler = await ingestDocumentTask.trigger( - { - body, - userId, - workspaceId: user.Workspace.id, - queueId: queuePersist.id, - }, - { - queue: "document-ingestion-queue", - concurrencyKey: userId, - tags: [user.id, queuePersist.id], - }, - ); + handler = await enqueueIngestDocument({ + body, + userId, + workspaceId: user.Workspace.id, + queueId: queuePersist.id, + }); } else if (body.type === EpisodeType.CONVERSATION) { - handler = await ingestTask.trigger( - { - body, - userId, - workspaceId: user.Workspace.id, - queueId: queuePersist.id, - }, - { - queue: "ingestion-queue", - concurrencyKey: userId, - tags: [user.id, queuePersist.id], - }, - ); + handler = await enqueueIngestEpisode({ + body, + userId, + workspaceId: user.Workspace.id, + queueId: queuePersist.id, + }); } - return { id: handler?.id, token: handler?.publicAccessToken }; + return { id: handler?.id, publicAccessToken: handler?.token }; }; export { IngestBodyRequest }; diff --git a/apps/webapp/app/lib/queue-adapter.server.ts b/apps/webapp/app/lib/queue-adapter.server.ts new file mode 100644 index 0000000..a102781 --- /dev/null +++ b/apps/webapp/app/lib/queue-adapter.server.ts @@ -0,0 +1,196 @@ +/** + * Queue Adapter + * + * This module provides a unified interface for queueing background jobs, + * supporting both Trigger.dev and BullMQ backends based on the QUEUE_PROVIDER + * environment variable. 
+ * + * Usage: + * - Set QUEUE_PROVIDER="trigger" for Trigger.dev (default, good for production scaling) + * - Set QUEUE_PROVIDER="bullmq" for BullMQ (good for open-source deployments) + */ + +import { env } from "~/env.server"; +import type { z } from "zod"; +import type { IngestBodyRequest } from "~/jobs/ingest/ingest-episode.logic"; +import type { CreateConversationTitlePayload } from "~/jobs/conversation/create-title.logic"; +import type { ProcessDeepSearchPayload } from "~/jobs/deep-search/deep-search.logic"; +import type { SessionCompactionPayload } from "~/jobs/session/session-compaction.logic"; + +type QueueProvider = "trigger" | "bullmq"; + +/** + * Enqueue episode ingestion job + */ +export async function enqueueIngestEpisode(payload: { + body: z.infer; + userId: string; + workspaceId: string; + queueId: string; +}): Promise<{ id?: string; token?: string }> { + const provider = env.QUEUE_PROVIDER as QueueProvider; + + if (provider === "trigger") { + const { ingestTask } = await import("~/trigger/ingest/ingest"); + const handler = await ingestTask.trigger(payload, { + queue: "ingestion-queue", + concurrencyKey: payload.userId, + tags: [payload.userId, payload.queueId], + }); + return { id: handler.id, token: handler.publicAccessToken }; + } else { + // BullMQ + const { ingestQueue } = await import("~/bullmq/queues"); + const job = await ingestQueue.add("ingest-episode", payload, { + jobId: payload.queueId, + attempts: 3, + backoff: { type: "exponential", delay: 2000 }, + }); + return { id: job.id }; + } +} + +/** + * Enqueue document ingestion job + */ +export async function enqueueIngestDocument(payload: { + body: z.infer; + userId: string; + workspaceId: string; + queueId: string; +}): Promise<{ id?: string; token?: string }> { + const provider = env.QUEUE_PROVIDER as QueueProvider; + + if (provider === "trigger") { + const { ingestDocumentTask } = await import( + "~/trigger/ingest/ingest-document" + ); + const handler = await ingestDocumentTask.trigger(payload, { + queue: "document-ingestion-queue", + concurrencyKey: payload.userId, + tags: [payload.userId, payload.queueId], + }); + return { id: handler.id, token: handler.publicAccessToken }; + } else { + // BullMQ + const { documentIngestQueue } = await import("~/bullmq/queues"); + const job = await documentIngestQueue.add("ingest-document", payload, { + jobId: payload.queueId, + attempts: 3, + backoff: { type: "exponential", delay: 2000 }, + }); + + return { id: job.id }; + } +} + +/** + * Enqueue conversation title creation job + */ +export async function enqueueCreateConversationTitle( + payload: CreateConversationTitlePayload, +): Promise<{ id?: string }> { + const provider = env.QUEUE_PROVIDER as QueueProvider; + + if (provider === "trigger") { + const { createConversationTitle } = await import( + "~/trigger/conversation/create-conversation-title" + ); + const handler = await createConversationTitle.trigger(payload); + return { id: handler.id }; + } else { + // BullMQ + const { conversationTitleQueue } = await import("~/bullmq/queues"); + const job = await conversationTitleQueue.add( + "create-conversation-title", + payload, + { + attempts: 3, + backoff: { type: "exponential", delay: 2000 }, + }, + ); + return { id: job.id }; + } +} + +/** + * Enqueue deep search job + */ +export async function enqueueDeepSearch( + payload: ProcessDeepSearchPayload, +): Promise<{ id?: string }> { + const provider = env.QUEUE_PROVIDER as QueueProvider; + + if (provider === "trigger") { + const { deepSearch } = await 
import("~/trigger/deep-search"); + const handler = await deepSearch.trigger({ + content: payload.content, + userId: payload.userId, + stream: true, + metadata: payload.metadata, + intentOverride: payload.intentOverride, + }); + return { id: handler.id }; + } else { + // BullMQ + const { deepSearchQueue } = await import("~/bullmq/queues"); + const job = await deepSearchQueue.add("deep-search", payload, { + attempts: 3, + backoff: { type: "exponential", delay: 2000 }, + }); + return { id: job.id }; + } +} + +/** + * Enqueue session compaction job + */ +export async function enqueueSessionCompaction( + payload: SessionCompactionPayload, +): Promise<{ id?: string }> { + const provider = env.QUEUE_PROVIDER as QueueProvider; + + if (provider === "trigger") { + const { sessionCompactionTask } = await import( + "~/trigger/session/session-compaction" + ); + const handler = await sessionCompactionTask.trigger(payload); + return { id: handler.id }; + } else { + // BullMQ + const { sessionCompactionQueue } = await import("~/bullmq/queues"); + const job = await sessionCompactionQueue.add( + "session-compaction", + payload, + { + attempts: 3, + backoff: { type: "exponential", delay: 2000 }, + }, + ); + return { id: job.id }; + } +} + +/** + * Enqueue space assignment job + * (Helper for common job logic to call) + */ +export async function enqueueSpaceAssignment(payload: { + userId: string; + workspaceId: string; + mode: "episode"; + episodeIds: string[]; +}): Promise { + const provider = env.QUEUE_PROVIDER as QueueProvider; + + if (provider === "trigger") { + const { triggerSpaceAssignment } = await import( + "~/trigger/spaces/space-assignment" + ); + await triggerSpaceAssignment(payload); + } else { + // For BullMQ, space assignment is not implemented yet + // You can add it later when needed + console.warn("Space assignment not implemented for BullMQ yet"); + } +} diff --git a/apps/webapp/app/routes/api.v1.deep-search.tsx b/apps/webapp/app/routes/api.v1.deep-search.tsx index 0ea57f7..28711d8 100644 --- a/apps/webapp/app/routes/api.v1.deep-search.tsx +++ b/apps/webapp/app/routes/api.v1.deep-search.tsx @@ -1,7 +1,7 @@ import { z } from "zod"; import { json } from "@remix-run/node"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; -import { deepSearch } from "~/trigger/deep-search"; +import { enqueueDeepSearch } from "~/lib/queue-adapter.server"; import { runs } from "@trigger.dev/sdk"; const DeepSearchBodySchema = z.object({ @@ -30,7 +30,7 @@ const { action, loader } = createActionApiRoute( async ({ body, authentication }) => { let trigger; if (!body.stream) { - trigger = await deepSearch.trigger({ + trigger = await enqueueDeepSearch({ content: body.content, userId: authentication.userId, stream: body.stream, @@ -40,7 +40,7 @@ const { action, loader } = createActionApiRoute( return json(trigger); } else { - const runHandler = await deepSearch.trigger({ + const runHandler = await enqueueDeepSearch({ content: body.content, userId: authentication.userId, stream: body.stream, diff --git a/apps/webapp/app/routes/api.v1.ingestion_queue.delete.tsx b/apps/webapp/app/routes/api.v1.ingestion_queue.delete.tsx index fc760e9..290f126 100644 --- a/apps/webapp/app/routes/api.v1.ingestion_queue.delete.tsx +++ b/apps/webapp/app/routes/api.v1.ingestion_queue.delete.tsx @@ -6,7 +6,7 @@ import { deleteIngestionQueue, getIngestionQueue, } from "~/services/ingestionLogs.server"; -import { runs, tasks } from "@trigger.dev/sdk"; +import { findRunningJobs, cancelJob } from 
"~/services/jobManager.server"; export const DeleteEpisodeBodyRequest = z.object({ id: z.string(), @@ -37,19 +37,15 @@ const { action, loader } = createHybridActionApiRoute( } const output = ingestionQueue.output as any; - const runningTasks = await runs.list({ - tag: [authentication.userId, ingestionQueue.id], + const runningTasks = await findRunningJobs({ + tags: [authentication.userId, ingestionQueue.id], taskIdentifier: "ingest-episode", }); - const latestTask = runningTasks.data.find( - (task) => - task.tags.includes(authentication.userId) && - task.tags.includes(ingestionQueue.id), - ); + const latestTask = runningTasks[0]; - if (latestTask && !latestTask?.isCompleted) { - runs.cancel(latestTask?.id as string); + if (latestTask && !latestTask.isCompleted) { + await cancelJob(latestTask.id); } let result; diff --git a/apps/webapp/app/routes/api.v1.logs.$logId.tsx b/apps/webapp/app/routes/api.v1.logs.$logId.tsx index 76ae432..1a53fd8 100644 --- a/apps/webapp/app/routes/api.v1.logs.$logId.tsx +++ b/apps/webapp/app/routes/api.v1.logs.$logId.tsx @@ -1,5 +1,4 @@ import { json } from "@remix-run/node"; -import { runs } from "@trigger.dev/sdk"; import { z } from "zod"; import { deleteEpisodeWithRelatedNodes } from "~/services/graphModels/episode"; import { @@ -11,6 +10,7 @@ import { createHybridActionApiRoute, createHybridLoaderApiRoute, } from "~/services/routeBuilders/apiBuilder.server"; +import { findRunningJobs, cancelJob } from "~/services/jobManager.server"; // Schema for space ID parameter const LogParamsSchema = z.object({ @@ -59,19 +59,15 @@ const { action } = createHybridActionApiRoute( } const output = ingestionQueue.output as any; - const runningTasks = await runs.list({ - tag: [authentication.userId, ingestionQueue.id], + const runningTasks = await findRunningJobs({ + tags: [authentication.userId, ingestionQueue.id], taskIdentifier: "ingest-episode", }); - const latestTask = runningTasks.data.find( - (task) => - task.tags.includes(authentication.userId) && - task.tags.includes(ingestionQueue.id), - ); + const latestTask = runningTasks[0]; - if (latestTask && !latestTask?.isCompleted) { - runs.cancel(latestTask?.id); + if (latestTask && !latestTask.isCompleted) { + await cancelJob(latestTask.id); } let result; diff --git a/apps/webapp/app/routes/api.v1.logs.tsx b/apps/webapp/app/routes/api.v1.logs.tsx index 46e4616..0a06c57 100644 --- a/apps/webapp/app/routes/api.v1.logs.tsx +++ b/apps/webapp/app/routes/api.v1.logs.tsx @@ -1,4 +1,4 @@ -import { type LoaderFunctionArgs, json } from "@remix-run/node"; +import { json } from "@remix-run/node"; import { z } from "zod"; import { prisma } from "~/db.server"; import { createHybridLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; diff --git a/apps/webapp/app/services/conversation.server.ts b/apps/webapp/app/services/conversation.server.ts index 3fbdc3b..05906af 100644 --- a/apps/webapp/app/services/conversation.server.ts +++ b/apps/webapp/app/services/conversation.server.ts @@ -2,7 +2,7 @@ import { UserTypeEnum } from "@core/types"; import { auth, runs, tasks } from "@trigger.dev/sdk/v3"; import { prisma } from "~/db.server"; -import { createConversationTitle } from "~/trigger/conversation/create-conversation-title"; +import { enqueueCreateConversationTitle } from "~/lib/queue-adapter.server"; import { z } from "zod"; import { type ConversationHistory } from "@prisma/client"; @@ -87,14 +87,10 @@ export async function createConversation( const context = await getConversationContext(conversationHistory.id); // Trigger 
-  await tasks.trigger(
-    createConversationTitle.id,
-    {
-      conversationId: conversation.id,
-      message: conversationData.message,
-    },
-    { tags: [conversation.id, workspaceId] },
-  );
+  await enqueueCreateConversationTitle({
+    conversationId: conversation.id,
+    message: conversationData.message,
+  });
 
   const handler = await tasks.trigger(
     "chat",
diff --git a/apps/webapp/app/services/jobManager.server.ts b/apps/webapp/app/services/jobManager.server.ts
new file mode 100644
index 0000000..95fc7b2
--- /dev/null
+++ b/apps/webapp/app/services/jobManager.server.ts
@@ -0,0 +1,87 @@
+/**
+ * Job Manager Service
+ *
+ * Unified interface for managing background jobs across both
+ * Trigger.dev and BullMQ queue providers.
+ */
+
+import { env } from "~/env.server";
+
+type QueueProvider = "trigger" | "bullmq";
+
+interface JobInfo {
+  id: string;
+  isCompleted: boolean;
+  status?: string;
+}
+
+/**
+ * Find running jobs by tags/identifiers
+ */
+export async function findRunningJobs(params: {
+  tags: string[];
+  taskIdentifier?: string;
+}): Promise<JobInfo[]> {
+  const provider = env.QUEUE_PROVIDER as QueueProvider;
+
+  if (provider === "trigger") {
+    const { runs } = await import("@trigger.dev/sdk");
+    const runningTasks = await runs.list({
+      tag: params.tags,
+      taskIdentifier: params.taskIdentifier,
+    });
+
+    return runningTasks.data.map((task) => ({
+      id: task.id,
+      isCompleted: task.isCompleted,
+      status: task.status,
+    }));
+  } else {
+    // BullMQ
+    const { getJobsByTags } = await import("~/bullmq/utils/job-finder");
+    const jobs = await getJobsByTags(params.tags, params.taskIdentifier);
+
+    return jobs;
+  }
+}
+
+/**
+ * Cancel a running job
+ */
+export async function cancelJob(jobId: string): Promise<void> {
+  const provider = env.QUEUE_PROVIDER as QueueProvider;
+
+  if (provider === "trigger") {
+    const { runs } = await import("@trigger.dev/sdk");
+    await runs.cancel(jobId);
+  } else {
+    // BullMQ
+    const { cancelJobById } = await import("~/bullmq/utils/job-finder");
+    await cancelJobById(jobId);
+  }
+}
+
+/**
+ * Get job status
+ */
+export async function getJobStatus(jobId: string): Promise<JobInfo | null> {
+  const provider = env.QUEUE_PROVIDER as QueueProvider;
+
+  if (provider === "trigger") {
+    const { runs } = await import("@trigger.dev/sdk");
+    try {
+      const run = await runs.retrieve(jobId);
+      return {
+        id: run.id,
+        isCompleted: run.isCompleted,
+        status: run.status,
+      };
+    } catch {
+      return null;
+    }
+  } else {
+    // BullMQ
+    const { getJobById } = await import("~/bullmq/utils/job-finder");
+    return await getJobById(jobId);
+  }
+}
diff --git a/apps/webapp/app/services/sessionCompaction.server.ts b/apps/webapp/app/services/sessionCompaction.server.ts
index 1afc8a9..8d4b36b 100644
--- a/apps/webapp/app/services/sessionCompaction.server.ts
+++ b/apps/webapp/app/services/sessionCompaction.server.ts
@@ -5,7 +5,7 @@ import {
   getSessionEpisodes,
   type CompactedSessionNode,
 } from "~/services/graphModels/compactedSession";
-import { tasks } from "@trigger.dev/sdk/v3";
+import { enqueueSessionCompaction } from "~/lib/queue-adapter.server";
 
 /**
  * Configuration for session compaction
@@ -144,7 +144,7 @@ export class SessionCompactionService {
       reason: check.reason,
     });
 
-    const handle = await tasks.trigger("session-compaction", {
+    const handle = await enqueueSessionCompaction({
       userId,
       sessionId,
       source,
diff --git a/apps/webapp/app/trigger/conversation/create-conversation-title.ts b/apps/webapp/app/trigger/conversation/create-conversation-title.ts
index c6dc2e8..7d9f5e2 100644
---
a/apps/webapp/app/trigger/conversation/create-conversation-title.ts +++ b/apps/webapp/app/trigger/conversation/create-conversation-title.ts @@ -1,61 +1,12 @@ -import { LLMMappings } from "@core/types"; -import { logger, task } from "@trigger.dev/sdk/v3"; -import { generate } from "../chat/stream-utils"; -import { conversationTitlePrompt } from "./prompt"; -import { prisma } from "../utils/prisma"; +import { task } from "@trigger.dev/sdk/v3"; +import { + processConversationTitleCreation, + type CreateConversationTitlePayload, +} from "~/jobs/conversation/create-title.logic"; export const createConversationTitle = task({ id: "create-conversation-title", - run: async (payload: { conversationId: string; message: string }) => { - let conversationTitleResponse = ""; - const gen = generate( - [ - { - role: "user", - content: conversationTitlePrompt.replace( - "{{message}}", - payload.message, - ), - }, - ], - false, - () => {}, - undefined, - "", - LLMMappings.GPT41, - ); - - for await (const chunk of gen) { - if (typeof chunk === "string") { - conversationTitleResponse += chunk; - } else if (chunk && typeof chunk === "object" && chunk.message) { - conversationTitleResponse += chunk.message; - } - } - - const outputMatch = conversationTitleResponse.match( - /(.*?)<\/output>/s, - ); - - logger.info(`Conversation title data: ${JSON.stringify(outputMatch)}`); - - if (!outputMatch) { - logger.error("No output found in recurrence response"); - throw new Error("Invalid response format from AI"); - } - - const jsonStr = outputMatch[1].trim(); - const conversationTitleData = JSON.parse(jsonStr); - - if (conversationTitleData) { - await prisma.conversation.update({ - where: { - id: payload.conversationId, - }, - data: { - title: conversationTitleData.title, - }, - }); - } + run: async (payload: CreateConversationTitlePayload) => { + return await processConversationTitleCreation(payload); }, }); diff --git a/apps/webapp/app/trigger/ingest/ingest-document.ts b/apps/webapp/app/trigger/ingest/ingest-document.ts index b461dd0..b1262fb 100644 --- a/apps/webapp/app/trigger/ingest/ingest-document.ts +++ b/apps/webapp/app/trigger/ingest/ingest-document.ts @@ -1,16 +1,8 @@ import { queue, task } from "@trigger.dev/sdk"; -import { type z } from "zod"; -import crypto from "crypto"; - -import { IngestionStatus } from "@core/database"; -import { EpisodeTypeEnum } from "@core/types"; -import { logger } from "~/services/logger.service"; -import { saveDocument } from "~/services/graphModels/document"; -import { type IngestBodyRequest } from "~/lib/ingest.server"; -import { DocumentVersioningService } from "~/services/documentVersioning.server"; -import { DocumentDifferentialService } from "~/services/documentDiffer.server"; -import { KnowledgeGraphService } from "~/services/knowledgeGraph.server"; -import { prisma } from "../utils/prisma"; +import { + processDocumentIngestion, + type IngestDocumentPayload, +} from "~/jobs/ingest/ingest-document.logic"; import { ingestTask } from "./ingest"; const documentIngestionQueue = queue({ @@ -23,266 +15,19 @@ export const ingestDocumentTask = task({ id: "ingest-document", queue: documentIngestionQueue, machine: "medium-2x", - run: async (payload: { - body: z.infer; - userId: string; - workspaceId: string; - queueId: string; - }) => { - const startTime = Date.now(); - - try { - logger.log(`Processing document for user ${payload.userId}`, { - contentLength: payload.body.episodeBody.length, - }); - - await prisma.ingestionQueue.update({ - where: { id: payload.queueId }, - data: { - 
status: IngestionStatus.PROCESSING, - }, - }); - - const documentBody = payload.body; - - // Step 1: Initialize services and prepare document version - const versioningService = new DocumentVersioningService(); - const differentialService = new DocumentDifferentialService(); - const knowledgeGraphService = new KnowledgeGraphService(); - - const { - documentNode: document, - versionInfo, - chunkedDocument, - } = await versioningService.prepareDocumentVersion( - documentBody.sessionId!, - payload.userId, - documentBody.metadata?.documentTitle?.toString() || "Untitled Document", - documentBody.episodeBody, - documentBody.source, - documentBody.metadata || {}, - ); - - logger.log(`Document version analysis:`, { - version: versionInfo.newVersion, - isNewDocument: versionInfo.isNewDocument, - hasContentChanged: versionInfo.hasContentChanged, - changePercentage: versionInfo.chunkLevelChanges.changePercentage, - changedChunks: versionInfo.chunkLevelChanges.changedChunkIndices.length, - totalChunks: versionInfo.chunkLevelChanges.totalChunks, - }); - - // Step 2: Determine processing strategy - const differentialDecision = - await differentialService.analyzeDifferentialNeed( - documentBody.episodeBody, - versionInfo.existingDocument, - chunkedDocument, - ); - - logger.log(`Differential analysis:`, { - shouldUseDifferential: differentialDecision.shouldUseDifferential, - strategy: differentialDecision.strategy, - reason: differentialDecision.reason, - documentSizeTokens: differentialDecision.documentSizeTokens, - }); - - // Early return for unchanged documents - if (differentialDecision.strategy === "skip_processing") { - logger.log("Document content unchanged, skipping processing"); - return { - success: true, - documentsProcessed: 1, - chunksProcessed: 0, - episodesCreated: 0, - entitiesExtracted: 0, - }; - } - - // Step 3: Save the new document version - await saveDocument(document); - - // Step 3.1: Invalidate statements from previous document version if it exists - let invalidationResults = null; - if (versionInfo.existingDocument && versionInfo.hasContentChanged) { - logger.log( - `Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`, - ); - - invalidationResults = - await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion( - { - previousDocumentUuid: versionInfo.existingDocument.uuid, - newDocumentContent: documentBody.episodeBody, - userId: payload.userId, - invalidatedBy: document.uuid, - semanticSimilarityThreshold: 0.75, // Configurable threshold - }, - ); - - logger.log(`Statement invalidation completed:`, { - totalAnalyzed: invalidationResults.totalStatementsAnalyzed, - invalidated: invalidationResults.invalidatedStatements.length, - preserved: invalidationResults.preservedStatements.length, + run: async (payload: IngestDocumentPayload) => { + // Use common logic with Trigger-specific callback for episode ingestion + return await processDocumentIngestion( + payload, + // Callback for enqueueing episode ingestion for each chunk + async (episodePayload) => { + const episodeHandler = await ingestTask.trigger(episodePayload, { + queue: "ingestion-queue", + concurrencyKey: episodePayload.userId, + tags: [episodePayload.userId, episodePayload.queueId], }); - } - - logger.log( - `Document chunked into ${chunkedDocument.chunks.length} chunks`, - ); - - // Step 4: Process chunks based on differential strategy - let chunksToProcess = chunkedDocument.chunks; - let processingMode = "full"; - - if ( - 
differentialDecision.shouldUseDifferential && - differentialDecision.strategy === "chunk_level_diff" - ) { - // Only process changed chunks - const chunkComparisons = differentialService.getChunkComparisons( - versionInfo.existingDocument!, - chunkedDocument, - ); - - const changedIndices = - differentialService.getChunksNeedingReprocessing(chunkComparisons); - chunksToProcess = chunkedDocument.chunks.filter((chunk) => - changedIndices.includes(chunk.chunkIndex), - ); - processingMode = "differential"; - - logger.log( - `Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`, - ); - } else if (differentialDecision.strategy === "full_reingest") { - // Process all chunks - processingMode = "full"; - logger.log( - `Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`, - ); - } - - // Step 5: Queue chunks for processing - const episodeHandlers = []; - for (const chunk of chunksToProcess) { - const chunkEpisodeData = { - episodeBody: chunk.content, - referenceTime: documentBody.referenceTime, - metadata: { - ...documentBody.metadata, - processingMode, - differentialStrategy: differentialDecision.strategy, - chunkHash: chunk.contentHash, - documentTitle: - documentBody.metadata?.documentTitle?.toString() || - "Untitled Document", - chunkIndex: chunk.chunkIndex, - documentUuid: document.uuid, - }, - source: documentBody.source, - spaceIds: documentBody.spaceIds, - sessionId: documentBody.sessionId, - type: EpisodeTypeEnum.DOCUMENT, - }; - - const episodeHandler = await ingestTask.trigger( - { - body: chunkEpisodeData, - userId: payload.userId, - workspaceId: payload.workspaceId, - queueId: payload.queueId, - }, - { - queue: "ingestion-queue", - concurrencyKey: payload.userId, - tags: [payload.userId, payload.queueId, processingMode], - }, - ); - - if (episodeHandler.id) { - episodeHandlers.push(episodeHandler.id); - logger.log( - `Queued chunk ${chunk.chunkIndex + 1} for ${processingMode} processing`, - { - handlerId: episodeHandler.id, - chunkSize: chunk.content.length, - chunkHash: chunk.contentHash, - }, - ); - } - } - - // Calculate cost savings - const costSavings = differentialService.calculateCostSavings( - chunkedDocument.chunks.length, - chunksToProcess.length, - ); - - await prisma.ingestionQueue.update({ - where: { id: payload.queueId }, - data: { - output: { - documentUuid: document.uuid, - version: versionInfo.newVersion, - totalChunks: chunkedDocument.chunks.length, - chunksProcessed: chunksToProcess.length, - chunksSkipped: costSavings.chunksSkipped, - processingMode, - differentialStrategy: differentialDecision.strategy, - estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, - statementInvalidation: invalidationResults - ? 
{ - totalAnalyzed: invalidationResults.totalStatementsAnalyzed, - invalidated: invalidationResults.invalidatedStatements.length, - preserved: invalidationResults.preservedStatements.length, - } - : null, - episodes: [], - episodeHandlers, - }, - status: IngestionStatus.PROCESSING, - }, - }); - - const processingTimeMs = Date.now() - startTime; - - logger.log( - `Document differential processing completed in ${processingTimeMs}ms`, - { - documentUuid: document.uuid, - version: versionInfo.newVersion, - processingMode, - totalChunks: chunkedDocument.chunks.length, - chunksProcessed: chunksToProcess.length, - chunksSkipped: costSavings.chunksSkipped, - estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`, - changePercentage: `${differentialDecision.changePercentage.toFixed(1)}%`, - statementInvalidation: invalidationResults - ? { - totalAnalyzed: invalidationResults.totalStatementsAnalyzed, - invalidated: invalidationResults.invalidatedStatements.length, - preserved: invalidationResults.preservedStatements.length, - } - : "No previous version", - }, - ); - - return { success: true }; - } catch (err: any) { - await prisma.ingestionQueue.update({ - where: { id: payload.queueId }, - data: { - error: err.message, - status: IngestionStatus.FAILED, - }, - }); - - logger.error( - `Error processing document for user ${payload.userId}:`, - err, - ); - return { success: false, error: err.message }; - } + return { id: episodeHandler.id }; + }, + ); }, }); diff --git a/apps/webapp/app/trigger/ingest/ingest.ts b/apps/webapp/app/trigger/ingest/ingest.ts index 88b8387..4a3cd02 100644 --- a/apps/webapp/app/trigger/ingest/ingest.ts +++ b/apps/webapp/app/trigger/ingest/ingest.ts @@ -1,251 +1,37 @@ import { queue, task } from "@trigger.dev/sdk"; -import { z } from "zod"; -import { KnowledgeGraphService } from "~/services/knowledgeGraph.server"; -import { linkEpisodeToDocument } from "~/services/graphModels/document"; - -import { IngestionStatus } from "@core/database"; -import { logger } from "~/services/logger.service"; +import { + processEpisodeIngestion, + IngestBodyRequest, + type IngestEpisodePayload, +} from "~/jobs/ingest/ingest-episode.logic"; import { triggerSpaceAssignment } from "../spaces/space-assignment"; -import { prisma } from "../utils/prisma"; -import { EpisodeType } from "@core/types"; -import { deductCredits, hasCredits } from "../utils/utils"; -import { assignEpisodesToSpace } from "~/services/graphModels/space"; import { triggerSessionCompaction } from "../session/session-compaction"; -export const IngestBodyRequest = z.object({ - episodeBody: z.string(), - referenceTime: z.string(), - metadata: z.record(z.union([z.string(), z.number(), z.boolean()])).optional(), - source: z.string(), - spaceIds: z.array(z.string()).optional(), - sessionId: z.string().optional(), - type: z - .enum([EpisodeType.CONVERSATION, EpisodeType.DOCUMENT]) - .default(EpisodeType.CONVERSATION), -}); - const ingestionQueue = queue({ name: "ingestion-queue", concurrencyLimit: 1, }); +// Export for backwards compatibility +export { IngestBodyRequest }; + // Register the Trigger.dev task export const ingestTask = task({ id: "ingest-episode", queue: ingestionQueue, machine: "medium-2x", - run: async (payload: { - body: z.infer; - userId: string; - workspaceId: string; - queueId: string; - }) => { - try { - logger.log(`Processing job for user ${payload.userId}`); - - // Check if workspace has sufficient credits before processing - const hasSufficientCredits = await hasCredits( - payload.workspaceId, - 
"addEpisode", - ); - - if (!hasSufficientCredits) { - logger.warn( - `Insufficient credits for workspace ${payload.workspaceId}`, - ); - - await prisma.ingestionQueue.update({ - where: { id: payload.queueId }, - data: { - status: IngestionStatus.NO_CREDITS, - error: - "Insufficient credits. Please upgrade your plan or wait for your credits to reset.", - }, - }); - - return { - success: false, - error: "Insufficient credits", - }; - } - - const ingestionQueue = await prisma.ingestionQueue.update({ - where: { id: payload.queueId }, - data: { - status: IngestionStatus.PROCESSING, - }, - }); - - const knowledgeGraphService = new KnowledgeGraphService(); - - const episodeBody = payload.body as any; - - const episodeDetails = await knowledgeGraphService.addEpisode( - { - ...episodeBody, - userId: payload.userId, - }, - prisma, - ); - - // Link episode to document if it's a document chunk - if ( - episodeBody.type === EpisodeType.DOCUMENT && - episodeBody.metadata.documentUuid && - episodeDetails.episodeUuid - ) { - try { - await linkEpisodeToDocument( - episodeDetails.episodeUuid, - episodeBody.metadata.documentUuid, - episodeBody.metadata.chunkIndex || 0, - ); - logger.log( - `Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.metadata.documentUuid} at chunk ${episodeBody.metadata.chunkIndex || 0}`, - ); - } catch (error) { - logger.error(`Failed to link episode to document:`, { - error, - episodeUuid: episodeDetails.episodeUuid, - documentUuid: episodeBody.metadata.documentUuid, - }); - } - } - - let finalOutput = episodeDetails; - let episodeUuids: string[] = episodeDetails.episodeUuid - ? [episodeDetails.episodeUuid] - : []; - let currentStatus: IngestionStatus = IngestionStatus.COMPLETED; - if (episodeBody.type === EpisodeType.DOCUMENT) { - const currentOutput = ingestionQueue.output as any; - currentOutput.episodes.push(episodeDetails); - episodeUuids = currentOutput.episodes.map( - (episode: any) => episode.episodeUuid, - ); - - finalOutput = { - ...currentOutput, - }; - - if (currentOutput.episodes.length !== currentOutput.totalChunks) { - currentStatus = IngestionStatus.PROCESSING; - } - } - - await prisma.ingestionQueue.update({ - where: { id: payload.queueId }, - data: { - output: finalOutput, - status: currentStatus, - }, - }); - - // Deduct credits for episode creation - if (currentStatus === IngestionStatus.COMPLETED) { - await deductCredits( - payload.workspaceId, - "addEpisode", - finalOutput.statementsCreated, - ); - } - - // Handle space assignment after successful ingestion - try { - // If spaceIds were explicitly provided, immediately assign the episode to those spaces - if ( - episodeBody.spaceIds && - episodeBody.spaceIds.length > 0 && - episodeDetails.episodeUuid - ) { - logger.info(`Assigning episode to explicitly provided spaces`, { - userId: payload.userId, - episodeId: episodeDetails.episodeUuid, - spaceIds: episodeBody.spaceIds, - }); - - // Assign episode to each space - for (const spaceId of episodeBody.spaceIds) { - await assignEpisodesToSpace( - [episodeDetails.episodeUuid], - spaceId, - payload.userId, - ); - } - - logger.info( - `Skipping LLM space assignment - episode explicitly assigned to ${episodeBody.spaceIds.length} space(s)`, - ); - } else { - // Only trigger automatic LLM space assignment if no explicit spaceIds were provided - logger.info( - `Triggering LLM space assignment after successful ingestion`, - { - userId: payload.userId, - workspaceId: payload.workspaceId, - episodeId: episodeDetails?.episodeUuid, - }, - ); - if ( - 
episodeDetails.episodeUuid && - currentStatus === IngestionStatus.COMPLETED - ) { - await triggerSpaceAssignment({ - userId: payload.userId, - workspaceId: payload.workspaceId, - mode: "episode", - episodeIds: episodeUuids, - }); - } - } - } catch (assignmentError) { - // Don't fail the ingestion if assignment fails - logger.warn(`Failed to trigger space assignment after ingestion:`, { - error: assignmentError, - userId: payload.userId, - episodeId: episodeDetails?.episodeUuid, - }); - } - - // Auto-trigger session compaction if episode has sessionId - try { - if ( - episodeBody.sessionId && - currentStatus === IngestionStatus.COMPLETED - ) { - logger.info(`Checking if session compaction should be triggered`, { - userId: payload.userId, - sessionId: episodeBody.sessionId, - source: episodeBody.source, - }); - - await triggerSessionCompaction({ - userId: payload.userId, - sessionId: episodeBody.sessionId, - source: episodeBody.source, - }); - } - } catch (compactionError) { - // Don't fail the ingestion if compaction fails - logger.warn(`Failed to trigger session compaction after ingestion:`, { - error: compactionError, - userId: payload.userId, - sessionId: episodeBody.sessionId, - }); - } - - return { success: true, episodeDetails }; - } catch (err: any) { - await prisma.ingestionQueue.update({ - where: { id: payload.queueId }, - data: { - error: err.message, - status: IngestionStatus.FAILED, - }, - }); - - logger.error(`Error processing job for user ${payload.userId}:`, err); - return { success: false, error: err.message }; - } + run: async (payload: IngestEpisodePayload) => { + // Use common logic with Trigger-specific callbacks for follow-up jobs + return await processEpisodeIngestion( + payload, + // Callback for space assignment + async (params) => { + await triggerSpaceAssignment(params); + }, + // Callback for session compaction + async (params) => { + await triggerSessionCompaction(params); + }, + ); }, }); diff --git a/apps/webapp/app/trigger/session/session-compaction.ts b/apps/webapp/app/trigger/session/session-compaction.ts index bb32659..6a61e2f 100644 --- a/apps/webapp/app/trigger/session/session-compaction.ts +++ b/apps/webapp/app/trigger/session/session-compaction.ts @@ -1,36 +1,8 @@ import { queue, task } from "@trigger.dev/sdk/v3"; -import { logger } from "~/services/logger.service"; -import { runQuery } from "~/lib/neo4j.server"; -import type { CoreMessage } from "ai"; -import { z } from "zod"; -import { getEmbedding, makeModelCall } from "~/lib/model.server"; import { - getCompactedSessionBySessionId, - linkEpisodesToCompact, - getSessionEpisodes, - type CompactedSessionNode, - type SessionEpisodeData, - saveCompactedSession, -} from "~/services/graphModels/compactedSession"; - -interface SessionCompactionPayload { - userId: string; - sessionId: string; - source: string; - triggerSource?: "auto" | "manual" | "threshold"; -} - -// Zod schema for LLM response validation -const CompactionResultSchema = z.object({ - summary: z.string().describe("Consolidated narrative of the entire session"), - confidence: z.number().min(0).max(1).describe("Confidence score of the compaction quality"), -}); - -const CONFIG = { - minEpisodesForCompaction: 5, // Minimum episodes to trigger compaction - compactionThreshold: 1, // Trigger after N new episodes - maxEpisodesPerBatch: 50, // Process in batches if needed -}; + processSessionCompaction, + type SessionCompactionPayload, +} from "~/jobs/session/session-compaction.logic"; export const sessionCompactionQueue = queue({ name: 
"session-compaction-queue", @@ -41,82 +13,7 @@ export const sessionCompactionTask = task({ id: "session-compaction", queue: sessionCompactionQueue, run: async (payload: SessionCompactionPayload) => { - const { userId, sessionId, source, triggerSource = "auto" } = payload; - - logger.info(`Starting session compaction`, { - userId, - sessionId, - source, - triggerSource, - }); - - try { - // Check if compaction already exists - const existingCompact = await getCompactedSessionBySessionId(sessionId, userId); - - // Fetch all episodes for this session - const episodes = await getSessionEpisodes(sessionId, userId, existingCompact?.endTime); - - console.log("episodes", episodes.length); - // Check if we have enough episodes - if (!existingCompact && episodes.length < CONFIG.minEpisodesForCompaction) { - logger.info(`Not enough episodes for compaction`, { - sessionId, - episodeCount: episodes.length, - minRequired: CONFIG.minEpisodesForCompaction, - }); - return { - success: false, - reason: "insufficient_episodes", - episodeCount: episodes.length, - }; - } else if (existingCompact && episodes.length < CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold) { - logger.info(`Not enough new episodes for compaction`, { - sessionId, - episodeCount: episodes.length, - minRequired: CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold, - }); - return { - success: false, - reason: "insufficient_new_episodes", - episodeCount: episodes.length, - }; - } - - // Generate or update compaction - const compactionResult = existingCompact - ? await updateCompaction(existingCompact, episodes, userId) - : await createCompaction(sessionId, episodes, userId, source); - - logger.info(`Session compaction completed`, { - sessionId, - compactUuid: compactionResult.uuid, - episodeCount: compactionResult.episodeCount, - compressionRatio: compactionResult.compressionRatio, - }); - - return { - success: true, - compactionResult: { - compactUuid: compactionResult.uuid, - sessionId: compactionResult.sessionId, - summary: compactionResult.summary, - episodeCount: compactionResult.episodeCount, - startTime: compactionResult.startTime, - endTime: compactionResult.endTime, - confidence: compactionResult.confidence, - compressionRatio: compactionResult.compressionRatio, - }, - }; - } catch (error) { - logger.error(`Session compaction failed`, { - sessionId, - userId, - error: error instanceof Error ? 
error.message : String(error), - }); - - throw error; - } + return await processSessionCompaction(payload); }, }); diff --git a/apps/webapp/app/trigger/spaces/space-summary.ts b/apps/webapp/app/trigger/spaces/space-summary.ts index 3f95831..ceafbeb 100644 --- a/apps/webapp/app/trigger/spaces/space-summary.ts +++ b/apps/webapp/app/trigger/spaces/space-summary.ts @@ -11,7 +11,7 @@ import { getSpace, updateSpace } from "../utils/space-utils"; import { EpisodeType } from "@core/types"; import { getSpaceEpisodeCount } from "~/services/graphModels/space"; -import { addToQueue } from "../utils/queue"; +import { addToQueue } from "~/lib/ingest.server"; interface SpaceSummaryPayload { userId: string; diff --git a/apps/webapp/app/utils/startup.ts b/apps/webapp/app/utils/startup.ts index eb5688f..5fcce3e 100644 --- a/apps/webapp/app/utils/startup.ts +++ b/apps/webapp/app/utils/startup.ts @@ -2,6 +2,7 @@ import { logger } from "~/services/logger.service"; import { fetchAndSaveStdioIntegrations } from "~/trigger/utils/mcp"; import { initNeo4jSchemaOnce } from "~/lib/neo4j.server"; import { env } from "~/env.server"; +import { startWorkers } from "~/bullmq/start-workers"; // Global flag to ensure startup only runs once per server process let startupInitialized = false; @@ -40,19 +41,23 @@ export async function initializeStartupServices() { process.exit(1); } - try { - const triggerApiUrl = env.TRIGGER_API_URL; - if (triggerApiUrl) { - await waitForTriggerLogin(triggerApiUrl); - await addEnvVariablesInTrigger(); - } else { - console.error("TRIGGER_API_URL is not set in environment variables."); + if (env.QUEUE_PROVIDER === "trigger") { + try { + const triggerApiUrl = env.TRIGGER_API_URL; + if (triggerApiUrl) { + await waitForTriggerLogin(triggerApiUrl); + await addEnvVariablesInTrigger(); + } else { + console.error("TRIGGER_API_URL is not set in environment variables."); + process.exit(1); + } + } catch (e) { + console.error(e); + console.error("Trigger is not configured"); process.exit(1); } - } catch (e) { - console.error(e); - console.error("Trigger is not configured"); - process.exit(1); + } else { + await startWorkers(); } try { diff --git a/docker-compose.aws.yaml b/docker-compose.aws.yaml deleted file mode 100644 index eefae80..0000000 --- a/docker-compose.aws.yaml +++ /dev/null @@ -1,70 +0,0 @@ -version: "3.8" - -services: - core: - container_name: core-app - image: redplanethq/core:${VERSION} - environment: - - NODE_ENV=${NODE_ENV} - - DATABASE_URL=${DATABASE_URL} - - DIRECT_URL=${DIRECT_URL} - - SESSION_SECRET=${SESSION_SECRET} - - ENCRYPTION_KEY=${ENCRYPTION_KEY} - - MAGIC_LINK_SECRET=${MAGIC_LINK_SECRET} - - LOGIN_ORIGIN=${LOGIN_ORIGIN} - - APP_ORIGIN=${APP_ORIGIN} - - REDIS_HOST=${REDIS_HOST} - - REDIS_PORT=${REDIS_PORT} - - REDIS_TLS_DISABLED=${REDIS_TLS_DISABLED} - - NEO4J_URI=${NEO4J_URI} - - NEO4J_USERNAME=${NEO4J_USERNAME} - - NEO4J_PASSWORD=${NEO4J_PASSWORD} - - OPENAI_API_KEY=${OPENAI_API_KEY} - - AUTH_GOOGLE_CLIENT_ID=${AUTH_GOOGLE_CLIENT_ID} - - AUTH_GOOGLE_CLIENT_SECRET=${AUTH_GOOGLE_CLIENT_SECRET} - - ENABLE_EMAIL_LOGIN=${ENABLE_EMAIL_LOGIN} - - OLLAMA_URL=${OLLAMA_URL} - - EMBEDDING_MODEL=${EMBEDDING_MODEL} - - MODEL=${MODEL} - - TRIGGER_PROJECT_ID=${TRIGGER_PROJECT_ID} - - TRIGGER_API_URL=${TRIGGER_API_URL} - - TRIGGER_SECRET_KEY=${TRIGGER_SECRET_KEY} - ports: - - "3033:3000" - depends_on: - - redis - - neo4j - networks: - - core - - redis: - container_name: core-redis - image: redis:7 - ports: - - "6379:6379" - networks: - - core - - neo4j: - container_name: core-neo4j - image: 
neo4j:5.25-community - environment: - - NEO4J_AUTH=${NEO4J_AUTH} - - NEO4J_dbms_security_procedures_unrestricted=gds.*,apoc.* - - NEO4J_dbms_security_procedures_allowlist=gds.*,apoc.* - ports: - - "7474:7474" - - "7687:7687" - volumes: - - type: bind - source: /efs/neo4j - target: /data - - type: bind - source: /efs/neo4j/plugins # version - 2.13.2 - target: /plugins - networks: - - core - -networks: - core: - driver: bridge diff --git a/hosting/docker/.env b/hosting/docker/.env index 6bc3a5e..c2ac97f 100644 --- a/hosting/docker/.env +++ b/hosting/docker/.env @@ -48,74 +48,4 @@ OLLAMA_URL= EMBEDDING_MODEL=text-embedding-3-small MODEL=gpt-4.1-2025-04-14 -## Trigger ## -TRIGGER_PROJECT_ID=proj_mqwudvjcukvybqxyjkjv -TRIGGER_SECRET_KEY=tr_prod_72iziCY2yWA5SdGxRFii - - -# ------------------------------------------------------------ -# Trigger.dev self-hosting environment variables -# - These are the default values for the self-hosting stack -# - You should change them to suit your needs, especially the secrets -# - See the docs for more information: https://trigger.dev/docs/self-hosting/overview - -# Secrets -# - Do NOT use these defaults in production -# - Generate your own by running `openssl rand -hex 16` for each secret -MANAGED_WORKER_SECRET=447c29678f9eaf289e9c4b70d3dd8a7f - -# Worker token -# - This is the token for the worker to connect to the webapp -# - When running the combined stack, this is set automatically during bootstrap -# - For the split setup, you will have to set this manually. The token is available in the webapp logs but will only be shown once. -# - See the docs for more information: https://trigger.dev/docs/self-hosting/docker -TRIGGER_WORKER_TOKEN=tr_wgt_jtRujkUnfK3RmNtUev049Clw7gaqwg77VMPGu7Iv -TRIGGER_TASKS_IMAGE=redplanethq/proj_core:latest - -# Worker URLs -# - In split setups, uncomment and set to the public URL of your webapp -# TRIGGER_API_URL=https://trigger.example.com -# OTEL_EXPORTER_OTLP_ENDPOINT=https://trigger.example.com/otel - -# Postgres -# - Do NOT use these defaults in production -# - Especially if you decide to expose the database to the internet -# POSTGRES_USER=postgres -TRIGGER_DB=trigger - -TRIGGER_DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${DB_HOST}:${DB_PORT}/${TRIGGER_DB}?schema=public&sslmode=disable -TRIGGER_DIRECT_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${DB_HOST}:${DB_PORT}/${TRIGGER_DB}?schema=public&sslmode=disable -ELECTRIC_DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${DB_HOST}/${TRIGGER_DB} - -# Trigger image tag -# - This is the version of the webapp and worker images to use, they should be locked to a specific version in production -# - For example: TRIGGER_IMAGE_TAG=v4.0.0-v4-beta.21 -TRIGGER_IMAGE_TAG=v4-beta - -# Webapp -# - These should generally be set to the same value -# - In production, these should be set to the public URL of your webapp, e.g. 
https://trigger.example.com -APP_ORIGIN=http://localhost:8030 -LOGIN_ORIGIN=http://localhost:8030 -API_ORIGIN=http://trigger-webapp:3000 -DEV_OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:8030/otel -# You may need to set this when testing locally or when using the combined setup -# API_ORIGIN=http://webapp:3000 - - - -# ClickHouse -# - Do NOT use these defaults in production -CLICKHOUSE_USER=default -CLICKHOUSE_PASSWORD=password -CLICKHOUSE_URL=http://default:password@clickhouse:8123?secure=false -RUN_REPLICATION_CLICKHOUSE_URL=http://default:password@clickhouse:8123 - -# Docker Registry -# - When testing locally, the default values should be fine -# - When deploying to production, you will have to change these, especially the password and URL -# - See the docs for more information: https://trigger.dev/docs/self-hosting/docker#registry-setup -DOCKER_REGISTRY_URL=docker.io -DOCKER_REGISTRY_USERNAME= -DOCKER_REGISTRY_PASSWORD= - +QUEUE_PROVIDER=bullmq \ No newline at end of file diff --git a/hosting/docker/docker-compose.yaml b/hosting/docker/docker-compose.yaml index c861344..48bb9d7 100644 --- a/hosting/docker/docker-compose.yaml +++ b/hosting/docker/docker-compose.yaml @@ -108,249 +108,12 @@ services: retries: 10 start_period: 20s - webapp: - container_name: trigger-webapp - image: ghcr.io/triggerdotdev/trigger.dev:v4.0.4 - restart: ${RESTART_POLICY:-unless-stopped} - logging: *logging-config - ports: - - ${WEBAPP_PUBLISH_IP:-0.0.0.0}:8030:3000 - depends_on: - clickhouse: - condition: service_started - init: - condition: service_started - networks: - - webapp - - supervisor - - core - volumes: - - shared:/home/node/shared - # Only needed for bootstrap - user: root - # Only needed for bootstrap - command: sh -c "chown -R node:node /home/node/shared && sleep 5 && exec ./scripts/entrypoint.sh" - healthcheck: - test: - [ - "CMD", - "node", - "-e", - "http.get('http://localhost:3000/healthcheck', res => process.exit(res.statusCode === 200 ? 
0 : 1)).on('error', () => process.exit(1))", - ] - interval: 30s - timeout: 10s - retries: 5 - start_period: 10s - environment: - APP_ORIGIN: ${APP_ORIGIN:-http://localhost:8030} - LOGIN_ORIGIN: ${LOGIN_ORIGIN:-http://localhost:8030} - API_ORIGIN: ${API_ORIGIN:-http://localhost:8030} - ELECTRIC_ORIGIN: http://electric:3000 - DATABASE_URL: ${TRIGGER_DATABASE_URL:-postgresql://postgres:postgres@postgres:5432/main?schema=public&sslmode=disable} - DIRECT_URL: ${TRIGGER_DIRECT_URL:-postgresql://postgres:postgres@postgres:5432/main?schema=public&sslmode=disable} - SESSION_SECRET: ${SESSION_SECRET} - MAGIC_LINK_SECRET: ${MAGIC_LINK_SECRET} - ENCRYPTION_KEY: ${ENCRYPTION_KEY} - MANAGED_WORKER_SECRET: ${MANAGED_WORKER_SECRET} - REDIS_HOST: core-redis - REDIS_PORT: 6379 - REDIS_TLS_DISABLED: true - APP_LOG_LEVEL: info - DEV_OTEL_EXPORTER_OTLP_ENDPOINT: ${DEV_OTEL_EXPORTER_OTLP_ENDPOINT:-http://localhost:8030/otel} - DEPLOY_REGISTRY_HOST: ${DOCKER_REGISTRY_URL:-localhost:5000} - DEPLOY_REGISTRY_NAMESPACE: ${DOCKER_REGISTRY_NAMESPACE:-trigger} - OBJECT_STORE_BASE_URL: ${OBJECT_STORE_BASE_URL:-http://minio:9000} - OBJECT_STORE_ACCESS_KEY_ID: ${OBJECT_STORE_ACCESS_KEY_ID} - OBJECT_STORE_SECRET_ACCESS_KEY: ${OBJECT_STORE_SECRET_ACCESS_KEY} - GRACEFUL_SHUTDOWN_TIMEOUT: 1000 - # Bootstrap - this will automatically set up a worker group for you - # This will NOT work for split deployments - TRIGGER_BOOTSTRAP_ENABLED: 1 - TRIGGER_BOOTSTRAP_WORKER_GROUP_NAME: bootstrap - TRIGGER_BOOTSTRAP_WORKER_TOKEN_PATH: /home/node/shared/worker_token - # ClickHouse configuration - CLICKHOUSE_URL: ${CLICKHOUSE_URL:-http://default:password@clickhouse:8123?secure=false} - CLICKHOUSE_LOG_LEVEL: ${CLICKHOUSE_LOG_LEVEL:-info} - # Run replication - RUN_REPLICATION_ENABLED: ${RUN_REPLICATION_ENABLED:-1} - RUN_REPLICATION_CLICKHOUSE_URL: ${RUN_REPLICATION_CLICKHOUSE_URL:-http://default:password@clickhouse:8123} - RUN_REPLICATION_LOG_LEVEL: ${RUN_REPLICATION_LOG_LEVEL:-info} - # Limits - # TASK_PAYLOAD_OFFLOAD_THRESHOLD: 524288 # 512KB - # TASK_PAYLOAD_MAXIMUM_SIZE: 3145728 # 3MB - # BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: 1000000 # 1MB - # TASK_RUN_METADATA_MAXIMUM_SIZE: 262144 # 256KB - # DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: 100 - # DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: 100 - # Internal OTEL configuration - INTERNAL_OTEL_TRACE_LOGGING_ENABLED: ${INTERNAL_OTEL_TRACE_LOGGING_ENABLED:-0} - - electric: - container_name: trigger-electric - image: electricsql/electric:${ELECTRIC_IMAGE_TAG:-1.0.10} - restart: ${RESTART_POLICY:-unless-stopped} - logging: *logging-config - networks: - - webapp - - core - environment: - DATABASE_URL: ${ELECTRIC_DATABASE_URL:-postgresql://postgres:postgres@postgres:5432/main?schema=public&sslmode=disable} - ELECTRIC_INSECURE: true - ELECTRIC_USAGE_REPORTING: false - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:3000/v1/health"] - interval: 10s - timeout: 5s - retries: 5 - start_period: 10s - - clickhouse: - container_name: trigger-clickhouse - image: bitnami/clickhouse:${CLICKHOUSE_IMAGE_TAG:-latest} - restart: ${RESTART_POLICY:-unless-stopped} - logging: *logging-config - ports: - - ${CLICKHOUSE_PUBLISH_IP:-127.0.0.1}:9123:8123 - - ${CLICKHOUSE_PUBLISH_IP:-127.0.0.1}:9090:9000 - environment: - CLICKHOUSE_ADMIN_USER: ${CLICKHOUSE_USER:-default} - CLICKHOUSE_ADMIN_PASSWORD: ${CLICKHOUSE_PASSWORD:-password} - volumes: - - clickhouse:/bitnami/clickhouse - - ../clickhouse/override.xml:/bitnami/clickhouse/etc/config.d/override.xml:ro - networks: - - webapp - healthcheck: - test: - [ - "CMD", - 
"clickhouse-client", - "--host", - "localhost", - "--port", - "9000", - "--user", - "default", - "--password", - "password", - "--query", - "SELECT 1", - ] - interval: 5s - timeout: 5s - retries: 5 - start_period: 10s - - # Worker related - supervisor: - container_name: trigger-supervisor - image: ghcr.io/triggerdotdev/supervisor:v4.0.4 - restart: ${RESTART_POLICY:-unless-stopped} - logging: *logging-config - depends_on: - - docker-proxy - networks: - - supervisor - - docker-proxy - - webapp - - core - volumes: - - shared:/home/node/shared - # Only needed for bootstrap - user: root - # Only needed for bootstrap - command: sh -c "chown -R node:node /home/node/shared && exec /usr/bin/dumb-init -- pnpm run --filter supervisor start" - environment: - # This needs to match the token of the worker group you want to connect to - TRIGGER_WORKER_TOKEN: ${TRIGGER_WORKER_TOKEN} - # Use the bootstrap token created by the webapp - # TRIGGER_WORKER_TOKEN: file:///home/node/shared/worker_token - MANAGED_WORKER_SECRET: ${MANAGED_WORKER_SECRET} - TRIGGER_API_URL: ${TRIGGER_API_URL:-http://trigger-webapp:3000} - OTEL_EXPORTER_OTLP_ENDPOINT: ${OTEL_EXPORTER_OTLP_ENDPOINT:-http://trigger-webapp:3000/otel} - TRIGGER_WORKLOAD_API_DOMAIN: supervisor - TRIGGER_WORKLOAD_API_PORT_EXTERNAL: 8020 - # Optional settings - DEBUG: 1 - ENFORCE_MACHINE_PRESETS: 1 - TRIGGER_DEQUEUE_INTERVAL_MS: 1000 - DOCKER_HOST: tcp://docker-proxy:2375 - DOCKER_RUNNER_NETWORKS: webapp,supervisor,core - DOCKER_REGISTRY_URL: ${DOCKER_REGISTRY_URL:-localhost:5000} - DOCKER_REGISTRY_USERNAME: ${DOCKER_REGISTRY_USERNAME:-} - DOCKER_REGISTRY_PASSWORD: ${DOCKER_REGISTRY_PASSWORD:-} - DOCKER_AUTOREMOVE_EXITED_CONTAINERS: 0 - healthcheck: - test: - [ - "CMD", - "node", - "-e", - "http.get('http://localhost:8020/health', res => process.exit(res.statusCode === 200 ? 0 : 1)).on('error', () => process.exit(1))", - ] - interval: 30s - timeout: 10s - retries: 5 - start_period: 10s - - init: - container_name: trigger-init - image: redplanethq/init:${VERSION} - restart: "no" # prevent retries - environment: - - VERSION=${VERSION} - - DB_HOST=${DB_HOST} - - DB_PORT=${DB_PORT} - - TRIGGER_DB=${TRIGGER_DB} - - POSTGRES_USER=${POSTGRES_USER} - - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - - TRIGGER_TASKS_IMAGE=${TRIGGER_TASKS_IMAGE} - - NODE_ENV=production - networks: - - webapp - - core - depends_on: - - postgres - - docker-proxy: - container_name: trigger-docker-proxy - image: tecnativa/docker-socket-proxy:${DOCKER_PROXY_IMAGE_TAG:-latest} - restart: ${RESTART_POLICY:-unless-stopped} - logging: *logging-config - volumes: - - /var/run/docker.sock:/var/run/docker.sock:ro - networks: - - docker-proxy - environment: - - LOG_LEVEL=info - - POST=1 - - CONTAINERS=1 - - IMAGES=1 - - INFO=1 - - NETWORKS=1 - healthcheck: - test: ["CMD", "nc", "-z", "127.0.0.1", "2375"] - interval: 30s - timeout: 5s - retries: 5 - start_period: 5s - networks: core: name: core driver: bridge - docker-proxy: - name: docker-proxy - supervisor: - name: supervisor - webapp: - name: webapp - driver: bridge volumes: postgres_data: neo4j_data: shared: - clickhouse: - minio: