/**
 * BullMQ Worker Startup Script
 *
 * This script starts all BullMQ workers for processing background jobs.
 * Run this as a separate process alongside your main application.
 *
 * Usage:
 *   tsx apps/webapp/app/bullmq/start-workers.ts
 */
import { pathToFileURL } from "node:url";

import { logger } from "~/services/logger.service";
import {
  ingestWorker,
  documentIngestWorker,
  conversationTitleWorker,
  sessionCompactionWorker,
  closeAllWorkers,
  bertTopicWorker,
  spaceAssignmentWorker,
  spaceSummaryWorker,
} from "./workers";
import {
  ingestQueue,
  documentIngestQueue,
  conversationTitleQueue,
  sessionCompactionQueue,
  bertTopicQueue,
  spaceAssignmentQueue,
  spaceSummaryQueue,
} from "./queues";
import {
  setupWorkerLogging,
  startPeriodicMetricsLogging,
} from "./utils/worker-logger";

/** How often queue/worker metrics are logged, in milliseconds. */
const METRICS_INTERVAL_MS = 60000;

/** Handle of the periodic metrics timer; null while no timer is active. */
let metricsInterval: NodeJS.Timeout | null = null;

/**
 * Render an unknown thrown value as a loggable string.
 * Prefers the stack trace when the value is an Error.
 */
function describeError(error: unknown): string {
  if (error instanceof Error) {
    return error.stack ?? error.message;
  }
  return String(error);
}

/**
 * Initialize and start all BullMQ workers with comprehensive logging.
 *
 * Attaches logging to every worker/queue pair, starts the periodic metrics
 * timer (stored in `metricsInterval` so `shutdownWorkers` can stop it), and
 * logs a startup banner.
 *
 * @returns A promise that resolves once setup and the banner are done.
 */
export async function initWorkers(): Promise<void> {
  // Single source of truth for worker/queue pairing: the same list feeds both
  // the logging setup and the metrics logger, so the two can never disagree.
  const registrations = [
    { worker: ingestWorker, queue: ingestQueue, name: "ingest-episode" },
    {
      worker: documentIngestWorker,
      queue: documentIngestQueue,
      name: "ingest-document",
    },
    {
      worker: conversationTitleWorker,
      queue: conversationTitleQueue,
      name: "conversation-title",
    },
    {
      worker: sessionCompactionWorker,
      queue: sessionCompactionQueue,
      name: "session-compaction",
    },
    { worker: bertTopicWorker, queue: bertTopicQueue, name: "bert-topic" },
    {
      worker: spaceAssignmentWorker,
      queue: spaceAssignmentQueue,
      name: "space-assignment",
    },
    // Previously this entry was paired with spaceAssignmentQueue, which made
    // the "space-summary" metrics describe the wrong queue.
    {
      worker: spaceSummaryWorker,
      queue: spaceSummaryQueue,
      name: "space-summary",
    },
  ];

  // Setup comprehensive logging for all workers
  for (const { worker, queue, name } of registrations) {
    setupWorkerLogging(worker, queue, name);
  }

  // Start periodic metrics logging
  metricsInterval = startPeriodicMetricsLogging(
    registrations,
    METRICS_INTERVAL_MS,
  );

  // Log worker startup
  logger.log("\n🚀 Starting BullMQ workers...");
  logger.log("─".repeat(80));
  logger.log(`✓ Ingest worker: ${ingestWorker.name} (concurrency: 5)`);
  logger.log(
    `✓ Document ingest worker: ${documentIngestWorker.name} (concurrency: 3)`,
  );
  logger.log(
    `✓ Conversation title worker: ${conversationTitleWorker.name} (concurrency: 10)`,
  );
  logger.log(
    `✓ Session compaction worker: ${sessionCompactionWorker.name} (concurrency: 3)`,
  );
  // The remaining workers are listed by name only: their concurrency settings
  // are not visible from this file.
  logger.log(`✓ BERT topic worker: ${bertTopicWorker.name}`);
  logger.log(`✓ Space assignment worker: ${spaceAssignmentWorker.name}`);
  logger.log(`✓ Space summary worker: ${spaceSummaryWorker.name}`);
  logger.log("─".repeat(80));
  logger.log("✅ All BullMQ workers started and listening for jobs");
  logger.log(
    `📊 Metrics will be logged every ${METRICS_INTERVAL_MS / 1000} seconds\n`,
  );
}

/**
 * Shutdown all workers gracefully.
 *
 * Stops the periodic metrics timer (if one is active) and then awaits
 * `closeAllWorkers()`.
 *
 * @returns A promise that resolves once `closeAllWorkers()` has resolved.
 */
export async function shutdownWorkers(): Promise<void> {
  logger.log("Shutdown signal received, closing workers gracefully...");
  if (metricsInterval) {
    clearInterval(metricsInterval);
    // Drop the stale handle so a repeated shutdown is a no-op for the timer.
    metricsInterval = null;
  }
  await closeAllWorkers();
}

// If running as standalone script, initialize workers.
// pathToFileURL percent-encodes the path and handles Windows drive letters,
// so the comparison with import.meta.url also holds for paths that a plain
// `file://` string prefix would get wrong.
const entryPoint = process.argv[1];
if (
  entryPoint !== undefined &&
  import.meta.url === pathToFileURL(entryPoint).href
) {
  initWorkers().catch((error: unknown) => {
    logger.log(`Failed to start BullMQ workers: ${describeError(error)}`);
    process.exit(1);
  });

  // Handle graceful shutdown
  let shuttingDown = false;
  const shutdown = async (): Promise<void> => {
    // A second signal while shutdown is in flight must not start it again.
    if (shuttingDown) {
      return;
    }
    shuttingDown = true;

    let exitCode = 0;
    try {
      await shutdownWorkers();
    } catch (error: unknown) {
      logger.log(
        `Failed to shut down BullMQ workers cleanly: ${describeError(error)}`,
      );
      exitCode = 1;
    }
    process.exit(exitCode);
  };

  process.on("SIGTERM", () => {
    void shutdown();
  });
  process.on("SIGINT", () => {
    void shutdown();
  });
}