Files
alfred/agent/tools/filesystem.py
2025-11-28 22:09:26 +01:00

258 lines
7.8 KiB
Python

"""Filesystem tools for managing folders and files with security."""
from typing import Dict, Any
from enum import Enum
from pathlib import Path
import logging
import os
from ..memory import Memory
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class FolderName(Enum):
    """Kinds of folders that can be managed."""

    # Declaration order matters: iterating the enum yields the members in
    # this order, and the values are the strings accepted as folder names.
    DOWNLOAD = "download"
    TVSHOW = "tvshow"
    MOVIE = "movie"
    TORRENT = "torrent"
class FilesystemError(Exception):
    """Common base class for errors raised by filesystem operations."""
class PathTraversalError(FilesystemError):
    """Signals that a path traversal attack was detected."""
def _validate_folder_name(folder_name: str) -> bool:
    """
    Check that a folder name is one of the FolderName values.

    Args:
        folder_name: Name to validate

    Returns:
        True when the name is allowed (an invalid name never returns)

    Raises:
        ValueError: If folder_name is not one of the FolderName values
    """
    allowed = [member.value for member in FolderName]
    if folder_name in allowed:
        return True
    # The message lists every accepted value, in enum declaration order.
    raise ValueError(
        f"Invalid folder_name '{folder_name}'. Must be one of: {', '.join(allowed)}"
    )
def _sanitize_path(path: str) -> str:
    """
    Sanitize a relative path to prevent path traversal attacks.

    Args:
        path: Path to sanitize

    Returns:
        The path normalized with os.path.normpath

    Raises:
        PathTraversalError: If the path contains a null byte, is absolute,
            or still has a ".." component after normalization
    """
    # Reject null bytes on the raw input, before any normalization: some
    # CPython releases truncate at the first null byte inside
    # os.path.normpath, which would hide it from a check made afterwards.
    if "\x00" in path:
        raise PathTraversalError("Null bytes in path are not allowed")

    normalized = os.path.normpath(path)

    if os.path.isabs(normalized):
        raise PathTraversalError("Absolute paths are not allowed")

    # Compare whole components instead of substrings so that legitimate
    # names such as "..hidden" or "a/..b" are not rejected. Both "/" and
    # "\\" are treated as separators regardless of the running platform.
    components = normalized.replace("\\", "/").split("/")
    if ".." in components:
        raise PathTraversalError("Parent directory references are not allowed")

    return normalized
def _is_safe_path(base_path: Path, target_path: Path) -> bool:
    """
    Check if target path is within base path (prevents path traversal).

    Both paths are resolved before the comparison, so ".." segments and
    symlinks are taken into account.

    Args:
        base_path: Base directory path
        target_path: Target path to check

    Returns:
        True if the resolved target equals or lies under the resolved base;
        False otherwise, including when either path cannot be resolved
    """
    try:
        base_resolved = base_path.resolve()
        target_resolved = target_path.resolve()
        # relative_to() raises ValueError when target is not under base.
        target_resolved.relative_to(base_resolved)
    except (ValueError, OSError, RuntimeError):
        # RuntimeError: Path.resolve() reports symlink loops this way on
        # Python versions before 3.13; treat an unresolvable path as unsafe.
        return False
    return True
