fix: streaming

This commit is contained in:
Harshith Mullapudi 2025-10-26 12:14:48 +05:30
parent 6a05ea4f37
commit c1ccb2bb23
13 changed files with 61 additions and 63 deletions

View File

@ -10,6 +10,15 @@ interface AIConversationItemProps {
message: UIMessage; message: UIMessage;
} }
function getMessage(message: string) {
let finalMessage = message.replace("<final_response>", "");
finalMessage = finalMessage.replace("</final_response>", "");
finalMessage = finalMessage.replace("<question_response>", "");
finalMessage = finalMessage.replace("</question_response>", "");
return finalMessage;
}
const ConversationItemComponent = ({ message }: AIConversationItemProps) => { const ConversationItemComponent = ({ message }: AIConversationItemProps) => {
const isUser = message.role === "user" || false; const isUser = message.role === "user" || false;
const textPart = message.parts.find((part) => part.type === "text"); const textPart = message.parts.find((part) => part.type === "text");
@ -17,12 +26,12 @@ const ConversationItemComponent = ({ message }: AIConversationItemProps) => {
const editor = useEditor({ const editor = useEditor({
extensions: [...extensionsForConversation, skillExtension], extensions: [...extensionsForConversation, skillExtension],
editable: false, editable: false,
content: textPart ? textPart.text : "", content: textPart ? getMessage(textPart.text) : "",
}); });
useEffect(() => { useEffect(() => {
if (textPart) { if (textPart) {
editor?.commands.setContent(textPart.text); editor?.commands.setContent(getMessage(textPart.text));
} }
// eslint-disable-next-line react-hooks/exhaustive-deps // eslint-disable-next-line react-hooks/exhaustive-deps
}, [message]); }, [message]);

View File

@ -1,5 +1,4 @@
import { StopCondition, tool } from "ai"; import { type StopCondition } from "ai";
import z from "zod";
export const hasAnswer: StopCondition<any> = ({ steps }) => { export const hasAnswer: StopCondition<any> = ({ steps }) => {
return ( return (
@ -9,8 +8,7 @@ export const hasAnswer: StopCondition<any> = ({ steps }) => {
export const hasQuestion: StopCondition<any> = ({ steps }) => { export const hasQuestion: StopCondition<any> = ({ steps }) => {
return ( return (
steps.some((step) => step.text?.includes("</question_response>")) ?? steps.some((step) => step.text?.includes("</question_response>")) ?? false
false
); );
}; };
@ -163,19 +161,6 @@ CRITICAL:
</communication> </communication>
`; `;
export const fixedTools = {
progressUpdate: tool({
description:
"Send a progress update to the user about what has been discovered or will be done next in a crisp and user friendly way no technical terms",
inputSchema: z.object({
message: z.string(),
}),
execute: async ({ message }: { message: string }) => ({
message,
}),
}),
};
export function getReActPrompt( export function getReActPrompt(
metadata?: { source?: string; url?: string; pageTitle?: string }, metadata?: { source?: string; url?: string; pageTitle?: string },
intentOverride?: string, intentOverride?: string,

View File

@ -15,6 +15,7 @@ import type { z } from "zod";
import type { IngestBodyRequest } from "~/jobs/ingest/ingest-episode.logic"; import type { IngestBodyRequest } from "~/jobs/ingest/ingest-episode.logic";
import type { CreateConversationTitlePayload } from "~/jobs/conversation/create-title.logic"; import type { CreateConversationTitlePayload } from "~/jobs/conversation/create-title.logic";
import type { SessionCompactionPayload } from "~/jobs/session/session-compaction.logic"; import type { SessionCompactionPayload } from "~/jobs/session/session-compaction.logic";
import { type SpaceAssignmentPayload } from "~/trigger/spaces/space-assignment";
type QueueProvider = "trigger" | "bullmq"; type QueueProvider = "trigger" | "bullmq";
@ -145,12 +146,9 @@ export async function enqueueSessionCompaction(
* Enqueue space assignment job * Enqueue space assignment job
* (Helper for common job logic to call) * (Helper for common job logic to call)
*/ */
export async function enqueueSpaceAssignment(payload: { export async function enqueueSpaceAssignment(
userId: string; payload: SpaceAssignmentPayload,
workspaceId: string; ): Promise<void> {
mode: "episode";
episodeIds: string[];
}): Promise<void> {
const provider = env.QUEUE_PROVIDER as QueueProvider; const provider = env.QUEUE_PROVIDER as QueueProvider;
if (provider === "trigger") { if (provider === "trigger") {

View File

@ -6,7 +6,6 @@ import {
experimental_createMCPClient as createMCPClient, experimental_createMCPClient as createMCPClient,
generateId, generateId,
stepCountIs, stepCountIs,
StopCondition,
} from "ai"; } from "ai";
import { z } from "zod"; import { z } from "zod";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
@ -21,7 +20,11 @@ import { getModel } from "~/lib/model.server";
import { UserTypeEnum } from "@core/types"; import { UserTypeEnum } from "@core/types";
import { nanoid } from "nanoid"; import { nanoid } from "nanoid";
import { getOrCreatePersonalAccessToken } from "~/services/personalAccessToken.server"; import { getOrCreatePersonalAccessToken } from "~/services/personalAccessToken.server";
import { hasAnswer, hasQuestion, REACT_SYSTEM_PROMPT } from "~/lib/prompt.server"; import {
hasAnswer,
hasQuestion,
REACT_SYSTEM_PROMPT,
} from "~/lib/prompt.server";
import { enqueueCreateConversationTitle } from "~/lib/queue-adapter.server"; import { enqueueCreateConversationTitle } from "~/lib/queue-adapter.server";
import { env } from "~/env.server"; import { env } from "~/env.server";
@ -109,8 +112,6 @@ const { loader, action } = createHybridActionApiRoute(
messages: finalMessages, messages: finalMessages,
}); });
const result = streamText({ const result = streamText({
model: getModel() as LanguageModel, model: getModel() as LanguageModel,
messages: [ messages: [
@ -121,7 +122,7 @@ const { loader, action } = createHybridActionApiRoute(
...convertToModelMessages(validatedMessages), ...convertToModelMessages(validatedMessages),
], ],
tools, tools,
stopWhen: [stepCountIs(10), hasAnswer,hasQuestion], stopWhen: [stepCountIs(10), hasAnswer, hasQuestion],
}); });
result.consumeStream(); // no await result.consumeStream(); // no await
@ -129,7 +130,6 @@ const { loader, action } = createHybridActionApiRoute(
return result.toUIMessageStreamResponse({ return result.toUIMessageStreamResponse({
originalMessages: validatedMessages, originalMessages: validatedMessages,
onFinish: async ({ messages }) => { onFinish: async ({ messages }) => {
console.log(JSON.stringify(messages));
const lastMessage = messages.pop(); const lastMessage = messages.pop();
let message = ""; let message = "";
lastMessage?.parts.forEach((part) => { lastMessage?.parts.forEach((part) => {

View File

@ -10,7 +10,6 @@ import {
import { import {
convertToModelMessages, convertToModelMessages,
type CoreMessage,
generateId, generateId,
generateText, generateText,
type LanguageModel, type LanguageModel,
@ -109,11 +108,17 @@ const { action, loader } = createActionApiRoute(
const tools = { const tools = {
searchMemory: searchTool, searchMemory: searchTool,
}; };
// Build initial messages with ReAct prompt // Build initial messages with ReAct prompt
const initialMessages = [ const initialMessages = [
{ {
role: "user", role: "user",
parts: [{ type: "text", text: `CONTENT TO ANALYZE:\n${body.content}\n\nPlease search my memory for relevant context and synthesize what you find.` }], parts: [
{
type: "text",
text: `CONTENT TO ANALYZE:\n${body.content}\n\nPlease search my memory for relevant context and synthesize what you find.`,
},
],
id: generateId(), id: generateId(),
}, },
]; ];
@ -134,7 +139,7 @@ const { action, loader } = createActionApiRoute(
...convertToModelMessages(validatedMessages), ...convertToModelMessages(validatedMessages),
], ],
tools, tools,
stopWhen: [stepCountIs(10), hasAnswer], stopWhen: [hasAnswer, stepCountIs(10)],
}); });
return result.toUIMessageStreamResponse({ return result.toUIMessageStreamResponse({
@ -151,7 +156,7 @@ const { action, loader } = createActionApiRoute(
...convertToModelMessages(validatedMessages), ...convertToModelMessages(validatedMessages),
], ],
tools, tools,
stopWhen: [stepCountIs(10), hasAnswer], stopWhen: [hasAnswer, stepCountIs(10)],
}); });
await deletePersonalAccessToken(pat?.id); await deletePersonalAccessToken(pat?.id);

View File

@ -3,7 +3,7 @@ import { createHybridActionApiRoute } from "~/services/routeBuilders/apiBuilder.
import { SpaceService } from "~/services/space.server"; import { SpaceService } from "~/services/space.server";
import { json } from "@remix-run/node"; import { json } from "@remix-run/node";
import { logger } from "~/services/logger.service"; import { logger } from "~/services/logger.service";
import { triggerSpaceAssignment } from "~/trigger/spaces/space-assignment"; import { enqueueSpaceAssignment } from "~/lib/queue-adapter.server";
// Schema for space ID parameter // Schema for space ID parameter
const SpaceParamsSchema = z.object({ const SpaceParamsSchema = z.object({
@ -31,7 +31,7 @@ const { loader, action } = createHybridActionApiRoute(
// Trigger automatic episode assignment for the reset space // Trigger automatic episode assignment for the reset space
try { try {
await triggerSpaceAssignment({ await enqueueSpaceAssignment({
userId: userId, userId: userId,
workspaceId: space.workspaceId, workspaceId: space.workspaceId,
mode: "new_space", mode: "new_space",

View File

@ -1,8 +1,8 @@
import { z } from "zod"; import { z } from "zod";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { json } from "@remix-run/node"; import { json } from "@remix-run/node";
import { triggerSpaceAssignment } from "~/trigger/spaces/space-assignment";
import { prisma } from "~/db.server"; import { prisma } from "~/db.server";
import { enqueueSpaceAssignment } from "~/lib/queue-adapter.server";
// Schema for manual assignment trigger // Schema for manual assignment trigger
const ManualAssignmentSchema = z.object({ const ManualAssignmentSchema = z.object({
@ -38,7 +38,7 @@ const { action } = createActionApiRoute(
let taskRun; let taskRun;
// Direct LLM assignment trigger // Direct LLM assignment trigger
taskRun = await triggerSpaceAssignment({ taskRun = await enqueueSpaceAssignment({
userId, userId,
workspaceId: user?.Workspace?.id as string, workspaceId: user?.Workspace?.id as string,
mode: body.mode, mode: body.mode,
@ -49,7 +49,7 @@ const { action } = createActionApiRoute(
return json({ return json({
success: true, success: true,
message: `${body.mode} assignment task triggered successfully`, message: `${body.mode} assignment task triggered successfully`,
taskId: taskRun.id,
payload: { payload: {
userId, userId,
mode: body.mode, mode: body.mode,

View File

@ -55,9 +55,10 @@ export default function SingleConversation() {
}, },
}), }),
}); });
console.log("new", messages);
React.useEffect(() => { React.useEffect(() => {
if (conversation.ConversationHistory.length === 1) { if (messages.length === 1) {
regenerate(); regenerate();
} }
}, []); }, []);
@ -96,7 +97,7 @@ export default function SingleConversation() {
<div className="w-full max-w-[80ch] px-1 pr-2"> <div className="w-full max-w-[80ch] px-1 pr-2">
<ConversationTextarea <ConversationTextarea
className="bg-background-3 w-full border-1 border-gray-300" className="bg-background-3 w-full border-1 border-gray-300"
isLoading={status === "streaming"} isLoading={status === "streaming" || status === "submitted"}
onConversationCreated={(message) => { onConversationCreated={(message) => {
if (message) { if (message) {
sendMessage({ text: message }); sendMessage({ text: message });

View File

@ -6,7 +6,6 @@ import {
} from "@core/types"; } from "@core/types";
import { type Space } from "@prisma/client"; import { type Space } from "@prisma/client";
import { triggerSpaceAssignment } from "~/trigger/spaces/space-assignment";
import { import {
assignEpisodesToSpace, assignEpisodesToSpace,
createSpace, createSpace,
@ -18,6 +17,7 @@ import {
} from "./graphModels/space"; } from "./graphModels/space";
import { prisma } from "~/trigger/utils/prisma"; import { prisma } from "~/trigger/utils/prisma";
import { trackFeatureUsage } from "./telemetry.server"; import { trackFeatureUsage } from "./telemetry.server";
import { enqueueSpaceAssignment } from "~/lib/queue-adapter.server";
export class SpaceService { export class SpaceService {
/** /**
@ -69,7 +69,7 @@ export class SpaceService {
// Trigger automatic LLM assignment for the new space // Trigger automatic LLM assignment for the new space
try { try {
await triggerSpaceAssignment({ await enqueueSpaceAssignment({
userId: params.userId, userId: params.userId,
workspaceId: params.workspaceId, workspaceId: params.workspaceId,
mode: "new_space", mode: "new_space",

View File

@ -128,6 +128,7 @@ export async function trackEvent(
modelProvider: getModelProvider(), modelProvider: getModelProvider(),
embeddingModel: env.EMBEDDING_MODEL, embeddingModel: env.EMBEDDING_MODEL,
appEnv: env.APP_ENV, appEnv: env.APP_ENV,
appOrigin: env.APP_ORIGIN,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
}; };
@ -175,6 +176,7 @@ export async function trackFeatureUsage(
event: feature, event: feature,
properties: { properties: {
...properties, ...properties,
appOrigin: env.APP_ORIGIN,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
}, },
}); });
@ -204,6 +206,7 @@ export async function trackConfig(): Promise<void> {
appEnv: env.APP_ENV, appEnv: env.APP_ENV,
nodeEnv: env.NODE_ENV, nodeEnv: env.NODE_ENV,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
appOrigin: env.APP_ORIGIN,
}, },
}); });
} catch (error) { } catch (error) {
@ -231,6 +234,7 @@ export async function trackError(
properties: { properties: {
errorType: error.name, errorType: error.name,
errorMessage: error.message, errorMessage: error.message,
appOrigin: env.APP_ORIGIN,
stackTrace: error.stack, stackTrace: error.stack,
...context, ...context,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),

View File

@ -18,7 +18,7 @@ import type { CoreMessage } from "ai";
import { z } from "zod"; import { z } from "zod";
import { type Space } from "@prisma/client"; import { type Space } from "@prisma/client";
interface SpaceAssignmentPayload { export interface SpaceAssignmentPayload {
userId: string; userId: string;
workspaceId: string; workspaceId: string;
mode: "new_space" | "episode"; mode: "new_space" | "episode";

View File

@ -1,22 +1,16 @@
FROM neo4j:5 FROM neo4j:5.26.0
# Set environment variables for plugin versions # Manual installation of plugins with correct download URLs
# GDS 2.13 is compatible with Neo4j 5.26 # GDS 2.13.2 is compatible with Neo4j 5.26
# APOC 5.26.14 is the latest for Neo4j 5.x # APOC 5.26.0 matches Neo4j 5.26
ENV GDS_VERSION=2.13.0
ENV APOC_VERSION=5.26.0
# Install GDS and APOC plugins
RUN apt-get update && apt-get install -y curl && \ RUN apt-get update && apt-get install -y curl && \
curl -L https://github.com/neo4j/graph-data-science/releases/download/${GDS_VERSION}/neo4j-graph-data-science-${GDS_VERSION}.jar \ curl -L https://github.com/neo4j/graph-data-science/releases/download/2.13.2/neo4j-graph-data-science-2.13.2.jar \
-o /var/lib/neo4j/plugins/neo4j-graph-data-science-${GDS_VERSION}.jar && \ -o /var/lib/neo4j/plugins/neo4j-graph-data-science.jar && \
curl -L https://github.com/neo4j/apoc/releases/download/${APOC_VERSION}/apoc-${APOC_VERSION}-core.jar \ curl -L https://github.com/neo4j/apoc/releases/download/5.26.0/apoc-5.26.0-core.jar \
-o /var/lib/neo4j/plugins/apoc-${APOC_VERSION}-core.jar && \ -o /var/lib/neo4j/plugins/apoc-core.jar && \
apt-get clean && \ apt-get clean && \
rm -rf /var/lib/apt/lists/* rm -rf /var/lib/apt/lists/* && \
chown -R neo4j:neo4j /var/lib/neo4j/plugins
# Set proper permissions
RUN chown -R neo4j:neo4j /var/lib/neo4j/plugins
# Default configuration for GDS and APOC # Default configuration for GDS and APOC
ENV NEO4J_dbms_security_procedures_unrestricted=gds.*,apoc.* ENV NEO4J_dbms_security_procedures_unrestricted=gds.*,apoc.*

View File

@ -42,6 +42,9 @@ services:
- FROM_EMAIL=${FROM_EMAIL} - FROM_EMAIL=${FROM_EMAIL}
- RESEND_API_KEY=${RESEND_API_KEY} - RESEND_API_KEY=${RESEND_API_KEY}
- COHERE_API_KEY=${COHERE_API_KEY} - COHERE_API_KEY=${COHERE_API_KEY}
- QUEUE_PROVIDER=${QUEUE_PROVIDER}
- TELEMETRY_ENABLED=${TELEMETRY_ENABLED}
- TELEMETRY_ANONYMOUS=${TELEMETRY_ANONYMOUS}
ports: ports:
- "3033:3000" - "3033:3000"
depends_on: depends_on:
@ -84,7 +87,7 @@ services:
neo4j: neo4j:
container_name: core-neo4j container_name: core-neo4j
image: neo4j:5 image: redplanethq/neo4j:0.1.0
environment: environment:
- NEO4J_AUTH=${NEO4J_AUTH} - NEO4J_AUTH=${NEO4J_AUTH}
- NEO4J_dbms_security_procedures_unrestricted=gds.*,apoc.* - NEO4J_dbms_security_procedures_unrestricted=gds.*,apoc.*
@ -94,7 +97,6 @@ services:
- NEO4J_apoc_import_file_use_neo4j_config=true - NEO4J_apoc_import_file_use_neo4j_config=true
- NEO4J_server_memory_heap_initial__size=2G - NEO4J_server_memory_heap_initial__size=2G
- NEO4J_server_memory_heap_max__size=4G - NEO4J_server_memory_heap_max__size=4G
- NEO4JLABS_PLUGINS=apoc,graph-data-science
ports: ports:
- "7474:7474" - "7474:7474"
- "7687:7687" - "7687:7687"