diff --git a/apps/webapp/app/lib/ingest.server.ts b/apps/webapp/app/lib/ingest.server.ts
index a3847af..269b79a 100644
--- a/apps/webapp/app/lib/ingest.server.ts
+++ b/apps/webapp/app/lib/ingest.server.ts
@@ -8,6 +8,7 @@ import { EpisodeType } from "@core/types";
 import { prisma } from "~/db.server";
 import { IngestionStatus } from "@core/database";
 import { logger } from "~/services/logger.service";
+import { posthogService } from "~/services/posthog.server";

 const connection = new IORedis({
   port: env.REDIS_PORT,
@@ -20,6 +21,12 @@ const userQueues = new Map();
 const userWorkers = new Map();

 async function processUserJob(userId: string, job: any) {
+  const startTime = Date.now();
+  const episodeLength = job.data.body.episodeBody?.length || 0;
+  const metadata = job.data.body.metadata || {};
+  const source = job.data.body.source;
+  const spaceId = job.data.body.spaceId;
+
   try {
     logger.log(`Processing job for user ${userId}`);

@@ -30,6 +37,15 @@ async function processUserJob(userId: string, job: any) {
       },
     });

+    // Track ingestion start in PostHog
+    posthogService.capture("ingestion_started", userId, {
+      queue_id: job.data.queueId,
+      episode_length: episodeLength,
+      source,
+      space_id: spaceId,
+      ...metadata
+    }).catch(error => logger.error("Failed to track ingestion start", { error }));
+
     const knowledgeGraphService = new KnowledgeGraphService();

     const episodeDetails = await knowledgeGraphService.addEpisode({
@@ -45,7 +61,18 @@
       },
     });

-    // your processing logic
+    // Track successful ingestion in PostHog
+    const processingTime = Date.now() - startTime;
+    posthogService.trackIngestion(userId, episodeLength, {
+      queue_id: job.data.queueId,
+      processing_time_ms: processingTime,
+      source,
+      space_id: spaceId,
+      entity_count: episodeDetails?.entities?.length || 0,
+      statement_count: episodeDetails?.statements?.length || 0,
+      ...metadata
+    }, true).catch(error => logger.error("Failed to track ingestion completion", { error }));
+
   } catch (err: any) {
     await prisma.ingestionQueue.update({
       where: { id: job.data.queueId },
@@ -55,6 +82,17 @@
       },
     });

+    // Track failed ingestion in PostHog
+    const processingTime = Date.now() - startTime;
+    posthogService.trackIngestion(userId, episodeLength, {
+      queue_id: job.data.queueId,
+      processing_time_ms: processingTime,
+      error: err.message,
+      source,
+      space_id: spaceId,
+      ...metadata
+    }, false).catch(error => logger.error("Failed to track ingestion failure", { error }));
+
     console.error(`Error processing job for user ${userId}:`, err);
   }
 }
@@ -128,6 +166,16 @@ export const addToQueue = async (
     },
   );

+  // Track ingestion queue event in PostHog
+  posthogService.capture("ingestion_queued", userId, {
+    queue_id: queuePersist.id,
+    episode_length: body.episodeBody?.length || 0,
+    source: body.source,
+    space_id: body.spaceId,
+    metadata: body.metadata || {},
+    timestamp: new Date().toISOString(),
+  }).catch(error => logger.error("Failed to track ingestion queue event", { error }));
+
   return {
     id: jobDetails.id,
   };
