1. Feat: added ingestion floating status

2. OAuth for mcp
This commit is contained in:
Harshith Mullapudi 2025-07-15 11:14:04 +05:30
parent 4de1d29fe4
commit ef320394d5
16 changed files with 284 additions and 191 deletions

View File

@ -0,0 +1,29 @@
import { LoaderCircle } from "lucide-react";
import { Card, CardContent } from "~/components/ui/card";
import { useIngestionStatus } from "~/hooks/use-ingestion-status";
export function FloatingIngestionStatus() {
const { data } = useIngestionStatus();
if (!data || data.count === 0) {
return null;
}
const processingCount = data.queue.filter(
(item) => item.status === "PROCESSING",
).length;
const pendingCount = data.queue.filter(
(item) => item.status === "PENDING",
).length;
return (
<div className="fixed right-4 bottom-4 z-50 max-w-sm">
<Card className="shadow">
<CardContent className="flex items-center gap-2 p-2">
<LoaderCircle className="text-primary h-4 w-4 animate-spin" />
<span>{processingCount + pendingCount} ingesting</span>
</CardContent>
</Card>
</div>
);
}

View File

@ -34,7 +34,6 @@ export function Icon(props: IconProps) {
);
}
console.error("Invalid icon", props);
return null;
}

View File

@ -0,0 +1,47 @@
import { cva, type VariantProps } from "class-variance-authority";
import React from "react";
import { cn } from "~/lib/utils";
const badgeVariants = cva(
"flex items-center h-5 gap-2 rounded-sm border px-1.5 py-0.5 text-sm transition-colors focus:outline-none focus:ring-2 focus:ring-ring focus:ring-offset-2",
{
variants: {
variant: {
default:
"border-transparent bg-primary text-primary-foreground shadow hover:bg-primary/80",
secondary: "border-none bg-grayAlpha-100",
destructive:
"border-transparent bg-destructive text-destructive-foreground shadow hover:bg-destructive/80",
outline: "text-foreground bg-background",
},
},
defaultVariants: {
variant: "default",
},
},
);
export interface BadgeProps
extends React.HTMLAttributes<HTMLDivElement>,
VariantProps<typeof badgeVariants> {}
function Badge({ className, variant, ...props }: BadgeProps) {
return (
<div className={cn(badgeVariants({ variant }), className)} {...props} />
);
}
interface BadgeColorProps extends React.HTMLAttributes<HTMLDivElement> {
className?: string;
}
function BadgeColor({ className, ...otherProps }: BadgeColorProps) {
return (
<span
className={cn("rounded-full", `h-1.5 w-1.5`, className)}
{...otherProps}
></span>
);
}
export { Badge, badgeVariants, BadgeColor };

View File

@ -0,0 +1,47 @@
import { useEffect, useState } from "react";
import { useFetcher } from "@remix-run/react";
export interface IngestionQueueItem {
id: string;
status: "PENDING" | "PROCESSING" | "COMPLETED" | "FAILED" | "CANCELLED";
createdAt: string;
error?: string;
data: any;
}
export interface IngestionStatusResponse {
queue: IngestionQueueItem[];
count: number;
}
export function useIngestionStatus() {
const fetcher = useFetcher<IngestionStatusResponse>();
const [isPolling, setIsPolling] = useState(false);
useEffect(() => {
const pollIngestionStatus = () => {
if (fetcher.state === "idle") {
fetcher.load("/api/v1/ingestion-queue/status");
}
};
// Initial load
pollIngestionStatus();
// Set up polling interval
const interval = setInterval(pollIngestionStatus, 3000); // Poll every 3 seconds
setIsPolling(true);
return () => {
clearInterval(interval);
setIsPolling(false);
};
}, [fetcher]);
return {
data: fetcher.data,
isLoading: fetcher.state === "loading",
isPolling,
error: fetcher.data === undefined && fetcher.state === "idle" ? "Error loading ingestion status" : null
};
}

View File

