# --- x3.py (v_final_2 modifications) ---
"""Parser for Siemens S7 source exports containing TYPE (UDT) and DATA_BLOCK blocks.

Reconstructs the declaration layout (byte/bit offsets, sizes, expanded UDT
instances), captures initial values from declarations and current values from
the BEGIN section, and serializes everything to JSON.

NOTE(review): offsets use the S7 "byte.bit" convention encoded as a float,
e.g. 2.3 == byte 2, bit 3 — the fractional digit is a bit index, not tenths
of a byte.
"""
import re
import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Union, Tuple, Any
import copy


# --- Data structures ---

@dataclass
class ArrayDimension:
    """One dimension of an ARRAY declaration; bounds are inclusive."""
    lower_bound: int
    upper_bound: int

    @property
    def count(self) -> int:
        """Number of elements in this dimension."""
        return self.upper_bound - self.lower_bound + 1


@dataclass
class VariableInfo:
    """A declared member (primitive, STRING, ARRAY, STRUCT or UDT instance)."""
    name: str
    data_type: str
    byte_offset: float          # byte.bit encoding, see module docstring
    size_in_bytes: int
    bit_size: int = 0           # 1 for BOOLs, 0 otherwise
    udt_source_name: Optional[str] = None   # original quoted UDT name, if any
    string_length: Optional[int] = None     # declared length for STRING[n]
    array_dimensions: List[ArrayDimension] = field(default_factory=list)
    initial_value: Optional[str] = None     # from the declaration (:=)
    current_value: Optional[str] = None     # from the BEGIN block (or initial)
    comment: Optional[str] = None           # trailing // comment, if present
    children: List['VariableInfo'] = field(default_factory=list)
    is_udt_expanded_member: bool = False    # True for members copied out of a UDT
    current_element_values: Optional[Dict[str, str]] = None  # "index" -> value for arrays


@dataclass
class UdtInfo:
    """A parsed TYPE block."""
    name: str
    family: Optional[str] = None
    version: Optional[str] = None
    members: List[VariableInfo] = field(default_factory=list)
    total_size_in_bytes: int = 0


@dataclass
class DbInfo:
    """A parsed DATA_BLOCK."""
    name: str
    title: Optional[str] = None
    family: Optional[str] = None
    version: Optional[str] = None
    members: List[VariableInfo] = field(default_factory=list)
    total_size_in_bytes: int = 0
    # Ordered (path, value) pairs exactly as they appear in the BEGIN block,
    # plus a dict version for fast lookups in _apply_current_values.
    _begin_block_assignments_ordered: List[Tuple[str, str]] = field(default_factory=list)
    _initial_values_from_begin_block: Dict[str, str] = field(default_factory=dict)


@dataclass
class ParsedData:
    """Top-level result: every UDT and DB found in the file."""
    udts: List[UdtInfo] = field(default_factory=list)
    dbs: List[DbInfo] = field(default_factory=list)


@dataclass
class OffsetContext:
    """Mutable byte/bit cursor used while laying out one struct level."""
    byte_offset: int = 0
    bit_offset: int = 0

    def get_combined_offset(self) -> float:
        """Current position in byte.bit float encoding."""
        if self.bit_offset == 0:
            return float(self.byte_offset)
        return float(self.byte_offset * 10 + self.bit_offset) / 10.0

    def advance_bits(self, num_bits: int):
        """Advance the cursor by num_bits, carrying into whole bytes."""
        self.bit_offset += num_bits
        self.byte_offset += self.bit_offset // 8
        self.bit_offset %= 8

    def align_to_byte(self):
        """Round up to the next byte boundary if mid-byte."""
        if self.bit_offset > 0:
            self.byte_offset += 1
            self.bit_offset = 0

    def align_to_word(self):
        """Round up to the next even (word) byte boundary."""
        self.align_to_byte()
        if self.byte_offset % 2 != 0:
            self.byte_offset += 1

# --- End data structures ---


