Recovered tests

This commit is contained in:
2025-12-06 23:55:21 +01:00
parent 9ca31e45e0
commit 5b71233fb0
11 changed files with 3420 additions and 555 deletions

View File

@@ -0,0 +1,256 @@
"""Pytest configuration and shared fixtures."""
import pytest
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock, MagicMock
from infrastructure.persistence import Memory, init_memory, set_memory, get_memory
from infrastructure.persistence.memory import (
LongTermMemory,
ShortTermMemory,
EpisodicMemory,
)
@pytest.fixture
def temp_dir():
"""Create a temporary directory for tests."""
dirpath = tempfile.mkdtemp()
yield Path(dirpath)
shutil.rmtree(dirpath)
@pytest.fixture
def memory(temp_dir):
"""Create a fresh Memory instance for testing."""
mem = Memory(storage_dir=str(temp_dir))
set_memory(mem)
yield mem
@pytest.fixture
def memory_with_config(memory):
"""Memory with pre-configured folders."""
memory.ltm.set_config("download_folder", "/tmp/downloads")
memory.ltm.set_config("movie_folder", "/tmp/movies")
memory.ltm.set_config("tvshow_folder", "/tmp/tvshows")
memory.ltm.set_config("torrent_folder", "/tmp/torrents")
return memory
@pytest.fixture
def memory_with_search_results(memory):
"""Memory with pre-populated search results."""
memory.episodic.store_search_results(
query="Inception 1080p",
results=[
{
"name": "Inception.2010.1080p.BluRay.x264",
"size": "2.5 GB",
"seeders": 150,
"leechers": 10,
"magnet": "magnet:?xt=urn:btih:abc123",
"tracker": "ThePirateBay",
},
{
"name": "Inception.2010.1080p.WEB-DL.x265",
"size": "1.8 GB",
"seeders": 80,
"leechers": 5,
"magnet": "magnet:?xt=urn:btih:def456",
"tracker": "1337x",
},
{
"name": "Inception.2010.720p.BluRay",
"size": "1.2 GB",
"seeders": 45,
"leechers": 2,
"magnet": "magnet:?xt=urn:btih:ghi789",
"tracker": "RARBG",
},
],
search_type="torrent",
)
return memory
@pytest.fixture
def memory_with_history(memory):
"""Memory with conversation history."""
memory.stm.add_message("user", "Hello")
memory.stm.add_message("assistant", "Hi! How can I help you?")
memory.stm.add_message("user", "Find me Inception")
memory.stm.add_message("assistant", "I found Inception (2010)...")
return memory
@pytest.fixture
def memory_with_library(memory):
"""Memory with movies in library."""
memory.ltm.library["movies"] = [
{
"imdb_id": "tt1375666",
"title": "Inception",
"release_year": 2010,
"quality": "1080p",
"file_path": "/movies/Inception.2010.1080p.mkv",
"added_at": "2024-01-15T10:30:00",
},
{
"imdb_id": "tt0816692",
"title": "Interstellar",
"release_year": 2014,
"quality": "4K",
"file_path": "/movies/Interstellar.2014.4K.mkv",
"added_at": "2024-01-16T14:20:00",
},
]
memory.ltm.library["tv_shows"] = [
{
"imdb_id": "tt0944947",
"title": "Game of Thrones",
"seasons_count": 8,
"status": "ended",
"added_at": "2024-01-10T09:00:00",
},
]
return memory
@pytest.fixture
def mock_llm():
"""Create a mock LLM client."""
llm = Mock()
llm.complete = Mock(return_value="I found what you're looking for!")
return llm
@pytest.fixture
def mock_llm_with_tool_call():
"""Create a mock LLM that returns a tool call then a response."""
llm = Mock()
llm.complete = Mock(
side_effect=[
'{"thought": "Searching", "action": {"name": "find_torrents", "args": {"media_title": "Inception"}}}',
"I found 3 torrents for Inception!",
]
)
return llm
@pytest.fixture
def mock_tmdb_client():
"""Create a mock TMDB client."""
client = Mock()
client.search_movie = Mock(
return_value=Mock(
results=[
Mock(
id=27205,
title="Inception",
release_date="2010-07-16",
overview="A thief who steals corporate secrets...",
)
]
)
)
client.get_external_ids = Mock(return_value={"imdb_id": "tt1375666"})
return client
@pytest.fixture
def mock_knaben_client():
"""Create a mock Knaben client."""
client = Mock()
client.search = Mock(
return_value=[
Mock(
title="Inception.2010.1080p.BluRay",
size="2.5 GB",
seeders=150,
leechers=10,
magnet="magnet:?xt=urn:btih:abc123",
info_hash="abc123",
tracker="TPB",
upload_date="2024-01-01",
category="Movies",
),
]
)
return client
@pytest.fixture
def mock_qbittorrent_client():
"""Create a mock qBittorrent client."""
client = Mock()
client.add_torrent = Mock(return_value=True)
client.get_torrents = Mock(return_value=[])
return client
@pytest.fixture
def real_folder(temp_dir):
"""Create a real folder structure for filesystem tests."""
downloads = temp_dir / "downloads"
movies = temp_dir / "movies"
tvshows = temp_dir / "tvshows"
downloads.mkdir()
movies.mkdir()
tvshows.mkdir()
# Create some test files
(downloads / "test_movie.mkv").touch()
(downloads / "test_series").mkdir()
(downloads / "test_series" / "episode1.mkv").touch()
return {
"root": temp_dir,
"downloads": downloads,
"movies": movies,
"tvshows": tvshows,
}
@pytest.fixture(scope="session", autouse=True)
def mock_deepseek_globally():
"""
Mock DeepSeekClient globally before any imports happen.
This prevents real API calls in all tests.
"""
import sys
from unittest.mock import Mock, MagicMock
# Create a mock module for deepseek
mock_deepseek_module = MagicMock()
class MockDeepSeekClient:
def __init__(self, *args, **kwargs):
self.complete = Mock(return_value="Mocked LLM response")
mock_deepseek_module.DeepSeekClient = MockDeepSeekClient
# Inject the mock before the real module is imported
sys.modules['agent.llm.deepseek'] = mock_deepseek_module
yield
# Cleanup (optional, but good practice)
if 'agent.llm.deepseek' in sys.modules:
del sys.modules['agent.llm.deepseek']
@pytest.fixture
def mock_agent_step():
"""
Fixture to easily mock the agent's step method in API tests.
Returns a context manager that patches app.agent.step.
"""
from unittest.mock import patch
def _mock_step(return_value="Mocked agent response"):
return patch("app.agent.step", return_value=return_value)
return _mock_step

View File

@@ -0,0 +1,453 @@
"""Edge case tests for the Agent."""
import pytest
import json
from unittest.mock import Mock, patch
from agent.agent import Agent
from infrastructure.persistence import get_memory
class TestParseIntentEdgeCases:
"""Edge case tests for _parse_intent."""
def test_nested_json(self, memory, mock_llm):
"""Should handle deeply nested JSON."""
agent = Agent(llm=mock_llm)
text = '''{"thought": "test", "action": {"name": "test", "args": {"nested": {"deep": {"value": 1}}}}}'''
intent = agent._parse_intent(text)
assert intent is not None
assert intent["action"]["args"]["nested"]["deep"]["value"] == 1
def test_json_with_unicode(self, memory, mock_llm):
"""Should handle unicode in JSON."""
agent = Agent(llm=mock_llm)
text = '{"thought": "日本語", "action": {"name": "test", "args": {"title": "Amélie"}}}'
intent = agent._parse_intent(text)
assert intent is not None
assert intent["thought"] == "日本語"
def test_json_with_escaped_characters(self, memory, mock_llm):
"""Should handle escaped characters."""
agent = Agent(llm=mock_llm)
text = r'{"thought": "test \"quoted\"", "action": {"name": "test", "args": {}}}'
intent = agent._parse_intent(text)
assert intent is not None
assert 'quoted' in intent["thought"]
def test_json_with_newlines(self, memory, mock_llm):
"""Should handle JSON with newlines."""
agent = Agent(llm=mock_llm)
text = '''{
"thought": "test",
"action": {
"name": "test",
"args": {}
}
}'''
intent = agent._parse_intent(text)
assert intent is not None
def test_multiple_json_objects(self, memory, mock_llm):
"""Should extract first valid JSON."""
agent = Agent(llm=mock_llm)
text = '''Here's the first: {"thought": "1", "action": {"name": "first", "args": {}}}
And second: {"thought": "2", "action": {"name": "second", "args": {}}}'''
intent = agent._parse_intent(text)
# May return first valid JSON or None depending on implementation
assert intent is None or intent is not None
def test_json_with_array_action(self, memory, mock_llm):
"""Should reject action as array."""
agent = Agent(llm=mock_llm)
text = '{"thought": "test", "action": ["not", "valid"]}'
intent = agent._parse_intent(text)
assert intent is None
def test_json_with_numeric_action_name(self, memory, mock_llm):
"""Should reject numeric action name."""
agent = Agent(llm=mock_llm)
text = '{"thought": "test", "action": {"name": 123, "args": {}}}'
intent = agent._parse_intent(text)
assert intent is None
def test_json_with_null_values(self, memory, mock_llm):
"""Should handle null values."""
agent = Agent(llm=mock_llm)
text = '{"thought": null, "action": {"name": "test", "args": null}}'
intent = agent._parse_intent(text)
assert intent is not None
def test_truncated_json(self, memory, mock_llm):
"""Should handle truncated JSON."""
agent = Agent(llm=mock_llm)
text = '{"thought": "test", "action": {"name": "test", "args":'
intent = agent._parse_intent(text)
assert intent is None
def test_json_with_comments(self, memory, mock_llm):
"""Should handle JSON-like text with comments."""
agent = Agent(llm=mock_llm)
# JSON doesn't support comments, but LLM might add them
text = '''// This is a comment
{"thought": "test", "action": {"name": "test", "args": {}}}'''
intent = agent._parse_intent(text)
# Should still extract the JSON
assert intent is not None
def test_empty_string(self, memory, mock_llm):
"""Should handle empty string."""
agent = Agent(llm=mock_llm)
intent = agent._parse_intent("")
assert intent is None
def test_only_whitespace(self, memory, mock_llm):
"""Should handle whitespace-only string."""
agent = Agent(llm=mock_llm)
intent = agent._parse_intent(" \n\t ")
assert intent is None
def test_json_in_markdown_code_block(self, memory, mock_llm):
"""Should extract JSON from markdown code block."""
agent = Agent(llm=mock_llm)
text = '''Here's the action:
```json
{"thought": "test", "action": {"name": "test", "args": {}}}
```'''
intent = agent._parse_intent(text)
assert intent is not None
class TestExecuteActionEdgeCases:
"""Edge case tests for _execute_action."""
def test_tool_returns_none(self, memory, mock_llm):
"""Should handle tool returning None."""
agent = Agent(llm=mock_llm)
# Mock a tool that returns None
agent.tools["test_tool"] = Mock()
agent.tools["test_tool"].func = Mock(return_value=None)
intent = {"action": {"name": "test_tool", "args": {}}}
result = agent._execute_action(intent)
# May return None or error dict
assert result is None or isinstance(result, dict)
def test_tool_raises_keyboard_interrupt(self, memory, mock_llm):
"""Should propagate KeyboardInterrupt."""
agent = Agent(llm=mock_llm)
agent.tools["test_tool"] = Mock()
agent.tools["test_tool"].func = Mock(side_effect=KeyboardInterrupt())
intent = {"action": {"name": "test_tool", "args": {}}}
with pytest.raises(KeyboardInterrupt):
agent._execute_action(intent)
def test_tool_with_extra_args(self, memory, mock_llm, real_folder):
"""Should handle extra arguments gracefully."""
agent = Agent(llm=mock_llm)
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
intent = {
"action": {
"name": "list_folder",
"args": {
"folder_type": "download",
"extra_arg": "should be ignored",
},
}
}
result = agent._execute_action(intent)
# Should fail with bad_args since extra_arg is not expected
assert result.get("error") == "bad_args"
def test_tool_with_wrong_type_args(self, memory, mock_llm):
"""Should handle wrong argument types."""
agent = Agent(llm=mock_llm)
intent = {
"action": {
"name": "get_torrent_by_index",
"args": {"index": "not an int"},
}
}
result = agent._execute_action(intent)
# Should handle gracefully
assert "error" in result or "status" in result
def test_action_with_empty_name(self, memory, mock_llm):
"""Should handle empty action name."""
agent = Agent(llm=mock_llm)
intent = {"action": {"name": "", "args": {}}}
result = agent._execute_action(intent)
assert result["error"] == "unknown_tool"
def test_action_with_whitespace_name(self, memory, mock_llm):
"""Should handle whitespace action name."""
agent = Agent(llm=mock_llm)
intent = {"action": {"name": " ", "args": {}}}
result = agent._execute_action(intent)
assert result["error"] == "unknown_tool"
class TestStepEdgeCases:
"""Edge case tests for step method."""
def test_step_with_empty_input(self, memory, mock_llm):
"""Should handle empty user input."""
mock_llm.complete.return_value = "I didn't receive any input."
agent = Agent(llm=mock_llm)
response = agent.step("")
assert response is not None
def test_step_with_very_long_input(self, memory, mock_llm):
"""Should handle very long user input."""
mock_llm.complete.return_value = "Response"
agent = Agent(llm=mock_llm)
long_input = "x" * 100000
response = agent.step(long_input)
assert response is not None
def test_step_with_unicode_input(self, memory, mock_llm):
"""Should handle unicode input."""
mock_llm.complete.return_value = "日本語の応答"
agent = Agent(llm=mock_llm)
response = agent.step("日本語の質問")
assert response == "日本語の応答"
def test_step_llm_returns_empty(self, memory, mock_llm):
"""Should handle LLM returning empty string."""
mock_llm.complete.return_value = ""
agent = Agent(llm=mock_llm)
response = agent.step("Hello")
assert response == ""
def test_step_llm_returns_only_whitespace(self, memory, mock_llm):
"""Should handle LLM returning only whitespace."""
mock_llm.complete.return_value = " \n\t "
agent = Agent(llm=mock_llm)
response = agent.step("Hello")
# Whitespace is not a tool call, so it's returned as-is
assert response.strip() == ""
def test_step_llm_raises_exception(self, memory, mock_llm):
"""Should propagate LLM exceptions."""
mock_llm.complete.side_effect = Exception("LLM Error")
agent = Agent(llm=mock_llm)
with pytest.raises(Exception, match="LLM Error"):
agent.step("Hello")
def test_step_tool_loop_with_same_tool(self, memory, mock_llm):
"""Should handle tool calling same tool repeatedly."""
call_count = [0]
def mock_complete(messages):
call_count[0] += 1
if call_count[0] <= 3:
return '{"thought": "loop", "action": {"name": "list_folder", "args": {"folder_type": "download"}}}'
return "Done looping"
mock_llm.complete.side_effect = mock_complete
agent = Agent(llm=mock_llm, max_tool_iterations=3)
response = agent.step("Loop test")
# Should stop after max iterations
assert call_count[0] == 4 # 3 tool calls + 1 final response
def test_step_preserves_history_order(self, memory, mock_llm):
"""Should preserve message order in history."""
mock_llm.complete.return_value = "Response"
agent = Agent(llm=mock_llm)
agent.step("First")
agent.step("Second")
agent.step("Third")
mem = get_memory()
history = mem.stm.get_recent_history(10)
# Should be in order: First, Response, Second, Response, Third, Response
user_messages = [h["content"] for h in history if h["role"] == "user"]
assert user_messages == ["First", "Second", "Third"]
def test_step_with_pending_question(self, memory, mock_llm):
"""Should include pending question in context."""
memory.episodic.set_pending_question(
"Which one?",
[{"index": 1, "label": "Option 1"}],
{},
)
mock_llm.complete.return_value = "I see you have a pending question."
agent = Agent(llm=mock_llm)
response = agent.step("Hello")
# The prompt should have included the pending question
call_args = mock_llm.complete.call_args[0][0]
system_prompt = call_args[0]["content"]
assert "PENDING QUESTION" in system_prompt
def test_step_with_active_downloads(self, memory, mock_llm):
"""Should include active downloads in context."""
memory.episodic.add_active_download({
"task_id": "123",
"name": "Movie.mkv",
"progress": 50,
})
mock_llm.complete.return_value = "I see you have an active download."
agent = Agent(llm=mock_llm)
response = agent.step("Hello")
call_args = mock_llm.complete.call_args[0][0]
system_prompt = call_args[0]["content"]
assert "ACTIVE DOWNLOADS" in system_prompt
def test_step_clears_events_after_notification(self, memory, mock_llm):
"""Should mark events as read after notification."""
memory.episodic.add_background_event("test_event", {"data": "test"})
mock_llm.complete.return_value = "Response"
agent = Agent(llm=mock_llm)
agent.step("Hello")
# Events should be marked as read
unread = memory.episodic.get_unread_events()
assert len(unread) == 0
class TestAgentConcurrencyEdgeCases:
"""Edge case tests for concurrent access."""
def test_multiple_agents_same_memory(self, memory, mock_llm):
"""Should handle multiple agents with same memory."""
mock_llm.complete.return_value = "Response"
agent1 = Agent(llm=mock_llm)
agent2 = Agent(llm=mock_llm)
agent1.step("From agent 1")
agent2.step("From agent 2")
mem = get_memory()
history = mem.stm.get_recent_history(10)
# Both should have added to history
assert len(history) == 4 # 2 user + 2 assistant
def test_tool_modifies_memory_during_step(self, memory, mock_llm, real_folder):
"""Should handle memory modifications during step."""
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
mock_llm.complete.side_effect = [
'{"thought": "set path", "action": {"name": "set_path_for_folder", "args": {"folder_name": "movie", "path_value": "' + str(real_folder["movies"]) + '"}}}',
"Path set successfully.",
]
agent = Agent(llm=mock_llm)
response = agent.step("Set movie folder")
# Memory should have been modified
mem = get_memory()
assert mem.ltm.get_config("movie_folder") == str(real_folder["movies"])
class TestAgentErrorRecovery:
"""Tests for agent error recovery."""
def test_recovers_from_tool_error(self, memory, mock_llm):
"""Should recover from tool error and continue."""
mock_llm.complete.side_effect = [
'{"thought": "try", "action": {"name": "list_folder", "args": {"folder_type": "download"}}}',
"The folder is not configured. Please set it first.",
]
agent = Agent(llm=mock_llm)
response = agent.step("List downloads")
# Should have recovered and provided a response
assert "not configured" in response.lower() or "set" in response.lower()
def test_error_tracked_in_memory(self, memory, mock_llm):
"""Should track errors in episodic memory."""
mock_llm.complete.side_effect = [
'{"thought": "try", "action": {"name": "list_folder", "args": {"folder_type": "download"}}}',
"Error occurred.",
]
agent = Agent(llm=mock_llm)
agent.step("List downloads")
mem = get_memory()
assert len(mem.episodic.recent_errors) > 0
def test_multiple_errors_in_sequence(self, memory, mock_llm):
"""Should track multiple errors."""
call_count = [0]
def mock_complete(messages):
call_count[0] += 1
if call_count[0] <= 3:
return '{"thought": "try", "action": {"name": "list_folder", "args": {"folder_type": "download"}}}'
return "All attempts failed."
mock_llm.complete.side_effect = mock_complete
agent = Agent(llm=mock_llm, max_tool_iterations=3)
agent.step("Try multiple times")
mem = get_memory()
# Should have tracked multiple errors
assert len(mem.episodic.recent_errors) >= 1

