From 39663db27401a514bbf20d01fc95d76a8c3c0d75 Mon Sep 17 00:00:00 2001 From: Manoj K Date: Thu, 3 Jul 2025 15:36:25 +0530 Subject: [PATCH] feat: add user-scoped queries and admin-only reingestion --- apps/webapp/app/routes/reingest.tsx | 41 ++++++++++++++++--- .../app/services/graphModels/statement.ts | 17 ++++++-- .../app/services/knowledgeGraph.server.ts | 2 + apps/webapp/app/services/search/utils.ts | 7 +++- 4 files changed, 55 insertions(+), 12 deletions(-) diff --git a/apps/webapp/app/routes/reingest.tsx b/apps/webapp/app/routes/reingest.tsx index d619c54..e783617 100644 --- a/apps/webapp/app/routes/reingest.tsx +++ b/apps/webapp/app/routes/reingest.tsx @@ -1,10 +1,10 @@ import { json } from "@remix-run/node"; import { z } from "zod"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; -import { addToQueue, type IngestBodyRequest } from "~/lib/ingest.server"; +import { getUserQueue, type IngestBodyRequest } from "~/lib/ingest.server"; import { prisma } from "~/db.server"; import { logger } from "~/services/logger.service"; -import { IngestionStatus } from "@core/database"; +import { IngestionStatus, type Prisma } from "@core/database"; const ReingestionBodyRequest = z.object({ userId: z.string().optional(), @@ -15,9 +15,8 @@ const ReingestionBodyRequest = z.object({ type ReingestionRequest = z.infer; async function getCompletedIngestionsByUser(userId?: string, spaceId?: string) { - const whereClause: any = { + const whereClause: Prisma.IngestionQueueWhereInput = { status: IngestionStatus.COMPLETED, - deleted: null }; if (userId) { @@ -81,6 +80,7 @@ async function reingestionForUser(userId: string, spaceId?: string, dryRun = fal // Queue ingestions in temporal order (already sorted by createdAt ASC) const queuedJobs = []; + const ingestionQueue = getUserQueue(userId); for (const ingestion of ingestions) { try { // Parse the original data and add reingestion metadata @@ -96,8 +96,20 @@ async function reingestionForUser(userId: string, spaceId?: string, dryRun = fal }, }; - const queueResult = await addToQueue(reingestionData, userId); - queuedJobs.push(queueResult); + const jobDetails = await ingestionQueue.add( + `ingest-user-${userId}`, + { + queueId: ingestion.id, + spaceId: ingestion.spaceId, + userId: userId, + body: ingestion.data, + }, + { + jobId: `${userId}-${Date.now()}`, + }, + ); + + queuedJobs.push({id: jobDetails.id}); } catch (error) { logger.error(`Failed to queue ingestion ${ingestion.id} for user ${userId}:`, {error}); } @@ -124,6 +136,23 @@ const { action, loader } = createActionApiRoute( const { userId, spaceId, dryRun } = body; try { + // Check if the user is an admin + const user = await prisma.user.findUnique({ + where: { id: authentication.userId } + }); + + if (!user || user.admin !== true) { + logger.warn("Unauthorized reingest attempt", { + requestUserId: authentication.userId, + }); + return json( + { + success: false, + error: "Unauthorized: Only admin users can perform reingestion" + }, + { status: 403 } + ); + } if (userId) { // Reingest for specific user const result = await reingestionForUser(userId, spaceId, dryRun); diff --git a/apps/webapp/app/services/graphModels/statement.ts b/apps/webapp/app/services/graphModels/statement.ts index c69752c..d5e9164 100644 --- a/apps/webapp/app/services/graphModels/statement.ts +++ b/apps/webapp/app/services/graphModels/statement.ts @@ -101,19 +101,22 @@ export async function saveTriple(triple: Triple): Promise { export async function findContradictoryStatements({ subjectId, predicateId, + userId, }: { subjectId: string; predicateId: string; + userId: string; }): Promise { const query = ` MATCH (statement:Statement) - WHERE statement.invalidAt IS NULL + WHERE statement.userId = $userId + AND statement.invalidAt IS NULL MATCH (subject:Entity)<-[:HAS_SUBJECT]-(statement)-[:HAS_PREDICATE]->(predicate:Entity) WHERE subject.uuid = $subjectId AND predicate.uuid = $predicateId RETURN statement `; - const result = await runQuery(query, { subjectId, predicateId }); + const result = await runQuery(query, { subjectId, predicateId, userId }); if (!result || result.length === 0) { return []; @@ -143,14 +146,17 @@ export async function findSimilarStatements({ factEmbedding, threshold = 0.85, excludeIds = [], + userId, }: { factEmbedding: number[]; threshold?: number; excludeIds?: string[]; + userId: string; }): Promise { const query = ` MATCH (statement:Statement) - WHERE statement.invalidAt IS NULL + WHERE statement.userId = $userId + AND statement.invalidAt IS NULL AND statement.factEmbedding IS NOT NULL ${excludeIds.length > 0 ? "AND NOT statement.uuid IN $excludeIds" : ""} WITH statement, vector.similarity.cosine($factEmbedding, statement.factEmbedding) AS score @@ -163,6 +169,7 @@ export async function findSimilarStatements({ factEmbedding, threshold, excludeIds, + userId, }); if (!result || result.length === 0) { @@ -334,7 +341,8 @@ export async function searchStatementsByEmbedding(params: { }) { const query = ` MATCH (statement:Statement) - WHERE statement.invalidAt IS NULL + WHERE statement.userId = $userId + AND statement.invalidAt IS NULL AND statement.factEmbedding IS NOT NULL WITH statement, CASE @@ -351,6 +359,7 @@ export async function searchStatementsByEmbedding(params: { embedding: params.embedding, minSimilarity: params.minSimilarity, limit: params.limit, + userId: params.userId, }); if (!result || result.length === 0) { diff --git a/apps/webapp/app/services/knowledgeGraph.server.ts b/apps/webapp/app/services/knowledgeGraph.server.ts index 3a7770d..67b1f3a 100644 --- a/apps/webapp/app/services/knowledgeGraph.server.ts +++ b/apps/webapp/app/services/knowledgeGraph.server.ts @@ -600,6 +600,7 @@ export class KnowledgeGraphService { const exactMatches = await findContradictoryStatements({ subjectId: triple.subject.uuid, predicateId: triple.predicate.uuid, + userId: triple.provenance.userId, }); if (exactMatches && exactMatches.length > 0) { @@ -612,6 +613,7 @@ export class KnowledgeGraphService { factEmbedding: triple.statement.factEmbedding, threshold: 0.85, excludeIds: checkedStatementIds, + userId: triple.provenance.userId, }); if (semanticMatches && semanticMatches.length > 0) { diff --git a/apps/webapp/app/services/search/utils.ts b/apps/webapp/app/services/search/utils.ts index 9f2026e..b37fc08 100644 --- a/apps/webapp/app/services/search/utils.ts +++ b/apps/webapp/app/services/search/utils.ts @@ -138,7 +138,7 @@ export async function performBfsSearch( ): Promise { try { // 1. Extract potential entities from query - const entities = await extractEntitiesFromQuery(embedding); + const entities = await extractEntitiesFromQuery(embedding, userId); // 2. For each entity, perform BFS traversal const allStatements: StatementNode[] = []; @@ -223,13 +223,15 @@ export async function bfsTraversal( */ export async function extractEntitiesFromQuery( embedding: Embedding, + userId: string, ): Promise { try { // Use vector similarity to find relevant entities const cypher = ` // Match entities using vector similarity on name embeddings MATCH (e:Entity) - WHERE e.nameEmbedding IS NOT NULL + WHERE e.nameEmbedding IS NOT NULL + AND e.userId = $userId WITH e, vector.similarity.cosine(e.nameEmbedding, $embedding) AS score WHERE score > 0.7 RETURN e @@ -239,6 +241,7 @@ export async function extractEntitiesFromQuery( const params = { embedding, + userId, }; const records = await runQuery(cypher, params);