From 14d38b017165b2f4921c8378d3f3e07b702186a7 Mon Sep 17 00:00:00 2001 From: Manoj K Date: Thu, 28 Aug 2025 14:12:27 +0530 Subject: [PATCH] feat: add invalidatedBy field to statement model and optimize space queries --- apps/webapp/app/lib/neo4j.server.ts | 20 ++++++++++++++ apps/webapp/app/services/graphModels/space.ts | 27 +++++++++---------- .../app/services/graphModels/statement.ts | 24 ++++++++++++++--- .../app/services/knowledgeGraph.server.ts | 5 +++- .../app/trigger/spaces/space-assignment.ts | 11 ++++---- .../app/trigger/spaces/space-summary.ts | 13 +-------- packages/types/src/graph/graph.entity.ts | 1 + 7 files changed, 64 insertions(+), 37 deletions(-) diff --git a/apps/webapp/app/lib/neo4j.server.ts b/apps/webapp/app/lib/neo4j.server.ts index 22a498c..8c067a5 100644 --- a/apps/webapp/app/lib/neo4j.server.ts +++ b/apps/webapp/app/lib/neo4j.server.ts @@ -336,6 +336,26 @@ const initializeSchema = async () => { "CREATE INDEX cluster_aspect_type IF NOT EXISTS FOR (n:Cluster) ON (n.aspectType)", ); + // Space-optimized indexes for better query performance + await runQuery( + "CREATE INDEX space_user_uuid IF NOT EXISTS FOR (n:Space) ON (n.userId, n.uuid)", + ); + await runQuery( + "CREATE INDEX space_user_active IF NOT EXISTS FOR (n:Space) ON (n.userId, n.isActive)", + ); + await runQuery( + "CREATE INDEX statement_user_spaces IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.spaceIds)", + ); + await runQuery( + "CREATE INDEX statement_user_invalid IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.invalidAt)", + ); + await runQuery( + "CREATE INDEX statement_user_uuid IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.uuid)", + ); + await runQuery( + "CREATE INDEX entity_user_uuid IF NOT EXISTS FOR (n:Entity) ON (n.userId, n.uuid)", + ); + // Create vector indexes for semantic search (if using Neo4j 5.0+) await runQuery(` CREATE VECTOR INDEX entity_embedding IF NOT EXISTS FOR (n:Entity) ON n.nameEmbedding diff --git a/apps/webapp/app/services/graphModels/space.ts b/apps/webapp/app/services/graphModels/space.ts index 5ee9a9b..04c7b52 100644 --- a/apps/webapp/app/services/graphModels/space.ts +++ b/apps/webapp/app/services/graphModels/space.ts @@ -57,9 +57,9 @@ export async function getSpace( MATCH (s:Space {uuid: $spaceId, userId: $userId}) WHERE s.isActive = true - // Count statements in this space - OPTIONAL MATCH (stmt:Statement) - WHERE stmt.userId = $userId AND s.id IN stmt.spaceIds + // Count statements in this space using optimized approach + OPTIONAL MATCH (stmt:Statement {userId: $userId}) + WHERE stmt.spaceIds IS NOT NULL AND $spaceId IN stmt.spaceIds AND stmt.invalidAt IS NULL WITH s, count(stmt) as statementCount RETURN s, statementCount @@ -152,8 +152,8 @@ export async function deleteSpace( // 2. Clean up statement references (remove spaceId from spaceIds arrays) const cleanupQuery = ` - MATCH (s:Statement) - WHERE s.userId = $userId AND $spaceId IN s.spaceIds + MATCH (s:Statement {userId: $userId}) + WHERE s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds SET s.spaceIds = [id IN s.spaceIds WHERE id <> $spaceId] RETURN count(s) as updatedStatements `; @@ -203,8 +203,8 @@ export async function assignStatementsToSpace( } const query = ` - MATCH (s:Statement) - WHERE s.uuid IN $statementIds AND s.userId = $userId + MATCH (s:Statement {userId: $userId}) + WHERE s.uuid IN $statementIds SET s.spaceIds = CASE WHEN s.spaceIds IS NULL THEN [$spaceId] WHEN $spaceId IN s.spaceIds THEN s.spaceIds @@ -244,8 +244,8 @@ export async function removeStatementsFromSpace( ): Promise { try { const query = ` - MATCH (s:Statement) - WHERE s.uuid IN $statementIds AND s.userId = $userId AND $spaceId IN s.spaceIds + MATCH (s:Statement {userId: $userId}) + WHERE s.uuid IN $statementIds AND s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds SET s.spaceIds = [id IN s.spaceIds WHERE id <> $spaceId] RETURN count(s) as updated `; @@ -271,8 +271,8 @@ export async function removeStatementsFromSpace( */ export async function getSpaceStatements(spaceId: string, userId: string) { const query = ` - MATCH (s:Statement) - WHERE s.userId = $userId AND s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds + MATCH (s:Statement {userId: $userId}) + WHERE s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds AND s.invalidAt IS NULL MATCH (s)-[:HAS_SUBJECT]->(subj:Entity) MATCH (s)-[:HAS_PREDICATE]->(pred:Entity) MATCH (s)-[:HAS_OBJECT]->(obj:Entity) @@ -309,9 +309,8 @@ export async function getSpaceStatementCount( userId: string, ): Promise { const query = ` - MATCH (s:Statement) - WHERE s.userId = $userId - AND s.spaceIds IS NOT NULL + MATCH (s:Statement {userId: $userId}) + WHERE s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds AND s.invalidAt IS NULL RETURN count(s) as statementCount diff --git a/apps/webapp/app/services/graphModels/statement.ts b/apps/webapp/app/services/graphModels/statement.ts index bbdf0cc..5279d93 100644 --- a/apps/webapp/app/services/graphModels/statement.ts +++ b/apps/webapp/app/services/graphModels/statement.ts @@ -22,6 +22,7 @@ export async function saveTriple(triple: Triple): Promise { n.createdAt = $createdAt, n.validAt = $validAt, n.invalidAt = $invalidAt, + n.invalidatedBy = $invalidatedBy, n.attributes = $attributes, n.userId = $userId, n.space = $space @@ -30,6 +31,7 @@ export async function saveTriple(triple: Triple): Promise { n.factEmbedding = $factEmbedding, n.validAt = $validAt, n.invalidAt = $invalidAt, + n.invalidatedBy = $invalidatedBy, n.attributes = $attributes, n.space = $space RETURN n.uuid as uuid @@ -44,6 +46,7 @@ export async function saveTriple(triple: Triple): Promise { invalidAt: triple.statement.invalidAt ? triple.statement.invalidAt.toISOString() : null, + invalidatedBy: triple.statement.invalidatedBy || null, attributes: JSON.stringify(triple.statement.attributes || {}), userId: triple.provenance.userId, space: triple.statement.space || null, @@ -132,6 +135,7 @@ export async function findContradictoryStatements({ createdAt: new Date(statement.createdAt), validAt: new Date(statement.validAt), invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null, + invalidatedBy: statement.invalidatedBy || undefined, attributes: statement.attributesJson ? JSON.parse(statement.attributesJson) : {}, @@ -186,6 +190,7 @@ export async function findStatementsWithSameSubjectObject({ createdAt: new Date(statement.createdAt), validAt: new Date(statement.validAt), invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null, + invalidatedBy: statement.invalidatedBy || undefined, attributes: statement.attributesJson ? JSON.parse(statement.attributesJson) : {}, @@ -233,7 +238,6 @@ export async function findSimilarStatements({ return result.map((record) => { const statement = record.get("statement").properties; - const score = record.get("score"); return { uuid: statement.uuid, @@ -242,6 +246,7 @@ export async function findSimilarStatements({ createdAt: new Date(statement.createdAt), validAt: new Date(statement.validAt), invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null, + invalidatedBy: statement.invalidatedBy || undefined, attributes: statement.attributesJson ? JSON.parse(statement.attributesJson) : {}, @@ -287,6 +292,7 @@ export async function getTripleForStatement({ invalidAt: statementProps.invalidAt ? new Date(statementProps.invalidAt) : null, + invalidatedBy: statementProps.invalidatedBy || undefined, attributes: statementProps.attributesJson ? JSON.parse(statementProps.attributesJson) : {}, @@ -360,17 +366,25 @@ export async function getTripleForStatement({ export async function invalidateStatement({ statementId, invalidAt, + invalidatedBy, }: { statementId: string; invalidAt: string; + invalidatedBy?: string; }) { const query = ` MATCH (statement:Statement {uuid: $statementId}) SET statement.invalidAt = $invalidAt + ${invalidatedBy ? "SET statement.invalidatedBy = $invalidatedBy" : ""} RETURN statement `; - const result = await runQuery(query, { statementId, invalidAt }); + const params = { + statementId, + invalidAt, + ...(invalidatedBy && { invalidatedBy }) + }; + const result = await runQuery(query, params); if (!result || result.length === 0) { return null; @@ -381,13 +395,15 @@ export async function invalidateStatement({ export async function invalidateStatements({ statementIds, + invalidatedBy, }: { statementIds: string[]; + invalidatedBy?: string; }) { const invalidAt = new Date().toISOString(); return statementIds.map( async (statementId) => - await invalidateStatement({ statementId, invalidAt }), + await invalidateStatement({ statementId, invalidAt, invalidatedBy }), ); } @@ -421,7 +437,6 @@ export async function searchStatementsByEmbedding(params: { return result.map((record) => { const statement = record.get("statement").properties; - const score = record.get("score"); return { uuid: statement.uuid, @@ -430,6 +445,7 @@ export async function searchStatementsByEmbedding(params: { createdAt: new Date(statement.createdAt), validAt: new Date(statement.validAt), invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null, + invalidatedBy: statement.invalidatedBy || undefined, attributes: statement.attributesJson ? JSON.parse(statement.attributesJson) : {}, diff --git a/apps/webapp/app/services/knowledgeGraph.server.ts b/apps/webapp/app/services/knowledgeGraph.server.ts index dd90ee7..d7019b6 100644 --- a/apps/webapp/app/services/knowledgeGraph.server.ts +++ b/apps/webapp/app/services/knowledgeGraph.server.ts @@ -251,7 +251,10 @@ export class KnowledgeGraphService { logger.log(`Saved triples in ${saveTriplesTime - updatedTriplesTime} ms`); // Invalidate invalidated statements - await invalidateStatements({ statementIds: invalidatedStatements }); + await invalidateStatements({ + statementIds: invalidatedStatements, + invalidatedBy: episode.uuid + }); const endTime = Date.now(); const processingTimeMs = endTime - startTime; diff --git a/apps/webapp/app/trigger/spaces/space-assignment.ts b/apps/webapp/app/trigger/spaces/space-assignment.ts index 579d0ac..7f80b6a 100644 --- a/apps/webapp/app/trigger/spaces/space-assignment.ts +++ b/apps/webapp/app/trigger/spaces/space-assignment.ts @@ -174,6 +174,7 @@ async function checkAndTriggerSpacePatterns( export const spaceAssignmentTask = task({ id: "space-assignment", + maxDuration: 900, // 15 minutes timeout run: async (payload: SpaceAssignmentPayload) => { const { userId, @@ -471,12 +472,10 @@ async function getStatementsToAnalyze( `; } else { query = ` - MATCH (s:Statement) - WHERE s.userId = $userId - AND s.invalidAt IS NULL - AND s.episodeUuid = $episodeId + MATCH (e:Episode {uuid: $episodeId, userId: $userId})-[:HAS_PROVENANCE]->(s:Statement) + WHERE s.invalidAt IS NULL MATCH (s)-[:HAS_SUBJECT]->(subj:Entity) - MATCH (s)-[:HAS_PREDICATE]->(pred:Entity) + MATCH (s)-[:HAS_PREDICATE]->(pred:Entity) MATCH (s)-[:HAS_OBJECT]->(obj:Entity) RETURN s, subj.name as subject, pred.name as predicate, obj.name as object ORDER BY s.createdAt DESC @@ -561,7 +560,7 @@ async function processBatchAI( }); // Poll for completion with improved handling - const maxPollingTime = 600000; // 10 minutes + const maxPollingTime = 780000; // 10 minutes const pollInterval = 5000; // 5 seconds const startTime = Date.now(); diff --git a/apps/webapp/app/trigger/spaces/space-summary.ts b/apps/webapp/app/trigger/spaces/space-summary.ts index ebd0903..18c48f8 100644 --- a/apps/webapp/app/trigger/spaces/space-summary.ts +++ b/apps/webapp/app/trigger/spaces/space-summary.ts @@ -193,7 +193,7 @@ async function generateSpaceSummary( const statements = await getSpaceStatements( spaceId, userId, - existingSummary?.lastUpdated, + isIncremental ? existingSummary?.lastUpdated : undefined, ); // Handle case where no new statements exist for incremental update @@ -565,17 +565,6 @@ function parseSummaryResponse(response: string): { let jsonContent = outputMatch[1].trim(); - // Clean up common JSON formatting issues from LLM responses - jsonContent = jsonContent - .replace(/[\u0000-\u001F\u007F]/g, "") // Remove control characters - .replace(/\n/g, "\\n") // Escape newlines - .replace(/\r/g, "\\r") // Escape carriage returns - .replace(/\t/g, "\\t") // Escape tabs - .replace(/\\/g, "\\\\") // Escape backslashes (but avoid double escaping) - .replace(/\\\\n/g, "\\n") // Fix double-escaped newlines - .replace(/\\\\r/g, "\\r") // Fix double-escaped carriage returns - .replace(/\\\\t/g, "\\t"); // Fix double-escaped tabs - let parsed; try { parsed = JSON.parse(jsonContent); diff --git a/packages/types/src/graph/graph.entity.ts b/packages/types/src/graph/graph.entity.ts index 1f72a79..45d4684 100644 --- a/packages/types/src/graph/graph.entity.ts +++ b/packages/types/src/graph/graph.entity.ts @@ -50,6 +50,7 @@ export interface StatementNode { createdAt: Date; validAt: Date; invalidAt: Date | null; + invalidatedBy?: string; // UUID of the episode that invalidated this statement attributes: Record; userId: string; space?: string; // Legacy field - deprecated in favor of spaceIds