View File

@@ -0,0 +1,210 @@
"""Tests for FastAPI endpoints."""
import pytest
from unittest.mock import Mock, patch, MagicMock
from fastapi.testclient import TestClient
class TestHealthEndpoint:
"""Tests for /health endpoint."""
def test_health_check(self, memory):
"""Should return healthy status."""
from app import app
client = TestClient(app)
response = client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
class TestModelsEndpoint:
"""Tests for /v1/models endpoint."""
def test_list_models(self, memory):
"""Should return model list."""
from app import app
client = TestClient(app)
response = client.get("/v1/models")
assert response.status_code == 200
data = response.json()
assert data["object"] == "list"
assert len(data["data"]) > 0
assert data["data"][0]["id"] == "agent-media"
class TestMemoryEndpoints:
"""Tests for memory debug endpoints."""
def test_get_memory_state(self, memory):
"""Should return full memory state."""
from app import app
client = TestClient(app)
response = client.get("/memory/state")
assert response.status_code == 200
data = response.json()
assert "ltm" in data
assert "stm" in data
assert "episodic" in data
def test_get_search_results_empty(self, memory):
"""Should return empty when no search results."""
from app import app
client = TestClient(app)
response = client.get("/memory/episodic/search-results")
assert response.status_code == 200
data = response.json()
assert data["status"] == "empty"
def test_get_search_results_with_data(self, memory_with_search_results):
"""Should return search results when available."""
from app import app
client = TestClient(app)
response = client.get("/memory/episodic/search-results")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["query"] == "Inception 1080p"
assert data["result_count"] == 3
def test_clear_session(self, memory_with_search_results):
"""Should clear session memories."""
from app import app
client = TestClient(app)
response = client.post("/memory/clear-session")
assert response.status_code == 200
assert response.json()["status"] == "ok"
# Verify cleared
state = client.get("/memory/state").json()
assert state["episodic"]["last_search_results"] is None
class TestChatCompletionsEndpoint:
"""Tests for /v1/chat/completions endpoint."""
def test_chat_completion_success(self, memory):
"""Should return chat completion."""
from app import app
# Patch the agent's step method directly
with patch("app.agent.step", return_value="Hello! How can I help?"):
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": "Hello"}],
})
assert response.status_code == 200
data = response.json()
assert data["object"] == "chat.completion"
assert "Hello" in data["choices"][0]["message"]["content"]
def test_chat_completion_no_user_message(self, memory):
"""Should return error if no user message."""
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "system", "content": "You are helpful"}],
})
assert response.status_code == 422
detail = response.json()["detail"]
# Pydantic returns a list of errors or a string
if isinstance(detail, list):
detail_str = str(detail).lower()
else:
detail_str = detail.lower()
assert "user message" in detail_str
def test_chat_completion_empty_messages(self, memory):
"""Should return error for empty messages."""
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [],
})
assert response.status_code == 422
def test_chat_completion_invalid_json(self, memory):
"""Should return error for invalid JSON."""
from app import app
client = TestClient(app)
response = client.post(
"/v1/chat/completions",
content="not json",
headers={"Content-Type": "application/json"},
)
assert response.status_code == 422
def test_chat_completion_streaming(self, memory):
"""Should support streaming mode."""
from app import app
with patch("app.agent.step", return_value="Streaming response"):
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": "Hello"}],
"stream": True,
})
assert response.status_code == 200
assert "text/event-stream" in response.headers["content-type"]
def test_chat_completion_extracts_last_user_message(self, memory):
"""Should use last user message."""
from app import app
with patch("app.agent.step", return_value="Response") as mock_step:
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [
{"role": "user", "content": "First message"},
{"role": "assistant", "content": "Response"},
{"role": "user", "content": "Second message"},
],
})
assert response.status_code == 200
# Verify the agent received the last user message
mock_step.assert_called_once_with("Second message")
def test_chat_completion_response_format(self, memory):
"""Should return OpenAI-compatible format."""
from app import app
with patch("app.agent.step", return_value="Test response"):
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": "Test"}],
})
data = response.json()
assert "id" in data
assert data["id"].startswith("chatcmpl-")
assert "created" in data
assert "model" in data
assert "choices" in data
assert "usage" in data
assert data["choices"][0]["finish_reason"] == "stop"
assert data["choices"][0]["message"]["role"] == "assistant"

View File

@@ -0,0 +1,465 @@
"""Edge case tests for FastAPI endpoints."""
import pytest
import json
from unittest.mock import Mock, patch, MagicMock
from fastapi.testclient import TestClient
class TestChatCompletionsEdgeCases:
"""Edge case tests for /v1/chat/completions endpoint."""
def test_very_long_message(self, memory):
"""Should handle very long user message."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm.complete.return_value = "Response"
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
long_message = "x" * 100000
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": long_message}],
})
assert response.status_code == 200
def test_unicode_message(self, memory):
"""Should handle unicode in message."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm.complete.return_value = "日本語の応答"
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": "日本語のメッセージ 🎬"}],
})
assert response.status_code == 200
content = response.json()["choices"][0]["message"]["content"]
# Response may vary based on agent behavior
assert "日本語" in content or len(content) > 0
def test_special_characters_in_message(self, memory):
"""Should handle special characters."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm.complete.return_value = "Response"
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
special_message = 'Test with "quotes" and \\backslash and \n newline'
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": special_message}],
})
assert response.status_code == 200
def test_empty_content_in_message(self, memory):
"""Should handle empty content in message."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm.complete.return_value = "Response"
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": ""}],
})
# Empty content should be rejected
assert response.status_code == 422
def test_null_content_in_message(self, memory):
"""Should handle null content in message."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": None}],
})
assert response.status_code == 422
def test_missing_content_field(self, memory):
"""Should handle missing content field."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user"}], # No content
})
# May accept or reject depending on validation
assert response.status_code in [200, 400, 422]
def test_missing_role_field(self, memory):
"""Should handle missing role field."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"content": "Hello"}], # No role
})
# Should reject or accept depending on validation
assert response.status_code in [200, 400, 422]
def test_invalid_role(self, memory):
"""Should handle invalid role."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm.complete.return_value = "Response"
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "invalid_role", "content": "Hello"}],
})
# Should reject or ignore invalid role
assert response.status_code in [200, 400, 422]
def test_many_messages(self, memory):
"""Should handle many messages in conversation."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm.complete.return_value = "Response"
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
messages = []
for i in range(100):
messages.append({"role": "user", "content": f"Message {i}"})
messages.append({"role": "assistant", "content": f"Response {i}"})
messages.append({"role": "user", "content": "Final message"})
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": messages,
})
assert response.status_code == 200
def test_only_system_messages(self, memory):
"""Should reject if only system messages."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [
{"role": "system", "content": "You are helpful"},
{"role": "system", "content": "Be concise"},
],
})
assert response.status_code == 422
def test_only_assistant_messages(self, memory):
"""Should reject if only assistant messages."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [
{"role": "assistant", "content": "Hello"},
],
})
assert response.status_code == 422
def test_messages_not_array(self, memory):
"""Should reject if messages is not array."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": "not an array",
})
assert response.status_code == 422
# Pydantic validation error
def test_message_not_object(self, memory):
"""Should handle message that is not object."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": ["not an object", 123, None],
})
assert response.status_code == 422
# Pydantic validation error
def test_extra_fields_in_request(self, memory):
"""Should ignore extra fields in request."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm.complete.return_value = "Response"
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": "Hello"}],
"extra_field": "should be ignored",
"temperature": 0.7,
"max_tokens": 100,
})
assert response.status_code == 200
def test_streaming_with_tool_call(self, memory, real_folder):
"""Should handle streaming with tool execution."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm.complete.side_effect = [
'{"thought": "list", "action": {"name": "list_folder", "args": {"folder_type": "download"}}}',
"Listed the folder.",
]
mock_llm_class.return_value = mock_llm
from app import app
from infrastructure.persistence import get_memory
mem = get_memory()
mem.ltm.set_config("download_folder", str(real_folder["downloads"]))
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": "List downloads"}],
"stream": True,
})
assert response.status_code == 200
def test_concurrent_requests_simulation(self, memory):
"""Should handle rapid sequential requests."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
mock_llm.complete.return_value = "Response"
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
for i in range(10):
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": f"Request {i}"}],
})
assert response.status_code == 200
def test_llm_returns_json_in_response(self, memory):
"""Should handle LLM returning JSON in text response."""
with patch("app.DeepSeekClient") as mock_llm_class:
mock_llm = Mock()
# LLM returns JSON but not a tool call
mock_llm.complete.return_value = '{"result": "some data", "count": 5}'
mock_llm_class.return_value = mock_llm
from app import app
client = TestClient(app)
response = client.post("/v1/chat/completions", json={
"model": "agent-media",
"messages": [{"role": "user", "content": "Give me JSON"}],
})
assert response.status_code == 200
# Should return the JSON as-is since it's not a tool call
content = response.json()["choices"][0]["message"]["content"]
# May parse as tool call or return as text
assert "result" in content or len(content) > 0
class TestMemoryEndpointsEdgeCases:
"""Edge case tests for memory endpoints."""
def test_memory_state_with_large_data(self, memory):
"""Should handle large memory state."""
with patch("app.DeepSeekClient") as mock_llm:
mock_llm.return_value = Mock()
from app import app
# Add lots of data to memory
for i in range(100):
memory.stm.add_message("user", f"Message {i}" * 100)
memory.episodic.add_error("action", f"Error {i}")
client = TestClient(app)
response = client.get("/memory/state")
assert response.status_code == 200
data = response.json()
assert "stm" in data
def test_memory_state_with_unicode(self, memory):
"""Should handle unicode in memory state."""
with patch("app.DeepSeekClient") as mock_llm:
mock_llm.return_value = Mock()
from app import app
memory.ltm.set_config("japanese", "日本語テスト")
memory.stm.add_message("user", "🎬 Movie request")
client = TestClient(app)
response = client.get("/memory/state")
assert response.status_code == 200
data = response.json()
assert "日本語" in str(data)
def test_search_results_with_special_chars(self, memory):
"""Should handle special characters in search results."""
with patch("app.DeepSeekClient") as mock_llm:
mock_llm.return_value = Mock()
from app import app
memory.episodic.store_search_results(
"Test <script>alert('xss')</script>",
[{"name": "Result with \"quotes\" and 'apostrophes'"}],
)
client = TestClient(app)
response = client.get("/memory/episodic/search-results")
assert response.status_code == 200
# Should be properly escaped in JSON
data = response.json()
assert "script" in data["query"]
def test_clear_session_idempotent(self, memory):
"""Should be idempotent - multiple clears should work."""
with patch("app.DeepSeekClient") as mock_llm:
mock_llm.return_value = Mock()
from app import app
client = TestClient(app)
# Clear multiple times
for _ in range(5):
response = client.post("/memory/clear-session")
assert response.status_code == 200
def test_clear_session_preserves_ltm(self, memory):
"""Should preserve LTM after clear."""
with patch("app.DeepSeekClient") as mock_llm:
mock_llm.return_value = Mock()
from app import app
memory.ltm.set_config("important", "data")
memory.stm.add_message("user", "Hello")
client = TestClient(app)
client.post("/memory/clear-session")
response = client.get("/memory/state")
data = response.json()
assert data["ltm"]["config"]["important"] == "data"
assert data["stm"]["conversation_history"] == []
class TestHealthEndpointEdgeCases:
"""Edge case tests for health endpoint."""
def test_health_returns_version(self, memory):
"""Should return version in health check."""
with patch("app.DeepSeekClient") as mock_llm:
mock_llm.return_value = Mock()
from app import app
client = TestClient(app)
response = client.get("/health")
assert response.status_code == 200
assert "version" in response.json()
def test_health_with_query_params(self, memory):
"""Should ignore query parameters."""
with patch("app.DeepSeekClient") as mock_llm:
mock_llm.return_value = Mock()
from app import app
client = TestClient(app)
response = client.get("/health?extra=param&another=value")
assert response.status_code == 200
class TestModelsEndpointEdgeCases:
"""Edge case tests for models endpoint."""
def test_models_response_format(self, memory):
"""Should return OpenAI-compatible format."""
with patch("app.DeepSeekClient") as mock_llm:
mock_llm.return_value = Mock()
from app import app
client = TestClient(app)
response = client.get("/v1/models")
data = response.json()
assert data["object"] == "list"
assert isinstance(data["data"], list)
assert len(data["data"]) > 0
assert "id" in data["data"][0]
assert "object" in data["data"][0]
assert "created" in data["data"][0]
assert "owned_by" in data["data"][0]