diff --git a/apps/webapp/app/services/posthog.server.test.ts b/apps/webapp/app/services/posthog.server.test.ts
new file mode 100644
index 0000000..033b0c5
--- /dev/null
+++ b/apps/webapp/app/services/posthog.server.test.ts
@@ -0,0 +1,148 @@
+import { posthogService } from './posthog.server';
+import { vi, describe, it, expect, beforeEach, afterEach, type Mock } from 'vitest';
+import fetch from 'node-fetch';
+
+// Mock node-fetch
+vi.mock('node-fetch');
+
+// Mock environment variables
+vi.mock('~/env.server', () => ({
+  env: {
+    POSTHOG_PROJECT_KEY: 'test-api-key',
+  },
+}));
+
+// Mock logger
+vi.mock('./logger.service', () => ({
+  logger: {
+    debug: vi.fn(),
+    info: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+    log: vi.fn(),
+  },
+}));
+
+describe('PostHogService', () => {
+  beforeEach(() => {
+    vi.resetAllMocks();
+
+    // Default successful response
+    (fetch as unknown as Mock).mockResolvedValue({
+      ok: true,
+      status: 200,
+      statusText: 'OK',
+    });
+  });
+
+  afterEach(() => {
+    vi.clearAllMocks();
+  });
+
+  it('should capture events with the correct payload structure', async () => {
+    const userId = 'test-user-id';
+    const event = 'test-event';
+    const properties = { test: 'property' };
+
+    await posthogService.capture(event, userId, properties);
+
+    expect(fetch).toHaveBeenCalledTimes(1);
+    expect(fetch).toHaveBeenCalledWith('https://eu.posthog.com/capture/', {
+      method: 'POST',
+      headers: {
+        'Content-Type': 'application/json',
+        'Authorization': 'Bearer test-api-key',
+      },
+      body: expect.stringContaining(event),
+    });
+
+    const callPayload = JSON.parse((fetch as unknown as Mock).mock.calls[0][1].body);
+    expect(callPayload.api_key).toBe('test-api-key');
+    expect(callPayload.batch).toHaveLength(1);
+    expect(callPayload.batch[0].event).toBe(event);
+    expect(callPayload.batch[0].distinctId).toBe(userId);
+    expect(callPayload.batch[0].properties).toMatchObject({
+      ...properties,
+      $lib: 'server',
+      $lib_version: '1.0.0',
+    });
+  });
+
+  it('should track search events with appropriate properties', async () => {
+    const userId = 'test-user-id';
+    const query = 'test search query';
+    const options = { limit: 10 };
+    const resultCounts = { result_count_total: 5 };
+
+    await posthogService.trackSearch(userId, query, options, resultCounts);
+
+    expect(fetch).toHaveBeenCalledTimes(1);
+
+    const callPayload = JSON.parse((fetch as unknown as Mock).mock.calls[0][1].body);
+    expect(callPayload.batch[0].event).toBe('search');
+    expect(callPayload.batch[0].distinctId).toBe(userId);
+    expect(callPayload.batch[0].properties).toMatchObject({
+      query,
+      query_length: query.length,
+      limit: 10,
+      result_count_total: 5,
+    });
+  });
+
+  it('should track ingestion events with appropriate properties', async () => {
+    const userId = 'test-user-id';
+    const episodeLength = 1000;
+    const metadata = { source: 'test-source' };
+    const success = true;
+
+    await posthogService.trackIngestion(userId, episodeLength, metadata, success);
+
+    expect(fetch).toHaveBeenCalledTimes(1);
+
+    const callPayload = JSON.parse((fetch as unknown as Mock).mock.calls[0][1].body);
+    expect(callPayload.batch[0].event).toBe('ingestion');
+    expect(callPayload.batch[0].distinctId).toBe(userId);
+    expect(callPayload.batch[0].properties).toMatchObject({
+      episode_length: 1000,
+      source: 'test-source',
+      success: true,
+    });
+  });
+
+  it('should handle fetch errors gracefully', async () => {
+    (fetch as unknown as Mock).mockRejectedValue(new Error('Network error'));
+
+    const result = await posthogService.capture('test-event', 'test-user-id');
+
+    expect(result).toBe(false);
+  });
+
+  it('should handle API errors gracefully', async () => {
+    (fetch as unknown as Mock).mockResolvedValue({
+      ok: false,
+      status: 500,
+      statusText: 'Internal Server Error',
+    });
+
+    const result = await posthogService.capture('test-event', 'test-user-id');
+
+    expect(result).toBe(false);
+  });
+
+  it('should not send events if no API key is provided', async () => {
+    // Override env mock with an empty key and re-import the service
+    vi.resetModules();
+    vi.doMock('~/env.server', () => ({
+      env: {
+        POSTHOG_PROJECT_KEY: '',
+      },
+    }));
+    const { PostHogService } = await import('./posthog.server');
+    const service = new PostHogService();
+
+    const result = await service.capture('test-event', 'test-user-id');
+
+    expect(result).toBe(false);
+    expect(fetch).not.toHaveBeenCalled();
+  });
+});
\ No newline at end of file
diff --git a/apps/webapp/app/services/posthog.server.ts b/apps/webapp/app/services/posthog.server.ts
new file mode 100644
index 0000000..b3e57c2
--- /dev/null
+++ b/apps/webapp/app/services/posthog.server.ts
@@ -0,0 +1,136 @@
+import { env } from "~/env.server";
+import { logger } from "./logger.service";
+import fetch from "node-fetch";
+
+interface PostHogEvent {
+  event: string;
+  distinctId: string;
+  properties?: Record<string, any>;
+  timestamp?: string;
+}
+
+/**
+ * Server-side PostHog client for analytics tracking
+ * Provides methods to track events on the server without requiring the client-side JS
+ */
+export class PostHogService {
+  private readonly apiKey: string;
+  private readonly host: string;
+  private readonly enabled: boolean;
+
+  constructor() {
+    this.apiKey = env.POSTHOG_PROJECT_KEY;
+    this.host = "https://eu.posthog.com";
+    this.enabled = !!this.apiKey && this.apiKey.length > 0;
+
+    if (!this.enabled) {
+      logger.warn("PostHog tracking is disabled. Set POSTHOG_PROJECT_KEY to enable.");
+    }
+  }
+
+  /**
+   * Capture an event in PostHog
+   * @param event Event name
+   * @param distinctId User ID for identification
+   * @param properties Additional properties to track
+   * @returns Promise resolving to true if successful
+   */
+  public async capture(
+    event: string,
+    distinctId: string,
+    properties: Record<string, any> = {}
+  ): Promise<boolean> {
+    if (!this.enabled) return false;
+    if (!distinctId) {
+      logger.warn("PostHog event capture failed: No distinctId provided");
+      return false;
+    }
+
+    try {
+      const eventData: PostHogEvent = {
+        event,
+        distinctId,
+        properties: {
+          ...properties,
+          $lib: "server",
+          $lib_version: "1.0.0",
+        },
+        timestamp: new Date().toISOString(),
+      };
+
+      const response = await fetch(`${this.host}/capture/`, {
+        method: "POST",
+        headers: {
+          "Content-Type": "application/json",
+          Authorization: `Bearer ${this.apiKey}`,
+        },
+        body: JSON.stringify({
+          api_key: this.apiKey,
+          batch: [eventData],
+        }),
+      });
+
+      if (!response.ok) {
+        logger.error(`PostHog capture failed: ${response.status} ${response.statusText}`);
+        return false;
+      }
+
+      logger.debug(`PostHog event captured: ${event}`, {
+        distinctId,
+        eventName: event
+      });
+      return true;
+    } catch (error) {
+      logger.error("Error sending event to PostHog", { error });
+      return false;
+    }
+  }
+
+  /**
+   * Track search event in PostHog
+   * @param userId User ID
+   * @param query Search query
+   * @param options Search options
+   * @param resultCounts Result counts
+   * @returns Promise resolving to true if successful
+   */
+  public async trackSearch(
+    userId: string,
+    query: string,
+    options: Record<string, any> = {},
+    resultCounts: Record<string, any> = {}
+  ): Promise<boolean> {
+    return this.capture("search", userId, {
+      query,
+      query_length: query.length,
+      ...options,
+      ...resultCounts,
+      timestamp: new Date().toISOString(),
+    });
+  }
+
+  /**
+   * Track ingestion event in PostHog
+   * @param userId User ID
+   * @param episodeLength Length of ingested content
+   * @param metadata Additional metadata
+   * @param success Whether ingestion succeeded
+   * @returns Promise resolving to true if successful
+   */
+  public async trackIngestion(
+    userId: string,
+    episodeLength: number,
+    metadata: Record<string, any> = {},
+    success: boolean = true
+  ): Promise<boolean> {
+    return this.capture("ingestion", userId, {
+      episode_length: episodeLength,
+      success,
+      ...metadata,
+      timestamp: new Date().toISOString(),
+    });
+  }
+}
+
+// Singleton instance for use across the application
+export const posthogService = new PostHogService();
\ No newline at end of file
diff --git a/apps/webapp/app/services/search.server.ts b/apps/webapp/app/services/search.server.ts
index 335762c..6a5075d 100644
--- a/apps/webapp/app/services/search.server.ts
+++ b/apps/webapp/app/services/search.server.ts
@@ -2,6 +2,7 @@ import { openai } from "@ai-sdk/openai";
 import type { StatementNode } from "@core/types";
 import { embed } from "ai";
 import { logger } from "./logger.service";
