"""Edge case tests for JSON repositories.""" from datetime import datetime from alfred.domain.movies.entities import Movie from alfred.domain.movies.value_objects import MovieTitle, Quality from alfred.domain.shared.value_objects import FilePath, FileSize, ImdbId from alfred.domain.subtitles.entities import Subtitle from alfred.domain.subtitles.value_objects import Language, SubtitleFormat, TimingOffset from alfred.domain.tv_shows.entities import TVShow from alfred.domain.tv_shows.value_objects import ShowStatus from alfred.infrastructure.persistence.json import ( JsonMovieRepository, JsonSubtitleRepository, JsonTVShowRepository, ) class TestJsonMovieRepositoryEdgeCases: """Edge case tests for JsonMovieRepository.""" def test_save_movie_with_unicode_title(self, memory): """Should save movie with unicode title.""" repo = JsonMovieRepository() movie = Movie( imdb_id=ImdbId("tt1234567"), title=MovieTitle("千と千尋の神隠し"), quality=Quality.FULL_HD, ) repo.save(movie) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) assert loaded.title.value == "千と千尋の神隠し" def test_save_movie_with_special_chars_in_path(self, memory): """Should save movie with special characters in path.""" repo = JsonMovieRepository() movie = Movie( imdb_id=ImdbId("tt1234567"), title=MovieTitle("Test"), quality=Quality.FULL_HD, file_path=FilePath("/movies/Test (2024) [1080p] {x265}.mkv"), ) repo.save(movie) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) assert "[1080p]" in str(loaded.file_path) def test_save_movie_with_very_long_title(self, memory): """Should save movie with very long title.""" repo = JsonMovieRepository() long_title = "A" * 500 movie = Movie( imdb_id=ImdbId("tt1234567"), title=MovieTitle(long_title), quality=Quality.FULL_HD, ) repo.save(movie) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) assert len(loaded.title.value) == 500 def test_save_movie_with_zero_file_size(self, memory): """Should save movie with zero file size.""" repo = JsonMovieRepository() movie = Movie( imdb_id=ImdbId("tt1234567"), title=MovieTitle("Test"), quality=Quality.FULL_HD, file_size=FileSize(0), ) repo.save(movie) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) # May be None or 0 depending on implementation assert loaded.file_size is None or loaded.file_size.bytes == 0 def test_save_movie_with_very_large_file_size(self, memory): """Should save movie with very large file size.""" repo = JsonMovieRepository() large_size = 100 * 1024 * 1024 * 1024 # 100 GB movie = Movie( imdb_id=ImdbId("tt1234567"), title=MovieTitle("Test"), quality=Quality.UHD_4K, # Use valid quality enum file_size=FileSize(large_size), ) repo.save(movie) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) assert loaded.file_size.bytes == large_size def test_find_all_with_corrupted_entry(self, memory): """Should handle corrupted entries gracefully.""" # Manually add corrupted data with valid IMDb IDs memory.ltm.library["movies"] = [ { "imdb_id": "tt1234567", "title": "Valid", "quality": "1080p", "added_at": datetime.now().isoformat(), }, {"imdb_id": "tt2345678"}, # Missing required fields { "imdb_id": "tt3456789", "title": "Also Valid", "quality": "720p", "added_at": datetime.now().isoformat(), }, ] repo = JsonMovieRepository() # Should either skip corrupted or raise try: movies = repo.find_all() # If it works, should have at least the valid ones assert len(movies) >= 1 except (KeyError, TypeError, Exception): # If it raises, that's also acceptable pass def test_delete_nonexistent_movie(self, memory): """Should return False for nonexistent movie.""" repo = JsonMovieRepository() result = repo.delete(ImdbId("tt9999999")) assert result is False def test_delete_from_empty_library(self, memory): """Should handle delete from empty library.""" repo = JsonMovieRepository() memory.ltm.library["movies"] = [] result = repo.delete(ImdbId("tt1234567")) assert result is False def test_exists_with_similar_ids(self, memory): """Should distinguish similar IMDb IDs.""" repo = JsonMovieRepository() movie = Movie( imdb_id=ImdbId("tt1234567"), title=MovieTitle("Test"), quality=Quality.FULL_HD, ) repo.save(movie) assert repo.exists(ImdbId("tt1234567")) is True assert repo.exists(ImdbId("tt12345678")) is False assert repo.exists(ImdbId("tt7654321")) is False def test_save_preserves_added_at(self, memory): """Should preserve original added_at on update.""" repo = JsonMovieRepository() # Save first version movie1 = Movie( imdb_id=ImdbId("tt1234567"), title=MovieTitle("Test"), quality=Quality.HD, added_at=datetime(2020, 1, 1, 12, 0, 0), ) repo.save(movie1) # Update with new quality movie2 = Movie( imdb_id=ImdbId("tt1234567"), title=MovieTitle("Test"), quality=Quality.FULL_HD, added_at=datetime(2024, 1, 1, 12, 0, 0), ) repo.save(movie2) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) # The new added_at should be used (since it's a full replacement) assert loaded.quality.value == "1080p" def test_concurrent_saves(self, memory): """Should handle rapid saves.""" repo = JsonMovieRepository() for i in range(100): movie = Movie( imdb_id=ImdbId(f"tt{i:07d}"), title=MovieTitle(f"Movie {i}"), quality=Quality.FULL_HD, ) repo.save(movie) movies = repo.find_all() assert len(movies) == 100 class TestJsonTVShowRepositoryEdgeCases: """Edge case tests for JsonTVShowRepository.""" def test_save_show_with_zero_seasons(self, memory): """Should save show with zero seasons.""" repo = JsonTVShowRepository() show = TVShow( imdb_id=ImdbId("tt1234567"), title="Upcoming Show", seasons_count=0, status=ShowStatus.ONGOING, ) repo.save(show) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) assert loaded.seasons_count == 0 def test_save_show_with_many_seasons(self, memory): """Should save show with many seasons.""" repo = JsonTVShowRepository() show = TVShow( imdb_id=ImdbId("tt1234567"), title="Long Running Show", seasons_count=100, status=ShowStatus.ONGOING, ) repo.save(show) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) assert loaded.seasons_count == 100 def test_save_show_with_all_statuses(self, memory): """Should save shows with all status types.""" repo = JsonTVShowRepository() for i, status in enumerate( [ShowStatus.ONGOING, ShowStatus.ENDED, ShowStatus.UNKNOWN] ): show = TVShow( imdb_id=ImdbId(f"tt{i:07d}"), title=f"Show {i}", seasons_count=1, status=status, ) repo.save(show) loaded = repo.find_by_imdb_id(ImdbId(f"tt{i:07d}")) assert loaded.status == status def test_save_show_with_unicode_title(self, memory): """Should save show with unicode title.""" repo = JsonTVShowRepository() show = TVShow( imdb_id=ImdbId("tt1234567"), title="日本のドラマ", seasons_count=1, status=ShowStatus.ONGOING, ) repo.save(show) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) assert loaded.title == "日本のドラマ" def test_save_show_with_first_air_date(self, memory): """Should save show with first air date.""" repo = JsonTVShowRepository() show = TVShow( imdb_id=ImdbId("tt1234567"), title="Test Show", seasons_count=1, status=ShowStatus.ONGOING, first_air_date="2024-01-15", ) repo.save(show) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) assert loaded.first_air_date == "2024-01-15" def test_find_all_empty(self, memory): """Should return empty list for empty library.""" repo = JsonTVShowRepository() memory.ltm.library["tv_shows"] = [] shows = repo.find_all() assert shows == [] def test_update_show_seasons(self, memory): """Should update show seasons count.""" repo = JsonTVShowRepository() # Save initial show1 = TVShow( imdb_id=ImdbId("tt1234567"), title="Test Show", seasons_count=5, status=ShowStatus.ONGOING, ) repo.save(show1) # Update seasons show2 = TVShow( imdb_id=ImdbId("tt1234567"), title="Test Show", seasons_count=6, status=ShowStatus.ONGOING, ) repo.save(show2) loaded = repo.find_by_imdb_id(ImdbId("tt1234567")) assert loaded.seasons_count == 6 class TestJsonSubtitleRepositoryEdgeCases: """Edge case tests for JsonSubtitleRepository.""" def test_save_subtitle_with_large_timing_offset(self, memory): """Should save subtitle with large timing offset.""" repo = JsonSubtitleRepository() subtitle = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=Language.ENGLISH, format=SubtitleFormat.SRT, file_path=FilePath("/subs/test.srt"), timing_offset=TimingOffset(3600000), # 1 hour ) repo.save(subtitle) results = repo.find_by_media(ImdbId("tt1234567")) assert results[0].timing_offset.milliseconds == 3600000 def test_save_subtitle_with_negative_timing_offset(self, memory): """Should save subtitle with negative timing offset.""" repo = JsonSubtitleRepository() subtitle = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=Language.ENGLISH, format=SubtitleFormat.SRT, file_path=FilePath("/subs/test.srt"), timing_offset=TimingOffset(-5000), ) repo.save(subtitle) results = repo.find_by_media(ImdbId("tt1234567")) assert results[0].timing_offset.milliseconds == -5000 def test_find_by_media_multiple_languages(self, memory): """Should find subtitles for multiple languages.""" repo = JsonSubtitleRepository() # Only use existing languages for lang in [Language.ENGLISH, Language.FRENCH]: subtitle = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=lang, format=SubtitleFormat.SRT, file_path=FilePath(f"/subs/test.{lang.value}.srt"), ) repo.save(subtitle) all_subs = repo.find_by_media(ImdbId("tt1234567")) en_subs = repo.find_by_media(ImdbId("tt1234567"), language=Language.ENGLISH) assert len(all_subs) == 2 assert len(en_subs) == 1 def test_find_by_media_specific_episode(self, memory): """Should find subtitle for specific episode.""" repo = JsonSubtitleRepository() # Add subtitles for multiple episodes for ep in range(1, 4): subtitle = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=Language.ENGLISH, format=SubtitleFormat.SRT, file_path=FilePath(f"/subs/s01e{ep:02d}.srt"), season_number=1, episode_number=ep, ) repo.save(subtitle) results = repo.find_by_media( ImdbId("tt1234567"), season=1, episode=2, ) assert len(results) == 1 assert results[0].episode_number == 2 def test_find_by_media_season_only(self, memory): """Should find all subtitles for a season.""" repo = JsonSubtitleRepository() # Add subtitles for multiple seasons for season in [1, 2]: for ep in range(1, 3): subtitle = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=Language.ENGLISH, format=SubtitleFormat.SRT, file_path=FilePath(f"/subs/s{season:02d}e{ep:02d}.srt"), season_number=season, episode_number=ep, ) repo.save(subtitle) results = repo.find_by_media(ImdbId("tt1234567"), season=1) assert len(results) == 2 def test_delete_subtitle_by_path(self, memory): """Should delete subtitle by file path.""" repo = JsonSubtitleRepository() sub1 = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=Language.ENGLISH, format=SubtitleFormat.SRT, file_path=FilePath("/subs/test1.srt"), ) sub2 = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=Language.FRENCH, format=SubtitleFormat.SRT, file_path=FilePath("/subs/test2.srt"), ) repo.save(sub1) repo.save(sub2) result = repo.delete(sub1) assert result is True remaining = repo.find_by_media(ImdbId("tt1234567")) assert len(remaining) == 1 assert remaining[0].language == Language.FRENCH def test_save_subtitle_with_all_metadata(self, memory): """Should save subtitle with all metadata fields.""" repo = JsonSubtitleRepository() subtitle = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=Language.ENGLISH, format=SubtitleFormat.SRT, file_path=FilePath("/subs/test.srt"), season_number=1, episode_number=5, timing_offset=TimingOffset(500), hearing_impaired=True, forced=True, source="OpenSubtitles", uploader="user123", download_count=10000, rating=9.5, ) repo.save(subtitle) results = repo.find_by_media(ImdbId("tt1234567")) loaded = results[0] assert loaded.hearing_impaired is True assert loaded.forced is True assert loaded.source == "OpenSubtitles" assert loaded.uploader == "user123" assert loaded.download_count == 10000 assert loaded.rating == 9.5 def test_save_subtitle_with_unicode_path(self, memory): """Should save subtitle with unicode in path.""" repo = JsonSubtitleRepository() subtitle = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=Language.FRENCH, # Use existing language format=SubtitleFormat.SRT, file_path=FilePath("/subs/日本語字幕.srt"), ) repo.save(subtitle) results = repo.find_by_media(ImdbId("tt1234567")) assert "日本語" in str(results[0].file_path) def test_find_by_media_no_results(self, memory): """Should return empty list when no subtitles found.""" repo = JsonSubtitleRepository() results = repo.find_by_media(ImdbId("tt9999999")) assert results == [] def test_find_by_media_wrong_language(self, memory): """Should return empty when language doesn't match.""" repo = JsonSubtitleRepository() subtitle = Subtitle( media_imdb_id=ImdbId("tt1234567"), language=Language.ENGLISH, format=SubtitleFormat.SRT, file_path=FilePath("/subs/test.srt"), ) repo.save(subtitle) results = repo.find_by_media(ImdbId("tt1234567"), language=Language.FRENCH) assert results == []