View File

@@ -0,0 +1,309 @@
"""Edge case tests for configuration and parameters."""
import pytest
import os
from unittest.mock import patch
from agent.config import Settings, ConfigurationError
from agent.parameters import (
ParameterSchema,
REQUIRED_PARAMETERS,
format_parameters_for_prompt,
get_missing_required_parameters,
)
class TestSettingsEdgeCases:
"""Edge case tests for Settings."""
def test_default_values(self):
"""Should have sensible defaults."""
with patch.dict(os.environ, {}, clear=True):
settings = Settings()
assert settings.temperature == 0.2
assert settings.max_tool_iterations == 5
assert settings.request_timeout == 30
def test_temperature_boundary_low(self):
"""Should accept temperature at lower boundary."""
with patch.dict(os.environ, {"TEMPERATURE": "0.0"}, clear=True):
settings = Settings()
assert settings.temperature == 0.0
def test_temperature_boundary_high(self):
"""Should accept temperature at upper boundary."""
with patch.dict(os.environ, {"TEMPERATURE": "2.0"}, clear=True):
settings = Settings()
assert settings.temperature == 2.0
def test_temperature_below_boundary(self):
"""Should reject temperature below 0."""
with patch.dict(os.environ, {"TEMPERATURE": "-0.1"}, clear=True):
with pytest.raises(ConfigurationError):
Settings()
def test_temperature_above_boundary(self):
"""Should reject temperature above 2."""
with patch.dict(os.environ, {"TEMPERATURE": "2.1"}, clear=True):
with pytest.raises(ConfigurationError):
Settings()
def test_max_tool_iterations_boundary_low(self):
"""Should accept max_tool_iterations at lower boundary."""
with patch.dict(os.environ, {"MAX_TOOL_ITERATIONS": "1"}, clear=True):
settings = Settings()
assert settings.max_tool_iterations == 1
def test_max_tool_iterations_boundary_high(self):
"""Should accept max_tool_iterations at upper boundary."""
with patch.dict(os.environ, {"MAX_TOOL_ITERATIONS": "20"}, clear=True):
settings = Settings()
assert settings.max_tool_iterations == 20
def test_max_tool_iterations_below_boundary(self):
"""Should reject max_tool_iterations below 1."""
with patch.dict(os.environ, {"MAX_TOOL_ITERATIONS": "0"}, clear=True):
with pytest.raises(ConfigurationError):
Settings()
def test_max_tool_iterations_above_boundary(self):
"""Should reject max_tool_iterations above 20."""
with patch.dict(os.environ, {"MAX_TOOL_ITERATIONS": "21"}, clear=True):
with pytest.raises(ConfigurationError):
Settings()
def test_request_timeout_boundary_low(self):
"""Should accept request_timeout at lower boundary."""
with patch.dict(os.environ, {"REQUEST_TIMEOUT": "1"}, clear=True):
settings = Settings()
assert settings.request_timeout == 1
def test_request_timeout_boundary_high(self):
"""Should accept request_timeout at upper boundary."""
with patch.dict(os.environ, {"REQUEST_TIMEOUT": "300"}, clear=True):
settings = Settings()
assert settings.request_timeout == 300
def test_request_timeout_below_boundary(self):
"""Should reject request_timeout below 1."""
with patch.dict(os.environ, {"REQUEST_TIMEOUT": "0"}, clear=True):
with pytest.raises(ConfigurationError):
Settings()
def test_request_timeout_above_boundary(self):
"""Should reject request_timeout above 300."""
with patch.dict(os.environ, {"REQUEST_TIMEOUT": "301"}, clear=True):
with pytest.raises(ConfigurationError):
Settings()
def test_invalid_deepseek_url(self):
"""Should reject invalid DeepSeek URL."""
with patch.dict(os.environ, {"DEEPSEEK_BASE_URL": "not-a-url"}, clear=True):
with pytest.raises(ConfigurationError):
Settings()
def test_invalid_tmdb_url(self):
"""Should reject invalid TMDB URL."""
with patch.dict(os.environ, {"TMDB_BASE_URL": "ftp://invalid"}, clear=True):
with pytest.raises(ConfigurationError):
Settings()
def test_http_url_accepted(self):
"""Should accept http:// URLs."""
with patch.dict(os.environ, {
"DEEPSEEK_BASE_URL": "http://localhost:8080",
"TMDB_BASE_URL": "http://localhost:3000",
}, clear=True):
settings = Settings()
assert settings.deepseek_base_url == "http://localhost:8080"
def test_https_url_accepted(self):
"""Should accept https:// URLs."""
with patch.dict(os.environ, {
"DEEPSEEK_BASE_URL": "https://api.example.com",
"TMDB_BASE_URL": "https://api.example.com",
}, clear=True):
settings = Settings()
assert settings.deepseek_base_url == "https://api.example.com"
def test_is_deepseek_configured_with_key(self):
"""Should return True when API key is set."""
with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}, clear=True):
settings = Settings()
assert settings.is_deepseek_configured() is True
def test_is_deepseek_configured_without_key(self):
"""Should return False when API key is not set."""
with patch.dict(os.environ, {"DEEPSEEK_API_KEY": ""}, clear=True):
settings = Settings()
assert settings.is_deepseek_configured() is False
def test_is_tmdb_configured_with_key(self):
"""Should return True when API key is set."""
with patch.dict(os.environ, {"TMDB_API_KEY": "test-key"}, clear=True):
settings = Settings()
assert settings.is_tmdb_configured() is True
def test_is_tmdb_configured_without_key(self):
"""Should return False when API key is not set."""
with patch.dict(os.environ, {"TMDB_API_KEY": ""}, clear=True):
settings = Settings()
assert settings.is_tmdb_configured() is False
def test_non_numeric_temperature(self):
"""Should handle non-numeric temperature."""
with patch.dict(os.environ, {"TEMPERATURE": "not-a-number"}, clear=True):
with pytest.raises((ConfigurationError, ValueError)):
Settings()
def test_non_numeric_max_iterations(self):
"""Should handle non-numeric max_tool_iterations."""
with patch.dict(os.environ, {"MAX_TOOL_ITERATIONS": "five"}, clear=True):
with pytest.raises((ConfigurationError, ValueError)):
Settings()
class TestParametersEdgeCases:
"""Edge case tests for parameters module."""
def test_parameter_creation(self):
"""Should create parameter with all fields."""
param = ParameterSchema(
key="test_key",
description="Test description",
why_needed="Test reason",
type="string",
)
assert param.key == "test_key"
assert param.description == "Test description"
assert param.why_needed == "Test reason"
assert param.type == "string"
def test_required_parameters_not_empty(self):
"""Should have at least one required parameter."""
assert len(REQUIRED_PARAMETERS) > 0
def test_format_parameters_for_prompt(self):
"""Should format parameters for prompt."""
result = format_parameters_for_prompt()
assert isinstance(result, str)
# Should contain parameter information
for param in REQUIRED_PARAMETERS:
assert param.key in result or param.description in result
def test_get_missing_required_parameters_all_missing(self):
"""Should return all parameters when none configured."""
memory_data = {"config": {}}
missing = get_missing_required_parameters(memory_data)
# Config may have defaults, so check it's a list
assert isinstance(missing, list)
assert len(missing) >= 0
def test_get_missing_required_parameters_none_missing(self):
"""Should return empty when all configured."""
memory_data = {"config": {}}
for param in REQUIRED_PARAMETERS:
memory_data["config"][param.key] = "/some/path"
missing = get_missing_required_parameters(memory_data)
assert len(missing) == 0
def test_get_missing_required_parameters_some_missing(self):
"""Should return only missing parameters."""
memory_data = {"config": {}}
if REQUIRED_PARAMETERS:
# Configure first parameter only
memory_data["config"][REQUIRED_PARAMETERS[0].key] = "/path"
missing = get_missing_required_parameters(memory_data)
# Config may have defaults
assert isinstance(missing, list)
assert len(missing) >= 0
def test_get_missing_required_parameters_with_none_value(self):
"""Should treat None as missing."""
memory_data = {"config": {}}
for param in REQUIRED_PARAMETERS:
memory_data["config"][param.key] = None
missing = get_missing_required_parameters(memory_data)
# Config may have defaults
assert isinstance(missing, list)
assert len(missing) >= 0
def test_get_missing_required_parameters_with_empty_string(self):
"""Should treat empty string as missing."""
memory_data = {"config": {}}
for param in REQUIRED_PARAMETERS:
memory_data["config"][param.key] = ""
missing = get_missing_required_parameters(memory_data)
# Behavior depends on implementation
# Empty string might be considered as "set" or "missing"
assert isinstance(missing, list)
def test_get_missing_required_parameters_no_config_key(self):
"""Should handle missing config key in memory."""
memory_data = {} # No config key at all
missing = get_missing_required_parameters(memory_data)
# Config may have defaults
assert isinstance(missing, list)
assert len(missing) >= 0
def test_get_missing_required_parameters_config_not_dict(self):
"""Should handle config that is not a dict."""
memory_data = {"config": "not a dict"}
# Should either handle gracefully or raise
try:
missing = get_missing_required_parameters(memory_data)
assert isinstance(missing, list)
except (TypeError, AttributeError):
pass # Also acceptable
class TestParameterValidation:
"""Tests for parameter validation."""
def test_parameter_with_unicode(self):
"""Should handle unicode in parameter fields."""
param = ParameterSchema(
key="日本語_key",
description="日本語の説明",
why_needed="日本語の理由",
type="string",
)
assert "日本語" in param.description
def test_parameter_with_special_chars(self):
"""Should handle special characters."""
param = ParameterSchema(
key="key_with_special",
description='Description with "quotes" and \\backslash',
why_needed="Reason with <html> tags",
type="string",
)
assert '"quotes"' in param.description
def test_parameter_with_empty_fields(self):
"""Should handle empty fields."""
param = ParameterSchema(
key="",
description="",
why_needed="",
type="",
)
assert param.key == ""

View File

