From c7954b30f64c72612f021a9b3193e585b821e240 Mon Sep 17 00:00:00 2001 From: Manoj K Date: Sun, 27 Jul 2025 12:07:28 +0530 Subject: [PATCH] Fix: integration account webhooks --- .../api.v1.integration_account.disconnect.tsx | 3 +- .../trigger/integrations/integration-run.ts | 2 + apps/webapp/app/trigger/temp.ts | 73 ++++++++++++++++-- .../webhooks/integration-webhook-delivery.ts | 74 ++++++++++++------- .../app/trigger/webhooks/webhook-delivery.ts | 42 ++++++++++- apps/webapp/trigger.config.ts | 20 ++--- 6 files changed, 167 insertions(+), 47 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.integration_account.disconnect.tsx b/apps/webapp/app/routes/api.v1.integration_account.disconnect.tsx index 2c976bc..95defab 100644 --- a/apps/webapp/app/routes/api.v1.integration_account.disconnect.tsx +++ b/apps/webapp/app/routes/api.v1.integration_account.disconnect.tsx @@ -22,11 +22,9 @@ export async function action({ request }: ActionFunctionArgs) { ); } - // Soft delete the integration account by setting deletedAt const updatedAccount = await prisma.integrationAccount.delete({ where: { id: integrationAccountId, - deleted: null, }, }); @@ -34,6 +32,7 @@ export async function action({ request }: ActionFunctionArgs) { integrationAccountId, userId, "integration.disconnected", + updatedAccount.workspaceId, ); logger.info("Integration account disconnected (soft deleted)", { diff --git a/apps/webapp/app/trigger/integrations/integration-run.ts b/apps/webapp/app/trigger/integrations/integration-run.ts index d8d0e6e..1792a19 100644 --- a/apps/webapp/app/trigger/integrations/integration-run.ts +++ b/apps/webapp/app/trigger/integrations/integration-run.ts @@ -232,6 +232,7 @@ async function handleAccountMessage( integrationAccountId, userId, "mcp.connected", + workspaceId, ); return config; } @@ -255,6 +256,7 @@ async function handleAccountMessage( integrationAccount.id, userId, "integration.connected", + workspaceId, ); } catch (error) { logger.error("Failed to trigger OAuth integration webhook", { diff --git a/apps/webapp/app/trigger/temp.ts b/apps/webapp/app/trigger/temp.ts index 780467c..ee30ab5 100644 --- a/apps/webapp/app/trigger/temp.ts +++ b/apps/webapp/app/trigger/temp.ts @@ -21,7 +21,7 @@ interface BatchResult { export const entity = queue({ name: "entity-queue", - concurrencyLimit: 10, + concurrencyLimit: 5, }); /** @@ -31,19 +31,30 @@ export const updateAllEntityEmbeddings = task({ id: "update-all-entity-embeddings", machine: "large-1x", - run: async (payload: { userId?: string; batchSize?: number } = {}) => { - const { userId, batchSize = 100 } = payload; + run: async ( + payload: { + userId?: string; + batchSize?: number; + forceUpdate?: boolean; + } = {}, + ) => { + const { userId, batchSize = 50, forceUpdate = false } = payload; logger.info("Starting entity embeddings update with fan-out approach", { userId, batchSize, + forceUpdate, targetScope: userId ? `user ${userId}` : "all users", }); try { - // Step 1: Fetch all entities - const entities = await getAllEntities(userId); - logger.info(`Found ${entities.length} entities to update`); + // Step 1: Fetch entities (either all or only those needing updates) + const entities = forceUpdate + ? await getAllEntitiesForceRefresh(userId) + : await getAllEntities(userId); + logger.info(`Found ${entities.length} entities to update`, { + strategy: forceUpdate ? "force-refresh-all" : "missing-embeddings-only", + }); if (entities.length === 0) { return { @@ -192,9 +203,56 @@ export const updateEntityBatch = task({ }); /** - * Fetch all entities from Neo4j database + * Fetch all entities from Neo4j database that need embedding updates */ async function getAllEntities(userId?: string): Promise { + try { + // Only fetch entities that either: + // 1. Have null/empty embeddings, OR + // 2. Have embeddings but might need updates (optional: add timestamp check) + const query = userId + ? `MATCH (entity:Entity {userId: $userId}) + WHERE entity.nameEmbedding IS NULL + OR entity.typeEmbedding IS NULL + OR size(entity.nameEmbedding) = 0 + OR size(entity.typeEmbedding) = 0 + RETURN entity ORDER BY entity.createdAt` + : `MATCH (entity:Entity) + WHERE entity.nameEmbedding IS NULL + OR entity.typeEmbedding IS NULL + OR size(entity.nameEmbedding) = 0 + OR size(entity.typeEmbedding) = 0 + RETURN entity ORDER BY entity.createdAt`; + + const params = userId ? { userId } : {}; + const records = await runQuery(query, params); + + return records.map((record) => { + const entityProps = record.get("entity").properties; + return { + uuid: entityProps.uuid, + name: entityProps.name, + type: entityProps.type, + attributes: JSON.parse(entityProps.attributes || "{}"), + nameEmbedding: entityProps.nameEmbedding || [], + typeEmbedding: entityProps.typeEmbedding || [], + createdAt: new Date(entityProps.createdAt), + userId: entityProps.userId, + space: entityProps.space, + }; + }); + } catch (error) { + logger.error("Error fetching entities:", { error }); + throw new Error(`Failed to fetch entities: ${error}`); + } +} + +/** + * Fetch ALL entities from Neo4j database (for force refresh) + */ +async function getAllEntitiesForceRefresh( + userId?: string, +): Promise { try { const query = userId ? `MATCH (entity:Entity {userId: $userId}) RETURN entity ORDER BY entity.createdAt` @@ -287,6 +345,7 @@ export async function triggerEntityEmbeddingsUpdate( options: { userId?: string; batchSize?: number; + forceUpdate?: boolean; } = {}, ) { try { diff --git a/apps/webapp/app/trigger/webhooks/integration-webhook-delivery.ts b/apps/webapp/app/trigger/webhooks/integration-webhook-delivery.ts index 892e1cb..aff4794 100644 --- a/apps/webapp/app/trigger/webhooks/integration-webhook-delivery.ts +++ b/apps/webapp/app/trigger/webhooks/integration-webhook-delivery.ts @@ -17,6 +17,7 @@ interface OAuthIntegrationWebhookPayload { integrationAccountId: string; eventType: WebhookEventType; userId: string; + workspaceId: string; } export const integrationWebhookTask = task({ @@ -36,11 +37,51 @@ export const integrationWebhookTask = task({ }, }); - if (!integrationAccount) { + let webhookPayload: any = {}; + + if ( + !integrationAccount && + payload.eventType === "integration.disconnected" + ) { + webhookPayload = { + event: payload.eventType, + user_id: payload.userId, + integration: { + id: payload.integrationAccountId, + }, + }; + } else if (!integrationAccount) { logger.error( `Integration account ${payload.integrationAccountId} not found`, ); return { success: false, error: "Integration account not found" }; + } else { + const integrationConfig = + integrationAccount.integrationConfiguration as any; + + const integrationSpec = integrationAccount.integrationDefinition + .spec as any; + let mcpEndpoint = undefined; + + if (integrationSpec.mcp) { + mcpEndpoint = `${process.env.API_BASE_URL}/api/v1/mcp/${integrationAccount.integrationDefinition.slug}`; + } else if (integrationSpec.mcp.type === "stdio") { + mcpEndpoint = `${process.env.API_BASE_URL}/api/v1/mcp/${integrationAccount.integrationDefinition.slug}`; + } + + // Prepare webhook payload + webhookPayload = { + event: payload.eventType, + user_id: payload.userId, + integration: { + id: integrationAccount.id, + provider: integrationAccount.integrationDefinition.slug, + mcp_endpoint: mcpEndpoint, + name: integrationAccount.integrationDefinition.name, + icon: integrationAccount.integrationDefinition.icon, + }, + timestamp: new Date().toISOString(), + }; } // Get all OAuth clients that: @@ -48,13 +89,15 @@ export const integrationWebhookTask = task({ // 2. Have webhook URLs configured const oauthClients = await prisma.oAuthClientInstallation.findMany({ where: { - workspaceId: integrationAccount.workspaceId, + workspaceId: payload.workspaceId, installedById: payload.userId, isActive: true, - // Check if client has integration scope in allowedScopes grantedScopes: { contains: "integration", }, + oauthClient: { + clientType: "regular", + }, }, select: { id: true, @@ -77,24 +120,6 @@ export const integrationWebhookTask = task({ return { success: true, message: "No OAuth clients to notify" }; } - const integrationConfig = - integrationAccount.integrationConfiguration as any; - // Prepare webhook payload - const webhookPayload = { - event: payload.eventType, - user_id: payload.userId, - integration: { - id: integrationAccount.id, - provider: integrationAccount.integrationDefinition.slug, - mcp_endpoint: integrationConfig.mcp - ? `${process.env.API_BASE_URL}/api/v1/mcp/${integrationAccount.integrationDefinition.slug}` - : undefined, - name: integrationAccount.integrationDefinition.name, - icon: integrationAccount.integrationDefinition.icon, - }, - timestamp: new Date().toISOString(), - }; - // Convert OAuth clients to targets const targets: WebhookTarget[] = oauthClients .filter((client) => client.oauthClient?.webhookUrl) @@ -116,11 +141,6 @@ export const integrationWebhookTask = task({ logger.log( `OAuth integration webhook delivery completed: ${successfulDeliveries}/${totalDeliveries} successful`, - { - integrationId: integrationAccount.id, - integrationProvider: integrationAccount.integrationDefinition.slug, - userId: payload.userId, - }, ); return { @@ -151,12 +171,14 @@ export async function triggerIntegrationWebhook( integrationAccountId: string, userId: string, eventType: WebhookEventType, + workspaceId: string, ) { try { await integrationWebhookTask.trigger({ integrationAccountId, userId, eventType, + workspaceId, }); logger.log( `Triggered OAuth integration webhook delivery for integration account ${integrationAccountId}`, diff --git a/apps/webapp/app/trigger/webhooks/webhook-delivery.ts b/apps/webapp/app/trigger/webhooks/webhook-delivery.ts index 1f17797..43f51db 100644 --- a/apps/webapp/app/trigger/webhooks/webhook-delivery.ts +++ b/apps/webapp/app/trigger/webhooks/webhook-delivery.ts @@ -51,9 +51,38 @@ export const webhookDeliveryTask = task({ workspaceId: payload.workspaceId, isActive: true, }, + select: { + id: true, + url: true, + secret: true, + }, }); - if (webhooks.length === 0) { + const oauthClients = await prisma.oAuthClientInstallation.findMany({ + where: { + workspaceId: activity.workspaceId, + installedById: activity.workspace.userId!, + isActive: true, + grantedScopes: { + contains: "integration", + }, + oauthClient: { + clientType: "regular", + }, + }, + select: { + id: true, + oauthClient: { + select: { + clientId: true, + webhookUrl: true, + webhookSecret: true, + }, + }, + }, + }); + + if (webhooks.length === 0 && oauthClients.length === 0) { logger.log( `No active webhooks found for workspace ${payload.workspaceId}`, ); @@ -87,7 +116,16 @@ export const webhookDeliveryTask = task({ }; // Convert webhooks to targets using common utils - const targets = prepareWebhookTargets(webhooks); + const targets = prepareWebhookTargets( + [...webhooks, ...oauthClients].map((webhook) => ({ + url: "url" in webhook ? webhook.url : webhook.oauthClient.webhookUrl!, + secret: + "secret" in webhook + ? webhook.secret + : webhook.oauthClient.webhookSecret, + id: webhook.id, + })), + ); // Use common delivery function const result = await deliverWebhook({ diff --git a/apps/webapp/trigger.config.ts b/apps/webapp/trigger.config.ts index 8e0aeea..a048e88 100644 --- a/apps/webapp/trigger.config.ts +++ b/apps/webapp/trigger.config.ts @@ -27,16 +27,16 @@ export default defineConfig({ build: { extensions: [ syncEnvVars(() => ({ - ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY as string, - API_BASE_URL: process.env.API_BASE_URL as string, - DATABASE_URL: process.env.DATABASE_URL as string, - EMBEDDING_MODEL: process.env.EMBEDDING_MODEL as string, - ENCRYPTION_KEY: process.env.ENCRYPTION_KEY as string, - MODEL: process.env.MODEL ?? "gpt-4.1-2025-04-14", - NEO4J_PASSWORD: process.env.NEO4J_PASSWORD as string, - NEO4J_URI: process.env.NEO4J_URI as string, - NEO4J_USERNAME: process.env.NEO4J_USERNAME as string, - OPENAI_API_KEY: process.env.OPENAI_API_KEY as string, + // ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY as string, + // API_BASE_URL: process.env.API_BASE_URL as string, + // DATABASE_URL: process.env.DATABASE_URL as string, + // EMBEDDING_MODEL: process.env.EMBEDDING_MODEL as string, + // ENCRYPTION_KEY: process.env.ENCRYPTION_KEY as string, + // MODEL: process.env.MODEL ?? "gpt-4.1-2025-04-14", + // NEO4J_PASSWORD: process.env.NEO4J_PASSWORD as string, + // NEO4J_URI: process.env.NEO4J_URI as string, + // NEO4J_USERNAME: process.env.NEO4J_USERNAME as string, + // OPENAI_API_KEY: process.env.OPENAI_API_KEY as string, })), prismaExtension({ schema: "prisma/schema.prisma",