From 6afd993da7efe2d29414365943a59451ef5f7df8 Mon Sep 17 00:00:00 2001
From: Manoj K
Date: Fri, 1 Aug 2025 08:44:32 +0530
Subject: [PATCH] Feat: detect and handle cluster drift by splitting low-cohesion clusters

---
 apps/webapp/app/services/clustering.server.ts | 327 ++++++++++++++++--
 1 file changed, 300 insertions(+), 27 deletions(-)

diff --git a/apps/webapp/app/services/clustering.server.ts b/apps/webapp/app/services/clustering.server.ts
index 61e903f..bca1d6f 100644
--- a/apps/webapp/app/services/clustering.server.ts
+++ b/apps/webapp/app/services/clustering.server.ts
@@ -74,7 +74,7 @@ export class ClusteringService {
   /**
    * Execute Leiden algorithm for community detection on statement similarity graph
    */
-  async executeLeidentClustering(
+  async executeLeidenClustering(
     userId: string,
     incremental: boolean = false,
   ): Promise<void> {
@@ -173,7 +173,7 @@
     let newClustersCreated = 0;
 
     // Run incremental clustering on remaining statements
-    await this.executeLeidentClustering(userId, true);
+    await this.executeLeidenClustering(userId, true);
     await this.createClusterNodes(userId);
 
     // Count new clusters created
@@ -185,9 +185,21 @@
       const newClustersResult = await runQuery(newClustersQuery, { userId });
       newClustersCreated = newClustersResult[0]?.get("newClusters") || 0;
 
+      let newClustersCreatedDrift = 0;
+      const drift = await this.detectClusterDrift(userId);
+      if (drift.driftDetected) {
+        logger.info("Cluster drift detected, evolving clusters");
+        const driftResult = await this.handleClusterDrift(userId);
+        newClustersCreatedDrift = driftResult.newClustersCreated;
+
+        if (driftResult.splitClusters.length > 0) {
+          logger.info("Split clusters detected during drift handling");
+        }
+      }
+
       return {
         newStatementsProcessed: unclusteredCount,
-        newClustersCreated,
+        newClustersCreated: newClustersCreated + newClustersCreatedDrift,
       };
     } catch (error) {
       logger.error("Error in incremental clustering:", { error });
@@ -236,7 +248,7 @@
     );
 
     // Execute complete clustering pipeline
-    await this.executeLeidentClustering(userId, false);
+    await this.executeLeidenClustering(userId, false);
     await this.createClusterNodes(userId);
 
     // Get results
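For reference, a minimal sketch of the result shapes this wiring assumes. The interface names are illustrative (the service may type these inline); the fields mirror how detectClusterDrift is consumed above and what handleClusterDrift declares further down in this patch.

// Illustrative TypeScript sketch; the interface names are assumptions.
interface ClusterDriftMetrics {
  driftDetected: boolean;
  lowCohesionClusters: string[]; // uuids of clusters below the cohesion threshold
}

interface DriftHandlingResult {
  clustersProcessed: number;
  newClustersCreated: number;
  splitClusters: string[]; // uuids of the original clusters that were split
}

// The patched return value adds drift-created clusters to the Leiden count;
// drift contributes zero when none was detected or handling was skipped.
function totalNewClusters(
  fromLeiden: number,
  drift?: DriftHandlingResult,
): number {
  return fromLeiden + (drift?.newClustersCreated ?? 0);
}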
@@ -795,34 +807,295 @@ Based on these distinctive patterns, what is the most accurate name for this sem
   }
 
   /**
-   * Handle cluster evolution when drift is detected
+   * Handle cluster evolution when drift is detected by splitting low-cohesion clusters
    */
-  async evolveCluster(oldClusterId: string, userId: string): Promise<string> {
-    logger.info(`Evolving cluster ${oldClusterId}`);
+  async evolveCluster(oldClusterId: string, userId: string): Promise<string[]> {
+    logger.info(`Splitting cluster ${oldClusterId} due to low cohesion`);
 
-    const newClusterId = crypto.randomUUID();
+    try {
+      // Step 1: Get all statements from the low-cohesion cluster
+      const statementsQuery = `
+        MATCH (s:Statement)
+        WHERE s.clusterId = $oldClusterId AND s.userId = $userId
+        RETURN collect(s.uuid) as statementIds, count(s) as statementCount
+      `;
+      const statementsResult = await runQuery(statementsQuery, {
+        oldClusterId,
+        userId,
+      });
+      const statementIds = statementsResult[0]?.get("statementIds") || [];
+      const statementCount = statementsResult[0]?.get("statementCount") || 0;
 
-    // Create evolution relationship
-    const evolutionQuery = `
-      MATCH (oldCluster:Cluster {uuid: $oldClusterId})
-      CREATE (newCluster:Cluster {
-        uuid: $newClusterId,
-        createdAt: datetime(),
-        userId: $userId,
-        size: 0,
-        needsNaming: true
-      })
-      CREATE (oldCluster)-[:EVOLVED_TO {createdAt: datetime()}]->(newCluster)
-      RETURN newCluster.uuid as uuid
-    `;
+      if (statementCount < this.MIN_CLUSTER_SIZE * 2) {
+        logger.info(
+          `Cluster ${oldClusterId} too small to split (${statementCount} statements)`,
+        );
+        return [oldClusterId]; // Return original cluster if too small to split
+      }
 
-    await runQuery(evolutionQuery, {
-      oldClusterId,
-      newClusterId,
-      userId,
-    });
+      // Step 2: Create similarity edges within this cluster's statements
+      const similarityQuery = `
+        MATCH (s1:Statement)-[:HAS_SUBJECT|HAS_PREDICATE|HAS_OBJECT]->(e:Entity)<-[:HAS_SUBJECT|HAS_PREDICATE|HAS_OBJECT]-(s2:Statement)
+        WHERE s1.clusterId = $oldClusterId AND s2.clusterId = $oldClusterId
+          AND s1.userId = $userId AND s2.userId = $userId
+          AND s1.invalidAt IS NULL AND s2.invalidAt IS NULL
+          AND id(s1) < id(s2)
+        WITH s1, s2, collect(DISTINCT e.uuid) as sharedEntities
+        WHERE size(sharedEntities) > 0
+        MERGE (s1)-[r:TEMP_SIMILAR_TO]-(s2)
+        SET r.weight = size(sharedEntities) * 2,
+            r.sharedEntities = sharedEntities
+        RETURN count(r) as edgesCreated
+      `;
+      await runQuery(similarityQuery, { oldClusterId, userId });
 
-    return newClusterId;
+      // Step 3: Run Leiden clustering on this cluster's statements only
+      const leidenQuery = `
+        MATCH (source:Statement)
+        WHERE source.userId = $userId AND source.clusterId = $oldClusterId
+        OPTIONAL MATCH (source)-[r:TEMP_SIMILAR_TO]->(target:Statement)
+        WHERE target.userId = $userId AND target.clusterId = $oldClusterId
+        WITH gds.graph.project(
+          'cluster_split_' + $userId + '_' + $oldClusterId,
+          source,
+          target,
+          {
+            relationshipProperties: r { .weight }
+          },
+          { undirectedRelationshipTypes: ['*'] }
+        ) AS g
+
+        CALL gds.leiden.write(
+          g.graphName,
+          {
+            writeProperty: 'tempClusterId',
+            relationshipWeightProperty: 'weight',
+            gamma: $gamma,
+            maxLevels: $maxLevels,
+            tolerance: $tolerance
+          }
+        )
+        YIELD communityCount
+
+        CALL gds.graph.drop(g.graphName)
+        YIELD graphName as droppedGraphName
+
+        RETURN communityCount, g.nodeCount, g.relationshipCount
+      `;
+
+      const leidenResult = await runQuery(leidenQuery, {
+        oldClusterId,
+        userId,
+        gamma: this.LEIDEN_GAMMA,
+        maxLevels: this.LEIDEN_MAX_LEVELS,
+        tolerance: this.LEIDEN_TOLERANCE,
+      });
+      const subClusterCount = leidenResult[0]?.get("communityCount") || 1;
+      logger.info(
+        `Leiden found ${subClusterCount} candidate sub-clusters in ${oldClusterId}`,
+      );
+
+      // Step 4: Create new cluster IDs for sub-clusters that meet minimum size
+      const newClusterIds: string[] = [];
+      const assignClustersQuery = `
+        MATCH (s:Statement)
+        WHERE s.clusterId = $oldClusterId AND s.userId = $userId
+          AND s.tempClusterId IS NOT NULL
+        WITH s.tempClusterId as tempId, collect(s) as statements
+        WHERE size(statements) >= $minSize
+
+        WITH tempId, statements, randomUUID() as newClusterId
+
+        FOREACH (stmt IN statements |
+          SET stmt.clusterId = newClusterId
+          REMOVE stmt.tempClusterId
+        )
+
+        RETURN collect(newClusterId) as newClusterIds,
+               count(DISTINCT newClusterId) as validSubClusters
+      `;
+
+      const assignResult = await runQuery(assignClustersQuery, {
+        oldClusterId,
+        userId,
+        minSize: this.MIN_CLUSTER_SIZE,
+      });
+      const validNewClusterIds = assignResult[0]?.get("newClusterIds") || [];
+      newClusterIds.push(...validNewClusterIds);
+
+      // Step 5: Handle statements that didn't make it into valid sub-clusters
+      const orphanQuery = `
+        MATCH (s:Statement)
+        WHERE s.clusterId = $oldClusterId AND s.userId = $userId
+        REMOVE s.tempClusterId
+
+        // If we have valid sub-clusters, assign orphans to the largest one
+        WITH count(s) as orphanCount
+        MATCH (s2:Statement)
+        WHERE s2.clusterId IN $newClusterIds AND s2.userId = $userId
+        WITH s2.clusterId as clusterId, count(s2) as clusterSize, orphanCount
+        ORDER BY clusterSize DESC
+        LIMIT 1
+
+        MATCH (orphan:Statement)
+        WHERE orphan.clusterId = $oldClusterId AND orphan.userId = $userId
+        SET orphan.clusterId = clusterId
+
+        RETURN count(orphan) as orphansReassigned
+      `;
+
+      if (newClusterIds.length > 0) {
+        await runQuery(orphanQuery, { oldClusterId, userId, newClusterIds });
+      }
+
+      // Step 6: Create new Cluster nodes and evolution relationships
+      if (newClusterIds.length > 1) {
+        const createClustersQuery = `
+          MATCH (oldCluster:Cluster {uuid: $oldClusterId})
+
+          UNWIND $newClusterIds as newClusterId
+
+          MATCH (s:Statement {clusterId: newClusterId, userId: $userId})
+          WITH oldCluster, newClusterId, count(s) as statementCount
+
+          CREATE (newCluster:Cluster {
+            uuid: newClusterId,
+            createdAt: datetime(),
+            userId: $userId,
+            size: statementCount,
+            needsNaming: true,
+            aspectType: oldCluster.aspectType
+          })
+
+          CREATE (oldCluster)-[:SPLIT_INTO {
+            createdAt: datetime(),
+            reason: 'low_cohesion',
+            originalSize: $originalSize,
+            newSize: statementCount
+          }]->(newCluster)
+
+          RETURN count(newCluster) as clustersCreated
+        `;
+
+        await runQuery(createClustersQuery, {
+          oldClusterId,
+          newClusterIds,
+          originalSize: statementCount,
+          userId,
+        });
+
+        // Mark old cluster as evolved
+        await runQuery(
+          `
+          MATCH (c:Cluster {uuid: $oldClusterId})
+          SET c.evolved = true, c.evolvedAt = datetime()
+          `,
+          { oldClusterId },
+        );
+
+        logger.info(
+          `Successfully split cluster ${oldClusterId} into ${newClusterIds.length} sub-clusters`,
+        );
+      } else {
+        logger.info(`Cluster ${oldClusterId} could not be meaningfully split`);
+        if (newClusterIds.length === 1) {
+          // A single sub-cluster is not a split: move its statements back so
+          // they don't reference a Cluster node that was never created
+          await runQuery(
+            `
+            MATCH (s:Statement {clusterId: $danglingId, userId: $userId})
+            SET s.clusterId = $oldClusterId
+            `,
+            { danglingId: newClusterIds[0], oldClusterId, userId },
+          );
+          newClusterIds.length = 0;
+        }
+        newClusterIds.push(oldClusterId); // Keep original if splitting didn't work
+      }
+
+      // Step 7: Clean up temporary relationships
+      await runQuery(
+        `
+        MATCH ()-[r:TEMP_SIMILAR_TO]-()
+        DELETE r
+        `,
+        {},
+      );
+
+      return newClusterIds;
+    } catch (error) {
+      logger.error(`Error splitting cluster ${oldClusterId}:`, { error });
+      // Clean up on error; two queries, since Cypher disallows MATCH after DELETE
+      await runQuery(
+        `
+        MATCH ()-[r:TEMP_SIMILAR_TO]-()
+        DELETE r
+        `,
+        {},
+      );
+      await runQuery(
+        `
+        MATCH (s:Statement)
+        WHERE s.clusterId = $oldClusterId AND s.userId = $userId
+        REMOVE s.tempClusterId
+        `,
+        { oldClusterId, userId },
+      );
+      throw error;
+    }
+  }
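A hypothetical call site for the new method. Per the early-exit and fallback paths above, receiving only the original id back means the cluster was left intact; `service` is a placeholder for a ClusteringService instance constructed elsewhere.

// Hypothetical usage sketch; `service` is assumed, everything else follows
// directly from evolveCluster's return contract above.
declare const service: ClusteringService;

async function trySplitCluster(
  userId: string,
  clusterId: string,
): Promise<boolean> {
  const resultIds = await service.evolveCluster(clusterId, userId);
  // evolveCluster returns [clusterId] unchanged when the cluster is too
  // small to split or no valid sub-clusters emerge
  const wasSplit = !(resultIds.length === 1 && resultIds[0] === clusterId);
  console.log(
    wasSplit
      ? `cluster ${clusterId} split into ${resultIds.length} sub-clusters`
      : `cluster ${clusterId} kept intact`,
  );
  return wasSplit;
}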
+
+  /**
+   * Handle drift by splitting low-cohesion clusters
+   */
+  async handleClusterDrift(userId: string): Promise<{
+    clustersProcessed: number;
+    newClustersCreated: number;
+    splitClusters: string[];
+  }> {
+    logger.info(`Handling cluster drift for user ${userId}`);
+
+    try {
+      // Detect drift and get low-cohesion clusters
+      const driftMetrics = await this.detectClusterDrift(userId);
+
+      if (
+        !driftMetrics.driftDetected ||
+        driftMetrics.lowCohesionClusters.length === 0
+      ) {
+        logger.info("No drift detected or no clusters need splitting");
+        return {
+          clustersProcessed: 0,
+          newClustersCreated: 0,
+          splitClusters: [],
+        };
+      }
+
+      logger.info(
+        `Found ${driftMetrics.lowCohesionClusters.length} clusters with low cohesion`,
+      );
+
+      let totalNewClusters = 0;
+      const splitClusters: string[] = [];
+
+      // Process each low-cohesion cluster
+      for (const clusterId of driftMetrics.lowCohesionClusters) {
+        try {
+          const newClusterIds = await this.evolveCluster(clusterId, userId);
+
+          if (newClusterIds.length > 1) {
+            // Cluster was successfully split
+            totalNewClusters += newClusterIds.length;
+            splitClusters.push(clusterId);
+            logger.info(
+              `Split cluster ${clusterId} into ${newClusterIds.length} sub-clusters`,
+            );
+          } else {
+            logger.info(`Cluster ${clusterId} could not be split meaningfully`);
+          }
+        } catch (error) {
+          logger.error(`Failed to split cluster ${clusterId}:`, { error });
+          // Continue with other clusters even if one fails
+        }
+      }
+
+      // Update embeddings and names for the newly created clusters
+      if (totalNewClusters > 0) {
+        await this.updateClusterEmbeddings(userId);
+        await this.generateClusterNames(userId);
+      }
+
+      logger.info(
+        `Drift handling completed: ${splitClusters.length} clusters split, ${totalNewClusters} new clusters created`,
+      );
+
+      return {
+        clustersProcessed: splitClusters.length,
+        newClustersCreated: totalNewClusters,
+        splitClusters,
+      };
+    } catch (error) {
+      logger.error("Error handling cluster drift:", { error });
+      throw error;
+    }
   }
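Drift handling is self-contained: it re-runs detectClusterDrift and no-ops when nothing drifted, so it could also be driven from a periodic maintenance task rather than from every incremental run. A sketch under that assumption; getActiveUserIds and the job wiring are hypothetical, only handleClusterDrift's signature comes from the patch.

// Assumed wiring, not part of the patch: periodic drift maintenance per user.
declare const clusteringService: ClusteringService;
declare function getActiveUserIds(): Promise<string[]>;

async function runDriftMaintenance(): Promise<void> {
  for (const userId of await getActiveUserIds()) {
    const { clustersProcessed, newClustersCreated } =
      await clusteringService.handleClusterDrift(userId);
    if (clustersProcessed > 0) {
      console.log(
        `user ${userId}: split ${clustersProcessed} clusters into ${newClustersCreated} new clusters`,
      );
    }
  }
}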
 
   /**