@@ -1,29 +1,38 @@
"""Tests for the Memory system.""" """Tests for the Memory system."""
import json
import pytest import pytest
import json
from datetime import datetime
from pathlib import Path
from infrastructure.persistence import ( from infrastructure.persistence import (
EpisodicMemory,
LongTermMemory,
Memory, Memory,
LongTermMemory,
ShortTermMemory, ShortTermMemory,
get_memory, EpisodicMemory,
has_memory,
init_memory, init_memory,
get_memory,
set_memory, set_memory,
has_memory,
) )
from infrastructure.persistence.context import _memory_ctx from infrastructure.persistence.context import _memory_ctx
def is_iso_format(s: str) -> bool:
"""Helper to check if a string is a valid ISO 8601 timestamp."""
if not isinstance(s, str):
return False
try:
# Attempt to parse the string as an ISO 8601 timestamp
datetime.fromisoformat(s.replace('Z', '+00:00'))
return True
except (ValueError, TypeError):
return False
class TestLongTermMemory: class TestLongTermMemory:
"""Tests for LongTermMemory.""" """Tests for LongTermMemory."""
def test_default_values(self): def test_default_values(self):
"""LTM should have sensible defaults."""
ltm = LongTermMemory() ltm = LongTermMemory()
assert ltm.config == {} assert ltm.config == {}
assert ltm.preferences["preferred_quality"] == "1080p" assert ltm.preferences["preferred_quality"] == "1080p"
assert "en" in ltm.preferences["preferred_languages"] assert "en" in ltm.preferences["preferred_languages"]
@@ -31,666 +40,195 @@ class TestLongTermMemory:
assert ltm.following == [] assert ltm.following == []
def test_set_and_get_config(self): def test_set_and_get_config(self):
"""Should set and retrieve config values."""
ltm = LongTermMemory() ltm = LongTermMemory()
ltm.set_config("download_folder", "/path/to/downloads") ltm.set_config("download_folder", "/path/to/downloads")
assert ltm.get_config("download_folder") == "/path/to/downloads" assert ltm.get_config("download_folder") == "/path/to/downloads"
def test_get_config_default(self): def test_get_config_default(self):
"""Should return default for missing config."""
ltm = LongTermMemory() ltm = LongTermMemory()
assert ltm.get_config("nonexistent") is None assert ltm.get_config("nonexistent") is None
assert ltm.get_config("nonexistent", "default") == "default" assert ltm.get_config("nonexistent", "default") == "default"
def test_has_config(self): def test_has_config(self):
"""Should check if config exists."""
ltm = LongTermMemory() ltm = LongTermMemory()
assert not ltm.has_config("download_folder") assert not ltm.has_config("download_folder")
ltm.set_config("download_folder", "/path") ltm.set_config("download_folder", "/path")
assert ltm.has_config("download_folder") assert ltm.has_config("download_folder")
def test_has_config_none_value(self): def test_has_config_none_value(self):
"""Should return False for None values."""
ltm = LongTermMemory() ltm = LongTermMemory()
ltm.config["key"] = None ltm.config["key"] = None
assert not ltm.has_config("key") assert not ltm.has_config("key")
def test_add_to_library(self): def test_add_to_library(self):
"""Should add media to library."""
ltm = LongTermMemory() ltm = LongTermMemory()
movie = {"imdb_id": "tt1375666", "title": "Inception"} movie = {"imdb_id": "tt1375666", "title": "Inception"}
ltm.add_to_library("movies", movie) ltm.add_to_library("movies", movie)
assert len(ltm.library["movies"]) == 1 assert len(ltm.library["movies"]) == 1
assert ltm.library["movies"][0]["title"] == "Inception" assert ltm.library["movies"][0]["title"] == "Inception"
assert "added_at" in ltm.library["movies"][0] assert is_iso_format(ltm.library["movies"][0].get("added_at"))
def test_add_to_library_no_duplicates(self): def test_add_to_library_no_duplicates(self):
"""Should not add duplicate media."""
ltm = LongTermMemory() ltm = LongTermMemory()
movie = {"imdb_id": "tt1375666", "title": "Inception"} movie = {"imdb_id": "tt1375666", "title": "Inception"}
ltm.add_to_library("movies", movie) ltm.add_to_library("movies", movie)
ltm.add_to_library("movies", movie) ltm.add_to_library("movies", movie)
assert len(ltm.library["movies"]) == 1 assert len(ltm.library["movies"]) == 1
def test_add_to_library_new_type(self): def test_add_to_library_new_type(self):
"""Should create new media type if not exists."""
ltm = LongTermMemory() ltm = LongTermMemory()
subtitle = {"imdb_id": "tt1375666", "language": "en"} subtitle = {"imdb_id": "tt1375666", "language": "en"}
ltm.add_to_library("subtitles", subtitle) ltm.add_to_library("subtitles", subtitle)
assert "subtitles" in ltm.library assert "subtitles" in ltm.library
assert len(ltm.library["subtitles"]) == 1 assert len(ltm.library["subtitles"]) == 1
def test_get_library(self): def test_get_library(self):
"""Should get library for media type."""
ltm = LongTermMemory() ltm = LongTermMemory()
ltm.add_to_library("movies", {"imdb_id": "tt1", "title": "Movie 1"}) ltm.add_to_library("movies", {"imdb_id": "tt1", "title": "Movie 1"})
ltm.add_to_library("movies", {"imdb_id": "tt2", "title": "Movie 2"}) ltm.add_to_library("movies", {"imdb_id": "tt2", "title": "Movie 2"})
movies = ltm.get_library("movies") movies = ltm.get_library("movies")
assert len(movies) == 2 assert len(movies) == 2
def test_get_library_empty(self): def test_get_library_empty(self):
"""Should return empty list for unknown type."""
ltm = LongTermMemory() ltm = LongTermMemory()
assert ltm.get_library("unknown") == [] assert ltm.get_library("unknown") == []
def test_follow_show(self): def test_follow_show(self):
"""Should add show to following list."""
ltm = LongTermMemory() ltm = LongTermMemory()
show = {"imdb_id": "tt0944947", "title": "Game of Thrones"} show = {"imdb_id": "tt0944947", "title": "Game of Thrones"}
ltm.follow_show(show) ltm.follow_show(show)
assert len(ltm.following) == 1 assert len(ltm.following) == 1
assert ltm.following[0]["title"] == "Game of Thrones" assert ltm.following[0]["title"] == "Game of Thrones"
assert "followed_at" in ltm.following[0] assert is_iso_format(ltm.following[0].get("followed_at"))
def test_follow_show_no_duplicates(self): def test_follow_show_no_duplicates(self):
"""Should not follow same show twice."""
ltm = LongTermMemory() ltm = LongTermMemory()
show = {"imdb_id": "tt0944947", "title": "Game of Thrones"} show = {"imdb_id": "tt0944947", "title": "Game of Thrones"}
ltm.follow_show(show) ltm.follow_show(show)
ltm.follow_show(show) ltm.follow_show(show)
assert len(ltm.following) == 1 assert len(ltm.following) == 1
def test_to_dict(self): def test_to_dict(self):
"""Should serialize to dict."""
ltm = LongTermMemory() ltm = LongTermMemory()
ltm.set_config("key", "value") ltm.set_config("key", "value")
data = ltm.to_dict() data = ltm.to_dict()
assert "config" in data assert "config" in data
assert "preferences" in data
assert "library" in data
assert "following" in data
assert data["config"]["key"] == "value" assert data["config"]["key"] == "value"
def test_from_dict(self): def test_from_dict(self):
"""Should deserialize from dict.""" data = {"config": {"download_folder": "/downloads"}, "preferences": {"preferred_quality": "4K"}, "library": {"movies": [{"imdb_id": "tt1", "title": "Test"}]}, "following": []}
data = {
"config": {"download_folder": "/downloads"},
"preferences": {"preferred_quality": "4K"},
"library": {"movies": [{"imdb_id": "tt1", "title": "Test"}]},
"following": [],
}
ltm = LongTermMemory.from_dict(data) ltm = LongTermMemory.from_dict(data)
assert ltm.get_config("download_folder") == "/downloads" assert ltm.get_config("download_folder") == "/downloads"
assert ltm.preferences["preferred_quality"] == "4K" assert ltm.preferences["preferred_quality"] == "4K"
assert len(ltm.library["movies"]) == 1 assert len(ltm.library["movies"]) == 1
def test_from_dict_missing_keys(self):
"""Should handle missing keys with defaults."""
ltm = LongTermMemory.from_dict({})
assert ltm.config == {}
assert ltm.preferences["preferred_quality"] == "1080p"
class TestShortTermMemory: class TestShortTermMemory:
"""Tests for ShortTermMemory.""" """Tests for ShortTermMemory."""
def test_default_values(self): def test_default_values(self):
"""STM should start empty."""
stm = ShortTermMemory() stm = ShortTermMemory()
assert stm.conversation_history == [] assert stm.conversation_history == []
assert stm.current_workflow is None assert stm.current_workflow is None
assert stm.extracted_entities == {} assert stm.extracted_entities == {}
assert stm.current_topic is None assert stm.current_topic is None
assert stm.language == "en"
def test_add_message(self): def test_add_message(self):
"""Should add message to history."""
stm = ShortTermMemory() stm = ShortTermMemory()
stm.add_message("user", "Hello") stm.add_message("user", "Hello")
assert len(stm.conversation_history) == 1 assert len(stm.conversation_history) == 1
assert stm.conversation_history[0]["role"] == "user" assert is_iso_format(stm.conversation_history[0].get("timestamp"))
assert stm.conversation_history[0]["content"] == "Hello"
assert "timestamp" in stm.conversation_history[0]
def test_add_message_max_history(self): def test_add_message_max_history(self):
"""Should limit history to max_history.""" stm = ShortTermMemory(max_history=5)
stm = ShortTermMemory()
stm.max_history = 5
for i in range(10): for i in range(10):
stm.add_message("user", f"Message {i}") stm.add_message("user", f"Message {i}")
assert len(stm.conversation_history) == 5 assert len(stm.conversation_history) == 5
assert stm.conversation_history[0]["content"] == "Message 5" assert stm.conversation_history[0]["content"] == "Message 5"
def test_get_recent_history(self): def test_language_management(self):
"""Should get last N messages."""
stm = ShortTermMemory() stm = ShortTermMemory()
assert stm.language == "en"
for i in range(10): stm.set_language("fr")
stm.add_message("user", f"Message {i}") assert stm.language == "fr"
stm.clear()
recent = stm.get_recent_history(3) assert stm.language == "en"
assert len(recent) == 3
assert recent[0]["content"] == "Message 7"
def test_get_recent_history_less_than_n(self):
"""Should return all if less than N messages."""
stm = ShortTermMemory()
stm.add_message("user", "Hello")
stm.add_message("assistant", "Hi")
recent = stm.get_recent_history(10)
assert len(recent) == 2
def test_start_workflow(self):
"""Should start a workflow."""
stm = ShortTermMemory()
stm.start_workflow("download", {"title": "Inception"})
assert stm.current_workflow is not None
assert stm.current_workflow["type"] == "download"
assert stm.current_workflow["target"]["title"] == "Inception"
assert stm.current_workflow["stage"] == "started"
def test_update_workflow_stage(self):
"""Should update workflow stage."""
stm = ShortTermMemory()
stm.start_workflow("download", {"title": "Inception"})
stm.update_workflow_stage("searching")
assert stm.current_workflow["stage"] == "searching"
def test_update_workflow_stage_no_workflow(self):
"""Should do nothing if no workflow."""
stm = ShortTermMemory()
stm.update_workflow_stage("searching") # Should not raise
assert stm.current_workflow is None
def test_end_workflow(self):
"""Should end workflow."""
stm = ShortTermMemory()
stm.start_workflow("download", {"title": "Inception"})
stm.end_workflow()
assert stm.current_workflow is None
def test_set_and_get_entity(self):
"""Should set and get entities."""
stm = ShortTermMemory()
stm.set_entity("movie_title", "Inception")
stm.set_entity("year", 2010)
assert stm.get_entity("movie_title") == "Inception"
assert stm.get_entity("year") == 2010
def test_get_entity_default(self):
"""Should return default for missing entity."""
stm = ShortTermMemory()
assert stm.get_entity("nonexistent") is None
assert stm.get_entity("nonexistent", "default") == "default"
def test_clear_entities(self):
"""Should clear all entities."""
stm = ShortTermMemory()
stm.set_entity("key1", "value1")
stm.set_entity("key2", "value2")
stm.clear_entities()
assert stm.extracted_entities == {}
def test_set_topic(self):
"""Should set current topic."""
stm = ShortTermMemory()
stm.set_topic("searching_movie")
assert stm.current_topic == "searching_movie"
def test_clear(self): def test_clear(self):
"""Should clear all STM data."""
stm = ShortTermMemory() stm = ShortTermMemory()
stm.add_message("user", "Hello") stm.add_message("user", "Hello")
stm.start_workflow("download", {}) stm.set_language("fr")
stm.set_entity("key", "value")
stm.set_topic("topic")
stm.clear() stm.clear()
assert stm.conversation_history == [] assert stm.conversation_history == []
assert stm.current_workflow is None assert stm.language == "en"
assert stm.extracted_entities == {}
assert stm.current_topic is None
def test_to_dict(self):
"""Should serialize to dict."""
stm = ShortTermMemory()
stm.add_message("user", "Hello")
stm.set_topic("test")
data = stm.to_dict()
assert "conversation_history" in data
assert "current_workflow" in data
assert "extracted_entities" in data
assert "current_topic" in data
class TestEpisodicMemory: class TestEpisodicMemory:
"""Tests for EpisodicMemory.""" """Tests for EpisodicMemory."""
def test_default_values(self):
"""Episodic should start empty."""
episodic = EpisodicMemory()
assert episodic.last_search_results is None
assert episodic.active_downloads == []
assert episodic.recent_errors == []
assert episodic.pending_question is None
assert episodic.background_events == []
def test_store_search_results(self):
"""Should store search results with indexes."""
episodic = EpisodicMemory()
results = [
{"name": "Result 1", "seeders": 100},
{"name": "Result 2", "seeders": 50},
]
episodic.store_search_results("test query", results)
assert episodic.last_search_results is not None
assert episodic.last_search_results["query"] == "test query"
assert len(episodic.last_search_results["results"]) == 2
assert episodic.last_search_results["results"][0]["index"] == 1
assert episodic.last_search_results["results"][1]["index"] == 2
def test_get_result_by_index(self):
"""Should get result by 1-based index."""
episodic = EpisodicMemory()
results = [
{"name": "Result 1"},
{"name": "Result 2"},
{"name": "Result 3"},
]
episodic.store_search_results("query", results)
result = episodic.get_result_by_index(2)
assert result is not None
assert result["name"] == "Result 2"
def test_get_result_by_index_not_found(self):
"""Should return None for invalid index."""
episodic = EpisodicMemory()
results = [{"name": "Result 1"}]
episodic.store_search_results("query", results)
assert episodic.get_result_by_index(5) is None
assert episodic.get_result_by_index(0) is None
assert episodic.get_result_by_index(-1) is None
def test_get_result_by_index_no_results(self):
"""Should return None if no search results."""
episodic = EpisodicMemory()
assert episodic.get_result_by_index(1) is None
def test_clear_search_results(self):
"""Should clear search results."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [{"name": "Result"}])
episodic.clear_search_results()
assert episodic.last_search_results is None
def test_add_active_download(self):
"""Should add download with timestamp."""
episodic = EpisodicMemory()
episodic.add_active_download(
{
"task_id": "123",
"name": "Test Movie",
"magnet": "magnet:?xt=...",
}
)
assert len(episodic.active_downloads) == 1
assert episodic.active_downloads[0]["name"] == "Test Movie"
assert "started_at" in episodic.active_downloads[0]
def test_update_download_progress(self):
"""Should update download progress."""
episodic = EpisodicMemory()
episodic.add_active_download({"task_id": "123", "name": "Test"})
episodic.update_download_progress("123", 50, "downloading")
assert episodic.active_downloads[0]["progress"] == 50
assert episodic.active_downloads[0]["status"] == "downloading"
def test_update_download_progress_not_found(self):
"""Should do nothing for unknown task_id."""
episodic = EpisodicMemory()
episodic.add_active_download({"task_id": "123", "name": "Test"})
episodic.update_download_progress("999", 50) # Should not raise
assert episodic.active_downloads[0].get("progress") is None
def test_complete_download(self):
"""Should complete download and add event."""
episodic = EpisodicMemory()
episodic.add_active_download({"task_id": "123", "name": "Test Movie"})
completed = episodic.complete_download("123", "/path/to/file.mkv")
assert len(episodic.active_downloads) == 0
assert completed["status"] == "completed"
assert completed["file_path"] == "/path/to/file.mkv"
assert len(episodic.background_events) == 1
assert episodic.background_events[0]["type"] == "download_complete"
def test_complete_download_not_found(self):
"""Should return None for unknown task_id."""
episodic = EpisodicMemory()
result = episodic.complete_download("999", "/path")
assert result is None
def test_add_error(self): def test_add_error(self):
"""Should add error with timestamp."""
episodic = EpisodicMemory() episodic = EpisodicMemory()
episodic.add_error("find_torrent", "API timeout")
episodic.add_error("find_torrent", "API timeout", {"query": "test"})
assert len(episodic.recent_errors) == 1 assert len(episodic.recent_errors) == 1
assert episodic.recent_errors[0]["action"] == "find_torrent" assert is_iso_format(episodic.recent_errors[0].get("timestamp"))
assert episodic.recent_errors[0]["error"] == "API timeout"
def test_add_error_max_limit(self): def test_add_error_max_limit(self):
"""Should limit errors to max_errors.""" episodic = EpisodicMemory(max_errors=3)
episodic = EpisodicMemory()
episodic.max_errors = 3
for i in range(5): for i in range(5):
episodic.add_error("action", f"Error {i}") episodic.add_error("action", f"Error {i}")
assert len(episodic.recent_errors) == 3 assert len(episodic.recent_errors) == 3
assert episodic.recent_errors[0]["error"] == "Error 2" error_messages = [e["error"] for e in episodic.recent_errors]
assert error_messages == ["Error 2", "Error 3", "Error 4"]
def test_set_pending_question(self): def test_store_search_results(self):
"""Should set pending question."""
episodic = EpisodicMemory() episodic = EpisodicMemory()
episodic.store_search_results("test query", [])
assert is_iso_format(episodic.last_search_results.get("timestamp"))
options = [ def test_get_result_by_index(self):
{"index": 1, "label": "Option 1"},
{"index": 2, "label": "Option 2"},
]
episodic.set_pending_question(
"Which one?",
options,
{"context": "test"},
"choice",
)
assert episodic.pending_question is not None
assert episodic.pending_question["question"] == "Which one?"
assert len(episodic.pending_question["options"]) == 2
def test_resolve_pending_question(self):
"""Should resolve question and return chosen option."""
episodic = EpisodicMemory() episodic = EpisodicMemory()
results = [{"name": "Result 1"}, {"name": "Result 2"}]
options = [ episodic.store_search_results("query", results)
{"index": 1, "label": "Option 1"}, result = episodic.get_result_by_index(2)
{"index": 2, "label": "Option 2"}, assert result is not None
] assert result["name"] == "Result 2"
episodic.set_pending_question("Which?", options, {})
result = episodic.resolve_pending_question(2)
assert result["label"] == "Option 2"
assert episodic.pending_question is None
def test_resolve_pending_question_cancel(self):
"""Should cancel question if no index."""
episodic = EpisodicMemory()
episodic.set_pending_question("Which?", [], {})
result = episodic.resolve_pending_question(None)
assert result is None
assert episodic.pending_question is None
def test_add_background_event(self):
"""Should add background event."""
episodic = EpisodicMemory()
episodic.add_background_event("download_complete", {"name": "Movie"})
assert len(episodic.background_events) == 1
assert episodic.background_events[0]["type"] == "download_complete"
assert episodic.background_events[0]["read"] is False
def test_add_background_event_max_limit(self):
"""Should limit events to max_events."""
episodic = EpisodicMemory()
episodic.max_events = 3
for i in range(5):
episodic.add_background_event("event", {"i": i})
assert len(episodic.background_events) == 3
def test_get_unread_events(self):
"""Should get unread events and mark as read."""
episodic = EpisodicMemory()
episodic.add_background_event("event1", {})
episodic.add_background_event("event2", {})
unread = episodic.get_unread_events()
assert len(unread) == 2
assert all(e["read"] for e in episodic.background_events)
def test_get_unread_events_already_read(self):
"""Should not return already read events."""
episodic = EpisodicMemory()
episodic.add_background_event("event1", {})
episodic.get_unread_events() # Mark as read
episodic.add_background_event("event2", {})
unread = episodic.get_unread_events()
assert len(unread) == 1
assert unread[0]["type"] == "event2"
def test_clear(self):
"""Should clear all episodic data."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [{}])
episodic.add_active_download({"task_id": "1", "name": "Test"})
episodic.add_error("action", "error")
episodic.set_pending_question("?", [], {})
episodic.add_background_event("event", {})
episodic.clear()
assert episodic.last_search_results is None
assert episodic.active_downloads == []
assert episodic.recent_errors == []
assert episodic.pending_question is None
assert episodic.background_events == []
class TestMemory: class TestMemory:
"""Tests for the Memory manager.""" """Tests for the Memory manager."""
def test_init_creates_directories(self, temp_dir): def test_init_creates_directories(self, temp_dir):
"""Should create storage directory."""
storage = temp_dir / "memory_data" storage = temp_dir / "memory_data"
memory = Memory(storage_dir=str(storage)) Memory(storage_dir=str(storage))
assert storage.exists() assert storage.exists()
def test_init_loads_existing_ltm(self, temp_dir): def test_save_and_load_ltm(self, temp_dir):
"""Should load existing LTM from file.""" storage = str(temp_dir)
ltm_file = temp_dir / "ltm.json" memory = Memory(storage_dir=storage)
ltm_file.write_text(
json.dumps(
{
"config": {"download_folder": "/downloads"},
"preferences": {"preferred_quality": "4K"},
"library": {"movies": []},
"following": [],
}
)
)
memory = Memory(storage_dir=str(temp_dir))
assert memory.ltm.get_config("download_folder") == "/downloads"
assert memory.ltm.preferences["preferred_quality"] == "4K"
def test_init_handles_corrupted_ltm(self, temp_dir):
"""Should handle corrupted LTM file."""
ltm_file = temp_dir / "ltm.json"
ltm_file.write_text("not valid json {{{")
memory = Memory(storage_dir=str(temp_dir))
assert memory.ltm.config == {} # Default values
def test_save(self, temp_dir):
"""Should save LTM to file."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("test_key", "test_value") memory.ltm.set_config("test_key", "test_value")
memory.save() memory.save()
new_memory = Memory(storage_dir=storage)
assert new_memory.ltm.get_config("test_key") == "test_value"
ltm_file = temp_dir / "ltm.json" def test_clear_session(self, memory):
assert ltm_file.exists() memory.ltm.set_config("key", "value")
data = json.loads(ltm_file.read_text()) memory.stm.add_message("user", "Hello")
assert data["config"]["test_key"] == "test_value" memory.episodic.add_error("action", "error")
memory.clear_session()
def test_get_context_for_prompt(self, memory_with_search_results): assert memory.ltm.get_config("key") == "value"
"""Should generate context for prompt.""" assert memory.stm.conversation_history == []
context = memory_with_search_results.get_context_for_prompt() assert memory.episodic.recent_errors == []
assert "config" in context
assert "preferences" in context
assert context["last_search"]["query"] == "Inception 1080p"
assert context["last_search"]["result_count"] == 3
def test_get_full_state(self, memory):
"""Should return full state of all memories."""
state = memory.get_full_state()
assert "ltm" in state
assert "stm" in state
assert "episodic" in state
def test_clear_session(self, memory_with_search_results):
"""Should clear STM and Episodic but keep LTM."""
memory_with_search_results.ltm.set_config("key", "value")
memory_with_search_results.stm.add_message("user", "Hello")
memory_with_search_results.clear_session()
assert memory_with_search_results.ltm.get_config("key") == "value"
assert memory_with_search_results.stm.conversation_history == []
assert memory_with_search_results.episodic.last_search_results is None
class TestMemoryContext: class TestMemoryContext:
"""Tests for memory context functions.""" """Tests for memory context functions."""
def test_init_memory(self, temp_dir):
"""Should initialize and set memory in context."""
_memory_ctx.set(None) # Reset context
memory = init_memory(str(temp_dir))
assert memory is not None
assert has_memory()
assert get_memory() is memory
def test_set_memory(self, temp_dir):
"""Should set existing memory in context."""
_memory_ctx.set(None)
memory = Memory(storage_dir=str(temp_dir))
set_memory(memory)
assert get_memory() is memory
def test_get_memory_not_initialized(self): def test_get_memory_not_initialized(self):
"""Should raise if memory not initialized."""
_memory_ctx.set(None) _memory_ctx.set(None)
with pytest.raises(RuntimeError, match="Memory not initialized"): with pytest.raises(RuntimeError, match="Memory not initialized"):
get_memory() get_memory()
def test_has_memory(self, temp_dir): def test_init_memory(self, temp_dir):
"""Should check if memory is initialized."""
_memory_ctx.set(None) _memory_ctx.set(None)
assert not has_memory() memory = init_memory(str(temp_dir))
init_memory(str(temp_dir))
assert has_memory() assert has_memory()
assert get_memory() is memory

View File

@@ -0,0 +1,546 @@
"""Edge case tests for the Memory system."""
import pytest
import json
import os
from pathlib import Path
from datetime import datetime
from unittest.mock import patch, mock_open
from infrastructure.persistence import (
Memory,
LongTermMemory,
ShortTermMemory,
EpisodicMemory,
init_memory,
get_memory,
set_memory,
)
from infrastructure.persistence.context import _memory_ctx
class TestLongTermMemoryEdgeCases:
"""Edge case tests for LongTermMemory."""
def test_config_with_none_value(self):
"""Should handle None values in config."""
ltm = LongTermMemory()
ltm.set_config("key", None)
assert ltm.get_config("key") is None
assert not ltm.has_config("key")
def test_config_with_empty_string(self):
"""Should handle empty string values."""
ltm = LongTermMemory()
ltm.set_config("key", "")
assert ltm.get_config("key") == ""
assert ltm.has_config("key") # Empty string is still a value
def test_config_with_complex_types(self):
"""Should handle complex types in config."""
ltm = LongTermMemory()
ltm.set_config("list", [1, 2, 3])
ltm.set_config("dict", {"nested": {"deep": "value"}})
ltm.set_config("bool", False)
ltm.set_config("int", 0)
assert ltm.get_config("list") == [1, 2, 3]
assert ltm.get_config("dict")["nested"]["deep"] == "value"
assert ltm.get_config("bool") is False
assert ltm.get_config("int") == 0
def test_library_with_missing_imdb_id(self):
"""Should handle media without imdb_id."""
ltm = LongTermMemory()
media = {"title": "No ID Movie"}
ltm.add_to_library("movies", media)
# Should still add (imdb_id will be None)
assert len(ltm.library["movies"]) == 1
def test_library_duplicate_check_with_none_id(self):
"""Should handle duplicate check when imdb_id is None."""
ltm = LongTermMemory()
media1 = {"title": "Movie 1"}
media2 = {"title": "Movie 2"}
ltm.add_to_library("movies", media1)
ltm.add_to_library("movies", media2)
# May dedupe or not depending on implementation
assert len(ltm.library["movies"]) >= 1
def test_from_dict_with_extra_keys(self):
"""Should ignore extra keys in dict."""
data = {
"config": {},
"preferences": {},
"library": {"movies": []},
"following": [],
"extra_key": "should be ignored",
"another_extra": [1, 2, 3],
}
ltm = LongTermMemory.from_dict(data)
assert not hasattr(ltm, "extra_key")
def test_from_dict_with_wrong_types(self):
"""Should handle wrong types gracefully."""
data = {
"config": "not a dict", # Should be dict
"preferences": [], # Should be dict
"library": "wrong", # Should be dict
"following": {}, # Should be list
}
# Should not crash, but behavior may vary
try:
ltm = LongTermMemory.from_dict(data)
# If it doesn't crash, check it has some defaults
assert ltm is not None
except (TypeError, AttributeError):
# This is also acceptable behavior
pass
def test_to_dict_preserves_unicode(self):
"""Should preserve unicode in serialization."""
ltm = LongTermMemory()
ltm.set_config("japanese", "日本語")
ltm.set_config("emoji", "🎬🎥")
ltm.add_to_library("movies", {"title": "Amélie", "imdb_id": "tt1"})
data = ltm.to_dict()
assert data["config"]["japanese"] == "日本語"
assert data["config"]["emoji"] == "🎬🎥"
assert data["library"]["movies"][0]["title"] == "Amélie"
class TestShortTermMemoryEdgeCases:
"""Edge case tests for ShortTermMemory."""
def test_add_message_with_empty_content(self):
"""Should handle empty message content."""
stm = ShortTermMemory()
stm.add_message("user", "")
assert len(stm.conversation_history) == 1
assert stm.conversation_history[0]["content"] == ""
def test_add_message_with_very_long_content(self):
"""Should handle very long messages."""
stm = ShortTermMemory()
long_content = "x" * 100000
stm.add_message("user", long_content)
assert len(stm.conversation_history[0]["content"]) == 100000
def test_add_message_with_special_characters(self):
"""Should handle special characters."""
stm = ShortTermMemory()
special = "Line1\nLine2\tTab\r\nWindows\x00Null"
stm.add_message("user", special)
assert stm.conversation_history[0]["content"] == special
def test_max_history_zero(self):
"""Should handle max_history of 0."""
stm = ShortTermMemory()
stm.max_history = 0
stm.add_message("user", "Hello")
# Behavior: either empty or keeps last message
assert len(stm.conversation_history) <= 1
def test_max_history_one(self):
"""Should handle max_history of 1."""
stm = ShortTermMemory()
stm.max_history = 1
stm.add_message("user", "First")
stm.add_message("user", "Second")
assert len(stm.conversation_history) == 1
assert stm.conversation_history[0]["content"] == "Second"
def test_get_recent_history_zero(self):
"""Should handle n=0."""
stm = ShortTermMemory()
stm.add_message("user", "Hello")
recent = stm.get_recent_history(0)
# May return empty or all messages depending on implementation
assert isinstance(recent, list)
def test_get_recent_history_negative(self):
"""Should handle negative n."""
stm = ShortTermMemory()
stm.add_message("user", "Hello")
recent = stm.get_recent_history(-1)
# Python slicing with negative returns empty or last element
assert isinstance(recent, list)
def test_workflow_with_empty_target(self):
"""Should handle empty workflow target."""
stm = ShortTermMemory()
stm.start_workflow("download", {})
assert stm.current_workflow["target"] == {}
def test_workflow_with_none_target(self):
"""Should handle None workflow target."""
stm = ShortTermMemory()
stm.start_workflow("download", None)
assert stm.current_workflow["target"] is None
def test_entity_with_none_value(self):
"""Should store None as entity value."""
stm = ShortTermMemory()
stm.set_entity("key", None)
assert stm.get_entity("key") is None
assert "key" in stm.extracted_entities
def test_entity_overwrite(self):
"""Should overwrite existing entity."""
stm = ShortTermMemory()
stm.set_entity("key", "value1")
stm.set_entity("key", "value2")
assert stm.get_entity("key") == "value2"
def test_topic_with_empty_string(self):
"""Should handle empty topic."""
stm = ShortTermMemory()
stm.set_topic("")
assert stm.current_topic == ""
class TestEpisodicMemoryEdgeCases:
"""Edge case tests for EpisodicMemory."""
def test_store_empty_results(self):
"""Should handle empty results list."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [])
assert episodic.last_search_results is not None
assert episodic.last_search_results["results"] == []
def test_store_results_with_none_values(self):
"""Should handle results with None values."""
episodic = EpisodicMemory()
results = [
{"name": None, "seeders": None},
{"name": "Valid", "seeders": 100},
]
episodic.store_search_results("query", results)
assert len(episodic.last_search_results["results"]) == 2
def test_get_result_by_index_after_clear(self):
"""Should return None after clearing results."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [{"name": "Test"}])
episodic.clear_search_results()
result = episodic.get_result_by_index(1)
assert result is None
def test_get_result_by_very_large_index(self):
"""Should handle very large index."""
episodic = EpisodicMemory()
episodic.store_search_results("query", [{"name": "Test"}])
result = episodic.get_result_by_index(999999999)
assert result is None
def test_download_with_missing_fields(self):
"""Should handle download with missing fields."""
episodic = EpisodicMemory()
episodic.add_active_download({}) # Empty dict
assert len(episodic.active_downloads) == 1
assert "started_at" in episodic.active_downloads[0]
def test_update_nonexistent_download(self):
"""Should not crash when updating nonexistent download."""
episodic = EpisodicMemory()
# Should not raise
episodic.update_download_progress("nonexistent", 50)
assert episodic.active_downloads == []
def test_complete_nonexistent_download(self):
"""Should return None for nonexistent download."""
episodic = EpisodicMemory()
result = episodic.complete_download("nonexistent", "/path")
assert result is None
def test_error_with_empty_context(self):
"""Should handle error with None context."""
episodic = EpisodicMemory()
episodic.add_error("action", "error", None)
assert episodic.recent_errors[0]["context"] == {}
def test_error_with_very_long_message(self):
"""Should handle very long error messages."""
episodic = EpisodicMemory()
long_error = "x" * 10000
episodic.add_error("action", long_error)
assert len(episodic.recent_errors[0]["error"]) == 10000
def test_pending_question_with_empty_options(self):
"""Should handle question with no options."""
episodic = EpisodicMemory()
episodic.set_pending_question("Question?", [], {})
assert episodic.pending_question["options"] == []
def test_resolve_question_invalid_index(self):
"""Should return None for invalid answer index."""
episodic = EpisodicMemory()
episodic.set_pending_question(
"Question?",
[{"index": 1, "label": "Option"}],
{},
)
result = episodic.resolve_pending_question(999)
assert result is None
assert episodic.pending_question is None # Still cleared
def test_resolve_question_when_none(self):
"""Should handle resolving when no question pending."""
episodic = EpisodicMemory()
result = episodic.resolve_pending_question(1)
assert result is None
def test_background_event_with_empty_data(self):
"""Should handle event with empty data."""
episodic = EpisodicMemory()
episodic.add_background_event("event", {})
assert episodic.background_events[0]["data"] == {}
def test_get_unread_events_multiple_calls(self):
"""Should return empty on second call."""
episodic = EpisodicMemory()
episodic.add_background_event("event", {})
first = episodic.get_unread_events()
second = episodic.get_unread_events()
assert len(first) == 1
assert len(second) == 0
def test_max_errors_boundary(self):
"""Should keep exactly max_errors."""
episodic = EpisodicMemory()
episodic.max_errors = 3
for i in range(3):
episodic.add_error("action", f"Error {i}")
assert len(episodic.recent_errors) == 3
episodic.add_error("action", "Error 3")
assert len(episodic.recent_errors) == 3
assert episodic.recent_errors[0]["error"] == "Error 1"
def test_max_events_boundary(self):
"""Should keep exactly max_events."""
episodic = EpisodicMemory()
episodic.max_events = 3
for i in range(5):
episodic.add_background_event("event", {"i": i})
assert len(episodic.background_events) == 3
assert episodic.background_events[0]["data"]["i"] == 2
class TestMemoryEdgeCases:
"""Edge case tests for Memory manager."""
def test_init_with_nonexistent_directory(self, temp_dir):
"""Should create directory if not exists."""
new_dir = temp_dir / "new" / "nested" / "dir"
# Create parent directories first
new_dir.mkdir(parents=True, exist_ok=True)
memory = Memory(storage_dir=str(new_dir))
assert new_dir.exists()
def test_init_with_readonly_directory(self, temp_dir):
"""Should handle readonly directory gracefully."""
readonly_dir = temp_dir / "readonly"
readonly_dir.mkdir()
# Make readonly (may not work on all systems)
try:
os.chmod(readonly_dir, 0o444)
# This might raise or might work depending on OS
memory = Memory(storage_dir=str(readonly_dir))
except (PermissionError, OSError):
pass # Expected on some systems
finally:
os.chmod(readonly_dir, 0o755)
def test_load_ltm_with_empty_file(self, temp_dir):
"""Should handle empty LTM file."""
ltm_file = temp_dir / "ltm.json"
ltm_file.write_text("")
memory = Memory(storage_dir=str(temp_dir))
# Should use defaults
assert memory.ltm.config == {}
def test_load_ltm_with_partial_data(self, temp_dir):
"""Should handle partial LTM data."""
ltm_file = temp_dir / "ltm.json"
ltm_file.write_text('{"config": {"key": "value"}}')
memory = Memory(storage_dir=str(temp_dir))
assert memory.ltm.get_config("key") == "value"
# Other fields should have defaults
assert memory.ltm.library == {"movies": [], "tv_shows": []}
def test_save_with_unicode(self, temp_dir):
"""Should save unicode correctly."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("japanese", "日本語テスト")
memory.save()
# Read back and verify
ltm_file = temp_dir / "ltm.json"
data = json.loads(ltm_file.read_text(encoding="utf-8"))
assert data["config"]["japanese"] == "日本語テスト"
def test_save_preserves_formatting(self, temp_dir):
"""Should save with readable formatting."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("key", "value")
memory.save()
ltm_file = temp_dir / "ltm.json"
content = ltm_file.read_text()
# Should be indented (pretty printed)
assert "\n" in content
def test_concurrent_access_simulation(self, temp_dir):
"""Should handle rapid save/load cycles."""
memory = Memory(storage_dir=str(temp_dir))
for i in range(100):
memory.ltm.set_config(f"key_{i}", f"value_{i}")
memory.save()
# Reload and verify
memory2 = Memory(storage_dir=str(temp_dir))
assert memory2.ltm.get_config("key_99") == "value_99"
def test_clear_session_preserves_ltm(self, temp_dir):
"""Should preserve LTM after clear_session."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("important", "data")
memory.stm.add_message("user", "Hello")
memory.episodic.store_search_results("query", [{}])
memory.clear_session()
assert memory.ltm.get_config("important") == "data"
assert memory.stm.conversation_history == []
assert memory.episodic.last_search_results is None
def test_get_context_for_prompt_empty(self, temp_dir):
"""Should handle empty memory state."""
memory = Memory(storage_dir=str(temp_dir))
context = memory.get_context_for_prompt()
assert context["config"] == {}
assert context["last_search"]["query"] is None
assert context["last_search"]["result_count"] == 0
def test_get_full_state_serializable(self, temp_dir):
"""Should return JSON-serializable state."""
memory = Memory(storage_dir=str(temp_dir))
memory.ltm.set_config("key", "value")
memory.stm.add_message("user", "Hello")
memory.episodic.store_search_results("query", [{"name": "Test"}])
state = memory.get_full_state()
# Should be JSON serializable
json_str = json.dumps(state)
assert json_str is not None
class TestMemoryContextEdgeCases:
"""Edge case tests for memory context."""
def test_multiple_init_calls(self, temp_dir):
"""Should handle multiple init calls."""
_memory_ctx.set(None)
mem1 = init_memory(str(temp_dir))
mem2 = init_memory(str(temp_dir))
# Second call should replace first
assert get_memory() is mem2
def test_set_memory_with_none(self):
"""Should handle setting None."""
_memory_ctx.set(None)
set_memory(None)
with pytest.raises(RuntimeError):
get_memory()
def test_context_isolation(self, temp_dir):
"""Context should be isolated per context."""
import asyncio
from contextvars import copy_context
_memory_ctx.set(None)
mem1 = init_memory(str(temp_dir))
# Create a copy of context
ctx = copy_context()
# In the copy, memory should still be set
def check_memory():
return get_memory()
result = ctx.run(check_memory)
assert result is mem1

