ParamManagerScripts/backend/script_groups/ObtainIOFromProjectTia/x1.py

759 lines
29 KiB
Python

"""
export_logic_from_tia : Script para exportar el software de un PLC desde TIA Portal en archivos XML y SCL.
"""
import tkinter as tk
from tkinter import filedialog
import os
import sys
import traceback
import shutil
import tempfile
from pathlib import Path # Import Path
# Make the project packages importable: take the directory four levels above
# this file and append it to sys.path, so the `backend.script_utils` import
# that follows can be resolved when the script is launched directly.
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- Configuration ---
# Project-file extension -> TIA Portal version string, covering V15 to V20
# (".ap15" -> "15.0", ..., ".ap20" -> "20.0"), in ascending version order.
SUPPORTED_TIA_VERSIONS = {
    f".ap{major}": f"{major}.0" for major in range(15, 21)
}

# No explicit export options are configured at this point.
EXPORT_OPTIONS = None

# When True, exports replicate the TIA project folder structure inside the
# export directory.
KEEP_FOLDER_STRUCTURE = True
# --- TIA Scripting Import Handling ---
# If the TIA_SCRIPTING environment variable holds a non-empty value, add that
# directory to the import path. Otherwise sys.path is left alone and the
# import below is allowed to fail.
# Optional fallback, kept disabled:
# fallback_path = "C:\\path\\to\\your\\TIA_Scripting_binaries"
# if os.path.exists(fallback_path):
#     sys.path.append(fallback_path)
if os.environ.get("TIA_SCRIPTING"):
    sys.path.append(os.environ["TIA_SCRIPTING"])

try:
    import siemens_tia_scripting as ts

    # The default export options can only be looked up once 'ts' is imported.
    EXPORT_OPTIONS = ts.Enums.ExportOptions.WithDefaults
except ImportError:
    # Print the troubleshooting checklist in one go and stop with exit code 1.
    print(
        "\n".join(
            (
                "ERROR: Failed to import 'siemens_tia_scripting'.",
                "Ensure:",
                "1. TIA Portal Openness is installed.",
                "2. The 'siemens_tia_scripting' Python module is installed (pip install ...) or",
                " the path to its binaries is set in the 'TIA_SCRIPTING' environment variable.",
                "3. You are using a compatible Python version (e.g., 3.12.X as per documentation).",
            )
        )
    )
    sys.exit(1)
except Exception as import_problem:
    # Anything else raised while importing is reported with a traceback.
    print(f"An unexpected error occurred during import: {import_problem}")
    traceback.print_exc()
    sys.exit(1)
# --- Functions ---
def is_block_exportable(block):
    """
    Decide from a block's "ProgrammingLanguage" property whether to export it.

    Returns a 3-tuple ``(is_exportable, programming_language, reason)``:

    * ``(False, language, message)`` when the language is one of the known
      unsupported ones (ProDiag_OB, ProDiag, GRAPH);
    * ``(True, language, "OK")`` for every other language;
    * ``(True, "Unknown", message)`` when reading the property raised, so the
      caller can still attempt the export but knows the language is unknown.
    """
    # Languages known not to be exportable.
    blocked_languages = ("ProDiag_OB", "ProDiag", "GRAPH")
    try:
        language = block.get_property(name="ProgrammingLanguage")
        if language not in blocked_languages:
            return True, language, "OK"
        explanation = f"Programming language '{language}' is not supported for export"
        return False, language, explanation
    except Exception as e:
        # Not being able to read the language is not treated as a veto.
        return True, "Unknown", f"Could not determine programming language: {e}"
