diff --git a/apps/webapp/app/bullmq/start-workers.ts b/apps/webapp/app/bullmq/start-workers.ts index dace682..23169b0 100644 --- a/apps/webapp/app/bullmq/start-workers.ts +++ b/apps/webapp/app/bullmq/start-workers.ts @@ -17,27 +17,107 @@ import { sessionCompactionWorker, closeAllWorkers, } from "./workers"; +import { + ingestQueue, + documentIngestQueue, + conversationTitleQueue, + deepSearchQueue, + sessionCompactionQueue, +} from "./queues"; +import { + setupWorkerLogging, + startPeriodicMetricsLogging, +} from "./utils/worker-logger"; -export async function startWorkers() {} +let metricsInterval: NodeJS.Timeout | null = null; -// Handle graceful shutdown -process.on("SIGTERM", async () => { - logger.log("SIGTERM received, closing workers gracefully..."); +/** + * Initialize and start all BullMQ workers with comprehensive logging + */ +export async function initWorkers(): Promise<void> { + // Setup comprehensive logging for all workers + setupWorkerLogging(ingestWorker, ingestQueue, "ingest-episode"); + setupWorkerLogging( + documentIngestWorker, + documentIngestQueue, + "ingest-document", + ); + setupWorkerLogging( + conversationTitleWorker, + conversationTitleQueue, + "conversation-title", + ); + setupWorkerLogging(deepSearchWorker, deepSearchQueue, "deep-search"); + setupWorkerLogging( + sessionCompactionWorker, + sessionCompactionQueue, + "session-compaction", + ); + + // Start periodic metrics logging (every 60 seconds) + metricsInterval = startPeriodicMetricsLogging( + [ + { worker: ingestWorker, queue: ingestQueue, name: "ingest-episode" }, + { + worker: documentIngestWorker, + queue: documentIngestQueue, + name: "ingest-document", + }, + { + worker: conversationTitleWorker, + queue: conversationTitleQueue, + name: "conversation-title", + }, + { worker: deepSearchWorker, queue: deepSearchQueue, name: "deep-search" }, + { + worker: sessionCompactionWorker, + queue: sessionCompactionQueue, + name: "session-compaction", + }, + ], + 60000, // Log metrics every 60 
seconds + ); + + // Log worker startup + logger.log("\nšŸš€ Starting BullMQ workers..."); + logger.log("─".repeat(80)); + logger.log(`āœ“ Ingest worker: ${ingestWorker.name} (concurrency: 5)`); + logger.log( + `āœ“ Document ingest worker: ${documentIngestWorker.name} (concurrency: 3)`, + ); + logger.log( + `āœ“ Conversation title worker: ${conversationTitleWorker.name} (concurrency: 10)`, + ); + logger.log(`āœ“ Deep search worker: ${deepSearchWorker.name} (concurrency: 5)`); + logger.log( + `āœ“ Session compaction worker: ${sessionCompactionWorker.name} (concurrency: 3)`, + ); + logger.log("─".repeat(80)); + logger.log("āœ… All BullMQ workers started and listening for jobs"); + logger.log("šŸ“Š Metrics will be logged every 60 seconds\n"); +} + +/** + * Shutdown all workers gracefully + */ +export async function shutdownWorkers(): Promise<void> { + logger.log("Shutdown signal received, closing workers gracefully..."); + if (metricsInterval) { + clearInterval(metricsInterval); + } await closeAllWorkers(); - process.exit(0); -}); +} -process.on("SIGINT", async () => { - logger.log("SIGINT received, closing workers gracefully..."); - await closeAllWorkers(); - process.exit(0); -}); +// If running as standalone script, initialize workers +if (import.meta.url === `file://${process.argv[1]}`) { + initWorkers(); -// Log worker startup -logger.log("Starting BullMQ workers..."); -logger.log(`- Ingest worker: ${ingestWorker.name}`); -logger.log(`- Document ingest worker: ${documentIngestWorker.name}`); -logger.log(`- Conversation title worker: ${conversationTitleWorker.name}`); -logger.log(`- Deep search worker: ${deepSearchWorker.name}`); -logger.log(`- Session compaction worker: ${sessionCompactionWorker.name}`); -logger.log("All BullMQ workers started and listening for jobs"); + // Handle graceful shutdown + const shutdown = async () => { + await shutdownWorkers(); + process.exit(0); + }; + + process.on("SIGTERM", shutdown); + process.on("SIGINT", shutdown); +} diff --git 
a/apps/webapp/app/bullmq/utils/worker-logger.ts b/apps/webapp/app/bullmq/utils/worker-logger.ts new file mode 100644 index 0000000..0059879 --- /dev/null +++ b/apps/webapp/app/bullmq/utils/worker-logger.ts @@ -0,0 +1,184 @@ +/** + * BullMQ Worker Logger + * + * Comprehensive logging utility for tracking worker status, queue metrics, + * and job lifecycle events + */ + +import { type Worker, type Queue } from "bullmq"; +import { logger } from "~/services/logger.service"; + +interface WorkerMetrics { + name: string; + concurrency: number; + activeJobs: number; + waitingJobs: number; + delayedJobs: number; + failedJobs: number; + completedJobs: number; +} + +/** + * Setup comprehensive logging for a worker + */ +export function setupWorkerLogging( + worker: Worker, + queue: Queue, + workerName: string, +): void { + // Job picked up and started processing + worker.on("active", async (job) => { + const counts = await getQueueCounts(queue); + logger.log( + `[${workerName}] šŸ”„ Job started: ${job.id} | Queue: ${counts.waiting} waiting, ${counts.active} active, ${counts.delayed} delayed`, + ); + }); + + // Job completed successfully + worker.on("completed", async (job, result) => { + const counts = await getQueueCounts(queue); + const duration = job.finishedOn ? job.finishedOn - job.processedOn! 
: 0; + logger.log( + `[${workerName}] āœ… Job completed: ${job.id} (${duration}ms) | Queue: ${counts.waiting} waiting, ${counts.active} active`, + ); + }); + + // Job failed + worker.on("failed", async (job, error) => { + const counts = await getQueueCounts(queue); + const attempt = job?.attemptsMade || 0; + const maxAttempts = job?.opts?.attempts || 3; + logger.error( + `[${workerName}] āŒ Job failed: ${job?.id} (attempt ${attempt}/${maxAttempts}) | Error: ${error.message} | Queue: ${counts.waiting} waiting, ${counts.failed} failed`, + ); + }); + + // Job progress update (if job reports progress) + worker.on("progress", async (job, progress) => { + logger.log(`[${workerName}] šŸ“Š Job progress: ${job.id} - ${progress}%`); + }); + + // Worker stalled (job took too long) + worker.on("stalled", async (jobId) => { + logger.warn(`[${workerName}] āš ļø Job stalled: ${jobId}`); + }); + + // Worker error + worker.on("error", (error) => { + logger.error(`[${workerName}] šŸ”„ Worker error: ${error.message}`); + }); + + // Worker closed + worker.on("closed", () => { + logger.log(`[${workerName}] šŸ›‘ Worker closed`); + }); +} + +/** + * Get queue counts for logging + */ +async function getQueueCounts(queue: Queue): Promise<{ + waiting: number; + active: number; + delayed: number; + failed: number; + completed: number; +}> { + try { + const counts = await queue.getJobCounts( + "waiting", + "active", + "delayed", + "failed", + "completed", + ); + return { + waiting: counts.waiting || 0, + active: counts.active || 0, + delayed: counts.delayed || 0, + failed: counts.failed || 0, + completed: counts.completed || 0, + }; + } catch (error) { + return { waiting: 0, active: 0, delayed: 0, failed: 0, completed: 0 }; + } +} + +/** + * Get metrics for all workers + */ +export async function getAllWorkerMetrics( + workers: Array<{ worker: Worker; queue: Queue; name: string }>, +): Promise<WorkerMetrics[]> { + const metrics = await Promise.all( + workers.map(async ({ worker, queue, name }) => { + const 
counts = await getQueueCounts(queue); + return { + name, + concurrency: worker.opts.concurrency || 1, + activeJobs: counts.active, + waitingJobs: counts.waiting, + delayedJobs: counts.delayed, + failedJobs: counts.failed, + completedJobs: counts.completed, + }; + }), + ); + + return metrics; +} + +/** + * Log worker metrics summary + */ +export function logWorkerMetrics(metrics: WorkerMetrics[]): void { + logger.log("\nšŸ“Š BullMQ Worker Metrics:"); + logger.log("─".repeat(80)); + + for (const metric of metrics) { + logger.log( + `[${metric.name.padEnd(25)}] Concurrency: ${metric.concurrency} | ` + + `Active: ${metric.activeJobs} | Waiting: ${metric.waitingJobs} | ` + + `Delayed: ${metric.delayedJobs} | Failed: ${metric.failedJobs} | ` + + `Completed: ${metric.completedJobs}`, + ); + } + + const totals = metrics.reduce( + (acc, m) => ({ + active: acc.active + m.activeJobs, + waiting: acc.waiting + m.waitingJobs, + delayed: acc.delayed + m.delayedJobs, + failed: acc.failed + m.failedJobs, + completed: acc.completed + m.completedJobs, + }), + { active: 0, waiting: 0, delayed: 0, failed: 0, completed: 0 }, + ); + + logger.log("─".repeat(80)); + logger.log( + `[TOTAL] Active: ${totals.active} | Waiting: ${totals.waiting} | ` + + `Delayed: ${totals.delayed} | Failed: ${totals.failed} | ` + + `Completed: ${totals.completed}`, + ); + logger.log("─".repeat(80) + "\n"); +} + +/** + * Start periodic metrics logging + */ +export function startPeriodicMetricsLogging( + workers: Array<{ worker: Worker; queue: Queue; name: string }>, + intervalMs: number = 60000, // Default: 1 minute +): NodeJS.Timeout { + const logMetrics = async () => { + const metrics = await getAllWorkerMetrics(workers); + logWorkerMetrics(metrics); + }; + + // Log immediately on start + logMetrics(); + + // Then log periodically + return setInterval(logMetrics, intervalMs); +} diff --git a/apps/webapp/app/jobs/deep-search/deep-search.logic.ts b/apps/webapp/app/jobs/deep-search/deep-search.logic.ts index 
94ee2b8..a7be659 100644 --- a/apps/webapp/app/jobs/deep-search/deep-search.logic.ts +++ b/apps/webapp/app/jobs/deep-search/deep-search.logic.ts @@ -6,7 +6,6 @@ import { getOrCreatePersonalAccessToken, } from "~/trigger/utils/utils"; import { getReActPrompt } from "~/trigger/deep-search/prompt"; -import { type DeepSearchPayload, type DeepSearchResponse } from "~/trigger/deep-search/types"; import { createSearchMemoryTool } from "~/trigger/deep-search/utils"; import { run } from "~/trigger/deep-search/deep-search-utils"; import { AgentMessageType } from "~/trigger/chat/types"; diff --git a/apps/webapp/app/jobs/ingest/ingest-document.logic.ts b/apps/webapp/app/jobs/ingest/ingest-document.logic.ts index 7798df2..e3ff521 100644 --- a/apps/webapp/app/jobs/ingest/ingest-document.logic.ts +++ b/apps/webapp/app/jobs/ingest/ingest-document.logic.ts @@ -1,15 +1,15 @@ import { type z } from "zod"; -import crypto from "crypto"; import { IngestionStatus } from "@core/database"; import { EpisodeTypeEnum } from "@core/types"; import { logger } from "~/services/logger.service"; import { saveDocument } from "~/services/graphModels/document"; -import { type IngestBodyRequest } from "~/lib/ingest.server"; + import { DocumentVersioningService } from "~/services/documentVersioning.server"; import { DocumentDifferentialService } from "~/services/documentDiffer.server"; import { KnowledgeGraphService } from "~/services/knowledgeGraph.server"; import { prisma } from "~/trigger/utils/prisma"; +import { type IngestBodyRequest } from "./ingest-episode.logic"; export interface IngestDocumentPayload { body: z.infer<typeof IngestBodyRequest>; @@ -140,9 +140,7 @@ export async function processDocumentIngestion( }); } - logger.log( - `Document chunked into ${chunkedDocument.chunks.length} chunks`, - ); + logger.log(`Document chunked into ${chunkedDocument.chunks.length} chunks`); // Step 4: Process chunks based on differential strategy let chunksToProcess = chunkedDocument.chunks; @@ -286,10 +284,7 @@ export async function 
processDocumentIngestion( }, }); - logger.error( - `Error processing document for user ${payload.userId}:`, - err, - ); + logger.error(`Error processing document for user ${payload.userId}:`, err); return { success: false, error: err.message }; } } diff --git a/apps/webapp/app/jobs/ingest/ingest-episode.logic.ts b/apps/webapp/app/jobs/ingest/ingest-episode.logic.ts index abaeaae..e1b515a 100644 --- a/apps/webapp/app/jobs/ingest/ingest-episode.logic.ts +++ b/apps/webapp/app/jobs/ingest/ingest-episode.logic.ts @@ -7,7 +7,6 @@ import { prisma } from "~/trigger/utils/prisma"; import { EpisodeType } from "@core/types"; import { deductCredits, hasCredits } from "~/trigger/utils/utils"; import { assignEpisodesToSpace } from "~/services/graphModels/space"; -import { trackEvent, trackError } from "~/services/telemetry.server"; export const IngestBodyRequest = z.object({ episodeBody: z.string(), diff --git a/apps/webapp/app/utils/startup.ts b/apps/webapp/app/utils/startup.ts index 24b4b15..76d5c5e 100644 --- a/apps/webapp/app/utils/startup.ts +++ b/apps/webapp/app/utils/startup.ts @@ -2,7 +2,7 @@ import { logger } from "~/services/logger.service"; import { fetchAndSaveStdioIntegrations } from "~/trigger/utils/mcp"; import { initNeo4jSchemaOnce } from "~/lib/neo4j.server"; import { env } from "~/env.server"; -import { startWorkers } from "~/bullmq/start-workers"; +import { initWorkers, shutdownWorkers } from "~/bullmq/start-workers"; import { trackConfig } from "~/services/telemetry.server"; // Global flag to ensure startup only runs once per server process @@ -47,7 +47,11 @@ export async function initializeStartupServices() { const triggerApiUrl = env.TRIGGER_API_URL; // At this point, env validation should have already ensured these are present // But we add a runtime check for safety - if (!triggerApiUrl || !env.TRIGGER_PROJECT_ID || !env.TRIGGER_SECRET_KEY) { + if ( + !triggerApiUrl || + !env.TRIGGER_PROJECT_ID || + !env.TRIGGER_SECRET_KEY + ) { console.error( 
"TRIGGER_API_URL, TRIGGER_PROJECT_ID, and TRIGGER_SECRET_KEY must be set when QUEUE_PROVIDER=trigger", ); @@ -61,7 +65,16 @@ export async function initializeStartupServices() { process.exit(1); } } else { - await startWorkers(); + await initWorkers(); + + // Handle graceful shutdown + process.on("SIGTERM", async () => { + await shutdownWorkers(); + }); + process.on("SIGINT", async () => { + await shutdownWorkers(); + process.exit(0); + }); } try { diff --git a/hosting/docker/docker-compose.yaml b/hosting/docker/docker-compose.yaml index 5ed47be..e4bce7f 100644 --- a/hosting/docker/docker-compose.yaml +++ b/hosting/docker/docker-compose.yaml @@ -84,7 +84,7 @@ services: neo4j: container_name: core-neo4j - image: core-neo4j:0.1.0 + image: neo4j:5 environment: - NEO4J_AUTH=${NEO4J_AUTH} - NEO4J_dbms_security_procedures_unrestricted=gds.*,apoc.* @@ -94,6 +94,7 @@ - NEO4J_apoc_import_file_use_neo4j_config=true - NEO4J_server_memory_heap_initial__size=2G - NEO4J_server_memory_heap_max__size=4G + - NEO4J_PLUGINS=["apoc", "graph-data-science"] ports: - "7474:7474" - "7687:7687"