Compare commits

..

3 Commits

Author SHA1 Message Date
Miguel affab8a646 Add XML block header parser and SCL header addition script
- Implemented `parse_block_header_from_xml` function to extract block information from TIA Portal XML files.
- Created `_extract_common_attributes` helper function to retrieve common attributes from block nodes.
- Added `generate_block_header_comment` function to format the header comment based on extracted block information.
- Introduced `get_block_header_comment_from_xml` for convenience in generating header comments directly from XML files.
- Developed `add_header_to_file` function in a new test script to read SCL files, check for existing headers, and prepend a generated header from the corresponding XML file.
- Included error handling and logging for better debugging and user feedback.
2025-08-23 13:14:18 +02:00
Miguel 5da864abe0 refactor: Simplify configuration and improve code readability in x4.py 2025-08-23 10:52:02 +02:00
Miguel 5ed4d9391e feat: Implement script execution and stopping functionality
- Added a new method to stop running scripts in the ScriptExecutor class, allowing graceful termination of scripts.
- Updated the ConfigurationManager class to handle script stopping requests and manage running processes.
- Enhanced the frontend JavaScript to include stop buttons for scripts, updating their state based on execution status.
- Introduced a mechanism to track running scripts and update UI elements accordingly.
- Improved logging for script execution and stopping events.
2025-08-23 10:51:38 +02:00
21 changed files with 25306 additions and 329 deletions

18
app.py
View File

@ -140,6 +140,24 @@ def execute_script():
return jsonify({"error": error_msg})
@app.route("/api/stop_script", methods=["POST"])
def stop_script():
try:
script_group = request.json["group"]
script_name = request.json["script"]
# Detener el script en ejecución
result = config_manager.stop_script(
script_group, script_name, broadcast_message
)
return jsonify(result)
except Exception as e:
error_msg = f"Error deteniendo script: {str(e)}"
broadcast_message(error_msg)
return jsonify({"error": error_msg})
@app.route("/")
def index():
script_groups = config_manager.get_script_groups()

View File

@ -8,8 +8,8 @@
"cronologia_file": "cronologia.md"
},
"level3": {
"cronologia_file": "Planning - emails",
"input_directory": "C:\\Trabajo\\SIDEL\\PROJECTs Planning\\Emails"
"cronologia_file": "emails",
"input_directory": "C:/Users/migue/OneDrive/Miguel/Obsidean/General/Notas/Miguel/Contable/2025/EmailsOriginales"
},
"working_directory": "C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\04-SIDEL\\0 - PROJECTS Description\\PLANNING"
"working_directory": "C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\General\\Notas\\Miguel\\Contable\\2025"
}

View File

@ -1,10 +1,12 @@
{
"path": "C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\04-SIDEL\\0 - PROJECTS Description\\PLANNING",
"path": "C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\General\\Notas\\Miguel\\Contable\\2025",
"history": [
"C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\General\\Notas\\Miguel\\Contable\\2025",
"C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\03-VM\\45 - HENKEL - VM Auto Changeover",
"D:\\Trabajo\\VM\\45 - HENKEL - VM Auto Changeover\\Entregado por VM\\01 - 26-07-2025 Max - Emails",
"C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\04-SIDEL\\0 - PROJECTS Description\\PLANNING",
"C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\04-SIDEL\\17 - E5.006880 - Modifica O&U - RSC098",
"C:\\Trabajo\\SIDEL\\17 - E5.006880 - Modifica O&U - RSC098\\Reporte\\Emails",
"C:\\Trabajo\\SIDEL\\17 - E5.006880 - Modifica O&U - RSC098",
"D:\\Trabajo\\VM\\45 - HENKEL - VM Auto Changeover\\Entregado por VM\\01 - 26-07-2025 Max - Emails"
"C:\\Trabajo\\SIDEL\\17 - E5.006880 - Modifica O&U - RSC098"
]
}

View File

