mirror of
https://github.com/eliasstepanik/core.git
synced 2026-01-11 17:18:28 +00:00
426 lines
14 KiB
TypeScript
426 lines
14 KiB
TypeScript
import neo4j from "neo4j-driver";
|
|
import { type RawTriplet } from "~/components/graph/type";
|
|
import { logger } from "~/services/logger.service";
|
|
import { singleton } from "~/utils/singleton";
|
|
|
|
// Create a singleton driver instance
|
|
const driver = singleton("neo4j", getDriver);
|
|
const EMBEDDING_MODEL_SIZE = process.env.EMBEDDING_MODEL_SIZE ?? "1024";
|
|
|
|
function getDriver() {
|
|
return neo4j.driver(
|
|
process.env.NEO4J_URI ?? "bolt://localhost:7687",
|
|
neo4j.auth.basic(
|
|
process.env.NEO4J_USERNAME as string,
|
|
process.env.NEO4J_PASSWORD as string,
|
|
),
|
|
{
|
|
maxConnectionPoolSize: 50,
|
|
logging: {
|
|
level: "info",
|
|
logger: (level, message) => {
|
|
logger.info(message);
|
|
},
|
|
},
|
|
},
|
|
);
|
|
}
|
|
|
|
let schemaInitialized = false;
|
|
|
|
// Test the connection
|
|
const verifyConnectivity = async () => {
|
|
try {
|
|
await driver.verifyConnectivity();
|
|
logger.info("Connected to Neo4j database");
|
|
return true;
|
|
} catch (error) {
|
|
logger.error("Failed to connect to Neo4j database");
|
|
return false;
|
|
}
|
|
};
|
|
// Run a Cypher query
|
|
const runQuery = async (cypher: string, params = {}) => {
|
|
const session = driver.session();
|
|
try {
|
|
const result = await session.run(cypher, params);
|
|
return result.records;
|
|
} catch (error) {
|
|
logger.error(`Error running Cypher query: ${cypher} ${error}`);
|
|
throw error;
|
|
} finally {
|
|
await session.close();
|
|
}
|
|
};
|
|
|
|
// Get all nodes and relationships for a user
|
|
export const getAllNodesForUser = async (userId: string) => {
|
|
const session = driver.session();
|
|
try {
|
|
const result = await session.run(
|
|
`MATCH (n)-[r]->(m)
|
|
WHERE n.userId = $userId OR m.userId = $userId
|
|
RETURN n, r, m`,
|
|
{ userId },
|
|
);
|
|
return result.records;
|
|
} catch (error) {
|
|
logger.error(`Error getting nodes for user ${userId}: ${error}`);
|
|
throw error;
|
|
} finally {
|
|
await session.close();
|
|
}
|
|
};
|
|
|
|
export const getNodeLinks = async (userId: string) => {
|
|
const result = await getAllNodesForUser(userId);
|
|
const triplets: RawTriplet[] = [];
|
|
|
|
result.forEach((record) => {
|
|
const sourceNode = record.get("n");
|
|
const targetNode = record.get("m");
|
|
const edge = record.get("r");
|
|
triplets.push({
|
|
sourceNode: {
|
|
uuid: sourceNode.identity.toString(),
|
|
labels: sourceNode.labels,
|
|
attributes: sourceNode.properties,
|
|
name: sourceNode.properties.name || "",
|
|
createdAt: sourceNode.properties.createdAt || "",
|
|
},
|
|
edge: {
|
|
uuid: edge.identity.toString(),
|
|
type: edge.type,
|
|
source_node_uuid: sourceNode.identity.toString(),
|
|
target_node_uuid: targetNode.identity.toString(),
|
|
createdAt: edge.properties.createdAt || "",
|
|
},
|
|
targetNode: {
|
|
uuid: targetNode.identity.toString(),
|
|
labels: targetNode.labels,
|
|
attributes: targetNode.properties,
|
|
name: targetNode.properties.name || "",
|
|
createdAt: edge.properties.createdAt || "",
|
|
},
|
|
});
|
|
});
|
|
|
|
return triplets;
|
|
};
|
|
|
|
// Get graph data with cluster information for reified graph
|
|
export const getClusteredGraphData = async (userId: string) => {
|
|
const session = driver.session();
|
|
try {
|
|
// Get the proper reified graph structure: Entity -> Statement -> Entity
|
|
const result = await session.run(
|
|
`// Get all statements and their entity connections for reified graph
|
|
MATCH (s:Statement)
|
|
WHERE s.userId = $userId
|
|
|
|
// Get all entities connected to each statement
|
|
MATCH (s)-[:HAS_SUBJECT]->(subj:Entity)
|
|
MATCH (s)-[:HAS_PREDICATE]->(pred:Entity)
|
|
MATCH (s)-[:HAS_OBJECT]->(obj:Entity)
|
|
|
|
// Return both Entity->Statement and Statement->Entity relationships
|
|
WITH s, subj, pred, obj
|
|
UNWIND [
|
|
// Subject Entity -> Statement
|
|
{source: subj, target: s, type: 'HAS_SUBJECT', isEntityToStatement: true},
|
|
// Statement -> Predicate Entity
|
|
{source: s, target: pred, type: 'HAS_PREDICATE', isStatementToEntity: true},
|
|
// Statement -> Object Entity
|
|
{source: s, target: obj, type: 'HAS_OBJECT', isStatementToEntity: true}
|
|
] AS rel
|
|
|
|
RETURN DISTINCT
|
|
rel.source.uuid as sourceUuid,
|
|
rel.source.name as sourceName,
|
|
rel.source.labels as sourceLabels,
|
|
rel.source.type as sourceType,
|
|
rel.source.properties as sourceProperties,
|
|
rel.target.uuid as targetUuid,
|
|
rel.target.name as targetName,
|
|
rel.target.type as targetType,
|
|
rel.target.labels as targetLabels,
|
|
rel.target.properties as targetProperties,
|
|
rel.type as relationshipType,
|
|
s.uuid as statementUuid,
|
|
s.spaceIds as spaceIds,
|
|
s.fact as fact,
|
|
s.invalidAt as invalidAt,
|
|
s.validAt as validAt,
|
|
s.createdAt as createdAt,
|
|
rel.isEntityToStatement as isEntityToStatement,
|
|
rel.isStatementToEntity as isStatementToEntity`,
|
|
{ userId },
|
|
);
|
|
|
|
const triplets: RawTriplet[] = [];
|
|
const processedEdges = new Set<string>();
|
|
|
|
result.records.forEach((record) => {
|
|
const sourceUuid = record.get("sourceUuid");
|
|
const sourceName = record.get("sourceName");
|
|
const sourceType = record.get("sourceType");
|
|
const sourceLabels = record.get("sourceLabels") || [];
|
|
const sourceProperties = record.get("sourceProperties") || {};
|
|
|
|
const targetUuid = record.get("targetUuid");
|
|
const targetName = record.get("targetName");
|
|
const targetLabels = record.get("targetLabels") || [];
|
|
const targetProperties = record.get("targetProperties") || {};
|
|
const targetType = record.get("targetType");
|
|
|
|
const relationshipType = record.get("relationshipType");
|
|
const statementUuid = record.get("statementUuid");
|
|
const clusterIds = record.get("spaceIds");
|
|
const clusterId = clusterIds ? clusterIds[0] : undefined;
|
|
const fact = record.get("fact");
|
|
const invalidAt = record.get("invalidAt");
|
|
const validAt = record.get("validAt");
|
|
const createdAt = record.get("createdAt");
|
|
|
|
// Create unique edge identifier to avoid duplicates
|
|
const edgeKey = `${sourceUuid}-${targetUuid}-${relationshipType}`;
|
|
if (processedEdges.has(edgeKey)) return;
|
|
processedEdges.add(edgeKey);
|
|
|
|
// Determine node types and add appropriate cluster information
|
|
const isSourceStatement =
|
|
sourceLabels.includes("Statement") || sourceUuid === statementUuid;
|
|
const isTargetStatement =
|
|
targetLabels.includes("Statement") || targetUuid === statementUuid;
|
|
|
|
// Statement nodes get cluster info, Entity nodes get default attributes
|
|
const sourceAttributes = isSourceStatement
|
|
? {
|
|
...sourceProperties,
|
|
clusterId,
|
|
nodeType: "Statement",
|
|
fact,
|
|
invalidAt,
|
|
validAt,
|
|
}
|
|
: {
|
|
...sourceProperties,
|
|
nodeType: "Entity",
|
|
type: sourceType,
|
|
name: sourceName,
|
|
};
|
|
|
|
const targetAttributes = isTargetStatement
|
|
? {
|
|
...targetProperties,
|
|
clusterId,
|
|
nodeType: "Statement",
|
|
fact,
|
|
invalidAt,
|
|
validAt,
|
|
}
|
|
: {
|
|
...targetProperties,
|
|
nodeType: "Entity",
|
|
type: targetType,
|
|
name: targetName,
|
|
};
|
|
|
|
triplets.push({
|
|
sourceNode: {
|
|
uuid: sourceUuid,
|
|
labels: sourceLabels,
|
|
attributes: sourceAttributes,
|
|
name: isSourceStatement ? fact : sourceName || sourceUuid,
|
|
clusterId,
|
|
createdAt: createdAt || "",
|
|
},
|
|
edge: {
|
|
uuid: `${sourceUuid}-${targetUuid}-${relationshipType}`,
|
|
type: relationshipType,
|
|
source_node_uuid: sourceUuid,
|
|
target_node_uuid: targetUuid,
|
|
createdAt: createdAt || "",
|
|
},
|
|
targetNode: {
|
|
uuid: targetUuid,
|
|
labels: targetLabels,
|
|
attributes: targetAttributes,
|
|
clusterId,
|
|
name: isTargetStatement ? fact : targetName || targetUuid,
|
|
createdAt: createdAt || "",
|
|
},
|
|
});
|
|
});
|
|
|
|
return triplets;
|
|
} catch (error) {
|
|
logger.error(
|
|
`Error getting clustered graph data for user ${userId}: ${error}`,
|
|
);
|
|
throw error;
|
|
} finally {
|
|
await session.close();
|
|
}
|
|
};
|
|
|
|
export async function initNeo4jSchemaOnce() {
|
|
if (schemaInitialized) return;
|
|
|
|
const session = driver.session();
|
|
|
|
try {
|
|
// Check if schema already exists
|
|
const result = await session.run(`
|
|
SHOW INDEXES YIELD name WHERE name = "entity_name" RETURN name
|
|
`);
|
|
|
|
if (result.records.length === 0) {
|
|
// Run your schema creation here (indexes, constraints, etc.)
|
|
await initializeSchema();
|
|
}
|
|
|
|
schemaInitialized = true;
|
|
} catch (e: any) {
|
|
logger.error("Error in initialising", e);
|
|
} finally {
|
|
await session.close();
|
|
}
|
|
}
|
|
|
|
// Initialize the database schema
|
|
const initializeSchema = async () => {
|
|
try {
|
|
logger.info("Initialising neo4j schema");
|
|
|
|
// Create constraints for unique IDs
|
|
await runQuery(
|
|
"CREATE CONSTRAINT episode_uuid IF NOT EXISTS FOR (n:Episode) REQUIRE n.uuid IS UNIQUE",
|
|
);
|
|
await runQuery(
|
|
"CREATE CONSTRAINT entity_uuid IF NOT EXISTS FOR (n:Entity) REQUIRE n.uuid IS UNIQUE",
|
|
);
|
|
await runQuery(
|
|
"CREATE CONSTRAINT statement_uuid IF NOT EXISTS FOR (n:Statement) REQUIRE n.uuid IS UNIQUE",
|
|
);
|
|
await runQuery(
|
|
"CREATE CONSTRAINT cluster_uuid IF NOT EXISTS FOR (n:Cluster) REQUIRE n.uuid IS UNIQUE",
|
|
);
|
|
|
|
// Create indexes for better query performance
|
|
await runQuery(
|
|
"CREATE INDEX episode_valid_at IF NOT EXISTS FOR (n:Episode) ON (n.validAt)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX statement_valid_at IF NOT EXISTS FOR (n:Statement) ON (n.validAt)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX statement_invalid_at IF NOT EXISTS FOR (n:Statement) ON (n.invalidAt)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX statement_cluster_id IF NOT EXISTS FOR (n:Statement) ON (n.clusterId)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX statement_space_id IF NOT EXISTS FOR (n:Statement) ON (n.spaceId)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX entity_uuid IF NOT EXISTS FOR (n:Entity) ON (n.uuid)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX entity_type IF NOT EXISTS FOR (n:Entity) ON (n.type)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX entity_user_id IF NOT EXISTS FOR (n:Entity) ON (n.userId)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX statement_user_id IF NOT EXISTS FOR (n:Statement) ON (n.userId)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX cluster_user_id IF NOT EXISTS FOR (n:Cluster) ON (n.userId)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX cluster_aspect_type IF NOT EXISTS FOR (n:Cluster) ON (n.aspectType)",
|
|
);
|
|
|
|
// Space-optimized indexes for better query performance
|
|
await runQuery(
|
|
"CREATE INDEX space_user_uuid IF NOT EXISTS FOR (n:Space) ON (n.userId, n.uuid)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX space_user_active IF NOT EXISTS FOR (n:Space) ON (n.userId, n.isActive)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX statement_user_spaces IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.spaceIds)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX statement_user_invalid IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.invalidAt)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX statement_user_uuid IF NOT EXISTS FOR (n:Statement) ON (n.userId, n.uuid)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX entity_user_uuid IF NOT EXISTS FOR (n:Entity) ON (n.userId, n.uuid)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX episode_user_uuid IF NOT EXISTS FOR (n:Episode) ON (n.userId, n.uuid)",
|
|
);
|
|
await runQuery(
|
|
"CREATE INDEX episode_user_id IF NOT EXISTS FOR (n:Episode) ON (n.userId)",
|
|
);
|
|
|
|
// Create vector indexes for semantic search (if using Neo4j 5.0+)
|
|
await runQuery(`
|
|
CREATE VECTOR INDEX entity_embedding IF NOT EXISTS FOR (n:Entity) ON n.nameEmbedding
|
|
OPTIONS {indexConfig: {\`vector.dimensions\`: ${EMBEDDING_MODEL_SIZE}, \`vector.similarity_function\`: 'cosine', \`vector.hnsw.ef_construction\`: 400, \`vector.hnsw.m\`: 32}}
|
|
`);
|
|
|
|
await runQuery(`
|
|
CREATE VECTOR INDEX statement_embedding IF NOT EXISTS FOR (n:Statement) ON n.factEmbedding
|
|
OPTIONS {indexConfig: {\`vector.dimensions\`: ${EMBEDDING_MODEL_SIZE}, \`vector.similarity_function\`: 'cosine', \`vector.hnsw.ef_construction\`: 400, \`vector.hnsw.m\`: 32}}
|
|
`);
|
|
|
|
await runQuery(`
|
|
CREATE VECTOR INDEX episode_embedding IF NOT EXISTS FOR (n:Episode) ON n.contentEmbedding
|
|
OPTIONS {indexConfig: {\`vector.dimensions\`: ${EMBEDDING_MODEL_SIZE}, \`vector.similarity_function\`: 'cosine', \`vector.hnsw.ef_construction\`: 400, \`vector.hnsw.m\`: 32}}
|
|
`);
|
|
|
|
// Create fulltext indexes for BM25 search
|
|
await runQuery(`
|
|
CREATE FULLTEXT INDEX statement_fact_index IF NOT EXISTS
|
|
FOR (n:Statement) ON EACH [n.fact]
|
|
OPTIONS {
|
|
indexConfig: {
|
|
\`fulltext.analyzer\`: 'english'
|
|
}
|
|
}
|
|
`);
|
|
|
|
await runQuery(`
|
|
CREATE FULLTEXT INDEX entity_name_index IF NOT EXISTS
|
|
FOR (n:Entity) ON EACH [n.name]
|
|
OPTIONS {
|
|
indexConfig: {
|
|
\`fulltext.analyzer\`: 'english'
|
|
}
|
|
}
|
|
`);
|
|
|
|
logger.info("Neo4j schema initialized successfully");
|
|
return true;
|
|
} catch (error) {
|
|
logger.error("Failed to initialize Neo4j schema", { error });
|
|
return false;
|
|
}
|
|
};
|
|
|
|
// Close the driver when the application shuts down
|
|
const closeDriver = async () => {
|
|
await driver.close();
|
|
logger.info("Neo4j driver closed");
|
|
};
|
|
|
|
export { driver, verifyConnectivity, runQuery, initializeSchema, closeDriver };
|