ParamManagerScripts/lib/schema_handler.py

286 lines
12 KiB
Python

import os
import json
import traceback
from typing import Dict, Any, Optional, Callable
class SchemaHandler:
def __init__(
self,
data_path: str,
script_groups_path: str,
get_workdir_func: Callable[[], Optional[str]],
):
self.data_path = data_path
self.script_groups_path = script_groups_path
self._get_working_directory = (
get_workdir_func # Function to get current workdir from main manager
)
def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
"""Get schema for specified level."""
schema_path = self._get_schema_path(level, group)
if not schema_path:
print(
f"Warning: Could not determine schema path for level '{level}', group '{group}'. Returning empty schema."
)
return {"type": "object", "properties": {}}
try:
if os.path.exists(schema_path):
try:
with open(schema_path, "r", encoding="utf-8") as f:
schema = json.load(f)
if (
not isinstance(schema, dict)
or "properties" not in schema
or "type" not in schema
):
print(
f"Warning: Schema file {schema_path} has invalid structure. Returning default."
)
return {"type": "object", "properties": {}}
if not isinstance(schema.get("properties"), dict):
print(
f"Warning: 'properties' in schema file {schema_path} is not a dictionary. Normalizing."
)
schema["properties"] = {}
return schema
except json.JSONDecodeError:
print(
f"Error: Could not decode JSON from schema file: {schema_path}. Returning default."
)
return {"type": "object", "properties": {}}
except Exception as e:
print(
f"Error reading schema file {schema_path}: {e}. Returning default."
)
return {"type": "object", "properties": {}}
else:
print(
f"Info: Schema file not found at {schema_path}. Creating default schema."
)
default_schema = {"type": "object", "properties": {}}
try:
os.makedirs(os.path.dirname(schema_path), exist_ok=True)
with open(schema_path, "w", encoding="utf-8") as f:
json.dump(default_schema, f, indent=2, ensure_ascii=False)
return default_schema
except Exception as e:
print(f"Error creating default schema file at {schema_path}: {e}")
return {"type": "object", "properties": {}}
except ValueError as ve:
print(f"Error getting schema path: {ve}")
return {"type": "object", "properties": {}}
except Exception as e:
error_path = schema_path if schema_path else f"Level {level}, Group {group}"
print(f"Unexpected error loading schema from {error_path}: {str(e)}")
return {"type": "object", "properties": {}}
def update_schema(
self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
"""Update schema for specified level and clean corresponding config."""
schema_path = self._get_schema_path(level, group)
config_path = self._get_config_path_for_schema(
level, group
) # Get corresponding config path
if not schema_path:
return {
"status": "error",
"message": f"Could not determine schema path for level '{level}', group '{group}'",
}
try:
os.makedirs(os.path.dirname(schema_path), exist_ok=True)
# Basic validation and normalization of the schema data being saved
if not isinstance(data, dict):
data = {"type": "object", "properties": {}}
if "type" not in data:
data["type"] = "object"
if "properties" not in data or not isinstance(data["properties"], dict):
data["properties"] = {}
with open(schema_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Info: Schema successfully updated at {schema_path}")
if config_path:
self._clean_config_for_schema(config_path, data)
else:
print(
f"Info: Config cleaning skipped for level {level} (no valid config path)."
)
return {"status": "success"}
except Exception as e:
print(f"Error updating schema at {schema_path}: {str(e)}")
print(traceback.format_exc())
return {"status": "error", "message": str(e)}
def _get_schema_path(
self, level: str, group: Optional[str] = None
) -> Optional[str]:
"""Helper to determine the schema file path."""
clean_level = str(level).split("-")[0]
if clean_level == "1":
return os.path.join(self.data_path, "esquema_general.json")
elif clean_level == "2":
if not group:
raise ValueError("Group is required for level 2 schema")
return os.path.join(self.script_groups_path, group, "esquema_group.json")
elif clean_level == "3":
if not group:
print(
"Warning: Group needed to determine level 3 schema (esquema_work.json)."
)
return None # Cannot determine without group
return os.path.join(self.script_groups_path, group, "esquema_work.json")
else:
print(f"Warning: Invalid level '{level}' for schema path retrieval.")
return None
def _get_config_path_for_schema(
self, level: str, group: Optional[str] = None
) -> Optional[str]:
"""Helper to determine the config file path corresponding to a schema level."""
clean_level = str(level).split("-")[0]
if clean_level == "1":
return os.path.join(self.data_path, "data.json")
elif clean_level == "2":
if not group:
return None
return os.path.join(self.script_groups_path, group, "data.json")
elif clean_level == "3":
working_directory = self._get_working_directory()
if working_directory and os.path.isdir(working_directory):
return os.path.join(working_directory, "data.json")
else:
print(
f"Warning: Working directory not set or invalid ('{working_directory}') for level 3 config path."
)
return None
else:
return None
def _clean_config_for_schema(
self, config_path: str, schema: Dict[str, Any]
) -> None:
"""Clean configuration file to match schema structure."""
try:
if not os.path.exists(config_path):
print(
f"Info: Config file {config_path} not found for cleaning. Skipping."
)
return
config = {}
content = ""
with open(config_path, "r", encoding="utf-8") as f:
content = f.read()
if content.strip():
config = json.loads(content)
else:
print(
f"Info: Config file {config_path} is empty. Cleaning will result in an empty object."
)
cleaned_config = self._clean_object_against_schema(config, schema)
try:
original_config_str = json.dumps(config, sort_keys=True)
cleaned_config_str = json.dumps(cleaned_config, sort_keys=True)
except TypeError as te:
print(
f"Warning: Could not serialize config for comparison during clean: {te}. Forcing save."
)
original_config_str, cleaned_config_str = "", " " # Force inequality
if original_config_str != cleaned_config_str or not content.strip():
print(f"Info: Cleaning config file: {config_path}")
with open(config_path, "w", encoding="utf-8") as f:
json.dump(cleaned_config, f, indent=2, ensure_ascii=False)
else:
print(
f"Info: Config file {config_path} already matches schema. No cleaning needed."
)
except json.JSONDecodeError:
print(
f"Error: Could not decode JSON from config file {config_path} during cleaning. Skipping clean."
)
except IOError as e:
print(f"Error accessing config file {config_path} during cleaning: {e}")
except Exception as e:
print(f"Unexpected error cleaning config {config_path}: {str(e)}")
print(traceback.format_exc())
def _clean_object_against_schema(self, data: Any, schema: Dict[str, Any]) -> Any:
"""Recursively clean data to match schema structure."""
if not isinstance(schema, dict):
print(
f"Warning: Invalid schema provided to _clean_object_against_schema (not a dict). Returning data as is: {type(schema)}"
)
return data
schema_type = schema.get("type")
if schema_type == "object":
if not isinstance(data, dict):
return {}
result = {}
schema_props = schema.get("properties", {})
if not isinstance(schema_props, dict):
print(
"Warning: 'properties' in schema is not a dictionary during cleaning. Returning empty object."
)
return {}
for key, value in data.items():
if key in schema_props:
prop_schema = schema_props[key]
if isinstance(prop_schema, dict):
result[key] = self._clean_object_against_schema(
value, prop_schema
)
else:
print(
f"Warning: Schema for property '{key}' is not a dictionary. Omitting from cleaned data."
)
return result
elif schema_type == "array":
if not isinstance(data, list):
return []
items_schema = schema.get("items")
if isinstance(items_schema, dict):
return [
self._clean_object_against_schema(item, items_schema)
for item in data
]
else:
return data # Keep array items as they are if no valid 'items' schema defined
elif "enum" in schema:
enum_values = schema.get("enum")
if isinstance(enum_values, list):
if data in enum_values:
return data
else:
return None # Or consider schema.get('default')
else:
print(
f"Warning: Invalid 'enum' definition in schema (not a list). Returning None for value '{data}'."
)
return None
elif schema_type in ["string", "integer", "number", "boolean", "null"]:
return data # Basic types, return as is (could add type checking)
else:
# print(f"Warning: Unknown or unhandled schema type '{schema_type}' during cleaning. Returning data as is.")
return data