diff --git a/.gitignore b/.gitignore index bf72edb..790b1d6 100644 --- a/.gitignore +++ b/.gitignore @@ -41,4 +41,7 @@ docker-compose.dev.yaml clickhouse/ .vscode/ -registry/ \ No newline at end of file +registry/ + +.cursor +CLAUDE.md \ No newline at end of file diff --git a/apps/webapp/app/services/conversation.server.ts b/apps/webapp/app/services/conversation.server.ts index 910897c..70694a3 100644 --- a/apps/webapp/app/services/conversation.server.ts +++ b/apps/webapp/app/services/conversation.server.ts @@ -2,15 +2,16 @@ import { UserTypeEnum } from "@core/types"; import { auth, runs, tasks } from "@trigger.dev/sdk/v3"; import { prisma } from "~/db.server"; -import { getOrCreatePersonalAccessToken } from "./personalAccessToken.server"; import { createConversationTitle } from "~/trigger/conversation/create-conversation-title"; import { z } from "zod"; +import { type ConversationHistory } from "@prisma/client"; export const CreateConversationSchema = z.object({ message: z.string(), title: z.string().optional(), conversationId: z.string().optional(), + userType: z.nativeEnum(UserTypeEnum).optional(), }); export type CreateConversationDto = z.infer; @@ -22,15 +23,13 @@ export async function createConversation( conversationData: CreateConversationDto, ) { const { title, conversationId, ...otherData } = conversationData; - // Ensure PAT exists for the user - await getOrCreatePersonalAccessToken({ name: "trigger", userId }); if (conversationId) { // Add a new message to an existing conversation const conversationHistory = await prisma.conversationHistory.create({ data: { ...otherData, - userType: UserTypeEnum.User, + userType: otherData.userType || UserTypeEnum.User, ...(userId && { user: { connect: { id: userId }, @@ -45,12 +44,13 @@ export async function createConversation( }, }); - // No context logic here + const context = await getConversationContext(conversationHistory.id); const handler = await tasks.trigger( "chat", { conversationHistoryId: conversationHistory.id, conversationId: conversationHistory.conversation.id, + context, }, { tags: [conversationHistory.id, workspaceId, conversationId] }, ); @@ -73,7 +73,7 @@ export async function createConversation( ConversationHistory: { create: { userId, - userType: UserTypeEnum.User, + userType: otherData.userType || UserTypeEnum.User, ...otherData, }, }, @@ -84,6 +84,7 @@ export async function createConversation( }); const conversationHistory = conversation.ConversationHistory[0]; + const context = await getConversationContext(conversationHistory.id); // Trigger conversation title task await tasks.trigger( @@ -100,6 +101,7 @@ export async function createConversation( { conversationHistoryId: conversationHistory.id, conversationId: conversation.id, + context, }, { tags: [conversationHistory.id, workspaceId, conversation.id] }, ); @@ -226,3 +228,42 @@ export async function stopConversation( return await runs.cancel(run.id); } + +export async function getConversationContext( + conversationHistoryId: string, +): Promise<{ + previousHistory: ConversationHistory[]; +}> { + const conversationHistory = await prisma.conversationHistory.findUnique({ + where: { id: conversationHistoryId }, + include: { conversation: true }, + }); + + if (!conversationHistory) { + return { + previousHistory: [], + }; + } + + // Get previous conversation history message and response + let previousHistory: ConversationHistory[] = []; + + if (conversationHistory.conversationId) { + previousHistory = await prisma.conversationHistory.findMany({ + where: { + conversationId: conversationHistory.conversationId, + id: { + not: conversationHistoryId, + }, + deleted: null, + }, + orderBy: { + createdAt: "asc", + }, + }); + } + + return { + previousHistory, + }; +} diff --git a/apps/webapp/app/services/knowledgeGraph.server.ts b/apps/webapp/app/services/knowledgeGraph.server.ts index d4d47b6..efe2da1 100644 --- a/apps/webapp/app/services/knowledgeGraph.server.ts +++ b/apps/webapp/app/services/knowledgeGraph.server.ts @@ -45,8 +45,7 @@ import { createOllama } from "ollama-ai-provider"; const DEFAULT_EPISODE_WINDOW = 5; export class KnowledgeGraphService { - async getEmbedding(text: string, useOpenAI = true) { - console.log(text, useOpenAI); + async getEmbedding(text: string, useOpenAI = false) { if (useOpenAI) { // Use OpenAI embedding model when explicitly requested const { embedding } = await embed({ diff --git a/apps/webapp/app/trigger/chat/chat-utils.ts b/apps/webapp/app/trigger/chat/chat-utils.ts index 573a653..9e53a60 100644 --- a/apps/webapp/app/trigger/chat/chat-utils.ts +++ b/apps/webapp/app/trigger/chat/chat-utils.ts @@ -65,15 +65,19 @@ const searchMemoryTool = tool({ properties: { query: { type: "string", - description: "The search query to find relevant information in memory", + description: "The search query in third person perspective", }, - spaceId: { + validAt: { type: "string", - description: "Optional space ID to search within a specific space", + description: "The valid at time in ISO format", }, - sessionId: { + startTime: { type: "string", - description: "Optional session ID to search within a specific session", + description: "The start time in ISO format", + }, + endTime: { + type: "string", + description: "The end time in ISO format", }, }, required: ["query"], @@ -86,34 +90,12 @@ const addMemoryTool = tool({ parameters: jsonSchema({ type: "object", properties: { - episodeBody: { + message: { type: "string", description: "The content/text to add to memory", }, - referenceTime: { - type: "string", - description: - "ISO 8601 timestamp for when this information is relevant (defaults to current time)", - }, - source: { - type: "string", - description: - "Source of the information (e.g., 'user', 'chat', 'system')", - }, - spaceId: { - type: "string", - description: "Optional space ID to add memory to a specific space", - }, - sessionId: { - type: "string", - description: "Optional session ID to associate with a specific session", - }, - metadata: { - type: "object", - description: "Optional metadata object for additional context", - }, }, - required: ["episodeBody"], + required: ["message"], additionalProperties: false, }), }); diff --git a/apps/webapp/app/trigger/chat/chat.ts b/apps/webapp/app/trigger/chat/chat.ts index 738a1d4..72882ab 100644 --- a/apps/webapp/app/trigger/chat/chat.ts +++ b/apps/webapp/app/trigger/chat/chat.ts @@ -8,6 +8,7 @@ import { MCP } from "../utils/mcp"; import { type HistoryStep } from "../utils/types"; import { createConversationHistoryForAgent, + deletePersonalAccessToken, getPreviousExecutionHistory, init, type RunChatPayload, @@ -54,6 +55,7 @@ export const chat = task({ }, workpsaceId: init?.conversation.workspaceId, resources: otherData.resources, + todayDate: new Date().toISOString(), }; // Extract user's goal from conversation history @@ -123,8 +125,15 @@ export const chat = task({ // init.preferences, // init.userName, // ); + + if (init?.tokenId) { + await deletePersonalAccessToken(init.tokenId); + } } catch (e) { await updateConversationStatus("failed", payload.conversationId); + if (init?.tokenId) { + await deletePersonalAccessToken(init.tokenId); + } throw new Error(e as string); } }, diff --git a/apps/webapp/app/trigger/chat/memory-utils.ts b/apps/webapp/app/trigger/chat/memory-utils.ts index 8d08eac..e99b692 100644 --- a/apps/webapp/app/trigger/chat/memory-utils.ts +++ b/apps/webapp/app/trigger/chat/memory-utils.ts @@ -4,12 +4,13 @@ import axios from "axios"; // Memory API functions using axios interceptor export interface SearchMemoryParams { query: string; - spaceId?: string; - sessionId?: string; + validAt?: string; + startTime?: string; + endTime?: string; } export interface AddMemoryParams { - episodeBody: string; + message: string; referenceTime?: string; source?: string; spaceId?: string; @@ -32,8 +33,9 @@ export const addMemory = async (params: AddMemoryParams) => { // Set defaults for required fields const memoryInput = { ...params, + episodeBody: params.message, referenceTime: params.referenceTime || new Date().toISOString(), - source: params.source || "chat", + source: params.source || "CORE", }; const response = await axios.post( diff --git a/apps/webapp/app/trigger/chat/stream-utils.ts b/apps/webapp/app/trigger/chat/stream-utils.ts index 94be24a..f9773e4 100644 --- a/apps/webapp/app/trigger/chat/stream-utils.ts +++ b/apps/webapp/app/trigger/chat/stream-utils.ts @@ -139,11 +139,12 @@ export async function* generate( const anthropicKey = process.env.ANTHROPIC_API_KEY; const googleKey = process.env.GOOGLE_GENERATIVE_AI_API_KEY; const openaiKey = process.env.OPENAI_API_KEY; - const ollamaUrl = process.env.OLLAMA_URL; + let ollamaUrl = process.env.OLLAMA_URL; model = model || process.env.MODEL; let modelInstance; let modelTemperature = Number(process.env.MODEL_TEMPERATURE) || 1; + ollamaUrl = undefined; // First check if Ollama URL exists and use Ollama if (ollamaUrl) { diff --git a/apps/webapp/app/trigger/utils/utils.ts b/apps/webapp/app/trigger/utils/utils.ts index 31d5801..f8ce5a7 100644 --- a/apps/webapp/app/trigger/utils/utils.ts +++ b/apps/webapp/app/trigger/utils/utils.ts @@ -14,9 +14,109 @@ import { type CoreMessage } from "ai"; import { type HistoryStep } from "./types"; import axios from "axios"; +import nodeCrypto from "node:crypto"; +import { customAlphabet, nanoid } from "nanoid"; const prisma = new PrismaClient(); +// Token generation utilities +const tokenValueLength = 40; +const tokenGenerator = customAlphabet( + "123456789abcdefghijkmnopqrstuvwxyz", + tokenValueLength, +); +const tokenPrefix = "rc_pat_"; + +type CreatePersonalAccessTokenOptions = { + name: string; + userId: string; +}; + +// Helper functions for token management +function createToken() { + return `${tokenPrefix}${tokenGenerator()}`; +} + +function obfuscateToken(token: string) { + const withoutPrefix = token.replace(tokenPrefix, ""); + const obfuscated = `${withoutPrefix.slice(0, 4)}${"•".repeat(18)}${withoutPrefix.slice(-4)}`; + return `${tokenPrefix}${obfuscated}`; +} + +function encryptToken(value: string) { + const encryptionKey = process.env.ENCRYPTION_KEY; + if (!encryptionKey) { + throw new Error("ENCRYPTION_KEY environment variable is required"); + } + + const nonce = nodeCrypto.randomBytes(12); + const cipher = nodeCrypto.createCipheriv("aes-256-gcm", encryptionKey, nonce); + + let encrypted = cipher.update(value, "utf8", "hex"); + encrypted += cipher.final("hex"); + + const tag = cipher.getAuthTag().toString("hex"); + + return { + nonce: nonce.toString("hex"), + ciphertext: encrypted, + tag, + }; +} + +function hashToken(token: string): string { + const hash = nodeCrypto.createHash("sha256"); + hash.update(token); + return hash.digest("hex"); +} + +export async function getOrCreatePersonalAccessToken({ + name, + userId, +}: CreatePersonalAccessTokenOptions) { + // Try to find an existing, non-revoked token + const existing = await prisma.personalAccessToken.findFirst({ + where: { + name, + userId, + revokedAt: null, + }, + }); + + if (existing) { + // Do not return the unencrypted token if it already exists + return { + id: existing.id, + name: existing.name, + userId: existing.userId, + obfuscatedToken: existing.obfuscatedToken, + // token is not returned + }; + } + + // Create a new token + const token = createToken(); + const encryptedToken = encryptToken(token); + + const personalAccessToken = await prisma.personalAccessToken.create({ + data: { + name, + userId, + encryptedToken, + obfuscatedToken: obfuscateToken(token), + hashedToken: hashToken(token), + }, + }); + + return { + id: personalAccessToken.id, + name, + userId, + token, + obfuscatedToken: personalAccessToken.obfuscatedToken, + }; +} + export interface InitChatPayload { conversationId: string; conversationHistoryId: string; @@ -61,8 +161,10 @@ export const init = async (payload: InitChatPayload) => { return { conversation, conversationHistory }; } - const pat = await prisma.personalAccessToken.findFirst({ - where: { userId: workspace.userId as string, name: "default" }, + const randomKeyName = `chat_${nanoid(10)}`; + const pat = await getOrCreatePersonalAccessToken({ + name: randomKeyName, + userId: workspace.userId as string, }); const user = await prisma.user.findFirst({ @@ -76,6 +178,21 @@ export const init = async (payload: InitChatPayload) => { include: { integrationDefinition: true }, }); + // Set up axios interceptor for memory operations + axios.interceptors.request.use((config) => { + if (config.url?.startsWith("https://core::memory")) { + // Handle both search and ingest endpoints + if (config.url.includes("/search")) { + config.url = `${process.env.API_BASE_URL}/search`; + } else if (config.url.includes("/ingest")) { + config.url = `${process.env.API_BASE_URL}/ingest`; + } + config.headers.Authorization = `Bearer ${pat.token}`; + } + + return config; + }); + // Create MCP server configurations for each integration account // eslint-disable-next-line @typescript-eslint/no-explicit-any const integrationMCPServers: Record = {}; @@ -136,20 +253,6 @@ export const init = async (payload: InitChatPayload) => { integrationMCPServers[account.integrationDefinition.slug] = configuredMCP; } - - axios.interceptors.request.use((config) => { - if (config.url?.startsWith("https://core::memory")) { - // Handle both search and ingest endpoints - if (config.url.includes("/search")) { - config.url = `${process.env.API_BASE_URL}/search`; - } else if (config.url.includes("/ingest")) { - config.url = `${process.env.API_BASE_URL}/ingest`; - } - config.headers.Authorization = `Bearer ${payload.pat}`; - } - - return config; - }); } catch (error) { logger.error( `Failed to configure MCP for ${account.integrationDefinition?.slug}:`, @@ -161,7 +264,8 @@ export const init = async (payload: InitChatPayload) => { return { conversation, conversationHistory, - token: pat?.obfuscatedToken, + tokenId: pat.id, + token: pat.token, userId: user?.id, userName: user?.name, }; @@ -430,3 +534,11 @@ export async function getContinuationAgentConversationHistory( take: 1, }); } + +export async function deletePersonalAccessToken(tokenId: string) { + return await prisma.personalAccessToken.delete({ + where: { + id: tokenId, + }, + }); +} diff --git a/apps/webapp/trigger.config.ts b/apps/webapp/trigger.config.ts index e262170..18b35f1 100644 --- a/apps/webapp/trigger.config.ts +++ b/apps/webapp/trigger.config.ts @@ -11,7 +11,7 @@ export default defineConfig({ retries: { enabledInDev: true, default: { - maxAttempts: 3, + maxAttempts: 1, minTimeoutInMs: 1000, maxTimeoutInMs: 10000, factor: 2, diff --git a/packages/types/src/integration.ts b/packages/types/src/integration.ts index 53949a9..e9036d8 100644 --- a/packages/types/src/integration.ts +++ b/packages/types/src/integration.ts @@ -1,3 +1,5 @@ +import { Spec } from "./oauth"; + export enum IntegrationPayloadEventType { /** * When a webhook is received, this event is triggered to identify which integration