+import { posthogService } from "./posthog.server";
 import { applyCrossEncoderReranking, applyWeightedRRF } from "./search/rerank";
 import {
   getEpisodesByStatements,
@@ -76,10 +77,37 @@
     // 3. Return top results
     const episodes = await getEpisodesByStatements(filteredResults);

-    return {
+    const results = {
       episodes: episodes.map((episode) => episode.content),
       facts: filteredResults.map((statement) => statement.fact),
     };
+
+    // Track search metrics in PostHog
+    posthogService.trackSearch(userId, query,
+      {
+        limit: opts.limit,
+        max_bfs_depth: opts.maxBfsDepth,
+        valid_at: opts.validAt.toISOString(),
+        include_invalidated: opts.includeInvalidated,
+        has_entity_filters: opts.entityTypes.length > 0,
+        has_predicate_filters: opts.predicateTypes.length > 0,
+        score_threshold: opts.scoreThreshold,
+        min_results: opts.minResults,
+        time_range: opts.startTime && opts.endTime ?
+          (new Date(opts.endTime).getTime() - new Date(opts.startTime || 0).getTime()) / (1000 * 60 * 60 * 24) : null
+      },
+      {
+        result_count_total: results.episodes.length + results.facts.length,
+        result_count_episodes: results.episodes.length,
+        result_count_facts: results.facts.length,
+        result_count_bm25: bm25Results.length,
+        result_count_vector: vectorResults.length,
+        result_count_bfs: bfsResults.length,
+        result_count_after_filtering: filteredResults.length
+      }
+    ).catch(error => logger.error("Failed to track search metrics", { error }));
+
+    return results;
   }

   /**