import os
import json
import traceback
from typing import Dict, Any, Optional, Callable


class SchemaHandler:
    """Load, save and apply JSON schema files for three configuration levels.

    The level is taken from the text before the first ``-`` in ``level``:

    * ``"1"``: schema ``<data_path>/esquema_general.json``,
      config ``<data_path>/data.json``.
    * ``"2"``: schema ``<script_groups_path>/<group>/esquema_group.json``,
      config ``<script_groups_path>/<group>/data.json``.
    * ``"3"``: schema ``<script_groups_path>/<group>/esquema_work.json``,
      config ``<working directory>/data.json``.

    Problems are reported with ``print`` and the public methods return
    fallback values instead of raising.
    """

    def __init__(
        self,
        data_path: str,
        script_groups_path: str,
        get_workdir_func: Callable[[], Optional[str]],
    ):
        """Store the base paths and the working-directory callback.

        Args:
            data_path: Directory holding the level 1 schema and config.
            script_groups_path: Directory containing one sub-directory per
                group (levels 2 and 3).
            get_workdir_func: Callable returning the current working
                directory (or ``None``); used for the level 3 config path.
        """
        self.data_path = data_path
        self.script_groups_path = script_groups_path
        # Function to get current workdir from main manager
        self._get_working_directory = get_workdir_func

    @staticmethod
    def _default_schema() -> Dict[str, Any]:
        """Return a fresh empty object schema (new dict on every call)."""
        return {"type": "object", "properties": {}}

    @staticmethod
    def _ensure_parent_dir(file_path: str) -> None:
        """Create the parent directory of ``file_path`` if it has one."""
        parent = os.path.dirname(file_path)
        # dirname is "" for a bare file name; os.makedirs("") would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def get_schema(self, level: str, group: Optional[str] = None) -> Dict[str, Any]:
        """Get schema for specified level.

        Args:
            level: Level identifier (``"1"``, ``"2"``, ``"3"``; a ``-suffix``
                is ignored).
            group: Group name, needed for levels 2 and 3.

        Returns:
            The schema read from disk. An empty object schema is returned
            when the path cannot be determined, the file is unreadable or
            structurally invalid, or the file does not exist (in which case
            a default schema file is also created).
        """
        try:
            schema_path = self._get_schema_path(level, group)
        except ValueError as ve:
            # Raised for level 2 without a group; report it instead of
            # letting it escape to the caller.
            print(f"Error getting schema path: {ve}")
            return self._default_schema()

        if not schema_path:
            print(
                f"Warning: Could not determine schema path for level '{level}', group '{group}'. Returning empty schema."
            )
            return self._default_schema()

        try:
            with open(schema_path, "r", encoding="utf-8") as f:
                schema = json.load(f)
        except FileNotFoundError:
            print(
                f"Info: Schema file not found at {schema_path}. Creating default schema."
            )
            return self._create_default_schema_file(schema_path)
        except json.JSONDecodeError:
            print(
                f"Error: Could not decode JSON from schema file: {schema_path}. Returning default."
            )
            return self._default_schema()
        except Exception as e:
            print(f"Error reading schema file {schema_path}: {e}. Returning default.")
            return self._default_schema()

        return self._validate_loaded_schema(schema, schema_path)

    def _create_default_schema_file(self, schema_path: str) -> Dict[str, Any]:
        """Write an empty schema to ``schema_path`` and return it.

        A write failure is reported and the empty schema is still returned.
        """
        default_schema = self._default_schema()
        try:
            self._ensure_parent_dir(schema_path)
            with open(schema_path, "w", encoding="utf-8") as f:
                json.dump(default_schema, f, indent=2, ensure_ascii=False)
        except Exception as e:
            print(f"Error creating default schema file at {schema_path}: {e}")
        return default_schema

    def _validate_loaded_schema(self, schema: Any, schema_path: str) -> Dict[str, Any]:
        """Check the top-level structure of a schema loaded from disk.

        Returns the empty schema when ``schema`` is not a dict or lacks
        ``"type"``/``"properties"``; replaces a non-dict ``"properties"``
        with an empty dict.
        """
        if (
            not isinstance(schema, dict)
            or "properties" not in schema
            or "type" not in schema
        ):
            print(
                f"Warning: Schema file {schema_path} has invalid structure. Returning default."
            )
            return self._default_schema()
        if not isinstance(schema.get("properties"), dict):
            print(
                f"Warning: 'properties' in schema file {schema_path} is not a dictionary. Normalizing."
            )
            schema["properties"] = {}
        return schema

    def update_schema(
        self, level: str, data: Dict[str, Any], group: Optional[str] = None
    ) -> Dict[str, str]:
        """Update schema for specified level and clean corresponding config.

        ``data`` is normalized before saving: a non-dict is replaced by an
        empty object schema, and a missing ``"type"`` or missing/non-dict
        ``"properties"`` is filled in on the passed dict itself.

        Args:
            level: Level identifier (see ``get_schema``).
            data: Schema to save.
            group: Group name, needed for levels 2 and 3.

        Returns:
            ``{"status": "success"}`` on success, otherwise
            ``{"status": "error", "message": ...}``.
        """
        try:
            schema_path = self._get_schema_path(level, group)
        except ValueError as ve:
            # Level 2 without a group: report as an error result, not a raise.
            print(f"Error getting schema path: {ve}")
            return {"status": "error", "message": str(ve)}

        if not schema_path:
            return {
                "status": "error",
                "message": f"Could not determine schema path for level '{level}', group '{group}'",
            }

        try:
            # Get corresponding config path
            config_path = self._get_config_path_for_schema(level, group)
            self._ensure_parent_dir(schema_path)

            # Basic validation and normalization of the schema data being saved
            if not isinstance(data, dict):
                data = self._default_schema()
            if "type" not in data:
                data["type"] = "object"
            if "properties" not in data or not isinstance(data["properties"], dict):
                data["properties"] = {}

            # Serialize before opening the file so a serialization failure
            # cannot leave a truncated schema file behind.
            serialized = json.dumps(data, indent=2, ensure_ascii=False)
            with open(schema_path, "w", encoding="utf-8") as f:
                f.write(serialized)
            print(f"Info: Schema successfully updated at {schema_path}")

            if config_path:
                self._clean_config_for_schema(config_path, data)
            else:
                print(
                    f"Info: Config cleaning skipped for level {level} (no valid config path)."
                )
            return {"status": "success"}
        except Exception as e:
            print(f"Error updating schema at {schema_path}: {str(e)}")
            print(traceback.format_exc())
            return {"status": "error", "message": str(e)}

    def _get_schema_path(
        self, level: str, group: Optional[str] = None
    ) -> Optional[str]:
        """Helper to determine the schema file path.

        Returns:
            The schema path, or ``None`` for an invalid level or for level 3
            without a group.

        Raises:
            ValueError: For level 2 without a group.
        """
        clean_level = str(level).split("-")[0]
        if clean_level == "1":
            return os.path.join(self.data_path, "esquema_general.json")
        if clean_level == "2":
            if not group:
                raise ValueError("Group is required for level 2 schema")
            return os.path.join(self.script_groups_path, group, "esquema_group.json")
        if clean_level == "3":
            if not group:
                print(
                    "Warning: Group needed to determine level 3 schema (esquema_work.json)."
                )
                return None  # Cannot determine without group
            return os.path.join(self.script_groups_path, group, "esquema_work.json")
        print(f"Warning: Invalid level '{level}' for schema path retrieval.")
        return None

    def _get_config_path_for_schema(
        self, level: str, group: Optional[str] = None
    ) -> Optional[str]:
        """Helper to determine the config file path corresponding to a schema level.

        Returns ``None`` when the level is unknown, when level 2 has no
        group, or when level 3 has no valid working directory.
        """
        clean_level = str(level).split("-")[0]
        if clean_level == "1":
            return os.path.join(self.data_path, "data.json")
        if clean_level == "2":
            if not group:
                return None
            return os.path.join(self.script_groups_path, group, "data.json")
        if clean_level == "3":
            working_directory = self._get_working_directory()
            if working_directory and os.path.isdir(working_directory):
                return os.path.join(working_directory, "data.json")
            print(
                f"Warning: Working directory not set or invalid ('{working_directory}') for level 3 config path."
            )
            return None
        return None

    def _clean_config_for_schema(
        self, config_path: str, schema: Dict[str, Any]
    ) -> None:
        """Clean configuration file to match schema structure.

        The file is rewritten only when cleaning changes its content or when
        it was empty. A missing file or invalid JSON is reported and skipped;
        no exception is propagated.
        """
        try:
            if not os.path.exists(config_path):
                print(
                    f"Info: Config file {config_path} not found for cleaning. Skipping."
                )
                return

            config = {}
            with open(config_path, "r", encoding="utf-8") as f:
                content = f.read()
            if content.strip():
                config = json.loads(content)
            else:
                print(
                    f"Info: Config file {config_path} is empty. Cleaning will result in an empty object."
                )

            cleaned_config = self._clean_object_against_schema(config, schema)

            try:
                original_config_str = json.dumps(config, sort_keys=True)
                cleaned_config_str = json.dumps(cleaned_config, sort_keys=True)
            except TypeError as te:
                print(
                    f"Warning: Could not serialize config for comparison during clean: {te}. Forcing save."
                )
                original_config_str, cleaned_config_str = "", " "  # Force inequality

            if original_config_str != cleaned_config_str or not content.strip():
                print(f"Info: Cleaning config file: {config_path}")
                # Serialize first so a failure cannot truncate the config file.
                serialized = json.dumps(cleaned_config, indent=2, ensure_ascii=False)
                with open(config_path, "w", encoding="utf-8") as f:
                    f.write(serialized)
            else:
                print(
                    f"Info: Config file {config_path} already matches schema. No cleaning needed."
                )
        except json.JSONDecodeError:
            print(
                f"Error: Could not decode JSON from config file {config_path} during cleaning. Skipping clean."
            )
        except IOError as e:
            print(f"Error accessing config file {config_path} during cleaning: {e}")
        except Exception as e:
            print(f"Unexpected error cleaning config {config_path}: {str(e)}")
            print(traceback.format_exc())

    def _clean_object_against_schema(self, data: Any, schema: Dict[str, Any]) -> Any:
        """Recursively clean data to match schema structure.

        * ``object``: keeps only keys listed in ``properties`` (recursing
          into each); non-dict data becomes ``{}``.
        * ``array``: cleans every item against ``items`` when that is a
          dict; non-list data becomes ``[]``.
        * ``enum`` (checked only when the type is not object/array): values
          outside the list become ``None``.
        * Any other schema returns ``data`` unchanged.
        """
        if not isinstance(schema, dict):
            print(
                f"Warning: Invalid schema provided to _clean_object_against_schema (not a dict). Returning data as is: {type(schema)}"
            )
            return data

        schema_type = schema.get("type")

        if schema_type == "object":
            if not isinstance(data, dict):
                return {}
            schema_props = schema.get("properties", {})
            if not isinstance(schema_props, dict):
                print(
                    "Warning: 'properties' in schema is not a dictionary during cleaning. Returning empty object."
                )
                return {}
            result = {}
            for key, value in data.items():
                if key not in schema_props:
                    continue  # Key not declared in the schema: drop it
                prop_schema = schema_props[key]
                if isinstance(prop_schema, dict):
                    result[key] = self._clean_object_against_schema(
                        value, prop_schema
                    )
                else:
                    print(
                        f"Warning: Schema for property '{key}' is not a dictionary. Omitting from cleaned data."
                    )
            return result

        if schema_type == "array":
            if not isinstance(data, list):
                return []
            items_schema = schema.get("items")
            if isinstance(items_schema, dict):
                return [
                    self._clean_object_against_schema(item, items_schema)
                    for item in data
                ]
            # Keep array items as they are if no valid 'items' schema defined
            return data

        if "enum" in schema:
            enum_values = schema.get("enum")
            if isinstance(enum_values, list):
                # Or consider schema.get('default') instead of None
                return data if data in enum_values else None
            print(
                f"Warning: Invalid 'enum' definition in schema (not a list). Returning None for value '{data}'."
            )
            return None

        # Basic types ("string", "integer", "number", "boolean", "null") and
        # unknown types are returned as is (could add type checking).
        return data