Feat: cluster drift

This commit is contained in:
Manoj K 2025-08-01 08:44:32 +05:30 committed by Harshith Mullapudi
parent 29ca8b7ad3
commit 6afd993da7

View File

@ -74,7 +74,7 @@ export class ClusteringService {
/**
* Execute Leiden algorithm for community detection on statement similarity graph
*/
async executeLeidentClustering(
async executeLeidenClustering(
userId: string,
incremental: boolean = false,
): Promise<void> {
@ -173,7 +173,7 @@ export class ClusteringService {
let newClustersCreated = 0;
// Run incremental clustering on remaining statements
await this.executeLeidentClustering(userId, true);
await this.executeLeidenClustering(userId, true);
await this.createClusterNodes(userId);
// Count new clusters created
@ -185,9 +185,21 @@ export class ClusteringService {
const newClustersResult = await runQuery(newClustersQuery, { userId });
newClustersCreated = newClustersResult[0]?.get("newClusters") || 0;
const drift = await this.detectClusterDrift(userId);
const newClustersCreatedDrift = 0;
if (drift.driftDetected) {
logger.info("Cluster drift detected, evolving clusters");
const { newClustersCreated: newClustersCreatedDrift, splitClusters } =
await this.handleClusterDrift(userId);
if (splitClusters.length > 0) {
logger.info("Split clusters detected, evolving clusters");
}
}
return {
newStatementsProcessed: unclusteredCount,
newClustersCreated,
newClustersCreated: newClustersCreated + newClustersCreatedDrift,
};
} catch (error) {
logger.error("Error in incremental clustering:", { error });
@ -236,7 +248,7 @@ export class ClusteringService {
);
// Execute complete clustering pipeline
await this.executeLeidentClustering(userId, false);
await this.executeLeidenClustering(userId, false);
await this.createClusterNodes(userId);
// Get results
@ -795,34 +807,295 @@ Based on these distinctive patterns, what is the most accurate name for this sem
}
/**
* Handle cluster evolution when drift is detected
* Handle cluster evolution when drift is detected by splitting low-cohesion clusters
*/
async evolveCluster(oldClusterId: string, userId: string): Promise<string> {
logger.info(`Evolving cluster ${oldClusterId}`);
async evolveCluster(oldClusterId: string, userId: string): Promise<string[]> {
logger.info(`Splitting cluster ${oldClusterId} due to low cohesion`);
const newClusterId = crypto.randomUUID();
try {
// Step 1: Get all statements from the low-cohesion cluster
const statementsQuery = `
MATCH (s:Statement)
WHERE s.clusterId = $oldClusterId AND s.userId = $userId
RETURN collect(s.uuid) as statementIds, count(s) as statementCount
`;
const statementsResult = await runQuery(statementsQuery, {
oldClusterId,
userId,
});
const statementIds = statementsResult[0]?.get("statementIds") || [];
const statementCount = statementsResult[0]?.get("statementCount") || 0;
// Create evolution relationship
const evolutionQuery = `
MATCH (oldCluster:Cluster {uuid: $oldClusterId})
CREATE (newCluster:Cluster {
uuid: $newClusterId,
createdAt: datetime(),
userId: $userId,
size: 0,
needsNaming: true
})
CREATE (oldCluster)-[:EVOLVED_TO {createdAt: datetime()}]->(newCluster)
RETURN newCluster.uuid as uuid
`;
if (statementCount < this.MIN_CLUSTER_SIZE * 2) {
logger.info(
`Cluster ${oldClusterId} too small to split (${statementCount} statements)`,
);
return [oldClusterId]; // Return original cluster if too small to split
}
await runQuery(evolutionQuery, {
oldClusterId,
newClusterId,
userId,
});
// Step 2: Create similarity edges within this cluster's statements
const similarityQuery = `
MATCH (s1:Statement)-[:HAS_SUBJECT|HAS_PREDICATE|HAS_OBJECT]->(e:Entity)<-[:HAS_SUBJECT|HAS_PREDICATE|HAS_OBJECT]-(s2:Statement)
WHERE s1.clusterId = $oldClusterId AND s2.clusterId = $oldClusterId
AND s1.userId = $userId AND s2.userId = $userId
AND s1.invalidAt IS NULL AND s2.invalidAt IS NULL
AND id(s1) < id(s2)
WITH s1, s2, collect(DISTINCT e.uuid) as sharedEntities
WHERE size(sharedEntities) > 0
MERGE (s1)-[r:TEMP_SIMILAR_TO]-(s2)
SET r.weight = size(sharedEntities) * 2,
r.sharedEntities = sharedEntities
RETURN count(r) as edgesCreated
`;
await runQuery(similarityQuery, { oldClusterId, userId });
return newClusterId;
// Step 3: Run Leiden clustering on the cluster's statements
const leidenQuery = `
MATCH (source:Statement) WHERE source.userId = $userId
OPTIONAL MATCH (source)-[r:TEMP_SIMILAR_TO]->(target:Statement)
WHERE target.userId = $userId and target.clusterId = $oldClusterId
WITH gds.graph.project(
'cluster_split_' + $userId + '_' + $oldClusterId,
source,
target,
{
relationshipProperties: r { .weight }
},
{ undirectedRelationshipTypes: ['*'] }
) AS g
CALL gds.leiden.write(
g.graphName,
{
writeProperty: 'tempClusterId',
relationshipWeightProperty: 'weight',
gamma: $gamma,
maxLevels: $maxLevels,
tolerance: $tolerance,
}
)
YIELD communityCount
CALL gds.graph.drop(g.graphName)
YIELD graphName as droppedGraphName
RETURN communityCount, g.nodeCount, g.relationshipCount
`;
const leidenResult = await runQuery(leidenQuery, {
oldClusterId,
userId,
gamma: this.LEIDEN_GAMMA,
maxLevels: this.LEIDEN_MAX_LEVELS,
tolerance: this.LEIDEN_TOLERANCE,
});
const subClusterCount = leidenResult[0]?.get("communityCount") || 1;
// Step 4: Create new cluster IDs for sub-clusters that meet minimum size
const newClusterIds: string[] = [];
const assignClustersQuery = `
MATCH (s:Statement)
WHERE s.clusterId = $oldClusterId AND s.userId = $userId AND s.tempClusterId IS NOT NULL
WITH s.tempClusterId as tempId, collect(s) as statements
WHERE size(statements) >= $minSize
WITH tempId, statements, randomUUID() as newClusterId
FOREACH (stmt IN statements |
SET stmt.clusterId = newClusterId
REMOVE stmt.tempClusterId
)
RETURN collect(newClusterId) as newClusterIds, count(DISTINCT newClusterId) as validSubClusters
`;
const assignResult = await runQuery(assignClustersQuery, {
oldClusterId,
userId,
minSize: this.MIN_CLUSTER_SIZE,
});
const validNewClusterIds = assignResult[0]?.get("newClusterIds") || [];
newClusterIds.push(...validNewClusterIds);
// Step 5: Handle statements that didn't make it into valid sub-clusters
const orphanQuery = `
MATCH (s:Statement)
WHERE s.clusterId = $oldClusterId AND s.userId = $userId
REMOVE s.tempClusterId
// If we have valid sub-clusters, assign orphans to the largest one
WITH count(s) as orphanCount
MATCH (s2:Statement)
WHERE s2.clusterId IN $newClusterIds AND s2.userId = $userId
WITH s2.clusterId as clusterId, count(s2) as clusterSize, orphanCount
ORDER BY clusterSize DESC
LIMIT 1
MATCH (orphan:Statement)
WHERE orphan.clusterId = $oldClusterId AND orphan.userId = $userId
SET orphan.clusterId = clusterId
RETURN count(orphan) as orphansReassigned
`;
if (newClusterIds.length > 0) {
await runQuery(orphanQuery, { oldClusterId, userId, newClusterIds });
}
// Step 6: Create new Cluster nodes and evolution relationships
if (newClusterIds.length > 1) {
const createClustersQuery = `
MATCH (oldCluster:Cluster {uuid: $oldClusterId})
UNWIND $newClusterIds as newClusterId
MATCH (s:Statement {clusterId: newClusterId, userId: $userId})
WITH oldCluster, newClusterId, count(s) as statementCount
CREATE (newCluster:Cluster {
uuid: newClusterId,
createdAt: datetime(),
userId: $userId,
size: statementCount,
needsNaming: true,
aspectType: oldCluster.aspectType
})
CREATE (oldCluster)-[:SPLIT_INTO {
createdAt: datetime(),
reason: 'low_cohesion',
originalSize: $originalSize,
newSize: statementCount
}]->(newCluster)
RETURN count(newCluster) as clustersCreated
`;
await runQuery(createClustersQuery, {
oldClusterId,
newClusterIds,
originalSize: statementCount,
userId,
});
// Mark old cluster as evolved
await runQuery(
`
MATCH (c:Cluster {uuid: $oldClusterId})
SET c.evolved = true, c.evolvedAt = datetime()
`,
{ oldClusterId },
);
logger.info(
`Successfully split cluster ${oldClusterId} into ${newClusterIds.length} sub-clusters`,
);
} else {
logger.info(`Cluster ${oldClusterId} could not be meaningfully split`);
newClusterIds.push(oldClusterId); // Keep original if splitting didn't work
}
// Step 7: Clean up temporary relationships
await runQuery(
`
MATCH ()-[r:TEMP_SIMILAR_TO]-()
DELETE r
`,
{},
);
return newClusterIds;
} catch (error) {
logger.error(`Error splitting cluster ${oldClusterId}:`, { error });
// Clean up on error
await runQuery(
`
MATCH ()-[r:TEMP_SIMILAR_TO]-()
DELETE r
MATCH (s:Statement)
WHERE s.clusterId = $oldClusterId AND s.userId = $userId
REMOVE s.tempClusterId
`,
{ oldClusterId, userId },
);
throw error;
}
}
/**
* Handle drift by splitting low-cohesion clusters
*/
async handleClusterDrift(userId: string): Promise<{
clustersProcessed: number;
newClustersCreated: number;
splitClusters: string[];
}> {
logger.info(`Handling cluster drift for user ${userId}`);
try {
// Detect drift and get low-cohesion clusters
const driftMetrics = await this.detectClusterDrift(userId);
if (
!driftMetrics.driftDetected ||
driftMetrics.lowCohesionClusters.length === 0
) {
logger.info("No drift detected or no clusters need splitting");
return {
clustersProcessed: 0,
newClustersCreated: 0,
splitClusters: [],
};
}
logger.info(
`Found ${driftMetrics.lowCohesionClusters.length} clusters with low cohesion`,
);
let totalNewClusters = 0;
const splitClusters: string[] = [];
// Process each low-cohesion cluster
for (const clusterId of driftMetrics.lowCohesionClusters) {
try {
const newClusterIds = await this.evolveCluster(clusterId, userId);
if (newClusterIds.length > 1) {
// Cluster was successfully split
totalNewClusters += newClusterIds.length;
splitClusters.push(clusterId);
logger.info(
`Split cluster ${clusterId} into ${newClusterIds.length} sub-clusters`,
);
} else {
logger.info(`Cluster ${clusterId} could not be split meaningfully`);
}
} catch (error) {
logger.error(`Failed to split cluster ${clusterId}:`, { error });
// Continue with other clusters even if one fails
}
}
// Update cluster embeddings for new clusters
if (totalNewClusters > 0) {
await this.updateClusterEmbeddings(userId);
await this.generateClusterNames(userId);
}
logger.info(
`Drift handling completed: ${splitClusters.length} clusters split, ${totalNewClusters} new clusters created`,
);
return {
clustersProcessed: splitClusters.length,
newClustersCreated: totalNewClusters,
splitClusters,
};
} catch (error) {
logger.error("Error handling cluster drift:", { error });
throw error;
}
}
/**