Enhance conversation handling and memory management

This commit is contained in:
Manoj K 2025-07-08 15:15:58 +05:30
parent 158d26f7c2
commit 6ef523520d
10 changed files with 212 additions and 61 deletions

5
.gitignore vendored
View File

@ -41,4 +41,7 @@ docker-compose.dev.yaml
clickhouse/ clickhouse/
.vscode/ .vscode/
registry/ registry/
.cursor
CLAUDE.md

View File

@ -2,15 +2,16 @@ import { UserTypeEnum } from "@core/types";
import { auth, runs, tasks } from "@trigger.dev/sdk/v3"; import { auth, runs, tasks } from "@trigger.dev/sdk/v3";
import { prisma } from "~/db.server"; import { prisma } from "~/db.server";
import { getOrCreatePersonalAccessToken } from "./personalAccessToken.server";
import { createConversationTitle } from "~/trigger/conversation/create-conversation-title"; import { createConversationTitle } from "~/trigger/conversation/create-conversation-title";
import { z } from "zod"; import { z } from "zod";
import { type ConversationHistory } from "@prisma/client";
export const CreateConversationSchema = z.object({ export const CreateConversationSchema = z.object({
message: z.string(), message: z.string(),
title: z.string().optional(), title: z.string().optional(),
conversationId: z.string().optional(), conversationId: z.string().optional(),
userType: z.nativeEnum(UserTypeEnum).optional(),
}); });
export type CreateConversationDto = z.infer<typeof CreateConversationSchema>; export type CreateConversationDto = z.infer<typeof CreateConversationSchema>;
@ -22,15 +23,13 @@ export async function createConversation(
conversationData: CreateConversationDto, conversationData: CreateConversationDto,
) { ) {
const { title, conversationId, ...otherData } = conversationData; const { title, conversationId, ...otherData } = conversationData;
// Ensure PAT exists for the user
await getOrCreatePersonalAccessToken({ name: "trigger", userId });
if (conversationId) { if (conversationId) {
// Add a new message to an existing conversation // Add a new message to an existing conversation
const conversationHistory = await prisma.conversationHistory.create({ const conversationHistory = await prisma.conversationHistory.create({
data: { data: {
...otherData, ...otherData,
userType: UserTypeEnum.User, userType: otherData.userType || UserTypeEnum.User,
...(userId && { ...(userId && {
user: { user: {
connect: { id: userId }, connect: { id: userId },
@ -45,12 +44,13 @@ export async function createConversation(
}, },
}); });
// No context logic here const context = await getConversationContext(conversationHistory.id);
const handler = await tasks.trigger( const handler = await tasks.trigger(
"chat", "chat",
{ {
conversationHistoryId: conversationHistory.id, conversationHistoryId: conversationHistory.id,
conversationId: conversationHistory.conversation.id, conversationId: conversationHistory.conversation.id,
context,
}, },
{ tags: [conversationHistory.id, workspaceId, conversationId] }, { tags: [conversationHistory.id, workspaceId, conversationId] },
); );
@ -73,7 +73,7 @@ export async function createConversation(
ConversationHistory: { ConversationHistory: {
create: { create: {
userId, userId,
userType: UserTypeEnum.User, userType: otherData.userType || UserTypeEnum.User,
...otherData, ...otherData,
}, },
}, },
@ -84,6 +84,7 @@ export async function createConversation(
}); });
const conversationHistory = conversation.ConversationHistory[0]; const conversationHistory = conversation.ConversationHistory[0];
const context = await getConversationContext(conversationHistory.id);
// Trigger conversation title task // Trigger conversation title task
await tasks.trigger<typeof createConversationTitle>( await tasks.trigger<typeof createConversationTitle>(
@ -100,6 +101,7 @@ export async function createConversation(
{ {
conversationHistoryId: conversationHistory.id, conversationHistoryId: conversationHistory.id,
conversationId: conversation.id, conversationId: conversation.id,
context,
}, },
{ tags: [conversationHistory.id, workspaceId, conversation.id] }, { tags: [conversationHistory.id, workspaceId, conversation.id] },
); );
@ -226,3 +228,42 @@ export async function stopConversation(
return await runs.cancel(run.id); return await runs.cancel(run.id);
} }
export async function getConversationContext(
conversationHistoryId: string,
): Promise<{
previousHistory: ConversationHistory[];
}> {
const conversationHistory = await prisma.conversationHistory.findUnique({
where: { id: conversationHistoryId },
include: { conversation: true },
});
if (!conversationHistory) {
return {
previousHistory: [],
};
}
// Get previous conversation history message and response
let previousHistory: ConversationHistory[] = [];
if (conversationHistory.conversationId) {
previousHistory = await prisma.conversationHistory.findMany({
where: {
conversationId: conversationHistory.conversationId,
id: {
not: conversationHistoryId,
},
deleted: null,
},
orderBy: {
createdAt: "asc",
},
});
}
return {
previousHistory,
};
}

View File

@ -45,8 +45,7 @@ import { createOllama } from "ollama-ai-provider";
const DEFAULT_EPISODE_WINDOW = 5; const DEFAULT_EPISODE_WINDOW = 5;
export class KnowledgeGraphService { export class KnowledgeGraphService {
async getEmbedding(text: string, useOpenAI = true) { async getEmbedding(text: string, useOpenAI = false) {
console.log(text, useOpenAI);
if (useOpenAI) { if (useOpenAI) {
// Use OpenAI embedding model when explicitly requested // Use OpenAI embedding model when explicitly requested
const { embedding } = await embed({ const { embedding } = await embed({

View File

@ -65,15 +65,19 @@ const searchMemoryTool = tool({
properties: { properties: {
query: { query: {
type: "string", type: "string",
description: "The search query to find relevant information in memory", description: "The search query in third person perspective",
}, },
spaceId: { validAt: {
type: "string", type: "string",
description: "Optional space ID to search within a specific space", description: "The valid at time in ISO format",
}, },
sessionId: { startTime: {
type: "string", type: "string",
description: "Optional session ID to search within a specific session", description: "The start time in ISO format",
},
endTime: {
type: "string",
description: "The end time in ISO format",
}, },
}, },
required: ["query"], required: ["query"],
@ -86,34 +90,12 @@ const addMemoryTool = tool({
parameters: jsonSchema({ parameters: jsonSchema({
type: "object", type: "object",
properties: { properties: {
episodeBody: { message: {
type: "string", type: "string",
description: "The content/text to add to memory", description: "The content/text to add to memory",
}, },
referenceTime: {
type: "string",
description:
"ISO 8601 timestamp for when this information is relevant (defaults to current time)",
},
source: {
type: "string",
description:
"Source of the information (e.g., 'user', 'chat', 'system')",
},
spaceId: {
type: "string",
description: "Optional space ID to add memory to a specific space",
},
sessionId: {
type: "string",
description: "Optional session ID to associate with a specific session",
},
metadata: {
type: "object",
description: "Optional metadata object for additional context",
},
}, },
required: ["episodeBody"], required: ["message"],
additionalProperties: false, additionalProperties: false,
}), }),
}); });

View File

@ -8,6 +8,7 @@ import { MCP } from "../utils/mcp";
import { type HistoryStep } from "../utils/types"; import { type HistoryStep } from "../utils/types";
import { import {
createConversationHistoryForAgent, createConversationHistoryForAgent,
deletePersonalAccessToken,
getPreviousExecutionHistory, getPreviousExecutionHistory,
init, init,
type RunChatPayload, type RunChatPayload,
@ -54,6 +55,7 @@ export const chat = task({
}, },
workpsaceId: init?.conversation.workspaceId, workpsaceId: init?.conversation.workspaceId,
resources: otherData.resources, resources: otherData.resources,
todayDate: new Date().toISOString(),
}; };
// Extract user's goal from conversation history // Extract user's goal from conversation history
@ -123,8 +125,15 @@ export const chat = task({
// init.preferences, // init.preferences,
// init.userName, // init.userName,
// ); // );
if (init?.tokenId) {
await deletePersonalAccessToken(init.tokenId);
}
} catch (e) { } catch (e) {
await updateConversationStatus("failed", payload.conversationId); await updateConversationStatus("failed", payload.conversationId);
if (init?.tokenId) {
await deletePersonalAccessToken(init.tokenId);
}
throw new Error(e as string); throw new Error(e as string);
} }
}, },

View File

@ -4,12 +4,13 @@ import axios from "axios";
// Memory API functions using axios interceptor // Memory API functions using axios interceptor
export interface SearchMemoryParams { export interface SearchMemoryParams {
query: string; query: string;
spaceId?: string; validAt?: string;
sessionId?: string; startTime?: string;
endTime?: string;
} }
export interface AddMemoryParams { export interface AddMemoryParams {
episodeBody: string; message: string;
referenceTime?: string; referenceTime?: string;
source?: string; source?: string;
spaceId?: string; spaceId?: string;
@ -32,8 +33,9 @@ export const addMemory = async (params: AddMemoryParams) => {
// Set defaults for required fields // Set defaults for required fields
const memoryInput = { const memoryInput = {
...params, ...params,
episodeBody: params.message,
referenceTime: params.referenceTime || new Date().toISOString(), referenceTime: params.referenceTime || new Date().toISOString(),
source: params.source || "chat", source: params.source || "CORE",
}; };
const response = await axios.post( const response = await axios.post(

View File

@ -139,11 +139,12 @@ export async function* generate(
const anthropicKey = process.env.ANTHROPIC_API_KEY; const anthropicKey = process.env.ANTHROPIC_API_KEY;
const googleKey = process.env.GOOGLE_GENERATIVE_AI_API_KEY; const googleKey = process.env.GOOGLE_GENERATIVE_AI_API_KEY;
const openaiKey = process.env.OPENAI_API_KEY; const openaiKey = process.env.OPENAI_API_KEY;
const ollamaUrl = process.env.OLLAMA_URL; let ollamaUrl = process.env.OLLAMA_URL;
model = model || process.env.MODEL; model = model || process.env.MODEL;
let modelInstance; let modelInstance;
let modelTemperature = Number(process.env.MODEL_TEMPERATURE) || 1; let modelTemperature = Number(process.env.MODEL_TEMPERATURE) || 1;
ollamaUrl = undefined;
// First check if Ollama URL exists and use Ollama // First check if Ollama URL exists and use Ollama
if (ollamaUrl) { if (ollamaUrl) {

View File

@ -14,9 +14,109 @@ import { type CoreMessage } from "ai";
import { type HistoryStep } from "./types"; import { type HistoryStep } from "./types";
import axios from "axios"; import axios from "axios";
import nodeCrypto from "node:crypto";
import { customAlphabet, nanoid } from "nanoid";
const prisma = new PrismaClient(); const prisma = new PrismaClient();
// Token generation utilities
const tokenValueLength = 40;
const tokenGenerator = customAlphabet(
"123456789abcdefghijkmnopqrstuvwxyz",
tokenValueLength,
);
const tokenPrefix = "rc_pat_";
type CreatePersonalAccessTokenOptions = {
name: string;
userId: string;
};
// Helper functions for token management
function createToken() {
return `${tokenPrefix}${tokenGenerator()}`;
}
function obfuscateToken(token: string) {
const withoutPrefix = token.replace(tokenPrefix, "");
const obfuscated = `${withoutPrefix.slice(0, 4)}${"•".repeat(18)}${withoutPrefix.slice(-4)}`;
return `${tokenPrefix}${obfuscated}`;
}
function encryptToken(value: string) {
const encryptionKey = process.env.ENCRYPTION_KEY;
if (!encryptionKey) {
throw new Error("ENCRYPTION_KEY environment variable is required");
}
const nonce = nodeCrypto.randomBytes(12);
const cipher = nodeCrypto.createCipheriv("aes-256-gcm", encryptionKey, nonce);
let encrypted = cipher.update(value, "utf8", "hex");
encrypted += cipher.final("hex");
const tag = cipher.getAuthTag().toString("hex");
return {
nonce: nonce.toString("hex"),
ciphertext: encrypted,
tag,
};
}
function hashToken(token: string): string {
const hash = nodeCrypto.createHash("sha256");
hash.update(token);
return hash.digest("hex");
}
export async function getOrCreatePersonalAccessToken({
name,
userId,
}: CreatePersonalAccessTokenOptions) {
// Try to find an existing, non-revoked token
const existing = await prisma.personalAccessToken.findFirst({
where: {
name,
userId,
revokedAt: null,
},
});
if (existing) {
// Do not return the unencrypted token if it already exists
return {
id: existing.id,
name: existing.name,
userId: existing.userId,
obfuscatedToken: existing.obfuscatedToken,
// token is not returned
};
}
// Create a new token
const token = createToken();
const encryptedToken = encryptToken(token);
const personalAccessToken = await prisma.personalAccessToken.create({
data: {
name,
userId,
encryptedToken,
obfuscatedToken: obfuscateToken(token),
hashedToken: hashToken(token),
},
});
return {
id: personalAccessToken.id,
name,
userId,
token,
obfuscatedToken: personalAccessToken.obfuscatedToken,
};
}
export interface InitChatPayload { export interface InitChatPayload {
conversationId: string; conversationId: string;
conversationHistoryId: string; conversationHistoryId: string;
@ -61,8 +161,10 @@ export const init = async (payload: InitChatPayload) => {
return { conversation, conversationHistory }; return { conversation, conversationHistory };
} }
const pat = await prisma.personalAccessToken.findFirst({ const randomKeyName = `chat_${nanoid(10)}`;
where: { userId: workspace.userId as string, name: "default" }, const pat = await getOrCreatePersonalAccessToken({
name: randomKeyName,
userId: workspace.userId as string,
}); });
const user = await prisma.user.findFirst({ const user = await prisma.user.findFirst({
@ -76,6 +178,21 @@ export const init = async (payload: InitChatPayload) => {
include: { integrationDefinition: true }, include: { integrationDefinition: true },
}); });
// Set up axios interceptor for memory operations
axios.interceptors.request.use((config) => {
if (config.url?.startsWith("https://core::memory")) {
// Handle both search and ingest endpoints
if (config.url.includes("/search")) {
config.url = `${process.env.API_BASE_URL}/search`;
} else if (config.url.includes("/ingest")) {
config.url = `${process.env.API_BASE_URL}/ingest`;
}
config.headers.Authorization = `Bearer ${pat.token}`;
}
return config;
});
// Create MCP server configurations for each integration account // Create MCP server configurations for each integration account
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
const integrationMCPServers: Record<string, any> = {}; const integrationMCPServers: Record<string, any> = {};
@ -136,20 +253,6 @@ export const init = async (payload: InitChatPayload) => {
integrationMCPServers[account.integrationDefinition.slug] = integrationMCPServers[account.integrationDefinition.slug] =
configuredMCP; configuredMCP;
} }
axios.interceptors.request.use((config) => {
if (config.url?.startsWith("https://core::memory")) {
// Handle both search and ingest endpoints
if (config.url.includes("/search")) {
config.url = `${process.env.API_BASE_URL}/search`;
} else if (config.url.includes("/ingest")) {
config.url = `${process.env.API_BASE_URL}/ingest`;
}
config.headers.Authorization = `Bearer ${payload.pat}`;
}
return config;
});
} catch (error) { } catch (error) {
logger.error( logger.error(
`Failed to configure MCP for ${account.integrationDefinition?.slug}:`, `Failed to configure MCP for ${account.integrationDefinition?.slug}:`,
@ -161,7 +264,8 @@ export const init = async (payload: InitChatPayload) => {
return { return {
conversation, conversation,
conversationHistory, conversationHistory,
token: pat?.obfuscatedToken, tokenId: pat.id,
token: pat.token,
userId: user?.id, userId: user?.id,
userName: user?.name, userName: user?.name,
}; };
@ -430,3 +534,11 @@ export async function getContinuationAgentConversationHistory(
take: 1, take: 1,
}); });
} }
export async function deletePersonalAccessToken(tokenId: string) {
return await prisma.personalAccessToken.delete({
where: {
id: tokenId,
},
});
}

View File

@ -11,7 +11,7 @@ export default defineConfig({
retries: { retries: {
enabledInDev: true, enabledInDev: true,
default: { default: {
maxAttempts: 3, maxAttempts: 1,
minTimeoutInMs: 1000, minTimeoutInMs: 1000,
maxTimeoutInMs: 10000, maxTimeoutInMs: 10000,
factor: 2, factor: 2,

View File

@ -1,3 +1,5 @@
import { Spec } from "./oauth";
export enum IntegrationPayloadEventType { export enum IntegrationPayloadEventType {
/** /**
* When a webhook is received, this event is triggered to identify which integration * When a webhook is received, this event is triggered to identify which integration