514 lines
16 KiB
Python
514 lines
16 KiB
Python
"""Edge case tests for JSON repositories."""
|
|
|
|
from datetime import datetime
|
|
|
|
from domain.movies.entities import Movie
|
|
from domain.movies.value_objects import MovieTitle, Quality
|
|
from domain.shared.value_objects import FilePath, FileSize, ImdbId
|
|
from domain.subtitles.entities import Subtitle
|
|
from domain.subtitles.value_objects import Language, SubtitleFormat, TimingOffset
|
|
from domain.tv_shows.entities import TVShow
|
|
from domain.tv_shows.value_objects import ShowStatus
|
|
from infrastructure.persistence.json import (
|
|
JsonMovieRepository,
|
|
JsonSubtitleRepository,
|
|
JsonTVShowRepository,
|
|
)
|
|
|
|
|
|
class TestJsonMovieRepositoryEdgeCases:
    """Edge case tests for JsonMovieRepository.

    Every test requests the ``memory`` fixture. Tests that seed data write
    straight into ``memory.ltm.library["movies"]``.
    NOTE(review): the fixture presumably provides the isolated storage that
    ``JsonMovieRepository()`` reads from -- confirm against conftest.
    """

    def test_save_movie_with_unicode_title(self, memory):
        """Should save movie with unicode title."""
        repo = JsonMovieRepository()
        movie = Movie(
            imdb_id=ImdbId("tt1234567"),
            title=MovieTitle("千と千尋の神隠し"),
            quality=Quality.FULL_HD,
        )

        repo.save(movie)
        loaded = repo.find_by_imdb_id(ImdbId("tt1234567"))

        assert loaded.title.value == "千と千尋の神隠し"

    def test_save_movie_with_special_chars_in_path(self, memory):
        """Should save movie with special characters in path."""
        repo = JsonMovieRepository()
        movie = Movie(
            imdb_id=ImdbId("tt1234567"),
            title=MovieTitle("Test"),
            quality=Quality.FULL_HD,
            file_path=FilePath("/movies/Test (2024) [1080p] {x265}.mkv"),
        )

        repo.save(movie)
        loaded = repo.find_by_imdb_id(ImdbId("tt1234567"))

        # Only the bracketed segment is checked, not the whole path.
        assert "[1080p]" in str(loaded.file_path)

    def test_save_movie_with_very_long_title(self, memory):
        """Should save movie with very long title."""
        repo = JsonMovieRepository()
        long_title = "A" * 500
        movie = Movie(
            imdb_id=ImdbId("tt1234567"),
            title=MovieTitle(long_title),
            quality=Quality.FULL_HD,
        )

        repo.save(movie)
        loaded = repo.find_by_imdb_id(ImdbId("tt1234567"))

        assert len(loaded.title.value) == 500

    def test_save_movie_with_zero_file_size(self, memory):
        """Should save movie with zero file size."""
        repo = JsonMovieRepository()
        movie = Movie(
            imdb_id=ImdbId("tt1234567"),
            title=MovieTitle("Test"),
            quality=Quality.FULL_HD,
            file_size=FileSize(0),
        )

        repo.save(movie)
        loaded = repo.find_by_imdb_id(ImdbId("tt1234567"))

        # May be None or 0 depending on implementation
        assert loaded.file_size is None or loaded.file_size.bytes == 0

    def test_save_movie_with_very_large_file_size(self, memory):
        """Should save movie with very large file size."""
        repo = JsonMovieRepository()
        large_size = 100 * 1024 * 1024 * 1024  # 100 GB
        movie = Movie(
            imdb_id=ImdbId("tt1234567"),
            title=MovieTitle("Test"),
            quality=Quality.UHD_4K,  # Use valid quality enum
            file_size=FileSize(large_size),
        )

        repo.save(movie)
        loaded = repo.find_by_imdb_id(ImdbId("tt1234567"))

        assert loaded.file_size.bytes == large_size

    def test_find_all_with_corrupted_entry(self, memory):
        """Should handle corrupted entries gracefully.

        Two outcomes are accepted: ``find_all`` raises, or it returns a
        collection holding at least one movie.
        """
        # Manually add corrupted data with valid IMDb IDs
        memory.ltm.library["movies"] = [
            {
                "imdb_id": "tt1234567",
                "title": "Valid",
                "quality": "1080p",
                "added_at": datetime.now().isoformat(),
            },
            {"imdb_id": "tt2345678"},  # Missing required fields
            {
                "imdb_id": "tt3456789",
                "title": "Also Valid",
                "quality": "720p",
                "added_at": datetime.now().isoformat(),
            },
        ]

        repo = JsonMovieRepository()

        # Should either skip corrupted or raise
        try:
            movies = repo.find_all()
        except Exception:
            # If it raises, that's also acceptable
            return

        # Checked outside the ``try``: AssertionError is an Exception subclass,
        # so asserting inside it would let the handler above swallow a failure
        # and the test could never fail.
        # If it works, should have at least the valid ones
        assert len(movies) >= 1

    def test_delete_nonexistent_movie(self, memory):
        """Should return False for nonexistent movie."""
        repo = JsonMovieRepository()

        result = repo.delete(ImdbId("tt9999999"))

        assert result is False

    def test_delete_from_empty_library(self, memory):
        """Should handle delete from empty library."""
        repo = JsonMovieRepository()
        memory.ltm.library["movies"] = []

        result = repo.delete(ImdbId("tt1234567"))

        assert result is False

    def test_exists_with_similar_ids(self, memory):
        """Should distinguish similar IMDb IDs."""
        repo = JsonMovieRepository()

        movie = Movie(
            imdb_id=ImdbId("tt1234567"),
            title=MovieTitle("Test"),
            quality=Quality.FULL_HD,
        )
        repo.save(movie)

        assert repo.exists(ImdbId("tt1234567")) is True
        # Same digits plus one trailing digit: must not match the saved ID.
        assert repo.exists(ImdbId("tt12345678")) is False
        # Same digits in reverse order: must not match either.
        assert repo.exists(ImdbId("tt7654321")) is False

    def test_save_preserves_added_at(self, memory):
        """Should replace the stored movie when the same IMDb ID is saved again.

        NOTE(review): despite the test name, ``added_at`` is never asserted;
        only the quality of the second save is verified. Decide whether the
        repository should keep the first ``added_at`` or take the new one,
        then assert it here.
        """
        repo = JsonMovieRepository()

        # Save first version
        movie1 = Movie(
            imdb_id=ImdbId("tt1234567"),
            title=MovieTitle("Test"),
            quality=Quality.HD,
            added_at=datetime(2020, 1, 1, 12, 0, 0),
        )
        repo.save(movie1)

        # Update with new quality
        movie2 = Movie(
            imdb_id=ImdbId("tt1234567"),
            title=MovieTitle("Test"),
            quality=Quality.FULL_HD,
            added_at=datetime(2024, 1, 1, 12, 0, 0),
        )
        repo.save(movie2)

        loaded = repo.find_by_imdb_id(ImdbId("tt1234567"))

        # The second save is expected to win, so the loaded quality must be
        # the one from movie2.
        assert loaded.quality.value == "1080p"

    def test_concurrent_saves(self, memory):
        """Should handle rapid saves.

        The saves run sequentially in a single loop; no threads or processes
        are involved despite the test name.
        """
        repo = JsonMovieRepository()

        for i in range(100):
            movie = Movie(
                imdb_id=ImdbId(f"tt{i:07d}"),
                title=MovieTitle(f"Movie {i}"),
                quality=Quality.FULL_HD,
            )
            repo.save(movie)

        movies = repo.find_all()
        assert len(movies) == 100
