145 lines
4.4 KiB
Python
145 lines
4.4 KiB
Python
"""JSON-based movie repository implementation."""
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from alfred.domain.movies.entities import Movie
|
|
from alfred.domain.movies.repositories import MovieRepository
|
|
from alfred.domain.movies.value_objects import MovieTitle, Quality, ReleaseYear
|
|
from alfred.domain.shared.value_objects import FilePath, FileSize, ImdbId
|
|
from alfred.infrastructure.persistence import get_memory
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class JsonMovieRepository(MovieRepository):
|
|
"""
|
|
JSON-based implementation of MovieRepository.
|
|
|
|
Stores movies in the LTM library using the memory context.
|
|
"""
|
|
|
|
def save(self, movie: Movie) -> None:
|
|
"""
|
|
Save a movie to the repository.
|
|
|
|
Updates existing movie if IMDb ID matches.
|
|
|
|
Args:
|
|
movie: Movie entity to save.
|
|
"""
|
|
memory = get_memory()
|
|
movies = memory.ltm.library.get("movies", [])
|
|
|
|
# Remove existing movie with same IMDb ID
|
|
movies = [m for m in movies if m.get("imdb_id") != str(movie.imdb_id)]
|
|
|
|
movies.append(self._to_dict(movie))
|
|
|
|
memory.ltm.library["movies"] = movies
|
|
memory.save()
|
|
logger.debug(f"Saved movie: {movie.imdb_id}")
|
|
|
|
def find_by_imdb_id(self, imdb_id: ImdbId) -> Movie | None:
|
|
"""
|
|
Find a movie by its IMDb ID.
|
|
|
|
Args:
|
|
imdb_id: IMDb ID to search for.
|
|
|
|
Returns:
|
|
Movie if found, None otherwise.
|
|
"""
|
|
memory = get_memory()
|
|
movies = memory.ltm.library.get("movies", [])
|
|
|
|
for movie_dict in movies:
|
|
if movie_dict.get("imdb_id") == str(imdb_id):
|
|
return self._from_dict(movie_dict)
|
|
|
|
return None
|
|
|
|
def find_all(self) -> list[Movie]:
|
|
"""
|
|
Get all movies in the repository.
|
|
|
|
Returns:
|
|
List of all Movie entities.
|
|
"""
|
|
memory = get_memory()
|
|
movies_dict = memory.ltm.library.get("movies", [])
|
|
return [self._from_dict(m) for m in movies_dict]
|
|
|
|
def delete(self, imdb_id: ImdbId) -> bool:
|
|
"""
|
|
Delete a movie from the repository.
|
|
|
|
Args:
|
|
imdb_id: IMDb ID of movie to delete.
|
|
|
|
Returns:
|
|
True if deleted, False if not found.
|
|
"""
|
|
memory = get_memory()
|
|
movies = memory.ltm.library.get("movies", [])
|
|
initial_count = len(movies)
|
|
|
|
movies = [m for m in movies if m.get("imdb_id") != str(imdb_id)]
|
|
|
|
if len(movies) < initial_count:
|
|
memory.ltm.library["movies"] = movies
|
|
memory.save()
|
|
logger.debug(f"Deleted movie: {imdb_id}")
|
|
return True
|
|
|
|
return False
|
|
|
|
def exists(self, imdb_id: ImdbId) -> bool:
|
|
"""
|
|
Check if a movie exists in the repository.
|
|
|
|
Args:
|
|
imdb_id: IMDb ID to check.
|
|
|
|
Returns:
|
|
True if exists, False otherwise.
|
|
"""
|
|
return self.find_by_imdb_id(imdb_id) is not None
|
|
|
|
def _to_dict(self, movie: Movie) -> dict[str, Any]:
|
|
"""Convert Movie entity to dict for storage."""
|
|
return {
|
|
"imdb_id": str(movie.imdb_id),
|
|
"title": movie.title.value,
|
|
"release_year": movie.release_year.value if movie.release_year else None,
|
|
"quality": movie.quality.value,
|
|
"file_path": str(movie.file_path) if movie.file_path else None,
|
|
"file_size": movie.file_size.bytes if movie.file_size else None,
|
|
"tmdb_id": movie.tmdb_id,
|
|
"added_at": movie.added_at.isoformat(),
|
|
}
|
|
|
|
def _from_dict(self, data: dict[str, Any]) -> Movie:
|
|
"""Convert dict from storage to Movie entity."""
|
|
# Parse quality string to enum
|
|
quality_str = data.get("quality", "unknown")
|
|
quality = Quality.from_string(quality_str)
|
|
|
|
return Movie(
|
|
imdb_id=ImdbId(data["imdb_id"]),
|
|
title=MovieTitle(data["title"]),
|
|
release_year=(
|
|
ReleaseYear(data["release_year"]) if data.get("release_year") else None
|
|
),
|
|
quality=quality,
|
|
file_path=FilePath(data["file_path"]) if data.get("file_path") else None,
|
|
file_size=FileSize(data["file_size"]) if data.get("file_size") else None,
|
|
tmdb_id=data.get("tmdb_id"),
|
|
added_at=(
|
|
datetime.fromisoformat(data["added_at"])
|
|
if data.get("added_at")
|
|
else datetime.now()
|
|
),
|
|
)
|