Files
alfred/alfred/domain/tv_shows/services.py
2025-12-24 07:50:09 +01:00

235 lines
6.8 KiB
Python

"""TV Show domain services - Business logic."""
import logging
import re
from ..shared.value_objects import ImdbId
from .entities import TVShow
from .exceptions import (
TVShowAlreadyExists,
TVShowNotFound,
)
from .repositories import EpisodeRepository, SeasonRepository, TVShowRepository
logger = logging.getLogger(__name__)
class TVShowService:
"""
Domain service for TV show-related business logic.
This service contains business rules that don't naturally fit
within a single entity.
"""
def __init__(
self,
show_repository: TVShowRepository,
season_repository: SeasonRepository | None = None,
episode_repository: EpisodeRepository | None = None,
):
"""
Initialize TV show service.
Args:
show_repository: TV show repository for persistence
season_repository: Optional season repository
episode_repository: Optional episode repository
"""
self.show_repository = show_repository
self.season_repository = season_repository
self.episode_repository = episode_repository
def track_show(self, show: TVShow) -> None:
"""
Start tracking a TV show.
Args:
show: TVShow entity to track
Raises:
TVShowAlreadyExists: If show is already being tracked
"""
if self.show_repository.exists(show.imdb_id):
raise TVShowAlreadyExists(
f"TV show with IMDb ID {show.imdb_id} is already tracked"
)
self.show_repository.save(show)
logger.info(f"Started tracking TV show: {show.title} ({show.imdb_id})")
def get_show(self, imdb_id: ImdbId) -> TVShow:
"""
Get a TV show by IMDb ID.
Args:
imdb_id: IMDb ID of the show
Returns:
TVShow entity
Raises:
TVShowNotFound: If show not found
"""
show = self.show_repository.find_by_imdb_id(imdb_id)
if not show:
raise TVShowNotFound(f"TV show with IMDb ID {imdb_id} not found")
return show
def get_all_shows(self) -> list[TVShow]:
"""
Get all tracked TV shows.
Returns:
List of all TV shows
"""
return self.show_repository.find_all()
def get_ongoing_shows(self) -> list[TVShow]:
"""
Get all ongoing TV shows.
Returns:
List of ongoing TV shows
"""
all_shows = self.show_repository.find_all()
return [show for show in all_shows if show.is_ongoing()]
def get_ended_shows(self) -> list[TVShow]:
"""
Get all ended TV shows.
Returns:
List of ended TV shows
"""
all_shows = self.show_repository.find_all()
return [show for show in all_shows if show.is_ended()]
def update_show(self, show: TVShow) -> None:
"""
Update an existing TV show.
Args:
show: TVShow entity with updated data
Raises:
TVShowNotFound: If show doesn't exist
"""
if not self.show_repository.exists(show.imdb_id):
raise TVShowNotFound(f"TV show with IMDb ID {show.imdb_id} not found")
self.show_repository.save(show)
logger.info(f"Updated TV show: {show.title} ({show.imdb_id})")
def untrack_show(self, imdb_id: ImdbId) -> None:
"""
Stop tracking a TV show.
Args:
imdb_id: IMDb ID of the show to untrack
Raises:
TVShowNotFound: If show not found
"""
if not self.show_repository.delete(imdb_id):
raise TVShowNotFound(f"TV show with IMDb ID {imdb_id} not found")
logger.info(f"Stopped tracking TV show with IMDb ID: {imdb_id}")
def parse_episode_from_filename(self, filename: str) -> tuple[int, int] | None:
"""
Parse season and episode numbers from filename.
Supports formats:
- S01E05
- 1x05
- Season 1 Episode 5
Args:
filename: Filename to parse
Returns:
Tuple of (season, episode) if found, None otherwise
"""
filename_lower = filename.lower()
# Pattern 1: S01E05
pattern1 = r"s(\d{1,2})e(\d{1,2})"
match = re.search(pattern1, filename_lower)
if match:
return (int(match.group(1)), int(match.group(2)))
# Pattern 2: 1x05
pattern2 = r"(\d{1,2})x(\d{1,2})"
match = re.search(pattern2, filename_lower)
if match:
return (int(match.group(1)), int(match.group(2)))
# Pattern 3: Season 1 Episode 5
pattern3 = r"season\s*(\d{1,2})\s*episode\s*(\d{1,2})"
match = re.search(pattern3, filename_lower)
if match:
return (int(match.group(1)), int(match.group(2)))
return None
def validate_episode_file(self, filename: str) -> bool:
"""
Validate that a file is a valid episode file.
Args:
filename: Filename to validate
Returns:
True if valid episode file, False otherwise
"""
# Check file extension
valid_extensions = {".mkv", ".mp4", ".avi", ".mov", ".wmv", ".flv", ".webm"}
extension = filename[filename.rfind(".") :].lower() if "." in filename else ""
if extension not in valid_extensions:
logger.warning(f"Invalid file extension: {extension}")
return False
# Check if we can parse episode info
episode_info = self.parse_episode_from_filename(filename)
if not episode_info:
logger.warning(f"Could not parse episode info from filename: {filename}")
return False
return True
def find_next_episode(
self, show: TVShow, last_season: int, last_episode: int
) -> tuple[int, int] | None:
"""
Find the next episode to download for a show.
Args:
show: TVShow entity
last_season: Last downloaded season number
last_episode: Last downloaded episode number
Returns:
Tuple of (season, episode) for next episode, or None if show is complete
"""
# If show has ended and we've watched all seasons, no next episode
if show.is_ended() and last_season >= show.seasons_count:
return None
# Simple logic: next episode in same season, or first episode of next season
# This could be enhanced with actual episode counts per season
next_episode = last_episode + 1
next_season = last_season
# Assume max 50 episodes per season (could be improved with actual data)
if next_episode > 50:
next_season += 1
next_episode = 1
# Don't go beyond known seasons
if next_season > show.seasons_count:
return None
return (next_season, next_episode)