mirror of
https://github.com/eliasstepanik/core.git
synced 2026-01-11 16:48:27 +00:00
* feat: space v3 * feat: connected space creation * fix: * fix: session_id for memory ingestion * chore: simplify gitignore patterns for agent directories --------- Co-authored-by: Manoj <saimanoj58@gmail.com>
385 lines
12 KiB
Python
385 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BERT Topic Modeling CLI for Echo Episodes
|
|
|
|
This CLI tool connects to Neo4j, retrieves episodes with their embeddings for a given userId,
|
|
and performs topic modeling using BERTopic to discover thematic clusters.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
from typing import List, Tuple, Dict, Any
|
|
import click
|
|
import numpy as np
|
|
from neo4j import GraphDatabase
|
|
from bertopic import BERTopic
|
|
from bertopic.vectorizers import ClassTfidfTransformer
|
|
from dotenv import load_dotenv
|
|
from sklearn.feature_extraction.text import CountVectorizer
|
|
from umap import UMAP
|
|
from hdbscan import HDBSCAN
|
|
|
|
|
|
class Neo4jConnection:
    """Manages the Neo4j driver and the episode query used by this CLI.

    Failures are fatal for the CLI: a failed connection or an empty result set
    prints an error on stderr and terminates the process with exit code 1.
    """

    def __init__(self, uri: str, username: str, password: str, quiet: bool = False):
        """Open a driver and verify that the server is reachable.

        Args:
            uri: Neo4j connection URI (e.g., bolt://localhost:7687)
            username: Neo4j username
            password: Neo4j password
            quiet: If True, suppress informational (stdout) messages. Errors are
                still written to stderr so that callers consuming ``--json``
                output can diagnose a non-zero exit without corrupting stdout.

        Raises:
            SystemExit: With code 1 if the driver cannot be created or the
                connectivity check fails.
        """
        self.quiet = quiet
        # Assigned up front so close() is always safe, even if connecting fails.
        self.driver = None
        try:
            self.driver = GraphDatabase.driver(uri, auth=(username, password))
            # Verify connection
            self.driver.verify_connectivity()
        except Exception as e:
            click.echo(f"✗ Failed to connect to Neo4j: {e}", err=True)
            # The driver object may already exist (verify_connectivity failed);
            # release it before terminating.
            if self.driver is not None:
                self.driver.close()
                self.driver = None
            sys.exit(1)

        if not quiet:
            click.echo(f"✓ Connected to Neo4j at {uri}")

    def close(self):
        """Close the Neo4j driver if it is open. Safe to call more than once."""
        if self.driver is None:
            return
        self.driver.close()
        self.driver = None
        if not self.quiet:
            click.echo("✓ Neo4j connection closed")

    def get_episodes_with_embeddings(self, user_id: str) -> Tuple[List[str], List[str], np.ndarray]:
        """Fetch all episodes that have non-empty content and embeddings for a user.

        Args:
            user_id: The user ID to fetch episodes for

        Returns:
            Tuple of (episode_uuids, episode_contents, embeddings_array), ordered
            newest first by ``createdAt``. ``embeddings_array`` is a float32 array
            with one row per episode.

        Raises:
            SystemExit: With code 1 if no episodes match, or if the stored
                embeddings do not all have the same length.
        """
        query = """
            MATCH (e:Episode {userId: $userId})
            WHERE e.contentEmbedding IS NOT NULL
              AND size(e.contentEmbedding) > 0
              AND e.content IS NOT NULL
              AND e.content <> ''
            RETURN e.uuid as uuid,
                   e.content as content,
                   e.contentEmbedding as embedding,
                   e.createdAt as createdAt
            ORDER BY e.createdAt DESC
        """

        with self.driver.session() as session:
            # Materialize inside the session; records are not readable after it closes.
            records = list(session.run(query, userId=user_id))

        if not records:
            click.echo(f"✗ No episodes found for userId: {user_id}", err=True)
            sys.exit(1)

        uuids = [record["uuid"] for record in records]
        contents = [record["content"] for record in records]
        embeddings = [record["embedding"] for record in records]

        # Building a 2-D float array from vectors of different lengths fails
        # with an opaque numpy error; report the mismatch explicitly instead.
        dimensions = {len(vector) for vector in embeddings}
        if len(dimensions) > 1:
            click.echo(
                f"✗ Inconsistent embedding dimensions for userId {user_id}: {sorted(dimensions)}",
                err=True,
            )
            sys.exit(1)

        embeddings_array = np.array(embeddings, dtype=np.float32)

        if not self.quiet:
            click.echo(f"✓ Fetched {len(contents)} episodes with embeddings")

        return uuids, contents, embeddings_array
