import os import json from typing import Dict, List, Optional, Callable class DirectoryManager: def __init__( self, script_groups_path: str, set_global_workdir_callback: Callable[[Optional[str]], None], ): self.script_groups_path = script_groups_path self._set_global_workdir = ( set_global_workdir_callback # Callback to update main manager's workdir ) def get_work_dir_for_group(self, group: str) -> Optional[str]: """Get working directory path for a script group from work_dir.json.""" work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") try: with open(work_dir_path, "r", encoding="utf-8") as f: data = json.load(f) path = data.get("path", "") if path: path = os.path.normpath(path) if path and os.path.isdir(path): return path elif path: print( f"Warning: Stored working directory for group '{group}' is invalid or does not exist: {path}" ) return None else: return None except (FileNotFoundError, json.JSONDecodeError): return None except Exception as e: print(f"Error reading work_dir.json for group '{group}': {e}") return None def get_directory_history(self, group: str) -> List[str]: """Get the directory history for a script group.""" work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") try: with open(work_dir_path, "r", encoding="utf-8") as f: data = json.load(f) history = [os.path.normpath(p) for p in data.get("history", [])] return [p for p in history if os.path.isdir(p)] except (FileNotFoundError, json.JSONDecodeError): return [] except Exception as e: print(f"Error reading directory history for group '{group}': {e}") return [] def set_work_dir_for_group(self, group: str, path: str) -> Dict[str, str]: """Set working directory path for a script group and update history.""" path = os.path.normpath(path) if not os.path.isdir(path): # Check if it's a valid directory return { "status": "error", "message": f"Directory does not exist or is not valid: {path}", } work_dir_file = os.path.join(self.script_groups_path, group, "work_dir.json") work_dir_folder = os.path.dirname(work_dir_file) try: os.makedirs(work_dir_folder, exist_ok=True) # Ensure group folder exists try: with open(work_dir_file, "r", encoding="utf-8") as f: data = json.load(f) if "history" in data: data["history"] = [os.path.normpath(p) for p in data["history"]] except (FileNotFoundError, json.JSONDecodeError): data = {"path": "", "history": []} data["path"] = path if "history" not in data: data["history"] = [] data["history"] = [ p for p in data["history"] if os.path.normpath(p) != path ] data["history"].insert(0, path) data["history"] = data["history"][:10] with open(work_dir_file, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) self._set_global_workdir( path ) # Update the main manager's working directory return {"status": "success", "path": path} except Exception as e: print(f"Error setting work directory for group {group} at {path}: {e}") return {"status": "error", "message": str(e)}