From 6732ff71c5f28e7c35e3958d9d98d606e84759b8 Mon Sep 17 00:00:00 2001 From: Harshith Mullapudi Date: Wed, 15 Oct 2025 23:51:21 +0530 Subject: [PATCH] Feat: deep search for extension and obsidian (#107) * Feat: add deep search api * Feat: deep search agent * fix: stream utils for deep search * fix: deep search --------- Co-authored-by: Manoj --- apps/webapp/app/routes/api.v1.deep-search.tsx | 64 ++++ apps/webapp/app/services/search.server.ts | 2 +- apps/webapp/app/trigger/chat/stream-utils.ts | 28 ++ .../trigger/deep-search/deep-search-utils.ts | 292 ++++++++++++++++++ apps/webapp/app/trigger/deep-search/index.ts | 85 +++++ apps/webapp/app/trigger/deep-search/prompt.ts | 148 +++++++++ .../app/trigger/deep-search/stream-utils.ts | 68 ++++ apps/webapp/app/trigger/deep-search/types.ts | 20 ++ apps/webapp/app/trigger/deep-search/utils.ts | 64 ++++ apps/webapp/app/utils/mcp/memory.ts | 79 +++++ apps/webapp/package.json | 4 +- 11 files changed, 851 insertions(+), 3 deletions(-) create mode 100644 apps/webapp/app/routes/api.v1.deep-search.tsx create mode 100644 apps/webapp/app/trigger/deep-search/deep-search-utils.ts create mode 100644 apps/webapp/app/trigger/deep-search/index.ts create mode 100644 apps/webapp/app/trigger/deep-search/prompt.ts create mode 100644 apps/webapp/app/trigger/deep-search/stream-utils.ts create mode 100644 apps/webapp/app/trigger/deep-search/types.ts create mode 100644 apps/webapp/app/trigger/deep-search/utils.ts diff --git a/apps/webapp/app/routes/api.v1.deep-search.tsx b/apps/webapp/app/routes/api.v1.deep-search.tsx new file mode 100644 index 0000000..0ea57f7 --- /dev/null +++ b/apps/webapp/app/routes/api.v1.deep-search.tsx @@ -0,0 +1,64 @@ +import { z } from "zod"; +import { json } from "@remix-run/node"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { deepSearch } from "~/trigger/deep-search"; +import { runs } from "@trigger.dev/sdk"; + +const DeepSearchBodySchema = z.object({ + content: z.string().min(1, "Content is required"), + intentOverride: z.string().optional(), + stream: z.boolean().default(false), + metadata: z + .object({ + source: z.enum(["chrome", "obsidian", "mcp"]).optional(), + url: z.string().optional(), + pageTitle: z.string().optional(), + }) + .optional(), +}); + +const { action, loader } = createActionApiRoute( + { + body: DeepSearchBodySchema, + method: "POST", + allowJWT: true, + authorization: { + action: "search", + }, + corsStrategy: "all", + }, + async ({ body, authentication }) => { + let trigger; + if (!body.stream) { + trigger = await deepSearch.trigger({ + content: body.content, + userId: authentication.userId, + stream: body.stream, + intentOverride: body.intentOverride, + metadata: body.metadata, + }); + + return json(trigger); + } else { + const runHandler = await deepSearch.trigger({ + content: body.content, + userId: authentication.userId, + stream: body.stream, + intentOverride: body.intentOverride, + metadata: body.metadata, + }); + + for await (const run of runs.subscribeToRun(runHandler.id)) { + if (run.status === "COMPLETED") { + return json(run.output); + } else if (run.status === "FAILED") { + return json(run.error); + } + } + + return json({ error: "Run failed" }); + } + }, +); + +export { action, loader }; diff --git a/apps/webapp/app/services/search.server.ts b/apps/webapp/app/services/search.server.ts index 6832e6f..1945504 100644 --- a/apps/webapp/app/services/search.server.ts +++ b/apps/webapp/app/services/search.server.ts @@ -80,7 +80,7 @@ export class SearchService { opts, 
); - // // 3. Apply adaptive filtering based on score threshold and minimum count + // 3. Apply adaptive filtering based on score threshold and minimum count const filteredResults = this.applyAdaptiveFiltering(rankedStatements, opts); // 3. Return top results diff --git a/apps/webapp/app/trigger/chat/stream-utils.ts b/apps/webapp/app/trigger/chat/stream-utils.ts index 1bba693..56ed59b 100644 --- a/apps/webapp/app/trigger/chat/stream-utils.ts +++ b/apps/webapp/app/trigger/chat/stream-utils.ts @@ -59,9 +59,37 @@ export async function* processTag( const hasStartTag = chunk.includes(startTag); const hasClosingTag = chunk.includes(" { + let messages = [...initialMessages]; + let completed = false; + let guardLoop = 0; + let searchCount = 0; + let totalEpisodesFound = 0; + const seenEpisodeIds = new Set(); // Track unique episodes + const totalCost: TotalCost = { + inputTokens: 0, + outputTokens: 0, + cost: 0, + }; + + const tools = { + searchMemory: searchTool, + }; + + logger.info("Starting deep search ReAct loop"); + + try { + while (!completed && guardLoop < 50) { + logger.info( + `ReAct loop iteration ${guardLoop}, searches: ${searchCount}`, + ); + + // Call LLM with current message history + const response = generate( + messages, + (event) => { + const usage = event.usage; + totalCost.inputTokens += usage.promptTokens; + totalCost.outputTokens += usage.completionTokens; + }, + tools, + ); + + let totalMessage = ""; + const toolCalls: any[] = []; + + // States for streaming final_response tags + const messageState = { + inTag: false, + message: "", + messageEnded: false, + lastSent: "", + }; + + // Process streaming response + for await (const chunk of response) { + if (typeof chunk === "object" && chunk.type === "tool-call") { + // Agent made a tool call + toolCalls.push(chunk); + logger.info(`Tool call: ${chunk.toolName}`); + } else if (typeof chunk === "string") { + totalMessage += chunk; + + // Stream final_response tags using processTag + if (!messageState.messageEnded) { + yield* processTag( + messageState, + totalMessage, + chunk, + "<final_response>", + "</final_response>", + { + start: AgentMessageType.MESSAGE_START, + chunk: AgentMessageType.MESSAGE_CHUNK, + end: AgentMessageType.MESSAGE_END, + }, + ); + } + } + } + + // Check for final response + if (totalMessage.includes("<final_response>")) { + const match = totalMessage.match( + /<final_response>(.*?)<\/final_response>/s, + ); + + if (match) { + // Accept synthesis - completed + completed = true; + logger.info( + `Final synthesis accepted after ${searchCount} searches, ${totalEpisodesFound} unique episodes found`, + ); + break; + } + } + + // Execute tool calls in parallel for better performance + if (toolCalls.length > 0) { + // Notify about all searches starting + for (const toolCall of toolCalls) { + logger.info(`Executing search: ${JSON.stringify(toolCall.args)}`); + yield Message("", AgentMessageType.SKILL_START); + yield Message( + `\nSearching memory: "${toolCall.args.query}"...\n`, + AgentMessageType.SKILL_CHUNK, + ); + yield Message("", AgentMessageType.SKILL_END); + } + + // Execute all searches in parallel + const searchPromises = toolCalls.map((toolCall) => + searchTool.execute(toolCall.args).then((result: any) => ({ + toolCall, + result, + })), + ); + + const searchResults = await Promise.all(searchPromises); + + // Process results and add to message history + for (const { toolCall, result } of searchResults) { + searchCount++; + + // Deduplicate episodes - track unique IDs + let uniqueNewEpisodes = 0; + if (result.episodes && Array.isArray(result.episodes)) { + for (const 
episode of result.episodes) { + const episodeId = + episode.id || episode._id || JSON.stringify(episode); + if (!seenEpisodeIds.has(episodeId)) { + seenEpisodeIds.add(episodeId); + uniqueNewEpisodes++; + } + } + } + + const episodesInThisSearch = result.episodes?.length || 0; + totalEpisodesFound = seenEpisodeIds.size; // Use unique count + + messages.push({ + role: "assistant", + content: [ + { + type: "tool-call", + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + args: toolCall.args, + }, + ], + }); + + // Add tool result to message history + messages.push({ + role: "tool", + content: [ + { + type: "tool-result", + toolName: toolCall.toolName, + toolCallId: toolCall.toolCallId, + result: result, + }, + ], + }); + + logger.info( + `Search ${searchCount} completed: ${episodesInThisSearch} episodes (${uniqueNewEpisodes} new, ${totalEpisodesFound} unique total)`, + ); + } + + // If found no episodes and haven't exhausted search attempts, require more searches + if (totalEpisodesFound === 0 && searchCount < 7) { + logger.info( + `Agent attempted synthesis with 0 unique episodes after ${searchCount} searches - requiring more attempts`, + ); + + yield Message("", AgentMessageType.SKILL_START); + yield Message( + `No relevant context found yet - trying different search angles...`, + AgentMessageType.SKILL_CHUNK, + ); + yield Message("", AgentMessageType.SKILL_END); + + messages.push({ + role: "system", + content: `You have performed ${searchCount} searches but found 0 unique relevant episodes. Your queries may be too abstract or not matching the user's actual conversation topics. + +Review your DECOMPOSITION: +- Are you using specific terms from the content? +- Try searching broader related topics the user might have discussed +- Try different terminology or related concepts +- Search for user's projects, work areas, or interests + +Continue with different search strategies (you can search up to 7-10 times total).`, + }); + + guardLoop++; + continue; + } + + // Soft nudging after all searches executed (awareness, not commands) + if (totalEpisodesFound >= 30 && searchCount >= 3) { + logger.info( + `Nudging: ${totalEpisodesFound} unique episodes found - suggesting synthesis consideration`, + ); + + messages.push({ + role: "system", + content: `Context awareness: You have found ${totalEpisodesFound} unique episodes across ${searchCount} searches. This represents substantial context. Consider whether you have sufficient information for quality synthesis, or if additional search angles would meaningfully improve understanding.`, + }); + } else if (totalEpisodesFound >= 15 && searchCount >= 5) { + logger.info( + `Nudging: ${totalEpisodesFound} unique episodes after ${searchCount} searches - suggesting evaluation`, + ); + + messages.push({ + role: "system", + content: `Progress update: You have ${totalEpisodesFound} unique episodes from ${searchCount} searches. Evaluate whether you have covered the main angles from your decomposition, or if important aspects remain unexplored.`, + }); + } else if (searchCount >= 7) { + logger.info( + `Nudging: ${searchCount} searches completed with ${totalEpisodesFound} unique episodes`, + ); + + messages.push({ + role: "system", + content: `Search depth: You have performed ${searchCount} searches and found ${totalEpisodesFound} unique episodes. 
Consider whether additional searches would yield meaningfully different context, or if it's time to synthesize what you've discovered.`, + }); + } + if (searchCount >= 10) { + logger.info( + `Reached maximum search limit (10), forcing synthesis with ${totalEpisodesFound} unique episodes`, + ); + + yield Message("", AgentMessageType.SKILL_START); + yield Message( + `Maximum searches reached - synthesizing results...`, + AgentMessageType.SKILL_CHUNK, + ); + yield Message("", AgentMessageType.SKILL_END); + + messages.push({ + role: "system", + content: `You have performed 10 searches and found ${totalEpisodesFound} unique episodes. This is the maximum allowed. You MUST now provide your final synthesis wrapped in <final_response> tags based on what you've found.`, + }); + } + } + + // Safety check - if no tool calls and no final response, something went wrong + if ( + toolCalls.length === 0 && + !totalMessage.includes("<final_response>") + ) { + logger.warn("Agent produced neither tool calls nor final response"); + + messages.push({ + role: "system", + content: + "You must either use the searchMemory tool to search for more context, or provide your final synthesis wrapped in <final_response> tags.", + }); + } + + guardLoop++; + } + + if (!completed) { + logger.warn( + `Loop ended without completion after ${guardLoop} iterations`, + ); + yield Message("", AgentMessageType.MESSAGE_START); + yield Message( + "Deep search did not complete - maximum iterations reached.", + AgentMessageType.MESSAGE_CHUNK, + ); + yield Message("", AgentMessageType.MESSAGE_END); + } + + yield Message("Stream ended", AgentMessageType.STREAM_END); + } catch (error) { + logger.error(`Deep search error: ${error}`); + yield Message((error as Error).message, AgentMessageType.ERROR); + yield Message("Stream ended", AgentMessageType.STREAM_END); + } +} diff --git a/apps/webapp/app/trigger/deep-search/index.ts b/apps/webapp/app/trigger/deep-search/index.ts new file mode 100644 index 0000000..a8b61f4 --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/index.ts @@ -0,0 +1,85 @@ +import { metadata, task } from "@trigger.dev/sdk"; +import { type CoreMessage } from "ai"; +import { logger } from "@trigger.dev/sdk/v3"; +import { nanoid } from "nanoid"; +import { + deletePersonalAccessToken, + getOrCreatePersonalAccessToken, +} from "../utils/utils"; +import { getReActPrompt } from "./prompt"; +import { type DeepSearchPayload, type DeepSearchResponse } from "./types"; +import { createSearchMemoryTool } from "./utils"; +import { run } from "./deep-search-utils"; +import { AgentMessageType } from "../chat/types"; + +export const deepSearch = task({ + id: "deep-search", + maxDuration: 3000, + run: async (payload: DeepSearchPayload): Promise<DeepSearchResponse> => { + const { content, userId, stream, metadata: meta, intentOverride } = payload; + + const randomKeyName = `deepSearch_${nanoid(10)}`; + + // Get or create token for search API calls + const pat = await getOrCreatePersonalAccessToken({ + name: randomKeyName, + userId: userId as string, + }); + + if (!pat?.token) { + throw new Error("Failed to create personal access token"); + } + + try { + // Create search tool that agent will use + const searchTool = createSearchMemoryTool(pat.token); + + // Build initial messages with ReAct prompt + const initialMessages: CoreMessage[] = [ + { + role: "system", + content: getReActPrompt(meta, intentOverride), + }, + { + role: "user", + content: `CONTENT TO ANALYZE:\n${content}\n\nPlease search my memory for relevant context and synthesize what you find.`, + }, + ]; + + // Run the ReAct loop generator + const 
llmResponse = run(initialMessages, searchTool); + + // Streaming mode: stream via metadata.stream like chat.ts does + // This makes all message types available to clients in real-time + const messageStream = await metadata.stream("messages", llmResponse); + + let synthesis = ""; + + for await (const step of messageStream) { + // MESSAGE_CHUNK: Final synthesis - accumulate and stream + if (step.type === AgentMessageType.MESSAGE_CHUNK) { + synthesis += step.message; + } + + // STREAM_END: Loop completed + if (step.type === AgentMessageType.STREAM_END) { + break; + } + } + + await deletePersonalAccessToken(pat?.id); + + // Clean up any remaining tags + synthesis = synthesis + .replace(/<final_response>/gi, "") + .replace(/<\/final_response>/gi, "") + .trim(); + + return { synthesis }; + } catch (error) { + await deletePersonalAccessToken(pat?.id); + logger.error(`Deep search error: ${error}`); + throw error; + } + }, +}); diff --git a/apps/webapp/app/trigger/deep-search/prompt.ts b/apps/webapp/app/trigger/deep-search/prompt.ts new file mode 100644 index 0000000..fa56b44 --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/prompt.ts @@ -0,0 +1,148 @@ +export function getReActPrompt( + metadata?: { source?: string; url?: string; pageTitle?: string }, + intentOverride?: string +): string { + const contextHints = []; + + if (metadata?.source === "chrome" && metadata?.url?.includes("mail.google.com")) { + contextHints.push("Content is from email - likely reading intent"); + } + if (metadata?.source === "chrome" && metadata?.url?.includes("calendar.google.com")) { + contextHints.push("Content is from calendar - likely meeting prep intent"); + } + if (metadata?.source === "chrome" && metadata?.url?.includes("docs.google.com")) { + contextHints.push("Content is from document editor - likely writing intent"); + } + if (metadata?.source === "obsidian") { + contextHints.push("Content is from note editor - likely writing or research intent"); + } + + return `You are a memory research agent analyzing content to find relevant context. + +YOUR PROCESS (ReAct Framework): + +1. DECOMPOSE: First, break down the content into structured categories + + Analyze the content and extract: + a) ENTITIES: Specific people, project names, tools, products mentioned + Example: "John Smith", "Phoenix API", "Redis", "mobile app" + + b) TOPICS & CONCEPTS: Key subjects, themes, domains + Example: "authentication", "database design", "performance optimization" + + c) TEMPORAL MARKERS: Time references, deadlines, events + Example: "last week's meeting", "Q2 launch", "yesterday's discussion" + + d) ACTIONS & TASKS: What's being done, decided, or requested + Example: "implement feature", "review code", "make decision on" + + e) USER INTENT: What is the user trying to accomplish? + ${intentOverride ? `User specified: "${intentOverride}"` : "Infer from context: reading/writing/meeting prep/research/task tracking/review"} + +2. 
FORM QUERIES: Create targeted search queries from your decomposition + + Based on decomposition, form specific queries: + - Search for each entity by name (people, projects, tools) + - Search for topics the user has discussed before + - Search for related work or conversations in this domain + - Use the user's actual terminology, not generic concepts + + EXAMPLE - Content: "Email from Sarah about the API redesign we discussed last week" + Decomposition: + - Entities: "Sarah", "API redesign" + - Topics: "API design", "redesign" + - Temporal: "last week" + - Actions: "discussed", "email communication" + - Intent: Reading (email) / meeting prep + + Queries to form: + ✅ "Sarah" (find past conversations with Sarah) + ✅ "API redesign" or "API design" (find project discussions) + ✅ "last week" + "Sarah" (find recent context) + ✅ "meetings" or "discussions" (find related conversations) + + ❌ Avoid: "email communication patterns", "API architecture philosophy" + (These are abstract - search what user actually discussed!) + +3. SEARCH: Execute your queries using searchMemory tool + - Start with 2-3 core searches based on main entities/topics + - Make each search specific and targeted + - Use actual terms from the content, not rephrased concepts + +4. OBSERVE: Evaluate search results + - Did you find relevant episodes? How many unique ones? + - What specific context emerged? + - What new entities/topics appeared in results? + - Are there gaps in understanding? + - Should you search more angles? + + Note: Episode counts are automatically deduplicated across searches - overlapping episodes are only counted once. + +5. REACT: Decide next action based on observations + + STOPPING CRITERIA - Proceed to SYNTHESIZE if ANY of these are true: + - You found 20+ unique episodes across your searches → ENOUGH CONTEXT + - You performed 5+ searches and found relevant episodes → SUFFICIENT + - You performed 7+ searches regardless of results → EXHAUSTED STRATEGIES + - You found strong relevant context from multiple angles → COMPLETE + + System nudges will provide awareness of your progress, but you decide when synthesis quality would be optimal. + + If you found little/no context AND searched less than 7 times: + - Try different query angles from your decomposition + - Search broader related topics + - Search user's projects or work areas + - Try alternative terminology + + ⚠️ DO NOT search endlessly - if you found relevant episodes, STOP and synthesize! + +6. SYNTHESIZE: After gathering sufficient context, provide final answer + - Wrap your synthesis in <final_response> tags + - Present direct factual context from memory - no meta-commentary + - Write as if providing background context to an AI assistant + - Include: facts, decisions, preferences, patterns, timelines + - Note any gaps, contradictions, or evolution in thinking + - Keep it concise and actionable + - DO NOT use phrases like "Previous discussions on", "From conversations", "Past preferences indicate" + - DO NOT use conversational language like "you said" or "you mentioned" + - Present information as direct factual statements + +FINAL RESPONSE FORMAT: +<final_response> +[Direct synthesized context - factual statements only] +</final_response> +Good examples: +- "The API redesign focuses on performance and scalability. Key decisions: moving to GraphQL, caching layer with Redis." +- "Project Phoenix launches Q2 2024. Main features: real-time sync, offline mode, collaborative editing." +- "Sarah leads the backend team. Recent work includes authentication refactor and database migration." 
+ +Bad examples: +❌ "Previous discussions on the API revealed..." +❌ "From past conversations, it appears that..." +❌ "Past preferences indicate..." +❌ "The user mentioned that..." + +Just state the facts directly. + + +${contextHints.length > 0 ? `\nCONTEXT HINTS:\n${contextHints.join("\n")}` : ""} + +CRITICAL REQUIREMENTS: +- ALWAYS start with DECOMPOSE step - extract entities, topics, temporal markers, actions +- Form specific queries from your decomposition - use user's actual terms +- Minimum 3 searches required +- Maximum 10 searches allowed - must synthesize after that +- STOP and synthesize when you hit stopping criteria (20+ episodes, 5+ searches with results, 7+ searches total) +- Each search should target different aspects from decomposition +- Present synthesis directly without meta-commentary + +SEARCH QUALITY CHECKLIST: +✅ Queries use specific terms from content (names, projects, exact phrases) +✅ Searched multiple angles from decomposition (entities, topics, related areas) +✅ Stop when you have enough unique context - don't search endlessly +✅ Tried alternative terminology if initial searches found nothing +❌ Avoid generic/abstract queries that don't match user's vocabulary +❌ Don't stop at 3 searches if you found zero unique episodes +❌ Don't keep searching when you already found 20+ unique episodes +}` +} diff --git a/apps/webapp/app/trigger/deep-search/stream-utils.ts b/apps/webapp/app/trigger/deep-search/stream-utils.ts new file mode 100644 index 0000000..11910c6 --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/stream-utils.ts @@ -0,0 +1,68 @@ +import { openai } from "@ai-sdk/openai"; +import { logger } from "@trigger.dev/sdk/v3"; +import { + type CoreMessage, + type LanguageModelV1, + streamText, + type ToolSet, +} from "ai"; + +/** + * Generate LLM responses with tool calling support + * Simplified version for deep-search use case - NO maxSteps for manual ReAct control + */ +export async function* generate( + messages: CoreMessage[], + onFinish?: (event: any) => void, + tools?: ToolSet, + model?: string, +): AsyncGenerator< + | string + | { + type: string; + toolName: string; + args?: any; + toolCallId?: string; + } +> { + const modelToUse = model || process.env.MODEL || "gpt-4.1-2025-04-14"; + const modelInstance = openai(modelToUse) as LanguageModelV1; + + logger.info(`Starting LLM generation with model: ${modelToUse}`); + + try { + const { textStream, fullStream } = streamText({ + model: modelInstance, + messages, + temperature: 1, + tools, + // NO maxSteps - we handle tool execution manually in the ReAct loop + toolCallStreaming: true, + onFinish, + }); + + // Yield text chunks + for await (const chunk of textStream) { + yield chunk; + } + + // Yield tool calls + for await (const fullChunk of fullStream) { + if (fullChunk.type === "tool-call") { + yield { + type: "tool-call", + toolName: fullChunk.toolName, + toolCallId: fullChunk.toolCallId, + args: fullChunk.args, + }; + } + + if (fullChunk.type === "error") { + logger.error(`LLM error: ${JSON.stringify(fullChunk)}`); + } + } + } catch (error) { + logger.error(`LLM generation error: ${error}`); + throw error; + } +} diff --git a/apps/webapp/app/trigger/deep-search/types.ts b/apps/webapp/app/trigger/deep-search/types.ts new file mode 100644 index 0000000..b54dcec --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/types.ts @@ -0,0 +1,20 @@ +export interface DeepSearchPayload { + content: string; + userId: string; + stream: boolean; + intentOverride?: string; + metadata?: { + source?: "chrome" | 
"obsidian" | "mcp"; + url?: string; + pageTitle?: string; + }; +} + +export interface DeepSearchResponse { + synthesis: string; + episodes?: Array<{ + content: string; + createdAt: Date; + spaceIds: string[]; + }>; +} diff --git a/apps/webapp/app/trigger/deep-search/utils.ts b/apps/webapp/app/trigger/deep-search/utils.ts new file mode 100644 index 0000000..75148c8 --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/utils.ts @@ -0,0 +1,64 @@ +import { tool } from "ai"; +import { z } from "zod"; +import axios from "axios"; +import { logger } from "@trigger.dev/sdk/v3"; + +export function createSearchMemoryTool(token: string) { + return tool({ + description: + "Search the user's memory for relevant facts and episodes. Use this tool multiple times with different queries to gather comprehensive context.", + parameters: z.object({ + query: z + .string() + .describe( + "Search query to find relevant information. Be specific: entity names, topics, concepts.", + ), + }), + execute: async ({ query }) => { + try { + const response = await axios.post( + `${process.env.API_BASE_URL || "https://core.heysol.ai"}/api/v1/search`, + { query }, + { + headers: { + Authorization: `Bearer ${token}`, + }, + }, + ); + + const searchResult = response.data; + + return { + facts: searchResult.facts || [], + episodes: searchResult.episodes || [], + summary: `Found ${searchResult.episodes?.length || 0} relevant memories`, + }; + } catch (error) { + logger.error(`SearchMemory tool error: ${error}`); + return { + facts: [], + episodes: [], + summary: "No results found", + }; + } + }, + }); +} + +// Helper to extract unique episodes from tool calls +export function extractEpisodesFromToolCalls(toolCalls: any[]): any[] { + const episodes: any[] = []; + + for (const call of toolCalls || []) { + if (call.toolName === "searchMemory" && call.result?.episodes) { + episodes.push(...call.result.episodes); + } + } + + // Deduplicate by content + createdAt + const uniqueEpisodes = Array.from( + new Map(episodes.map((e) => [`${e.content}-${e.createdAt}`, e])).values(), + ); + + return uniqueEpisodes.slice(0, 10); +} diff --git a/apps/webapp/app/utils/mcp/memory.ts b/apps/webapp/app/utils/mcp/memory.ts index eda326b..c4cdd90 100644 --- a/apps/webapp/app/utils/mcp/memory.ts +++ b/apps/webapp/app/utils/mcp/memory.ts @@ -3,6 +3,7 @@ import { addToQueue } from "~/lib/ingest.server"; import { logger } from "~/services/logger.service"; import { SearchService } from "~/services/search.server"; import { SpaceService } from "~/services/space.server"; +import { deepSearch } from "~/trigger/deep-search"; import { IntegrationLoader } from "./integration-loader"; const searchService = new SearchService(); @@ -178,6 +179,27 @@ export const memoryTools = [ required: ["integrationSlug", "action"], }, }, + // { + // name: "memory_deep_search", + // description: + // "Search CORE memory with document context and get synthesized insights. Automatically analyzes content to infer intent (reading, writing, meeting prep, research, task tracking, etc.) and provides context-aware synthesis. USE THIS TOOL: When analyzing documents, emails, notes, or any substantial text content for relevant memories. HOW TO USE: Provide the full content text. The tool will decompose it, search for relevant memories, and synthesize findings based on inferred intent. 
Returns: Synthesized context summary and related episodes.", + // inputSchema: { + // type: "object", + // properties: { + // content: { + // type: "string", + // description: + // "Full document/text content to analyze and search against memory", + // }, + // intentOverride: { + // type: "string", + // description: + // "Optional: Explicitly specify intent (e.g., 'meeting preparation', 'blog writing') instead of auto-detection", + // }, + // }, + // required: ["content"], + // }, + // }, ]; // Function to call memory tools based on toolName @@ -205,6 +227,8 @@ export async function callMemoryTool( return await handleGetIntegrationActions({ ...args }); case "execute_integration_action": return await handleExecuteIntegrationAction({ ...args }); + case "memory_deep_search": + return await handleMemoryDeepSearch({ ...args, userId, source }); default: throw new Error(`Unknown memory tool: ${toolName}`); } @@ -546,3 +570,58 @@ async function handleExecuteIntegrationAction(args: any) { }; } } + +// Handler for memory_deep_search +async function handleMemoryDeepSearch(args: any) { + try { + const { content, intentOverride, userId, source } = args; + + if (!content) { + throw new Error("content is required"); + } + + // Trigger non-streaming deep search task + const handle = await deepSearch.triggerAndWait({ + content, + userId, + stream: false, // MCP doesn't need streaming + intentOverride, + metadata: { source }, + }); + + // Wait for task completion + if (handle.ok) { + return { + content: [ + { + type: "text", + text: JSON.stringify(handle.output), + }, + ], + isError: false, + }; + } else { + return { + content: [ + { + type: "text", + text: `Error performing deep search: ${handle.error instanceof Error ? handle.error.message : String(handle.error)}`, + }, + ], + isError: true, + }; + } + } catch (error) { + logger.error(`MCP deep search error: ${error}`); + + return { + content: [ + { + type: "text", + text: `Error performing deep search: ${error instanceof Error ? error.message : String(error)}`, + }, + ], + isError: true, + }; + } +} diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 2aec46d..292cd3b 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -10,8 +10,8 @@ "lint:fix": "eslint 'app/**/*.{ts,tsx,js,jsx}' --rule 'turbo/no-undeclared-env-vars:error' -f table", "start": "node server.js", "typecheck": "tsc", - "trigger:dev": "pnpm dlx trigger.dev@4.0.0-v4-beta.22 dev", - "trigger:deploy": "pnpm dlx trigger.dev@4.0.0-v4-beta.22 deploy" + "trigger:dev": "pnpm dlx trigger.dev@4.0.4 dev", + "trigger:deploy": "pnpm dlx trigger.dev@4.0.4 deploy" }, "dependencies": { "@ai-sdk/amazon-bedrock": "2.2.12",
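
Usage sketch (illustrative, not part of this patch): a minimal example of how a client such as the Obsidian plugin or browser extension might call the new POST /api/v1/deep-search endpoint added above. The path follows from the Remix route file name api.v1.deep-search.tsx; CORE_BASE_URL, CORE_API_TOKEN, and the requestDeepSearch helper are placeholder names introduced only for this example and do not appear in the diff.

// Hypothetical TypeScript client sketch; placeholder names are assumptions.
interface DeepSearchRequestBody {
  content: string;
  intentOverride?: string;
  stream: boolean;
  metadata?: {
    source?: "chrome" | "obsidian" | "mcp";
    url?: string;
    pageTitle?: string;
  };
}

const CORE_BASE_URL = process.env.CORE_BASE_URL ?? "https://core.heysol.ai";
const CORE_API_TOKEN = process.env.CORE_API_TOKEN ?? "";

// With stream: true the route subscribes to the Trigger.dev run and only
// responds after it finishes, so the JSON body should carry the task output
// ({ synthesis }); with stream: false it returns the run handle immediately.
async function requestDeepSearch(content: string): Promise<{ synthesis: string }> {
  const body: DeepSearchRequestBody = {
    content,
    stream: true,
    metadata: { source: "obsidian" },
  };

  const res = await fetch(`${CORE_BASE_URL}/api/v1/deep-search`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${CORE_API_TOKEN}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    throw new Error(`Deep search request failed: ${res.status}`);
  }

  return (await res.json()) as { synthesis: string };
}

Callers that set stream to false instead receive the Trigger.dev run handle returned by deepSearch.trigger and can poll it themselves; the MCP handler in memory.ts bypasses the HTTP route entirely and uses deepSearch.triggerAndWait.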