|
|
|
|
|
|
def run_bertopic_analysis(
    contents: List[str],
    embeddings: np.ndarray,
    min_topic_size: int = 20,
    nr_topics: int = None,
    quiet: bool = False
) -> Tuple[BERTopic, List[int], np.ndarray]:
    """Cluster episode contents into topics with BERTopic using pre-computed embeddings.

    Args:
        contents: Episode content strings, one per document.
        embeddings: Pre-computed embeddings, one row per entry in ``contents``.
        min_topic_size: Minimum number of documents per topic (passed to
            HDBSCAN as ``min_cluster_size``).
        nr_topics: Target number of topics for BERTopic's topic reduction, or
            None to keep the clusters HDBSCAN finds.
        quiet: If True, suppress output messages and BERTopic's verbose logging.

    Returns:
        Tuple of (bertopic_model, topic_assignments, probabilities) as returned
        by ``BERTopic.fit_transform``. Topic -1 marks outlier documents. Because
        ``calculate_probabilities=True``, probabilities is an array of per-topic
        probabilities per document (NOTE(review): exact shape depends on the
        installed BERTopic version).

    Raises:
        ValueError: If ``contents`` is empty, or ``embeddings`` is not a 2-D
            array with exactly one row per document.
    """
    # Fail fast with a clear message instead of a deep UMAP/BERTopic error.
    embeddings = np.asarray(embeddings)
    if not contents:
        raise ValueError("No documents supplied for topic modeling")
    if embeddings.ndim != 2 or embeddings.shape[0] != len(contents):
        raise ValueError(
            f"Expected embeddings of shape ({len(contents)}, dim), got {embeddings.shape}"
        )

    if not quiet:
        click.echo(f"\n🔍 Running BERTopic analysis (min_topic_size={min_topic_size})...")

    # Step 1: Configure UMAP for dimensionality reduction
    # More aggressive reduction helps find distinct clusters
    umap_model = UMAP(
        n_neighbors=15,      # Balance between local/global structure
        n_components=5,      # Reduce to 5 dimensions
        min_dist=0.0,        # Allow tight clusters
        metric='cosine',     # Use cosine similarity
        random_state=42      # Reproducible embeddings across runs
    )

    # Step 2: Configure HDBSCAN for clustering
    # Tuned to find more granular topics
    hdbscan_model = HDBSCAN(
        min_cluster_size=min_topic_size,  # Minimum episodes per topic
        min_samples=5,                    # More sensitive to local density
        metric='euclidean',
        cluster_selection_method='eom',   # Excess of mass method
        prediction_data=True
    )

    # Step 3: Configure the vectorizer used to extract topic keywords.
    # NOTE: BERTopic fits this vectorizer on one concatenated document per
    # topic (for c-TF-IDF), so min_df/max_df count *topics*, not episodes.
    # With very few topics these bounds may prune every term and sklearn
    # can raise ValueError.
    vectorizer_model = CountVectorizer(
        stop_words='english',  # Remove common English words
        min_df=2,              # Term must occur in at least 2 topics
        max_df=0.95,           # Ignore terms present in >95% of topics
        ngram_range=(1, 2)     # Include unigrams and bigrams
    )

    # Step 4: Configure c-TF-IDF with better parameters
    ctfidf_model = ClassTfidfTransformer(
        reduce_frequent_words=True,  # Further reduce common words
        bm25_weighting=True          # Use BM25 for better keyword extraction
    )

    # Step 5: Initialize BERTopic with all custom components
    model = BERTopic(
        embedding_model=None,          # Use pre-computed embeddings
        umap_model=umap_model,
        hdbscan_model=hdbscan_model,
        vectorizer_model=vectorizer_model,
        ctfidf_model=ctfidf_model,
        top_n_words=15,                # More keywords per topic
        nr_topics=nr_topics,           # Optional topic reduction
        calculate_probabilities=True,
        verbose=(not quiet)
    )

    # Fit the model with pre-computed embeddings
    topics, probs = model.fit_transform(contents, embeddings=embeddings)

    # Count real topics, excluding the outlier bucket (-1) when present.
    unique_topics = len(set(topics)) - (1 if -1 in topics else 0)
    if not quiet:
        click.echo(f"✓ Topic modeling complete - Found {unique_topics} topics")

    return model, topics, probs
