feat: add invalidatedBy field to statement model and optimize space queries

This commit is contained in:
Manoj K 2025-08-28 14:12:27 +05:30
parent 26a2d04ca9
commit 14d38b0171
7 changed files with 64 additions and 37 deletions

View File

@ -336,6 +336,26 @@ const initializeSchema = async () => {
"CREATE INDEX cluster_aspect_type IF NOT EXISTS FOR (n:Cluster) ON (n.aspectType)", "CREATE INDEX cluster_aspect_type IF NOT EXISTS FOR (n:Cluster) ON (n.aspectType)",
); );
// Space-optimized indexes for better query performance
await runQuery(
"CREATE INDEX space_user_uuid IF NOT EXISTS FOR (n:Space) ON (n.userId, n.uuid)",
);
await runQuery(
"CREATE INDEX space_user_active IF NOT EXISTS FOR (n:Space) ON (n.userId, n.isActive)",
);
await runQuery(
"CREATE INDEX statement_user_spaces IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.spaceIds)",
);
await runQuery(
"CREATE INDEX statement_user_invalid IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.invalidAt)",
);
await runQuery(
"CREATE INDEX statement_user_uuid IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.uuid)",
);
await runQuery(
"CREATE INDEX entity_user_uuid IF NOT EXISTS FOR (n:Entity) ON (n.userId, n.uuid)",
);
// Create vector indexes for semantic search (if using Neo4j 5.0+) // Create vector indexes for semantic search (if using Neo4j 5.0+)
await runQuery(` await runQuery(`
CREATE VECTOR INDEX entity_embedding IF NOT EXISTS FOR (n:Entity) ON n.nameEmbedding CREATE VECTOR INDEX entity_embedding IF NOT EXISTS FOR (n:Entity) ON n.nameEmbedding

View File

@ -57,9 +57,9 @@ export async function getSpace(
MATCH (s:Space {uuid: $spaceId, userId: $userId}) MATCH (s:Space {uuid: $spaceId, userId: $userId})
WHERE s.isActive = true WHERE s.isActive = true
// Count statements in this space // Count statements in this space using optimized approach
OPTIONAL MATCH (stmt:Statement) OPTIONAL MATCH (stmt:Statement {userId: $userId})
WHERE stmt.userId = $userId AND s.id IN stmt.spaceIds WHERE stmt.spaceIds IS NOT NULL AND $spaceId IN stmt.spaceIds AND stmt.invalidAt IS NULL
WITH s, count(stmt) as statementCount WITH s, count(stmt) as statementCount
RETURN s, statementCount RETURN s, statementCount
@ -152,8 +152,8 @@ export async function deleteSpace(
// 2. Clean up statement references (remove spaceId from spaceIds arrays) // 2. Clean up statement references (remove spaceId from spaceIds arrays)
const cleanupQuery = ` const cleanupQuery = `
MATCH (s:Statement) MATCH (s:Statement {userId: $userId})
WHERE s.userId = $userId AND $spaceId IN s.spaceIds WHERE s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds
SET s.spaceIds = [id IN s.spaceIds WHERE id <> $spaceId] SET s.spaceIds = [id IN s.spaceIds WHERE id <> $spaceId]
RETURN count(s) as updatedStatements RETURN count(s) as updatedStatements
`; `;
@ -203,8 +203,8 @@ export async function assignStatementsToSpace(
} }
const query = ` const query = `
MATCH (s:Statement) MATCH (s:Statement {userId: $userId})
WHERE s.uuid IN $statementIds AND s.userId = $userId WHERE s.uuid IN $statementIds
SET s.spaceIds = CASE SET s.spaceIds = CASE
WHEN s.spaceIds IS NULL THEN [$spaceId] WHEN s.spaceIds IS NULL THEN [$spaceId]
WHEN $spaceId IN s.spaceIds THEN s.spaceIds WHEN $spaceId IN s.spaceIds THEN s.spaceIds
@ -244,8 +244,8 @@ export async function removeStatementsFromSpace(
): Promise<SpaceAssignmentResult> { ): Promise<SpaceAssignmentResult> {
try { try {
const query = ` const query = `
MATCH (s:Statement) MATCH (s:Statement {userId: $userId})
WHERE s.uuid IN $statementIds AND s.userId = $userId AND $spaceId IN s.spaceIds WHERE s.uuid IN $statementIds AND s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds
SET s.spaceIds = [id IN s.spaceIds WHERE id <> $spaceId] SET s.spaceIds = [id IN s.spaceIds WHERE id <> $spaceId]
RETURN count(s) as updated RETURN count(s) as updated
`; `;
@ -271,8 +271,8 @@ export async function removeStatementsFromSpace(
*/ */
export async function getSpaceStatements(spaceId: string, userId: string) { export async function getSpaceStatements(spaceId: string, userId: string) {
const query = ` const query = `
MATCH (s:Statement) MATCH (s:Statement {userId: $userId})
WHERE s.userId = $userId AND s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds WHERE s.spaceIds IS NOT NULL AND $spaceId IN s.spaceIds AND s.invalidAt IS NULL
MATCH (s)-[:HAS_SUBJECT]->(subj:Entity) MATCH (s)-[:HAS_SUBJECT]->(subj:Entity)
MATCH (s)-[:HAS_PREDICATE]->(pred:Entity) MATCH (s)-[:HAS_PREDICATE]->(pred:Entity)
MATCH (s)-[:HAS_OBJECT]->(obj:Entity) MATCH (s)-[:HAS_OBJECT]->(obj:Entity)
@ -309,9 +309,8 @@ export async function getSpaceStatementCount(
userId: string, userId: string,
): Promise<number> { ): Promise<number> {
const query = ` const query = `
MATCH (s:Statement) MATCH (s:Statement {userId: $userId})
WHERE s.userId = $userId WHERE s.spaceIds IS NOT NULL
AND s.spaceIds IS NOT NULL
AND $spaceId IN s.spaceIds AND $spaceId IN s.spaceIds
AND s.invalidAt IS NULL AND s.invalidAt IS NULL
RETURN count(s) as statementCount RETURN count(s) as statementCount

View File

@ -22,6 +22,7 @@ export async function saveTriple(triple: Triple): Promise<string> {
n.createdAt = $createdAt, n.createdAt = $createdAt,
n.validAt = $validAt, n.validAt = $validAt,
n.invalidAt = $invalidAt, n.invalidAt = $invalidAt,
n.invalidatedBy = $invalidatedBy,
n.attributes = $attributes, n.attributes = $attributes,
n.userId = $userId, n.userId = $userId,
n.space = $space n.space = $space
@ -30,6 +31,7 @@ export async function saveTriple(triple: Triple): Promise<string> {
n.factEmbedding = $factEmbedding, n.factEmbedding = $factEmbedding,
n.validAt = $validAt, n.validAt = $validAt,
n.invalidAt = $invalidAt, n.invalidAt = $invalidAt,
n.invalidatedBy = $invalidatedBy,
n.attributes = $attributes, n.attributes = $attributes,
n.space = $space n.space = $space
RETURN n.uuid as uuid RETURN n.uuid as uuid
@ -44,6 +46,7 @@ export async function saveTriple(triple: Triple): Promise<string> {
invalidAt: triple.statement.invalidAt invalidAt: triple.statement.invalidAt
? triple.statement.invalidAt.toISOString() ? triple.statement.invalidAt.toISOString()
: null, : null,
invalidatedBy: triple.statement.invalidatedBy || null,
attributes: JSON.stringify(triple.statement.attributes || {}), attributes: JSON.stringify(triple.statement.attributes || {}),
userId: triple.provenance.userId, userId: triple.provenance.userId,
space: triple.statement.space || null, space: triple.statement.space || null,
@ -132,6 +135,7 @@ export async function findContradictoryStatements({
createdAt: new Date(statement.createdAt), createdAt: new Date(statement.createdAt),
validAt: new Date(statement.validAt), validAt: new Date(statement.validAt),
invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null, invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
invalidatedBy: statement.invalidatedBy || undefined,
attributes: statement.attributesJson attributes: statement.attributesJson
? JSON.parse(statement.attributesJson) ? JSON.parse(statement.attributesJson)
: {}, : {},
@ -186,6 +190,7 @@ export async function findStatementsWithSameSubjectObject({
createdAt: new Date(statement.createdAt), createdAt: new Date(statement.createdAt),
validAt: new Date(statement.validAt), validAt: new Date(statement.validAt),
invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null, invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
invalidatedBy: statement.invalidatedBy || undefined,
attributes: statement.attributesJson attributes: statement.attributesJson
? JSON.parse(statement.attributesJson) ? JSON.parse(statement.attributesJson)
: {}, : {},
@ -233,7 +238,6 @@ export async function findSimilarStatements({
return result.map((record) => { return result.map((record) => {
const statement = record.get("statement").properties; const statement = record.get("statement").properties;
const score = record.get("score");
return { return {
uuid: statement.uuid, uuid: statement.uuid,
@ -242,6 +246,7 @@ export async function findSimilarStatements({
createdAt: new Date(statement.createdAt), createdAt: new Date(statement.createdAt),
validAt: new Date(statement.validAt), validAt: new Date(statement.validAt),
invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null, invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
invalidatedBy: statement.invalidatedBy || undefined,
attributes: statement.attributesJson attributes: statement.attributesJson
? JSON.parse(statement.attributesJson) ? JSON.parse(statement.attributesJson)
: {}, : {},
@ -287,6 +292,7 @@ export async function getTripleForStatement({
invalidAt: statementProps.invalidAt invalidAt: statementProps.invalidAt
? new Date(statementProps.invalidAt) ? new Date(statementProps.invalidAt)
: null, : null,
invalidatedBy: statementProps.invalidatedBy || undefined,
attributes: statementProps.attributesJson attributes: statementProps.attributesJson
? JSON.parse(statementProps.attributesJson) ? JSON.parse(statementProps.attributesJson)
: {}, : {},
@ -360,17 +366,25 @@ export async function getTripleForStatement({
export async function invalidateStatement({ export async function invalidateStatement({
statementId, statementId,
invalidAt, invalidAt,
invalidatedBy,
}: { }: {
statementId: string; statementId: string;
invalidAt: string; invalidAt: string;
invalidatedBy?: string;
}) { }) {
const query = ` const query = `
MATCH (statement:Statement {uuid: $statementId}) MATCH (statement:Statement {uuid: $statementId})
SET statement.invalidAt = $invalidAt SET statement.invalidAt = $invalidAt
${invalidatedBy ? "SET statement.invalidatedBy = $invalidatedBy" : ""}
RETURN statement RETURN statement
`; `;
const result = await runQuery(query, { statementId, invalidAt }); const params = {
statementId,
invalidAt,
...(invalidatedBy && { invalidatedBy })
};
const result = await runQuery(query, params);
if (!result || result.length === 0) { if (!result || result.length === 0) {
return null; return null;
@ -381,13 +395,15 @@ export async function invalidateStatement({
export async function invalidateStatements({ export async function invalidateStatements({
statementIds, statementIds,
invalidatedBy,
}: { }: {
statementIds: string[]; statementIds: string[];
invalidatedBy?: string;
}) { }) {
const invalidAt = new Date().toISOString(); const invalidAt = new Date().toISOString();
return statementIds.map( return statementIds.map(
async (statementId) => async (statementId) =>
await invalidateStatement({ statementId, invalidAt }), await invalidateStatement({ statementId, invalidAt, invalidatedBy }),
); );
} }
@ -421,7 +437,6 @@ export async function searchStatementsByEmbedding(params: {
return result.map((record) => { return result.map((record) => {
const statement = record.get("statement").properties; const statement = record.get("statement").properties;
const score = record.get("score");
return { return {
uuid: statement.uuid, uuid: statement.uuid,
@ -430,6 +445,7 @@ export async function searchStatementsByEmbedding(params: {
createdAt: new Date(statement.createdAt), createdAt: new Date(statement.createdAt),
validAt: new Date(statement.validAt), validAt: new Date(statement.validAt),
invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null, invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
invalidatedBy: statement.invalidatedBy || undefined,
attributes: statement.attributesJson attributes: statement.attributesJson
? JSON.parse(statement.attributesJson) ? JSON.parse(statement.attributesJson)
: {}, : {},

View File

@ -251,7 +251,10 @@ export class KnowledgeGraphService {
logger.log(`Saved triples in ${saveTriplesTime - updatedTriplesTime} ms`); logger.log(`Saved triples in ${saveTriplesTime - updatedTriplesTime} ms`);
// Invalidate invalidated statements // Invalidate invalidated statements
await invalidateStatements({ statementIds: invalidatedStatements }); await invalidateStatements({
statementIds: invalidatedStatements,
invalidatedBy: episode.uuid
});
const endTime = Date.now(); const endTime = Date.now();
const processingTimeMs = endTime - startTime; const processingTimeMs = endTime - startTime;

View File

@ -174,6 +174,7 @@ async function checkAndTriggerSpacePatterns(
export const spaceAssignmentTask = task({ export const spaceAssignmentTask = task({
id: "space-assignment", id: "space-assignment",
maxDuration: 900, // 15 minutes timeout
run: async (payload: SpaceAssignmentPayload) => { run: async (payload: SpaceAssignmentPayload) => {
const { const {
userId, userId,
@ -471,10 +472,8 @@ async function getStatementsToAnalyze(
`; `;
} else { } else {
query = ` query = `
MATCH (s:Statement) MATCH (e:Episode {uuid: $episodeId, userId: $userId})-[:HAS_PROVENANCE]->(s:Statement)
WHERE s.userId = $userId WHERE s.invalidAt IS NULL
AND s.invalidAt IS NULL
AND s.episodeUuid = $episodeId
MATCH (s)-[:HAS_SUBJECT]->(subj:Entity) MATCH (s)-[:HAS_SUBJECT]->(subj:Entity)
MATCH (s)-[:HAS_PREDICATE]->(pred:Entity) MATCH (s)-[:HAS_PREDICATE]->(pred:Entity)
MATCH (s)-[:HAS_OBJECT]->(obj:Entity) MATCH (s)-[:HAS_OBJECT]->(obj:Entity)
@ -561,7 +560,7 @@ async function processBatchAI(
}); });
// Poll for completion with improved handling // Poll for completion with improved handling
const maxPollingTime = 600000; // 10 minutes const maxPollingTime = 780000; // 10 minutes
const pollInterval = 5000; // 5 seconds const pollInterval = 5000; // 5 seconds
const startTime = Date.now(); const startTime = Date.now();

View File

@ -193,7 +193,7 @@ async function generateSpaceSummary(
const statements = await getSpaceStatements( const statements = await getSpaceStatements(
spaceId, spaceId,
userId, userId,
existingSummary?.lastUpdated, isIncremental ? existingSummary?.lastUpdated : undefined,
); );
// Handle case where no new statements exist for incremental update // Handle case where no new statements exist for incremental update
@ -565,17 +565,6 @@ function parseSummaryResponse(response: string): {
let jsonContent = outputMatch[1].trim(); let jsonContent = outputMatch[1].trim();
// Clean up common JSON formatting issues from LLM responses
jsonContent = jsonContent
.replace(/[\u0000-\u001F\u007F]/g, "") // Remove control characters
.replace(/\n/g, "\\n") // Escape newlines
.replace(/\r/g, "\\r") // Escape carriage returns
.replace(/\t/g, "\\t") // Escape tabs
.replace(/\\/g, "\\\\") // Escape backslashes (but avoid double escaping)
.replace(/\\\\n/g, "\\n") // Fix double-escaped newlines
.replace(/\\\\r/g, "\\r") // Fix double-escaped carriage returns
.replace(/\\\\t/g, "\\t"); // Fix double-escaped tabs
let parsed; let parsed;
try { try {
parsed = JSON.parse(jsonContent); parsed = JSON.parse(jsonContent);

View File

@ -50,6 +50,7 @@ export interface StatementNode {
createdAt: Date; createdAt: Date;
validAt: Date; validAt: Date;
invalidAt: Date | null; invalidAt: Date | null;
invalidatedBy?: string; // UUID of the episode that invalidated this statement
attributes: Record<string, any>; attributes: Record<string, any>;
userId: string; userId: string;
space?: string; // Legacy field - deprecated in favor of spaceIds space?: string; // Legacy field - deprecated in favor of spaceIds