diff --git a/apps/webapp/app/components/integrations/ingestion-rule-section.tsx b/apps/webapp/app/components/integrations/ingestion-rule-section.tsx index cd9b14b..cc10231 100644 --- a/apps/webapp/app/components/integrations/ingestion-rule-section.tsx +++ b/apps/webapp/app/components/integrations/ingestion-rule-section.tsx @@ -62,8 +62,7 @@ export function IngestionRuleSection({
); -} \ No newline at end of file +} diff --git a/apps/webapp/app/components/integrations/section.tsx b/apps/webapp/app/components/integrations/section.tsx index feb9189..d0c63fa 100644 --- a/apps/webapp/app/components/integrations/section.tsx +++ b/apps/webapp/app/components/integrations/section.tsx @@ -14,7 +14,7 @@ export function Section({ children, }: SectionProps) { return ( -
+
{icon && <>{icon}}

{title}

@@ -22,7 +22,7 @@ export function Section({ {metadata ? metadata : null}
-
+
{children}
diff --git a/apps/webapp/app/routes/api.v1.mcp.$slug.tsx b/apps/webapp/app/routes/api.v1.mcp.$slug.tsx index 6cfa568..ca34659 100644 --- a/apps/webapp/app/routes/api.v1.mcp.$slug.tsx +++ b/apps/webapp/app/routes/api.v1.mcp.$slug.tsx @@ -1,6 +1,7 @@ import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; -import { createMCPProxy } from "@core/mcp-proxy"; + import { getIntegrationDefinitionWithSlug } from "~/services/integrationDefinition.server"; +import { proxyRequest } from "~/utils/proxy.server"; import { z } from "zod"; import { getIntegrationAccount } from "~/services/integrationAccount.server"; @@ -59,53 +60,35 @@ const { action, loader } = createActionApiRoute( ); } - const { serverUrl, transportStrategy } = spec.mcpAuth; + const { serverUrl } = spec.mcpAuth; - const mcpProxy = createMCPProxy( - { - serverUrl, - timeout: 30000, - debug: true, - transportStrategy: transportStrategy || "sse-first", - // Fix this - redirectUrl: "", - }, - // Callback to load credentials from the database - async () => { - // Find the integration account for this user and integration - const integrationAccount = await getIntegrationAccount( - integrationDefinition.id, - authentication.userId, - ); - - const integrationConfig = - integrationAccount?.integrationConfiguration as any; - - if ( - !integrationAccount || - !integrationConfig || - !integrationConfig.mcp - ) { - return null; - } - - return { - serverUrl, - tokens: { - access_token: integrationConfig.mcp.tokens.access_token, - token_type: integrationConfig.mcp.tokens.token_type || "bearer", - expires_in: integrationConfig.mcp.tokens.expires_in || 3600, - refresh_token: integrationConfig.mcp.tokens.refresh_token, - scope: integrationConfig.mcp.tokens.scope || "read write", - }, - expiresAt: integrationConfig.mcp.tokens.expiresAt - ? new Date(integrationConfig.mcp.tokens.expiresAt) - : new Date(Date.now() + 3600 * 1000), - }; - }, + // Find the integration account for this user and integration + const integrationAccount = await getIntegrationAccount( + integrationDefinition.id, + authentication.userId, ); - return await mcpProxy(request, ""); + const integrationConfig = + integrationAccount?.integrationConfiguration as any; + + if (!integrationAccount || !integrationConfig || !integrationConfig.mcp) { + return new Response( + JSON.stringify({ + error: "No integration account with mcp config", + }), + { + status: 400, + headers: { "Content-Type": "application/json" }, + }, + ); + } + + // Proxy the request to the serverUrl + return await proxyRequest( + request, + serverUrl, + integrationConfig.mcp.tokens.access_token, + ); } catch (error: any) { console.error("MCP Proxy Error:", error); return new Response(JSON.stringify({ error: error.message }), { diff --git a/apps/webapp/app/routes/home.integration.$slug.tsx b/apps/webapp/app/routes/home.integration.$slug.tsx index cfa9e1d..05a64f5 100644 --- a/apps/webapp/app/routes/home.integration.$slug.tsx +++ b/apps/webapp/app/routes/home.integration.$slug.tsx @@ -138,9 +138,8 @@ export default function IntegrationDetail() { const hasMCPAuth = !!specData?.mcpAuth; const Component = getIcon(integration.icon as IconType); - return ( -
+
= {}; - private StdioTransport: any; constructor() {} public async init() { this.Client = await MCP.importClient(); - this.StdioTransport = await MCP.importStdioTransport(); } private static async importClient() { @@ -28,18 +26,13 @@ export class MCP { agents.map(async (agent) => { return await this.connectToServer( agent, - `${process.env.BACKEND_HOST}/api/v1/mcp/${agent}`, + `${process.env.API_BASE_URL}/api/v1/mcp/${agent}`, headers, ); }), ); } - private static async importStdioTransport() { - const { StdioClientTransport } = await import("./stdio"); - return StdioClientTransport; - } - async allTools(): Promise { const clientEntries = Object.entries(this.clients); diff --git a/apps/webapp/app/utils/proxy.server.ts b/apps/webapp/app/utils/proxy.server.ts new file mode 100644 index 0000000..5022a8b --- /dev/null +++ b/apps/webapp/app/utils/proxy.server.ts @@ -0,0 +1,73 @@ +export async function proxyRequest( + request: Request, + targetUrl: string, + token: string, +): Promise { + try { + const targetURL = new URL(targetUrl); + + const headers = new Headers(); + + // Copy relevant headers from the original request + const headersToProxy = [ + "content-type", + "user-agent", + "accept", + "accept-language", + "accept-encoding", + "mcp-session-id", + "last-event-id", + ]; + + headersToProxy.forEach((headerName) => { + const value = request.headers.get(headerName); + if (value) { + headers.set(headerName, value); + } + }); + + headers.set("Authorization", `Bearer ${token}`); + + const body = + request.method !== "GET" && request.method !== "HEAD" + ? await request.arrayBuffer() + : undefined; + + const response = await fetch(targetURL.toString(), { + method: request.method, + headers, + body, + }); + + // Create response headers, excluding hop-by-hop headers + const responseHeaders = new Headers(); + const headersToExclude = [ + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailers", + "transfer-encoding", + "upgrade", + ]; + + response.headers.forEach((value, key) => { + if (!headersToExclude.includes(key.toLowerCase())) { + responseHeaders.set(key, value); + } + }); + + return new Response(response.body, { + status: response.status, + statusText: response.statusText, + headers: responseHeaders, + }); + } catch (error) { + console.error("Proxy request failed:", error); + return new Response(JSON.stringify({ error: "Proxy request failed" }), { + status: 500, + headers: { "Content-Type": "application/json" }, + }); + } +} diff --git a/apps/webapp/trigger.config.ts b/apps/webapp/trigger.config.ts index 4743a3f..215078e 100644 --- a/apps/webapp/trigger.config.ts +++ b/apps/webapp/trigger.config.ts @@ -27,8 +27,8 @@ export default defineConfig({ build: { extensions: [ syncEnvVars(() => ({ - DATABASE_URL: process.env.DATABASE_URL, - BACKEND_HOST: process.env.BACKEND_HOST, + DATABASE_URL: process.env.DATABASE_URL as string, + API_BASE_URL: process.env.API_BASE_URL as string, })), prismaExtension({ schema: "prisma/schema.prisma", diff --git a/packages/mcp-proxy/src/core/mcp-remote-client.ts b/packages/mcp-proxy/src/core/mcp-remote-client.ts index cbaf920..26f9975 100644 --- a/packages/mcp-proxy/src/core/mcp-remote-client.ts +++ b/packages/mcp-proxy/src/core/mcp-remote-client.ts @@ -82,12 +82,17 @@ export function createMCPProxy( ); } + // Extract session ID and last event ID from incoming request + const clientSessionId = request.headers.get("Mcp-Session-Id"); + const lastEventId = request.headers.get("Last-Event-Id"); + // Create remote transport (connects to the MCP server) FIRST const serverTransport = await createRemoteTransport( credentials.serverUrl, credentials, config.redirectUrl, - config.transportStrategy || "sse-first" + config.transportStrategy || "sse-first", + { sessionId: clientSessionId, lastEventId } // Pass both session and event IDs ); // Start server transport and wait for connection @@ -146,18 +151,27 @@ export function createMCPProxy( serverUrl: string, credentials: StoredCredentials, redirectUrl: string, - transportStrategy: TransportStrategy = "sse-first" + transportStrategy: TransportStrategy = "sse-first", + clientHeaders?: { sessionId?: string | null; lastEventId?: string | null } ): Promise { // Create auth provider with stored credentials using common factory const authProvider = await createAuthProviderForProxy(serverUrl, credentials, redirectUrl); const url = new URL(serverUrl); - const headers = { + const headers: Record = { Authorization: `Bearer ${credentials.tokens.access_token}`, "Content-Type": "application/json", ...config.headers, }; + // Add session and event headers if provided + if (clientHeaders?.sessionId) { + headers["Mcp-Session-Id"] = clientHeaders.sessionId; + } + if (clientHeaders?.lastEventId) { + headers["Last-Event-Id"] = clientHeaders.lastEventId; + } + // Create transport based on strategy (don't start yet) let transport: SSEClientTransport | StreamableHTTPClientTransport; @@ -185,7 +199,6 @@ export function createMCPProxy( } catch (error) { console.warn("SSE transport failed, falling back to HTTP:", error); transport = new StreamableHTTPClientTransport(url, { - authProvider, requestInit: { headers }, }); } diff --git a/packages/mcp-proxy/src/utils/mcp-transport-bridge.ts b/packages/mcp-proxy/src/utils/mcp-transport-bridge.ts index c341790..c71d575 100644 --- a/packages/mcp-proxy/src/utils/mcp-transport-bridge.ts +++ b/packages/mcp-proxy/src/utils/mcp-transport-bridge.ts @@ -22,24 +22,39 @@ export function createMCPTransportBridge( const logError = debug ? console.error : () => {}; // Forward messages from client to server - clientTransport.onmessage = (message: any) => { + clientTransport.onmessage = (message: any, extra: any) => { console.log(JSON.stringify(message)); log("[Client→Server]", message.method || message.id); onMessage?.("client-to-server", message); - serverTransport.send(message).catch((error) => { + // Forward any extra parameters (like resumption tokens) to the server + const serverOptions: any = {}; + if (extra?.relatedRequestId) { + serverOptions.relatedRequestId = extra.relatedRequestId; + } + + serverTransport.send(message, serverOptions).catch((error) => { logError("Error sending to server:", error); onError?.(error, "server"); }); }; // Forward messages from server to client - serverTransport.onmessage = (message: any) => { - console.log(JSON.stringify(message)); + serverTransport.onmessage = (message: any, extra: any) => { + console.log(JSON.stringify(message), JSON.stringify(extra)); log("[Server→Client]", message.method || message.id); onMessage?.("server-to-client", message); - clientTransport.send(message).catch((error) => { + // Forward the server's session ID as resumption token to client + const clientOptions: any = {}; + if (serverTransport.sessionId) { + clientOptions.resumptionToken = serverTransport.sessionId; + } + if (extra?.relatedRequestId) { + clientOptions.relatedRequestId = extra.relatedRequestId; + } + + clientTransport.send(message, clientOptions).catch((error) => { logError("Error sending to client:", error); onError?.(error, "client"); }); @@ -58,6 +73,7 @@ export function createMCPTransportBridge( serverTransport.onclose = () => { if (clientClosed) return; serverClosed = true; + console.log("closing"); log("Server transport closed, closing client transport"); clientTransport.close().catch((error) => { logError("Error closing client transport:", error); diff --git a/packages/mcp-proxy/src/utils/mcp-transport.ts b/packages/mcp-proxy/src/utils/mcp-transport.ts index bbac32d..f26d6f7 100644 --- a/packages/mcp-proxy/src/utils/mcp-transport.ts +++ b/packages/mcp-proxy/src/utils/mcp-transport.ts @@ -12,6 +12,7 @@ export class RemixMCPTransport implements Transport { private request: Request, private sendResponse: (response: Response) => void ) {} + sessionId?: string; setProtocolVersion?: (version: string) => void; @@ -55,15 +56,18 @@ export class RemixMCPTransport implements Transport { throw new Error("Transport is closed"); } + // Prepare headers + const headers: Record = { + "Content-Type": "application/json", + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "POST, OPTIONS", + "Access-Control-Allow-Headers": "Content-Type, Authorization", + }; + // Send the MCP response back as HTTP response const response = new Response(JSON.stringify(message), { status: 200, - headers: { - "Content-Type": "application/json", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "POST, OPTIONS", - "Access-Control-Allow-Headers": "Content-Type, Authorization", - }, + headers, }); this.sendResponse(response);