feat: add memory normalization before ingestion

This commit is contained in:
Manoj K 2025-06-21 12:32:51 +05:30
parent cae5470c70
commit bdf682e5cb
7 changed files with 495 additions and 38 deletions

View File

@ -89,3 +89,33 @@ export async function findSimilarEntities(params: {
};
});
}
/**
 * Look up Predicate entities whose name matches exactly (case-insensitive),
 * scoped to a single user.
 */
export async function findExactPredicateMatches(params: {
  predicateName: string;
  userId: string;
}): Promise<EntityNode[]> {
  // Case-insensitive exact match on name, restricted to Predicate nodes
  // owned by the requesting user.
  const query = `
    MATCH (entity:Entity)
    WHERE entity.type = 'Predicate'
    AND toLower(entity.name) = toLower($predicateName)
    AND entity.userId = $userId
    RETURN entity
  `;

  const records = await runQuery(query, params);

  // Materialize each raw Neo4j node into an EntityNode.
  const matches: EntityNode[] = [];
  for (const record of records) {
    const props = record.get("entity").properties;
    matches.push({
      uuid: props.uuid,
      name: props.name,
      type: props.type,
      attributes: JSON.parse(props.attributes || "{}"),
      nameEmbedding: props.nameEmbedding,
      createdAt: new Date(props.createdAt),
      userId: props.userId,
      space: props.space,
    });
  }
  return matches;
}

View File

@ -129,3 +129,52 @@ export async function getRecentEpisodes(params: {
};
});
}
/**
 * Semantic search over a user's episodes by cosine similarity of embeddings.
 *
 * @param params.embedding     Query embedding to compare against stored episodes
 * @param params.userId        Owner whose episodes are searched
 * @param params.limit         Max number of episodes returned (default 10)
 * @param params.minSimilarity Minimum cosine score to include (default 0)
 * @returns Episodes ordered by descending similarity, each with its `score`
 */
export async function searchEpisodesByEmbedding(params: {
  embedding: number[];
  userId: string;
  limit?: number;
  minSimilarity?: number;
}) {
  // Apply defaults explicitly. Previously `limit` was accepted but never
  // used, and an undefined `minSimilarity` made `score >= $minSimilarity`
  // evaluate to null in Cypher, silently filtering out every row.
  const limit = params.limit ?? 10;
  const minSimilarity = params.minSimilarity ?? 0;

  // Episodes with a differently-sized embedding score 0 instead of erroring.
  const query = `
    MATCH (episode:Episode)
    WHERE episode.userId = $userId
    AND episode.contentEmbedding IS NOT NULL
    WITH episode,
    CASE
      WHEN size(episode.contentEmbedding) = size($embedding)
      THEN vector.similarity.cosine($embedding, episode.contentEmbedding)
      ELSE 0
    END AS score
    WHERE score >= $minSimilarity
    RETURN episode, score
    ORDER BY score DESC
    LIMIT toInteger($limit)`;

  const result = await runQuery(query, {
    embedding: params.embedding,
    minSimilarity,
    limit,
    userId: params.userId,
  });

  if (!result || result.length === 0) {
    return [];
  }

  return result.map((record) => {
    const episode = record.get("episode").properties;
    const score = record.get("score");
    return {
      uuid: episode.uuid,
      content: episode.content,
      contentEmbedding: episode.contentEmbedding,
      createdAt: new Date(episode.createdAt),
      validAt: new Date(episode.validAt),
      invalidAt: episode.invalidAt ? new Date(episode.invalidAt) : null,
      attributes: episode.attributesJson
        ? JSON.parse(episode.attributesJson)
        : {},
      userId: episode.userId,
      // Expose the similarity score (it was computed but dropped before)
      // so callers can rank or threshold results themselves.
      score,
    };
  });
}

View File