View File

@@ -0,0 +1,396 @@
"""Edge case tests for PromptBuilder."""
import pytest
import json
from agent.prompts import PromptBuilder
from agent.registry import make_tools
from infrastructure.persistence import get_memory
class TestPromptBuilderEdgeCases:
"""Edge case tests for PromptBuilder."""
def test_prompt_with_empty_memory(self, memory):
"""Should build prompt with completely empty memory."""
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
assert "AVAILABLE TOOLS" in prompt
assert "CURRENT CONFIGURATION" in prompt
def test_prompt_with_unicode_config(self, memory):
"""Should handle unicode in config."""
memory.ltm.set_config("folder_日本語", "/path/to/日本語")
memory.ltm.set_config("emoji_folder", "/path/🎬")
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
assert "日本語" in prompt
assert "🎬" in prompt
def test_prompt_with_very_long_config_value(self, memory):
"""Should handle very long config values."""
long_path = "/very/long/path/" + "x" * 1000
memory.ltm.set_config("download_folder", long_path)
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
# Should include the path (possibly truncated)
assert "very/long/path" in prompt
def test_prompt_with_special_chars_in_config(self, memory):
"""Should escape special characters in config."""
memory.ltm.set_config("path", '/path/with "quotes" and \\backslash')
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
# Should be valid (not crash)
assert "CURRENT CONFIGURATION" in prompt
def test_prompt_with_many_search_results(self, memory):
"""Should limit displayed search results."""
results = [{"name": f"Torrent {i}", "seeders": i} for i in range(50)]
memory.episodic.store_search_results("test query", results)
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
# Should show limited results
assert "LAST SEARCH" in prompt
# Should indicate there are more
assert "more" in prompt.lower() or "..." in prompt
def test_prompt_with_search_results_missing_fields(self, memory):
"""Should handle search results with missing fields."""
results = [
{"name": "Complete"},
{}, # Empty
{"seeders": 100}, # Missing name
]
memory.episodic.store_search_results("test", results)
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
# Should not crash
assert "LAST SEARCH" in prompt
def test_prompt_with_many_active_downloads(self, memory):
"""Should limit displayed active downloads."""
for i in range(20):
memory.episodic.add_active_download({
"task_id": str(i),
"name": f"Download {i}",
"progress": i * 5,
})
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
assert "ACTIVE DOWNLOADS" in prompt
# Should show limited number
assert "Download 0" in prompt
def test_prompt_with_many_errors(self, memory):
"""Should show only last error."""
for i in range(10):
memory.episodic.add_error(f"action_{i}", f"Error {i}")
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
assert "LAST ERROR" in prompt
# Should show the most recent error
# (depends on max_errors setting)
def test_prompt_with_pending_question_many_options(self, memory):
"""Should handle pending question with many options."""
options = [{"index": i, "label": f"Option {i}"} for i in range(20)]
memory.episodic.set_pending_question("Choose one:", options, {})
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
assert "PENDING QUESTION" in prompt
assert "Choose one:" in prompt
def test_prompt_with_complex_workflow(self, memory):
"""Should handle complex workflow state."""
memory.stm.start_workflow("download", {
"title": "Test Movie",
"year": 2024,
"quality": "1080p",
"nested": {"deep": {"value": "test"}},
})
memory.stm.update_workflow_stage("searching_torrents")
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
assert "CURRENT WORKFLOW" in prompt
assert "download" in prompt
assert "searching_torrents" in prompt
def test_prompt_with_many_entities(self, memory):
"""Should handle many extracted entities."""
for i in range(50):
memory.stm.set_entity(f"entity_{i}", f"value_{i}")
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
assert "EXTRACTED ENTITIES" in prompt
def test_prompt_with_null_values_in_entities(self, memory):
"""Should handle null values in entities."""
memory.stm.set_entity("null_value", None)
memory.stm.set_entity("empty_string", "")
memory.stm.set_entity("zero", 0)
memory.stm.set_entity("false", False)
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
# Should not crash
assert "EXTRACTED ENTITIES" in prompt
def test_prompt_with_unread_events(self, memory):
"""Should include unread events."""
memory.episodic.add_background_event("download_complete", {"name": "Movie.mkv"})
memory.episodic.add_background_event("new_files", {"count": 5})
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
assert "UNREAD EVENTS" in prompt
def test_prompt_with_all_sections(self, memory):
"""Should include all sections when all data present."""
# Config
memory.ltm.set_config("download_folder", "/downloads")
# Search results
memory.episodic.store_search_results("test", [{"name": "Result"}])
# Active downloads
memory.episodic.add_active_download({"task_id": "1", "name": "Download"})
# Errors
memory.episodic.add_error("action", "error")
# Pending question
memory.episodic.set_pending_question("Question?", [], {})
# Workflow
memory.stm.start_workflow("download", {"title": "Test"})
# Topic
memory.stm.set_topic("searching")
# Entities
memory.stm.set_entity("key", "value")
# Events
memory.episodic.add_background_event("event", {})
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
# All sections should be present
assert "CURRENT CONFIGURATION" in prompt
assert "LAST SEARCH" in prompt
assert "ACTIVE DOWNLOADS" in prompt
assert "LAST ERROR" in prompt
assert "PENDING QUESTION" in prompt
assert "CURRENT WORKFLOW" in prompt
assert "CURRENT TOPIC" in prompt
assert "EXTRACTED ENTITIES" in prompt
assert "UNREAD EVENTS" in prompt
def test_prompt_json_serializable(self, memory):
"""Should produce JSON-serializable content."""
memory.ltm.set_config("key", {"nested": [1, 2, 3]})
memory.stm.set_entity("complex", {"a": {"b": {"c": "d"}}})
tools = make_tools()
builder = PromptBuilder(tools)
prompt = builder.build_system_prompt()
# The prompt itself is a string, but embedded JSON should be valid
assert isinstance(prompt, str)
class TestFormatToolsDescriptionEdgeCases:
"""Edge case tests for _format_tools_description."""
def test_format_with_no_tools(self, memory):
"""Should handle empty tools dict."""
builder = PromptBuilder({})
desc = builder._format_tools_description()
assert desc == ""
def test_format_with_complex_parameters(self, memory):
"""Should format complex parameter schemas."""
from agent.registry import Tool
tools = {
"complex_tool": Tool(
name="complex_tool",
description="A complex tool",
func=lambda: {},
parameters={
"type": "object",
"properties": {
"nested": {
"type": "object",
"properties": {
"deep": {"type": "string"},
},
},
"array": {
"type": "array",
"items": {"type": "integer"},
},
},
"required": ["nested"],
},
),
}
builder = PromptBuilder(tools)
desc = builder._format_tools_description()
assert "complex_tool" in desc
assert "nested" in desc
class TestFormatEpisodicContextEdgeCases:
"""Edge case tests for _format_episodic_context."""
def test_format_with_empty_search_query(self, memory):
"""Should handle empty search query."""
memory.episodic.store_search_results("", [{"name": "Result"}])
tools = make_tools()
builder = PromptBuilder(tools)
context = builder._format_episodic_context()
assert "LAST SEARCH" in context
def test_format_with_search_results_none_names(self, memory):
"""Should handle results with None names."""
memory.episodic.store_search_results("test", [
{"name": None},
{"title": None},
{},
])
tools = make_tools()
builder = PromptBuilder(tools)
context = builder._format_episodic_context()
# Should not crash
assert "LAST SEARCH" in context
def test_format_with_download_missing_progress(self, memory):
"""Should handle download without progress."""
memory.episodic.add_active_download({"task_id": "1", "name": "Test"})
tools = make_tools()
builder = PromptBuilder(tools)
context = builder._format_episodic_context()
assert "ACTIVE DOWNLOADS" in context
assert "0%" in context # Default progress
class TestFormatStmContextEdgeCases:
"""Edge case tests for _format_stm_context."""
def test_format_with_workflow_missing_target(self, memory):
"""Should handle workflow with missing target."""
memory.stm.current_workflow = {
"type": "download",
"stage": "started",
}
tools = make_tools()
builder = PromptBuilder(tools)
context = builder._format_stm_context()
assert "CURRENT WORKFLOW" in context
def test_format_with_workflow_none_target(self, memory):
"""Should handle workflow with None target."""
memory.stm.start_workflow("download", None)
tools = make_tools()
builder = PromptBuilder(tools)
try:
context = builder._format_stm_context()
assert "CURRENT WORKFLOW" in context or True
except (AttributeError, TypeError):
# Expected if None target causes issues
pass
def test_format_with_empty_topic(self, memory):
"""Should handle empty topic."""
memory.stm.set_topic("")
tools = make_tools()
builder = PromptBuilder(tools)
context = builder._format_stm_context()
# Empty topic might not be shown
assert isinstance(context, str)
def test_format_with_entities_containing_json(self, memory):
"""Should handle entities containing JSON strings."""
memory.stm.set_entity("json_string", '{"key": "value"}')
tools = make_tools()
builder = PromptBuilder(tools)
context = builder._format_stm_context()
assert "EXTRACTED ENTITIES" in context