@ -1,79 +1,8 @@
// lib/ingest.queue.ts
import { Queue, Worker } from "bullmq";
import IORedis from "ioredis";
import { env } from "~/env.server";
import { KnowledgeGraphService } from "../services/knowledgeGraph.server";
import { z } from "zod";
import { EpisodeType } from "@core/types";
import { prisma } from "~/db.server";
import { IngestionStatus } from "@core/database";
import { logger } from "~/services/logger.service";
const connection = new IORedis({
port: env.REDIS_PORT,
host: env.REDIS_HOST,
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
const userQueues = new Map<string, Queue>();
const userWorkers = new Map<string, Worker>();
async function processUserJob(userId: string, job: any) {
try {
logger.log(`Processing job for user ${userId}`);
await prisma.ingestionQueue.update({
where: { id: job.data.queueId },
data: {
status: IngestionStatus.PROCESSING,
},
});
const knowledgeGraphService = new KnowledgeGraphService();
const episodeDetails = await knowledgeGraphService.addEpisode({
...job.data.body,
userId,
});
await prisma.ingestionQueue.update({
where: { id: job.data.queueId },
data: {
output: episodeDetails,
status: IngestionStatus.COMPLETED,
},
});
// your processing logic
} catch (err: any) {
await prisma.ingestionQueue.update({
where: { id: job.data.queueId },
data: {
error: err.message,
status: IngestionStatus.FAILED,
},
});
console.error(`Error processing job for user ${userId}:`, err);
}
}
export function getUserQueue(userId: string) {
if (!userQueues.has(userId)) {
const queueName = `ingest-user-${userId}`;
const queue = new Queue(queueName, { connection });
userQueues.set(userId, queue);
const worker = new Worker(queueName, (job) => processUserJob(userId, job), {
connection,
concurrency: 1,
});
userWorkers.set(userId, worker);
}
return userQueues.get(userId)!;
}
import { z } from "zod";
import { prisma } from "~/db.server";
import { ingestTask } from "~/trigger/ingest/ingest";
export const IngestBodyRequest = z.object({
episodeBody: z.string(),
@ -113,22 +42,14 @@ export const addToQueue = async (
},
});
const ingestionQueue = getUserQueue(userId);
const jobDetails = await ingestionQueue.add(
`ingest-user-${userId}`, // 👈 unique name per user
const handler = await ingestTask.trigger(
{ body, userId, workspaceId: user.Workspace.id, queueId: queuePersist.id },
{
queueId: queuePersist.id,
spaceId: body.spaceId,
userId: userId,
body,
},
{
jobId: `${userId}-${Date.now()}`, // unique per job but grouped under user
queue: "ingestion-queue",
concurrencyKey: userId,
tags: [user.id, queuePersist.id],
},
);
return {
id: jobDetails.id,
};
return { id: handler.id, token: handler.publicAccessToken };
};

View File

@ -2,11 +2,7 @@ import { json } from "@remix-run/node";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { getWorkspaceByUser } from "~/models/workspace.server";
import {
createConversation,
CreateConversationSchema,
readConversation,
} from "~/services/conversation.server";
import { readConversation } from "~/services/conversation.server";
import { z } from "zod";
export const ConversationIdSchema = z.object({

View File

@ -0,0 +1,40 @@
import { type LoaderFunctionArgs, json } from "@remix-run/node";
import { prisma } from "~/db.server";
import { requireUserId } from "~/services/session.server";
export async function loader({ request }: LoaderFunctionArgs) {
const userId = await requireUserId(request);
const user = await prisma.user.findUnique({
where: { id: userId },
include: { Workspace: true },
});
if (!user?.Workspace) {
throw new Response("Workspace not found", { status: 404 });
}
const activeIngestionQueue = await prisma.ingestionQueue.findMany({
where: {
workspaceId: user.Workspace.id,
status: {
in: ["PENDING", "PROCESSING"],
},
},
select: {
id: true,
status: true,
createdAt: true,
error: true,
data: true,
},
orderBy: {
createdAt: "desc",
},
});
return json({
queue: activeIngestionQueue,
count: activeIngestionQueue.length,
});
}

View File

@ -2,7 +2,10 @@ import { json } from "@remix-run/node";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { OAuthBodySchema } from "~/services/oauth/oauth-utils.server";
import { getRedirectURL, getRedirectURLForMCP } from "~/services/oauth/oauth.server";
import {
getRedirectURL,
getRedirectURLForMCP,
} from "~/services/oauth/oauth.server";
import { getWorkspaceByUser } from "~/models/workspace.server";
// This route handles the OAuth redirect URL generation, similar to the NestJS controller
@ -15,23 +18,13 @@ const { action, loader } = createActionApiRoute(
},
corsStrategy: "all",
},
async ({ body, authentication, request }) => {
async ({ body, authentication }) => {
const workspace = await getWorkspaceByUser(authentication.userId);
const url = new URL(request.url);
const isMCP = url.searchParams.get("mcp") === "true";
// Call the appropriate service based on MCP flag
const redirectURL = isMCP
? await getRedirectURLForMCP(
body,
authentication.userId,
workspace?.id,
)
: await getRedirectURL(
body,
authentication.userId,
workspace?.id,
);
const redirectURL = body.mcp
? await getRedirectURLForMCP(body, authentication.userId, workspace?.id)
: await getRedirectURL(body, authentication.userId, workspace?.id);
return json(redirectURL);
},

View File

@ -7,7 +7,8 @@ import { logger } from "~/services/logger.service";
import { env } from "~/env.server";
import { getIntegrationDefinitionForState } from "~/services/oauth/oauth.server";
const MCP_CALLBACK_URL = `${process.env.OAUTH_CALLBACK_URL ?? ""}/mcp`;
const CALLBACK_URL = `${env.APP_ORIGIN}/api/v1/oauth/callback`;
const MCP_CALLBACK_URL = `${CALLBACK_URL}/mcp`;
export async function loader({ request }: LoaderFunctionArgs) {
const url = new URL(request.url);
@ -80,7 +81,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
return new Response(null, {
status: 302,
headers: {
Location: `${redirectURL}/integrations?success=true&integrationName=${encodeURIComponent(
Location: `${redirectURL}?success=true&integrationName=${encodeURIComponent(
integrationDefinition.name,
)}`,
},
@ -91,7 +92,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
return new Response(null, {
status: 302,
headers: {
Location: `${redirectURL}/integrations?success=false&error=${encodeURIComponent(
Location: `${redirectURL}?success=false&error=${encodeURIComponent(
error.message || "OAuth callback failed",
)}`,
},

View File

@ -8,6 +8,7 @@ import { clearRedirectTo, commitSession } from "~/services/redirectTo.server";
import { AppSidebar } from "~/components/sidebar/app-sidebar";
import { SidebarInset, SidebarProvider } from "~/components/ui/sidebar";
import { SiteHeader } from "~/components/ui/header";
import { FloatingIngestionStatus } from "~/components/ingestion/floating-ingestion-status";
export const loader = async ({ request }: LoaderFunctionArgs) => {
const user = await requireUser(request);
@ -47,6 +48,7 @@ export default function Home() {
</div>
</div>
</div>
<FloatingIngestionStatus />
</SidebarInset>
</SidebarProvider>
);

View File

@ -30,6 +30,7 @@ export class OAuthBodyInterface {
export const OAuthBodySchema = z.object({
redirectURL: z.string(),
integrationDefinitionId: z.string(),
mcp: z.boolean().optional().default(false),
});
export type CallbackParams = Record<string, string>;

View File

@ -17,7 +17,7 @@ import { env } from "~/env.server";
import { createMCPAuthClient } from "@core/mcp-proxy";
// Use process.env for config in Remix
const CALLBACK_URL = process.env.OAUTH_CALLBACK_URL ?? "";
const CALLBACK_URL = `${env.APP_ORIGIN}/api/v1/oauth/callback`;
const MCP_CALLBACK_URL = `${CALLBACK_URL}/mcp`;
// Session store (in-memory, for single server)
@ -299,14 +299,18 @@ export async function getRedirectURLForMCP(
}
export async function getIntegrationDefinitionForState(state: string) {
if (!state) {
try {
if (!state) {
throw new Error("No state found");
}
const sessionRecord = mcpSession[state];
// Delete the session once it's used
delete mcpSession[state];
return sessionRecord;
} catch (e) {
throw new Error("No state found");
}
const sessionRecord = mcpSession[state];
// Delete the session once it's used
delete mcpSession[state];
return sessionRecord;
}

View File

@ -364,7 +364,7 @@
}
p.is-editor-empty:before {
@apply text-muted-foreground;
@apply text-muted-foreground/70;
font-size: 14px !important;
content: attr(data-placeholder);
@ -375,8 +375,6 @@
}
}
.title-bar-sigma {
user-select: none;
-webkit-user-select: none;

View File

@ -0,0 +1,64 @@
import { queue, task } from "@trigger.dev/sdk";
import { type z } from "zod";
import { KnowledgeGraphService } from "~/services/knowledgeGraph.server";
import { prisma } from "~/db.server";
import { IngestionStatus } from "@core/database";
import { logger } from "~/services/logger.service";
import { type IngestBodyRequest } from "~/lib/ingest.server";
const ingestionQueue = queue({
name: "ingestion-queue",
});
// Register the Trigger.dev task
export const ingestTask = task({
id: "ingest-episode",
queue: ingestionQueue,
run: async (payload: {
body: z.infer<typeof IngestBodyRequest>;
userId: string;
workspaceId: string;
queueId: string;
}) => {
try {
logger.log(`Processing job for user ${payload.userId}`);
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
status: IngestionStatus.PROCESSING,
},
});
const knowledgeGraphService = new KnowledgeGraphService();
const episodeBody = payload.body as any;
const episodeDetails = await knowledgeGraphService.addEpisode({
...episodeBody,
userId: payload.userId,
});
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
output: episodeDetails,
status: IngestionStatus.COMPLETED,
},
});
return { success: true, episodeDetails };
} catch (err: any) {
await prisma.ingestionQueue.update({
where: { id: payload.queueId },
data: {
error: err.message,
status: IngestionStatus.FAILED,
},
});
logger.error(`Error processing job for user ${payload.userId}:`, err);
return { success: false, error: err.message };
}
},
});

View File

@ -5,13 +5,12 @@ import {
MCPRemoteClientConfig,
AuthenticationResult,
ProxyConnectionConfig,
CredentialSaveCallback,
CredentialLoadCallback,
MCPProxyFunction,
StoredCredentials,
TransportStrategy,
} from "../types/remote-client.js";
import { MCPAuthProxyError, OAuthError } from "../utils/errors.js";
import { MCPAuthProxyError } from "../utils/errors.js";
import { NodeOAuthClientProvider } from "../lib/node-oauth-client-provider.js";
import { globalAuthStorage } from "../lib/in-memory-auth-storage.js";
import { getServerUrlHash } from "../lib/utils.js";
@ -28,11 +27,8 @@ import {
* @param onCredentialSave Callback to save credentials to your database
* @returns Authentication client with OAuth capabilities
*/
export function createMCPAuthClient(
config: MCPRemoteClientConfig,
onCredentialSave?: CredentialSaveCallback
): MCPAuthenticationClient {
return new MCPAuthenticationClient(config, onCredentialSave);
export function createMCPAuthClient(config: MCPRemoteClientConfig): MCPAuthenticationClient {
return new MCPAuthenticationClient(config);
}
/**
@ -267,12 +263,10 @@ export class MCPAuthenticationClient {
private authProvider: NodeOAuthClientProvider | null = null;
private client: Client | null = null;
constructor(
private config: MCPRemoteClientConfig,
private onCredentialSave?: CredentialSaveCallback
) {
constructor(private config: MCPRemoteClientConfig) {
this.serverUrlHash = getServerUrlHash(config.serverUrl);
console.log(config);
// Validate configuration
this.validateConfig();
}
@ -341,12 +335,12 @@ export class MCPAuthenticationClient {
const authProvider = this.getAuthProvider();
// State validation (if state is provided - for backward compatibility)
if (options.state) {
const providerState = authProvider.state?.() || "";
if (options.state !== providerState) {
throw new OAuthError("Invalid state parameter - possible CSRF attack");
}
}
// if (options.state) {
// const providerState = authProvider.state?.() || "";
// if (options.state !== providerState) {
// throw new OAuthError("Invalid state parameter - possible CSRF attack");
// }
// }
// Use the NodeOAuthClientProvider's completeAuth method
await authProvider.completeAuth({

View File

@ -1,7 +1,4 @@
import {
OAuthTokens,
OAuthClientInformationFull,
} from "@modelcontextprotocol/sdk/shared/auth.js";
import { OAuthTokens, OAuthClientInformationFull } from "@modelcontextprotocol/sdk/shared/auth.js";
import {
readFileSync,
writeFileSync,
@ -78,83 +75,47 @@ export class InMemoryAuthStorage {
clientInformation: OAuthClientInformationFull
): Promise<void> {
this.clientInfo.set(serverUrlHash, clientInformation);
this.saveTempFile(serverUrlHash, "clientInfo", clientInformation);
}
async getClientInformation(
serverUrlHash: string
): Promise<OAuthClientInformationFull | undefined> {
let clientInfo = this.clientInfo.get(serverUrlHash);
if (!clientInfo) {
// Try to load from temp file
clientInfo = this.loadTempFile<OAuthClientInformationFull>(
serverUrlHash,
"clientInfo"
) as any;
if (clientInfo) {
this.clientInfo.set(serverUrlHash, clientInfo);
}
}
return clientInfo || undefined;
}
// OAuth Tokens
async saveTokens(serverUrlHash: string, tokens: OAuthTokens): Promise<void> {
this.tokens.set(serverUrlHash, tokens);
this.saveTempFile(serverUrlHash, "tokens", tokens);
}
async getTokens(serverUrlHash: string): Promise<OAuthTokens | null> {
let tokens = this.tokens.get(serverUrlHash);
if (!tokens) {
// Try to load from temp file
tokens = this.loadTempFile<OAuthTokens>(serverUrlHash, "tokens") as any;
if (tokens) {
this.tokens.set(serverUrlHash, tokens);
}
}
return tokens || null;
}
// Code Verifiers (PKCE)
async saveCodeVerifier(
serverUrlHash: string,
codeVerifier: string
): Promise<void> {
async saveCodeVerifier(serverUrlHash: string, codeVerifier: string): Promise<void> {
this.codeVerifiers.set(serverUrlHash, codeVerifier);
this.saveTempFile(serverUrlHash, "codeVerifier", codeVerifier);
}
async getCodeVerifier(serverUrlHash: string): Promise<string | null> {
let codeVerifier = this.codeVerifiers.get(serverUrlHash);
if (!codeVerifier) {
// Try to load from temp file
codeVerifier = this.loadTempFile<string>(
serverUrlHash,
"codeVerifier"
) as string;
if (codeVerifier) {
this.codeVerifiers.set(serverUrlHash, codeVerifier);
}
}
return codeVerifier || null;
}
// OAuth States
async saveState(state: string, data: any): Promise<void> {
this.states.set(state, data);
this.saveTempFile(state, "state", data);
}
async getState(state: string): Promise<any | null> {
let stateData = this.states.get(state);
if (!stateData) {
// Try to load from temp file
stateData = this.loadTempFile<any>(state, "state");
if (stateData) {
this.states.set(state, stateData);
}
}
return stateData || null;
}
@ -248,11 +209,7 @@ export interface LockfileData {
class InMemoryLockManager {
private locks = new Map<string, LockfileData>();
async createLockfile(
serverUrlHash: string,
pid: number,
port: number
): Promise<void> {
async createLockfile(serverUrlHash: string, pid: number, port: number): Promise<void> {
this.locks.set(serverUrlHash, {
pid,
port,