|
|
|
|
|
|
class TestJsonTVShowRepositoryEdgeCases:
    """Boundary-condition checks for JsonTVShowRepository save/load round-trips."""

    def test_save_show_with_zero_seasons(self, memory):
        """A show saved with no seasons is read back with a count of zero."""
        repository = JsonTVShowRepository()
        upcoming = TVShow(
            imdb_id=ImdbId("tt1234567"),
            title="Upcoming Show",
            seasons_count=0,
            status=ShowStatus.ONGOING,
        )
        repository.save(upcoming)

        fetched = repository.find_by_imdb_id(ImdbId("tt1234567"))
        assert fetched.seasons_count == 0

    def test_save_show_with_many_seasons(self, memory):
        """A show saved with 100 seasons is read back with the same count."""
        repository = JsonTVShowRepository()
        long_runner = TVShow(
            imdb_id=ImdbId("tt1234567"),
            title="Long Running Show",
            seasons_count=100,
            status=ShowStatus.ONGOING,
        )
        repository.save(long_runner)

        fetched = repository.find_by_imdb_id(ImdbId("tt1234567"))
        assert fetched.seasons_count == 100

    def test_save_show_with_all_statuses(self, memory):
        """Each listed ShowStatus value is read back unchanged after a save."""
        repository = JsonTVShowRepository()
        statuses = [ShowStatus.ONGOING, ShowStatus.ENDED, ShowStatus.UNKNOWN]

        # One show per status, each under its own zero-padded IMDb ID.
        for i, status in enumerate(statuses):
            repository.save(
                TVShow(
                    imdb_id=ImdbId(f"tt{i:07d}"),
                    title=f"Show {i}",
                    seasons_count=1,
                    status=status,
                )
            )

            fetched = repository.find_by_imdb_id(ImdbId(f"tt{i:07d}"))
            assert fetched.status == status

    def test_save_show_with_unicode_title(self, memory):
        """A non-ASCII title is read back exactly as it was saved."""
        repository = JsonTVShowRepository()
        drama = TVShow(
            imdb_id=ImdbId("tt1234567"),
            title="日本のドラマ",
            seasons_count=1,
            status=ShowStatus.ONGOING,
        )
        repository.save(drama)

        fetched = repository.find_by_imdb_id(ImdbId("tt1234567"))
        assert fetched.title == "日本のドラマ"

    def test_save_show_with_first_air_date(self, memory):
        """The first_air_date string is read back unchanged after a save."""
        repository = JsonTVShowRepository()
        dated_show = TVShow(
            imdb_id=ImdbId("tt1234567"),
            title="Test Show",
            seasons_count=1,
            status=ShowStatus.ONGOING,
            first_air_date="2024-01-15",
        )
        repository.save(dated_show)

        fetched = repository.find_by_imdb_id(ImdbId("tt1234567"))
        assert fetched.first_air_date == "2024-01-15"

    def test_find_all_empty(self, memory):
        """find_all yields an empty list when the tv_shows library is empty."""
        repository = JsonTVShowRepository()
        # Reset the stored collection only after the repository exists.
        memory.ltm.library["tv_shows"] = []

        found = repository.find_all()

        assert found == []

    def test_update_show_seasons(self, memory):
        """Re-saving a show under the same IMDb ID updates its season count."""
        repository = JsonTVShowRepository()

        # First version: five seasons.
        original = TVShow(
            imdb_id=ImdbId("tt1234567"),
            title="Test Show",
            seasons_count=5,
            status=ShowStatus.ONGOING,
        )
        repository.save(original)

        # Second version: same show, one more season.
        revised = TVShow(
            imdb_id=ImdbId("tt1234567"),
            title="Test Show",
            seasons_count=6,
            status=ShowStatus.ONGOING,
        )
        repository.save(revised)

        fetched = repository.find_by_imdb_id(ImdbId("tt1234567"))
        assert fetched.seasons_count == 6
