diff --git a/apps/webapp/app/routes/api.v1.deep-search.tsx b/apps/webapp/app/routes/api.v1.deep-search.tsx index aabd500..92897cb 100644 --- a/apps/webapp/app/routes/api.v1.deep-search.tsx +++ b/apps/webapp/app/routes/api.v1.deep-search.tsx @@ -1,12 +1,13 @@ import { z } from "zod"; import { json } from "@remix-run/node"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; -import { DeepSearchService } from "~/services/deepSearch.server"; -import { SearchService } from "~/services/search.server"; +import { deepSearch } from "~/trigger/deep-search"; +import { runs } from "@trigger.dev/sdk"; const DeepSearchBodySchema = z.object({ content: z.string().min(1, "Content is required"), intentOverride: z.string().optional(), + stream: z.boolean().default(false), metadata: z .object({ source: z.enum(["chrome", "obsidian", "mcp"]).optional(), @@ -27,15 +28,36 @@ const { action, loader } = createActionApiRoute( corsStrategy: "all", }, async ({ body, authentication }) => { - const searchService = new SearchService(); - const deepSearchService = new DeepSearchService(searchService); + let trigger; + if (body.stream) { + trigger = await deepSearch.trigger({ + content: body.content, + userId: authentication.userId, + stream: body.stream, + intentOverride: body.intentOverride, + metadata: body.metadata, + }); - const result = await deepSearchService.deepSearch( - body, - authentication.userId - ); + return json(trigger); + } else { + const runHandler = await deepSearch.trigger({ + content: body.content, + userId: authentication.userId, + stream: body.stream, + intentOverride: body.intentOverride, + metadata: body.metadata, + }); - return json(result); + for await (const run of runs.subscribeToRun(runHandler.id)) { + if (run.status === "COMPLETED") { + return json(run.output); + } else if (run.status === "FAILED") { + return json(run.error); + } + } + + return json({ error: "Run failed" }); + } } ); diff --git a/apps/webapp/app/services/deepSearch.server.ts b/apps/webapp/app/services/deepSearch.server.ts deleted file mode 100644 index bc7003a..0000000 --- a/apps/webapp/app/services/deepSearch.server.ts +++ /dev/null @@ -1,601 +0,0 @@ -import { logger } from "./logger.service"; -import { SearchService } from "./search.server"; -import { makeModelCall } from "~/lib/model.server"; - -/** - * Request interface for deep search - */ -export interface DeepSearchRequest { - content: string; - intentOverride?: string; - metadata?: { - source?: "chrome" | "obsidian" | "mcp"; - url?: string; - pageTitle?: string; - }; -} - -/** - * Content analysis result from Phase 1 - */ -interface ContentAnalysis { - intent: string; - reasoning: string; - entities: string[]; - temporal: string[]; - actions: string[]; - topics: string[]; - priority: string[]; -} - -/** - * Agent decision from Phase 3 - */ -interface AgentDecision { - shouldContinue: boolean; - confidence: number; - reasoning: string; - followUpQueries: string[]; -} - -/** - * Response interface for deep search - */ -export interface DeepSearchResponse { - synthesis: string; - episodes: Array<{ - content: string; - createdAt: Date; - spaceIds: string[]; - }>; -} - -/** - * Deep Search Service - * - * Implements a 4-phase intelligent document search pipeline: - * 1. Content Analysis - Infer intent and decompose content - * 2. Parallel Broad Search - Fire multiple queries simultaneously - * 3. Agent Deep Dive - Evaluate and follow up on promising leads - * 4. 
Synthesis - Generate intent-aware context summary */ -export class DeepSearchService { - constructor(private searchService: SearchService) {} - - /** - * Main entry point for deep search - */ - async deepSearch( - request: DeepSearchRequest, - userId: string - ): Promise<DeepSearchResponse> { - const startTime = Date.now(); - const { content, intentOverride, metadata } = request; - - logger.info("Deep search started", { userId, contentLength: content.length }); - - try { - // Phase 1: Analyze content and infer intent - const analysis = intentOverride - ? await this.createAnalysisFromOverride(content, intentOverride) - : await this.analyzeContent(content, this.getIntentHints(metadata)); - - logger.info("Phase 1 complete", { intent: analysis.intent }); - - // Extract spaceIds from metadata if available - const spaceIds: string[] = []; - - // Phase 2: Parallel broad search - const { episodes: broadEpisodes } = await this.performBroadSearch( - analysis, - userId, - spaceIds - ); - - logger.info("Phase 2 complete", { episodesCount: broadEpisodes.length }); - - // Phase 3: Agent-driven deep dive (using episodes for richer context) - const { episodes: deepDiveEpisodes } = await this.performDeepDive( - content, - analysis, - broadEpisodes, - userId, - spaceIds - ); - - logger.info("Phase 3 complete", { - deepDiveEpisodes: deepDiveEpisodes.length, - }); - - // Combine and deduplicate episodes - const allEpisodes = [...broadEpisodes, ...deepDiveEpisodes]; - const episodeMap = new Map(); - allEpisodes.forEach((ep) => { - const key = `${ep.content}-${new Date(ep.createdAt).toISOString()}`; - if (!episodeMap.has(key)) { - episodeMap.set(key, ep); - } - }); - const episodes = Array.from(episodeMap.values()); - - // Phase 4: Synthesize results using episodes (richer context than facts) - const synthesis = await this.synthesizeResults( - content, - analysis, - episodes - ); - - logger.info("Phase 4 complete", { - duration: Date.now() - startTime, - totalEpisodes: episodes.length, - }); - - return { - synthesis, - episodes, - }; - } catch (error) { - logger.error("Deep search error", { error }); - throw error; - } - } - - /** - * Phase 1: Analyze content and infer intent - */ - private async analyzeContent( - content: string, - contextHints: string - ): Promise<ContentAnalysis> { - const prompt = ` -Analyze this content holistically and determine the user's intent. - -CONTENT: -${content} -${contextHints} - -YOUR TASK: -1. INFER INTENT: What is the user trying to do with this content? - Examples: reading email, writing blog post, preparing for meeting, - researching topic, tracking tasks, reviewing changes, etc. - Be specific and descriptive. - -2. EXTRACT KEY ELEMENTS: - - Entities: People, places, organizations, objects (e.g., "John Doe", "Project Phoenix") - - Temporal: Dates, times, recurring events (e.g., "Wednesday standup", "last month") - - Actions: Verbs, action items, tasks (e.g., "follow up", "review", "fix bug") - - Topics: Themes, subjects, domains (e.g., "car maintenance", "API design") - -3. PRIORITIZE: Which elements are most important to search first? - Return array like ["entities", "temporal", "topics"] ordered by importance.
- -RESPONSE FORMAT (JSON): -{ - "intent": "specific intent description", - "reasoning": "why this intent was inferred", - "entities": ["entity1", "entity2"], - "temporal": ["temporal1", "temporal2"], - "actions": ["action1", "action2"], - "topics": ["topic1", "topic2"], - "priority": ["entities", "temporal", "topics"] -} -`; - -let responseText = ""; - await makeModelCall( - false, - [{ role: "user", content: prompt }], - (text) => { - responseText = text; - }, - {}, - "high" - ); - - return JSON.parse(responseText); - } - - /** - * Create analysis from explicit intent override - */ - private async createAnalysisFromOverride( - content: string, - intentOverride: string - ): Promise<ContentAnalysis> { - const prompt = ` -The user has specified their intent as: "${intentOverride}" - -CONTENT: -${content} - -YOUR TASK: -Extract key elements from this content: -- Entities: People, places, organizations, objects -- Temporal: Dates, times, recurring events -- Actions: Verbs, action items, tasks -- Topics: Themes, subjects, domains - -Prioritize elements based on the specified intent. - -RESPONSE FORMAT (JSON): -{ - "intent": "${intentOverride}", - "reasoning": "user-specified intent", - "entities": ["entity1", "entity2"], - "temporal": ["temporal1", "temporal2"], - "actions": ["action1", "action2"], - "topics": ["topic1", "topic2"], - "priority": ["entities", "temporal", "topics"] -} -`; - -let responseText = ""; - await makeModelCall( - false, - [{ role: "user", content: prompt }], - (text) => { - responseText = text; - }, - {}, - "high" - ); - - return JSON.parse(responseText); - } - - /** - * Phase 2: Perform parallel broad search - */ - private async performBroadSearch( - analysis: ContentAnalysis, - userId: string, - spaceIds: string[] - ): Promise<{ facts: any[]; episodes: any[] }> { - // Build query list based on priority - const queries: string[] = []; - - // Add queries based on priority order - for (const category of analysis.priority) { - switch (category) { - case "entities": - queries.push(...analysis.entities.slice(0, 3)); - break; - case "temporal": - queries.push(...analysis.temporal.slice(0, 2)); - break; - case "topics": - queries.push(...analysis.topics.slice(0, 2)); - break; - case "actions": - queries.push(...analysis.actions.slice(0, 2)); - break; - } - } - - // Ensure we have at least some queries - if (queries.length === 0) { - queries.push( - ...analysis.entities.slice(0, 2), - ...analysis.topics.slice(0, 2) - ); - } - - // Cap at 10 queries max - const finalQueries = queries.slice(0, 10); - - logger.info(`Broad search: ${finalQueries.length} parallel queries`); - - // Fire all searches in parallel - const results = await Promise.all( - finalQueries.map((query) => - this.searchService.search(query, userId, { - limit: 20, - spaceIds, - }) - ) - ); - - // Flatten and deduplicate facts - const allFacts = results.flatMap((r) => r.facts); - const uniqueFacts = Array.from( - new Map(allFacts.map((f) => [f.fact, f])).values() - ); - - // Flatten and deduplicate episodes - const allEpisodes = results.flatMap((r) => r.episodes); - const uniqueEpisodes = Array.from( - new Map(allEpisodes.map((e) => [`${e.content}-${e.createdAt}`, e])).values() - ); - - return { facts: uniqueFacts, episodes: uniqueEpisodes }; - } - - /** - * Phase 3: Perform agent-driven deep dive using episodes - */ - private async performDeepDive( - content: string, - analysis: ContentAnalysis, - broadEpisodes: any[], - userId: string, - spaceIds: string[] - ): Promise<{ facts: any[]; episodes: any[] }> { - // Check if we have
any results worth evaluating - if (broadEpisodes.length === 0) { - logger.info("No episodes from broad search, skipping deep dive"); - return { facts: [], episodes: [] }; - } - - // Agent decides on follow-up based on episodes - const decision = await this.decideFollowUp( - content, - analysis, - broadEpisodes - ); - - if (!decision.shouldContinue) { - logger.info(`Agent stopped: ${decision.reasoning}`); - return { facts: [], episodes: [] }; - } - - logger.info( - `Agent continuing with ${decision.followUpQueries.length} follow-up queries` - ); - - // Execute follow-up queries sequentially - const deepDiveFacts = []; - const deepDiveEpisodes = []; - - for (const query of decision.followUpQueries) { - const result = await this.searchService.search(query, userId, { - limit: 20, - spaceIds, - }); - - deepDiveFacts.push(...result.facts); - deepDiveEpisodes.push(...result.episodes); - - // Stop if we've gathered enough episodes - if (deepDiveEpisodes.length > 20) { - logger.info("Sufficient context gathered, stopping early"); - break; - } - } - - return { facts: deepDiveFacts, episodes: deepDiveEpisodes }; - } - - /** - * Agent decides on follow-up queries based on episodes - */ - private async decideFollowUp( - content: string, - analysis: ContentAnalysis, - episodes: any[] - ): Promise<AgentDecision> { - const prompt = ` -You are analyzing memory search results to decide if deeper investigation is needed. - -ORIGINAL CONTENT: -${content} - -INFERRED INTENT: ${analysis.intent} - -FOUND MEMORIES (${episodes.length} episodes): -${episodes - .map((ep, i) => { - const date = new Date(ep.createdAt).toISOString().split("T")[0]; - const preview = ep.content; - return ` ---- Memory ${i + 1} (${date}) --- -${preview} -`; - }) - .join("\n")} - -YOUR TASK: -1. EVALUATE MEMORY RELEVANCE: - - Are these memories directly relevant to the original content? - - Do they provide sufficient context for the intent "${analysis.intent}"? - - What key information or connections are missing? - - Are there entities, topics, or concepts mentioned that warrant deeper exploration? - -2. DECIDE ON FOLLOW-UP: - - If memories are highly relevant and complete: STOP, no follow-up needed - - If memories are relevant but incomplete: Continue with 1-2 clarifying queries - - If memories reveal new entities/topics worth exploring: Continue with 2-3 follow-up queries - - If memories are sparse or off-topic: STOP, unlikely to find better results - -3. GENERATE FOLLOW-UP QUERIES (if continuing): - - Extract new entities, topics, or connections mentioned in the memories - - Formulate specific, targeted queries based on what's missing - - Focus on enriching context for the "${analysis.intent}" intent - - Maximum 3 queries - -RESPONSE FORMAT (JSON): -{ - "shouldContinue": true/false, - "confidence": 0.0-1.0, - "reasoning": "explanation of decision based on memory analysis", - "followUpQueries": ["query1", "query2"] -} -`; - - let responseText = ""; - await makeModelCall( - false, - [{ role: "user", content: prompt }], - (text) => { - responseText = text; - }, - {}, - "high" - ); - - return JSON.parse(responseText); - } - - /** - * Phase 4: Synthesize results based on intent using episodes - */ - private async synthesizeResults( - content: string, - analysis: ContentAnalysis, - episodes: any[] - ): Promise<string> { - if (episodes.length === 0) { - return "No relevant context found in memory."; - } - - const prompt = ` -You are synthesizing relevant context from the user's memory to help an AI assistant respond more effectively.
- -CURRENT CONTENT: -${content} - -USER INTENT: ${analysis.intent} - -RELEVANT MEMORY CONTEXT (${episodes.length} past conversations): -${episodes - .map((ep, i) => { - const date = new Date(ep.createdAt).toISOString().split("T")[0]; - const preview = ep.content; - return ` -[${date}] -${preview} -`; - }) - .join("\n\n")} - -SYNTHESIS OBJECTIVE: -${this.getIntentGuidance(analysis.intent)} - -OUTPUT REQUIREMENTS: -- Provide clear, actionable context from the memories -- Start directly with relevant information, no meta-commentary -- Present facts, decisions, preferences, and patterns from past conversations -- Connect past context to current content when relevant -- Note any gaps, contradictions, or evolution in thinking -- Keep it factual and concise - this will be used by an AI assistant -- Do not use conversational language like "you said" or "you mentioned" -- Present information in third person or as direct facts - -Good examples: -- "Previous discussions on X covered Y and Z. Key decision: ..." -- "From March 2024 conversation: [specific context]" -- "Related work on [project] established that..." -- "Past preferences indicate..." -- "Timeline: [sequence of events/decisions]" -`; - - let synthesis = ""; - await makeModelCall( - false, - [{ role: "user", content: prompt }], - (text) => { - synthesis = text; - }, - {}, - "high" - ); - - return synthesis; - } - - /** - * Get synthesis guidance based on intent keywords - */ - private getIntentGuidance(intent: string): string { - const intentLower = intent.toLowerCase(); - - if ( - intentLower.includes("read") || - intentLower.includes("understand") || - intentLower.includes("email") - ) { - return "Focus on: Who/what is this about? What context should the reader know? Provide recognition and background."; - } - - if ( - intentLower.includes("writ") || - intentLower.includes("draft") || - intentLower.includes("blog") || - intentLower.includes("post") - ) { - return "Focus on: What has been said before on this topic? What's consistent with past statements? 
What gaps or contradictions exist?"; - } - - if ( - intentLower.includes("meeting") || - intentLower.includes("prep") || - intentLower.includes("standup") || - intentLower.includes("agenda") - ) { - return "Focus on: Key discussion topics, recent relevant context, pending action items, what needs to be addressed."; - } - - if ( - intentLower.includes("research") || - intentLower.includes("explore") || - intentLower.includes("learn") - ) { - return "Focus on: Patterns across memories, connections between topics, insights and evolution over time."; - } - - if ( - intentLower.includes("follow") || - intentLower.includes("task") || - intentLower.includes("todo") || - intentLower.includes("action") - ) { - return "Focus on: Action items, pending tasks, decisions made, what needs follow-up, deadlines."; - } - - if ( - intentLower.includes("review") || - intentLower.includes("change") || - intentLower.includes("update") || - intentLower.includes("diff") - ) { - return "Focus on: What has changed, what's new information, how things have evolved, timeline of updates."; - } - - // Default - return "Focus on: Most relevant context and key insights that would be valuable for understanding this content."; - } - - /** - * Generate context hints from metadata - */ - private getIntentHints( - metadata?: DeepSearchRequest["metadata"] - ): string { - if (!metadata) return ""; - - const hints: string[] = []; - - // Chrome extension context - if (metadata.source === "chrome") { - if (metadata.url?.includes("mail.google.com")) { - hints.push("Content is from email client (likely reading)"); - } - if (metadata.url?.includes("calendar.google.com")) { - hints.push("Content is from calendar (likely meeting_prep)"); - } - if (metadata.url?.includes("docs.google.com")) { - hints.push("Content is from document editor (likely writing)"); - } - } - - // Obsidian context - if (metadata.source === "obsidian") { - hints.push( - "Content is from note editor (could be writing or research)" - ); - } - - return hints.length > 0 - ? `\n\nCONTEXT HINTS:\n${hints.join("\n")}` - : ""; - } -} diff --git a/apps/webapp/app/services/search.server.ts b/apps/webapp/app/services/search.server.ts index 6832e6f..1945504 100644 --- a/apps/webapp/app/services/search.server.ts +++ b/apps/webapp/app/services/search.server.ts @@ -80,7 +80,7 @@ export class SearchService { opts, ); - // // 3. Apply adaptive filtering based on score threshold and minimum count + // 3. Apply adaptive filtering based on score threshold and minimum count const filteredResults = this.applyAdaptiveFiltering(rankedStatements, opts); // 3. 
Return top results diff --git a/apps/webapp/app/trigger/chat/stream-utils.ts b/apps/webapp/app/trigger/chat/stream-utils.ts index 1bba693..56ed59b 100644 --- a/apps/webapp/app/trigger/chat/stream-utils.ts +++ b/apps/webapp/app/trigger/chat/stream-utils.ts @@ -59,9 +59,37 @@ export async function* processTag( const hasStartTag = chunk.includes(startTag); const hasClosingTag = chunk.includes(closingTag); diff --git a/apps/webapp/app/trigger/deep-search/deep-search-utils.ts b/apps/webapp/app/trigger/deep-search/deep-search-utils.ts new file mode 100644 --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/deep-search-utils.ts +export async function* run( + initialMessages: CoreMessage[], + searchTool: any +) { + let messages = [...initialMessages]; + let completed = false; + let guardLoop = 0; + let searchCount = 0; + let totalEpisodesFound = 0; + const seenEpisodeIds = new Set(); // Track unique episodes + const totalCost: TotalCost = { + inputTokens: 0, + outputTokens: 0, + cost: 0, + }; + + const tools = { + searchMemory: searchTool, + }; + + logger.info("Starting deep search ReAct loop"); + + try { + while (!completed && guardLoop < 50) { + logger.info(`ReAct loop iteration ${guardLoop}, searches: ${searchCount}`); + + // Call LLM with current message history + const response = generate(messages, false, (event) => { + const usage = event.usage; + totalCost.inputTokens += usage.promptTokens; + totalCost.outputTokens += usage.completionTokens; + }, tools); + + let totalMessage = ""; + const toolCalls: any[] = []; + + // States for streaming final_response tags + const messageState = { + inTag: false, + message: "", + messageEnded: false, + lastSent: "", + }; + + // Process streaming response + for await (const chunk of response) { + if (typeof chunk === "object" && chunk.type === "tool-call") { + // Agent made a tool call + toolCalls.push(chunk); + logger.info(`Tool call: ${chunk.toolName}`); + } else if (typeof chunk === "string") { + totalMessage += chunk; + + // Stream final_response tags using processTag + if (!messageState.messageEnded) { + yield* processTag( + messageState, + totalMessage, + chunk, + "<final_response>", + "</final_response>", + { + start: AgentMessageType.MESSAGE_START, + chunk: AgentMessageType.MESSAGE_CHUNK, + end: AgentMessageType.MESSAGE_END, + } + ); + } + } + } + + // Check for final response + if (totalMessage.includes("<final_response>")) { + const match = totalMessage.match( + /<final_response>(.*?)<\/final_response>/s + ); + + if (match) { + // Accept synthesis - completed + completed = true; + logger.info( + `Final synthesis accepted after ${searchCount} searches, ${totalEpisodesFound} unique episodes found` + ); + break; + } + } + + // Execute tool calls sequentially + if (toolCalls.length > 0) { + for (const toolCall of toolCalls) { + // Add assistant message with tool call + messages.push({ + role: "assistant", + content: [ + { + type: "tool-call", + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + args: toolCall.args, + }, + ], + }); + + // Execute the search tool + logger.info(`Executing search: ${JSON.stringify(toolCall.args)}`); + + // Notify about search starting + yield Message("", AgentMessageType.SKILL_START); + yield Message( + `\nSearching memory: "${toolCall.args.query}"...\n`, + AgentMessageType.SKILL_CHUNK + ); + yield Message("", AgentMessageType.SKILL_END); + + const result = await searchTool.execute(toolCall.args); + searchCount++; + + // Deduplicate episodes - track unique IDs + let uniqueNewEpisodes = 0; + if (result.episodes && Array.isArray(result.episodes)) { + for (const episode of result.episodes) { + const episodeId = + episode.id || episode._id || JSON.stringify(episode); + if (!seenEpisodeIds.has(episodeId)) { + seenEpisodeIds.add(episodeId); + uniqueNewEpisodes++; + } + } + } + + const episodesInThisSearch = result.episodes?.length || 0; + totalEpisodesFound = seenEpisodeIds.size; // Use
unique count + // Add tool result to message history + messages.push({ + role: "tool", + content: [ + { + type: "tool-result", + toolName: toolCall.toolName, + toolCallId: toolCall.toolCallId, + result: result, + }, + ], + }); + + logger.info( + `Search ${searchCount} completed: ${episodesInThisSearch} episodes (${uniqueNewEpisodes} new, ${totalEpisodesFound} unique total)` + ); + } + + // If we found no episodes and haven't exhausted search attempts, require more searches + if (totalEpisodesFound === 0 && searchCount < 7) { + logger.info( + `Agent attempted synthesis with 0 unique episodes after ${searchCount} searches - requiring more attempts` + ); + + yield Message("", AgentMessageType.SKILL_START); + yield Message( + `No relevant context found yet - trying different search angles...`, + AgentMessageType.SKILL_CHUNK + ); + yield Message("", AgentMessageType.SKILL_END); + + messages.push({ + role: "system", + content: `You have performed ${searchCount} searches but found 0 unique relevant episodes. Your queries may be too abstract or not matching the user's actual conversation topics. + +Review your DECOMPOSITION: +- Are you using specific terms from the content? +- Try searching broader related topics the user might have discussed +- Try different terminology or related concepts +- Search for user's projects, work areas, or interests + +Continue with different search strategies (you can search up to 7-10 times total).`, + }); + + guardLoop++; + continue; + } + + // Soft nudging after all searches executed (awareness, not commands) + if (totalEpisodesFound >= 30 && searchCount >= 3) { + logger.info( + `Nudging: ${totalEpisodesFound} unique episodes found - suggesting synthesis consideration` + ); + + messages.push({ + role: "system", + content: `Context awareness: You have found ${totalEpisodesFound} unique episodes across ${searchCount} searches. This represents substantial context. Consider whether you have sufficient information for quality synthesis, or if additional search angles would meaningfully improve understanding.`, + }); + } else if (totalEpisodesFound >= 15 && searchCount >= 5) { + logger.info( + `Nudging: ${totalEpisodesFound} unique episodes after ${searchCount} searches - suggesting evaluation` + ); + + messages.push({ + role: "system", + content: `Progress update: You have ${totalEpisodesFound} unique episodes from ${searchCount} searches. Evaluate whether you have covered the main angles from your decomposition, or if important aspects remain unexplored.`, + }); + } else if (searchCount >= 7) { + logger.info( + `Nudging: ${searchCount} searches completed with ${totalEpisodesFound} unique episodes` + ); + + messages.push({ + role: "system", + content: `Search depth: You have performed ${searchCount} searches and found ${totalEpisodesFound} unique episodes. Consider whether additional searches would yield meaningfully different context, or if it's time to synthesize what you've discovered.`, + }); + } + + // Hard cap: applies regardless of which nudge (if any) fired above + if (searchCount >= 10) { + logger.info( + `Reached maximum search limit (10), forcing synthesis with ${totalEpisodesFound} unique episodes` + ); + + yield Message("", AgentMessageType.SKILL_START); + yield Message( + `Maximum searches reached - synthesizing results...`, + AgentMessageType.SKILL_CHUNK + ); + yield Message("", AgentMessageType.SKILL_END); + + messages.push({ + role: "system", + content: `You have performed 10 searches and found ${totalEpisodesFound} unique episodes. This is the maximum allowed.
You MUST now provide your final synthesis wrapped in <final_response> tags based on what you've found.`, + }); + } + } + + // Safety check - if no tool calls and no final response, something went wrong + if (toolCalls.length === 0 && !totalMessage.includes("<final_response>")) { + logger.warn("Agent produced neither tool calls nor final response"); + + messages.push({ + role: "system", + content: + "You must either use the searchMemory tool to search for more context, or provide your final synthesis wrapped in <final_response> tags.", + }); + } + + guardLoop++; + } + + if (!completed) { + logger.warn(`Loop ended without completion after ${guardLoop} iterations`); + yield Message("", AgentMessageType.MESSAGE_START); + yield Message( + "Deep search did not complete - maximum iterations reached.", + AgentMessageType.MESSAGE_CHUNK + ); + yield Message("", AgentMessageType.MESSAGE_END); + } + + yield Message("Stream ended", AgentMessageType.STREAM_END); + } catch (error) { + logger.error(`Deep search error: ${error}`); + yield Message((error as Error).message, AgentMessageType.ERROR); + yield Message("Stream ended", AgentMessageType.STREAM_END); + } +} diff --git a/apps/webapp/app/trigger/deep-search/index.ts b/apps/webapp/app/trigger/deep-search/index.ts new file mode 100644 index 0000000..8924c29 --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/index.ts @@ -0,0 +1,117 @@ +import { metadata, task } from "@trigger.dev/sdk"; +import { type CoreMessage } from "ai"; +import { logger } from "@trigger.dev/sdk/v3"; +import { nanoid } from "nanoid"; +import { + deletePersonalAccessToken, + getOrCreatePersonalAccessToken, +} from "../utils/utils"; +import { getReActPrompt } from "./prompt"; +import { type DeepSearchPayload, type DeepSearchResponse } from "./types"; +import { createSearchMemoryTool } from "./utils"; +import { run } from "./deep-search-utils"; +import { AgentMessageType } from "../chat/types"; + +export const deepSearch = task({ + id: "deep-search", + maxDuration: 3000, + run: async (payload: DeepSearchPayload): Promise<DeepSearchResponse> => { + const { + content, + userId, + stream, + metadata: meta, + intentOverride, + } = payload; + + const randomKeyName = `deepSearch_${nanoid(10)}`; + + // Get or create token for search API calls + const pat = await getOrCreatePersonalAccessToken({ + name: randomKeyName, + userId: userId as string, + }); + + if (!pat?.token) { + throw new Error("Failed to create personal access token"); + } + + try { + // Create search tool that agent will use + const searchTool = createSearchMemoryTool(pat.token); + + // Build initial messages with ReAct prompt + const initialMessages: CoreMessage[] = [ + { + role: "system", + content: getReActPrompt(meta, intentOverride), + }, + { + role: "user", + content: `CONTENT TO ANALYZE:\n${content}\n\nPlease search my memory for relevant context and synthesize what you find.`, + }, + ]; + + // Run the ReAct loop generator + const llmResponse = run(initialMessages, searchTool); + + if (stream) { + // Streaming mode: stream via metadata.stream like chat.ts does + // This makes all message types available to clients in real-time + const messageStream = await metadata.stream("messages", llmResponse); + + let synthesis = ""; + + for await (const step of messageStream) { + // MESSAGE_CHUNK: Final synthesis - accumulate and stream + if (step.type === AgentMessageType.MESSAGE_CHUNK) { + synthesis += step.message; + } + + // STREAM_END: Loop completed + if (step.type === AgentMessageType.STREAM_END) { + break; + } + } + + await deletePersonalAccessToken(pat?.id); + + // Clean up any remaining
<final_response> tags + synthesis = synthesis + .replace(/<final_response>/gi, "") + .replace(/<\/final_response>/gi, "") + .trim(); + + return { synthesis }; + } else { + // Non-streaming mode: consume generator without streaming + let synthesis = ""; + + for await (const step of llmResponse) { + if (step.type === AgentMessageType.MESSAGE_CHUNK) { + synthesis += step.message; + } + // Could also collect episodes from tool results if needed + } + + await deletePersonalAccessToken(pat?.id); + + // Clean up any remaining <final_response> tags + synthesis = synthesis + .replace(/<final_response>/gi, "") + .replace(/<\/final_response>/gi, "") + .trim(); + + // For non-streaming, we need to get episodes from search results + // Since we don't have direct access to search results in this flow, + // we'll return synthesis without episodes for now + // (episodes can be extracted from tool results if needed) + return { synthesis }; + } + } catch (error) { + await deletePersonalAccessToken(pat?.id); + logger.error(`Deep search error: ${error}`); + throw error; + } + }, +}); diff --git a/apps/webapp/app/trigger/deep-search/prompt.ts b/apps/webapp/app/trigger/deep-search/prompt.ts new file mode 100644 index 0000000..fa56b44 --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/prompt.ts @@ -0,0 +1,148 @@ +export function getReActPrompt( + metadata?: { source?: string; url?: string; pageTitle?: string }, + intentOverride?: string +): string { + const contextHints = []; + + if (metadata?.source === "chrome" && metadata?.url?.includes("mail.google.com")) { + contextHints.push("Content is from email - likely reading intent"); + } + if (metadata?.source === "chrome" && metadata?.url?.includes("calendar.google.com")) { + contextHints.push("Content is from calendar - likely meeting prep intent"); + } + if (metadata?.source === "chrome" && metadata?.url?.includes("docs.google.com")) { + contextHints.push("Content is from document editor - likely writing intent"); + } + if (metadata?.source === "obsidian") { + contextHints.push("Content is from note editor - likely writing or research intent"); + } + + return `You are a memory research agent analyzing content to find relevant context. + +YOUR PROCESS (ReAct Framework): + +1. DECOMPOSE: First, break down the content into structured categories + + Analyze the content and extract: + a) ENTITIES: Specific people, project names, tools, products mentioned + Example: "John Smith", "Phoenix API", "Redis", "mobile app" + + b) TOPICS & CONCEPTS: Key subjects, themes, domains + Example: "authentication", "database design", "performance optimization" + + c) TEMPORAL MARKERS: Time references, deadlines, events + Example: "last week's meeting", "Q2 launch", "yesterday's discussion" + + d) ACTIONS & TASKS: What's being done, decided, or requested + Example: "implement feature", "review code", "make decision on" + + e) USER INTENT: What is the user trying to accomplish? + ${intentOverride ? `User specified: "${intentOverride}"` : "Infer from context: reading/writing/meeting prep/research/task tracking/review"} + +2.
FORM QUERIES: Create targeted search queries from your decomposition + + Based on decomposition, form specific queries: + - Search for each entity by name (people, projects, tools) + - Search for topics the user has discussed before + - Search for related work or conversations in this domain + - Use the user's actual terminology, not generic concepts + + EXAMPLE - Content: "Email from Sarah about the API redesign we discussed last week" + Decomposition: + - Entities: "Sarah", "API redesign" + - Topics: "API design", "redesign" + - Temporal: "last week" + - Actions: "discussed", "email communication" + - Intent: Reading (email) / meeting prep + + Queries to form: + ✅ "Sarah" (find past conversations with Sarah) + ✅ "API redesign" or "API design" (find project discussions) + ✅ "last week" + "Sarah" (find recent context) + ✅ "meetings" or "discussions" (find related conversations) + + ❌ Avoid: "email communication patterns", "API architecture philosophy" + (These are abstract - search what user actually discussed!) + +3. SEARCH: Execute your queries using searchMemory tool + - Start with 2-3 core searches based on main entities/topics + - Make each search specific and targeted + - Use actual terms from the content, not rephrased concepts + +4. OBSERVE: Evaluate search results + - Did you find relevant episodes? How many unique ones? + - What specific context emerged? + - What new entities/topics appeared in results? + - Are there gaps in understanding? + - Should you search more angles? + + Note: Episode counts are automatically deduplicated across searches - overlapping episodes are only counted once. + +5. REACT: Decide next action based on observations + + STOPPING CRITERIA - Proceed to SYNTHESIZE if ANY of these are true: + - You found 20+ unique episodes across your searches → ENOUGH CONTEXT + - You performed 5+ searches and found relevant episodes → SUFFICIENT + - You performed 7+ searches regardless of results → EXHAUSTED STRATEGIES + - You found strong relevant context from multiple angles → COMPLETE + + System nudges will provide awareness of your progress, but you decide when synthesis quality would be optimal. + + If you found little/no context AND searched less than 7 times: + - Try different query angles from your decomposition + - Search broader related topics + - Search user's projects or work areas + - Try alternative terminology + + ⚠️ DO NOT search endlessly - if you found relevant episodes, STOP and synthesize! + +6. SYNTHESIZE: After gathering sufficient context, provide final answer + - Wrap your synthesis in <final_response> tags + - Present direct factual context from memory - no meta-commentary + - Write as if providing background context to an AI assistant + - Include: facts, decisions, preferences, patterns, timelines + - Note any gaps, contradictions, or evolution in thinking + - Keep it concise and actionable + - DO NOT use phrases like "Previous discussions on", "From conversations", "Past preferences indicate" + - DO NOT use conversational language like "you said" or "you mentioned" + - Present information as direct factual statements + +FINAL RESPONSE FORMAT: +<final_response> +[Direct synthesized context - factual statements only] + +Good examples: +- "The API redesign focuses on performance and scalability. Key decisions: moving to GraphQL, caching layer with Redis." +- "Project Phoenix launches Q2 2024. Main features: real-time sync, offline mode, collaborative editing." +- "Sarah leads the backend team. Recent work includes authentication refactor and database migration."
Bad examples: +❌ "Previous discussions on the API revealed..." +❌ "From past conversations, it appears that..." +❌ "Past preferences indicate..." +❌ "The user mentioned that..." + +Just state the facts directly. +</final_response> + +${contextHints.length > 0 ? `\nCONTEXT HINTS:\n${contextHints.join("\n")}` : ""} + +CRITICAL REQUIREMENTS: +- ALWAYS start with DECOMPOSE step - extract entities, topics, temporal markers, actions +- Form specific queries from your decomposition - use user's actual terms +- Minimum 3 searches required +- Maximum 10 searches allowed - must synthesize after that +- STOP and synthesize when you hit stopping criteria (20+ episodes, 5+ searches with results, 7+ searches total) +- Each search should target different aspects from decomposition +- Present synthesis directly without meta-commentary + +SEARCH QUALITY CHECKLIST: +✅ Queries use specific terms from content (names, projects, exact phrases) +✅ Searched multiple angles from decomposition (entities, topics, related areas) +✅ Stop when you have enough unique context - don't search endlessly +✅ Tried alternative terminology if initial searches found nothing +❌ Avoid generic/abstract queries that don't match user's vocabulary +❌ Don't stop at 3 searches if you found zero unique episodes +❌ Don't keep searching when you already found 20+ unique episodes +`; +} diff --git a/apps/webapp/app/trigger/deep-search/types.ts b/apps/webapp/app/trigger/deep-search/types.ts new file mode 100644 index 0000000..b54dcec --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/types.ts @@ -0,0 +1,20 @@ +export interface DeepSearchPayload { + content: string; + userId: string; + stream: boolean; + intentOverride?: string; + metadata?: { + source?: "chrome" | "obsidian" | "mcp"; + url?: string; + pageTitle?: string; + }; +} + +export interface DeepSearchResponse { + synthesis: string; + episodes?: Array<{ + content: string; + createdAt: Date; + spaceIds: string[]; + }>; +} diff --git a/apps/webapp/app/trigger/deep-search/utils.ts b/apps/webapp/app/trigger/deep-search/utils.ts new file mode 100644 index 0000000..f2a0cbd --- /dev/null +++ b/apps/webapp/app/trigger/deep-search/utils.ts @@ -0,0 +1,66 @@ +import { tool } from "ai"; +import { z } from "zod"; +import axios from "axios"; +import { logger } from "@trigger.dev/sdk/v3"; + +export function createSearchMemoryTool(token: string) { + return tool({ + description: + "Search the user's memory for relevant facts and episodes. Use this tool multiple times with different queries to gather comprehensive context.", + parameters: z.object({ + query: z + .string() + .describe( + "Search query to find relevant information. Be specific: entity names, topics, concepts."
+ ), + }), + execute: async ({ query }) => { + try { + const response = await axios.post( + `${process.env.API_BASE_URL || "https://core.heysol.ai"}/api/v1/search`, + { query }, + { + headers: { + Authorization: `Bearer ${token}`, + }, + } + ); + + const searchResult = response.data; + + return { + facts: searchResult.facts || [], + episodes: searchResult.episodes || [], + summary: `Found ${searchResult.episodes?.length || 0} relevant memories`, + }; + } catch (error) { + logger.error(`SearchMemory tool error: ${error}`); + return { + facts: [], + episodes: [], + summary: "No results found", + }; + } + }, + }); +} + +// Helper to extract unique episodes from tool calls +export function extractEpisodesFromToolCalls(toolCalls: any[]): any[] { + const episodes: any[] = []; + + for (const call of toolCalls || []) { + if (call.toolName === "searchMemory" && call.result?.episodes) { + episodes.push(...call.result.episodes); + } + } + + // Deduplicate by content + createdAt + const uniqueEpisodes = Array.from( + new Map( + episodes.map((e) => [`${e.content}-${e.createdAt}`, e]) + ).values() + ); + + return uniqueEpisodes.slice(0, 10); +} diff --git a/apps/webapp/app/utils/mcp/memory.ts b/apps/webapp/app/utils/mcp/memory.ts index 3e65887..77c5735 100644 --- a/apps/webapp/app/utils/mcp/memory.ts +++ b/apps/webapp/app/utils/mcp/memory.ts @@ -3,7 +3,7 @@ import { addToQueue } from "~/lib/ingest.server"; import { logger } from "~/services/logger.service"; import { SearchService } from "~/services/search.server"; import { SpaceService } from "~/services/space.server"; -import { DeepSearchService } from "~/services/deepSearch.server"; +import { deepSearch } from "~/trigger/deep-search"; import { IntegrationLoader } from "./integration-loader"; const searchService = new SearchService(); @@ -580,26 +580,37 @@ async function handleMemoryDeepSearch(args: any) { throw new Error("content is required"); } - const deepSearchService = new DeepSearchService(searchService); - - const result = await deepSearchService.deepSearch( - { - content, - intentOverride, - metadata: { source }, - }, + // Trigger the non-streaming deep search task and wait for it to finish + const handle = await deepSearch.triggerAndWait({ + content, userId, - ); + stream: false, // MCP doesn't need streaming + intentOverride, + metadata: { source }, + }); - return { - content: [ - { - type: "text", - text: JSON.stringify(result), - }, - ], - isError: false, - }; + // triggerAndWait has already resolved; check whether the run succeeded + if (handle.ok) { + return { + content: [ + { + type: "text", + text: JSON.stringify(handle.output), + }, + ], + isError: false, + }; + } else { + return { + content: [ + { + type: "text", + text: `Error performing deep search: ${handle.error instanceof Error ? handle.error.message : String(handle.error)}`, + }, + ], + isError: true, + }; + } } catch (error) { logger.error(`MCP deep search error: ${error}`);
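For reference, a minimal client sketch of the endpoint contract this PR introduces. The body fields come from DeepSearchBodySchema in api.v1.deep-search.tsx; the route path is inferred from that file name, and the base URL, bearer-token auth scheme, and token value are illustrative assumptions (the real auth strategy lives in createActionApiRoute and is not shown in this diff):

// Hypothetical caller for the new deep-search route -- a sketch, not part of this PR.
interface DeepSearchBody {
  content: string; // required, non-empty
  intentOverride?: string;
  stream?: boolean; // server defaults this to false
  metadata?: {
    source?: "chrome" | "obsidian" | "mcp";
    url?: string;
    pageTitle?: string;
  };
}

// stream: false -- the route subscribes to the triggered run and replies with
// the run output once it reaches COMPLETED (or the error payload on FAILED).
async function deepSearchBlocking(baseUrl: string, token: string, body: DeepSearchBody) {
  const res = await fetch(`${baseUrl}/api/v1/deep-search`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${token}`, // assumed auth scheme
    },
    body: JSON.stringify({ ...body, stream: false }),
  });
  return res.json(); // expected shape: { synthesis: string, episodes?: [...] }
}

// stream: true -- the route returns the trigger handle immediately; the caller
// can then follow the run by id (e.g. with runs.subscribeToRun on a backend).
async function deepSearchStreaming(baseUrl: string, token: string, body: DeepSearchBody) {
  const res = await fetch(`${baseUrl}/api/v1/deep-search`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${token}`, // assumed auth scheme
    },
    body: JSON.stringify({ ...body, stream: true }),
  });
  const handle = await res.json();
  return handle.id as string; // run id to subscribe to
}

In streaming mode the interesting payload is the "messages" stream registered via metadata.stream in index.ts: it carries the AgentMessageType events (the SKILL_* search-progress notices and the MESSAGE_* chunks of the final synthesis), so a client can render search progress live instead of waiting for the final run output.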