ParamManagerScripts/backend/script_groups/IO_adaptation/x1.py

473 lines
20 KiB
Python

import pandas as pd
import re
import os
import shutil
import openpyxl
import sys
from datetime import datetime
def read_markdown_table(file_path):
"""Leer tabla en formato Markdown de Obsidian y convertirla a DataFrame."""
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
# Dividir el contenido en líneas
lines = content.strip().split('\n')
# Encontrar el inicio de la tabla (la primera línea que comienza con '|')
table_start = None
for i, line in enumerate(lines):
if line.strip().startswith('|'):
table_start = i
break
if table_start is None:
print("No se encontró ninguna tabla en el archivo")
return pd.DataFrame()
# Encontrar todas las líneas de la tabla
table_lines = []
for i in range(table_start, len(lines)):
line = lines[i].strip()
if line.startswith('|'):
table_lines.append(line)
elif not line: # Línea vacía podría indicar el final de la tabla
# Si la siguiente línea no comienza con '|', consideramos que es el final de la tabla
if i + 1 < len(lines) and not lines[i + 1].strip().startswith('|'):
break
else:
break # Si no comienza con '|' y no está vacía, es el final de la tabla
if len(table_lines) < 3: # Necesitamos al menos encabezado, separador y una fila de datos
print("La tabla no tiene suficientes filas")
return pd.DataFrame()
# Procesar encabezados
header_line = table_lines[0]
separator_line = table_lines[1]
# Verificar que la segunda línea sea realmente un separador
is_separator = all(cell.strip().startswith(':') or cell.strip().startswith('-')
for cell in separator_line.split('|')[1:-1] if cell.strip())
if not is_separator:
print("Advertencia: La segunda línea no parece ser un separador. Se asume que es parte de los datos.")
separator_idx = None
else:
separator_idx = 1
# Extraer encabezados
header_cells = header_line.split('|')
# Eliminar elementos vacíos al principio y al final
if not header_cells[0].strip():
header_cells = header_cells[1:]
if not header_cells[-1].strip():
header_cells = header_cells[:-1]
headers = [h.strip() for h in header_cells]
print(f"Encabezados detectados: {headers}")
# Procesar filas de datos
data_start_idx = 2 if separator_idx == 1 else 1
data = []
for line in table_lines[data_start_idx:]:
# Dividir la línea por el carácter pipe
cells = line.split('|')
# Eliminar elementos vacíos al principio y al final
if not cells[0].strip():
cells = cells[1:]
if not cells[-1].strip():
cells = cells[:-1]
# Limpiar valores
row_values = [cell.strip() for cell in cells]
# Asegurar que la fila tenga el mismo número de columnas que los encabezados
if len(row_values) != len(headers):
print(f"Advertencia: Fila con {len(row_values)} valores, pero se esperaban {len(headers)}. Ajustando...")
# Intentar ajustar la fila para que coincida con el número de columnas
if len(row_values) < len(headers):
row_values.extend([''] * (len(headers) - len(row_values)))
else:
row_values = row_values[:len(headers)]
data.append(row_values)
# Convertir a DataFrame
df = pd.DataFrame(data, columns=headers)
return df
def create_log_file(log_path):
"""Crear un archivo de log con timestamp."""
log_dir = os.path.dirname(log_path)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
with open(log_path, 'w', encoding='utf-8') as log_file:
log_file.write(f"Log de actualización de PLCTags - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
log_file.write("=" * 80 + "\n\n")
return log_path
def log_message(log_path, message):
"""Añadir mensaje al log."""
with open(log_path, 'a', encoding='utf-8') as log_file:
log_file.write(message + "\n")
print(message)
def transform_io_address(address):
"""
Transform IO addresses according to the required format:
- Ixx.x → %Exx.x
- Exx.x → %Exx.x
- Qxx.x → %Axx.x
- Axx.x → %Axx.x
- PEWxx → %EWxx
- PAWxx → %AWxx
"""
if not address or not isinstance(address, str):
return address
address = address.strip()
# Patterns for boolean addresses
if re.match(r'^I(\d+)\.(\d+)$', address):
return re.sub(r'^I(\d+)\.(\d+)$', r'%E\1.\2', address)
elif re.match(r'^E(\d+)\.(\d+)$', address):
return re.sub(r'^E(\d+)\.(\d+)$', r'%E\1.\2', address)
elif re.match(r'^Q(\d+)\.(\d+)$', address):
return re.sub(r'^Q(\d+)\.(\d+)$', r'%A\1.\2', address)
elif re.match(r'^A(\d+)\.(\d+)$', address):
return re.sub(r'^A(\d+)\.(\d+)$', r'%A\1.\2', address)
# Patterns for word addresses
elif re.match(r'^PEW(\d+)$', address):
return re.sub(r'^PEW(\d+)$', r'%EW\1', address)
elif re.match(r'^PAW(\d+)$', address):
return re.sub(r'^PAW(\d+)$', r'%AW\1', address)
# If already in correct format or unknown format, return as is
return address
def update_excel_with_adaptation(excel_path, adaptation_path, output_path=None, log_path=None):
"""
Actualiza el archivo Excel con la información de adaptación.
- Modifica la columna "Logical Address" según las reglas:
1. Si el tag se encuentra en la tabla de adaptación, convierte el formato de IO.
2. Si no se encuentra y tiene formato %E, %A, %EW, %AW, asigna una dirección %M.
Args:
excel_path: Ruta al archivo Excel de tags PLC
adaptation_path: Ruta al archivo de adaptación en Markdown
output_path: Ruta para guardar el Excel actualizado (si es None, sobrescribe el original)
log_path: Ruta para el archivo de log
"""
# Si no se especifica ruta de salida, sobrescribir el archivo original
if output_path is None:
output_path = excel_path
# Si no se especifica ruta de log, crear una por defecto
if log_path is None:
log_dir = os.path.dirname(output_path)
log_filename = f"update_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
log_path = os.path.join(log_dir, log_filename)
# Crear archivo de log
create_log_file(log_path)
log_message(log_path, f"Archivo Excel de entrada: {excel_path}")
log_message(log_path, f"Archivo de adaptación: {adaptation_path}")
log_message(log_path, f"Archivo Excel de salida: {output_path}")
log_message(log_path, "-" * 80)
# Leer el archivo de adaptación
adaptation_df = read_markdown_table(adaptation_path)
# Identificar automáticamente la columna Master TAG
master_tag_col = None
for col in adaptation_df.columns:
# Buscar columnas con "master" y "tag" en el nombre (insensible a mayúsculas)
if "master" in col.lower() and "tag" in col.lower():
master_tag_col = col
break
# Si no encontramos la columna por nombre exacto, intentar con coincidencias parciales
if not master_tag_col:
for col in adaptation_df.columns:
if any(keyword in col.lower() for keyword in ["master", "tag", "name"]):
master_tag_col = col
break
# Si aún no hemos encontrado, verificar el contenido de las columnas
if not master_tag_col and not adaptation_df.empty:
# Buscar columnas que contengan valores que parezcan tags (con formato DI_xxx, DO_xxx, etc.)
for col in adaptation_df.columns:
# Tomar algunas muestras
samples = adaptation_df[col].dropna().astype(str).head(5).tolist()
# Comprobar si alguna muestra coincide con el patrón de Master TAG
tag_pattern = r'^[A-Z]{2,3}_[A-Za-z0-9_]+$'
if any(re.match(tag_pattern, s) for s in samples):
master_tag_col = col
break
if not master_tag_col:
error_msg = "Error: No se encontró la columna 'Master Tag' o similar en el archivo de adaptación"
log_message(log_path, error_msg)
log_message(log_path, f"Columnas disponibles: {adaptation_df.columns.tolist()}")
return False
log_message(log_path, f"Usando columna '{master_tag_col}' para el mapeo de tags")
# Aseguramos que no tenemos filas completamente vacías o solo con Master TAG vacío
adaptation_df = adaptation_df[adaptation_df[master_tag_col].notna() &
(adaptation_df[master_tag_col] != '')]
# Identificar automáticamente la columna IO
io_col = None
for col in adaptation_df.columns:
# Buscar coincidencias exactas
if col.lower() == "io":
io_col = col
break
# Buscar coincidencias parciales
if any(keyword in col.lower() for keyword in ["io", "i/o", "address", "logical"]):
io_col = col
break
# Si aún no encontramos, verificar el contenido de las columnas
if not io_col and not adaptation_df.empty:
# Buscar columnas que contengan valores que parezcan direcciones IO (I0.0, Q1.2, etc.)
for col in adaptation_df.columns:
# Tomar algunas muestras
samples = adaptation_df[col].dropna().astype(str).head(5).tolist()
# Definir patrones para direcciones IO
io_patterns = [
r'^[IQM][0-9]+\.[0-9]+$', # Ejemplo: I0.0, Q1.2
r'^PE[WBD][0-9]+$', # Ejemplo: PEW100
r'^PA[WBD][0-9]+$' # Ejemplo: PAW100
]
# Verificar si alguna muestra coincide con algún patrón
matches = False
for pattern in io_patterns:
if any(re.match(pattern, s) for s in samples):
matches = True
break
if matches:
io_col = col
break
if not io_col:
error_msg = "Error: No se encontró la columna 'IO' o similar en el archivo de adaptación"
log_message(log_path, error_msg)
log_message(log_path, f"Columnas disponibles: {adaptation_df.columns.tolist()}")
return False
log_message(log_path, f"Usando columna '{io_col}' para los valores de IO")
# Eliminar el archivo de salida si ya existe
if os.path.exists(output_path):
try:
os.remove(output_path)
log_message(log_path, f"Archivo de salida existente eliminado: {output_path}")
except Exception as e:
log_message(log_path, f"Error al eliminar archivo existente: {e}")
return False
# Crear una copia exacta del archivo Excel original
try:
shutil.copy2(excel_path, output_path)
log_message(log_path, f"Archivo Excel copiado: {excel_path} -> {output_path}")
except Exception as e:
log_message(log_path, f"Error al copiar el archivo Excel: {e}")
return False
# Abrir el archivo Excel copiado usando openpyxl para preservar estructura
try:
workbook = openpyxl.load_workbook(output_path)
log_message(log_path, f"Archivo Excel abierto correctamente: {output_path}")
log_message(log_path, f"Hojas disponibles: {workbook.sheetnames}")
except Exception as e:
log_message(log_path, f"Error al abrir el archivo Excel: {e}")
return False
# Crear un diccionario de actualización desde el archivo de adaptación
update_dict = {}
unmatched_count = 0
for idx, row in adaptation_df.iterrows():
master_tag = row[master_tag_col]
io_value = row[io_col]
# Convertir a string y limpiar espacios
master_tag_str = str(master_tag).strip() if not pd.isna(master_tag) else ""
io_value_str = str(io_value).strip() if not pd.isna(io_value) else ""
if master_tag_str and io_value_str:
update_dict[master_tag_str] = transform_io_address(io_value_str)
else:
unmatched_count += 1
if unmatched_count > 0:
log_message(log_path, f"Advertencia: {unmatched_count} filas en el archivo de adaptación tenían valores vacíos de Master TAG o IO")
log_message(log_path, f"Tags encontrados en el archivo de adaptación: {len(update_dict)}")
# Inicializar contador para direcciones %M
memory_byte_counter = 3600
memory_bit_counter = 0
# Contador de coincidencias
matched_tags = []
unmatched_adaptation_tags = set(update_dict.keys())
converted_to_memory = []
# Procesar cada hoja
for sheet_name in workbook.sheetnames:
sheet = workbook[sheet_name]
log_message(log_path, f"\nProcesando hoja: {sheet_name}")
# Encontrar la columna "Name" y "Logical Address"
name_col_idx = None
logical_addr_col_idx = None
data_type_col_idx = None
for col_idx, cell in enumerate(sheet[1], 1): # Asumiendo que la primera fila contiene encabezados
cell_value = str(cell.value).lower() if cell.value else ""
if "name" in cell_value:
name_col_idx = col_idx
log_message(log_path, f"Columna 'Name' encontrada en posición {col_idx}")
if "logical address" in cell_value:
logical_addr_col_idx = col_idx
log_message(log_path, f"Columna 'Logical Address' encontrada en posición {col_idx}")
if "data type" in cell_value:
data_type_col_idx = col_idx
log_message(log_path, f"Columna 'Data Type' encontrada en posición {col_idx}")
if name_col_idx is None or logical_addr_col_idx is None:
log_message(log_path, f"No se encontraron las columnas necesarias en la hoja {sheet_name}, omitiendo...")
continue
# Actualizar los valores de "Logical Address"
updates_in_sheet = 0
memory_address_conversions = 0
for row_idx, row in enumerate(sheet.iter_rows(min_row=2), 2): # Comenzando desde la fila 2
name_cell = row[name_col_idx - 1] # Ajuste para índice base 0
logical_addr_cell = row[logical_addr_col_idx - 1] # Ajuste para índice base 0
data_type_cell = row[data_type_col_idx - 1] if data_type_col_idx else None # Puede ser None
tag_name = str(name_cell.value).strip() if name_cell.value else ""
current_address = str(logical_addr_cell.value).strip() if logical_addr_cell.value else ""
data_type = str(data_type_cell.value).strip().lower() if data_type_cell and data_type_cell.value else ""
if not tag_name or not current_address:
continue
# Caso 1: Tag encontrado en el diccionario de adaptación
if tag_name in update_dict:
old_value = logical_addr_cell.value
new_value = update_dict[tag_name]
logical_addr_cell.value = new_value
updates_in_sheet += 1
matched_tags.append(tag_name)
if tag_name in unmatched_adaptation_tags:
unmatched_adaptation_tags.remove(tag_name)
log_message(log_path, f" Actualizado: {tag_name} | Viejo valor: {old_value} | Nuevo valor: {new_value}")
# Caso 2: Tag no encontrado en adaptación pero con formato %E, %A, %EW, %AW
elif (current_address.startswith('%E') or
current_address.startswith('%A') or
current_address.startswith('%EW') or
current_address.startswith('%AW')):
old_value = logical_addr_cell.value
# Determinar si es booleano o word
is_boolean = ('bool' in data_type) or ('.') in current_address
if is_boolean:
# Para boolean, usamos formato %M byte.bit
new_value = f"%M{memory_byte_counter}.{memory_bit_counter}"
memory_bit_counter += 1
# Si llegamos a bit 8, pasamos al siguiente byte
if memory_bit_counter > 7:
memory_bit_counter = 0
memory_byte_counter += 1
else:
# Para word, usamos %MW y aumentamos en incrementos de 2
new_value = f"%MW{memory_byte_counter}"
memory_byte_counter += 2
logical_addr_cell.value = new_value
memory_address_conversions += 1
converted_to_memory.append(tag_name)
log_message(log_path, f" Convertido a memoria: {tag_name} | Viejo valor: {old_value} | Nuevo valor: {new_value}")
log_message(log_path, f"Total de actualizaciones en la hoja {sheet_name}: {updates_in_sheet}")
log_message(log_path, f"Total de conversiones a memoria en la hoja {sheet_name}: {memory_address_conversions}")
# Guardar cambios
try:
workbook.save(output_path)
log_message(log_path, f"\nArchivo Excel actualizado guardado: {output_path}")
except Exception as e:
log_message(log_path, f"Error al guardar el archivo Excel: {e}")
return False
# Mostrar resumen
unique_matched_tags = set(matched_tags)
log_message(log_path, "\n" + "=" * 30 + " RESUMEN " + "=" * 30)
log_message(log_path, f"Total de tags en archivo de adaptación: {len(update_dict)}")
log_message(log_path, f"Total de tags actualizados (coincidencias): {len(unique_matched_tags)}")
log_message(log_path, f"Total de tags convertidos a memoria: {len(converted_to_memory)}")
# Mostrar tags del archivo de adaptación sin coincidencias
if unmatched_adaptation_tags:
log_message(log_path, f"\nTags sin coincidencias ({len(unmatched_adaptation_tags)}):")
for tag in sorted(unmatched_adaptation_tags):
log_message(log_path, f" - {tag} -> {update_dict[tag]}")
return True
if __name__ == "__main__":
# Rutas de archivos predeterminadas
adaptation_table = r"C:\Users\migue\OneDrive\Miguel\Obsidean\Trabajo\VM\04-SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\SAE196 - IO Adapted.md"
tag_from_master_table = r"C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\TAGsIO\PLCTags.xlsx"
output_table = r"C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\TAGsIO\PLCTags_Updated.xlsx" # Crear un nuevo archivo para no sobrescribir el original
log_path = r"update_log.txt" # Ruta para el archivo de log
# Permitir pasar rutas como argumentos desde la línea de comandos
if len(sys.argv) > 1:
tag_from_master_table = sys.argv[1]
if len(sys.argv) > 2:
adaptation_table = sys.argv[2]
if len(sys.argv) > 3:
output_table = sys.argv[3]
if len(sys.argv) > 4:
log_path = sys.argv[4]
# Ejecutar la actualización
result = update_excel_with_adaptation(tag_from_master_table, adaptation_table, output_table, log_path)
if result:
print("\nProceso completado exitosamente.")
print(f"Se ha generado un archivo log en: {log_path}")
else:
print("\nHubo errores durante el proceso.")
print(f"Consulte el archivo log para más detalles: {log_path}")