# Type name -> (size in bytes, alignment requirement in bytes, is_bool).
# BOOL has size 0 because it is laid out bit-wise via OffsetContext.
S7_PRIMITIVE_SIZES = {
    "BOOL": (0, 1, True), "BYTE": (1, 1, False), "CHAR": (1, 1, False),
    "SINT": (1, 1, False), "USINT": (1, 1, False), "WORD": (2, 2, False),
    "INT": (2, 2, False), "UINT": (2, 2, False), "S5TIME": (2, 2, False),
    "DATE": (2, 2, False), "DWORD": (4, 2, False), "DINT": (4, 2, False),
    "UDINT": (4, 2, False), "REAL": (4, 2, False), "TIME": (4, 2, False),
    "TIME_OF_DAY": (4, 2, False), "TOD": (4, 2, False), "LREAL": (8, 2, False),
    "LINT": (8, 2, False), "ULINT": (8, 2, False), "LWORD": (8, 2, False),
    "DATE_AND_TIME": (8, 2, False), "DT": (8, 2, False),
}


class S7Parser:
    """Line-oriented parser for S7 TYPE / DATA_BLOCK source text."""

    def __init__(self):
        self.parsed_data = ParsedData()
        self.known_udts: Dict[str, UdtInfo] = {}
        self.type_start_regex = re.compile(r'^\s*TYPE\s+"([^"]+)"', re.IGNORECASE)
        self.db_start_regex = re.compile(r'^\s*DATA_BLOCK\s+"([^"]+)"', re.IGNORECASE)
        # Property regex: the value is captured non-greedily; a trailing ';' is
        # optional and a trailing // comment is ignored.
        self.property_regex = re.compile(r'^\s*([A-Z_]+)\s*:\s*(.+?)\s*(?://.*)?$', re.IGNORECASE)
        self.struct_start_regex = re.compile(r'^\s*STRUCT\b', re.IGNORECASE)
        self.end_struct_regex = re.compile(r'^\s*END_STRUCT\b', re.IGNORECASE)
        self.end_type_regex = re.compile(r'^\s*END_TYPE\b', re.IGNORECASE)
        self.end_db_regex = re.compile(r'^\s*END_DATA_BLOCK\b', re.IGNORECASE)
        self.begin_regex = re.compile(r'^\s*BEGIN\b', re.IGNORECASE)
        # Variable declaration:  name : [ARRAY [dims] OF] type["len"] [:= init] ;
        self.var_regex_simplified = re.compile(
            r'^\s*(?P<name>[a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*'
            r'(?P<fulltype>'
            r'(?:ARRAY\s*\[(?P<arraydims>[^\]]+?)\]\s*OF\s*)?'
            r'(?P<basetype>(?:"[^"]+"|[a-zA-Z_][a-zA-Z0-9_]*))'
            r'(?:\s*\[\s*(?P<stringlength>\d+)\s*\])?'
            r')'
            r'(?:\s*:=\s*(?P<initval>[^;]*?))??\s*'
            r';?\s*$', re.IGNORECASE
        )
        self.array_dim_regex = re.compile(r'(\d+)\s*\.\.\s*(\d+)')

    def _get_type_details(self, type_name_raw_cleaned: str) -> Tuple[int, int, bool, str]:
        """Return (size, alignment, is_bool, canonical_name) for a type name.

        Primitives are matched case-insensitively; UDTs case-sensitively by
        their unquoted name. Raises ValueError for unknown types.
        """
        type_name_upper = type_name_raw_cleaned.upper()
        if type_name_upper in S7_PRIMITIVE_SIZES:
            size, align, is_bool = S7_PRIMITIVE_SIZES[type_name_upper]
            return size, align, is_bool, type_name_upper
        elif type_name_raw_cleaned in self.known_udts:
            udt = self.known_udts[type_name_raw_cleaned]
            return udt.total_size_in_bytes, 2, False, type_name_raw_cleaned
        elif type_name_upper == "STRUCT":
            return 0, 2, False, "STRUCT"
        raise ValueError(f"Tipo de dato desconocido o UDT no definido: '{type_name_raw_cleaned}'")

    @staticmethod
    def _adjust_children_offsets(children: List[VariableInfo], base_offset_add: float):
        """Recursively shift child offsets by base_offset_add (byte.bit floats)."""
        for child in children:
            child.byte_offset += base_offset_add
            # Normalize x.0 floats so they serialize as integers.
            if child.byte_offset == float(int(child.byte_offset)):
                child.byte_offset = float(int(child.byte_offset))
            if child.children:
                S7Parser._adjust_children_offsets(child.children, base_offset_add)

    def _parse_struct_members(self, lines: List[str], current_line_idx: int,
                              parent_members_list: List[VariableInfo],
                              active_context: OffsetContext,
                              is_top_level_struct_in_block: bool = False) -> int:
        """Parse declarations until the struct (or block section) ends.

        Returns the index of the next line to process: past END_STRUCT for a
        nested struct, or the index of the terminating END_TYPE/END_DATA_BLOCK/
        BEGIN line itself for the top-level struct.
        """
        idx_to_process = current_line_idx
        while idx_to_process < len(lines):
            original_line_text = lines[idx_to_process].strip()
            line_to_parse = original_line_text
            line_comment = None
            comment_marker_idx = original_line_text.find("//")
            if comment_marker_idx != -1:
                line_to_parse = original_line_text[:comment_marker_idx].strip()
                line_comment = original_line_text[comment_marker_idx + 2:].strip()
            line_index_for_return = idx_to_process
            idx_to_process += 1
            if not line_to_parse:
                continue
            if self.end_struct_regex.match(line_to_parse):
                # Nested struct: pad to an even size and consume END_STRUCT.
                # (At top level END_STRUCT falls through; the section is closed
                # by END_TYPE / END_DATA_BLOCK / BEGIN below.)
                if not is_top_level_struct_in_block:
                    active_context.align_to_byte()
                    if active_context.byte_offset % 2 != 0:
                        active_context.byte_offset += 1
                    return idx_to_process
            if is_top_level_struct_in_block and \
               (self.end_type_regex.match(line_to_parse) or
                    self.end_db_regex.match(line_to_parse) or
                    self.begin_regex.match(line_to_parse)):
                active_context.align_to_byte()
                if active_context.byte_offset % 2 != 0:
                    active_context.byte_offset += 1
                # Do not consume the terminator: the caller dispatches on it.
                return line_index_for_return
            var_match = self.var_regex_simplified.match(line_to_parse)
            if var_match:
                var_data = var_match.groupdict()
                raw_base_type_from_regex = var_data['basetype'].strip()
                clean_data_type = raw_base_type_from_regex.strip('"')
                udt_source_name_val = raw_base_type_from_regex if raw_base_type_from_regex.startswith('"') else None
                var_info = VariableInfo(name=var_data['name'],
                                        data_type=clean_data_type,
                                        byte_offset=0, size_in_bytes=0,
                                        udt_source_name=udt_source_name_val)
                if var_data.get('initval'):
                    var_info.initial_value = var_data['initval'].strip()
                if line_comment:
                    var_info.comment = line_comment
                num_array_elements = 1
                if var_data['arraydims']:
                    for dim_match in self.array_dim_regex.finditer(var_data['arraydims']):
                        var_info.array_dimensions.append(
                            ArrayDimension(int(dim_match.group(1)), int(dim_match.group(2))))
                    if var_info.array_dimensions:
                        for dim in var_info.array_dimensions:
                            num_array_elements *= dim.count
                if var_info.data_type.upper() == "STRUCT":
                    # Inline STRUCT: lay out children in a fresh context, then
                    # rebase their offsets onto this struct's absolute offset.
                    active_context.align_to_word()
                    var_info.byte_offset = active_context.get_combined_offset()
                    nested_struct_context = OffsetContext()
                    idx_after_nested_struct = self._parse_struct_members(
                        lines, idx_to_process, var_info.children,
                        nested_struct_context,
                        is_top_level_struct_in_block=False)
                    var_info.size_in_bytes = nested_struct_context.byte_offset
                    for child in var_info.children:
                        child.byte_offset += var_info.byte_offset
                        if child.byte_offset == float(int(child.byte_offset)):
                            child.byte_offset = float(int(child.byte_offset))
                        if child.children:
                            S7Parser._adjust_children_offsets(child.children, var_info.byte_offset)
                    active_context.byte_offset += var_info.size_in_bytes
                    idx_to_process = idx_after_nested_struct
                elif var_info.data_type.upper() == "STRING" and var_data['stringlength']:
                    # STRING[n] occupies n + 2 header bytes, word aligned.
                    var_info.string_length = int(var_data['stringlength'])
                    unit_size = var_info.string_length + 2
                    active_context.align_to_word()
                    var_info.byte_offset = active_context.get_combined_offset()
                    var_info.size_in_bytes = unit_size * num_array_elements
                    active_context.byte_offset += var_info.size_in_bytes
                else:
                    unit_size_bytes, unit_alignment_req, is_bool, type_name_for_udt_lookup = \
                        self._get_type_details(var_info.data_type)
                    if is_bool:
                        var_info.bit_size = 1
                        var_info.byte_offset = active_context.get_combined_offset()
                        active_context.advance_bits(num_array_elements)
                        start_byte_abs = int(var_info.byte_offset)
                        start_bit_in_byte = int(round((var_info.byte_offset - start_byte_abs) * 10))
                        if num_array_elements == 1:
                            var_info.size_in_bytes = 0
                        else:
                            # Count the bytes a bit-array spans, including a
                            # possibly partial first byte.
                            bits_rem = num_array_elements
                            bytes_spanned = 0
                            if start_bit_in_byte > 0:
                                bits_in_first = 8 - start_bit_in_byte
                                if bits_rem <= bits_in_first:
                                    bytes_spanned = 1
                                else:
                                    bytes_spanned = 1
                                    bits_rem -= bits_in_first
                                    bytes_spanned += (bits_rem + 7) // 8
                            else:
                                bytes_spanned = (bits_rem + 7) // 8
                            var_info.size_in_bytes = bytes_spanned
                    else:
                        active_context.align_to_byte()
                        if unit_alignment_req == 2:
                            active_context.align_to_word()
                        var_info.byte_offset = active_context.get_combined_offset()
                        var_info.size_in_bytes = unit_size_bytes * num_array_elements
                        active_context.byte_offset += var_info.size_in_bytes
                        if type_name_for_udt_lookup in self.known_udts and not is_bool:
                            # UDT instance: expand a deep copy of the UDT's
                            # members as children, rebased to this offset.
                            udt_def = self.known_udts[type_name_for_udt_lookup]
                            udt_instance_abs_start_offset = var_info.byte_offset
                            for udt_member_template in udt_def.members:
                                expanded_member = copy.deepcopy(udt_member_template)
                                expanded_member.is_udt_expanded_member = True
                                expanded_member.byte_offset += udt_instance_abs_start_offset
                                if expanded_member.byte_offset == float(int(expanded_member.byte_offset)):
                                    expanded_member.byte_offset = float(int(expanded_member.byte_offset))
                                if expanded_member.children:
                                    S7Parser._adjust_children_offsets(
                                        expanded_member.children, udt_instance_abs_start_offset)
                                var_info.children.append(expanded_member)
                parent_members_list.append(var_info)
            else:
                if line_to_parse and not self.struct_start_regex.match(line_to_parse):
                    print(f"DEBUG (struct_members): Line not parsed: Original='{original_line_text}' | Processed='{line_to_parse}'")
        return idx_to_process

    def _parse_begin_block(self, lines: List[str], start_idx: int, db_info: DbInfo) -> int:
        """Collect 'path := value;' assignments until END_DATA_BLOCK.

        Returns the index of the END_DATA_BLOCK line; raises SyntaxError if the
        file ends without one.
        """
        idx = start_idx
        assignment_regex = re.compile(r'^\s*(?P<path>.+?)\s*:=\s*(?P<value>.+?)\s*;?\s*$', re.IGNORECASE)
        while idx < len(lines):
            original_line = lines[idx].strip()
            line_to_parse = original_line
            comment_marker = original_line.find("//")
            if comment_marker != -1:
                line_to_parse = original_line[:comment_marker].strip()
            if self.end_db_regex.match(line_to_parse):
                return idx
            idx += 1
            if not line_to_parse:
                continue
            match = assignment_regex.match(line_to_parse)
            if match:
                path = match.group("path").strip()
                value = match.group("value").strip().rstrip(';').strip()
                db_info._begin_block_assignments_ordered.append((path, value))  # keep order
                db_info._initial_values_from_begin_block[path] = value          # keep lookup
        raise SyntaxError("Se esperaba END_DATA_BLOCK después de la sección BEGIN.")

    def _apply_current_values(self, members: List[VariableInfo],
                              begin_values: Dict[str, str],
                              current_path_prefix: str = ""):
        """Propagate BEGIN-block values (or declaration initials) onto members."""
        for var_info in members:
            full_member_path = f"{current_path_prefix}{var_info.name}"
            if var_info.array_dimensions:
                # Gather per-element assignments like 'path[1,2] := v'.
                var_info.current_element_values = {}
                prefix_for_search = full_member_path + "["
                for key_in_begin, val_in_begin in begin_values.items():
                    if key_in_begin.startswith(prefix_for_search) and key_in_begin.endswith("]"):
                        try:
                            indices_str = key_in_begin[len(prefix_for_search):-1]
                            var_info.current_element_values[indices_str] = val_in_begin
                        except Exception:  # was a bare except; narrowed
                            print(f"Advertencia: No se pudo parsear el índice para: {key_in_begin}")
                if not var_info.current_element_values:
                    var_info.current_element_values = None
                if full_member_path in begin_values:
                    var_info.current_value = begin_values[full_member_path]
            elif full_member_path in begin_values:
                var_info.current_value = begin_values[full_member_path]
            elif var_info.initial_value is not None:
                var_info.current_value = var_info.initial_value
            if var_info.children and not var_info.is_udt_expanded_member:
                self._apply_current_values(var_info.children, begin_values,
                                           f"{full_member_path}.")
            elif var_info.udt_source_name and var_info.children:
                # UDT instance with expanded members.
                self._apply_current_values(var_info.children, begin_values,
                                           f"{full_member_path}.")

    def parse_file(self, filepath: str) -> ParsedData:
        """Parse a whole source file and return the accumulated ParsedData."""
        try:
            with open(filepath, 'r', encoding='utf-8-sig') as f:
                lines = f.readlines()
        except Exception as e:
            print(f"Error al leer el archivo {filepath}: {e}")
            return self.parsed_data
        current_block_handler: Optional[Union[UdtInfo, DbInfo]] = None
        active_block_context = OffsetContext()
        parsing_title_value_next_line = False  # state: awaiting the TITLE value line
        idx = 0
        while idx < len(lines):
            original_line_text = lines[idx]  # keep raw line for TITLE handling
            stripped_original_line = original_line_text.strip()
            line_to_parse = stripped_original_line
            comment_marker = stripped_original_line.find("//")
            if comment_marker != -1:
                line_to_parse = stripped_original_line[:comment_marker].strip()
            if parsing_title_value_next_line and isinstance(current_block_handler, DbInfo):
                # The TITLE value is the whole next line, wrapped in braces.
                title_value_candidate = original_line_text.strip()
                if title_value_candidate.startswith("{") and title_value_candidate.endswith("}"):
                    current_block_handler.title = title_value_candidate
                    print(f"DEBUG: Parsed TITLE value: {current_block_handler.title}")
                else:
                    print(f"Advertencia: Se esperaba valor de TITLE {{...}} pero se encontró: '{title_value_candidate}' en la línea '{original_line_text.strip()}'")
                parsing_title_value_next_line = False  # reset state either way
                idx += 1
                continue
            type_match = self.type_start_regex.match(line_to_parse)
            db_match = self.db_start_regex.match(line_to_parse)
            if type_match:
                udt_name = type_match.group(1)
                current_block_handler = UdtInfo(name=udt_name)
                self.parsed_data.udts.append(current_block_handler)
                active_block_context = OffsetContext()
                idx += 1
                continue
            elif db_match:
                db_name = db_match.group(1)
                current_block_handler = DbInfo(name=db_name)
                self.parsed_data.dbs.append(current_block_handler)
                active_block_context = OffsetContext()
                idx += 1
                continue
            if not current_block_handler:
                idx += 1
                continue
            # Special case: "TITLE =" puts its value on the following line.
            if line_to_parse.upper() == "TITLE =":
                if isinstance(current_block_handler, DbInfo):
                    parsing_title_value_next_line = True
                    print(f"DEBUG: Found 'TITLE =', expecting value on next line.")
                idx += 1
                continue
            # Match properties against the uncut line, as written in the file.
            prop_match = self.property_regex.match(stripped_original_line)
            struct_keyword_match = self.struct_start_regex.match(line_to_parse)
            if prop_match and not parsing_title_value_next_line:
                key, value = prop_match.group(1).upper(), prop_match.group(2).strip()
                # The trailing ';' of properties is optional; strip it if present.
                if value.endswith(';'):
                    value = value[:-1].strip()
                attr = key.lower()
                if hasattr(current_block_handler, attr):
                    # Never overwrite a TITLE already set from its brace line.
                    if attr == 'title' and current_block_handler.title is not None:
                        pass
                    else:
                        setattr(current_block_handler, attr, value)
            elif struct_keyword_match and not current_block_handler.members:
                idx = self._parse_struct_members(lines, idx + 1,
                                                 current_block_handler.members,
                                                 active_block_context,
                                                 is_top_level_struct_in_block=True)
                continue
            elif self.begin_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo):
                current_block_handler.total_size_in_bytes = active_block_context.byte_offset
                idx = self._parse_begin_block(lines, idx + 1, current_block_handler)
                continue
            elif self.end_type_regex.match(line_to_parse) and isinstance(current_block_handler, UdtInfo):
                if current_block_handler.total_size_in_bytes == 0:
                    current_block_handler.total_size_in_bytes = active_block_context.byte_offset
                self.known_udts[current_block_handler.name] = current_block_handler
                print(f"Parsed UDT: {current_block_handler.name}, Size: {current_block_handler.total_size_in_bytes}b, Members: {len(current_block_handler.members)}")
                current_block_handler = None
                parsing_title_value_next_line = False
            elif self.end_db_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo):
                if current_block_handler.total_size_in_bytes == 0:
                    current_block_handler.total_size_in_bytes = active_block_context.byte_offset
                self._apply_current_values(current_block_handler.members,
                                           current_block_handler._initial_values_from_begin_block)
                print(f"Parsed DB: {current_block_handler.name}, Decl.Size: {current_block_handler.total_size_in_bytes}b, Members: {len(current_block_handler.members)}, BEGIN assigns: {len(current_block_handler._begin_block_assignments_ordered)}")
                current_block_handler = None
                parsing_title_value_next_line = False
            idx += 1
        return self.parsed_data


