async def step_streaming(
    self, user_input: str, completion_id: str, created_ts: int, model: str
) -> AsyncGenerator[dict[str, Any], None]:
    """
    Execute an agent step with streaming support for LibreChat.

    Streams one chunk per tool call requested by the LLM, one content chunk
    per tool result, and a final content chunk with finish_reason "stop".

    Args:
        user_input: User's message.
        completion_id: Completion ID echoed in every chunk.
        created_ts: Creation timestamp echoed in every chunk.
        model: Model name echoed in every chunk.

    Yields:
        Dicts shaped like OpenAI "chat.completion.chunk" objects (SSE payloads).
    """

    def _chunk(delta: dict[str, Any], finish_reason: str | None) -> dict[str, Any]:
        # All chunks share the same envelope; only delta/finish_reason vary.
        return {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created_ts,
            "model": model,
            "choices": [
                {"index": 0, "delta": delta, "finish_reason": finish_reason}
            ],
        }

    memory = get_memory()

    # Persist the user turn before calling the LLM.
    memory.stm.add_message("user", user_input)
    memory.save()

    # Build the conversation: system prompt, recent history, unread events.
    system_prompt = self.prompt_builder.build_system_prompt()
    messages: list[dict[str, Any]] = [{"role": "system", "content": system_prompt}]
    messages.extend(memory.stm.get_recent_history(settings.max_history_messages))

    unread_events = memory.episodic.get_unread_events()
    if unread_events:
        events_text = "\n".join(
            f"- {e['type']}: {e['data']}" for e in unread_events
        )
        messages.append(
            {"role": "system", "content": f"Background events:\n{events_text}"}
        )

    # Tools specification in OpenAI function-calling format.
    tools_spec = self.prompt_builder.build_tools_spec()

    # Tool loop: keep calling the LLM until it answers without tools,
    # or the iteration budget is exhausted.
    for _iteration in range(self.max_tool_iterations):
        llm_result = self.llm.complete(messages, tools=tools_spec)
        # self.llm.complete may return (message, usage) or just the message;
        # usage is not needed here.
        response_message = (
            llm_result[0] if isinstance(llm_result, tuple) else llm_result
        )

        tool_calls = response_message.get("tool_calls")
        if not tool_calls:
            # No tool calls: this is the final answer.
            final_content = response_message.get("content", "")
            memory.stm.add_message("assistant", final_content)
            memory.save()
            yield _chunk({"role": "assistant", "content": final_content}, "stop")
            return

        # Announce each requested tool call. Each streamed tool call must
        # carry its own index (fix: previously hard-coded to 0, which would
        # merge multiple tool calls on the client).
        for call_index, tool_call in enumerate(tool_calls):
            function = tool_call.get("function", {})
            yield _chunk(
                {
                    "tool_calls": [
                        {
                            "index": call_index,
                            "id": tool_call.get("id"),
                            "type": "function",
                            "function": {
                                "name": function.get("name", ""),
                                "arguments": function.get("arguments", "{}"),
                            },
                        }
                    ]
                },
                None,
            )

        # Record the assistant turn that requested the tools.
        messages.append(response_message)

        # Execute each tool, feed the result back to the LLM, and surface
        # it to the user as a content chunk.
        for tool_call in tool_calls:
            tool_result = self._execute_tool_call(tool_call)
            tool_name = tool_call.get("function", {}).get("name", "")
            result_json = json.dumps(tool_result, ensure_ascii=False)

            messages.append(
                {
                    "tool_call_id": tool_call.get("id"),
                    "role": "tool",
                    "name": tool_name,
                    "content": result_json,
                }
            )
            yield _chunk({"content": f"\n🔧 {tool_name}: {result_json}\n"}, None)

    # Iteration budget exhausted: force a tool-free final answer.
    messages.append(
        {
            "role": "system",
            "content": "Please provide a final response to the user without using any more tools.",
        }
    )

    llm_result = self.llm.complete(messages)
    final_message = llm_result[0] if isinstance(llm_result, tuple) else llm_result
    final_response = final_message.get(
        "content", "I've completed the requested actions."
    )
    memory.stm.add_message("assistant", final_response)
    memory.save()

    yield _chunk({"content": final_response}, "stop")
def event_generator(): - chunk = { - "id": completion_id, - "object": "chat.completion.chunk", - "created": created_ts, - "model": chat_request.model, - "choices": [ - { - "index": 0, - "delta": {"role": "assistant", "content": answer or ""}, - "finish_reason": "stop", - } - ], - } - yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" - yield "data: [DONE]\n\n" + try: + # Stream the agent execution + async for chunk in agent.step_streaming(user_input, completion_id, created_ts, chat_request.model): + yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" + yield "data: [DONE]\n\n" + except LLMAPIError as e: + logger.error(f"LLM API error: {e}") + error_chunk = { + "id": completion_id, + "object": "chat.completion.chunk", + "created": created_ts, + "model": chat_request.model, + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": f"Error: {e}"}, + "finish_reason": "stop", + } + ], + } + yield f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n" + yield "data: [DONE]\n\n" + except Exception as e: + logger.error(f"Agent error: {e}", exc_info=True) + error_chunk = { + "id": completion_id, + "object": "chat.completion.chunk", + "created": created_ts, + "model": chat_request.model, + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": "Internal agent error"}, + "finish_reason": "stop", + } + ], + } + yield f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n" + yield "data: [DONE]\n\n" return StreamingResponse(event_generator(), media_type="text/event-stream")