primera etapa de adaptacion a carpeta process

This commit is contained in:
Miguel 2025-04-19 13:21:14 +02:00
parent 27442c88fd
commit e3b4d413c9
3 changed files with 359 additions and 0 deletions

150
create_processor_files.py Normal file
View File

@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-
import os
import sys
import re
import argparse
# Directorio donde se crearán los archivos de procesador
PROCESSORS_DIR = "processors"
# Cabecera estándar para añadir a cada nuevo archivo
FILE_HEADER = """# -*- coding: utf-8 -*-
# TODO: Import necessary functions from processor_utils
# from .processor_utils import get_scl_representation, format_variable_name, ...
# Or: import processors.processor_utils as utils
# TODO: Define constants if needed (e.g., SCL_SUFFIX) or import them
SCL_SUFFIX = "_scl"
# --- Function code starts ---
"""
# Pie de página estándar con la función get_processor_info de plantilla
def get_file_footer(func_name):
# Intenta adivinar el type_name quitando 'process_'
# Necesitará ajustes manuales para casos como 'call', 'edge_detector', 'comparison', 'math'
type_name_guess = func_name.replace('process_', '')
return f"""
# --- Function code ends ---
# --- Processor Information Function ---
def get_processor_info():
\"\"\"Returns the type name and processing function for this module.\"\"\"
# TODO: Adjust the type_name if needed (e.g., for call, edge_detector, comparison, math)
# TODO: Return a list if this module handles multiple types (e.g., PBox/NBox, FC/FB)
type_name = "{type_name_guess}" # Basic guess
return {{'type_name': type_name, 'processor_func': {func_name}}}
"""
def extract_and_create_processors(source_py_file):
"""
Extracts top-level functions starting with 'process_' from the source file
and creates individual processor files in the PROCESSORS_DIR.
"""
if not os.path.exists(source_py_file):
print(f"Error: Source file not found: '{source_py_file}'")
return
print(f"Reading source file: '{source_py_file}'")
try:
with open(source_py_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
except Exception as e:
print(f"Error reading source file: {e}")
return
# Crear directorio de procesadores si no existe
os.makedirs(PROCESSORS_DIR, exist_ok=True)
print(f"Ensuring '{PROCESSORS_DIR}' directory exists.")
current_func_name = None
current_func_lines = []
processor_count = 0
print("Searching for processor functions (def process_...):")
# Usamos una expresión regular para encontrar definiciones de función de nivel superior
# que empiecen por 'process_'
func_def_pattern = re.compile(r"^def\s+(process_\w+)\s*\(")
for i, line in enumerate(lines):
match = func_def_pattern.match(line)
if match: # Encontrada una nueva definición de función 'process_'
new_func_name = match.group(1)
# Si estábamos procesando una función anterior, guardarla ahora
if current_func_name:
create_processor_file(current_func_name, current_func_lines)
processor_count += 1
# Empezar a recolectar para la nueva función
print(f" - Found: {new_func_name}")
current_func_name = new_func_name
current_func_lines = [line] # Empezar con la línea 'def'
elif line.strip() == "" and not current_func_lines:
# Ignorar líneas en blanco antes de la primera función
continue
elif current_func_name and not line.startswith(' '):
# Si estamos dentro de una función y encontramos una línea
# que NO empieza con espacio (y no es un 'def process_'),
# podría ser el fin de la función (o una definición de otra cosa).
# Por simplicidad, asumimos que marca el fin. Guardamos la actual.
# (Esto funciona si las 'def process_' están una tras otra o separadas
# por comentarios o definiciones de funciones NO 'process_')
create_processor_file(current_func_name, current_func_lines)
processor_count += 1
current_func_name = None
current_func_lines = []
elif current_func_name:
# Si estamos recolectando líneas para una función, añadir la línea actual
current_func_lines.append(line)
# Guardar la última función encontrada después de salir del bucle
if current_func_name:
create_processor_file(current_func_name, current_func_lines)
processor_count += 1
if processor_count == 0:
print("\nWarning: No functions starting with 'process_' found at the top level.")
else:
print(f"\nFinished processing. Attempted to create/check {processor_count} processor files in '{PROCESSORS_DIR}'.")
def create_processor_file(func_name, func_lines):
"""Creates the individual processor file if it doesn't exist."""
target_filename = f"{func_name}.py"
target_filepath = os.path.join(PROCESSORS_DIR, target_filename)
if os.path.exists(target_filepath):
print(f" * Skipping: '{target_filename}' already exists.")
return
print(f" * Creating: '{target_filename}'...")
try:
with open(target_filepath, 'w', encoding='utf-8') as f:
f.write(FILE_HEADER)
f.writelines(func_lines) # Escribir las líneas de la función
f.write(get_file_footer(func_name))
except Exception as e:
print(f" Error writing file '{target_filename}': {e}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Extracts 'process_*' functions from a source Python file "
"and creates individual processor files."
)
parser.add_argument(
"source_file",
default="x2_process.py", # Valor por defecto
nargs='?', # Hacerlo opcional para que use el default
help="Path to the source Python file (default: x2_process.py)"
)
args = parser.parse_args()
extract_and_create_processors(args.source_file)

0
processors/__init__.py Normal file
View File

View File

@ -0,0 +1,209 @@
# -*- coding: utf-8 -*-
# /processors/processor_utils.py
# Procesador de utilidades para el procesamiento de archivos XML de Simatic
import re
# --- Copia aquí las funciones auxiliares ---
# get_scl_representation, format_variable_name,
# generate_temp_var_name, get_target_scl_name
# Asegúrate de que no dependen de variables globales de x2_process.py
# (como 'data' o 'network_access_maps' - esas se pasarán como argumentos)
# Ejemplo de una función (asegúrate de copiar todas las necesarias)
def format_variable_name(name):
"""Limpia el nombre de la variable para SCL."""
if not name:
return "_INVALID_NAME_"
if name.startswith('"') and name.endswith('"'):
return name
prefix = ""
if name.startswith("#"):
prefix = "#"
name = name[1:]
if name and name[0].isdigit():
name = "_" + name
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
return prefix + name
# --- Helper Functions ---
# (get_scl_representation, format_variable_name, generate_temp_var_name, get_target_scl_name - sin cambios)
def get_scl_representation(source_info, network_id, scl_map, access_map):
if not source_info:
return None
if isinstance(source_info, list):
scl_parts = []
all_resolved = True
for sub_source in source_info:
sub_scl = get_scl_representation(
sub_source, network_id, scl_map, access_map
)
if sub_scl is None:
all_resolved = False
break
if (
sub_scl in ["TRUE", "FALSE"]
or (sub_scl.startswith('"') and sub_scl.endswith('"'))
or sub_scl.isdigit()
or (sub_scl.startswith("(") and sub_scl.endswith(")"))
):
scl_parts.append(sub_scl)
else:
scl_parts.append(f"({sub_scl})")
return (
" OR ".join(scl_parts)
if len(scl_parts) > 1
else (scl_parts[0] if scl_parts else "FALSE") if all_resolved else None
)
source_type = source_info.get("type")
if source_type == "powerrail":
return "TRUE"
elif source_type == "variable":
name = source_info.get("name")
# Asegurar que los nombres de variables se formatean correctamente aquí también
return (
format_variable_name(name)
if name
else f"_ERR_VAR_NO_NAME_{source_info.get('uid')}_"
)
elif source_type == "constant":
dtype = str(source_info.get("datatype", "")).upper()
value = source_info.get("value")
try:
if dtype == "BOOL":
return str(value).upper()
elif dtype in [
"INT",
"DINT",
"SINT",
"USINT",
"UINT",
"UDINT",
"LINT",
"ULINT",
"WORD",
"DWORD",
"LWORD",
"BYTE",
]:
return str(value)
elif dtype in ["REAL", "LREAL"]:
s_val = str(value)
return s_val if "." in s_val or "e" in s_val.lower() else s_val + ".0"
elif dtype == "STRING":
# Escapar comillas simples dentro del string si es necesario
str_val = str(value).replace("'", "''")
return f"'{str_val}'"
elif dtype == "TYPEDCONSTANT":
# Podría necesitar formateo específico basado en el tipo real
return str(value)
else:
# Otros tipos (TIME, DATE, etc.) - devolver como string por ahora
str_val = str(value).replace("'", "''")
return f"'{str_val}'"
except Exception as e:
print(f"Advertencia: Error formateando constante {source_info}: {e}")
return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_"
elif source_type == "connection":
map_key = (
network_id,
source_info.get("source_instruction_uid"),
source_info.get("source_pin"),
)
return scl_map.get(map_key)
elif source_type == "unknown_source":
print(
f"Advertencia: Refiriendo a fuente desconocida UID: {source_info.get('uid')}"
)
return f"_ERR_UNKNOWN_SRC_{source_info.get('uid')}_"
else:
print(f"Advertencia: Tipo de fuente desconocido: {source_info}")
return f"_ERR_INVALID_SRC_TYPE_"
def format_variable_name(name):
"""Limpia el nombre de la variable para SCL."""
if not name:
return "_INVALID_NAME_"
# Si ya está entre comillas dobles, asumimos que es un nombre complejo (ej. "DB"."Variable")
# y lo devolvemos tal cual para SCL.
if name.startswith('"') and name.endswith('"'):
# Podríamos añadir validación extra aquí si fuera necesario
return name
# Si no tiene comillas, es un nombre simple (ej. Tag_1, #tempVar)
# Reemplazar caracteres no válidos (excepto '_') por '_'
# Permitir '#' al inicio para variables temporales
prefix = ""
if name.startswith("#"):
prefix = "#"
name = name[1:]
# Permitir letras, números y guiones bajos. Reemplazar el resto.
# Asegurarse de que no empiece con número (después del # si existe)
if name and name[0].isdigit():
name = "_" + name
# Reemplazar caracteres no válidos
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
return prefix + name
def generate_temp_var_name(network_id, instr_uid, pin_name):
net_id_clean = str(network_id).replace("-", "_")
instr_uid_clean = str(instr_uid).replace("-", "_")
pin_name_clean = str(pin_name).replace("-", "_").lower()
# Usar # para variables temporales SCL estándar
return f"#_temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}"
def get_target_scl_name(instruction, output_pin_name, network_id, default_to_temp=True):
instr_uid = instruction["instruction_uid"]
output_pin_data = instruction["outputs"].get(output_pin_name)
target_scl = None
if (
output_pin_data
and isinstance(output_pin_data, list)
and len(output_pin_data) == 1
):
dest_access = output_pin_data[0]
if dest_access.get("type") == "variable":
target_scl = dest_access.get("name")
if target_scl:
target_scl = format_variable_name(target_scl) # Formatear nombre
else:
print(
f"Error: Var destino {instr_uid}.{output_pin_name} sin nombre (UID: {dest_access.get('uid')}). {'Usando temp.' if default_to_temp else 'Ignorando.'}"
)
target_scl = (
generate_temp_var_name(network_id, instr_uid, output_pin_name)
if default_to_temp
else None
)
elif dest_access.get("type") == "constant":
print(
f"Advertencia: Instr {instr_uid} escribe en const UID {dest_access.get('uid')}. {'Usando temp.' if default_to_temp else 'Ignorando.'}"
)
target_scl = (
generate_temp_var_name(network_id, instr_uid, output_pin_name)
if default_to_temp
else None
)
else:
print(
f"Advertencia: Destino {instr_uid}.{output_pin_name} no es var/const: {dest_access.get('type')}. {'Usando temp.' if default_to_temp else 'Ignorando.'}"
)
target_scl = (
generate_temp_var_name(network_id, instr_uid, output_pin_name)
if default_to_temp
else None
)
elif default_to_temp:
target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name)
# Si target_scl sigue siendo None y no se debe usar temp, devolver None
if target_scl is None and not default_to_temp:
return None
# Si target_scl es None pero sí se permite temp, generar uno ahora
if target_scl is None and default_to_temp:
target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name)
return target_scl