|
|
|
|
|
|
class TestJsonSubtitleRepositoryEdgeCases:
    """Boundary-condition checks for JsonSubtitleRepository save, find and delete."""

    def test_save_subtitle_with_large_timing_offset(self, memory):
        """A one-hour positive timing offset is read back unchanged."""
        repository = JsonSubtitleRepository()
        delayed = Subtitle(
            media_imdb_id=ImdbId("tt1234567"),
            language=Language.ENGLISH,
            format=SubtitleFormat.SRT,
            file_path=FilePath("/subs/test.srt"),
            timing_offset=TimingOffset(3600000),  # 1 hour
        )
        repository.save(delayed)

        found = repository.find_by_media(ImdbId("tt1234567"))
        assert found[0].timing_offset.milliseconds == 3600000

    def test_save_subtitle_with_negative_timing_offset(self, memory):
        """A negative timing offset is read back unchanged."""
        repository = JsonSubtitleRepository()
        advanced = Subtitle(
            media_imdb_id=ImdbId("tt1234567"),
            language=Language.ENGLISH,
            format=SubtitleFormat.SRT,
            file_path=FilePath("/subs/test.srt"),
            timing_offset=TimingOffset(-5000),
        )
        repository.save(advanced)

        found = repository.find_by_media(ImdbId("tt1234567"))
        assert found[0].timing_offset.milliseconds == -5000

    def test_find_by_media_multiple_languages(self, memory):
        """Unfiltered lookup returns both languages; filtered lookup returns one."""
        repository = JsonSubtitleRepository()

        # Only use existing languages
        for lang in (Language.ENGLISH, Language.FRENCH):
            repository.save(
                Subtitle(
                    media_imdb_id=ImdbId("tt1234567"),
                    language=lang,
                    format=SubtitleFormat.SRT,
                    file_path=FilePath(f"/subs/test.{lang.value}.srt"),
                )
            )

        every_language = repository.find_by_media(ImdbId("tt1234567"))
        english_only = repository.find_by_media(
            ImdbId("tt1234567"), language=Language.ENGLISH
        )

        assert len(every_language) == 2
        assert len(english_only) == 1

    def test_find_by_media_specific_episode(self, memory):
        """Filtering by season and episode returns exactly that episode."""
        repository = JsonSubtitleRepository()

        # Three episodes of season 1, one subtitle each.
        for ep in range(1, 4):
            repository.save(
                Subtitle(
                    media_imdb_id=ImdbId("tt1234567"),
                    language=Language.ENGLISH,
                    format=SubtitleFormat.SRT,
                    file_path=FilePath(f"/subs/s01e{ep:02d}.srt"),
                    season_number=1,
                    episode_number=ep,
                )
            )

        found = repository.find_by_media(ImdbId("tt1234567"), season=1, episode=2)

        assert len(found) == 1
        assert found[0].episode_number == 2

    def test_find_by_media_season_only(self, memory):
        """Filtering by season alone returns every subtitle of that season."""
        repository = JsonSubtitleRepository()

        # Two seasons with two episodes each, saved season by season.
        season_episode_pairs = ((s, e) for s in [1, 2] for e in range(1, 3))
        for season, ep in season_episode_pairs:
            repository.save(
                Subtitle(
                    media_imdb_id=ImdbId("tt1234567"),
                    language=Language.ENGLISH,
                    format=SubtitleFormat.SRT,
                    file_path=FilePath(f"/subs/s{season:02d}e{ep:02d}.srt"),
                    season_number=season,
                    episode_number=ep,
                )
            )

        found = repository.find_by_media(ImdbId("tt1234567"), season=1)

        assert len(found) == 2

    def test_delete_subtitle_by_path(self, memory):
        """Deleting one of two saved subtitles leaves only the other one."""
        repository = JsonSubtitleRepository()

        english = Subtitle(
            media_imdb_id=ImdbId("tt1234567"),
            language=Language.ENGLISH,
            format=SubtitleFormat.SRT,
            file_path=FilePath("/subs/test1.srt"),
        )
        french = Subtitle(
            media_imdb_id=ImdbId("tt1234567"),
            language=Language.FRENCH,
            format=SubtitleFormat.SRT,
            file_path=FilePath("/subs/test2.srt"),
        )
        repository.save(english)
        repository.save(french)

        deleted = repository.delete(english)
        assert deleted is True

        # Only the French subtitle should remain for this title.
        survivors = repository.find_by_media(ImdbId("tt1234567"))
        assert len(survivors) == 1
        assert survivors[0].language == Language.FRENCH

    def test_save_subtitle_with_all_metadata(self, memory):
        """The optional metadata fields asserted below are read back unchanged."""
        repository = JsonSubtitleRepository()
        fully_described = Subtitle(
            media_imdb_id=ImdbId("tt1234567"),
            language=Language.ENGLISH,
            format=SubtitleFormat.SRT,
            file_path=FilePath("/subs/test.srt"),
            season_number=1,
            episode_number=5,
            timing_offset=TimingOffset(500),
            hearing_impaired=True,
            forced=True,
            source="OpenSubtitles",
            uploader="user123",
            download_count=10000,
            rating=9.5,
        )
        repository.save(fully_described)

        found = repository.find_by_media(ImdbId("tt1234567"))
        first = found[0]

        assert first.hearing_impaired is True
        assert first.forced is True
        assert first.source == "OpenSubtitles"
        assert first.uploader == "user123"
        assert first.download_count == 10000
        assert first.rating == 9.5

    def test_save_subtitle_with_unicode_path(self, memory):
        """A file path containing non-ASCII characters is read back intact."""
        repository = JsonSubtitleRepository()
        japanese_named = Subtitle(
            media_imdb_id=ImdbId("tt1234567"),
            language=Language.FRENCH,  # Use existing language
            format=SubtitleFormat.SRT,
            file_path=FilePath("/subs/日本語字幕.srt"),
        )
        repository.save(japanese_named)

        found = repository.find_by_media(ImdbId("tt1234567"))
        assert "日本語" in str(found[0].file_path)

    def test_find_by_media_no_results(self, memory):
        """Lookup for a title with no saved subtitles returns an empty list."""
        repository = JsonSubtitleRepository()

        found = repository.find_by_media(ImdbId("tt9999999"))

        assert found == []

    def test_find_by_media_wrong_language(self, memory):
        """A language filter that matches nothing returns an empty list."""
        repository = JsonSubtitleRepository()
        repository.save(
            Subtitle(
                media_imdb_id=ImdbId("tt1234567"),
                language=Language.ENGLISH,
                format=SubtitleFormat.SRT,
                file_path=FilePath("/subs/test.srt"),
            )
        )

        found = repository.find_by_media(ImdbId("tt1234567"), language=Language.FRENCH)

        assert found == []
|