def set_path_for_folder(memory: Memory, folder_name: str, path_value: str) -> Dict[str, Any]:
    """
    Validate a directory path and record it in the stored config.

    The path is resolved and must exist, be a directory and be readable.
    It is then saved under the "<folder_name>_folder" key of the "config"
    entry held by memory.

    Args:
        memory: Memory instance to store the configuration
        folder_name: Name of folder to set (download, tvshow, movie, torrent)
        path_value: Path to the folder; it is resolved before being stored

    Returns:
        Dict with "status", "folder_name" and "path" on success, or
        "error" and "message" on failure
    """
    try:
        _validate_folder_name(folder_name)

        resolved = Path(path_value).resolve()

        # Pick the first failing check; the same text is logged and returned.
        failure = None
        if not resolved.exists():
            failure = ("invalid_path", f"Path does not exist: {path_value}")
        elif not resolved.is_dir():
            failure = ("invalid_path", f"Path is not a directory: {path_value}")
        elif not os.access(resolved, os.R_OK):
            failure = ("permission_denied", f"Path is not readable: {path_value}")

        if failure is not None:
            code, text = failure
            logger.warning(text)
            return {"error": code, "message": text}

        # Read-modify-write of the whole "config" entry.
        settings = memory.get("config", {})
        resolved_text = str(resolved)
        settings[f"{folder_name}_folder"] = resolved_text
        memory.set("config", settings)

        logger.info(f"Set {folder_name}_folder to: {resolved}")
        return {"status": "ok", "folder_name": folder_name, "path": resolved_text}
    except ValueError as exc:
        # Raised by the folder-name validation (or by path handling).
        logger.error(f"Validation error: {exc}")
        return {"error": "validation_failed", "message": str(exc)}
    except Exception as exc:
        # Details go to the log only; the caller gets a generic message.
        logger.error(f"Unexpected error setting path: {exc}", exc_info=True)
        return {"error": "internal_error", "message": "Failed to set path"}
def list_folder(memory: Memory, folder_type: str, path: str = ".") -> Dict[str, Any]:
    """
    List the entries of a directory inside a configured folder.

    The folder type is validated, the relative path is sanitized, and the
    resulting target must stay inside the configured root before anything
    is read from disk.

    Args:
        memory: Memory instance to retrieve the configuration
        folder_type: Type of folder to list (download, tvshow, movie, torrent)
        path: Relative path within the folder (default: ".")

    Returns:
        Dict with "status", "folder_type", "path", sorted "entries" and
        "count" on success, or "error" and "message" on failure
    """

    def failure(code: str, message: str) -> Dict[str, Any]:
        # Uniform shape for every error result.
        return {"error": code, "message": message}

    try:
        _validate_folder_name(folder_type)
        relative = _sanitize_path(path)

        # The root directory is stored under "<folder_type>_folder".
        settings = memory.get("config", {})
        root_key = f"{folder_type}_folder"
        if not (root_key in settings and settings[root_key]):
            logger.warning(f"Folder not configured: {folder_type}")
            return failure(
                "folder_not_set",
                f"{folder_type.capitalize()} folder not set in config.",
            )

        root = Path(settings[root_key])
        target = root / relative

        # Second line of defence after sanitizing: compare resolved paths.
        if not _is_safe_path(root, target):
            logger.warning(f"Path traversal attempt detected: {path}")
            return failure("forbidden", "Access denied: path outside allowed directory")

        if not target.exists():
            logger.warning(f"Path does not exist: {target}")
            return failure("not_found", f"Path does not exist: {relative}")

        if not target.is_dir():
            logger.warning(f"Path is not a directory: {target}")
            return failure("not_a_directory", f"Path is not a directory: {relative}")

        try:
            names = sorted(child.name for child in target.iterdir())
            total = len(names)
            logger.debug(f"Listed {total} entries in {target}")
            return {
                "status": "ok",
                "folder_type": folder_type,
                "path": relative,
                "entries": names,
                "count": total,
            }
        except PermissionError:
            logger.warning(f"Permission denied accessing: {target}")
            return failure("permission_denied", f"Permission denied accessing: {relative}")

    except PathTraversalError as exc:
        # Raised while sanitizing the requested relative path.
        logger.warning(f"Path traversal attempt: {exc}")
        return failure("forbidden", str(exc))
    except ValueError as exc:
        logger.error(f"Validation error: {exc}")
        return failure("validation_failed", str(exc))
    except Exception as exc:
        # Details go to the log only; the caller gets a generic message.
        logger.error(f"Unexpected error listing folder: {exc}", exc_info=True)
        return failure("internal_error", "Failed to list folder")