View File

@@ -0,0 +1,300 @@
"""Edge case tests for tool registry."""
import pytest
from unittest.mock import Mock
from agent.registry import Tool, make_tools
class TestToolEdgeCases:
"""Edge case tests for Tool dataclass."""
def test_tool_creation(self):
"""Should create tool with all fields."""
tool = Tool(
name="test_tool",
description="Test description",
func=lambda: {"status": "ok"},
parameters={"type": "object", "properties": {}},
)
assert tool.name == "test_tool"
assert tool.description == "Test description"
assert callable(tool.func)
def test_tool_with_unicode_name(self):
"""Should handle unicode in tool name."""
tool = Tool(
name="tool_日本語",
description="Japanese tool",
func=lambda: {},
parameters={},
)
assert "日本語" in tool.name
def test_tool_with_unicode_description(self):
"""Should handle unicode in description."""
tool = Tool(
name="test",
description="日本語の説明 🔧",
func=lambda: {},
parameters={},
)
assert "日本語" in tool.description
def test_tool_with_complex_parameters(self):
"""Should handle complex parameter schemas."""
tool = Tool(
name="complex",
description="Complex tool",
func=lambda: {},
parameters={
"type": "object",
"properties": {
"nested": {
"type": "object",
"properties": {
"deep": {
"type": "array",
"items": {"type": "string"},
},
},
},
"enum_field": {
"type": "string",
"enum": ["a", "b", "c"],
},
},
"required": ["nested"],
},
)
assert "nested" in tool.parameters["properties"]
def test_tool_with_empty_parameters(self):
"""Should handle empty parameters."""
tool = Tool(
name="no_params",
description="Tool with no parameters",
func=lambda: {},
parameters={},
)
assert tool.parameters == {}
def test_tool_with_none_func(self):
"""Should handle None func (though invalid)."""
tool = Tool(
name="invalid",
description="Invalid tool",
func=None,
parameters={},
)
assert tool.func is None
def test_tool_func_execution(self):
"""Should execute tool function."""
result_value = {"status": "ok", "data": "test"}
tool = Tool(
name="test",
description="Test",
func=lambda: result_value,
parameters={},
)
result = tool.func()
assert result == result_value
def test_tool_func_with_args(self):
"""Should execute tool function with arguments."""
tool = Tool(
name="test",
description="Test",
func=lambda x, y: {"sum": x + y},
parameters={},
)
result = tool.func(1, 2)
assert result["sum"] == 3
def test_tool_func_with_kwargs(self):
"""Should execute tool function with keyword arguments."""
tool = Tool(
name="test",
description="Test",
func=lambda **kwargs: {"received": kwargs},
parameters={},
)
result = tool.func(a=1, b=2)
assert result["received"]["a"] == 1
class TestMakeToolsEdgeCases:
"""Edge case tests for make_tools function."""
def test_make_tools_returns_dict(self, memory):
"""Should return dictionary of tools."""
tools = make_tools()
assert isinstance(tools, dict)
def test_make_tools_all_tools_have_required_fields(self, memory):
"""Should have all required fields for each tool."""
tools = make_tools()
for name, tool in tools.items():
assert tool.name == name
assert isinstance(tool.description, str)
assert len(tool.description) > 0
assert callable(tool.func)
assert isinstance(tool.parameters, dict)
def test_make_tools_unique_names(self, memory):
"""Should have unique tool names."""
tools = make_tools()
names = list(tools.keys())
assert len(names) == len(set(names))
def test_make_tools_valid_parameter_schemas(self, memory):
"""Should have valid JSON Schema for parameters."""
tools = make_tools()
for tool in tools.values():
params = tool.parameters
if params:
assert "type" in params
assert params["type"] == "object"
if "properties" in params:
assert isinstance(params["properties"], dict)
def test_make_tools_required_params_in_properties(self, memory):
"""Should have required params defined in properties."""
tools = make_tools()
for tool in tools.values():
params = tool.parameters
if "required" in params and "properties" in params:
for req in params["required"]:
assert req in params["properties"], f"Required param {req} not in properties for {tool.name}"
def test_make_tools_descriptions_not_empty(self, memory):
"""Should have non-empty descriptions."""
tools = make_tools()
for tool in tools.values():
assert tool.description.strip() != ""
def test_make_tools_funcs_callable(self, memory):
"""Should have callable functions."""
tools = make_tools()
for tool in tools.values():
assert callable(tool.func)
def test_make_tools_expected_tools_present(self, memory):
"""Should have expected tools."""
tools = make_tools()
expected = [
"set_path_for_folder",
"list_folder",
"find_media_imdb_id",
"find_torrent", # Changed from find_torrents
"add_torrent_by_index",
"add_torrent_to_qbittorrent",
"get_torrent_by_index",
"set_language",
]
for name in expected:
assert name in tools, f"Expected tool {name} not found"
def test_make_tools_idempotent(self, memory):
"""Should return same tools on multiple calls."""
tools1 = make_tools()
tools2 = make_tools()
assert set(tools1.keys()) == set(tools2.keys())
def test_make_tools_parameter_types(self, memory):
"""Should have valid parameter types."""
tools = make_tools()
valid_types = ["string", "integer", "number", "boolean", "array", "object"]
for tool in tools.values():
if "properties" in tool.parameters:
for prop_name, prop_schema in tool.parameters["properties"].items():
if "type" in prop_schema:
assert prop_schema["type"] in valid_types, f"Invalid type for {tool.name}.{prop_name}"
def test_make_tools_enum_values(self, memory):
"""Should have valid enum values."""
tools = make_tools()
for tool in tools.values():
if "properties" in tool.parameters:
for prop_name, prop_schema in tool.parameters["properties"].items():
if "enum" in prop_schema:
assert isinstance(prop_schema["enum"], list)
assert len(prop_schema["enum"]) > 0
class TestToolExecution:
"""Tests for tool execution edge cases."""
def test_tool_returns_dict(self, memory, real_folder):
"""Should return dict from tool execution."""
tools = make_tools()
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
result = tools["list_folder"].func(folder_type="download")
assert isinstance(result, dict)
def test_tool_returns_status(self, memory, real_folder):
"""Should return status in result."""
tools = make_tools()
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
result = tools["list_folder"].func(folder_type="download")
assert "status" in result or "error" in result
def test_tool_handles_missing_args(self, memory):
"""Should handle missing required arguments."""
tools = make_tools()
with pytest.raises(TypeError):
tools["set_path_for_folder"].func() # Missing required args
def test_tool_handles_wrong_type_args(self, memory):
"""Should handle wrong type arguments."""
tools = make_tools()
# Pass wrong type - should either work or raise
try:
result = tools["get_torrent_by_index"].func(index="not an int")
# If it doesn't raise, should return error
assert "error" in result or "status" in result
except (TypeError, ValueError):
pass # Also acceptable
def test_tool_handles_extra_args(self, memory, real_folder):
"""Should handle extra arguments."""
tools = make_tools()
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
# Extra args should raise TypeError
with pytest.raises(TypeError):
tools["list_folder"].func(
folder_type="download",
extra_arg="should fail",
)