|
|
|
|
|
|
def print_topic_results(
    model: BERTopic,
    topics: List[int],
    uuids: List[str],
    contents: List[str]
):
    """Print a human-readable report of the fitted topics.

    For every row of ``model.get_topic_info()`` this prints the episode count,
    up to 10 keywords and up to 3 sample episodes (content truncated to 200
    characters). The outlier topic (-1) is summarized on a single line.

    Args:
        model: Fitted BERTopic model
        topics: Topic assignments for each episode (parallel to uuids/contents)
        uuids: Episode UUIDs
        contents: Episode contents
    """
    topic_info = model.get_topic_info()
    # The outlier row (-1) only exists when some episodes were left
    # unclustered, so count non-outlier rows rather than assuming it is there.
    num_topics = int((topic_info['Topic'] != -1).sum())

    # Group episodes by topic once instead of rescanning all episodes per topic.
    episodes_by_topic: Dict[int, List[Tuple[str, str]]] = {}
    for uuid, content, topic in zip(uuids, contents, topics):
        episodes_by_topic.setdefault(topic, []).append((uuid, content))

    click.echo(f"\n{'='*80}")
    click.echo("TOPIC MODELING RESULTS")
    click.echo(f"{'='*80}")
    click.echo(f"Total Topics Found: {num_topics}")
    click.echo(f"Total Episodes: {len(contents)}")
    click.echo(f"{'='*80}\n")

    for _, row in topic_info.iterrows():
        topic_id = row['Topic']
        count = row['Count']

        # Outliers get a one-line summary instead of a full section.
        if topic_id == -1:
            click.echo(f"Topic -1 (Outliers): {count} episodes\n")
            continue

        # Get top words for this topic
        topic_words = model.get_topic(topic_id)

        click.echo(f"{'─'*80}")
        click.echo(f"Topic {topic_id}: {count} episodes")
        click.echo(f"{'─'*80}")

        # Print top keywords
        if topic_words:
            keywords = [word for word, _score in topic_words[:10]]
            click.echo(f"Keywords: {', '.join(keywords)}")

        # Print sample episodes
        topic_episodes = episodes_by_topic.get(topic_id, [])

        click.echo("\nSample Episodes (showing up to 3):")
        for position, (uuid, content) in enumerate(topic_episodes[:3], start=1):
            # Truncate content for display
            truncated = content[:200] + "..." if len(content) > 200 else content
            click.echo(f" {position}. [{uuid}]")
            click.echo(f" {truncated}\n")

        click.echo()
|
|
|
|
|
|
def build_json_output(
    model: BERTopic,
    topics: List[int],
    uuids: List[str]
) -> Dict[str, Any]:
    """Build the JSON-serializable result printed in ``--json`` mode.

    Args:
        model: Fitted BERTopic model
        topics: Topic assignments for each episode (parallel to ``uuids``)
        uuids: Episode UUIDs

    Returns:
        ``{"topics": {topic_id: {"keywords": [...], "episodeIds": [...]}}}`` for
        every non-outlier topic in ``model.get_topic_info()``; topic -1 is
        omitted. Keywords are the top 10 words of each topic. Topic ids are
        plain ``int`` (``json.dumps`` renders them as string keys).
    """
    # Group episode ids by topic in one pass (preserving input order).
    episode_ids_by_topic: Dict[int, List[str]] = {}
    for uuid, topic in zip(uuids, topics):
        episode_ids_by_topic.setdefault(int(topic), []).append(uuid)

    topics_dict: Dict[int, Dict[str, List[str]]] = {}
    for raw_topic_id in model.get_topic_info()['Topic']:
        # DataFrame values may be numpy integers; json.dumps only accepts
        # builtin int/str/float/bool/None as dict keys.
        topic_id = int(raw_topic_id)

        # Skip outlier topic
        if topic_id == -1:
            continue

        topic_words = model.get_topic(topic_id)
        keywords = [word for word, _score in topic_words[:10]] if topic_words else []

        topics_dict[topic_id] = {
            "keywords": keywords,
            "episodeIds": episode_ids_by_topic.get(topic_id, []),
        }

    return {"topics": topics_dict}
