diff --git a/__pycache__/config_manager.cpython-312.pyc b/__pycache__/config_manager.cpython-312.pyc index 4a46e5d..648a9a6 100644 Binary files a/__pycache__/config_manager.cpython-312.pyc and b/__pycache__/config_manager.cpython-312.pyc differ diff --git a/app.py b/app.py index 3556f10..6b9280b 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,6 @@ from flask import Flask, render_template, request, jsonify, url_for from flask_sock import Sock -from config_manager import ConfigurationManager +from lib.config_manager import ConfigurationManager import os import json # Added import from datetime import datetime @@ -46,7 +46,7 @@ def broadcast_message(message): dead_connections = set() timestamp = datetime.now().strftime("[%H:%M:%S] ") - # Si es una lista de mensajes, procesar cada uno + # Normalize input to a list of messages if isinstance(message, list): messages = message else: @@ -54,32 +54,32 @@ def broadcast_message(message): messages = [line.strip() for line in message.splitlines() if line.strip()] # Procesar cada mensaje - for msg in messages: + for raw_msg in messages: # Limpiar timestamps duplicados al inicio del mensaje - while msg.startswith("[") and "]" in msg: + while raw_msg.startswith("[") and "]" in raw_msg: try: - closing_bracket = msg.index("]") + 1 - if msg[1 : closing_bracket - 1].replace(":", "").isdigit(): - msg = msg[closing_bracket:].strip() + closing_bracket = raw_msg.index("]") + 1 + if raw_msg[1 : closing_bracket - 1].replace(":", "").isdigit(): + raw_msg = raw_msg[closing_bracket:].strip() # Update raw_msg itself else: break except: break - # Añadir un único timestamp - formatted_msg = f"{timestamp}{msg}" + # Log the raw message using the config_manager's logger + # The logger will handle its own timestamping for the file. + config_manager.append_log(raw_msg) - # Escribir en el archivo de log - with open(config_manager.log_file, "a", encoding="utf-8") as f: - f.write(f"{formatted_msg}\n") + # Format message with timestamp *for WebSocket broadcast* + formatted_msg_for_ws = f"{timestamp}{raw_msg}" # Enviar a todos los clientes WebSocket for ws in list(websocket_connections): try: - if ws.connected: - ws.send(f"{formatted_msg}\n") + if ws.connected: # Check if ws is still connected before sending + ws.send(f"{formatted_msg_for_ws}\n") # Use the correct variable name here except Exception: - dead_connections.add(ws) + dead_connections.add(ws) # Collect dead connections # Limpiar conexiones muertas websocket_connections.difference_update(dead_connections) @@ -205,30 +205,19 @@ def handle_logs(): @app.route("/api/group-description/", methods=["GET", "POST"]) def handle_group_description(group): - description_path = os.path.join( - config_manager.script_groups_path, group, "description.json" - ) - if request.method == "GET": try: - with open(description_path, "r", encoding="utf-8") as f: - return jsonify(json.load(f)) - except FileNotFoundError: - return jsonify( - { - "name": group, - "description": "Sin descripción", - "version": "1.0", - "author": "Unknown", - } - ) + details = config_manager.get_group_details(group) + if "error" in details: + return jsonify(details), 404 # Group not found + return jsonify(details) + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 else: # POST try: data = request.json - os.makedirs(os.path.dirname(description_path), exist_ok=True) - with open(description_path, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2, ensure_ascii=False) - return jsonify({"status": "success"}) + result = 
config_manager.update_group_description(group, data) + return jsonify(result) except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 diff --git a/backend/script_groups/ObtainIOFromProjectTia/data.json b/backend/script_groups/ObtainIOFromProjectTia/data.json new file mode 100644 index 0000000..9e26dfe --- /dev/null +++ b/backend/script_groups/ObtainIOFromProjectTia/data.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/backend/script_groups/XML Parser to SCL/log_x1_to_json.txt b/backend/script_groups/XML Parser to SCL/log_x1_to_json.txt new file mode 100644 index 0000000..e28223b --- /dev/null +++ b/backend/script_groups/XML Parser to SCL/log_x1_to_json.txt @@ -0,0 +1,15 @@ +--- Log de Ejecución: x1_to_json.py --- +Grupo: XML Parser to SCL +Directorio de Trabajo: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport +Inicio: 2025-05-03 20:08:18 +Fin: 2025-05-03 20:08:22 +Duración: 0:00:03.850097 +Estado: SUCCESS (Código de Salida: 0) + +--- SALIDA ESTÁNDAR (STDOUT) --- +Por favor, selecciona el archivo XML de entrada... + +--- ERRORES (STDERR) --- +No se seleccionó ningún archivo. Saliendo. + +--- FIN DEL LOG --- diff --git a/backend/script_groups/example_group/log_x1.txt b/backend/script_groups/example_group/log_x1.txt index cad1585..193c09e 100644 --- a/backend/script_groups/example_group/log_x1.txt +++ b/backend/script_groups/example_group/log_x1.txt @@ -1,9 +1,9 @@ --- Log de Ejecución: x1.py --- Grupo: example_group Directorio de Trabajo: C:\Estudio -Inicio: 2025-05-03 17:26:22 -Fin: 2025-05-03 17:26:27 -Duración: 0:00:05.167151 +Inicio: 2025-05-03 20:54:12 +Fin: 2025-05-03 20:54:17 +Duración: 0:00:05.196719 Estado: SUCCESS (Código de Salida: 0) --- SALIDA ESTÁNDAR (STDOUT) --- diff --git a/backend/script_groups/example_group/log_x2.txt b/backend/script_groups/example_group/log_x2.txt index 8bbea37..e5ef7f7 100644 --- a/backend/script_groups/example_group/log_x2.txt +++ b/backend/script_groups/example_group/log_x2.txt @@ -1,9 +1,9 @@ --- Log de Ejecución: x2.py --- Grupo: example_group Directorio de Trabajo: C:\Estudio -Inicio: 2025-05-03 17:26:13 -Fin: 2025-05-03 17:26:14 -Duración: 0:00:01.149705 +Inicio: 2025-05-03 20:48:23 +Fin: 2025-05-03 20:48:27 +Duración: 0:00:03.208350 Estado: SUCCESS (Código de Salida: 0) --- SALIDA ESTÁNDAR (STDOUT) --- @@ -11,6 +11,10 @@ Estado: SUCCESS (Código de Salida: 0) Iniciando análisis de datos simulado... Analizando lote 1... +Lote 1 completado exitosamente +Analizando lote 2... +Lote 2 completado exitosamente +Analizando lote 3... 
ERROR: Error simulado en el procesamiento El proceso se detuvo debido a un error diff --git a/claude_file_organizer.py b/claude_file_organizer.py deleted file mode 100644 index 2889dae..0000000 --- a/claude_file_organizer.py +++ /dev/null @@ -1,171 +0,0 @@ -import os -import shutil -from pathlib import Path -import re - -class ClaudeProjectOrganizer: - def __init__(self): - self.source_dir = Path.cwd() - self.claude_dir = self.source_dir / 'claude' - self.file_mapping = {} - - def should_skip_directory(self, dir_name): - skip_dirs = {'.git', '__pycache__', 'venv', 'env', '.pytest_cache', '.vscode', 'claude'} - return dir_name in skip_dirs - - def get_comment_prefix(self, file_extension): - """Determina el prefijo de comentario según la extensión del archivo""" - comment_styles = { - '.py': '#', - '.js': '//', - '.css': '/*', - '.html': '', - } - return comment_suffixes.get(file_extension.lower(), '') - - def normalize_path(self, path_str: str) -> str: - """Normaliza la ruta usando forward slashes""" - return str(path_str).replace('\\', '/') - - def check_existing_path_comment(self, content: str, normalized_path: str, comment_prefix: str) -> bool: - """Verifica si ya existe un comentario con la ruta en el archivo""" - # Escapar caracteres especiales en el prefijo de comentario para regex - escaped_prefix = re.escape(comment_prefix) - - # Crear patrones para buscar tanto forward como backward slashes - forward_pattern = f"{escaped_prefix}\\s*{re.escape(normalized_path)}\\b" - backward_path = normalized_path.replace('/', '\\\\') # Doble backslash para el patrón - backward_pattern = f"{escaped_prefix}\\s*{re.escape(backward_path)}" - - # Buscar en las primeras líneas del archivo - first_lines = content.split('\n')[:5] - for line in first_lines: - if (re.search(forward_pattern, line) or - re.search(backward_pattern, line)): - return True - return False - - def add_path_comment(self, file_path: Path, content: str) -> str: - """Agrega un comentario con la ruta al inicio del archivo si no existe""" - relative_path = file_path.relative_to(self.source_dir) - normalized_path = self.normalize_path(relative_path) - comment_prefix = self.get_comment_prefix(file_path.suffix) - - if comment_prefix is None: - return content - - comment_suffix = self.get_comment_suffix(file_path.suffix) - - # Verificar si ya existe el comentario - if self.check_existing_path_comment(content, normalized_path, comment_prefix): - print(f" - Comentario de ruta ya existe en {file_path}") - return content - - path_comment = f"{comment_prefix} {normalized_path}{comment_suffix}\n" - - # Para archivos HTML, insertar después del doctype si existe - if file_path.suffix.lower() == '.html': - if content.lower().startswith('') + 1 - return content[:doctype_end] + '\n' + path_comment + content[doctype_end:] - - return path_comment + content - - def clean_claude_directory(self): - if self.claude_dir.exists(): - shutil.rmtree(self.claude_dir) - self.claude_dir.mkdir() - print(f"Directorio claude limpiado: {self.claude_dir}") - - def copy_files(self): - self.clean_claude_directory() - - for root, dirs, files in os.walk(self.source_dir): - dirs[:] = [d for d in dirs if not self.should_skip_directory(d)] - current_path = Path(root) - - for file in files: - file_path = current_path / file - - if file.endswith(('.py', '.js', '.css', '.html', '.json', '.yml', '.yaml', - '.tsx', '.ts', '.jsx', '.scss', '.less')): - target_path = self.claude_dir / file - - # Si el archivo ya existe en el directorio claude, agregar un sufijo numérico - if 
target_path.exists(): - base = target_path.stem - ext = target_path.suffix - counter = 1 - while target_path.exists(): - target_path = self.claude_dir / f"{base}_{counter}{ext}" - counter += 1 - - try: - # Leer el contenido del archivo - with open(file_path, 'r', encoding='utf-8') as f: - content = f.read() - - # Agregar el comentario con la ruta si no existe - modified_content = self.add_path_comment(file_path, content) - - # Escribir el nuevo contenido - with open(target_path, 'w', encoding='utf-8', newline='\n') as f: - f.write(modified_content) - - self.file_mapping[str(file_path)] = target_path.name - print(f"Copiado: {file_path} -> {target_path}") - - except UnicodeDecodeError: - print(f"Advertencia: No se pudo procesar {file_path} como texto. Copiando sin modificar...") - shutil.copy2(file_path, target_path) - except Exception as e: - print(f"Error procesando {file_path}: {str(e)}") - - def generate_tree_report(self): - """Genera el reporte en formato árbol visual""" - report = ["Estructura del proyecto original:\n"] - - def add_to_report(path, prefix="", is_last=True): - report.append(prefix + ("└── " if is_last else "├── ") + path.name) - - if path.is_dir() and not self.should_skip_directory(path.name): - children = sorted(path.iterdir(), key=lambda x: (x.is_file(), x.name)) - children = [c for c in children if not (c.is_dir() and self.should_skip_directory(c.name))] - - for i, child in enumerate(children): - is_last_child = i == len(children) - 1 - new_prefix = prefix + (" " if is_last else "│ ") - add_to_report(child, new_prefix, is_last_child) - - add_to_report(self.source_dir) - - report_path = self.claude_dir / "project_structure.txt" - with open(report_path, "w", encoding="utf-8") as f: - f.write("\n".join(report)) - print(f"\nReporte generado en: {report_path}") - -def main(): - try: - print("Iniciando organización de archivos para Claude...") - organizer = ClaudeProjectOrganizer() - organizer.copy_files() - organizer.generate_tree_report() - print("\n¡Proceso completado exitosamente!") - except Exception as e: - print(f"\nError durante la ejecución: {str(e)}") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/config_manager.py b/config_manager.py deleted file mode 100644 index 7e868e1..0000000 --- a/config_manager.py +++ /dev/null @@ -1,1010 +0,0 @@ -import os -import json -import subprocess -import re -import traceback -from typing import Dict, Any, List, Optional -import sys # Import sys to check the platform -import time # Add this import -from datetime import datetime # Add this import - - -# --- ConfigurationManager Class --- -class ConfigurationManager: - def __init__(self): - self.base_path = os.path.dirname(os.path.abspath(__file__)) - self.data_path = os.path.join(self.base_path, "data") - self.script_groups_path = os.path.join( - self.base_path, "backend", "script_groups" - ) - self.working_directory = None - self.log_file = os.path.join(self.data_path, "log.txt") - self._init_log_file() - self.last_execution_time = 0 # Add this attribute - # Minimum seconds between script executions to prevent rapid clicks - self.min_execution_interval = 1 # Minimum seconds between executions - - def _init_log_file(self): - """Initialize log file if it doesn't exist""" - if not os.path.exists(self.data_path): - os.makedirs(self.data_path) - if not os.path.exists(self.log_file): - with open(self.log_file, "w", encoding="utf-8") as f: - f.write("") - - # --- Logging Methods --- - def append_log(self, message: str) -> None: - """Append a message to the 
CENTRAL log file with timestamp.""" - # This function now primarily logs messages from the app itself, - # script output is handled separately in execute_script. - try: - timestamp = datetime.now().strftime("[%H:%M:%S] ") - lines = message.split("\n") - lines_with_timestamp = [] - for line in lines: - if line.strip(): - # Add timestamp only if line doesn't already have one (e.g., from script output) - if not line.strip().startswith("["): - line = f"{timestamp}{line}" - lines_with_timestamp.append(f"{line}\n") - - if lines_with_timestamp: - with open(self.log_file, "a", encoding="utf-8") as f: - f.writelines(lines_with_timestamp) - except Exception as e: - print(f"Error writing to central log file: {e}") - - def read_last_log_line(self) -> str: - """Read the last line from the log file.""" - try: - with open(self.log_file, "r", encoding="utf-8") as f: - # Leer las últimas líneas y encontrar la última no vacía - lines = f.readlines() - for line in reversed(lines): - if line.strip(): - return line - return "" - except Exception as e: - print(f"Error reading last log line: {e}") - return "" - - def read_log(self) -> str: - """Read the entire log file""" - try: - with open(self.log_file, "r", encoding="utf-8") as f: - return f.read() - except Exception as e: - print(f"Error reading log file: {e}") - return "" - - def clear_log(self) -> bool: - """Clear the log file""" - try: - with open(self.log_file, "w", encoding="utf-8") as f: - f.write("") - return True - except Exception as e: - print(f"Error clearing log file: {e}") - return False - - # --- Working Directory Methods --- - def set_working_directory(self, path: str) -> Dict[str, str]: - """Set and validate working directory.""" - if not os.path.exists(path): - return {"status": "error", "message": "Directory does not exist"} - - self.working_directory = path - - # Create default data.json if it doesn't exist - # This data.json will be populated with defaults by get_config later if needed - data_path = os.path.join(path, "data.json") - if not os.path.exists(data_path): - try: - with open(data_path, "w", encoding="utf-8") as f: - json.dump({}, f, indent=2) - print( - f"Info: Created empty data.json in working directory: {data_path}" - ) - except Exception as e: - print(f"Error creating data.json in working directory {path}: {e}") - # Non-fatal, get_config will handle missing file - - return {"status": "success", "path": path} - - def get_work_dir(self, group: str) -> Optional[str]: - """Get working directory path for a script group from work_dir.json.""" - work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") - try: - with open(work_dir_path, "r", encoding="utf-8") as f: - data = json.load(f) - path = data.get("path", "") - # Normalizar separadores de ruta - if path: - path = os.path.normpath(path) - # Actualizar la variable de instancia si hay una ruta válida y existe - if path and os.path.isdir(path): # Check if it's a directory - self.working_directory = path - return path - elif path: - print( - f"Warning: Stored working directory for group '{group}' is invalid or does not exist: {path}" - ) - self.working_directory = None # Reset if invalid - return None - else: - self.working_directory = None # Reset if no path stored - return None - except (FileNotFoundError, json.JSONDecodeError): - self.working_directory = None # Reset if file missing or invalid - return None - except Exception as e: - print(f"Error reading work_dir.json for group '{group}': {e}") - self.working_directory = None - return None - - def 
get_directory_history(self, group: str) -> List[str]: - """Get the directory history for a script group.""" - work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") - try: - with open(work_dir_path, "r", encoding="utf-8") as f: - data = json.load(f) - # Normalizar todos los paths en el historial - history = [os.path.normpath(p) for p in data.get("history", [])] - # Filtrar solo directorios que existen - return [ - p for p in history if os.path.isdir(p) - ] # Check if directory exists - except (FileNotFoundError, json.JSONDecodeError): - return [] - - def get_script_groups(self) -> List[Dict[str, Any]]: - """Returns list of available script groups with their descriptions.""" - groups = [] - for d in os.listdir(self.script_groups_path): - group_path = os.path.join(self.script_groups_path, d) - if os.path.isdir(group_path): - description = self._get_group_description(group_path) - groups.append( - { - "id": d, - "name": description.get("name", d), - "description": description.get( - "description", "Sin descripción" - ), - "version": description.get("version", "1.0"), - "author": description.get("author", "Unknown"), - } - ) - return groups - - def _get_group_description(self, group_path: str) -> Dict[str, Any]: - """Get description for a script group.""" - description_file = os.path.join(group_path, "description.json") - try: - if os.path.exists(description_file): - with open(description_file, "r", encoding="utf-8") as f: - return json.load(f) - except Exception as e: - print(f"Error reading group description: {e}") - return {} - - # --- Configuration (data.json) Methods --- - def get_config(self, level: str, group: str = None) -> Dict[str, Any]: - """ - Get configuration for specified level. - Applies default values from the corresponding schema if the config - file doesn't exist or is missing keys with defaults. - """ - config_data = {} - needs_save = False - schema = None - data_path = None - schema_path_for_debug = "N/A" # For logging - - # 1. Determine data path based on level - if level == "1": - data_path = os.path.join(self.data_path, "data.json") - schema_path_for_debug = os.path.join(self.data_path, "esquema_general.json") - elif level == "2": - if not group: - return {"error": "Group required for level 2 config"} - data_path = os.path.join(self.script_groups_path, group, "data.json") - schema_path_for_debug = os.path.join( - self.script_groups_path, group, "esquema_group.json" - ) - elif level == "3": - # Level 3 config is always in the current working directory - if not self.working_directory: - return {} # Return empty config if working directory not set - data_path = os.path.join(self.working_directory, "data.json") - # Level 3 config might be based on level 3 schema (esquema_work.json) - if group: - schema_path_for_debug = os.path.join( - self.script_groups_path, group, "esquema_work.json" - ) - else: - # If no group, we can't determine the L3 schema for defaults. - schema_path_for_debug = "N/A (Level 3 without group)" - else: - return {"error": f"Invalid level specified for config: {level}"} - - # 2. Get the corresponding schema to check for defaults - try: - # Only attempt to load schema if needed (e.g., not L3 without group) - if not (level == "3" and not group): - schema = self.get_schema( - level, group - ) # Use the robust get_schema method - else: - schema = None # Cannot determine L3 schema without group - except Exception as e: - print( - f"Warning: Could not load schema for level {level}, group {group}. Defaults will not be applied. 
Error: {e}" - ) - schema = None # Ensure schema is None if loading failed - - # 3. Try to load existing data - data_file_exists = os.path.exists(data_path) - if data_file_exists: - try: - with open(data_path, "r", encoding="utf-8") as f_data: - content = f_data.read() - if content.strip(): - config_data = json.loads(content) - else: - print( - f"Warning: Data file {data_path} is empty. Will initialize with defaults." - ) - needs_save = True # Force save if file was empty - except json.JSONDecodeError: - print( - f"Warning: Could not decode JSON from {data_path}. Will initialize with defaults." - ) - config_data = {} - needs_save = True - except Exception as e: - print( - f"Error reading data from {data_path}: {e}. Will attempt to initialize with defaults." - ) - config_data = {} - needs_save = True - except FileNotFoundError: - print( - f"Info: Data file not found at {data_path}. Will initialize with defaults." - ) - needs_save = True # Mark for saving as it's a new file - - # 4. Apply defaults from schema if schema was loaded successfully - if schema and isinstance(schema, dict) and "properties" in schema: - schema_properties = schema.get("properties", {}) - if isinstance(schema_properties, dict): # Ensure properties is a dict - for key, prop_definition in schema_properties.items(): - # Ensure prop_definition is a dictionary before checking 'default' - if ( - isinstance(prop_definition, dict) - and key not in config_data - and "default" in prop_definition - ): - print( - f"Info: Applying default for '{key}' from schema {schema_path_for_debug}" - ) - config_data[key] = prop_definition["default"] - needs_save = ( - True # Mark for saving because a default was applied - ) - else: - print( - f"Warning: 'properties' in schema {schema_path_for_debug} is not a dictionary. Cannot apply defaults." - ) - - # 5. Save the file if it was created or updated with defaults - if needs_save and data_path: - try: - print(f"Info: Saving updated config data to: {data_path}") - os.makedirs(os.path.dirname(data_path), exist_ok=True) - with open(data_path, "w", encoding="utf-8") as f_data: - json.dump(config_data, f_data, indent=2, ensure_ascii=False) - except IOError as e: - print(f"Error: Could not write data file to {data_path}: {e}") - except Exception as e: - print(f"Unexpected error saving data to {data_path}: {e}") - - # 6. 
Return the final configuration - return config_data - - def update_config( - self, level: str, data: Dict[str, Any], group: str = None - ) -> Dict[str, str]: - """Update configuration for specified level.""" - path = None - if level == "1": - path = os.path.join(self.data_path, "data.json") - elif level == "2": - if not group: - return { - "status": "error", - "message": "Group required for level 2 config update", - } - path = os.path.join(self.script_groups_path, group, "data.json") - elif level == "3": - if not self.working_directory: - return { - "status": "error", - "message": "Working directory not set for level 3 config update", - } - path = os.path.join(self.working_directory, "data.json") - else: - return { - "status": "error", - "message": f"Invalid level for config update: {level}", - } - - try: - # Ensure directory exists - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2, ensure_ascii=False) - print(f"Info: Config successfully updated at {path}") - return {"status": "success"} - except Exception as e: - print(f"Error updating config at {path}: {str(e)}") - return {"status": "error", "message": str(e)} - - def get_schema(self, level: str, group: str = None) -> Dict[str, Any]: - """Get schema for specified level.""" - schema_path = None - try: - # Clean level parameter - clean_level = str(level).split("-")[0] - - # Determine schema path based on level - if clean_level == "1": - schema_path = os.path.join(self.data_path, "esquema_general.json") - elif clean_level == "2": - if not group: - raise ValueError("Group is required for level 2 schema") - schema_path = os.path.join( - self.script_groups_path, group, "esquema_group.json" - ) - elif clean_level == "3": - if not group: - # Level 3 schema (esquema_work) is tied to a group. - # If no group, we can't know which schema to load. - print( - "Warning: Group needed to determine level 3 schema (esquema_work.json). Returning empty schema." - ) - return {"type": "object", "properties": {}} - schema_path = os.path.join( - self.script_groups_path, group, "esquema_work.json" - ) - else: - print( - f"Warning: Invalid level '{level}' for schema retrieval. Returning empty schema." - ) - return {"type": "object", "properties": {}} - - # Read existing schema or create default if it doesn't exist - if os.path.exists(schema_path): - try: - with open(schema_path, "r", encoding="utf-8") as f: - schema = json.load(f) - # Basic validation - if ( - not isinstance(schema, dict) - or "properties" not in schema - or "type" not in schema - ): - print( - f"Warning: Schema file {schema_path} has invalid structure. Returning default." - ) - return {"type": "object", "properties": {}} - # Ensure properties is a dict - if not isinstance(schema.get("properties"), dict): - print( - f"Warning: 'properties' in schema file {schema_path} is not a dictionary. Normalizing." - ) - schema["properties"] = {} - return schema - except json.JSONDecodeError: - print( - f"Error: Could not decode JSON from schema file: {schema_path}. Returning default." - ) - return {"type": "object", "properties": {}} - except Exception as e: - print( - f"Error reading schema file {schema_path}: {e}. Returning default." - ) - return {"type": "object", "properties": {}} - else: - print( - f"Info: Schema file not found at {schema_path}. Creating default schema." 
- ) - default_schema = {"type": "object", "properties": {}} - try: - # Ensure directory exists before writing - os.makedirs(os.path.dirname(schema_path), exist_ok=True) - with open(schema_path, "w", encoding="utf-8") as f: - json.dump(default_schema, f, indent=2, ensure_ascii=False) - return default_schema - except Exception as e: - print(f"Error creating default schema file at {schema_path}: {e}") - return { - "type": "object", - "properties": {}, - } # Return empty if creation fails - - except ValueError as ve: # Catch specific errors like missing group - print(f"Error getting schema path: {ve}") - return {"type": "object", "properties": {}} - except Exception as e: - # Log the full path in case of unexpected errors - error_path = schema_path if schema_path else f"Level {level}, Group {group}" - print(f"Unexpected error loading schema from {error_path}: {str(e)}") - return {"type": "object", "properties": {}} - - def update_schema( - self, level: str, data: Dict[str, Any], group: str = None - ) -> Dict[str, str]: - """Update schema for specified level and clean corresponding config.""" - schema_path = None - config_path = None - try: - # Clean level parameter if it contains extra info like '-edit' - clean_level = str(level).split("-")[0] - - # Determinar rutas de schema y config - if clean_level == "1": - schema_path = os.path.join(self.data_path, "esquema_general.json") - config_path = os.path.join(self.data_path, "data.json") - elif clean_level == "2": - if not group: - return { - "status": "error", - "message": "Group is required for level 2 schema update", - } - schema_path = os.path.join( - self.script_groups_path, group, "esquema_group.json" - ) - config_path = os.path.join(self.script_groups_path, group, "data.json") - elif clean_level == "3": - if not group: - return { - "status": "error", - "message": "Group is required for level 3 schema update", - } - schema_path = os.path.join( - self.script_groups_path, group, "esquema_work.json" - ) - # Config path depends on whether working_directory is set and valid - config_path = ( - os.path.join(self.working_directory, "data.json") - if self.working_directory - and os.path.isdir(self.working_directory) # Check it's a directory - else None - ) - if not config_path: - print( - f"Warning: Working directory not set or invalid ('{self.working_directory}'). Level 3 config file will not be cleaned." - ) - else: - return {"status": "error", "message": "Invalid level"} - - # Ensure directory exists - os.makedirs(os.path.dirname(schema_path), exist_ok=True) - - # Basic validation and normalization of the schema data being saved - if not isinstance(data, dict): - print( - f"Warning: Invalid schema data received (not a dict). Wrapping in default structure." - ) - data = {"type": "object", "properties": {}} # Reset to default empty - if "type" not in data: - data["type"] = "object" # Ensure type exists - if "properties" not in data or not isinstance(data["properties"], dict): - print( - f"Warning: Invalid or missing 'properties' in schema data. Resetting properties." - ) - data["properties"] = {} # Ensure properties exists and is a dict - - # Write schema - with open(schema_path, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2, ensure_ascii=False) - print(f"Info: Schema successfully updated at {schema_path}") - - # Clean the corresponding config file *if* its path is valid - if config_path: - self._clean_config_for_schema(config_path, data) - else: - print( - f"Info: Config cleaning skipped for level {level} (no valid config path)." 
- ) - - return {"status": "success"} - - except Exception as e: - error_path = schema_path if schema_path else f"Level {level}, Group {group}" - print(f"Error updating schema at {error_path}: {str(e)}") - # Consider adding traceback here for debugging - print(traceback.format_exc()) - return {"status": "error", "message": str(e)} - - def _clean_config_for_schema( - self, config_path: str, schema: Dict[str, Any] - ) -> None: - """Clean configuration file to match schema structure.""" - # Check existence *before* trying to open - try: - if not os.path.exists(config_path): - print( - f"Info: Config file {config_path} not found for cleaning. Skipping." - ) - return - - # Cargar configuración actual - config = {} - content = "" # Store original content for comparison - with open(config_path, "r", encoding="utf-8") as f: - content = f.read() - if content.strip(): # Avoid error on empty file - config = json.loads(content) - else: - print( - f"Info: Config file {config_path} is empty. Cleaning will result in an empty object." - ) - - # Limpiar configuración recursivamente - cleaned_config = self._clean_object_against_schema(config, schema) - - # Guardar configuración limpia solo si cambió o si el original estaba vacío - # (para evitar escrituras innecesarias) - # Use dumps for reliable comparison, handle potential errors during dumps - try: - original_config_str = json.dumps(config, sort_keys=True) - cleaned_config_str = json.dumps(cleaned_config, sort_keys=True) - except TypeError as te: - print( - f"Warning: Could not serialize config for comparison during clean: {te}. Forcing save." - ) - original_config_str = "" # Force inequality - cleaned_config_str = " " # Force inequality - - if original_config_str != cleaned_config_str or not content.strip(): - print(f"Info: Cleaning config file: {config_path}") - with open(config_path, "w", encoding="utf-8") as f: - json.dump(cleaned_config, f, indent=2, ensure_ascii=False) - else: - print( - f"Info: Config file {config_path} already matches schema. No cleaning needed." - ) - - except json.JSONDecodeError: - print( - f"Error: Could not decode JSON from config file {config_path} during cleaning. Skipping clean." - ) - except IOError as e: - print(f"Error accessing config file {config_path} during cleaning: {e}") - except Exception as e: - print(f"Unexpected error cleaning config {config_path}: {str(e)}") - # Consider adding traceback here - print(traceback.format_exc()) - - def _clean_object_against_schema(self, data: Any, schema: Dict[str, Any]) -> Any: - """Recursively clean data to match schema structure.""" - # Ensure schema is a dictionary, otherwise cannot proceed - if not isinstance(schema, dict): - print( - f"Warning: Invalid schema provided to _clean_object_against_schema (not a dict). Returning data as is: {type(schema)}" - ) - return data - - schema_type = schema.get("type") - - if schema_type == "object": - if not isinstance(data, dict): - # If data is not a dict, but schema expects object, return empty dict - return {} - - # This 'result' and the loop should be inside the 'if schema_type == "object":' block - result = {} - schema_props = schema.get("properties", {}) - # Ensure schema_props is a dictionary - if not isinstance(schema_props, dict): - print( - f"Warning: 'properties' in schema is not a dictionary during cleaning. Returning empty object." 
- ) - return {} - - for key, value in data.items(): - # Solo mantener campos que existen en el schema - if key in schema_props: - # Recursively clean the value based on the property's schema - # Ensure the property schema itself is a dict before recursing - prop_schema = schema_props[key] - if isinstance(prop_schema, dict): - result[key] = self._clean_object_against_schema( - value, prop_schema - ) - else: - # If property schema is invalid, maybe keep original value or omit? Let's omit. - print( - f"Warning: Schema for property '{key}' is not a dictionary. Omitting from cleaned data." - ) - # Return result should be OUTSIDE the loop, but INSIDE the 'if object' block - return result - - elif schema_type == "array": - if not isinstance(data, list): - - # If data is not a list, but schema expects array, return empty list - return [] - # If schema defines items structure, clean each item - items_schema = schema.get("items") - if isinstance( - items_schema, dict - ): # Check if 'items' schema is a valid dict - return [ - self._clean_object_against_schema(item, items_schema) - for item in data - ] - else: - # If no valid item schema, return list as is (or potentially filter based on basic types if needed) - # Let's return as is for now. - return data # Keep array items as they are if no valid 'items' schema defined - - elif "enum" in schema: - # Ensure enum values are defined as a list - enum_values = schema.get("enum") - if isinstance(enum_values, list): - # If schema has enum, keep data only if it's one of the allowed values - if data in enum_values: - return data - else: - # If value not in enum, return None or potentially the default value if specified? - # For cleaning, returning None or omitting might be safer. Let's return None. - return None # Or consider returning schema.get('default') if cleaning should apply defaults too - else: - # Invalid enum definition, return original data or None? Let's return None. - print( - f"Warning: Invalid 'enum' definition in schema (not a list). Returning None for value '{data}'." - ) - return None - - # For basic types (string, integer, number, boolean, null), just return the data - # We could add type checking here if strict cleaning is needed, - # e.g., return None if type(data) doesn't match schema_type - elif schema_type in ["string", "integer", "number", "boolean", "null"]: - # Optional: Add stricter type check if needed - # expected_type_map = { "string": str, "integer": int, "number": (int, float), "boolean": bool, "null": type(None) } - # expected_types = expected_type_map.get(schema_type) - # if expected_types and not isinstance(data, expected_types): - # print(f"Warning: Type mismatch during cleaning. Expected {schema_type}, got {type(data)}. Returning None.") - # return None # Or schema.get('default') - return data - - # If schema type is unknown or not handled, return data as is - else: - # This case might indicate an issue with the schema definition itself - # print(f"Warning: Unknown or unhandled schema type '{schema_type}' during cleaning. 
Returning data as is.") - return data - - # --- Script Listing and Execution Methods --- - def list_scripts(self, group: str) -> List[Dict[str, str]]: - """List all scripts in a group with their descriptions.""" - try: - scripts_dir = os.path.join(self.script_groups_path, group) - scripts = [] - - if not os.path.exists(scripts_dir): - print(f"Directory not found: {scripts_dir}") - return [] # Return empty list if group directory doesn't exist - - for file in os.listdir(scripts_dir): - # Modificar la condición para incluir cualquier archivo .py - if file.endswith(".py"): - path = os.path.join(scripts_dir, file) - description = self._extract_script_description(path) - print( - f"Debug: Found script: {file} with description: {description}" - ) # Debug line - scripts.append({"name": file, "description": description}) - - print(f"Debug: Total scripts found in group '{group}': {len(scripts)}") - return scripts - except Exception as e: - print(f"Error listing scripts for group '{group}': {str(e)}") - return [] # Return empty list on error - - def _extract_script_description(self, script_path: str) -> str: - """Extract description from script's docstring or initial comments.""" - try: - with open(script_path, "r", encoding="utf-8") as f: - content = f.read() - - # Try to find docstring - docstring_match = re.search(r'"""(.*?)"""', content, re.DOTALL) - if docstring_match: - return docstring_match.group(1).strip() - - # Try to find initial comment - comment_match = re.search(r"^#\s*(.*?)$", content, re.MULTILINE) - if comment_match: - return comment_match.group(1).strip() - - return "No description available" - except Exception as e: - print(f"Error extracting description from {script_path}: {str(e)}") - return "Error reading script description" - - def execute_script( - self, group: str, script_name: str, broadcast_fn=None - ) -> Dict[str, Any]: - """ - Execute script, broadcast output in real-time, and save final log - to a script-specific file in the script's directory. - """ - current_time = time.time() - time_since_last = current_time - self.last_execution_time - if time_since_last < self.min_execution_interval: - msg = f"Por favor espere {self.min_execution_interval - time_since_last:.1f} segundo(s) más entre ejecuciones" - self.append_log(f"Warning: {msg}") # Log throttling attempt - if broadcast_fn: - broadcast_fn(msg) - return {"status": "throttled", "error": msg} - - self.last_execution_time = current_time - script_path = os.path.join(self.script_groups_path, group, script_name) - script_dir = os.path.dirname(script_path) - script_base_name = os.path.splitext(script_name)[0] - # Define script-specific log file path - script_log_path = os.path.join(script_dir, f"log_{script_base_name}.txt") - - if not os.path.exists(script_path): - msg = f"Error Fatal: Script no encontrado en {script_path}" - self.append_log(msg) - if broadcast_fn: - broadcast_fn(msg) - return {"status": "error", "error": "Script not found"} - - # Get working directory specific to the group - working_dir = self.get_work_dir(group) - if not working_dir: - msg = f"Error Fatal: Directorio de trabajo no configurado o inválido para el grupo '{group}'" - self.append_log(msg) - if broadcast_fn: - broadcast_fn(msg) - return {"status": "error", "error": "Working directory not set"} - # Double check validity (get_work_dir should already do this) - if not os.path.isdir(working_dir): - msg = f"Error Fatal: El directorio de trabajo '{working_dir}' no es válido o no existe." 
- self.append_log(msg) - if broadcast_fn: - broadcast_fn(msg) - return {"status": "error", "error": "Invalid working directory"} - - # Aggregate configurations using the updated get_config - configs = { - "level1": self.get_config("1"), - "level2": self.get_config("2", group), - "level3": self.get_config( - "3", group - ), # get_config uses self.working_directory - "working_directory": working_dir, - } - print(f"Debug: Aggregated configs for script execution: {configs}") - - config_file_path = os.path.join(script_dir, "script_config.json") - try: - with open(config_file_path, "w", encoding="utf-8") as f: - json.dump(configs, f, indent=2, ensure_ascii=False) - # Don't broadcast config saving unless debugging - # if broadcast_fn: broadcast_fn(f"Configuraciones guardadas en {config_file_path}") - except Exception as e: - msg = f"Error Fatal: No se pudieron guardar las configuraciones temporales en {config_file_path}: {str(e)}" - self.append_log(msg) - if broadcast_fn: - broadcast_fn(msg) - # Optionally return error here if config saving is critical - - stdout_capture = [] - stderr_capture = "" - process = None - start_time = datetime.now() - - try: - if broadcast_fn: - start_msg = f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}..." - broadcast_fn(start_msg) - - # Determine creation flags for subprocess based on OS - creation_flags = 0 - if sys.platform == "win32": - creation_flags = subprocess.CREATE_NO_WINDOW - - # Execute the script - process = subprocess.Popen( - ["python", "-u", script_path], # Added -u for unbuffered output - cwd=working_dir, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - encoding="utf-8", - errors="replace", - bufsize=1, - env=dict(os.environ, PYTHONIOENCODING="utf-8"), - creationflags=creation_flags, # Add this line - ) - - # Real-time stdout reading and broadcasting - while True: - line = process.stdout.readline() - if not line and process.poll() is not None: - break - if line: - cleaned_line = line.rstrip() - stdout_capture.append(cleaned_line) # Store line for final log - if broadcast_fn: - broadcast_fn(cleaned_line) # Broadcast in real-time - - # Wait for process to finish and get return code - return_code = process.wait() - end_time = datetime.now() - duration = end_time - start_time - - # Capture any remaining stderr - stderr_capture = process.stderr.read() - - status = "success" if return_code == 0 else "error" - completion_msg = f"[{end_time.strftime('%H:%M:%S')}] Ejecución de {script_name} finalizada ({status}). Duración: {duration}." - - if stderr_capture: - # Broadcast stderr only if there was an error potentially - if status == "error" and broadcast_fn: - broadcast_fn(f"--- ERRORES ---") - broadcast_fn(stderr_capture.strip()) - broadcast_fn(f"--- FIN ERRORES ---") - # Always include stderr in the final log if present - completion_msg += f" Se detectaron errores (ver log)." 
- - if broadcast_fn: - broadcast_fn(completion_msg) - - # --- Write to script-specific log file --- - try: - with open(script_log_path, "w", encoding="utf-8") as log_f: - log_f.write(f"--- Log de Ejecución: {script_name} ---\n") - log_f.write(f"Grupo: {group}\n") - log_f.write(f"Directorio de Trabajo: {working_dir}\n") - log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n") - log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n") - log_f.write(f"Duración: {duration}\n") - log_f.write( - f"Estado: {status.upper()} (Código de Salida: {return_code})\n" - ) - log_f.write("\n--- SALIDA ESTÁNDAR (STDOUT) ---\n") - log_f.write("\n".join(stdout_capture)) - log_f.write("\n\n--- ERRORES (STDERR) ---\n") - log_f.write(stderr_capture if stderr_capture else "Ninguno") - log_f.write("\n--- FIN DEL LOG ---\n") - if broadcast_fn: - broadcast_fn(f"Log completo guardado en: {script_log_path}") - print(f"Info: Script log saved to {script_log_path}") - except Exception as log_e: - err_msg = f"Error al guardar el log específico del script en {script_log_path}: {log_e}" - print(err_msg) - if broadcast_fn: - broadcast_fn(err_msg) - # ------------------------------------------ - - return { - "status": status, - "return_code": return_code, - "error": stderr_capture if stderr_capture else None, - "log_file": script_log_path, # Return path to the specific log - } - - except Exception as e: - end_time = datetime.now() - duration = end_time - start_time - error_msg = ( - f"Error inesperado durante la ejecución de {script_name}: {str(e)}" - ) - traceback_info = traceback.format_exc() # Get full traceback - print(error_msg) # Print to console as well - print(traceback_info) - self.append_log( - f"ERROR FATAL: {error_msg}\n{traceback_info}" - ) # Log centrally - - if broadcast_fn: - # Ensure fatal errors are clearly marked in UI - broadcast_fn( - f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}" - ) - - # Attempt to write error to script-specific log - try: - with open(script_log_path, "w", encoding="utf-8") as log_f: - log_f.write(f"--- Log de Ejecución: {script_name} ---\n") - log_f.write(f"Grupo: {group}\n") - log_f.write(f"Directorio de Trabajo: {working_dir}\n") - log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n") - log_f.write( - f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n" - ) - log_f.write(f"Duración: {duration}\n") - log_f.write(f"Estado: FATAL ERROR\n") - log_f.write("\n--- ERROR ---\n") - log_f.write(error_msg + "\n") - log_f.write("\n--- TRACEBACK ---\n") - log_f.write(traceback_info) # Include traceback in log - log_f.write("\n--- FIN DEL LOG ---\n") - except Exception as log_e: - err_msg_log = ( - f"Error adicional al intentar guardar el log de error: {log_e}" - ) - print(err_msg_log) - - return {"status": "error", "error": error_msg, "traceback": traceback_info} - finally: - # Ensure stderr pipe is closed if process exists - if process and process.stderr: - process.stderr.close() - # Ensure stdout pipe is closed if process exists - if process and process.stdout: - process.stdout.close() - - def set_work_dir(self, group: str, path: str) -> Dict[str, str]: - """Set working directory path for a script group and update history.""" - # Normalizar el path recibido - path = os.path.normpath(path) - - if not os.path.exists(path): - return {"status": "error", "message": "Directory does not exist"} - - work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") - - try: - # Cargar datos existentes o crear nuevos - 
try: - with open(work_dir_path, "r", encoding="utf-8") as f: - data = json.load(f) - # Normalizar paths existentes en el historial - if "history" in data: - data["history"] = [os.path.normpath(p) for p in data["history"]] - except (FileNotFoundError, json.JSONDecodeError): - data = {"path": "", "history": []} - - # Actualizar path actual - data["path"] = path - - # Actualizar historial - if "history" not in data: - data["history"] = [] - - # Eliminar la ruta del historial si ya existe (usando path normalizado) - data["history"] = [ - p for p in data["history"] if os.path.normpath(p) != path - ] - - # Agregar la ruta al principio del historial - data["history"].insert(0, path) - - # Mantener solo los últimos 10 directorios - data["history"] = data["history"][:10] - - # Guardar datos actualizados - with open(work_dir_path, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2) - - # Actualizar la variable de instancia - self.working_directory = path - - # Crear data.json en el directorio de trabajo si no existe - data_path = os.path.join(path, "data.json") - if not os.path.exists(data_path): - with open(data_path, "w", encoding="utf-8") as f: - json.dump({}, f, indent=2) - - return {"status": "success", "path": path} - except Exception as e: - return {"status": "error", "message": str(e)} diff --git a/data/esquema.json b/data/esquema.json deleted file mode 100644 index 5d04917..0000000 --- a/data/esquema.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "type": "object", - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "description": "Tu clave de API para servicios externos" - }, - "model": { - "type": "string", - "title": "Modelo LLM", - "description": "Modelo de lenguaje a utilizar", - "enum": [ - "gpt-3.5-turbo", - "gpt-4", - "claude-v1" - ] - } - } -} \ No newline at end of file diff --git a/data/esquema_general.json b/data/esquema_general.json index 1c9e43a..5d04917 100644 --- a/data/esquema_general.json +++ b/data/esquema_general.json @@ -1,4 +1,20 @@ { "type": "object", - "properties": {} + "properties": { + "api_key": { + "type": "string", + "title": "API Key", + "description": "Tu clave de API para servicios externos" + }, + "model": { + "type": "string", + "title": "Modelo LLM", + "description": "Modelo de lenguaje a utilizar", + "enum": [ + "gpt-3.5-turbo", + "gpt-4", + "claude-v1" + ] + } + } } \ No newline at end of file diff --git a/data/log.txt b/data/log.txt index 15570d3..4514b26 100644 --- a/data/log.txt +++ b/data/log.txt @@ -1,22 +1,22 @@ -[17:26:22] Iniciando ejecución de x1.py en C:\Estudio... -[17:26:22] === Ejecutando Script de Prueba 1 === -[17:26:22] Configuraciones cargadas: -[17:26:22] Nivel 1: { -[17:26:22] "api_key": "your-api-key-here", -[17:26:22] "model": "gpt-3.5-turbo" -[17:26:22] } -[17:26:22] Nivel 2: { -[17:26:22] "input_dir": "D:/Datos/Entrada", -[17:26:22] "output_dir": "D:/Datos/Salida", -[17:26:22] "batch_size": 50 -[17:26:22] } -[17:26:22] Nivel 3: {} -[17:26:22] Simulando procesamiento... -[17:26:23] Progreso: 20% -[17:26:24] Progreso: 40% -[17:26:25] Progreso: 60% -[17:26:26] Progreso: 80% -[17:26:27] Progreso: 100% -[17:26:27] ¡Proceso completado! -[17:26:27] Ejecución de x1.py finalizada (success). Duración: 0:00:05.167151. -[17:26:27] Log completo guardado en: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\example_group\log_x1.txt +[20:54:12] Iniciando ejecución de x1.py en C:\Estudio... 
+[20:54:12] === Ejecutando Script de Prueba 1 ===
+[20:54:12] Configuraciones cargadas:
+[20:54:12] Nivel 1: {
+[20:54:12] "api_key": "your-api-key-here",
+[20:54:12] "model": "gpt-3.5-turbo"
+[20:54:12] }
+[20:54:12] Nivel 2: {
+[20:54:12] "input_dir": "D:/Datos/Entrada",
+[20:54:12] "output_dir": "D:/Datos/Salida",
+[20:54:12] "batch_size": 50
+[20:54:12] }
+[20:54:12] Nivel 3: {}
+[20:54:12] Simulando procesamiento...
+[20:54:13] Progreso: 20%
+[20:54:14] Progreso: 40%
+[20:54:15] Progreso: 60%
+[20:54:16] Progreso: 80%
+[20:54:17] Progreso: 100%
+[20:54:17] ¡Proceso completado!
+[20:54:17] Ejecución de x1.py finalizada (success). Duración: 0:00:05.196719.
+[20:54:17] Log completo guardado en: d:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\example_group\log_x1.txt
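
Note: app.py now imports ConfigurationManager from lib.config_manager and delegates group-description handling to get_group_details() and update_group_description(), but the new lib/config_manager.py module itself is not part of this diff. Below is a minimal sketch of what those two helpers presumably look like, reconstructed from the inline route code they replace; the method bodies, the description.json defaults, and the error shape that app.py maps to a 404 are assumptions based on the old handlers, not the actual implementation.

# Hypothetical sketch only: lib/config_manager.py is not included in this diff.
import json
import os


class ConfigurationManager:
    def __init__(self, base_path: str = "."):
        # Assumed layout, matching the deleted config_manager.py:
        # script groups live under backend/script_groups.
        self.script_groups_path = os.path.join(base_path, "backend", "script_groups")

    def get_group_details(self, group: str) -> dict:
        """Return the group's description.json, or the old inline defaults if missing."""
        group_path = os.path.join(self.script_groups_path, group)
        if not os.path.isdir(group_path):
            # app.py treats a dict containing "error" as a 404
            return {"error": f"Group not found: {group}"}
        description_path = os.path.join(group_path, "description.json")
        try:
            with open(description_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # Same defaults the old inline GET handler returned
            return {
                "name": group,
                "description": "Sin descripción",
                "version": "1.0",
                "author": "Unknown",
            }

    def update_group_description(self, group: str, data: dict) -> dict:
        """Persist description.json for the group (old inline POST handler)."""
        description_path = os.path.join(
            self.script_groups_path, group, "description.json"
        )
        try:
            os.makedirs(os.path.dirname(description_path), exist_ok=True)
            with open(description_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            return {"status": "success"}
        except Exception as e:
            return {"status": "error", "message": str(e)}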