ParamManagerScripts/backend/script_groups/IO_adaptation/x5_md_to_excel.py

930 lines
38 KiB
Python

"""
convert Markdown tables from adapted IO to Excel for import into TIA Portal
Updated to work with paths defined in a shared JSON config file.
"""
import pandas as pd
import openpyxl
import re
import os
import sys
import json
import tkinter as tk
from tkinter import filedialog, messagebox
from datetime import datetime
# Determine script_root and add to sys.path for custom module import
try:
current_script_path = os.path.abspath(__file__)
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(current_script_path)))
)
if script_root not in sys.path:
sys.path.append(script_root)
from backend.script_utils import load_configuration
except ImportError:
print(
"Error: No se pudo importar 'load_configuration' desde 'backend.script_utils'."
)
sys.exit(1)
except NameError: # __file__ is not defined
print(
"Error: __file__ no está definido. Este script podría no estar ejecutándose en un entorno Python estándar."
)
sys.exit(1)
def load_path_config(working_directory=None):
"""
Carga la configuración de paths desde un archivo JSON
Si no existe, crea uno con valores predeterminados
"""
# Si no se proporciona working_directory, usar el directorio actual
if not working_directory:
try:
configs = load_configuration()
working_directory = configs.get("working_directory")
if not working_directory:
print("Error: 'working_directory' no se encontró en la configuración.")
working_directory = os.getcwd()
except:
working_directory = os.getcwd()
# Path para el archivo JSON de configuración
json_config_path = os.path.join(working_directory, "io_paths_config.json")
# Si el archivo existe, cargarlo
if os.path.exists(json_config_path):
try:
with open(json_config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
print(f"Configuración de paths cargada desde: {json_config_path}")
return config
except Exception as e:
print(f"Error al cargar el archivo de configuración JSON: {e}")
return None
# Si no existe, crear uno con valores predeterminados
default_config = {
"paths": [
{
"path": "Inputs",
"type": "Input",
"no_used_path": "IO Not in Hardware\\InputsMaster"
},
{
"path": "Outputs",
"type": "Output",
"no_used_path": "IO Not in Hardware\\OutputsMaster"
},
{
"path": "OutputsFesto",
"type": "Output",
"no_used_path": "IO Not in Hardware\\OutputsMaster"
},
{
"path": "IO Not in Hardware\\InputsMaster",
"type": "Input",
"no_used_path": "IO Not in Hardware\\InputsMaster"
},
{
"path": "IO Not in Hardware\\OutputsMaster",
"type": "Output",
"no_used_path": "IO Not in Hardware\\OutputsMaster"
}
]
}
try:
with open(json_config_path, 'w', encoding='utf-8') as f:
json.dump(default_config, f, indent=2)
print(f"Archivo de configuración creado: {json_config_path}")
return default_config
except Exception as e:
print(f"Error al crear el archivo de configuración JSON: {e}")
return None
def read_markdown_table(file_path):
"""Leer todas las tablas en formato Markdown que contengan las columnas requeridas y combinarlas en un DataFrame."""
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
# Dividir el contenido en líneas
lines = content.strip().split('\n')
# Encontrar todas las tablas en el archivo
all_tables = []
i = 0
while i < len(lines):
# Buscar el inicio de una tabla (línea que comienza con '|')
while i < len(lines) and not lines[i].strip().startswith('|'):
i += 1
if i >= len(lines):
break
# Encontrar todas las líneas de esta tabla
table_start = i
table_lines = []
while i < len(lines):
line = lines[i].strip()
if line.startswith('|'):
table_lines.append(line)
i += 1
elif not line: # Línea vacía
i += 1
# Verificar si la siguiente línea también es parte de la tabla
if i < len(lines) and lines[i].strip().startswith('|'):
continue
else:
break
else:
break # Si no comienza con '|' y no está vacía, es el final de la tabla
if len(table_lines) >= 3: # Necesitamos al menos encabezado, separador y una fila de datos
all_tables.append((table_start, table_lines))
print(f"Se encontraron {len(all_tables)} tablas en el archivo")
# Procesar cada tabla y verificar si tiene las columnas requeridas
valid_dataframes = []
for table_idx, (table_start, table_lines) in enumerate(all_tables):
print(f"\nProcesando tabla {table_idx + 1} (línea {table_start + 1})")
# Procesar encabezados
header_line = table_lines[0]
separator_line = table_lines[1] if len(table_lines) > 1 else ""
# Verificar que la segunda línea sea realmente un separador
is_separator = False
if separator_line:
is_separator = all(cell.strip().startswith(':') or cell.strip().startswith('-')
for cell in separator_line.split('|')[1:-1] if cell.strip())
if not is_separator and len(table_lines) > 1:
print(f"Advertencia: La segunda línea no parece ser un separador en tabla {table_idx + 1}. Se asume que es parte de los datos.")
separator_idx = None
else:
separator_idx = 1
# Extraer encabezados
header_cells = header_line.split('|')
# Eliminar elementos vacíos al principio y al final
if not header_cells[0].strip():
header_cells = header_cells[1:]
if not header_cells[-1].strip():
header_cells = header_cells[:-1]
headers = [h.strip() for h in header_cells]
print(f"Encabezados detectados: {headers}")
# Verificar si la tabla tiene las columnas requeridas
has_io_column = False
has_tag_column = False
for header in headers:
header_lower = header.lower()
if header_lower == 'io' or 'address' in header_lower:
has_io_column = True
if ('master' in header_lower and 'tag' in header_lower) or header_lower == 'master' or header_lower == 'tag':
has_tag_column = True
if not (has_io_column and has_tag_column):
print(f"Tabla {table_idx + 1} no tiene las columnas requeridas (IO/address y Master Tag/master/tag). Omitiendo...")
continue
print(f"Tabla {table_idx + 1} tiene las columnas requeridas. Procesando...")
# Procesar filas de datos
data_start_idx = 2 if separator_idx == 1 else 1
data = []
for line in table_lines[data_start_idx:]:
# Dividir la línea por el carácter pipe
cells = line.split('|')
# Eliminar elementos vacíos al principio y al final
if not cells[0].strip():
cells = cells[1:]
if not cells[-1].strip():
cells = cells[:-1]
# Limpiar valores
row_values = [cell.strip() for cell in cells]
# Asegurar que la fila tenga el mismo número de columnas que los encabezados
if len(row_values) != len(headers):
print(f"Advertencia: Fila con {len(row_values)} valores, pero se esperaban {len(headers)}. Ajustando...")
# Intentar ajustar la fila para que coincida con el número de columnas
if len(row_values) < len(headers):
row_values.extend([''] * (len(headers) - len(row_values)))
else:
row_values = row_values[:len(headers)]
data.append(row_values)
# Convertir a DataFrame
if data: # Solo si hay datos
df = pd.DataFrame(data, columns=headers)
valid_dataframes.append(df)
print(f"Tabla {table_idx + 1} procesada exitosamente: {len(df)} filas")
else:
print(f"Tabla {table_idx + 1} no tiene datos. Omitiendo...")
# Combinar todas las tablas válidas
if not valid_dataframes:
print("No se encontraron tablas válidas con las columnas requeridas")
return pd.DataFrame()
print(f"\nCombinando {len(valid_dataframes)} tablas válidas...")
# Si solo hay un DataFrame, devolverlo directamente
if len(valid_dataframes) == 1:
combined_df = valid_dataframes[0]
else:
# Si hay múltiples DataFrames, necesitamos combinarlos
# Primero, estandarizar las columnas para que todas tengan los mismos nombres
standardized_dfs = []
for df in valid_dataframes:
# Crear un DataFrame estandarizado
standardized_df = df.copy()
# Estandarizar nombres de columnas
new_columns = {}
for col in df.columns:
col_lower = col.lower()
if col_lower == 'io' or 'address' in col_lower:
new_columns[col] = 'IO'
elif 'master' in col_lower and 'tag' in col_lower:
new_columns[col] = 'Master Tag'
elif col_lower == 'master' or col_lower == 'tag':
new_columns[col] = 'Master Tag'
standardized_df = standardized_df.rename(columns=new_columns)
standardized_dfs.append(standardized_df)
# Combinar todos los DataFrames
combined_df = pd.concat(standardized_dfs, ignore_index=True)
print(f"Tabla combinada final: {len(combined_df)} filas, {len(combined_df.columns)} columnas")
print(f"Columnas finales: {list(combined_df.columns)}")
return combined_df
def create_log_file(output_dir):
"""Crear un archivo de log con timestamp."""
log_filename = f"update_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
log_path = os.path.join(output_dir, log_filename)
try:
with open(log_path, 'w', encoding='utf-8') as log_file:
log_file.write(f"Log de actualización de PLCTags - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
log_file.write("=" * 80 + "\n\n")
return log_path
except Exception as e:
print(f"Error al crear el archivo de log: {e}")
# Si hay un error, intentar crear en el directorio actual
fallback_path = os.path.join(os.getcwd(), log_filename)
try:
with open(fallback_path, 'w', encoding='utf-8') as log_file:
log_file.write(f"Log de actualización de PLCTags - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
log_file.write("=" * 80 + "\n\n")
log_file.write(f"ADVERTENCIA: No se pudo crear el log en {output_dir}. Usando directorio alternativo.\n\n")
return fallback_path
except:
print("Error crítico: No se pudo crear ningún archivo de log.")
return None
def log_message(log_path, message):
"""Añadir mensaje al log."""
if log_path:
try:
with open(log_path, 'a', encoding='utf-8') as log_file:
log_file.write(message + "\n")
except Exception as e:
print(f"Error al escribir en el log: {e}")
print(message)
def get_io_address_format(address, data_type):
"""
Determinar el formato correcto de dirección IO según el tipo de datos.
Args:
address: Dirección base (ej: "I0.0", "Q0.0", "PEW0", etc.)
data_type: Tipo de datos (Bool, Word, Real, etc.)
Returns:
str: Dirección formateada según el tipo de datos
"""
if not address or not isinstance(address, str):
return address
address = address.strip()
data_type_lower = data_type.lower() if data_type else ""
# Handle Profibus ranges (extract the first number before the range)
profibus_match = re.match(r'^(EW|AW)\s+(\d+)\.\..*$', address)
if profibus_match:
prefix, number = profibus_match.groups()
if prefix == 'EW':
if data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%ED{number}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%EB{number}"
else:
return f"%EW{number}" # Word por defecto
elif prefix == 'AW':
if data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%AD{number}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%AB{number}"
else:
return f"%AW{number}" # Word por defecto
# Handle simple AW/EW with space (without range)
simple_profibus_match = re.match(r'^(EW|AW)\s+(\d+)$', address)
if simple_profibus_match:
prefix, number = simple_profibus_match.groups()
if prefix == 'EW':
if data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%ED{number}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%EB{number}"
else:
return f"%EW{number}" # Word por defecto
elif prefix == 'AW':
if data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%AD{number}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%AB{number}"
else:
return f"%AW{number}" # Word por defecto
# Extraer números de las direcciones para determinar el formato según tipo de datos
# Patterns for boolean addresses (mantener formato de bit)
if re.match(r'^[IEQ](\d+)\.(\d+)$', address):
byte_num = re.match(r'^[IEQ](\d+)\.(\d+)$', address).group(1)
bit_num = re.match(r'^[IEQ](\d+)\.(\d+)$', address).group(2)
if address.startswith(('I', 'E')):
if data_type_lower == 'bool':
return f"%E{byte_num}.{bit_num}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%EB{byte_num}"
elif data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%ED{byte_num}"
else:
return f"%EW{byte_num}" # Word por defecto
elif address.startswith(('Q', 'A')):
if data_type_lower == 'bool':
return f"%A{byte_num}.{bit_num}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%AB{byte_num}"
elif data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%AD{byte_num}"
else:
return f"%AW{byte_num}" # Word por defecto
# Patterns for word addresses
elif re.match(r'^PEW(\d+)$', address):
number = re.match(r'^PEW(\d+)$', address).group(1)
if data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%ED{number}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%EB{number}"
else:
return f"%EW{number}" # Word por defecto
elif re.match(r'^PAW(\d+)$', address):
number = re.match(r'^PAW(\d+)$', address).group(1)
if data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%AD{number}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%AB{number}"
else:
return f"%AW{number}" # Word por defecto
# Handle addresses that already have % prefix but wrong data type format
elif re.match(r'^%[AE][BWDL](\d+)$', address):
match = re.match(r'^%([AE])([BWDL])(\d+)$', address)
if match:
prefix, current_type, number = match.groups()
if prefix == 'E': # Input
if data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%ED{number}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%EB{number}"
else:
return f"%EW{number}" # Word por defecto
elif prefix == 'A': # Output
if data_type_lower in ['dword', 'udint', 'dint', 'real']:
return f"%AD{number}"
elif data_type_lower in ['byte', 'usint', 'sint']:
return f"%AB{number}"
else:
return f"%AW{number}" # Word por defecto
# Si ya está en formato correcto o formato desconocido, devolver tal como está
return address
def transform_io_address(address, data_type=None):
"""
Transform IO addresses according to the required format, considering data type:
For Bool:
- Ixx.x → %Exx.x
- Qxx.x → %Axx.x
For Byte:
- Ixx.x → %EBxx
- Qxx.x → %ABxx
For Word:
- Ixx.x → %EWxx
- Qxx.x → %AWxx
- PEWxx → %EWxx
- PAWxx → %AWxx
For DWord/Real:
- Ixx.x → %EDxx
- Qxx.x → %ADxx
- PEWxx → %EDxx
- PAWxx → %ADxx
"""
if data_type:
return get_io_address_format(address, data_type)
# Fallback a la lógica anterior si no se proporciona data_type
if not address or not isinstance(address, str):
return address
address = address.strip()
# Handle Profibus ranges (extract the first number before the range)
profibus_match = re.match(r'^(EW|AW)\s+(\d+)\.\..*$', address)
if profibus_match:
prefix, number = profibus_match.groups()
if prefix == 'EW':
return f"%EW{number}"
elif prefix == 'AW':
return f"%AW{number}"
# Patterns for boolean addresses
if re.match(r'^I(\d+)\.(\d+)$', address):
return re.sub(r'^I(\d+)\.(\d+)$', r'%E\1.\2', address)
elif re.match(r'^E(\d+)\.(\d+)$', address):
return re.sub(r'^E(\d+)\.(\d+)$', r'%E\1.\2', address)
elif re.match(r'^Q(\d+)\.(\d+)$', address):
return re.sub(r'^Q(\d+)\.(\d+)$', r'%A\1.\2', address)
elif re.match(r'^A(\d+)\.(\d+)$', address):
return re.sub(r'^A(\d+)\.(\d+)$', r'%A\1.\2', address)
# Patterns for word addresses
elif re.match(r'^PEW(\d+)$', address):
return re.sub(r'^PEW(\d+)$', r'%EW\1', address)
elif re.match(r'^PAW(\d+)$', address):
return re.sub(r'^PAW(\d+)$', r'%AW\1', address)
# If already in correct format or unknown format, return as is
return address
def is_input_tag(tag_name):
"""Determinar si un tag es de entrada basado en su nombre."""
input_prefixes = ['DI_', 'AI_', 'P_AI_', 'P_FT', 'P_CT', 'P_PT', 'P_TT', 'P_g', 'P_PDS_']
for prefix in input_prefixes:
if tag_name.startswith(prefix):
# Excepciones para P_g que pueden ser outputs
if tag_name.startswith('P_g') and ('_VFC_ControlWord' in tag_name or '_Refvalue' in tag_name):
return False
return True
return False
def is_output_tag(tag_name):
"""Determinar si un tag es de salida basado en su nombre."""
output_prefixes = ['DO_', 'AO_', 'P_AO_', 'P_g', 'MaselliHold', 'MaselliSpare']
for prefix in output_prefixes:
if tag_name.startswith(prefix):
return True
# Si comienza con P_g, revisar si tiene '_VFC_ControlWord' o 'Refvalue' que son outputs
if tag_name.startswith('P_g') and ('_VFC_ControlWord' in tag_name or '_VFC_Refvalue' in tag_name):
return True
# Si comienza con P_PDS_, revisar si son outputs específicos
if tag_name.startswith('P_PDS_') and ('_Recipe_Number' in tag_name or '_Freeze_To_PDS' in tag_name or '_Stop_to_PDS' in tag_name):
return True
return False
def is_bit_type(data_type):
"""Determinar si el tipo de dato es un bit (Bool)."""
return data_type.lower() == 'bool'
def get_memory_address_format(data_type, mem_byte, mem_bit=None):
"""
Determinar el formato de dirección de memoria y el incremento de bytes según el tipo de datos.
Returns:
tuple: (address_format, byte_increment, new_bit)
"""
data_type_lower = data_type.lower()
if data_type_lower == 'bool':
# Para Bool usar formato de bit
if mem_bit is None:
mem_bit = 0
address = f"%M{mem_byte}.{mem_bit}"
new_bit = mem_bit + 1
if new_bit > 7:
new_bit = 0
byte_increment = 1
else:
byte_increment = 0
return address, byte_increment, new_bit
elif data_type_lower in ['byte', 'usint', 'sint']:
# Para Byte usar %MB (1 byte)
address = f"%MB{mem_byte}"
return address, 1, None
elif data_type_lower in ['word', 'uint', 'int']:
# Para Word usar %MW (2 bytes)
address = f"%MW{mem_byte}"
return address, 2, None
elif data_type_lower in ['dword', 'udint', 'dint']:
# Para DWord usar %MD (4 bytes)
address = f"%MD{mem_byte}"
return address, 4, None
elif data_type_lower in ['real', 'lreal']:
# Para Real usar %MD (4 bytes), para LReal usar %ML (8 bytes)
if data_type_lower == 'lreal':
address = f"%ML{mem_byte}"
return address, 8, None
else:
address = f"%MD{mem_byte}"
return address, 4, None
elif data_type_lower in ['time', 'time_of_day', 's5time']:
# Para tipos de tiempo usar %MD (4 bytes)
address = f"%MD{mem_byte}"
return address, 4, None
else:
# Para tipos desconocidos, usar %MW por defecto (2 bytes)
address = f"%MW{mem_byte}"
return address, 2, None
def update_plc_tags(excel_path, md_path, output_path, log_path):
"""
Actualiza el archivo Excel con la información del archivo Markdown.
Args:
excel_path: Ruta al archivo Excel exportado de TIA Portal
md_path: Ruta al archivo Markdown con la adaptación IO
output_path: Ruta para guardar el Excel actualizado
log_path: Ruta para el archivo de log
"""
log_message(log_path, f"Iniciando proceso de actualización")
log_message(log_path, f"Archivo Excel de entrada: {excel_path}")
log_message(log_path, f"Archivo Markdown de entrada: {md_path}")
log_message(log_path, f"Archivo Excel de salida: {output_path}")
log_message(log_path, "-" * 80)
# Cargar configuración de paths
excel_dir = os.path.dirname(excel_path)
path_config = load_path_config(excel_dir)
if not path_config:
log_message(log_path, "ERROR: No se pudo cargar la configuración de paths")
return False
# Extraer información de paths desde la configuración
path_info = {}
for path_entry in path_config["paths"]:
path_info[path_entry["path"]] = {
"type": path_entry["type"],
"no_used_path": path_entry["no_used_path"]
}
log_message(log_path, f"Configuración de paths cargada:")
for path, info in path_info.items():
log_message(log_path, f" - {path}: tipo={info['type']}, no_used_path={info['no_used_path']}")
# Leer el archivo Markdown
md_df = read_markdown_table(md_path)
# Identificar las columnas relevantes en el archivo Markdown
io_col = None
master_tag_col = None
for col in md_df.columns:
col_lower = col.lower()
if col_lower == 'io' or 'address' in col_lower:
io_col = col
elif ('master' in col_lower and 'tag' in col_lower) or col_lower == 'master' or col_lower == 'tag':
master_tag_col = col
if not io_col or not master_tag_col:
log_message(log_path, "ERROR: No se pudieron identificar las columnas necesarias en el archivo Markdown")
log_message(log_path, f"Columnas disponibles: {list(md_df.columns)}")
return False
log_message(log_path, f"Columna IO: {io_col}")
log_message(log_path, f"Columna Master Tag: {master_tag_col}")
# Crear un diccionario de mapeo IO desde el Markdown (sin transformar aún)
io_mapping = {}
for _, row in md_df.iterrows():
master_tag = str(row[master_tag_col]).strip()
io_value = str(row[io_col]).strip()
if master_tag and io_value and master_tag != 'nan' and io_value != 'nan':
io_mapping[master_tag] = io_value # Guardar la dirección sin transformar
log_message(log_path, f"Tags mapeados en el archivo Markdown: {len(io_mapping)}")
# Cargar el archivo Excel
try:
# Usar openpyxl para mantener la estructura del Excel
workbook = openpyxl.load_workbook(excel_path)
log_message(log_path, f"Archivo Excel cargado: {excel_path}")
log_message(log_path, f"Hojas disponibles: {workbook.sheetnames}")
except Exception as e:
log_message(log_path, f"ERROR: No se pudo cargar el archivo Excel: {e}")
return False
# Inicializar contadores para direcciones de memoria
input_mem_byte = 3600
input_mem_bit = 0
output_mem_byte = 3900
output_mem_bit = 0
# Validar configuración inicial de memoria
if output_mem_byte <= input_mem_byte:
error_msg = (f"ERROR: Configuración de memoria inválida. "
f"output_mem_byte ({output_mem_byte}) debe ser mayor que input_mem_byte ({input_mem_byte}). "
f"Ajuste los valores iniciales de memoria.")
log_message(log_path, error_msg)
return False
log_message(log_path, f"Configuración de memoria: input_mem_byte={input_mem_byte}, output_mem_byte={output_mem_byte}")
log_message(log_path, f"Espacio disponible para inputs: {output_mem_byte - input_mem_byte} bytes")
# Estadísticas
total_tags = 0
updated_tags = 0
relocated_tags = {}
assigned_memory_addresses = 0
# Inicializar el contador para cada tipo de relocalización
for path in path_info.keys():
relocated_tags[path] = 0
# Lista de paths válidos desde la configuración
valid_paths = list(path_info.keys())
# Procesamos la hoja principal (asumimos que es la primera)
if len(workbook.sheetnames) > 0:
sheet = workbook[workbook.sheetnames[0]]
# Encontrar las columnas relevantes
name_col = None
path_col = None
data_type_col = None
logical_address_col = None
for col_idx, cell in enumerate(sheet[1], 1):
cell_value = str(cell.value).strip() if cell.value else ""
if cell_value.lower() == "name":
name_col = col_idx
elif cell_value.lower() == "path":
path_col = col_idx
elif cell_value.lower() == "data type":
data_type_col = col_idx
elif cell_value.lower() == "logical address":
logical_address_col = col_idx
if not all([name_col, path_col, data_type_col, logical_address_col]):
log_message(log_path, "ERROR: No se encontraron todas las columnas necesarias en el Excel")
return False
# Convertir a índices base 0 para openpyxl
name_col -= 1
path_col -= 1
data_type_col -= 1
logical_address_col -= 1
# Recorrer todas las filas (excluyendo la primera que es el encabezado)
for row_idx, row in enumerate(sheet.iter_rows(min_row=2), 2):
name_cell = row[name_col]
path_cell = row[path_col]
data_type_cell = row[data_type_col]
logical_address_cell = row[logical_address_col]
tag_name = str(name_cell.value).strip() if name_cell.value else ""
path = str(path_cell.value).strip() if path_cell.value else ""
data_type = str(data_type_cell.value).strip() if data_type_cell.value else ""
# Verificar si el path está en la configuración
if path in valid_paths:
total_tags += 1
# Verificar si el tag está en el mapeo de IO
if tag_name in io_mapping:
old_address = logical_address_cell.value
io_raw_address = io_mapping[tag_name]
# Transformar la dirección considerando el tipo de datos
new_address = transform_io_address(io_raw_address, data_type)
logical_address_cell.value = new_address
# Determinar el nuevo path basado en la dirección asignada
new_path = path # Por defecto, mantener el mismo path
# Determinar el nuevo path según la dirección IO asignada
if new_address.startswith("%E"):
# Buscar el path configurado para entradas
for p, info in path_info.items():
if info.get("type") == "Input" and "Not in Hardware" not in p:
new_path = p
break
elif new_address.startswith("%A"):
# Buscar el path configurado para salidas
for p, info in path_info.items():
if info.get("type") == "Output" and "Not in Hardware" not in p and p != "OutputsFesto":
new_path = p
break
# Actualizar el path si ha cambiado
if new_path != path:
path_cell.value = new_path
relocated_tags[new_path] = relocated_tags.get(new_path, 0) + 1
updated_tags += 1
log_message(log_path, f"Actualizado: {tag_name} | Tipo: {data_type} | Viejo valor: {old_address} | Nuevo valor: {new_address} | Path: {path_cell.value}")
# Si no está en el mapeo, asignar dirección de memoria según configuración
else:
current_path_info = path_info.get(path, {})
no_used_path = current_path_info.get("no_used_path", "")
path_type = current_path_info.get("type", "")
# Si tenemos información válida para este path
if no_used_path and path_type:
# Determinar automáticamente si es entrada o salida
is_input = is_input_tag(tag_name) or path_type == "Input"
is_output = is_output_tag(tag_name) or path_type == "Output"
# Actualizar el path según configuración
path_cell.value = no_used_path
# Asignar dirección de memoria según el tipo (Input/Output)
if path_type == "Input" or (is_input and not is_output):
# Verificar que no se exceda el límite de memoria de entrada
new_address, byte_increment, new_bit = get_memory_address_format(
data_type, input_mem_byte, input_mem_bit
)
# Verificar límites antes de asignar
new_input_mem_byte = input_mem_byte + byte_increment
if new_input_mem_byte >= output_mem_byte:
error_msg = (f"ERROR: input_mem_byte ({new_input_mem_byte}) excedería output_mem_byte ({output_mem_byte}). "
f"La memoria de entrada está demasiado cerca de la memoria de salida. "
f"Tag problemático: {tag_name}. "
f"Considere aumentar output_mem_byte o reducir el número de tags de entrada.")
log_message(log_path, error_msg)
return False
if is_bit_type(data_type):
input_mem_bit = new_bit
input_mem_byte = new_input_mem_byte
else: # Tipo Output o no determinado
# Asignar dirección de memoria para salidas
new_address, byte_increment, new_bit = get_memory_address_format(
data_type, output_mem_byte, output_mem_bit
)
if is_bit_type(data_type):
output_mem_bit = new_bit
output_mem_byte += byte_increment
relocated_tags[no_used_path] = relocated_tags.get(no_used_path, 0) + 1
old_address = logical_address_cell.value
logical_address_cell.value = new_address
assigned_memory_addresses += 1
log_message(log_path, f"Asignación memoria: {tag_name} | Tipo: {data_type} | Viejo valor: {old_address} | Nuevo valor: {new_address} | Path: {path_cell.value}")
# Guardar el archivo actualizado
try:
workbook.save(output_path)
log_message(log_path, f"Archivo Excel guardado: {output_path}")
except Exception as e:
log_message(log_path, f"ERROR: No se pudo guardar el archivo Excel: {e}")
return False
# Mostrar estadísticas
log_message(log_path, "\n" + "=" * 30 + " RESUMEN " + "=" * 30)
log_message(log_path, f"Total de tags procesados: {total_tags}")
log_message(log_path, f"Tags actualizados desde el Markdown: {updated_tags}")
for path, count in relocated_tags.items():
if count > 0:
log_message(log_path, f"Tags relocalizados a {path}: {count}")
log_message(log_path, f"Tags con direcciones de memoria asignadas: {assigned_memory_addresses}")
# Mostrar uso final de memoria
log_message(log_path, "\n" + "=" * 25 + " USO DE MEMORIA " + "=" * 25)
log_message(log_path, f"Memoria de entrada final: {input_mem_byte} (inicio: 3600)")
log_message(log_path, f"Memoria de salida final: {output_mem_byte} (inicio: 3900)")
log_message(log_path, f"Bytes usados para entradas: {input_mem_byte - 3600}")
log_message(log_path, f"Bytes usados para salidas: {output_mem_byte - 3900}")
log_message(log_path, f"Espacio restante entre memorias: {3900 - input_mem_byte} bytes")
if input_mem_byte > 3600 or output_mem_byte > 3900:
log_message(log_path, f"✅ Proceso completado exitosamente sin conflictos de memoria")
return True
def main():
try:
# Intentar cargar la configuración para obtener working_directory
from backend.script_utils import load_configuration
configs = load_configuration()
working_directory = configs.get("working_directory")
if not working_directory:
working_directory = os.getcwd()
except:
working_directory = os.getcwd()
print(f"Usando directorio de trabajo: {working_directory}")
# Crear interfaz para seleccionar archivos
root = tk.Tk()
root.withdraw() # Ocultar ventana principal
# Verificar si existe el archivo PLCTags.xlsx predeterminado en working_directory
default_excel_path = os.path.join(working_directory, "PLCTags.xlsx")
default_md_path = os.path.join(working_directory, "IO Tags consolidated.md")
# Pedir al usuario que seleccione los archivos
if os.path.exists(default_excel_path):
excel_path = default_excel_path
print(f"Usando archivo Excel predeterminado: {excel_path}")
else:
print("Seleccione el archivo Excel exportado de TIA Portal:")
excel_path = filedialog.askopenfilename(
title="Seleccione el archivo Excel exportado de TIA Portal",
filetypes=[("Excel files", "*.xlsx"), ("All files", "*.*")],
initialdir=working_directory
)
if not excel_path:
print("No se seleccionó ningún archivo Excel. Saliendo...")
return
if os.path.exists(default_md_path):
md_path = default_md_path
print(f"Usando archivo Markdown predeterminado: {md_path}")
else:
print("Seleccione el archivo Markdown con la adaptación IO:")
md_path = filedialog.askopenfilename(
title="Seleccione el archivo Markdown con la adaptación IO",
filetypes=[("Markdown files", "*.md"), ("All files", "*.*")],
initialdir=working_directory
)
if not md_path:
print("No se seleccionó ningún archivo Markdown. Saliendo...")
return
# Determinar la ruta de salida (mismo directorio que el Excel, pero con "_Updated" añadido)
excel_dir = os.path.dirname(excel_path)
excel_filename = os.path.basename(excel_path)
excel_name, excel_ext = os.path.splitext(excel_filename)
output_filename = f"{excel_name}_Updated{excel_ext}"
output_path = os.path.join(working_directory, output_filename)
# Crear archivo de log
log_path = create_log_file(working_directory)
# Ejecutar el proceso de actualización
success = update_plc_tags(excel_path, md_path, output_path, log_path)
if success:
messagebox.showinfo("Proceso completado",
f"La actualización se ha completado con éxito.\n\n"
f"Archivo de salida: {output_path}\n\n"
f"Archivo de log: {log_path}")
else:
messagebox.showerror("Error",
f"Hubo un error durante el proceso.\n\n"
f"Consulte el archivo de log para más detalles: {log_path}")
if __name__ == "__main__":
main()