feat: add user-scoped queries and admin-only reingestion

This commit is contained in:
Manoj K 2025-07-03 15:36:25 +05:30
parent 7ac663b7ff
commit 39663db274
4 changed files with 55 additions and 12 deletions

View File

@ -1,10 +1,10 @@
import { json } from "@remix-run/node";
import { z } from "zod";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { addToQueue, type IngestBodyRequest } from "~/lib/ingest.server";
import { getUserQueue, type IngestBodyRequest } from "~/lib/ingest.server";
import { prisma } from "~/db.server";
import { logger } from "~/services/logger.service";
import { IngestionStatus } from "@core/database";
import { IngestionStatus, type Prisma } from "@core/database";
const ReingestionBodyRequest = z.object({
userId: z.string().optional(),
@ -15,9 +15,8 @@ const ReingestionBodyRequest = z.object({
type ReingestionRequest = z.infer<typeof ReingestionBodyRequest>;
async function getCompletedIngestionsByUser(userId?: string, spaceId?: string) {
const whereClause: any = {
const whereClause: Prisma.IngestionQueueWhereInput = {
status: IngestionStatus.COMPLETED,
deleted: null
};
if (userId) {
@ -81,6 +80,7 @@ async function reingestionForUser(userId: string, spaceId?: string, dryRun = fal
// Queue ingestions in temporal order (already sorted by createdAt ASC)
const queuedJobs = [];
const ingestionQueue = getUserQueue(userId);
for (const ingestion of ingestions) {
try {
// Parse the original data and add reingestion metadata
@ -96,8 +96,20 @@ async function reingestionForUser(userId: string, spaceId?: string, dryRun = fal
},
};
const queueResult = await addToQueue(reingestionData, userId);
queuedJobs.push(queueResult);
const jobDetails = await ingestionQueue.add(
`ingest-user-${userId}`,
{
queueId: ingestion.id,
spaceId: ingestion.spaceId,
userId: userId,
body: ingestion.data,
},
{
jobId: `${userId}-${Date.now()}`,
},
);
queuedJobs.push({id: jobDetails.id});
} catch (error) {
logger.error(`Failed to queue ingestion ${ingestion.id} for user ${userId}:`, {error});
}
@ -124,6 +136,23 @@ const { action, loader } = createActionApiRoute(
const { userId, spaceId, dryRun } = body;
try {
// Check if the user is an admin
const user = await prisma.user.findUnique({
where: { id: authentication.userId }
});
if (!user || user.admin !== true) {
logger.warn("Unauthorized reingest attempt", {
requestUserId: authentication.userId,
});
return json(
{
success: false,
error: "Unauthorized: Only admin users can perform reingestion"
},
{ status: 403 }
);
}
if (userId) {
// Reingest for specific user
const result = await reingestionForUser(userId, spaceId, dryRun);

View File

@ -101,19 +101,22 @@ export async function saveTriple(triple: Triple): Promise<string> {
export async function findContradictoryStatements({
subjectId,
predicateId,
userId,
}: {
subjectId: string;
predicateId: string;
userId: string;
}): Promise<StatementNode[]> {
const query = `
MATCH (statement:Statement)
WHERE statement.invalidAt IS NULL
WHERE statement.userId = $userId
AND statement.invalidAt IS NULL
MATCH (subject:Entity)<-[:HAS_SUBJECT]-(statement)-[:HAS_PREDICATE]->(predicate:Entity)
WHERE subject.uuid = $subjectId AND predicate.uuid = $predicateId
RETURN statement
`;
const result = await runQuery(query, { subjectId, predicateId });
const result = await runQuery(query, { subjectId, predicateId, userId });
if (!result || result.length === 0) {
return [];
@ -143,14 +146,17 @@ export async function findSimilarStatements({
factEmbedding,
threshold = 0.85,
excludeIds = [],
userId,
}: {
factEmbedding: number[];
threshold?: number;
excludeIds?: string[];
userId: string;
}): Promise<StatementNode[]> {
const query = `
MATCH (statement:Statement)
WHERE statement.invalidAt IS NULL
WHERE statement.userId = $userId
AND statement.invalidAt IS NULL
AND statement.factEmbedding IS NOT NULL
${excludeIds.length > 0 ? "AND NOT statement.uuid IN $excludeIds" : ""}
WITH statement, vector.similarity.cosine($factEmbedding, statement.factEmbedding) AS score
@ -163,6 +169,7 @@ export async function findSimilarStatements({
factEmbedding,
threshold,
excludeIds,
userId,
});
if (!result || result.length === 0) {
@ -334,7 +341,8 @@ export async function searchStatementsByEmbedding(params: {
}) {
const query = `
MATCH (statement:Statement)
WHERE statement.invalidAt IS NULL
WHERE statement.userId = $userId
AND statement.invalidAt IS NULL
AND statement.factEmbedding IS NOT NULL
WITH statement,
CASE
@ -351,6 +359,7 @@ export async function searchStatementsByEmbedding(params: {
embedding: params.embedding,
minSimilarity: params.minSimilarity,
limit: params.limit,
userId: params.userId,
});
if (!result || result.length === 0) {

View File

@ -600,6 +600,7 @@ export class KnowledgeGraphService {
const exactMatches = await findContradictoryStatements({
subjectId: triple.subject.uuid,
predicateId: triple.predicate.uuid,
userId: triple.provenance.userId,
});
if (exactMatches && exactMatches.length > 0) {
@ -612,6 +613,7 @@ export class KnowledgeGraphService {
factEmbedding: triple.statement.factEmbedding,
threshold: 0.85,
excludeIds: checkedStatementIds,
userId: triple.provenance.userId,
});
if (semanticMatches && semanticMatches.length > 0) {

View File

@ -138,7 +138,7 @@ export async function performBfsSearch(
): Promise<StatementNode[]> {
try {
// 1. Extract potential entities from query
const entities = await extractEntitiesFromQuery(embedding);
const entities = await extractEntitiesFromQuery(embedding, userId);
// 2. For each entity, perform BFS traversal
const allStatements: StatementNode[] = [];
@ -223,13 +223,15 @@ export async function bfsTraversal(
*/
export async function extractEntitiesFromQuery(
embedding: Embedding,
userId: string,
): Promise<EntityNode[]> {
try {
// Use vector similarity to find relevant entities
const cypher = `
// Match entities using vector similarity on name embeddings
MATCH (e:Entity)
WHERE e.nameEmbedding IS NOT NULL
WHERE e.nameEmbedding IS NOT NULL
AND e.userId = $userId
WITH e, vector.similarity.cosine(e.nameEmbedding, $embedding) AS score
WHERE score > 0.7
RETURN e
@ -239,6 +241,7 @@ export async function extractEntitiesFromQuery(
const params = {
embedding,
userId,
};
const records = await runQuery(cypher, params);