def sanitize_filename(name):
    """Return *name* converted into a string that is safe as a file/dir name.

    Two known block names are mapped to fixed results. For any other name,
    every run of whitespace or of the characters ``< > : " / \\ | ? *`` is
    replaced by a single underscore, and leading/trailing underscores and
    dots are removed. Names starting with ``I/O_`` first get every ``I/O_``
    rewritten to ``IO_`` and are then cleaned like any other name.

    Args:
        name: The original name (str).

    Returns:
        The sanitized name, or ``"unknown"`` if nothing is left after cleaning.
    """
    import re

    # Fixed translations for two known names.
    if name == "I/O access error":
        return "IO_access_error"
    if name == "Time error interrupt":
        return "Time_error_interrupt"

    if name.startswith("I/O_"):
        # Keep the readable "IO_" spelling, but do NOT return early: the rest
        # of the name may still contain spaces or other invalid characters
        # and must go through the generic clean-up below.
        name = name.replace("I/O_", "IO_")

    # Collapse each run of invalid characters/whitespace into one underscore.
    sanitized = re.sub(r'[<>:"/\\|?*\s]+', "_", name)
    # Drop leading/trailing underscores and dots.
    sanitized = sanitized.strip("_.")
    return sanitized or "unknown"
def sanitize_path(path):
    """Strip surrounding whitespace from *path* and normalise the result.

    Normalisation is done by ``os.path.normpath``; whitespace inside the path
    is left as it is.
    """
    trimmed = path.strip()
    return os.path.normpath(trimmed)
def validate_export_path(path):
    """Check whether *path* is acceptable as an export destination.

    The checks run in this order and the first failure wins:
    empty path, forbidden characters (``< > " | ? *``), leading/trailing
    whitespace, two or more consecutive spaces, and a length above 250
    characters.

    Args:
        path: The path to check (str).

    Returns:
        ``(True, "OK")`` when every check passes, otherwise
        ``(False, message)`` with a message describing the failed check.
    """
    if not path:
        return False, "La ruta está vacía"
    # Characters that are rejected anywhere in the path.
    if any(char in path for char in '<>"|?*'):
        return False, f"La ruta contiene caracteres no válidos: {path}"
    # Leading or trailing whitespace.
    if path != path.strip():
        return False, f"La ruta contiene espacios al inicio o final: '{path}'"
    # Runs of two or more spaces. A single space (e.g. "C:\Users\John Doe")
    # is legitimate and must not be rejected by this check.
    if "  " in path:
        return False, f"La ruta contiene espacios múltiples consecutivos: '{path}'"
    # Length limit applied by this script (Windows path limitation).
    if len(path) > 250:
        return False, f"La ruta es demasiado larga ({len(path)} caracteres): {path}"
    return True, "OK"
def create_temp_export_dir():
    """Return the path of an empty ``TIA_Export_Temp`` directory.

    The directory lives directly under the system temporary directory. If it
    already exists it is deleted together with its contents and recreated, so
    the caller always receives a clean directory.
    """
    export_dir = os.path.join(tempfile.gettempdir(), "TIA_Export_Temp")
    already_there = os.path.exists(export_dir)
    if already_there:
        # Leftovers from an earlier run are discarded.
        shutil.rmtree(export_dir)
    os.makedirs(export_dir, exist_ok=True)
    return export_dir
def copy_temp_to_final(temp_dir, final_dir):
    """Copy every top-level entry of *temp_dir* into *final_dir*.

    *final_dir* is created if needed. A sub-directory that already exists at
    the destination is removed before the new copy is made; files are copied
    with ``shutil.copy2`` and overwrite existing files.

    Returns:
        True when everything was copied, False if any step raised (the error
        is printed, not propagated).
    """
    try:
        print("\nCopiando archivos exportados desde directorio temporal...")
        print(f" Origen: {temp_dir}")
        print(f" Destino: {final_dir}")
        os.makedirs(final_dir, exist_ok=True)

        for entry in os.listdir(temp_dir):
            source = os.path.join(temp_dir, entry)
            target = os.path.join(final_dir, entry)
            if not os.path.isdir(source):
                shutil.copy2(source, target)
                print(f" Archivo copiado: {entry}")
                continue
            # Directories: replace whatever is already at the destination.
            if os.path.exists(target):
                shutil.rmtree(target)
            shutil.copytree(source, target)
            print(f" Directorio copiado: {entry}")

        print(" Copia completada exitosamente.")
        return True
    except Exception as e:
        print(f" ERROR durante la copia: {e}")
        return False
