"""Edge case tests for tools.""" from unittest.mock import Mock, patch import pytest from alfred.agent.tools import api as api_tools from alfred.agent.tools import filesystem as fs_tools from alfred.infrastructure.persistence import get_memory class TestFindTorrentEdgeCases: """Edge case tests for find_torrent.""" @patch("alfred.agent.tools.api.SearchTorrentsUseCase") def test_empty_query(self, mock_use_case_class, memory): """Should handle empty query.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "error", "error": "invalid_query", } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case result = api_tools.find_torrent("") assert result["status"] == "error" @patch("alfred.agent.tools.api.SearchTorrentsUseCase") def test_very_long_query(self, mock_use_case_class, memory): """Should handle very long query.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "ok", "torrents": [], "count": 0, } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case long_query = "x" * 10000 result = api_tools.find_torrent(long_query) # Should not crash assert "status" in result @patch("alfred.agent.tools.api.SearchTorrentsUseCase") def test_special_characters_in_query(self, mock_use_case_class, memory): """Should handle special characters in query.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "ok", "torrents": [], "count": 0, } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case special_query = "Movie (2024) [1080p] {x265} " result = api_tools.find_torrent(special_query) assert "status" in result @patch("alfred.agent.tools.api.SearchTorrentsUseCase") def test_unicode_query(self, mock_use_case_class, memory): """Should handle unicode in query.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "ok", "torrents": [], "count": 0, } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case result = api_tools.find_torrent("日本語映画 2024") assert "status" in result @patch("alfred.agent.tools.api.SearchTorrentsUseCase") def test_results_with_missing_fields(self, mock_use_case_class, memory): """Should handle results with missing fields.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "ok", "torrents": [ {"name": "Torrent 1"}, # Missing seeders, magnet, etc. {}, # Completely empty ], "count": 2, } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case result = api_tools.find_torrent("Test") assert result["status"] == "ok" mem = get_memory() assert len(mem.episodic.last_search_results["results"]) == 2 @patch("alfred.agent.tools.api.SearchTorrentsUseCase") def test_api_timeout(self, mock_use_case_class, memory): """Should handle API timeout.""" mock_use_case = Mock() mock_use_case.execute.side_effect = TimeoutError("Connection timed out") mock_use_case_class.return_value = mock_use_case with pytest.raises(TimeoutError): api_tools.find_torrent("Test") class TestGetTorrentByIndexEdgeCases: """Edge case tests for get_torrent_by_index.""" def test_index_as_float(self, memory_with_search_results): """Should handle float index (converted to int).""" # Python will convert 2.0 to 2 when passed as int result = api_tools.get_torrent_by_index(int(2.9)) assert result["status"] == "ok" assert result["torrent"]["index"] == 2 def test_results_modified_between_calls(self, memory): """Should handle results being modified.""" memory.episodic.store_search_results("query1", [{"name": "Result 1"}]) # Get first result result1 = api_tools.get_torrent_by_index(1) assert result1["status"] == "ok" # Store new results memory.episodic.store_search_results("query2", [{"name": "New Result"}]) # Get first result again - should be new result result2 = api_tools.get_torrent_by_index(1) assert result2["torrent"]["name"] == "New Result" def test_result_with_index_already_set(self, memory): """Should handle results that already have index field.""" memory.episodic.store_search_results( "query", [{"name": "Result", "index": 999}], # Pre-existing index ) result = api_tools.get_torrent_by_index(1) # May overwrite or error depending on implementation assert result["status"] in ["ok", "error"] class TestAddTorrentEdgeCases: """Edge case tests for add_torrent functions.""" @patch("alfred.agent.tools.api.AddTorrentUseCase") def test_invalid_magnet_link(self, mock_use_case_class, memory): """Should handle invalid magnet link.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "error", "error": "invalid_magnet", } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case result = api_tools.add_torrent_to_qbittorrent("not a magnet link") assert result["status"] == "error" @patch("alfred.agent.tools.api.AddTorrentUseCase") def test_empty_magnet_link(self, mock_use_case_class, memory): """Should handle empty magnet link.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "error", "error": "empty_magnet", } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case result = api_tools.add_torrent_to_qbittorrent("") assert result["status"] == "error" @patch("alfred.agent.tools.api.AddTorrentUseCase") def test_very_long_magnet_link(self, mock_use_case_class, memory): """Should handle very long magnet link.""" mock_response = Mock() mock_response.to_dict.return_value = {"status": "ok"} mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case long_magnet = "magnet:?xt=urn:btih:" + "a" * 10000 result = api_tools.add_torrent_to_qbittorrent(long_magnet) assert "status" in result @patch("alfred.agent.tools.api.AddTorrentUseCase") def test_qbittorrent_connection_refused(self, mock_use_case_class, memory): """Should handle qBittorrent connection refused.""" mock_use_case = Mock() mock_use_case.execute.side_effect = ConnectionRefusedError() mock_use_case_class.return_value = mock_use_case with pytest.raises(ConnectionRefusedError): api_tools.add_torrent_to_qbittorrent("magnet:?xt=...") def test_add_by_index_with_empty_magnet(self, memory): """Should handle torrent with empty magnet.""" memory.episodic.store_search_results( "query", [{"name": "Torrent", "magnet": ""}], ) result = api_tools.add_torrent_by_index(1) assert result["status"] == "error" assert result["error"] == "no_magnet" def test_add_by_index_with_whitespace_magnet(self, memory): """Should handle torrent with whitespace magnet.""" memory.episodic.store_search_results( "query", [{"name": "Torrent", "magnet": " "}], ) result = api_tools.add_torrent_by_index(1) # Whitespace-only magnet should be treated as no magnet # Behavior depends on implementation assert "status" in result class TestFilesystemEdgeCases: """Edge case tests for filesystem tools.""" def test_set_path_with_trailing_slash(self, memory, real_folder): """Should handle path with trailing slash.""" path_with_slash = str(real_folder["downloads"]) + "/" result = fs_tools.set_path_for_folder("download", path_with_slash) assert result["status"] == "ok" def test_set_path_with_double_slashes(self, memory, real_folder): """Should handle path with double slashes.""" path_double = str(real_folder["downloads"]).replace("/", "//") result = fs_tools.set_path_for_folder("download", path_double) # Should normalize and work assert result["status"] == "ok" def test_set_path_with_dot_segments(self, memory, real_folder): """Should handle path with . segments.""" path_with_dots = str(real_folder["downloads"]) + "/./." result = fs_tools.set_path_for_folder("download", path_with_dots) assert result["status"] == "ok" def test_list_folder_with_hidden_files(self, memory, real_folder): """Should list hidden files.""" hidden_file = real_folder["downloads"] / ".hidden" hidden_file.touch() memory.ltm.set_config("download_folder", str(real_folder["downloads"])) result = fs_tools.list_folder("download") assert ".hidden" in result["entries"] def test_list_folder_with_broken_symlink(self, memory, real_folder): """Should handle broken symlinks.""" broken_link = real_folder["downloads"] / "broken_link" try: broken_link.symlink_to("/nonexistent/target") except OSError: pytest.skip("Cannot create symlinks") memory.ltm.set_config("download_folder", str(real_folder["downloads"])) result = fs_tools.list_folder("download") # Should still list the symlink assert "broken_link" in result["entries"] def test_list_folder_with_permission_denied_file(self, memory, real_folder): """Should handle files with no read permission.""" import os no_read = real_folder["downloads"] / "no_read.txt" no_read.touch() try: os.chmod(no_read, 0o000) memory.ltm.set_config("download_folder", str(real_folder["downloads"])) result = fs_tools.list_folder("download") # Should still list the file (listing doesn't require read permission) assert "no_read.txt" in result["entries"] finally: os.chmod(no_read, 0o644) def test_list_folder_case_sensitivity(self, memory, real_folder): """Should handle case sensitivity correctly.""" memory.ltm.set_config("download_folder", str(real_folder["downloads"])) # Try with different cases result_lower = fs_tools.list_folder("download") # Note: folder_type is validated, so "DOWNLOAD" would fail validation assert result_lower["status"] == "ok" def test_list_folder_with_spaces_in_path(self, memory, real_folder): """Should handle spaces in path.""" space_dir = real_folder["downloads"] / "folder with spaces" space_dir.mkdir() memory.ltm.set_config("download_folder", str(real_folder["downloads"])) result = fs_tools.list_folder("download", "folder with spaces") assert result["status"] == "ok" def test_path_traversal_with_encoded_chars(self, memory, real_folder): """Should block URL-encoded traversal attempts.""" memory.ltm.set_config("download_folder", str(real_folder["downloads"])) # Various encoding attempts attempts = [ "..%2f", "..%5c", "%2e%2e/", "..%252f", ] for attempt in attempts: result = fs_tools.list_folder("download", attempt) # Should either be forbidden or not found assert ( result.get("error") in ["forbidden", "not_found", None] or result.get("status") == "ok" ) def test_path_with_null_byte(self, memory, real_folder): """Should block null byte injection.""" memory.ltm.set_config("download_folder", str(real_folder["downloads"])) result = fs_tools.list_folder("download", "file\x00.txt") assert result["error"] == "forbidden" def test_very_deep_path(self, memory, real_folder): """Should handle very deep paths.""" # Create deep directory structure deep_path = real_folder["downloads"] for i in range(20): deep_path = deep_path / f"level{i}" deep_path.mkdir(parents=True) memory.ltm.set_config("download_folder", str(real_folder["downloads"])) # Navigate to deep path relative_path = "/".join([f"level{i}" for i in range(20)]) result = fs_tools.list_folder("download", relative_path) assert result["status"] == "ok" def test_folder_with_many_files(self, memory, real_folder): """Should handle folder with many files.""" # Create many files for i in range(1000): (real_folder["downloads"] / f"file_{i:04d}.txt").touch() memory.ltm.set_config("download_folder", str(real_folder["downloads"])) result = fs_tools.list_folder("download") assert result["status"] == "ok" assert result["count"] >= 1000 class TestFindMediaImdbIdEdgeCases: """Edge case tests for find_media_imdb_id.""" @patch("alfred.agent.tools.api.SearchMovieUseCase") def test_movie_with_same_name_different_years(self, mock_use_case_class, memory): """Should handle movies with same name.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "ok", "imdb_id": "tt1234567", "title": "The Thing", "year": 1982, } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case result = api_tools.find_media_imdb_id("The Thing 1982") assert result["status"] == "ok" @patch("alfred.agent.tools.api.SearchMovieUseCase") def test_movie_with_special_title(self, mock_use_case_class, memory): """Should handle movies with special characters in title.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "ok", "imdb_id": "tt1234567", "title": "Se7en", } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case result = api_tools.find_media_imdb_id("Se7en") assert result["status"] == "ok" @patch("alfred.agent.tools.api.SearchMovieUseCase") def test_tv_show_vs_movie(self, mock_use_case_class, memory): """Should distinguish TV shows from movies.""" mock_response = Mock() mock_response.to_dict.return_value = { "status": "ok", "imdb_id": "tt0944947", "title": "Game of Thrones", "media_type": "tv", } mock_use_case = Mock() mock_use_case.execute.return_value = mock_response mock_use_case_class.return_value = mock_use_case result = api_tools.find_media_imdb_id("Game of Thrones") assert result["media_type"] == "tv"