ParamManagerScripts/backend/script_groups/IO_adaptation/x7_update_CAx.py

696 lines
28 KiB
Python

"""
x7_update_CAx.py : Script que actualiza un archivo AML original basándose en las modificaciones
hechas en el archivo Excel de IOs generado por x2_process_CAx.py
Pipeline:
1. Lee el AML original
2. Lee el Excel modificado
3. Genera Excel de referencia desde el AML para comparar
4. Valida que la estructura base sea la misma (mismos nodos, nombres)
5. Identifica cambios en IOs, direcciones IP, etc.
6. Aplica los cambios al AML original
7. Genera un nuevo archivo AML con sufijo "_updated"
"""
import os
import sys
import tkinter as tk
from tkinter import filedialog, messagebox
import traceback
from lxml import etree as ET
import pandas as pd
from pathlib import Path
import re
import math
import tempfile
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# Import functions from x2_process_CAx
from x2_process_CAx import (
extract_aml_data,
generate_io_excel_report,
sanitize_filename
)
def select_aml_file(title="Select Original AML File", initial_dir=None):
"""Abre un diálogo para seleccionar un archivo AML."""
root = tk.Tk()
root.withdraw()
file_path = filedialog.askopenfilename(
title=title,
filetypes=[("AML Files", "*.aml"), ("All Files", "*.*")],
initialdir=initial_dir
)
root.destroy()
if not file_path:
return None
return file_path
def select_excel_file(title="Select Modified Excel File", initial_dir=None):
"""Abre un diálogo para seleccionar un archivo Excel."""
root = tk.Tk()
root.withdraw()
file_path = filedialog.askopenfilename(
title=title,
filetypes=[("Excel Files", "*.xlsx"), ("Excel Files", "*.xls"), ("All Files", "*.*")],
initialdir=initial_dir
)
root.destroy()
if not file_path:
return None
return file_path
def generate_reference_excel_from_aml(aml_file_path, temp_dir):
"""
Genera un Excel de referencia desde el AML para comparar con el Excel modificado.
Retorna el path al Excel generado y los project_data.
"""
print("Generando Excel de referencia desde AML original...")
# Extraer datos del AML
try:
parser = ET.XMLParser(remove_blank_text=True, huge_tree=True)
tree = ET.parse(aml_file_path, parser)
root = tree.getroot()
project_data = extract_aml_data(root)
except Exception as e:
print(f"ERROR procesando archivo AML {aml_file_path}: {e}")
return None, None
if not project_data or not project_data.get("plcs"):
print("No se encontraron PLCs en el archivo AML.")
return None, None
# Generar Excel para cada PLC y combinar en uno solo
all_excel_rows = []
for plc_id, plc_data in project_data.get("plcs", {}).items():
# Crear archivo temporal para este PLC
temp_excel_path = os.path.join(temp_dir, f"temp_plc_{plc_id}.xlsx")
# Generar Excel para este PLC (reutilizamos la función existente)
generate_io_excel_report(project_data, temp_excel_path, plc_id, temp_dir)
# Leer el Excel generado y agregar a la lista combinada
if os.path.exists(temp_excel_path):
try:
df_plc = pd.read_excel(temp_excel_path, sheet_name='IO Report')
# Agregar columna de ID único
df_plc['Unique_ID'] = df_plc['PLC Name'] + "+" + df_plc['Device Name']
all_excel_rows.append(df_plc)
os.remove(temp_excel_path) # Limpiar archivo temporal
except Exception as e:
print(f"ERROR leyendo Excel temporal para PLC {plc_id}: {e}")
if not all_excel_rows:
print("No se pudieron generar datos de Excel de referencia.")
return None, None
# Combinar todos los DataFrames
combined_df = pd.concat(all_excel_rows, ignore_index=True)
# Guardar Excel de referencia
reference_excel_path = os.path.join(temp_dir, "reference_excel.xlsx")
combined_df.to_excel(reference_excel_path, sheet_name='IO Report', index=False)
return reference_excel_path, project_data
def compare_excel_files(reference_excel_path, modified_excel_path):
"""
Compara el Excel de referencia con el Excel modificado.
Retorna (is_valid, changes_dict) donde:
- is_valid: True si la estructura básica es la misma
- changes_dict: diccionario con los cambios detectados
"""
print("Comparando archivos Excel...")
try:
# Leer ambos archivos
df_ref = pd.read_excel(reference_excel_path, sheet_name='IO Report')
df_mod = pd.read_excel(modified_excel_path, sheet_name='IO Report')
# Agregar columna de ID único si no existe
if 'Unique_ID' not in df_ref.columns:
df_ref['Unique_ID'] = df_ref['PLC Name'] + "+" + df_ref['Device Name']
if 'Unique_ID' not in df_mod.columns:
df_mod['Unique_ID'] = df_mod['PLC Name'] + "+" + df_mod['Device Name']
except Exception as e:
print(f"ERROR leyendo archivos Excel: {e}")
return False, {}
# Validar estructura básica
if len(df_ref) != len(df_mod):
print(f"ERROR: Número de filas diferente. Referencia: {len(df_ref)}, Modificado: {len(df_mod)}")
return False, {}
# Verificar que todos los IDs únicos coincidan
ref_ids = set(df_ref['Unique_ID'].tolist())
mod_ids = set(df_mod['Unique_ID'].tolist())
if ref_ids != mod_ids:
missing_in_mod = ref_ids - mod_ids
extra_in_mod = mod_ids - ref_ids
print("ERROR: Los IDs únicos no coinciden entre archivos.")
if missing_in_mod:
print(f" Faltantes en modificado: {missing_in_mod}")
if extra_in_mod:
print(f" Extras en modificado: {extra_in_mod}")
return False, {}
print("Estructura básica validada correctamente.")
# Detectar cambios
changes = {}
columns_to_monitor = [
'Device Address', 'IO Input Start Address', 'IO Input Word Count',
'IO Output Start Address', 'IO Output Word Count',
'Network Type', 'Device Type', 'Order Number', 'Firmware Version'
]
for index, row_ref in df_ref.iterrows():
unique_id = row_ref['Unique_ID']
row_mod = df_mod[df_mod['Unique_ID'] == unique_id].iloc[0]
row_changes = {}
for col in columns_to_monitor:
if col in df_ref.columns and col in df_mod.columns:
# Manejo especial para valores numéricos
ref_raw = row_ref[col]
mod_raw = row_mod[col]
# Convertir a string, manejando floats apropiadamente
if pd.notna(ref_raw):
if isinstance(ref_raw, float) and ref_raw.is_integer():
ref_val = str(int(ref_raw))
else:
ref_val = str(ref_raw).strip()
else:
ref_val = 'N/A'
if pd.notna(mod_raw):
if isinstance(mod_raw, float) and mod_raw.is_integer():
mod_val = str(int(mod_raw))
else:
mod_val = str(mod_raw).strip()
else:
mod_val = 'N/A'
if ref_val != mod_val:
row_changes[col] = {
'original': ref_val,
'modified': mod_val
}
if row_changes:
changes[unique_id] = {
'plc_name': row_ref['PLC Name'],
'device_name': row_ref['Device Name'],
'changes': row_changes
}
print(f"Detectados {len(changes)} dispositivos con cambios.")
for unique_id, change_info in changes.items():
print(f" {unique_id}: {list(change_info['changes'].keys())}")
# Debug: mostrar los primeros cambios para verificar valores
if len(changes) <= 5: # Solo mostrar detalles si son pocos cambios
for field, change_data in change_info['changes'].items():
print(f" {field}: {change_data['original']}{change_data['modified']}")
return True, changes
def find_device_in_aml(project_data, plc_name, device_name):
"""
Encuentra el device_id correspondiente en los datos del AML basándose en PLC y device name.
"""
# Buscar el PLC por nombre
target_plc_id = None
for plc_id, plc_data in project_data.get("plcs", {}).items():
if plc_data.get('name', plc_id) == plc_name:
target_plc_id = plc_id
break
if not target_plc_id:
return None
# Buscar el dispositivo en las redes del PLC
plc_info = project_data["plcs"][target_plc_id]
plc_networks = plc_info.get("connected_networks", {})
for net_id, plc_addr_on_net in plc_networks.items():
net_info = project_data.get("networks", {}).get(net_id)
if not net_info:
continue
devices_on_net = net_info.get("devices_on_net", {})
# Identificar nodos que pertenecen al PLC
plc_interface_and_node_ids = set()
for node in plc_info.get("network_nodes", []):
plc_interface_and_node_ids.add(node["id"])
interface_id_lookup = project_data["devices"].get(node["id"], {}).get("parent_id")
if interface_id_lookup:
plc_interface_and_node_ids.add(interface_id_lookup)
plc_interface_and_node_ids.add(target_plc_id)
# Buscar en dispositivos de la red
for node_id, node_addr in devices_on_net.items():
if node_id in plc_interface_and_node_ids:
continue
node_info = project_data.get("devices", {}).get(node_id)
if not node_info:
continue
# Determinar información del dispositivo
interface_id = node_info.get("parent_id")
interface_info = None
actual_device_id = None
actual_device_info = None
if interface_id:
interface_info = project_data.get("devices", {}).get(interface_id)
if interface_info:
actual_device_id = interface_info.get("parent_id")
if actual_device_id:
actual_device_info = project_data.get("devices", {}).get(actual_device_id)
display_info = actual_device_info if actual_device_info else (interface_info if interface_info else node_info)
display_name = display_info.get("name", "Unknown")
if display_name == device_name:
return {
'node_id': node_id,
'display_id': actual_device_id if actual_device_info else (interface_id if interface_info else node_id),
'node_info': node_info,
'display_info': display_info,
'network_id': net_id
}
return None
def apply_changes_to_aml(aml_tree, project_data, changes_dict):
"""
Aplica los cambios detectados al árbol XML del AML.
"""
print("Aplicando cambios al archivo AML...")
root = aml_tree.getroot()
changes_applied = 0
for unique_id, change_info in changes_dict.items():
plc_name = change_info['plc_name']
device_name = change_info['device_name']
changes = change_info['changes']
print(f" Procesando cambios para: {unique_id}")
# Encontrar el dispositivo en los datos del AML
device_info = find_device_in_aml(project_data, plc_name, device_name)
if not device_info:
print(f" ERROR: No se pudo encontrar el dispositivo '{device_name}' del PLC '{plc_name}' en el AML")
continue
print(f" Debug: Encontrado dispositivo - node_id: {device_info['node_id']}, display_id: {device_info['display_id']}")
# Encontrar el elemento XML correspondiente
node_id = device_info['node_id']
display_id = device_info['display_id']
# Intentar primero con el display_id (el dispositivo real que contiene IOs)
xml_element = root.xpath(f".//*[@ID='{display_id}']")
if not xml_element:
# Si no se encuentra, intentar con el node_id
xml_element = root.xpath(f".//*[@ID='{node_id}']")
if not xml_element:
print(f" ERROR: No se pudo encontrar el elemento XML con ID {display_id} o {node_id}")
continue
device_element = xml_element[0]
print(f" Debug: Usando elemento XML con ID {device_element.get('ID', 'N/A')}")
# Aplicar cambios específicos
for field, change_data in changes.items():
original_val = change_data['original']
modified_val = change_data['modified']
if field == 'Device Address':
# Cambiar dirección de red del dispositivo
if apply_network_address_change(device_element, modified_val):
print(f" ✓ Dirección de red actualizada: {original_val} -> {modified_val}")
changes_applied += 1
else:
print(f" ✗ Error actualizando dirección de red")
elif field in ['IO Input Start Address', 'IO Input Word Count', 'IO Output Start Address', 'IO Output Word Count']:
# Cambiar direcciones IO específicas
if apply_io_address_change(device_element, field, original_val, modified_val, project_data, display_id):
print(f"{field} actualizada: {original_val} -> {modified_val}")
changes_applied += 1
else:
print(f" ✗ Error actualizando {field}")
elif field in ['Device Type', 'Order Number', 'Firmware Version']:
# Cambiar atributos del dispositivo
if apply_device_attribute_change(device_element, field, modified_val):
print(f"{field} actualizado: {original_val} -> {modified_val}")
changes_applied += 1
else:
print(f" ✗ Error actualizando {field}")
print(f"Total de cambios aplicados: {changes_applied}")
return changes_applied > 0
def apply_network_address_change(device_element, new_address):
"""
Aplica cambio de dirección de red a un elemento del dispositivo.
"""
try:
# Convertir new_address a string si es numérico
if isinstance(new_address, (int, float)):
address_str = str(new_address).rstrip('0').rstrip('.')
else:
address_str = str(new_address)
# Buscar atributo NetworkAddress
addr_attr = device_element.xpath("./*[local-name()='Attribute'][@Name='NetworkAddress']")
if addr_attr:
value_elem = addr_attr[0].xpath("./*[local-name()='Value']")
if value_elem:
value_elem[0].text = address_str
return True
# Si no existe, crear el atributo
attr_elem = ET.SubElement(device_element, "Attribute")
attr_elem.set("Name", "NetworkAddress")
value_elem = ET.SubElement(attr_elem, "Value")
value_elem.text = address_str
return True
except Exception as e:
print(f" Error aplicando cambio de dirección: {e}")
return False
def apply_device_attribute_change(device_element, field, new_value):
"""
Aplica cambio de atributo del dispositivo.
"""
try:
# Mapear campos a nombres de atributos XML
attr_mapping = {
'Device Type': 'TypeName',
'Order Number': 'OrderNumber',
'Firmware Version': 'FirmwareVersion'
}
attr_name = attr_mapping.get(field)
if not attr_name:
return False
# Buscar atributo existente
attr_elem = device_element.xpath(f"./*[local-name()='Attribute'][@Name='{attr_name}']")
if attr_elem:
value_elem = attr_elem[0].xpath("./*[local-name()='Value']")
if value_elem:
value_elem[0].text = new_value
return True
# Si no existe, crear el atributo
attr_elem = ET.SubElement(device_element, "Attribute")
attr_elem.set("Name", attr_name)
value_elem = ET.SubElement(attr_elem, "Value")
value_elem.text = new_value
return True
except Exception as e:
print(f" Error aplicando cambio de atributo {field}: {e}")
return False
def apply_io_address_change(device_element, field, original_val, modified_val, project_data, device_id):
"""
Aplica cambios a las direcciones IO usando Start Address + Word Count.
Esto es más simple y directo que calcular End Addresses.
"""
try:
# Validar que el nuevo valor sea numérico o N/A
if modified_val != 'N/A' and modified_val != '':
try:
# Manejar tanto floats como integers (pandas puede leer como float)
if isinstance(modified_val, str):
# Si es string, intentar convertir
new_value = int(float(modified_val))
else:
# Si ya es numérico, convertir directamente
new_value = int(modified_val)
except (ValueError, TypeError):
print(f" Error: Valor no válido: {modified_val} (tipo: {type(modified_val)})")
return False
else:
# Si es N/A, no hay dirección IO para este tipo
return True
# Determinar tipo de IO y si es start address o word count
is_input = "Input" in field
is_start_address = "Start Address" in field
is_word_count = "Word Count" in field
io_type = "Input" if is_input else "Output"
# Buscar la estructura IO en el dispositivo - con múltiples estrategias
print(f" Debug: Buscando elementos Address en dispositivo {device_element.get('Name', 'N/A')}")
# Estrategia 1: Buscar Address directamente en el elemento
address_elements = device_element.xpath("./*[local-name()='Attribute'][@Name='Address']")
if not address_elements:
# Estrategia 2: Buscar en elementos hijos
print(f" Debug: No se encontró Address directo, buscando en elementos hijos...")
address_elements = device_element.xpath(".//*[local-name()='Attribute'][@Name='Address']")
if not address_elements:
# Estrategia 3: Buscar en elementos padre (tal vez los IOs están en el parent)
print(f" Debug: No se encontró Address en hijos, buscando en elementos padre...")
parent_elements = device_element.xpath("parent::*")
if parent_elements:
address_elements = parent_elements[0].xpath(".//*[local-name()='Attribute'][@Name='Address']")
if not address_elements:
print(f" ERROR: No se encontró elemento Address en el dispositivo o sus alrededores")
print(f" Debug: Atributos disponibles en este elemento:")
for attr in device_element.xpath("./*[local-name()='Attribute']"):
attr_name = attr.get('Name', 'N/A')
print(f" - {attr_name}")
return False
print(f" Debug: Encontrados {len(address_elements)} elemento(s) Address")
# Buscar dentro del Address el sub-atributo correcto
target_io_element = None
# Buscar en todos los elementos Address encontrados
for address_element in address_elements:
print(f" Debug: Explorando Address element...")
io_subelements = address_element.xpath(f"./*[local-name()='Attribute']")
print(f" Debug: Encontrados {len(io_subelements)} sub-elementos en Address")
for io_sub in io_subelements:
sub_name = io_sub.get('Name', 'N/A')
print(f" Debug: Revisando sub-elemento: {sub_name}")
# Verificar si es el tipo correcto (Input/Output)
type_val = io_sub.xpath("./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()")
if type_val:
found_type = type_val[0]
print(f" Debug: Encontrado IoType: {found_type}")
if found_type.lower() == io_type.lower():
target_io_element = io_sub
print(f" Debug: ¡Encontrado elemento {io_type} compatible!")
break
else:
# También revisar si el nombre del sub-elemento indica el tipo
if io_type.lower() in sub_name.lower():
print(f" Debug: Sub-elemento {sub_name} parece ser de tipo {io_type}")
target_io_element = io_sub
break
if target_io_element:
break
if not target_io_element:
print(f" ERROR: No se encontró elemento {io_type} en ningún Address")
print(f" Debug: Elementos Address disponibles:")
for addr_elem in address_elements:
sub_attrs = addr_elem.xpath(f"./*[local-name()='Attribute']")
for sub in sub_attrs:
sub_name = sub.get('Name', 'N/A')
type_vals = sub.xpath("./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()")
type_info = type_vals[0] if type_vals else "No IoType"
print(f" - {sub_name} (IoType: {type_info})")
return False
if is_start_address:
# Actualizar StartAddress
start_attr = target_io_element.xpath("./*[local-name()='Attribute'][@Name='StartAddress']")
if start_attr:
value_elem = start_attr[0].xpath("./*[local-name()='Value']")
if value_elem:
value_elem[0].text = str(new_value)
return True
else:
# Crear StartAddress si no existe
start_attr_elem = ET.SubElement(target_io_element, "Attribute")
start_attr_elem.set("Name", "StartAddress")
value_elem = ET.SubElement(start_attr_elem, "Value")
value_elem.text = str(new_value)
return True
elif is_word_count:
# Actualizar Length basándose en Word Count
# Convertir words a bits (1 word = 16 bits)
length_bits = new_value * 16
# Actualizar Length
length_attr = target_io_element.xpath("./*[local-name()='Attribute'][@Name='Length']")
if length_attr:
value_elem = length_attr[0].xpath("./*[local-name()='Value']")
if value_elem:
value_elem[0].text = str(length_bits)
return True
else:
# Crear Length si no existe
length_attr_elem = ET.SubElement(target_io_element, "Attribute")
length_attr_elem.set("Name", "Length")
value_elem = ET.SubElement(length_attr_elem, "Value")
value_elem.text = str(length_bits)
return True
return False
except Exception as e:
print(f" Error aplicando cambio de dirección IO {field}: {e}")
traceback.print_exc()
return False
def save_updated_aml(aml_tree, original_aml_path):
"""
Guarda el árbol XML modificado como un nuevo archivo AML con sufijo "_updated".
"""
original_path = Path(original_aml_path)
updated_path = original_path.parent / f"{original_path.stem}_updated{original_path.suffix}"
try:
# Escribir el XML actualizado
aml_tree.write(
str(updated_path),
pretty_print=True,
xml_declaration=True,
encoding='utf-8'
)
print(f"Archivo AML actualizado guardado en: {updated_path}")
return str(updated_path)
except Exception as e:
print(f"ERROR guardando archivo AML actualizado: {e}")
return None
def main():
"""Función principal del script."""
try:
configs = load_configuration()
working_directory = configs.get("working_directory")
except Exception as e:
print(f"Warning: No se pudo cargar configuración: {e}")
configs = {}
working_directory = None
script_version = "v1.4 - Enhanced Address Element Search with Debug"
print(f"--- Actualizador de AML desde Excel Modificado ({script_version}) ---")
# Validar directorio de trabajo
if not working_directory or not os.path.isdir(working_directory):
print("Directorio de trabajo no configurado. Usando directorio actual.")
working_directory = os.getcwd()
print(f"Directorio de trabajo: {working_directory}")
# 1. Seleccionar archivo AML original
print("\n1. Seleccione el archivo AML original:")
aml_file_path = select_aml_file(initial_dir=working_directory)
if not aml_file_path:
print("No se seleccionó archivo AML. Saliendo.")
return
# 2. Seleccionar archivo Excel modificado
print("\n2. Seleccione el archivo Excel modificado:")
excel_file_path = select_excel_file(initial_dir=working_directory)
if not excel_file_path:
print("No se seleccionó archivo Excel. Saliendo.")
return
print(f"\nArchivo AML original: {aml_file_path}")
print(f"Archivo Excel modificado: {excel_file_path}")
# 3. Crear directorio temporal para archivos intermedios
with tempfile.TemporaryDirectory() as temp_dir:
print(f"\nUsando directorio temporal: {temp_dir}")
# 4. Generar Excel de referencia desde AML
reference_excel_path, project_data = generate_reference_excel_from_aml(aml_file_path, temp_dir)
if not reference_excel_path or not project_data:
print("ERROR: No se pudo generar Excel de referencia.")
return
# 5. Comparar archivos Excel
is_valid, changes_dict = compare_excel_files(reference_excel_path, excel_file_path)
if not is_valid:
print("ERROR: El archivo Excel modificado no es compatible con el AML original.")
return
if not changes_dict:
print("No se detectaron cambios en el Excel. No hay nada que actualizar.")
return
# 6. Cargar y parsear AML original
print("\nCargando archivo AML original...")
try:
parser = ET.XMLParser(remove_blank_text=True, huge_tree=True)
aml_tree = ET.parse(aml_file_path, parser)
except Exception as e:
print(f"ERROR parseando archivo AML: {e}")
return
# 7. Aplicar cambios al AML
success = apply_changes_to_aml(aml_tree, project_data, changes_dict)
if not success:
print("No se pudieron aplicar cambios al AML.")
return
# 8. Guardar AML actualizado
updated_aml_path = save_updated_aml(aml_tree, aml_file_path)
if updated_aml_path:
print(f"\n¡Proceso completado exitosamente!")
print(f"Archivo AML actualizado: {updated_aml_path}")
else:
print("ERROR: No se pudo guardar el archivo AML actualizado.")
if __name__ == "__main__":
main()