""" convert Markdown tables from adapted IO to Excel for import into TIA Portal Updated to work with paths defined in a shared JSON config file. """ import pandas as pd import openpyxl import re import os import sys import json import tkinter as tk from tkinter import filedialog, messagebox from datetime import datetime # Determine script_root and add to sys.path for custom module import try: current_script_path = os.path.abspath(__file__) script_root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(current_script_path))) ) if script_root not in sys.path: sys.path.append(script_root) from backend.script_utils import load_configuration except ImportError: print( "Error: No se pudo importar 'load_configuration' desde 'backend.script_utils'." ) sys.exit(1) except NameError: # __file__ is not defined print( "Error: __file__ no está definido. Este script podría no estar ejecutándose en un entorno Python estándar." ) sys.exit(1) def load_path_config(working_directory=None): """ Carga la configuración de paths desde un archivo JSON Si no existe, crea uno con valores predeterminados """ # Si no se proporciona working_directory, usar el directorio actual if not working_directory: try: configs = load_configuration() working_directory = configs.get("working_directory") if not working_directory: print("Error: 'working_directory' no se encontró en la configuración.") working_directory = os.getcwd() except: working_directory = os.getcwd() # Path para el archivo JSON de configuración json_config_path = os.path.join(working_directory, "io_paths_config.json") # Si el archivo existe, cargarlo if os.path.exists(json_config_path): try: with open(json_config_path, 'r', encoding='utf-8') as f: config = json.load(f) print(f"Configuración de paths cargada desde: {json_config_path}") return config except Exception as e: print(f"Error al cargar el archivo de configuración JSON: {e}") return None # Si no existe, crear uno con valores predeterminados default_config = { "paths": [ { "path": "Inputs", "type": "Input", "no_used_path": "IO Not in Hardware\\InputsMaster" }, { "path": "Outputs", "type": "Output", "no_used_path": "IO Not in Hardware\\OutputsMaster" }, { "path": "OutputsFesto", "type": "Output", "no_used_path": "IO Not in Hardware\\OutputsMaster" }, { "path": "IO Not in Hardware\\InputsMaster", "type": "Input", "no_used_path": "IO Not in Hardware\\InputsMaster" }, { "path": "IO Not in Hardware\\OutputsMaster", "type": "Output", "no_used_path": "IO Not in Hardware\\OutputsMaster" } ] } try: with open(json_config_path, 'w', encoding='utf-8') as f: json.dump(default_config, f, indent=2) print(f"Archivo de configuración creado: {json_config_path}") return default_config except Exception as e: print(f"Error al crear el archivo de configuración JSON: {e}") return None def read_markdown_table(file_path): """Leer tabla en formato Markdown y convertirla a DataFrame.""" with open(file_path, 'r', encoding='utf-8') as file: content = file.read() # Dividir el contenido en líneas lines = content.strip().split('\n') # Encontrar el inicio de la tabla (primera línea que comienza con '|') table_start = None for i, line in enumerate(lines): if line.strip().startswith('|'): table_start = i break if table_start is None: print("No se encontró ninguna tabla en el archivo") return pd.DataFrame() # Encontrar todas las líneas de la tabla table_lines = [] for i in range(table_start, len(lines)): line = lines[i].strip() if line.startswith('|'): table_lines.append(line) elif not line: # Línea vacía podría indicar el final de la tabla if i + 1 < len(lines) and not lines[i + 1].strip().startswith('|'): break else: break # Si no comienza con '|' y no está vacía, es el final de la tabla if len(table_lines) < 3: # Necesitamos al menos encabezado, separador y una fila de datos print("La tabla no tiene suficientes filas") return pd.DataFrame() # Procesar encabezados header_line = table_lines[0] separator_line = table_lines[1] # Verificar que la segunda línea sea realmente un separador is_separator = all(cell.strip().startswith(':') or cell.strip().startswith('-') for cell in separator_line.split('|')[1:-1] if cell.strip()) if not is_separator: print("Advertencia: La segunda línea no parece ser un separador. Se asume que es parte de los datos.") separator_idx = None else: separator_idx = 1 # Extraer encabezados header_cells = header_line.split('|') # Eliminar elementos vacíos al principio y al final if not header_cells[0].strip(): header_cells = header_cells[1:] if not header_cells[-1].strip(): header_cells = header_cells[:-1] headers = [h.strip() for h in header_cells] print(f"Encabezados detectados: {headers}") # Procesar filas de datos data_start_idx = 2 if separator_idx == 1 else 1 data = [] for line in table_lines[data_start_idx:]: # Dividir la línea por el carácter pipe cells = line.split('|') # Eliminar elementos vacíos al principio y al final if not cells[0].strip(): cells = cells[1:] if not cells[-1].strip(): cells = cells[:-1] # Limpiar valores row_values = [cell.strip() for cell in cells] # Asegurar que la fila tenga el mismo número de columnas que los encabezados if len(row_values) != len(headers): print(f"Advertencia: Fila con {len(row_values)} valores, pero se esperaban {len(headers)}. Ajustando...") # Intentar ajustar la fila para que coincida con el número de columnas if len(row_values) < len(headers): row_values.extend([''] * (len(headers) - len(row_values))) else: row_values = row_values[:len(headers)] data.append(row_values) # Convertir a DataFrame df = pd.DataFrame(data, columns=headers) return df def create_log_file(output_dir): """Crear un archivo de log con timestamp.""" log_filename = f"update_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" log_path = os.path.join(output_dir, log_filename) try: with open(log_path, 'w', encoding='utf-8') as log_file: log_file.write(f"Log de actualización de PLCTags - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") log_file.write("=" * 80 + "\n\n") return log_path except Exception as e: print(f"Error al crear el archivo de log: {e}") # Si hay un error, intentar crear en el directorio actual fallback_path = os.path.join(os.getcwd(), log_filename) try: with open(fallback_path, 'w', encoding='utf-8') as log_file: log_file.write(f"Log de actualización de PLCTags - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") log_file.write("=" * 80 + "\n\n") log_file.write(f"ADVERTENCIA: No se pudo crear el log en {output_dir}. Usando directorio alternativo.\n\n") return fallback_path except: print("Error crítico: No se pudo crear ningún archivo de log.") return None def log_message(log_path, message): """Añadir mensaje al log.""" if log_path: try: with open(log_path, 'a', encoding='utf-8') as log_file: log_file.write(message + "\n") except Exception as e: print(f"Error al escribir en el log: {e}") print(message) def transform_io_address(address): """ Transform IO addresses according to the required format: - Ixx.x → %Exx.x - Exx.x → %Exx.x - Qxx.x → %Axx.x - Axx.x → %Axx.x - PEWxx → %EWxx - PAWxx → %AWxx - EW xx..xx → %EWxx (ranges for Profibus) - AW xx..xx → %AWxx (ranges for Profibus) """ if not address or not isinstance(address, str): return address address = address.strip() # Handle Profibus ranges (extract the first number before the range) profibus_match = re.match(r'^(EW|AW)\s+(\d+)\.\..*$', address) if profibus_match: prefix, number = profibus_match.groups() if prefix == 'EW': return f"%EW{number}" elif prefix == 'AW': return f"%AW{number}" # Patterns for boolean addresses if re.match(r'^I(\d+)\.(\d+)$', address): return re.sub(r'^I(\d+)\.(\d+)$', r'%E\1.\2', address) elif re.match(r'^E(\d+)\.(\d+)$', address): return re.sub(r'^E(\d+)\.(\d+)$', r'%E\1.\2', address) elif re.match(r'^Q(\d+)\.(\d+)$', address): return re.sub(r'^Q(\d+)\.(\d+)$', r'%A\1.\2', address) elif re.match(r'^A(\d+)\.(\d+)$', address): return re.sub(r'^A(\d+)\.(\d+)$', r'%A\1.\2', address) # Patterns for word addresses elif re.match(r'^PEW(\d+)$', address): return re.sub(r'^PEW(\d+)$', r'%EW\1', address) elif re.match(r'^PAW(\d+)$', address): return re.sub(r'^PAW(\d+)$', r'%AW\1', address) # If already in correct format or unknown format, return as is return address def is_input_tag(tag_name): """Determinar si un tag es de entrada basado en su nombre.""" input_prefixes = ['DI_', 'AI_', 'P_AI_', 'P_FT', 'P_CT', 'P_PT', 'P_TT', 'P_g', 'P_PDS_'] for prefix in input_prefixes: if tag_name.startswith(prefix): # Excepciones para P_g que pueden ser outputs if tag_name.startswith('P_g') and ('_VFC_ControlWord' in tag_name or '_Refvalue' in tag_name): return False return True return False def is_output_tag(tag_name): """Determinar si un tag es de salida basado en su nombre.""" output_prefixes = ['DO_', 'AO_', 'P_AO_', 'P_g', 'MaselliHold', 'MaselliSpare'] for prefix in output_prefixes: if tag_name.startswith(prefix): return True # Si comienza con P_g, revisar si tiene '_VFC_ControlWord' o 'Refvalue' que son outputs if tag_name.startswith('P_g') and ('_VFC_ControlWord' in tag_name or '_VFC_Refvalue' in tag_name): return True # Si comienza con P_PDS_, revisar si son outputs específicos if tag_name.startswith('P_PDS_') and ('_Recipe_Number' in tag_name or '_Freeze_To_PDS' in tag_name or '_Stop_to_PDS' in tag_name): return True return False def is_bit_type(data_type): """Determinar si el tipo de dato es un bit (Bool).""" return data_type.lower() == 'bool' def update_plc_tags(excel_path, md_path, output_path, log_path): """ Actualiza el archivo Excel con la información del archivo Markdown. Args: excel_path: Ruta al archivo Excel exportado de TIA Portal md_path: Ruta al archivo Markdown con la adaptación IO output_path: Ruta para guardar el Excel actualizado log_path: Ruta para el archivo de log """ log_message(log_path, f"Iniciando proceso de actualización") log_message(log_path, f"Archivo Excel de entrada: {excel_path}") log_message(log_path, f"Archivo Markdown de entrada: {md_path}") log_message(log_path, f"Archivo Excel de salida: {output_path}") log_message(log_path, "-" * 80) # Cargar configuración de paths excel_dir = os.path.dirname(excel_path) path_config = load_path_config(excel_dir) if not path_config: log_message(log_path, "ERROR: No se pudo cargar la configuración de paths") return False # Extraer información de paths desde la configuración path_info = {} for path_entry in path_config["paths"]: path_info[path_entry["path"]] = { "type": path_entry["type"], "no_used_path": path_entry["no_used_path"] } log_message(log_path, f"Configuración de paths cargada:") for path, info in path_info.items(): log_message(log_path, f" - {path}: tipo={info['type']}, no_used_path={info['no_used_path']}") # Leer el archivo Markdown md_df = read_markdown_table(md_path) # Identificar las columnas relevantes en el archivo Markdown io_col = None master_tag_col = None for col in md_df.columns: col_lower = col.lower() if col_lower == 'io' or 'address' in col_lower: io_col = col elif 'master' in col_lower and 'tag' in col_lower: master_tag_col = col if not io_col or not master_tag_col: log_message(log_path, "ERROR: No se pudieron identificar las columnas necesarias en el archivo Markdown") return False log_message(log_path, f"Columna IO: {io_col}") log_message(log_path, f"Columna Master Tag: {master_tag_col}") # Crear un diccionario de mapeo IO desde el Markdown io_mapping = {} for _, row in md_df.iterrows(): master_tag = str(row[master_tag_col]).strip() io_value = str(row[io_col]).strip() if master_tag and io_value and master_tag != 'nan' and io_value != 'nan': io_mapping[master_tag] = transform_io_address(io_value) log_message(log_path, f"Tags mapeados en el archivo Markdown: {len(io_mapping)}") # Cargar el archivo Excel try: # Usar openpyxl para mantener la estructura del Excel workbook = openpyxl.load_workbook(excel_path) log_message(log_path, f"Archivo Excel cargado: {excel_path}") log_message(log_path, f"Hojas disponibles: {workbook.sheetnames}") except Exception as e: log_message(log_path, f"ERROR: No se pudo cargar el archivo Excel: {e}") return False # Inicializar contadores para direcciones de memoria input_mem_byte = 3600 input_mem_bit = 0 output_mem_byte = 3800 output_mem_bit = 0 # Estadísticas total_tags = 0 updated_tags = 0 relocated_tags = {} assigned_memory_addresses = 0 # Inicializar el contador para cada tipo de relocalización for path in path_info.keys(): relocated_tags[path] = 0 # Lista de paths válidos desde la configuración valid_paths = list(path_info.keys()) # Procesamos la hoja principal (asumimos que es la primera) if len(workbook.sheetnames) > 0: sheet = workbook[workbook.sheetnames[0]] # Encontrar las columnas relevantes name_col = None path_col = None data_type_col = None logical_address_col = None for col_idx, cell in enumerate(sheet[1], 1): cell_value = str(cell.value).strip() if cell.value else "" if cell_value.lower() == "name": name_col = col_idx elif cell_value.lower() == "path": path_col = col_idx elif cell_value.lower() == "data type": data_type_col = col_idx elif cell_value.lower() == "logical address": logical_address_col = col_idx if not all([name_col, path_col, data_type_col, logical_address_col]): log_message(log_path, "ERROR: No se encontraron todas las columnas necesarias en el Excel") return False # Convertir a índices base 0 para openpyxl name_col -= 1 path_col -= 1 data_type_col -= 1 logical_address_col -= 1 # Recorrer todas las filas (excluyendo la primera que es el encabezado) for row_idx, row in enumerate(sheet.iter_rows(min_row=2), 2): name_cell = row[name_col] path_cell = row[path_col] data_type_cell = row[data_type_col] logical_address_cell = row[logical_address_col] tag_name = str(name_cell.value).strip() if name_cell.value else "" path = str(path_cell.value).strip() if path_cell.value else "" data_type = str(data_type_cell.value).strip() if data_type_cell.value else "" # Verificar si el path está en la configuración if path in valid_paths: total_tags += 1 # Verificar si el tag está en el mapeo de IO if tag_name in io_mapping: old_address = logical_address_cell.value new_address = io_mapping[tag_name] logical_address_cell.value = new_address # Determinar el nuevo path basado en la dirección asignada new_path = path # Por defecto, mantener el mismo path # Determinar el nuevo path según la dirección IO asignada if new_address.startswith("%E"): # Buscar el path configurado para entradas for p, info in path_info.items(): if info.get("type") == "Input" and "Not in Hardware" not in p: new_path = p break elif new_address.startswith("%A"): # Buscar el path configurado para salidas for p, info in path_info.items(): if info.get("type") == "Output" and "Not in Hardware" not in p and p != "OutputsFesto": new_path = p break # Actualizar el path si ha cambiado if new_path != path: path_cell.value = new_path relocated_tags[new_path] = relocated_tags.get(new_path, 0) + 1 updated_tags += 1 log_message(log_path, f"Actualizado: {tag_name} | Viejo valor: {old_address} | Nuevo valor: {new_address} | Path: {path_cell.value}") # Si no está en el mapeo, asignar dirección de memoria según configuración else: current_path_info = path_info.get(path, {}) no_used_path = current_path_info.get("no_used_path", "") path_type = current_path_info.get("type", "") # Si tenemos información válida para este path if no_used_path and path_type: # Determinar automáticamente si es entrada o salida is_input = is_input_tag(tag_name) or path_type == "Input" is_output = is_output_tag(tag_name) or path_type == "Output" # Actualizar el path según configuración path_cell.value = no_used_path # Asignar dirección de memoria según el tipo (Input/Output) if path_type == "Input" or (is_input and not is_output): # Asignar dirección de memoria para entradas if is_bit_type(data_type): new_address = f"%M{input_mem_byte}.{input_mem_bit}" input_mem_bit += 1 if input_mem_bit > 7: input_mem_bit = 0 input_mem_byte += 1 else: new_address = f"%MW{input_mem_byte}" input_mem_byte += 2 else: # Tipo Output o no determinado # Asignar dirección de memoria para salidas if is_bit_type(data_type): new_address = f"%M{output_mem_byte}.{output_mem_bit}" output_mem_bit += 1 if output_mem_bit > 7: output_mem_bit = 0 output_mem_byte += 1 else: new_address = f"%MW{output_mem_byte}" output_mem_byte += 2 relocated_tags[no_used_path] = relocated_tags.get(no_used_path, 0) + 1 old_address = logical_address_cell.value logical_address_cell.value = new_address assigned_memory_addresses += 1 log_message(log_path, f"Asignación memoria: {tag_name} | Viejo valor: {old_address} | Nuevo valor: {new_address} | Path: {path_cell.value}") # Guardar el archivo actualizado try: workbook.save(output_path) log_message(log_path, f"Archivo Excel guardado: {output_path}") except Exception as e: log_message(log_path, f"ERROR: No se pudo guardar el archivo Excel: {e}") return False # Mostrar estadísticas log_message(log_path, "\n" + "=" * 30 + " RESUMEN " + "=" * 30) log_message(log_path, f"Total de tags procesados: {total_tags}") log_message(log_path, f"Tags actualizados desde el Markdown: {updated_tags}") for path, count in relocated_tags.items(): if count > 0: log_message(log_path, f"Tags relocalizados a {path}: {count}") log_message(log_path, f"Tags con direcciones de memoria asignadas: {assigned_memory_addresses}") return True def main(): try: # Intentar cargar la configuración para obtener working_directory from backend.script_utils import load_configuration configs = load_configuration() working_directory = configs.get("working_directory") if not working_directory: working_directory = os.getcwd() except: working_directory = os.getcwd() print(f"Usando directorio de trabajo: {working_directory}") # Crear interfaz para seleccionar archivos root = tk.Tk() root.withdraw() # Ocultar ventana principal # Verificar si existe el archivo PLCTags.xlsx predeterminado en working_directory default_excel_path = os.path.join(working_directory, "PLCTags.xlsx") default_md_path = os.path.join(working_directory, "IO Tags consolidated.md") # Pedir al usuario que seleccione los archivos if os.path.exists(default_excel_path): excel_path = default_excel_path print(f"Usando archivo Excel predeterminado: {excel_path}") else: print("Seleccione el archivo Excel exportado de TIA Portal:") excel_path = filedialog.askopenfilename( title="Seleccione el archivo Excel exportado de TIA Portal", filetypes=[("Excel files", "*.xlsx"), ("All files", "*.*")], initialdir=working_directory ) if not excel_path: print("No se seleccionó ningún archivo Excel. Saliendo...") return if os.path.exists(default_md_path): md_path = default_md_path print(f"Usando archivo Markdown predeterminado: {md_path}") else: print("Seleccione el archivo Markdown con la adaptación IO:") md_path = filedialog.askopenfilename( title="Seleccione el archivo Markdown con la adaptación IO", filetypes=[("Markdown files", "*.md"), ("All files", "*.*")], initialdir=working_directory ) if not md_path: print("No se seleccionó ningún archivo Markdown. Saliendo...") return # Determinar la ruta de salida (mismo directorio que el Excel, pero con "_Updated" añadido) excel_dir = os.path.dirname(excel_path) excel_filename = os.path.basename(excel_path) excel_name, excel_ext = os.path.splitext(excel_filename) output_filename = f"{excel_name}_Updated{excel_ext}" output_path = os.path.join(working_directory, output_filename) # Crear archivo de log log_path = create_log_file(working_directory) # Ejecutar el proceso de actualización success = update_plc_tags(excel_path, md_path, output_path, log_path) if success: messagebox.showinfo("Proceso completado", f"La actualización se ha completado con éxito.\n\n" f"Archivo de salida: {output_path}\n\n" f"Archivo de log: {log_path}") else: messagebox.showerror("Error", f"Hubo un error durante el proceso.\n\n" f"Consulte el archivo de log para más detalles: {log_path}") if __name__ == "__main__": main()