|
|
|
|
|
|
@click.command()
@click.argument('user_id', type=str)
@click.option(
    '--min-topic-size',
    default=10,
    # HDBSCAN rejects cluster sizes below 2; validate at the CLI boundary.
    type=click.IntRange(min=2),
    help='Minimum number of episodes per topic (default: 10, lower = more granular topics)'
)
@click.option(
    '--nr-topics',
    default=None,
    type=click.IntRange(min=1),
    help='Target number of topics for reduction (optional, e.g., 20 for ~20 topics)'
)
@click.option(
    '--neo4j-uri',
    envvar='NEO4J_URI',
    default='bolt://localhost:7687',
    help='Neo4j connection URI (default: bolt://localhost:7687)'
)
@click.option(
    '--neo4j-username',
    envvar='NEO4J_USERNAME',
    default='neo4j',
    help='Neo4j username (default: neo4j)'
)
@click.option(
    '--neo4j-password',
    envvar='NEO4J_PASSWORD',
    required=True,
    help='Neo4j password (required, can use NEO4J_PASSWORD env var)'
)
@click.option(
    '--json',
    'json_output',
    is_flag=True,
    default=False,
    help='Output only final results in JSON format (suppresses all other output)'
)
def main(user_id: str, min_topic_size: int, nr_topics: int, neo4j_uri: str, neo4j_username: str, neo4j_password: str, json_output: bool):
    """
    Run BERTopic analysis on episodes for a given USER_ID.

    This tool connects to Neo4j, retrieves all episodes with embeddings for the specified user,
    and performs topic modeling to discover thematic clusters.

    Examples:

    \b
      # Using environment variables from .env file
      python main.py user-123

    \b
      # With a smaller min topic size (more granular topics)
      python main.py user-123 --min-topic-size 5

    \b
      # With explicit Neo4j credentials
      python main.py user-123 --neo4j-uri bolt://localhost:7687 --neo4j-password mypassword
    """
    # Print header only if not in JSON mode
    if not json_output:
        click.echo(f"\n{'='*80}")
        click.echo("BERT TOPIC MODELING FOR ECHO EPISODES")
        click.echo(f"{'='*80}")
        click.echo(f"User ID: {user_id}")
        click.echo(f"Min Topic Size: {min_topic_size}")
        if nr_topics is not None:
            click.echo(f"Target Topics: ~{nr_topics}")
        click.echo(f"{'='*80}\n")

    # Connect to Neo4j (quiet mode if JSON output); exits the process on failure.
    neo4j_conn = Neo4jConnection(neo4j_uri, neo4j_username, neo4j_password, quiet=json_output)

    try:
        # Fetch episodes with embeddings
        uuids, contents, embeddings = neo4j_conn.get_episodes_with_embeddings(user_id)

        # Run BERTopic analysis (probabilities are not used by either output mode)
        model, topics, _probs = run_bertopic_analysis(
            contents, embeddings, min_topic_size, nr_topics, quiet=json_output
        )

        if json_output:
            # JSON mode: stdout carries only the JSON document.
            output = build_json_output(model, topics, uuids)
            click.echo(json.dumps(output, indent=2))
        else:
            # Normal output mode - print formatted results
            print_topic_results(model, topics, uuids, contents)

            click.echo(f"{'='*80}")
            click.echo("✓ Analysis complete!")
            click.echo(f"{'='*80}\n")
    finally:
        # Always close connection, including on sys.exit() from the fetch step.
        neo4j_conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Pull settings such as NEO4J_URI / NEO4J_PASSWORD from a local .env file
    # (when one exists) into the process environment before click parses the
    # command line, so the envvar-backed options of main() can see them.
    load_dotenv()
    main()
|