+
@@ -58,44 +58,52 @@ export default function LogsActivity() {
]}
/>
- {logs.length > 0 && (
-
- )}
+ {isInitialLoad ? (
+ <>
+
{" "}
+ >
+ ) : (
+ <>
+ {logs.length > 0 && (
+
+ )}
- {/* Logs List */}
-
- {logs.length === 0 ? (
-
-
-
-
-
- No activity logs found
-
-
- {selectedSource || selectedStatus
- ? "Try adjusting your filters to see more results."
- : "No activity ingestion logs are available yet."}
-
-
-
-
- ) : (
-
- )}
-
+ {/* Logs List */}
+
+ {logs.length === 0 ? (
+
+
+
+
+
+ No activity logs found
+
+
+ {selectedSource || selectedStatus
+ ? "Try adjusting your filters to see more results."
+ : "No activity ingestion logs are available yet."}
+
+
+
+
+ ) : (
+
+ )}
+
+ >
+ )}
);
diff --git a/apps/webapp/app/routes/home.logs.all.tsx b/apps/webapp/app/routes/home.logs.all.tsx
index 232b114..2797967 100644
--- a/apps/webapp/app/routes/home.logs.all.tsx
+++ b/apps/webapp/app/routes/home.logs.all.tsx
@@ -26,18 +26,6 @@ export default function LogsAll() {
status: selectedStatus,
});
- if (isInitialLoad) {
- return (
-
- {/* Filters */}
- {logs.length > 0 && (
-
+ {isInitialLoad ? (
+ <>
+
{" "}
+ >
+ ) : (
+ <>
+ {" "}
+ {/* Filters */}
+ {logs.length > 0 && (
+
+ )}
+ {/* Logs List */}
+
+ {logs.length === 0 ? (
+
+
+
+
+
+ No logs found
+
+
+ {selectedSource || selectedStatus
+ ? "Try adjusting your filters to see more results."
+ : "No ingestion logs are available yet."}
+
+
+
+
+ ) : (
+
+ )}
+
+ >
)}
-
- {/* Logs List */}
-
- {logs.length === 0 ? (
-
-
-
-
-
No logs found
-
- {selectedSource || selectedStatus
- ? "Try adjusting your filters to see more results."
- : "No ingestion logs are available yet."}
-
-
-
-
- ) : (
-
- )}
-
>
);
diff --git a/apps/webapp/app/services/knowledgeGraph.server.ts b/apps/webapp/app/services/knowledgeGraph.server.ts
index 8812905..9cbc9ec 100644
--- a/apps/webapp/app/services/knowledgeGraph.server.ts
+++ b/apps/webapp/app/services/knowledgeGraph.server.ts
@@ -748,7 +748,7 @@ export class KnowledgeGraphService {
// Phase 2: Find semantically similar statements
const semanticMatches = await findSimilarStatements({
factEmbedding: triple.statement.factEmbedding,
- threshold: 0.7,
+ threshold: 0.85,
excludeIds: checkedStatementIds,
userId: triple.provenance.userId,
});
diff --git a/apps/webapp/app/trigger/chat/chat.ts b/apps/webapp/app/trigger/chat/chat.ts
index efc8823..08fd44f 100644
--- a/apps/webapp/app/trigger/chat/chat.ts
+++ b/apps/webapp/app/trigger/chat/chat.ts
@@ -17,7 +17,7 @@ import {
const chatQueue = queue({
name: "chat-queue",
- concurrencyLimit: 10,
+ concurrencyLimit: 50,
});
/**
diff --git a/apps/webapp/app/trigger/temp.ts b/apps/webapp/app/trigger/temp.ts
new file mode 100644
index 0000000..780467c
--- /dev/null
+++ b/apps/webapp/app/trigger/temp.ts
@@ -0,0 +1,334 @@
+import { task, logger, queue } from "@trigger.dev/sdk";
+import { runQuery } from "~/lib/neo4j.server";
+import { getEmbedding } from "~/lib/model.server";
+import type { EntityNode } from "@core/types";
+
+// Outcome of re-embedding a single entity.
+interface EntityUpdateResult {
+  uuid: string;
+  name: string;
+  type: string;
+  success: boolean;
+  // Populated only when success is false.
+  error?: string;
+}
+
+// Aggregated outcome for one batch of entity updates.
+interface BatchResult {
+  batchId: string;
+  entities: number;
+  successful: number;
+  failed: number;
+  results: EntityUpdateResult[];
+}
+
+// Queue that throttles batch workers to 10 concurrent runs.
+export const entity = queue({
+  name: "entity-queue",
+  concurrencyLimit: 10,
+});
+
+/**
+ * Orchestrator task: fetches all entities (optionally scoped to one user),
+ * splits them into batches of `batchSize`, and fans out one
+ * `updateEntityBatch` worker run per batch.
+ *
+ * NOTE: `trigger()` only enqueues each batch run — this task does NOT wait
+ * for the batches to finish. Use the returned `batchRunIds` (or the
+ * dashboard) to inspect per-batch results.
+ */
+export const updateAllEntityEmbeddings = task({
+  id: "update-all-entity-embeddings",
+  machine: "large-1x",
+
+  run: async (payload: { userId?: string; batchSize?: number } = {}) => {
+    const { userId, batchSize = 100 } = payload;
+
+    logger.info("Starting entity embeddings update with fan-out approach", {
+      userId,
+      batchSize,
+      targetScope: userId ? `user ${userId}` : "all users",
+    });
+
+    try {
+      // Step 1: Fetch all entities in scope.
+      const entities = await getAllEntities(userId);
+      logger.info(`Found ${entities.length} entities to update`);
+
+      if (entities.length === 0) {
+        return {
+          success: true,
+          totalEntities: 0,
+          totalBatches: 0,
+          updated: 0,
+          failed: 0,
+          batchResults: [],
+        };
+      }
+
+      // Step 2: Split entities into fixed-size batches.
+      const batches: EntityNode[][] = [];
+      for (let i = 0; i < entities.length; i += batchSize) {
+        batches.push(entities.slice(i, i + batchSize));
+      }
+
+      logger.info(
+        `Fanning out ${batches.length} batches of ~${batchSize} entities each`,
+      );
+
+      // Step 3: Enqueue one worker run per batch, in parallel.
+      // Promise.all resolves once every run is *triggered* (queued),
+      // not when the batch work itself completes.
+      const batchRuns = await Promise.all(
+        batches.map((batch, index) =>
+          updateEntityBatch.trigger({
+            entities: batch,
+            batchId: `batch-${index + 1}`,
+            batchNumber: index + 1,
+            totalBatches: batches.length,
+          }),
+        ),
+      );
+
+      logger.info("All batches have been dispatched", {
+        totalBatches: batches.length,
+        totalEntities: entities.length,
+        batchRunIds: batchRuns.map((r) => r.id),
+      });
+
+      return {
+        success: true,
+        totalEntities: entities.length,
+        totalBatches: batches.length,
+        batchRunIds: batchRuns.map((r) => r.id),
+        message:
+          "All batches dispatched successfully. Check individual batch runs for detailed results.",
+      };
+    } catch (error) {
+      logger.error(
+        "Fatal error during entity embeddings update orchestration:",
+        { error },
+      );
+      throw error;
+    }
+  },
+});
+
+/**
+ * Worker task: updates embeddings for one batch of entities.
+ *
+ * Processes every entity in the batch in parallel via Promise.allSettled so
+ * a single failure cannot abort the batch — rejections are converted into
+ * per-entity failure records instead. Concurrency is throttled by the
+ * `entity` queue.
+ */
+export const updateEntityBatch = task({
+  id: "update-entity-batch",
+  queue: entity,
+  run: async (payload: {
+    entities: EntityNode[];
+    batchId: string;
+    batchNumber: number;
+    totalBatches: number;
+  }) => {
+    const { entities, batchId, batchNumber, totalBatches } = payload;
+
+    logger.info(`Processing ${batchId} (${batchNumber}/${totalBatches})`, {
+      entityCount: entities.length,
+    });
+
+    const results: EntityUpdateResult[] = [];
+
+    try {
+      // Process all entities in this batch in parallel.
+      // NOTE(review): updateEntityEmbeddings is defined elsewhere in this
+      // file — presumably it re-embeds one entity and resolves to an
+      // EntityUpdateResult; confirm its contract.
+      const entityPromises = entities.map((entity) =>
+        updateEntityEmbeddings(entity),
+      );
+      const entityResults = await Promise.allSettled(entityPromises);
+
+      // Collect results: fulfilled promises carry their own result object;
+      // rejections become { success: false } records.
+      entityResults.forEach((result, index) => {
+        const entity = entities[index];
+        if (result.status === "fulfilled") {
+          results.push(result.value);
+        } else {
+          logger.error(
+            `Failed to update entity ${entity.uuid} in ${batchId}:`,
+            { error: result.reason },
+          );
+          results.push({
+            uuid: entity.uuid,
+            name: entity.name,
+            type: entity.type,
+            success: false,
+            error: result.reason?.message || "Unknown error",
+          });
+        }
+      });
+
+      const successful = results.filter((r) => r.success).length;
+      const failed = results.filter((r) => !r.success).length;
+
+      logger.info(`Completed ${batchId}`, {
+        total: entities.length,
+        successful,
+        failed,
+        successRate: `${((successful / entities.length) * 100).toFixed(2)}%`,
+      });
+
+      return {
+        batchId,
+        batchNumber,
+        totalBatches,
+        entities: entities.length,
+        successful,
+        failed,
+        results,
+      };
+    } catch (error) {
+      // Promise.allSettled never rejects, so this guards only unexpected
+      // errors (e.g. thrown while building the promise array).
+      logger.error(`Fatal error in ${batchId}:`, { error });
+      throw error;
+    }
+  },
+});
+
+/**
+ * Fetch all entities from Neo4j database
+ */
+async function getAllEntities(userId?: string): Promise