From f090305574764bc9b5af753721c01c064ce128c1 Mon Sep 17 00:00:00 2001 From: Miguel Date: Fri, 18 Apr 2025 15:56:40 +0200 Subject: [PATCH] Con PBox y O creados --- ...Run_ProdTime_simplified_scl_processed.json | 10 +- process.py | 280 +++++++++++++----- 2 files changed, 210 insertions(+), 80 deletions(-) diff --git a/BlenderRun_ProdTime_simplified_scl_processed.json b/BlenderRun_ProdTime_simplified_scl_processed.json index 2ecd4e2..7457c46 100644 --- a/BlenderRun_ProdTime_simplified_scl_processed.json +++ b/BlenderRun_ProdTime_simplified_scl_processed.json @@ -953,7 +953,7 @@ }, { "instruction_uid": "41", - "type": "PBox", + "type": "PBox_scl", "inputs": { "bit": { "uid": "33", @@ -962,11 +962,12 @@ "name": "\"M19012\"" } }, - "outputs": {} + "outputs": {}, + "scl": "// Edge detection PBox 41 -> P_TRIG_FUNC(CLK := ((\"MOD60\" = DINT#0 AND \"Procedure_Variables\".\"Blender_Run\".\"Running\") AND \"CLK_1.0S\"), M := \"M19012\") (CLK source inferred)" }, { "instruction_uid": "42", - "type": "Coil", + "type": "Coil_scl", "inputs": { "in": { "type": "connection", @@ -981,7 +982,8 @@ "name": "\"mRunMin\"" } }, - "outputs": {} + "outputs": {}, + "scl": "\"mRunMin\" := P_TRIG_FUNC(CLK := ((\"MOD60\" = DINT#0 AND \"Procedure_Variables\".\"Blender_Run\".\"Running\") AND \"CLK_1.0S\"), M := \"M19012\");" } ] }, diff --git a/process.py b/process.py index 28ad502..57cec70 100644 --- a/process.py +++ b/process.py @@ -25,6 +25,7 @@ def get_scl_representation(source_info, network_id, scl_map, access_map): sub_scl = get_scl_representation(sub_source, network_id, scl_map, access_map) if sub_scl is None: all_resolved = False + # print(f"DEBUG: Dependencia no resuelta DENTRO de rama OR: {sub_source}") break # Evitar paréntesis innecesarios si ya es una expresión simple o contenida if sub_scl in ["TRUE", "FALSE"] or (sub_scl.startswith('"') and sub_scl.endswith('"')) or sub_scl.isdigit() or (sub_scl.startswith('(') and sub_scl.endswith(')')): @@ -32,7 +33,9 @@ def get_scl_representation(source_info, network_id, scl_map, access_map): else: scl_parts.append(f"({sub_scl})") # Añadir paréntesis por precaución de precedencia if all_resolved: - return " OR ".join(scl_parts) if len(scl_parts) > 1 else (scl_parts[0] if scl_parts else None) + or_expr = " OR ".join(scl_parts) if len(scl_parts) > 1 else (scl_parts[0] if scl_parts else "FALSE") # Default a FALSE si lista vacía? + # print(f"DEBUG: Rama OR resuelta a: {or_expr}") + return or_expr else: return None # Dependencia en la rama no resuelta @@ -43,7 +46,9 @@ def get_scl_representation(source_info, network_id, scl_map, access_map): return "TRUE" elif source_type == 'variable': - return source_info.get('name', f"_ERR_VAR_{source_info.get('uid')}_") + name = source_info.get('name') + return name if name else f"_ERR_VAR_NO_NAME_{source_info.get('uid')}_" + elif source_type == 'constant': dtype = str(source_info.get('datatype', '')).upper() @@ -53,28 +58,38 @@ def get_scl_representation(source_info, network_id, scl_map, access_map): elif dtype in ['INT', 'DINT', 'SINT', 'USINT', 'UINT', 'UDINT', 'LINT', 'ULINT', 'WORD', 'DWORD', 'LWORD', 'BYTE']: return str(value) elif dtype in ['REAL', 'LREAL']: s_val = str(value) - return s_val if '.' in s_val or 'e' in s_val.lower() else s_val + ".0" + # Asegurar formato decimal para SCL, incluso para enteros como 1.0 + if '.' not in s_val and 'e' not in s_val.lower(): + s_val += ".0" + return s_val elif dtype == 'STRING': return f"'{str(value)}'" elif dtype == 'TYPEDCONSTANT': return str(value) # Ej: DINT#60 - else: return f"'{str(value)}'" + else: return f"'{str(value)}'" # Otros tipos como string except Exception as e: print(f"Advertencia: Error formateando constante {source_info}: {e}") return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_" elif source_type == 'connection': map_key = (network_id, source_info.get('source_instruction_uid'), source_info.get('source_pin')) - return scl_map.get(map_key) # Devuelve valor o None si no existe + # print(f"DEBUG: Buscando en scl_map por {map_key}") + result = scl_map.get(map_key) + # if result is not None: print(f"DEBUG: Encontrado: {result}") + # else: print(f"DEBUG: No encontrado.") + return result # Devuelve valor o None si no existe + + elif source_type == 'unknown_source': + print(f"Advertencia: Refiriendo a fuente desconocida UID: {source_info.get('uid')}") + return f"_ERR_UNKNOWN_SRC_{source_info.get('uid')}_" else: print(f"Advertencia: Tipo de fuente desconocido o inválido: {source_info}") - return f"_ERR_UNKNOWN_SOURCE_" + return f"_ERR_INVALID_SRC_TYPE_" def generate_temp_var_name(network_id, instr_uid, pin_name): """Genera un nombre único para una variable temporal SCL.""" net_id_clean = str(network_id).replace('-', '_') instr_uid_clean = str(instr_uid).replace('-', '_') pin_name_clean = str(pin_name).replace('-', '_').lower() - # Evitar nombres que empiecen con número si network_id es numérico prefix = "_" if str(net_id_clean)[0].isdigit() else "" return f"{prefix}temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}" @@ -88,13 +103,17 @@ def get_target_scl_name(instruction, output_pin_name, network_id, default_to_tem dest_access = output_pin_data[0] if dest_access.get('type') == 'variable': target_scl = dest_access.get('name') + if not target_scl: # Si el nombre es None o vacío + print(f"Error: Variable de destino para {instr_uid}.{output_pin_name} no tiene nombre (UID: {dest_access.get('uid')}). {'Usando temporal.' if default_to_temp else 'Ignorando.'}") + target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) if default_to_temp else None elif dest_access.get('type') == 'constant': print(f"Advertencia: Instrucción {instr_uid} intenta escribir en constante UID {dest_access.get('uid')}. {'Usando temporal.' if default_to_temp else 'Ignorando.'}") - if default_to_temp: target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) + target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) if default_to_temp else None else: - print(f"Advertencia: Destino de {instr_uid}.{output_pin_name} no es variable: {dest_access.get('type')}. {'Usando temporal.' if default_to_temp else 'Ignorando.'}") - if default_to_temp: target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) + print(f"Advertencia: Destino de {instr_uid}.{output_pin_name} no es variable ni constante: {dest_access.get('type')}. {'Usando temporal.' if default_to_temp else 'Ignorando.'}") + target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) if default_to_temp else None elif default_to_temp: + # print(f"DEBUG: Usando temporal para {instr_uid}.{output_pin_name} (no hay destino único o no se requiere destino directo)") target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) return target_scl @@ -399,6 +418,127 @@ def process_move(instruction, network_id, scl_map, access_map): # print(f"INFO: MOVE UID: {instr_uid} procesado. SCL: {scl_final.splitlines()[0]}...") return True +def process_pbox(instruction, network_id, scl_map, access_map, network_logic_list): + """ + Traduce PBox a SCL. Asume P_TRIG (flanco positivo) si detecta uso típico, + si no, pasa el valor del bit directamente. + """ + instr_uid = instruction['instruction_uid'] + instr_type = instruction['type'] + if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False + + print(f"DEBUG: Intentando procesar PBOX - UID: {instr_uid} en Red: {network_id}") + + # 1. Obtener el bit de memoria (siempre presente en PBox para flancos) + mem_bit_input = instruction['inputs'].get('bit') + mem_bit_scl = get_scl_representation(mem_bit_input, network_id, scl_map, access_map) + if mem_bit_scl is None: + print(f"DEBUG: Dependencia no resuelta para PBOX UID: {instr_uid} (bit={mem_bit_scl})") + return False + # Validar que el bit de memoria sea una variable + if not (mem_bit_input and mem_bit_input.get('type') == 'variable'): + print(f"Error: Entrada 'bit' de PBOX UID {instr_uid} no es una variable: {mem_bit_input}") + instruction['scl'] = f"// ERROR: PBox {instr_uid} entrada 'bit' no es variable" + instruction['type'] += "_error" + return True + + # 2. Determinar si es un flanco P_TRIG (heurística basada en patrón común) + # Patrón: La salida del PBox va a un Coil y el PBox está precedido por lógica booleana. + is_likely_p_trig = False + consuming_coil_info = None + # Buscar Coil que consume la salida 'out' de este PBox + consuming_coil_uid = None + for potential_consumer in network_logic_list: + consumer_inputs = potential_consumer.get('inputs', {}) + coil_input_signal = consumer_inputs.get('in') # Bobinas usan 'in' + if isinstance(coil_input_signal, dict) and \ + coil_input_signal.get('type') == 'connection' and \ + coil_input_signal.get('source_instruction_uid') == instr_uid and \ + coil_input_signal.get('source_pin') == 'out': + if potential_consumer.get('type', '').startswith('Coil'): + consuming_coil_uid = potential_consumer['instruction_uid'] + # print(f"DEBUG: PBox {instr_uid} alimenta Coil {consuming_coil_uid}") + is_likely_p_trig = True # Fuerte indicio de P_TRIG + break + # else: # Podría alimentar otra cosa, ¿quizás no es P_TRIG? + # print(f"DEBUG: Salida de PBox {instr_uid} no alimenta un Coil directamente.") + # pass + + # 3. Encontrar la señal CLK implícita (Heurística: buscar RLO hacia atrás) + rlo_scl = None + if is_likely_p_trig: + clk_source_found = False + current_instr_index = -1 + for i, instr in enumerate(network_logic_list): + if instr['instruction_uid'] == instr_uid: + current_instr_index = i + break + + if current_instr_index != -1: + # Buscar hacia atrás desde ANTES del PBox + for i in range(current_instr_index - 1, -1, -1): + prev_instr = network_logic_list[i] + prev_instr_uid = prev_instr['instruction_uid'] + # Usar tipo original para la búsqueda + prev_instr_type = prev_instr.get('type', '').replace(SCL_SUFFIX, '').replace('_error', '') + + # Instrucciones que generan el RLO relevante + if prev_instr_type in ['Contact', 'Eq', 'O', 'PBox', 'And', 'Xor', 'Ne', 'Gt', 'Lt', 'Ge', 'Le']: + map_key_prev_out = (network_id, prev_instr_uid, 'out') + potential_clk_scl = scl_map.get(map_key_prev_out) + if potential_clk_scl is not None: + rlo_scl = potential_clk_scl + clk_source_found = True + # print(f"DEBUG: Fuente CLK para PBox {instr_uid} inferida de: {prev_instr_type} UID {prev_instr_uid} -> {rlo_scl}") + break + elif prev_instr_type in ['Move', 'Add', 'Convert', 'Mod']: + # Si encontramos un bloque funcional, el RLO podría venir de su ENO + map_key_prev_eno = (network_id, prev_instr_uid, 'eno') + potential_clk_scl = scl_map.get(map_key_prev_eno) + if potential_clk_scl is not None: + rlo_scl = potential_clk_scl + clk_source_found = True + # print(f"DEBUG: Fuente CLK para PBox {instr_uid} inferida de ENO de: {prev_instr_type} UID {prev_instr_uid} -> {rlo_scl}") + break + # Si no tiene ENO resuelto, seguimos buscando atrás + + if not clk_source_found: + # Podría estar conectado directamente a powerrail si es el inicio de la lógica + # O la heurística falló. Devolver error o asumir TRUE? Devolver error es más seguro. + print(f"Error: No se pudo inferir la fuente CLK para PBOX UID {instr_uid} (probable P_TRIG).") + instruction['scl'] = f"// ERROR: PBox {instr_uid} sin fuente CLK implícita clara" + instruction['type'] += "_error" + return True # Marcado como error + + # Verificar si el RLO inferido se resolvió + if rlo_scl is None: + print(f"DEBUG: Dependencia CLK (inferida) no resuelta para PBOX UID: {instr_uid}") + return False + + # 4. Generar SCL + scl_comment = "" + if is_likely_p_trig: + # Usar función hipotética P_TRIG_FUNC + clk_signal_formatted = f"({rlo_scl})" if ' ' in rlo_scl else rlo_scl + result_scl = f"P_TRIG_FUNC(CLK := {clk_signal_formatted}, M := {mem_bit_scl})" + scl_comment = f"// Edge detection PBox {instr_uid} -> {result_scl} (CLK source inferred)" + else: + # Si no parece P_TRIG, simplemente pasar el valor del bit de memoria? + # O podría ser un N_TRIG (flanco negativo)? O sólo lectura de M? + # Por seguridad, si no es el patrón P_TRIG->Coil, pasamos el bit. + print(f"Advertencia: PBox UID {instr_uid} no coincide con patrón P_TRIG->Coil. Pasando valor de {mem_bit_scl} directamente.") + result_scl = mem_bit_scl + scl_comment = f"// PBox {instr_uid} - Passing value from bit: {result_scl}" + + # 5. Poner el resultado en scl_map para la salida 'out' + map_key_out = (network_id, instr_uid, 'out') + scl_map[map_key_out] = result_scl + + # 6. Actualizar JSON + instruction['scl'] = scl_comment + instruction['type'] = instr_type + SCL_SUFFIX + return True + def process_o(instruction, network_id, scl_map, access_map): """Traduce O (OR lógico) a una expresión booleana SCL.""" instr_uid = instruction['instruction_uid'] @@ -407,7 +547,6 @@ def process_o(instruction, network_id, scl_map, access_map): # print(f"DEBUG: Intentando procesar O - UID: {instr_uid} en Red: {network_id}") - # Obtener todas las entradas (in1, in2, in3...) input_pins = [pin for pin in instruction['inputs'] if pin.startswith('in')] if not input_pins: print(f"Error: Instrucción O UID {instr_uid} no tiene pines de entrada 'inX'.") @@ -417,14 +556,14 @@ def process_o(instruction, network_id, scl_map, access_map): scl_parts = [] all_resolved = True - for pin in sorted(input_pins): # Ordenar para consistencia + for pin in sorted(input_pins): in_scl = get_scl_representation(instruction['inputs'][pin], network_id, scl_map, access_map) if in_scl is None: all_resolved = False # print(f"DEBUG: Dependencia no resuelta para O UID: {instr_uid} (pin {pin})") break - # Poner entre paréntesis si es complejo - term = f"({in_scl})" if ' ' in in_scl else in_scl + # Poner entre paréntesis si es complejo para seguridad en OR + term = f"({in_scl})" if (' ' in in_scl or 'AND' in in_scl) and not (in_scl.startswith('(') and in_scl.endswith(')')) else in_scl scl_parts.append(term) if not all_resolved: @@ -439,56 +578,12 @@ def process_o(instruction, network_id, scl_map, access_map): instruction['type'] = instr_type + SCL_SUFFIX return True -def process_pbox(instruction, network_id, scl_map, access_map): - """Traduce PBox (lectura de bit, posible flanco) a SCL.""" - instr_uid = instruction['instruction_uid'] - instr_type = instruction['type'] - if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False - - # print(f"DEBUG: Intentando procesar PBOX - UID: {instr_uid} en Red: {network_id}") - - # La entrada relevante es 'bit' - bit_scl = get_scl_representation(instruction['inputs'].get('bit'), network_id, scl_map, access_map) - - if bit_scl is None: - # print(f"DEBUG: Dependencia no resuelta para PBOX UID: {instr_uid} (bit={bit_scl})") - return False - - # Lógica específica de PBox: - # - Si solo lee un bit, la salida es ese bit. - # - Si detecta flanco (P, N), necesita lógica adicional (variable estática) - # TODO: Detectar si es detección de flanco (requiere más info del XML o nombre 'FP'/'FN') - is_edge_detection = False # Asumir que no por ahora - edge_type = '' # 'P' o 'N' - - result_scl = bit_scl # Por defecto, la salida es el bit de entrada - - if is_edge_detection: - # Necesita una variable estática (en TEMP o STAT) para guardar el estado anterior - static_var_name = f"stat_{network_id}_{instr_uid}_Flank" - if edge_type == 'P': - result_scl = f"({bit_scl} AND NOT {static_var_name})" - # La actualización de la variable estática ocurriría al final del ciclo o red - # scl_update = f"{static_var_name} := {bit_scl};" - print(f"Advertencia: Detección de Flanco P (PBox {instr_uid}) requiere manejo de variable estática '{static_var_name}' (no implementado completamente).") - elif edge_type == 'N': - result_scl = f"(NOT {bit_scl} AND {static_var_name})" - # scl_update = f"{static_var_name} := {bit_scl};" - print(f"Advertencia: Detección de Flanco N (PBox {instr_uid}) requiere manejo de variable estática '{static_var_name}' (no implementado completamente).") - else: - result_scl = bit_scl # Volver al caso simple si no se reconoce el tipo - - map_key_out = (network_id, instr_uid, 'out') - scl_map[map_key_out] = result_scl - - instruction['scl'] = f"// PBox {instr_uid} Output: {result_scl}" + (" (Edge detection logic simplified)" if is_edge_detection else "") - instruction['type'] = instr_type + SCL_SUFFIX - return True # --- Bucle Principal de Procesamiento --- def process_json_to_scl(json_filepath): + """Lee el JSON, aplica los procesadores iterativamente y guarda el resultado.""" if not os.path.exists(json_filepath): print(f"Error: Archivo JSON no encontrado en {json_filepath}") return @@ -496,6 +591,8 @@ def process_json_to_scl(json_filepath): print(f"Cargando JSON desde: {json_filepath}") try: with open(json_filepath, 'r', encoding='utf-8') as f: + # Guardar data globalmente o pasarla a process_pbox si es necesario + global data # Hacer data accesible globalmente (o pasar como argumento) data = json.load(f) except Exception as e: print(f"Error al cargar o parsear JSON: {e}") @@ -508,11 +605,14 @@ def process_json_to_scl(json_filepath): net_id = network['id'] current_access_map = {} for instr in network.get('logic', []): + # (Código para reconstruir access_map - sin cambios) + # Chequear inputs for pin, source in instr.get('inputs', {}).items(): sources_to_check = source if isinstance(source, list) else ([source] if isinstance(source, dict) else []) for src in sources_to_check: if isinstance(src, dict) and src.get('uid') and src.get('scope') and src.get('type') in ['variable', 'constant']: current_access_map[src['uid']] = src + # Chequear outputs for pin, dest_list in instr.get('outputs', {}).items(): if isinstance(dest_list, list): for dest in dest_list: @@ -524,24 +624,22 @@ def process_json_to_scl(json_filepath): max_passes = 20 passes = 0 - # Lista de procesadores actualizada + # Lista de procesadores con orden corregido processors = [ - # Instrucciones que generan valores base o condiciones process_convert, process_mod, process_eq, - process_pbox, # Procesa lectura de bit - # Instrucciones que combinan lógica booleana process_contact, - process_o, # Procesa OR - # Instrucciones que usan resultados y condiciones + process_o, + process_pbox, process_add, process_move, process_coil, - # Añadir más procesadores aquí (Sub, Mul, Div, GT, LT, temporizadores, contadores...) ] - + # Crear el mapa para búsqueda rápida processor_map = {func.__name__.split('_')[1].capitalize(): func for func in processors} + # !!! POTENCIAL PROBLEMA 1: La clave aquí sigue siendo sensible a mayúsculas !!! + # Ejemplo: "Pbox" vs "PBox" print("\n--- Iniciando Bucle de Procesamiento Iterativo ---") while passes < max_passes: @@ -549,35 +647,66 @@ def process_json_to_scl(json_filepath): made_change_in_pass = False print(f"\n--- Pase {passes} ---") + # Iterar sobre las redes for network in data.get('networks', []): network_id = network['id'] access_map = network_access_maps.get(network_id, {}) + network_logic = network.get('logic', []) - for instruction in network.get('logic', []): + # Iterar sobre las instrucciones en la red + for instruction in network_logic: instr_type_original = instruction['type'] + # Saltar si ya está procesado o es error if instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original: continue - processor_func = processor_map.get(instr_type_original) - if processor_func: + # --- Búsqueda del procesador (CORREGIR ESTA PARTE) --- + processor_func = None + instr_type_lookup = instr_type_original.capitalize() # Capitalizar para buscar en el mapa + # !!! POTENCIAL PROBLEMA 2: Si el tipo es 'O', capitalize() da 'O', pero la clave es 'O'? + # Vamos a usar minúsculas para la búsqueda en el mapa también + instr_type_lower_lookup = instr_type_original.lower() + func_to_call = None + for func_key_cap, func_obj in processor_map.items(): + if func_key_cap.lower() == instr_type_lower_lookup: + func_to_call = func_obj + break + # --- Fin Corrección Búsqueda --- + + # Si encontramos un procesador para este tipo + if func_to_call: + # print(f"DEBUG: Encontrado procesador {func_to_call.__name__} para tipo {instr_type_original}") # Añadir para depurar try: - changed = processor_func(instruction, network_id, scl_map, access_map) + # Pasar lista lógica si es necesario + if func_to_call == process_pbox: + changed = func_to_call(instruction, network_id, scl_map, access_map, network_logic) + else: + changed = func_to_call(instruction, network_id, scl_map, access_map) + if changed: made_change_in_pass = True + # No hacer break aquí, seguir intentando otros procesadores + # en la misma instrucción podría tener sentido en algunos casos? + # Por ahora, SÍ hacemos break para procesar una vez por pase. + # break # <--- Quitar este break si queremos reintentar en el mismo pase? No, mantenerlo. except Exception as e: - print(f"ERROR al ejecutar {processor_func.__name__} en UID {instruction.get('instruction_uid')} Red {network_id}: {e}") + # ... (manejo de errores) ... + print(f"ERROR al ejecutar {func_to_call.__name__} en UID {instruction.get('instruction_uid')} Red {network_id}: {e}") traceback.print_exc() instruction['scl'] = f"// ERROR during processing: {e}" instruction['type'] += "_error" made_change_in_pass = True - # else: # Comentado para reducir ruido - # print(f"DEBUG: No hay procesador para el tipo: {instr_type_original}") + # else: # Reducir ruido + # print(f"DEBUG: No se encontró procesador para el tipo: {instr_type_original}") if not made_change_in_pass: print(f"\n--- No se hicieron cambios en el pase {passes}. Proceso completado. ---") break - elif passes == max_passes: + # else: print(f"DEBUG: Se hicieron cambios en el pase {passes}. Continuando...") + + + if passes == max_passes: print(f"\n--- Límite de {max_passes} pases alcanzado. Puede haber dependencias circulares o lógica no procesada. ---") output_filename = json_filepath.replace('.json', '_scl_processed.json') @@ -591,6 +720,5 @@ def process_json_to_scl(json_filepath): # --- Ejecución --- if __name__ == "__main__": - # Asegúrate de usar el JSON generado por el script anterior (el que tiene comentarios y eno_logic si aplica) input_json_file = 'BlenderRun_ProdTime_simplified.json' process_json_to_scl(input_json_file) \ No newline at end of file