Mirror of https://github.com/eliasstepanik/core.git (synced 2026-01-10 23:38:27 +00:00)
Feat: deep search for extension and obsidian (#107)
* Feat: add deep search api
* Feat: deep search agent
* fix: stream utils for deep search
* fix: deep search

Co-authored-by: Manoj <saimanoj58@gmail.com>

parent 7523c99660
commit 6732ff71c5
apps/webapp/app/routes/api.v1.deep-search.tsx (64 lines, Normal file)
@@ -0,0 +1,64 @@
import { z } from "zod";
import { json } from "@remix-run/node";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { deepSearch } from "~/trigger/deep-search";
import { runs } from "@trigger.dev/sdk";

const DeepSearchBodySchema = z.object({
  content: z.string().min(1, "Content is required"),
  intentOverride: z.string().optional(),
  stream: z.boolean().default(false),
  metadata: z
    .object({
      source: z.enum(["chrome", "obsidian", "mcp"]).optional(),
      url: z.string().optional(),
      pageTitle: z.string().optional(),
    })
    .optional(),
});

const { action, loader } = createActionApiRoute(
  {
    body: DeepSearchBodySchema,
    method: "POST",
    allowJWT: true,
    authorization: {
      action: "search",
    },
    corsStrategy: "all",
  },
  async ({ body, authentication }) => {
    let trigger;
    if (!body.stream) {
      trigger = await deepSearch.trigger({
        content: body.content,
        userId: authentication.userId,
        stream: body.stream,
        intentOverride: body.intentOverride,
        metadata: body.metadata,
      });

      return json(trigger);
    } else {
      const runHandler = await deepSearch.trigger({
        content: body.content,
        userId: authentication.userId,
        stream: body.stream,
        intentOverride: body.intentOverride,
        metadata: body.metadata,
      });

      for await (const run of runs.subscribeToRun(runHandler.id)) {
        if (run.status === "COMPLETED") {
          return json(run.output);
        } else if (run.status === "FAILED") {
          return json(run.error);
        }
      }

      return json({ error: "Run failed" });
    }
  },
);

export { action, loader };
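For orientation, a client call against this route might look like the following sketch (not part of the commit: the host mirrors the API_BASE_URL fallback used in utils.ts below, and the bearer credential is an assumed stand-in for a valid JWT or personal access token):

// Hypothetical client call to POST /api/v1/deep-search.
const res = await fetch("https://core.heysol.ai/api/v1/deep-search", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${process.env.CORE_API_TOKEN}`, // assumed credential
  },
  body: JSON.stringify({
    content: "Email from Sarah about the API redesign we discussed last week",
    stream: true, // the route then subscribes to the run and returns run.output
    metadata: { source: "chrome", url: "https://mail.google.com/mail/u/0/" }, // illustrative
  }),
});
console.log(await res.json()); // { synthesis: "..." } once the run completes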
@@ -80,7 +80,7 @@ export class SearchService {
       opts,
     );

-    // // 3. Apply adaptive filtering based on score threshold and minimum count
+    // 3. Apply adaptive filtering based on score threshold and minimum count
     const filteredResults = this.applyAdaptiveFiltering(rankedStatements, opts);

     // 3. Return top results
@@ -59,9 +59,37 @@ export async function* processTag(
  const hasStartTag = chunk.includes(startTag);
  const hasClosingTag = chunk.includes("</");

  // Check if we're currently accumulating a potential end tag
  const accumulatingEndTag =
    state.message.endsWith("</") || state.message.match(/<\/[a-z_]*$/i);

  if (hasClosingTag && !hasStartTag && !hasEndTag) {
    // If chunk only has </ but not the full end tag, accumulate it
    state.message += chunk;
  } else if (accumulatingEndTag) {
    // Continue accumulating if we're in the middle of a potential end tag
    state.message += chunk;
    // Check if we now have the complete end tag
    if (state.message.includes(endTag)) {
      // Process the complete message with end tag
      const endIndex = state.message.indexOf(endTag);
      const finalMessage = state.message.slice(0, endIndex).trim();
      const messageToSend = finalMessage.slice(
        finalMessage.indexOf(state.lastSent) + state.lastSent.length,
      );

      if (messageToSend) {
        yield Message(
          messageToSend,
          states.chunk as AgentMessageType,
          extraParams,
        );
      }
      yield Message("", states.end as AgentMessageType, extraParams);

      state.message = finalMessage;
      state.messageEnded = true;
    }
  } else if (hasEndTag || (!hasEndTag && !hasClosingTag)) {
    let currentMessage = comingFromStart
      ? state.message
apps/webapp/app/trigger/deep-search/deep-search-utils.ts (292 lines, Normal file)
@@ -0,0 +1,292 @@
import { type CoreMessage } from "ai";
import { logger } from "@trigger.dev/sdk/v3";
import { generate } from "./stream-utils";
import { processTag } from "../chat/stream-utils";
import { type AgentMessage, AgentMessageType, Message } from "../chat/types";
import { type TotalCost } from "../utils/types";

/**
 * Run the deep search ReAct loop
 * Async generator that yields AgentMessage objects for streaming
 * Follows the exact same pattern as chat-utils.ts
 */
export async function* run(
  initialMessages: CoreMessage[],
  searchTool: any,
): AsyncGenerator<AgentMessage, any, any> {
  let messages = [...initialMessages];
  let completed = false;
  let guardLoop = 0;
  let searchCount = 0;
  let totalEpisodesFound = 0;
  const seenEpisodeIds = new Set<string>(); // Track unique episodes
  const totalCost: TotalCost = {
    inputTokens: 0,
    outputTokens: 0,
    cost: 0,
  };

  const tools = {
    searchMemory: searchTool,
  };

  logger.info("Starting deep search ReAct loop");

  try {
    while (!completed && guardLoop < 50) {
      logger.info(
        `ReAct loop iteration ${guardLoop}, searches: ${searchCount}`,
      );

      // Call LLM with current message history
      const response = generate(
        messages,
        (event) => {
          const usage = event.usage;
          totalCost.inputTokens += usage.promptTokens;
          totalCost.outputTokens += usage.completionTokens;
        },
        tools,
      );

      let totalMessage = "";
      const toolCalls: any[] = [];

      // States for streaming final_response tags
      const messageState = {
        inTag: false,
        message: "",
        messageEnded: false,
        lastSent: "",
      };

      // Process streaming response
      for await (const chunk of response) {
        if (typeof chunk === "object" && chunk.type === "tool-call") {
          // Agent made a tool call
          toolCalls.push(chunk);
          logger.info(`Tool call: ${chunk.toolName}`);
        } else if (typeof chunk === "string") {
          totalMessage += chunk;

          // Stream final_response tags using processTag
          if (!messageState.messageEnded) {
            yield* processTag(
              messageState,
              totalMessage,
              chunk,
              "<final_response>",
              "</final_response>",
              {
                start: AgentMessageType.MESSAGE_START,
                chunk: AgentMessageType.MESSAGE_CHUNK,
                end: AgentMessageType.MESSAGE_END,
              },
            );
          }
        }
      }

      // Check for final response
      if (totalMessage.includes("<final_response>")) {
        const match = totalMessage.match(
          /<final_response>(.*?)<\/final_response>/s,
        );

        if (match) {
          // Accept synthesis - completed
          completed = true;
          logger.info(
            `Final synthesis accepted after ${searchCount} searches, ${totalEpisodesFound} unique episodes found`,
          );
          break;
        }
      }

      // Execute tool calls in parallel for better performance
      if (toolCalls.length > 0) {
        // Notify about all searches starting
        for (const toolCall of toolCalls) {
          logger.info(`Executing search: ${JSON.stringify(toolCall.args)}`);
          yield Message("", AgentMessageType.SKILL_START);
          yield Message(
            `\nSearching memory: "${toolCall.args.query}"...\n`,
            AgentMessageType.SKILL_CHUNK,
          );
          yield Message("", AgentMessageType.SKILL_END);
        }

        // Execute all searches in parallel
        const searchPromises = toolCalls.map((toolCall) =>
          searchTool.execute(toolCall.args).then((result: any) => ({
            toolCall,
            result,
          })),
        );

        const searchResults = await Promise.all(searchPromises);

        // Process results and add to message history
        for (const { toolCall, result } of searchResults) {
          searchCount++;

          // Deduplicate episodes - track unique IDs
          let uniqueNewEpisodes = 0;
          if (result.episodes && Array.isArray(result.episodes)) {
            for (const episode of result.episodes) {
              const episodeId =
                episode.id || episode._id || JSON.stringify(episode);
              if (!seenEpisodeIds.has(episodeId)) {
                seenEpisodeIds.add(episodeId);
                uniqueNewEpisodes++;
              }
            }
          }

          const episodesInThisSearch = result.episodes?.length || 0;
          totalEpisodesFound = seenEpisodeIds.size; // Use unique count

          messages.push({
            role: "assistant",
            content: [
              {
                type: "tool-call",
                toolCallId: toolCall.toolCallId,
                toolName: toolCall.toolName,
                args: toolCall.args,
              },
            ],
          });

          // Add tool result to message history
          messages.push({
            role: "tool",
            content: [
              {
                type: "tool-result",
                toolName: toolCall.toolName,
                toolCallId: toolCall.toolCallId,
                result: result,
              },
            ],
          });

          logger.info(
            `Search ${searchCount} completed: ${episodesInThisSearch} episodes (${uniqueNewEpisodes} new, ${totalEpisodesFound} unique total)`,
          );
        }

        // If found no episodes and haven't exhausted search attempts, require more searches
        if (totalEpisodesFound === 0 && searchCount < 7) {
          logger.info(
            `Agent attempted synthesis with 0 unique episodes after ${searchCount} searches - requiring more attempts`,
          );

          yield Message("", AgentMessageType.SKILL_START);
          yield Message(
            `No relevant context found yet - trying different search angles...`,
            AgentMessageType.SKILL_CHUNK,
          );
          yield Message("", AgentMessageType.SKILL_END);

          messages.push({
            role: "system",
            content: `You have performed ${searchCount} searches but found 0 unique relevant episodes. Your queries may be too abstract or not matching the user's actual conversation topics.

Review your DECOMPOSITION:
- Are you using specific terms from the content?
- Try searching broader related topics the user might have discussed
- Try different terminology or related concepts
- Search for user's projects, work areas, or interests

Continue with different search strategies (you can search up to 7-10 times total).`,
          });

          guardLoop++;
          continue;
        }

        // Soft nudging after all searches executed (awareness, not commands)
        if (totalEpisodesFound >= 30 && searchCount >= 3) {
          logger.info(
            `Nudging: ${totalEpisodesFound} unique episodes found - suggesting synthesis consideration`,
          );

          messages.push({
            role: "system",
            content: `Context awareness: You have found ${totalEpisodesFound} unique episodes across ${searchCount} searches. This represents substantial context. Consider whether you have sufficient information for quality synthesis, or if additional search angles would meaningfully improve understanding.`,
          });
        } else if (totalEpisodesFound >= 15 && searchCount >= 5) {
          logger.info(
            `Nudging: ${totalEpisodesFound} unique episodes after ${searchCount} searches - suggesting evaluation`,
          );

          messages.push({
            role: "system",
            content: `Progress update: You have ${totalEpisodesFound} unique episodes from ${searchCount} searches. Evaluate whether you have covered the main angles from your decomposition, or if important aspects remain unexplored.`,
          });
        } else if (searchCount >= 7) {
          logger.info(
            `Nudging: ${searchCount} searches completed with ${totalEpisodesFound} unique episodes`,
          );

          messages.push({
            role: "system",
            content: `Search depth: You have performed ${searchCount} searches and found ${totalEpisodesFound} unique episodes. Consider whether additional searches would yield meaningfully different context, or if it's time to synthesize what you've discovered.`,
          });
        }
        if (searchCount >= 10) {
          logger.info(
            `Reached maximum search limit (10), forcing synthesis with ${totalEpisodesFound} unique episodes`,
          );

          yield Message("", AgentMessageType.SKILL_START);
          yield Message(
            `Maximum searches reached - synthesizing results...`,
            AgentMessageType.SKILL_CHUNK,
          );
          yield Message("", AgentMessageType.SKILL_END);

          messages.push({
            role: "system",
            content: `You have performed 10 searches and found ${totalEpisodesFound} unique episodes. This is the maximum allowed. You MUST now provide your final synthesis wrapped in <final_response> tags based on what you've found.`,
          });
        }
      }

      // Safety check - if no tool calls and no final response, something went wrong
      if (
        toolCalls.length === 0 &&
        !totalMessage.includes("<final_response>")
      ) {
        logger.warn("Agent produced neither tool calls nor final response");

        messages.push({
          role: "system",
          content:
            "You must either use the searchMemory tool to search for more context, or provide your final synthesis wrapped in <final_response> tags.",
        });
      }

      guardLoop++;
    }

    if (!completed) {
      logger.warn(
        `Loop ended without completion after ${guardLoop} iterations`,
      );
      yield Message("", AgentMessageType.MESSAGE_START);
      yield Message(
        "Deep search did not complete - maximum iterations reached.",
        AgentMessageType.MESSAGE_CHUNK,
      );
      yield Message("", AgentMessageType.MESSAGE_END);
    }

    yield Message("Stream ended", AgentMessageType.STREAM_END);
  } catch (error) {
    logger.error(`Deep search error: ${error}`);
    yield Message((error as Error).message, AgentMessageType.ERROR);
    yield Message("Stream ended", AgentMessageType.STREAM_END);
  }
}
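A caller outside the Trigger.dev task could drain this generator as below — a minimal sketch mirroring the accumulation loop in index.ts; the PAT argument is assumed to be a valid personal access token:

import { type CoreMessage } from "ai";
import { AgentMessageType } from "../chat/types";
import { createSearchMemoryTool } from "./utils";
import { run } from "./deep-search-utils";

// Collect only the final synthesis; skill/tool progress messages are skipped.
async function collectSynthesis(messages: CoreMessage[], patToken: string) {
  let synthesis = "";
  for await (const msg of run(messages, createSearchMemoryTool(patToken))) {
    if (msg.type === AgentMessageType.MESSAGE_CHUNK) synthesis += msg.message;
    if (msg.type === AgentMessageType.STREAM_END) break;
  }
  return synthesis;
}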
apps/webapp/app/trigger/deep-search/index.ts (85 lines, Normal file)
@@ -0,0 +1,85 @@
import { metadata, task } from "@trigger.dev/sdk";
import { type CoreMessage } from "ai";
import { logger } from "@trigger.dev/sdk/v3";
import { nanoid } from "nanoid";
import {
  deletePersonalAccessToken,
  getOrCreatePersonalAccessToken,
} from "../utils/utils";
import { getReActPrompt } from "./prompt";
import { type DeepSearchPayload, type DeepSearchResponse } from "./types";
import { createSearchMemoryTool } from "./utils";
import { run } from "./deep-search-utils";
import { AgentMessageType } from "../chat/types";

export const deepSearch = task({
  id: "deep-search",
  maxDuration: 3000,
  run: async (payload: DeepSearchPayload): Promise<DeepSearchResponse> => {
    const { content, userId, stream, metadata: meta, intentOverride } = payload;

    const randomKeyName = `deepSearch_${nanoid(10)}`;

    // Get or create token for search API calls
    const pat = await getOrCreatePersonalAccessToken({
      name: randomKeyName,
      userId: userId as string,
    });

    if (!pat?.token) {
      throw new Error("Failed to create personal access token");
    }

    try {
      // Create search tool that agent will use
      const searchTool = createSearchMemoryTool(pat.token);

      // Build initial messages with ReAct prompt
      const initialMessages: CoreMessage[] = [
        {
          role: "system",
          content: getReActPrompt(meta, intentOverride),
        },
        {
          role: "user",
          content: `CONTENT TO ANALYZE:\n${content}\n\nPlease search my memory for relevant context and synthesize what you find.`,
        },
      ];

      // Run the ReAct loop generator
      const llmResponse = run(initialMessages, searchTool);

      // Streaming mode: stream via metadata.stream like chat.ts does
      // This makes all message types available to clients in real-time
      const messageStream = await metadata.stream("messages", llmResponse);

      let synthesis = "";

      for await (const step of messageStream) {
        // MESSAGE_CHUNK: Final synthesis - accumulate and stream
        if (step.type === AgentMessageType.MESSAGE_CHUNK) {
          synthesis += step.message;
        }

        // STREAM_END: Loop completed
        if (step.type === AgentMessageType.STREAM_END) {
          break;
        }
      }

      await deletePersonalAccessToken(pat?.id);

      // Clean up any remaining tags
      synthesis = synthesis
        .replace(/<final_response>/gi, "")
        .replace(/<\/final_response>/gi, "")
        .trim();

      return { synthesis };
    } catch (error) {
      await deletePersonalAccessToken(pat?.id);
      logger.error(`Deep search error: ${error}`);
      throw error;
    }
  },
});
apps/webapp/app/trigger/deep-search/prompt.ts (148 lines, Normal file)
@@ -0,0 +1,148 @@
export function getReActPrompt(
  metadata?: { source?: string; url?: string; pageTitle?: string },
  intentOverride?: string,
): string {
  const contextHints = [];

  if (metadata?.source === "chrome" && metadata?.url?.includes("mail.google.com")) {
    contextHints.push("Content is from email - likely reading intent");
  }
  if (metadata?.source === "chrome" && metadata?.url?.includes("calendar.google.com")) {
    contextHints.push("Content is from calendar - likely meeting prep intent");
  }
  if (metadata?.source === "chrome" && metadata?.url?.includes("docs.google.com")) {
    contextHints.push("Content is from document editor - likely writing intent");
  }
  if (metadata?.source === "obsidian") {
    contextHints.push("Content is from note editor - likely writing or research intent");
  }

  return `You are a memory research agent analyzing content to find relevant context.

YOUR PROCESS (ReAct Framework):

1. DECOMPOSE: First, break down the content into structured categories

Analyze the content and extract:
a) ENTITIES: Specific people, project names, tools, products mentioned
   Example: "John Smith", "Phoenix API", "Redis", "mobile app"

b) TOPICS & CONCEPTS: Key subjects, themes, domains
   Example: "authentication", "database design", "performance optimization"

c) TEMPORAL MARKERS: Time references, deadlines, events
   Example: "last week's meeting", "Q2 launch", "yesterday's discussion"

d) ACTIONS & TASKS: What's being done, decided, or requested
   Example: "implement feature", "review code", "make decision on"

e) USER INTENT: What is the user trying to accomplish?
   ${intentOverride ? `User specified: "${intentOverride}"` : "Infer from context: reading/writing/meeting prep/research/task tracking/review"}

2. FORM QUERIES: Create targeted search queries from your decomposition

Based on decomposition, form specific queries:
- Search for each entity by name (people, projects, tools)
- Search for topics the user has discussed before
- Search for related work or conversations in this domain
- Use the user's actual terminology, not generic concepts

EXAMPLE - Content: "Email from Sarah about the API redesign we discussed last week"
Decomposition:
- Entities: "Sarah", "API redesign"
- Topics: "API design", "redesign"
- Temporal: "last week"
- Actions: "discussed", "email communication"
- Intent: Reading (email) / meeting prep

Queries to form:
✅ "Sarah" (find past conversations with Sarah)
✅ "API redesign" or "API design" (find project discussions)
✅ "last week" + "Sarah" (find recent context)
✅ "meetings" or "discussions" (find related conversations)

❌ Avoid: "email communication patterns", "API architecture philosophy"
(These are abstract - search what user actually discussed!)

3. SEARCH: Execute your queries using searchMemory tool
- Start with 2-3 core searches based on main entities/topics
- Make each search specific and targeted
- Use actual terms from the content, not rephrased concepts

4. OBSERVE: Evaluate search results
- Did you find relevant episodes? How many unique ones?
- What specific context emerged?
- What new entities/topics appeared in results?
- Are there gaps in understanding?
- Should you search more angles?

Note: Episode counts are automatically deduplicated across searches - overlapping episodes are only counted once.

5. REACT: Decide next action based on observations

STOPPING CRITERIA - Proceed to SYNTHESIZE if ANY of these are true:
- You found 20+ unique episodes across your searches → ENOUGH CONTEXT
- You performed 5+ searches and found relevant episodes → SUFFICIENT
- You performed 7+ searches regardless of results → EXHAUSTED STRATEGIES
- You found strong relevant context from multiple angles → COMPLETE

System nudges will provide awareness of your progress, but you decide when synthesis quality would be optimal.

If you found little/no context AND searched less than 7 times:
- Try different query angles from your decomposition
- Search broader related topics
- Search user's projects or work areas
- Try alternative terminology

⚠️ DO NOT search endlessly - if you found relevant episodes, STOP and synthesize!

6. SYNTHESIZE: After gathering sufficient context, provide final answer
- Wrap your synthesis in <final_response> tags
- Present direct factual context from memory - no meta-commentary
- Write as if providing background context to an AI assistant
- Include: facts, decisions, preferences, patterns, timelines
- Note any gaps, contradictions, or evolution in thinking
- Keep it concise and actionable
- DO NOT use phrases like "Previous discussions on", "From conversations", "Past preferences indicate"
- DO NOT use conversational language like "you said" or "you mentioned"
- Present information as direct factual statements

FINAL RESPONSE FORMAT:
<final_response>
[Direct synthesized context - factual statements only]

Good examples:
- "The API redesign focuses on performance and scalability. Key decisions: moving to GraphQL, caching layer with Redis."
- "Project Phoenix launches Q2 2024. Main features: real-time sync, offline mode, collaborative editing."
- "Sarah leads the backend team. Recent work includes authentication refactor and database migration."

Bad examples:
❌ "Previous discussions on the API revealed..."
❌ "From past conversations, it appears that..."
❌ "Past preferences indicate..."
❌ "The user mentioned that..."

Just state the facts directly.
</final_response>

${contextHints.length > 0 ? `\nCONTEXT HINTS:\n${contextHints.join("\n")}` : ""}

CRITICAL REQUIREMENTS:
- ALWAYS start with DECOMPOSE step - extract entities, topics, temporal markers, actions
- Form specific queries from your decomposition - use user's actual terms
- Minimum 3 searches required
- Maximum 10 searches allowed - must synthesize after that
- STOP and synthesize when you hit stopping criteria (20+ episodes, 5+ searches with results, 7+ searches total)
- Each search should target different aspects from decomposition
- Present synthesis directly without meta-commentary

SEARCH QUALITY CHECKLIST:
✅ Queries use specific terms from content (names, projects, exact phrases)
✅ Searched multiple angles from decomposition (entities, topics, related areas)
✅ Stop when you have enough unique context - don't search endlessly
✅ Tried alternative terminology if initial searches found nothing
❌ Avoid generic/abstract queries that don't match user's vocabulary
❌ Don't stop at 3 searches if you found zero unique episodes
❌ Don't keep searching when you already found 20+ unique episodes
`;
}
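A quick illustration of how the two parameters shape the prompt (the values below are hypothetical):

import { getReActPrompt } from "./prompt";

// Obsidian capture with an explicit intent override.
const systemPrompt = getReActPrompt(
  { source: "obsidian", pageTitle: "Weekly notes" }, // pageTitle is illustrative
  "meeting preparation",
);
// The result carries the note-editor context hint, and step 1e reads
// `User specified: "meeting preparation"` instead of inferring intent.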
apps/webapp/app/trigger/deep-search/stream-utils.ts (68 lines, Normal file)
@@ -0,0 +1,68 @@
import { openai } from "@ai-sdk/openai";
import { logger } from "@trigger.dev/sdk/v3";
import {
  type CoreMessage,
  type LanguageModelV1,
  streamText,
  type ToolSet,
} from "ai";

/**
 * Generate LLM responses with tool calling support
 * Simplified version for deep-search use case - NO maxSteps for manual ReAct control
 */
export async function* generate(
  messages: CoreMessage[],
  onFinish?: (event: any) => void,
  tools?: ToolSet,
  model?: string,
): AsyncGenerator<
  | string
  | {
      type: string;
      toolName: string;
      args?: any;
      toolCallId?: string;
    }
> {
  const modelToUse = model || process.env.MODEL || "gpt-4.1-2025-04-14";
  const modelInstance = openai(modelToUse) as LanguageModelV1;

  logger.info(`Starting LLM generation with model: ${modelToUse}`);

  try {
    const { textStream, fullStream } = streamText({
      model: modelInstance,
      messages,
      temperature: 1,
      tools,
      // NO maxSteps - we handle tool execution manually in the ReAct loop
      toolCallStreaming: true,
      onFinish,
    });

    // Yield text chunks
    for await (const chunk of textStream) {
      yield chunk;
    }

    // Yield tool calls
    for await (const fullChunk of fullStream) {
      if (fullChunk.type === "tool-call") {
        yield {
          type: "tool-call",
          toolName: fullChunk.toolName,
          toolCallId: fullChunk.toolCallId,
          args: fullChunk.args,
        };
      }

      if (fullChunk.type === "error") {
        logger.error(`LLM error: ${JSON.stringify(fullChunk)}`);
      }
    }
  } catch (error) {
    logger.error(`LLM generation error: ${error}`);
    throw error;
  }
}
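Because the generator interleaves plain text with tool-call objects, consumers branch on the chunk shape — a small sketch (the messages and tools arguments are assumed to be shaped as in deep-search-utils.ts):

import { type CoreMessage, type ToolSet } from "ai";
import { generate } from "./stream-utils";

async function printStream(messages: CoreMessage[], tools: ToolSet) {
  for await (const chunk of generate(messages, undefined, tools)) {
    if (typeof chunk === "string") {
      process.stdout.write(chunk); // streamed model text
    } else if (chunk.type === "tool-call") {
      console.log(`\ntool call: ${chunk.toolName}`, chunk.args);
    }
  }
}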
apps/webapp/app/trigger/deep-search/types.ts (20 lines, Normal file)
@@ -0,0 +1,20 @@
export interface DeepSearchPayload {
  content: string;
  userId: string;
  stream: boolean;
  intentOverride?: string;
  metadata?: {
    source?: "chrome" | "obsidian" | "mcp";
    url?: string;
    pageTitle?: string;
  };
}

export interface DeepSearchResponse {
  synthesis: string;
  episodes?: Array<{
    content: string;
    createdAt: Date;
    spaceIds: string[];
  }>;
}
apps/webapp/app/trigger/deep-search/utils.ts (64 lines, Normal file)
@@ -0,0 +1,64 @@
import { tool } from "ai";
import { z } from "zod";
import axios from "axios";
import { logger } from "@trigger.dev/sdk/v3";

export function createSearchMemoryTool(token: string) {
  return tool({
    description:
      "Search the user's memory for relevant facts and episodes. Use this tool multiple times with different queries to gather comprehensive context.",
    parameters: z.object({
      query: z
        .string()
        .describe(
          "Search query to find relevant information. Be specific: entity names, topics, concepts.",
        ),
    }),
    execute: async ({ query }) => {
      try {
        const response = await axios.post(
          `${process.env.API_BASE_URL || "https://core.heysol.ai"}/api/v1/search`,
          { query },
          {
            headers: {
              Authorization: `Bearer ${token}`,
            },
          },
        );

        const searchResult = response.data;

        return {
          facts: searchResult.facts || [],
          episodes: searchResult.episodes || [],
          summary: `Found ${searchResult.episodes?.length || 0} relevant memories`,
        };
      } catch (error) {
        logger.error(`SearchMemory tool error: ${error}`);
        return {
          facts: [],
          episodes: [],
          summary: "No results found",
        };
      }
    },
  });
}

// Helper to extract unique episodes from tool calls
export function extractEpisodesFromToolCalls(toolCalls: any[]): any[] {
  const episodes: any[] = [];

  for (const call of toolCalls || []) {
    if (call.toolName === "searchMemory" && call.result?.episodes) {
      episodes.push(...call.result.episodes);
    }
  }

  // Deduplicate by content + createdAt
  const uniqueEpisodes = Array.from(
    new Map(episodes.map((e) => [`${e.content}-${e.createdAt}`, e])).values(),
  );

  return uniqueEpisodes.slice(0, 10);
}
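Since the ReAct loop invokes searchTool.execute(toolCall.args) directly, the tool can also be smoke-tested in isolation — a sketch with a placeholder query and token:

import { createSearchMemoryTool } from "./utils";

async function smokeTest(patToken: string) {
  // Typed loosely, matching how deep-search-utils.ts calls execute().
  const searchMemory = createSearchMemoryTool(patToken) as any;
  const result = await searchMemory.execute({ query: "API redesign" });
  console.log(result.summary); // e.g. "Found 3 relevant memories"
}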
@@ -3,6 +3,7 @@ import { addToQueue } from "~/lib/ingest.server";
import { logger } from "~/services/logger.service";
import { SearchService } from "~/services/search.server";
import { SpaceService } from "~/services/space.server";
import { deepSearch } from "~/trigger/deep-search";
import { IntegrationLoader } from "./integration-loader";

const searchService = new SearchService();

@@ -178,6 +179,27 @@ export const memoryTools = [
      required: ["integrationSlug", "action"],
    },
  },
  // {
  //   name: "memory_deep_search",
  //   description:
  //     "Search CORE memory with document context and get synthesized insights. Automatically analyzes content to infer intent (reading, writing, meeting prep, research, task tracking, etc.) and provides context-aware synthesis. USE THIS TOOL: When analyzing documents, emails, notes, or any substantial text content for relevant memories. HOW TO USE: Provide the full content text. The tool will decompose it, search for relevant memories, and synthesize findings based on inferred intent. Returns: Synthesized context summary and related episodes.",
  //   inputSchema: {
  //     type: "object",
  //     properties: {
  //       content: {
  //         type: "string",
  //         description:
  //           "Full document/text content to analyze and search against memory",
  //       },
  //       intentOverride: {
  //         type: "string",
  //         description:
  //           "Optional: Explicitly specify intent (e.g., 'meeting preparation', 'blog writing') instead of auto-detection",
  //       },
  //     },
  //     required: ["content"],
  //   },
  // },
];

// Function to call memory tools based on toolName

@@ -205,6 +227,8 @@ export async function callMemoryTool(
      return await handleGetIntegrationActions({ ...args });
    case "execute_integration_action":
      return await handleExecuteIntegrationAction({ ...args });
    case "memory_deep_search":
      return await handleMemoryDeepSearch({ ...args, userId, source });
    default:
      throw new Error(`Unknown memory tool: ${toolName}`);
  }

@@ -546,3 +570,58 @@ async function handleExecuteIntegrationAction(args: any) {
    };
  }
}

// Handler for memory_deep_search
async function handleMemoryDeepSearch(args: any) {
  try {
    const { content, intentOverride, userId, source } = args;

    if (!content) {
      throw new Error("content is required");
    }

    // Trigger the non-streaming deep search task and wait for completion
    const handle = await deepSearch.triggerAndWait({
      content,
      userId,
      stream: false, // MCP doesn't need streaming
      intentOverride,
      metadata: { source },
    });

    if (handle.ok) {
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify(handle.output),
          },
        ],
        isError: false,
      };
    } else {
      return {
        content: [
          {
            type: "text",
            text: `Error performing deep search: ${handle.error instanceof Error ? handle.error.message : String(handle.error)}`,
          },
        ],
        isError: true,
      };
    }
  } catch (error) {
    logger.error(`MCP deep search error: ${error}`);

    return {
      content: [
        {
          type: "text",
          text: `Error performing deep search: ${error instanceof Error ? error.message : String(error)}`,
        },
      ],
      isError: true,
    };
  }
}
@@ -10,8 +10,8 @@
     "lint:fix": "eslint 'app/**/*.{ts,tsx,js,jsx}' --rule 'turbo/no-undeclared-env-vars:error' -f table",
     "start": "node server.js",
     "typecheck": "tsc",
-    "trigger:dev": "pnpm dlx trigger.dev@4.0.0-v4-beta.22 dev",
-    "trigger:deploy": "pnpm dlx trigger.dev@4.0.0-v4-beta.22 deploy"
+    "trigger:dev": "pnpm dlx trigger.dev@4.0.4 dev",
+    "trigger:deploy": "pnpm dlx trigger.dev@4.0.4 deploy"
   },
   "dependencies": {
     "@ai-sdk/amazon-bedrock": "2.2.12",