From 2b815502f65454b290b10f02c22b1aafd5a2bcc9 Mon Sep 17 00:00:00 2001 From: Francwa Date: Mon, 1 Dec 2025 03:22:44 +0100 Subject: [PATCH] Before the refactoring ... --- .gitignore | 3 + agent/tools/api.py | 21 ---- agent/tools/filesystem.py | 259 +++++++++++++++++++++++++++++++++----- 3 files changed, 228 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index 9e78bd9..8647d1d 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,6 @@ memory.json # OS .DS_Store Thumbs.db + +# Secrets +.env diff --git a/agent/tools/api.py b/agent/tools/api.py index 78ac27e..6c0f72a 100644 --- a/agent/tools/api.py +++ b/agent/tools/api.py @@ -84,13 +84,6 @@ def find_media_imdb_id(media_title: str) -> Dict[str, Any]: "message": str(e) } - except Exception as e: - logger.error(f"Unexpected error: {e}", exc_info=True) - return { - "error": "internal_error", - "message": "An unexpected error occurred" - } - def find_torrent(media_title: str) -> Dict[str, Any]: """ @@ -162,13 +155,6 @@ def find_torrent(media_title: str) -> Dict[str, Any]: "message": str(e) } - except Exception as e: - logger.error(f"Unexpected error: {e}", exc_info=True) - return { - "error": "internal_error", - "message": "An unexpected error occurred" - } - def add_torrent_to_qbittorrent(magnet_link: str) -> Dict[str, Any]: """ @@ -236,10 +222,3 @@ def add_torrent_to_qbittorrent(magnet_link: str) -> Dict[str, Any]: "error": "validation_failed", "message": str(e) } - - except Exception as e: - logger.error(f"Unexpected error: {e}", exc_info=True) - return { - "error": "internal_error", - "message": "An unexpected error occurred" - } diff --git a/agent/tools/filesystem.py b/agent/tools/filesystem.py index 5cb4f69..33d001d 100644 --- a/agent/tools/filesystem.py +++ b/agent/tools/filesystem.py @@ -31,13 +31,13 @@ class PathTraversalError(FilesystemError): def _validate_folder_name(folder_name: str) -> bool: """ Validate folder name against allowed values. - + Args: folder_name: Name to validate - + Returns: True if valid - + Raises: ValueError: If folder name is invalid """ @@ -52,42 +52,42 @@ def _validate_folder_name(folder_name: str) -> bool: def _sanitize_path(path: str) -> str: """ Sanitize path to prevent path traversal attacks. - + Args: path: Path to sanitize - + Returns: Sanitized path - + Raises: PathTraversalError: If path contains dangerous patterns """ # Normalize path normalized = os.path.normpath(path) - + # Check for absolute paths if os.path.isabs(normalized): raise PathTraversalError("Absolute paths are not allowed") - + # Check for parent directory references if normalized.startswith("..") or "/.." in normalized or "\\.." in normalized: raise PathTraversalError("Parent directory references are not allowed") - + # Check for null bytes if "\x00" in normalized: raise PathTraversalError("Null bytes in path are not allowed") - + return normalized def _is_safe_path(base_path: Path, target_path: Path) -> bool: """ Check if target path is within base path (prevents path traversal). - + Args: base_path: Base directory path target_path: Target path to check - + Returns: True if safe, False otherwise """ @@ -95,7 +95,7 @@ def _is_safe_path(base_path: Path, target_path: Path) -> bool: # Resolve both paths to absolute paths base_resolved = base_path.resolve() target_resolved = target_path.resolve() - + # Check if target is relative to base target_resolved.relative_to(base_resolved) return True @@ -111,17 +111,17 @@ def set_path_for_folder(memory: Memory, folder_name: str, path_value: str) -> Di memory: Memory instance to store the configuration folder_name: Name of folder to set (download, tvshow, movie, torrent) path_value: Absolute path to the folder - + Returns: Dict with status or error information """ try: # Validate folder name _validate_folder_name(folder_name) - + # Convert to Path object for better handling path_obj = Path(path_value).resolve() - + # Validate path exists and is a directory if not path_obj.exists(): logger.warning(f"Path does not exist: {path_value}") @@ -129,14 +129,14 @@ def set_path_for_folder(memory: Memory, folder_name: str, path_value: str) -> Di "error": "invalid_path", "message": f"Path does not exist: {path_value}" } - + if not path_obj.is_dir(): logger.warning(f"Path is not a directory: {path_value}") return { "error": "invalid_path", "message": f"Path is not a directory: {path_value}" } - + # Check if path is readable if not os.access(path_obj, os.R_OK): logger.warning(f"Path is not readable: {path_value}") @@ -144,23 +144,23 @@ def set_path_for_folder(memory: Memory, folder_name: str, path_value: str) -> Di "error": "permission_denied", "message": f"Path is not readable: {path_value}" } - + # Store in memory config = memory.get("config", {}) config[f"{folder_name}_folder"] = str(path_obj) memory.set("config", config) - + logger.info(f"Set {folder_name}_folder to: {path_obj}") return { "status": "ok", "folder_name": folder_name, "path": str(path_obj) } - + except ValueError as e: logger.error(f"Validation error: {e}") return {"error": "validation_failed", "message": str(e)} - + except Exception as e: logger.error(f"Unexpected error setting path: {e}", exc_info=True) return {"error": "internal_error", "message": "Failed to set path"} @@ -174,31 +174,31 @@ def list_folder(memory: Memory, folder_type: str, path: str = ".") -> Dict[str, memory: Memory instance to retrieve the configuration folder_type: Type of folder to list (download, tvshow, movie, torrent) path: Relative path within the folder (default: ".") - + Returns: Dict with folder contents or error information """ try: # Validate folder type _validate_folder_name(folder_type) - + # Sanitize the path safe_path = _sanitize_path(path) - + # Get root folder from config folder_key = f"{folder_type}_folder" config = memory.get("config", {}) - + if folder_key not in config or not config[folder_key]: logger.warning(f"Folder not configured: {folder_type}") return { "error": "folder_not_set", "message": f"{folder_type.capitalize()} folder not set in config." } - + root = Path(config[folder_key]) target = root / safe_path - + # Security check: ensure target is within root if not _is_safe_path(root, target): logger.warning(f"Path traversal attempt detected: {path}") @@ -206,7 +206,7 @@ def list_folder(memory: Memory, folder_type: str, path: str = ".") -> Dict[str, "error": "forbidden", "message": "Access denied: path outside allowed directory" } - + # Check if target exists if not target.exists(): logger.warning(f"Path does not exist: {target}") @@ -214,7 +214,7 @@ def list_folder(memory: Memory, folder_type: str, path: str = ".") -> Dict[str, "error": "not_found", "message": f"Path does not exist: {safe_path}" } - + # Check if target is a directory if not target.is_dir(): logger.warning(f"Path is not a directory: {target}") @@ -222,7 +222,7 @@ def list_folder(memory: Memory, folder_type: str, path: str = ".") -> Dict[str, "error": "not_a_directory", "message": f"Path is not a directory: {safe_path}" } - + # List directory contents try: entries = [entry.name for entry in target.iterdir()] @@ -240,18 +240,209 @@ def list_folder(memory: Memory, folder_type: str, path: str = ".") -> Dict[str, "error": "permission_denied", "message": f"Permission denied accessing: {safe_path}" } - + except PathTraversalError as e: logger.warning(f"Path traversal attempt: {e}") return { "error": "forbidden", "message": str(e) } - + except ValueError as e: logger.error(f"Validation error: {e}") return {"error": "validation_failed", "message": str(e)} - + except Exception as e: logger.error(f"Unexpected error listing folder: {e}", exc_info=True) return {"error": "internal_error", "message": "Failed to list folder"} + +def move_file(path: str, destination: str) -> Dict[str, Any]: + """ + Move a file from one location to another with safety checks. + + This function is designed to safely move files from downloads to movies/series + folders with comprehensive validation and error handling to prevent data loss. + + Args: + path: Source file path (absolute or relative) + destination: Destination file path (absolute or relative) + + Returns: + Dict with status or error information: + - Success: {"status": "ok", "source": str, "destination": str, "size": int} + - Error: {"error": str, "message": str} + + Safety features: + - Validates source file exists and is readable + - Validates destination directory exists and is writable + - Prevents overwriting existing files + - Verifies file integrity after move (size check) + - Atomic operation using shutil.move + - Comprehensive logging + + Example: + >>> result = move_file( + ... "/downloads/movie.mkv", + ... "/movies/Inception (2010)/movie.mkv" + ... ) + >>> print(result) + {'status': 'ok', 'source': '...', 'destination': '...', 'size': 1234567890} + """ + import shutil + + try: + # Convert to Path objects + source_path = Path(path).resolve() + dest_path = Path(destination).resolve() + + logger.info(f"Moving file from {source_path} to {dest_path}") + + # === VALIDATION: Source file === + + # Check source exists + if not source_path.exists(): + logger.error(f"Source file does not exist: {source_path}") + return { + "error": "source_not_found", + "message": f"Source file does not exist: {path}" + } + + # Check source is a file (not a directory) + if not source_path.is_file(): + logger.error(f"Source is not a file: {source_path}") + return { + "error": "source_not_file", + "message": f"Source is not a file: {path}" + } + + # Check source is readable + if not os.access(source_path, os.R_OK): + logger.error(f"Source file is not readable: {source_path}") + return { + "error": "permission_denied", + "message": f"Source file is not readable: {path}" + } + + # Get source file size for verification + source_size = source_path.stat().st_size + logger.debug(f"Source file size: {source_size} bytes") + + # === VALIDATION: Destination === + + # Check destination parent directory exists + dest_parent = dest_path.parent + if not dest_parent.exists(): + logger.error(f"Destination directory does not exist: {dest_parent}") + return { + "error": "destination_dir_not_found", + "message": f"Destination directory does not exist: {dest_parent}" + } + + # Check destination parent is a directory + if not dest_parent.is_dir(): + logger.error(f"Destination parent is not a directory: {dest_parent}") + return { + "error": "destination_not_dir", + "message": f"Destination parent is not a directory: {dest_parent}" + } + + # Check destination parent is writable + if not os.access(dest_parent, os.W_OK): + logger.error(f"Destination directory is not writable: {dest_parent}") + return { + "error": "permission_denied", + "message": f"Destination directory is not writable: {dest_parent}" + } + + # Check destination file doesn't already exist + if dest_path.exists(): + logger.warning(f"Destination file already exists: {dest_path}") + return { + "error": "destination_exists", + "message": f"Destination file already exists: {destination}" + } + + # === SAFETY CHECK: Prevent moving to same location === + + if source_path == dest_path: + logger.warning("Source and destination are the same") + return { + "error": "same_location", + "message": "Source and destination are the same" + } + + # === PERFORM MOVE === + + logger.info(f"Moving file: {source_path.name} ({source_size} bytes)") + + try: + # Use shutil.move for atomic operation + # This handles cross-filesystem moves automatically + shutil.move(str(source_path), str(dest_path)) + logger.info(f"File moved successfully to {dest_path}") + + except Exception as e: + logger.error(f"Failed to move file: {e}", exc_info=True) + return { + "error": "move_failed", + "message": f"Failed to move file: {str(e)}" + } + + # === VERIFICATION: Ensure file was moved correctly === + + # Check destination file exists + if not dest_path.exists(): + logger.error("Destination file does not exist after move!") + # Try to recover by checking if source still exists + if source_path.exists(): + logger.info("Source file still exists, move may have failed") + return { + "error": "move_verification_failed", + "message": "File was not moved successfully (destination not found)" + } + else: + logger.critical("Both source and destination missing after move!") + return { + "error": "file_lost", + "message": "CRITICAL: File missing after move operation" + } + + # Check destination file size matches source + dest_size = dest_path.stat().st_size + if dest_size != source_size: + logger.error(f"File size mismatch! Source: {source_size}, Dest: {dest_size}") + return { + "error": "size_mismatch", + "message": f"File size mismatch after move (expected {source_size}, got {dest_size})" + } + + # Check source file no longer exists + if source_path.exists(): + logger.warning("Source file still exists after move (copy instead of move?)") + # This is not necessarily an error (shutil.move copies across filesystems) + # but we should log it + + # === SUCCESS === + + logger.info(f"File successfully moved and verified: {dest_path.name}") + return { + "status": "ok", + "source": str(source_path), + "destination": str(dest_path), + "filename": dest_path.name, + "size": dest_size + } + + except PermissionError as e: + logger.error(f"Permission denied: {e}") + return { + "error": "permission_denied", + "message": f"Permission denied: {str(e)}" + } + + except OSError as e: + logger.error(f"OS error during move: {e}", exc_info=True) + return { + "error": "os_error", + "message": f"OS error: {str(e)}" + }