Fix: proxy server

This commit is contained in:
Harshith Mullapudi 2025-07-17 09:41:38 +05:30
parent af48e97166
commit 8bb46a2c4d
10 changed files with 157 additions and 77 deletions

View File

@ -62,8 +62,7 @@ export function IngestionRuleSection({
</div>
<div className="flex justify-end">
<Button
type="button"
variant="default"
variant="secondary"
disabled={
!ingestionRuleText.trim() ||
ingestionRuleFetcher.state === "submitting"
@ -78,4 +77,4 @@ export function IngestionRuleSection({
</div>
</div>
);
}
}

View File

@ -14,7 +14,7 @@ export function Section({
children,
}: SectionProps) {
return (
<div className="flex gap-6">
<div className="flex h-full gap-6">
<div className="flex w-[400px] shrink-0 flex-col">
{icon && <>{icon}</>}
<h3 className="text-lg"> {title} </h3>
@ -22,7 +22,7 @@ export function Section({
{metadata ? metadata : null}
</div>
<div className="grow">
<div className="flex h-full w-full justify-center">
<div className="flex h-full w-full justify-end overflow-auto">
<div className="flex h-full max-w-[76ch] grow flex-col gap-2">
{children}
</div>

View File

@ -1,6 +1,7 @@
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { createMCPProxy } from "@core/mcp-proxy";
import { getIntegrationDefinitionWithSlug } from "~/services/integrationDefinition.server";
import { proxyRequest } from "~/utils/proxy.server";
import { z } from "zod";
import { getIntegrationAccount } from "~/services/integrationAccount.server";
@ -59,53 +60,35 @@ const { action, loader } = createActionApiRoute(
);
}
const { serverUrl, transportStrategy } = spec.mcpAuth;
const { serverUrl } = spec.mcpAuth;
const mcpProxy = createMCPProxy(
{
serverUrl,
timeout: 30000,
debug: true,
transportStrategy: transportStrategy || "sse-first",
// Fix this
redirectUrl: "",
},
// Callback to load credentials from the database
async () => {
// Find the integration account for this user and integration
const integrationAccount = await getIntegrationAccount(
integrationDefinition.id,
authentication.userId,
);
const integrationConfig =
integrationAccount?.integrationConfiguration as any;
if (
!integrationAccount ||
!integrationConfig ||
!integrationConfig.mcp
) {
return null;
}
return {
serverUrl,
tokens: {
access_token: integrationConfig.mcp.tokens.access_token,
token_type: integrationConfig.mcp.tokens.token_type || "bearer",
expires_in: integrationConfig.mcp.tokens.expires_in || 3600,
refresh_token: integrationConfig.mcp.tokens.refresh_token,
scope: integrationConfig.mcp.tokens.scope || "read write",
},
expiresAt: integrationConfig.mcp.tokens.expiresAt
? new Date(integrationConfig.mcp.tokens.expiresAt)
: new Date(Date.now() + 3600 * 1000),
};
},
// Find the integration account for this user and integration
const integrationAccount = await getIntegrationAccount(
integrationDefinition.id,
authentication.userId,
);
return await mcpProxy(request, "");
const integrationConfig =
integrationAccount?.integrationConfiguration as any;
if (!integrationAccount || !integrationConfig || !integrationConfig.mcp) {
return new Response(
JSON.stringify({
error: "No integration account with mcp config",
}),
{
status: 400,
headers: { "Content-Type": "application/json" },
},
);
}
// Proxy the request to the serverUrl
return await proxyRequest(
request,
serverUrl,
integrationConfig.mcp.tokens.access_token,
);
} catch (error: any) {
console.error("MCP Proxy Error:", error);
return new Response(JSON.stringify({ error: error.message }), {

View File

@ -138,9 +138,8 @@ export default function IntegrationDetail() {
const hasMCPAuth = !!specData?.mcpAuth;
const Component = getIcon(integration.icon as IconType);
return (
<div className="p-4 px-5">
<div className="overflow-hidden p-4 px-5">
<Section
title={integration.name}
description={integration.description}

View File

@ -7,13 +7,11 @@ import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/
export class MCP {
private Client: any;
private clients: Record<string, any> = {};
private StdioTransport: any;
constructor() {}
public async init() {
this.Client = await MCP.importClient();
this.StdioTransport = await MCP.importStdioTransport();
}
private static async importClient() {
@ -28,18 +26,13 @@ export class MCP {
agents.map(async (agent) => {
return await this.connectToServer(
agent,
`${process.env.BACKEND_HOST}/api/v1/mcp/${agent}`,
`${process.env.API_BASE_URL}/api/v1/mcp/${agent}`,
headers,
);
}),
);
}
private static async importStdioTransport() {
const { StdioClientTransport } = await import("./stdio");
return StdioClientTransport;
}
async allTools(): Promise<ToolSet> {
const clientEntries = Object.entries(this.clients);

View File

@ -0,0 +1,73 @@
export async function proxyRequest(
request: Request,
targetUrl: string,
token: string,
): Promise<Response> {
try {
const targetURL = new URL(targetUrl);
const headers = new Headers();
// Copy relevant headers from the original request
const headersToProxy = [
"content-type",
"user-agent",
"accept",
"accept-language",
"accept-encoding",
"mcp-session-id",
"last-event-id",
];
headersToProxy.forEach((headerName) => {
const value = request.headers.get(headerName);
if (value) {
headers.set(headerName, value);
}
});
headers.set("Authorization", `Bearer ${token}`);
const body =
request.method !== "GET" && request.method !== "HEAD"
? await request.arrayBuffer()
: undefined;
const response = await fetch(targetURL.toString(), {
method: request.method,
headers,
body,
});
// Create response headers, excluding hop-by-hop headers
const responseHeaders = new Headers();
const headersToExclude = [
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailers",
"transfer-encoding",
"upgrade",
];
response.headers.forEach((value, key) => {
if (!headersToExclude.includes(key.toLowerCase())) {
responseHeaders.set(key, value);
}
});
return new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers: responseHeaders,
});
} catch (error) {
console.error("Proxy request failed:", error);
return new Response(JSON.stringify({ error: "Proxy request failed" }), {
status: 500,
headers: { "Content-Type": "application/json" },
});
}
}

View File

@ -27,8 +27,8 @@ export default defineConfig({
build: {
extensions: [
syncEnvVars(() => ({
DATABASE_URL: process.env.DATABASE_URL,
BACKEND_HOST: process.env.BACKEND_HOST,
DATABASE_URL: process.env.DATABASE_URL as string,
API_BASE_URL: process.env.API_BASE_URL as string,
})),
prismaExtension({
schema: "prisma/schema.prisma",

View File

@ -82,12 +82,17 @@ export function createMCPProxy(
);
}
// Extract session ID and last event ID from incoming request
const clientSessionId = request.headers.get("Mcp-Session-Id");
const lastEventId = request.headers.get("Last-Event-Id");
// Create remote transport (connects to the MCP server) FIRST
const serverTransport = await createRemoteTransport(
credentials.serverUrl,
credentials,
config.redirectUrl,
config.transportStrategy || "sse-first"
config.transportStrategy || "sse-first",
{ sessionId: clientSessionId, lastEventId } // Pass both session and event IDs
);
// Start server transport and wait for connection
@ -146,18 +151,27 @@ export function createMCPProxy(
serverUrl: string,
credentials: StoredCredentials,
redirectUrl: string,
transportStrategy: TransportStrategy = "sse-first"
transportStrategy: TransportStrategy = "sse-first",
clientHeaders?: { sessionId?: string | null; lastEventId?: string | null }
): Promise<SSEClientTransport | StreamableHTTPClientTransport> {
// Create auth provider with stored credentials using common factory
const authProvider = await createAuthProviderForProxy(serverUrl, credentials, redirectUrl);
const url = new URL(serverUrl);
const headers = {
const headers: Record<string, string> = {
Authorization: `Bearer ${credentials.tokens.access_token}`,
"Content-Type": "application/json",
...config.headers,
};
// Add session and event headers if provided
if (clientHeaders?.sessionId) {
headers["Mcp-Session-Id"] = clientHeaders.sessionId;
}
if (clientHeaders?.lastEventId) {
headers["Last-Event-Id"] = clientHeaders.lastEventId;
}
// Create transport based on strategy (don't start yet)
let transport: SSEClientTransport | StreamableHTTPClientTransport;
@ -185,7 +199,6 @@ export function createMCPProxy(
} catch (error) {
console.warn("SSE transport failed, falling back to HTTP:", error);
transport = new StreamableHTTPClientTransport(url, {
authProvider,
requestInit: { headers },
});
}

View File

@ -22,24 +22,39 @@ export function createMCPTransportBridge(
const logError = debug ? console.error : () => {};
// Forward messages from client to server
clientTransport.onmessage = (message: any) => {
clientTransport.onmessage = (message: any, extra: any) => {
console.log(JSON.stringify(message));
log("[Client→Server]", message.method || message.id);
onMessage?.("client-to-server", message);
serverTransport.send(message).catch((error) => {
// Forward any extra parameters (like resumption tokens) to the server
const serverOptions: any = {};
if (extra?.relatedRequestId) {
serverOptions.relatedRequestId = extra.relatedRequestId;
}
serverTransport.send(message, serverOptions).catch((error) => {
logError("Error sending to server:", error);
onError?.(error, "server");
});
};
// Forward messages from server to client
serverTransport.onmessage = (message: any) => {
console.log(JSON.stringify(message));
serverTransport.onmessage = (message: any, extra: any) => {
console.log(JSON.stringify(message), JSON.stringify(extra));
log("[Server→Client]", message.method || message.id);
onMessage?.("server-to-client", message);
clientTransport.send(message).catch((error) => {
// Forward the server's session ID as resumption token to client
const clientOptions: any = {};
if (serverTransport.sessionId) {
clientOptions.resumptionToken = serverTransport.sessionId;
}
if (extra?.relatedRequestId) {
clientOptions.relatedRequestId = extra.relatedRequestId;
}
clientTransport.send(message, clientOptions).catch((error) => {
logError("Error sending to client:", error);
onError?.(error, "client");
});
@ -58,6 +73,7 @@ export function createMCPTransportBridge(
serverTransport.onclose = () => {
if (clientClosed) return;
serverClosed = true;
console.log("closing");
log("Server transport closed, closing client transport");
clientTransport.close().catch((error) => {
logError("Error closing client transport:", error);

View File

@ -12,6 +12,7 @@ export class RemixMCPTransport implements Transport {
private request: Request,
private sendResponse: (response: Response) => void
) {}
sessionId?: string;
setProtocolVersion?: (version: string) => void;
@ -55,15 +56,18 @@ export class RemixMCPTransport implements Transport {
throw new Error("Transport is closed");
}
// Prepare headers
const headers: Record<string, string> = {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
};
// Send the MCP response back as HTTP response
const response = new Response(JSON.stringify(message), {
status: 200,
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
},
headers,
});
this.sendResponse(response);