feat: add logger to bullmq workers

This commit is contained in:
Harshith Mullapudi 2025-10-25 16:26:05 +05:30
parent a548bae670
commit cf91a824d1
7 changed files with 305 additions and 34 deletions

View File

@ -17,27 +17,107 @@ import {
sessionCompactionWorker,
closeAllWorkers,
} from "./workers";
import {
ingestQueue,
documentIngestQueue,
conversationTitleQueue,
deepSearchQueue,
sessionCompactionQueue,
} from "./queues";
import {
setupWorkerLogging,
startPeriodicMetricsLogging,
} from "./utils/worker-logger";
export async function startWorkers() {}
// Handle to the periodic metrics timer; null until initWorkers() starts it,
// kept at module scope so shutdown code can clearInterval() it.
let metricsInterval: NodeJS.Timeout | null = null;
// Handle graceful shutdown
process.on("SIGTERM", async () => {
logger.log("SIGTERM received, closing workers gracefully...");
/**
 * Initialize and start all BullMQ workers with comprehensive logging.
 *
 * Attaches lifecycle/event logging to every worker, starts the periodic
 * queue-metrics logger (handle stored in module-level `metricsInterval`),
 * and prints a startup banner. Intended to be called once per process;
 * calling it again would attach duplicate event listeners.
 */
export async function initWorkers(): Promise<void> {
  // Setup comprehensive logging for all workers
  setupWorkerLogging(ingestWorker, ingestQueue, "ingest-episode");
  setupWorkerLogging(
    documentIngestWorker,
    documentIngestQueue,
    "ingest-document",
  );
  setupWorkerLogging(
    conversationTitleWorker,
    conversationTitleQueue,
    "conversation-title",
  );
  setupWorkerLogging(deepSearchWorker, deepSearchQueue, "deep-search");
  setupWorkerLogging(
    sessionCompactionWorker,
    sessionCompactionQueue,
    "session-compaction",
  );

  // Start periodic metrics logging (every 60 seconds). The returned timer is
  // kept so shutdown can cancel it.
  metricsInterval = startPeriodicMetricsLogging(
    [
      { worker: ingestWorker, queue: ingestQueue, name: "ingest-episode" },
      {
        worker: documentIngestWorker,
        queue: documentIngestQueue,
        name: "ingest-document",
      },
      {
        worker: conversationTitleWorker,
        queue: conversationTitleQueue,
        name: "conversation-title",
      },
      { worker: deepSearchWorker, queue: deepSearchQueue, name: "deep-search" },
      {
        worker: sessionCompactionWorker,
        queue: sessionCompactionQueue,
        name: "session-compaction",
      },
    ],
    60000, // Log metrics every 60 seconds
  );

  // Log worker startup banner.
  // NOTE(review): the concurrency values in these messages are hard-coded —
  // confirm they match the actual worker options defined in ./workers.
  logger.log("\n🚀 Starting BullMQ workers...");
  logger.log("─".repeat(80));
  logger.log(`✓ Ingest worker: ${ingestWorker.name} (concurrency: 5)`);
  logger.log(
    `✓ Document ingest worker: ${documentIngestWorker.name} (concurrency: 3)`,
  );
  logger.log(
    `✓ Conversation title worker: ${conversationTitleWorker.name} (concurrency: 10)`,
  );
  logger.log(`✓ Deep search worker: ${deepSearchWorker.name} (concurrency: 5)`);
  logger.log(
    `✓ Session compaction worker: ${sessionCompactionWorker.name} (concurrency: 3)`,
  );
  logger.log("─".repeat(80));
  logger.log("✅ All BullMQ workers started and listening for jobs");
  logger.log("📊 Metrics will be logged every 60 seconds\n");
}
/**
* Shutdown all workers gracefully
*/
export async function shutdownWorkers(): Promise<void> {
logger.log("Shutdown signal received, closing workers gracefully...");
if (metricsInterval) {
clearInterval(metricsInterval);
}
await closeAllWorkers();
process.exit(0);
});
}
process.on("SIGINT", async () => {
logger.log("SIGINT received, closing workers gracefully...");
await closeAllWorkers();
process.exit(0);
});
// If running as standalone script, initialize workers
if (import.meta.url === `file://${process.argv[1]}`) {
initWorkers();
// Log worker startup
logger.log("Starting BullMQ workers...");
logger.log(`- Ingest worker: ${ingestWorker.name}`);
logger.log(`- Document ingest worker: ${documentIngestWorker.name}`);
logger.log(`- Conversation title worker: ${conversationTitleWorker.name}`);
logger.log(`- Deep search worker: ${deepSearchWorker.name}`);
logger.log(`- Session compaction worker: ${sessionCompactionWorker.name}`);
logger.log("All BullMQ workers started and listening for jobs");
// Handle graceful shutdown
const shutdown = async () => {
await shutdownWorkers();
process.exit(0);
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
}

View File

@ -0,0 +1,184 @@
/**
* BullMQ Worker Logger
*
* Comprehensive logging utility for tracking worker status, queue metrics,
* and job lifecycle events
*/
import { type Worker, type Queue } from "bullmq";
import { logger } from "~/services/logger.service";
/**
 * Snapshot of one worker's configuration plus its queue's job counts,
 * produced by getAllWorkerMetrics() and rendered by logWorkerMetrics().
 */
interface WorkerMetrics {
  name: string; // human-readable worker name, e.g. "ingest-episode"
  concurrency: number; // max jobs this worker processes in parallel
  activeJobs: number; // jobs currently being processed
  waitingJobs: number; // jobs queued, waiting to be picked up
  delayedJobs: number; // jobs scheduled to run later
  failedJobs: number; // jobs in the failed state
  completedJobs: number; // jobs finished successfully
}
/**
* Setup comprehensive logging for a worker
*/
/**
 * Attach lifecycle logging to a BullMQ worker and its queue.
 *
 * Subscribes to the worker's job events (active/completed/failed/progress/
 * stalled) and worker-level events (error/closed), logging each with current
 * queue counts for context. Listeners are additive: calling this twice on the
 * same worker duplicates log lines.
 *
 * @param worker - the BullMQ worker to observe
 * @param queue - the queue backing that worker (used for count snapshots)
 * @param workerName - label prefixed to every log line
 */
export function setupWorkerLogging(
  worker: Worker,
  queue: Queue,
  workerName: string,
): void {
  // Job picked up and started processing
  worker.on("active", async (job) => {
    const counts = await getQueueCounts(queue);
    logger.log(
      `[${workerName}] 🔄 Job started: ${job.id} | Queue: ${counts.waiting} waiting, ${counts.active} active, ${counts.delayed} delayed`,
    );
  });

  // Job completed successfully
  worker.on("completed", async (job) => {
    const counts = await getQueueCounts(queue);
    // Guard both timestamps instead of a non-null assertion; 0 means
    // "duration unknown" rather than NaN if either timestamp is missing.
    const duration =
      job.finishedOn && job.processedOn ? job.finishedOn - job.processedOn : 0;
    logger.log(
      `[${workerName}] ✅ Job completed: ${job.id} (${duration}ms) | Queue: ${counts.waiting} waiting, ${counts.active} active`,
    );
  });

  // Job failed (job may be undefined if it was removed before the event fired)
  worker.on("failed", async (job, error) => {
    const counts = await getQueueCounts(queue);
    const attempt = job?.attemptsMade || 0;
    const maxAttempts = job?.opts?.attempts || 3;
    logger.error(
      `[${workerName}] ❌ Job failed: ${job?.id} (attempt ${attempt}/${maxAttempts}) | Error: ${error.message} | Queue: ${counts.waiting} waiting, ${counts.failed} failed`,
    );
  });

  // Job progress update (if job reports progress). BullMQ allows progress to
  // be a number or an arbitrary object; serialize objects instead of letting
  // them stringify as "[object Object]".
  worker.on("progress", async (job, progress) => {
    const shown =
      typeof progress === "number" ? `${progress}%` : JSON.stringify(progress);
    logger.log(`[${workerName}] 📊 Job progress: ${job.id} - ${shown}`);
  });

  // Worker stalled (job took too long without reporting back)
  worker.on("stalled", async (jobId) => {
    logger.warn(`[${workerName}] ⚠️ Job stalled: ${jobId}`);
  });

  // Worker error
  worker.on("error", (error) => {
    logger.error(`[${workerName}] 🔥 Worker error: ${error.message}`);
  });

  // Worker closed
  worker.on("closed", () => {
    logger.log(`[${workerName}] 🛑 Worker closed`);
  });
}
/**
* Get queue counts for logging
*/
/**
 * Fetch a snapshot of job counts for a queue.
 *
 * Best-effort: if the underlying Redis call fails, returns all-zero counts
 * instead of throwing, so logging code never crashes the worker process.
 *
 * @param queue - queue to inspect
 * @returns counts for the waiting/active/delayed/failed/completed states
 */
async function getQueueCounts(queue: Queue): Promise<{
  waiting: number;
  active: number;
  delayed: number;
  failed: number;
  completed: number;
}> {
  try {
    const counts = await queue.getJobCounts(
      "waiting",
      "active",
      "delayed",
      "failed",
      "completed",
    );
    // getJobCounts may omit states with no jobs; default each to 0.
    return {
      waiting: counts.waiting ?? 0,
      active: counts.active ?? 0,
      delayed: counts.delayed ?? 0,
      failed: counts.failed ?? 0,
      completed: counts.completed ?? 0,
    };
  } catch {
    // Deliberate swallow: counts are diagnostic only; zeros signal "unknown".
    return { waiting: 0, active: 0, delayed: 0, failed: 0, completed: 0 };
  }
}
/**
* Get metrics for all workers
*/
/**
 * Collect a metrics snapshot for every registered worker/queue pair.
 *
 * Queue counts are fetched concurrently; the result order matches the
 * input order.
 *
 * @param workers - worker/queue pairs with their display names
 * @returns one WorkerMetrics entry per input pair
 */
export async function getAllWorkerMetrics(
  workers: Array<{ worker: Worker; queue: Queue; name: string }>,
): Promise<WorkerMetrics[]> {
  const snapshots = workers.map(async (entry): Promise<WorkerMetrics> => {
    const jobCounts = await getQueueCounts(entry.queue);
    return {
      name: entry.name,
      concurrency: entry.worker.opts.concurrency || 1,
      activeJobs: jobCounts.active,
      waitingJobs: jobCounts.waiting,
      delayedJobs: jobCounts.delayed,
      failedJobs: jobCounts.failed,
      completedJobs: jobCounts.completed,
    };
  });
  return Promise.all(snapshots);
}
/**
* Log worker metrics summary
*/
/**
 * Pretty-print a per-worker metrics table plus a totals row.
 *
 * @param metrics - snapshots produced by getAllWorkerMetrics()
 */
export function logWorkerMetrics(metrics: WorkerMetrics[]): void {
  const divider = "─".repeat(80);
  logger.log("\n📊 BullMQ Worker Metrics:");
  logger.log(divider);

  // Accumulate totals while printing each worker's row.
  let active = 0;
  let waiting = 0;
  let delayed = 0;
  let failed = 0;
  let completed = 0;

  for (const metric of metrics) {
    logger.log(
      `[${metric.name.padEnd(25)}] Concurrency: ${metric.concurrency} | ` +
        `Active: ${metric.activeJobs} | Waiting: ${metric.waitingJobs} | ` +
        `Delayed: ${metric.delayedJobs} | Failed: ${metric.failedJobs} | ` +
        `Completed: ${metric.completedJobs}`,
    );
    active += metric.activeJobs;
    waiting += metric.waitingJobs;
    delayed += metric.delayedJobs;
    failed += metric.failedJobs;
    completed += metric.completedJobs;
  }

  logger.log(divider);
  logger.log(
    `[TOTAL] Active: ${active} | Waiting: ${waiting} | ` +
      `Delayed: ${delayed} | Failed: ${failed} | ` +
      `Completed: ${completed}`,
  );
  logger.log(divider + "\n");
}
/**
* Start periodic metrics logging
*/
export function startPeriodicMetricsLogging(
workers: Array<{ worker: Worker; queue: Queue; name: string }>,
intervalMs: number = 60000, // Default: 1 minute
): NodeJS.Timeout {
const logMetrics = async () => {
const metrics = await getAllWorkerMetrics(workers);
logWorkerMetrics(metrics);
};
// Log immediately on start
logMetrics();
// Then log periodically
return setInterval(logMetrics, intervalMs);
}

View File

@ -6,7 +6,6 @@ import {
getOrCreatePersonalAccessToken,
} from "~/trigger/utils/utils";
import { getReActPrompt } from "~/trigger/deep-search/prompt";
import { type DeepSearchPayload, type DeepSearchResponse } from "~/trigger/deep-search/types";
import { createSearchMemoryTool } from "~/trigger/deep-search/utils";
import { run } from "~/trigger/deep-search/deep-search-utils";
import { AgentMessageType } from "~/trigger/chat/types";

View File

@ -1,15 +1,15 @@
import { type z } from "zod";
import crypto from "crypto";
import { IngestionStatus } from "@core/database";
import { EpisodeTypeEnum } from "@core/types";
import { logger } from "~/services/logger.service";
import { saveDocument } from "~/services/graphModels/document";
import { type IngestBodyRequest } from "~/lib/ingest.server";
import { DocumentVersioningService } from "~/services/documentVersioning.server";
import { DocumentDifferentialService } from "~/services/documentDiffer.server";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";
import { prisma } from "~/trigger/utils/prisma";
import { type IngestBodyRequest } from "./ingest-episode.logic";
export interface IngestDocumentPayload {
body: z.infer<typeof IngestBodyRequest>;
@ -140,9 +140,7 @@ export async function processDocumentIngestion(
});
}
logger.log(
`Document chunked into ${chunkedDocument.chunks.length} chunks`,
);
logger.log(`Document chunked into ${chunkedDocument.chunks.length} chunks`);
// Step 4: Process chunks based on differential strategy
let chunksToProcess = chunkedDocument.chunks;
@ -286,10 +284,7 @@ export async function processDocumentIngestion(
},
});
logger.error(
`Error processing document for user ${payload.userId}:`,
err,
);
logger.error(`Error processing document for user ${payload.userId}:`, err);
return { success: false, error: err.message };
}
}

