Files
alfred/infrastructure/persistence/json/subtitle_repository.py
Francwa 9ca31e45e0 feat!: migrate to OpenAI native tool calls and fix circular deps (#fuck-gemini)
- Fix circular dependencies in agent/tools
- Migrate from custom JSON to OpenAI tool calls format
- Add async streaming (step_stream, complete_stream)
- Simplify prompt system and remove token counting
- Add 5 new API endpoints (/health, /v1/models, /api/memory/*)
- Add 3 new tools (get_torrent_by_index, add_torrent_by_index, set_language)
- Fix all 500 tests and add coverage config (80% threshold)
- Add comprehensive docs (README, pytest guide)

BREAKING: LLM interface changed, memory injection via get_memory()
2025-12-06 19:11:05 +01:00

145 lines
4.7 KiB
Python

"""JSON-based subtitle repository implementation."""
import logging
from typing import Any
from domain.shared.value_objects import FilePath, ImdbId
from domain.subtitles.entities import Subtitle
from domain.subtitles.repositories import SubtitleRepository
from domain.subtitles.value_objects import Language, SubtitleFormat, TimingOffset
from infrastructure.persistence import get_memory
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class JsonSubtitleRepository(SubtitleRepository):
    """
    JSON-based implementation of SubtitleRepository.

    Stores subtitles in the LTM library using the memory context: every
    subtitle is kept as a plain dict inside the list found under the
    ``"subtitles"`` key of ``get_memory().ltm.library``.
    """

    def save(self, subtitle: Subtitle) -> None:
        """
        Save a subtitle to the repository.

        Multiple subtitles can exist for the same media; the record is
        appended without looking for an existing entry.

        Args:
            subtitle: Subtitle entity to save.
        """
        memory = get_memory()
        subtitles = memory.ltm.library.get("subtitles", [])
        subtitles.append(self._to_dict(subtitle))
        # Always assign back: when the key was missing, the list above is a
        # fresh default that the library does not reference yet.
        memory.ltm.library["subtitles"] = subtitles
        memory.save()
        logger.debug("Saved subtitle for: %s", subtitle.media_imdb_id)

    def find_by_media(
        self,
        media_imdb_id: ImdbId,
        language: Language | None = None,
        season: int | None = None,
        episode: int | None = None,
    ) -> list[Subtitle]:
        """
        Find subtitles for a media item.

        Every filter that is left as ``None`` is ignored; the remaining
        filters must all match for a stored record to be returned.

        Args:
            media_imdb_id: IMDb ID of the media.
            language: Optional language filter.
            season: Optional season number filter.
            episode: Optional episode number filter.

        Returns:
            List of matching Subtitle entities, in storage order.
        """
        memory = get_memory()
        subtitles = memory.ltm.library.get("subtitles", [])

        # Loop-invariant comparison keys, computed once.
        wanted_id = str(media_imdb_id)
        # Compare against None explicitly (like season/episode below) so a
        # Language object that happens to be falsy still acts as a filter.
        wanted_language = language.value if language is not None else None

        results = []
        for sub_dict in subtitles:
            if sub_dict.get("media_imdb_id") != wanted_id:
                continue
            if language is not None and sub_dict.get("language") != wanted_language:
                continue
            if season is not None and sub_dict.get("season_number") != season:
                continue
            if episode is not None and sub_dict.get("episode_number") != episode:
                continue
            results.append(self._from_dict(sub_dict))
        return results

    def delete(self, subtitle: Subtitle) -> bool:
        """
        Delete a subtitle from the repository.

        Matches by file path: every stored record whose ``file_path``
        equals ``str(subtitle.file_path)`` is removed.

        Args:
            subtitle: Subtitle entity to delete.

        Returns:
            True if deleted, False if not found.
        """
        memory = get_memory()
        subtitles = memory.ltm.library.get("subtitles", [])
        initial_count = len(subtitles)
        target_path = str(subtitle.file_path)

        remaining = [s for s in subtitles if s.get("file_path") != target_path]

        # Nothing matched: leave the stored data untouched and skip the save.
        if len(remaining) == initial_count:
            return False

        memory.ltm.library["subtitles"] = remaining
        memory.save()
        logger.debug("Deleted subtitle: %s", subtitle.file_path)
        return True

    def _to_dict(self, subtitle: Subtitle) -> dict[str, Any]:
        """
        Convert Subtitle entity to dict for storage.

        Args:
            subtitle: Subtitle entity to serialize.

        Returns:
            Dict holding the subtitle's fields; IMDb ID and file path are
            stringified, language/format are stored through ``.value`` and
            the timing offset through ``.milliseconds``.
        """
        return {
            "media_imdb_id": str(subtitle.media_imdb_id),
            "language": subtitle.language.value,
            "format": subtitle.format.value,
            "file_path": str(subtitle.file_path),
            "season_number": subtitle.season_number,
            "episode_number": subtitle.episode_number,
            "timing_offset": subtitle.timing_offset.milliseconds,
            "hearing_impaired": subtitle.hearing_impaired,
            "forced": subtitle.forced,
            "source": subtitle.source,
            "uploader": subtitle.uploader,
            "download_count": subtitle.download_count,
            "rating": subtitle.rating,
        }

    def _from_dict(self, data: dict[str, Any]) -> Subtitle:
        """
        Convert dict from storage to Subtitle entity.

        Args:
            data: Stored record, in the layout written by ``_to_dict``.

        Returns:
            The reconstructed Subtitle entity.

        Raises:
            KeyError: If ``media_imdb_id``, ``language``, ``format`` or
                ``file_path`` is missing from ``data``.
        """
        return Subtitle(
            # Required keys: indexed directly, so a missing one raises.
            media_imdb_id=ImdbId(data["media_imdb_id"]),
            language=Language.from_code(data["language"]),
            format=SubtitleFormat.from_extension(data["format"]),
            file_path=FilePath(data["file_path"]),
            # Optional keys: fall back to None / 0 / False when absent.
            season_number=data.get("season_number"),
            episode_number=data.get("episode_number"),
            timing_offset=TimingOffset(data.get("timing_offset", 0)),
            hearing_impaired=data.get("hearing_impaired", False),
            forced=data.get("forced", False),
            source=data.get("source"),
            uploader=data.get("uploader"),
            download_count=data.get("download_count"),
            rating=data.get("rating"),
        )