diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 08afc4a..34e839a 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -39,6 +39,14 @@ const EnvironmentSchema = z.object({
   REDIS_HOST: z.string().default("localhost"),
   REDIS_PORT: z.coerce.number().default(6379),
   REDIS_TLS_DISABLED: z.coerce.boolean().default(true),
+
+  // Neo4j
+  NEO4J_URI: z.string(),
+  NEO4J_USERNAME: z.string(),
+  NEO4J_PASSWORD: z.string(),
+
+  // OpenAI
+  OPENAI_API_KEY: z.string(),
 });
 
 export type Environment = z.infer<typeof EnvironmentSchema>;
diff --git a/apps/webapp/app/lib/neo4j.server.ts b/apps/webapp/app/lib/neo4j.server.ts
new file mode 100644
index 0000000..809ac0e
--- /dev/null
+++ b/apps/webapp/app/lib/neo4j.server.ts
@@ -0,0 +1,87 @@
+import neo4j from "neo4j-driver";
+import { env } from "~/env.server";
+import { logger } from "~/services/logger.service";
+
+// Create a driver instance
+const driver = neo4j.driver(
+  env.NEO4J_URI,
+  neo4j.auth.basic(env.NEO4J_USERNAME, env.NEO4J_PASSWORD),
+  {
+    maxConnectionPoolSize: 50,
+    logging: {
+      level: "info",
+      logger: (level, message) => {
+        logger.info(message);
+      },
+    },
+  },
+);
+
+// Test the connection
+const verifyConnectivity = async () => {
+  try {
+    await driver.verifyConnectivity();
+    logger.info("Connected to Neo4j database");
+    return true;
+  } catch (error) {
+    logger.error("Failed to connect to Neo4j database");
+    return false;
+  }
+};
+
+// Run a Cypher query
+const runQuery = async (cypher: string, params = {}) => {
+  const session = driver.session();
+  try {
+    const result = await session.run(cypher, params);
+    return result.records;
+  } catch (error) {
+    logger.error(`Error running Cypher query: ${cypher} ${error}`);
+    throw error;
+  } finally {
+    await session.close();
+  }
+};
+
+// Initialize the database schema.
+// Neo4j accepts only one statement per session.run() call, so the schema
+// statements are executed one at a time rather than as a single script.
+const initializeSchema = async () => {
+  const statements = [
+    // Create constraints for unique IDs
+    `CREATE CONSTRAINT episode_uuid IF NOT EXISTS FOR (n:Episode) REQUIRE n.uuid IS UNIQUE`,
+    `CREATE CONSTRAINT entity_uuid IF NOT EXISTS FOR (n:Entity) REQUIRE n.uuid IS UNIQUE`,
+    `CREATE CONSTRAINT statement_uuid IF NOT EXISTS FOR (n:Statement) REQUIRE n.uuid IS UNIQUE`,
+
+    // Create indexes for better query performance
+    `CREATE INDEX episode_valid_at IF NOT EXISTS FOR (n:Episode) ON (n.validAt)`,
+    `CREATE INDEX statement_valid_at IF NOT EXISTS FOR (n:Statement) ON (n.validAt)`,
+    `CREATE INDEX statement_invalid_at IF NOT EXISTS FOR (n:Statement) ON (n.invalidAt)`,
+    `CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name)`,
+
+    // Create vector indexes for semantic search (requires Neo4j 5.x)
+    `CREATE VECTOR INDEX entity_embedding IF NOT EXISTS FOR (n:Entity) ON (n.nameEmbedding)
+     OPTIONS {indexConfig: {\`vector.dimensions\`: 1536, \`vector.similarity_function\`: "cosine"}}`,
+    `CREATE VECTOR INDEX statement_embedding IF NOT EXISTS FOR (n:Statement) ON (n.factEmbedding)
+     OPTIONS {indexConfig: {\`vector.dimensions\`: 1536, \`vector.similarity_function\`: "cosine"}}`,
+    `CREATE VECTOR INDEX episode_embedding IF NOT EXISTS FOR (n:Episode) ON (n.contentEmbedding)
+     OPTIONS {indexConfig: {\`vector.dimensions\`: 1536, \`vector.similarity_function\`: "cosine"}}`,
+  ];
+
+  try {
+    for (const statement of statements) {
+      await runQuery(statement);
+    }
+    logger.info("Neo4j schema initialized successfully");
+    return true;
+  } catch (error) {
+    logger.error("Failed to initialize Neo4j schema", { error });
+    return false;
+  }
+};
+
+// Close the driver when the application shuts down
+const closeDriver = async () => {
+  await driver.close();
+  logger.info("Neo4j driver closed");
+};
+
+export { driver, verifyConnectivity, runQuery, initializeSchema, closeDriver };
diff --git a/apps/webapp/app/routes/_index.tsx b/apps/webapp/app/routes/_index.tsx
index 13a5c00..15581f1 100644
--- a/apps/webapp/app/routes/_index.tsx
+++ b/apps/webapp/app/routes/_index.tsx
@@ -1,4 +1,5 @@
 import type { MetaFunction } from "@remix-run/node";
+import { createPersonalAccessToken } from "~/services/personalAccessToken.server";
 
 export const meta: MetaFunction = () => {
   return [
diff --git a/apps/webapp/app/routes/ingest.tsx b/apps/webapp/app/routes/ingest.tsx
index 84d1aa7..3717be7 100644
--- a/apps/webapp/app/routes/ingest.tsx
+++ b/apps/webapp/app/routes/ingest.tsx
@@ -2,7 +2,6 @@ import { EpisodeType } from "@recall/types";
 import { json, LoaderFunctionArgs } from "@remix-run/node";
 import { z } from "zod";
 import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
-import { KnowledgeGraphService } from "../services/knowledgeGraph.server";
 import { getUserQueue } from "~/lib/ingest.queue";
 import { prisma } from "~/db.server";
 import { IngestionStatus } from "@recall/database";
diff --git a/apps/webapp/app/services/graphModels/entity.ts b/apps/webapp/app/services/graphModels/entity.ts
new file mode 100644
index 0000000..0fc3b8f
--- /dev/null
+++ b/apps/webapp/app/services/graphModels/entity.ts
@@ -0,0 +1,91 @@
+import type { EntityNode } from "@recall/types";
+import { runQuery } from "~/lib/neo4j.server";
+
+export async function saveEntity(entity: EntityNode): Promise<string> {
+  const query = `
+    MERGE (n:Entity {uuid: $uuid})
+    ON CREATE SET
+      n.name = $name,
+      n.type = $type,
+      n.attributesJson = $attributesJson,
+      n.nameEmbedding = $nameEmbedding,
+      n.createdAt = $createdAt,
+      n.userId = $userId,
+      n.space = $space
+    ON MATCH SET
+      n.name = $name,
+      n.type = $type,
+      n.attributesJson = $attributesJson,
+      n.nameEmbedding = $nameEmbedding,
+      n.space = $space
+    RETURN n.uuid as uuid
+  `;
+
+  const params = {
+    uuid: entity.uuid,
+    name: entity.name,
+    type: entity.type,
+    attributesJson: JSON.stringify(entity.attributes || {}),
+    nameEmbedding: entity.nameEmbedding,
+    createdAt: entity.createdAt.toISOString(),
+    userId: entity.userId,
+    space: entity.space || null,
+  };
+
+  const result = await runQuery(query, params);
+  return result[0].get("uuid");
+}
+
+export async function getEntity(uuid: string): Promise<EntityNode | null> {
+  const query = `
+    MATCH (entity:Entity {uuid: $uuid})
+    RETURN entity
+  `;
+
+  const result = await runQuery(query, { uuid });
+  if (result.length === 0) return null;
+
+  const entity = result[0].get("entity").properties;
+  return {
+    uuid: entity.uuid,
+    name: entity.name,
+    type: entity.type,
+    attributes: JSON.parse(entity.attributesJson || "{}"),
+    nameEmbedding: entity.nameEmbedding,
+    createdAt: new Date(entity.createdAt),
+    userId: entity.userId,
+    space: entity.space,
+  };
+}
+
+// Find semantically similar entities
+export async function findSimilarEntities(params: {
+  queryEmbedding: number[];
+  limit: number;
+  threshold: number;
+}): Promise<EntityNode[]> {
+  const query = `
+    MATCH (entity:Entity)
+    WHERE entity.nameEmbedding IS NOT NULL
+    WITH entity, vector.similarity.cosine($queryEmbedding, entity.nameEmbedding) AS score
+    WHERE score >= $threshold
+    RETURN entity, score
+    ORDER BY score DESC
+    LIMIT toInteger($limit)
+  `;
+
+  const result = await runQuery(query, params);
+  return result.map((record) => {
+    const entity = record.get("entity").properties;
+
+    return {
+      uuid: entity.uuid,
+      name: entity.name,
+      type: entity.type,
+      attributes: JSON.parse(entity.attributesJson || "{}"),
+      nameEmbedding: entity.nameEmbedding,
+      createdAt: new Date(entity.createdAt),
+      userId: entity.userId,
+      space: entity.space,
+    };
+  });
+}
diff --git a/apps/webapp/app/services/graphModels/episode.ts b/apps/webapp/app/services/graphModels/episode.ts
new file mode 100644
index 0000000..b6f8da3
--- /dev/null
+++ b/apps/webapp/app/services/graphModels/episode.ts
@@ -0,0 +1,131 @@
+import { runQuery } from "~/lib/neo4j.server";
+import type { EpisodicNode } from "@recall/types";
+
+export async function saveEpisode(episode: EpisodicNode): Promise<string> {
+  const query = `
+    MERGE (e:Episode {uuid: $uuid})
+    ON CREATE SET
+      e.name = $name,
+      e.content = $content,
+      e.contentEmbedding = $contentEmbedding,
+      e.type = $type,
+      e.source = $source,
+      e.createdAt = $createdAt,
+      e.validAt = $validAt,
+      e.userId = $userId,
+      e.labels = $labels,
+      e.space = $space,
+      e.sessionId = $sessionId
+    ON MATCH SET
+      e.name = $name,
+      e.content = $content,
+      e.contentEmbedding = $contentEmbedding,
+      e.type = $type,
+      e.source = $source,
+      e.validAt = $validAt,
+      e.labels = $labels,
+      e.space = $space,
+      e.sessionId = $sessionId
+    RETURN e.uuid as uuid
+  `;
+
+  const params = {
+    uuid: episode.uuid,
+    name: episode.name,
+    content: episode.content,
+    source: episode.source,
+    type: episode.type,
+    userId: episode.userId || null,
+    labels: episode.labels || [],
+    createdAt: episode.createdAt.toISOString(),
+    validAt: episode.validAt.toISOString(),
+    contentEmbedding: episode.contentEmbedding || [],
+    space: episode.space || null,
+    sessionId: episode.sessionId || null,
+  };
+
+  const result = await runQuery(query, params);
+  return result[0].get("uuid");
+}
+
+// Get an episode by UUID
+export async function getEpisode(uuid: string): Promise<EpisodicNode | null> {
+  const query = `
+    MATCH (e:Episode {uuid: $uuid})
+    RETURN e
+  `;
+
+  const result = await runQuery(query, { uuid });
+  if (result.length === 0) return null;
+
+  const episode = result[0].get("e").properties;
+  return {
+    uuid: episode.uuid,
+    name: episode.name,
+    content: episode.content,
+    contentEmbedding: episode.contentEmbedding,
+    type: episode.type,
+    source: episode.source,
+    createdAt: new Date(episode.createdAt),
+    validAt: new Date(episode.validAt),
+    labels: episode.labels,
+    userId: episode.userId,
+    space: episode.space,
+    sessionId: episode.sessionId,
+  };
+}
+
+// Get recent episodes with optional filters
+export async function getRecentEpisodes(params: {
+  referenceTime: Date;
+  limit: number;
+  userId: string;
+  source?: string;
+  sessionId?: string;
+}): Promise<EpisodicNode[]> {
+  let filters = `WHERE e.validAt <= $referenceTime
+    AND e.userId = $userId`;
+
+  if (params.source) {
+    filters += `\nAND e.source = $source`;
+  }
+
+  if (params.sessionId) {
+    filters += `\nAND e.sessionId = $sessionId`;
+  }
+
+  const query = `
+    MATCH (e:Episode)
+    ${filters}
+    RETURN e
+    ORDER BY e.validAt DESC
+    LIMIT toInteger($limit)
+  `;
+
+  const queryParams = {
+    referenceTime: new Date(params.referenceTime).toISOString(),
+    userId: params.userId,
+    source: params.source || null,
+    sessionId: params.sessionId || null,
+    limit: params.limit,
+  };
+
+  const result = await runQuery(query, queryParams);
+
+  return result.map((record) => {
+    const episode = record.get("e").properties;
+    return {
+      uuid: episode.uuid,
+      name: episode.name,
+      content: episode.content,
+      contentEmbedding: episode.contentEmbedding,
+      type: episode.type,
+      source: episode.source,
+      createdAt: new Date(episode.createdAt),
+      validAt: new Date(episode.validAt),
+      labels: episode.labels,
+      userId: episode.userId,
+      space: episode.space,
+      sessionId: episode.sessionId,
+    };
+  });
+}
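For orientation, here is a minimal usage sketch of the graph-model helpers above. It is illustrative only — the user ID, content, and source label are invented, and the embedding is left empty where a real caller would generate one first:

import crypto from "crypto";
import { EpisodeType } from "@recall/types";
import { saveEpisode, getRecentEpisodes } from "~/services/graphModels/episode";

// Hypothetical caller: persist one episode, then fetch recent context for the same user.
async function exampleIngest(userId: string) {
  await saveEpisode({
    uuid: crypto.randomUUID(),
    name: "chat-message",
    content: "Alice moved to the Platform team.",
    contentEmbedding: [], // placeholder; normally produced by the embedding model
    type: EpisodeType.Conversation,
    source: "chat", // assumed source label
    createdAt: new Date(),
    validAt: new Date(),
    labels: [],
    userId,
  });

  return getRecentEpisodes({ referenceTime: new Date(), limit: 5, userId });
}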
diff --git a/apps/webapp/app/services/graphModels/statement.ts b/apps/webapp/app/services/graphModels/statement.ts
new file mode 100644
index 0000000..2e16af1
--- /dev/null
+++ b/apps/webapp/app/services/graphModels/statement.ts
@@ -0,0 +1,319 @@
+import crypto from "crypto";
+import type {
+  EntityNode,
+  EpisodicNode,
+  StatementNode,
+  Triple,
+} from "@recall/types";
+import { runQuery } from "~/lib/neo4j.server";
+import { saveEntity } from "./entity";
+import { saveEpisode } from "./episode";
+
+export async function saveTriple(triple: Triple): Promise<string> {
+  // First, save the Episode
+  await saveEpisode(triple.provenance);
+
+  // Then, save the Statement. The embedding is stored as `factEmbedding`,
+  // matching the vector index and the similarity queries below.
+  const statementQuery = `
+    MERGE (n:Statement {uuid: $uuid, userId: $userId})
+    ON CREATE SET
+      n.fact = $fact,
+      n.factEmbedding = $factEmbedding,
+      n.createdAt = $createdAt,
+      n.validAt = $validAt,
+      n.invalidAt = $invalidAt,
+      n.attributesJson = $attributesJson,
+      n.userId = $userId,
+      n.space = $space
+    ON MATCH SET
+      n.fact = $fact,
+      n.factEmbedding = $factEmbedding,
+      n.validAt = $validAt,
+      n.invalidAt = $invalidAt,
+      n.attributesJson = $attributesJson,
+      n.space = $space
+    RETURN n.uuid as uuid
+  `;
+
+  const statementParams = {
+    uuid: triple.statement.uuid,
+    fact: triple.statement.fact,
+    factEmbedding: triple.statement.factEmbedding,
+    createdAt: triple.statement.createdAt.toISOString(),
+    validAt: triple.statement.validAt.toISOString(),
+    invalidAt: triple.statement.invalidAt
+      ? triple.statement.invalidAt.toISOString()
+      : null,
+    attributesJson: JSON.stringify(triple.statement.attributes || {}),
+    userId: triple.provenance.userId,
+    space: triple.statement.space || null,
+  };
+
+  const statementResult = await runQuery(statementQuery, statementParams);
+  const statementUuid = statementResult[0].get("uuid");
+
+  // Then, save the Entities
+  const subjectUuid = await saveEntity(triple.subject);
+  const predicateUuid = await saveEntity(triple.predicate);
+  const objectUuid = await saveEntity(triple.object);
+
+  // Then, create relationships
+  const relationshipsQuery = `
+    MATCH (statement:Statement {uuid: $statementUuid, userId: $userId})
+    MATCH (subject:Entity {uuid: $subjectUuid, userId: $userId})
+    MATCH (predicate:Entity {uuid: $predicateUuid, userId: $userId})
+    MATCH (object:Entity {uuid: $objectUuid, userId: $userId})
+    MATCH (episode:Episode {uuid: $episodeUuid, userId: $userId})
+
+    CREATE (episode)-[:HAS_PROVENANCE {uuid: $provenanceEdgeUuid, createdAt: $createdAt}]->(statement)
+    CREATE (statement)-[:HAS_SUBJECT {uuid: $subjectEdgeUuid, createdAt: $createdAt}]->(subject)
+    CREATE (statement)-[:HAS_PREDICATE {uuid: $predicateEdgeUuid, createdAt: $createdAt}]->(predicate)
+    CREATE (statement)-[:HAS_OBJECT {uuid: $objectEdgeUuid, createdAt: $createdAt}]->(object)
+
+    RETURN statement.uuid as uuid
+  `;
+
+  const now = new Date().toISOString();
+  const relationshipsParams = {
+    statementUuid,
+    subjectUuid,
+    predicateUuid,
+    objectUuid,
+    episodeUuid: triple.provenance.uuid,
+    subjectEdgeUuid: crypto.randomUUID(),
+    predicateEdgeUuid: crypto.randomUUID(),
+    objectEdgeUuid: crypto.randomUUID(),
+    provenanceEdgeUuid: crypto.randomUUID(),
+    createdAt: now,
+    userId: triple.provenance.userId,
+  };
+
+  await runQuery(relationshipsQuery, relationshipsParams);
+  return statementUuid;
+}
+
+/**
+ * Find statements that might contradict a new statement (same subject and predicate)
+ */
+export async function findContradictoryStatements({
+  subjectId,
+  predicateId,
+}: {
+  subjectId: string;
+  predicateId: string;
+}): Promise<StatementNode[]> {
+  const query = `
+    MATCH (statement:Statement)
+    WHERE statement.invalidAt IS NULL
+    MATCH (subject:Entity)<-[:HAS_SUBJECT]-(statement)-[:HAS_PREDICATE]->(predicate:Entity)
+    WHERE subject.uuid = $subjectId AND predicate.uuid = $predicateId
+    RETURN statement
+  `;
+
+  const result = await runQuery(query, { subjectId, predicateId });
+
+  if (!result || result.length === 0) {
+    return [];
+  }
+
+  return result.map((record) => {
+    const statement = record.get("statement").properties;
+    return {
+      uuid: statement.uuid,
+      fact: statement.fact,
+      factEmbedding: statement.factEmbedding,
+      createdAt: new Date(statement.createdAt),
+      validAt: new Date(statement.validAt),
+      invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
+      attributes: statement.attributesJson
+        ? JSON.parse(statement.attributesJson)
+        : {},
+      userId: statement.userId,
+    };
+  });
+}
+
+/**
+ * Find statements that are semantically similar to a given statement using embedding similarity
+ */
+export async function findSimilarStatements({
+  factEmbedding,
+  threshold = 0.85,
+  excludeIds = [],
+}: {
+  factEmbedding: number[];
+  threshold?: number;
+  excludeIds?: string[];
+}): Promise<StatementNode[]> {
+  const query = `
+    MATCH (statement:Statement)
+    WHERE statement.invalidAt IS NULL
+      AND statement.factEmbedding IS NOT NULL
+      ${excludeIds.length > 0 ? "AND NOT statement.uuid IN $excludeIds" : ""}
+    WITH statement, vector.similarity.cosine($factEmbedding, statement.factEmbedding) AS score
+    WHERE score >= $threshold
+    RETURN statement, score
+    ORDER BY score DESC
+  `;
+
+  const result = await runQuery(query, {
+    factEmbedding,
+    threshold,
+    excludeIds,
+  });
+
+  if (!result || result.length === 0) {
+    return [];
+  }
+
+  return result.map((record) => {
+    const statement = record.get("statement").properties;
+    const score = record.get("score");
+
+    return {
+      uuid: statement.uuid,
+      fact: statement.fact,
+      factEmbedding: statement.factEmbedding,
+      createdAt: new Date(statement.createdAt),
+      validAt: new Date(statement.validAt),
+      invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
+      attributes: statement.attributesJson
+        ? JSON.parse(statement.attributesJson)
+        : {},
+      userId: statement.userId,
+    };
+  });
+}
+
+export async function getTripleForStatement({
+  statementId,
+}: {
+  statementId: string;
+}): Promise<Triple | null> {
+  const query = `
+    MATCH (statement:Statement {uuid: $statementId})
+    MATCH (subject:Entity)<-[:HAS_SUBJECT]-(statement)
+    MATCH (predicate:Entity)<-[:HAS_PREDICATE]-(statement)
+    MATCH (object:Entity)<-[:HAS_OBJECT]-(statement)
+    OPTIONAL MATCH (episode:Episode)-[:HAS_PROVENANCE]->(statement)
+    RETURN statement, subject, predicate, object, episode
+  `;
+
+  const result = await runQuery(query, { statementId });
+
+  if (!result || result.length === 0) {
+    return null;
+  }
+
+  const record = result[0];
+
+  const statementProps = record.get("statement").properties;
+  const subjectProps = record.get("subject").properties;
+  const predicateProps = record.get("predicate").properties;
+  const objectProps = record.get("object").properties;
+  const episodeProps = record.get("episode")?.properties;
+
+  const statement: StatementNode = {
+    uuid: statementProps.uuid,
+    fact: statementProps.fact,
+    factEmbedding: statementProps.factEmbedding,
+    createdAt: new Date(statementProps.createdAt),
+    validAt: new Date(statementProps.validAt),
+    invalidAt: statementProps.invalidAt
+      ? new Date(statementProps.invalidAt)
+      : null,
+    attributes: statementProps.attributesJson
+      ? JSON.parse(statementProps.attributesJson)
+      : {},
+    userId: statementProps.userId,
+  };
+
+  const subject: EntityNode = {
+    uuid: subjectProps.uuid,
+    name: subjectProps.name,
+    type: subjectProps.type,
+    nameEmbedding: subjectProps.nameEmbedding,
+    attributes: subjectProps.attributesJson
+      ? JSON.parse(subjectProps.attributesJson)
+      : {},
+    createdAt: new Date(subjectProps.createdAt),
+    userId: subjectProps.userId,
+  };
+
+  const predicate: EntityNode = {
+    uuid: predicateProps.uuid,
+    name: predicateProps.name,
+    type: predicateProps.type,
+    nameEmbedding: predicateProps.nameEmbedding,
+    attributes: predicateProps.attributesJson
+      ? JSON.parse(predicateProps.attributesJson)
+      : {},
+    createdAt: new Date(predicateProps.createdAt),
+    userId: predicateProps.userId,
+  };
+
+  const object: EntityNode = {
+    uuid: objectProps.uuid,
+    name: objectProps.name,
+    type: objectProps.type,
+    nameEmbedding: objectProps.nameEmbedding,
+    attributes: objectProps.attributesJson
+      ? JSON.parse(objectProps.attributesJson)
+      : {},
+    createdAt: new Date(objectProps.createdAt),
+    userId: objectProps.userId,
+  };
+
+  // Episode might be null (OPTIONAL MATCH above)
+  const provenance: EpisodicNode = {
+    uuid: episodeProps.uuid,
+    name: episodeProps.name,
+    content: episodeProps.content,
+    source: episodeProps.source,
+    type: episodeProps.type,
+    createdAt: new Date(episodeProps.createdAt),
+    validAt: new Date(episodeProps.validAt),
+    contentEmbedding: episodeProps.contentEmbedding,
+    userId: episodeProps.userId,
+    labels: episodeProps.labels || [],
+    space: episodeProps.space,
+    sessionId: episodeProps.sessionId,
+  };
+
+  return {
+    statement,
+    subject,
+    predicate,
+    object,
+    provenance,
+  };
+}
+
+export async function invalidateStatement({
+  statementId,
+}: {
+  statementId: string;
+}) {
+  const query = `
+    MATCH (statement:Statement {uuid: $statementId})
+    SET statement.invalidAt = $invalidAt
+    RETURN statement
+  `;
+
+  const result = await runQuery(query, {
+    statementId,
+    invalidAt: new Date().toISOString(),
+  });
+
+  if (!result || result.length === 0) {
+    return null;
+  }
+
+  return result[0].get("statement").properties;
+}
+
+export async function invalidateStatements({
+  statementIds,
+}: {
+  statementIds: string[];
+}) {
+  // Await all invalidations rather than returning an array of pending promises
+  return Promise.all(
+    statementIds.map((statementId) => invalidateStatement({ statementId })),
+  );
+}
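A sketch of how a reified triple could be assembled and persisted with `saveTriple`. The names, types, and empty embeddings are placeholder assumptions, not values from this patch:

import crypto from "crypto";
import type { EntityNode, EpisodicNode, Triple } from "@recall/types";
import { saveTriple } from "~/services/graphModels/statement";

function makeEntity(name: string, type: string, userId: string): EntityNode {
  return {
    uuid: crypto.randomUUID(),
    name,
    type,
    attributes: {},
    nameEmbedding: [], // placeholder; normally an embedding vector
    createdAt: new Date(),
    userId,
  };
}

// `episode` is the EpisodicNode the fact was extracted from.
async function exampleSaveTriple(episode: EpisodicNode) {
  const triple: Triple = {
    subject: makeEntity("Alice", "Person", episode.userId),
    predicate: makeEntity("WORKS_AT", "Predicate", episode.userId),
    object: makeEntity("Platform team", "Team", episode.userId),
    statement: {
      uuid: crypto.randomUUID(),
      fact: "Alice works on the Platform team.",
      factEmbedding: [], // placeholder
      createdAt: new Date(),
      validAt: episode.validAt,
      invalidAt: null,
      attributes: {},
      userId: episode.userId,
    },
    provenance: episode,
  };

  return saveTriple(triple); // resolves to the statement uuid
}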
diff --git a/apps/webapp/app/services/knowledgeGraph.server.ts b/apps/webapp/app/services/knowledgeGraph.server.ts
index 270ef5f..3c1d9c1 100644
--- a/apps/webapp/app/services/knowledgeGraph.server.ts
+++ b/apps/webapp/app/services/knowledgeGraph.server.ts
@@ -6,97 +6,33 @@ import {
   type LanguageModelV1,
   streamText,
 } from "ai";
-import { EpisodeType, LLMMappings, LLMModelEnum } from "@recall/types";
+import {
+  entityTypes,
+  EpisodeType,
+  LLMMappings,
+  LLMModelEnum,
+  type AddEpisodeParams,
+  type EntityNode,
+  type EpisodicNode,
+  type StatementNode,
+  type Triple,
+} from "@recall/types";
 import { logger } from "./logger.service";
 import crypto from "crypto";
-import { dedupeNodes, extract_message, extract_text } from "./prompts/nodes";
-import { extract_statements } from "./prompts/statements";
-
-const HelixDB = await import("helix-ts").then((m) => m.default);
-
-/**
- * Interface for episodic node in the reified knowledge graph
- * Episodes are containers for statements and represent source information
- */
-export interface EpisodicNode {
-  uuid?: string;
-  name: string;
-  content: string;
-  contentEmbedding?: number[];
-  type: string;
-  source: string;
-  createdAt: Date;
-  validAt: Date;
-  labels: string[];
-  userId: string;
-  space?: string;
-  sessionId?: string;
-}
-
-/**
- * Interface for entity node in the reified knowledge graph
- * Entities represent subjects, objects, or predicates in statements
- */
-export interface EntityNode {
-  uuid: string;
-  name: string;
-  type: string;
-  attributes: Record<string, any>;
-  nameEmbedding: number[];
-  createdAt: Date;
-  userId: string;
-  space?: string;
-}
-
-/**
- * Interface for statement node in the reified knowledge graph
- * Statements are first-class objects representing facts with temporal properties
- */
-export interface StatementNode {
-  uuid?: string;
-  fact: string;
-  factEmbedding: number[];
-  createdAt: Date;
-  validAt: Date;
-  invalidAt: Date | null;
-  attributes: Record<string, any>;
-  userId: string;
-  space?: string;
-}
-
-/**
- * Interface for a triple in the reified knowledge graph
- * A triple connects a subject, predicate, object via a statement node
- * and maintains provenance information
- */
-export interface Triple {
-  statement: StatementNode;
-  subject: EntityNode;
-  predicate: EntityNode;
-  object: EntityNode;
-  provenance: EpisodicNode;
-}
-
-export type AddEpisodeParams = {
-  name: string;
-  episodeBody: string;
-  referenceTime: Date;
-  type: EpisodeType;
-  source: string;
-  userId: string;
-  spaceId?: string;
-  sessionId?: string;
-};
-
-export type AddEpisodeResult = {
-  episodeUuid: string;
-  nodesCreated: number;
-  statementsCreated: number;
-  processingTimeMs: number;
-};
-
-// Initialize Helix client
-const helixClient = new HelixDB();
+import { dedupeNodes, extractMessage, extractText } from "./prompts/nodes";
+import {
+  extractStatements,
+  resolveStatementPrompt,
+} from "./prompts/statements";
+import { getRecentEpisodes } from "./graphModels/episode";
+import { findSimilarEntities } from "./graphModels/entity";
+import {
+  findContradictoryStatements,
+  findSimilarStatements,
+  getTripleForStatement,
+  invalidateStatements,
+  saveTriple,
+} from "./graphModels/statement";
 
 // Default number of previous episodes to retrieve for context
 const DEFAULT_EPISODE_WINDOW = 5;
@@ -112,53 +48,6 @@ export class KnowledgeGraphService {
     return embedding;
   }
 
-  async retrieveEpisodes(
-    referenceTime: Date,
-    episodeWindow: number = DEFAULT_EPISODE_WINDOW,
-    userId?: string,
-    type?: EpisodeType,
-  ): Promise<EpisodicNode[]> {
-    try {
-      // Use the proper HelixDB query for retrieving episodes
-      const episodes = await helixClient.query("getRecentEpisodes", {
-        referenceTime: referenceTime.toISOString(),
-        limit: episodeWindow,
-        userId: userId || null,
-        source: type || null,
-      });
-
-      if (!episodes || !Array.isArray(episodes)) {
-        logger.warn(
-          "Unexpected response from HelixDB for getRecentEpisodes:",
-          episodes,
-        );
-        return [];
-      }
-
-      // Map to EpisodicNode interface
-      return episodes
-        .map((ep) => ({
-          uuid: ep.uuid,
-          name: ep.name,
-          content: ep.content,
-          sourceDescription: ep.sourceDescription,
-          source: ep.source as EpisodeType,
-          createdAt: new Date(ep.createdAt),
-          validAt: new Date(ep.validAt),
-          entityEdges: ep.entityEdges || [],
-          userId: ep.userId,
-          type: ep.type,
-          labels: ep.labels || [],
-          space: ep.space,
-          sessionId: ep.sessionId,
-        }))
-        .reverse();
-    } catch (error) {
-      logger.error("Error retrieving episode context:", { error });
-      return [];
-    }
-  }
-
   /**
    * Process an episode and update the knowledge graph.
    *
@@ -169,26 +58,24 @@ export class KnowledgeGraphService {
     const startTime = Date.now();
     const now = new Date();
 
-    console.log(params);
-
     try {
       // Step 1: Context Retrieval - Get previous episodes for context
-      const previousEpisodes = await this.retrieveEpisodes(
-        params.referenceTime,
-        RELEVANT_SCHEMA_LIMIT,
-        params.userId,
-        params.type,
-      );
+      const previousEpisodes = await getRecentEpisodes({
+        referenceTime: params.referenceTime,
+        limit: DEFAULT_EPISODE_WINDOW,
+        userId: params.userId,
+        source: params.source,
+      });
 
       // Step 2: Episode Creation - Create or retrieve the episode
       const episode: EpisodicNode = {
         uuid: crypto.randomUUID(),
         name: params.name,
         content: params.episodeBody,
-        source: params.source || EpisodeType.Text,
-        type: params.type,
+        source: params.source,
+        type: params.type || EpisodeType.Text,
         createdAt: now,
-        validAt: params.referenceTime,
+        validAt: new Date(params.referenceTime),
         labels: [],
         userId: params.userId,
         space: params.spaceId,
@@ -201,49 +88,35 @@ export class KnowledgeGraphService {
         previousEpisodes,
       );
 
-      // Step 4: Entity Resolution - Resolve extracted nodes to existing nodes or create new ones
-      // const { resolvedNodes, uuidMap } = await this.resolveExtractedNodes(
-      //   extractedNodes,
-      //   episode,
-      //   previousEpisodes,
-      // );
+      // Step 4: Statement Extraction - Extract statements (triples) instead of direct edges
+      const extractedStatements = await this.extractStatements(
+        episode,
+        extractedNodes,
+        previousEpisodes,
+      );
 
-      // // Step 5: Statement Extraction - Extract statements (triples) instead of direct edges
-      // const extractedStatements = await this.extractStatements(
-      //   episode,
-      //   resolvedNodes,
-      //   previousEpisodes,
-      // );
+      // Step 5: Entity Resolution - Resolve extracted nodes to existing nodes or create new ones
+      const resolvedTriples = await this.resolveExtractedNodes(
+        extractedStatements,
+        episode,
+        previousEpisodes,
+      );
 
-      // // Step 6: Statement Resolution - Resolve statements and detect contradictions
-      // const { resolvedStatements, invalidatedStatements } =
-      //   await this.resolveStatements(
-      //     extractedStatements,
-      //     episode,
-      //     resolvedNodes,
-      //   );
+      // Step 6: Statement Resolution - Resolve statements and detect contradictions
+      const { resolvedStatements, invalidatedStatements } =
+        await this.resolveStatements(resolvedTriples, episode);
 
-      // // Step 7: Role Assignment & Attribute Extraction - Extract additional attributes for nodes
-      // const hydratedNodes = await this.extractAttributesFromNodes(
-      //   resolvedNodes,
-      //   episode,
-      //   previousEpisodes,
-      // );
+      // Save triples sequentially to avoid parallel processing issues
+      for (const triple of resolvedStatements) {
+        await saveTriple(triple);
+      }
 
-      // Step 8: Generate embeddings for semantic search
-      // Note: In this implementation, embeddings are generated during extraction
-      // but could be moved to a separate step for clarity
-
-      // Step 10: Save everything to HelixDB using the reified + temporal structure
-      // await this.saveToHelixDB(
-      //   episode,
-      //   hydratedNodes,
-      //   resolvedStatements,
-      //   invalidatedStatements,
-      // );
+      // Invalidate contradicted statements
+      await invalidateStatements({ statementIds: invalidatedStatements });
 
       const endTime = Date.now();
       const processingTimeMs = endTime - startTime;
+      logger.info(`Processing time: ${processingTimeMs} ms`);
 
       return {
         episodeUuid: episode.uuid,
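The reworked pipeline is driven entirely through `addEpisode`; a hedged call sketch follows (the parameter values are invented for illustration):

import { EpisodeType } from "@recall/types";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";

async function exampleAddEpisode() {
  const service = new KnowledgeGraphService();

  // Runs: context retrieval -> entity extraction -> statement extraction
  // -> entity resolution -> statement resolution -> persistence/invalidation.
  const result = await service.addEpisode({
    name: "support-ticket-42",
    episodeBody: "Customer reports the export job fails after the last deploy.",
    referenceTime: new Date(),
    type: EpisodeType.Text,
    source: "zendesk", // assumed source label
    userId: "user-123",
  });

  return result; // { episodeUuid, nodesCreated, statementsCreated, processingTimeMs }
}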
@@ -271,14 +144,14 @@
           content: ep.content,
           createdAt: ep.createdAt.toISOString(),
         })),
-      entityTypes: {}, // Could be populated with entity type definitions
+      entityTypes: entityTypes,
     };
 
     // Get the extract_json prompt from the prompt library
     const messages =
       episode.type === EpisodeType.Conversation
-        ? extract_message(context)
-        : extract_text(context);
+        ? extractMessage(context)
+        : extractText(context);
 
     let responseText = "";
 
@@ -318,69 +191,32 @@
   }
 
   /**
-   * Resolve extracted nodes to existing nodes or create new ones
+   * Extract statements as first-class objects from an episode using LLM
+   * This replaces the previous extractEdges method with a reified approach
    */
-  private async resolveExtractedNodes(
-    extractedNodes: EntityNode[],
+  private async extractStatements(
     episode: EpisodicNode,
+    extractedEntities: EntityNode[],
     previousEpisodes: EpisodicNode[],
-  ) {
-    const uuidMap = new Map<string, string>();
-
-    const existingNodesLists = await Promise.all(
-      extractedNodes.map(async (extractedNode) => {
-        // Check if a similar node already exists in HelixDB
-        // Use vector similarity search to find similar entities
-        // Threshold is 0.85 - meaning at least 85% similarity (lower cosine distance)
-        const similarEntities = await helixClient.query("findSimilarEntities", {
-          queryEmbedding: extractedNode.nameEmbedding,
-          limit: 5, // Get top 5 matches
-          threshold: 0.85, // 85% similarity threshold
-        });
-
-        return similarEntities.nodes;
-      }),
-    );
-
-    if (!existingNodesLists || existingNodesLists.length === 0) {
-      extractedNodes.forEach((node) => {
-        uuidMap.set(node.uuid, node.uuid);
-      });
-      return { resolvedNodes: extractedNodes, uuidMap };
-    }
-
-    // Prepare context for LLM
-    const extractedNodesContext = extractedNodes.map(
-      (node: EntityNode, i: number) => {
-        return {
-          id: i,
-          name: node.name,
-          entity_type: node.type,
-          entity_type_description: "Default Entity Type",
-          duplication_candidates: existingNodesLists[i].map(
-            (candidate: EntityNode, j: number) => ({
-              idx: j,
-              name: candidate.name,
-              entity_types: candidate.type,
-              ...candidate.attributes,
-            }),
-          ),
-        };
-      },
-    );
-
+  ): Promise<Triple[]> {
+    // Use the prompt library to get the appropriate prompts
     const context = {
-      extracted_nodes: extractedNodesContext,
-      episode_content: episode ? episode.content : "",
-      previous_episodes: previousEpisodes
-        ? previousEpisodes.map((ep) => ep.content)
-        : [],
+      episodeContent: episode.content,
+      previousEpisodes: previousEpisodes.map((ep) => ({
+        content: ep.content,
+        createdAt: ep.createdAt.toISOString(),
+      })),
+      entities: extractedEntities.map((node) => ({
+        name: node.name,
+        type: node.type,
+      })),
+      referenceTime: episode.validAt.toISOString(),
     };
 
-    const messages = dedupeNodes(context);
+    // Get the statement extraction prompt from the prompt library
+    const messages = extractStatements(context);
 
     let responseText = "";
-
     await this.makeModelCall(
       false,
       LLMModelEnum.GPT41,
       messages as CoreMessage[],
       (text) => {
         responseText = text;
       },
     );
 
     const outputMatch = responseText.match(/<output>([\s\S]*?)<\/output>/);
     if (outputMatch && outputMatch[1]) {
       responseText = outputMatch[1].trim();
-      const parsedResponse = JSON.parse(responseText);
-      const nodeResolutions = parsedResponse.entity_resolutions || [];
-
-      // Process each node resolution to either map to an existing node or keep as new
-      const resolvedNodes = nodeResolutions.map((resolution: any) => {
-        const resolutionId = resolution.id ?? -1;
-        const duplicateIdx = resolution.duplicate_idx ?? -1;
-        const extractedNode = extractedNodes[resolutionId];
-
-        // If a duplicate was found, use the existing node, otherwise use the extracted node
-        const resolvedNode =
-          duplicateIdx >= 0 &&
-          duplicateIdx < existingNodesLists[resolutionId]?.length
-            ? existingNodesLists[resolutionId][duplicateIdx]
-            : extractedNode;
-
-        // Update the name if provided in the resolution
-        if (resolution.name) {
-          resolvedNode.name = resolution.name;
-        }
-
-        // Map the extracted UUID to the resolved UUID
-        uuidMap.set(extractedNode.uuid, resolvedNode.uuid);
-
-        return resolvedNode;
-      });
-
-      return { resolvedNodes, uuidMap };
+    } else {
+      responseText = "{}";
     }
-  }
-
-  /**
-   * Extract statements as first-class objects from an episode using LLM
-   * This replaces the previous extractEdges method with a reified approach
-   */
-  private async extractStatements(
-    episode: EpisodicNode,
-    resolvedNodes: EntityNode[],
-    previousEpisodes: EpisodicNode[],
-  ): Promise<Triple[]> {
-    // Use the prompt library to get the appropriate prompts
-    const context = {
-      episodeContent: episode.content,
-      previousEpisodes: previousEpisodes.map((ep) => ({
-        content: ep.content,
-        createdAt: ep.createdAt.toISOString(),
-      })),
-      entities: resolvedNodes.map((node) => ({
-        name: node.name,
-        type: node.type,
-        uuid: node.uuid,
-      })),
-      referenceTime: episode.validAt.toISOString(),
-    };
-
-    // Get the statement extraction prompt from the prompt library
-    const messages = extract_statements(context);
-
-    let responseText = "";
-
-    await this.makeModelCall(
-      false,
-      LLMModelEnum.GPT41,
-      messages as CoreMessage[],
-      (text) => {
-        responseText = text;
-      },
-    );
 
     // Parse the statements from the LLM response
     const extractedTriples = JSON.parse(responseText || "{}").edges || [];
 
     const triples = await Promise.all(
       // Fix: Type 'any'.
       extractedTriples.map(async (triple: any) => {
         // Find the subject and object nodes
-        const subjectNode = resolvedNodes.find(
+        const subjectNode = extractedEntities.find(
           (node) => node.name.toLowerCase() === triple.source.toLowerCase(),
         );
 
-        const objectNode = resolvedNodes.find(
+        const objectNode = extractedEntities.find(
           (node) => node.name.toLowerCase() === triple.target.toLowerCase(),
         );
 
         // Find or create a predicate node for the relationship type
-        const predicateNode = resolvedNodes.find(
-          (node) =>
-            node.name.toLowerCase() === triple.relationship.toLowerCase(),
+        const predicateNode = extractedEntities.find(
+          (node) => node.name.toLowerCase() === triple.predicate.toLowerCase(),
         ) || {
           uuid: crypto.randomUUID(),
-          name: triple.relationship,
+          name: triple.predicate,
           type: "Predicate",
           attributes: {},
-          nameEmbedding: await this.getEmbedding(triple.relationship),
+          nameEmbedding: await this.getEmbedding(triple.predicate),
           createdAt: new Date(),
           userId: episode.userId,
         };
 
@@ -521,32 +291,186 @@
     return triples.filter(Boolean) as Triple[];
   }
 
-  private async resolvePredicateNodes(
+  /**
+   * Resolve extracted nodes to existing nodes or create new ones
+   */
+  private async resolveExtractedNodes(
     triples: Triple[],
     episode: EpisodicNode,
-  ) {
-    const predicateNodes: EntityNode[] = triples.map((triple: Triple) => {
-      return triple.predicate;
+    previousEpisodes: EpisodicNode[],
+  ): Promise<Triple[]> {
+    // Step 1: Extract unique entities from triples
+    const uniqueEntitiesMap = new Map<string, EntityNode>();
+    const entityIdToPositions = new Map<
+      string,
+      Array<{
+        tripleIndex: number;
+        position: "subject" | "predicate" | "object";
+      }>
+    >();
+
+    // First pass: collect all unique entities and their positions in triples
+    triples.forEach((triple, tripleIndex) => {
+      // Process subject
+      if (!uniqueEntitiesMap.has(triple.subject.uuid)) {
+        uniqueEntitiesMap.set(triple.subject.uuid, triple.subject);
+      }
+      if (!entityIdToPositions.has(triple.subject.uuid)) {
+        entityIdToPositions.set(triple.subject.uuid, []);
+      }
+      entityIdToPositions.get(triple.subject.uuid)!.push({
+        tripleIndex,
+        position: "subject",
+      });
+
+      // Process predicate
+      if (!uniqueEntitiesMap.has(triple.predicate.uuid)) {
+        uniqueEntitiesMap.set(triple.predicate.uuid, triple.predicate);
+      }
+      if (!entityIdToPositions.has(triple.predicate.uuid)) {
+        entityIdToPositions.set(triple.predicate.uuid, []);
+      }
+      entityIdToPositions.get(triple.predicate.uuid)!.push({
+        tripleIndex,
+        position: "predicate",
+      });
+
+      // Process object
+      if (!uniqueEntitiesMap.has(triple.object.uuid)) {
+        uniqueEntitiesMap.set(triple.object.uuid, triple.object);
+      }
+      if (!entityIdToPositions.has(triple.object.uuid)) {
+        entityIdToPositions.set(triple.object.uuid, []);
+      }
+      entityIdToPositions.get(triple.object.uuid)!.push({
+        tripleIndex,
+        position: "object",
+      });
     });
 
-    if (predicateNodes.length === 0) {
-      return;
-    }
+    // Convert to arrays for processing
+    const uniqueEntities = Array.from(uniqueEntitiesMap.values());
 
-    const existingNodesLists = await Promise.all(
-      predicateNodes.map(async (predicateNode) => {
-        // Check if a similar node already exists in HelixDB
-        // Use vector similarity search to find similar entities
-        // Threshold is 0.85 - meaning at least 85% similarity (lower cosine distance)
-        const similarEntities = await helixClient.query("findSimilarEntities", {
-          queryEmbedding: predicateNode.nameEmbedding,
-          limit: 5, // Get top 5 matches
-          threshold: 0.85, // 85% similarity threshold
+    // Step 2: Find similar entities for each unique entity
+    const similarEntitiesResults = await Promise.all(
+      uniqueEntities.map(async (entity) => {
+        const similarEntities = await findSimilarEntities({
+          queryEmbedding: entity.nameEmbedding,
+          limit: 5,
+          threshold: 0.85,
         });
-
-        return similarEntities.nodes;
+        return {
+          entity,
+          similarEntities,
+        };
       }),
     );
+
+    // If no similar entities found for any entity, return original triples
+    if (similarEntitiesResults.length === 0) {
+      return triples;
+    }
+
+    // Step 3: Prepare context for LLM deduplication
+    const dedupeContext = {
+      extracted_nodes: similarEntitiesResults.map((result, index) => ({
+        id: index,
+        name: result.entity.name,
+        entity_type: result.entity.type,
+        duplication_candidates: result.similarEntities.map((candidate, j) => ({
+          idx: j,
+          name: candidate.name,
+          entity_types: candidate.type,
+        })),
+      })),
+      episode_content: episode ? episode.content : "",
+      previous_episodes: previousEpisodes
+        ? previousEpisodes.map((ep) => ep.content)
+        : [],
+    };
+
+    // Step 4: Call LLM to resolve duplicates
+    const messages = dedupeNodes(dedupeContext);
+    let responseText = "";
+
+    await this.makeModelCall(
+      false,
+      LLMModelEnum.GPT41,
+      messages as CoreMessage[],
+      (text) => {
+        responseText = text;
+      },
+    );
+
+    // Step 5: Process LLM response
+    const outputMatch = responseText.match(/<output>([\s\S]*?)<\/output>/);
+    if (!outputMatch || !outputMatch[1]) {
+      return triples; // Return original if parsing fails
+    }
+
+    try {
+      responseText = outputMatch[1].trim();
+      const parsedResponse = JSON.parse(responseText);
+      const nodeResolutions = parsedResponse.entity_resolutions || [];
+
+      // Step 6: Create mapping from original entity UUID to resolved entity
+      const entityResolutionMap = new Map<string, EntityNode>();
+
+      nodeResolutions.forEach((resolution: any, index: number) => {
+        const originalEntity = uniqueEntities[resolution.id ?? index];
+        if (!originalEntity) return;
+
+        const duplicateIdx = resolution.duplicate_idx ?? -1;
+
+        // Get the corresponding result from similarEntitiesResults
+        const resultEntry = similarEntitiesResults.find(
+          (result) => result.entity.uuid === originalEntity.uuid,
+        );
+
+        if (!resultEntry) return;
+
+        // If a duplicate was found, use that entity, otherwise keep original
+        const resolvedEntity =
+          duplicateIdx >= 0 && duplicateIdx < resultEntry.similarEntities.length
+            ? resultEntry.similarEntities[duplicateIdx]
+            : originalEntity;
+
+        // Update name if provided
+        if (resolution.name) {
+          resolvedEntity.name = resolution.name;
+        }
+
+        // Map original UUID to resolved entity
+        entityResolutionMap.set(originalEntity.uuid, resolvedEntity);
+      });
+
+      // Step 7: Reconstruct triples with resolved entities
+      const resolvedTriples = triples.map((triple) => {
+        const newTriple = { ...triple };
+
+        // Replace subject if resolved
+        if (entityResolutionMap.has(triple.subject.uuid)) {
+          newTriple.subject = entityResolutionMap.get(triple.subject.uuid)!;
+        }
+
+        // Replace predicate if resolved
+        if (entityResolutionMap.has(triple.predicate.uuid)) {
+          newTriple.predicate = entityResolutionMap.get(triple.predicate.uuid)!;
+        }
+
+        // Replace object if resolved
+        if (entityResolutionMap.has(triple.object.uuid)) {
+          newTriple.object = entityResolutionMap.get(triple.object.uuid)!;
+        }
+
+        return newTriple;
+      });
+
+      return resolvedTriples;
+    } catch (error) {
+      logger.error("Error processing entity resolutions:", { error });
+      return triples; // Return original triples on error
+    }
+  }
 
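For reference, `resolveExtractedNodes` expects the deduplication model to answer inside `<output>` tags with the shape the parser reads above (`entity_resolutions`, `id`, `duplicate_idx`, optional `name`); the values here are invented:

<output>
{
  "entity_resolutions": [
    { "id": 0, "duplicate_idx": 2, "name": "Alice Johnson" },
    { "id": 1, "duplicate_idx": -1 }
  ]
}
</output>

A `duplicate_idx` of -1 keeps the extracted entity as-is; a non-negative index points into that entity's `duplication_candidates` list.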
@@ -556,76 +480,179 @@
   private async resolveStatements(
     triples: Triple[],
     episode: EpisodicNode,
-    nodes: EntityNode[],
   ): Promise<{
     resolvedStatements: Triple[];
-    invalidatedStatements: Triple[];
+    invalidatedStatements: string[];
   }> {
     const resolvedStatements: Triple[] = [];
-    const invalidatedStatements: Triple[] = [];
+    const invalidatedStatements: string[] = [];
+
+    if (triples.length === 0) {
+      return { resolvedStatements, invalidatedStatements };
+    }
+
+    // Step 1: Collect all potential matches for all triples at once
+    const allPotentialMatches: Map<string, StatementNode[]> = new Map();
+    const allExistingTripleData: Map<string, Triple> = new Map();
+
+    // For preparing the LLM context
+    const newStatements: any[] = [];
+    const similarStatements: any[] = [];
 
     for (const triple of triples) {
-      // Find similar existing statements in HelixDB using the findContradictoryStatements query
-      const existingStatements = await helixClient.query(
-        "findContradictoryStatements",
-        {
-          subjectId: triple.subject.uuid,
-          predicateId: triple.predicate.uuid,
-        },
-      );
+      // Track IDs of statements we've already checked to avoid duplicates
+      const checkedStatementIds: string[] = [];
+      let potentialMatches: StatementNode[] = [];
 
-      if (existingStatements && existingStatements.length > 0) {
-        // If we have statements with the same subject and predicate,
-        // check if they have different objects (contradiction)
+      // Phase 1: Find statements with exact subject-predicate match
+      const exactMatches = await findContradictoryStatements({
+        subjectId: triple.subject.uuid,
+        predicateId: triple.predicate.uuid,
+      });
 
-        // Get full triple information for the existing statement
-        const existingTripleData = await helixClient.query(
-          "getTripleForStatement",
-          {
-            statementId: existingStatements[0].uuid,
-          },
-        );
+      if (exactMatches && exactMatches.length > 0) {
+        potentialMatches.push(...exactMatches);
+        checkedStatementIds.push(...exactMatches.map((s) => s.uuid));
+      }
+
+      // Phase 2: Find semantically similar statements
+      const semanticMatches = await findSimilarStatements({
+        factEmbedding: triple.statement.factEmbedding,
+        threshold: 0.85,
+        excludeIds: checkedStatementIds,
+      });
+
+      if (semanticMatches && semanticMatches.length > 0) {
+        potentialMatches.push(...semanticMatches);
+      }
+
+      if (potentialMatches.length > 0) {
+        logger.info(
+          `Found ${potentialMatches.length} potential matches for: ${triple.statement.fact}`,
+        );
+        allPotentialMatches.set(triple.statement.uuid, potentialMatches);
 
-        if (
-          existingTripleData &&
-          existingTripleData.object.uuid !== triple.object.uuid
-        ) {
-          // This is potentially a contradiction - objects differ for same subject+predicate
+        // Get full triple information for each potential match
+        for (const match of potentialMatches) {
+          if (!allExistingTripleData.has(match.uuid)) {
+            const existingTripleData = await getTripleForStatement({
+              statementId: match.uuid,
+            });
 
-          // Use LLM to determine if this is truly a contradiction
-          const isContradiction = await this.detectContradiction(
-            triple.statement.fact,
-            existingTripleData.statement.fact,
-          );
+            if (existingTripleData) {
+              allExistingTripleData.set(match.uuid, existingTripleData);
+
+              // Add to similarStatements for LLM context
+              similarStatements.push({
+                statementId: match.uuid,
+                fact: existingTripleData.statement.fact,
+                subject: existingTripleData.subject.name,
+                predicate: existingTripleData.predicate.name,
+                object: existingTripleData.object.name,
+              });
+            }
+          }
+        }
+      }
+
+      // Add to newStatements for LLM context
+      newStatements.push({
+        statement: {
+          uuid: triple.statement.uuid,
+          fact: triple.statement.fact,
+        },
+        subject: triple.subject.name,
+        predicate: triple.predicate.name,
+        object: triple.object.name,
+      });
+    }
+
+    // Step 2: If we have potential matches, use the LLM to analyze them in batch
+    if (similarStatements.length > 0) {
+      // Prepare context for the LLM
+      const promptContext = {
+        newStatements,
+        similarStatements,
+        episodeContent: episode.content,
+        referenceTime: episode.validAt.toISOString(),
+      };
+
+      // Get the statement resolution prompt
+      const messages = resolveStatementPrompt(promptContext);
+
+      let responseText = "";
+
+      // Call the LLM to analyze all statements at once
+      await this.makeModelCall(false, LLMModelEnum.GPT41, messages, (text) => {
+        responseText = text;
+      });
+
+      try {
+        // Extract the JSON response from the output tags
+        const jsonMatch = responseText.match(/<output>([\s\S]*?)<\/output>/);
+        const analysisResult = jsonMatch ? JSON.parse(jsonMatch[1]) : [];
+
+        // Process the analysis results
+        for (const result of analysisResult) {
+          const tripleIndex = triples.findIndex(
+            (t) => t.statement.uuid === result.statementId,
+          );
+          if (tripleIndex === -1) continue;
 
-          if (isContradiction) {
-            // Create a copy of the existing statement as invalidated
-            const invalidatedStatement: Triple = {
-              statement: {
-                ...existingTripleData.statement,
-                invalidAt: episode.validAt, // Mark as invalid at this episode's time
-              },
-              subject: existingTripleData.subject,
-              predicate: existingTripleData.predicate,
-              object: existingTripleData.object,
-              provenance: existingTripleData.provenance,
-            };
+          const triple = triples[tripleIndex];
 
-            invalidatedStatements.push(invalidatedStatement);
+          // Handle duplicates
+          if (result.isDuplicate && result.duplicateId) {
+            const duplicateTriple = allExistingTripleData.get(
+              result.duplicateId,
+            );
+            if (duplicateTriple) {
+              logger.info(`Statement is a duplicate: ${triple.statement.fact}`);
+              resolvedStatements.push(duplicateTriple);
+              continue;
+            }
+          }
 
-            // Add the new statement as a replacement
-            resolvedStatements.push(triple);
-          } else {
-            // Not a contradiction, just add the new statement
+          // Handle contradictions
+          if (result.contradictions && result.contradictions.length > 0) {
+            for (const contradictionId of result.contradictions) {
+              const contradictedTriple =
+                allExistingTripleData.get(contradictionId);
+              if (contradictedTriple) {
+                invalidatedStatements.push(contradictedTriple.statement.uuid);
+              }
+            }
+          }
+
+          // Add the new statement if it's not a duplicate
+          if (!result.isDuplicate) {
+            logger.info(`Adding new statement: ${triple.statement.fact}`);
             resolvedStatements.push(triple);
           }
-        } else {
-          // Same triple already exists, no need to create a duplicate
-          // We could merge additional metadata or update provenance information
-          resolvedStatements.push(triple);
         }
-      } else {
-        // This is a new statement, add it as is
+      } catch (e) {
+        logger.error("Error processing batch analysis:", { error: e });
+
+        // Fallback: add all statements as new if we couldn't process the analysis
+        for (const triple of triples) {
+          if (
+            !resolvedStatements.some(
+              (s) => s.statement.uuid === triple.statement.uuid,
+            )
+          ) {
+            logger.info(
+              `Fallback: Adding statement as new: ${triple.statement.fact}`,
+            );
+            resolvedStatements.push(triple);
+          }
+        }
+      }
+    } else {
+      // No potential matches found for any statements, add them all as new
+      for (const triple of triples) {
+        logger.info(
+          `No matches found, adding as new: ${triple.statement.fact}`,
+        );
         resolvedStatements.push(triple);
       }
     }
 
@@ -633,185 +660,6 @@
     return { resolvedStatements, invalidatedStatements };
   }
 
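Likewise, the batched statement-resolution call is parsed as a JSON array inside `<output>` tags. Based on the fields read above (`statementId`, `isDuplicate`, `duplicateId`, `contradictions`), a response would look roughly like this (UUIDs invented):

<output>
[
  { "statementId": "stmt-uuid-1", "isDuplicate": false, "contradictions": ["stmt-uuid-9"] },
  { "statementId": "stmt-uuid-2", "isDuplicate": true, "duplicateId": "stmt-uuid-7" }
]
</output>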
-  /**
-   * Detect if a new statement contradicts an existing statement
-   * This supports the reified + temporal knowledge graph approach by detecting
-   * statement-level contradictions rather than edge-level contradictions
-   */
-  private async detectContradiction(
-    newFact: string,
-    existingFact: string,
-    context?: { subject?: string; predicate?: string },
-  ): Promise<boolean> {
-    // Use the prompt library to get the appropriate prompts
-    const promptContext = {
-      newFact,
-      existingFact,
-      subject: context?.subject || null,
-      predicate: context?.predicate || null,
-    };
-
-    // Get the detect_contradiction prompt from the prompt library
-    // The prompt should be updated to handle reified statements specifically
-
-    // promptLibrary.detectContradiction.detect_json.call(promptContext);
-
-    let responseText = "";
-
-    await this.makeModelCall(false, LLMModelEnum.GPT41, [], (text) => {
-      responseText = text;
-    });
-
-    try {
-      const result = JSON.parse(responseText);
-
-      // If we have a well-formed response with temporal information, use it
-      if (
-        result.temporalAnalysis &&
-        typeof result.temporalAnalysis === "object"
-      ) {
-        // Check if the statements contradict based on temporal validity
-        // This is important for the reified + temporal approach
-        if (result.temporalAnalysis.areCompatible === false) {
-          return true; // This is a contradiction
-        }
-      }
-
-      // Fall back to the direct contradiction flag if temporal analysis isn't available
-      return result.isContradiction === true;
-    } catch (e) {
-      // Fallback to simple text parsing if JSON parsing fails
-      return (
-        responseText.toLowerCase().includes("true") ||
-        responseText.toLowerCase().includes("contradiction")
-      );
-    }
-  }
-
-  /**
-   * Extract additional attributes for nodes
-   */
-  private async extractAttributesFromNodes(
-    nodes: EntityNode[],
-    episode: EpisodicNode,
-    previousEpisodes: EpisodicNode[],
-  ): Promise<EntityNode[]> {
-    // This could involve LLM to extract more attributes for each node
-    // For simplicity, we'll just return the nodes as is
-    return nodes;
-  }
-
-  // buildEpisodicEdges method removed as part of the reified knowledge graph refactoring.
-  // In the reified model, episodes connect to entities through Statement nodes and HasProvenance edges.
-
-  /**
-   * Save all entities and statements to HelixDB using reified structure
-   * Creates statements and HasSubject, HasObject, HasPredicate, HasProvenance edges
-   */
-  private async saveToHelixDB(
-    episode: EpisodicNode,
-    nodes: EntityNode[],
-    resolvedStatements: Triple[],
-    invalidatedStatements: Triple[],
-  ): Promise<void> {
-    try {
-      // 1. Save the episode first
-      await helixClient.query("saveEpisode", {
-        uuid: episode.uuid,
-        name: episode.name,
-        content: episode.content,
-        source: episode.source,
-        // sourceDescription: episode.sourceDescription,
-        userId: episode.userId || null,
-        labels: episode.labels || [],
-        createdAt: episode.createdAt.toISOString(),
-        validAt: episode.validAt.toISOString(),
-        embedding: [], // Embedding could be added here if needed
-      });
-
-      // 2. Save or update all entity nodes
-      for (const node of nodes) {
-        await helixClient.query("saveEntity", {
-          uuid: node.uuid,
-          name: node.name,
-          summary: node.type, // Using type as summary
-          userId: node.userId || null,
-          createdAt: node.createdAt.toISOString(),
-          attributesJson: JSON.stringify(node.attributes || {}),
-          embedding: node.nameEmbedding || [],
-        });
-      }
-
-      // 3. Process all resolved statements
-      for (const triple of resolvedStatements) {
-        // Save the statement node first
-        await helixClient.query("saveStatement", {
-          uuid: triple.statement.uuid,
-          fact: triple.statement.fact,
-          // groupId: triple.statement.groupId,
-          userId: triple.statement.userId || null,
-          createdAt: triple.statement.createdAt.toISOString(),
-          validAt: triple.statement.validAt.toISOString(),
-          invalidAt: triple.statement.invalidAt
-            ? triple.statement.invalidAt.toISOString()
-            : null,
-          // attributesJson: triple.statement.attributesJson,
-          // embedding: triple.statement.embedding || [],
-        });
-
-        // Create HasSubject edge
-        await helixClient.query("createHasSubjectEdge", {
-          uuid: crypto.randomUUID(),
-          statementId: triple.statement.uuid,
-          entityId: triple.subject.uuid,
-          createdAt: new Date().toISOString(),
-        });
-
-        // Create HasObject edge
-        await helixClient.query("createHasObjectEdge", {
-          uuid: crypto.randomUUID(),
-          statementId: triple.statement.uuid,
-          entityId: triple.object.uuid,
-          createdAt: new Date().toISOString(),
-        });
-
-        // Create HasPredicate edge
-        await helixClient.query("createHasPredicateEdge", {
-          uuid: crypto.randomUUID(),
-          statementId: triple.statement.uuid,
-          entityId: triple.predicate.uuid,
-          createdAt: new Date().toISOString(),
-        });
-
-        // Create HasProvenance edge to link the statement to its source episode
-        await helixClient.query("createHasProvenanceEdge", {
-          uuid: crypto.randomUUID(),
-          statementId: triple.statement.uuid,
-          episodeId: episode.uuid,
-          createdAt: new Date().toISOString(),
-        });
-      }
-
-      // 4. Handle invalidated statements (update them with new invalidAt time)
-      for (const triple of invalidatedStatements) {
-        await helixClient.query("saveStatement", {
-          uuid: triple.statement.uuid,
-          fact: triple.statement.fact,
-          // groupId: triple.statement.groupId,
-          userId: triple.statement.userId || null,
-          createdAt: triple.statement.createdAt.toISOString(),
-          validAt: triple.statement.validAt.toISOString(),
-          // invalidAt: triple.statement.invalidAt.toISOString(), // This will be the episode.validAt timestamp
-          // attributesJson: triple.statement.attributesJson,
-          // embedding: triple.statement.embedding || [],
-        });
-      }
-    } catch (error) {
-      console.error("Error saving to HelixDB:", error);
-      throw error;
-    }
-  }
-
   private async makeModelCall(
     stream: boolean,
     model: LLMModelEnum,
The facts present mutually exclusive information about the same entities and relationship - -Respond with a JSON object containing: -- "isContradiction": boolean (true if contradiction exists) -- "explanation": string (brief explanation of the contradiction if one exists) -- "resolution": string (suggested resolution approach, if applicable) - -Be careful to consider: -- Temporal context (facts may be true at different times) -- Different levels of specificity (one fact may be more detailed) -- Different perspectives or interpretations -`, - }, - ]; -}; diff --git a/apps/webapp/app/services/prompts/extractEdges.ts b/apps/webapp/app/services/prompts/extractEdges.ts deleted file mode 100644 index 2067ef6..0000000 --- a/apps/webapp/app/services/prompts/extractEdges.ts +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Prompts for extracting relationships between entities - */ - -import { Message, type CoreMessage } from "ai"; - -export interface Edge { - relationshipType: string; - sourceEntityName: string; - targetEntityName: string; - fact: string; - validAt?: string; - invalidAt?: string; -} - -export interface ExtractedEdges { - edges: Edge[]; -} - -export interface MissingFacts { - missingFacts: string[]; -} - -/** - * Extract relationships between entities - */ -export const edge = (context: Record): CoreMessage[] => { - return [ - { - role: "system", - content: - "You are an expert fact extractor that extracts fact triples from text. " + - "1. Extracted fact triples should also be extracted with relevant date information." + - "2. Treat the CURRENT TIME as the time the CURRENT EPISODE was created. All temporal information should be extracted relative to this time.", - }, - { - role: "user", - content: ` - -${JSON.stringify(context.previousEpisodes || [], null, 2)} - - - -${context.episodeContent} - - - -${JSON.stringify(context.nodes || [], null, 2)} - - - -${context.referenceTime} # ISO 8601 (UTC); used to resolve relative time mentions - - - -${JSON.stringify(context.edgeTypes || {}, null, 2)} - - -# TASK -Extract all factual relationships between the given ENTITIES based on the CURRENT EPISODE. -Only extract facts that: -- involve two DISTINCT ENTITIES from the ENTITIES list, -- are clearly stated or unambiguously implied in the CURRENT EPISODE, - and can be represented as edges in a knowledge graph. -- The RELATIONSHIP TYPES provide a list of the most important types of relationships, make sure to extract any facts that - could be classified into one of the provided relationship types - -You may use information from the PREVIOUS EPISODES only to disambiguate references or support continuity. - -${context.customPrompt || ""} - -# EXTRACTION RULES - -1. Only emit facts where both the subject and object match entities in ENTITIES. -2. Each fact must involve two **distinct** entities. -3. Use a SCREAMING_SNAKE_CASE string as the \`relationshipType\` (e.g., FOUNDED, WORKS_AT). -4. Do not emit duplicate or semantically redundant facts. -5. The \`fact\` should quote or closely paraphrase the original source sentence(s). -6. Use \`REFERENCE_TIME\` to resolve vague or relative temporal expressions (e.g., "last week"). -7. Do **not** hallucinate or infer temporal bounds from unrelated events. - -# DATETIME RULES - -- Use ISO 8601 with "Z" suffix (UTC) (e.g., 2025-04-30T00:00:00Z). -- If the fact is ongoing (present tense), set \`validAt\` to REFERENCE_TIME. -- If a change/termination is expressed, set \`invalidAt\` to the relevant timestamp. 
diff --git a/apps/webapp/app/services/prompts/extractEdges.ts b/apps/webapp/app/services/prompts/extractEdges.ts
deleted file mode 100644
index 2067ef6..0000000
--- a/apps/webapp/app/services/prompts/extractEdges.ts
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Prompts for extracting relationships between entities
- */
-
-import { Message, type CoreMessage } from "ai";
-
-export interface Edge {
-  relationshipType: string;
-  sourceEntityName: string;
-  targetEntityName: string;
-  fact: string;
-  validAt?: string;
-  invalidAt?: string;
-}
-
-export interface ExtractedEdges {
-  edges: Edge[];
-}
-
-export interface MissingFacts {
-  missingFacts: string[];
-}
-
-/**
- * Extract relationships between entities
- */
-export const edge = (context: Record<string, any>): CoreMessage[] => {
-  return [
-    {
-      role: "system",
-      content:
-        "You are an expert fact extractor that extracts fact triples from text. " +
-        "1. Fact triples should also be extracted with relevant date information. " +
-        "2. Treat the CURRENT TIME as the time the CURRENT EPISODE was created. All temporal information should be extracted relative to this time.",
-    },
-    {
-      role: "user",
-      content: `
-<PREVIOUS EPISODES>
-${JSON.stringify(context.previousEpisodes || [], null, 2)}
-</PREVIOUS EPISODES>
-
-<CURRENT EPISODE>
-${context.episodeContent}
-</CURRENT EPISODE>
-
-<ENTITIES>
-${JSON.stringify(context.nodes || [], null, 2)}
-</ENTITIES>
-
-<REFERENCE_TIME>
-${context.referenceTime} # ISO 8601 (UTC); used to resolve relative time mentions
-</REFERENCE_TIME>
-
-<RELATIONSHIP TYPES>
-${JSON.stringify(context.edgeTypes || {}, null, 2)}
-</RELATIONSHIP TYPES>
-
-# TASK
-Extract all factual relationships between the given ENTITIES based on the CURRENT EPISODE.
-Only extract facts that:
-- involve two DISTINCT ENTITIES from the ENTITIES list,
-- are clearly stated or unambiguously implied in the CURRENT EPISODE,
-  and can be represented as edges in a knowledge graph.
-- The RELATIONSHIP TYPES provide a list of the most important types of relationships; make sure to extract any facts that
-  could be classified into one of the provided relationship types.
-
-You may use information from the PREVIOUS EPISODES only to disambiguate references or support continuity.
-
-${context.customPrompt || ""}
-
-# EXTRACTION RULES
-
-1. Only emit facts where both the subject and object match entities in ENTITIES.
-2. Each fact must involve two **distinct** entities.
-3. Use a SCREAMING_SNAKE_CASE string as the \`relationshipType\` (e.g., FOUNDED, WORKS_AT).
-4. Do not emit duplicate or semantically redundant facts.
-5. The \`fact\` should quote or closely paraphrase the original source sentence(s).
-6. Use \`REFERENCE_TIME\` to resolve vague or relative temporal expressions (e.g., "last week").
-7. Do **not** hallucinate or infer temporal bounds from unrelated events.
-
-# DATETIME RULES
-
-- Use ISO 8601 with "Z" suffix (UTC) (e.g., 2025-04-30T00:00:00Z).
-- If the fact is ongoing (present tense), set \`validAt\` to REFERENCE_TIME.
-- If a change/termination is expressed, set \`invalidAt\` to the relevant timestamp.
-- Leave both fields \`null\` if no explicit or resolvable time is stated.
-- If only a date is mentioned (no time), assume 00:00:00.
-- If only a year is mentioned, use January 1st at 00:00:00.
-
-Respond with a JSON object containing an "edges" array of objects, each with "relationshipType", "sourceEntityName", "targetEntityName", "fact", and optionally "validAt" and "invalidAt" properties.
-`,
-    },
-  ];
-};
-
-/**
- * Check for missed facts
- */
-export const reflexion = (context: Record<string, any>): CoreMessage[] => {
-  const sysPrompt = `You are an AI assistant that determines which facts have not been extracted from the given context`;
-
-  const userPrompt = `
-<PREVIOUS EPISODES>
-${JSON.stringify(context.previousEpisodes || [], null, 2)}
-</PREVIOUS EPISODES>
-
-<CURRENT EPISODE>
-${context.episodeContent}
-</CURRENT EPISODE>
-
-<EXTRACTED ENTITIES>
-${JSON.stringify(context.nodes || [], null, 2)}
-</EXTRACTED ENTITIES>
-
-<EXTRACTED FACTS>
-${JSON.stringify(context.extractedFacts || [], null, 2)}
-</EXTRACTED FACTS>
-
-Given the above EPISODES, list of EXTRACTED ENTITIES, and list of EXTRACTED FACTS,
-determine if any facts haven't been extracted. Respond with a JSON object containing a "missingFacts" array of strings.
-`;
-
-  return [
-    { role: "system", content: sysPrompt },
-    { role: "user", content: userPrompt },
-  ];
-};
-
-/**
- * Extract additional attributes for edges
- */
-export const extract_attributes = (
-  context: Record<string, any>,
-): CoreMessage[] => {
-  return [
-    {
-      role: "system",
-      content:
-        "You are a helpful assistant that extracts fact properties from the provided text.",
-    },
-    {
-      role: "user",
-      content: `
-<EPISODE>
-${JSON.stringify(context.episodeContent, null, 2)}
-</EPISODE>
-
-<REFERENCE TIME>
-${context.referenceTime}
-</REFERENCE TIME>
-
-Given the above EPISODE, its REFERENCE TIME, and the following FACT, update any of its attributes based on the information provided
-in EPISODE. Use the provided attribute descriptions to better understand how each attribute should be determined.
-
-Guidelines:
-1. Do not hallucinate entity property values if they cannot be found in the current context.
-2. Only use the provided EPISODES and FACT to set attribute values.
-
-<FACT>
-${JSON.stringify(context.fact, null, 2)}
-</FACT>
-`,
-    },
-  ];
-};
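// A worked example of the DATETIME RULES above (not part of this diff; values
// invented). Given REFERENCE_TIME 2025-05-07T00:00:00Z, "Alice left Acme last
// week" resolves to a date-only timestamp, so the time defaults to 00:00:00:
const exampleEdge /* : Edge, per the interface deleted above */ = {
  relationshipType: "WORKS_AT",
  sourceEntityName: "Alice",
  targetEntityName: "Acme",
  fact: "Alice left Acme last week",
  // A termination is expressed, so only invalidAt is set, resolved against
  // REFERENCE_TIME; validAt is omitted because no start time is stated.
  invalidAt: "2025-04-30T00:00:00Z",
};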
diff --git a/apps/webapp/app/services/prompts/index.ts b/apps/webapp/app/services/prompts/index.ts
index 9ea72aa..0fc6732 100644
--- a/apps/webapp/app/services/prompts/index.ts
+++ b/apps/webapp/app/services/prompts/index.ts
@@ -1,8 +1,2 @@
-/**
- * Exports for the prompts module
- */
-
-// Export types from individual prompt modules
-export { type ExtractedEntity, type ExtractedEntities } from "./nodes";
-export { type Edge, type ExtractedEdges } from "./extractEdges";
-export { type ContradictionResult } from "./contradiction";
+export * from "./nodes";
+export * from "./statements";
diff --git a/apps/webapp/app/services/prompts/models.ts b/apps/webapp/app/services/prompts/models.ts
deleted file mode 100644
index 01854e9..0000000
--- a/apps/webapp/app/services/prompts/models.ts
+++ /dev/null
@@ -1,14 +0,0 @@
-/**
- * Models for prompt system
- */
-
-export interface Message {
-  role: 'system' | 'user' | 'assistant';
-  content: string;
-}
-
-export type PromptFunction = (context: Record<string, any>) => Message[];
-
-export interface PromptVersion {
-  [version: string]: (context: Record<string, any>) => Message[];
-}
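// With the barrel flattened to wildcard re-exports, call sites can pull the
// renamed camelCase prompts from one path (a sketch, not a hunk in this diff):
import { extractMessage, extractText, extractStatements } from "~/services/prompts";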
diff --git a/apps/webapp/app/services/prompts/nodes.ts b/apps/webapp/app/services/prompts/nodes.ts
index be7be15..f0aa792 100644
--- a/apps/webapp/app/services/prompts/nodes.ts
+++ b/apps/webapp/app/services/prompts/nodes.ts
@@ -31,9 +31,7 @@ export interface EntityClassification {
 /**
  * Extract entities from an episode using message-based approach
  */
-export const extract_message = (
-  context: Record<string, any>,
-): CoreMessage[] => {
+export const extractMessage = (context: Record<string, any>): CoreMessage[] => {
   const sysPrompt = `You are an AI assistant that extracts entity nodes from conversational messages for a reified knowledge graph.
 Your primary task is to extract and classify significant entities mentioned in the conversation.
 
@@ -82,7 +80,13 @@ ${JSON.stringify(context.previousEpisodes || [], null, 2)}
 
 ${context.episodeContent}
-`;
+
+<ENTITY TYPES>
+${JSON.stringify(context.entityTypes || {}, null, 2)}
+</ENTITY TYPES>
+
+`;
 
   return [
     { role: "system", content: sysPrompt },
@@ -93,15 +97,15 @@
 /**
  * Extract entities from text-based content
  */
-export const extract_text = (context: Record<string, any>): CoreMessage[] => {
+export const extractText = (context: Record<string, any>): CoreMessage[] => {
   const sysPrompt = `
-  You are an AI assistant that extracts entity nodes from text for a reified knowledge graph.
+You are an AI assistant that extracts entity nodes from text for a reified knowledge graph.
 Your primary task is to extract and classify significant entities mentioned in the provided text.
 
 In a reified knowledge graph, we need to identify subject and object entities that will be connected through statements.
 
 Focus on extracting:
-1. Subject entities (people, objects, concepts)
-2. Object entities (people, objects, concepts)
+1. Subject entities
+2. Object entities
 
 Instructions:
 
 You are given a TEXT. Your task is to extract **entity nodes** mentioned **explicitly or implicitly** in the text.
@@ -119,7 +123,7 @@ You are given a TEXT. Your task is to extract **entity nodes** mentioned **expli
 
 - Do NOT extract dates, times, or other temporal information—these will be handled separately.
 
 4. **Formatting**:
-   - Be **explicit and unambiguous** in naming entities (e.g., use full names when available).
+   - Be **explicit and unambiguous** when naming entities (e.g., use full names when available).
 Format your response as a JSON object with the following structure:
@@ -138,6 +142,10 @@ Format your response as a JSON object with the following structure:
 
 ${context.episodeContent}
+
+<ENTITY TYPES>
+${JSON.stringify(context.entityTypes || {}, null, 2)}
+</ENTITY TYPES>
 `;
 
   return [
@@ -149,7 +157,7 @@ ${context.episodeContent}
 /**
  * Extract entities from an episode using JSON-based approach
  */
-export const extract_json = (context: Record<string, any>): CoreMessage[] => {
+export const extractJson = (context: Record<string, any>): CoreMessage[] => {
   const sysPrompt = `You are an AI assistant that extracts entity nodes from text.
 Your primary task is to extract and classify significant entities mentioned in the content.`;
 
@@ -186,72 +194,6 @@ ${context.customPrompt || ""}
   ];
 };
 
-/**
- * Check for missed entities
- */
-export const reflexion = (context: Record<string, any>): CoreMessage[] => {
-  const sysPrompt = `You are an AI assistant that determines which entities have not been extracted from the given context`;
-
-  const userPrompt = `
-<PREVIOUS EPISODES>
-${JSON.stringify(context.previousEpisodes || [], null, 2)}
-</PREVIOUS EPISODES>
-
-<CURRENT EPISODE>
-${context.episodeContent}
-</CURRENT EPISODE>
-
-<EXTRACTED ENTITIES>
-${JSON.stringify(context.extractedEntities || [], null, 2)}
-</EXTRACTED ENTITIES>
-
-Given the above previous episodes, current episode, and list of extracted entities; determine if any entities haven't been
-extracted. Respond with a JSON object containing a "missedEntities" array of strings.
-`;
-
-  return [
-    { role: "system", content: sysPrompt },
-    { role: "user", content: userPrompt },
-  ];
-};
-
-/**
- * Extract additional attributes for entities
- */
-export const extract_attributes = (
-  context: Record<string, any>,
-): CoreMessage[] => {
-  return [
-    {
-      role: "system",
-      content:
-        "You are a helpful assistant that extracts entity properties from the provided text.",
-    },
-    {
-      role: "user",
-      content: `
-<EPISODES>
-${JSON.stringify(context.previousEpisodes || [], null, 2)}
-${JSON.stringify(context.episodeContent, null, 2)}
-</EPISODES>
-
-Given the above EPISODES and the following ENTITY, update any of its attributes based on the information provided
-in EPISODES. Use the provided attribute descriptions to better understand how each attribute should be determined.
-
-Guidelines:
-1. Do not hallucinate entity property values if they cannot be found in the current context.
-2. Only use the provided EPISODES and ENTITY to set attribute values.
-3. The summary attribute represents a summary of the ENTITY, and should be updated with new information about the Entity from the EPISODES.
-   Summaries must be no longer than 250 words.
-
-<ENTITY>
-${JSON.stringify(context.node, null, 2)}
-</ENTITY>
-`,
-    },
-  ];
-};
-
 /**
  * Resolve entity duplications
  */
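// A hypothetical call site (not a hunk in this diff) showing the new ENTITY TYPES
// context field that these hunks thread into the prompts. The episode text is
// invented, and it is an assumption that @recall/types re-exports entityTypes.
import { extractMessage } from "~/services/prompts";
import { entityTypes } from "@recall/types";

const messages = extractMessage({
  previousEpisodes: [],
  episodeContent: "Remind me to email Priya every Friday at 9am",
  entityTypes,
});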
diff --git a/apps/webapp/app/services/prompts/statements.ts b/apps/webapp/app/services/prompts/statements.ts
index 7331ca4..1101c88 100644
--- a/apps/webapp/app/services/prompts/statements.ts
+++ b/apps/webapp/app/services/prompts/statements.ts
@@ -1,12 +1,12 @@
+import { type Triple } from "@recall/types";
 import { type CoreMessage } from "ai";
-import { type Triple } from "../knowledgeGraph.server";
 
 /**
  * Extract statements (triples) from episode content in a reified knowledge graph model
  * This function generates a prompt for the LLM to extract subject-predicate-object statements
  * and represent them as first-class nodes with proper connections
  */
-export const extract_statements = (
+export const extractStatements = (
   context: Record<string, any>,
 ): CoreMessage[] => {
   return [
@@ -105,76 +105,12 @@ ${JSON.stringify(context.entities, null, 2)}
   ];
 };
 
-/**
- * Detect contradictions between statements in the knowledge graph
- */
-export const detect_contradictions = (
-  context: Record<string, any>,
-): CoreMessage[] => {
-  return [
-    {
-      role: "system",
-      content:
-        "You are a knowledge graph reasoning expert that identifies contradictions between statements. " +
-        "Your task is to analyze pairs of statements and determine if they contradict each other " +
-        "based on their temporal validity and factual content.",
-    },
-    {
-      role: "user",
-      content: `
-I need to detect contradictions between statements in a temporal knowledge graph.
-
-<NEW STATEMENT>
-${context.newStatement}
-</NEW STATEMENT>
-
-<EXISTING STATEMENTS>
-${JSON.stringify(context.existingStatements, null, 2)}
-</EXISTING STATEMENTS>
-
-<REFERENCE TIME>
-${context.referenceTime}
-</REFERENCE TIME>
-
-Determine if the NEW STATEMENT contradicts any of the EXISTING STATEMENTS.
-A contradiction occurs when:
-
-1. Two statements assert incompatible facts about the same subject-predicate pair
-2. The statements overlap in their temporal validity periods
-
-For example, if one statement says "John works at Company A from January 2023" and another says
-"John works at Company B from March 2023", these would contradict if a person can only work at one
-company at a time.
-
-Format your response as a JSON object with the following structure:
-{
-  "hasContradiction": true/false,
-  "contradictedStatements": [
-    {
-      "statementId": "[ID of the contradicted statement]",
-      "reason": "[Explanation of why these statements contradict]",
-      "temporalRelationship": "[overlapping/containing/contained/after/before]"
-    }
-  ]
-}
-
-Important guidelines:
-- Consider the temporal validity of statements
-- Only mark as contradictions if statements are truly incompatible
-- Provide clear reasoning for each identified contradiction
-- Consider the context and domain constraints
-- If no contradictions exist, return an empty contradictedStatements array
-`,
-    },
-  ];
-};
-
 /**
  * Analyze similar statements to determine duplications and contradictions
  * This prompt helps the LLM evaluate semantically similar statements found through vector search
  * to determine if they are duplicates or contradictions
  */
-export const resolve_statements = (
+export const resolveStatementPrompt = (
   context: Record<string, any>,
 ): CoreMessage[] => {
   return [
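// A sketch (not a hunk in this diff) of running resolveStatementPrompt over
// candidates found via vector search. The context keys and the zod verdict schema
// are assumptions about the expected output shape, not the PR's final contract.
import { generateObject } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
import { resolveStatementPrompt } from "~/services/prompts";

async function resolveStatement(newStatement: string, similarStatements: string[]) {
  const { object } = await generateObject({
    model: openai("gpt-4.1-2025-04-14"),
    messages: resolveStatementPrompt({ newStatement, similarStatements }),
    schema: z.object({
      duplicateOf: z.string().nullable(), // uuid of an equivalent statement, if any
      contradicts: z.array(z.string()), // uuids of statements this one invalidates
    }),
  });
  return object;
}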
diff --git a/apps/webapp/app/services/search.server.ts b/apps/webapp/app/services/search.server.ts
new file mode 100644
index 0000000..790d79a
--- /dev/null
+++ b/apps/webapp/app/services/search.server.ts
@@ -0,0 +1,378 @@
+import {
+  type EntityNode,
+  type KnowledgeGraphService,
+  type StatementNode,
+} from "./knowledgeGraph.server";
+import { openai } from "@ai-sdk/openai";
+import { embed } from "ai";
+import HelixDB from "helix-ts";
+
+// Initialize OpenAI for embeddings
+const openaiClient = openai("gpt-4.1-2025-04-14");
+
+// Initialize Helix client
+const helixClient = new HelixDB();
+
+/**
+ * SearchService provides methods to search the reified + temporal knowledge graph
+ * using a hybrid approach combining BM25, vector similarity, and BFS traversal.
+ */
+export class SearchService {
+  private knowledgeGraphService: KnowledgeGraphService;
+
+  constructor(knowledgeGraphService: KnowledgeGraphService) {
+    this.knowledgeGraphService = knowledgeGraphService;
+  }
+
+  async getEmbedding(text: string) {
+    const { embedding } = await embed({
+      model: openai.embedding("text-embedding-3-small"),
+      value: text,
+    });
+
+    return embedding;
+  }
+
+  /**
+   * Search the knowledge graph using a hybrid approach
+   * @param query The search query
+   * @param userId The user ID for personalization
+   * @param options Search options
+   * @returns Array of relevant statements
+   */
+  public async search(
+    query: string,
+    userId: string,
+    options: SearchOptions = {},
+  ): Promise<StatementNode[]> {
+    // Default options
+    const opts: Required<SearchOptions> = {
+      limit: options.limit || 10,
+      maxBfsDepth: options.maxBfsDepth || 4,
+      validAt: options.validAt || new Date(),
+      includeInvalidated: options.includeInvalidated || false,
+      entityTypes: options.entityTypes || [],
+      predicateTypes: options.predicateTypes || [],
+    };
+
+    // 1. Run parallel search methods
+    const [bm25Results, vectorResults, bfsResults] = await Promise.all([
+      this.performBM25Search(query, userId, opts),
+      this.performVectorSearch(query, userId, opts),
+      this.performBfsSearch(query, userId, opts),
+    ]);
+
+    // 2. Combine and deduplicate results
+    const combinedStatements = this.combineAndDeduplicate([
+      ...bm25Results,
+      ...vectorResults,
+      ...bfsResults,
+    ]);
+
+    // 3. Rerank the combined results
+    const rerankedStatements = await this.rerankStatements(
+      query,
+      combinedStatements,
+      opts,
+    );
+
+    // 4. Return top results
+    return rerankedStatements.slice(0, opts.limit);
+  }
+
+  /**
+   * Perform BM25 keyword-based search on statements
+   */
+  private async performBM25Search(
+    query: string,
+    userId: string,
+    options: Required<SearchOptions>,
+  ): Promise<StatementNode[]> {
+    // TODO: Implement BM25 search using HelixDB or external search index
+    // This is a placeholder implementation
+    try {
+      const results = await helixClient.query("searchStatementsByKeywords", {
+        query,
+        userId,
+        validAt: options.validAt.toISOString(),
+        includeInvalidated: options.includeInvalidated,
+        limit: options.limit * 2, // Fetch more for reranking
+      });
+
+      return results.statements || [];
+    } catch (error) {
+      console.error("BM25 search error:", error);
+      return [];
+    }
+  }
+
+  /**
+   * Perform vector similarity search on statement embeddings
+   */
+  private async performVectorSearch(
+    query: string,
+    userId: string,
+    options: Required<SearchOptions>,
+  ): Promise<StatementNode[]> {
+    try {
+      // 1. Generate embedding for the query
+      const embedding = await this.getEmbedding(query);
+
+      // 2. Search for similar statements
+      const results = await helixClient.query("searchStatementsByVector", {
+        embedding,
+        userId,
+        validAt: options.validAt.toISOString(),
+        includeInvalidated: options.includeInvalidated,
+        limit: options.limit * 2, // Fetch more for reranking
+      });
+
+      return results.statements || [];
+    } catch (error) {
+      console.error("Vector search error:", error);
+      return [];
+    }
+  }
+
+  /**
+   * Perform BFS traversal starting from entities mentioned in the query
+   */
+  private async performBfsSearch(
+    query: string,
+    userId: string,
+    options: Required<SearchOptions>,
+  ): Promise<StatementNode[]> {
+    try {
+      // 1. Extract potential entities from query
+      const entities = await this.extractEntitiesFromQuery(query);
+
+      // 2. For each entity, perform BFS traversal
+      const allStatements: StatementNode[] = [];
+
+      for (const entity of entities) {
+        const statements = await this.bfsTraversal(
+          entity.uuid,
+          options.maxBfsDepth,
+          options.validAt,
+          userId,
+          options.includeInvalidated,
+        );
+        allStatements.push(...statements);
+      }
+
+      return allStatements;
+    } catch (error) {
+      console.error("BFS search error:", error);
+      return [];
+    }
+  }
+
+  /**
+   * Perform BFS traversal starting from an entity
+   */
+  private async bfsTraversal(
+    startEntityId: string,
+    maxDepth: number,
+    validAt: Date,
+    userId: string,
+    includeInvalidated: boolean,
+  ): Promise<StatementNode[]> {
+    // Track visited nodes to avoid cycles
+    const visited = new Set<string>();
+    // Track statements found during traversal
+    const statements: StatementNode[] = [];
+    // Queue for BFS traversal [nodeId, depth]
+    const queue: [string, number][] = [[startEntityId, 0]];
+
+    while (queue.length > 0) {
+      const [nodeId, depth] = queue.shift()!;
+
+      // Skip if already visited or max depth reached
+      if (visited.has(nodeId) || depth > maxDepth) continue;
+      visited.add(nodeId);
+
+      // Get statements where this entity is subject or object
+      const connectedStatements = await helixClient.query(
+        "getConnectedStatements",
+        {
+          entityId: nodeId,
+          userId,
+          validAt: validAt.toISOString(),
+          includeInvalidated,
+        },
+      );
+
+      // Add statements to results
+      if (connectedStatements.statements) {
+        statements.push(...connectedStatements.statements);
+
+        // Add connected entities to queue
+        for (const statement of connectedStatements.statements) {
+          // Get subject and object entities
+          if (statement.subjectId && !visited.has(statement.subjectId)) {
+            queue.push([statement.subjectId, depth + 1]);
+          }
+          if (statement.objectId && !visited.has(statement.objectId)) {
+            queue.push([statement.objectId, depth + 1]);
+          }
+        }
+      }
+    }
+
+    return statements;
+  }
+
+  /**
+   * Extract potential entities from a query using embeddings or LLM
+   */
+  private async extractEntitiesFromQuery(query: string): Promise<EntityNode[]> {
+    // TODO: Implement more sophisticated entity extraction
+    // This is a placeholder implementation that uses simple vector search
+    try {
+      const embedding = await this.getEmbedding(query);
+
+      const results = await helixClient.query("searchEntitiesByVector", {
+        embedding,
+        limit: 3, // Start with top 3 entities
+      });
+
+      return results.entities || [];
+    } catch (error) {
+      console.error("Entity extraction error:", error);
+      return [];
+    }
+  }
+
+  /**
+   * Combine and deduplicate statements from multiple sources
+   */
+  private combineAndDeduplicate(statements: StatementNode[]): StatementNode[] {
+    const uniqueStatements = new Map<string, StatementNode>();
+
+    for (const statement of statements) {
+      if (!uniqueStatements.has(statement.uuid)) {
+        uniqueStatements.set(statement.uuid, statement);
+      }
+    }
+
+    return Array.from(uniqueStatements.values());
+  }
+
+  /**
+   * Rerank statements based on relevance to the query
+   */
+  private async rerankStatements(
+    query: string,
+    statements: StatementNode[],
+    options: Required<SearchOptions>,
+  ): Promise<StatementNode[]> {
+    // TODO: Implement more sophisticated reranking
+    // This is a placeholder implementation using cosine similarity
+    try {
+      // 1. Generate embedding for the query
+      const queryEmbedding = await this.getEmbedding(query);
+
+      // 2. Generate or retrieve embeddings for statements
+      const statementEmbeddings = await Promise.all(
+        statements.map(async (statement) => {
+          // If statement has embedding, use it; otherwise generate
+          if (statement.factEmbedding && statement.factEmbedding.length > 0) {
+            return { statement, embedding: statement.factEmbedding };
+          }
+
+          // Generate text representation of statement
+          const statementText = this.statementToText(statement);
+          const embedding = await this.getEmbedding(statementText);
+
+          return { statement, embedding };
+        }),
+      );
+
+      // 3. Calculate cosine similarity
+      const scoredStatements = statementEmbeddings.map(
+        ({ statement, embedding }) => {
+          const similarity = this.cosineSimilarity(queryEmbedding, embedding);
+          return { statement, score: similarity };
+        },
+      );
+
+      // 4. Sort by score (descending)
+      scoredStatements.sort((a, b) => b.score - a.score);
+
+      // 5. Return statements in order of relevance
+      return scoredStatements.map(({ statement }) => statement);
+    } catch (error) {
+      console.error("Reranking error:", error);
+      // Fallback: return original order
+      return statements;
+    }
+  }
+
+  /**
+   * Convert a statement to a text representation
+   */
+  private statementToText(statement: StatementNode): string {
+    // TODO: Implement more sophisticated text representation
+    // This is a placeholder implementation
+    return `${statement.subjectName || "Unknown"} ${statement.predicateName || "has relation with"} ${statement.objectName || "Unknown"}`;
+  }
+
+  /**
+   * Calculate cosine similarity between two embeddings
+   */
+  private cosineSimilarity(a: number[], b: number[]): number {
+    if (a.length !== b.length) {
+      throw new Error("Embeddings must have the same length");
+    }
+
+    let dotProduct = 0;
+    let normA = 0;
+    let normB = 0;
+
+    for (let i = 0; i < a.length; i++) {
+      dotProduct += a[i] * b[i];
+      normA += a[i] * a[i];
+      normB += b[i] * b[i];
+    }
+
+    normA = Math.sqrt(normA);
+    normB = Math.sqrt(normB);
+
+    if (normA === 0 || normB === 0) {
+      return 0;
+    }
+
+    return dotProduct / (normA * normB);
+  }
+}
+
+/**
+ * Search options interface
+ */
+export interface SearchOptions {
+  limit?: number;
+  maxBfsDepth?: number;
+  validAt?: Date;
+  includeInvalidated?: boolean;
+  entityTypes?: string[];
+  predicateTypes?: string[];
+}
+
+/**
+ * Create a singleton instance of the search service
+ */
+let searchServiceInstance: SearchService | null = null;
+
+export function getSearchService(
+  knowledgeGraphService?: KnowledgeGraphService,
+): SearchService {
+  if (!searchServiceInstance) {
+    if (!knowledgeGraphService) {
+      throw new Error(
+        "KnowledgeGraphService must be provided when initializing SearchService",
+      );
+    }
+    searchServiceInstance = new SearchService(knowledgeGraphService);
+  }
+  return searchServiceInstance;
+}
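// A minimal consumer of the new service (not a hunk in this diff). Only
// getSearchService/search are defined above; the KnowledgeGraphService
// constructor call and the option values are assumptions for illustration.
import { getSearchService } from "~/services/search.server";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";

async function whereDoesAliceWork(userId: string) {
  const searchService = getSearchService(new KnowledgeGraphService());
  return searchService.search("Where does Alice work?", userId, {
    limit: 5,
    maxBfsDepth: 2, // keep graph traversal shallow for latency
    validAt: new Date(), // "as of now" temporal view
  });
}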
diff --git a/apps/webapp/package.json b/apps/webapp/package.json
index 32cc196..363b8a4 100644
--- a/apps/webapp/package.json
+++ b/apps/webapp/package.json
@@ -12,6 +12,7 @@
   },
   "dependencies": {
     "@ai-sdk/openai": "^1.3.21",
+    "@coji/remix-auth-google": "^4.2.0",
     "@opentelemetry/api": "1.9.0",
     "@radix-ui/react-slot": "^1.2.3",
     "@recall/database": "workspace:*",
@@ -23,29 +24,28 @@
     "@remix-run/serve": "2.16.7",
     "@remix-run/server-runtime": "2.16.7",
     "@remix-run/v1-meta": "^0.1.3",
+    "@remixicon/react": "^4.2.0",
     "@tailwindcss/container-queries": "^0.1.1",
     "@tailwindcss/postcss": "^4.1.7",
     "ai": "4.3.14",
+    "bullmq": "^5.53.2",
     "class-variance-authority": "^0.7.1",
     "clsx": "^2.1.1",
     "compression": "^1.7.4",
     "cross-env": "^7.0.3",
     "express": "^4.18.1",
-    "helix-ts": "^1.0.4",
+    "ioredis": "^5.6.1",
     "isbot": "^4.1.0",
+    "jose": "^5.2.3",
     "lucide-react": "^0.511.0",
     "morgan": "^1.10.0",
     "nanoid": "3.3.8",
-    "jose": "^5.2.3",
-    "bullmq": "^5.53.2",
-    "ioredis": "^5.6.1",
+    "neo4j-driver": "^5.28.1",
     "non.geist": "^1.0.2",
     "posthog-js": "^1.116.6",
     "react": "^18.2.0",
     "react-dom": "^18.2.0",
-    "@remixicon/react": "^4.2.0",
     "remix-auth": "^4.2.0",
-    "@coji/remix-auth-google": "^4.2.0",
     "remix-auth-oauth2": "^3.4.1",
     "remix-themes": "^1.3.1",
     "remix-typedjson": "0.3.1",
diff --git a/helixdb-cfg/config.hx.json b/helixdb-cfg/config.hx.json
deleted file mode 100644
index 3ae481f..0000000
--- a/helixdb-cfg/config.hx.json
+++ /dev/null
@@ -1,12 +0,0 @@
-
-{
-  "vector_config": {
-    "m": 16,
-    "ef_construction": 128,
-    "ef_search": 768
-  },
-  "graph_config": {
-    "secondary_indices": []
-  },
-  "db_max_size_gb": 10
-}
diff --git a/helixdb-cfg/queries.hx b/helixdb-cfg/queries.hx
deleted file mode 100644
index 58c5464..0000000
--- a/helixdb-cfg/queries.hx
+++ /dev/null
@@ -1,17 +0,0 @@
-// Save an episode to the database
-QUERY saveEpisode(name: String, content: String, source: String,
-    userId: String,
-    createdAt: I64, space: String, episodeType: String,
-    sessionId: String, validAt: I64, embedding: [F64]) =>
-    episode <- AddV<Episode>(embedding, {
-        name: name,
-        content: content,
-        source: source,
-        userId: userId,
-        createdAt: createdAt,
-        space: space,
-        sessionId: sessionId,
-        episodeType: episodeType,
-        validAt: validAt
-    })
-    RETURN episode
diff --git a/helixdb-cfg/schema.hx b/helixdb-cfg/schema.hx
deleted file mode 100644
index dec5c6c..0000000
--- a/helixdb-cfg/schema.hx
+++ /dev/null
@@ -1,78 +0,0 @@
-// Knowledge Graph Schema: Combines reified relationships with temporal graph memory
-// This schema implements a hybrid approach that allows for:
-// 1. Representing facts as first-class entities (reification)
-// 2. Tracking temporal validity of information
-// 3. Maintaining provenance (where information came from)
-// 4. Supporting direct entity-to-entity relationships for performance
-
-V::Episode {
-    name: String,
-    content: String,
-    source: String,
-    episodeType: String,
-    userId: String,
-    createdAt: I64,
-    validAt: I64,
-    labels: [String],
-    space: String,
-    sessionId: String
-}
-
-V::Entity {
-    name: String,
-    summary: String,
-    entityType: String,
-    createdAt: Date,
-    attributes: String,
-    userId: String,
-    space: String
-}
-
-// Statement node is the core of reification - turning facts into first-class objects
-// This allows tracking validity periods, provenance, and treating facts as objects themselves
-V::Statement {
-    fact: String,
-    createdAt: Date,
-    validAt: Date,
-    invalidAt: Date,
-    attributes: String,
-    userId: String,
-    space: String
-}
-
-// Subject of the statement (the entity the statement is about)
-E::HasSubject {
-    From: Statement,
-    To: Entity,
-    Properties: {
-        createdAt: Date
-    }
-}
-
-// Object of the statement (the entity that receives the action or is related to)
-E::HasObject {
-    From: Statement,
-    To: Entity,
-    Properties: {
-        createdAt: Date
-    }
-}
-
-// Predicate of the statement (the relationship type or verb)
-E::HasPredicate {
-    From: Statement,
-    To: Entity,
-    Properties: {
-        createdAt: Date
-    }
-}
-
-// Provenance connection - links a statement to its source episode
-E::HasProvenance {
-    From: Statement,
-    To: Episode,
-    Properties: {
-        createdAt: Date
-    }
-}
\ No newline at end of file
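// For contrast with the Helix schema deleted above (not a hunk in this diff): the
// same reified shape can be read back from Neo4j in one query. A sketch assuming
// the relationship names mirror HasSubject/HasObject/HasPredicate/HasProvenance.
import { runQuery } from "~/lib/neo4j.server";

async function getTripleByStatementUuid(uuid: string) {
  const records = await runQuery(
    `MATCH (st:Statement {uuid: $uuid})
     MATCH (st)-[:HAS_SUBJECT]->(subj:Entity)
     MATCH (st)-[:HAS_OBJECT]->(obj:Entity)
     MATCH (st)-[:HAS_PREDICATE]->(pred:Entity)
     OPTIONAL MATCH (st)-[:HAS_PROVENANCE]->(ep:Episode)
     RETURN st, subj, pred, obj, ep`,
    { uuid },
  );
  return records[0] ?? null;
}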
diff --git a/packages/types/src/graph/graph.entity.ts b/packages/types/src/graph/graph.entity.ts
index 055fbf2..ec6d624 100644
--- a/packages/types/src/graph/graph.entity.ts
+++ b/packages/types/src/graph/graph.entity.ts
@@ -3,55 +3,150 @@ export enum EpisodeType {
   Text = "TEXT",
 }
 
-export interface AddEpisodeParams {
-  name: string;
-  episodeBody: string;
-  sourceDescription: string;
-  referenceTime: Date;
-  source?: EpisodeType;
-  userId?: string;
-  uuid?: string;
-}
-
-export interface AddEpisodeResult {
-  episodeUuid: string;
-  nodesCreated: number;
-  edgesCreated: number;
-  processingTimeMs: number;
-}
-
-export interface EntityNode {
-  uuid: string;
-  name: string;
-  type: string;
-  attributes?: Record<string, any>;
-  nameEmbedding?: number[];
-  createdAt: Date;
-  userId?: string;
-}
-
-export interface EntityEdge {
-  uuid: string;
-  source: string; // source node uuid
-  target: string; // target node uuid
-  relationship: string;
-  fact: string;
-  factEmbedding?: number[];
-  validAt: Date;
-  invalidAt?: Date;
-  isValid: boolean;
-  episodes: string[]; // episode uuids where this edge was mentioned
-  userId?: string;
-}
-
+/**
+ * Interface for an episodic node in the reified knowledge graph
+ * Episodes are containers for statements and represent source information
+ */
 export interface EpisodicNode {
   uuid: string;
   name: string;
   content: string;
-  sourceDescription: string;
-  source: EpisodeType;
+  contentEmbedding?: number[];
+  type: string;
+  source: string;
   createdAt: Date;
   validAt: Date;
-  entityEdges: string[]; // edge uuids
-  userId?: string;
+  labels: string[];
+  userId: string;
+  space?: string;
+  sessionId?: string;
 }
+
+/**
+ * Interface for an entity node in the reified knowledge graph
+ * Entities represent subjects, objects, or predicates in statements
+ */
+export interface EntityNode {
+  uuid: string;
+  name: string;
+  type: string;
+  attributes: Record<string, any>;
+  nameEmbedding: number[];
+  createdAt: Date;
+  userId: string;
+  space?: string;
+}
+
+/**
+ * Interface for a statement node in the reified knowledge graph
+ * Statements are first-class objects representing facts with temporal properties
+ */
+export interface StatementNode {
+  uuid: string;
+  fact: string;
+  factEmbedding: number[];
+  createdAt: Date;
+  validAt: Date;
+  invalidAt: Date | null;
+  attributes: Record<string, any>;
+  userId: string;
+  space?: string;
+}
+
+/**
+ * Interface for a triple in the reified knowledge graph
+ * A triple connects a subject, predicate, and object via a statement node
+ * and maintains provenance information
+ */
+export interface Triple {
+  statement: StatementNode;
+  subject: EntityNode;
+  predicate: EntityNode;
+  object: EntityNode;
+  provenance: EpisodicNode;
+}
+
+export type AddEpisodeParams = {
+  name: string;
+  episodeBody: string;
+  referenceTime: Date;
+  type: EpisodeType;
+  source: string;
+  userId: string;
+  spaceId?: string;
+  sessionId?: string;
+};
+
+export type AddEpisodeResult = {
+  episodeUuid: string;
+  nodesCreated: number;
+  statementsCreated: number;
+  processingTimeMs: number;
+};
+
+export const entityTypes = {
+  general: {
+    PERSON: {
+      type: "PERSON",
+      description: "Any named individual mentioned or referenced.",
+    },
+    APP: {
+      type: "APP",
+      description: "Any third-party service or platform used by the user.",
+    },
+    SOL_AUTOMATION: {
+      type: "SOL_AUTOMATION",
+      description: "User workflows or flows combining triggers and actions.",
+    },
+    SOL_PREFERENCE: {
+      type: "SOL_PREFERENCE",
+      description: "User-stated intent, setting, or configuration like format, timezone, etc.",
+    },
+    COMMAND: {
+      type: "COMMAND",
+      description: "Trigger phrase mapped to an internal action, often starts with / or !",
+    },
+    TASK: {
+      type: "TASK",
+      description: "User-stated or inferred goal; may link to Person or App.",
+    },
+    EVENT: {
+      type: "EVENT",
+      description: "Time-based mention; supports parsing of phrases like 'next week', 'tomorrow'.",
+    },
+    LABEL: {
+      type: "LABEL",
+      description: "Optional categorization tag for organization or filtering.",
+    },
+    OBJECT: {
+      type: "OBJECT",
+      description: "Named non-person objects in the user's world (e.g., Projector, Car).",
+    },
+    TEAM: {
+      type: "TEAM",
+      description: "User-defined group of people, useful for permissions or targeting.",
+    },
+  },
+  app_specific: {
+    SLACK_CHANNEL: {
+      type: "SLACK_CHANNEL",
+      description: "Slack channel where automations or communications happen.",
+    },
+    SLACK_USER: {
+      type: "SLACK_USER",
+      description: "A user in Slack, can be tagged or messaged.",
+    },
+    GMAIL_THREAD: {
+      type: "GMAIL_THREAD",
+      description: "An email conversation thread in Gmail.",
+    },
+    NOTION_PAGE: {
+      type: "NOTION_PAGE",
+      description: "A page in a Notion workspace.",
+    },
+    CALENDAR_EVENT: {
+      type: "CALENDAR_EVENT",
+      description: "Event from the user's calendar (Google, Outlook, etc.).",
+    },
+  },
+};
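// An illustrative Triple under the new types (not a hunk in this diff; all values
// invented, embeddings left empty for brevity). "PREDICATE" is not one of the
// entityTypes above: predicates are modeled here as plain Entity nodes.
import { type Triple } from "@recall/types";

const now = new Date();
const base = { attributes: {}, nameEmbedding: [], createdAt: now, userId: "user-1" };

const exampleTriple: Triple = {
  statement: {
    uuid: "st-1",
    fact: "Alice works at Acme",
    factEmbedding: [],
    createdAt: now,
    validAt: now,
    invalidAt: null,
    attributes: {},
    userId: "user-1",
  },
  subject: { uuid: "en-1", name: "Alice", type: "PERSON", ...base },
  predicate: { uuid: "en-2", name: "WORKS_AT", type: "PREDICATE", ...base },
  object: { uuid: "en-3", name: "Acme", type: "OBJECT", ...base },
  provenance: {
    uuid: "ep-1",
    name: "slack-message",
    content: "Alice mentioned she works at Acme.",
    type: "TEXT",
    source: "slack",
    createdAt: now,
    validAt: now,
    labels: [],
    userId: "user-1",
  },
};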
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 180d644..3a3fc8e 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -99,9 +99,6 @@ importers:
       express:
         specifier: ^4.18.1
         version: 4.21.2
-      helix-ts:
-        specifier: ^1.0.4
-        version: 1.0.4
       ioredis:
         specifier: ^5.6.1
         version: 5.6.1
@@ -120,6 +117,9 @@ importers:
       nanoid:
        specifier: 3.3.8
         version: 3.3.8
+      neo4j-driver:
+        specifier: ^5.28.1
+        version: 5.28.1
       non.geist:
         specifier: ^1.0.2
         version: 1.0.4
@@ -2165,6 +2165,9 @@
   buffer@5.7.1:
     resolution: {integrity: sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==}
 
+  buffer@6.0.3:
+    resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==}
+
   bullmq@5.53.2:
     resolution: {integrity: sha512-xHgxrP/yNJHD7VCw1h+eRBh+2TCPBCM39uC9gCyksYc6ufcJP+HTZ/A2lzB2x7qMFWrvsX7tM40AT2BmdkYL/Q==}
 
@@ -3347,9 +3350,6 @@
   hast-util-whitespace@2.0.1:
     resolution: {integrity: sha512-nAxA0v8+vXSBDt3AnRUNjyRIQ0rD+ntpbAp4LnPkumc5M9yUbSMa4XDU9Q6etY4f1Wp4bNgvc1yjiZtsTTrSng==}
 
-  helix-ts@1.0.4:
-    resolution: {integrity: sha512-mugPlPyOMOTwEb4Dyl7y38eV5CTCrJhIHsHny5SmVJP3q4F1fcPRfYCpyJPPCpJ2tIJMGQVsPKC5M0p76vcdUg==}
-
   hosted-git-info@2.8.9:
     resolution: {integrity: sha512-mxIDAb9Lsm6DoOJ7xH+5+X4y1LU/4Hi50L9C5sIswK3JzULS4bwk1FvjdBgvYR4bzT4tuUQiC15FE2f5HbLvYw==}
 
@@ -4207,6 +4207,15 @@
     resolution: {integrity: sha512-myRT3DiWPHqho5PrJaIRyaMv2kgYf0mUVgBNOYMuCH5Ki1yEiQaf/ZJuQ62nvpc44wL5WDbTX7yGJi1Neevw8w==}
     engines: {node: '>= 0.6'}
 
+  neo4j-driver-bolt-connection@5.28.1:
+    resolution: {integrity: sha512-nY8GBhjOW7J0rDtpiyJn6kFdk2OiNVZZhZrO8//mwNXnf5VQJ6HqZQTDthH/9pEaX0Jvbastz1xU7ZL8xzqY0w==}
+
+  neo4j-driver-core@5.28.1:
+    resolution: {integrity: sha512-14vN8TlxC0JvJYfjWic5PwjsZ38loQLOKFTXwk4fWLTbCk6VhrhubB2Jsy9Rz+gM6PtTor4+6ClBEFDp1q/c8g==}
+
+  neo4j-driver@5.28.1:
+    resolution: {integrity: sha512-jbyBwyM0a3RLGcP43q3hIxPUPxA+1bE04RovOKdNAS42EtBMVCKcPSeOvWiHxgXp1ZFd0a8XqK+7LtguInOLUg==}
+
   node-abort-controller@3.1.1:
     resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==}
 
@@ -5008,6 +5017,9 @@
   run-parallel@1.2.0:
     resolution: {integrity: sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==}
 
+  rxjs@7.8.2:
+    resolution: {integrity: sha512-dhKf903U/PQZY6boNNtAGdWbG85WAbjT/1xYoZIC7FAY0yWapOBQVsVrDl58W86//e1VpMNBtRV4MaXfdMySFA==}
+
   sade@1.8.1:
     resolution: {integrity: sha512-xal3CZX1Xlo/k4ApwCFrHVACi9fBqJ7V+mwhBsuf/1IOKbBy098Fex+Wa/5QMubw09pSZ/u8EY8PWgevJsXp1A==}
     engines: {node: '>=6'}
 
@@ -7905,6 +7917,11 @@
     dependencies:
       base64-js: 1.5.1
       ieee754: 1.2.1
 
+  buffer@6.0.3:
+    dependencies:
+      base64-js: 1.5.1
+      ieee754: 1.2.1
+
   bullmq@5.53.2:
     dependencies:
       cron-parser: 4.9.0
 
@@ -9350,8 +9367,6 @@
   hast-util-whitespace@2.0.1: {}
 
-  helix-ts@1.0.4: {}
-
   hosted-git-info@2.8.9: {}
 
   hosted-git-info@6.1.3:
@@ -10322,6 +10337,20 @@
   negotiator@0.6.4: {}
 
+  neo4j-driver-bolt-connection@5.28.1:
+    dependencies:
+      buffer: 6.0.3
+      neo4j-driver-core: 5.28.1
+      string_decoder: 1.3.0
+
+  neo4j-driver-core@5.28.1: {}
+
+  neo4j-driver@5.28.1:
+    dependencies:
+      neo4j-driver-bolt-connection: 5.28.1
+      neo4j-driver-core: 5.28.1
+      rxjs: 7.8.2
+
   node-abort-controller@3.1.1: {}
 
   node-emoji@1.11.0:
@@ -11103,6 +11132,10 @@
     dependencies:
      queue-microtask: 1.2.3
 
+  rxjs@7.8.2:
+    dependencies:
+      tslib: 2.8.1
+
   sade@1.8.1:
     dependencies:
       mri: 1.2.0
diff --git a/turbo.json b/turbo.json
index 8b16422..363667f 100644
--- a/turbo.json
+++ b/turbo.json
@@ -3,21 +3,15 @@
   "ui": "tui",
   "tasks": {
     "build": {
-      "dependsOn": [ "^build" ],
-      "inputs": [ "$TURBO_DEFAULT$", ".env*" ],
-      "outputs": [
-        "dist/**",
-        "public/build/**",
-        "build/**",
-        "app/styles/tailwind.css",
-        ".cache"
-      ]
+      "dependsOn": ["^build"],
+      "inputs": ["$TURBO_DEFAULT$", ".env*"],
+      "outputs": ["dist/**", "public/build/**", "build/**", "app/styles/tailwind.css", ".cache"]
     },
     "lint": {
-      "dependsOn": [ "^lint" ]
+      "dependsOn": ["^lint"]
     },
     "check-types": {
-      "dependsOn": [ "^check-types" ]
+      "dependsOn": ["^check-types"]
     },
     "dev": {
       "cache": false
@@ -36,14 +30,10 @@
       "cache": false
     },
     "generate": {
-      "dependsOn": [
-        "^generate"
-      ]
+      "dependsOn": ["^generate"]
     }
   },
-  "globalDependencies": [
-    ".env"
-  ],
+  "globalDependencies": [".env"],
   "globalEnv": [
     "NODE_ENV",
     "REMIX_APP_PORT",
@@ -61,6 +51,10 @@
     "ENCRYPTION_KEY",
     "REDIS_HOST",
     "REDIS_PORT",
-    "REDIS_TLS_DISABLED"
+    "REDIS_TLS_DISABLED",
+    "NEO4J_URI",
+    "NEO4J_USERNAME",
+    "NEO4J_PASSWORD",
+    "OPENAI_API_KEY"
   ]
 }