View File

@ -7,7 +7,6 @@ import { prisma } from "~/trigger/utils/prisma";
import { EpisodeType } from "@core/types";
import { deductCredits, hasCredits } from "~/trigger/utils/utils";
import { assignEpisodesToSpace } from "~/services/graphModels/space";
import { trackEvent, trackError } from "~/services/telemetry.server";
export const IngestBodyRequest = z.object({
episodeBody: z.string(),

View File

@ -2,7 +2,7 @@ import { logger } from "~/services/logger.service";
import { fetchAndSaveStdioIntegrations } from "~/trigger/utils/mcp";
import { initNeo4jSchemaOnce } from "~/lib/neo4j.server";
import { env } from "~/env.server";
import { startWorkers } from "~/bullmq/start-workers";
import { initWorkers, shutdownWorkers } from "~/bullmq/start-workers";
import { trackConfig } from "~/services/telemetry.server";
// Global flag to ensure startup only runs once per server process
@ -47,7 +47,11 @@ export async function initializeStartupServices() {
const triggerApiUrl = env.TRIGGER_API_URL;
// At this point, env validation should have already ensured these are present
// But we add a runtime check for safety
if (!triggerApiUrl || !env.TRIGGER_PROJECT_ID || !env.TRIGGER_SECRET_KEY) {
if (
!triggerApiUrl ||
!env.TRIGGER_PROJECT_ID ||
!env.TRIGGER_SECRET_KEY
) {
console.error(
"TRIGGER_API_URL, TRIGGER_PROJECT_ID, and TRIGGER_SECRET_KEY must be set when QUEUE_PROVIDER=trigger",
);
@ -61,7 +65,16 @@ export async function initializeStartupServices() {
process.exit(1);
}
} else {
await startWorkers();
await initWorkers();
// Handle graceful shutdown
process.on("SIGTERM", async () => {
await shutdownWorkers();
});
process.on("SIGINT", async () => {
await shutdownWorkers();
process.exit(0);
});
}
try {

View File

@ -84,7 +84,7 @@ services:
neo4j:
container_name: core-neo4j
image: core-neo4j:0.1.0
image: neo4j:5
environment:
- NEO4J_AUTH=${NEO4J_AUTH}
- NEO4J_dbms_security_procedures_unrestricted=gds.*,apoc.*
@ -94,6 +94,7 @@ services:
- NEO4J_apoc_import_file_use_neo4j_config=true
- NEO4J_server_memory_heap_initial__size=2G
- NEO4J_server_memory_heap_max__size=4G
- NEO4JLABS_PLUGINS=apoc,graph-data-science
ports:
- "7474:7474"
- "7687:7687"