feat: remove trigger and run jobs on bullmq

Harshith Mullapudi 2025-10-24 23:35:46 +05:30
parent b78713df41
commit 2030cebdc0
30 changed files with 2129 additions and 1128 deletions

View File

@ -56,7 +56,5 @@ AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_REGION=us-east-1
## Trigger ##
TRIGGER_PROJECT_ID=
TRIGGER_SECRET_KEY=
TRIGGER_API_URL=http://host.docker.internal:8030
QUEUE_PROVIDER=bullmq

View File

@ -0,0 +1,49 @@
import Redis, { type RedisOptions } from "ioredis";
let redisConnection: Redis | null = null;
/**
* Get or create a Redis connection for BullMQ
* This connection is shared across all queues and workers
*/
export function getRedisConnection() {
if (redisConnection) {
return redisConnection;
}
// Build the Redis connection options from environment variables
const redisConfig: RedisOptions = {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT as string),
maxRetriesPerRequest: null, // Required for BullMQ
enableReadyCheck: false, // Required for BullMQ
};
// Add TLS configuration if not disabled
if (!process.env.REDIS_TLS_DISABLED) {
redisConfig.tls = {};
}
redisConnection = new Redis(redisConfig);
redisConnection.on("error", (error) => {
console.error("Redis connection error:", error);
});
redisConnection.on("connect", () => {
console.log("Redis connected successfully");
});
return redisConnection;
}
/**
* Close the Redis connection (useful for graceful shutdown)
*/
export async function closeRedisConnection(): Promise<void> {
if (redisConnection) {
await redisConnection.quit();
redisConnection = null;
}
}
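
A hedged usage sketch (not part of this commit): the shared connection from this module can back any BullMQ queue or worker, with closeRedisConnection called only after those consumers have closed. The import path assumes the module resolves to ~/bullmq/connection, consistent with the ~/bullmq/queues imports elsewhere in this diff; example-queue is a hypothetical name.

// Sketch only: reuse the shared ioredis instance and release it on shutdown.
import { Queue } from "bullmq";
import { getRedisConnection, closeRedisConnection } from "~/bullmq/connection";

const exampleQueue = new Queue("example-queue", {
  connection: getRedisConnection(), // same instance every queue/worker shares
});

process.on("SIGTERM", async () => {
  await exampleQueue.close(); // stop producing/consuming first
  await closeRedisConnection(); // then quit the shared Redis connection
});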

View File

@ -0,0 +1,115 @@
/**
* BullMQ Queues
*
* All queue definitions for the BullMQ implementation
*/
import { Queue } from "bullmq";
import { getRedisConnection } from "../connection";
/**
* Episode ingestion queue
* Handles individual episode ingestion (including document chunks)
*/
export const ingestQueue = new Queue("ingest-queue", {
connection: getRedisConnection(),
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 2000,
},
removeOnComplete: {
age: 3600, // Keep completed jobs for 1 hour
count: 1000, // Keep last 1000 completed jobs
},
removeOnFail: {
age: 86400, // Keep failed jobs for 24 hours
},
},
});
/**
* Document ingestion queue
* Handles document-level ingestion with differential processing
*/
export const documentIngestQueue = new Queue("document-ingest-queue", {
connection: getRedisConnection(),
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 2000,
},
removeOnComplete: {
age: 3600,
count: 1000,
},
removeOnFail: {
age: 86400,
},
},
});
/**
* Conversation title creation queue
*/
export const conversationTitleQueue = new Queue("conversation-title-queue", {
connection: getRedisConnection(),
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 2000,
},
removeOnComplete: {
age: 3600,
count: 1000,
},
removeOnFail: {
age: 86400,
},
},
});
/**
* Deep search queue
*/
export const deepSearchQueue = new Queue("deep-search-queue", {
connection: getRedisConnection(),
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 2000,
},
removeOnComplete: {
age: 3600,
count: 1000,
},
removeOnFail: {
age: 86400,
},
},
});
/**
* Session compaction queue
*/
export const sessionCompactionQueue = new Queue("session-compaction-queue", {
connection: getRedisConnection(),
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 2000,
},
removeOnComplete: {
age: 3600,
count: 1000,
},
removeOnFail: {
age: 86400,
},
},
});
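
For reference, a hypothetical producer-side sketch: adding a job to ingestQueue with the ingestion record id as the jobId, which is the deduplication pattern the queue adapter later in this diff relies on. The enqueueExampleEpisode wrapper is illustrative only.

// Sketch (not part of this commit): enqueue an episode ingestion job.
// Reusing the ingestion record id as jobId means re-submitting the same
// record does not create a duplicate BullMQ job.
import { ingestQueue } from "~/bullmq/queues";

export async function enqueueExampleEpisode(payload: {
  body: unknown;
  userId: string;
  workspaceId: string;
  queueId: string;
}) {
  const job = await ingestQueue.add("ingest-episode", payload, {
    jobId: payload.queueId,
  });
  return { id: job.id };
}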

View File

@ -0,0 +1,43 @@
/**
* BullMQ Worker Startup Script
*
* This script starts all BullMQ workers for processing background jobs.
* Run this as a separate process alongside your main application.
*
* Usage:
* tsx apps/webapp/app/bullmq/start-workers.ts
*/
import { logger } from "~/services/logger.service";
import {
ingestWorker,
documentIngestWorker,
conversationTitleWorker,
deepSearchWorker,
sessionCompactionWorker,
closeAllWorkers,
} from "./workers";
// The workers imported above are instantiated on module load, so they begin
// listening as soon as this script is imported; this export is a no-op hook
// for call sites that want an explicit startup step.
export async function startWorkers() {}
// Handle graceful shutdown
process.on("SIGTERM", async () => {
logger.log("SIGTERM received, closing workers gracefully...");
await closeAllWorkers();
process.exit(0);
});
process.on("SIGINT", async () => {
logger.log("SIGINT received, closing workers gracefully...");
await closeAllWorkers();
process.exit(0);
});
// Log worker startup
logger.log("Starting BullMQ workers...");
logger.log(`- Ingest worker: ${ingestWorker.name}`);
logger.log(`- Document ingest worker: ${documentIngestWorker.name}`);
logger.log(`- Conversation title worker: ${conversationTitleWorker.name}`);
logger.log(`- Deep search worker: ${deepSearchWorker.name}`);
logger.log(`- Session compaction worker: ${sessionCompactionWorker.name}`);
logger.log("All BullMQ workers started and listening for jobs");

View File

@ -0,0 +1,134 @@
/**
* BullMQ Job Finder Utilities
*
* Helper functions to find, retrieve, and cancel BullMQ jobs
*/
interface JobInfo {
id: string;
isCompleted: boolean;
status?: string;
}
/**
* Get all active queues
*/
async function getAllQueues() {
const {
ingestQueue,
documentIngestQueue,
conversationTitleQueue,
deepSearchQueue,
sessionCompactionQueue,
} = await import("../queues");
return [
ingestQueue,
documentIngestQueue,
conversationTitleQueue,
deepSearchQueue,
sessionCompactionQueue,
];
}
/**
* Find jobs by tags (metadata stored in job data)
* Since BullMQ doesn't have native tag support like Trigger.dev,
* we search through jobs and check if their data contains the required identifiers
*/
export async function getJobsByTags(
tags: string[],
taskIdentifier?: string,
): Promise<JobInfo[]> {
const queues = await getAllQueues();
const matchingJobs: JobInfo[] = [];
for (const queue of queues) {
// Skip if taskIdentifier is specified and doesn't match queue name
if (taskIdentifier && !queue.name.includes(taskIdentifier)) {
continue;
}
// Get all active and waiting jobs
const [active, waiting, delayed] = await Promise.all([
queue.getActive(),
queue.getWaiting(),
queue.getDelayed(),
]);
const allJobs = [...active, ...waiting, ...delayed];
for (const job of allJobs) {
// Check if job data contains all required tags
const jobData = job.data as any;
const matchesTags = tags.every(
(tag) =>
job.id?.includes(tag) ||
jobData.userId === tag ||
jobData.workspaceId === tag ||
jobData.queueId === tag,
);
if (matchesTags) {
const state = await job.getState();
matchingJobs.push({
id: job.id!,
isCompleted: state === "completed" || state === "failed",
status: state,
});
}
}
}
return matchingJobs;
}
/**
* Get a specific job by ID across all queues
*/
export async function getJobById(jobId: string): Promise<JobInfo | null> {
const queues = await getAllQueues();
for (const queue of queues) {
try {
const job = await queue.getJob(jobId);
if (job) {
const state = await job.getState();
return {
id: job.id!,
isCompleted: state === "completed" || state === "failed",
status: state,
};
}
} catch {
// Job not in this queue, continue
continue;
}
}
return null;
}
/**
* Cancel a job by ID
*/
export async function cancelJobById(jobId: string): Promise<void> {
const queues = await getAllQueues();
for (const queue of queues) {
try {
const job = await queue.getJob(jobId);
if (job) {
const state = await job.getState();
// Only remove if not already completed
if (state !== "completed" && state !== "failed") {
await job.remove();
}
return;
}
} catch {
// Job not in this queue, continue
continue;
}
}
}
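
A short, hypothetical composition of these helpers, mirroring the episode routes updated later in this diff: locate any in-flight ingestion job tagged with the user and queue record, then cancel it if it has not finished. cancelPendingIngestion is an illustrative wrapper, not part of the commit.

// Sketch only: cancel the most recent unfinished ingestion job for a record.
import { getJobsByTags, cancelJobById } from "~/bullmq/utils/job-finder";

export async function cancelPendingIngestion(userId: string, queueId: string) {
  const jobs = await getJobsByTags([userId, queueId], "ingest");
  const latest = jobs[0];
  if (latest && !latest.isCompleted) {
    await cancelJobById(latest.id);
  }
}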

View File

@ -0,0 +1,181 @@
/**
* BullMQ Workers
*
* All worker definitions for processing background jobs with BullMQ
*/
import { Worker } from "bullmq";
import { getRedisConnection } from "../connection";
import {
processEpisodeIngestion,
type IngestEpisodePayload,
} from "~/jobs/ingest/ingest-episode.logic";
import {
processDocumentIngestion,
type IngestDocumentPayload,
} from "~/jobs/ingest/ingest-document.logic";
import {
processConversationTitleCreation,
type CreateConversationTitlePayload,
} from "~/jobs/conversation/create-title.logic";
import {
processDeepSearch,
type ProcessDeepSearchPayload,
} from "~/jobs/deep-search/deep-search.logic";
import {
processSessionCompaction,
type SessionCompactionPayload,
} from "~/jobs/session/session-compaction.logic";
import {
enqueueIngestEpisode,
enqueueSpaceAssignment,
enqueueSessionCompaction,
} from "~/lib/queue-adapter.server";
import { logger } from "~/services/logger.service";
/**
* Episode ingestion worker
* Processes individual episode ingestion jobs with per-user concurrency
*
* Note: Per-user concurrency is achieved by using userId as part of the jobId
* when adding jobs to the queue, ensuring only one job per user runs at a time
*/
export const ingestWorker = new Worker(
"ingest-queue",
async (job) => {
const payload = job.data as IngestEpisodePayload;
return await processEpisodeIngestion(
payload,
// Callbacks to enqueue follow-up jobs
enqueueSpaceAssignment,
enqueueSessionCompaction,
);
},
{
connection: getRedisConnection(),
concurrency: 5, // Process up to 5 jobs in parallel
},
);
ingestWorker.on("completed", (job) => {
logger.log(`Job ${job.id} completed`);
});
ingestWorker.on("failed", (job, error) => {
logger.error(`Job ${job?.id} failed: ${error}`);
});
/**
* Document ingestion worker
* Handles document-level ingestion with differential processing
*
* Note: Per-user concurrency is achieved by using userId as part of the jobId
* when adding jobs to the queue
*/
export const documentIngestWorker = new Worker(
"document-ingest-queue",
async (job) => {
const payload = job.data as IngestDocumentPayload;
return await processDocumentIngestion(
payload,
// Callback to enqueue episode ingestion for each chunk
enqueueIngestEpisode,
);
},
{
connection: getRedisConnection(),
concurrency: 3, // Process up to 3 documents in parallel
},
);
documentIngestWorker.on("completed", (job) => {
logger.log(`Document job ${job.id} completed`);
});
documentIngestWorker.on("failed", (job, error) => {
logger.error(`Document job ${job?.id} failed: ${error}`);
});
/**
* Conversation title creation worker
*/
export const conversationTitleWorker = new Worker(
"conversation-title-queue",
async (job) => {
const payload = job.data as CreateConversationTitlePayload;
return await processConversationTitleCreation(payload);
},
{
connection: getRedisConnection(),
concurrency: 10, // Process up to 10 title creations in parallel
},
);
conversationTitleWorker.on("completed", (job) => {
logger.log(`Conversation title job ${job.id} completed`);
});
conversationTitleWorker.on("failed", (job, error) => {
logger.error(`Conversation title job ${job?.id} failed: ${error}`);
});
/**
* Deep search worker (non-streaming version for BullMQ)
*/
export const deepSearchWorker = new Worker(
"deep-search-queue",
async (job) => {
const payload = job.data as ProcessDeepSearchPayload;
return await processDeepSearch(payload);
},
{
connection: getRedisConnection(),
concurrency: 5, // Process up to 5 searches in parallel
},
);
deepSearchWorker.on("completed", (job) => {
logger.log(`Deep search job ${job.id} completed`);
});
deepSearchWorker.on("failed", (job, error) => {
logger.error(`Deep search job ${job?.id} failed: ${error}`);
});
/**
* Session compaction worker
*/
export const sessionCompactionWorker = new Worker(
"session-compaction-queue",
async (job) => {
const payload = job.data as SessionCompactionPayload;
return await processSessionCompaction(payload);
},
{
connection: getRedisConnection(),
concurrency: 3, // Process up to 3 compactions in parallel
},
);
sessionCompactionWorker.on("completed", (job) => {
logger.log(`Session compaction job ${job.id} completed`);
});
sessionCompactionWorker.on("failed", (job, error) => {
logger.error(`Session compaction job ${job?.id} failed: ${error}`);
});
/**
* Graceful shutdown handler
*/
export async function closeAllWorkers(): Promise<void> {
await Promise.all([
ingestWorker.close(),
documentIngestWorker.close(),
conversationTitleWorker.close(),
deepSearchWorker.close(),
sessionCompactionWorker.close(),
]);
logger.log("All BullMQ workers closed");
}

View File

@ -95,6 +95,9 @@ const EnvironmentSchema = z.object({
AWS_ACCESS_KEY_ID: z.string().optional(),
AWS_SECRET_ACCESS_KEY: z.string().optional(),
AWS_REGION: z.string().optional(),
// Queue provider
QUEUE_PROVIDER: z.enum(["trigger", "bullmq"]).default("trigger"),
});
export type Environment = z.infer<typeof EnvironmentSchema>;

View File

@ -0,0 +1,96 @@
import { LLMMappings } from "@core/types";
import { generate } from "~/trigger/chat/stream-utils";
import { conversationTitlePrompt } from "~/trigger/conversation/prompt";
import { prisma } from "~/trigger/utils/prisma";
import { logger } from "~/services/logger.service";
export interface CreateConversationTitlePayload {
conversationId: string;
message: string;
}
export interface CreateConversationTitleResult {
success: boolean;
title?: string;
error?: string;
}
/**
* Core business logic for creating conversation titles
* This is shared between Trigger.dev and BullMQ implementations
*/
export async function processConversationTitleCreation(
payload: CreateConversationTitlePayload,
): Promise<CreateConversationTitleResult> {
try {
let conversationTitleResponse = "";
const gen = generate(
[
{
role: "user",
content: conversationTitlePrompt.replace(
"{{message}}",
payload.message,
),
},
],
false,
() => {},
undefined,
"",
LLMMappings.GPT41,
);
for await (const chunk of gen) {
if (typeof chunk === "string") {
conversationTitleResponse += chunk;
} else if (chunk && typeof chunk === "object" && chunk.message) {
conversationTitleResponse += chunk.message;
}
}
const outputMatch = conversationTitleResponse.match(
/<output>(.*?)<\/output>/s,
);
logger.info(`Conversation title data: ${JSON.stringify(outputMatch)}`);
if (!outputMatch) {
logger.error("No output found in recurrence response");
throw new Error("Invalid response format from AI");
}
const jsonStr = outputMatch[1].trim();
const conversationTitleData = JSON.parse(jsonStr);
if (conversationTitleData) {
await prisma.conversation.update({
where: {
id: payload.conversationId,
},
data: {
title: conversationTitleData.title,
},
});
return {
success: true,
title: conversationTitleData.title,
};
}
return {
success: false,
error: "No title generated",
};
} catch (error: any) {
logger.error(
`Error creating conversation title for ${payload.conversationId}:`,
error,
);
return {
success: false,
error: error.message,
};
}
}

View File

@ -0,0 +1,105 @@
import { type CoreMessage } from "ai";
import { logger } from "~/services/logger.service";
import { nanoid } from "nanoid";
import {
deletePersonalAccessToken,
getOrCreatePersonalAccessToken,
} from "~/trigger/utils/utils";
import { getReActPrompt } from "~/trigger/deep-search/prompt";
import { type DeepSearchPayload, type DeepSearchResponse } from "~/trigger/deep-search/types";
import { createSearchMemoryTool } from "~/trigger/deep-search/utils";
import { run } from "~/trigger/deep-search/deep-search-utils";
import { AgentMessageType } from "~/trigger/chat/types";
export interface ProcessDeepSearchPayload {
content: string;
userId: string;
metadata?: any;
intentOverride?: string;
}
export interface ProcessDeepSearchResult {
success: boolean;
synthesis?: string;
error?: string;
}
/**
* Core business logic for deep search (non-streaming version for BullMQ)
* This is shared logic, but the streaming happens in Trigger.dev via metadata.stream
*/
export async function processDeepSearch(
payload: ProcessDeepSearchPayload,
): Promise<ProcessDeepSearchResult> {
const { content, userId, metadata: meta, intentOverride } = payload;
const randomKeyName = `deepSearch_${nanoid(10)}`;
// Get or create token for search API calls
const pat = await getOrCreatePersonalAccessToken({
name: randomKeyName,
userId: userId as string,
});
if (!pat?.token) {
return {
success: false,
error: "Failed to create personal access token",
};
}
try {
// Create search tool that agent will use
const searchTool = createSearchMemoryTool(pat.token);
// Build initial messages with ReAct prompt
const initialMessages: CoreMessage[] = [
{
role: "system",
content: getReActPrompt(meta, intentOverride),
},
{
role: "user",
content: `CONTENT TO ANALYZE:\n${content}\n\nPlease search my memory for relevant context and synthesize what you find.`,
},
];
// Run the ReAct loop generator
const llmResponse = run(initialMessages, searchTool);
let synthesis = "";
// For BullMQ: iterate without streaming, just accumulate the final synthesis
for await (const step of llmResponse) {
// MESSAGE_CHUNK: Final synthesis - accumulate
if (step.type === AgentMessageType.MESSAGE_CHUNK) {
synthesis += step.message;
}
// STREAM_END: Loop completed
if (step.type === AgentMessageType.STREAM_END) {
break;
}
}
await deletePersonalAccessToken(pat?.id);
// Clean up any remaining tags
synthesis = synthesis
.replace(/<final_response>/gi, "")
.replace(/<\/final_response>/gi, "")
.trim();
return {
success: true,
synthesis,
};
} catch (error: any) {
await deletePersonalAccessToken(pat?.id);
logger.error(`Deep search error: ${error}`);
return {
success: false,
error: error.message,
};
}
}

View File

@ -0,0 +1,289 @@
import { type z } from "zod";
import crypto from "crypto";
import { IngestionStatus } from "@core/database";
import { EpisodeTypeEnum } from "@core/types";
import { logger } from "~/services/logger.service";
import { saveDocument } from "~/services/graphModels/document";
import { type IngestBodyRequest } from "~/lib/ingest.server";
import { DocumentVersioningService } from "~/services/documentVersioning.server";
import { DocumentDifferentialService } from "~/services/documentDiffer.server";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";
import { prisma } from "~/trigger/utils/prisma";
export interface IngestDocumentPayload {
body: z.infer<typeof IngestBodyRequest>;
userId: string;
workspaceId: string;
queueId: string;
}
export interface IngestDocumentResult {
success: boolean;
error?: string;
}
/**
* Core business logic for document ingestion with differential processing
* This is shared between Trigger.dev and BullMQ implementations
*
* Note: This function should NOT call trigger functions directly for chunk processing.
* Instead, use the enqueueEpisodeIngestion callback to queue episode ingestion jobs.
*/
export async function processDocumentIngestion(
payload: IngestDocumentPayload,
// Callback function for enqueueing episode ingestion for each chunk
enqueueEpisodeIngestion?: (params: {
body: any;
userId: string;
workspaceId: string;
queueId: string;
}) => Promise<{ id?: string }>,
): Promise<IngestDocumentResult> {
const startTime = Date.now();
try {
logger.log(`Processing document for user ${payload.userId}`, {
contentLength: payload.body.episodeBody.length,
});
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
status: IngestionStatus.PROCESSING,
},
});
const documentBody = payload.body;
// Step 1: Initialize services and prepare document version
const versioningService = new DocumentVersioningService();
const differentialService = new DocumentDifferentialService();
const knowledgeGraphService = new KnowledgeGraphService();
const {
documentNode: document,
versionInfo,
chunkedDocument,
} = await versioningService.prepareDocumentVersion(
documentBody.sessionId!,
payload.userId,
documentBody.metadata?.documentTitle?.toString() || "Untitled Document",
documentBody.episodeBody,
documentBody.source,
documentBody.metadata || {},
);
logger.log(`Document version analysis:`, {
version: versionInfo.newVersion,
isNewDocument: versionInfo.isNewDocument,
hasContentChanged: versionInfo.hasContentChanged,
changePercentage: versionInfo.chunkLevelChanges.changePercentage,
changedChunks: versionInfo.chunkLevelChanges.changedChunkIndices.length,
totalChunks: versionInfo.chunkLevelChanges.totalChunks,
});
// Step 2: Determine processing strategy
const differentialDecision =
await differentialService.analyzeDifferentialNeed(
documentBody.episodeBody,
versionInfo.existingDocument,
chunkedDocument,
);
logger.log(`Differential analysis:`, {
shouldUseDifferential: differentialDecision.shouldUseDifferential,
strategy: differentialDecision.strategy,
reason: differentialDecision.reason,
documentSizeTokens: differentialDecision.documentSizeTokens,
});
// Early return for unchanged documents
if (differentialDecision.strategy === "skip_processing") {
logger.log("Document content unchanged, skipping processing");
return {
success: true,
};
}
// Step 3: Save the new document version
await saveDocument(document);
// Step 3.1: Invalidate statements from previous document version if it exists
let invalidationResults = null;
if (versionInfo.existingDocument && versionInfo.hasContentChanged) {
logger.log(
`Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`,
);
invalidationResults =
await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion(
{
previousDocumentUuid: versionInfo.existingDocument.uuid,
newDocumentContent: documentBody.episodeBody,
userId: payload.userId,
invalidatedBy: document.uuid,
semanticSimilarityThreshold: 0.75, // Configurable threshold
},
);
logger.log(`Statement invalidation completed:`, {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
});
}
logger.log(
`Document chunked into ${chunkedDocument.chunks.length} chunks`,
);
// Step 4: Process chunks based on differential strategy
let chunksToProcess = chunkedDocument.chunks;
let processingMode = "full";
if (
differentialDecision.shouldUseDifferential &&
differentialDecision.strategy === "chunk_level_diff"
) {
// Only process changed chunks
const chunkComparisons = differentialService.getChunkComparisons(
versionInfo.existingDocument!,
chunkedDocument,
);
const changedIndices =
differentialService.getChunksNeedingReprocessing(chunkComparisons);
chunksToProcess = chunkedDocument.chunks.filter((chunk) =>
changedIndices.includes(chunk.chunkIndex),
);
processingMode = "differential";
logger.log(
`Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`,
);
} else if (differentialDecision.strategy === "full_reingest") {
// Process all chunks
processingMode = "full";
logger.log(
`Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`,
);
}
// Step 5: Queue chunks for processing
const episodeHandlers = [];
if (enqueueEpisodeIngestion) {
for (const chunk of chunksToProcess) {
const chunkEpisodeData = {
episodeBody: chunk.content,
referenceTime: documentBody.referenceTime,
metadata: {
...documentBody.metadata,
processingMode,
differentialStrategy: differentialDecision.strategy,
chunkHash: chunk.contentHash,
documentTitle:
documentBody.metadata?.documentTitle?.toString() ||
"Untitled Document",
chunkIndex: chunk.chunkIndex,
documentUuid: document.uuid,
},
source: documentBody.source,
spaceIds: documentBody.spaceIds,
sessionId: documentBody.sessionId,
type: EpisodeTypeEnum.DOCUMENT,
};
const episodeHandler = await enqueueEpisodeIngestion({
body: chunkEpisodeData,
userId: payload.userId,
workspaceId: payload.workspaceId,
queueId: payload.queueId,
});
if (episodeHandler.id) {
episodeHandlers.push(episodeHandler.id);
logger.log(
`Queued chunk ${chunk.chunkIndex + 1} for ${processingMode} processing`,
{
handlerId: episodeHandler.id,
chunkSize: chunk.content.length,
chunkHash: chunk.contentHash,
},
);
}
}
}
// Calculate cost savings
const costSavings = differentialService.calculateCostSavings(
chunkedDocument.chunks.length,
chunksToProcess.length,
);
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
output: {
documentUuid: document.uuid,
version: versionInfo.newVersion,
totalChunks: chunkedDocument.chunks.length,
chunksProcessed: chunksToProcess.length,
chunksSkipped: costSavings.chunksSkipped,
processingMode,
differentialStrategy: differentialDecision.strategy,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults
? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
}
: null,
episodes: [],
episodeHandlers,
},
status: IngestionStatus.PROCESSING,
},
});
const processingTimeMs = Date.now() - startTime;
logger.log(
`Document differential processing completed in ${processingTimeMs}ms`,
{
documentUuid: document.uuid,
version: versionInfo.newVersion,
processingMode,
totalChunks: chunkedDocument.chunks.length,
chunksProcessed: chunksToProcess.length,
chunksSkipped: costSavings.chunksSkipped,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
changePercentage: `${differentialDecision.changePercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults
? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
}
: "No previous version",
},
);
return { success: true };
} catch (err: any) {
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
error: err.message,
status: IngestionStatus.FAILED,
},
});
logger.error(
`Error processing document for user ${payload.userId}:`,
err,
);
return { success: false, error: err.message };
}
}

View File

@ -0,0 +1,266 @@
import { z } from "zod";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";
import { linkEpisodeToDocument } from "~/services/graphModels/document";
import { IngestionStatus } from "@core/database";
import { logger } from "~/services/logger.service";
import { prisma } from "~/trigger/utils/prisma";
import { EpisodeType } from "@core/types";
import { deductCredits, hasCredits } from "~/trigger/utils/utils";
import { assignEpisodesToSpace } from "~/services/graphModels/space";
export const IngestBodyRequest = z.object({
episodeBody: z.string(),
referenceTime: z.string(),
metadata: z.record(z.union([z.string(), z.number(), z.boolean()])).optional(),
source: z.string(),
spaceIds: z.array(z.string()).optional(),
sessionId: z.string().optional(),
type: z
.enum([EpisodeType.CONVERSATION, EpisodeType.DOCUMENT])
.default(EpisodeType.CONVERSATION),
});
export interface IngestEpisodePayload {
body: z.infer<typeof IngestBodyRequest>;
userId: string;
workspaceId: string;
queueId: string;
}
export interface IngestEpisodeResult {
success: boolean;
episodeDetails?: any;
error?: string;
}
/**
* Core business logic for ingesting a single episode
* This is shared between Trigger.dev and BullMQ implementations
*
* Note: This function should NOT call trigger functions directly.
* Instead, return data that indicates follow-up jobs are needed,
* and let the caller (Trigger task or BullMQ worker) handle job queueing.
*/
export async function processEpisodeIngestion(
payload: IngestEpisodePayload,
// Callback functions for enqueueing follow-up jobs
enqueueSpaceAssignment?: (params: {
userId: string;
workspaceId: string;
mode: "episode";
episodeIds: string[];
}) => Promise<any>,
enqueueSessionCompaction?: (params: {
userId: string;
sessionId: string;
source: string;
}) => Promise<any>,
): Promise<IngestEpisodeResult> {
try {
logger.log(`Processing job for user ${payload.userId}`);
// Check if workspace has sufficient credits before processing
const hasSufficientCredits = await hasCredits(
payload.workspaceId,
"addEpisode",
);
if (!hasSufficientCredits) {
logger.warn(`Insufficient credits for workspace ${payload.workspaceId}`);
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
status: IngestionStatus.NO_CREDITS,
error:
"Insufficient credits. Please upgrade your plan or wait for your credits to reset.",
},
});
return {
success: false,
error: "Insufficient credits",
};
}
const ingestionQueue = await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
status: IngestionStatus.PROCESSING,
},
});
const knowledgeGraphService = new KnowledgeGraphService();
const episodeBody = payload.body as any;
const episodeDetails = await knowledgeGraphService.addEpisode(
{
...episodeBody,
userId: payload.userId,
},
prisma,
);
// Link episode to document if it's a document chunk
if (
episodeBody.type === EpisodeType.DOCUMENT &&
episodeBody.metadata.documentUuid &&
episodeDetails.episodeUuid
) {
try {
await linkEpisodeToDocument(
episodeDetails.episodeUuid,
episodeBody.metadata.documentUuid,
episodeBody.metadata.chunkIndex || 0,
);
logger.log(
`Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.metadata.documentUuid} at chunk ${episodeBody.metadata.chunkIndex || 0}`,
);
} catch (error) {
logger.error(`Failed to link episode to document:`, {
error,
episodeUuid: episodeDetails.episodeUuid,
documentUuid: episodeBody.metadata.documentUuid,
});
}
}
let finalOutput = episodeDetails;
let episodeUuids: string[] = episodeDetails.episodeUuid
? [episodeDetails.episodeUuid]
: [];
let currentStatus: IngestionStatus = IngestionStatus.COMPLETED;
if (episodeBody.type === EpisodeType.DOCUMENT) {
const currentOutput = ingestionQueue.output as any;
currentOutput.episodes.push(episodeDetails);
episodeUuids = currentOutput.episodes.map(
(episode: any) => episode.episodeUuid,
);
finalOutput = {
...currentOutput,
};
if (currentOutput.episodes.length !== currentOutput.totalChunks) {
currentStatus = IngestionStatus.PROCESSING;
}
}
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
output: finalOutput,
status: currentStatus,
},
});
// Deduct credits for episode creation
if (currentStatus === IngestionStatus.COMPLETED) {
await deductCredits(
payload.workspaceId,
"addEpisode",
finalOutput.statementsCreated,
);
}
// Handle space assignment after successful ingestion
try {
// If spaceIds were explicitly provided, immediately assign the episode to those spaces
if (
episodeBody.spaceIds &&
episodeBody.spaceIds.length > 0 &&
episodeDetails.episodeUuid
) {
logger.info(`Assigning episode to explicitly provided spaces`, {
userId: payload.userId,
episodeId: episodeDetails.episodeUuid,
spaceIds: episodeBody.spaceIds,
});
// Assign episode to each space
for (const spaceId of episodeBody.spaceIds) {
await assignEpisodesToSpace(
[episodeDetails.episodeUuid],
spaceId,
payload.userId,
);
}
logger.info(
`Skipping LLM space assignment - episode explicitly assigned to ${episodeBody.spaceIds.length} space(s)`,
);
} else {
// Only trigger automatic LLM space assignment if no explicit spaceIds were provided
logger.info(
`Triggering LLM space assignment after successful ingestion`,
{
userId: payload.userId,
workspaceId: payload.workspaceId,
episodeId: episodeDetails?.episodeUuid,
},
);
if (
episodeDetails.episodeUuid &&
currentStatus === IngestionStatus.COMPLETED &&
enqueueSpaceAssignment
) {
await enqueueSpaceAssignment({
userId: payload.userId,
workspaceId: payload.workspaceId,
mode: "episode",
episodeIds: episodeUuids,
});
}
}
} catch (assignmentError) {
// Don't fail the ingestion if assignment fails
logger.warn(`Failed to trigger space assignment after ingestion:`, {
error: assignmentError,
userId: payload.userId,
episodeId: episodeDetails?.episodeUuid,
});
}
// Auto-trigger session compaction if episode has sessionId
try {
if (
episodeBody.sessionId &&
currentStatus === IngestionStatus.COMPLETED &&
enqueueSessionCompaction
) {
logger.info(`Checking if session compaction should be triggered`, {
userId: payload.userId,
sessionId: episodeBody.sessionId,
source: episodeBody.source,
});
await enqueueSessionCompaction({
userId: payload.userId,
sessionId: episodeBody.sessionId,
source: episodeBody.source,
});
}
} catch (compactionError) {
// Don't fail the ingestion if compaction fails
logger.warn(`Failed to trigger session compaction after ingestion:`, {
error: compactionError,
userId: payload.userId,
sessionId: episodeBody.sessionId,
});
}
return { success: true, episodeDetails };
} catch (err: any) {
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
error: err.message,
status: IngestionStatus.FAILED,
},
});
logger.error(`Error processing job for user ${payload.userId}:`, err);
return { success: false, error: err.message };
}
}

View File

@ -0,0 +1,455 @@
import { logger } from "~/services/logger.service";
import type { CoreMessage } from "ai";
import { z } from "zod";
import { getEmbedding, makeModelCall } from "~/lib/model.server";
import {
getCompactedSessionBySessionId,
linkEpisodesToCompact,
getSessionEpisodes,
type CompactedSessionNode,
type SessionEpisodeData,
saveCompactedSession,
} from "~/services/graphModels/compactedSession";
export interface SessionCompactionPayload {
userId: string;
sessionId: string;
source: string;
triggerSource?: "auto" | "manual" | "threshold";
}
export interface SessionCompactionResult {
success: boolean;
compactionResult?: {
compactUuid: string;
sessionId: string;
summary: string;
episodeCount: number;
startTime: Date;
endTime: Date;
confidence: number;
compressionRatio: number;
};
reason?: string;
episodeCount?: number;
error?: string;
}
// Zod schema for LLM response validation
const CompactionResultSchema = z.object({
summary: z.string().describe("Consolidated narrative of the entire session"),
confidence: z
.number()
.min(0)
.max(1)
.describe("Confidence score of the compaction quality"),
});
const CONFIG = {
minEpisodesForCompaction: 5, // Minimum episodes to trigger compaction
compactionThreshold: 1, // Trigger after N new episodes
maxEpisodesPerBatch: 50, // Process in batches if needed
};
/**
* Core business logic for session compaction
* This is shared between Trigger.dev and BullMQ implementations
*/
export async function processSessionCompaction(
payload: SessionCompactionPayload,
): Promise<SessionCompactionResult> {
const { userId, sessionId, source, triggerSource = "auto" } = payload;
logger.info(`Starting session compaction`, {
userId,
sessionId,
source,
triggerSource,
});
try {
// Check if compaction already exists
const existingCompact = await getCompactedSessionBySessionId(
sessionId,
userId,
);
// Fetch all episodes for this session
const episodes = await getSessionEpisodes(
sessionId,
userId,
existingCompact?.endTime,
);
console.log("episodes", episodes.length);
// Check if we have enough episodes
if (!existingCompact && episodes.length < CONFIG.minEpisodesForCompaction) {
logger.info(`Not enough episodes for compaction`, {
sessionId,
episodeCount: episodes.length,
minRequired: CONFIG.minEpisodesForCompaction,
});
return {
success: false,
reason: "insufficient_episodes",
episodeCount: episodes.length,
};
} else if (
existingCompact &&
episodes.length <
CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold
) {
logger.info(`Not enough new episodes for compaction`, {
sessionId,
episodeCount: episodes.length,
minRequired:
CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold,
});
return {
success: false,
reason: "insufficient_new_episodes",
episodeCount: episodes.length,
};
}
// Generate or update compaction
const compactionResult = existingCompact
? await updateCompaction(existingCompact, episodes, userId)
: await createCompaction(sessionId, episodes, userId, source);
logger.info(`Session compaction completed`, {
sessionId,
compactUuid: compactionResult.uuid,
episodeCount: compactionResult.episodeCount,
compressionRatio: compactionResult.compressionRatio,
});
return {
success: true,
compactionResult: {
compactUuid: compactionResult.uuid,
sessionId: compactionResult.sessionId,
summary: compactionResult.summary,
episodeCount: compactionResult.episodeCount,
startTime: compactionResult.startTime,
endTime: compactionResult.endTime,
confidence: compactionResult.confidence,
compressionRatio: compactionResult.compressionRatio,
},
};
} catch (error) {
logger.error(`Session compaction failed`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
return {
success: false,
error: error instanceof Error ? error.message : String(error),
};
}
}
/**
* Create new compaction
*/
async function createCompaction(
sessionId: string,
episodes: SessionEpisodeData[],
userId: string,
source: string,
): Promise<CompactedSessionNode> {
logger.info(`Creating new compaction`, {
sessionId,
episodeCount: episodes.length,
});
// Generate compaction using LLM
const compactionData = await generateCompaction(episodes, null);
// Generate embedding for summary
const summaryEmbedding = await getEmbedding(compactionData.summary);
// Create CompactedSession node using graph model
const compactUuid = crypto.randomUUID();
const now = new Date();
const startTime = new Date(episodes[0].createdAt);
const endTime = new Date(episodes[episodes.length - 1].createdAt);
const episodeUuids = episodes.map((e) => e.uuid);
// Ratio of source episodes to the single summary node produced
const compressionRatio = episodes.length / 1;
const compactNode: CompactedSessionNode = {
uuid: compactUuid,
sessionId,
summary: compactionData.summary,
summaryEmbedding,
episodeCount: episodes.length,
startTime,
endTime,
createdAt: now,
confidence: compactionData.confidence,
userId,
source,
compressionRatio,
metadata: { triggerType: "create" },
};
console.log("compactNode", compactNode);
// Use graph model functions
await saveCompactedSession(compactNode);
await linkEpisodesToCompact(compactUuid, episodeUuids, userId);
logger.info(`Compaction created`, {
compactUuid,
episodeCount: episodes.length,
});
return compactNode;
}
/**
* Update existing compaction with new episodes
*/
async function updateCompaction(
existingCompact: CompactedSessionNode,
newEpisodes: SessionEpisodeData[],
userId: string,
): Promise<CompactedSessionNode> {
logger.info(`Updating existing compaction`, {
compactUuid: existingCompact.uuid,
newEpisodeCount: newEpisodes.length,
});
// Generate updated compaction using LLM (merging)
const compactionData = await generateCompaction(
newEpisodes,
existingCompact.summary,
);
// Generate new embedding for updated summary
const summaryEmbedding = await getEmbedding(compactionData.summary);
// Update CompactedSession node using graph model
const now = new Date();
const endTime = newEpisodes[newEpisodes.length - 1].createdAt;
const totalEpisodeCount = existingCompact.episodeCount + newEpisodes.length;
const compressionRatio = totalEpisodeCount / 1;
const episodeUuids = newEpisodes.map((e) => e.uuid);
const updatedNode: CompactedSessionNode = {
...existingCompact,
summary: compactionData.summary,
summaryEmbedding,
episodeCount: totalEpisodeCount,
endTime,
updatedAt: now,
confidence: compactionData.confidence,
compressionRatio,
metadata: { triggerType: "update", newEpisodesAdded: newEpisodes.length },
};
// Use graph model functions
await saveCompactedSession(updatedNode);
await linkEpisodesToCompact(existingCompact.uuid, episodeUuids, userId);
logger.info(`Compaction updated`, {
compactUuid: existingCompact.uuid,
totalEpisodeCount,
});
return updatedNode;
}
/**
* Generate compaction using LLM (similar to Claude Code's compact approach)
*/
async function generateCompaction(
episodes: SessionEpisodeData[],
existingSummary: string | null,
): Promise<z.infer<typeof CompactionResultSchema>> {
const systemPrompt = createCompactionSystemPrompt();
const userPrompt = createCompactionUserPrompt(episodes, existingSummary);
const messages: CoreMessage[] = [
{ role: "system", content: systemPrompt },
{ role: "user", content: userPrompt },
];
logger.info(`Generating compaction with LLM`, {
episodeCount: episodes.length,
hasExistingSummary: !!existingSummary,
});
try {
let responseText = "";
await makeModelCall(
false,
messages,
(text: string) => {
responseText = text;
},
undefined,
"high",
);
return parseCompactionResponse(responseText);
} catch (error) {
logger.error(`Failed to generate compaction`, {
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
/**
* System prompt for compaction (for agent recall/context retrieval)
*/
function createCompactionSystemPrompt(): string {
return `You are a session compaction specialist. Your task is to create a rich, informative summary that will help AI agents understand what happened in this conversation session when they need context for future interactions.
## PURPOSE
This summary will be retrieved by AI agents when the user references this session in future conversations. The agent needs enough context to:
- Understand what was discussed and why
- Know what decisions were made and their rationale
- Grasp the outcome and current state
- Have relevant technical details to provide informed responses
## COMPACTION GOALS
1. **Comprehensive Context**: Capture all important information that might be referenced later
2. **Decision Documentation**: Clearly state what was decided, why, and what alternatives were considered
3. **Technical Details**: Include specific implementations, tools, configurations, and technical choices
4. **Outcome Clarity**: Make it clear what was accomplished and what the final state is
5. **Evolution Tracking**: Show how thinking or decisions evolved during the session
## COMPACTION RULES
1. **Be Information-Dense**: Pack useful details without fluff or repetition
2. **Structure Chronologically**: Start with problem/question, show progression, end with outcome
3. **Highlight Key Points**: Emphasize decisions, implementations, results, and learnings
4. **Include Specifics**: Names of libraries, specific configurations, metrics, numbers matter
5. **Resolve Contradictions**: Always use the most recent/final version when information conflicts
## OUTPUT REQUIREMENTS
- **summary**: A detailed, information-rich narrative that tells the complete story
- Structure naturally based on content - use as many paragraphs as needed
- Each distinct topic, decision, or phase should get its own paragraph(s)
- Start with context and initial problem/question
- Progress chronologically through discussions, decisions, and implementations
- **Final paragraph MUST**: State the outcome, results, and current state
- Don't artificially limit length - capture everything important
- **confidence**: Score (0-1) reflecting how well this summary captures the session's essence
Your response MUST be valid JSON wrapped in <output></output> tags.
## KEY PRINCIPLES
- Write for an AI agent that needs to help the user in future conversations
- Include technical specifics that might be referenced (library names, configurations, metrics)
- Make outcomes and current state crystal clear in the final paragraph
- Show the reasoning behind decisions, not just the decisions themselves
- Be comprehensive but concise - every sentence should add value
- Each major topic or phase deserves its own paragraph(s)
- Don't compress too much - agents need the details
`;
}
/**
* User prompt for compaction
*/
function createCompactionUserPrompt(
episodes: SessionEpisodeData[],
existingSummary: string | null,
): string {
let prompt = "";
if (existingSummary) {
prompt += `## EXISTING SUMMARY (from previous compaction)\n\n${existingSummary}\n\n`;
prompt += `## NEW EPISODES (to merge into existing summary)\n\n`;
} else {
prompt += `## SESSION EPISODES (to compact)\n\n`;
}
episodes.forEach((episode, index) => {
const timestamp = new Date(episode.validAt).toISOString();
prompt += `### Episode ${index + 1} (${timestamp})\n`;
prompt += `Source: ${episode.source}\n`;
prompt += `Content:\n${episode.originalContent}\n\n`;
});
if (existingSummary) {
prompt += `\n## INSTRUCTIONS\n\n`;
prompt += `Merge the new episodes into the existing summary. Update facts, add new information, and maintain narrative coherence. Ensure the consolidated summary reflects the complete session including both old and new content.\n`;
} else {
prompt += `\n## INSTRUCTIONS\n\n`;
prompt += `Create a compact summary of this entire session. Consolidate all information into a coherent narrative with deduplicated key facts.\n`;
}
return prompt;
}
/**
* Parse LLM response for compaction
*/
function parseCompactionResponse(
response: string,
): z.infer<typeof CompactionResultSchema> {
try {
// Extract content from <output> tags
const outputMatch = response.match(/<output>([\s\S]*?)<\/output>/);
if (!outputMatch) {
logger.warn("No <output> tags found in LLM compaction response");
logger.debug("Full LLM response:", { response });
throw new Error("Invalid LLM response format - missing <output> tags");
}
let jsonContent = outputMatch[1].trim();
// Remove markdown code blocks if present
jsonContent = jsonContent.replace(/```json\n?/g, "").replace(/```\n?/g, "");
const parsed = JSON.parse(jsonContent);
// Validate with schema
const validated = CompactionResultSchema.parse(parsed);
return validated;
} catch (error) {
logger.error("Failed to parse compaction response", {
error: error instanceof Error ? error.message : String(error),
response: response.substring(0, 500),
});
throw new Error(`Failed to parse compaction response: ${error}`);
}
}
/**
* Helper function to check if compaction should be triggered
*/
export async function shouldTriggerCompaction(
sessionId: string,
userId: string,
): Promise<boolean> {
const existingCompact = await getCompactedSessionBySessionId(
sessionId,
userId,
);
if (!existingCompact) {
// Check if we have enough episodes for initial compaction
const episodes = await getSessionEpisodes(sessionId, userId);
return episodes.length >= CONFIG.minEpisodesForCompaction;
}
// Check if we have enough new episodes to update
const newEpisodes = await getSessionEpisodes(
sessionId,
userId,
existingCompact.endTime,
);
return newEpisodes.length >= CONFIG.compactionThreshold;
}
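
As a hedged illustration of how this helper pairs with the queue adapter added in this commit: check the threshold first, then enqueue a compaction job only when it passes. maybeCompactSession is a hypothetical wrapper.

// Sketch: gate compaction enqueueing on the threshold check above.
import { shouldTriggerCompaction } from "~/jobs/session/session-compaction.logic";
import { enqueueSessionCompaction } from "~/lib/queue-adapter.server";

export async function maybeCompactSession(
  userId: string,
  sessionId: string,
  source: string,
) {
  if (await shouldTriggerCompaction(sessionId, userId)) {
    await enqueueSessionCompaction({ userId, sessionId, source });
  }
}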

View File

@ -4,8 +4,11 @@ import { EpisodeType } from "@core/types";
import { type z } from "zod";
import { prisma } from "~/db.server";
import { hasCredits } from "~/services/billing.server";
import { type IngestBodyRequest, ingestTask } from "~/trigger/ingest/ingest";
import { ingestDocumentTask } from "~/trigger/ingest/ingest-document";
import { type IngestBodyRequest } from "~/trigger/ingest/ingest";
import {
enqueueIngestDocument,
enqueueIngestEpisode,
} from "~/lib/queue-adapter.server";
export const addToQueue = async (
rawBody: z.infer<typeof IngestBodyRequest>,
@ -51,36 +54,22 @@ export const addToQueue = async (
let handler;
if (body.type === EpisodeType.DOCUMENT) {
handler = await ingestDocumentTask.trigger(
{
body,
userId,
workspaceId: user.Workspace.id,
queueId: queuePersist.id,
},
{
queue: "document-ingestion-queue",
concurrencyKey: userId,
tags: [user.id, queuePersist.id],
},
);
handler = await enqueueIngestDocument({
body,
userId,
workspaceId: user.Workspace.id,
queueId: queuePersist.id,
});
} else if (body.type === EpisodeType.CONVERSATION) {
handler = await ingestTask.trigger(
{
body,
userId,
workspaceId: user.Workspace.id,
queueId: queuePersist.id,
},
{
queue: "ingestion-queue",
concurrencyKey: userId,
tags: [user.id, queuePersist.id],
},
);
handler = await enqueueIngestEpisode({
body,
userId,
workspaceId: user.Workspace.id,
queueId: queuePersist.id,
});
}
return { id: handler?.id, token: handler?.publicAccessToken };
return { id: handler?.id, publicAccessToken: handler?.token };
};
export { IngestBodyRequest };

View File

@ -0,0 +1,196 @@
/**
* Queue Adapter
*
* This module provides a unified interface for queueing background jobs,
* supporting both Trigger.dev and BullMQ backends based on the QUEUE_PROVIDER
* environment variable.
*
* Usage:
* - Set QUEUE_PROVIDER="trigger" for Trigger.dev (default, good for production scaling)
* - Set QUEUE_PROVIDER="bullmq" for BullMQ (good for open-source deployments)
*/
import { env } from "~/env.server";
import type { z } from "zod";
import type { IngestBodyRequest } from "~/jobs/ingest/ingest-episode.logic";
import type { CreateConversationTitlePayload } from "~/jobs/conversation/create-title.logic";
import type { ProcessDeepSearchPayload } from "~/jobs/deep-search/deep-search.logic";
import type { SessionCompactionPayload } from "~/jobs/session/session-compaction.logic";
type QueueProvider = "trigger" | "bullmq";
/**
* Enqueue episode ingestion job
*/
export async function enqueueIngestEpisode(payload: {
body: z.infer<typeof IngestBodyRequest>;
userId: string;
workspaceId: string;
queueId: string;
}): Promise<{ id?: string; token?: string }> {
const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") {
const { ingestTask } = await import("~/trigger/ingest/ingest");
const handler = await ingestTask.trigger(payload, {
queue: "ingestion-queue",
concurrencyKey: payload.userId,
tags: [payload.userId, payload.queueId],
});
return { id: handler.id, token: handler.publicAccessToken };
} else {
// BullMQ
const { ingestQueue } = await import("~/bullmq/queues");
const job = await ingestQueue.add("ingest-episode", payload, {
jobId: payload.queueId,
attempts: 3,
backoff: { type: "exponential", delay: 2000 },
});
return { id: job.id };
}
}
/**
* Enqueue document ingestion job
*/
export async function enqueueIngestDocument(payload: {
body: z.infer<typeof IngestBodyRequest>;
userId: string;
workspaceId: string;
queueId: string;
}): Promise<{ id?: string; token?: string }> {
const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") {
const { ingestDocumentTask } = await import(
"~/trigger/ingest/ingest-document"
);
const handler = await ingestDocumentTask.trigger(payload, {
queue: "document-ingestion-queue",
concurrencyKey: payload.userId,
tags: [payload.userId, payload.queueId],
});
return { id: handler.id, token: handler.publicAccessToken };
} else {
// BullMQ
const { documentIngestQueue } = await import("~/bullmq/queues");
const job = await documentIngestQueue.add("ingest-document", payload, {
jobId: payload.queueId,
attempts: 3,
backoff: { type: "exponential", delay: 2000 },
});
return { id: job.id };
}
}
/**
* Enqueue conversation title creation job
*/
export async function enqueueCreateConversationTitle(
payload: CreateConversationTitlePayload,
): Promise<{ id?: string }> {
const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") {
const { createConversationTitle } = await import(
"~/trigger/conversation/create-conversation-title"
);
const handler = await createConversationTitle.trigger(payload);
return { id: handler.id };
} else {
// BullMQ
const { conversationTitleQueue } = await import("~/bullmq/queues");
const job = await conversationTitleQueue.add(
"create-conversation-title",
payload,
{
attempts: 3,
backoff: { type: "exponential", delay: 2000 },
},
);
return { id: job.id };
}
}
/**
* Enqueue deep search job
*/
export async function enqueueDeepSearch(
payload: ProcessDeepSearchPayload,
): Promise<{ id?: string }> {
const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") {
const { deepSearch } = await import("~/trigger/deep-search");
const handler = await deepSearch.trigger({
content: payload.content,
userId: payload.userId,
stream: true,
metadata: payload.metadata,
intentOverride: payload.intentOverride,
});
return { id: handler.id };
} else {
// BullMQ
const { deepSearchQueue } = await import("~/bullmq/queues");
const job = await deepSearchQueue.add("deep-search", payload, {
attempts: 3,
backoff: { type: "exponential", delay: 2000 },
});
return { id: job.id };
}
}
/**
* Enqueue session compaction job
*/
export async function enqueueSessionCompaction(
payload: SessionCompactionPayload,
): Promise<{ id?: string }> {
const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") {
const { sessionCompactionTask } = await import(
"~/trigger/session/session-compaction"
);
const handler = await sessionCompactionTask.trigger(payload);
return { id: handler.id };
} else {
// BullMQ
const { sessionCompactionQueue } = await import("~/bullmq/queues");
const job = await sessionCompactionQueue.add(
"session-compaction",
payload,
{
attempts: 3,
backoff: { type: "exponential", delay: 2000 },
},
);
return { id: job.id };
}
}
/**
* Enqueue space assignment job
* (Helper that shared job logic can call)
*/
export async function enqueueSpaceAssignment(payload: {
userId: string;
workspaceId: string;
mode: "episode";
episodeIds: string[];
}): Promise<void> {
const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") {
const { triggerSpaceAssignment } = await import(
"~/trigger/spaces/space-assignment"
);
await triggerSpaceAssignment(payload);
} else {
// For BullMQ, space assignment is not implemented yet
// You can add it later when needed
console.warn("Space assignment not implemented for BullMQ yet");
}
}
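
For context, a minimal caller-side sketch mirroring the addToQueue changes elsewhere in this diff; the queueEpisode wrapper is hypothetical. The call is identical for both providers, and only the token field differs in availability.

// Sketch: provider-agnostic enqueueing from application code.
import { enqueueIngestEpisode } from "~/lib/queue-adapter.server";

export async function queueEpisode(
  body: any,
  userId: string,
  workspaceId: string,
  queueId: string,
) {
  const handler = await enqueueIngestEpisode({ body, userId, workspaceId, queueId });
  // With Trigger.dev the token is the public access token; with BullMQ it is undefined.
  return { id: handler.id, publicAccessToken: handler.token };
}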

View File

@ -1,7 +1,7 @@
import { z } from "zod";
import { json } from "@remix-run/node";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { deepSearch } from "~/trigger/deep-search";
import { enqueueDeepSearch } from "~/lib/queue-adapter.server";
import { runs } from "@trigger.dev/sdk";
const DeepSearchBodySchema = z.object({
@ -30,7 +30,7 @@ const { action, loader } = createActionApiRoute(
async ({ body, authentication }) => {
let trigger;
if (!body.stream) {
trigger = await deepSearch.trigger({
trigger = await enqueueDeepSearch({
content: body.content,
userId: authentication.userId,
stream: body.stream,
@ -40,7 +40,7 @@ const { action, loader } = createActionApiRoute(
return json(trigger);
} else {
const runHandler = await deepSearch.trigger({
const runHandler = await enqueueDeepSearch({
content: body.content,
userId: authentication.userId,
stream: body.stream,

View File

@ -6,7 +6,7 @@ import {
deleteIngestionQueue,
getIngestionQueue,
} from "~/services/ingestionLogs.server";
import { runs, tasks } from "@trigger.dev/sdk";
import { findRunningJobs, cancelJob } from "~/services/jobManager.server";
export const DeleteEpisodeBodyRequest = z.object({
id: z.string(),
@ -37,19 +37,15 @@ const { action, loader } = createHybridActionApiRoute(
}
const output = ingestionQueue.output as any;
const runningTasks = await runs.list({
tag: [authentication.userId, ingestionQueue.id],
const runningTasks = await findRunningJobs({
tags: [authentication.userId, ingestionQueue.id],
taskIdentifier: "ingest-episode",
});
const latestTask = runningTasks.data.find(
(task) =>
task.tags.includes(authentication.userId) &&
task.tags.includes(ingestionQueue.id),
);
const latestTask = runningTasks[0];
if (latestTask && !latestTask?.isCompleted) {
runs.cancel(latestTask?.id as string);
if (latestTask && !latestTask.isCompleted) {
await cancelJob(latestTask.id);
}
let result;

View File

@ -1,5 +1,4 @@
import { json } from "@remix-run/node";
import { runs } from "@trigger.dev/sdk";
import { z } from "zod";
import { deleteEpisodeWithRelatedNodes } from "~/services/graphModels/episode";
import {
@ -11,6 +10,7 @@ import {
createHybridActionApiRoute,
createHybridLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";
import { findRunningJobs, cancelJob } from "~/services/jobManager.server";
// Schema for space ID parameter
const LogParamsSchema = z.object({
@ -59,19 +59,15 @@ const { action } = createHybridActionApiRoute(
}
const output = ingestionQueue.output as any;
const runningTasks = await runs.list({
tag: [authentication.userId, ingestionQueue.id],
const runningTasks = await findRunningJobs({
tags: [authentication.userId, ingestionQueue.id],
taskIdentifier: "ingest-episode",
});
const latestTask = runningTasks.data.find(
(task) =>
task.tags.includes(authentication.userId) &&
task.tags.includes(ingestionQueue.id),
);
const latestTask = runningTasks[0];
if (latestTask && !latestTask?.isCompleted) {
runs.cancel(latestTask?.id);
if (latestTask && !latestTask.isCompleted) {
await cancelJob(latestTask.id);
}
let result;

View File

@ -1,4 +1,4 @@
import { type LoaderFunctionArgs, json } from "@remix-run/node";
import { json } from "@remix-run/node";
import { z } from "zod";
import { prisma } from "~/db.server";
import { createHybridLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

View File

@ -2,7 +2,7 @@ import { UserTypeEnum } from "@core/types";
import { auth, runs, tasks } from "@trigger.dev/sdk/v3";
import { prisma } from "~/db.server";
import { createConversationTitle } from "~/trigger/conversation/create-conversation-title";
import { enqueueCreateConversationTitle } from "~/lib/queue-adapter.server";
import { z } from "zod";
import { type ConversationHistory } from "@prisma/client";
@ -87,14 +87,10 @@ export async function createConversation(
const context = await getConversationContext(conversationHistory.id);
// Trigger conversation title task
await tasks.trigger<typeof createConversationTitle>(
createConversationTitle.id,
{
conversationId: conversation.id,
message: conversationData.message,
},
{ tags: [conversation.id, workspaceId] },
);
await enqueueCreateConversationTitle({
conversationId: conversation.id,
message: conversationData.message,
});
const handler = await tasks.trigger(
"chat",

View File

@ -0,0 +1,87 @@
/**
* Job Manager Service
*
* Unified interface for managing background jobs across both
* Trigger.dev and BullMQ queue providers.
*/
import { env } from "~/env.server";
type QueueProvider = "trigger" | "bullmq";
interface JobInfo {
id: string;
isCompleted: boolean;
status?: string;
}
/**
* Find running jobs by tags/identifiers
*/
export async function findRunningJobs(params: {
tags: string[];
taskIdentifier?: string;
}): Promise<JobInfo[]> {
const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") {
const { runs } = await import("@trigger.dev/sdk");
const runningTasks = await runs.list({
tag: params.tags,
taskIdentifier: params.taskIdentifier,
});
return runningTasks.data.map((task) => ({
id: task.id,
isCompleted: task.isCompleted,
status: task.status,
}));
} else {
// BullMQ
const { getJobsByTags } = await import("~/bullmq/utils/job-finder");
const jobs = await getJobsByTags(params.tags, params.taskIdentifier);
return jobs;
}
}
/**
* Cancel a running job
*/
export async function cancelJob(jobId: string): Promise<void> {
const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") {
const { runs } = await import("@trigger.dev/sdk");
await runs.cancel(jobId);
} else {
// BullMQ
const { cancelJobById } = await import("~/bullmq/utils/job-finder");
await cancelJobById(jobId);
}
}
/**
* Get job status
*/
export async function getJobStatus(jobId: string): Promise<JobInfo | null> {
const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") {
const { runs } = await import("@trigger.dev/sdk");
try {
const run = await runs.retrieve(jobId);
return {
id: run.id,
isCompleted: run.isCompleted,
status: run.status,
};
} catch {
return null;
}
} else {
// BullMQ
const { getJobById } = await import("~/bullmq/utils/job-finder");
return await getJobById(jobId);
}
}
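
Note: the BullMQ branches above dynamically import ~/bullmq/utils/job-finder, which is not included in this diff. A rough sketch of those helpers follows, assuming jobs carry their tags in job.data.tags and using the queue names added by this commit; the paths, the tag convention, and the task-to-queue mapping are assumptions.

// Hypothetical sketch of ~/bullmq/utils/job-finder (not part of this commit).
import { type Queue } from "bullmq";
import {
  ingestQueue,
  documentIngestQueue,
  conversationTitleQueue,
  deepSearchQueue,
  sessionCompactionQueue,
} from "~/bullmq/queues";

const allQueues: Queue[] = [
  ingestQueue,
  documentIngestQueue,
  conversationTitleQueue,
  deepSearchQueue,
  sessionCompactionQueue,
];

// Assumed mapping from Trigger.dev-style task identifiers to BullMQ queues.
const queuesByTask: Record<string, Queue[]> = {
  "ingest-episode": [ingestQueue],
  "ingest-document": [documentIngestQueue],
};

const TERMINAL_STATES = ["completed", "failed"];

export async function getJobsByTags(tags: string[], taskIdentifier?: string) {
  const queues = taskIdentifier
    ? (queuesByTask[taskIdentifier] ?? allQueues)
    : allQueues;
  const results: { id: string; isCompleted: boolean; status?: string }[] = [];

  for (const queue of queues) {
    // Only look at jobs that could still be cancelled.
    const jobs = await queue.getJobs(["active", "waiting", "delayed"]);
    for (const job of jobs) {
      const jobTags: string[] = job.data?.tags ?? [];
      if (tags.every((tag) => jobTags.includes(tag))) {
        const state = await job.getState();
        results.push({
          id: job.id as string,
          isCompleted: TERMINAL_STATES.includes(state),
          status: state,
        });
      }
    }
  }
  return results;
}

export async function cancelJobById(jobId: string): Promise<void> {
  for (const queue of allQueues) {
    const job = await queue.getJob(jobId);
    if (job) {
      // Removes waiting/delayed jobs; an active job also needs worker-side cooperation.
      await job.remove();
      return;
    }
  }
}

export async function getJobById(jobId: string) {
  for (const queue of allQueues) {
    const job = await queue.getJob(jobId);
    if (job) {
      const state = await job.getState();
      return {
        id: job.id as string,
        isCompleted: TERMINAL_STATES.includes(state),
        status: state,
      };
    }
  }
  return null;
}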

View File

@ -5,7 +5,7 @@ import {
getSessionEpisodes,
type CompactedSessionNode,
} from "~/services/graphModels/compactedSession";
import { tasks } from "@trigger.dev/sdk/v3";
import { enqueueSessionCompaction } from "~/lib/queue-adapter.server";
/**
* Configuration for session compaction
@ -144,7 +144,7 @@ export class SessionCompactionService {
reason: check.reason,
});
const handle = await tasks.trigger("session-compaction", {
const handle = await enqueueSessionCompaction({
userId,
sessionId,
source,

View File

@ -1,61 +1,12 @@
import { LLMMappings } from "@core/types";
import { logger, task } from "@trigger.dev/sdk/v3";
import { generate } from "../chat/stream-utils";
import { conversationTitlePrompt } from "./prompt";
import { prisma } from "../utils/prisma";
import { task } from "@trigger.dev/sdk/v3";
import {
processConversationTitleCreation,
type CreateConversationTitlePayload,
} from "~/jobs/conversation/create-title.logic";
export const createConversationTitle = task({
id: "create-conversation-title",
run: async (payload: { conversationId: string; message: string }) => {
let conversationTitleResponse = "";
const gen = generate(
[
{
role: "user",
content: conversationTitlePrompt.replace(
"{{message}}",
payload.message,
),
},
],
false,
() => {},
undefined,
"",
LLMMappings.GPT41,
);
for await (const chunk of gen) {
if (typeof chunk === "string") {
conversationTitleResponse += chunk;
} else if (chunk && typeof chunk === "object" && chunk.message) {
conversationTitleResponse += chunk.message;
}
}
const outputMatch = conversationTitleResponse.match(
/<output>(.*?)<\/output>/s,
);
logger.info(`Conversation title data: ${JSON.stringify(outputMatch)}`);
if (!outputMatch) {
logger.error("No output found in recurrence response");
throw new Error("Invalid response format from AI");
}
const jsonStr = outputMatch[1].trim();
const conversationTitleData = JSON.parse(jsonStr);
if (conversationTitleData) {
await prisma.conversation.update({
where: {
id: payload.conversationId,
},
data: {
title: conversationTitleData.title,
},
});
}
run: async (payload: CreateConversationTitlePayload) => {
return await processConversationTitleCreation(payload);
},
});
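
Note: ~/jobs/conversation/create-title.logic is not shown in this diff. The sketch below reconstructs it from the inline task body removed above; import paths and the return value are assumptions.

// Sketch of ~/jobs/conversation/create-title.logic, reconstructed from the removed task body.
// Import paths are assumptions; the logic mirrors the code deleted above.
import { LLMMappings } from "@core/types";
import { prisma } from "~/db.server";
import { logger } from "~/services/logger.service";
import { generate } from "~/trigger/chat/stream-utils";
import { conversationTitlePrompt } from "~/trigger/conversation/prompt";

export interface CreateConversationTitlePayload {
  conversationId: string;
  message: string;
}

export async function processConversationTitleCreation(
  payload: CreateConversationTitlePayload,
) {
  let response = "";
  const gen = generate(
    [
      {
        role: "user",
        content: conversationTitlePrompt.replace("{{message}}", payload.message),
      },
    ],
    false,
    () => {},
    undefined,
    "",
    LLMMappings.GPT41,
  );

  for await (const chunk of gen) {
    if (typeof chunk === "string") {
      response += chunk;
    } else if (chunk && typeof chunk === "object" && chunk.message) {
      response += chunk.message;
    }
  }

  // The model is expected to wrap a JSON object in <output>...</output>.
  const outputMatch = response.match(/<output>(.*?)<\/output>/s);
  if (!outputMatch) {
    logger.error("No output found in conversation title response");
    throw new Error("Invalid response format from AI");
  }

  const titleData = JSON.parse(outputMatch[1].trim());
  if (titleData?.title) {
    await prisma.conversation.update({
      where: { id: payload.conversationId },
      data: { title: titleData.title },
    });
  }

  return { title: titleData?.title };
}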

View File

@ -1,16 +1,8 @@
import { queue, task } from "@trigger.dev/sdk";
import { type z } from "zod";
import crypto from "crypto";
import { IngestionStatus } from "@core/database";
import { EpisodeTypeEnum } from "@core/types";
import { logger } from "~/services/logger.service";
import { saveDocument } from "~/services/graphModels/document";
import { type IngestBodyRequest } from "~/lib/ingest.server";
import { DocumentVersioningService } from "~/services/documentVersioning.server";
import { DocumentDifferentialService } from "~/services/documentDiffer.server";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";
import { prisma } from "../utils/prisma";
import {
processDocumentIngestion,
type IngestDocumentPayload,
} from "~/jobs/ingest/ingest-document.logic";
import { ingestTask } from "./ingest";
const documentIngestionQueue = queue({
@ -23,266 +15,19 @@ export const ingestDocumentTask = task({
id: "ingest-document",
queue: documentIngestionQueue,
machine: "medium-2x",
run: async (payload: {
body: z.infer<typeof IngestBodyRequest>;
userId: string;
workspaceId: string;
queueId: string;
}) => {
const startTime = Date.now();
try {
logger.log(`Processing document for user ${payload.userId}`, {
contentLength: payload.body.episodeBody.length,
});
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
status: IngestionStatus.PROCESSING,
},
});
const documentBody = payload.body;
// Step 1: Initialize services and prepare document version
const versioningService = new DocumentVersioningService();
const differentialService = new DocumentDifferentialService();
const knowledgeGraphService = new KnowledgeGraphService();
const {
documentNode: document,
versionInfo,
chunkedDocument,
} = await versioningService.prepareDocumentVersion(
documentBody.sessionId!,
payload.userId,
documentBody.metadata?.documentTitle?.toString() || "Untitled Document",
documentBody.episodeBody,
documentBody.source,
documentBody.metadata || {},
);
logger.log(`Document version analysis:`, {
version: versionInfo.newVersion,
isNewDocument: versionInfo.isNewDocument,
hasContentChanged: versionInfo.hasContentChanged,
changePercentage: versionInfo.chunkLevelChanges.changePercentage,
changedChunks: versionInfo.chunkLevelChanges.changedChunkIndices.length,
totalChunks: versionInfo.chunkLevelChanges.totalChunks,
});
// Step 2: Determine processing strategy
const differentialDecision =
await differentialService.analyzeDifferentialNeed(
documentBody.episodeBody,
versionInfo.existingDocument,
chunkedDocument,
);
logger.log(`Differential analysis:`, {
shouldUseDifferential: differentialDecision.shouldUseDifferential,
strategy: differentialDecision.strategy,
reason: differentialDecision.reason,
documentSizeTokens: differentialDecision.documentSizeTokens,
});
// Early return for unchanged documents
if (differentialDecision.strategy === "skip_processing") {
logger.log("Document content unchanged, skipping processing");
return {
success: true,
documentsProcessed: 1,
chunksProcessed: 0,
episodesCreated: 0,
entitiesExtracted: 0,
};
}
// Step 3: Save the new document version
await saveDocument(document);
// Step 3.1: Invalidate statements from previous document version if it exists
let invalidationResults = null;
if (versionInfo.existingDocument && versionInfo.hasContentChanged) {
logger.log(
`Invalidating statements from previous document version: ${versionInfo.existingDocument.uuid}`,
);
invalidationResults =
await knowledgeGraphService.invalidateStatementsFromPreviousDocumentVersion(
{
previousDocumentUuid: versionInfo.existingDocument.uuid,
newDocumentContent: documentBody.episodeBody,
userId: payload.userId,
invalidatedBy: document.uuid,
semanticSimilarityThreshold: 0.75, // Configurable threshold
},
);
logger.log(`Statement invalidation completed:`, {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
run: async (payload: IngestDocumentPayload) => {
// Use common logic with Trigger-specific callback for episode ingestion
return await processDocumentIngestion(
payload,
// Callback for enqueueing episode ingestion for each chunk
async (episodePayload) => {
const episodeHandler = await ingestTask.trigger(episodePayload, {
queue: "ingestion-queue",
concurrencyKey: episodePayload.userId,
tags: [episodePayload.userId, episodePayload.queueId],
});
}
logger.log(
`Document chunked into ${chunkedDocument.chunks.length} chunks`,
);
// Step 4: Process chunks based on differential strategy
let chunksToProcess = chunkedDocument.chunks;
let processingMode = "full";
if (
differentialDecision.shouldUseDifferential &&
differentialDecision.strategy === "chunk_level_diff"
) {
// Only process changed chunks
const chunkComparisons = differentialService.getChunkComparisons(
versionInfo.existingDocument!,
chunkedDocument,
);
const changedIndices =
differentialService.getChunksNeedingReprocessing(chunkComparisons);
chunksToProcess = chunkedDocument.chunks.filter((chunk) =>
changedIndices.includes(chunk.chunkIndex),
);
processingMode = "differential";
logger.log(
`Differential processing: ${chunksToProcess.length}/${chunkedDocument.chunks.length} chunks need reprocessing`,
);
} else if (differentialDecision.strategy === "full_reingest") {
// Process all chunks
processingMode = "full";
logger.log(
`Full reingestion: processing all ${chunkedDocument.chunks.length} chunks`,
);
}
// Step 5: Queue chunks for processing
const episodeHandlers = [];
for (const chunk of chunksToProcess) {
const chunkEpisodeData = {
episodeBody: chunk.content,
referenceTime: documentBody.referenceTime,
metadata: {
...documentBody.metadata,
processingMode,
differentialStrategy: differentialDecision.strategy,
chunkHash: chunk.contentHash,
documentTitle:
documentBody.metadata?.documentTitle?.toString() ||
"Untitled Document",
chunkIndex: chunk.chunkIndex,
documentUuid: document.uuid,
},
source: documentBody.source,
spaceIds: documentBody.spaceIds,
sessionId: documentBody.sessionId,
type: EpisodeTypeEnum.DOCUMENT,
};
const episodeHandler = await ingestTask.trigger(
{
body: chunkEpisodeData,
userId: payload.userId,
workspaceId: payload.workspaceId,
queueId: payload.queueId,
},
{
queue: "ingestion-queue",
concurrencyKey: payload.userId,
tags: [payload.userId, payload.queueId, processingMode],
},
);
if (episodeHandler.id) {
episodeHandlers.push(episodeHandler.id);
logger.log(
`Queued chunk ${chunk.chunkIndex + 1} for ${processingMode} processing`,
{
handlerId: episodeHandler.id,
chunkSize: chunk.content.length,
chunkHash: chunk.contentHash,
},
);
}
}
// Calculate cost savings
const costSavings = differentialService.calculateCostSavings(
chunkedDocument.chunks.length,
chunksToProcess.length,
);
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
output: {
documentUuid: document.uuid,
version: versionInfo.newVersion,
totalChunks: chunkedDocument.chunks.length,
chunksProcessed: chunksToProcess.length,
chunksSkipped: costSavings.chunksSkipped,
processingMode,
differentialStrategy: differentialDecision.strategy,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults
? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
}
: null,
episodes: [],
episodeHandlers,
},
status: IngestionStatus.PROCESSING,
},
});
const processingTimeMs = Date.now() - startTime;
logger.log(
`Document differential processing completed in ${processingTimeMs}ms`,
{
documentUuid: document.uuid,
version: versionInfo.newVersion,
processingMode,
totalChunks: chunkedDocument.chunks.length,
chunksProcessed: chunksToProcess.length,
chunksSkipped: costSavings.chunksSkipped,
estimatedSavings: `${costSavings.estimatedSavingsPercentage.toFixed(1)}%`,
changePercentage: `${differentialDecision.changePercentage.toFixed(1)}%`,
statementInvalidation: invalidationResults
? {
totalAnalyzed: invalidationResults.totalStatementsAnalyzed,
invalidated: invalidationResults.invalidatedStatements.length,
preserved: invalidationResults.preservedStatements.length,
}
: "No previous version",
},
);
return { success: true };
} catch (err: any) {
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
error: err.message,
status: IngestionStatus.FAILED,
},
});
logger.error(
`Error processing document for user ${payload.userId}:`,
err,
);
return { success: false, error: err.message };
}
return { id: episodeHandler.id };
},
);
},
});
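
Note: ~/jobs/ingest/ingest-document.logic is not included in this diff. The type-level sketch below is inferred from the call site above; the payload and callback type names are assumptions.

// Assumed shape of ~/jobs/ingest/ingest-document.logic, inferred from how it is called above.
import { type z } from "zod";
import { type IngestBodyRequest } from "~/jobs/ingest/ingest-episode.logic";

export interface IngestDocumentPayload {
  body: z.infer<typeof IngestBodyRequest>;
  userId: string;
  workspaceId: string;
  queueId: string;
}

// Each chunk that needs (re)processing is handed back to the caller, so the queue
// provider (Trigger.dev or BullMQ) decides how the episode job is enqueued.
export declare function processDocumentIngestion(
  payload: IngestDocumentPayload,
  enqueueEpisode: (episodePayload: {
    body: z.infer<typeof IngestBodyRequest>;
    userId: string;
    workspaceId: string;
    queueId: string;
  }) => Promise<{ id: string }>,
): Promise<{ success: boolean; error?: string }>;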

View File

@ -1,251 +1,37 @@
import { queue, task } from "@trigger.dev/sdk";
import { z } from "zod";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";
import { linkEpisodeToDocument } from "~/services/graphModels/document";
import { IngestionStatus } from "@core/database";
import { logger } from "~/services/logger.service";
import {
processEpisodeIngestion,
IngestBodyRequest,
type IngestEpisodePayload,
} from "~/jobs/ingest/ingest-episode.logic";
import { triggerSpaceAssignment } from "../spaces/space-assignment";
import { prisma } from "../utils/prisma";
import { EpisodeType } from "@core/types";
import { deductCredits, hasCredits } from "../utils/utils";
import { assignEpisodesToSpace } from "~/services/graphModels/space";
import { triggerSessionCompaction } from "../session/session-compaction";
export const IngestBodyRequest = z.object({
episodeBody: z.string(),
referenceTime: z.string(),
metadata: z.record(z.union([z.string(), z.number(), z.boolean()])).optional(),
source: z.string(),
spaceIds: z.array(z.string()).optional(),
sessionId: z.string().optional(),
type: z
.enum([EpisodeType.CONVERSATION, EpisodeType.DOCUMENT])
.default(EpisodeType.CONVERSATION),
});
const ingestionQueue = queue({
name: "ingestion-queue",
concurrencyLimit: 1,
});
// Export for backwards compatibility
export { IngestBodyRequest };
// Register the Trigger.dev task
export const ingestTask = task({
id: "ingest-episode",
queue: ingestionQueue,
machine: "medium-2x",
run: async (payload: {
body: z.infer<typeof IngestBodyRequest>;
userId: string;
workspaceId: string;
queueId: string;
}) => {
try {
logger.log(`Processing job for user ${payload.userId}`);
// Check if workspace has sufficient credits before processing
const hasSufficientCredits = await hasCredits(
payload.workspaceId,
"addEpisode",
);
if (!hasSufficientCredits) {
logger.warn(
`Insufficient credits for workspace ${payload.workspaceId}`,
);
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
status: IngestionStatus.NO_CREDITS,
error:
"Insufficient credits. Please upgrade your plan or wait for your credits to reset.",
},
});
return {
success: false,
error: "Insufficient credits",
};
}
const ingestionQueue = await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
status: IngestionStatus.PROCESSING,
},
});
const knowledgeGraphService = new KnowledgeGraphService();
const episodeBody = payload.body as any;
const episodeDetails = await knowledgeGraphService.addEpisode(
{
...episodeBody,
userId: payload.userId,
},
prisma,
);
// Link episode to document if it's a document chunk
if (
episodeBody.type === EpisodeType.DOCUMENT &&
episodeBody.metadata.documentUuid &&
episodeDetails.episodeUuid
) {
try {
await linkEpisodeToDocument(
episodeDetails.episodeUuid,
episodeBody.metadata.documentUuid,
episodeBody.metadata.chunkIndex || 0,
);
logger.log(
`Linked episode ${episodeDetails.episodeUuid} to document ${episodeBody.metadata.documentUuid} at chunk ${episodeBody.metadata.chunkIndex || 0}`,
);
} catch (error) {
logger.error(`Failed to link episode to document:`, {
error,
episodeUuid: episodeDetails.episodeUuid,
documentUuid: episodeBody.metadata.documentUuid,
});
}
}
let finalOutput = episodeDetails;
let episodeUuids: string[] = episodeDetails.episodeUuid
? [episodeDetails.episodeUuid]
: [];
let currentStatus: IngestionStatus = IngestionStatus.COMPLETED;
if (episodeBody.type === EpisodeType.DOCUMENT) {
const currentOutput = ingestionQueue.output as any;
currentOutput.episodes.push(episodeDetails);
episodeUuids = currentOutput.episodes.map(
(episode: any) => episode.episodeUuid,
);
finalOutput = {
...currentOutput,
};
if (currentOutput.episodes.length !== currentOutput.totalChunks) {
currentStatus = IngestionStatus.PROCESSING;
}
}
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
output: finalOutput,
status: currentStatus,
},
});
// Deduct credits for episode creation
if (currentStatus === IngestionStatus.COMPLETED) {
await deductCredits(
payload.workspaceId,
"addEpisode",
finalOutput.statementsCreated,
);
}
// Handle space assignment after successful ingestion
try {
// If spaceIds were explicitly provided, immediately assign the episode to those spaces
if (
episodeBody.spaceIds &&
episodeBody.spaceIds.length > 0 &&
episodeDetails.episodeUuid
) {
logger.info(`Assigning episode to explicitly provided spaces`, {
userId: payload.userId,
episodeId: episodeDetails.episodeUuid,
spaceIds: episodeBody.spaceIds,
});
// Assign episode to each space
for (const spaceId of episodeBody.spaceIds) {
await assignEpisodesToSpace(
[episodeDetails.episodeUuid],
spaceId,
payload.userId,
);
}
logger.info(
`Skipping LLM space assignment - episode explicitly assigned to ${episodeBody.spaceIds.length} space(s)`,
);
} else {
// Only trigger automatic LLM space assignment if no explicit spaceIds were provided
logger.info(
`Triggering LLM space assignment after successful ingestion`,
{
userId: payload.userId,
workspaceId: payload.workspaceId,
episodeId: episodeDetails?.episodeUuid,
},
);
if (
episodeDetails.episodeUuid &&
currentStatus === IngestionStatus.COMPLETED
) {
await triggerSpaceAssignment({
userId: payload.userId,
workspaceId: payload.workspaceId,
mode: "episode",
episodeIds: episodeUuids,
});
}
}
} catch (assignmentError) {
// Don't fail the ingestion if assignment fails
logger.warn(`Failed to trigger space assignment after ingestion:`, {
error: assignmentError,
userId: payload.userId,
episodeId: episodeDetails?.episodeUuid,
});
}
// Auto-trigger session compaction if episode has sessionId
try {
if (
episodeBody.sessionId &&
currentStatus === IngestionStatus.COMPLETED
) {
logger.info(`Checking if session compaction should be triggered`, {
userId: payload.userId,
sessionId: episodeBody.sessionId,
source: episodeBody.source,
});
await triggerSessionCompaction({
userId: payload.userId,
sessionId: episodeBody.sessionId,
source: episodeBody.source,
});
}
} catch (compactionError) {
// Don't fail the ingestion if compaction fails
logger.warn(`Failed to trigger session compaction after ingestion:`, {
error: compactionError,
userId: payload.userId,
sessionId: episodeBody.sessionId,
});
}
return { success: true, episodeDetails };
} catch (err: any) {
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
error: err.message,
status: IngestionStatus.FAILED,
},
});
logger.error(`Error processing job for user ${payload.userId}:`, err);
return { success: false, error: err.message };
}
run: async (payload: IngestEpisodePayload) => {
// Use common logic with Trigger-specific callbacks for follow-up jobs
return await processEpisodeIngestion(
payload,
// Callback for space assignment
async (params) => {
await triggerSpaceAssignment(params);
},
// Callback for session compaction
async (params) => {
await triggerSessionCompaction(params);
},
);
},
});
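
Note: ~/jobs/ingest/ingest-episode.logic is likewise not included in this diff. Below is a sketch of its expected exports, reconstructed from the schema removed above and the two callbacks passed at the call site; the callback parameter shapes are assumptions.

// Assumed shape of ~/jobs/ingest/ingest-episode.logic.
import { z } from "zod";
import { EpisodeType } from "@core/types";

// Schema preserved from the old inline task (re-exported by the Trigger task above).
export const IngestBodyRequest = z.object({
  episodeBody: z.string(),
  referenceTime: z.string(),
  metadata: z.record(z.union([z.string(), z.number(), z.boolean()])).optional(),
  source: z.string(),
  spaceIds: z.array(z.string()).optional(),
  sessionId: z.string().optional(),
  type: z
    .enum([EpisodeType.CONVERSATION, EpisodeType.DOCUMENT])
    .default(EpisodeType.CONVERSATION),
});

export interface IngestEpisodePayload {
  body: z.infer<typeof IngestBodyRequest>;
  userId: string;
  workspaceId: string;
  queueId: string;
}

// Credit checks, knowledge-graph ingestion, document linking, and status updates live here;
// the two callbacks let each queue provider decide how follow-up jobs are dispatched.
export declare function processEpisodeIngestion(
  payload: IngestEpisodePayload,
  onSpaceAssignment: (params: {
    userId: string;
    workspaceId: string;
    mode: "episode";
    episodeIds: string[];
  }) => Promise<void>,
  onSessionCompaction: (params: {
    userId: string;
    sessionId: string;
    source: string;
  }) => Promise<void>,
): Promise<{ success: boolean; error?: string }>;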

View File

@ -1,36 +1,8 @@
import { queue, task } from "@trigger.dev/sdk/v3";
import { logger } from "~/services/logger.service";
import { runQuery } from "~/lib/neo4j.server";
import type { CoreMessage } from "ai";
import { z } from "zod";
import { getEmbedding, makeModelCall } from "~/lib/model.server";
import {
getCompactedSessionBySessionId,
linkEpisodesToCompact,
getSessionEpisodes,
type CompactedSessionNode,
type SessionEpisodeData,
saveCompactedSession,
} from "~/services/graphModels/compactedSession";
interface SessionCompactionPayload {
userId: string;
sessionId: string;
source: string;
triggerSource?: "auto" | "manual" | "threshold";
}
// Zod schema for LLM response validation
const CompactionResultSchema = z.object({
summary: z.string().describe("Consolidated narrative of the entire session"),
confidence: z.number().min(0).max(1).describe("Confidence score of the compaction quality"),
});
const CONFIG = {
minEpisodesForCompaction: 5, // Minimum episodes to trigger compaction
compactionThreshold: 1, // Trigger after N new episodes
maxEpisodesPerBatch: 50, // Process in batches if needed
};
processSessionCompaction,
type SessionCompactionPayload,
} from "~/jobs/session/session-compaction.logic";
export const sessionCompactionQueue = queue({
name: "session-compaction-queue",
@ -41,82 +13,7 @@ export const sessionCompactionTask = task({
id: "session-compaction",
queue: sessionCompactionQueue,
run: async (payload: SessionCompactionPayload) => {
const { userId, sessionId, source, triggerSource = "auto" } = payload;
logger.info(`Starting session compaction`, {
userId,
sessionId,
source,
triggerSource,
});
try {
// Check if compaction already exists
const existingCompact = await getCompactedSessionBySessionId(sessionId, userId);
// Fetch all episodes for this session
const episodes = await getSessionEpisodes(sessionId, userId, existingCompact?.endTime);
console.log("episodes", episodes.length);
// Check if we have enough episodes
if (!existingCompact && episodes.length < CONFIG.minEpisodesForCompaction) {
logger.info(`Not enough episodes for compaction`, {
sessionId,
episodeCount: episodes.length,
minRequired: CONFIG.minEpisodesForCompaction,
});
return {
success: false,
reason: "insufficient_episodes",
episodeCount: episodes.length,
};
} else if (existingCompact && episodes.length < CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold) {
logger.info(`Not enough new episodes for compaction`, {
sessionId,
episodeCount: episodes.length,
minRequired: CONFIG.minEpisodesForCompaction + CONFIG.compactionThreshold,
});
return {
success: false,
reason: "insufficient_new_episodes",
episodeCount: episodes.length,
};
}
// Generate or update compaction
const compactionResult = existingCompact
? await updateCompaction(existingCompact, episodes, userId)
: await createCompaction(sessionId, episodes, userId, source);
logger.info(`Session compaction completed`, {
sessionId,
compactUuid: compactionResult.uuid,
episodeCount: compactionResult.episodeCount,
compressionRatio: compactionResult.compressionRatio,
});
return {
success: true,
compactionResult: {
compactUuid: compactionResult.uuid,
sessionId: compactionResult.sessionId,
summary: compactionResult.summary,
episodeCount: compactionResult.episodeCount,
startTime: compactionResult.startTime,
endTime: compactionResult.endTime,
confidence: compactionResult.confidence,
compressionRatio: compactionResult.compressionRatio,
},
};
} catch (error) {
logger.error(`Session compaction failed`, {
sessionId,
userId,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
return await processSessionCompaction(payload);
},
});

View File

@ -11,7 +11,7 @@ import { getSpace, updateSpace } from "../utils/space-utils";
import { EpisodeType } from "@core/types";
import { getSpaceEpisodeCount } from "~/services/graphModels/space";
import { addToQueue } from "../utils/queue";
import { addToQueue } from "~/lib/ingest.server";
interface SpaceSummaryPayload {
userId: string;

View File

@ -2,6 +2,7 @@ import { logger } from "~/services/logger.service";
import { fetchAndSaveStdioIntegrations } from "~/trigger/utils/mcp";
import { initNeo4jSchemaOnce } from "~/lib/neo4j.server";
import { env } from "~/env.server";
import { startWorkers } from "~/bullmq/start-workers";
// Global flag to ensure startup only runs once per server process
let startupInitialized = false;
@ -40,19 +41,23 @@ export async function initializeStartupServices() {
process.exit(1);
}
try {
const triggerApiUrl = env.TRIGGER_API_URL;
if (triggerApiUrl) {
await waitForTriggerLogin(triggerApiUrl);
await addEnvVariablesInTrigger();
} else {
console.error("TRIGGER_API_URL is not set in environment variables.");
if (env.QUEUE_PROVIDER === "trigger") {
try {
const triggerApiUrl = env.TRIGGER_API_URL;
if (triggerApiUrl) {
await waitForTriggerLogin(triggerApiUrl);
await addEnvVariablesInTrigger();
} else {
console.error("TRIGGER_API_URL is not set in environment variables.");
process.exit(1);
}
} catch (e) {
console.error(e);
console.error("Trigger is not configured");
process.exit(1);
}
} catch (e) {
console.error(e);
console.error("Trigger is not configured");
process.exit(1);
} else {
await startWorkers();
}
try {
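
Note: startWorkers from ~/bullmq/start-workers is not shown in this diff. Below is a minimal sketch of what it might register, assuming the queue names defined in this commit and the extracted ~/jobs/* logic modules; the space-assignment callback is left as a stub.

// Hypothetical sketch of ~/bullmq/start-workers (not part of this commit).
import { Worker } from "bullmq";
import { getRedisConnection } from "~/bullmq/connection";
import { processEpisodeIngestion } from "~/jobs/ingest/ingest-episode.logic";
import { processDocumentIngestion } from "~/jobs/ingest/ingest-document.logic";
import { processConversationTitleCreation } from "~/jobs/conversation/create-title.logic";
import { processSessionCompaction } from "~/jobs/session/session-compaction.logic";

export async function startWorkers() {
  const connection = getRedisConnection();

  // Episode ingestion; follow-up jobs (space assignment, compaction) are enqueued
  // through the same provider-agnostic callbacks used by the Trigger.dev task.
  new Worker(
    "ingest-queue",
    async (job) =>
      processEpisodeIngestion(
        job.data,
        async (_spaceParams) => {
          /* enqueue space assignment for the BullMQ provider */
        },
        async (compactionParams) => {
          const { sessionCompactionQueue } = await import("~/bullmq/queues");
          await sessionCompactionQueue.add("session-compaction", compactionParams);
        },
      ),
    { connection, concurrency: 1 },
  );

  // Document ingestion fans out chunk episodes onto the ingest queue.
  new Worker(
    "document-ingest-queue",
    async (job) =>
      processDocumentIngestion(job.data, async (episodePayload) => {
        const { ingestQueue } = await import("~/bullmq/queues");
        const handle = await ingestQueue.add("ingest-episode", episodePayload);
        return { id: handle.id as string };
      }),
    { connection },
  );

  new Worker(
    "conversation-title-queue",
    async (job) => processConversationTitleCreation(job.data),
    { connection },
  );

  new Worker(
    "session-compaction-queue",
    async (job) => processSessionCompaction(job.data),
    { connection },
  );

  console.log("BullMQ workers started");
}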

View File

@ -1,70 +0,0 @@
version: "3.8"
services:
core:
container_name: core-app
image: redplanethq/core:${VERSION}
environment:
- NODE_ENV=${NODE_ENV}
- DATABASE_URL=${DATABASE_URL}
- DIRECT_URL=${DIRECT_URL}
- SESSION_SECRET=${SESSION_SECRET}
- ENCRYPTION_KEY=${ENCRYPTION_KEY}
- MAGIC_LINK_SECRET=${MAGIC_LINK_SECRET}
- LOGIN_ORIGIN=${LOGIN_ORIGIN}
- APP_ORIGIN=${APP_ORIGIN}
- REDIS_HOST=${REDIS_HOST}
- REDIS_PORT=${REDIS_PORT}
- REDIS_TLS_DISABLED=${REDIS_TLS_DISABLED}
- NEO4J_URI=${NEO4J_URI}
- NEO4J_USERNAME=${NEO4J_USERNAME}
- NEO4J_PASSWORD=${NEO4J_PASSWORD}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- AUTH_GOOGLE_CLIENT_ID=${AUTH_GOOGLE_CLIENT_ID}
- AUTH_GOOGLE_CLIENT_SECRET=${AUTH_GOOGLE_CLIENT_SECRET}
- ENABLE_EMAIL_LOGIN=${ENABLE_EMAIL_LOGIN}
- OLLAMA_URL=${OLLAMA_URL}
- EMBEDDING_MODEL=${EMBEDDING_MODEL}
- MODEL=${MODEL}
- TRIGGER_PROJECT_ID=${TRIGGER_PROJECT_ID}
- TRIGGER_API_URL=${TRIGGER_API_URL}
- TRIGGER_SECRET_KEY=${TRIGGER_SECRET_KEY}
ports:
- "3033:3000"
depends_on:
- redis
- neo4j
networks:
- core
redis:
container_name: core-redis
image: redis:7
ports:
- "6379:6379"
networks:
- core
neo4j:
container_name: core-neo4j
image: neo4j:5.25-community
environment:
- NEO4J_AUTH=${NEO4J_AUTH}
- NEO4J_dbms_security_procedures_unrestricted=gds.*,apoc.*
- NEO4J_dbms_security_procedures_allowlist=gds.*,apoc.*
ports:
- "7474:7474"
- "7687:7687"
volumes:
- type: bind
source: /efs/neo4j
target: /data
- type: bind
source: /efs/neo4j/plugins # version - 2.13.2
target: /plugins
networks:
- core
networks:
core:
driver: bridge

View File

@ -48,74 +48,4 @@ OLLAMA_URL=
EMBEDDING_MODEL=text-embedding-3-small
MODEL=gpt-4.1-2025-04-14
## Trigger ##
TRIGGER_PROJECT_ID=proj_mqwudvjcukvybqxyjkjv
TRIGGER_SECRET_KEY=tr_prod_72iziCY2yWA5SdGxRFii
# ------------------------------------------------------------
# Trigger.dev self-hosting environment variables
# - These are the default values for the self-hosting stack
# - You should change them to suit your needs, especially the secrets
# - See the docs for more information: https://trigger.dev/docs/self-hosting/overview
# Secrets
# - Do NOT use these defaults in production
# - Generate your own by running `openssl rand -hex 16` for each secret
MANAGED_WORKER_SECRET=447c29678f9eaf289e9c4b70d3dd8a7f
# Worker token
# - This is the token for the worker to connect to the webapp
# - When running the combined stack, this is set automatically during bootstrap
# - For the split setup, you will have to set this manually. The token is available in the webapp logs but will only be shown once.
# - See the docs for more information: https://trigger.dev/docs/self-hosting/docker
TRIGGER_WORKER_TOKEN=tr_wgt_jtRujkUnfK3RmNtUev049Clw7gaqwg77VMPGu7Iv
TRIGGER_TASKS_IMAGE=redplanethq/proj_core:latest
# Worker URLs
# - In split setups, uncomment and set to the public URL of your webapp
# TRIGGER_API_URL=https://trigger.example.com
# OTEL_EXPORTER_OTLP_ENDPOINT=https://trigger.example.com/otel
# Postgres
# - Do NOT use these defaults in production
# - Especially if you decide to expose the database to the internet
# POSTGRES_USER=postgres
TRIGGER_DB=trigger
TRIGGER_DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${DB_HOST}:${DB_PORT}/${TRIGGER_DB}?schema=public&sslmode=disable
TRIGGER_DIRECT_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${DB_HOST}:${DB_PORT}/${TRIGGER_DB}?schema=public&sslmode=disable
ELECTRIC_DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${DB_HOST}/${TRIGGER_DB}
# Trigger image tag
# - This is the version of the webapp and worker images to use, they should be locked to a specific version in production
# - For example: TRIGGER_IMAGE_TAG=v4.0.0-v4-beta.21
TRIGGER_IMAGE_TAG=v4-beta
# Webapp
# - These should generally be set to the same value
# - In production, these should be set to the public URL of your webapp, e.g. https://trigger.example.com
APP_ORIGIN=http://localhost:8030
LOGIN_ORIGIN=http://localhost:8030
API_ORIGIN=http://trigger-webapp:3000
DEV_OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:8030/otel
# You may need to set this when testing locally or when using the combined setup
# API_ORIGIN=http://webapp:3000
# ClickHouse
# - Do NOT use these defaults in production
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=password
CLICKHOUSE_URL=http://default:password@clickhouse:8123?secure=false
RUN_REPLICATION_CLICKHOUSE_URL=http://default:password@clickhouse:8123
# Docker Registry
# - When testing locally, the default values should be fine
# - When deploying to production, you will have to change these, especially the password and URL
# - See the docs for more information: https://trigger.dev/docs/self-hosting/docker#registry-setup
DOCKER_REGISTRY_URL=docker.io
DOCKER_REGISTRY_USERNAME=
DOCKER_REGISTRY_PASSWORD=
QUEUE_PROVIDER=bullmq

View File

@ -108,249 +108,12 @@ services:
retries: 10
start_period: 20s
webapp:
container_name: trigger-webapp
image: ghcr.io/triggerdotdev/trigger.dev:v4.0.4
restart: ${RESTART_POLICY:-unless-stopped}
logging: *logging-config
ports:
- ${WEBAPP_PUBLISH_IP:-0.0.0.0}:8030:3000
depends_on:
clickhouse:
condition: service_started
init:
condition: service_started
networks:
- webapp
- supervisor
- core
volumes:
- shared:/home/node/shared
# Only needed for bootstrap
user: root
# Only needed for bootstrap
command: sh -c "chown -R node:node /home/node/shared && sleep 5 && exec ./scripts/entrypoint.sh"
healthcheck:
test:
[
"CMD",
"node",
"-e",
"http.get('http://localhost:3000/healthcheck', res => process.exit(res.statusCode === 200 ? 0 : 1)).on('error', () => process.exit(1))",
]
interval: 30s
timeout: 10s
retries: 5
start_period: 10s
environment:
APP_ORIGIN: ${APP_ORIGIN:-http://localhost:8030}
LOGIN_ORIGIN: ${LOGIN_ORIGIN:-http://localhost:8030}
API_ORIGIN: ${API_ORIGIN:-http://localhost:8030}
ELECTRIC_ORIGIN: http://electric:3000
DATABASE_URL: ${TRIGGER_DATABASE_URL:-postgresql://postgres:postgres@postgres:5432/main?schema=public&sslmode=disable}
DIRECT_URL: ${TRIGGER_DIRECT_URL:-postgresql://postgres:postgres@postgres:5432/main?schema=public&sslmode=disable}
SESSION_SECRET: ${SESSION_SECRET}
MAGIC_LINK_SECRET: ${MAGIC_LINK_SECRET}
ENCRYPTION_KEY: ${ENCRYPTION_KEY}
MANAGED_WORKER_SECRET: ${MANAGED_WORKER_SECRET}
REDIS_HOST: core-redis
REDIS_PORT: 6379
REDIS_TLS_DISABLED: true
APP_LOG_LEVEL: info
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: ${DEV_OTEL_EXPORTER_OTLP_ENDPOINT:-http://localhost:8030/otel}
DEPLOY_REGISTRY_HOST: ${DOCKER_REGISTRY_URL:-localhost:5000}
DEPLOY_REGISTRY_NAMESPACE: ${DOCKER_REGISTRY_NAMESPACE:-trigger}
OBJECT_STORE_BASE_URL: ${OBJECT_STORE_BASE_URL:-http://minio:9000}
OBJECT_STORE_ACCESS_KEY_ID: ${OBJECT_STORE_ACCESS_KEY_ID}
OBJECT_STORE_SECRET_ACCESS_KEY: ${OBJECT_STORE_SECRET_ACCESS_KEY}
GRACEFUL_SHUTDOWN_TIMEOUT: 1000
# Bootstrap - this will automatically set up a worker group for you
# This will NOT work for split deployments
TRIGGER_BOOTSTRAP_ENABLED: 1
TRIGGER_BOOTSTRAP_WORKER_GROUP_NAME: bootstrap
TRIGGER_BOOTSTRAP_WORKER_TOKEN_PATH: /home/node/shared/worker_token
# ClickHouse configuration
CLICKHOUSE_URL: ${CLICKHOUSE_URL:-http://default:password@clickhouse:8123?secure=false}
CLICKHOUSE_LOG_LEVEL: ${CLICKHOUSE_LOG_LEVEL:-info}
# Run replication
RUN_REPLICATION_ENABLED: ${RUN_REPLICATION_ENABLED:-1}
RUN_REPLICATION_CLICKHOUSE_URL: ${RUN_REPLICATION_CLICKHOUSE_URL:-http://default:password@clickhouse:8123}
RUN_REPLICATION_LOG_LEVEL: ${RUN_REPLICATION_LOG_LEVEL:-info}
# Limits
# TASK_PAYLOAD_OFFLOAD_THRESHOLD: 524288 # 512KB
# TASK_PAYLOAD_MAXIMUM_SIZE: 3145728 # 3MB
# BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: 1000000 # 1MB
# TASK_RUN_METADATA_MAXIMUM_SIZE: 262144 # 256KB
# DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: 100
# DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: 100
# Internal OTEL configuration
INTERNAL_OTEL_TRACE_LOGGING_ENABLED: ${INTERNAL_OTEL_TRACE_LOGGING_ENABLED:-0}
electric:
container_name: trigger-electric
image: electricsql/electric:${ELECTRIC_IMAGE_TAG:-1.0.10}
restart: ${RESTART_POLICY:-unless-stopped}
logging: *logging-config
networks:
- webapp
- core
environment:
DATABASE_URL: ${ELECTRIC_DATABASE_URL:-postgresql://postgres:postgres@postgres:5432/main?schema=public&sslmode=disable}
ELECTRIC_INSECURE: true
ELECTRIC_USAGE_REPORTING: false
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3000/v1/health"]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
clickhouse:
container_name: trigger-clickhouse
image: bitnami/clickhouse:${CLICKHOUSE_IMAGE_TAG:-latest}
restart: ${RESTART_POLICY:-unless-stopped}
logging: *logging-config
ports:
- ${CLICKHOUSE_PUBLISH_IP:-127.0.0.1}:9123:8123
- ${CLICKHOUSE_PUBLISH_IP:-127.0.0.1}:9090:9000
environment:
CLICKHOUSE_ADMIN_USER: ${CLICKHOUSE_USER:-default}
CLICKHOUSE_ADMIN_PASSWORD: ${CLICKHOUSE_PASSWORD:-password}
volumes:
- clickhouse:/bitnami/clickhouse
- ../clickhouse/override.xml:/bitnami/clickhouse/etc/config.d/override.xml:ro
networks:
- webapp
healthcheck:
test:
[
"CMD",
"clickhouse-client",
"--host",
"localhost",
"--port",
"9000",
"--user",
"default",
"--password",
"password",
"--query",
"SELECT 1",
]
interval: 5s
timeout: 5s
retries: 5
start_period: 10s
# Worker related
supervisor:
container_name: trigger-supervisor
image: ghcr.io/triggerdotdev/supervisor:v4.0.4
restart: ${RESTART_POLICY:-unless-stopped}
logging: *logging-config
depends_on:
- docker-proxy
networks:
- supervisor
- docker-proxy
- webapp
- core
volumes:
- shared:/home/node/shared
# Only needed for bootstrap
user: root
# Only needed for bootstrap
command: sh -c "chown -R node:node /home/node/shared && exec /usr/bin/dumb-init -- pnpm run --filter supervisor start"
environment:
# This needs to match the token of the worker group you want to connect to
TRIGGER_WORKER_TOKEN: ${TRIGGER_WORKER_TOKEN}
# Use the bootstrap token created by the webapp
# TRIGGER_WORKER_TOKEN: file:///home/node/shared/worker_token
MANAGED_WORKER_SECRET: ${MANAGED_WORKER_SECRET}
TRIGGER_API_URL: ${TRIGGER_API_URL:-http://trigger-webapp:3000}
OTEL_EXPORTER_OTLP_ENDPOINT: ${OTEL_EXPORTER_OTLP_ENDPOINT:-http://trigger-webapp:3000/otel}
TRIGGER_WORKLOAD_API_DOMAIN: supervisor
TRIGGER_WORKLOAD_API_PORT_EXTERNAL: 8020
# Optional settings
DEBUG: 1
ENFORCE_MACHINE_PRESETS: 1
TRIGGER_DEQUEUE_INTERVAL_MS: 1000
DOCKER_HOST: tcp://docker-proxy:2375
DOCKER_RUNNER_NETWORKS: webapp,supervisor,core
DOCKER_REGISTRY_URL: ${DOCKER_REGISTRY_URL:-localhost:5000}
DOCKER_REGISTRY_USERNAME: ${DOCKER_REGISTRY_USERNAME:-}
DOCKER_REGISTRY_PASSWORD: ${DOCKER_REGISTRY_PASSWORD:-}
DOCKER_AUTOREMOVE_EXITED_CONTAINERS: 0
healthcheck:
test:
[
"CMD",
"node",
"-e",
"http.get('http://localhost:8020/health', res => process.exit(res.statusCode === 200 ? 0 : 1)).on('error', () => process.exit(1))",
]
interval: 30s
timeout: 10s
retries: 5
start_period: 10s
init:
container_name: trigger-init
image: redplanethq/init:${VERSION}
restart: "no" # prevent retries
environment:
- VERSION=${VERSION}
- DB_HOST=${DB_HOST}
- DB_PORT=${DB_PORT}
- TRIGGER_DB=${TRIGGER_DB}
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- TRIGGER_TASKS_IMAGE=${TRIGGER_TASKS_IMAGE}
- NODE_ENV=production
networks:
- webapp
- core
depends_on:
- postgres
docker-proxy:
container_name: trigger-docker-proxy
image: tecnativa/docker-socket-proxy:${DOCKER_PROXY_IMAGE_TAG:-latest}
restart: ${RESTART_POLICY:-unless-stopped}
logging: *logging-config
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
networks:
- docker-proxy
environment:
- LOG_LEVEL=info
- POST=1
- CONTAINERS=1
- IMAGES=1
- INFO=1
- NETWORKS=1
healthcheck:
test: ["CMD", "nc", "-z", "127.0.0.1", "2375"]
interval: 30s
timeout: 5s
retries: 5
start_period: 5s
networks:
core:
name: core
driver: bridge
docker-proxy:
name: docker-proxy
supervisor:
name: supervisor
webapp:
name: webapp
driver: bridge
volumes:
postgres_data:
neo4j_data:
shared:
clickhouse:
minio: