Agregada opcion de valores por defecto en level2

This commit is contained in:
Miguel 2025-05-03 17:18:41 +02:00
parent b38c26bee7
commit 6ffdec7a9a
14 changed files with 2553 additions and 18949 deletions

174
.gitignore vendored Normal file
View File

@ -0,0 +1,174 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc

View File

@ -0,0 +1,34 @@
--- Log de Ejecución: x1.py ---
Grupo: EmailCrono
Directorio de Trabajo: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
Inicio: 2025-05-03 17:15:12
Fin: 2025-05-03 17:15:14
Duración: 0:00:01.628641
Estado: SUCCESS (Código de Salida: 0)
--- SALIDA ESTÁNDAR (STDOUT) ---
Working directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
Input directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
Output directory: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs
Cronologia file: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
Attachments directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\adjuntos
Beautify rules file: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\EmailCrono\config\beautify_rules.json
Found 1 .eml files
Loaded 0 existing messages
Processing C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS.eml
Aplicando reglas de prioridad 1
Aplicando reglas de prioridad 2
Aplicando reglas de prioridad 3
Aplicando reglas de prioridad 4
Estadísticas de procesamiento:
- Total mensajes encontrados: 1
- Mensajes únicos añadidos: 1
- Mensajes duplicados ignorados: 0
Writing 1 messages to C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
--- ERRORES (STDERR) ---
Ninguno
--- FIN DEL LOG ---

View File

@ -0,0 +1,14 @@
{
"level1": {
"api_key": "your-api-key-here",
"model": "gpt-3.5-turbo"
},
"level2": {
"attachments_dir": "adjuntos",
"cronologia_file": "cronologia.md"
},
"level3": {
"output_directory": "C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs"
},
"working_directory": "C:\\Trabajo\\SIDEL\\EMAILs\\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS"
}

View File

