Files
alfred/brain/infrastructure/filesystem/file_manager.py

312 lines
10 KiB
Python

"""File manager for filesystem operations."""
import logging
import os
import shutil
from enum import Enum
from pathlib import Path
from typing import Any
from infrastructure.persistence import get_memory
from .exceptions import PathTraversalError
logger = logging.getLogger(__name__)
class FolderName(Enum):
"""Types of folders that can be managed."""
DOWNLOAD = "download"
TVSHOW = "tvshow"
MOVIE = "movie"
TORRENT = "torrent"
class FileManager:
"""
File manager for filesystem operations.
Handles folder configuration, listing, and file operations
with security checks to prevent path traversal attacks.
"""
def set_folder_path(self, folder_name: str, path_value: str) -> dict[str, Any]:
"""
Set a folder path in the configuration.
Validates that the path exists, is a directory, and is readable.
Args:
folder_name: Name of folder (download, tvshow, movie, torrent).
path_value: Absolute path to the folder.
Returns:
Dict with status or error information.
"""
try:
self._validate_folder_name(folder_name)
path_obj = Path(path_value).resolve()
if not path_obj.exists():
logger.warning(f"Path does not exist: {path_value}")
return {
"error": "invalid_path",
"message": f"Path does not exist: {path_value}",
}
if not path_obj.is_dir():
logger.warning(f"Path is not a directory: {path_value}")
return {
"error": "invalid_path",
"message": f"Path is not a directory: {path_value}",
}
if not os.access(path_obj, os.R_OK):
logger.warning(f"Path is not readable: {path_value}")
return {
"error": "permission_denied",
"message": f"Path is not readable: {path_value}",
}
memory = get_memory()
memory.ltm.set_config(f"{folder_name}_folder", str(path_obj))
memory.save()
logger.info(f"Set {folder_name}_folder to: {path_obj}")
return {"status": "ok", "folder_name": folder_name, "path": str(path_obj)}
except ValueError as e:
logger.error(f"Validation error: {e}")
return {"error": "validation_failed", "message": str(e)}
except Exception as e:
logger.error(f"Unexpected error setting path: {e}", exc_info=True)
return {"error": "internal_error", "message": "Failed to set path"}
def list_folder( # noqa: PLR0911
self, folder_type: str, path: str = "."
) -> dict[str, Any]:
"""
List contents of a configured folder.
Includes security checks to prevent path traversal.
Args:
folder_type: Type of folder (download, tvshow, movie, torrent).
path: Relative path within the folder (default: root).
Returns:
Dict with folder contents or error information.
"""
try:
self._validate_folder_name(folder_type)
safe_path = self._sanitize_path(path)
memory = get_memory()
folder_key = f"{folder_type}_folder"
folder_path = memory.ltm.get_config(folder_key)
if not folder_path:
logger.warning(f"Folder not configured: {folder_type}")
return {
"error": "folder_not_set",
"message": f"{folder_type.capitalize()} folder not configured.",
}
root = Path(folder_path)
target = root / safe_path
if not self._is_safe_path(root, target):
logger.warning(f"Path traversal attempt: {path}")
return {
"error": "forbidden",
"message": "Access denied: path outside allowed directory",
}
if not target.exists():
logger.warning(f"Path does not exist: {target}")
return {
"error": "not_found",
"message": f"Path does not exist: {safe_path}",
}
if not target.is_dir():
logger.warning(f"Path is not a directory: {target}")
return {
"error": "not_a_directory",
"message": f"Path is not a directory: {safe_path}",
}
try:
entries = [entry.name for entry in target.iterdir()]
logger.debug(f"Listed {len(entries)} entries in {target}")
return {
"status": "ok",
"folder_type": folder_type,
"path": safe_path,
"entries": sorted(entries),
"count": len(entries),
}
except PermissionError:
logger.warning(f"Permission denied: {target}")
return {
"error": "permission_denied",
"message": f"Permission denied: {safe_path}",
}
except PathTraversalError as e:
logger.warning(f"Path traversal attempt: {e}")
return {"error": "forbidden", "message": str(e)}
except ValueError as e:
logger.error(f"Validation error: {e}")
return {"error": "validation_failed", "message": str(e)}
except Exception as e:
logger.error(f"Unexpected error listing folder: {e}", exc_info=True)
return {"error": "internal_error", "message": "Failed to list folder"}
def move_file( # noqa: PLR0911
self, source: str, destination: str
) -> dict[str, Any]:
"""
Move a file from one location to another.
Includes validation and verification after move.
Args:
source: Source file path.
destination: Destination file path.
Returns:
Dict with status or error information.
"""
try:
source_path = Path(source).resolve()
dest_path = Path(destination).resolve()
logger.info(f"Moving file: {source_path} -> {dest_path}")
if not source_path.exists():
return {
"error": "source_not_found",
"message": f"Source does not exist: {source}",
}
if not source_path.is_file():
return {
"error": "source_not_file",
"message": f"Source is not a file: {source}",
}
source_size = source_path.stat().st_size
dest_parent = dest_path.parent
if not dest_parent.exists():
return {
"error": "destination_dir_not_found",
"message": f"Destination directory does not exist: {dest_parent}",
}
if dest_path.exists():
return {
"error": "destination_exists",
"message": f"Destination already exists: {destination}",
}
shutil.move(str(source_path), str(dest_path))
# Verify move
if not dest_path.exists():
return {
"error": "move_verification_failed",
"message": "File was not moved successfully",
}
dest_size = dest_path.stat().st_size
if dest_size != source_size:
return {
"error": "size_mismatch",
"message": "File size mismatch after move",
}
logger.info(f"File moved successfully: {dest_path.name}")
return {
"status": "ok",
"source": str(source_path),
"destination": str(dest_path),
"filename": dest_path.name,
"size": dest_size,
}
except Exception as e:
logger.error(f"Error moving file: {e}", exc_info=True)
return {"error": "move_failed", "message": str(e)}
def _validate_folder_name(self, folder_name: str) -> bool:
"""
Validate folder name against allowed values.
Args:
folder_name: Name to validate.
Returns:
True if valid.
Raises:
ValueError: If folder name is invalid.
"""
valid_names = [fn.value for fn in FolderName]
if folder_name not in valid_names:
raise ValueError(
f"Invalid folder_name '{folder_name}'. "
f"Must be one of: {', '.join(valid_names)}"
)
return True
def _sanitize_path(self, path: str) -> str:
"""
Sanitize path to prevent path traversal attacks.
Args:
path: Path to sanitize.
Returns:
Sanitized path.
Raises:
PathTraversalError: If path contains traversal attempts.
"""
normalized = os.path.normpath(path)
if os.path.isabs(normalized):
raise PathTraversalError("Absolute paths are not allowed")
if normalized.startswith("..") or "/.." in normalized or "\\.." in normalized:
raise PathTraversalError("Parent directory references not allowed")
if "\x00" in normalized:
raise PathTraversalError("Null bytes in path not allowed")
return normalized
def _is_safe_path(self, base_path: Path, target_path: Path) -> bool:
"""
Check if target path is within base path.
Args:
base_path: The allowed base directory.
target_path: The path to check.
Returns:
True if target is within base, False otherwise.
"""
try:
base_resolved = base_path.resolve()
target_resolved = target_path.resolve()
target_resolved.relative_to(base_resolved)
return True
except (ValueError, OSError):
return False