# ParamManagerScripts/config_manager.py
#
# 1011 lines
# 45 KiB
# Python

import json
import os
import re
import shutil
import subprocess
import sys
import threading
import time
import traceback
from datetime import datetime
from typing import Dict, Any, List, Optional
# --- ConfigurationManager Class ---
class ConfigurationManager:
def __init__(self):
    """Set up paths, the central log file and the execution-throttle state.

    Every path is derived from the directory that contains this module:
    ``data/`` (central log) and ``backend/script_groups/`` (script groups).
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    self.base_path = module_dir
    self.data_path = os.path.join(module_dir, "data")
    self.script_groups_path = os.path.join(module_dir, "backend", "script_groups")
    # No working directory is selected until one is set or loaded.
    self.working_directory = None
    self.log_file = os.path.join(self.data_path, "log.txt")
    self._init_log_file()
    # Throttle state: timestamp of the last accepted execution and the
    # minimum number of seconds required between two executions.
    self.last_execution_time = 0
    self.min_execution_interval = 1
def _init_log_file(self):
    """Ensure the data directory and the central log file exist.

    Existing log content is always preserved. ``exist_ok=True`` and an
    append-mode open replace the previous check-then-create sequence, so two
    processes initialising at the same time can neither fail on the directory
    creation nor truncate a log the other one has just written to.
    """
    os.makedirs(self.data_path, exist_ok=True)
    # Append mode creates the file when it is missing and never truncates it.
    with open(self.log_file, "a", encoding="utf-8"):
        pass
# --- Logging Methods ---
def append_log(self, message: str) -> None:
    """Append *message* to the central log file, one entry per non-blank line.

    Each non-blank line gets a ``[HH:MM:SS] `` prefix unless its stripped
    text already starts with ``[`` (it is then treated as already stamped).
    Blank lines are dropped. Any failure is printed and never raised.
    """
    try:
        stamp = datetime.now().strftime("[%H:%M:%S] ")
        entries = []
        for raw in message.split("\n"):
            text = raw.strip()
            if not text:
                continue
            # The original line (including leading whitespace) is kept as is.
            prefix = "" if text.startswith("[") else stamp
            entries.append(f"{prefix}{raw}\n")
        if not entries:
            return
        with open(self.log_file, "a", encoding="utf-8") as log:
            log.writelines(entries)
    except Exception as e:
        print(f"Error writing to central log file: {e}")
def read_last_log_line(self) -> str:
    """Return the last non-blank line of the log file, newline included.

    Returns ``""`` when the log holds no non-blank line or cannot be read
    (the error is printed).
    """
    try:
        with open(self.log_file, "r", encoding="utf-8") as log:
            all_lines = log.readlines()
        # Walk backwards and stop at the first line with visible content.
        return next((ln for ln in reversed(all_lines) if ln.strip()), "")
    except Exception as e:
        print(f"Error reading last log line: {e}")
        return ""
def read_log(self) -> str:
    """Return the full text of the log file, or ``""`` if it cannot be read."""
    content = ""
    try:
        with open(self.log_file, "r", encoding="utf-8") as log:
            content = log.read()
    except Exception as e:
        print(f"Error reading log file: {e}")
    return content
def clear_log(self) -> bool:
    """Empty the log file.

    Returns:
        True when the file was truncated (or created empty), False when it
        could not be opened for writing (the error is printed).
    """
    try:
        # Opening in write mode truncates the file; nothing has to be written.
        with open(self.log_file, "w", encoding="utf-8"):
            pass
    except Exception as e:
        print(f"Error clearing log file: {e}")
        return False
    return True
# --- Working Directory Methods ---
def set_working_directory(self, path: str) -> Dict[str, str]:
    """Select *path* as the current working directory.

    An empty ``data.json`` is created inside the directory when it does not
    exist yet; failing to create it is reported on stdout but is not fatal.

    Args:
        path: Directory to use as working directory.

    Returns:
        ``{"status": "success", "path": path}`` on success, or
        ``{"status": "error", "message": ...}`` when *path* is not an
        existing directory (``self.working_directory`` is then left untouched).
    """
    # isdir rather than exists: a path pointing at a regular file must be
    # rejected, otherwise it would be stored as the working directory and
    # every later access to "<path>/data.json" would fail.
    if not os.path.isdir(path):
        return {"status": "error", "message": "Directory does not exist"}

    self.working_directory = path

    data_path = os.path.join(path, "data.json")
    if not os.path.exists(data_path):
        try:
            with open(data_path, "w", encoding="utf-8") as f:
                json.dump({}, f, indent=2)
            print(
                f"Info: Created empty data.json in working directory: {data_path}"
            )
        except Exception as e:
            # Non-fatal: the directory itself is valid.
            print(f"Error creating data.json in working directory {path}: {e}")
    return {"status": "success", "path": path}
def get_work_dir(self, group: str) -> Optional[str]:
    """Load the working directory stored in a group's ``work_dir.json``.

    ``self.working_directory`` is always updated: it receives the normalized
    path when the stored path is an existing directory, and ``None`` in every
    other case (no path stored, invalid path, missing or unreadable file).

    Returns:
        The normalized directory path, or ``None``.
    """
    work_dir_file = os.path.join(self.script_groups_path, group, "work_dir.json")
    resolved = None
    try:
        with open(work_dir_file, "r", encoding="utf-8") as fh:
            stored = json.load(fh).get("path", "")
        if stored:
            candidate = os.path.normpath(stored)
            if os.path.isdir(candidate):
                resolved = candidate
            else:
                print(
                    f"Warning: Stored working directory for group '{group}' is invalid or does not exist: {candidate}"
                )
    except (FileNotFoundError, json.JSONDecodeError):
        # A missing or undecodable file simply means "nothing stored".
        pass
    except Exception as e:
        print(f"Error reading work_dir.json for group '{group}': {e}")
    self.working_directory = resolved
    return resolved
def get_directory_history(self, group: str) -> List[str]:
    """Return the stored working-directory history of a script group.

    The history is read from the group's ``work_dir.json``. Entries are
    normalized and only those that are existing directories are returned,
    in stored order.

    Malformed content never raises: a missing/unreadable/undecodable file,
    a JSON document that is not an object, or a ``history`` that is not a
    list all yield ``[]``. Entries that are not non-empty strings are
    skipped (an empty string would otherwise normalize to "." and be
    reported as a valid directory).
    """
    work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
    try:
        with open(work_dir_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and text decoding errors.
        return []

    history = data.get("history") if isinstance(data, dict) else None
    if not isinstance(history, list):
        return []

    normalized = (
        os.path.normpath(entry)
        for entry in history
        if isinstance(entry, str) and entry
    )
    return [path for path in normalized if os.path.isdir(path)]
def get_script_groups(self) -> List[Dict[str, Any]]:
    """Return the available script groups with their descriptive metadata.

    Every sub-directory of ``self.script_groups_path`` is a group. Its
    metadata comes from ``self._get_group_description``; missing fields fall
    back to defaults (directory name, "Sin descripción", "1.0", "Unknown").

    Returns:
        A list of dicts with keys ``id``, ``name``, ``description``,
        ``version`` and ``author``. The list is empty when the groups
        directory does not exist or cannot be listed (the error is printed).
    """
    try:
        entries = os.listdir(self.script_groups_path)
    except OSError as e:
        print(f"Error listing script groups in {self.script_groups_path}: {e}")
        return []

    groups = []
    for entry in entries:
        group_path = os.path.join(self.script_groups_path, entry)
        if not os.path.isdir(group_path):
            continue
        description = self._get_group_description(group_path)
        # Guard against a description file holding a non-object JSON value.
        if not isinstance(description, dict):
            description = {}
        groups.append(
            {
                "id": entry,
                "name": description.get("name", entry),
                "description": description.get("description", "Sin descripción"),
                "version": description.get("version", "1.0"),
                "author": description.get("author", "Unknown"),
            }
        )
    return groups
def _get_group_description(self, group_path: str) -> Dict[str, Any]:
    """Load ``description.json`` from a script-group directory.

    Args:
        group_path: Directory of the script group.

    Returns:
        The parsed JSON object, or ``{}`` when the file is missing, cannot be
        read or parsed (the error is printed), or does not contain a JSON
        object — callers use ``.get`` on the result, so a list or scalar
        must never be returned.
    """
    description_file = os.path.join(group_path, "description.json")
    try:
        with open(description_file, "r", encoding="utf-8") as f:
            description = json.load(f)
    except FileNotFoundError:
        # A group without a description file is valid; stay silent.
        return {}
    except Exception as e:
        print(f"Error reading group description: {e}")
        return {}

    if not isinstance(description, dict):
        print(
            f"Error reading group description: {description_file} does not contain a JSON object"
        )
        return {}
    return description
# --- Configuration (data.json) Methods ---
def get_config(self, level: str, group: str = None) -> Dict[str, Any]:
    """Load the configuration (``data.json``) for a level, applying defaults.

    Levels:
        "1": general config in ``self.data_path``.
        "2": group config in the group's directory (``group`` required).
        "3": work config in ``self.working_directory``; the schema used for
             defaults belongs to ``group`` and is skipped without one.

    Top-level schema properties that define a ``default`` and are missing
    from the stored data are added. The file is (re)written when it was
    missing, empty, undecodable, not a JSON object, or when a default was
    applied.

    Returns:
        The configuration dict. ``{}`` for level "3" without a working
        directory. ``{"error": ...}`` for level "2" without a group or for
        an unknown level.
    """
    config_data = {}
    needs_save = False
    schema = None
    data_path = None
    schema_path_for_debug = "N/A"  # Only used in log messages.

    # 1. Determine the data path (and the schema path shown in messages).
    if level == "1":
        data_path = os.path.join(self.data_path, "data.json")
        schema_path_for_debug = os.path.join(self.data_path, "esquema_general.json")
    elif level == "2":
        if not group:
            return {"error": "Group required for level 2 config"}
        data_path = os.path.join(self.script_groups_path, group, "data.json")
        schema_path_for_debug = os.path.join(
            self.script_groups_path, group, "esquema_group.json"
        )
    elif level == "3":
        if not self.working_directory:
            return {}
        data_path = os.path.join(self.working_directory, "data.json")
        if group:
            schema_path_for_debug = os.path.join(
                self.script_groups_path, group, "esquema_work.json"
            )
        else:
            schema_path_for_debug = "N/A (Level 3 without group)"
    else:
        return {"error": f"Invalid level specified for config: {level}"}

    # 2. Load the schema that provides the defaults. The level 3 schema is
    #    tied to a group, so it cannot be resolved without one.
    try:
        if not (level == "3" and not group):
            schema = self.get_schema(level, group)
    except Exception as e:
        print(
            f"Warning: Could not load schema for level {level}, group {group}. Defaults will not be applied. Error: {e}"
        )
        schema = None

    # 3. Load existing data. Opening directly (instead of checking for
    #    existence first) lets the missing-file case be handled explicitly;
    #    FileNotFoundError must be caught before the generic handler.
    try:
        with open(data_path, "r", encoding="utf-8") as f_data:
            content = f_data.read()
        if content.strip():
            loaded = json.loads(content)
            if isinstance(loaded, dict):
                config_data = loaded
            else:
                # Defaults can only be merged into a JSON object.
                print(
                    f"Warning: Data file {data_path} does not contain a JSON object. Will initialize with defaults."
                )
                needs_save = True
        else:
            print(
                f"Warning: Data file {data_path} is empty. Will initialize with defaults."
            )
            needs_save = True
    except FileNotFoundError:
        print(
            f"Info: Data file not found at {data_path}. Will initialize with defaults."
        )
        needs_save = True
    except json.JSONDecodeError:
        print(
            f"Warning: Could not decode JSON from {data_path}. Will initialize with defaults."
        )
        config_data = {}
        needs_save = True
    except Exception as e:
        print(
            f"Error reading data from {data_path}: {e}. Will attempt to initialize with defaults."
        )
        config_data = {}
        needs_save = True

    # 4. Apply top-level defaults from the schema.
    if isinstance(schema, dict) and "properties" in schema:
        schema_properties = schema.get("properties", {})
        if isinstance(schema_properties, dict):
            for key, prop_definition in schema_properties.items():
                if (
                    isinstance(prop_definition, dict)
                    and key not in config_data
                    and "default" in prop_definition
                ):
                    print(
                        f"Info: Applying default for '{key}' from schema {schema_path_for_debug}"
                    )
                    config_data[key] = prop_definition["default"]
                    needs_save = True
        else:
            print(
                f"Warning: 'properties' in schema {schema_path_for_debug} is not a dictionary. Cannot apply defaults."
            )

    # 5. Persist when the file was created, repaired or extended.
    if needs_save and data_path:
        try:
            print(f"Info: Saving updated config data to: {data_path}")
            os.makedirs(os.path.dirname(data_path), exist_ok=True)
            with open(data_path, "w", encoding="utf-8") as f_data:
                json.dump(config_data, f_data, indent=2, ensure_ascii=False)
        except IOError as e:
            print(f"Error: Could not write data file to {data_path}: {e}")
        except Exception as e:
            print(f"Unexpected error saving data to {data_path}: {e}")

    # 6. Return the final configuration.
    return config_data
def update_config(
    self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
    """Overwrite the configuration file (``data.json``) of a level.

    Args:
        level: "1" (general), "2" (group, requires ``group``) or
            "3" (work, requires ``self.working_directory``).
        data: JSON-serializable configuration to store.
        group: Script group id, used for level "2".

    Returns:
        ``{"status": "success"}`` or ``{"status": "error", "message": ...}``.
        On error the previously stored file is left untouched.
    """
    path = None
    if level == "1":
        path = os.path.join(self.data_path, "data.json")
    elif level == "2":
        if not group:
            return {
                "status": "error",
                "message": "Group required for level 2 config update",
            }
        path = os.path.join(self.script_groups_path, group, "data.json")
    elif level == "3":
        if not self.working_directory:
            return {
                "status": "error",
                "message": "Working directory not set for level 3 config update",
            }
        path = os.path.join(self.working_directory, "data.json")
    else:
        return {
            "status": "error",
            "message": f"Invalid level for config update: {level}",
        }

    try:
        # Serialize before opening the file: opening in "w" mode truncates
        # it, so a serialization failure afterwards would leave a partial,
        # corrupt config behind.
        payload = json.dumps(data, indent=2, ensure_ascii=False)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(payload)
        print(f"Info: Config successfully updated at {path}")
        return {"status": "success"}
    except Exception as e:
        print(f"Error updating config at {path}: {str(e)}")
        return {"status": "error", "message": str(e)}
def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
    """Return the JSON schema of a level, creating a default file if missing.

    ``level`` may carry a suffix after a dash (e.g. "1-edit"); only the part
    before the first dash is used. Schema files:

        "1": ``<data_path>/esquema_general.json``
        "2": ``<group dir>/esquema_group.json`` (``group`` required)
        "3": ``<group dir>/esquema_work.json`` (``group`` required)

    This method never raises. An empty object schema
    (``{"type": "object", "properties": {}}``) is returned for an unknown
    level, a missing group, or a file that is unreadable, undecodable or
    lacks ``type``/``properties``. A non-dict ``properties`` in an otherwise
    valid schema is replaced by ``{}``. When the schema file does not exist,
    the empty schema is written to disk and returned.
    """

    def blank_schema():
        # A fresh dict each time so callers can safely mutate the result.
        return {"type": "object", "properties": {}}

    schema_path = None
    try:
        clean_level = str(level).split("-")[0]

        if clean_level == "1":
            schema_path = os.path.join(self.data_path, "esquema_general.json")
        elif clean_level == "2":
            if not group:
                raise ValueError("Group is required for level 2 schema")
            schema_path = os.path.join(
                self.script_groups_path, group, "esquema_group.json"
            )
        elif clean_level == "3":
            if not group:
                print(
                    "Warning: Group needed to determine level 3 schema (esquema_work.json). Returning empty schema."
                )
                return blank_schema()
            schema_path = os.path.join(
                self.script_groups_path, group, "esquema_work.json"
            )
        else:
            print(
                f"Warning: Invalid level '{level}' for schema retrieval. Returning empty schema."
            )
            return blank_schema()

        # Missing file: persist and return the default schema.
        if not os.path.exists(schema_path):
            print(
                f"Info: Schema file not found at {schema_path}. Creating default schema."
            )
            default_schema = blank_schema()
            try:
                os.makedirs(os.path.dirname(schema_path), exist_ok=True)
                with open(schema_path, "w", encoding="utf-8") as f:
                    json.dump(default_schema, f, indent=2, ensure_ascii=False)
                return default_schema
            except Exception as e:
                print(f"Error creating default schema file at {schema_path}: {e}")
                return blank_schema()

        # Existing file: load and validate its basic structure.
        try:
            with open(schema_path, "r", encoding="utf-8") as f:
                schema = json.load(f)
            structure_ok = (
                isinstance(schema, dict)
                and "properties" in schema
                and "type" in schema
            )
            if not structure_ok:
                print(
                    f"Warning: Schema file {schema_path} has invalid structure. Returning default."
                )
                return blank_schema()
            if not isinstance(schema.get("properties"), dict):
                print(
                    f"Warning: 'properties' in schema file {schema_path} is not a dictionary. Normalizing."
                )
                schema["properties"] = {}
            return schema
        except json.JSONDecodeError:
            print(
                f"Error: Could not decode JSON from schema file: {schema_path}. Returning default."
            )
            return blank_schema()
        except Exception as e:
            print(
                f"Error reading schema file {schema_path}: {e}. Returning default."
            )
            return blank_schema()
    except ValueError as ve:
        # Raised above when level 2 is requested without a group.
        print(f"Error getting schema path: {ve}")
        return blank_schema()
    except Exception as e:
        error_path = schema_path if schema_path else f"Level {level}, Group {group}"
        print(f"Unexpected error loading schema from {error_path}: {str(e)}")
        return blank_schema()
def update_schema(
    self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
    """Store a schema for a level and prune the matching config file.

    ``level`` may carry a suffix after a dash (e.g. "2-edit"); only the part
    before the first dash is used. Levels "2" and "3" require ``group``.

    Before saving, the schema is normalized: a non-dict ``data`` is replaced
    by an empty object schema, a missing ``type`` becomes "object", and a
    missing or non-dict ``properties`` becomes ``{}``. When ``data`` is a
    dict it is normalized in place.

    After saving, the level's ``data.json`` is cleaned against the new
    schema via ``self._clean_config_for_schema``. For level "3" this only
    happens when ``self.working_directory`` is an existing directory.

    Returns:
        ``{"status": "success"}`` or ``{"status": "error", "message": ...}``.
    """
    schema_path = None
    config_path = None
    try:
        clean_level = str(level).split("-")[0]

        # Resolve where the schema and its config live.
        if clean_level == "1":
            schema_path = os.path.join(self.data_path, "esquema_general.json")
            config_path = os.path.join(self.data_path, "data.json")
        elif clean_level == "2":
            if not group:
                return {
                    "status": "error",
                    "message": "Group is required for level 2 schema update",
                }
            group_dir = os.path.join(self.script_groups_path, group)
            schema_path = os.path.join(group_dir, "esquema_group.json")
            config_path = os.path.join(group_dir, "data.json")
        elif clean_level == "3":
            if not group:
                return {
                    "status": "error",
                    "message": "Group is required for level 3 schema update",
                }
            schema_path = os.path.join(
                self.script_groups_path, group, "esquema_work.json"
            )
            work_dir_usable = self.working_directory and os.path.isdir(
                self.working_directory
            )
            if work_dir_usable:
                config_path = os.path.join(self.working_directory, "data.json")
            else:
                config_path = None
                print(
                    f"Warning: Working directory not set or invalid ('{self.working_directory}'). Level 3 config file will not be cleaned."
                )
        else:
            return {"status": "error", "message": "Invalid level"}

        os.makedirs(os.path.dirname(schema_path), exist_ok=True)

        # Normalize the incoming schema so the stored file is always a
        # well-formed object schema.
        if not isinstance(data, dict):
            print(
                "Warning: Invalid schema data received (not a dict). Wrapping in default structure."
            )
            data = {"type": "object", "properties": {}}
        if "type" not in data:
            data["type"] = "object"
        properties_ok = "properties" in data and isinstance(data["properties"], dict)
        if not properties_ok:
            print(
                "Warning: Invalid or missing 'properties' in schema data. Resetting properties."
            )
            data["properties"] = {}

        with open(schema_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"Info: Schema successfully updated at {schema_path}")

        if config_path:
            self._clean_config_for_schema(config_path, data)
        else:
            print(
                f"Info: Config cleaning skipped for level {level} (no valid config path)."
            )
        return {"status": "success"}
    except Exception as e:
        error_path = schema_path if schema_path else f"Level {level}, Group {group}"
        print(f"Error updating schema at {error_path}: {str(e)}")
        print(traceback.format_exc())
        return {"status": "error", "message": str(e)}
def _clean_config_for_schema(
    self, config_path: str, schema: Dict[str, Any]
) -> None:
    """Rewrite a config file so that it only holds data allowed by *schema*.

    The stored JSON is passed through ``self._clean_object_against_schema``
    and written back only when the cleaned result differs from the stored
    one, or when the file was empty (it then receives the cleaned value of
    an empty object). A missing file is skipped. Errors (undecodable JSON,
    I/O failures, anything unexpected) are printed and never raised; the
    file is left as it was in those cases.
    """
    try:
        if not os.path.exists(config_path):
            print(
                f"Info: Config file {config_path} not found for cleaning. Skipping."
            )
            return

        with open(config_path, "r", encoding="utf-8") as f:
            raw_text = f.read()
        was_empty = not raw_text.strip()
        if was_empty:
            print(
                f"Info: Config file {config_path} is empty. Cleaning will result in an empty object."
            )
            config = {}
        else:
            config = json.loads(raw_text)

        cleaned_config = self._clean_object_against_schema(config, schema)

        # Compare canonical serializations to avoid unnecessary writes.
        try:
            unchanged = json.dumps(config, sort_keys=True) == json.dumps(
                cleaned_config, sort_keys=True
            )
        except TypeError as te:
            print(
                f"Warning: Could not serialize config for comparison during clean: {te}. Forcing save."
            )
            unchanged = False

        if unchanged and not was_empty:
            print(
                f"Info: Config file {config_path} already matches schema. No cleaning needed."
            )
            return

        print(f"Info: Cleaning config file: {config_path}")
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(cleaned_config, f, indent=2, ensure_ascii=False)
    except json.JSONDecodeError:
        print(
            f"Error: Could not decode JSON from config file {config_path} during cleaning. Skipping clean."
        )
    except IOError as e:
        print(f"Error accessing config file {config_path} during cleaning: {e}")
    except Exception as e:
        print(f"Unexpected error cleaning config {config_path}: {str(e)}")
        print(traceback.format_exc())
def _clean_object_against_schema(self, data: Any, schema: Dict[str, Any]) -> Any:
    """Recursively reduce *data* to what *schema* describes.

    Rules, checked in this order:
        * schema is not a dict: *data* is returned unchanged (a warning is
          printed).
        * ``type == "object"``: non-dict data becomes ``{}``; otherwise only
          keys listed in ``properties`` are kept, each cleaned with its own
          property schema. Properties whose schema is not a dict are
          omitted. A non-dict ``properties`` yields ``{}``.
        * ``type == "array"``: non-list data becomes ``[]``; items are
          cleaned with ``items`` when it is a dict, else kept as they are.
        * schema has ``enum``: *data* is kept when it is one of the listed
          values, otherwise ``None`` (also when ``enum`` is not a list).
        * anything else (scalar or unknown type): *data* is returned
          unchanged; no type checking is performed.
    """
    if not isinstance(schema, dict):
        print(
            f"Warning: Invalid schema provided to _clean_object_against_schema (not a dict). Returning data as is: {type(schema)}"
        )
        return data

    schema_type = schema.get("type")

    if schema_type == "object":
        if not isinstance(data, dict):
            return {}
        props = schema.get("properties", {})
        if not isinstance(props, dict):
            print(
                "Warning: 'properties' in schema is not a dictionary during cleaning. Returning empty object."
            )
            return {}
        cleaned = {}
        for key, value in data.items():
            if key not in props:
                continue  # Field unknown to the schema: drop it.
            prop_schema = props[key]
            if not isinstance(prop_schema, dict):
                print(
                    f"Warning: Schema for property '{key}' is not a dictionary. Omitting from cleaned data."
                )
                continue
            cleaned[key] = self._clean_object_against_schema(value, prop_schema)
        return cleaned

    if schema_type == "array":
        if not isinstance(data, list):
            return []
        items_schema = schema.get("items")
        if not isinstance(items_schema, dict):
            return data  # No usable item schema: keep items untouched.
        return [
            self._clean_object_against_schema(item, items_schema) for item in data
        ]

    if "enum" in schema:
        allowed = schema.get("enum")
        if not isinstance(allowed, list):
            print(
                f"Warning: Invalid 'enum' definition in schema (not a list). Returning None for value '{data}'."
            )
            return None
        return data if data in allowed else None

    # Scalar types and unknown/unspecified types are passed through as is.
    return data
# --- Script Listing and Execution Methods ---
def list_scripts(self, group: str) -> List[Dict[str, str]]:
    """List the ``.py`` files of a script group with their descriptions.

    Descriptions come from ``self._extract_script_description``. Debug
    information about each script found is printed.

    Returns:
        A list of ``{"name": file name, "description": text}`` dicts, in
        directory-listing order. Empty when the group directory does not
        exist or any error occurs (the error is printed).
    """
    try:
        scripts_dir = os.path.join(self.script_groups_path, group)
        if not os.path.exists(scripts_dir):
            print(f"Directory not found: {scripts_dir}")
            return []

        scripts = []
        for file in os.listdir(scripts_dir):
            if not file.endswith(".py"):
                continue
            description = self._extract_script_description(
                os.path.join(scripts_dir, file)
            )
            print(f"Debug: Found script: {file} with description: {description}")
            scripts.append({"name": file, "description": description})

        print(f"Debug: Total scripts found in group '{group}': {len(scripts)}")
        return scripts
    except Exception as e:
        print(f"Error listing scripts for group '{group}': {str(e)}")
        return []
def _extract_script_description(self, script_path: str) -> str:
    """Extract a short description from a script's source text.

    Lookup order:
        1. The first triple-quoted string in the file, delimited by either
           three double quotes or three single quotes.
        2. The first comment line starting at column 0 that carries text.
           A shebang on the first line and an encoding declaration on the
           first two lines are skipped, as are empty ``#`` lines.

    Returns:
        The stripped description, "No description available" when nothing
        is found, or "Error reading script description" when the file
        cannot be read (the error is printed).
    """
    try:
        with open(script_path, "r", encoding="utf-8") as f:
            content = f.read()

        # The back-reference makes the closing delimiter match the opening one.
        docstring_match = re.search(r"(\"{3}|'{3})(.*?)\1", content, re.DOTALL)
        if docstring_match:
            return docstring_match.group(2).strip()

        for line_no, line in enumerate(content.splitlines()):
            if not line.startswith("#"):
                continue
            if line_no == 0 and line.startswith("#!"):
                continue  # Shebang, not a description.
            if line_no < 2 and re.match(r"#.*?coding[:=]", line):
                continue  # Source-encoding declaration.
            text = line[1:].strip()
            if text:
                return text

        return "No description available"
    except Exception as e:
        print(f"Error extracting description from {script_path}: {str(e)}")
        return "Error reading script description"
def execute_script(
    self, group: str, script_name: str, broadcast_fn=None
) -> Dict[str, Any]:
    """Run a group's script, streaming its output and saving a per-script log.

    The script runs with the group's working directory as ``cwd``. Before
    launch, the level 1/2/3 configurations and the working directory are
    written to ``script_config.json`` next to the script. Each stdout line
    is passed to ``broadcast_fn`` as it arrives. When the run ends, a full
    log (stdout, stderr, timing, exit code) is written to
    ``log_<script>.txt`` next to the script, replacing the previous one.

    stderr is drained by a background thread while stdout is streamed.
    Reading it only after stdout reached EOF would deadlock as soon as the
    child fills the stderr pipe buffer.

    The interpreter is ``python`` from PATH; when none is found there, the
    interpreter running this application (``sys.executable``) is used.

    Args:
        group: Script group id (sub-directory of ``script_groups_path``).
        script_name: File name of the script inside the group directory.
        broadcast_fn: Optional callable receiving each message/output line.

    Returns:
        * ``{"status": "throttled", "error": msg}`` when called again within
          ``self.min_execution_interval`` seconds of the last accepted call.
        * ``{"status": "error", "error": ...}`` when the script or working
          directory is missing, or (with a ``traceback`` key) on an
          unexpected failure.
        * Otherwise ``{"status": "success" | "error", "return_code": int,
          "error": stderr text or None, "log_file": path}``.
    """
    current_time = time.time()
    time_since_last = current_time - self.last_execution_time
    if time_since_last < self.min_execution_interval:
        msg = f"Por favor espere {self.min_execution_interval - time_since_last:.1f} segundo(s) más entre ejecuciones"
        self.append_log(f"Warning: {msg}")
        if broadcast_fn:
            broadcast_fn(msg)
        return {"status": "throttled", "error": msg}
    self.last_execution_time = current_time

    script_path = os.path.join(self.script_groups_path, group, script_name)
    script_dir = os.path.dirname(script_path)
    script_base_name = os.path.splitext(script_name)[0]
    script_log_path = os.path.join(script_dir, f"log_{script_base_name}.txt")

    if not os.path.exists(script_path):
        msg = f"Error Fatal: Script no encontrado en {script_path}"
        self.append_log(msg)
        if broadcast_fn:
            broadcast_fn(msg)
        return {"status": "error", "error": "Script not found"}

    # get_work_dir also refreshes self.working_directory, which the level 3
    # get_config call below relies on.
    working_dir = self.get_work_dir(group)
    if not working_dir:
        msg = f"Error Fatal: Directorio de trabajo no configurado o inválido para el grupo '{group}'"
        self.append_log(msg)
        if broadcast_fn:
            broadcast_fn(msg)
        return {"status": "error", "error": "Working directory not set"}
    if not os.path.isdir(working_dir):
        msg = f"Error Fatal: El directorio de trabajo '{working_dir}' no es válido o no existe."
        self.append_log(msg)
        if broadcast_fn:
            broadcast_fn(msg)
        return {"status": "error", "error": "Invalid working directory"}

    configs = {
        "level1": self.get_config("1"),
        "level2": self.get_config("2", group),
        "level3": self.get_config("3", group),
        "working_directory": working_dir,
    }
    print(f"Debug: Aggregated configs for script execution: {configs}")

    config_file_path = os.path.join(script_dir, "script_config.json")
    try:
        with open(config_file_path, "w", encoding="utf-8") as f:
            json.dump(configs, f, indent=2, ensure_ascii=False)
    except Exception as e:
        # Reported but not fatal: the script is still launched.
        msg = f"Error Fatal: No se pudieron guardar las configuraciones temporales en {config_file_path}: {str(e)}"
        self.append_log(msg)
        if broadcast_fn:
            broadcast_fn(msg)

    stdout_capture = []
    stderr_chunks = []
    stderr_capture = ""
    process = None
    stderr_thread = None
    start_time = datetime.now()

    try:
        if broadcast_fn:
            start_msg = f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}..."
            broadcast_fn(start_msg)

        # Avoid flashing a console window on Windows.
        creation_flags = 0
        if sys.platform == "win32":
            creation_flags = subprocess.CREATE_NO_WINDOW

        # Keep launching "python" as before whenever PATH provides one;
        # otherwise fall back to the current interpreter instead of failing.
        interpreter = "python"
        if shutil.which(interpreter) is None and sys.executable:
            interpreter = sys.executable

        process = subprocess.Popen(
            [interpreter, "-u", script_path],  # -u: unbuffered child output
            cwd=working_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
            env=dict(os.environ, PYTHONIOENCODING="utf-8"),
            creationflags=creation_flags,
        )

        def _drain_stderr(pipe=process.stderr):
            # Runs in a background thread so the child never blocks on a
            # full stderr pipe while stdout is being streamed.
            try:
                for chunk in pipe:
                    stderr_chunks.append(chunk)
            except (ValueError, OSError):
                # Pipe closed during cleanup after a failure.
                pass

        stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
        stderr_thread.start()

        # Stream stdout until EOF.
        for line in process.stdout:
            cleaned_line = line.rstrip()
            stdout_capture.append(cleaned_line)
            if broadcast_fn:
                broadcast_fn(cleaned_line)

        return_code = process.wait()
        stderr_thread.join()
        end_time = datetime.now()
        duration = end_time - start_time
        stderr_capture = "".join(stderr_chunks)

        status = "success" if return_code == 0 else "error"
        completion_msg = f"[{end_time.strftime('%H:%M:%S')}] Ejecución de {script_name} finalizada ({status}). Duración: {duration}."

        if stderr_capture:
            # stderr is only broadcast when the script actually failed.
            if status == "error" and broadcast_fn:
                broadcast_fn("--- ERRORES ---")
                broadcast_fn(stderr_capture.strip())
                broadcast_fn("--- FIN ERRORES ---")
            completion_msg += " Se detectaron errores (ver log)."

        if broadcast_fn:
            broadcast_fn(completion_msg)

        # Write the script-specific log file.
        try:
            with open(script_log_path, "w", encoding="utf-8") as log_f:
                log_f.write(f"--- Log de Ejecución: {script_name} ---\n")
                log_f.write(f"Grupo: {group}\n")
                log_f.write(f"Directorio de Trabajo: {working_dir}\n")
                log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
                log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
                log_f.write(f"Duración: {duration}\n")
                log_f.write(
                    f"Estado: {status.upper()} (Código de Salida: {return_code})\n"
                )
                log_f.write("\n--- SALIDA ESTÁNDAR (STDOUT) ---\n")
                log_f.write("\n".join(stdout_capture))
                log_f.write("\n\n--- ERRORES (STDERR) ---\n")
                log_f.write(stderr_capture if stderr_capture else "Ninguno")
                log_f.write("\n--- FIN DEL LOG ---\n")
            if broadcast_fn:
                broadcast_fn(f"Log completo guardado en: {script_log_path}")
            print(f"Info: Script log saved to {script_log_path}")
        except Exception as log_e:
            err_msg = f"Error al guardar el log específico del script en {script_log_path}: {log_e}"
            print(err_msg)
            if broadcast_fn:
                broadcast_fn(err_msg)

        return {
            "status": status,
            "return_code": return_code,
            "error": stderr_capture if stderr_capture else None,
            "log_file": script_log_path,
        }

    except Exception as e:
        end_time = datetime.now()
        duration = end_time - start_time
        error_msg = (
            f"Error inesperado durante la ejecución de {script_name}: {str(e)}"
        )
        traceback_info = traceback.format_exc()
        print(error_msg)
        print(traceback_info)
        self.append_log(f"ERROR FATAL: {error_msg}\n{traceback_info}")
        if broadcast_fn:
            broadcast_fn(
                f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}"
            )
        # Best effort: record the failure in the script-specific log too.
        try:
            with open(script_log_path, "w", encoding="utf-8") as log_f:
                log_f.write(f"--- Log de Ejecución: {script_name} ---\n")
                log_f.write(f"Grupo: {group}\n")
                log_f.write(f"Directorio de Trabajo: {working_dir}\n")
                log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
                log_f.write(
                    f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n"
                )
                log_f.write(f"Duración: {duration}\n")
                log_f.write("Estado: FATAL ERROR\n")
                log_f.write("\n--- ERROR ---\n")
                log_f.write(error_msg + "\n")
                log_f.write("\n--- TRACEBACK ---\n")
                log_f.write(traceback_info)
                log_f.write("\n--- FIN DEL LOG ---\n")
        except Exception as log_e:
            err_msg_log = (
                f"Error adicional al intentar guardar el log de error: {log_e}"
            )
            print(err_msg_log)

        return {"status": "error", "error": error_msg, "traceback": traceback_info}

    finally:
        if process is not None:
            # After an unexpected failure the child may still be running;
            # do not leave it behind.
            if process.poll() is None:
                process.kill()
                process.wait()
            if stderr_thread is not None:
                stderr_thread.join(timeout=1)
            for pipe in (process.stderr, process.stdout):
                if pipe:
                    pipe.close()
def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
    """Store *path* as a group's working directory and update its history.

    The normalized path is written to the group's ``work_dir.json`` as
    ``path`` and moved to the front of ``history`` (normalized, without
    duplicates, at most 10 entries). ``self.working_directory`` is updated
    and an empty ``data.json`` is created in the directory when missing.

    An unreadable ``work_dir.json`` (missing, undecodable, not a JSON
    object) is replaced; history entries that are not non-empty strings are
    discarded, so a damaged file can no longer block setting a directory.

    Returns:
        ``{"status": "success", "path": normalized path}``, or
        ``{"status": "error", "message": ...}`` when *path* is not an
        existing directory or a file operation fails.
    """
    path = os.path.normpath(path)

    # isdir rather than exists: a regular file is not a valid working
    # directory (get_work_dir would reject it on the next read anyway).
    if not os.path.isdir(path):
        return {"status": "error", "message": "Directory does not exist"}

    work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")

    try:
        try:
            with open(work_dir_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            data = {}
        if not isinstance(data, dict):
            data = {}

        stored_history = data.get("history")
        if not isinstance(stored_history, list):
            stored_history = []

        # New path first, then the previous entries in their stored order.
        history = [path]
        for entry in stored_history:
            if not isinstance(entry, str) or not entry:
                continue
            normalized = os.path.normpath(entry)
            if normalized not in history:
                history.append(normalized)

        data["path"] = path
        data["history"] = history[:10]

        with open(work_dir_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

        self.working_directory = path

        data_path = os.path.join(path, "data.json")
        if not os.path.exists(data_path):
            with open(data_path, "w", encoding="utf-8") as f:
                json.dump({}, f, indent=2)

        return {"status": "success", "path": path}
    except Exception as e:
        return {"status": "error", "message": str(e)}