@ -8,6 +8,7 @@ from tkinter import filedialog
import os
import sys
import traceback
import time
from pathlib import Path
script_root = os.path.dirname(
@ -18,18 +19,18 @@ from backend.script_utils import load_configuration
# --- Configuration ---
# Supported TIA Portal versions mapping (extension -> version)
SUPPORTED_TIA_VERSIONS = {
".ap18": "18.0",
".ap19": "19.0",
".ap20": "20.0"
}
SUPPORTED_TIA_VERSIONS = {".ap18": "18.0", ".ap19": "19.0", ".ap20": "20.0"}
# Filter for cross-references. Based on documentation:
# 1: 'AllObjects', 2: 'ObjectsWithReferences', 3: 'ObjectsWithoutReferences', 4: 'UnusedObjects'
# Using 1 to export all. 0 might also work as a default in some API versions.
CROSS_REF_FILTER = 1
MAX_REOPEN_ATTEMPTS = 5 # Número máximo de re-aperturas permitidas para evitar bucles infinitos
MAX_REOPEN_ATTEMPTS = (
5 # Número máximo de re-aperturas permitidas para evitar bucles infinitos
)
BLOCK_TIMEOUT_SECONDS = 120 # Referencia de tiempo esperado para el procesamiento de cada bloque (para logging)
class PortalDisposedException(Exception):
"""Excepción lanzada cuando TIA Portal se ha cerrado inesperadamente o un objeto ha sido descartado."""
@ -51,6 +52,7 @@ def _is_disposed_exception(exc: Exception) -> bool:
)
)
# --- TIA Scripting Import Handling ---
if os.getenv("TIA_SCRIPTING"):
sys.path.append(os.getenv("TIA_SCRIPTING"))
@ -80,11 +82,12 @@ except Exception as e:
# --- Functions ---
def get_supported_filetypes():
"""Returns the supported file types for TIA Portal projects."""
filetypes = []
for ext, version in SUPPORTED_TIA_VERSIONS.items():
version_major = version.split('.')[0]
version_major = version.split(".")[0]
filetypes.append((f"TIA Portal V{version_major} Projects", f"*{ext}"))
# Add option to show all supported files
@ -93,6 +96,7 @@ def get_supported_filetypes():
return filetypes
def detect_tia_version(project_file_path):
"""Detects TIA Portal version based on file extension."""
file_path = Path(project_file_path)
@ -100,21 +104,26 @@ def detect_tia_version(project_file_path):
if file_extension in SUPPORTED_TIA_VERSIONS:
detected_version = SUPPORTED_TIA_VERSIONS[file_extension]
print(f"Versión de TIA Portal detectada: {detected_version} (de la extensión {file_extension})")
print(
f"Versión de TIA Portal detectada: {detected_version} (de la extensión {file_extension})"
)
return detected_version
else:
print(f"ADVERTENCIA: Extensión de archivo no reconocida '{file_extension}'. Extensiones soportadas: {list(SUPPORTED_TIA_VERSIONS.keys())}")
print(
f"ADVERTENCIA: Extensión de archivo no reconocida '{file_extension}'. Extensiones soportadas: {list(SUPPORTED_TIA_VERSIONS.keys())}"
)
# Default to version 18.0 for backward compatibility
print("Usando por defecto TIA Portal V18.0")
return "18.0"
def select_project_file():
"""Opens a dialog to select a TIA Portal project file."""
root = tk.Tk()
root.withdraw()
file_path = filedialog.askopenfilename(
title="Seleccionar archivo de proyecto TIA Portal",
filetypes=get_supported_filetypes()
filetypes=get_supported_filetypes(),
)
root.destroy()
if not file_path:
@ -122,12 +131,52 @@ def select_project_file():
sys.exit(0)
return file_path
# Normalizar nombres de bloque/tabla/udt para comparaciones consistentes
def _normalize_name(name: str) -> str:
"""Normaliza un nombre quitando espacios laterales y convirtiendo a minúsculas."""
return name.strip().lower()
def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, problematic_blocks=None):
def _export_block_with_timeout(
block, blocks_cr_path, block_name, timeout_seconds=BLOCK_TIMEOUT_SECONDS
):
"""
Exporta las referencias cruzadas de un bloque con monitoreo de tiempo.
Note: TIA Portal Openness no permite operaciones multi-hilo, por lo que
implementamos un timeout conceptual que al menos registra cuánto tiempo toma.
Returns:
bool: True si se exportó exitosamente
"""
start_time = time.time()
try:
# Realizar la exportación de forma directa (sin hilos debido a restricciones de TIA)
block.export_cross_references(
target_directorypath=str(blocks_cr_path),
filter=CROSS_REF_FILTER,
)
elapsed_time = time.time() - start_time
# Verificar si excedió el tiempo esperado (aunque ya terminó)
if elapsed_time > timeout_seconds:
print(
f" ADVERTENCIA: El bloque tardó {elapsed_time:.2f}s (>{timeout_seconds}s esperado)"
)
return True
except Exception as e:
elapsed_time = time.time() - start_time
print(f" Tiempo transcurrido antes del error: {elapsed_time:.2f} segundos")
raise e
def export_plc_cross_references(
plc, export_base_dir, exported_blocks=None, problematic_blocks=None
):
"""Exports cross-references for various elements from a given PLC.
Parámetros
----------
@ -151,7 +200,10 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
# --- Export Program Block Cross-References ---
blocks_cr_exported = 0
blocks_cr_skipped = 0
print(f"\n[PLC: {plc_name}] Exportando referencias cruzadas de bloques de programa...")
current_block_name = None # Track current block being processed
print(
f"\n[PLC: {plc_name}] Exportando referencias cruzadas de bloques de programa..."
)
blocks_cr_path = plc_export_dir / "ProgramBlocks_CR"
blocks_cr_path.mkdir(exist_ok=True)
print(f" Destino: {blocks_cr_path}")
@ -159,23 +211,42 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
try:
program_blocks = plc.get_program_blocks()
print(f" Se encontraron {len(program_blocks)} bloques de programa.")
# Show which blocks will be skipped from the start
if problematic_blocks:
skipped_names = []
for block in program_blocks:
if _normalize_name(block.get_name()) in problematic_blocks:
skipped_names.append(block.get_name())
if skipped_names:
print(
f" Bloques que serán omitidos (problemáticos previos): {', '.join(skipped_names)}"
)
for block in program_blocks:
block_name = block.get_name()
current_block_name = block_name # Update current block being processed
norm_block = _normalize_name(block_name)
if norm_block in problematic_blocks:
print(f" Omitiendo bloque problemático previamente detectado: {block_name}")
print(
f" Omitiendo bloque problemático previamente detectado: {block_name}"
)
blocks_cr_skipped += 1
continue
if norm_block in exported_blocks:
# Ya exportado en un intento anterior, no repetir
print(f" Omitiendo bloque ya exportado: {block_name}")
continue
print(f" Procesando bloque: {block_name}...")
try:
print(f" Exportando referencias cruzadas para {block_name}...")
block.export_cross_references(
target_directorypath=str(blocks_cr_path),
filter=CROSS_REF_FILTER,
)
start_time = time.time()
# Usar la función con monitoreo de tiempo
_export_block_with_timeout(block, blocks_cr_path, block_name)
elapsed_time = time.time() - start_time
print(f" Exportación completada en {elapsed_time:.2f} segundos")
blocks_cr_exported += 1
exported_blocks.add(norm_block)
except RuntimeError as block_ex:
@ -189,10 +260,10 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
f" ERROR GENERAL al exportar referencias cruzadas para el bloque {block_name}: {block_ex}"
)
traceback.print_exc()
problematic_blocks.add(norm_block) # Always mark as problematic
blocks_cr_skipped += 1
if _is_disposed_exception(block_ex):
# Escalamos para que el script pueda re-abrir el Portal y omitir el bloque
problematic_blocks.add(norm_block)
raise PortalDisposedException(block_ex, failed_block=block_name)
print(
f" Resumen de exportación de referencias cruzadas de bloques: Exportados={blocks_cr_exported}, Omitidos/Errores={blocks_cr_skipped}"
@ -202,15 +273,23 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
" Error de atributo: No se pudo encontrar 'get_program_blocks' en el objeto PLC. Omitiendo bloques de programa."
)
except Exception as e:
print(f" ERROR al acceder a los bloques de programa para exportar referencias cruzadas: {e}")
print(
f" ERROR al acceder a los bloques de programa para exportar referencias cruzadas: {e}"
)
traceback.print_exc()
problematic_blocks.add(_normalize_name(e.__str__()))
# If we know which block was being processed, mark it as problematic
if current_block_name:
problematic_blocks.add(_normalize_name(current_block_name))
raise PortalDisposedException(e, failed_block=current_block_name)
else:
raise PortalDisposedException(e)
# --- Export PLC Tag Table Cross-References ---
tags_cr_exported = 0
tags_cr_skipped = 0
print(f"\n[PLC: {plc_name}] Exportando referencias cruzadas de tablas de variables...")
print(
f"\n[PLC: {plc_name}] Exportando referencias cruzadas de tablas de variables..."
)
tags_cr_path = plc_export_dir / "PlcTags_CR"
tags_cr_path.mkdir(exist_ok=True)
print(f" Destino: {tags_cr_path}")
@ -224,8 +303,7 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
try:
print(f" Exportando referencias cruzadas para {table_name}...")
table.export_cross_references(
target_directorypath=str(tags_cr_path),
filter=CROSS_REF_FILTER
target_directorypath=str(tags_cr_path), filter=CROSS_REF_FILTER
)
tags_cr_exported += 1
except RuntimeError as table_ex:
@ -247,13 +325,17 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
" Error de atributo: No se pudo encontrar 'get_plc_tag_tables' en el objeto PLC. Omitiendo tablas de variables."
)
except Exception as e:
print(f" ERROR al acceder a las tablas de variables para exportar referencias cruzadas: {e}")
print(
f" ERROR al acceder a las tablas de variables para exportar referencias cruzadas: {e}"
)
traceback.print_exc()
# --- Export PLC Data Type (UDT) Cross-References ---
udts_cr_exported = 0
udts_cr_skipped = 0
print(f"\n[PLC: {plc_name}] Exportando referencias cruzadas de tipos de datos PLC (UDTs)...")
print(
f"\n[PLC: {plc_name}] Exportando referencias cruzadas de tipos de datos PLC (UDTs)..."
)
udts_cr_path = plc_export_dir / "PlcDataTypes_CR"
udts_cr_path.mkdir(exist_ok=True)
print(f" Destino: {udts_cr_path}")
@ -267,8 +349,7 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
try:
print(f" Exportando referencias cruzadas para {udt_name}...")
udt.export_cross_references(
target_directorypath=str(udts_cr_path),
filter=CROSS_REF_FILTER
target_directorypath=str(udts_cr_path), filter=CROSS_REF_FILTER
)
udts_cr_exported += 1
except RuntimeError as udt_ex:
@ -296,7 +377,9 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
# --- Export System Block Cross-References ---
sys_blocks_cr_exported = 0
sys_blocks_cr_skipped = 0
print(f"\n[PLC: {plc_name}] Intentando exportar referencias cruzadas de bloques de sistema...")
print(
f"\n[PLC: {plc_name}] Intentando exportar referencias cruzadas de bloques de sistema..."
)
sys_blocks_cr_path = plc_export_dir / "SystemBlocks_CR"
sys_blocks_cr_path.mkdir(exist_ok=True)
print(f" Destino: {sys_blocks_cr_path}")
@ -304,14 +387,14 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
try:
if hasattr(plc, "get_system_blocks"):
system_blocks = plc.get_system_blocks()
print(
f" Se encontraron {len(system_blocks)} bloques de sistema."
)
print(f" Se encontraron {len(system_blocks)} bloques de sistema.")
for sys_block in system_blocks:
sys_block_name = sys_block.get_name()
print(f" Procesando bloque de sistema: {sys_block_name}...")
try:
print(f" Exportando referencias cruzadas para {sys_block_name}...")
print(
f" Exportando referencias cruzadas para {sys_block_name}..."
)
sys_block.export_cross_references(
target_directorypath=str(sys_blocks_cr_path),
filter=CROSS_REF_FILTER,
@ -350,7 +433,9 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
# --- Export Software Unit Cross-References ---
sw_units_cr_exported = 0
sw_units_cr_skipped = 0
print(f"\n[PLC: {plc_name}] Intentando exportar referencias cruzadas de unidades de software...")
print(
f"\n[PLC: {plc_name}] Intentando exportar referencias cruzadas de unidades de software..."
)
sw_units_cr_path = plc_export_dir / "SoftwareUnits_CR"
sw_units_cr_path.mkdir(exist_ok=True)
print(f" Destino: {sw_units_cr_path}")
@ -400,6 +485,7 @@ def export_plc_cross_references(plc, export_base_dir, exported_blocks=None, prob
print(f"\n--- Finalizado el procesamiento del PLC: {plc_name} ---")
def open_portal_and_project(tia_version: str, project_file_path: str):
"""Abre TIA Portal y el proyecto indicado, devolviendo el portal y el objeto proyecto."""
print(f"\nConectando a TIA Portal V{tia_version}...")
@ -414,9 +500,12 @@ def open_portal_and_project(tia_version: str, project_file_path: str):
if project_obj is None:
project_obj = portal.get_project()
if project_obj is None:
raise Exception("No se pudo abrir u obtener el proyecto especificado tras la reapertura.")
raise Exception(
"No se pudo abrir u obtener el proyecto especificado tras la reapertura."
)
return portal, project_obj
# --- Main Script ---
if __name__ == "__main__":
@ -424,11 +513,20 @@ if __name__ == "__main__":
working_directory = configs.get("working_directory")
print("--- Exportador de Referencias Cruzadas de TIA Portal ---")
print(f"Configuración:")
print(
f" - Tiempo esperado por bloque: {BLOCK_TIMEOUT_SECONDS} segundos (para logging)"
)
print(f" - Máximo intentos de reapertura: {MAX_REOPEN_ATTEMPTS}")
print(f" - Filtro de referencias cruzadas: {CROSS_REF_FILTER}")
print("")
# Validate working directory
if not working_directory or not os.path.isdir(working_directory):
print("ERROR: Directorio de trabajo no configurado o inválido.")
print("Por favor configure el directorio de trabajo usando la aplicación principal.")
print(
"Por favor configure el directorio de trabajo usando la aplicación principal."
)
sys.exit(1)
# 1. Select Project File
@ -444,7 +542,9 @@ if __name__ == "__main__":
print(f"\nProyecto seleccionado: {project_file}")
print(f"Usando directorio base de exportación: {export_base_dir.resolve()}")
except Exception as e:
print(f"ERROR: No se pudo crear el directorio de exportación '{export_base_dir}'. Error: {e}")
print(
f"ERROR: No se pudo crear el directorio de exportación '{export_base_dir}'. Error: {e}"
)
sys.exit(1)
portal_instance = None
@ -452,7 +552,9 @@ if __name__ == "__main__":
try:
# 4. Connect to TIA Portal with detected version
portal_instance, project_object = open_portal_and_project(tia_version, project_file)
portal_instance, project_object = open_portal_and_project(
tia_version, project_file
)
# 5. Get PLCs
plcs = project_object.get_plcs()
@ -484,8 +586,14 @@ if __name__ == "__main__":
reopen_attempts += 1
failed_block = pd_ex.failed_block
if failed_block:
problematic_blocks.add(_normalize_name(failed_block))
norm_failed_block = _normalize_name(failed_block)
problematic_blocks.add(norm_failed_block)
skipped_blocks_report.append(failed_block)
print(f"Marcando bloque problemático: {failed_block}")
else:
print(
"Error general detectado sin bloque específico identificado"
)
if reopen_attempts > MAX_REOPEN_ATTEMPTS:
print(
@ -501,8 +609,12 @@ if __name__ == "__main__":
pass
# Re-abrir portal y proyecto
print(f"Re-abriendo TIA Portal (intento {reopen_attempts}/{MAX_REOPEN_ATTEMPTS})...")
portal_instance, project_object = open_portal_and_project(tia_version, project_file)
print(
f"Re-abriendo TIA Portal (intento {reopen_attempts}/{MAX_REOPEN_ATTEMPTS})..."
)
portal_instance, project_object = open_portal_and_project(
tia_version, project_file
)
# Buscar de nuevo el PLC por nombre
plc_device = None
@ -519,7 +631,12 @@ if __name__ == "__main__":
continue
if skipped_blocks_report:
print(f"\nBloques problemáticos para el PLC '{plc_name}': {', '.join(set(skipped_blocks_report))}")
print(
f"\nBloques problemáticos para el PLC '{plc_name}': {', '.join(set(skipped_blocks_report))}"
)
print(
f"Total de bloques problemáticos registrados: {len(problematic_blocks)}"
)
print("\nProceso de exportación de referencias cruzadas completado.")

View File

@ -0,0 +1,117 @@
FUNCTION "1032_FC Manual function" : Void
{ S7_Optimized_Access := 'TRUE' }
VERSION : 0.1
VAR_TEMP
wPosition : Word;
xrtCurrentLimit : Bool;
g : Int;
p : Int;
m : Int;
b : Int;
END_VAR
BEGIN
#g := "DB HMI_1".nGWNumber;
#p := "DB HMI_1".nPortNumber;
#b := "DB HMI_1".nBoxNumber;
#m := "DB HMI_1".nMotorNumber;
"rtMotInPos"(CLK:="DB HMI_1".xMotStatInPos );
IF "DB Cycle".Man THEN
// Run Forward
IF "DB HMI_1".xPBMotMoveFw AND
"DB MotorPar".GW[#g].P[#p].Box[#b].Mot[#m].xEnable THEN
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Sign := TRUE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].PosType := FALSE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Position := "DB HMI_1".nPosition;
END_IF;
// Run Backward
IF "DB HMI_1".xPBMotMoveBw AND
"DB MotorPar".GW[#g].P[#p].Box[#b].Mot[#m].xEnable THEN
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Sign := FALSE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].PosType := FALSE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Position := "DB HMI_1".nPosition;
END_IF;
// Run Zero/Position
IF "DB HMI_1".xPBMotMoveZeroPos AND
"DB MotorPar".GW[#g].P[#p].Box[#b].Mot[#m].xEnable THEN
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].PosType := TRUE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Position := "DB HMI_1".nPosition;
END_IF;
// Stop
IF NOT "DB Cycle".xZona_MoveManFw AND NOT "DB Cycle".xZona_MoveManBw THEN
IF "DB HMI_1".xPBMotStop OR "rtMotInPos".Q OR "DB HMI_1".xMotStatAlarm THEN
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Position := 0;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].PosType := FALSE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Reset := FALSE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Sign := FALSE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Stop := FALSE;
END_IF;
IF ("DB HMI_1".xPBMotMoveFw OR "DB HMI_1".xPBMotMoveBw OR "DB HMI_1".xPBMotMoveZeroPos) AND
"DB MotorPar".GW[#g].P[#p].Box[#b].Mot[#m].xEnable THEN
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Stop := TRUE;
END_IF;
END_IF;
END_IF;
// Alarm Reset
IF "DB HMI_1".xPBMotAlarmReset AND "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].MotorStatus[#m].Alarm THEN
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Reset := TRUE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Stop := TRUE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Sign := FALSE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].PosType := FALSE;
END_IF;
"rtPB_MotAlmReset"(CLK:= NOT "DB HMI_1".xPBMotAlarmReset);
IF "rtPB_MotAlmReset".Q THEN
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Reset := FALSE;
"DB Gateway".N[#g].write.P[#p].MotorBoxCtrl[#b].Mot[#m].Stop := FALSE;
END_IF;
// Motor Current limitation
"rtPB_CurrentLimit"(CLK:="DB HMI_1".xPopUpCurrentLim,
Q=>#xrtCurrentLimit);
IF "rtPB_CurrentLimit".Q THEN
"DB HMI_1".snCurrentLimFW := "DB MotorPar".GW[#g].P[#p].Box[#b].Mot[#m].CurrentLimFW;
"DB HMI_1".snCurrentLimBW := "DB MotorPar".GW[#g].P[#p].Box[#b].Mot[#m].CurrentLimBW;
END_IF;
IF "DB HMI_1".xPopUpCurrentLim AND NOT "rtPB_CurrentLimit".Q THEN
"DB MotorPar".GW[#g].P[#p].Box[#b].Mot[#m].CurrentLimFW := "DB HMI_1".snCurrentLimFW;
"DB MotorPar".GW[#g].P[#p].Box[#b].Mot[#m].CurrentLimBW := "DB HMI_1".snCurrentLimBW;
END_IF;
IF NOT "DB HMI_1".xPopUpCurrentLim THEN
"DB HMI_1".snCurrentLimFW := 0;
"DB HMI_1".snCurrentLimBW := 0;
END_IF;
// HMI Motor Status
"DB HMI_1".snGatewayFirmware := "DB Gateway".N[#g].read.P[#p].Firmware;
"DB HMI_1".nBoxInstalled := USINT_TO_INT ("DB Gateway".N[#g].read.P[#p].MboxNumber);
"DB HMI_1".snBoxFirmwareVersion := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].BoxStatus.FirmwareVersion;
"DB HMI_1".snBoxFirmwareRevision := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].BoxStatus.FirmwareRevision;
"DB HMI_1".xBox_MotEnabled := "DB MotorPar".GW[#g].P[#p].Box[#b].Mot[#m].xEnable;
"DB HMI_1".xMotStatRunningFw := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].MotorStatus[#m].MovingFW;
"DB HMI_1".xMotStatRunningBw := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].MotorStatus[#m].MovingBW;
"DB HMI_1".xMotStatInPos := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].MotorStatus[#m].InPOS;
"DB HMI_1".xMotStatAlarm := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].MotorStatus[#m].Alarm;
"DB HMI_1".xMotStatInZeroPos := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].MotorStatus[#m].InZero;
"DB HMI_1".xMotStatRunningSlowly := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].MotorStatus[#m].MovingSlowly;
"DB HMI_1".xBoxFuseBurned := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].BoxStatus.BurnedFuse;
"DB HMI_1".xBoxUndervoltage := "DB Gateway".N[#g].read.P[#p].MotorsBoxStatus[#b].BoxStatus.Undervoltage;
IF ("DB HMI_1".xMotStatRunningFw OR "DB HMI_1".xMotStatRunningBw) AND "DB Cycle".Man THEN
"DB HMI_1".xMotStatRunning := 1;
ELSE
"DB HMI_1".xMotStatRunning := 0;
END_IF;
END_FUNCTION

File diff suppressed because it is too large Load Diff

View File

@ -5,7 +5,7 @@
"path": "."
},
{
"path": "C:/Trabajo/SIDEL/13 - E5.007560 - Modifica O&U - SAE235/Reporte/ExportTia"
"path": "../../../../../../Trabajo/VM/45 - HENKEL - VM Auto Changeover/ExportTia/PLC_TL25_Q1"
}
],
"settings": {

View File

@ -1,11 +1,23 @@
# generators/generate_md_tag_table.py
# -*- coding: utf-8 -*-
def generate_tag_table_markdown(data):
"""Genera contenido Markdown para una tabla de tags."""
md_lines = []
table_name = data.get("block_name", "UnknownTagTable")
tags = data.get("tags", [])
block_number = data.get("block_number")
block_type = data.get("block_type", "TagTable")
# Agregar línea de identificación del bloque al inicio
if block_number and block_type:
if block_type == "PlcTagTable" or block_type == "TagTable":
md_lines.append(f"<!-- TAG{block_number} -->")
else:
md_lines.append(f"<!-- {block_type}{block_number} -->")
elif block_type:
md_lines.append(f"<!-- {block_type} -->")
md_lines.append(f"# Tag Table: {table_name}")
md_lines.append("")
@ -18,7 +30,9 @@ def generate_tag_table_markdown(data):
datatype = tag.get("datatype", "N/A")
address = tag.get("address", "N/A") or " "
comment_raw = tag.get("comment")
comment = comment_raw.replace('|', '\|').replace('\n', ' ') if comment_raw else ""
comment = (
comment_raw.replace("|", "\|").replace("\n", " ") if comment_raw else ""
)
md_lines.append(f"| `{name}` | `{datatype}` | `{address}` | {comment} |")
md_lines.append("")
else:

View File

@ -3,44 +3,88 @@
import re
from .generator_utils import format_scl_start_value # Importar utilidad necesaria
def generate_markdown_member_rows(members, level=0):
"""Genera filas Markdown para miembros de UDT (recursivo)."""
md_rows = []; prefix = "&nbsp;&nbsp;&nbsp;&nbsp;" * level
md_rows = []
prefix = "&nbsp;&nbsp;&nbsp;&nbsp;" * level
for member in members:
name = member.get("name", "N/A"); datatype = member.get("datatype", "N/A")
name = member.get("name", "N/A")
datatype = member.get("datatype", "N/A")
start_value_raw = member.get("start_value")
start_value_fmt = format_scl_start_value(start_value_raw, datatype) if start_value_raw is not None else ""
comment_raw = member.get("comment"); comment = comment_raw.replace('|', '\|').replace('\n', ' ') if comment_raw else ""
md_rows.append(f"| {prefix}`{name}` | `{datatype}` | `{start_value_fmt}` | {comment} |")
start_value_fmt = (
format_scl_start_value(start_value_raw, datatype)
if start_value_raw is not None
else ""
)
comment_raw = member.get("comment")
comment = (
comment_raw.replace("|", "\|").replace("\n", " ") if comment_raw else ""
)
md_rows.append(
f"| {prefix}`{name}` | `{datatype}` | `{start_value_fmt}` | {comment} |"
)
children = member.get("children")
if children: md_rows.extend(generate_markdown_member_rows(children, level + 1))
if children:
md_rows.extend(generate_markdown_member_rows(children, level + 1))
array_elements = member.get("array_elements")
if array_elements:
base_type_for_init = datatype
if isinstance(datatype, str) and datatype.lower().startswith("array["):
match = re.match(r"(Array\[.*\]\s+of\s+)(.*)", datatype, re.IGNORECASE)
if match: base_type_for_init = match.group(2).strip()
if match:
base_type_for_init = match.group(2).strip()
md_rows.append(f"| {prefix}&nbsp;&nbsp;*(Initial Values)* | | | |")
try:
indices_numeric = {int(k): v for k, v in array_elements.items()}
sorted_indices_str = [str(k) for k in sorted(indices_numeric.keys())]
except ValueError: sorted_indices_str = sorted(array_elements.keys())
except ValueError:
sorted_indices_str = sorted(array_elements.keys())
for idx_str in sorted_indices_str:
val_raw = array_elements[idx_str]
val_fmt = format_scl_start_value(val_raw, base_type_for_init) if val_raw is not None else ""
md_rows.append(f"| {prefix}&nbsp;&nbsp;`[{idx_str}]` | | `{val_fmt}` | |")
val_fmt = (
format_scl_start_value(val_raw, base_type_for_init)
if val_raw is not None
else ""
)
md_rows.append(
f"| {prefix}&nbsp;&nbsp;`[{idx_str}]` | | `{val_fmt}` | |"
)
return md_rows
def generate_udt_markdown(data):
"""Genera contenido Markdown para un UDT."""
md_lines = []; udt_name = data.get("block_name", "UnknownUDT"); udt_comment = data.get("block_comment", "")
md_lines.append(f"# UDT: {udt_name}"); md_lines.append("")
if udt_comment: md_lines.append(f"**Comment:**"); [md_lines.append(f"> {line}") for line in udt_comment.splitlines()]; md_lines.append("")
md_lines = []
udt_name = data.get("block_name", "UnknownUDT")
udt_comment = data.get("block_comment", "")
block_number = data.get("block_number")
block_type = data.get("block_type", "UDT")
# Agregar línea de identificación del bloque al inicio
if block_number and block_type:
if block_type == "PlcUDT" or block_type == "UDT":
md_lines.append(f"<!-- UDT{block_number} -->")
else:
md_lines.append(f"<!-- {block_type}{block_number} -->")
elif block_type:
md_lines.append(f"<!-- {block_type} -->")
md_lines.append(f"# UDT: {udt_name}")
md_lines.append("")
if udt_comment:
md_lines.append(f"**Comment:**")
[md_lines.append(f"> {line}") for line in udt_comment.splitlines()]
md_lines.append("")
members = data.get("interface", {}).get("None", [])
if members:
md_lines.append("## Members"); md_lines.append("")
md_lines.append("| Name | Datatype | Start Value | Comment |"); md_lines.append("|---|---|---|---|")
md_lines.append("## Members")
md_lines.append("")
md_lines.append("| Name | Datatype | Start Value | Comment |")
md_lines.append("|---|---|---|---|")
md_lines.extend(generate_markdown_member_rows(members))
md_lines.append("")
else: md_lines.append("No members found in the UDT interface."); md_lines.append("")
else:
md_lines.append("No members found in the UDT interface.")
md_lines.append("")
return md_lines

View File

@ -7,7 +7,6 @@ from .generator_utils import format_variable_name, generate_scl_declarations
SCL_SUFFIX = "_sympy_processed"
# ... (_generate_scl_header sin cambios)...
def _generate_scl_header(data, scl_block_name):
scl_output = []
block_type = data.get("block_type", "Unknown")
@ -19,6 +18,20 @@ def _generate_scl_header(data, scl_block_name):
scl_block_keyword = "FUNCTION"
elif block_type == "OB":
scl_block_keyword = "ORGANIZATION_BLOCK"
# Agregar línea de identificación del bloque al inicio
if block_number and block_type:
if block_type == "FB":
scl_output.append(f"// FB{block_number}")
elif block_type == "FC":
scl_output.append(f"// FC{block_number}")
elif block_type == "OB":
scl_output.append(f"// OB{block_number}")
else:
scl_output.append(f"// {block_type}{block_number}")
elif block_type:
scl_output.append(f"// {block_type}")
scl_output.append(f"// Block Type: {block_type}")
if block_name != scl_block_name:
scl_output.append(f"// Block Name (Original): {block_name}")
@ -188,7 +201,9 @@ def _generate_scl_body(networks):
scl_output.append(f" // --- BEGIN STL Network {i+1} ---")
scl_output.append(f" ```stl ")
[
scl_output.append(f" {stl_line}") # scl_output.append(f" // {stl_line}")
scl_output.append(
f" {stl_line}"
) # scl_output.append(f" // {stl_line}")
for stl_line in raw_stl_code.splitlines()
]
scl_output.append(f" ``` ")

View File

@ -3,6 +3,7 @@
# No necesita importar json/os aquí, lo hará generate_scl_declarations
from .generator_utils import format_variable_name, generate_scl_declarations
# Modificar _generate_scl_header si es necesario, pero parece ok
def _generate_scl_header(data, scl_block_name):
# ... (código sin cambios) ...
@ -11,14 +12,35 @@ def _generate_scl_header(data, scl_block_name):
block_name = data.get("block_name", "UnknownBlock")
block_number = data.get("block_number")
block_comment = data.get("block_comment", "")
# Agregar línea de identificación del bloque al inicio
if block_number and block_type:
if block_type == "GlobalDB":
scl_output.append(f"// DB{block_number}")
elif block_type == "InstanceDB":
scl_output.append(f"// DB{block_number}")
else:
# Para otros tipos de DB
scl_output.append(f"// DB{block_number}")
elif block_type:
scl_output.append(f"// {block_type}")
scl_output.append(f"// Block Type: {block_type}")
if block_name != scl_block_name: scl_output.append(f"// Block Name (Original): {block_name}")
if block_number: scl_output.append(f"// Block Number: {block_number}")
if block_comment: scl_output.append(f"// Block Comment:"); [scl_output.append(f"// {line}") for line in block_comment.splitlines()]
scl_output.append(""); scl_output.append(f'DATA_BLOCK "{scl_block_name}"'); scl_output.append("{ S7_Optimized_Access := 'TRUE' }")
scl_output.append("VERSION : 0.1"); scl_output.append("")
if block_name != scl_block_name:
scl_output.append(f"// Block Name (Original): {block_name}")
if block_number:
scl_output.append(f"// Block Number: {block_number}")
if block_comment:
scl_output.append(f"// Block Comment:")
[scl_output.append(f"// {line}") for line in block_comment.splitlines()]
scl_output.append("")
scl_output.append(f'DATA_BLOCK "{scl_block_name}"')
scl_output.append("{ S7_Optimized_Access := 'TRUE' }")
scl_output.append("VERSION : 0.1")
scl_output.append("")
return scl_output
# Modificar _generate_scl_interface para pasar project_root_dir
def _generate_scl_interface(interface_data, project_root_dir): # <-- Nuevo argumento
"""Genera la sección VAR para DB (basada en 'Static')."""
@ -27,14 +49,21 @@ def _generate_scl_interface(interface_data, project_root_dir): # <-- Nuevo argum
if static_vars:
scl_output.append("VAR")
# Pasar project_root_dir a generate_scl_declarations
scl_output.extend(generate_scl_declarations(static_vars, indent_level=1, project_root_dir=project_root_dir)) # <-- Pasar ruta raíz
scl_output.extend(
generate_scl_declarations(
static_vars, indent_level=1, project_root_dir=project_root_dir
)
) # <-- Pasar ruta raíz
scl_output.append("END_VAR")
else:
print("Advertencia: No se encontró sección 'Static' o está vacía en la interfaz del DB.")
print(
"Advertencia: No se encontró sección 'Static' o está vacía en la interfaz del DB."
)
scl_output.append("VAR\nEND_VAR") # Añadir vacío
scl_output.append("")
return scl_output
# Modificar generate_scl_for_db para aceptar y pasar project_root_dir
def generate_scl_for_db(data, project_root_dir): # <-- Nuevo argumento
"""Genera el contenido SCL completo para un DATA_BLOCK."""
@ -45,7 +74,9 @@ def generate_scl_for_db(data, project_root_dir): # <-- Nuevo argumento
interface_data = data.get("interface", {})
# Pasar project_root_dir a _generate_scl_interface
scl_output.extend(_generate_scl_interface(interface_data, project_root_dir)) # <-- Pasar ruta raíz
scl_output.extend(
_generate_scl_interface(interface_data, project_root_dir)
) # <-- Pasar ruta raíz
scl_output.append("BEGIN")
scl_output.append(" // Data Blocks have no executable code")

View File

@ -0,0 +1,191 @@
# ToUpload/parsers/parse_block_header.py
# -*- coding: utf-8 -*-
from lxml import etree
import os
# Importar desde las utilidades del parser
from .parser_utils import ns, get_multilingual_text
def parse_block_header_from_xml(xml_filepath):
"""
Extrae información del header del bloque desde un archivo XML de TIA Portal.
Args:
xml_filepath (str): Ruta al archivo XML
Returns:
dict: Diccionario con información del bloque:
{
'block_type': 'FC' | 'FB' | 'DB' | 'UDT' | 'PlcTagTable',
'block_number': str | None,
'block_name': str | None,
'programming_language': str | None
}
"""
if not os.path.exists(xml_filepath):
return None
try:
tree = etree.parse(xml_filepath)
root = tree.getroot()
# Buscar diferentes tipos de bloques
block_info = {
"block_type": None,
"block_number": None,
"block_name": None,
"programming_language": None,
}
# 1. Function (FC)
fc_node = root.find(".//SW.Blocks.FC")
if fc_node is not None:
block_info["block_type"] = "FC"
block_info.update(_extract_common_attributes(fc_node))
return block_info
# 2. Function Block (FB)
fb_node = root.find(".//SW.Blocks.FB")
if fb_node is not None:
block_info["block_type"] = "FB"
block_info.update(_extract_common_attributes(fb_node))
return block_info
# 3. Organization Block (OB)
ob_node = root.find(".//SW.Blocks.OB")
if ob_node is not None:
block_info["block_type"] = "OB"
block_info.update(_extract_common_attributes(ob_node))
return block_info
# 4. Data Block (DB) - Global
db_node = root.find(".//SW.Blocks.GlobalDB")
if db_node is not None:
block_info["block_type"] = "GlobalDB"
block_info.update(_extract_common_attributes(db_node))
return block_info
# 5. Data Block (DB) - Instance
idb_node = root.find(".//SW.Blocks.InstanceDB")
if idb_node is not None:
block_info["block_type"] = "InstanceDB"
block_info.update(_extract_common_attributes(idb_node))
return block_info
# 6. User Defined Type (UDT)
udt_node = root.find(".//SW.Types.PlcStruct")
if udt_node is not None:
block_info["block_type"] = "PlcUDT"
block_info.update(_extract_common_attributes(udt_node))
return block_info
# 7. Tag Table
tag_table_node = root.find(".//SW.Tags.PlcTagTable")
if tag_table_node is not None:
block_info["block_type"] = "PlcTagTable"
block_info.update(_extract_common_attributes(tag_table_node))
return block_info
return None
except Exception as e:
print(f"Error parsing block header from {xml_filepath}: {e}")
return None
def _extract_common_attributes(block_node):
"""
Extrae atributos comunes de un nodo de bloque.
Args:
block_node: Nodo XML del bloque
Returns:
dict: Diccionario con atributos extraídos
"""
attributes = {}
# Buscar AttributeList
attr_list = block_node.find("AttributeList")
if attr_list is not None:
# Nombre del bloque
name_elem = attr_list.find("Name")
if name_elem is not None:
attributes["block_name"] = name_elem.text
# Número del bloque
number_elem = attr_list.find("Number")
if number_elem is not None:
attributes["block_number"] = str(number_elem.text)
# Lenguaje de programación
lang_elem = attr_list.find("ProgrammingLanguage")
if lang_elem is not None:
attributes["programming_language"] = lang_elem.text
return attributes
def generate_block_header_comment(block_info):
"""
Genera el comentario de header del bloque basado en la información extraída.
Args:
block_info (dict): Información del bloque extraída del XML
Returns:
str: Línea de comentario del header (ej: "// FC1032")
"""
if not block_info or not block_info.get("block_type"):
return None
block_type = block_info["block_type"]
block_number = block_info.get("block_number")
# Mapear tipos de bloque a abreviaciones
type_mapping = {
"FC": "FC",
"FB": "FB",
"OB": "OB",
"GlobalDB": "DB",
"InstanceDB": "DB",
"PlcUDT": "UDT",
"PlcTagTable": "TAG",
}
abbreviated_type = type_mapping.get(block_type, block_type)
if block_number:
return f"// {abbreviated_type}{block_number}"
else:
return f"// {abbreviated_type}"
# Función de conveniencia para uso directo
def get_block_header_comment_from_xml(xml_filepath):
"""
Función de conveniencia que extrae la información del bloque y genera el comentario de header.
Args:
xml_filepath (str): Ruta al archivo XML
Returns:
str | None: Comentario de header (ej: "// FC1032") o None si no se pudo extraer
"""
block_info = parse_block_header_from_xml(xml_filepath)
if block_info:
return generate_block_header_comment(block_info)
return None
if __name__ == "__main__":
# Ejemplo de uso para testing
import sys
if len(sys.argv) > 1:
xml_file = sys.argv[1]
header = get_block_header_comment_from_xml(xml_file)
print(f"Header for {xml_file}: {header}")
else:
print("Usage: python parse_block_header.py <xml_file>")

View File

@ -15,5 +15,5 @@
"xref_source_subdir": "source"
},
"level3": {},
"working_directory": "C:\\Trabajo\\SIDEL\\09 - SAE452 - Diet as Regular - San Giorgio in Bosco\\Reporte\\TiaExport"
"working_directory": "D:\\Trabajo\\VM\\45 - HENKEL - VM Auto Changeover\\ExportTia"
}

View File

@ -64,5 +64,11 @@
"short_description": "Sin descripción corta.",
"long_description": "",
"hidden": false
},
"test_parser.py": {
"display_name": "test_parser",
"short_description": "Sin descripción corta.",
"long_description": "",
"hidden": false
}
}

View File

@ -1,47 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Script de prueba para verificar que los índices de arrays se capturen correctamente en LAD/FBD."""
import os
import sys
# Añadir el directorio padre al path para los imports
sys.path.insert(0, os.path.dirname(__file__))
from x1_to_json import convert_xml_to_json
if __name__ == "__main__":
xml_file = ".example/FC TT Devices.xml"
json_file = ".example/FC_TT_Devices_test.json"
print(f"Probando conversión de {xml_file} a {json_file}...")
try:
success = convert_xml_to_json(xml_file, json_file)
if success:
print("Conversión exitosa!")
# Buscar patrones de arrays en el JSON generado
with open(json_file, "r", encoding="utf-8") as f:
content = f.read()
# Buscar di0.x con índices
if '"di0.x"[1]' in content:
print(
"✅ ÉXITO: Se encontró di0.x[1] - los índices de arrays se están capturando correctamente!"
)
elif '"di0.x"[]' in content:
print("❌ PROBLEMA: Se encontró di0.x[] - los índices están vacíos")
elif '"di0.x"' in content:
print(
"❌ PROBLEMA: Se encontró di0.x sin índices - el fix no está funcionando"
)
else:
print("⚠️ No se encontró di0.x en el contenido")
else:
print("Error en la conversión")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()

