diff --git a/apps/webapp/app/routes/api.v1.mcp.memory.tsx b/apps/webapp/app/routes/api.v1.mcp.memory.tsx new file mode 100644 index 0000000..83f4cef --- /dev/null +++ b/apps/webapp/app/routes/api.v1.mcp.memory.tsx @@ -0,0 +1,259 @@ +import { json } from "@remix-run/node"; +import { randomUUID } from "node:crypto"; +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { + isInitializeRequest, + JSONRPCMessage, +} from "@modelcontextprotocol/sdk/types.js"; +import { z } from "zod"; +import { createHybridActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { addToQueue, IngestBodyRequest } from "~/lib/ingest.server"; +import { SearchService } from "~/services/search.server"; +import { PassThrough } from "stream"; +import { handleTransport } from "~/utils/mcp"; + +// Map to store transports by session ID with cleanup tracking +const transports: { + [sessionId: string]: { + transport: StreamableHTTPServerTransport; + createdAt: number; + }; +} = {}; + +// Cleanup old sessions every 5 minutes +setInterval( + () => { + const now = Date.now(); + const maxAge = 30 * 60 * 1000; // 30 minutes + + Object.keys(transports).forEach((sessionId) => { + if (now - transports[sessionId].createdAt > maxAge) { + transports[sessionId].transport.close(); + delete transports[sessionId]; + } + }); + }, + 5 * 60 * 1000, +); + +// MCP request body schema +const MCPRequestSchema = z.object({}).passthrough(); + +// Search parameters schema for MCP tool +const SearchParamsSchema = z.object({ + query: z.string(), + startTime: z.string().optional(), + endTime: z.string().optional(), + spaceId: z.string().optional(), + limit: z.number().optional(), + maxBfsDepth: z.number().optional(), + includeInvalidated: z.boolean().optional(), + entityTypes: z.array(z.string()).optional(), + scoreThreshold: z.number().optional(), + minResults: z.number().optional(), +}); + +const searchService = new SearchService(); + +// Handle MCP HTTP requests properly +const handleMCPRequest = async ( + request: Request, + body: any, + authentication: any, +) => { + const sessionId = request.headers.get("mcp-session-id") as string | undefined; + let transport: StreamableHTTPServerTransport; + + try { + if (sessionId && transports[sessionId]) { + // Reuse existing transport + transport = transports[sessionId].transport; + } else if (!sessionId && isInitializeRequest(body)) { + // New initialization request + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: (sessionId) => { + // Store the transport by session ID with timestamp + transports[sessionId] = { + transport, + createdAt: Date.now(), + }; + }, + }); + + // Clean up transport when closed + transport.onclose = () => { + if (transport.sessionId) { + delete transports[transport.sessionId]; + } + }; + + const server = new McpServer( + { + name: "echo-memory-server", + version: "1.0.0", + }, + { + capabilities: { + tools: {}, + }, + }, + ); + + // Register ingest tool + server.registerTool( + "ingest", + { + title: "Ingest Data", + description: "Ingest data into the memory system", + inputSchema: IngestBodyRequest.shape, + }, + async (args) => { + try { + const userId = authentication.userId; + + const response = addToQueue( + args as z.infer, + userId, + ); + return { + content: [ + { + type: "text", + text: JSON.stringify(response), + }, + ], + }; + } catch (error) { + console.error("MCP ingest error:", error); + return { + content: [ + { + type: "text", + text: `Error ingesting data: ${error instanceof Error ? error.message : String(error)}`, + }, + ], + isError: true, + }; + } + }, + ); + + // Register search tool + server.registerTool( + "search", + { + title: "Search Data", + description: "Search through ingested data", + inputSchema: SearchParamsSchema.shape, + }, + async (args) => { + try { + const userId = authentication.userId; + + const results = await searchService.search(args.query, userId, { + startTime: args.startTime ? new Date(args.startTime) : undefined, + endTime: args.endTime ? new Date(args.endTime) : undefined, + }); + + return { + content: [ + { + type: "text", + text: JSON.stringify(results), + }, + ], + }; + } catch (error) { + console.error("MCP search error:", error); + return { + content: [ + { + type: "text", + text: `Error searching: ${error instanceof Error ? error.message : String(error)}`, + }, + ], + isError: true, + }; + } + }, + ); + + // Connect to the MCP server + await server.connect(transport); + } else { + // Invalid request + throw new Error("Bad Request: No valid session ID provided"); + } + + const response = await handleTransport(transport, request, body); + + return response; + } catch (error) { + console.error("MCP request error:", error); + return json( + { + jsonrpc: "2.0", + error: { + code: -32000, + message: + error instanceof Error ? error.message : "Internal server error", + }, + id: body?.id || null, + }, + { status: 500 }, + ); + } +}; + +// Handle DELETE requests for session cleanup +const handleDelete = async (request: Request, authentication: any) => { + const sessionId = request.headers.get("mcp-session-id") as string | undefined; + + if (!sessionId || !transports[sessionId]) { + return new Response("Invalid or missing session ID", { status: 400 }); + } + + const transport = transports[sessionId].transport; + + // Clean up transport + transport.close(); + delete transports[sessionId]; + + return new Response(null, { status: 204 }); +}; + +const { action, loader } = createHybridActionApiRoute( + { + body: MCPRequestSchema, + allowJWT: true, + authorization: { + action: "mcp", + }, + corsStrategy: "all", + }, + async ({ body, authentication, request }) => { + const method = request.method; + + if (method === "POST") { + return await handleMCPRequest(request, body, authentication); + } else if (method === "DELETE") { + return await handleDelete(request, authentication); + } else { + return json( + { + jsonrpc: "2.0", + error: { + code: -32601, + message: "Method not allowed", + }, + id: null, + }, + { status: 405 }, + ); + } + }, +); + +export { action, loader }; diff --git a/apps/webapp/app/utils/mcp.ts b/apps/webapp/app/utils/mcp.ts new file mode 100644 index 0000000..bc06136 --- /dev/null +++ b/apps/webapp/app/utils/mcp.ts @@ -0,0 +1,103 @@ +import { type StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; +import { + type RequestId, + type JSONRPCMessage, +} from "@modelcontextprotocol/sdk/types.js"; +import { json } from "@remix-run/node"; + +const convertRemixRequestToTransport = (remixRequest: Request) => { + return { + method: remixRequest.method, + url: remixRequest.url, + headers: Object.fromEntries(remixRequest.headers.entries()), + }; +}; + +export const handleTransport = ( + transport: StreamableHTTPServerTransport, + request: Request, + body: any, +) => { + let responseData: any; + let responseStatus = 200; + let responseHeaders: Record = {}; + return new Promise(async (resolve) => { + const captureResponse = { + // Node.js ServerResponse methods required by StreamableHTTPServerTransport + writeHead: (statusCode: number, headers?: Record) => { + responseStatus = statusCode; + if (headers) { + Object.assign(responseHeaders, headers); + } + return captureResponse; + }, + + end: (chunk?: any) => { + responseData = chunk; + + if (responseStatus !== 200) { + resolve( + json( + typeof responseData === "string" + ? JSON.parse(responseData) + : responseData, + { + status: responseStatus, + headers: responseHeaders, + }, + ), + ); + } + return captureResponse; + }, + + setHeader: (name: string, value: string) => { + responseHeaders[name] = value; + return captureResponse; + }, + + flushHeaders: () => { + // No-op for our mock, but required by transport + return captureResponse; + }, + on: (event: string, callback: Function) => { + // Mock event handling - transport uses this for 'close' events + // In a real implementation, you'd want to handle cleanup + return captureResponse; + }, + + // Properties that transport may access + statusCode: responseStatus, + headersSent: false, + finished: false, + }; + + transport.send = async ( + message: JSONRPCMessage, + options?: { + relatedRequestId?: RequestId; + }, + ) => { + responseData = message; + + resolve( + json( + typeof responseData === "string" + ? JSON.parse(responseData) + : responseData, + { + status: responseStatus, + headers: { ...responseHeaders, "Content-Type": "application/json" }, + }, + ), + ); + }; + + await transport.handleRequest( + convertRemixRequestToTransport(request) as any, + captureResponse as any, + body, + ); + }); +};