Files
alfred/infrastructure/filesystem/file_manager.py
Francwa 2c8cdd3ab1 New archi: domain driven development
Working but need to check out code
2025-12-01 07:10:03 +01:00

310 lines
11 KiB
Python

"""File manager - Migrated from agent/tools/filesystem.py with domain logic extracted."""
from typing import Dict, Any, List
from enum import Enum
from pathlib import Path
import logging
import os
import shutil
from .exceptions import FilesystemError, PathTraversalError
from infrastructure.persistence.memory import Memory
logger = logging.getLogger(__name__)
class FolderName(Enum):
"""Types of folders that can be managed."""
DOWNLOAD = "download"
TVSHOW = "tvshow"
MOVIE = "movie"
TORRENT = "torrent"
class FileManager:
"""
File manager for filesystem operations.
Handles folder configuration, listing, and file operations with security.
"""
def __init__(self, memory: Memory):
"""
Initialize file manager.
Args:
memory: Memory instance for folder configuration
"""
self.memory = memory
def set_folder_path(self, folder_name: str, path_value: str) -> Dict[str, Any]:
"""
Set a folder path in the configuration with validation.
Args:
folder_name: Name of folder to set (download, tvshow, movie, torrent)
path_value: Absolute path to the folder
Returns:
Dict with status or error information
"""
try:
# Validate folder name
self._validate_folder_name(folder_name)
# Convert to Path object for better handling
path_obj = Path(path_value).resolve()
# Validate path exists and is a directory
if not path_obj.exists():
logger.warning(f"Path does not exist: {path_value}")
return {
"error": "invalid_path",
"message": f"Path does not exist: {path_value}"
}
if not path_obj.is_dir():
logger.warning(f"Path is not a directory: {path_value}")
return {
"error": "invalid_path",
"message": f"Path is not a directory: {path_value}"
}
# Check if path is readable
if not os.access(path_obj, os.R_OK):
logger.warning(f"Path is not readable: {path_value}")
return {
"error": "permission_denied",
"message": f"Path is not readable: {path_value}"
}
# Store in memory
config = self.memory.get("config", {})
config[f"{folder_name}_folder"] = str(path_obj)
self.memory.set("config", config)
logger.info(f"Set {folder_name}_folder to: {path_obj}")
return {
"status": "ok",
"folder_name": folder_name,
"path": str(path_obj)
}
except ValueError as e:
logger.error(f"Validation error: {e}")
return {"error": "validation_failed", "message": str(e)}
except Exception as e:
logger.error(f"Unexpected error setting path: {e}", exc_info=True)
return {"error": "internal_error", "message": "Failed to set path"}
def list_folder(self, folder_type: str, path: str = ".") -> Dict[str, Any]:
"""
List contents of a folder with security checks.
Args:
folder_type: Type of folder to list (download, tvshow, movie, torrent)
path: Relative path within the folder (default: ".")
Returns:
Dict with folder contents or error information
"""
try:
# Validate folder type
self._validate_folder_name(folder_type)
# Sanitize the path
safe_path = self._sanitize_path(path)
# Get root folder from config
folder_key = f"{folder_type}_folder"
config = self.memory.get("config", {})
if folder_key not in config or not config[folder_key]:
logger.warning(f"Folder not configured: {folder_type}")
return {
"error": "folder_not_set",
"message": f"{folder_type.capitalize()} folder not set in config."
}
root = Path(config[folder_key])
target = root / safe_path
# Security check: ensure target is within root
if not self._is_safe_path(root, target):
logger.warning(f"Path traversal attempt detected: {path}")
return {
"error": "forbidden",
"message": "Access denied: path outside allowed directory"
}
# Check if target exists
if not target.exists():
logger.warning(f"Path does not exist: {target}")
return {
"error": "not_found",
"message": f"Path does not exist: {safe_path}"
}
# Check if target is a directory
if not target.is_dir():
logger.warning(f"Path is not a directory: {target}")
return {
"error": "not_a_directory",
"message": f"Path is not a directory: {safe_path}"
}
# List directory contents
try:
entries = [entry.name for entry in target.iterdir()]
logger.debug(f"Listed {len(entries)} entries in {target}")
return {
"status": "ok",
"folder_type": folder_type,
"path": safe_path,
"entries": sorted(entries),
"count": len(entries)
}
except PermissionError:
logger.warning(f"Permission denied accessing: {target}")
return {
"error": "permission_denied",
"message": f"Permission denied accessing: {safe_path}"
}
except PathTraversalError as e:
logger.warning(f"Path traversal attempt: {e}")
return {
"error": "forbidden",
"message": str(e)
}
except ValueError as e:
logger.error(f"Validation error: {e}")
return {"error": "validation_failed", "message": str(e)}
except Exception as e:
logger.error(f"Unexpected error listing folder: {e}", exc_info=True)
return {"error": "internal_error", "message": "Failed to list folder"}
def move_file(self, source: str, destination: str) -> Dict[str, Any]:
"""
Move a file from one location to another with safety checks.
Args:
source: Source file path
destination: Destination file path
Returns:
Dict with status or error information
"""
try:
# Convert to Path objects
source_path = Path(source).resolve()
dest_path = Path(destination).resolve()
logger.info(f"Moving file from {source_path} to {dest_path}")
# Validate source
if not source_path.exists():
return {
"error": "source_not_found",
"message": f"Source file does not exist: {source}"
}
if not source_path.is_file():
return {
"error": "source_not_file",
"message": f"Source is not a file: {source}"
}
# Get source file size for verification
source_size = source_path.stat().st_size
# Validate destination
dest_parent = dest_path.parent
if not dest_parent.exists():
return {
"error": "destination_dir_not_found",
"message": f"Destination directory does not exist: {dest_parent}"
}
if dest_path.exists():
return {
"error": "destination_exists",
"message": f"Destination file already exists: {destination}"
}
# Perform move
shutil.move(str(source_path), str(dest_path))
# Verify
if not dest_path.exists():
return {
"error": "move_verification_failed",
"message": "File was not moved successfully"
}
dest_size = dest_path.stat().st_size
if dest_size != source_size:
return {
"error": "size_mismatch",
"message": f"File size mismatch after move"
}
logger.info(f"File successfully moved: {dest_path.name}")
return {
"status": "ok",
"source": str(source_path),
"destination": str(dest_path),
"filename": dest_path.name,
"size": dest_size
}
except Exception as e:
logger.error(f"Error moving file: {e}", exc_info=True)
return {
"error": "move_failed",
"message": str(e)
}
def _validate_folder_name(self, folder_name: str) -> bool:
"""Validate folder name against allowed values."""
valid_names = [fn.value for fn in FolderName]
if folder_name not in valid_names:
raise ValueError(
f"Invalid folder_name '{folder_name}'. Must be one of: {', '.join(valid_names)}"
)
return True
def _sanitize_path(self, path: str) -> str:
"""Sanitize path to prevent path traversal attacks."""
# Normalize path
normalized = os.path.normpath(path)
# Check for absolute paths
if os.path.isabs(normalized):
raise PathTraversalError("Absolute paths are not allowed")
# Check for parent directory references
if normalized.startswith("..") or "/.." in normalized or "\\.." in normalized:
raise PathTraversalError("Parent directory references are not allowed")
# Check for null bytes
if "\x00" in normalized:
raise PathTraversalError("Null bytes in path are not allowed")
return normalized
def _is_safe_path(self, base_path: Path, target_path: Path) -> bool:
"""Check if target path is within base path (prevents path traversal)."""
try:
# Resolve both paths to absolute paths
base_resolved = base_path.resolve()
target_resolved = target_path.resolve()
# Check if target is relative to base
target_resolved.relative_to(base_resolved)
return True
except (ValueError, OSError):
return False