feat: add invalidatedBy field to statement model and optimize space queries

This commit is contained in:
Manoj K 2025-08-28 14:12:27 +05:30 committed by Harshith Mullapudi
parent 9568b97510
commit 9e0665c086
7 changed files with 64 additions and 37 deletions

View File

@ -336,6 +336,26 @@ const initializeSchema = async () => {
"CREATE INDEX cluster_aspect_type IF NOT EXISTS FOR (n:Cluster) ON (n.aspectType)",
);
// Space-optimized indexes for better query performance
await runQuery(
"CREATE INDEX space_user_uuid IF NOT EXISTS FOR (n:Space) ON (n.userId, n.uuid)",
);
await runQuery(
"CREATE INDEX space_user_active IF NOT EXISTS FOR (n:Space) ON (n.userId, n.isActive)",
);
await runQuery(
"CREATE INDEX statement_user_spaces IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.spaceIds)",
);
await runQuery(
"CREATE INDEX statement_user_invalid IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.invalidAt)",
);
await runQuery(
"CREATE INDEX statement_user_uuid IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.uuid)",
);
await runQuery(
"CREATE INDEX entity_user_uuid IF NOT EXISTS FOR (n:Entity) ON (n.userId, n.uuid)",
);
// Create vector indexes for semantic search (if using Neo4j 5.0+)
await runQuery(`
CREATE VECTOR INDEX entity_embedding IF NOT EXISTS FOR (n:Entity) ON n.nameEmbedding

View File

@ -57,9 +57,9 @@ export async function getSpace(
MATCH (s:Space {uuid: $spaceId, userId: $userId})
WHERE s.isActive = true
// Count statements in this space
OPTIONAL MATCH (stmt:Statement)
WHERE stmt.userId = $userId AND s.id IN stmt.spaceIds
// Count statements in this space using optimized approach
OPTIONAL MATCH (stmt:Statement {userId: $userId})
WHERE stmt.spaceIds IS NOT NULL AND $spaceId IN stmt.spaceIds AND stmt.invalidAt IS NULL
WITH s, count(stmt) as statementCount
RETURN s, statementCount
@ -152,8 +152,8 @@ export async function deleteSpace(
// 2. Clean up statement references (remove spaceId from spaceIds arrays)
const cleanupQuery = `
MATCH (s:Statement)
WHERE s.userId = $userId AND $spaceId IN s.spaceIds
MATCH (s:Statement {userId: $userId})
WHERE s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds
SET s.spaceIds = [id IN s.spaceIds WHERE id <> $spaceId]
RETURN count(s) as updatedStatements
`;
@ -203,8 +203,8 @@ export async function assignStatementsToSpace(
}
const query = `
MATCH (s:Statement)
WHERE s.uuid IN $statementIds AND s.userId = $userId
MATCH (s:Statement {userId: $userId})
WHERE s.uuid IN $statementIds
SET s.spaceIds = CASE
WHEN s.spaceIds IS NULL THEN [$spaceId]
WHEN $spaceId IN s.spaceIds THEN s.spaceIds
@ -244,8 +244,8 @@ export async function removeStatementsFromSpace(
): Promise<SpaceAssignmentResult> {
try {
const query = `
MATCH (s:Statement)
WHERE s.uuid IN $statementIds AND s.userId = $userId AND $spaceId IN s.spaceIds
MATCH (s:Statement {userId: $userId})
WHERE s.uuid IN $statementIds AND s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds
SET s.spaceIds = [id IN s.spaceIds WHERE id <> $spaceId]
RETURN count(s) as updated
`;
@ -271,8 +271,8 @@ export async function removeStatementsFromSpace(
*/
export async function getSpaceStatements(spaceId: string, userId: string) {
const query = `
MATCH (s:Statement)
WHERE s.userId = $userId AND s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds
MATCH (s:Statement {userId: $userId})
WHERE s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds AND s.invalidAt IS NULL
MATCH (s)-[:HAS_SUBJECT]->(subj:Entity)
MATCH (s)-[:HAS_PREDICATE]->(pred:Entity)
MATCH (s)-[:HAS_OBJECT]->(obj:Entity)
@ -309,9 +309,8 @@ export async function getSpaceStatementCount(
userId: string,
): Promise<number> {
const query = `
MATCH (s:Statement)
WHERE s.userId = $userId
AND s.spaceIds IS NOT NULL
MATCH (s:Statement {userId: $userId})
WHERE s.spaceIds IS NOT NULL
AND $spaceId IN s.spaceIds
AND s.invalidAt IS NULL
RETURN count(s) as statementCount

View File

@ -22,6 +22,7 @@ export async function saveTriple(triple: Triple): Promise<string> {
n.createdAt = $createdAt,
n.validAt = $validAt,
n.invalidAt = $invalidAt,
n.invalidatedBy = $invalidatedBy,
n.attributes = $attributes,
n.userId = $userId,
n.space = $space
@ -30,6 +31,7 @@ export async function saveTriple(triple: Triple): Promise<string> {
n.factEmbedding = $factEmbedding,
n.validAt = $validAt,
n.invalidAt = $invalidAt,
n.invalidatedBy = $invalidatedBy,
n.attributes = $attributes,
n.space = $space
RETURN n.uuid as uuid
@ -44,6 +46,7 @@ export async function saveTriple(triple: Triple): Promise<string> {
invalidAt: triple.statement.invalidAt
? triple.statement.invalidAt.toISOString()
: null,
invalidatedBy: triple.statement.invalidatedBy || null,
attributes: JSON.stringify(triple.statement.attributes || {}),
userId: triple.provenance.userId,
space: triple.statement.space || null,
@ -132,6 +135,7 @@ export async function findContradictoryStatements({
createdAt: new Date(statement.createdAt),
validAt: new Date(statement.validAt),
invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
invalidatedBy: statement.invalidatedBy || undefined,
attributes: statement.attributesJson
? JSON.parse(statement.attributesJson)
: {},
@ -186,6 +190,7 @@ export async function findStatementsWithSameSubjectObject({
createdAt: new Date(statement.createdAt),
validAt: new Date(statement.validAt),
invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
invalidatedBy: statement.invalidatedBy || undefined,
attributes: statement.attributesJson
? JSON.parse(statement.attributesJson)
: {},
@ -233,7 +238,6 @@ export async function findSimilarStatements({
return result.map((record) => {
const statement = record.get("statement").properties;
const score = record.get("score");
return {
uuid: statement.uuid,
@ -242,6 +246,7 @@ export async function findSimilarStatements({
createdAt: new Date(statement.createdAt),
validAt: new Date(statement.validAt),
invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
invalidatedBy: statement.invalidatedBy || undefined,
attributes: statement.attributesJson
? JSON.parse(statement.attributesJson)
: {},
@ -287,6 +292,7 @@ export async function getTripleForStatement({
invalidAt: statementProps.invalidAt
? new Date(statementProps.invalidAt)
: null,
invalidatedBy: statementProps.invalidatedBy || undefined,
attributes: statementProps.attributesJson
? JSON.parse(statementProps.attributesJson)
: {},
@ -360,17 +366,25 @@ export async function getTripleForStatement({
export async function invalidateStatement({
statementId,
invalidAt,
invalidatedBy,
}: {
statementId: string;
invalidAt: string;
invalidatedBy?: string;
}) {
const query = `
MATCH (statement:Statement {uuid: $statementId})
SET statement.invalidAt = $invalidAt
${invalidatedBy ? "SET statement.invalidatedBy = $invalidatedBy" : ""}
RETURN statement
`;
const result = await runQuery(query, { statementId, invalidAt });
const params = {
statementId,
invalidAt,
...(invalidatedBy && { invalidatedBy })
};
const result = await runQuery(query, params);
if (!result || result.length === 0) {
return null;
@ -381,13 +395,15 @@ export async function invalidateStatement({
export async function invalidateStatements({
statementIds,
invalidatedBy,
}: {
statementIds: string[];
invalidatedBy?: string;
}) {
const invalidAt = new Date().toISOString();
return statementIds.map(
async (statementId) =>
await invalidateStatement({ statementId, invalidAt }),
await invalidateStatement({ statementId, invalidAt, invalidatedBy }),
);
}
@ -421,7 +437,6 @@ export async function searchStatementsByEmbedding(params: {
return result.map((record) => {
const statement = record.get("statement").properties;
const score = record.get("score");
return {
uuid: statement.uuid,
@ -430,6 +445,7 @@ export async function searchStatementsByEmbedding(params: {
createdAt: new Date(statement.createdAt),
validAt: new Date(statement.validAt),
invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
invalidatedBy: statement.invalidatedBy || undefined,
attributes: statement.attributesJson
? JSON.parse(statement.attributesJson)
: {},

View File

@ -251,7 +251,10 @@ export class KnowledgeGraphService {
logger.log(`Saved triples in ${saveTriplesTime - updatedTriplesTime} ms`);
// Invalidate invalidated statements
await invalidateStatements({ statementIds: invalidatedStatements });
await invalidateStatements({
statementIds: invalidatedStatements,
invalidatedBy: episode.uuid
});
const endTime = Date.now();
const processingTimeMs = endTime - startTime;

View File

@ -174,6 +174,7 @@ async function checkAndTriggerSpacePatterns(
export const spaceAssignmentTask = task({
id: "space-assignment",
maxDuration: 900, // 15 minutes timeout
run: async (payload: SpaceAssignmentPayload) => {
const {
userId,
@ -471,12 +472,10 @@ async function getStatementsToAnalyze(
`;
} else {
query = `
MATCH (s:Statement)
WHERE s.userId = $userId
AND s.invalidAt IS NULL
AND s.episodeUuid = $episodeId
MATCH (e:Episode {uuid: $episodeId, userId: $userId})-[:HAS_PROVENANCE]->(s:Statement)
WHERE s.invalidAt IS NULL
MATCH (s)-[:HAS_SUBJECT]->(subj:Entity)
MATCH (s)-[:HAS_PREDICATE]->(pred:Entity)
MATCH (s)-[:HAS_PREDICATE]->(pred:Entity)
MATCH (s)-[:HAS_OBJECT]->(obj:Entity)
RETURN s, subj.name as subject, pred.name as predicate, obj.name as object
ORDER BY s.createdAt DESC
@ -561,7 +560,7 @@ async function processBatchAI(
});
// Poll for completion with improved handling
const maxPollingTime = 600000; // 10 minutes
const maxPollingTime = 780000; // 10 minutes
const pollInterval = 5000; // 5 seconds
const startTime = Date.now();

View File

@ -193,7 +193,7 @@ async function generateSpaceSummary(
const statements = await getSpaceStatements(
spaceId,
userId,
existingSummary?.lastUpdated,
isIncremental ? existingSummary?.lastUpdated : undefined,
);
// Handle case where no new statements exist for incremental update
@ -565,17 +565,6 @@ function parseSummaryResponse(response: string): {
let jsonContent = outputMatch[1].trim();
// Clean up common JSON formatting issues from LLM responses
jsonContent = jsonContent
.replace(/[\u0000-\u001F\u007F]/g, "") // Remove control characters
.replace(/\n/g, "\\n") // Escape newlines
.replace(/\r/g, "\\r") // Escape carriage returns
.replace(/\t/g, "\\t") // Escape tabs
.replace(/\\/g, "\\\\") // Escape backslashes (but avoid double escaping)
.replace(/\\\\n/g, "\\n") // Fix double-escaped newlines
.replace(/\\\\r/g, "\\r") // Fix double-escaped carriage returns
.replace(/\\\\t/g, "\\t"); // Fix double-escaped tabs
let parsed;
try {
parsed = JSON.parse(jsonContent);

View File

@ -50,6 +50,7 @@ export interface StatementNode {
createdAt: Date;
validAt: Date;
invalidAt: Date | null;
invalidatedBy?: string; // UUID of the episode that invalidated this statement
attributes: Record<string, any>;
userId: string;
space?: string; // Legacy field - deprecated in favor of spaceIds