def cleanup_temp_dir(temp_dir):
    """Delete *temp_dir* and its contents if it exists.

    Best effort: a failure is printed as a warning and never raised. Nothing
    is printed when the directory does not exist.
    """
    try:
        if not os.path.exists(temp_dir):
            return
        shutil.rmtree(temp_dir)
        print(f"Directorio temporal limpiado: {temp_dir}")
    except Exception as e:
        print(f"ADVERTENCIA: No se pudo limpiar el directorio temporal {temp_dir}: {e}")
def get_supported_filetypes():
    """Build the ``filetypes`` list for the project-file open dialog.

    The first entry matches every extension in SUPPORTED_TIA_VERSIONS at
    once; it is followed by one entry per extension, labelled with the major
    version number (the part of the version string before the first dot).
    """
    per_version = [
        (f"TIA Portal V{version.split('.')[0]} Projects", f"*{ext}")
        for ext, version in SUPPORTED_TIA_VERSIONS.items()
    ]
    # Space-separated patterns, e.g. "*.ap15 *.ap16 ...".
    every_pattern = " ".join(f"*{ext}" for ext in SUPPORTED_TIA_VERSIONS)
    return [("All TIA Portal Projects", every_pattern), *per_version]
def normalize_project_path(project_path):
"""Normalizes a TIA Portal project path to avoid path-related issues."""
# Convert forward slashes to backslashes for Windows
normalized = project_path.replace("/", "\\")
# Use os.path.normpath to clean up the path
normalized = os.path.normpath(normalized)
# Ensure it's an absolute path
normalized = os.path.abspath(normalized)
return normalized
def detect_tia_version(project_file_path):
    """Map the project file's extension to a TIA Portal version string.

    The extension is compared case-insensitively with the keys of
    SUPPORTED_TIA_VERSIONS. An unrecognised extension prints a warning and
    falls back to "15.0".
    """
    file_extension = Path(project_file_path).suffix.lower()
    try:
        detected_version = SUPPORTED_TIA_VERSIONS[file_extension]
    except KeyError:
        print(
            f"ADVERTENCIA: Extensión de archivo no reconocida '{file_extension}'. Extensiones soportadas: {list(SUPPORTED_TIA_VERSIONS.keys())}"
        )
        # Fall back to version 15.0 for backward compatibility.
        print("Usando por defecto TIA Portal V15.0")
        return "15.0"
    print(
        f"Versión de TIA Portal detectada: {detected_version} (de la extensión {file_extension})"
    )
    return detected_version
def select_project_file():
    """Ask the user for a TIA Portal project file and return its path.

    A file-open dialog limited to the supported project extensions is shown.
    If the dialog is dismissed without a selection, a message is printed and
    the process exits with status 0.
    """
    hidden_root = tk.Tk()
    hidden_root.withdraw()  # keep the empty Tk main window off screen
    chosen_file = filedialog.askopenfilename(
        title="Select TIA Portal Project File", filetypes=get_supported_filetypes()
    )
    hidden_root.destroy()
    if chosen_file:
        return chosen_file
    print("No se seleccionó ningún archivo de proyecto. Saliendo.")
    sys.exit(0)
def select_export_directory():
    """Ask the user for an export directory and return its path.

    If the dialog is dismissed without a selection, a message is printed and
    the process exits with status 0.
    """
    hidden_root = tk.Tk()
    hidden_root.withdraw()  # keep the empty Tk main window off screen
    chosen_dir = filedialog.askdirectory(title="Select Export Directory")
    hidden_root.destroy()
    if chosen_dir:
        return chosen_dir
    print("No export directory selected. Exiting.")
    sys.exit(0)
