fix: deep search

This commit is contained in:
Harshith Mullapudi 2025-10-15 23:39:42 +05:30
parent 949b534383
commit 5e05f1f56b
6 changed files with 87 additions and 108 deletions

View File

@ -29,7 +29,7 @@ const { action, loader } = createActionApiRoute(
}, },
async ({ body, authentication }) => { async ({ body, authentication }) => {
let trigger; let trigger;
if (body.stream) { if (!body.stream) {
trigger = await deepSearch.trigger({ trigger = await deepSearch.trigger({
content: body.content, content: body.content,
userId: authentication.userId, userId: authentication.userId,
@ -58,7 +58,7 @@ const { action, loader } = createActionApiRoute(
return json({ error: "Run failed" }); return json({ error: "Run failed" });
} }
} },
); );
export { action, loader }; export { action, loader };

View File

@ -3,7 +3,7 @@ import { logger } from "@trigger.dev/sdk/v3";
import { generate } from "./stream-utils"; import { generate } from "./stream-utils";
import { processTag } from "../chat/stream-utils"; import { processTag } from "../chat/stream-utils";
import { type AgentMessage, AgentMessageType, Message } from "../chat/types"; import { type AgentMessage, AgentMessageType, Message } from "../chat/types";
import { TotalCost } from "../utils/types"; import { type TotalCost } from "../utils/types";
/** /**
* Run the deep search ReAct loop * Run the deep search ReAct loop
@ -12,7 +12,7 @@ import { TotalCost } from "../utils/types";
*/ */
export async function* run( export async function* run(
initialMessages: CoreMessage[], initialMessages: CoreMessage[],
searchTool: any searchTool: any,
): AsyncGenerator<AgentMessage, any, any> { ): AsyncGenerator<AgentMessage, any, any> {
let messages = [...initialMessages]; let messages = [...initialMessages];
let completed = false; let completed = false;
@ -34,13 +34,20 @@ export async function* run(
try { try {
while (!completed && guardLoop < 50) { while (!completed && guardLoop < 50) {
logger.info(`ReAct loop iteration ${guardLoop}, searches: ${searchCount}`); logger.info(
`ReAct loop iteration ${guardLoop}, searches: ${searchCount}`,
);
// Call LLM with current message history // Call LLM with current message history
const response = generate(messages, (event)=>{const usage = event.usage; const response = generate(
totalCost.inputTokens += usage.promptTokens; messages,
totalCost.outputTokens += usage.completionTokens; (event) => {
}, tools); const usage = event.usage;
totalCost.inputTokens += usage.promptTokens;
totalCost.outputTokens += usage.completionTokens;
},
tools,
);
let totalMessage = ""; let totalMessage = "";
const toolCalls: any[] = []; const toolCalls: any[] = [];
@ -74,7 +81,7 @@ export async function* run(
start: AgentMessageType.MESSAGE_START, start: AgentMessageType.MESSAGE_START,
chunk: AgentMessageType.MESSAGE_CHUNK, chunk: AgentMessageType.MESSAGE_CHUNK,
end: AgentMessageType.MESSAGE_END, end: AgentMessageType.MESSAGE_END,
} },
); );
} }
} }
@ -83,14 +90,14 @@ export async function* run(
// Check for final response // Check for final response
if (totalMessage.includes("<final_response>")) { if (totalMessage.includes("<final_response>")) {
const match = totalMessage.match( const match = totalMessage.match(
/<final_response>(.*?)<\/final_response>/s /<final_response>(.*?)<\/final_response>/s,
); );
if (match) { if (match) {
// Accept synthesis - completed // Accept synthesis - completed
completed = true; completed = true;
logger.info( logger.info(
`Final synthesis accepted after ${searchCount} searches, ${totalEpisodesFound} unique episodes found` `Final synthesis accepted after ${searchCount} searches, ${totalEpisodesFound} unique episodes found`,
); );
break; break;
} }
@ -104,7 +111,7 @@ export async function* run(
yield Message("", AgentMessageType.SKILL_START); yield Message("", AgentMessageType.SKILL_START);
yield Message( yield Message(
`\nSearching memory: "${toolCall.args.query}"...\n`, `\nSearching memory: "${toolCall.args.query}"...\n`,
AgentMessageType.SKILL_CHUNK AgentMessageType.SKILL_CHUNK,
); );
yield Message("", AgentMessageType.SKILL_END); yield Message("", AgentMessageType.SKILL_END);
} }
@ -114,7 +121,7 @@ export async function* run(
searchTool.execute(toolCall.args).then((result: any) => ({ searchTool.execute(toolCall.args).then((result: any) => ({
toolCall, toolCall,
result, result,
})) })),
); );
const searchResults = await Promise.all(searchPromises); const searchResults = await Promise.all(searchPromises);
@ -165,20 +172,20 @@ export async function* run(
}); });
logger.info( logger.info(
`Search ${searchCount} completed: ${episodesInThisSearch} episodes (${uniqueNewEpisodes} new, ${totalEpisodesFound} unique total)` `Search ${searchCount} completed: ${episodesInThisSearch} episodes (${uniqueNewEpisodes} new, ${totalEpisodesFound} unique total)`,
); );
} }
// If found no episodes and haven't exhausted search attempts, require more searches // If found no episodes and haven't exhausted search attempts, require more searches
if (totalEpisodesFound === 0 && searchCount < 7) { if (totalEpisodesFound === 0 && searchCount < 7) {
logger.info( logger.info(
`Agent attempted synthesis with 0 unique episodes after ${searchCount} searches - requiring more attempts` `Agent attempted synthesis with 0 unique episodes after ${searchCount} searches - requiring more attempts`,
); );
yield Message("", AgentMessageType.SKILL_START); yield Message("", AgentMessageType.SKILL_START);
yield Message( yield Message(
`No relevant context found yet - trying different search angles...`, `No relevant context found yet - trying different search angles...`,
AgentMessageType.SKILL_CHUNK AgentMessageType.SKILL_CHUNK,
); );
yield Message("", AgentMessageType.SKILL_END); yield Message("", AgentMessageType.SKILL_END);
@ -202,7 +209,7 @@ Continue with different search strategies (you can search up to 7-10 times total
// Soft nudging after all searches executed (awareness, not commands) // Soft nudging after all searches executed (awareness, not commands)
if (totalEpisodesFound >= 30 && searchCount >= 3) { if (totalEpisodesFound >= 30 && searchCount >= 3) {
logger.info( logger.info(
`Nudging: ${totalEpisodesFound} unique episodes found - suggesting synthesis consideration` `Nudging: ${totalEpisodesFound} unique episodes found - suggesting synthesis consideration`,
); );
messages.push({ messages.push({
@ -211,7 +218,7 @@ Continue with different search strategies (you can search up to 7-10 times total
}); });
} else if (totalEpisodesFound >= 15 && searchCount >= 5) { } else if (totalEpisodesFound >= 15 && searchCount >= 5) {
logger.info( logger.info(
`Nudging: ${totalEpisodesFound} unique episodes after ${searchCount} searches - suggesting evaluation` `Nudging: ${totalEpisodesFound} unique episodes after ${searchCount} searches - suggesting evaluation`,
); );
messages.push({ messages.push({
@ -220,22 +227,23 @@ Continue with different search strategies (you can search up to 7-10 times total
}); });
} else if (searchCount >= 7) { } else if (searchCount >= 7) {
logger.info( logger.info(
`Nudging: ${searchCount} searches completed with ${totalEpisodesFound} unique episodes` `Nudging: ${searchCount} searches completed with ${totalEpisodesFound} unique episodes`,
); );
messages.push({ messages.push({
role: "system", role: "system",
content: `Search depth: You have performed ${searchCount} searches and found ${totalEpisodesFound} unique episodes. Consider whether additional searches would yield meaningfully different context, or if it's time to synthesize what you've discovered.`, content: `Search depth: You have performed ${searchCount} searches and found ${totalEpisodesFound} unique episodes. Consider whether additional searches would yield meaningfully different context, or if it's time to synthesize what you've discovered.`,
}); });
} if (searchCount >= 10) { }
if (searchCount >= 10) {
logger.info( logger.info(
`Reached maximum search limit (10), forcing synthesis with ${totalEpisodesFound} unique episodes` `Reached maximum search limit (10), forcing synthesis with ${totalEpisodesFound} unique episodes`,
); );
yield Message("", AgentMessageType.SKILL_START); yield Message("", AgentMessageType.SKILL_START);
yield Message( yield Message(
`Maximum searches reached - synthesizing results...`, `Maximum searches reached - synthesizing results...`,
AgentMessageType.SKILL_CHUNK AgentMessageType.SKILL_CHUNK,
); );
yield Message("", AgentMessageType.SKILL_END); yield Message("", AgentMessageType.SKILL_END);
@ -247,7 +255,10 @@ Continue with different search strategies (you can search up to 7-10 times total
} }
// Safety check - if no tool calls and no final response, something went wrong // Safety check - if no tool calls and no final response, something went wrong
if (toolCalls.length === 0 && !totalMessage.includes("<final_response>")) { if (
toolCalls.length === 0 &&
!totalMessage.includes("<final_response>")
) {
logger.warn("Agent produced neither tool calls nor final response"); logger.warn("Agent produced neither tool calls nor final response");
messages.push({ messages.push({
@ -261,11 +272,13 @@ Continue with different search strategies (you can search up to 7-10 times total
} }
if (!completed) { if (!completed) {
logger.warn(`Loop ended without completion after ${guardLoop} iterations`); logger.warn(
`Loop ended without completion after ${guardLoop} iterations`,
);
yield Message("", AgentMessageType.MESSAGE_START); yield Message("", AgentMessageType.MESSAGE_START);
yield Message( yield Message(
"Deep search did not complete - maximum iterations reached.", "Deep search did not complete - maximum iterations reached.",
AgentMessageType.MESSAGE_CHUNK AgentMessageType.MESSAGE_CHUNK,
); );
yield Message("", AgentMessageType.MESSAGE_END); yield Message("", AgentMessageType.MESSAGE_END);
} }

View File

@ -16,13 +16,7 @@ export const deepSearch = task({
id: "deep-search", id: "deep-search",
maxDuration: 3000, maxDuration: 3000,
run: async (payload: DeepSearchPayload): Promise<DeepSearchResponse> => { run: async (payload: DeepSearchPayload): Promise<DeepSearchResponse> => {
const { const { content, userId, stream, metadata: meta, intentOverride } = payload;
content,
userId,
stream,
metadata: meta,
intentOverride,
} = payload;
const randomKeyName = `deepSearch_${nanoid(10)}`; const randomKeyName = `deepSearch_${nanoid(10)}`;
@ -55,59 +49,33 @@ export const deepSearch = task({
// Run the ReAct loop generator // Run the ReAct loop generator
const llmResponse = run(initialMessages, searchTool); const llmResponse = run(initialMessages, searchTool);
if (stream) { // Streaming mode: stream via metadata.stream like chat.ts does
// Streaming mode: stream via metadata.stream like chat.ts does // This makes all message types available to clients in real-time
// This makes all message types available to clients in real-time const messageStream = await metadata.stream("messages", llmResponse);
const messageStream = await metadata.stream("messages", llmResponse);
let synthesis = ""; let synthesis = "";
for await (const step of messageStream) { for await (const step of messageStream) {
// MESSAGE_CHUNK: Final synthesis - accumulate and stream // MESSAGE_CHUNK: Final synthesis - accumulate and stream
if (step.type === AgentMessageType.MESSAGE_CHUNK) { if (step.type === AgentMessageType.MESSAGE_CHUNK) {
synthesis += step.message; synthesis += step.message;
}
// STREAM_END: Loop completed
if (step.type === AgentMessageType.STREAM_END) {
break;
}
} }
await deletePersonalAccessToken(pat?.id); // STREAM_END: Loop completed
if (step.type === AgentMessageType.STREAM_END) {
// Clean up any remaining tags break;
synthesis = synthesis
.replace(/<final_response>/gi, "")
.replace(/<\/final_response>/gi, "")
.trim();
return { synthesis };
} else {
// Non-streaming mode: consume generator without streaming
let synthesis = "";
for await (const step of llmResponse) {
if (step.type === AgentMessageType.MESSAGE_CHUNK) {
synthesis += step.message;
}
// Could also collect episodes from tool results if needed
} }
await deletePersonalAccessToken(pat?.id);
// Clean up any remaining tags
synthesis = synthesis
.replace(/<final_response>/gi, "")
.replace(/<\/final_response>/gi, "")
.trim();
// For non-streaming, we need to get episodes from search results
// Since we don't have direct access to search results in this flow,
// we'll return synthesis without episodes for now
// (episodes can be extracted from tool results if needed)
return { synthesis };
} }
await deletePersonalAccessToken(pat?.id);
// Clean up any remaining tags
synthesis = synthesis
.replace(/<final_response>/gi, "")
.replace(/<\/final_response>/gi, "")
.trim();
return { synthesis };
} catch (error) { } catch (error) {
await deletePersonalAccessToken(pat?.id); await deletePersonalAccessToken(pat?.id);
logger.error(`Deep search error: ${error}`); logger.error(`Deep search error: ${error}`);

View File

@ -11,7 +11,7 @@ export function createSearchMemoryTool(token: string) {
query: z query: z
.string() .string()
.describe( .describe(
"Search query to find relevant information. Be specific: entity names, topics, concepts." "Search query to find relevant information. Be specific: entity names, topics, concepts.",
), ),
}), }),
execute: async ({ query }) => { execute: async ({ query }) => {
@ -23,7 +23,7 @@ export function createSearchMemoryTool(token: string) {
headers: { headers: {
Authorization: `Bearer ${token}`, Authorization: `Bearer ${token}`,
}, },
} },
); );
const searchResult = response.data; const searchResult = response.data;
@ -57,9 +57,7 @@ export function extractEpisodesFromToolCalls(toolCalls: any[]): any[] {
// Deduplicate by content + createdAt // Deduplicate by content + createdAt
const uniqueEpisodes = Array.from( const uniqueEpisodes = Array.from(
new Map( new Map(episodes.map((e) => [`${e.content}-${e.createdAt}`, e])).values(),
episodes.map((e) => [`${e.content}-${e.createdAt}`, e])
).values()
); );
return uniqueEpisodes.slice(0, 10); return uniqueEpisodes.slice(0, 10);

View File

@ -179,27 +179,27 @@ export const memoryTools = [
required: ["integrationSlug", "action"], required: ["integrationSlug", "action"],
}, },
}, },
{ // {
name: "memory_deep_search", // name: "memory_deep_search",
description: // description:
"Search CORE memory with document context and get synthesized insights. Automatically analyzes content to infer intent (reading, writing, meeting prep, research, task tracking, etc.) and provides context-aware synthesis. USE THIS TOOL: When analyzing documents, emails, notes, or any substantial text content for relevant memories. HOW TO USE: Provide the full content text. The tool will decompose it, search for relevant memories, and synthesize findings based on inferred intent. Returns: Synthesized context summary and related episodes.", // "Search CORE memory with document context and get synthesized insights. Automatically analyzes content to infer intent (reading, writing, meeting prep, research, task tracking, etc.) and provides context-aware synthesis. USE THIS TOOL: When analyzing documents, emails, notes, or any substantial text content for relevant memories. HOW TO USE: Provide the full content text. The tool will decompose it, search for relevant memories, and synthesize findings based on inferred intent. Returns: Synthesized context summary and related episodes.",
inputSchema: { // inputSchema: {
type: "object", // type: "object",
properties: { // properties: {
content: { // content: {
type: "string", // type: "string",
description: // description:
"Full document/text content to analyze and search against memory", // "Full document/text content to analyze and search against memory",
}, // },
intentOverride: { // intentOverride: {
type: "string", // type: "string",
description: // description:
"Optional: Explicitly specify intent (e.g., 'meeting preparation', 'blog writing') instead of auto-detection", // "Optional: Explicitly specify intent (e.g., 'meeting preparation', 'blog writing') instead of auto-detection",
}, // },
}, // },
required: ["content"], // required: ["content"],
}, // },
}, // },
]; ];
// Function to call memory tools based on toolName // Function to call memory tools based on toolName

View File

@ -10,8 +10,8 @@
"lint:fix": "eslint 'app/**/*.{ts,tsx,js,jsx}' --rule 'turbo/no-undeclared-env-vars:error' -f table", "lint:fix": "eslint 'app/**/*.{ts,tsx,js,jsx}' --rule 'turbo/no-undeclared-env-vars:error' -f table",
"start": "node server.js", "start": "node server.js",
"typecheck": "tsc", "typecheck": "tsc",
"trigger:dev": "pnpm dlx trigger.dev@4.0.0-v4-beta.22 dev", "trigger:dev": "pnpm dlx trigger.dev@4.0.4 dev",
"trigger:deploy": "pnpm dlx trigger.dev@4.0.0-v4-beta.22 deploy" "trigger:deploy": "pnpm dlx trigger.dev@4.0.4 deploy"
}, },
"dependencies": { "dependencies": {
"@ai-sdk/amazon-bedrock": "2.2.12", "@ai-sdk/amazon-bedrock": "2.2.12",