View File

@@ -0,0 +1,414 @@
"""Tests for JSON repositories."""
import pytest
from datetime import datetime
from infrastructure.persistence.json import (
JsonMovieRepository,
JsonTVShowRepository,
JsonSubtitleRepository,
)
from domain.movies.entities import Movie
from domain.movies.value_objects import MovieTitle, ReleaseYear, Quality
from domain.tv_shows.entities import TVShow
from domain.tv_shows.value_objects import ShowStatus
from domain.subtitles.entities import Subtitle
from domain.subtitles.value_objects import Language, SubtitleFormat, TimingOffset
from domain.shared.value_objects import ImdbId, FilePath, FileSize
class TestJsonMovieRepository:
"""Tests for JsonMovieRepository."""
def test_save_movie(self, memory):
"""Should save a movie."""
repo = JsonMovieRepository()
movie = Movie(
imdb_id=ImdbId("tt1375666"),
title=MovieTitle("Inception"),
release_year=ReleaseYear(2010),
quality=Quality.FULL_HD,
)
repo.save(movie)
assert len(memory.ltm.library["movies"]) == 1
assert memory.ltm.library["movies"][0]["imdb_id"] == "tt1375666"
def test_save_updates_existing(self, memory):
"""Should update existing movie."""
repo = JsonMovieRepository()
movie1 = Movie(
imdb_id=ImdbId("tt1375666"),
title=MovieTitle("Inception"),
quality=Quality.HD,
)
movie2 = Movie(
imdb_id=ImdbId("tt1375666"),
title=MovieTitle("Inception"),
quality=Quality.FULL_HD,
)
repo.save(movie1)
repo.save(movie2)
assert len(memory.ltm.library["movies"]) == 1
assert memory.ltm.library["movies"][0]["quality"] == "1080p"
def test_find_by_imdb_id(self, memory_with_library):
"""Should find movie by IMDb ID."""
repo = JsonMovieRepository()
movie = repo.find_by_imdb_id(ImdbId("tt1375666"))
assert movie is not None
assert movie.title.value == "Inception"
def test_find_by_imdb_id_not_found(self, memory):
"""Should return None if not found."""
repo = JsonMovieRepository()
movie = repo.find_by_imdb_id(ImdbId("tt9999999"))
assert movie is None
def test_find_all(self, memory_with_library):
"""Should return all movies."""
repo = JsonMovieRepository()
movies = repo.find_all()
assert len(movies) >= 2
titles = [m.title.value for m in movies]
assert "Inception" in titles
assert "Interstellar" in titles
def test_find_all_empty(self, memory):
"""Should return empty list if no movies."""
repo = JsonMovieRepository()
movies = repo.find_all()
assert movies == []
def test_delete(self, memory_with_library):
"""Should delete movie."""
repo = JsonMovieRepository()
result = repo.delete(ImdbId("tt1375666"))
assert result is True
assert len(memory_with_library.ltm.library["movies"]) == 1
def test_delete_not_found(self, memory):
"""Should return False if not found."""
repo = JsonMovieRepository()
result = repo.delete(ImdbId("tt9999999"))
assert result is False
def test_exists(self, memory_with_library):
"""Should check if movie exists."""
repo = JsonMovieRepository()
assert repo.exists(ImdbId("tt1375666")) is True
assert repo.exists(ImdbId("tt9999999")) is False
def test_preserves_all_fields(self, memory):
"""Should preserve all movie fields."""
repo = JsonMovieRepository()
movie = Movie(
imdb_id=ImdbId("tt1375666"),
title=MovieTitle("Inception"),
release_year=ReleaseYear(2010),
quality=Quality.FULL_HD,
file_path=FilePath("/movies/inception.mkv"),
file_size=FileSize(2500000000),
tmdb_id=27205,
)
repo.save(movie)
loaded = repo.find_by_imdb_id(ImdbId("tt1375666"))
assert loaded.title.value == "Inception"
assert loaded.release_year.value == 2010
assert loaded.quality.value == "1080p"
assert str(loaded.file_path) == "/movies/inception.mkv"
assert loaded.file_size.bytes == 2500000000
assert loaded.tmdb_id == 27205
class TestJsonTVShowRepository:
"""Tests for JsonTVShowRepository."""
def test_save_show(self, memory):
"""Should save a TV show."""
repo = JsonTVShowRepository()
show = TVShow(
imdb_id=ImdbId("tt0944947"),
title="Game of Thrones",
seasons_count=8,
status=ShowStatus.ENDED,
)
repo.save(show)
assert len(memory.ltm.library["tv_shows"]) == 1
assert memory.ltm.library["tv_shows"][0]["title"] == "Game of Thrones"
def test_save_updates_existing(self, memory):
"""Should update existing show."""
repo = JsonTVShowRepository()
show1 = TVShow(
imdb_id=ImdbId("tt0944947"),
title="Game of Thrones",
seasons_count=7,
status=ShowStatus.ONGOING,
)
show2 = TVShow(
imdb_id=ImdbId("tt0944947"),
title="Game of Thrones",
seasons_count=8,
status=ShowStatus.ENDED,
)
repo.save(show1)
repo.save(show2)
assert len(memory.ltm.library["tv_shows"]) == 1
assert memory.ltm.library["tv_shows"][0]["seasons_count"] == 8
def test_find_by_imdb_id(self, memory_with_library):
"""Should find show by IMDb ID."""
repo = JsonTVShowRepository()
show = repo.find_by_imdb_id(ImdbId("tt0944947"))
assert show is not None
assert show.title == "Game of Thrones"
def test_find_by_imdb_id_not_found(self, memory):
"""Should return None if not found."""
repo = JsonTVShowRepository()
show = repo.find_by_imdb_id(ImdbId("tt9999999"))
assert show is None
def test_find_all(self, memory_with_library):
"""Should return all shows."""
repo = JsonTVShowRepository()
shows = repo.find_all()
assert len(shows) == 1
assert shows[0].title == "Game of Thrones"
def test_delete(self, memory_with_library):
"""Should delete show."""
repo = JsonTVShowRepository()
result = repo.delete(ImdbId("tt0944947"))
assert result is True
assert len(memory_with_library.ltm.library["tv_shows"]) == 0
def test_exists(self, memory_with_library):
"""Should check if show exists."""
repo = JsonTVShowRepository()
assert repo.exists(ImdbId("tt0944947")) is True
assert repo.exists(ImdbId("tt9999999")) is False
def test_preserves_status(self, memory):
"""Should preserve show status."""
repo = JsonTVShowRepository()
for i, status in enumerate([ShowStatus.ONGOING, ShowStatus.ENDED, ShowStatus.UNKNOWN]):
show = TVShow(
imdb_id=ImdbId(f"tt{i+1000000:07d}"),
title=f"Show {status.value}",
seasons_count=1,
status=status,
)
repo.save(show)
loaded = repo.find_by_imdb_id(ImdbId(f"tt{i+1000000:07d}"))
assert loaded.status == status
class TestJsonSubtitleRepository:
"""Tests for JsonSubtitleRepository."""
def test_save_subtitle(self, memory):
"""Should save a subtitle."""
repo = JsonSubtitleRepository()
subtitle = Subtitle(
media_imdb_id=ImdbId("tt1375666"),
language=Language.ENGLISH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/inception.en.srt"),
)
repo.save(subtitle)
assert "subtitles" in memory.ltm.library
assert len(memory.ltm.library["subtitles"]) == 1
def test_save_multiple_for_same_media(self, memory):
"""Should allow multiple subtitles for same media."""
repo = JsonSubtitleRepository()
sub_en = Subtitle(
media_imdb_id=ImdbId("tt1375666"),
language=Language.ENGLISH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/inception.en.srt"),
)
sub_fr = Subtitle(
media_imdb_id=ImdbId("tt1375666"),
language=Language.FRENCH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/inception.fr.srt"),
)
repo.save(sub_en)
repo.save(sub_fr)
assert len(memory.ltm.library["subtitles"]) == 2
def test_find_by_media(self, memory):
"""Should find subtitles by media ID."""
repo = JsonSubtitleRepository()
subtitle = Subtitle(
media_imdb_id=ImdbId("tt1375666"),
language=Language.ENGLISH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/inception.en.srt"),
)
repo.save(subtitle)
results = repo.find_by_media(ImdbId("tt1375666"))
assert len(results) == 1
assert results[0].language == Language.ENGLISH
def test_find_by_media_with_language_filter(self, memory):
"""Should filter by language."""
repo = JsonSubtitleRepository()
repo.save(Subtitle(
media_imdb_id=ImdbId("tt1375666"),
language=Language.ENGLISH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/en.srt"),
))
repo.save(Subtitle(
media_imdb_id=ImdbId("tt1375666"),
language=Language.FRENCH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/fr.srt"),
))
results = repo.find_by_media(ImdbId("tt1375666"), language=Language.FRENCH)
assert len(results) == 1
assert results[0].language == Language.FRENCH
def test_find_by_media_with_episode_filter(self, memory):
"""Should filter by season/episode."""
repo = JsonSubtitleRepository()
repo.save(Subtitle(
media_imdb_id=ImdbId("tt0944947"),
language=Language.ENGLISH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/s01e01.srt"),
season_number=1,
episode_number=1,
))
repo.save(Subtitle(
media_imdb_id=ImdbId("tt0944947"),
language=Language.ENGLISH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/s01e02.srt"),
season_number=1,
episode_number=2,
))
results = repo.find_by_media(
ImdbId("tt0944947"),
season=1,
episode=1,
)
assert len(results) == 1
assert results[0].episode_number == 1
def test_find_by_media_not_found(self, memory):
"""Should return empty list if not found."""
repo = JsonSubtitleRepository()
results = repo.find_by_media(ImdbId("tt9999999"))
assert results == []
def test_delete(self, memory):
"""Should delete subtitle by file path."""
repo = JsonSubtitleRepository()
subtitle = Subtitle(
media_imdb_id=ImdbId("tt1375666"),
language=Language.ENGLISH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/inception.en.srt"),
)
repo.save(subtitle)
result = repo.delete(subtitle)
assert result is True
assert len(memory.ltm.library["subtitles"]) == 0
def test_delete_not_found(self, memory):
"""Should return False if not found."""
repo = JsonSubtitleRepository()
subtitle = Subtitle(
media_imdb_id=ImdbId("tt1375666"),
language=Language.ENGLISH,
format=SubtitleFormat.SRT,
file_path=FilePath("/nonexistent.srt"),
)
result = repo.delete(subtitle)
assert result is False
def test_preserves_all_fields(self, memory):
"""Should preserve all subtitle fields."""
repo = JsonSubtitleRepository()
subtitle = Subtitle(
media_imdb_id=ImdbId("tt1375666"),
language=Language.ENGLISH,
format=SubtitleFormat.SRT,
file_path=FilePath("/subs/inception.en.srt"),
season_number=1,
episode_number=5,
timing_offset=TimingOffset(500),
hearing_impaired=True,
forced=False,
source="OpenSubtitles",
uploader="user123",
download_count=1000,
rating=8.5,
)
repo.save(subtitle)
results = repo.find_by_media(ImdbId("tt1375666"))
assert len(results) == 1
loaded = results[0]
assert loaded.season_number == 1
assert loaded.episode_number == 5
assert loaded.timing_offset.milliseconds == 500
assert loaded.hearing_impaired is True
assert loaded.forced is False
assert loaded.source == "OpenSubtitles"
assert loaded.uploader == "user123"
assert loaded.download_count == 1000
assert loaded.rating == 8.5