def export_plc_data(plc, export_base_dir):
    """Export program blocks, PLC data types (UDTs) and tag tables of one PLC.

    A directory named after the sanitized PLC name is created below
    *export_base_dir*. Three stages then run in this order: program blocks
    (XML, plus SCL source for SCL blocks), UDTs, tag tables. Errors on single
    items are printed and counted; they do not stop the stage. If the
    destination path of a stage fails validation, the error is printed and
    the remaining stages are not run.

    Args:
        plc: PLC object providing ``get_name()``, ``get_program_blocks()``,
            ``get_user_data_types()`` and ``get_plc_tag_tables()``.
        export_base_dir: Directory under which the per-PLC folder is created.

    Returns:
        None. All results and problems are reported on standard output.
    """
    plc_name = plc.get_name()
    plc_name_sanitized = sanitize_filename(plc_name)
    print(f"\n--- Procesando PLC: {plc_name} ---")
    if plc_name != plc_name_sanitized:
        print(f" Nombre sanitizado para directorios: {plc_name_sanitized}")

    # Base export path for this PLC.
    plc_export_dir = sanitize_path(os.path.join(export_base_dir, plc_name_sanitized))
    is_valid, validation_msg = validate_export_path(plc_export_dir)
    if not is_valid:
        print(f"ERROR: Directorio de exportación del PLC no válido - {validation_msg}")
        return
    os.makedirs(plc_export_dir, exist_ok=True)

    # A stage returns False only when its destination path is invalid; in
    # that case the remaining stages are skipped.
    stages = (_export_program_blocks, _export_user_data_types, _export_tag_tables)
    for stage in stages:
        if not stage(plc, plc_name, plc_export_dir):
            return
    print(f"\n--- Finalizado el procesamiento del PLC: {plc_name} ---")


# Lower-case fragments of block names that are exported through a staging
# directory instead of directly into the destination folder.
_PROBLEMATIC_NAME_KEYWORDS = (
    "interrupt",
    "error",
    "startup",
    "i/o",
    "rack_flt",
    "prog_err",
    "time error",
    "io access",
    "createsan",
)


def _has_problematic_name(block_name):
    """Return True if the block must be exported through a staging directory."""
    lowered = block_name.lower()
    return (
        " " in block_name
        or "/" in block_name
        or any(keyword in lowered for keyword in _PROBLEMATIC_NAME_KEYWORDS)
    )


def _export_block(block, block_name, target_dir, export_format, extension, announce=False):
    """Export one block into *target_dir* using *export_format*.

    Blocks with an unproblematic name are exported directly. Other blocks are
    exported into a staging sub-directory named after the sanitized block
    name; files ending in *extension* are then moved up into *target_dir*
    (replacing files of the same name) and the staging directory is removed.

    The staging directory is cleaned up in a ``finally`` clause so that it is
    not left behind when the export raises. If it cannot be removed (for
    example because it still contains other files) a warning is printed
    instead of failing an export that already succeeded.

    Raises:
        Whatever ``block.export`` or the file operations raise.
    """
    if not _has_problematic_name(block_name):
        block.export(
            target_directory_path=target_dir,
            export_options=EXPORT_OPTIONS,
            export_format=export_format,
            keep_folder_structure=KEEP_FOLDER_STRUCTURE,
        )
        return

    if announce:
        print(f" Detectado bloque con nombre problemático: '{block_name}'")
    staging_dir = os.path.join(target_dir, sanitize_filename(block_name))
    os.makedirs(staging_dir, exist_ok=True)
    if announce:
        print(f" Usando directorio sanitizado: {staging_dir}")
    try:
        block.export(
            target_directory_path=staging_dir,
            export_options=EXPORT_OPTIONS,
            export_format=export_format,
            keep_folder_structure=False,  # flat output inside the staging dir
        )
        for entry in os.listdir(staging_dir):
            if not entry.endswith(extension):
                continue
            destination = os.path.join(target_dir, entry)
            if os.path.exists(destination):
                os.remove(destination)
            shutil.move(os.path.join(staging_dir, entry), destination)
    finally:
        if os.path.isdir(staging_dir):
            try:
                # rmdir (not rmtree): never delete files we did not move.
                os.rmdir(staging_dir)
            except OSError as cleanup_ex:
                print(
                    f" ADVERTENCIA: No se pudo limpiar el directorio temporal {staging_dir}: {cleanup_ex}"
                )


