# --- x3_refactored.py --- import re import json from dataclasses import dataclass, field from typing import List, Dict, Optional, Union, Tuple, Any import os import glob import copy import sys script_root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(__file__))) ) sys.path.append(script_root) from backend.script_utils import load_configuration def find_working_directory(): configs = load_configuration() working_directory = configs.get("working_directory") if not working_directory: print("No working directory specified in the configuration file.") sys.exit(1) return working_directory # --- Estructuras de Datos --- @dataclass class ArrayDimension: lower_bound: int upper_bound: int @property def count(self) -> int: return self.upper_bound - self.lower_bound + 1 @dataclass class VariableInfo: name: str data_type: str byte_offset: float size_in_bytes: int bit_size: int = 0 udt_source_name: Optional[str] = None string_length: Optional[int] = None array_dimensions: List[ArrayDimension] = field(default_factory=list) initial_value: Optional[str] = None current_value: Optional[str] = None comment: Optional[str] = None children: List['VariableInfo'] = field(default_factory=list) is_udt_expanded_member: bool = False current_element_values: Optional[Dict[str, str]] = None element_type: str = "SIMPLE_VAR" # New field with default value @dataclass class UdtInfo: name: str family: Optional[str] = None version: Optional[str] = None members: List[VariableInfo] = field(default_factory=list) total_size_in_bytes: int = 0 @dataclass class DbInfo: name: str title: Optional[str] = None family: Optional[str] = None version: Optional[str] = None members: List[VariableInfo] = field(default_factory=list) total_size_in_bytes: int = 0 # Eliminamos los campos redundantes: # _begin_block_assignments_ordered y _initial_values_from_begin_block @dataclass class ParsedData: udts: List[UdtInfo] = field(default_factory=list) dbs: List[DbInfo] = field(default_factory=list) @dataclass class OffsetContext: byte_offset: int = 0 bit_offset: int = 0 def get_combined_offset(self) -> float: if self.bit_offset == 0: return float(self.byte_offset) return float(self.byte_offset * 10 + self.bit_offset) / 10.0 def advance_bits(self, num_bits: int): self.bit_offset += num_bits; self.byte_offset += self.bit_offset // 8; self.bit_offset %= 8 def align_to_byte(self): if self.bit_offset > 0: self.byte_offset += 1; self.bit_offset = 0 def align_to_word(self): self.align_to_byte() if self.byte_offset % 2 != 0: self.byte_offset += 1 # --- Fin Estructuras de Datos --- S7_PRIMITIVE_SIZES = { "BOOL": (0, 1, True), "BYTE": (1, 1, False), "CHAR": (1, 1, False), "SINT": (1, 1, False), "USINT": (1, 1, False), "WORD": (2, 2, False), "INT": (2, 2, False), "UINT": (2, 2, False), "S5TIME": (2, 2, False), "DATE": (2, 2, False), "DWORD": (4, 2, False), "DINT": (4, 2, False), "UDINT": (4, 2, False), "REAL": (4, 2, False), "TIME": (4, 2, False), "TIME_OF_DAY": (4, 2, False), "TOD": (4, 2, False), "LREAL": (8, 2, False), "LINT": (8, 2, False), "ULINT": (8, 2, False), "LWORD": (8, 2, False), "DATE_AND_TIME": (8, 2, False), "DT": (8, 2, False), } class S7Parser: def __init__(self): self.parsed_data = ParsedData() self.known_udts: Dict[str, UdtInfo] = {} self.type_start_regex = re.compile(r'^\s*TYPE\s+"([^"]+)"', re.IGNORECASE) self.db_start_regex = re.compile(r'^\s*DATA_BLOCK\s+"([^"]+)"', re.IGNORECASE) self.property_regex = re.compile(r'^\s*([A-Z_]+)\s*:\s*(.+?)\s*(?://.*)?$', re.IGNORECASE) self.struct_start_regex = re.compile(r'^\s*STRUCT\b', re.IGNORECASE) self.end_struct_regex = re.compile(r'^\s*END_STRUCT\b', re.IGNORECASE) self.end_type_regex = re.compile(r'^\s*END_TYPE\b', re.IGNORECASE) self.end_db_regex = re.compile(r'^\s*END_DATA_BLOCK\b', re.IGNORECASE) self.begin_regex = re.compile(r'^\s*BEGIN\b', re.IGNORECASE) self.var_regex_simplified = re.compile( r'^\s*(?P[a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*' r'(?P' r'(?:ARRAY\s*\[(?P[^\]]+?)\]\s*OF\s*)?' r'(?P(?:"[^"]+"|[a-zA-Z_][a-zA-Z0-9_]*))' r'(?:\s*\[\s*(?P\d+)\s*\])?' r')' r'(?:\s*:=\s*(?P[^;]*?))??\s*' r';?\s*$', re.IGNORECASE ) self.array_dim_regex = re.compile(r'(\d+)\s*\.\.\s*(\d+)') def _get_type_details(self, type_name_raw_cleaned: str) -> Tuple[int, int, bool, str]: type_name_upper = type_name_raw_cleaned.upper() if type_name_upper in S7_PRIMITIVE_SIZES: size, align, is_bool = S7_PRIMITIVE_SIZES[type_name_upper] return size, align, is_bool, type_name_upper elif type_name_raw_cleaned in self.known_udts: udt = self.known_udts[type_name_raw_cleaned] return udt.total_size_in_bytes, 2, False, type_name_raw_cleaned elif type_name_upper == "STRUCT": return 0, 2, False, "STRUCT" raise ValueError(f"Tipo de dato desconocido o UDT no definido: '{type_name_raw_cleaned}'") @staticmethod def _adjust_children_offsets(children: List[VariableInfo], base_offset_add: float): for child in children: child.byte_offset += base_offset_add if child.byte_offset == float(int(child.byte_offset)): child.byte_offset = float(int(child.byte_offset)) if child.children: S7Parser._adjust_children_offsets(child.children, base_offset_add) def _parse_struct_members(self, lines: List[str], current_line_idx: int, parent_members_list: List[VariableInfo], active_context: OffsetContext, is_top_level_struct_in_block: bool = False) -> int: idx_to_process = current_line_idx while idx_to_process < len(lines): original_line_text = lines[idx_to_process].strip() line_to_parse = original_line_text line_comment = None comment_marker_idx = original_line_text.find("//") if comment_marker_idx != -1: line_to_parse = original_line_text[:comment_marker_idx].strip() line_comment = original_line_text[comment_marker_idx + 2:].strip() line_index_for_return = idx_to_process idx_to_process += 1 if not line_to_parse: continue is_nested_end_struct = self.end_struct_regex.match(line_to_parse) and not is_top_level_struct_in_block is_main_block_end_struct = self.end_struct_regex.match(line_to_parse) and is_top_level_struct_in_block is_block_terminator = is_top_level_struct_in_block and \ (self.end_type_regex.match(line_to_parse) or \ self.end_db_regex.match(line_to_parse) or \ self.begin_regex.match(line_to_parse)) if is_nested_end_struct: active_context.align_to_byte() if active_context.byte_offset % 2 != 0: active_context.byte_offset += 1 return idx_to_process if is_block_terminator: active_context.align_to_byte() if active_context.byte_offset % 2 != 0: active_context.byte_offset += 1 return line_index_for_return if is_main_block_end_struct: pass var_match = self.var_regex_simplified.match(line_to_parse) if var_match: var_data = var_match.groupdict() raw_base_type_from_regex = var_data['basetype'].strip() clean_data_type = raw_base_type_from_regex.strip('"') udt_source_name_val = raw_base_type_from_regex if raw_base_type_from_regex.startswith('"') else None var_info = VariableInfo(name=var_data['name'], data_type=clean_data_type, byte_offset=0, size_in_bytes=0, udt_source_name=udt_source_name_val) # Set element_type based on what we know about the variable if var_data['arraydims']: var_info.element_type = "ARRAY" elif clean_data_type.upper() == "STRUCT": var_info.element_type = "STRUCT" elif udt_source_name_val: var_info.element_type = "UDT_INSTANCE" else: var_info.element_type = "SIMPLE_VAR" if var_data.get('initval'): var_info.initial_value = var_data['initval'].strip() if line_comment: var_info.comment = line_comment num_array_elements = 1 if var_data['arraydims']: for dim_match in self.array_dim_regex.finditer(var_data['arraydims']): var_info.array_dimensions.append(ArrayDimension(int(dim_match.group(1)), int(dim_match.group(2)))) if var_info.array_dimensions: for dim in var_info.array_dimensions: num_array_elements *= dim.count if var_info.data_type.upper() == "STRUCT": active_context.align_to_word(); var_info.byte_offset = active_context.get_combined_offset() nested_struct_context = OffsetContext() idx_after_nested_struct = self._parse_struct_members(lines, idx_to_process, var_info.children, nested_struct_context, False) var_info.size_in_bytes = nested_struct_context.byte_offset for child in var_info.children: child.byte_offset += var_info.byte_offset if child.byte_offset == float(int(child.byte_offset)): child.byte_offset = float(int(child.byte_offset)) if child.children: S7Parser._adjust_children_offsets(child.children, var_info.byte_offset) active_context.byte_offset += var_info.size_in_bytes; idx_to_process = idx_after_nested_struct elif var_info.data_type.upper() == "STRING" and var_data['stringlength']: var_info.string_length = int(var_data['stringlength']); unit_size = var_info.string_length + 2 active_context.align_to_word(); var_info.byte_offset = active_context.get_combined_offset() var_info.size_in_bytes = unit_size * num_array_elements active_context.byte_offset += var_info.size_in_bytes else: unit_size_bytes, unit_alignment_req, is_bool, type_name_for_udt_lookup = self._get_type_details(var_info.data_type) if is_bool: var_info.bit_size = 1; var_info.byte_offset = active_context.get_combined_offset() active_context.advance_bits(num_array_elements) start_byte_abs = int(var_info.byte_offset); start_bit_in_byte = int(round((var_info.byte_offset - start_byte_abs) * 10)) if num_array_elements == 1: var_info.size_in_bytes = 0 else: bits_rem = num_array_elements; bytes_spanned = 0 if start_bit_in_byte > 0: bits_in_first = 8 - start_bit_in_byte if bits_rem <= bits_in_first: bytes_spanned = 1 else: bytes_spanned = 1; bits_rem -= bits_in_first; bytes_spanned += (bits_rem + 7) // 8 else: bytes_spanned = (bits_rem + 7) // 8 var_info.size_in_bytes = bytes_spanned else: active_context.align_to_byte() if unit_alignment_req == 2: active_context.align_to_word() var_info.byte_offset = active_context.get_combined_offset() var_info.size_in_bytes = unit_size_bytes * num_array_elements active_context.byte_offset += var_info.size_in_bytes if type_name_for_udt_lookup in self.known_udts and not is_bool: udt_def = self.known_udts[type_name_for_udt_lookup]; udt_instance_abs_start_offset = var_info.byte_offset for udt_member_template in udt_def.members: expanded_member = copy.deepcopy(udt_member_template); expanded_member.is_udt_expanded_member = True expanded_member.byte_offset += udt_instance_abs_start_offset if expanded_member.byte_offset == float(int(expanded_member.byte_offset)): expanded_member.byte_offset = float(int(expanded_member.byte_offset)) if expanded_member.children: S7Parser._adjust_children_offsets(expanded_member.children, udt_instance_abs_start_offset) var_info.children.append(expanded_member) parent_members_list.append(var_info) elif line_to_parse and \ not self.struct_start_regex.match(line_to_parse) and \ not is_main_block_end_struct and \ not is_nested_end_struct and \ not is_block_terminator : print(f"DEBUG (_parse_struct_members): Line not parsed: Original='{original_line_text}' | Processed='{line_to_parse}'") return idx_to_process def _parse_begin_block(self, lines: List[str], start_idx: int, db_info: DbInfo) -> int: """ Parsea el bloque BEGIN y aplica directamente los valores a las variables correspondientes, calculando también offsets para elementos de arrays. """ idx = start_idx assignment_regex = re.compile(r'^\s*(?P.+?)\s*:=\s*(?P.+?)\s*;?\s*$', re.IGNORECASE) # Diccionario temporal para mapear rutas a variables path_to_var_map = {} # Función para calcular offset de elemento de array def calculate_array_element_offset(var: VariableInfo, indices_str: str) -> float: # Parsear los índices (pueden ser múltiples para arrays multidimensionales) indices = [int(idx.strip()) for idx in indices_str.split(',')] # Obtener dimensiones del array dimensions = var.array_dimensions if not dimensions or len(indices) != len(dimensions): return var.byte_offset # No podemos calcular, devolver offset base # Determinar tamaño de cada elemento base element_size = 0 is_bit_array = False if var.data_type.upper() == "BOOL": is_bit_array = True element_size = 0.1 # 0.1 byte = 1 bit (representación decimal) elif var.data_type.upper() == "STRING" and var.string_length is not None: element_size = var.string_length + 2 else: # Para tipos primitivos y UDTs data_type_upper = var.data_type.upper() if data_type_upper in S7_PRIMITIVE_SIZES: element_size = S7_PRIMITIVE_SIZES[data_type_upper][0] elif var.data_type in self.known_udts: element_size = self.known_udts[var.data_type].total_size_in_bytes else: # Si no podemos determinar tamaño, usar tamaño total / elementos total_elements = 1 for dim in dimensions: total_elements *= dim.count if total_elements > 0 and var.size_in_bytes > 0: element_size = var.size_in_bytes / total_elements # Calcular offset para arrays multidimensionales # Necesitamos calcular el índice lineal basado en índices multidimensionales linear_index = 0 dimension_multiplier = 1 # Calcular desde la dimensión más interna a la más externa # Los índices en S7 comienzan en las dimensiones a la izquierda for i in range(len(indices)-1, -1, -1): # Ajustar por el índice inicial de cada dimensión adjusted_index = indices[i] - dimensions[i].lower_bound linear_index += adjusted_index * dimension_multiplier # Multiplicador para la siguiente dimensión if i > 0: # No es necesario para la última iteración dimension_multiplier *= dimensions[i].count # Para arrays de bits, tenemos que calcular bit por bit if is_bit_array: base_byte = int(var.byte_offset) base_bit = int(round((var.byte_offset - base_byte) * 10)) # Calcular nuevo bit y byte new_bit = base_bit + linear_index new_byte = base_byte + (new_bit // 8) new_bit_position = new_bit % 8 return float(new_byte) + (float(new_bit_position) / 10.0) else: # Para tipos regulares, simplemente sumamos el offset lineal return var.byte_offset + (linear_index * element_size) # Construir mapa de rutas a variables def build_path_map(members: List[VariableInfo], prefix: str = ""): for var in members: var_path = f"{prefix}{var.name}" path_to_var_map[var_path] = var # Para arrays, inicializar diccionario de elementos si es necesario if var.array_dimensions: var.current_element_values = {} # Para variables con hijos, procesar recursivamente if var.children: build_path_map(var.children, f"{var_path}.") # Construir el mapa antes de procesar el bloque BEGIN build_path_map(db_info.members) # Ahora procesar el bloque BEGIN while idx < len(lines): original_line = lines[idx].strip() line_to_parse = original_line comment_marker = original_line.find("//") if comment_marker != -1: line_to_parse = original_line[:comment_marker].strip() if self.end_db_regex.match(line_to_parse): break idx += 1 if not line_to_parse: continue match = assignment_regex.match(line_to_parse) if match: path, value = match.group("path").strip(), match.group("value").strip().rstrip(';').strip() # Distinguir entre asignación a elemento de array y variable normal if '[' in path and ']' in path: # Es un elemento de array array_path = path[:path.find('[')] indices = path[path.find('[')+1:path.find(']')] if array_path in path_to_var_map: var = path_to_var_map[array_path] if var.current_element_values is None: var.current_element_values = {} # Calcular y guardar el offset real del elemento element_offset = calculate_array_element_offset(var, indices) # Guardar como un objeto con valor y offset var.current_element_values[indices] = { "value": value, "offset": element_offset } elif path in path_to_var_map: # Es una variable normal (o array completo) var = path_to_var_map[path] var.current_value = value # También manejar rutas jerárquicas (e.g., MyStruct.MyField) if '.' in path and '[' not in path: # Para simplificar, excluimos arrays con path jerárquico parts = path.split('.') current_path = "" current_var = None # Navegar por la jerarquía for i, part in enumerate(parts): if current_path: current_path += f".{part}" else: current_path = part if current_path in path_to_var_map: current_var = path_to_var_map[current_path] # Si es el último componente, asignar valor if i == len(parts) - 1 and current_var: current_var.current_value = value # Propagar valores iniciales a variables sin asignación explícita def propagate_initial_values(members: List[VariableInfo]): for var in members: # Si no tiene current_value pero tiene initial_value, copiar if var.current_value is None and var.initial_value is not None: var.current_value = var.initial_value # Recursión para hijos if var.children: propagate_initial_values(var.children) # Propagar valores iniciales propagate_initial_values(db_info.members) return idx def parse_file(self, filepath: str) -> ParsedData: try: with open(filepath, 'r', encoding='utf-8-sig') as f: lines = f.readlines() except Exception as e: print(f"Error al leer el archivo {filepath}: {e}"); return self.parsed_data current_block_handler: Optional[Union[UdtInfo, DbInfo]] = None active_block_context = OffsetContext(); parsing_title_value_next_line = False; idx = 0 while idx < len(lines): original_line_text = lines[idx]; stripped_original_line = original_line_text.strip() line_to_parse = stripped_original_line; comment_marker = stripped_original_line.find("//") if comment_marker != -1: line_to_parse = stripped_original_line[:comment_marker].strip() if parsing_title_value_next_line and isinstance(current_block_handler, DbInfo): title_value_candidate = original_line_text.strip() if title_value_candidate.startswith("{") and title_value_candidate.endswith("}"): current_block_handler.title = title_value_candidate else: print(f"Advertencia: Se esperaba valor de TITLE {{...}} pero se encontró: '{title_value_candidate}'") parsing_title_value_next_line = False; idx += 1; continue type_match = self.type_start_regex.match(line_to_parse); db_match = self.db_start_regex.match(line_to_parse) if type_match: udt_name = type_match.group(1); current_block_handler = UdtInfo(name=udt_name) self.parsed_data.udts.append(current_block_handler); active_block_context = OffsetContext(); idx +=1; continue elif db_match: db_name = db_match.group(1); current_block_handler = DbInfo(name=db_name) self.parsed_data.dbs.append(current_block_handler); active_block_context = OffsetContext(); idx +=1; continue if not current_block_handler: idx +=1; continue if line_to_parse.upper() == "TITLE =": if isinstance(current_block_handler, DbInfo): parsing_title_value_next_line = True; idx += 1; continue prop_match = self.property_regex.match(stripped_original_line) struct_keyword_match = self.struct_start_regex.match(line_to_parse) if prop_match and not parsing_title_value_next_line: key, value = prop_match.group(1).upper(), prop_match.group(2).strip() if value.endswith(';'): value = value[:-1].strip() attr = key.lower() if hasattr(current_block_handler, attr): if attr == 'title' and current_block_handler.title is not None: pass else: setattr(current_block_handler, attr, value) elif struct_keyword_match and not current_block_handler.members: idx = self._parse_struct_members(lines, idx + 1, current_block_handler.members, active_block_context, True); continue elif self.begin_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo): current_block_handler.total_size_in_bytes = active_block_context.byte_offset idx = self._parse_begin_block(lines, idx + 1, current_block_handler); continue elif self.end_type_regex.match(line_to_parse) and isinstance(current_block_handler, UdtInfo): if current_block_handler.total_size_in_bytes == 0: current_block_handler.total_size_in_bytes = active_block_context.byte_offset self.known_udts[current_block_handler.name] = current_block_handler current_block_handler = None; parsing_title_value_next_line = False elif self.end_db_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo): if current_block_handler.total_size_in_bytes == 0 : current_block_handler.total_size_in_bytes = active_block_context.byte_offset # Ya no necesitamos aplicar valores, porque se aplican directamente en _parse_begin_block current_block_handler = None; parsing_title_value_next_line = False idx += 1 return self.parsed_data def custom_json_serializer(obj: Any) -> Any: if isinstance(obj, OffsetContext): return None if isinstance(obj, ArrayDimension): return { 'lower_bound': obj.lower_bound, 'upper_bound': obj.upper_bound, 'count': obj.count } if hasattr(obj, '__dict__'): d = {k: v for k, v in obj.__dict__.items() if not (v is None or (isinstance(v, list) and not v))} if isinstance(obj, VariableInfo): if not obj.is_udt_expanded_member and 'is_udt_expanded_member' not in d: d['is_udt_expanded_member'] = False # Manejar current_element_values con format especial para offsets if 'current_element_values' in d: if not d['current_element_values']: del d['current_element_values'] else: # Asegurar que current_element_values se serializa correctamente element_values = d['current_element_values'] if isinstance(element_values, dict): # Preservar el formato {índice: {value, offset}} d['current_element_values'] = element_values return d raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable: {type(obj)}") def format_address_for_display(byte_offset: float, bit_size: int = 0) -> str: """ Formatea correctamente la dirección para mostrar, preservando el índice del bit para BOOLs. Args: byte_offset: El offset en bytes (con parte decimal para bits) bit_size: Tamaño en bits (>0 para BOOLs) Returns: String formateado como "X.Y" para bits o "X" para bytes completos """ if bit_size > 0: # Para BOOL, extraer y mostrar el byte y bit exactos byte_part = int(byte_offset) # Multiplicamos por 10 y tomamos el entero para obtener el índice correcto del bit bit_part = int(round((byte_offset - byte_part) * 10)) return f"{byte_part}.{bit_part}" else: # Para otros tipos, mostrar como entero si es un byte completo if byte_offset == float(int(byte_offset)): return str(int(byte_offset)) return f"{byte_offset:.1f}" def compare_offsets(offset1: float, offset2: float) -> int: """ Compara dos offsets considerando tanto la parte del byte como la del bit. Returns: -1 si offset1 < offset2, 0 si son iguales, 1 si offset1 > offset2 """ # Extraer partes de byte y bit byte1 = int(offset1) bit1 = int(round((offset1 - byte1) * 10)) byte2 = int(offset2) bit2 = int(round((offset2 - byte2) * 10)) # Comparar primero por byte if byte1 < byte2: return -1 elif byte1 > byte2: return 1 # Si bytes son iguales, comparar por bit if bit1 < bit2: return -1 elif bit1 > bit2: return 1 # Son exactamente iguales return 0 def calculate_array_element_offset(var: VariableInfo, indices_str: str) -> float: """ Calcula el offset exacto para un elemento de array basado en sus índices. Maneja correctamente arrays de bits y multidimensionales. Args: var: Variable información del array indices_str: String de índices (e.g. "1,2" para array bidimensional) Returns: Offset calculado como float, con parte decimal para bits """ # Parsear los índices (pueden ser múltiples para arrays multidimensionales) indices = [int(idx.strip()) for idx in indices_str.split(',')] # Obtener dimensiones del array dimensions = var.array_dimensions if not dimensions or len(indices) != len(dimensions): return var.byte_offset # No podemos calcular, devolver offset base # Determinar tamaño de cada elemento base element_size = 0 is_bit_array = False if var.data_type.upper() == "BOOL": is_bit_array = True element_size = 0.1 # 0.1 byte = 1 bit (representación decimal) elif var.data_type.upper() == "STRING" and var.string_length is not None: element_size = var.string_length + 2 # Para strings, sumar 2 bytes de cabecera else: # Para tipos primitivos y UDTs data_type_upper = var.data_type.upper() if data_type_upper in S7_PRIMITIVE_SIZES: element_size = S7_PRIMITIVE_SIZES[data_type_upper][0] elif var.data_type in self.known_udts: element_size = self.known_udts[var.data_type].total_size_in_bytes else: # Si no podemos determinar tamaño, usar tamaño total / elementos total_elements = 1 for dim in dimensions: total_elements *= dim.count if total_elements > 0 and var.size_in_bytes > 0: element_size = var.size_in_bytes / total_elements # Calcular offset para arrays multidimensionales # En S7, los arrays se almacenan en orden Row-Major (la última dimensión varía más rápido) linear_index = 0 dimension_multiplier = 1 # Calcular desde la dimensión más interna a la más externa # Para S7, procesamos desde la última dimensión hacia la primera for i in range(len(indices)-1, -1, -1): # Ajustar por el índice inicial de cada dimensión adjusted_index = indices[i] - dimensions[i].lower_bound linear_index += adjusted_index * dimension_multiplier # Multiplicador para la siguiente dimensión if i > 0: # No es necesario para la última iteración dimension_multiplier *= dimensions[i].count # Calcular offset según tipo if is_bit_array: # Para arrays de bits, calcular bit por bit base_byte = int(var.byte_offset) base_bit = int(round((var.byte_offset - base_byte) * 10)) # Calcular nuevo bit y byte new_bit = base_bit + linear_index new_byte = base_byte + (new_bit // 8) new_bit_position = new_bit % 8 # Formato S7: byte.bit con bit de 0-7 return float(new_byte) + (float(new_bit_position) / 10.0) else: # Para tipos regulares, simplemente sumar el offset lineal * tamaño elemento return var.byte_offset + (linear_index * element_size) def flatten_db_structure(db_info: Dict[str, Any]) -> List[Dict[str, Any]]: """ Función que aplana completamente una estructura de DB/UDT, expandiendo todas las variables anidadas, UDTs y elementos de array. Añade punteros jerárquicos para acceso directo a las variables originales. Returns: List[Dict]: Lista de variables aplanadas con todos sus atributos, rutas completas y punteros jerárquicos, ordenada por offset. """ flat_variables = [] processed_ids = set() # Para evitar duplicados def process_variable(var: Dict[str, Any], path_prefix: str = "", is_expansion: bool = False, hierarchy_path=None): # Inicializar hierarchy_path si es None if hierarchy_path is None: hierarchy_path = [] # Identificador único para esta variable en este contexto var_id = f"{path_prefix}{var['name']}_{var['byte_offset']}" # Evitar procesar duplicados (como miembros expandidos de UDTs) if is_expansion and var_id in processed_ids: return if is_expansion: processed_ids.add(var_id) # Crear copia de la variable con path completo flat_var = var.copy() flat_var["full_path"] = f"{path_prefix}{var['name']}" flat_var["is_array_element"] = False # Por defecto no es elemento de array # NUEVO: Guardar el camino jerárquico para acceso directo flat_var["_hierarchy_path"] = copy.deepcopy(hierarchy_path) # Preservar o inferir element_type if "element_type" not in flat_var: # Inferir tipo para compatibilidad hacia atrás if var.get("array_dimensions"): flat_var["element_type"] = "ARRAY" elif var.get("children") and var["data_type"].upper() == "STRUCT": flat_var["element_type"] = "STRUCT" elif var.get("udt_source_name"): flat_var["element_type"] = "UDT_INSTANCE" else: flat_var["element_type"] = "SIMPLE_VAR" # Determinar si es un array con valores específicos is_array = bool(var.get("array_dimensions")) has_array_values = is_array and var.get("current_element_values") # Si no es un array con valores específicos, agregar la variable base if not has_array_values: # Asegurarse de que el offset esté en el formato correcto flat_var["address_display"] = format_address_for_display(var["byte_offset"], var.get("bit_size", 0)) flat_variables.append(flat_var) # Si es un array con valores específicos, expandir cada elemento como variable individual if has_array_values: for idx, element_data in var.get("current_element_values", {}).items(): # Extraer valor y offset del elemento if isinstance(element_data, dict) and "value" in element_data and "offset" in element_data: # Nuevo formato con offset calculado value = element_data["value"] element_offset = element_data["offset"] else: # Compatibilidad con formato antiguo value = element_data element_offset = var["byte_offset"] # Offset base # Crear una entrada por cada elemento del array array_element = var.copy() array_element["full_path"] = f"{path_prefix}{var['name']}[{idx}]" array_element["is_array_element"] = True array_element["array_index"] = idx array_element["current_value"] = value array_element["byte_offset"] = element_offset # Usar offset calculado array_element["address_display"] = format_address_for_display(element_offset, var.get("bit_size", 0)) array_element["element_type"] = "ARRAY_ELEMENT" # Para elementos de array, guardamos el camino al array + índice array_element["_hierarchy_path"] = copy.deepcopy(hierarchy_path) array_element["_array_index"] = idx # Para acceso directo al elemento específico # Eliminar current_element_values para evitar redundancia if "current_element_values" in array_element: del array_element["current_element_values"] flat_variables.append(array_element) # Procesar recursivamente todos los hijos if var.get("children"): for i, child in enumerate(var.get("children", [])): child_hierarchy = copy.deepcopy(hierarchy_path) child_hierarchy.append({"type": "children", "index": i}) process_variable( child, f"{path_prefix}{var['name']}.", is_expansion=bool(var.get("udt_source_name")), hierarchy_path=child_hierarchy ) # Procesar todos los miembros desde el nivel superior for i, member in enumerate(db_info.get("members", [])): process_variable(member, hierarchy_path=[{"type": "members", "index": i}]) # Ordenar estrictamente por offset byte.bit flat_variables.sort(key=lambda x: ( int(x["byte_offset"]), int(round((x["byte_offset"] - int(x["byte_offset"])) * 10)) )) return flat_variables def access_by_hierarchy_path(root_obj: Dict[str, Any], hierarchy_path: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """ Accede directamente a un elemento usando su ruta jerárquica. Args: root_obj: Objeto raíz (generalmente un DB) donde comienza la navegación hierarchy_path: Lista de pasos jerárquicos, cada uno con "type" e "index" Returns: El objeto encontrado o None si no se puede acceder """ if not hierarchy_path: return None current = root_obj for path_step in hierarchy_path: container_type = path_step["type"] # "members" o "children" index = path_step["index"] # Verificar que el contenedor existe if container_type not in current: print(f"Error: No se encontró el contenedor '{container_type}' en la ruta jerárquica") return None container = current[container_type] # Verificar que el índice es válido if not isinstance(container, list) or len(container) <= index: print(f"Error: Índice {index} fuera de rango en la ruta jerárquica") return None # Navegar al siguiente nivel current = container[index] return current if __name__ == "__main__": working_dir = find_working_directory() print(f"Using working directory: {working_dir}") output_json_dir = os.path.join(working_dir, "json") os.makedirs(output_json_dir, exist_ok=True) print(f"Los archivos JSON de salida se guardarán en: {output_json_dir}") source_files_db = glob.glob(os.path.join(working_dir, "*.db")) source_files_awl = glob.glob(os.path.join(working_dir, "*.awl")) all_source_files = source_files_db + source_files_awl if not all_source_files: print(f"No se encontraron archivos .db o .awl en {working_dir}") else: print(f"Archivos encontrados para procesar: {len(all_source_files)}") for filepath in all_source_files: parser = S7Parser() filename = os.path.basename(filepath) print(f"\n--- Procesando archivo: {filename} ---") parsed_result = parser.parse_file(filepath) output_filename_base = os.path.splitext(filename)[0] json_output_filename = os.path.join(output_json_dir, f"{output_filename_base}.json") print(f"Parseo completo. Intentando serializar a JSON: {json_output_filename}") try: json_output = json.dumps(parsed_result, default=custom_json_serializer, indent=2) with open(json_output_filename, "w", encoding='utf-8') as f: f.write(json_output) print(f"Resultado guardado en: {json_output_filename}") except Exception as e: print(f"Error durante la serialización JSON o escritura del archivo {json_output_filename}: {e}") print("\n--- Proceso completado ---")