"""File manager for filesystem operations.""" import logging import os import shutil from enum import Enum from pathlib import Path from typing import Any from infrastructure.persistence import get_memory from .exceptions import PathTraversalError logger = logging.getLogger(__name__) class FolderName(Enum): """Types of folders that can be managed.""" DOWNLOAD = "download" TVSHOW = "tvshow" MOVIE = "movie" TORRENT = "torrent" class FileManager: """ File manager for filesystem operations. Handles folder configuration, listing, and file operations with security checks to prevent path traversal attacks. """ def set_folder_path(self, folder_name: str, path_value: str) -> dict[str, Any]: """ Set a folder path in the configuration. Validates that the path exists, is a directory, and is readable. Args: folder_name: Name of folder (download, tvshow, movie, torrent). path_value: Absolute path to the folder. Returns: Dict with status or error information. """ try: self._validate_folder_name(folder_name) path_obj = Path(path_value).resolve() if not path_obj.exists(): logger.warning(f"Path does not exist: {path_value}") return { "error": "invalid_path", "message": f"Path does not exist: {path_value}", } if not path_obj.is_dir(): logger.warning(f"Path is not a directory: {path_value}") return { "error": "invalid_path", "message": f"Path is not a directory: {path_value}", } if not os.access(path_obj, os.R_OK): logger.warning(f"Path is not readable: {path_value}") return { "error": "permission_denied", "message": f"Path is not readable: {path_value}", } memory = get_memory() memory.ltm.set_config(f"{folder_name}_folder", str(path_obj)) memory.save() logger.info(f"Set {folder_name}_folder to: {path_obj}") return {"status": "ok", "folder_name": folder_name, "path": str(path_obj)} except ValueError as e: logger.error(f"Validation error: {e}") return {"error": "validation_failed", "message": str(e)} except Exception as e: logger.error(f"Unexpected error setting path: {e}", exc_info=True) return {"error": "internal_error", "message": "Failed to set path"} def list_folder( # noqa: PLR0911 self, folder_type: str, path: str = "." ) -> dict[str, Any]: """ List contents of a configured folder. Includes security checks to prevent path traversal. Args: folder_type: Type of folder (download, tvshow, movie, torrent). path: Relative path within the folder (default: root). Returns: Dict with folder contents or error information. """ try: self._validate_folder_name(folder_type) safe_path = self._sanitize_path(path) memory = get_memory() folder_key = f"{folder_type}_folder" folder_path = memory.ltm.get_config(folder_key) if not folder_path: logger.warning(f"Folder not configured: {folder_type}") return { "error": "folder_not_set", "message": f"{folder_type.capitalize()} folder not configured.", } root = Path(folder_path) target = root / safe_path if not self._is_safe_path(root, target): logger.warning(f"Path traversal attempt: {path}") return { "error": "forbidden", "message": "Access denied: path outside allowed directory", } if not target.exists(): logger.warning(f"Path does not exist: {target}") return { "error": "not_found", "message": f"Path does not exist: {safe_path}", } if not target.is_dir(): logger.warning(f"Path is not a directory: {target}") return { "error": "not_a_directory", "message": f"Path is not a directory: {safe_path}", } try: entries = [entry.name for entry in target.iterdir()] logger.debug(f"Listed {len(entries)} entries in {target}") return { "status": "ok", "folder_type": folder_type, "path": safe_path, "entries": sorted(entries), "count": len(entries), } except PermissionError: logger.warning(f"Permission denied: {target}") return { "error": "permission_denied", "message": f"Permission denied: {safe_path}", } except PathTraversalError as e: logger.warning(f"Path traversal attempt: {e}") return {"error": "forbidden", "message": str(e)} except ValueError as e: logger.error(f"Validation error: {e}") return {"error": "validation_failed", "message": str(e)} except Exception as e: logger.error(f"Unexpected error listing folder: {e}", exc_info=True) return {"error": "internal_error", "message": "Failed to list folder"} def move_file( # noqa: PLR0911 self, source: str, destination: str ) -> dict[str, Any]: """ Move a file from one location to another. Includes validation and verification after move. Args: source: Source file path. destination: Destination file path. Returns: Dict with status or error information. """ try: source_path = Path(source).resolve() dest_path = Path(destination).resolve() logger.info(f"Moving file: {source_path} -> {dest_path}") if not source_path.exists(): return { "error": "source_not_found", "message": f"Source does not exist: {source}", } if not source_path.is_file(): return { "error": "source_not_file", "message": f"Source is not a file: {source}", } source_size = source_path.stat().st_size dest_parent = dest_path.parent if not dest_parent.exists(): return { "error": "destination_dir_not_found", "message": f"Destination directory does not exist: {dest_parent}", } if dest_path.exists(): return { "error": "destination_exists", "message": f"Destination already exists: {destination}", } shutil.move(str(source_path), str(dest_path)) # Verify move if not dest_path.exists(): return { "error": "move_verification_failed", "message": "File was not moved successfully", } dest_size = dest_path.stat().st_size if dest_size != source_size: return { "error": "size_mismatch", "message": "File size mismatch after move", } logger.info(f"File moved successfully: {dest_path.name}") return { "status": "ok", "source": str(source_path), "destination": str(dest_path), "filename": dest_path.name, "size": dest_size, } except Exception as e: logger.error(f"Error moving file: {e}", exc_info=True) return {"error": "move_failed", "message": str(e)} def _validate_folder_name(self, folder_name: str) -> bool: """ Validate folder name against allowed values. Args: folder_name: Name to validate. Returns: True if valid. Raises: ValueError: If folder name is invalid. """ valid_names = [fn.value for fn in FolderName] if folder_name not in valid_names: raise ValueError( f"Invalid folder_name '{folder_name}'. " f"Must be one of: {', '.join(valid_names)}" ) return True def _sanitize_path(self, path: str) -> str: """ Sanitize path to prevent path traversal attacks. Args: path: Path to sanitize. Returns: Sanitized path. Raises: PathTraversalError: If path contains traversal attempts. """ normalized = os.path.normpath(path) if os.path.isabs(normalized): raise PathTraversalError("Absolute paths are not allowed") if normalized.startswith("..") or "/.." in normalized or "\\.." in normalized: raise PathTraversalError("Parent directory references not allowed") if "\x00" in normalized: raise PathTraversalError("Null bytes in path not allowed") return normalized def _is_safe_path(self, base_path: Path, target_path: Path) -> bool: """ Check if target path is within base path. Args: base_path: The allowed base directory. target_path: The path to check. Returns: True if target is within base, False otherwise. """ try: base_resolved = base_path.resolve() target_resolved = target_path.resolve() target_resolved.relative_to(base_resolved) return True except (ValueError, OSError): return False