def _export_single_block(block, xml_blocks_path, scl_blocks_path):
    """Export one program block; return True if its XML export succeeded.

    Returns False when the block is skipped (unsupported language, still
    inconsistent after compiling) or when the XML export fails. A failing SCL
    export is only reported; the block still counts as exported. Errors from
    ``block.get_name()`` or the exportability check propagate to the caller.
    """
    block_name = block.get_name()
    print(f" Procesando bloque: {block_name}...")

    is_exportable, prog_language, reason = is_block_exportable(block)
    if not is_exportable:
        print(f" ADVERTENCIA: {reason}. Omitiendo bloque {block_name}.")
        return False
    if prog_language == "Unknown":
        print(f" ADVERTENCIA: {reason}")

    try:
        if not block.is_consistent():
            print(f" Compilando bloque {block_name}...")
            block.compile()
            if not block.is_consistent():
                print(
                    f" ADVERTENCIA: Bloque {block_name} inconsistente después de compilar. Omitiendo."
                )
                return False

        print(f" Exportando {block_name} como XML...")
        try:
            print(f" Destino: {xml_blocks_path}")
            _export_block(
                block,
                block_name,
                xml_blocks_path,
                ts.Enums.ExportFormats.SimaticML,
                ".xml",
                announce=True,
            )
        except Exception as xml_ex:
            print(f" ERROR en exportación XML para {block_name}: {xml_ex}")
            print(f" Ruta problemática: '{xml_blocks_path}'")
            print(f" Tipo de bloque: {type(block).__name__}")
            print(f" Lenguaje de programación: {prog_language}")
            # Extra hint when the error points to an unsupported language.
            if "ProDiag" in str(
                xml_ex
            ) or "not supported during import and export" in str(xml_ex):
                print(
                    " Este bloque usa un lenguaje no soportado para exportación. Omitiendo."
                )
            # Any XML export failure skips the block.
            return False

        # XML export succeeded; additionally export SCL source if applicable.
        if prog_language == "SCL":
            print(f" Exportando {block_name} como SCL...")
            try:
                print(f" Destino: {scl_blocks_path}")
                _export_block(
                    block,
                    block_name,
                    scl_blocks_path,
                    ts.Enums.ExportFormats.ExternalSource,
                    ".scl",
                )
            except Exception as scl_ex:
                # Report only; the XML export already succeeded.
                print(f" ERROR en exportación SCL para {block_name}: {scl_ex}")
                print(f" Ruta problemática: '{scl_blocks_path}'")
        else:
            print(
                f" Bloque {block_name} no es SCL (lenguaje: {prog_language}). Omitiendo exportación SCL."
            )
        return True
    except Exception as block_ex:
        print(f" ERROR exportando bloque {block_name}: {block_ex}")
        return False


def _export_program_blocks(plc, plc_name, plc_export_dir):
    """Export all program blocks of *plc*; return False on an invalid path."""
    blocks_exported = 0
    blocks_skipped = 0
    print(f"\n[PLC: {plc_name}] Exportando bloques de programa...")
    xml_blocks_path = sanitize_path(os.path.join(plc_export_dir, "ProgramBlocks_XML"))
    scl_blocks_path = sanitize_path(os.path.join(plc_export_dir, "ProgramBlocks_SCL"))

    xml_valid, xml_msg = validate_export_path(xml_blocks_path)
    scl_valid, scl_msg = validate_export_path(scl_blocks_path)
    if not xml_valid:
        print(f" ERROR: Ruta XML no válida - {xml_msg}")
        return False
    if not scl_valid:
        print(f" ERROR: Ruta SCL no válida - {scl_msg}")
        return False

    os.makedirs(xml_blocks_path, exist_ok=True)
    os.makedirs(scl_blocks_path, exist_ok=True)
    print(f" Destino XML: {xml_blocks_path}")
    print(f" Destino SCL: {scl_blocks_path}")

    try:
        program_blocks = plc.get_program_blocks()
        print(f" Se encontraron {len(program_blocks)} bloques de programa.")
        for block in program_blocks:
            if _export_single_block(block, xml_blocks_path, scl_blocks_path):
                blocks_exported += 1
            else:
                blocks_skipped += 1
        print(
            f" Resumen de exportación de bloques: Exportados={blocks_exported}, Omitidos/Errores={blocks_skipped}"
        )
    except Exception as e:
        print(f" ERROR procesando bloques de programa: {e}")
        traceback.print_exc()
    return True


