Feat: add mcp for memory as http streamable

This commit is contained in:
Harshith Mullapudi 2025-07-17 23:02:51 +05:30
parent ac811bfcb3
commit e82e7c1db2
2 changed files with 362 additions and 0 deletions

View File

@ -0,0 +1,259 @@
import { json } from "@remix-run/node";
import { randomUUID } from "node:crypto";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import {
isInitializeRequest,
JSONRPCMessage,
} from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import { createHybridActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { addToQueue, IngestBodyRequest } from "~/lib/ingest.server";
import { SearchService } from "~/services/search.server";
import { PassThrough } from "stream";
import { handleTransport } from "~/utils/mcp";
// Map to store transports by session ID with cleanup tracking
const transports: {
[sessionId: string]: {
transport: StreamableHTTPServerTransport;
createdAt: number;
};
} = {};
// Cleanup old sessions every 5 minutes
setInterval(
() => {
const now = Date.now();
const maxAge = 30 * 60 * 1000; // 30 minutes
Object.keys(transports).forEach((sessionId) => {
if (now - transports[sessionId].createdAt > maxAge) {
transports[sessionId].transport.close();
delete transports[sessionId];
}
});
},
5 * 60 * 1000,
);
// MCP request body schema
const MCPRequestSchema = z.object({}).passthrough();
// Search parameters schema for MCP tool
const SearchParamsSchema = z.object({
query: z.string(),
startTime: z.string().optional(),
endTime: z.string().optional(),
spaceId: z.string().optional(),
limit: z.number().optional(),
maxBfsDepth: z.number().optional(),
includeInvalidated: z.boolean().optional(),
entityTypes: z.array(z.string()).optional(),
scoreThreshold: z.number().optional(),
minResults: z.number().optional(),
});
const searchService = new SearchService();
// Handle MCP HTTP requests properly
const handleMCPRequest = async (
request: Request,
body: any,
authentication: any,
) => {
const sessionId = request.headers.get("mcp-session-id") as string | undefined;
let transport: StreamableHTTPServerTransport;
try {
if (sessionId && transports[sessionId]) {
// Reuse existing transport
transport = transports[sessionId].transport;
} else if (!sessionId && isInitializeRequest(body)) {
// New initialization request
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
// Store the transport by session ID with timestamp
transports[sessionId] = {
transport,
createdAt: Date.now(),
};
},
});
// Clean up transport when closed
transport.onclose = () => {
if (transport.sessionId) {
delete transports[transport.sessionId];
}
};
const server = new McpServer(
{
name: "echo-memory-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
},
);
// Register ingest tool
server.registerTool(
"ingest",
{
title: "Ingest Data",
description: "Ingest data into the memory system",
inputSchema: IngestBodyRequest.shape,
},
async (args) => {
try {
const userId = authentication.userId;
const response = addToQueue(
args as z.infer<typeof IngestBodyRequest>,
userId,
);
return {
content: [
{
type: "text",
text: JSON.stringify(response),
},
],
};
} catch (error) {
console.error("MCP ingest error:", error);
return {
content: [
{
type: "text",
text: `Error ingesting data: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
},
);
// Register search tool
server.registerTool(
"search",
{
title: "Search Data",
description: "Search through ingested data",
inputSchema: SearchParamsSchema.shape,
},
async (args) => {
try {
const userId = authentication.userId;
const results = await searchService.search(args.query, userId, {
startTime: args.startTime ? new Date(args.startTime) : undefined,
endTime: args.endTime ? new Date(args.endTime) : undefined,
});
return {
content: [
{
type: "text",
text: JSON.stringify(results),
},
],
};
} catch (error) {
console.error("MCP search error:", error);
return {
content: [
{
type: "text",
text: `Error searching: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
},
);
// Connect to the MCP server
await server.connect(transport);
} else {
// Invalid request
throw new Error("Bad Request: No valid session ID provided");
}
const response = await handleTransport(transport, request, body);
return response;
} catch (error) {
console.error("MCP request error:", error);
return json(
{
jsonrpc: "2.0",
error: {
code: -32000,
message:
error instanceof Error ? error.message : "Internal server error",
},
id: body?.id || null,
},
{ status: 500 },
);
}
};
// Handle DELETE requests for session cleanup
const handleDelete = async (request: Request, authentication: any) => {
const sessionId = request.headers.get("mcp-session-id") as string | undefined;
if (!sessionId || !transports[sessionId]) {
return new Response("Invalid or missing session ID", { status: 400 });
}
const transport = transports[sessionId].transport;
// Clean up transport
transport.close();
delete transports[sessionId];
return new Response(null, { status: 204 });
};
const { action, loader } = createHybridActionApiRoute(
{
body: MCPRequestSchema,
allowJWT: true,
authorization: {
action: "mcp",
},
corsStrategy: "all",
},
async ({ body, authentication, request }) => {
const method = request.method;
if (method === "POST") {
return await handleMCPRequest(request, body, authentication);
} else if (method === "DELETE") {
return await handleDelete(request, authentication);
} else {
return json(
{
jsonrpc: "2.0",
error: {
code: -32601,
message: "Method not allowed",
},
id: null,
},
{ status: 405 },
);
}
},
);
export { action, loader };

View File

@ -0,0 +1,103 @@
import { type StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import {
type RequestId,
type JSONRPCMessage,
} from "@modelcontextprotocol/sdk/types.js";
import { json } from "@remix-run/node";
const convertRemixRequestToTransport = (remixRequest: Request) => {
return {
method: remixRequest.method,
url: remixRequest.url,
headers: Object.fromEntries(remixRequest.headers.entries()),
};
};
export const handleTransport = (
transport: StreamableHTTPServerTransport,
request: Request,
body: any,
) => {
let responseData: any;
let responseStatus = 200;
let responseHeaders: Record<string, string> = {};
return new Promise<Response>(async (resolve) => {
const captureResponse = {
// Node.js ServerResponse methods required by StreamableHTTPServerTransport
writeHead: (statusCode: number, headers?: Record<string, string>) => {
responseStatus = statusCode;
if (headers) {
Object.assign(responseHeaders, headers);
}
return captureResponse;
},
end: (chunk?: any) => {
responseData = chunk;
if (responseStatus !== 200) {
resolve(
json(
typeof responseData === "string"
? JSON.parse(responseData)
: responseData,
{
status: responseStatus,
headers: responseHeaders,
},
),
);
}
return captureResponse;
},
setHeader: (name: string, value: string) => {
responseHeaders[name] = value;
return captureResponse;
},
flushHeaders: () => {
// No-op for our mock, but required by transport
return captureResponse;
},
on: (event: string, callback: Function) => {
// Mock event handling - transport uses this for 'close' events
// In a real implementation, you'd want to handle cleanup
return captureResponse;
},
// Properties that transport may access
statusCode: responseStatus,
headersSent: false,
finished: false,
};
transport.send = async (
message: JSONRPCMessage,
options?: {
relatedRequestId?: RequestId;
},
) => {
responseData = message;
resolve(
json(
typeof responseData === "string"
? JSON.parse(responseData)
: responseData,
{
status: responseStatus,
headers: { ...responseHeaders, "Content-Type": "application/json" },
},
),
);
};
await transport.handleRequest(
convertRemixRequestToTransport(request) as any,
captureResponse as any,
body,
);
});
};