# --- Custom JSON serializer ---
def custom_json_serializer(obj: Any) -> Any:
    """json.dumps default= hook: dataclasses -> dicts, dropping empty fields."""
    if isinstance(obj, OffsetContext):
        return None
    if hasattr(obj, '__dict__'):
        # Drop only None values and empty lists, not specific field names.
        d = {k: v for k, v in obj.__dict__.items()
             if not (v is None or (isinstance(v, list) and not v))}
        if isinstance(obj, VariableInfo):
            # Keep an explicit False so the flag always appears in the output.
            if not obj.is_udt_expanded_member and 'is_udt_expanded_member' not in d:
                d['is_udt_expanded_member'] = False
            # Drop an empty element-values dict.
            if not obj.current_element_values and 'current_element_values' in d:
                del d['current_element_values']
        if isinstance(obj, DbInfo):
            if not obj._begin_block_assignments_ordered and '_begin_block_assignments_ordered' in d:
                del d['_begin_block_assignments_ordered']
            if not obj._initial_values_from_begin_block and '_initial_values_from_begin_block' in d:
                del d['_initial_values_from_begin_block']
        return d
    raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")


# --- Main entry point ---
if __name__ == "__main__":
    parser = S7Parser()
    filepath = "db1001_data.db.txt"
    print(f"Intentando parsear el archivo: {filepath}")
    parsed_result = parser.parse_file(filepath)
    json_output_filename = "parsed_s7_data_stat.json"
    print(f"\nParseo completo. Intentando serializar a JSON.")
    try:
        json_output = json.dumps(parsed_result, default=custom_json_serializer, indent=2)
        with open(json_output_filename, "w", encoding='utf-8') as f:
            f.write(json_output)
        print(f"Resultado guardado en: {json_output_filename}")
    except Exception as e:
        print(f"Error durante la serialización JSON o escritura de archivo: {e}")