@ -321,3 +321,53 @@ export async function invalidateStatements({
async (statementId) => await invalidateStatement({ statementId }),
);
}
/**
 * Semantic search over a user's active (non-invalidated) statements by
 * cosine similarity of fact embeddings.
 *
 * @param params.embedding     Query embedding to compare against stored facts
 * @param params.userId        Owner whose statements are searched
 * @param params.limit         Max number of statements returned (default 10)
 * @param params.minSimilarity Minimum cosine score to include (default 0)
 * @returns Statements ordered by descending similarity, each with its `score`
 */
export async function searchStatementsByEmbedding(params: {
  embedding: number[];
  userId: string;
  limit?: number;
  minSimilarity?: number;
}) {
  // Apply defaults explicitly. Previously `limit` was accepted but never
  // used, and an undefined `minSimilarity` made `score >= $minSimilarity`
  // evaluate to null in Cypher, silently filtering out every row.
  const limit = params.limit ?? 10;
  const minSimilarity = params.minSimilarity ?? 0;

  // BUG FIX: the query now filters by `statement.userId = $userId`; the
  // original accepted `userId` but never used it, returning statements
  // across all users.
  const query = `
    MATCH (statement:Statement)
    WHERE statement.invalidAt IS NULL
    AND statement.userId = $userId
    AND statement.factEmbedding IS NOT NULL
    WITH statement,
    CASE
      WHEN size(statement.factEmbedding) = size($embedding)
      THEN vector.similarity.cosine($embedding, statement.factEmbedding)
      ELSE 0
    END AS score
    WHERE score >= $minSimilarity
    RETURN statement, score
    ORDER BY score DESC
    LIMIT toInteger($limit)
  `;

  const result = await runQuery(query, {
    embedding: params.embedding,
    minSimilarity,
    limit,
    userId: params.userId,
  });

  if (!result || result.length === 0) {
    return [];
  }

  return result.map((record) => {
    const statement = record.get("statement").properties;
    const score = record.get("score");
    return {
      uuid: statement.uuid,
      fact: statement.fact,
      factEmbedding: statement.factEmbedding,
      createdAt: new Date(statement.createdAt),
      validAt: new Date(statement.validAt),
      invalidAt: statement.invalidAt ? new Date(statement.invalidAt) : null,
      attributes: statement.attributesJson
        ? JSON.parse(statement.attributesJson)
        : {},
      userId: statement.userId,
      // Expose the similarity score (it was computed but dropped before)
      // so callers can rank or threshold results themselves.
      score,
    };
  });
}

View File