def _export_user_data_types(plc, plc_name, plc_export_dir):
    """Export all PLC data types of *plc*; return False on an invalid path."""
    udts_exported = 0
    udts_skipped = 0
    print(f"\n[PLC: {plc_name}] Exportando tipos de datos PLC (UDTs)...")
    udt_export_path = sanitize_path(os.path.join(plc_export_dir, "PlcDataTypes"))

    udt_valid, udt_msg = validate_export_path(udt_export_path)
    if not udt_valid:
        print(f" ERROR: Ruta UDT no válida - {udt_msg}")
        return False
    os.makedirs(udt_export_path, exist_ok=True)
    print(f" Destino: {udt_export_path}")

    try:
        udts = plc.get_user_data_types()
        print(f" Se encontraron {len(udts)} UDTs.")
        for udt in udts:
            udt_name = udt.get_name()
            print(f" Procesando UDT: {udt_name}...")
            try:
                if not udt.is_consistent():
                    print(f" Compilando UDT {udt_name}...")
                    udt.compile()
                    if not udt.is_consistent():
                        print(
                            f" ADVERTENCIA: UDT {udt_name} inconsistente después de compilar. Omitiendo."
                        )
                        udts_skipped += 1
                        continue
                print(f" Exportando {udt_name}...")
                try:
                    print(f" Destino: {udt_export_path}")
                    udt.export(
                        target_directory_path=udt_export_path,
                        export_options=EXPORT_OPTIONS,
                        keep_folder_structure=KEEP_FOLDER_STRUCTURE,
                    )
                except Exception as udt_export_ex:
                    print(f" ERROR en exportación UDT para {udt_name}: {udt_export_ex}")
                    print(f" Ruta problemática: '{udt_export_path}'")
                    raise  # counted as skipped by the handler below
                udts_exported += 1
            except Exception as udt_ex:
                print(f" ERROR exportando UDT {udt_name}: {udt_ex}")
                udts_skipped += 1
        print(
            f" Resumen de exportación de UDTs: Exportados={udts_exported}, Omitidos/Errores={udts_skipped}"
        )
    except Exception as e:
        print(f" ERROR procesando UDTs: {e}")
        traceback.print_exc()
    return True


def _export_tag_tables(plc, plc_name, plc_export_dir):
    """Export all PLC tag tables of *plc*; return False on an invalid path."""
    tags_exported = 0
    tags_skipped = 0
    print(f"\n[PLC: {plc_name}] Exportando tablas de variables PLC...")
    tags_export_path = sanitize_path(os.path.join(plc_export_dir, "PlcTags"))

    tags_valid, tags_msg = validate_export_path(tags_export_path)
    if not tags_valid:
        print(f" ERROR: Ruta Tags no válida - {tags_msg}")
        return False
    os.makedirs(tags_export_path, exist_ok=True)
    print(f" Destino: {tags_export_path}")

    try:
        tag_tables = plc.get_plc_tag_tables()
        print(f" Se encontraron {len(tag_tables)} tablas de variables.")
        for table in tag_tables:
            table_name = table.get_name()
            print(f" Procesando tabla de variables: {table_name}...")
            try:
                print(f" Exportando {table_name}...")
                try:
                    print(f" Destino: {tags_export_path}")
                    table.export(
                        target_directory_path=tags_export_path,
                        export_options=EXPORT_OPTIONS,
                        keep_folder_structure=KEEP_FOLDER_STRUCTURE,
                    )
                except Exception as table_export_ex:
                    print(
                        f" ERROR en exportación tabla para {table_name}: {table_export_ex}"
                    )
                    print(f" Ruta problemática: '{tags_export_path}'")
                    raise  # counted as skipped by the handler below
                tags_exported += 1
            except Exception as table_ex:
                print(f" ERROR exportando tabla de variables {table_name}: {table_ex}")
                tags_skipped += 1
        print(
            f" Resumen de exportación de tablas de variables: Exportados={tags_exported}, Omitidos/Errores={tags_skipped}"
        )
    except Exception as e:
        print(f" ERROR procesando tablas de variables: {e}")
        traceback.print_exc()
    return True