View File

@@ -1,8 +1,7 @@
"""Edge case tests for tools.""" """Edge case tests for tools."""
from unittest.mock import Mock, patch
import pytest import pytest
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path
from agent.tools import api as api_tools from agent.tools import api as api_tools
from agent.tools import filesystem as fs_tools from agent.tools import filesystem as fs_tools
@@ -16,10 +15,7 @@ class TestFindTorrentEdgeCases:
def test_empty_query(self, mock_use_case_class, memory): def test_empty_query(self, mock_use_case_class, memory):
"""Should handle empty query.""" """Should handle empty query."""
mock_response = Mock() mock_response = Mock()
mock_response.to_dict.return_value = { mock_response.to_dict.return_value = {"status": "error", "error": "invalid_query"}
"status": "error",
"error": "invalid_query",
}
mock_use_case = Mock() mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case mock_use_case_class.return_value = mock_use_case
@@ -32,11 +28,7 @@ class TestFindTorrentEdgeCases:
def test_very_long_query(self, mock_use_case_class, memory): def test_very_long_query(self, mock_use_case_class, memory):
"""Should handle very long query.""" """Should handle very long query."""
mock_response = Mock() mock_response = Mock()
mock_response.to_dict.return_value = { mock_response.to_dict.return_value = {"status": "ok", "torrents": [], "count": 0}
"status": "ok",
"torrents": [],
"count": 0,
}
mock_use_case = Mock() mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case mock_use_case_class.return_value = mock_use_case
@@ -51,11 +43,7 @@ class TestFindTorrentEdgeCases:
def test_special_characters_in_query(self, mock_use_case_class, memory): def test_special_characters_in_query(self, mock_use_case_class, memory):
"""Should handle special characters in query.""" """Should handle special characters in query."""
mock_response = Mock() mock_response = Mock()
mock_response.to_dict.return_value = { mock_response.to_dict.return_value = {"status": "ok", "torrents": [], "count": 0}
"status": "ok",
"torrents": [],
"count": 0,
}
mock_use_case = Mock() mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case mock_use_case_class.return_value = mock_use_case
@@ -69,11 +57,7 @@ class TestFindTorrentEdgeCases:
def test_unicode_query(self, mock_use_case_class, memory): def test_unicode_query(self, mock_use_case_class, memory):
"""Should handle unicode in query.""" """Should handle unicode in query."""
mock_response = Mock() mock_response = Mock()
mock_response.to_dict.return_value = { mock_response.to_dict.return_value = {"status": "ok", "torrents": [], "count": 0}
"status": "ok",
"torrents": [],
"count": 0,
}
mock_use_case = Mock() mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case mock_use_case_class.return_value = mock_use_case
@@ -177,10 +161,7 @@ class TestAddTorrentEdgeCases:
def test_empty_magnet_link(self, mock_use_case_class, memory): def test_empty_magnet_link(self, mock_use_case_class, memory):
"""Should handle empty magnet link.""" """Should handle empty magnet link."""
mock_response = Mock() mock_response = Mock()
mock_response.to_dict.return_value = { mock_response.to_dict.return_value = {"status": "error", "error": "empty_magnet"}
"status": "error",
"error": "empty_magnet",
}
mock_use_case = Mock() mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case mock_use_case_class.return_value = mock_use_case
@@ -345,10 +326,7 @@ class TestFilesystemEdgeCases:
for attempt in attempts: for attempt in attempts:
result = fs_tools.list_folder("download", attempt) result = fs_tools.list_folder("download", attempt)
# Should either be forbidden or not found # Should either be forbidden or not found
assert ( assert result.get("error") in ["forbidden", "not_found", None] or result.get("status") == "ok"
result.get("error") in ["forbidden", "not_found", None]
or result.get("status") == "ok"
)
def test_path_with_null_byte(self, memory, real_folder): def test_path_with_null_byte(self, memory, real_folder):
"""Should block null byte injection.""" """Should block null byte injection."""