Files
alfred/tests/test_tools_edge_cases.py
2025-12-07 03:33:51 +01:00

446 lines
16 KiB
Python

"""Edge case tests for tools."""
from unittest.mock import Mock, patch
import pytest
from agent.tools import api as api_tools
from agent.tools import filesystem as fs_tools
from infrastructure.persistence import get_memory
class TestFindTorrentEdgeCases:
"""Edge case tests for find_torrent."""
@patch("agent.tools.api.SearchTorrentsUseCase")
def test_empty_query(self, mock_use_case_class, memory):
"""Should handle empty query."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "error",
"error": "invalid_query",
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
result = api_tools.find_torrent("")
assert result["status"] == "error"
@patch("agent.tools.api.SearchTorrentsUseCase")
def test_very_long_query(self, mock_use_case_class, memory):
"""Should handle very long query."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "ok",
"torrents": [],
"count": 0,
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
long_query = "x" * 10000
result = api_tools.find_torrent(long_query)
# Should not crash
assert "status" in result
@patch("agent.tools.api.SearchTorrentsUseCase")
def test_special_characters_in_query(self, mock_use_case_class, memory):
"""Should handle special characters in query."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "ok",
"torrents": [],
"count": 0,
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
special_query = "Movie (2024) [1080p] {x265} <HDR>"
result = api_tools.find_torrent(special_query)
assert "status" in result
@patch("agent.tools.api.SearchTorrentsUseCase")
def test_unicode_query(self, mock_use_case_class, memory):
"""Should handle unicode in query."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "ok",
"torrents": [],
"count": 0,
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
result = api_tools.find_torrent("日本語映画 2024")
assert "status" in result
@patch("agent.tools.api.SearchTorrentsUseCase")
def test_results_with_missing_fields(self, mock_use_case_class, memory):
"""Should handle results with missing fields."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "ok",
"torrents": [
{"name": "Torrent 1"}, # Missing seeders, magnet, etc.
{}, # Completely empty
],
"count": 2,
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
result = api_tools.find_torrent("Test")
assert result["status"] == "ok"
mem = get_memory()
assert len(mem.episodic.last_search_results["results"]) == 2
@patch("agent.tools.api.SearchTorrentsUseCase")
def test_api_timeout(self, mock_use_case_class, memory):
"""Should handle API timeout."""
mock_use_case = Mock()
mock_use_case.execute.side_effect = TimeoutError("Connection timed out")
mock_use_case_class.return_value = mock_use_case
with pytest.raises(TimeoutError):
api_tools.find_torrent("Test")
class TestGetTorrentByIndexEdgeCases:
"""Edge case tests for get_torrent_by_index."""
def test_index_as_float(self, memory_with_search_results):
"""Should handle float index (converted to int)."""
# Python will convert 2.0 to 2 when passed as int
result = api_tools.get_torrent_by_index(int(2.9))
assert result["status"] == "ok"
assert result["torrent"]["index"] == 2
def test_results_modified_between_calls(self, memory):
"""Should handle results being modified."""
memory.episodic.store_search_results("query1", [{"name": "Result 1"}])
# Get first result
result1 = api_tools.get_torrent_by_index(1)
assert result1["status"] == "ok"
# Store new results
memory.episodic.store_search_results("query2", [{"name": "New Result"}])
# Get first result again - should be new result
result2 = api_tools.get_torrent_by_index(1)
assert result2["torrent"]["name"] == "New Result"
def test_result_with_index_already_set(self, memory):
"""Should handle results that already have index field."""
memory.episodic.store_search_results(
"query",
[{"name": "Result", "index": 999}], # Pre-existing index
)
result = api_tools.get_torrent_by_index(1)
# May overwrite or error depending on implementation
assert result["status"] in ["ok", "error"]
class TestAddTorrentEdgeCases:
"""Edge case tests for add_torrent functions."""
@patch("agent.tools.api.AddTorrentUseCase")
def test_invalid_magnet_link(self, mock_use_case_class, memory):
"""Should handle invalid magnet link."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "error",
"error": "invalid_magnet",
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
result = api_tools.add_torrent_to_qbittorrent("not a magnet link")
assert result["status"] == "error"
@patch("agent.tools.api.AddTorrentUseCase")
def test_empty_magnet_link(self, mock_use_case_class, memory):
"""Should handle empty magnet link."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "error",
"error": "empty_magnet",
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
result = api_tools.add_torrent_to_qbittorrent("")
assert result["status"] == "error"
@patch("agent.tools.api.AddTorrentUseCase")
def test_very_long_magnet_link(self, mock_use_case_class, memory):
"""Should handle very long magnet link."""
mock_response = Mock()
mock_response.to_dict.return_value = {"status": "ok"}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
long_magnet = "magnet:?xt=urn:btih:" + "a" * 10000
result = api_tools.add_torrent_to_qbittorrent(long_magnet)
assert "status" in result
@patch("agent.tools.api.AddTorrentUseCase")
def test_qbittorrent_connection_refused(self, mock_use_case_class, memory):
"""Should handle qBittorrent connection refused."""
mock_use_case = Mock()
mock_use_case.execute.side_effect = ConnectionRefusedError()
mock_use_case_class.return_value = mock_use_case
with pytest.raises(ConnectionRefusedError):
api_tools.add_torrent_to_qbittorrent("magnet:?xt=...")
def test_add_by_index_with_empty_magnet(self, memory):
"""Should handle torrent with empty magnet."""
memory.episodic.store_search_results(
"query",
[{"name": "Torrent", "magnet": ""}],
)
result = api_tools.add_torrent_by_index(1)
assert result["status"] == "error"
assert result["error"] == "no_magnet"
def test_add_by_index_with_whitespace_magnet(self, memory):
"""Should handle torrent with whitespace magnet."""
memory.episodic.store_search_results(
"query",
[{"name": "Torrent", "magnet": " "}],
)
result = api_tools.add_torrent_by_index(1)
# Whitespace-only magnet should be treated as no magnet
# Behavior depends on implementation
assert "status" in result
class TestFilesystemEdgeCases:
"""Edge case tests for filesystem tools."""
def test_set_path_with_trailing_slash(self, memory, real_folder):
"""Should handle path with trailing slash."""
path_with_slash = str(real_folder["downloads"]) + "/"
result = fs_tools.set_path_for_folder("download", path_with_slash)
assert result["status"] == "ok"
def test_set_path_with_double_slashes(self, memory, real_folder):
"""Should handle path with double slashes."""
path_double = str(real_folder["downloads"]).replace("/", "//")
result = fs_tools.set_path_for_folder("download", path_double)
# Should normalize and work
assert result["status"] == "ok"
def test_set_path_with_dot_segments(self, memory, real_folder):
"""Should handle path with . segments."""
path_with_dots = str(real_folder["downloads"]) + "/./."
result = fs_tools.set_path_for_folder("download", path_with_dots)
assert result["status"] == "ok"
def test_list_folder_with_hidden_files(self, memory, real_folder):
"""Should list hidden files."""
hidden_file = real_folder["downloads"] / ".hidden"
hidden_file.touch()
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
result = fs_tools.list_folder("download")
assert ".hidden" in result["entries"]
def test_list_folder_with_broken_symlink(self, memory, real_folder):
"""Should handle broken symlinks."""
broken_link = real_folder["downloads"] / "broken_link"
try:
broken_link.symlink_to("/nonexistent/target")
except OSError:
pytest.skip("Cannot create symlinks")
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
result = fs_tools.list_folder("download")
# Should still list the symlink
assert "broken_link" in result["entries"]
def test_list_folder_with_permission_denied_file(self, memory, real_folder):
"""Should handle files with no read permission."""
import os
no_read = real_folder["downloads"] / "no_read.txt"
no_read.touch()
try:
os.chmod(no_read, 0o000)
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
result = fs_tools.list_folder("download")
# Should still list the file (listing doesn't require read permission)
assert "no_read.txt" in result["entries"]
finally:
os.chmod(no_read, 0o644)
def test_list_folder_case_sensitivity(self, memory, real_folder):
"""Should handle case sensitivity correctly."""
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
# Try with different cases
result_lower = fs_tools.list_folder("download")
# Note: folder_type is validated, so "DOWNLOAD" would fail validation
assert result_lower["status"] == "ok"
def test_list_folder_with_spaces_in_path(self, memory, real_folder):
"""Should handle spaces in path."""
space_dir = real_folder["downloads"] / "folder with spaces"
space_dir.mkdir()
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
result = fs_tools.list_folder("download", "folder with spaces")
assert result["status"] == "ok"
def test_path_traversal_with_encoded_chars(self, memory, real_folder):
"""Should block URL-encoded traversal attempts."""
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
# Various encoding attempts
attempts = [
"..%2f",
"..%5c",
"%2e%2e/",
"..%252f",
]
for attempt in attempts:
result = fs_tools.list_folder("download", attempt)
# Should either be forbidden or not found
assert (
result.get("error") in ["forbidden", "not_found", None]
or result.get("status") == "ok"
)
def test_path_with_null_byte(self, memory, real_folder):
"""Should block null byte injection."""
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
result = fs_tools.list_folder("download", "file\x00.txt")
assert result["error"] == "forbidden"
def test_very_deep_path(self, memory, real_folder):
"""Should handle very deep paths."""
# Create deep directory structure
deep_path = real_folder["downloads"]
for i in range(20):
deep_path = deep_path / f"level{i}"
deep_path.mkdir(parents=True)
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
# Navigate to deep path
relative_path = "/".join([f"level{i}" for i in range(20)])
result = fs_tools.list_folder("download", relative_path)
assert result["status"] == "ok"
def test_folder_with_many_files(self, memory, real_folder):
"""Should handle folder with many files."""
# Create many files
for i in range(1000):
(real_folder["downloads"] / f"file_{i:04d}.txt").touch()
memory.ltm.set_config("download_folder", str(real_folder["downloads"]))
result = fs_tools.list_folder("download")
assert result["status"] == "ok"
assert result["count"] >= 1000
class TestFindMediaImdbIdEdgeCases:
"""Edge case tests for find_media_imdb_id."""
@patch("agent.tools.api.SearchMovieUseCase")
def test_movie_with_same_name_different_years(self, mock_use_case_class, memory):
"""Should handle movies with same name."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "ok",
"imdb_id": "tt1234567",
"title": "The Thing",
"year": 1982,
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
result = api_tools.find_media_imdb_id("The Thing 1982")
assert result["status"] == "ok"
@patch("agent.tools.api.SearchMovieUseCase")
def test_movie_with_special_title(self, mock_use_case_class, memory):
"""Should handle movies with special characters in title."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "ok",
"imdb_id": "tt1234567",
"title": "Se7en",
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
result = api_tools.find_media_imdb_id("Se7en")
assert result["status"] == "ok"
@patch("agent.tools.api.SearchMovieUseCase")
def test_tv_show_vs_movie(self, mock_use_case_class, memory):
"""Should distinguish TV shows from movies."""
mock_response = Mock()
mock_response.to_dict.return_value = {
"status": "ok",
"imdb_id": "tt0944947",
"title": "Game of Thrones",
"media_type": "tv",
}
mock_use_case = Mock()
mock_use_case.execute.return_value = mock_response
mock_use_case_class.return_value = mock_use_case
result = api_tools.find_media_imdb_id("Game of Thrones")
assert result["media_type"] == "tv"