View File

@ -1,7 +1,7 @@
{
"path": "C:\\Trabajo\\SIDEL\\09 - SAE452 - Diet as Regular - San Giorgio in Bosco\\Reporte\\TiaExport",
"path": "D:\\Trabajo\\VM\\45 - HENKEL - VM Auto Changeover\\ExportTia",
"history": [
"C:\\Trabajo\\SIDEL\\09 - SAE452 - Diet as Regular - San Giorgio in Bosco\\Reporte\\TiaExport",
"D:\\Trabajo\\VM\\45 - HENKEL - VM Auto Changeover\\ExportTia"
"D:\\Trabajo\\VM\\45 - HENKEL - VM Auto Changeover\\ExportTia",
"C:\\Trabajo\\SIDEL\\09 - SAE452 - Diet as Regular - San Giorgio in Bosco\\Reporte\\TiaExport"
]
}

View File

@ -18,6 +18,7 @@ import traceback
import json
import datetime # <-- NUEVO: Para timestamps
import shutil # <-- ADDED: Import shutil for file copying
import re # <-- ADDED: Import regex for block header processing
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
@ -162,6 +163,217 @@ def check_skip_status(
return status
# --- FUNCIÓN AUXILIAR PARA AGREGAR HEADER A SCL COPIADOS ---
def add_block_header_to_scl(src_path, dest_path, log_f):
"""
Copia un archivo SCL agregando la línea de identificación del bloque al inicio.
Extrae el tipo y número de bloque del XML correspondiente si existe.
"""
try:
log_message(
f" Procesando archivo SCL: {os.path.basename(src_path)}",
log_f,
also_print=False,
)
# Leer el archivo SCL original
with open(src_path, "r", encoding="utf-8") as f:
content = f.read()
# Verificar si ya tiene header correcto en la primera línea
lines = content.split("\n")
if lines and lines[0].strip():
first_line = lines[0].strip()
if re.match(r"^//\s*(FB|FC|DB|UDT|TAG|OB)\d+\s*$", first_line):
# Ya tiene el header correcto, no necesitamos agregarlo
log_message(
f" ✓ Archivo ya tiene header de bloque: {first_line}",
log_f,
also_print=False,
)
shutil.copy2(src_path, dest_path)
return True
# Intentar encontrar el XML correspondiente
xml_path = src_path.replace(".scl", ".xml")
header_comment = None
# Si no existe en la misma carpeta, buscar en estructura paralela ProgramBlocks_XML
if not os.path.exists(xml_path):
# Intentar convertir ruta de ProgramBlocks_SCL a ProgramBlocks_XML
if "ProgramBlocks_SCL" in src_path:
xml_path = src_path.replace(
"ProgramBlocks_SCL", "ProgramBlocks_XML"
).replace(".scl", ".xml")
# O viceversa, si está en otra estructura
elif "scl_output" in src_path:
# Para archivos ya copiados en scl_output, buscar en ProgramBlocks_XML
# Extraer el nombre base y buscar recursivamente en el proyecto
base_name = os.path.splitext(os.path.basename(src_path))[0]
project_root = src_path
# Subir hasta encontrar la raíz del proyecto (donde están las carpetas ProgramBlocks_*)
while project_root and not any(
os.path.exists(os.path.join(project_root, d))
for d in ["ProgramBlocks_XML", "ProgramBlocks_SCL"]
):
parent = os.path.dirname(project_root)
if parent == project_root: # Llegamos a la raíz del sistema
break
project_root = parent
if project_root:
# Buscar el XML correspondiente recursivamente
xml_search_pattern = os.path.join(
project_root, "**", f"{base_name}.xml"
)
import glob
xml_candidates = glob.glob(xml_search_pattern, recursive=True)
if xml_candidates:
xml_path = xml_candidates[0] # Tomar el primero encontrado
log_message(f" Buscando XML en: {xml_path}", log_f, also_print=False)
if os.path.exists(xml_path):
# Usar el nuevo parser para extraer información del XML
try:
from parsers.parse_block_header import get_block_header_comment_from_xml
header_comment = get_block_header_comment_from_xml(xml_path)
if header_comment:
log_message(
f" Extraído header del XML: {header_comment}",
log_f,
also_print=False,
)
except Exception as e:
log_message(
f" Error extrayendo header del XML {xml_path}: {e}",
log_f,
also_print=False,
)
else:
log_message(
f" XML no encontrado en: {xml_path}", log_f, also_print=False
)
# Si no se pudo extraer del XML, intentar extraer del contenido SCL (fallback)
if not header_comment:
log_message(
f" XML no encontrado o sin header válido, intentando extraer del contenido SCL",
log_f,
also_print=False,
)
header_comment = _extract_header_from_scl_content(content, log_f)
# Escribir el archivo con el header
with open(dest_path, "w", encoding="utf-8") as f:
if header_comment:
f.write(header_comment + "\n")
f.write(content)
if header_comment:
log_message(
f" ✓ Agregado header: {header_comment}", log_f, also_print=False
)
else:
log_message(
f" ⚠ No se pudo determinar tipo/número de bloque, copiando sin header",
log_f,
also_print=False,
)
return True
except Exception as e:
log_message(f" ✗ ERROR procesando archivo SCL: {e}", log_f)
# Fallback: copia simple
try:
shutil.copy2(src_path, dest_path)
log_message(
f" ⚠ Fallback: copia simple realizada", log_f, also_print=False
)
return True
except Exception as e2:
log_message(f" ✗ ERROR en fallback de copia: {e2}", log_f)
return False
def _extract_header_from_scl_content(content, log_f):
"""
Función auxiliar para extraer header del contenido SCL como fallback.
"""
block_type = None
block_number = None
# Buscar primero en comentarios ya existentes
lines = content.split("\n")
for line in lines[:15]: # Buscar en las primeras 15 líneas
line_clean = line.strip()
if line_clean.startswith("//"):
# Buscar patrones como "// Block Number: 1051"
if "Block Number:" in line_clean:
match = re.search(r"Block Number:\s*(\d+)", line_clean)
if match:
block_number = match.group(1)
elif "Block Type:" in line_clean:
if "GlobalDB" in line_clean or "InstanceDB" in line_clean:
block_type = "DB"
elif "FB" in line_clean:
block_type = "FB"
elif "FC" in line_clean:
block_type = "FC"
elif "UDT" in line_clean or "PlcUDT" in line_clean:
block_type = "UDT"
elif "PlcTagTable" in line_clean or "TagTable" in line_clean:
block_type = "TAG"
# Si no se encontró en comentarios, buscar en declaraciones de bloques
if not block_type or not block_number:
for line in lines:
line_clean = line.strip()
# Buscar declaraciones de bloques
if "FUNCTION_BLOCK" in line_clean and '"' in line_clean:
block_type = "FB"
match = re.search(r"FB[_]?(\d+)", line_clean, re.IGNORECASE)
if match:
block_number = match.group(1)
break
elif (
"FUNCTION" in line_clean
and '"' in line_clean
and "FUNCTION_BLOCK" not in line_clean
):
block_type = "FC"
match = re.search(r"FC[_]?(\d+)", line_clean, re.IGNORECASE)
if match:
block_number = match.group(1)
break
elif "DATA_BLOCK" in line_clean and '"' in line_clean:
block_type = "DB"
match = re.search(r"DB[_]?(\d+)", line_clean, re.IGNORECASE)
if match:
block_number = match.group(1)
break
elif "TYPE" in line_clean and '"' in line_clean:
block_type = "UDT"
match = re.search(r"UDT[_]?(\d+)", line_clean, re.IGNORECASE)
if match:
block_number = match.group(1)
break
# Construir la línea de header
if block_type and block_number:
return f"// {block_type}{block_number}"
elif block_type:
return f"// {block_type}"
return None
# --- FIN FUNCIÓN AUXILIAR ---
# --- FUNCIÓN DE LIMPIEZA (x7) ---------------------------------------------------------------------------
@ -807,22 +1019,59 @@ if __name__ == "__main__":
# Check if a file with the same name was already generated from XML
if os.path.exists(dest_scl_path):
log_message(
f" - Omitiendo copia de '{relative_scl_path}': Ya existe un archivo generado con el mismo nombre en el destino.",
f" - Sobreescribiendo archivo existente: '{relative_scl_path}' (agregando cabecera si es necesario)",
log_f,
also_print=True,
)
# En lugar de omitir, vamos a procesarlo para agregar la cabecera
try:
log_message(
f" - Procesando '{relative_scl_path}' para verificar/agregar cabecera",
log_f,
also_print=True,
)
# Usar la función auxiliar que agrega el header del bloque
success = add_block_header_to_scl(
src_scl_path, dest_scl_path, log_f
)
if success:
copied_scl_count += 1
log_message(
f" ✓ Procesado exitosamente",
log_f,
also_print=True,
)
else:
log_message(
f" - ERROR procesando '{relative_scl_path}'", log_f
)
except Exception as copy_err:
log_message(
f" - ERROR procesando '{relative_scl_path}': {copy_err}",
log_f,
also_print=False,
)
skipped_scl_count += 1
else:
try:
log_message(
f" - Copiando '{relative_scl_path}' a '{os.path.relpath(dest_scl_path, working_directory)}'",
log_f,
also_print=False,
also_print=True, # Cambiado a True para ver en consola
)
shutil.copy2(
src_scl_path, dest_scl_path
) # copy2 preserves metadata
# Usar la función auxiliar que agrega el header del bloque
success = add_block_header_to_scl(
src_scl_path, dest_scl_path, log_f
)
if success:
copied_scl_count += 1
log_message(
f" ✓ Copiado exitosamente",
log_f,
also_print=True,
)
else:
log_message(
f" - ERROR procesando '{relative_scl_path}'", log_f
)
except Exception as copy_err:
log_message(
f" - ERROR copiando '{relative_scl_path}': {copy_err}",

21003
data/log.txt

File diff suppressed because it is too large Load Diff

View File

@ -35,18 +35,31 @@ class ConfigurationManager:
self.min_execution_interval = 1
# Instantiate handlers/managers
self.logger = Logger(os.path.join(self.data_path, "log.txt")) # Pass log path to Logger
self.dir_manager = DirectoryManager(self.script_groups_path, self._set_working_directory_internal)
self.logger = Logger(
os.path.join(self.data_path, "log.txt")
) # Pass log path to Logger
self.dir_manager = DirectoryManager(
self.script_groups_path, self._set_working_directory_internal
)
self.group_manager = GroupManager(self.script_groups_path)
self.schema_handler = SchemaHandler(self.data_path, self.script_groups_path, self._get_working_directory_internal)
self.config_handler = ConfigHandler(self.data_path, self.script_groups_path, self._get_working_directory_internal, self.schema_handler)
self.schema_handler = SchemaHandler(
self.data_path,
self.script_groups_path,
self._get_working_directory_internal,
)
self.config_handler = ConfigHandler(
self.data_path,
self.script_groups_path,
self._get_working_directory_internal,
self.schema_handler,
)
self.script_executor = ScriptExecutor(
self.script_groups_path,
self.dir_manager,
self.config_handler,
self.logger, # Pass the central logger instance
self._get_execution_state_internal,
self._set_last_execution_time_internal
self._set_last_execution_time_internal,
)
# --- Internal Callbacks/Getters for Sub-Managers ---
@ -59,9 +72,11 @@ class ConfigurationManager:
data_json_path = os.path.join(path, "data.json")
if not os.path.exists(data_json_path):
try:
with open(data_json_path, 'w', encoding='utf-8') as f:
with open(data_json_path, "w", encoding="utf-8") as f:
json.dump({}, f)
print(f"Info: Created empty data.json in new working directory: {data_json_path}")
print(
f"Info: Created empty data.json in new working directory: {data_json_path}"
)
except Exception as e:
print(f"Warning: Could not create data.json in {path}: {e}")
else:
@ -73,7 +88,10 @@ class ConfigurationManager:
def _get_execution_state_internal(self) -> Dict[str, Any]:
"""Provides execution throttling state to ScriptExecutor."""
return {"last_time": self.last_execution_time, "interval": self.min_execution_interval}
return {
"last_time": self.last_execution_time,
"interval": self.min_execution_interval,
}
def _set_last_execution_time_internal(self, exec_time: float):
"""Callback for ScriptExecutor to update the last execution time."""
@ -127,9 +145,13 @@ class ConfigurationManager:
details.setdefault("author", "Unknown")
return details
def update_group_description(self, group: str, data: Dict[str, Any]) -> Dict[str, str]:
def update_group_description(
self, group: str, data: Dict[str, Any]
) -> Dict[str, str]:
"""Update the description file for a specific group."""
description_path = os.path.join(self.script_groups_path, group, "description.json")
description_path = os.path.join(
self.script_groups_path, group, "description.json"
)
try:
os.makedirs(os.path.dirname(description_path), exist_ok=True)
with open(description_path, "w", encoding="utf-8") as f:
@ -174,14 +196,14 @@ class ConfigurationManager:
group_path = self._get_group_path(group_id)
if not group_path:
return None
return os.path.join(group_path, 'scripts_description.json')
return os.path.join(group_path, "scripts_description.json")
def _load_script_descriptions(self, group_id: str) -> Dict[str, Any]:
"""Carga las descripciones de scripts desde scripts_description.json."""
path = self._get_script_descriptions_path(group_id)
if path and os.path.exists(path):
try:
with open(path, 'r', encoding='utf-8') as f:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError:
print(f"Error: JSON inválido en {path}")
@ -191,13 +213,17 @@ class ConfigurationManager:
return {}
return {}
def _save_script_descriptions(self, group_id: str, descriptions: Dict[str, Any]) -> bool:
def _save_script_descriptions(
self, group_id: str, descriptions: Dict[str, Any]
) -> bool:
"""Guarda las descripciones de scripts en scripts_description.json."""
path = self._get_script_descriptions_path(group_id)
if path:
try:
os.makedirs(os.path.dirname(path), exist_ok=True) # Asegura que el directorio del grupo existe
with open(path, 'w', encoding='utf-8') as f:
os.makedirs(
os.path.dirname(path), exist_ok=True
) # Asegura que el directorio del grupo existe
with open(path, "w", encoding="utf-8") as f:
json.dump(descriptions, f, indent=4, ensure_ascii=False)
return True
except Exception as e:
@ -208,15 +234,26 @@ class ConfigurationManager:
def _extract_short_description(self, script_path: str) -> str:
"""Extrae la primera línea del docstring de un script Python."""
try:
with open(script_path, 'r', encoding='utf-8') as f:
with open(script_path, "r", encoding="utf-8") as f:
content = f.read()
# Buscar docstring al inicio del archivo """...""" o '''...'''
match = re.match(r'^\s*("""(.*?)"""|\'\'\'(.*?)\'\'\')', content, re.DOTALL | re.MULTILINE)
match = re.match(
r'^\s*("""(.*?)"""|\'\'\'(.*?)\'\'\')',
content,
re.DOTALL | re.MULTILINE,
)
if match:
# Obtener el contenido del docstring (grupo 2 o 3)
docstring = match.group(2) or match.group(3)
# Tomar la primera línea no vacía
first_line = next((line.strip() for line in docstring.strip().splitlines() if line.strip()), None)
first_line = next(
(
line.strip()
for line in docstring.strip().splitlines()
if line.strip()
),
None,
)
return first_line if first_line else "Sin descripción corta."
except Exception as e:
print(f"Error extrayendo descripción de {script_path}: {e}")
@ -234,36 +271,52 @@ class ConfigurationManager:
try:
# Listar archivos .py en el directorio del grupo
script_files = [f for f in os.listdir(group_path) if f.endswith('.py') and os.path.isfile(os.path.join(group_path, f))]
script_files = [
f
for f in os.listdir(group_path)
if f.endswith(".py") and os.path.isfile(os.path.join(group_path, f))
]
for filename in script_files:
script_path = os.path.join(group_path, filename)
if filename not in descriptions:
print(f"Script '{filename}' no encontrado en descripciones, auto-populando.")
print(
f"Script '{filename}' no encontrado en descripciones, auto-populando."
)
short_desc = self._extract_short_description(script_path)
descriptions[filename] = {
"display_name": filename.replace('.py', ''), # Nombre por defecto
"display_name": filename.replace(
".py", ""
), # Nombre por defecto
"short_description": short_desc,
"long_description": "",
"hidden": False
"hidden": False,
}
updated = True
# Añadir a la lista si no está oculto
details = descriptions[filename]
if not details.get('hidden', False):
scripts_details.append({
if not details.get("hidden", False):
scripts_details.append(
{
"filename": filename, # Nombre real del archivo
"display_name": details.get("display_name", filename.replace('.py', '')),
"short_description": details.get("short_description", "Sin descripción corta."),
"long_description": details.get("long_description", "") # Añadir descripción larga
})
"display_name": details.get(
"display_name", filename.replace(".py", "")
),
"short_description": details.get(
"short_description", "Sin descripción corta."
),
"long_description": details.get(
"long_description", ""
), # Añadir descripción larga
}
)
if updated:
self._save_script_descriptions(group, descriptions)
# Ordenar por display_name para consistencia
scripts_details.sort(key=lambda x: x['display_name'])
scripts_details.sort(key=lambda x: x["display_name"])
return scripts_details
except FileNotFoundError:
@ -276,49 +329,88 @@ class ConfigurationManager:
"""Obtiene los detalles completos de un script específico."""
descriptions = self._load_script_descriptions(group_id)
# Devolver detalles o un diccionario por defecto si no existe (aunque list_scripts debería crearlo)
return descriptions.get(script_filename, {
"display_name": script_filename.replace('.py', ''),
return descriptions.get(
script_filename,
{
"display_name": script_filename.replace(".py", ""),
"short_description": "No encontrado.",
"long_description": "",
"hidden": False
})
"hidden": False,
},
)
def update_script_details(self, group_id: str, script_filename: str, details: Dict[str, Any]) -> Dict[str, str]:
def update_script_details(
self, group_id: str, script_filename: str, details: Dict[str, Any]
) -> Dict[str, str]:
"""Actualiza los detalles de un script específico."""
descriptions = self._load_script_descriptions(group_id)
if script_filename in descriptions:
# Asegurarse de que los campos esperados están presentes y actualizar
descriptions[script_filename]["display_name"] = details.get("display_name", descriptions[script_filename].get("display_name", script_filename.replace('.py', '')))
descriptions[script_filename]["short_description"] = details.get("short_description", descriptions[script_filename].get("short_description", "")) # Actualizar descripción corta
descriptions[script_filename]["long_description"] = details.get("long_description", descriptions[script_filename].get("long_description", ""))
descriptions[script_filename]["hidden"] = details.get("hidden", descriptions[script_filename].get("hidden", False))
descriptions[script_filename]["display_name"] = details.get(
"display_name",
descriptions[script_filename].get(
"display_name", script_filename.replace(".py", "")
),
)
descriptions[script_filename]["short_description"] = details.get(
"short_description",
descriptions[script_filename].get("short_description", ""),
) # Actualizar descripción corta
descriptions[script_filename]["long_description"] = details.get(
"long_description",
descriptions[script_filename].get("long_description", ""),
)
descriptions[script_filename]["hidden"] = details.get(
"hidden", descriptions[script_filename].get("hidden", False)
)
if self._save_script_descriptions(group_id, descriptions):
return {"status": "success"}
else:
return {"status": "error", "message": "Fallo al guardar las descripciones de los scripts."}
return {
"status": "error",
"message": "Fallo al guardar las descripciones de los scripts.",
}
else:
# Intentar crear la entrada si el script existe pero no está en el JSON (caso raro)
group_path = self._get_group_path(group_id)
script_path = os.path.join(group_path, script_filename) if group_path else None
script_path = (
os.path.join(group_path, script_filename) if group_path else None
)
if script_path and os.path.exists(script_path):
print(f"Advertencia: El script '{script_filename}' existe pero no estaba en descriptions.json. Creando entrada.")
print(
f"Advertencia: El script '{script_filename}' existe pero no estaba en descriptions.json. Creando entrada."
)
short_desc = self._extract_short_description(script_path)
descriptions[script_filename] = {
"display_name": details.get("display_name", script_filename.replace('.py', '')),
"display_name": details.get(
"display_name", script_filename.replace(".py", "")
),
"short_description": short_desc, # Usar la extraída
"long_description": details.get("long_description", ""),
"hidden": details.get("hidden", False)
"hidden": details.get("hidden", False),
}
if self._save_script_descriptions(group_id, descriptions):
return {"status": "success"}
else:
return {"status": "error", "message": "Fallo al guardar las descripciones de los scripts después de crear la entrada."}
return {
"status": "error",
"message": "Fallo al guardar las descripciones de los scripts después de crear la entrada.",
}
else:
return {"status": "error", "message": f"Script '{script_filename}' no encontrado en las descripciones ni en el sistema de archivos."}
return {
"status": "error",
"message": f"Script '{script_filename}' no encontrado en las descripciones ni en el sistema de archivos.",
}
def execute_script(
self, group: str, script_name: str, broadcast_fn=None
) -> Dict[str, Any]:
# ScriptExecutor uses callbacks to get/set execution state
return self.script_executor.execute_script(group, script_name, broadcast_fn)
def stop_script(
self, group: str, script_name: str, broadcast_fn=None
) -> Dict[str, Any]:
# Delegar al ScriptExecutor para detener el script
return self.script_executor.stop_script(group, script_name, broadcast_fn)

View File

@ -33,6 +33,10 @@ class ScriptExecutor:
self._get_exec_state = get_exec_state_func
self._set_last_exec_time = set_last_exec_time_func
# Diccionario para rastrear procesos en ejecución
# Key: f"{group}:{script_name}", Value: subprocess.Popen object
self.running_processes = {}
def execute_script(
self,
group: str,
@ -118,6 +122,7 @@ class ScriptExecutor:
stderr_capture = ""
process = None
start_time = datetime.now()
process_key = f"{group}:{script_name}"
try:
if broadcast_fn:
@ -141,6 +146,9 @@ class ScriptExecutor:
creationflags=creation_flags,
)
# Registrar el proceso en ejecución
self.running_processes[process_key] = process
while True:
line = process.stdout.readline()
if not line and process.poll() is not None:
@ -232,7 +240,74 @@ class ScriptExecutor:
return {"status": "error", "error": error_msg, "traceback": traceback_info}
finally:
# Remover el proceso del registro cuando termine
if process_key in self.running_processes:
del self.running_processes[process_key]
if process and process.stderr:
process.stderr.close()
if process and process.stdout:
process.stdout.close()
def stop_script(
self,
group: str,
script_name: str,
broadcast_fn: Optional[Callable[[str], None]] = None,
) -> Dict[str, Any]:
"""
Detiene un script en ejecución.
"""
process_key = f"{group}:{script_name}"
if process_key not in self.running_processes:
msg = f"El script {script_name} no está ejecutándose actualmente"
self.app_logger.append_log(f"Warning: {msg}")
if broadcast_fn:
broadcast_fn(msg)
return {"status": "error", "error": "Script not running"}
process = self.running_processes[process_key]
try:
# Verificar que el proceso aún esté vivo
if process.poll() is not None:
# El proceso ya terminó naturalmente
del self.running_processes[process_key]
msg = f"El script {script_name} ya había terminado"
self.app_logger.append_log(f"Info: {msg}")
if broadcast_fn:
broadcast_fn(msg)
return {"status": "already_finished", "message": msg}
# Intentar terminar el proceso suavemente
process.terminate()
# Esperar un poco para ver si termina suavemente
try:
process.wait(timeout=5) # Esperar 5 segundos
msg = f"Script {script_name} detenido correctamente"
self.app_logger.append_log(f"Info: {msg}")
if broadcast_fn:
broadcast_fn(msg)
return {"status": "success", "message": msg}
except subprocess.TimeoutExpired:
# Si no termina suavemente, forzar la terminación
process.kill()
process.wait() # Esperar a que termine definitivamente
msg = f"Script {script_name} forzado a terminar"
self.app_logger.append_log(f"Warning: {msg}")
if broadcast_fn:
broadcast_fn(msg)
return {"status": "forced_kill", "message": msg}
except Exception as e:
error_msg = f"Error al detener el script {script_name}: {str(e)}"
self.app_logger.append_log(f"ERROR: {error_msg}")
if broadcast_fn:
broadcast_fn(error_msg)
return {"status": "error", "error": error_msg}
finally:
# Asegurarse de que el proceso se elimine del registro
if process_key in self.running_processes:
del self.running_processes[process_key]

View File

@ -1,5 +1,8 @@
let currentGroup;
// Registro de procesos en ejecución para scripts de configuración
let runningConfigScripts = new Set();
// Initialize WebSocket connection
let socket = null; // Define socket en un alcance accesible (p.ej., globalmente o en el scope del módulo)
@ -206,11 +209,20 @@ async function loadScripts(group) {
</div>
<div class="flex items-center gap-2 flex-shrink-0">
<div class="flex flex-col items-center">
<div class="flex gap-1">
<button data-filename="${script.filename}"
class="bg-green-500 hover:bg-green-600 text-white px-3 py-1 rounded text-sm w-24 text-center execute-button">
Ejecutar
class="bg-green-500 hover:bg-green-600 text-white px-2 py-1 rounded text-sm execute-button"
title="Ejecutar script">
</button>
<div class="text-xs text-gray-500 mt-1 truncate w-24 text-center" title="${script.filename}">${script.filename}</div>
<button data-filename="${script.filename}"
class="bg-red-500 hover:bg-red-600 text-white px-2 py-1 rounded text-sm stop-button disabled:opacity-50 disabled:cursor-not-allowed"
disabled
title="Detener script">
</button>
</div>
<div class="text-xs text-gray-500 mt-1 truncate w-20 text-center" title="${script.filename}">${script.filename}</div>
</div>
<button data-group="${group}" data-filename="${script.filename}"
class="p-1 rounded text-gray-500 hover:bg-gray-200 hover:text-gray-700 edit-button" title="Editar Detalles">
@ -228,6 +240,11 @@ async function loadScripts(group) {
executeScript(script.filename);
});
const stopButton = div.querySelector('.stop-button');
stopButton.addEventListener('click', () => {
stopScript(script.filename);
});
const editButton = div.querySelector('.edit-button');
editButton.addEventListener('click', () => {
editScriptDetails(group, script.filename);
@ -255,6 +272,10 @@ async function executeScript(scriptName) {
// REMOVE this line - let the backend log the start via WebSocket
// addLogLine(`\nEjecutando script: ${scriptName}...\n`);
// Marcar script como en ejecución
runningConfigScripts.add(scriptName);
updateScriptButtons(scriptName, true);
try {
const response = await fetch('/api/execute_script', {
method: 'POST',
@ -268,6 +289,10 @@ async function executeScript(scriptName) {
console.error(`Error initiating script execution request: ${response.status} ${response.statusText}`, errorText);
// Log only the request error, not script execution errors which come via WebSocket
addLogLine(`\nError al iniciar la petición del script: ${response.status} ${errorText}\n`);
// Desmarcar script si falló el inicio
runningConfigScripts.delete(scriptName);
updateScriptButtons(scriptName, false);
return; // Stop if the request failed
}
@ -280,9 +305,95 @@ async function executeScript(scriptName) {
// Script output and final status/errors will arrive via WebSocket messages
// handled by socket.onmessage -> addLogLine
// Nota: El script se desmarcará cuando termine (en el backend deberíamos enviar una señal de finalización)
// Por ahora, lo desmarcaremos después de un tiempo o cuando el usuario haga clic en stop
} catch (error) {
console.error('Error in executeScript fetch:', error);
addLogLine(`\nError de red o JavaScript al intentar ejecutar el script: ${error.message}\n`);
// Desmarcar script si hubo error
runningConfigScripts.delete(scriptName);
updateScriptButtons(scriptName, false);
}
}
// Stop a script
async function stopScript(scriptName) {
try {
addLogLine(`\nDeteniendo script: ${scriptName}...\n`);
const response = await fetch('/api/stop_script', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ group: currentGroup, script: scriptName })
});
if (!response.ok) {
const errorText = await response.text();
console.error(`Error stopping script: ${response.status} ${response.statusText}`, errorText);
addLogLine(`\nError al detener el script: ${response.status} ${errorText}\n`);
return;
}
const result = await response.json();
if (result.error) {
addLogLine(`\nError al detener script: ${result.error}\n`);
} else {
addLogLine(`\nScript ${scriptName} detenido con éxito.\n`);
}
// Desmarcar script como en ejecución
runningConfigScripts.delete(scriptName);
updateScriptButtons(scriptName, false);
} catch (error) {
console.error('Error stopping script:', error);
addLogLine(`\nError de red al intentar detener el script: ${error.message}\n`);
}
}
// Actualizar estado de botones (ejecutar/stop) para un script
function updateScriptButtons(scriptName, isRunning) {
const scriptItem = document.querySelector(`[data-filename="${scriptName}"]`).closest('.script-item');
if (!scriptItem) return;
const executeButton = scriptItem.querySelector('.execute-button');
const stopButton = scriptItem.querySelector('.stop-button');
if (isRunning) {
executeButton.disabled = true;
executeButton.classList.add('opacity-50', 'cursor-not-allowed');
stopButton.disabled = false;
stopButton.classList.remove('opacity-50', 'cursor-not-allowed');
} else {
executeButton.disabled = false;
executeButton.classList.remove('opacity-50', 'cursor-not-allowed');
stopButton.disabled = true;
stopButton.classList.add('opacity-50', 'cursor-not-allowed');
}
}
// Función para detectar cuando un script ha terminado mediante mensajes de WebSocket
function handleScriptCompletion(message) {
// Buscar patrones que indiquen que un script ha terminado
const completionPatterns = [
/Ejecución de (.+?) finalizada/,
/Script (.+?) detenido/,
/ERROR FATAL.*?en (.+?):/
];
for (const pattern of completionPatterns) {
const match = message.match(pattern);
if (match) {
const scriptName = match[1];
if (runningConfigScripts.has(scriptName)) {
runningConfigScripts.delete(scriptName);
updateScriptButtons(scriptName, false);
console.log(`Script ${scriptName} marcado como terminado`);
}
break;
}
}
}
@ -1078,6 +1189,9 @@ function addLogLine(message) {
// Append the cleaned message + a newline for display separation.
logArea.innerHTML += cleanMessage + '\n';
logArea.scrollTop = logArea.scrollHeight; // Ensure scroll to bottom
// Detectar finalización de scripts
handleScriptCompletion(cleanMessage);
}
}