@ -21,14 +21,21 @@ import {
extractStatements,
resolveStatementPrompt,
} from "./prompts/statements";
import { getRecentEpisodes } from "./graphModels/episode";
import { findSimilarEntities } from "./graphModels/entity";
import {
getRecentEpisodes,
searchEpisodesByEmbedding,
} from "./graphModels/episode";
import {
findExactPredicateMatches,
findSimilarEntities,
} from "./graphModels/entity";
import {
findContradictoryStatements,
findSimilarStatements,
getTripleForStatement,
invalidateStatements,
saveTriple,
searchStatementsByEmbedding,
} from "./graphModels/statement";
import { makeModelCall } from "~/lib/model.server";
import { Apps, getNodeTypes, getNodeTypesString } from "~/utils/presets/nodes";
@ -70,13 +77,20 @@ export class KnowledgeGraphService {
const normalizedEpisodeBody = await this.normalizeEpisodeBody(
params.episodeBody,
params.source,
params.userId,
);
if (normalizedEpisodeBody === "NOTHING_TO_REMEMBER") {
logger.log("Nothing to remember");
return;
}
// Step 2: Episode Creation - Create or retrieve the episode
const episode: EpisodicNode = {
uuid: crypto.randomUUID(),
content: normalizedEpisodeBody,
originalContent: params.episodeBody,
contentEmbedding: await this.getEmbedding(normalizedEpisodeBody),
source: params.source,
metadata: params.metadata || {},
createdAt: now,
@ -117,16 +131,16 @@ export class KnowledgeGraphService {
episode,
);
for (const triple of updatedTriples) {
const { subject, predicate, object, statement, provenance } = triple;
const safeTriple = {
subject: { ...subject, nameEmbedding: undefined },
predicate: { ...predicate, nameEmbedding: undefined },
object: { ...object, nameEmbedding: undefined },
statement: { ...statement, factEmbedding: undefined },
provenance,
};
}
// for (const triple of updatedTriples) {
// const { subject, predicate, object, statement, provenance } = triple;
// const safeTriple = {
// subject: { ...subject, nameEmbedding: undefined },
// predicate: { ...predicate, nameEmbedding: undefined },
// object: { ...object, nameEmbedding: undefined },
// statement: { ...statement, factEmbedding: undefined },
// provenance,
// };
// }
// Save triples sequentially to avoid parallel processing issues
for (const triple of updatedTriples) {
@ -265,6 +279,29 @@ export class KnowledgeGraphService {
// Parse the statements from the LLM response
const extractedTriples = JSON.parse(responseText || "{}").edges || [];
// Create maps to deduplicate entities by name within this extraction
const predicateMap = new Map<string, EntityNode>();
// First pass: collect all unique predicates from the current extraction
for (const triple of extractedTriples) {
const predicateName = triple.predicate.toLowerCase();
if (!predicateMap.has(predicateName)) {
// Create new predicate
const newPredicate = {
uuid: crypto.randomUUID(),
name: triple.predicate,
type: "Predicate",
attributes: {},
nameEmbedding: await this.getEmbedding(
`Predicate: ${triple.predicate}`,
),
createdAt: new Date(),
userId: episode.userId,
};
predicateMap.set(predicateName, newPredicate);
}
}
// Convert extracted triples to Triple objects with Statement nodes
const triples = await Promise.all(
// Fix: Type 'any'.
@ -278,20 +315,10 @@ export class KnowledgeGraphService {
(node) => node.name.toLowerCase() === triple.target.toLowerCase(),
);
// Find or create a predicate node for the relationship type
const predicateNode = extractedEntities.find(
(node) => node.name.toLowerCase() === triple.predicate.toLowerCase(),
) || {
uuid: crypto.randomUUID(),
name: triple.predicate,
type: "Predicate",
attributes: {},
nameEmbedding: await this.getEmbedding(triple.predicate),
createdAt: new Date(),
userId: episode.userId,
};
// Get the deduplicated predicate node
const predicateNode = predicateMap.get(triple.predicate.toLowerCase());
if (subjectNode && objectNode) {
if (subjectNode && objectNode && predicateNode) {
// Create a statement node
const statement: StatementNode = {
uuid: crypto.randomUUID(),
@ -380,9 +407,17 @@ export class KnowledgeGraphService {
// Convert to arrays for processing
const uniqueEntities = Array.from(uniqueEntitiesMap.values());
// Step 2: Find similar entities for each unique entity
// Separate predicates from other entities
const predicates = uniqueEntities.filter(
(entity) => entity.type === "Predicate",
);
const nonPredicates = uniqueEntities.filter(
(entity) => entity.type !== "Predicate",
);
// Step 2a: Find similar entities for non-predicate entities
const similarEntitiesResults = await Promise.all(
uniqueEntities.map(async (entity) => {
nonPredicates.map(async (entity) => {
const similarEntities = await findSimilarEntities({
queryEmbedding: entity.nameEmbedding,
limit: 5,
@ -395,14 +430,40 @@ export class KnowledgeGraphService {
}),
);
// Step 2b: Find exact matches for predicates
const exactPredicateResults = await Promise.all(
predicates.map(async (predicate) => {
const exactMatches = await findExactPredicateMatches({
predicateName: predicate.name,
userId: episode.userId,
});
// Filter out the current predicate from matches
const filteredMatches = exactMatches.filter(
(match) => match.uuid !== predicate.uuid,
);
return {
entity: predicate,
similarEntities: filteredMatches, // Use the same structure as similarEntitiesResults
};
}),
);
// Combine the results
const allEntityResults = [
...similarEntitiesResults,
...exactPredicateResults,
];
// If no similar entities found for any entity, return original triples
if (similarEntitiesResults.length === 0) {
if (allEntityResults.length === 0) {
return triples;
}
// Step 3: Prepare context for LLM deduplication
const dedupeContext = {
extracted_nodes: similarEntitiesResults.map((result, index) => ({
extracted_nodes: allEntityResults.map((result, index) => ({
id: index,
name: result.entity.name,
entity_type: result.entity.type,
@ -451,8 +512,8 @@ export class KnowledgeGraphService {
const duplicateIdx = resolution.duplicate_idx ?? -1;
// Get the corresponding result from similarEntitiesResults
const resultEntry = similarEntitiesResults.find(
// Get the corresponding result from allEntityResults
const resultEntry = allEntityResults.find(
(result) => result.entity.uuid === originalEntity.uuid,
);
@ -783,17 +844,23 @@ export class KnowledgeGraphService {
/**
* Normalize an episode by extracting entities and creating nodes and statements
*/
private async normalizeEpisodeBody(episodeBody: string, source: string) {
private async normalizeEpisodeBody(
episodeBody: string,
source: string,
userId: string,
) {
let appEnumValues: Apps[] = [];
if (Apps[source.toUpperCase() as keyof typeof Apps]) {
appEnumValues = [Apps[source.toUpperCase() as keyof typeof Apps]];
}
const entityTypes = getNodeTypesString(appEnumValues);
const relatedMemories = await this.getRelatedMemories(episodeBody, userId);
const context = {
episodeContent: episodeBody,
entityTypes: entityTypes,
source,
relatedMemories,
};
const messages = normalizePrompt(context);
let responseText = "";
@ -804,10 +871,76 @@ export class KnowledgeGraphService {
const outputMatch = responseText.match(/<output>([\s\S]*?)<\/output>/);
if (outputMatch && outputMatch[1]) {
normalizedEpisodeBody = outputMatch[1].trim();
} else {
normalizedEpisodeBody = episodeBody;
}
return normalizedEpisodeBody;
}
/**
 * Retrieves related episodes and facts based on semantic similarity to the
 * current episode content, formatted as a markdown string for prompt context.
 *
 * @param episodeContent The content of the current episode
 * @param userId The user ID whose memories are searched
 * @param options Optional tuning knobs:
 *   - episodeLimit: max related episodes to include (default 5)
 *   - factLimit: max related facts to include (default 10)
 *   - minSimilarity: minimum cosine similarity threshold (default 0.75)
 * @returns A markdown-formatted string of related episodes and facts, or ""
 *          when nothing relevant is found or retrieval fails
 */
private async getRelatedMemories(
  episodeContent: string,
  userId: string,
  options: {
    episodeLimit?: number;
    factLimit?: number;
    minSimilarity?: number;
  } = {},
): Promise<string> {
  try {
    // Default configuration values
    const episodeLimit = options.episodeLimit ?? 5;
    const factLimit = options.factLimit ?? 10;
    const minSimilarity = options.minSimilarity ?? 0.75;

    // Get embedding for the current episode content
    const contentEmbedding = await this.getEmbedding(episodeContent);

    // Retrieve semantically similar episodes (excluding very recent ones that are already in context)
    const relatedEpisodes = await searchEpisodesByEmbedding({
      embedding: contentEmbedding,
      userId,
      limit: episodeLimit,
      minSimilarity,
    });

    // Retrieve semantically similar facts/statements
    const relatedFacts = await searchStatementsByEmbedding({
      embedding: contentEmbedding,
      userId,
      limit: factLimit,
      minSimilarity,
    });

    // Format the related memories for inclusion in the prompt
    let formattedMemories = "";

    if (relatedEpisodes.length > 0) {
      formattedMemories += "## Related Episodes\n";
      relatedEpisodes.forEach((episode, index) => {
        formattedMemories += `### Episode ${index + 1} (${new Date(episode.validAt).toISOString()})\n`;
        formattedMemories += `${episode.content}\n\n`;
      });
    }

    if (relatedFacts.length > 0) {
      formattedMemories += "## Related Facts\n";
      // Facts are rendered as an unordered list; no index needed.
      relatedFacts.forEach((fact) => {
        formattedMemories += `- ${fact.fact}\n`;
      });
    }

    return formattedMemories.trim();
  } catch (error) {
    // Best-effort: related memories are an enhancement, not a requirement,
    // so retrieval failures degrade to an empty context rather than aborting
    // episode ingestion.
    console.error("Error retrieving related memories:", error);
    return "";
  }
}
}

View File

@ -22,15 +22,15 @@ You are given a conversation context and a CURRENT EPISODE. Your task is to extr
1. **Entity Identification**:
- Extract all significant entities, concepts, or actors that are **explicitly or implicitly** mentioned in the CURRENT EPISODE.
- **Exclude** entities mentioned only in the PREVIOUS EPISODES (they are for context only).
- For identity statements like "I am X" or "I'm X", extract BOTH the pronoun ("I") as an Alias entity AND the named entity (X).
- For pronouns that refer to named entities, extract them as separate Alias entities.
2. **Entity Classification**:
- CRITICAL: You MUST ONLY use entity types provided in the ENTITY_TYPES section.
- Use the descriptions in ENTITY TYPES to classify each extracted entity.
- Assign the appropriate type for each one.
- Classify pronouns (I, me, you, etc.) as Alias entities.
- Classify pronouns (I, me, you, etc.) as "ALIAS" entities.
- DO NOT invent new entity types that are not in the ENTITY_TYPES section.
3. **Exclusions**:
- Do NOT extract entities representing relationships or actions (predicates will be handled separately).

View File

@ -4,7 +4,7 @@ export const normalizePrompt = (
context: Record<string, any>,
): CoreMessage[] => {
const sysPrompt = `
You are a memory extraction system. Your task is to convert input informationsuch as user input, system events, or assistant actionsinto clear, concise, third-person factual statements suitable for storage in a memory graph. These statements should be easily understandable and retrievable by any system or agent.
You are C.O.R.E. (Contextual Observation & Recall Engine), a memory extraction system. Your task is to convert input information—such as user input, system events, or assistant actions—into clear, concise, third-person factual statements suitable for storage in a memory graph. These statements should be easily understandable and retrievable by any system or agent.
## Memory Processing Guidelines
- Always output memory statements in the third person (e.g., "User prefers...", "The assistant performed...", "The system detected...").
@ -12,21 +12,153 @@ You are a memory extraction system. Your task is to convert input information—
- Maintain a neutral, factual tone in all memory entries.
- Structure memories as factual statements, not questions.
- Include relevant context and temporal information when available.
- When ingesting from assistant's perspective, ensure you still capture the complete user-assistant interaction context.
## Complete Conversational Context
- IMPORTANT: Always preserve the complete context of conversations, including BOTH:
- What the user said, asked, or requested
- How the assistant responded or what it suggested
- Any decisions, conclusions, or agreements reached
- Do not focus solely on the assistant's contributions while ignoring user context
- Capture the cause-and-effect relationship between user inputs and assistant responses
- For multi-turn conversations, preserve the logical flow and key points from each turn
- When the user provides information, always record that information directly, not just how the assistant used it
## Node Entity Types
${context.entityTypes}
## Memory Selection Criteria
Evaluate conversations based on these priority categories:
### 1. High Priority (Always Remember)
- **User Preferences**: Explicit likes, dislikes, settings, or preferences
- **Personal Information**: Names, relationships, contact details, important dates
- **Commitments**: Promises, agreements, or obligations made by either party
- **Recurring Patterns**: Regular activities, habits, or routines mentioned
- **Explicit Instructions**: "Remember X" or "Don't forget about Y" statements
- **Important Decisions**: Key choices or conclusions reached
### 2. Medium Priority (Remember if Significant)
- **Task Context**: Background information relevant to ongoing tasks
- **Problem Statements**: Issues or challenges the user is facing
- **Learning & Growth**: Skills being developed, topics being studied
- **Emotional Responses**: Strong reactions to suggestions or information
- **Time-Sensitive Information**: Details that will be relevant for a limited period
### 3. Low Priority (Rarely Remember)
- **Casual Exchanges**: Greetings, acknowledgments, or social pleasantries
- **Clarification Questions**: Questions asked to understand instructions
- **Immediate Task Execution**: Simple commands and their direct execution
- **Repeated Information**: Content already stored in memory
- **Ephemeral Context**: Information only relevant to the current exchange
### 4. Do Not Remember (Forgettable Conversations)
#### Transient Interactions
- **Simple acknowledgments**: "Thanks", "OK", "Got it"
- **Greetings and farewells**: "Hello", "Good morning", "Goodbye", "Talk to you later"
- **Filler conversations**: Small talk about weather with no specific preferences mentioned
- **Routine status updates** without meaningful information: "Still working on it"
#### Redundant Information
- **Repeated requests** for the same information within a short timeframe
- **Clarifications** that don't add new information: "What did you mean by that?"
- **Confirmations** of already established facts: "Yes, as I mentioned earlier..."
- **Information already stored** in memory in the same or similar form
#### Temporary Operational Exchanges
- **System commands** without context: "Open this file", "Run this code"
- **Simple navigational instructions**: "Go back", "Scroll down"
- **Format adjustments**: "Make this bigger", "Change the color"
- **Immediate task execution** without long-term relevance
#### Low-Information Content
- **Vague statements** without specific details: "That looks interesting"
- **Ambiguous questions** that were later clarified in the conversation
- **Incomplete thoughts** that were abandoned or redirected
- **Hypothetical scenarios** that weren't pursued further
#### Technical Noise
- **Error messages** or technical issues that were resolved
- **Connection problems** or temporary disruptions
- **Interface feedback**: "Loading...", "Processing complete"
- **Formatting issues** that were corrected
#### Context-Dependent Ephemera
- **Time-sensitive information** that quickly becomes irrelevant: "I'll be back in 5 minutes"
- **Temporary states**: "I'm currently looking at the document"
- **Attention-directing statements** without content: "Look at this part"
- **Intermediate steps** in a process where only the conclusion matters
### 5. Do Not Remember (Privacy and System Noise)
- **Sensitive Credentials**: Passwords, API keys, tokens, or authentication details
- **Personal Data**: Unless the user explicitly asks to store it
- **System Meta-commentary**: Update notices, version information, system status messages
- **Debug Information**: Logs, error traces, or diagnostic information
- **QA/Troubleshooting**: Conversations clearly intended for testing or debugging purposes
- **Internal Processing**: Comments about the assistant's own thinking process
## Related Knowledge Integration
- Consider these related episodes when processing new information:
- Look for connections between new information and these existing memories
- Identify patterns, contradictions, or evolving preferences
- Reference related episodes when they provide important context
- Update or refine existing knowledge with new information
## Memory Graph Integration
- Each memory will be converted to a node in the memory graph.
- Include relevant relationships between memory items when possible.
- Specify temporal aspects when memories are time-sensitive.
- Format memories to support efficient retrieval by any system or agent.
## Related Knowledge Integration
- Consider these related episodes and facts when processing new information:
- When related facts or episodes are provided, carefully analyze them for:
- **Connections**: Identify relationships between new information and existing memories
- **Patterns**: Recognize recurring themes, preferences, or behaviors
- **Contradictions**: Note when new information conflicts with existing knowledge
- **Evolution**: Track how user preferences or situations change over time
- **Context**: Use related memories to better understand the significance of new information
- Incorporate relevant context from related memories when appropriate
- Update or refine existing knowledge with new information
- When contradictions exist, note both the old and new information with timestamps
- Use related memories to determine the priority level of new information
- If related memories suggest a topic is important to the user, elevate its priority
## Output Format
When extracting memory-worthy information:
1. If nothing meets the criteria for storage, respond with exactly: "NOTHING_TO_REMEMBER"
2. Otherwise, provide a summary that:
- **Scales with conversation complexity**:
* For simple exchanges with 1-2 key points: Use 1-2 concise sentences
* For moderate complexity with 3-5 key points: Use 3-5 sentences, organizing related information
* For complex conversations with many important details: Use up to 8-10 sentences, structured by topic
- Focuses on facts rather than interpretations
- Uses the third person perspective
- Includes specific details (names, dates, numbers) when relevant
- Avoids unnecessary context or explanation
- Formats key information as attribute-value pairs when appropriate
- Uses bullet points for multiple distinct pieces of information
## Examples of Complete Context Extraction
- INCOMPLETE: "Assistant suggested Italian restaurants in downtown."
- COMPLETE: "User asked for restaurant recommendations in downtown. Assistant suggested three Italian restaurants: Bella Vita, Romano's, and Trattoria Milano."
- INCOMPLETE: "Assistant provided information about Python functions."
- COMPLETE: "User asked how to define functions in Python. Assistant explained the syntax using 'def' keyword and provided an example of a function that calculates the factorial of a number."
When processing new information for memory storage, focus on extracting the core facts, preferences, and events that will be most useful for future reference by any system or agent.
<output>
{{processed_statement}}
</output>
If there is nothing to remember, output:
<output>
NOTHING_TO_REMEMBER
</output>
`;
const userPrompt = `
@ -38,6 +170,10 @@ ${context.episodeContent}
${context.source}
</SOURCE>
<RELATED_MEMORIES>
${context.relatedMemories}
</RELATED_MEMORIES>
`;
return [

59
docker-compose.aws.yaml Normal file
View File

@ -0,0 +1,59 @@
# AWS deployment stack for C.O.R.E.: app + Redis + Neo4j.
# NOTE(review): the top-level `version` key is obsolete in Compose v2 — confirm
# the target runtime before removing it.
version: "3.8"

services:
  # Main application container. All configuration is injected via environment
  # variables (expected from an .env file or the deployment environment).
  core:
    container_name: core-app
    image: redplanethq/core:${VERSION}
    environment:
      - NODE_ENV=${NODE_ENV}
      - DATABASE_URL=${DATABASE_URL}
      - DIRECT_URL=${DIRECT_URL}
      - SESSION_SECRET=${SESSION_SECRET}
      - ENCRYPTION_KEY=${ENCRYPTION_KEY}
      - MAGIC_LINK_SECRET=${MAGIC_LINK_SECRET}
      - LOGIN_ORIGIN=${LOGIN_ORIGIN}
      - APP_ORIGIN=${APP_ORIGIN}
      - REDIS_HOST=${REDIS_HOST}
      - REDIS_PORT=${REDIS_PORT}
      - REDIS_TLS_DISABLED=${REDIS_TLS_DISABLED}
      - NEO4J_URI=${NEO4J_URI}
      - NEO4J_USERNAME=${NEO4J_USERNAME}
      - NEO4J_PASSWORD=${NEO4J_PASSWORD}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - AUTH_GOOGLE_CLIENT_ID=${AUTH_GOOGLE_CLIENT_ID}
      - AUTH_GOOGLE_CLIENT_SECRET=${AUTH_GOOGLE_CLIENT_SECRET}
      - ENABLE_EMAIL_LOGIN=${ENABLE_EMAIL_LOGIN}
    ports:
      # Host 3033 -> container 3000 (app's internal port)
      - "3033:3000"
    depends_on:
      - redis
      - neo4j
    networks:
      - core

  # Queue / cache backend.
  redis:
    container_name: core-redis
    image: redis:7
    ports:
      - "6379:6379"
    networks:
      - core

  # Graph database backing the memory graph.
  neo4j:
    container_name: core-neo4j
    image: neo4j:5
    environment:
      - NEO4J_AUTH=${NEO4J_AUTH}
    ports:
      # 7474: HTTP browser UI, 7687: Bolt protocol used by the driver
      - "7474:7474"
      - "7687:7687"
    volumes:
      # Persist graph data on the EFS mount so it survives container restarts.
      - type: bind
        source: /efs/neo4j
        target: /data
    networks:
      - core

networks:
  core:
    driver: bridge