@ -1,6 +1,8 @@
{
"path": "C:\\Trabajo\\VM\\40 - 93040 - HENKEL - NEXT2 Problem\\Reporte\\EmailTody",
"path": "C:\\Trabajo\\SIDEL\\EMAILs\\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS",
"history": [
"C:\\Trabajo\\SIDEL\\EMAILs\\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS",
"C:\\Estudio",
"C:\\Trabajo\\VM\\40 - 93040 - HENKEL - NEXT2 Problem\\Reporte\\EmailTody",
"C:\\Trabajo\\VM\\30 - 9.3941- Kosme - Portogallo (Modifica + Linea)\\Reporte\\Emails",
"C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\30 - 9.3941- Kosme - Portogallo (Modifica + Linea)\\Emails",

View File

@ -1,4 +1,11 @@
{
"scl_output_dir": "scl_output",
"xref_output_dir": "xref_output"
"xref_output_dir": "xref_output",
"xref_source_subdir": "source",
"call_xref_filename": "xref_calls_tree.md",
"db_usage_xref_filename": "xref_db_usage_summary.md",
"plc_tag_xref_filename": "xref_plc_tags_summary.md",
"max_call_depth": 5,
"max_users_list": 20,
"aggregated_filename": "full_project_representation.md"
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,17 @@
"api_key": "your-api-key-here",
"model": "gpt-3.5-turbo"
},
"level2": {},
"level2": {
"scl_output_dir": "scl_output",
"xref_output_dir": "xref_output",
"xref_source_subdir": "source",
"call_xref_filename": "xref_calls_tree.md",
"db_usage_xref_filename": "xref_db_usage_summary.md",
"plc_tag_xref_filename": "xref_plc_tags_summary.md",
"max_call_depth": 5,
"max_users_list": 20,
"aggregated_filename": "full_project_representation.md"
},
"level3": {},
"working_directory": "C:\\Trabajo\\SIDEL\\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\\Reporte\\IOExport"
}

View File

@ -2,11 +2,13 @@ import os
import json
import subprocess
import re
from typing import Dict, Any, List
import traceback
from typing import Dict, Any, List, Optional
import time # Add this import
from datetime import datetime # Add this import
# --- ConfigurationManager Class ---
class ConfigurationManager:
def __init__(self):
self.base_path = os.path.dirname(os.path.abspath(__file__))
@ -18,6 +20,7 @@ class ConfigurationManager:
self.log_file = os.path.join(self.data_path, "log.txt")
self._init_log_file()
self.last_execution_time = 0 # Add this attribute
# Minimum seconds between script executions to prevent rapid clicks
self.min_execution_interval = 1 # Minimum seconds between executions
def _init_log_file(self):
@ -28,6 +31,7 @@ class ConfigurationManager:
with open(self.log_file, "w", encoding="utf-8") as f:
f.write("")
# --- Logging Methods ---
def append_log(self, message: str) -> None:
"""Append a message to the CENTRAL log file with timestamp."""
# This function now primarily logs messages from the app itself,
@ -38,6 +42,7 @@ class ConfigurationManager:
lines_with_timestamp = []
for line in lines:
if line.strip():
# Add timestamp only if line doesn't already have one (e.g., from script output)
if not line.strip().startswith("["):
line = f"{timestamp}{line}"
lines_with_timestamp.append(f"{line}\n")
@ -81,6 +86,7 @@ class ConfigurationManager:
print(f"Error clearing log file: {e}")
return False
# --- Working Directory Methods ---
def set_working_directory(self, path: str) -> Dict[str, str]:
"""Set and validate working directory."""
if not os.path.exists(path):
@ -89,13 +95,67 @@ class ConfigurationManager:
self.working_directory = path
# Create default data.json if it doesn't exist
# This data.json will be populated with defaults by get_config later if needed
data_path = os.path.join(path, "data.json")
if not os.path.exists(data_path):
with open(data_path, "w") as f:
try:
with open(data_path, "w", encoding="utf-8") as f:
json.dump({}, f, indent=2)
print(
f"Info: Created empty data.json in working directory: {data_path}"
)
except Exception as e:
print(f"Error creating data.json in working directory {path}: {e}")
# Non-fatal, get_config will handle missing file
return {"status": "success", "path": path}
def get_work_dir(self, group: str) -> Optional[str]:
"""Get working directory path for a script group from work_dir.json."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r", encoding="utf-8") as f:
data = json.load(f)
path = data.get("path", "")
# Normalizar separadores de ruta
if path:
path = os.path.normpath(path)
# Actualizar la variable de instancia si hay una ruta válida y existe
if path and os.path.isdir(path): # Check if it's a directory
self.working_directory = path
return path
elif path:
print(
f"Warning: Stored working directory for group '{group}' is invalid or does not exist: {path}"
)
self.working_directory = None # Reset if invalid
return None
else:
self.working_directory = None # Reset if no path stored
return None
except (FileNotFoundError, json.JSONDecodeError):
self.working_directory = None # Reset if file missing or invalid
return None
except Exception as e:
print(f"Error reading work_dir.json for group '{group}': {e}")
self.working_directory = None
return None
def get_directory_history(self, group: str) -> List[str]:
"""Get the directory history for a script group."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Normalizar todos los paths en el historial
history = [os.path.normpath(p) for p in data.get("history", [])]
# Filtrar solo directorios que existen
return [
p for p in history if os.path.isdir(p)
] # Check if directory exists
except (FileNotFoundError, json.JSONDecodeError):
return []
def get_script_groups(self) -> List[Dict[str, Any]]:
"""Returns list of available script groups with their descriptions."""
groups = []
@ -127,189 +187,506 @@ class ConfigurationManager:
print(f"Error reading group description: {e}")
return {}
# --- Configuration (data.json) Methods ---
def get_config(self, level: str, group: str = None) -> Dict[str, Any]:
"""Get configuration for specified level."""
"""
Get configuration for specified level.
Applies default values from the corresponding schema if the config
file doesn't exist or is missing keys with defaults.
"""
config_data = {}
needs_save = False
schema = None
data_path = None
schema_path_for_debug = "N/A" # For logging
# 1. Determine data path based on level
if level == "1":
path = os.path.join(self.data_path, "data.json")
data_path = os.path.join(self.data_path, "data.json")
schema_path_for_debug = os.path.join(self.data_path, "esquema_general.json")
elif level == "2":
path = os.path.join(self.script_groups_path, group, "data.json")
elif level == "3":
if not self.working_directory:
return {} # Return empty config if working directory not set
path = os.path.join(self.working_directory, "data.json")
try:
with open(path, "r") as f:
return json.load(f)
except FileNotFoundError:
return {} # Return empty config if file doesn't exist
def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
"""Get schema for specified level."""
try:
# Clean level parameter
level = str(level).split("-")[0]
# Determine schema path based on level
if level == "1":
path = os.path.join(self.data_path, "esquema_general.json")
elif level == "2":
path = os.path.join(
if not group:
return {"error": "Group required for level 2 config"}
data_path = os.path.join(self.script_groups_path, group, "data.json")
schema_path_for_debug = os.path.join(
self.script_groups_path, group, "esquema_group.json"
)
elif level == "3":
if not group:
return {"type": "object", "properties": {}}
path = os.path.join(self.script_groups_path, group, "esquema_work.json")
# Level 3 config is always in the current working directory
if not self.working_directory:
return {} # Return empty config if working directory not set
data_path = os.path.join(self.working_directory, "data.json")
# Level 3 config might be based on level 3 schema (esquema_work.json)
if group:
schema_path_for_debug = os.path.join(
self.script_groups_path, group, "esquema_work.json"
)
else:
return {"type": "object", "properties": {}}
# If no group, we can't determine the L3 schema for defaults.
schema_path_for_debug = "N/A (Level 3 without group)"
else:
return {"error": f"Invalid level specified for config: {level}"}
# Read existing schema from whichever file exists
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
schema = json.load(f)
return (
schema
if isinstance(schema, dict)
else {"type": "object", "properties": {}}
# 2. Get the corresponding schema to check for defaults
try:
# Only attempt to load schema if needed (e.g., not L3 without group)
if not (level == "3" and not group):
schema = self.get_schema(
level, group
) # Use the robust get_schema method
else:
schema = None # Cannot determine L3 schema without group
except Exception as e:
print(
f"Warning: Could not load schema for level {level}, group {group}. Defaults will not be applied. Error: {e}"
)
schema = None # Ensure schema is None if loading failed
# 3. Try to load existing data
data_file_exists = os.path.exists(data_path)
if data_file_exists:
try:
with open(data_path, "r", encoding="utf-8") as f_data:
content = f_data.read()
if content.strip():
config_data = json.loads(content)
else:
print(
f"Warning: Data file {data_path} is empty. Will initialize with defaults."
)
needs_save = True # Force save if file was empty
except json.JSONDecodeError:
print(
f"Warning: Could not decode JSON from {data_path}. Will initialize with defaults."
)
config_data = {}
needs_save = True
except Exception as e:
print(
f"Error reading data from {data_path}: {e}. Will attempt to initialize with defaults."
)
config_data = {}
needs_save = True
except FileNotFoundError:
print(
f"Info: Data file not found at {data_path}. Will initialize with defaults."
)
needs_save = True # Mark for saving as it's a new file
# 4. Apply defaults from schema if schema was loaded successfully
if schema and isinstance(schema, dict) and "properties" in schema:
schema_properties = schema.get("properties", {})
if isinstance(schema_properties, dict): # Ensure properties is a dict
for key, prop_definition in schema_properties.items():
# Ensure prop_definition is a dictionary before checking 'default'
if (
isinstance(prop_definition, dict)
and key not in config_data
and "default" in prop_definition
):
print(
f"Info: Applying default for '{key}' from schema {schema_path_for_debug}"
)
config_data[key] = prop_definition["default"]
needs_save = (
True # Mark for saving because a default was applied
)
else:
print(
f"Warning: 'properties' in schema {schema_path_for_debug} is not a dictionary. Cannot apply defaults."
)
# Create default schema if no file exists
default_schema = {"type": "object", "properties": {}}
# 5. Save the file if it was created or updated with defaults
if needs_save and data_path:
try:
print(f"Info: Saving updated config data to: {data_path}")
os.makedirs(os.path.dirname(data_path), exist_ok=True)
with open(data_path, "w", encoding="utf-8") as f_data:
json.dump(config_data, f_data, indent=2, ensure_ascii=False)
except IOError as e:
print(f"Error: Could not write data file to {data_path}: {e}")
except Exception as e:
print(f"Unexpected error saving data to {data_path}: {e}")
# 6. Return the final configuration
return config_data
def update_config(
self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
"""Update configuration for specified level."""
path = None
if level == "1":
path = os.path.join(self.data_path, "data.json")
elif level == "2":
if not group:
return {
"status": "error",
"message": "Group required for level 2 config update",
}
path = os.path.join(self.script_groups_path, group, "data.json")
elif level == "3":
if not self.working_directory:
return {
"status": "error",
"message": "Working directory not set for level 3 config update",
}
path = os.path.join(self.working_directory, "data.json")
else:
return {
"status": "error",
"message": f"Invalid level for config update: {level}",
}
try:
# Ensure directory exists
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(default_schema, f, indent=2)
return default_schema
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Info: Config successfully updated at {path}")
return {"status": "success"}
except Exception as e:
print(f"Error loading schema: {str(e)}")
print(f"Error updating config at {path}: {str(e)}")
return {"status": "error", "message": str(e)}
def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
"""Get schema for specified level."""
schema_path = None
try:
# Clean level parameter
clean_level = str(level).split("-")[0]
# Determine schema path based on level
if clean_level == "1":
schema_path = os.path.join(self.data_path, "esquema_general.json")
elif clean_level == "2":
if not group:
raise ValueError("Group is required for level 2 schema")
schema_path = os.path.join(
self.script_groups_path, group, "esquema_group.json"
)
elif clean_level == "3":
if not group:
# Level 3 schema (esquema_work) is tied to a group.
# If no group, we can't know which schema to load.
print(
"Warning: Group needed to determine level 3 schema (esquema_work.json). Returning empty schema."
)
return {"type": "object", "properties": {}}
schema_path = os.path.join(
self.script_groups_path, group, "esquema_work.json"
)
else:
print(
f"Warning: Invalid level '{level}' for schema retrieval. Returning empty schema."
)
return {"type": "object", "properties": {}}
# Read existing schema or create default if it doesn't exist
if os.path.exists(schema_path):
try:
with open(schema_path, "r", encoding="utf-8") as f:
schema = json.load(f)
# Basic validation
if (
not isinstance(schema, dict)
or "properties" not in schema
or "type" not in schema
):
print(
f"Warning: Schema file {schema_path} has invalid structure. Returning default."
)
return {"type": "object", "properties": {}}
# Ensure properties is a dict
if not isinstance(schema.get("properties"), dict):
print(
f"Warning: 'properties' in schema file {schema_path} is not a dictionary. Normalizing."
)
schema["properties"] = {}
return schema
except json.JSONDecodeError:
print(
f"Error: Could not decode JSON from schema file: {schema_path}. Returning default."
)
return {"type": "object", "properties": {}}
except Exception as e:
print(
f"Error reading schema file {schema_path}: {e}. Returning default."
)
return {"type": "object", "properties": {}}
else:
print(
f"Info: Schema file not found at {schema_path}. Creating default schema."
)
default_schema = {"type": "object", "properties": {}}
try:
# Ensure directory exists before writing
os.makedirs(os.path.dirname(schema_path), exist_ok=True)
with open(schema_path, "w", encoding="utf-8") as f:
json.dump(default_schema, f, indent=2, ensure_ascii=False)
return default_schema
except Exception as e:
print(f"Error creating default schema file at {schema_path}: {e}")
return {
"type": "object",
"properties": {},
} # Return empty if creation fails
except ValueError as ve: # Catch specific errors like missing group
print(f"Error getting schema path: {ve}")
return {"type": "object", "properties": {}}
except Exception as e:
# Log the full path in case of unexpected errors
error_path = schema_path if schema_path else f"Level {level}, Group {group}"
print(f"Unexpected error loading schema from {error_path}: {str(e)}")
return {"type": "object", "properties": {}}
def update_schema(
self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
"""Update schema for specified level and clean corresponding config."""
schema_path = None
config_path = None
try:
# Clean level parameter if it contains extra info like '-edit'
clean_level = str(level).split("-")[0]
# Determinar rutas de schema y config
if level == "1":
if clean_level == "1":
schema_path = os.path.join(self.data_path, "esquema_general.json")
config_path = os.path.join(self.data_path, "data.json")
elif level == "2":
elif clean_level == "2":
if not group:
return {
"status": "error",
"message": "Group is required for level 2 schema update",
}
schema_path = os.path.join(
self.script_groups_path, group, "esquema_group.json"
)
config_path = os.path.join(self.script_groups_path, group, "data.json")
elif level == "3":
elif clean_level == "3":
if not group:
return {
"status": "error",
"message": "Group is required for level 3",
"message": "Group is required for level 3 schema update",
}
schema_path = os.path.join(
self.script_groups_path, group, "esquema_work.json"
)
# Config path depends on whether working_directory is set and valid
config_path = (
os.path.join(self.working_directory, "data.json")
if self.working_directory
and os.path.isdir(self.working_directory) # Check it's a directory
else None
)
if not config_path:
print(
f"Warning: Working directory not set or invalid ('{self.working_directory}'). Level 3 config file will not be cleaned."
)
else:
return {"status": "error", "message": "Invalid level"}
# Ensure directory exists
os.makedirs(os.path.dirname(schema_path), exist_ok=True)
# Validate schema structure
if (
not isinstance(data, dict)
or "type" not in data
or "properties" not in data
):
data = {
"type": "object",
"properties": data if isinstance(data, dict) else {},
}
# Basic validation and normalization of the schema data being saved
if not isinstance(data, dict):
print(
f"Warning: Invalid schema data received (not a dict). Wrapping in default structure."
)
data = {"type": "object", "properties": {}} # Reset to default empty
if "type" not in data:
data["type"] = "object" # Ensure type exists
if "properties" not in data or not isinstance(data["properties"], dict):
print(
f"Warning: Invalid or missing 'properties' in schema data. Resetting properties."
)
data["properties"] = {} # Ensure properties exists and is a dict
# Write schema
with open(schema_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Info: Schema successfully updated at {schema_path}")
# Clean corresponding config file
# Clean the corresponding config file *if* its path is valid
if config_path:
self._clean_config_for_schema(config_path, data)
else:
print(
f"Info: Config cleaning skipped for level {level} (no valid config path)."
)
return {"status": "success"}
except Exception as e:
print(f"Error updating schema: {str(e)}")
error_path = schema_path if schema_path else f"Level {level}, Group {group}"
print(f"Error updating schema at {error_path}: {str(e)}")
# Consider adding traceback here for debugging
print(traceback.format_exc())
return {"status": "error", "message": str(e)}
def _clean_config_for_schema(
self, config_path: str, schema: Dict[str, Any]
) -> None:
"""Clean configuration file to match schema structure."""
if not config_path or not os.path.exists(config_path):
# Check existence *before* trying to open
try:
if not os.path.exists(config_path):
print(
f"Info: Config file {config_path} not found for cleaning. Skipping."
)
return
try:
# Cargar configuración actual
config = {}
content = "" # Store original content for comparison
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
content = f.read()
if content.strip(): # Avoid error on empty file
config = json.loads(content)
else:
print(
f"Info: Config file {config_path} is empty. Cleaning will result in an empty object."
)
# Limpiar configuración recursivamente
cleaned_config = self._clean_object_against_schema(config, schema)
# Guardar configuración limpia
# Guardar configuración limpia solo si cambió o si el original estaba vacío
# (para evitar escrituras innecesarias)
# Use dumps for reliable comparison, handle potential errors during dumps
try:
original_config_str = json.dumps(config, sort_keys=True)
cleaned_config_str = json.dumps(cleaned_config, sort_keys=True)
except TypeError as te:
print(
f"Warning: Could not serialize config for comparison during clean: {te}. Forcing save."
)
original_config_str = "" # Force inequality
cleaned_config_str = " " # Force inequality
if original_config_str != cleaned_config_str or not content.strip():
print(f"Info: Cleaning config file: {config_path}")
with open(config_path, "w", encoding="utf-8") as f:
json.dump(cleaned_config, f, indent=2, ensure_ascii=False)
else:
print(
f"Info: Config file {config_path} already matches schema. No cleaning needed."
)
except json.JSONDecodeError:
print(
f"Error: Could not decode JSON from config file {config_path} during cleaning. Skipping clean."
)
except IOError as e:
print(f"Error accessing config file {config_path} during cleaning: {e}")
except Exception as e:
print(f"Error cleaning config: {str(e)}")
print(f"Unexpected error cleaning config {config_path}: {str(e)}")
# Consider adding traceback here
print(traceback.format_exc())
def _clean_object_against_schema(
self, data: Dict[str, Any], schema: Dict[str, Any]
) -> Dict[str, Any]:
"""Recursively clean object to match schema structure."""
if not isinstance(data, dict) or not isinstance(schema, dict):
def _clean_object_against_schema(self, data: Any, schema: Dict[str, Any]) -> Any:
"""Recursively clean data to match schema structure."""
# Ensure schema is a dictionary, otherwise cannot proceed
if not isinstance(schema, dict):
print(
f"Warning: Invalid schema provided to _clean_object_against_schema (not a dict). Returning data as is: {type(schema)}"
)
return data
schema_type = schema.get("type")
if schema_type == "object":
if not isinstance(data, dict):
# If data is not a dict, but schema expects object, return empty dict
return {}
# This 'result' and the loop should be inside the 'if schema_type == "object":' block
result = {}
schema_props = schema.get("properties", {})
# Ensure schema_props is a dictionary
if not isinstance(schema_props, dict):
print(
f"Warning: 'properties' in schema is not a dictionary during cleaning. Returning empty object."
)
return {}
for key, value in data.items():
# Solo mantener campos que existen en el schema
if key in schema_props:
# Recursively clean the value based on the property's schema
# Ensure the property schema itself is a dict before recursing
prop_schema = schema_props[key]
# Si es un objeto anidado, limpiar recursivamente
if prop_schema.get("type") == "object":
result[key] = self._clean_object_against_schema(value, prop_schema)
# Si es un enum, verificar que el valor sea válido
elif "enum" in prop_schema:
if value in prop_schema["enum"]:
result[key] = value
# Para otros tipos, mantener el valor
if isinstance(prop_schema, dict):
result[key] = self._clean_object_against_schema(
value, prop_schema
)
else:
result[key] = value
# If property schema is invalid, maybe keep original value or omit? Let's omit.
print(
f"Warning: Schema for property '{key}' is not a dictionary. Omitting from cleaned data."
)
# Return result should be OUTSIDE the loop, but INSIDE the 'if object' block
return result
def update_config(
self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
"""Update configuration for specified level."""
if level == "3" and not self.working_directory:
return {"status": "error", "message": "Working directory not set"}
elif schema_type == "array":
if not isinstance(data, list):
if level == "1":
path = os.path.join(self.data_path, "data.json")
elif level == "2":
path = os.path.join(self.script_groups_path, group, "data.json")
elif level == "3":
path = os.path.join(self.working_directory, "data.json")
# If data is not a list, but schema expects array, return empty list
return []
# If schema defines items structure, clean each item
items_schema = schema.get("items")
if isinstance(
items_schema, dict
): # Check if 'items' schema is a valid dict
return [
self._clean_object_against_schema(item, items_schema)
for item in data
]
else:
# If no valid item schema, return list as is (or potentially filter based on basic types if needed)
# Let's return as is for now.
return data # Keep array items as they are if no valid 'items' schema defined
with open(path, "w") as f:
json.dump(data, f, indent=2)
elif "enum" in schema:
# Ensure enum values are defined as a list
enum_values = schema.get("enum")
if isinstance(enum_values, list):
# If schema has enum, keep data only if it's one of the allowed values
if data in enum_values:
return data
else:
# If value not in enum, return None or potentially the default value if specified?
# For cleaning, returning None or omitting might be safer. Let's return None.
return None # Or consider returning schema.get('default') if cleaning should apply defaults too
else:
# Invalid enum definition, return original data or None? Let's return None.
print(
f"Warning: Invalid 'enum' definition in schema (not a list). Returning None for value '{data}'."
)
return None
# For basic types (string, integer, number, boolean, null), just return the data
# We could add type checking here if strict cleaning is needed,
# e.g., return None if type(data) doesn't match schema_type
elif schema_type in ["string", "integer", "number", "boolean", "null"]:
# Optional: Add stricter type check if needed
# expected_type_map = { "string": str, "integer": int, "number": (int, float), "boolean": bool, "null": type(None) }
# expected_types = expected_type_map.get(schema_type)
# if expected_types and not isinstance(data, expected_types):
# print(f"Warning: Type mismatch during cleaning. Expected {schema_type}, got {type(data)}. Returning None.")
# return None # Or schema.get('default')
return data
# If schema type is unknown or not handled, return data as is
else:
# This case might indicate an issue with the schema definition itself
# print(f"Warning: Unknown or unhandled schema type '{schema_type}' during cleaning. Returning data as is.")
return data
# --- Script Listing and Execution Methods ---
def list_scripts(self, group: str) -> List[Dict[str, str]]:
"""List all scripts in a group with their descriptions."""
try:
@ -318,7 +695,7 @@ class ConfigurationManager:
if not os.path.exists(scripts_dir):
print(f"Directory not found: {scripts_dir}")
return []
return [] # Return empty list if group directory doesn't exist
for file in os.listdir(scripts_dir):
# Modificar la condición para incluir cualquier archivo .py
@ -326,15 +703,15 @@ class ConfigurationManager:
path = os.path.join(scripts_dir, file)
description = self._extract_script_description(path)
print(
f"Found script: {file} with description: {description}"
f"Debug: Found script: {file} with description: {description}"
) # Debug line
scripts.append({"name": file, "description": description})
print(f"Total scripts found: {len(scripts)}") # Debug line
print(f"Debug: Total scripts found in group '{group}': {len(scripts)}")
return scripts
except Exception as e:
print(f"Error listing scripts: {str(e)}") # Debug line
return []
print(f"Error listing scripts for group '{group}': {str(e)}")
return [] # Return empty list on error
def _extract_script_description(self, script_path: str) -> str:
"""Extract description from script's docstring or initial comments."""
@ -354,9 +731,7 @@ class ConfigurationManager:
return "No description available"
except Exception as e:
print(
f"Error extracting description from {script_path}: {str(e)}"
) # Debug line
print(f"Error extracting description from {script_path}: {str(e)}")
return "Error reading script description"
def execute_script(
@ -370,7 +745,9 @@ class ConfigurationManager:
time_since_last = current_time - self.last_execution_time
if time_since_last < self.min_execution_interval:
msg = f"Por favor espere {self.min_execution_interval - time_since_last:.1f} segundo(s) más entre ejecuciones"
if broadcast_fn: broadcast_fn(msg)
self.append_log(f"Warning: {msg}") # Log throttling attempt
if broadcast_fn:
broadcast_fn(msg)
return {"status": "throttled", "error": msg}
self.last_execution_time = current_time
@ -381,27 +758,38 @@ class ConfigurationManager:
script_log_path = os.path.join(script_dir, f"log_{script_base_name}.txt")
if not os.path.exists(script_path):
msg = f"Error: Script no encontrado en {script_path}"
if broadcast_fn: broadcast_fn(msg)
msg = f"Error Fatal: Script no encontrado en {script_path}"
self.append_log(msg)
if broadcast_fn:
broadcast_fn(msg)
return {"status": "error", "error": "Script not found"}
# Get working directory specific to the group
working_dir = self.get_work_dir(group)
if not working_dir:
msg = f"Error: Directorio de trabajo no configurado para el grupo '{group}'"
if broadcast_fn: broadcast_fn(msg)
msg = f"Error Fatal: Directorio de trabajo no configurado o inválido para el grupo '{group}'"
self.append_log(msg)
if broadcast_fn:
broadcast_fn(msg)
return {"status": "error", "error": "Working directory not set"}
# Double check validity (get_work_dir should already do this)
if not os.path.isdir(working_dir):
msg = f"Error: El directorio de trabajo '{working_dir}' no es válido o no existe."
if broadcast_fn: broadcast_fn(msg)
msg = f"Error Fatal: El directorio de trabajo '{working_dir}' no es válido o no existe."
self.append_log(msg)
if broadcast_fn:
broadcast_fn(msg)
return {"status": "error", "error": "Invalid working directory"}
# Aggregate configurations using the updated get_config
configs = {
"level1": self.get_config("1"),
"level2": self.get_config("2", group),
"level3": self.get_config("3", group), # get_config now handles working dir lookup
"level3": self.get_config(
"3", group
), # get_config uses self.working_directory
"working_directory": working_dir,
}
print(f"Debug: Aggregated configs for script execution: {configs}")
config_file_path = os.path.join(script_dir, "script_config.json")
try:
@ -410,8 +798,10 @@ class ConfigurationManager:
# Don't broadcast config saving unless debugging
# if broadcast_fn: broadcast_fn(f"Configuraciones guardadas en {config_file_path}")
except Exception as e:
msg = f"Error guardando configuraciones temporales: {str(e)}"
if broadcast_fn: broadcast_fn(msg)
msg = f"Error Fatal: No se pudieron guardar las configuraciones temporales en {config_file_path}: {str(e)}"
self.append_log(msg)
if broadcast_fn:
broadcast_fn(msg)
# Optionally return error here if config saving is critical
stdout_capture = []
@ -421,16 +811,18 @@ class ConfigurationManager:
try:
if broadcast_fn:
broadcast_fn(f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}...")
start_msg = f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}..."
broadcast_fn(start_msg)
# Execute the script
process = subprocess.Popen(
["python", "-u", script_path], # Added -u for unbuffered output
cwd=working_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding='utf-8',
errors='replace',
encoding="utf-8",
errors="replace",
bufsize=1,
env=dict(os.environ, PYTHONIOENCODING="utf-8"),
)
@ -466,7 +858,6 @@ class ConfigurationManager:
# Always include stderr in the final log if present
completion_msg += f" Se detectaron errores (ver log)."
if broadcast_fn:
broadcast_fn(completion_msg)
@ -479,7 +870,9 @@ class ConfigurationManager:
log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_f.write(f"Duración: {duration}\n")
log_f.write(f"Estado: {status.upper()} (Código de Salida: {return_code})\n")
log_f.write(
f"Estado: {status.upper()} (Código de Salida: {return_code})\n"
)
log_f.write("\n--- SALIDA ESTÁNDAR (STDOUT) ---\n")
log_f.write("\n".join(stdout_capture))
log_f.write("\n\n--- ERRORES (STDERR) ---\n")
@ -487,29 +880,39 @@ class ConfigurationManager:
log_f.write("\n--- FIN DEL LOG ---\n")
if broadcast_fn:
broadcast_fn(f"Log completo guardado en: {script_log_path}")
print(f"Info: Script log saved to {script_log_path}")
except Exception as log_e:
err_msg = f"Error al guardar el log específico del script en {script_log_path}: {log_e}"
print(err_msg)
if broadcast_fn: broadcast_fn(err_msg)
if broadcast_fn:
broadcast_fn(err_msg)
# ------------------------------------------
return {
"status": status,
"return_code": return_code,
"error": stderr_capture if stderr_capture else None,
"log_file": script_log_path # Return path to the specific log
"log_file": script_log_path, # Return path to the specific log
}
except Exception as e:
end_time = datetime.now()
duration = end_time - start_time
error_msg = f"Error inesperado durante la ejecución de {script_name}: {str(e)}"
traceback_info = traceback.format_exc() # Get traceback
error_msg = (
f"Error inesperado durante la ejecución de {script_name}: {str(e)}"
)
traceback_info = traceback.format_exc() # Get full traceback
print(error_msg) # Print to console as well
print(traceback_info)
self.append_log(
f"ERROR FATAL: {error_msg}\n{traceback_info}"
) # Log centrally
if broadcast_fn:
broadcast_fn(f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}")
# Ensure fatal errors are clearly marked in UI
broadcast_fn(
f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}"
)
# Attempt to write error to script-specific log
try:
@ -518,7 +921,9 @@ class ConfigurationManager:
log_f.write(f"Grupo: {group}\n")
log_f.write(f"Directorio de Trabajo: {working_dir}\n")
log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n")
log_f.write(
f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n"
)
log_f.write(f"Duración: {duration}\n")
log_f.write(f"Estado: FATAL ERROR\n")
log_f.write("\n--- ERROR ---\n")
@ -527,8 +932,10 @@ class ConfigurationManager:
log_f.write(traceback_info) # Include traceback in log
log_f.write("\n--- FIN DEL LOG ---\n")
except Exception as log_e:
print(f"Error adicional al intentar guardar el log de error: {log_e}")
err_msg_log = (
f"Error adicional al intentar guardar el log de error: {log_e}"
)
print(err_msg_log)
return {"status": "error", "error": error_msg, "traceback": traceback_info}
finally:
@ -539,23 +946,6 @@ class ConfigurationManager:
if process and process.stdout:
process.stdout.close()
def get_work_dir(self, group: str) -> str:
"""Get working directory path for a script group."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r") as f:
data = json.load(f)
path = data.get("path", "")
# Normalizar separadores de ruta
if path:
path = os.path.normpath(path)
# Actualizar la variable de instancia si hay una ruta válida
if path and os.path.exists(path):
self.working_directory = path
return path
except (FileNotFoundError, json.JSONDecodeError):
return ""
def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
"""Set working directory path for a script group and update history."""
# Normalizar el path recibido
@ -569,7 +959,7 @@ class ConfigurationManager:
try:
# Cargar datos existentes o crear nuevos
try:
with open(work_dir_path, "r") as f:
with open(work_dir_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Normalizar paths existentes en el historial
if "history" in data:
@ -596,7 +986,7 @@ class ConfigurationManager:
data["history"] = data["history"][:10]
# Guardar datos actualizados
with open(work_dir_path, "w") as f:
with open(work_dir_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
# Actualizar la variable de instancia
@ -605,22 +995,9 @@ class ConfigurationManager:
# Crear data.json en el directorio de trabajo si no existe
data_path = os.path.join(path, "data.json")
if not os.path.exists(data_path):
with open(data_path, "w") as f:
with open(data_path, "w", encoding="utf-8") as f:
json.dump({}, f, indent=2)
return {"status": "success", "path": path}
except Exception as e:
return {"status": "error", "message": str(e)}
def get_directory_history(self, group: str) -> List[str]:
"""Get the directory history for a script group."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r") as f:
data = json.load(f)
# Normalizar todos los paths en el historial
history = [os.path.normpath(p) for p in data.get("history", [])]
# Filtrar solo directorios que existen
return [p for p in history if os.path.exists(p)]
except (FileNotFoundError, json.JSONDecodeError):
return []

View File

@ -0,0 +1,21 @@
[17:15:12] Iniciando ejecución de x1.py en C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS...
[17:15:14] Working directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
[17:15:14] Input directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
[17:15:14] Output directory: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs
[17:15:14] Cronologia file: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
[17:15:14] Attachments directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\adjuntos
[17:15:14] Beautify rules file: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\EmailCrono\config\beautify_rules.json
[17:15:14] Found 1 .eml files
[17:15:14] Loaded 0 existing messages
[17:15:14] Processing C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS.eml
[17:15:14] Aplicando reglas de prioridad 1
[17:15:14] Aplicando reglas de prioridad 2
[17:15:14] Aplicando reglas de prioridad 3
[17:15:14] Aplicando reglas de prioridad 4
[17:15:14] Estadísticas de procesamiento:
[17:15:14] - Total mensajes encontrados: 1
[17:15:14] - Mensajes únicos añadidos: 1
[17:15:14] - Mensajes duplicados ignorados: 0
[17:15:14] Writing 1 messages to C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
[17:15:14] Ejecución de x1.py finalizada (success). Duración: 0:00:01.628641.
[17:15:14] Log completo guardado en: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\EmailCrono\log_x1.txt

View File

@ -418,6 +418,12 @@ function createFieldEditor(key, field) {
class="w-full p-2 border rounded"
onchange="updateVisualSchema()">
</div>
<div>
<label class="block text-sm font-bold mb-2">Valor por Defecto</label>
<input type="text" value="${field.default !== undefined ? field.default : ''}"
class="w-full p-2 border rounded"
onchange="updateVisualSchema()">
</div>
</div>
${field.enum ? `
<div class="enum-container mt-4">
@ -494,28 +500,55 @@ function updateVisualSchema() {
const inputs = field.getElementsByTagName('input');
const select = field.getElementsByTagName('select')[0];
const key = inputs[0].value;
const fieldType = select.value; // string, directory, number, boolean, enum
const title = inputs[1].value;
const description = inputs[2].value;
const defaultValueInput = inputs[3]; // El nuevo input de valor por defecto
const defaultValueString = defaultValueInput.value;
let propertyDefinition = {
type: fieldType === 'directory' || fieldType === 'enum' ? 'string' : fieldType, // El tipo base
title: title,
description: description
};
// Añadir formato específico si es directorio
if (select.value === 'directory') {
schema.properties[key] = {
type: 'string',
format: 'directory',
title: inputs[1].value,
description: inputs[2].value
};
} else if (select.value === 'enum') {
schema.properties[key] = {
type: 'string',
title: inputs[1].value,
description: inputs[2].value,
enum: field.querySelector('textarea').value.split('\n').filter(v => v.trim())
};
} else {
schema.properties[key] = {
type: select.value,
title: inputs[1].value,
description: inputs[2].value
};
propertyDefinition.format = 'directory';
}
// Añadir enum si es de tipo enum
if (select.value === 'enum') {
propertyDefinition.enum = field.querySelector('textarea').value.split('\n').filter(v => v.trim());
}
// Procesar y añadir el valor por defecto si se proporcionó
if (defaultValueString !== null && defaultValueString.trim() !== '') {
let typedDefaultValue = defaultValueString;
try {
if (propertyDefinition.type === 'number' || propertyDefinition.type === 'integer') {
typedDefaultValue = Number(defaultValueString);
if (isNaN(typedDefaultValue)) {
console.warn(`Valor por defecto inválido para número en campo '${key}': ${defaultValueString}. Se omitirá.`);
// No añadir default si no es un número válido
} else {
// Opcional: truncar si el tipo es integer
if (propertyDefinition.type === 'integer' && !Number.isInteger(typedDefaultValue)) {
typedDefaultValue = Math.trunc(typedDefaultValue);
}
propertyDefinition.default = typedDefaultValue;
}
} else if (propertyDefinition.type === 'boolean') {
typedDefaultValue = ['true', '1', 'yes', 'on'].includes(defaultValueString.toLowerCase());
propertyDefinition.default = typedDefaultValue;
} else { // string, enum, directory
propertyDefinition.default = typedDefaultValue; // Ya es string
}
} catch (e) {
console.error(`Error procesando valor por defecto para campo '${key}':`, e);
}
}
schema.properties[key] = propertyDefinition;
});
const jsonEditor = document.getElementById('json-editor');