""" export_logic_from_tia : Script para exportar el software de un PLC desde TIA Portal en archivos XML y SCL. """ import tkinter as tk from tkinter import filedialog import os import sys import traceback import shutil import tempfile from pathlib import Path # Import Path script_root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(__file__))) ) sys.path.append(script_root) from backend.script_utils import load_configuration # --- Configuration --- # Supported TIA Portal versions mapping (extension -> version) SUPPORTED_TIA_VERSIONS = { ".ap15": "15.0", ".ap16": "16.0", ".ap17": "17.0", ".ap18": "18.0", ".ap19": "19.0", ".ap20": "20.0", } EXPORT_OPTIONS = None # Use default export options KEEP_FOLDER_STRUCTURE = ( True # Replicate TIA project folder structure in export directory ) # --- TIA Scripting Import Handling --- # Check if the TIA_SCRIPTING environment variable is set if os.getenv("TIA_SCRIPTING"): sys.path.append(os.getenv("TIA_SCRIPTING")) else: # Optional: Define a fallback path if the environment variable isn't set # fallback_path = "C:\\path\\to\\your\\TIA_Scripting_binaries" # if os.path.exists(fallback_path): # sys.path.append(fallback_path) pass # Allow import to fail if not found try: import siemens_tia_scripting as ts EXPORT_OPTIONS = ( ts.Enums.ExportOptions.WithDefaults ) # Set default options now that 'ts' is imported except ImportError: print("ERROR: Failed to import 'siemens_tia_scripting'.") print("Ensure:") print("1. TIA Portal Openness is installed.") print( "2. The 'siemens_tia_scripting' Python module is installed (pip install ...) or" ) print( " the path to its binaries is set in the 'TIA_SCRIPTING' environment variable." ) print( "3. You are using a compatible Python version (e.g., 3.12.X as per documentation)." ) sys.exit(1) except Exception as e: print(f"An unexpected error occurred during import: {e}") traceback.print_exc() sys.exit(1) # --- Functions --- def is_block_exportable(block): """ Checks if a block can be exported based on its programming language. Returns (is_exportable, programming_language, reason) """ try: prog_language = block.get_property(name="ProgrammingLanguage") # List of known unsupported programming languages unsupported_languages = [ "ProDiag_OB", # ProDiag Organization Blocks "ProDiag", # ProDiag Function Blocks "GRAPH", # GRAPH (Sequential Control) ] if prog_language in unsupported_languages: return ( False, prog_language, f"Programming language '{prog_language}' is not supported for export", ) return True, prog_language, "OK" except Exception as e: # If we can't determine the programming language, assume it might be exportable # but warn about it return True, "Unknown", f"Could not determine programming language: {e}" def sanitize_filename(name): """Sanitizes a filename by removing/replacing invalid characters and whitespace.""" import re # Handle specific problematic cases first if name == "I/O access error": return "IO_access_error" elif name == "Time error interrupt": return "Time_error_interrupt" elif name.startswith("I/O_"): return name.replace("I/O_", "IO_").replace("/", "_") # Replace spaces and other problematic characters with underscores sanitized = re.sub(r'[<>:"/\\|?*\s]+', "_", name) # Remove leading/trailing underscores and dots sanitized = sanitized.strip("_.") # Ensure it's not empty if not sanitized: sanitized = "unknown" return sanitized def sanitize_path(path): """Sanitizes a path by ensuring it doesn't contain problematic whitespace.""" # Normalize the path and remove any trailing/leading whitespace normalized = os.path.normpath(path.strip()) return normalized def validate_export_path(path): """Validates that an export path is suitable for TIA Portal.""" if not path: return False, "La ruta está vacía" # Check for problematic characters or patterns if any(char in path for char in '<>"|?*'): return False, f"La ruta contiene caracteres no válidos: {path}" # Check for excessive whitespace if path != path.strip(): return False, f"La ruta contiene espacios al inicio o final: '{path}'" # Check for multiple consecutive spaces if " " in path: return False, f"La ruta contiene espacios múltiples consecutivos: '{path}'" # Check path length (Windows limitation) if len(path) > 250: return False, f"La ruta es demasiado larga ({len(path)} caracteres): {path}" return True, "OK" def create_temp_export_dir(): """Creates a temporary directory for export that doesn't contain spaces.""" # Create a temporary directory with a safe name temp_base = tempfile.gettempdir() temp_export = os.path.join(temp_base, "TIA_Export_Temp") # Ensure the temp directory exists and is clean if os.path.exists(temp_export): shutil.rmtree(temp_export) os.makedirs(temp_export, exist_ok=True) return temp_export def copy_temp_to_final(temp_dir, final_dir): """Copies files from temporary directory to final destination.""" try: print(f"\nCopiando archivos exportados desde directorio temporal...") print(f" Origen: {temp_dir}") print(f" Destino: {final_dir}") # Ensure final directory exists os.makedirs(final_dir, exist_ok=True) # Copy all contents from temp to final directory for item in os.listdir(temp_dir): src_path = os.path.join(temp_dir, item) dst_path = os.path.join(final_dir, item) if os.path.isdir(src_path): if os.path.exists(dst_path): shutil.rmtree(dst_path) shutil.copytree(src_path, dst_path) print(f" Directorio copiado: {item}") else: shutil.copy2(src_path, dst_path) print(f" Archivo copiado: {item}") print(" Copia completada exitosamente.") return True except Exception as e: print(f" ERROR durante la copia: {e}") return False def cleanup_temp_dir(temp_dir): """Cleans up the temporary directory.""" try: if os.path.exists(temp_dir): shutil.rmtree(temp_dir) print(f"Directorio temporal limpiado: {temp_dir}") except Exception as e: print(f"ADVERTENCIA: No se pudo limpiar el directorio temporal {temp_dir}: {e}") def get_supported_filetypes(): """Returns the supported file types for TIA Portal projects.""" filetypes = [] for ext, version in SUPPORTED_TIA_VERSIONS.items(): version_major = version.split(".")[0] filetypes.append((f"TIA Portal V{version_major} Projects", f"*{ext}")) # Add option to show all supported files all_extensions = " ".join([f"*{ext}" for ext in SUPPORTED_TIA_VERSIONS.keys()]) filetypes.insert(0, ("All TIA Portal Projects", all_extensions)) return filetypes def normalize_project_path(project_path): """Normalizes a TIA Portal project path to avoid path-related issues.""" # Convert forward slashes to backslashes for Windows normalized = project_path.replace("/", "\\") # Use os.path.normpath to clean up the path normalized = os.path.normpath(normalized) # Ensure it's an absolute path normalized = os.path.abspath(normalized) return normalized def detect_tia_version(project_file_path): """Detects TIA Portal version based on file extension.""" file_path = Path(project_file_path) file_extension = file_path.suffix.lower() if file_extension in SUPPORTED_TIA_VERSIONS: detected_version = SUPPORTED_TIA_VERSIONS[file_extension] print( f"Versión de TIA Portal detectada: {detected_version} (de la extensión {file_extension})" ) return detected_version else: print( f"ADVERTENCIA: Extensión de archivo no reconocida '{file_extension}'. Extensiones soportadas: {list(SUPPORTED_TIA_VERSIONS.keys())}" ) # Default to version 15.0 for backward compatibility print("Usando por defecto TIA Portal V15.0") return "15.0" def select_project_file(): """Opens a dialog to select a TIA Portal project file.""" root = tk.Tk() root.withdraw() # Hide the main tkinter window file_path = filedialog.askopenfilename( title="Select TIA Portal Project File", filetypes=get_supported_filetypes() ) root.destroy() if not file_path: print("No se seleccionó ningún archivo de proyecto. Saliendo.") sys.exit(0) return file_path def select_export_directory(): """Opens a dialog to select the export directory.""" root = tk.Tk() root.withdraw() # Hide the main tkinter window dir_path = filedialog.askdirectory(title="Select Export Directory") root.destroy() if not dir_path: print("No export directory selected. Exiting.") sys.exit(0) return dir_path def export_plc_data(plc, export_base_dir): """Exports Blocks, UDTs, and Tag Tables from a given PLC.""" plc_name = plc.get_name() plc_name_sanitized = sanitize_filename(plc_name) print(f"\n--- Procesando PLC: {plc_name} ---") if plc_name != plc_name_sanitized: print(f" Nombre sanitizado para directorios: {plc_name_sanitized}") # Define base export path for this PLC plc_export_dir = sanitize_path(os.path.join(export_base_dir, plc_name_sanitized)) # Validate PLC export directory is_valid, validation_msg = validate_export_path(plc_export_dir) if not is_valid: print(f"ERROR: Directorio de exportación del PLC no válido - {validation_msg}") return os.makedirs(plc_export_dir, exist_ok=True) # --- Export Program Blocks --- blocks_exported = 0 blocks_skipped = 0 print(f"\n[PLC: {plc_name}] Exportando bloques de programa...") xml_blocks_path = sanitize_path(os.path.join(plc_export_dir, "ProgramBlocks_XML")) scl_blocks_path = sanitize_path(os.path.join(plc_export_dir, "ProgramBlocks_SCL")) # Validate block export paths xml_valid, xml_msg = validate_export_path(xml_blocks_path) scl_valid, scl_msg = validate_export_path(scl_blocks_path) if not xml_valid: print(f" ERROR: Ruta XML no válida - {xml_msg}") return if not scl_valid: print(f" ERROR: Ruta SCL no válida - {scl_msg}") return os.makedirs(xml_blocks_path, exist_ok=True) os.makedirs(scl_blocks_path, exist_ok=True) print(f" Destino XML: {xml_blocks_path}") print(f" Destino SCL: {scl_blocks_path}") try: program_blocks = plc.get_program_blocks() print(f" Se encontraron {len(program_blocks)} bloques de programa.") for block in program_blocks: block_name = block.get_name() print(f" Procesando bloque: {block_name}...") # Check if block is exportable is_exportable, prog_language, reason = is_block_exportable(block) if not is_exportable: print(f" ADVERTENCIA: {reason}. Omitiendo bloque {block_name}.") blocks_skipped += 1 continue if prog_language == "Unknown": print(f" ADVERTENCIA: {reason}") try: if not block.is_consistent(): print(f" Compilando bloque {block_name}...") block.compile() if not block.is_consistent(): print( f" ADVERTENCIA: Bloque {block_name} inconsistente después de compilar. Omitiendo." ) blocks_skipped += 1 continue print(f" Exportando {block_name} como XML...") try: print(f" Destino: {xml_blocks_path}") # Check if this is a system block that might need special handling is_system_block = any( keyword in block_name.lower() for keyword in [ "interrupt", "error", "startup", "i/o", "rack_flt", "prog_err", "time error", "io access", "createsan", ] ) # Try creating a sanitized filename for problematic blocks if is_system_block or " " in block_name or "/" in block_name: print( f" Detectado bloque con nombre problemático: '{block_name}'" ) # Create a temporary export directory with sanitized name sanitized_block_name = sanitize_filename(block_name) temp_block_dir = os.path.join( xml_blocks_path, sanitized_block_name ) os.makedirs(temp_block_dir, exist_ok=True) print(f" Usando directorio sanitizado: {temp_block_dir}") block.export( target_directory_path=temp_block_dir, export_options=EXPORT_OPTIONS, export_format=ts.Enums.ExportFormats.SimaticML, keep_folder_structure=False, # Disable folder structure for problematic blocks ) # Rename files to use original block name in metadata for file in os.listdir(temp_block_dir): if file.endswith(".xml"): original_path = os.path.join(temp_block_dir, file) # Move file to main directory with original name preserved in content target_path = os.path.join(xml_blocks_path, file) if os.path.exists(target_path): os.remove(target_path) shutil.move(original_path, target_path) # Remove temporary directory if os.path.exists(temp_block_dir): os.rmdir(temp_block_dir) else: # Normal export for regular blocks block.export( target_directory_path=xml_blocks_path, export_options=EXPORT_OPTIONS, export_format=ts.Enums.ExportFormats.SimaticML, keep_folder_structure=KEEP_FOLDER_STRUCTURE, ) except Exception as xml_ex: print( f" ERROR en exportación XML para {block_name}: {xml_ex}" ) print(f" Ruta problemática: '{xml_blocks_path}'") print(f" Tipo de bloque: {type(block).__name__}") print(f" Lenguaje de programación: {prog_language}") # Check if it's a ProDiag related error if "ProDiag" in str( xml_ex ) or "not supported during import and export" in str(xml_ex): print( f" Este bloque usa un lenguaje no soportado para exportación. Omitiendo." ) # Skip this block and continue with others blocks_skipped += 1 continue # If we get here, XML export was successful # Now try SCL export if applicable try: if prog_language == "SCL": print(f" Exportando {block_name} como SCL...") try: print(f" Destino: {scl_blocks_path}") # Use same logic for SCL export is_system_block = any( keyword in block_name.lower() for keyword in [ "interrupt", "error", "startup", "i/o", "rack_flt", "prog_err", "time error", "io access", "createsan", ] ) if ( is_system_block or " " in block_name or "/" in block_name ): sanitized_block_name = sanitize_filename(block_name) temp_block_dir = os.path.join( scl_blocks_path, sanitized_block_name ) os.makedirs(temp_block_dir, exist_ok=True) block.export( target_directory_path=temp_block_dir, export_options=EXPORT_OPTIONS, export_format=ts.Enums.ExportFormats.ExternalSource, keep_folder_structure=False, ) # Move files to main directory for file in os.listdir(temp_block_dir): if file.endswith(".scl"): original_path = os.path.join( temp_block_dir, file ) target_path = os.path.join( scl_blocks_path, file ) if os.path.exists(target_path): os.remove(target_path) shutil.move(original_path, target_path) if os.path.exists(temp_block_dir): os.rmdir(temp_block_dir) else: block.export( target_directory_path=scl_blocks_path, export_options=EXPORT_OPTIONS, export_format=ts.Enums.ExportFormats.ExternalSource, keep_folder_structure=KEEP_FOLDER_STRUCTURE, ) except Exception as scl_ex: print( f" ERROR en exportación SCL para {block_name}: {scl_ex}" ) print(f" Ruta problemática: '{scl_blocks_path}'") # Don't raise, just continue else: print( f" Bloque {block_name} no es SCL (lenguaje: {prog_language}). Omitiendo exportación SCL." ) except Exception as scl_check_ex: print( f" Error verificando lenguaje para exportación SCL: {scl_check_ex}" ) blocks_exported += 1 except Exception as block_ex: print(f" ERROR exportando bloque {block_name}: {block_ex}") blocks_skipped += 1 print( f" Resumen de exportación de bloques: Exportados={blocks_exported}, Omitidos/Errores={blocks_skipped}" ) except Exception as e: print(f" ERROR procesando bloques de programa: {e}") traceback.print_exc() # --- Export PLC Data Types (UDTs) --- udts_exported = 0 udts_skipped = 0 print(f"\n[PLC: {plc_name}] Exportando tipos de datos PLC (UDTs)...") udt_export_path = sanitize_path(os.path.join(plc_export_dir, "PlcDataTypes")) # Validate UDT export path udt_valid, udt_msg = validate_export_path(udt_export_path) if not udt_valid: print(f" ERROR: Ruta UDT no válida - {udt_msg}") return os.makedirs(udt_export_path, exist_ok=True) print(f" Destino: {udt_export_path}") try: udts = plc.get_user_data_types() print(f" Se encontraron {len(udts)} UDTs.") for udt in udts: udt_name = udt.get_name() print(f" Procesando UDT: {udt_name}...") try: if not udt.is_consistent(): print(f" Compilando UDT {udt_name}...") udt.compile() if not udt.is_consistent(): print( f" ADVERTENCIA: UDT {udt_name} inconsistente después de compilar. Omitiendo." ) udts_skipped += 1 continue print(f" Exportando {udt_name}...") try: print(f" Destino: {udt_export_path}") udt.export( target_directory_path=udt_export_path, export_options=EXPORT_OPTIONS, keep_folder_structure=KEEP_FOLDER_STRUCTURE, ) except Exception as udt_export_ex: print( f" ERROR en exportación UDT para {udt_name}: {udt_export_ex}" ) print(f" Ruta problemática: '{udt_export_path}'") raise udt_export_ex udts_exported += 1 except Exception as udt_ex: print(f" ERROR exportando UDT {udt_name}: {udt_ex}") udts_skipped += 1 print( f" Resumen de exportación de UDTs: Exportados={udts_exported}, Omitidos/Errores={udts_skipped}" ) except Exception as e: print(f" ERROR procesando UDTs: {e}") traceback.print_exc() # --- Export PLC Tag Tables --- tags_exported = 0 tags_skipped = 0 print(f"\n[PLC: {plc_name}] Exportando tablas de variables PLC...") tags_export_path = sanitize_path(os.path.join(plc_export_dir, "PlcTags")) # Validate tags export path tags_valid, tags_msg = validate_export_path(tags_export_path) if not tags_valid: print(f" ERROR: Ruta Tags no válida - {tags_msg}") return os.makedirs(tags_export_path, exist_ok=True) print(f" Destino: {tags_export_path}") try: tag_tables = plc.get_plc_tag_tables() print(f" Se encontraron {len(tag_tables)} tablas de variables.") for table in tag_tables: table_name = table.get_name() print(f" Procesando tabla de variables: {table_name}...") try: print(f" Exportando {table_name}...") try: print(f" Destino: {tags_export_path}") table.export( target_directory_path=tags_export_path, export_options=EXPORT_OPTIONS, keep_folder_structure=KEEP_FOLDER_STRUCTURE, ) except Exception as table_export_ex: print( f" ERROR en exportación tabla para {table_name}: {table_export_ex}" ) print(f" Ruta problemática: '{tags_export_path}'") raise table_export_ex tags_exported += 1 except Exception as table_ex: print( f" ERROR exportando tabla de variables {table_name}: {table_ex}" ) tags_skipped += 1 print( f" Resumen de exportación de tablas de variables: Exportados={tags_exported}, Omitidos/Errores={tags_skipped}" ) except Exception as e: print(f" ERROR procesando tablas de variables: {e}") traceback.print_exc() print(f"\n--- Finalizado el procesamiento del PLC: {plc_name} ---") # --- Main Script --- if __name__ == "__main__": configs = load_configuration() working_directory = configs.get("working_directory") print("--- Exportador de datos TIA Portal (Bloques, UDTs, Variables) ---") # Validate working directory if not working_directory or not os.path.isdir(working_directory): print("ERROR: Directorio de trabajo no configurado o inválido.") print( "Por favor configure el directorio de trabajo usando la aplicación principal." ) sys.exit(1) # 1. Select Project File, Export Directory comes from config project_file = select_project_file() # Normalize the project file path to avoid TIA Portal path issues project_file = normalize_project_path(project_file) export_dir = sanitize_path( working_directory ) # Use working directory from config with sanitization # Validate export directory is_valid, validation_msg = validate_export_path(export_dir) if not is_valid: print(f"ERROR: Directorio de exportación no válido - {validation_msg}") sys.exit(1) # 2. Detect TIA Portal version from project file tia_version = detect_tia_version(project_file) print(f"\nProyecto seleccionado: {project_file}") print(f"Usando directorio de exportación (Directorio de trabajo): {export_dir}") portal_instance = None project_object = None try: # 3. Connect to TIA Portal with detected version print(f"\nConectando a TIA Portal V{tia_version}...") portal_instance = ts.open_portal( version=tia_version, portal_mode=ts.Enums.PortalMode.WithGraphicalUserInterface, ) print("Conectado a TIA Portal.") print(f"ID del proceso del Portal: {portal_instance.get_process_id()}") # 4. Open Project print(f"Abriendo proyecto: {os.path.basename(project_file)}...") print(f"Ruta completa del proyecto: {project_file}") try: project_object = portal_instance.open_project( project_file_path=project_file ) except Exception as open_ex: print(f"Error al abrir el proyecto: {open_ex}") print("Intentando obtener proyecto ya abierto...") project_object = None if project_object is None: print( "El proyecto podría estar ya abierto, intentando obtener el manejador..." ) project_object = portal_instance.get_project() if project_object is None: raise Exception("No se pudo abrir u obtener el proyecto especificado.") print("Proyecto abierto exitosamente.") # 5. Get PLCs plcs = project_object.get_plcs() if not plcs: print("No se encontraron dispositivos PLC en el proyecto.") else: print( f"Se encontraron {len(plcs)} PLC(s). Iniciando proceso de exportación..." ) # 6. Iterate and Export Data for each PLC for plc_device in plcs: export_plc_data(plc=plc_device, export_base_dir=export_dir) print("\nProceso de exportación completado.") except ValueError as val_ex: # Handle TIA Portal Openness exceptions (they come as ValueError) if "OpennessAccessException" in str(val_ex): print(f"\nError de TIA Portal Openness: {val_ex}") print("Posibles causas:") print("- El proyecto puede estar corrupto o en un formato incompatible") print( "- El proyecto puede requerir actualización a una versión más reciente" ) print( "- Verificar que la ruta del proyecto no contenga caracteres especiales" ) print("- Asegurarse de que TIA Portal esté instalado correctamente") else: print(f"\nError de valor: {val_ex}") traceback.print_exc() except FileNotFoundError: print(f"\nERROR: Archivo de proyecto no encontrado en {project_file}") except Exception as e: print(f"\nOcurrió un error inesperado: {e}") traceback.print_exc() finally: # 7. Cleanup if portal_instance: try: print("\nCerrando TIA Portal...") portal_instance.close_portal() print("TIA Portal cerrado.") except Exception as close_ex: print(f"Error durante la limpieza de TIA Portal: {close_ex}") print("\nScript finalizado.")