Fix: integration account webhooks

This commit is contained in:
Manoj K 2025-07-27 12:07:28 +05:30 committed by Harshith Mullapudi
parent 0dad877166
commit c7954b30f6
6 changed files with 167 additions and 47 deletions

View File

@ -22,11 +22,9 @@ export async function action({ request }: ActionFunctionArgs) {
);
}
// Soft delete the integration account by setting deletedAt
const updatedAccount = await prisma.integrationAccount.delete({
where: {
id: integrationAccountId,
deleted: null,
},
});
@ -34,6 +32,7 @@ export async function action({ request }: ActionFunctionArgs) {
integrationAccountId,
userId,
"integration.disconnected",
updatedAccount.workspaceId,
);
logger.info("Integration account disconnected (soft deleted)", {

View File

@ -232,6 +232,7 @@ async function handleAccountMessage(
integrationAccountId,
userId,
"mcp.connected",
workspaceId,
);
return config;
}
@ -255,6 +256,7 @@ async function handleAccountMessage(
integrationAccount.id,
userId,
"integration.connected",
workspaceId,
);
} catch (error) {
logger.error("Failed to trigger OAuth integration webhook", {

View File

@ -21,7 +21,7 @@ interface BatchResult {
export const entity = queue({
name: "entity-queue",
concurrencyLimit: 10,
concurrencyLimit: 5,
});
/**
@ -31,19 +31,30 @@ export const updateAllEntityEmbeddings = task({
id: "update-all-entity-embeddings",
machine: "large-1x",
run: async (payload: { userId?: string; batchSize?: number } = {}) => {
const { userId, batchSize = 100 } = payload;
run: async (
payload: {
userId?: string;
batchSize?: number;
forceUpdate?: boolean;
} = {},
) => {
const { userId, batchSize = 50, forceUpdate = false } = payload;
logger.info("Starting entity embeddings update with fan-out approach", {
userId,
batchSize,
forceUpdate,
targetScope: userId ? `user ${userId}` : "all users",
});
try {
// Step 1: Fetch all entities
const entities = await getAllEntities(userId);
logger.info(`Found ${entities.length} entities to update`);
// Step 1: Fetch entities (either all or only those needing updates)
const entities = forceUpdate
? await getAllEntitiesForceRefresh(userId)
: await getAllEntities(userId);
logger.info(`Found ${entities.length} entities to update`, {
strategy: forceUpdate ? "force-refresh-all" : "missing-embeddings-only",
});
if (entities.length === 0) {
return {
@ -192,9 +203,56 @@ export const updateEntityBatch = task({
});
/**
* Fetch all entities from Neo4j database
* Fetch all entities from Neo4j database that need embedding updates
*/
async function getAllEntities(userId?: string): Promise<EntityNode[]> {
try {
// Only fetch entities that either:
// 1. Have null/empty embeddings, OR
// 2. Have embeddings but might need updates (optional: add timestamp check)
const query = userId
? `MATCH (entity:Entity {userId: $userId})
WHERE entity.nameEmbedding IS NULL
OR entity.typeEmbedding IS NULL
OR size(entity.nameEmbedding) = 0
OR size(entity.typeEmbedding) = 0
RETURN entity ORDER BY entity.createdAt`
: `MATCH (entity:Entity)
WHERE entity.nameEmbedding IS NULL
OR entity.typeEmbedding IS NULL
OR size(entity.nameEmbedding) = 0
OR size(entity.typeEmbedding) = 0
RETURN entity ORDER BY entity.createdAt`;
const params = userId ? { userId } : {};
const records = await runQuery(query, params);
return records.map((record) => {
const entityProps = record.get("entity").properties;
return {
uuid: entityProps.uuid,
name: entityProps.name,
type: entityProps.type,
attributes: JSON.parse(entityProps.attributes || "{}"),
nameEmbedding: entityProps.nameEmbedding || [],
typeEmbedding: entityProps.typeEmbedding || [],
createdAt: new Date(entityProps.createdAt),
userId: entityProps.userId,
space: entityProps.space,
};
});
} catch (error) {
logger.error("Error fetching entities:", { error });
throw new Error(`Failed to fetch entities: ${error}`);
}
}
/**
* Fetch ALL entities from Neo4j database (for force refresh)
*/
async function getAllEntitiesForceRefresh(
userId?: string,
): Promise<EntityNode[]> {
try {
const query = userId
? `MATCH (entity:Entity {userId: $userId}) RETURN entity ORDER BY entity.createdAt`
@ -287,6 +345,7 @@ export async function triggerEntityEmbeddingsUpdate(
options: {
userId?: string;
batchSize?: number;
forceUpdate?: boolean;
} = {},
) {
try {

View File

@ -17,6 +17,7 @@ interface OAuthIntegrationWebhookPayload {
integrationAccountId: string;
eventType: WebhookEventType;
userId: string;
workspaceId: string;
}
export const integrationWebhookTask = task({
@ -36,11 +37,51 @@ export const integrationWebhookTask = task({
},
});
if (!integrationAccount) {
let webhookPayload: any = {};
if (
!integrationAccount &&
payload.eventType === "integration.disconnected"
) {
webhookPayload = {
event: payload.eventType,
user_id: payload.userId,
integration: {
id: payload.integrationAccountId,
},
};
} else if (!integrationAccount) {
logger.error(
`Integration account ${payload.integrationAccountId} not found`,
);
return { success: false, error: "Integration account not found" };
} else {
const integrationConfig =
integrationAccount.integrationConfiguration as any;
const integrationSpec = integrationAccount.integrationDefinition
.spec as any;
let mcpEndpoint = undefined;
if (integrationSpec.mcp) {
mcpEndpoint = `${process.env.API_BASE_URL}/api/v1/mcp/${integrationAccount.integrationDefinition.slug}`;
} else if (integrationSpec.mcp.type === "stdio") {
mcpEndpoint = `${process.env.API_BASE_URL}/api/v1/mcp/${integrationAccount.integrationDefinition.slug}`;
}
// Prepare webhook payload
webhookPayload = {
event: payload.eventType,
user_id: payload.userId,
integration: {
id: integrationAccount.id,
provider: integrationAccount.integrationDefinition.slug,
mcp_endpoint: mcpEndpoint,
name: integrationAccount.integrationDefinition.name,
icon: integrationAccount.integrationDefinition.icon,
},
timestamp: new Date().toISOString(),
};
}
// Get all OAuth clients that:
@ -48,13 +89,15 @@ export const integrationWebhookTask = task({
// 2. Have webhook URLs configured
const oauthClients = await prisma.oAuthClientInstallation.findMany({
where: {
workspaceId: integrationAccount.workspaceId,
workspaceId: payload.workspaceId,
installedById: payload.userId,
isActive: true,
// Check if client has integration scope in allowedScopes
grantedScopes: {
contains: "integration",
},
oauthClient: {
clientType: "regular",
},
},
select: {
id: true,
@ -77,24 +120,6 @@ export const integrationWebhookTask = task({
return { success: true, message: "No OAuth clients to notify" };
}
const integrationConfig =
integrationAccount.integrationConfiguration as any;
// Prepare webhook payload
const webhookPayload = {
event: payload.eventType,
user_id: payload.userId,
integration: {
id: integrationAccount.id,
provider: integrationAccount.integrationDefinition.slug,
mcp_endpoint: integrationConfig.mcp
? `${process.env.API_BASE_URL}/api/v1/mcp/${integrationAccount.integrationDefinition.slug}`
: undefined,
name: integrationAccount.integrationDefinition.name,
icon: integrationAccount.integrationDefinition.icon,
},
timestamp: new Date().toISOString(),
};
// Convert OAuth clients to targets
const targets: WebhookTarget[] = oauthClients
.filter((client) => client.oauthClient?.webhookUrl)
@ -116,11 +141,6 @@ export const integrationWebhookTask = task({
logger.log(
`OAuth integration webhook delivery completed: ${successfulDeliveries}/${totalDeliveries} successful`,
{
integrationId: integrationAccount.id,
integrationProvider: integrationAccount.integrationDefinition.slug,
userId: payload.userId,
},
);
return {
@ -151,12 +171,14 @@ export async function triggerIntegrationWebhook(
integrationAccountId: string,
userId: string,
eventType: WebhookEventType,
workspaceId: string,
) {
try {
await integrationWebhookTask.trigger({
integrationAccountId,
userId,
eventType,
workspaceId,
});
logger.log(
`Triggered OAuth integration webhook delivery for integration account ${integrationAccountId}`,

View File

@ -51,9 +51,38 @@ export const webhookDeliveryTask = task({
workspaceId: payload.workspaceId,
isActive: true,
},
select: {
id: true,
url: true,
secret: true,
},
});
if (webhooks.length === 0) {
const oauthClients = await prisma.oAuthClientInstallation.findMany({
where: {
workspaceId: activity.workspaceId,
installedById: activity.workspace.userId!,
isActive: true,
grantedScopes: {
contains: "integration",
},
oauthClient: {
clientType: "regular",
},
},
select: {
id: true,
oauthClient: {
select: {
clientId: true,
webhookUrl: true,
webhookSecret: true,
},
},
},
});
if (webhooks.length === 0 && oauthClients.length === 0) {
logger.log(
`No active webhooks found for workspace ${payload.workspaceId}`,
);
@ -87,7 +116,16 @@ export const webhookDeliveryTask = task({
};
// Convert webhooks to targets using common utils
const targets = prepareWebhookTargets(webhooks);
const targets = prepareWebhookTargets(
[...webhooks, ...oauthClients].map((webhook) => ({
url: "url" in webhook ? webhook.url : webhook.oauthClient.webhookUrl!,
secret:
"secret" in webhook
? webhook.secret
: webhook.oauthClient.webhookSecret,
id: webhook.id,
})),
);
// Use common delivery function
const result = await deliverWebhook({

View File

@ -27,16 +27,16 @@ export default defineConfig({
build: {
extensions: [
syncEnvVars(() => ({
ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY as string,
API_BASE_URL: process.env.API_BASE_URL as string,
DATABASE_URL: process.env.DATABASE_URL as string,
EMBEDDING_MODEL: process.env.EMBEDDING_MODEL as string,
ENCRYPTION_KEY: process.env.ENCRYPTION_KEY as string,
MODEL: process.env.MODEL ?? "gpt-4.1-2025-04-14",
NEO4J_PASSWORD: process.env.NEO4J_PASSWORD as string,
NEO4J_URI: process.env.NEO4J_URI as string,
NEO4J_USERNAME: process.env.NEO4J_USERNAME as string,
OPENAI_API_KEY: process.env.OPENAI_API_KEY as string,
// ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY as string,
// API_BASE_URL: process.env.API_BASE_URL as string,
// DATABASE_URL: process.env.DATABASE_URL as string,
// EMBEDDING_MODEL: process.env.EMBEDDING_MODEL as string,
// ENCRYPTION_KEY: process.env.ENCRYPTION_KEY as string,
// MODEL: process.env.MODEL ?? "gpt-4.1-2025-04-14",
// NEO4J_PASSWORD: process.env.NEO4J_PASSWORD as string,
// NEO4J_URI: process.env.NEO4J_URI as string,
// NEO4J_USERNAME: process.env.NEO4J_USERNAME as string,
// OPENAI_API_KEY: process.env.OPENAI_API_KEY as string,
})),
prismaExtension({
schema: "prisma/schema.prisma",