# --- Main Script ---
# Script entry point: read the working directory from the configuration, let
# the user pick a project file, open it in TIA Portal and export the data of
# every PLC found in the project into the working directory.
if __name__ == "__main__":
# The export destination comes from the "working_directory" entry of the
# loaded configuration, not from a dialog.
configs = load_configuration()
working_directory = configs.get("working_directory")
print("--- Exportador de datos TIA Portal (Bloques, UDTs, Variables) ---")
# Validate working directory
if not working_directory or not os.path.isdir(working_directory):
print("ERROR: Directorio de trabajo no configurado o inválido.")
print(
"Por favor configure el directorio de trabajo usando la aplicación principal."
)
sys.exit(1)
# 1. Select Project File, Export Directory comes from config
project_file = select_project_file()
# Normalize the project file path to avoid TIA Portal path issues
project_file = normalize_project_path(project_file)
export_dir = sanitize_path(
working_directory
) # Use working directory from config with sanitization
# Validate export directory
is_valid, validation_msg = validate_export_path(export_dir)
if not is_valid:
print(f"ERROR: Directorio de exportación no válido - {validation_msg}")
sys.exit(1)
# 2. Detect TIA Portal version from project file
tia_version = detect_tia_version(project_file)
print(f"\nProyecto seleccionado: {project_file}")
print(f"Usando directorio de exportación (Directorio de trabajo): {export_dir}")
# Both handles start as None so the cleanup in `finally` can tell whether a
# portal was actually opened.
portal_instance = None
project_object = None
try:
# 3. Connect to TIA Portal with detected version
print(f"\nConectando a TIA Portal V{tia_version}...")
portal_instance = ts.open_portal(
version=tia_version,
portal_mode=ts.Enums.PortalMode.WithGraphicalUserInterface,
)
print("Conectado a TIA Portal.")
print(f"ID del proceso del Portal: {portal_instance.get_process_id()}")
# 4. Open Project
print(f"Abriendo proyecto: {os.path.basename(project_file)}...")
print(f"Ruta completa del proyecto: {project_file}")
try:
project_object = portal_instance.open_project(
project_file_path=project_file
)
except Exception as open_ex:
# A failed open is not fatal here: the handle is reset to None so the
# fallback below can ask the portal for its current project instead.
print(f"Error al abrir el proyecto: {open_ex}")
print("Intentando obtener proyecto ya abierto...")
project_object = None
# Fallback: no handle from open_project, so request the project from the portal.
if project_object is None:
print(
"El proyecto podría estar ya abierto, intentando obtener el manejador..."
)
project_object = portal_instance.get_project()
# Still no handle: give up; the generic handler below reports this error.
if project_object is None:
raise Exception("No se pudo abrir u obtener el proyecto especificado.")
print("Proyecto abierto exitosamente.")
# 5. Get PLCs
plcs = project_object.get_plcs()
if not plcs:
print("No se encontraron dispositivos PLC en el proyecto.")
else:
print(
f"Se encontraron {len(plcs)} PLC(s). Iniciando proceso de exportación..."
)
# 6. Iterate and Export Data for each PLC
for plc_device in plcs:
export_plc_data(plc=plc_device, export_base_dir=export_dir)
print("\nProceso de exportación completado.")
except ValueError as val_ex:
# Handle TIA Portal Openness exceptions (they come as ValueError)
if "OpennessAccessException" in str(val_ex):
print(f"\nError de TIA Portal Openness: {val_ex}")
print("Posibles causas:")
print("- El proyecto puede estar corrupto o en un formato incompatible")
print(
"- El proyecto puede requerir actualización a una versión más reciente"
)
print(
"- Verificar que la ruta del proyecto no contenga caracteres especiales"
)
print("- Asegurarse de que TIA Portal esté instalado correctamente")
else:
print(f"\nError de valor: {val_ex}")
traceback.print_exc()
except FileNotFoundError:
print(f"\nERROR: Archivo de proyecto no encontrado en {project_file}")
except Exception as e:
# Catch-all at the script boundary: report the error with a traceback.
print(f"\nOcurrió un error inesperado: {e}")
traceback.print_exc()
finally:
# 7. Cleanup
# Close the portal if one was opened; a failure while closing is only printed.
if portal_instance:
try:
print("\nCerrando TIA Portal...")
portal_instance.close_portal()
print("TIA Portal cerrado.")
except Exception as close_ex:
print(f"Error durante la limpieza de TIA Portal: {close_ex}")
print("\nScript finalizado.")