ParamManagerScripts/backend/script_groups/IO_adaptation/x7_update_CAx.py

619 lines
24 KiB
Python

"""
x7_update_CAx.py : Script que actualiza un archivo AML original basándose en las modificaciones
hechas en el archivo Excel de IOs generado por x2_process_CAx.py
Pipeline:
1. Lee el AML original
2. Lee el Excel modificado
3. Genera Excel de referencia desde el AML para comparar
4. Valida que la estructura base sea la misma (mismos nodos, nombres)
5. Identifica cambios en IOs, direcciones IP, etc.
6. Aplica los cambios al AML original
7. Genera un nuevo archivo AML con sufijo "_updated"
"""
import os
import sys
import tkinter as tk
from tkinter import filedialog, messagebox
import traceback
from lxml import etree as ET
import pandas as pd
from pathlib import Path
import re
import math
import tempfile
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# Import functions from x2_process_CAx
from x2_process_CAx import (
extract_aml_data,
generate_io_excel_report,
sanitize_filename
)
def select_aml_file(title="Select Original AML File", initial_dir=None):
"""Abre un diálogo para seleccionar un archivo AML."""
root = tk.Tk()
root.withdraw()
file_path = filedialog.askopenfilename(
title=title,
filetypes=[("AML Files", "*.aml"), ("All Files", "*.*")],
initialdir=initial_dir
)
root.destroy()
if not file_path:
return None
return file_path
def select_excel_file(title="Select Modified Excel File", initial_dir=None):
"""Abre un diálogo para seleccionar un archivo Excel."""
root = tk.Tk()
root.withdraw()
file_path = filedialog.askopenfilename(
title=title,
filetypes=[("Excel Files", "*.xlsx"), ("Excel Files", "*.xls"), ("All Files", "*.*")],
initialdir=initial_dir
)
root.destroy()
if not file_path:
return None
return file_path
def generate_reference_excel_from_aml(aml_file_path, temp_dir):
"""
Genera un Excel de referencia desde el AML para comparar con el Excel modificado.
Retorna el path al Excel generado y los project_data.
"""
print("Generando Excel de referencia desde AML original...")
# Extraer datos del AML
try:
parser = ET.XMLParser(remove_blank_text=True, huge_tree=True)
tree = ET.parse(aml_file_path, parser)
root = tree.getroot()
project_data = extract_aml_data(root)
except Exception as e:
print(f"ERROR procesando archivo AML {aml_file_path}: {e}")
return None, None
if not project_data or not project_data.get("plcs"):
print("No se encontraron PLCs en el archivo AML.")
return None, None
# Generar Excel para cada PLC y combinar en uno solo
all_excel_rows = []
for plc_id, plc_data in project_data.get("plcs", {}).items():
# Crear archivo temporal para este PLC
temp_excel_path = os.path.join(temp_dir, f"temp_plc_{plc_id}.xlsx")
# Generar Excel para este PLC (reutilizamos la función existente)
generate_io_excel_report(project_data, temp_excel_path, plc_id, temp_dir)
# Leer el Excel generado y agregar a la lista combinada
if os.path.exists(temp_excel_path):
try:
df_plc = pd.read_excel(temp_excel_path, sheet_name='IO Report')
# Agregar columna de ID único
df_plc['Unique_ID'] = df_plc['PLC Name'] + "+" + df_plc['Device Name']
all_excel_rows.append(df_plc)
os.remove(temp_excel_path) # Limpiar archivo temporal
except Exception as e:
print(f"ERROR leyendo Excel temporal para PLC {plc_id}: {e}")
if not all_excel_rows:
print("No se pudieron generar datos de Excel de referencia.")
return None, None
# Combinar todos los DataFrames
combined_df = pd.concat(all_excel_rows, ignore_index=True)
# Guardar Excel de referencia
reference_excel_path = os.path.join(temp_dir, "reference_excel.xlsx")
combined_df.to_excel(reference_excel_path, sheet_name='IO Report', index=False)
return reference_excel_path, project_data
def compare_excel_files(reference_excel_path, modified_excel_path):
"""
Compara el Excel de referencia con el Excel modificado.
Retorna (is_valid, changes_dict) donde:
- is_valid: True si la estructura básica es la misma
- changes_dict: diccionario con los cambios detectados
"""
print("Comparando archivos Excel...")
try:
# Leer ambos archivos
df_ref = pd.read_excel(reference_excel_path, sheet_name='IO Report')
df_mod = pd.read_excel(modified_excel_path, sheet_name='IO Report')
# Agregar columna de ID único si no existe
if 'Unique_ID' not in df_ref.columns:
df_ref['Unique_ID'] = df_ref['PLC Name'] + "+" + df_ref['Device Name']
if 'Unique_ID' not in df_mod.columns:
df_mod['Unique_ID'] = df_mod['PLC Name'] + "+" + df_mod['Device Name']
except Exception as e:
print(f"ERROR leyendo archivos Excel: {e}")
return False, {}
# Validar estructura básica
if len(df_ref) != len(df_mod):
print(f"ERROR: Número de filas diferente. Referencia: {len(df_ref)}, Modificado: {len(df_mod)}")
return False, {}
# Verificar que todos los IDs únicos coincidan
ref_ids = set(df_ref['Unique_ID'].tolist())
mod_ids = set(df_mod['Unique_ID'].tolist())
if ref_ids != mod_ids:
missing_in_mod = ref_ids - mod_ids
extra_in_mod = mod_ids - ref_ids
print("ERROR: Los IDs únicos no coinciden entre archivos.")
if missing_in_mod:
print(f" Faltantes en modificado: {missing_in_mod}")
if extra_in_mod:
print(f" Extras en modificado: {extra_in_mod}")
return False, {}
print("Estructura básica validada correctamente.")
# Detectar cambios
changes = {}
columns_to_monitor = [
'Device Address', 'IO Input Start Address', 'IO Input End Address',
'IO Output Start Address', 'IO Output End Address',
'Network Type', 'Device Type', 'Order Number', 'Firmware Version'
]
for index, row_ref in df_ref.iterrows():
unique_id = row_ref['Unique_ID']
row_mod = df_mod[df_mod['Unique_ID'] == unique_id].iloc[0]
row_changes = {}
for col in columns_to_monitor:
if col in df_ref.columns and col in df_mod.columns:
ref_val = str(row_ref[col]).strip() if pd.notna(row_ref[col]) else 'N/A'
mod_val = str(row_mod[col]).strip() if pd.notna(row_mod[col]) else 'N/A'
if ref_val != mod_val:
row_changes[col] = {
'original': ref_val,
'modified': mod_val
}
if row_changes:
changes[unique_id] = {
'plc_name': row_ref['PLC Name'],
'device_name': row_ref['Device Name'],
'changes': row_changes
}
print(f"Detectados {len(changes)} dispositivos con cambios.")
for unique_id, change_info in changes.items():
print(f" {unique_id}: {list(change_info['changes'].keys())}")
return True, changes
def find_device_in_aml(project_data, plc_name, device_name):
"""
Encuentra el device_id correspondiente en los datos del AML basándose en PLC y device name.
"""
# Buscar el PLC por nombre
target_plc_id = None
for plc_id, plc_data in project_data.get("plcs", {}).items():
if plc_data.get('name', plc_id) == plc_name:
target_plc_id = plc_id
break
if not target_plc_id:
return None
# Buscar el dispositivo en las redes del PLC
plc_info = project_data["plcs"][target_plc_id]
plc_networks = plc_info.get("connected_networks", {})
for net_id, plc_addr_on_net in plc_networks.items():
net_info = project_data.get("networks", {}).get(net_id)
if not net_info:
continue
devices_on_net = net_info.get("devices_on_net", {})
# Identificar nodos que pertenecen al PLC
plc_interface_and_node_ids = set()
for node in plc_info.get("network_nodes", []):
plc_interface_and_node_ids.add(node["id"])
interface_id_lookup = project_data["devices"].get(node["id"], {}).get("parent_id")
if interface_id_lookup:
plc_interface_and_node_ids.add(interface_id_lookup)
plc_interface_and_node_ids.add(target_plc_id)
# Buscar en dispositivos de la red
for node_id, node_addr in devices_on_net.items():
if node_id in plc_interface_and_node_ids:
continue
node_info = project_data.get("devices", {}).get(node_id)
if not node_info:
continue
# Determinar información del dispositivo
interface_id = node_info.get("parent_id")
interface_info = None
actual_device_id = None
actual_device_info = None
if interface_id:
interface_info = project_data.get("devices", {}).get(interface_id)
if interface_info:
actual_device_id = interface_info.get("parent_id")
if actual_device_id:
actual_device_info = project_data.get("devices", {}).get(actual_device_id)
display_info = actual_device_info if actual_device_info else (interface_info if interface_info else node_info)
display_name = display_info.get("name", "Unknown")
if display_name == device_name:
return {
'node_id': node_id,
'display_id': actual_device_id if actual_device_info else (interface_id if interface_info else node_id),
'node_info': node_info,
'display_info': display_info,
'network_id': net_id
}
return None
def apply_changes_to_aml(aml_tree, project_data, changes_dict):
"""
Aplica los cambios detectados al árbol XML del AML.
"""
print("Aplicando cambios al archivo AML...")
root = aml_tree.getroot()
changes_applied = 0
for unique_id, change_info in changes_dict.items():
plc_name = change_info['plc_name']
device_name = change_info['device_name']
changes = change_info['changes']
print(f" Procesando cambios para: {unique_id}")
# Encontrar el dispositivo en los datos del AML
device_info = find_device_in_aml(project_data, plc_name, device_name)
if not device_info:
print(f" ERROR: No se pudo encontrar el dispositivo en el AML")
continue
# Encontrar el elemento XML correspondiente
device_id = device_info['node_id']
xml_element = root.xpath(f".//*[@ID='{device_id}']")
if not xml_element:
print(f" ERROR: No se pudo encontrar el elemento XML con ID {device_id}")
continue
device_element = xml_element[0]
# Aplicar cambios específicos
for field, change_data in changes.items():
original_val = change_data['original']
modified_val = change_data['modified']
if field == 'Device Address':
# Cambiar dirección de red del dispositivo
if apply_network_address_change(device_element, modified_val):
print(f" ✓ Dirección de red actualizada: {original_val} -> {modified_val}")
changes_applied += 1
else:
print(f" ✗ Error actualizando dirección de red")
elif field in ['IO Input Start Address', 'IO Input End Address', 'IO Output Start Address', 'IO Output End Address']:
# Cambiar direcciones IO específicas
if apply_io_address_change(device_element, field, original_val, modified_val, project_data, device_id):
print(f"{field} actualizada: {original_val} -> {modified_val}")
changes_applied += 1
else:
print(f" ✗ Error actualizando {field}")
elif field in ['Device Type', 'Order Number', 'Firmware Version']:
# Cambiar atributos del dispositivo
if apply_device_attribute_change(device_element, field, modified_val):
print(f"{field} actualizado: {original_val} -> {modified_val}")
changes_applied += 1
else:
print(f" ✗ Error actualizando {field}")
print(f"Total de cambios aplicados: {changes_applied}")
return changes_applied > 0
def apply_network_address_change(device_element, new_address):
"""
Aplica cambio de dirección de red a un elemento del dispositivo.
"""
try:
# Buscar atributo NetworkAddress
addr_attr = device_element.xpath("./*[local-name()='Attribute'][@Name='NetworkAddress']")
if addr_attr:
value_elem = addr_attr[0].xpath("./*[local-name()='Value']")
if value_elem:
value_elem[0].text = new_address
return True
# Si no existe, crear el atributo
attr_elem = ET.SubElement(device_element, "Attribute")
attr_elem.set("Name", "NetworkAddress")
value_elem = ET.SubElement(attr_elem, "Value")
value_elem.text = new_address
return True
except Exception as e:
print(f" Error aplicando cambio de dirección: {e}")
return False
def apply_device_attribute_change(device_element, field, new_value):
"""
Aplica cambio de atributo del dispositivo.
"""
try:
# Mapear campos a nombres de atributos XML
attr_mapping = {
'Device Type': 'TypeName',
'Order Number': 'OrderNumber',
'Firmware Version': 'FirmwareVersion'
}
attr_name = attr_mapping.get(field)
if not attr_name:
return False
# Buscar atributo existente
attr_elem = device_element.xpath(f"./*[local-name()='Attribute'][@Name='{attr_name}']")
if attr_elem:
value_elem = attr_elem[0].xpath("./*[local-name()='Value']")
if value_elem:
value_elem[0].text = new_value
return True
# Si no existe, crear el atributo
attr_elem = ET.SubElement(device_element, "Attribute")
attr_elem.set("Name", attr_name)
value_elem = ET.SubElement(attr_elem, "Value")
value_elem.text = new_value
return True
except Exception as e:
print(f" Error aplicando cambio de atributo {field}: {e}")
return False
def apply_io_address_change(device_element, field, original_val, modified_val, project_data, device_id):
"""
Aplica cambios a las direcciones IO individuales. Ahora es más simple
porque las direcciones están separadas en campos específicos.
"""
try:
# Validar que el nuevo valor sea numérico o N/A
if modified_val != 'N/A' and modified_val != '':
try:
new_addr_value = int(modified_val)
except ValueError:
print(f" Error: Valor de dirección no válido: {modified_val}")
return False
else:
# Si es N/A, no hay dirección IO para este tipo
return True
# Determinar tipo de IO y si es start o end
is_input = "Input" in field
is_start = "Start" in field
io_type = "Input" if is_input else "Output"
# Buscar la estructura IO en el dispositivo
# Esto requiere encontrar los elementos Address y sus sub-atributos
address_elements = device_element.xpath("./*[local-name()='Attribute'][@Name='Address']")
if not address_elements:
print(f" No se encontró elemento Address en el dispositivo")
return False
# Buscar dentro del Address el sub-atributo correcto
address_element = address_elements[0]
io_subelements = address_element.xpath(f"./*[local-name()='Attribute']")
target_io_element = None
for io_sub in io_subelements:
# Verificar si es el tipo correcto (Input/Output)
type_val = io_sub.xpath("./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()")
if type_val and type_val[0].lower() == io_type.lower():
target_io_element = io_sub
break
if not target_io_element:
print(f" No se encontró elemento {io_type} en Address")
return False
# Actualizar StartAddress o calcular Length según el campo
if is_start:
# Actualizar StartAddress
start_attr = target_io_element.xpath("./*[local-name()='Attribute'][@Name='StartAddress']")
if start_attr:
value_elem = start_attr[0].xpath("./*[local-name()='Value']")
if value_elem:
value_elem[0].text = str(new_addr_value)
return True
else:
# Crear StartAddress si no existe
start_attr_elem = ET.SubElement(target_io_element, "Attribute")
start_attr_elem.set("Name", "StartAddress")
value_elem = ET.SubElement(start_attr_elem, "Value")
value_elem.text = str(new_addr_value)
return True
else:
# Es End Address - necesitamos calcular Length basándose en Start y End
# Primero obtener StartAddress
start_attr = target_io_element.xpath("./*[local-name()='Attribute'][@Name='StartAddress']/*[local-name()='Value']/text()")
if start_attr:
try:
start_value = int(start_attr[0])
# Length en bytes = (end - start + 1)
length_bytes = new_addr_value - start_value + 1
if length_bytes > 0:
# Convertir a bits (asumiendo 8 bits por byte)
length_bits = length_bytes * 8
# Actualizar Length
length_attr = target_io_element.xpath("./*[local-name()='Attribute'][@Name='Length']")
if length_attr:
value_elem = length_attr[0].xpath("./*[local-name()='Value']")
if value_elem:
value_elem[0].text = str(length_bits)
return True
else:
# Crear Length si no existe
length_attr_elem = ET.SubElement(target_io_element, "Attribute")
length_attr_elem.set("Name", "Length")
value_elem = ET.SubElement(length_attr_elem, "Value")
value_elem.text = str(length_bits)
return True
else:
print(f" Error: End address ({new_addr_value}) debe ser mayor que start address ({start_value})")
return False
except ValueError:
print(f" Error: No se pudo convertir start address: {start_attr[0]}")
return False
else:
print(f" Error: No se encontró StartAddress para calcular Length")
return False
return False
except Exception as e:
print(f" Error aplicando cambio de dirección IO {field}: {e}")
traceback.print_exc()
return False
def save_updated_aml(aml_tree, original_aml_path):
"""
Guarda el árbol XML modificado como un nuevo archivo AML con sufijo "_updated".
"""
original_path = Path(original_aml_path)
updated_path = original_path.parent / f"{original_path.stem}_updated{original_path.suffix}"
try:
# Escribir el XML actualizado
aml_tree.write(
str(updated_path),
pretty_print=True,
xml_declaration=True,
encoding='utf-8'
)
print(f"Archivo AML actualizado guardado en: {updated_path}")
return str(updated_path)
except Exception as e:
print(f"ERROR guardando archivo AML actualizado: {e}")
return None
def main():
"""Función principal del script."""
try:
configs = load_configuration()
working_directory = configs.get("working_directory")
except Exception as e:
print(f"Warning: No se pudo cargar configuración: {e}")
configs = {}
working_directory = None
script_version = "v1.1 - Simplified IO Address Handling (Start/End Separated)"
print(f"--- Actualizador de AML desde Excel Modificado ({script_version}) ---")
# Validar directorio de trabajo
if not working_directory or not os.path.isdir(working_directory):
print("Directorio de trabajo no configurado. Usando directorio actual.")
working_directory = os.getcwd()
print(f"Directorio de trabajo: {working_directory}")
# 1. Seleccionar archivo AML original
print("\n1. Seleccione el archivo AML original:")
aml_file_path = select_aml_file(initial_dir=working_directory)
if not aml_file_path:
print("No se seleccionó archivo AML. Saliendo.")
return
# 2. Seleccionar archivo Excel modificado
print("\n2. Seleccione el archivo Excel modificado:")
excel_file_path = select_excel_file(initial_dir=working_directory)
if not excel_file_path:
print("No se seleccionó archivo Excel. Saliendo.")
return
print(f"\nArchivo AML original: {aml_file_path}")
print(f"Archivo Excel modificado: {excel_file_path}")
# 3. Crear directorio temporal para archivos intermedios
with tempfile.TemporaryDirectory() as temp_dir:
print(f"\nUsando directorio temporal: {temp_dir}")
# 4. Generar Excel de referencia desde AML
reference_excel_path, project_data = generate_reference_excel_from_aml(aml_file_path, temp_dir)
if not reference_excel_path or not project_data:
print("ERROR: No se pudo generar Excel de referencia.")
return
# 5. Comparar archivos Excel
is_valid, changes_dict = compare_excel_files(reference_excel_path, excel_file_path)
if not is_valid:
print("ERROR: El archivo Excel modificado no es compatible con el AML original.")
return
if not changes_dict:
print("No se detectaron cambios en el Excel. No hay nada que actualizar.")
return
# 6. Cargar y parsear AML original
print("\nCargando archivo AML original...")
try:
parser = ET.XMLParser(remove_blank_text=True, huge_tree=True)
aml_tree = ET.parse(aml_file_path, parser)
except Exception as e:
print(f"ERROR parseando archivo AML: {e}")
return
# 7. Aplicar cambios al AML
success = apply_changes_to_aml(aml_tree, project_data, changes_dict)
if not success:
print("No se pudieron aplicar cambios al AML.")
return
# 8. Guardar AML actualizado
updated_aml_path = save_updated_aml(aml_tree, aml_file_path)
if updated_aml_path:
print(f"\n¡Proceso completado exitosamente!")
print(f"Archivo AML actualizado: {updated_aml_path}")
else:
print("ERROR: No se pudo guardar el archivo AML actualizado.")
if __name__ == "__main__":
main()