diff --git a/create_processor_files.py b/create_processor_files.py new file mode 100644 index 0000000..47cc5d4 --- /dev/null +++ b/create_processor_files.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +import os +import sys +import re +import argparse + +# Directorio donde se crearán los archivos de procesador +PROCESSORS_DIR = "processors" + +# Cabecera estándar para añadir a cada nuevo archivo +FILE_HEADER = """# -*- coding: utf-8 -*- + +# TODO: Import necessary functions from processor_utils +# from .processor_utils import get_scl_representation, format_variable_name, ... +# Or: import processors.processor_utils as utils + +# TODO: Define constants if needed (e.g., SCL_SUFFIX) or import them +SCL_SUFFIX = "_scl" + +# --- Function code starts --- +""" + +# Pie de página estándar con la función get_processor_info de plantilla +def get_file_footer(func_name): + # Intenta adivinar el type_name quitando 'process_' + # Necesitará ajustes manuales para casos como 'call', 'edge_detector', 'comparison', 'math' + type_name_guess = func_name.replace('process_', '') + return f""" +# --- Function code ends --- + +# --- Processor Information Function --- +def get_processor_info(): + \"\"\"Returns the type name and processing function for this module.\"\"\" + # TODO: Adjust the type_name if needed (e.g., for call, edge_detector, comparison, math) + # TODO: Return a list if this module handles multiple types (e.g., PBox/NBox, FC/FB) + type_name = "{type_name_guess}" # Basic guess + return {{'type_name': type_name, 'processor_func': {func_name}}} +""" + +def extract_and_create_processors(source_py_file): + """ + Extracts top-level functions starting with 'process_' from the source file + and creates individual processor files in the PROCESSORS_DIR. + """ + if not os.path.exists(source_py_file): + print(f"Error: Source file not found: '{source_py_file}'") + return + + print(f"Reading source file: '{source_py_file}'") + try: + with open(source_py_file, 'r', encoding='utf-8') as f: + lines = f.readlines() + except Exception as e: + print(f"Error reading source file: {e}") + return + + # Crear directorio de procesadores si no existe + os.makedirs(PROCESSORS_DIR, exist_ok=True) + print(f"Ensuring '{PROCESSORS_DIR}' directory exists.") + + current_func_name = None + current_func_lines = [] + processor_count = 0 + + print("Searching for processor functions (def process_...):") + + # Usamos una expresión regular para encontrar definiciones de función de nivel superior + # que empiecen por 'process_' + func_def_pattern = re.compile(r"^def\s+(process_\w+)\s*\(") + + for i, line in enumerate(lines): + match = func_def_pattern.match(line) + + if match: # Encontrada una nueva definición de función 'process_' + new_func_name = match.group(1) + + # Si estábamos procesando una función anterior, guardarla ahora + if current_func_name: + create_processor_file(current_func_name, current_func_lines) + processor_count += 1 + + # Empezar a recolectar para la nueva función + print(f" - Found: {new_func_name}") + current_func_name = new_func_name + current_func_lines = [line] # Empezar con la línea 'def' + + elif line.strip() == "" and not current_func_lines: + # Ignorar líneas en blanco antes de la primera función + continue + + elif current_func_name and not line.startswith(' '): + # Si estamos dentro de una función y encontramos una línea + # que NO empieza con espacio (y no es un 'def process_'), + # podría ser el fin de la función (o una definición de otra cosa). + # Por simplicidad, asumimos que marca el fin. Guardamos la actual. + # (Esto funciona si las 'def process_' están una tras otra o separadas + # por comentarios o definiciones de funciones NO 'process_') + create_processor_file(current_func_name, current_func_lines) + processor_count += 1 + current_func_name = None + current_func_lines = [] + + + elif current_func_name: + # Si estamos recolectando líneas para una función, añadir la línea actual + current_func_lines.append(line) + + # Guardar la última función encontrada después de salir del bucle + if current_func_name: + create_processor_file(current_func_name, current_func_lines) + processor_count += 1 + + if processor_count == 0: + print("\nWarning: No functions starting with 'process_' found at the top level.") + else: + print(f"\nFinished processing. Attempted to create/check {processor_count} processor files in '{PROCESSORS_DIR}'.") + +def create_processor_file(func_name, func_lines): + """Creates the individual processor file if it doesn't exist.""" + target_filename = f"{func_name}.py" + target_filepath = os.path.join(PROCESSORS_DIR, target_filename) + + if os.path.exists(target_filepath): + print(f" * Skipping: '{target_filename}' already exists.") + return + + print(f" * Creating: '{target_filename}'...") + try: + with open(target_filepath, 'w', encoding='utf-8') as f: + f.write(FILE_HEADER) + f.writelines(func_lines) # Escribir las líneas de la función + f.write(get_file_footer(func_name)) + except Exception as e: + print(f" Error writing file '{target_filename}': {e}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Extracts 'process_*' functions from a source Python file " + "and creates individual processor files." + ) + parser.add_argument( + "source_file", + default="x2_process.py", # Valor por defecto + nargs='?', # Hacerlo opcional para que use el default + help="Path to the source Python file (default: x2_process.py)" + ) + args = parser.parse_args() + + extract_and_create_processors(args.source_file) \ No newline at end of file diff --git a/processors/__init__.py b/processors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/processors/processor_utils.py b/processors/processor_utils.py new file mode 100644 index 0000000..fb140a0 --- /dev/null +++ b/processors/processor_utils.py @@ -0,0 +1,209 @@ +# -*- coding: utf-8 -*- +# /processors/processor_utils.py +# Procesador de utilidades para el procesamiento de archivos XML de Simatic +import re + +# --- Copia aquí las funciones auxiliares --- +# get_scl_representation, format_variable_name, +# generate_temp_var_name, get_target_scl_name +# Asegúrate de que no dependen de variables globales de x2_process.py +# (como 'data' o 'network_access_maps' - esas se pasarán como argumentos) + +# Ejemplo de una función (asegúrate de copiar todas las necesarias) +def format_variable_name(name): + """Limpia el nombre de la variable para SCL.""" + if not name: + return "_INVALID_NAME_" + if name.startswith('"') and name.endswith('"'): + return name + prefix = "" + if name.startswith("#"): + prefix = "#" + name = name[1:] + if name and name[0].isdigit(): + name = "_" + name + name = re.sub(r"[^a-zA-Z0-9_]", "_", name) + return prefix + name + +# --- Helper Functions --- +# (get_scl_representation, format_variable_name, generate_temp_var_name, get_target_scl_name - sin cambios) +def get_scl_representation(source_info, network_id, scl_map, access_map): + if not source_info: + return None + if isinstance(source_info, list): + scl_parts = [] + all_resolved = True + for sub_source in source_info: + sub_scl = get_scl_representation( + sub_source, network_id, scl_map, access_map + ) + if sub_scl is None: + all_resolved = False + break + if ( + sub_scl in ["TRUE", "FALSE"] + or (sub_scl.startswith('"') and sub_scl.endswith('"')) + or sub_scl.isdigit() + or (sub_scl.startswith("(") and sub_scl.endswith(")")) + ): + scl_parts.append(sub_scl) + else: + scl_parts.append(f"({sub_scl})") + return ( + " OR ".join(scl_parts) + if len(scl_parts) > 1 + else (scl_parts[0] if scl_parts else "FALSE") if all_resolved else None + ) + source_type = source_info.get("type") + if source_type == "powerrail": + return "TRUE" + elif source_type == "variable": + name = source_info.get("name") + # Asegurar que los nombres de variables se formatean correctamente aquí también + return ( + format_variable_name(name) + if name + else f"_ERR_VAR_NO_NAME_{source_info.get('uid')}_" + ) + elif source_type == "constant": + dtype = str(source_info.get("datatype", "")).upper() + value = source_info.get("value") + try: + if dtype == "BOOL": + return str(value).upper() + elif dtype in [ + "INT", + "DINT", + "SINT", + "USINT", + "UINT", + "UDINT", + "LINT", + "ULINT", + "WORD", + "DWORD", + "LWORD", + "BYTE", + ]: + return str(value) + elif dtype in ["REAL", "LREAL"]: + s_val = str(value) + return s_val if "." in s_val or "e" in s_val.lower() else s_val + ".0" + elif dtype == "STRING": + # Escapar comillas simples dentro del string si es necesario + str_val = str(value).replace("'", "''") + return f"'{str_val}'" + elif dtype == "TYPEDCONSTANT": + # Podría necesitar formateo específico basado en el tipo real + return str(value) + else: + # Otros tipos (TIME, DATE, etc.) - devolver como string por ahora + str_val = str(value).replace("'", "''") + return f"'{str_val}'" + except Exception as e: + print(f"Advertencia: Error formateando constante {source_info}: {e}") + return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_" + elif source_type == "connection": + map_key = ( + network_id, + source_info.get("source_instruction_uid"), + source_info.get("source_pin"), + ) + return scl_map.get(map_key) + elif source_type == "unknown_source": + print( + f"Advertencia: Refiriendo a fuente desconocida UID: {source_info.get('uid')}" + ) + return f"_ERR_UNKNOWN_SRC_{source_info.get('uid')}_" + else: + print(f"Advertencia: Tipo de fuente desconocido: {source_info}") + return f"_ERR_INVALID_SRC_TYPE_" + +def format_variable_name(name): + """Limpia el nombre de la variable para SCL.""" + if not name: + return "_INVALID_NAME_" + + # Si ya está entre comillas dobles, asumimos que es un nombre complejo (ej. "DB"."Variable") + # y lo devolvemos tal cual para SCL. + if name.startswith('"') and name.endswith('"'): + # Podríamos añadir validación extra aquí si fuera necesario + return name + + # Si no tiene comillas, es un nombre simple (ej. Tag_1, #tempVar) + # Reemplazar caracteres no válidos (excepto '_') por '_' + # Permitir '#' al inicio para variables temporales + prefix = "" + if name.startswith("#"): + prefix = "#" + name = name[1:] + + # Permitir letras, números y guiones bajos. Reemplazar el resto. + # Asegurarse de que no empiece con número (después del # si existe) + if name and name[0].isdigit(): + name = "_" + name + # Reemplazar caracteres no válidos + name = re.sub(r"[^a-zA-Z0-9_]", "_", name) + + return prefix + name + +def generate_temp_var_name(network_id, instr_uid, pin_name): + net_id_clean = str(network_id).replace("-", "_") + instr_uid_clean = str(instr_uid).replace("-", "_") + pin_name_clean = str(pin_name).replace("-", "_").lower() + # Usar # para variables temporales SCL estándar + return f"#_temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}" + +def get_target_scl_name(instruction, output_pin_name, network_id, default_to_temp=True): + instr_uid = instruction["instruction_uid"] + output_pin_data = instruction["outputs"].get(output_pin_name) + target_scl = None + if ( + output_pin_data + and isinstance(output_pin_data, list) + and len(output_pin_data) == 1 + ): + dest_access = output_pin_data[0] + if dest_access.get("type") == "variable": + target_scl = dest_access.get("name") + if target_scl: + target_scl = format_variable_name(target_scl) # Formatear nombre + else: + print( + f"Error: Var destino {instr_uid}.{output_pin_name} sin nombre (UID: {dest_access.get('uid')}). {'Usando temp.' if default_to_temp else 'Ignorando.'}" + ) + target_scl = ( + generate_temp_var_name(network_id, instr_uid, output_pin_name) + if default_to_temp + else None + ) + elif dest_access.get("type") == "constant": + print( + f"Advertencia: Instr {instr_uid} escribe en const UID {dest_access.get('uid')}. {'Usando temp.' if default_to_temp else 'Ignorando.'}" + ) + target_scl = ( + generate_temp_var_name(network_id, instr_uid, output_pin_name) + if default_to_temp + else None + ) + else: + print( + f"Advertencia: Destino {instr_uid}.{output_pin_name} no es var/const: {dest_access.get('type')}. {'Usando temp.' if default_to_temp else 'Ignorando.'}" + ) + target_scl = ( + generate_temp_var_name(network_id, instr_uid, output_pin_name) + if default_to_temp + else None + ) + elif default_to_temp: + target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) + + # Si target_scl sigue siendo None y no se debe usar temp, devolver None + if target_scl is None and not default_to_temp: + return None + + # Si target_scl es None pero sí se permite temp, generar uno ahora + if target_scl is None and default_to_temp: + target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) + + return target_scl