836 lines
40 KiB
Python
836 lines
40 KiB
Python
# --- x3_refactored.py ---
|
|
import re
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Dict, Optional, Union, Tuple, Any
|
|
import os
|
|
import glob
|
|
import copy
|
|
import sys
|
|
|
|
# Make the project root importable: it is the directory four levels above
# this file, and the `backend` package imported below is resolved from it.
_this_dir = os.path.dirname(__file__)
script_root = os.path.dirname(os.path.dirname(os.path.dirname(_this_dir)))
sys.path.append(script_root)
from backend.script_utils import load_configuration
|
|
|
|
def find_working_directory():
    """Return the working directory named in the project configuration.

    The configuration is obtained from ``load_configuration()`` and its
    ``"working_directory"`` entry is read. When that entry is missing or
    empty, a message is printed and the process exits with status 1.
    """
    working_directory = load_configuration().get("working_directory")
    if working_directory:
        return working_directory

    print("No working directory specified in the configuration file.")
    sys.exit(1)
|
|
|
|
# --- Estructuras de Datos ---
|
|
@dataclass
class ArrayDimension:
    """Inclusive index range of a single array dimension."""

    lower_bound: int
    upper_bound: int

    @property
    def count(self) -> int:
        """Number of elements in the dimension (both bounds included)."""
        span = self.upper_bound - self.lower_bound
        return span + 1
|
|
|
|
@dataclass
class VariableInfo:
    """One declared variable (or expanded member) of a UDT or data block."""

    # --- identity and layout -------------------------------------------
    name: str
    data_type: str
    # Position encoded as "byte.bit": the tenths digit carries the bit index.
    byte_offset: float
    size_in_bytes: int
    bit_size: int = 0

    # --- type details --------------------------------------------------
    # Quoted type name as written in the source when the type is a UDT.
    udt_source_name: Optional[str] = None
    string_length: Optional[int] = None
    array_dimensions: List[ArrayDimension] = field(default_factory=list)

    # --- values and annotations ----------------------------------------
    initial_value: Optional[str] = None
    current_value: Optional[str] = None
    comment: Optional[str] = None

    # --- hierarchy -----------------------------------------------------
    children: List['VariableInfo'] = field(default_factory=list)
    is_udt_expanded_member: bool = False
    # Per-element assignments of an array, keyed by the index text.
    current_element_values: Optional[Dict[str, str]] = None
    # One of "SIMPLE_VAR", "ARRAY", "STRUCT" or "UDT_INSTANCE" as set by the parser.
    element_type: str = "SIMPLE_VAR"
|
|
|
|
@dataclass
class UdtInfo:
    """A user-defined type (TYPE ... END_TYPE section) and its members."""

    name: str
    # Optional header properties of the block.
    family: Optional[str] = None
    version: Optional[str] = None
    # Top-level member declarations, in source order.
    members: List[VariableInfo] = field(default_factory=list)
    total_size_in_bytes: int = 0
|
|
|
|
@dataclass
class DbInfo:
    """A data block (DATA_BLOCK ... END_DATA_BLOCK section) and its members."""

    name: str
    # Optional header properties of the block.
    title: Optional[str] = None
    family: Optional[str] = None
    version: Optional[str] = None
    # Top-level member declarations, in source order.
    members: List[VariableInfo] = field(default_factory=list)
    total_size_in_bytes: int = 0
    # The redundant fields _begin_block_assignments_ordered and
    # _initial_values_from_begin_block were removed.
|
|
|
|
@dataclass
class ParsedData:
    """Everything extracted from one source: its UDTs and its data blocks."""

    udts: List[UdtInfo] = field(default_factory=list)
    dbs: List[DbInfo] = field(default_factory=list)
|
|
|
|
@dataclass
class OffsetContext:
    """Running byte/bit cursor used while laying out the members of a block."""

    byte_offset: int = 0
    bit_offset: int = 0

    def get_combined_offset(self) -> float:
        """Return the cursor as a float whose tenths digit is the bit index."""
        if not self.bit_offset:
            return float(self.byte_offset)
        tenths = self.byte_offset * 10 + self.bit_offset
        return float(tenths) / 10.0

    def advance_bits(self, num_bits: int):
        """Move the cursor forward by ``num_bits``, carrying whole bytes."""
        carried_bytes, remaining_bits = divmod(self.bit_offset + num_bits, 8)
        self.byte_offset += carried_bytes
        self.bit_offset = remaining_bits

    def align_to_byte(self):
        """Round the cursor up to the next byte when it sits inside a byte."""
        if self.bit_offset > 0:
            self.byte_offset += 1
            self.bit_offset = 0

    def align_to_word(self):
        """Round the cursor up to the next even byte address."""
        self.align_to_byte()
        # byte_offset % 2 is 1 exactly when the address is odd.
        self.byte_offset += self.byte_offset % 2
|
|
# --- Fin Estructuras de Datos ---
|
|
|
|
# Primitive type table: NAME -> (size in bytes, alignment in bytes, is_bool).
# BOOL has size 0 because it is laid out bit by bit.
S7_PRIMITIVE_SIZES = {
    "BOOL": (0, 1, True),
    # 1-byte types
    "BYTE": (1, 1, False),
    "CHAR": (1, 1, False),
    "SINT": (1, 1, False),
    "USINT": (1, 1, False),
    # 2-byte types
    "WORD": (2, 2, False),
    "INT": (2, 2, False),
    "UINT": (2, 2, False),
    "S5TIME": (2, 2, False),
    "DATE": (2, 2, False),
    # 4-byte types
    "DWORD": (4, 2, False),
    "DINT": (4, 2, False),
    "UDINT": (4, 2, False),
    "REAL": (4, 2, False),
    "TIME": (4, 2, False),
    "TIME_OF_DAY": (4, 2, False),
    "TOD": (4, 2, False),
    # 8-byte types
    "LREAL": (8, 2, False),
    "LINT": (8, 2, False),
    "ULINT": (8, 2, False),
    "LWORD": (8, 2, False),
    "DATE_AND_TIME": (8, 2, False),
    "DT": (8, 2, False),
}
|
|
|
|
class S7Parser:
    """Parser for S7 source text containing TYPE (UDT) and DATA_BLOCK sections.

    ``parse_file`` reads a source file line by line, builds ``UdtInfo`` and
    ``DbInfo`` objects with computed member offsets, applies the values
    assigned in each data block's BEGIN section, and returns everything in
    a ``ParsedData`` instance. UDTs are remembered in ``known_udts`` as soon
    as their END_TYPE line is reached so that later declarations can
    reference them.
    """

    # Character capacity assumed for a STRING declared without "[n]".
    DEFAULT_STRING_LENGTH = 254

    # "path := value;" line inside a BEGIN section.
    _ASSIGNMENT_REGEX = re.compile(
        r'^\s*(?P<path>.+?)\s*:=\s*(?P<value>.+?)\s*;?\s*$', re.IGNORECASE
    )

    def __init__(self):
        """Create an empty parser and compile the line-matching patterns."""
        self.parsed_data = ParsedData()
        self.known_udts: Dict[str, UdtInfo] = {}
        self.type_start_regex = re.compile(r'^\s*TYPE\s+"([^"]+)"', re.IGNORECASE)
        self.db_start_regex = re.compile(r'^\s*DATA_BLOCK\s+"([^"]+)"', re.IGNORECASE)
        self.property_regex = re.compile(r'^\s*([A-Z_]+)\s*:\s*(.+?)\s*(?://.*)?$', re.IGNORECASE)
        self.struct_start_regex = re.compile(r'^\s*STRUCT\b', re.IGNORECASE)
        self.end_struct_regex = re.compile(r'^\s*END_STRUCT\b', re.IGNORECASE)
        self.end_type_regex = re.compile(r'^\s*END_TYPE\b', re.IGNORECASE)
        self.end_db_regex = re.compile(r'^\s*END_DATA_BLOCK\b', re.IGNORECASE)
        self.begin_regex = re.compile(r'^\s*BEGIN\b', re.IGNORECASE)
        self.var_regex_simplified = re.compile(
            r'^\s*(?P<name>[a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*'
            r'(?P<typefull>'
            r'(?:ARRAY\s*\[(?P<arraydims>[^\]]+?)\]\s*OF\s*)?'
            r'(?P<basetype>(?:"[^"]+"|[a-zA-Z_][a-zA-Z0-9_]*))'
            r'(?:\s*\[\s*(?P<stringlength>\d+)\s*\])?'
            r')'
            r'(?:\s*:=\s*(?P<initval>[^;]*?))??\s*'
            r';?\s*$',
            re.IGNORECASE
        )
        # Bounds may be negative (e.g. ARRAY[-5..5]); without the optional
        # sign the lower bound of such a range was silently dropped.
        self.array_dim_regex = re.compile(r'(-?\d+)\s*\.\.\s*(-?\d+)')

    def _get_type_details(self, type_name_raw_cleaned: str) -> Tuple[int, int, bool, str]:
        """Return ``(size, alignment, is_bool, lookup_name)`` for a type name.

        Primitive names are matched case-insensitively and take precedence
        over UDT names, which are matched exactly as written.

        Raises:
            ValueError: the name is neither a primitive, a known UDT nor STRUCT.
        """
        type_name_upper = type_name_raw_cleaned.upper()
        if type_name_upper in S7_PRIMITIVE_SIZES:
            size, align, is_bool = S7_PRIMITIVE_SIZES[type_name_upper]
            return size, align, is_bool, type_name_upper
        if type_name_raw_cleaned in self.known_udts:
            udt = self.known_udts[type_name_raw_cleaned]
            return udt.total_size_in_bytes, 2, False, type_name_raw_cleaned
        if type_name_upper == "STRUCT":
            return 0, 2, False, "STRUCT"
        raise ValueError(f"Tipo de dato desconocido o UDT no definido: '{type_name_raw_cleaned}'")

    @staticmethod
    def _adjust_children_offsets(children: List[VariableInfo], base_offset_add: float):
        """Add ``base_offset_add`` to every variable in ``children``, recursively."""
        for child in children:
            child.byte_offset += base_offset_add
            # Normalise integral offsets so they are stored as plain floats.
            if child.byte_offset == float(int(child.byte_offset)):
                child.byte_offset = float(int(child.byte_offset))
            if child.children:
                S7Parser._adjust_children_offsets(child.children, base_offset_add)

    @staticmethod
    def _bool_span_in_bytes(start_offset: float, bit_count: int) -> int:
        """Return how many bytes a run of BOOLs starting at ``start_offset`` touches.

        A single BOOL reports 0, matching the size stored for scalar BOOLs.
        """
        if bit_count == 1:
            return 0
        start_bit = int(round((start_offset - int(start_offset)) * 10))
        if start_bit == 0:
            return (bit_count + 7) // 8
        bits_in_first_byte = 8 - start_bit
        if bit_count <= bits_in_first_byte:
            return 1
        return 1 + (bit_count - bits_in_first_byte + 7) // 8

    def _parse_struct_members(self, lines: List[str], current_line_idx: int,
                              parent_members_list: List[VariableInfo],
                              active_context: OffsetContext,
                              is_top_level_struct_in_block: bool = False) -> int:
        """Parse member declarations starting at ``current_line_idx``.

        Parsed variables are appended to ``parent_members_list`` and
        ``active_context`` is advanced past each of them.

        Returns:
            The index of the line at which the caller should resume. For a
            nested struct this is the line after its END_STRUCT; for the
            top-level struct it is the terminating END_TYPE, END_DATA_BLOCK
            or BEGIN line itself, so the caller can handle it.

        Raises:
            ValueError: a member uses a type that is not known.
        """
        idx_to_process = current_line_idx
        while idx_to_process < len(lines):
            original_line_text = lines[idx_to_process].strip()
            code_part, comment_sep, comment_part = original_line_text.partition("//")
            line_to_parse = code_part.strip()
            line_comment = comment_part.strip() if comment_sep else None
            line_index_for_return = idx_to_process
            idx_to_process += 1
            if not line_to_parse:
                continue

            is_end_struct = bool(self.end_struct_regex.match(line_to_parse))
            if is_end_struct and not is_top_level_struct_in_block:
                # A nested struct always ends on a word boundary.
                active_context.align_to_word()
                return idx_to_process
            if is_top_level_struct_in_block and (
                    self.end_type_regex.match(line_to_parse)
                    or self.end_db_regex.match(line_to_parse)
                    or self.begin_regex.match(line_to_parse)):
                active_context.align_to_word()
                return line_index_for_return

            var_match = self.var_regex_simplified.match(line_to_parse)
            if not var_match:
                # STRUCT keywords and the top-level END_STRUCT are expected
                # here; anything else is reported as unparsed.
                if not is_end_struct and not self.struct_start_regex.match(line_to_parse):
                    print(f"DEBUG (_parse_struct_members): Line not parsed: Original='{original_line_text}' | Processed='{line_to_parse}'")
                continue

            var_data = var_match.groupdict()
            raw_base_type = var_data['basetype'].strip()
            clean_data_type = raw_base_type.strip('"')
            type_upper = clean_data_type.upper()
            # A quoted base type denotes a UDT reference.
            udt_source_name_val = raw_base_type if raw_base_type.startswith('"') else None
            var_info = VariableInfo(name=var_data['name'],
                                    data_type=clean_data_type,
                                    byte_offset=0, size_in_bytes=0,
                                    udt_source_name=udt_source_name_val)

            if var_data['arraydims']:
                var_info.element_type = "ARRAY"
            elif type_upper == "STRUCT":
                var_info.element_type = "STRUCT"
            elif udt_source_name_val:
                var_info.element_type = "UDT_INSTANCE"
            else:
                var_info.element_type = "SIMPLE_VAR"

            if var_data.get('initval'):
                var_info.initial_value = var_data['initval'].strip()
            if line_comment:
                var_info.comment = line_comment

            num_array_elements = 1
            if var_data['arraydims']:
                for dim_match in self.array_dim_regex.finditer(var_data['arraydims']):
                    var_info.array_dimensions.append(
                        ArrayDimension(int(dim_match.group(1)), int(dim_match.group(2))))
                for dim in var_info.array_dimensions:
                    num_array_elements *= dim.count

            if type_upper == "STRUCT":
                active_context.align_to_word()
                var_info.byte_offset = active_context.get_combined_offset()
                # Children are laid out relative to the struct start and then
                # shifted to absolute offsets.
                nested_struct_context = OffsetContext()
                idx_to_process = self._parse_struct_members(
                    lines, idx_to_process, var_info.children, nested_struct_context, False)
                var_info.size_in_bytes = nested_struct_context.byte_offset
                self._adjust_children_offsets(var_info.children, var_info.byte_offset)
                active_context.byte_offset += var_info.size_in_bytes
            elif type_upper == "STRING":
                # A STRING without "[n]" uses the default capacity instead of
                # being rejected as an unknown type.
                if var_data['stringlength']:
                    var_info.string_length = int(var_data['stringlength'])
                else:
                    var_info.string_length = self.DEFAULT_STRING_LENGTH
                unit_size = var_info.string_length + 2  # two header bytes
                active_context.align_to_word()
                var_info.byte_offset = active_context.get_combined_offset()
                var_info.size_in_bytes = unit_size * num_array_elements
                active_context.byte_offset += var_info.size_in_bytes
            else:
                unit_size_bytes, unit_alignment_req, is_bool, type_lookup_name = \
                    self._get_type_details(var_info.data_type)
                if is_bool:
                    var_info.bit_size = 1
                    var_info.byte_offset = active_context.get_combined_offset()
                    active_context.advance_bits(num_array_elements)
                    var_info.size_in_bytes = self._bool_span_in_bytes(
                        var_info.byte_offset, num_array_elements)
                else:
                    active_context.align_to_byte()
                    if unit_alignment_req == 2:
                        active_context.align_to_word()
                    var_info.byte_offset = active_context.get_combined_offset()
                    var_info.size_in_bytes = unit_size_bytes * num_array_elements
                    active_context.byte_offset += var_info.size_in_bytes
                    if type_lookup_name in self.known_udts:
                        # Expand the UDT: copy its members and shift them to
                        # the absolute position of this instance.
                        for member_template in self.known_udts[type_lookup_name].members:
                            expanded_member = copy.deepcopy(member_template)
                            expanded_member.is_udt_expanded_member = True
                            self._adjust_children_offsets([expanded_member], var_info.byte_offset)
                            var_info.children.append(expanded_member)

            parent_members_list.append(var_info)
        return idx_to_process

    def _array_element_offset(self, var: VariableInfo, indices_str: str) -> float:
        """Return the offset of the array element addressed by ``indices_str``.

        ``indices_str`` is the comma-separated index text found between the
        brackets (e.g. ``"1,2"``). When the number of indices does not match
        the declared dimensions, the base offset of the array is returned.

        Raises:
            ValueError: an index is not an integer literal.
        """
        indices = [int(part.strip()) for part in indices_str.split(',')]
        dimensions = var.array_dimensions
        if not dimensions or len(indices) != len(dimensions):
            return var.byte_offset

        # Linearise the indices with the last dimension varying fastest.
        linear_index = 0
        stride = 1
        for index, dim in zip(reversed(indices), reversed(dimensions)):
            linear_index += (index - dim.lower_bound) * stride
            stride *= dim.count

        type_upper = var.data_type.upper()
        if type_upper == "BOOL":
            # Bit arrays advance bit by bit; the result keeps the byte.bit form.
            base_byte = int(var.byte_offset)
            base_bit = int(round((var.byte_offset - base_byte) * 10))
            byte_delta, bit_position = divmod(base_bit + linear_index, 8)
            return float(base_byte + byte_delta) + (float(bit_position) / 10.0)

        if type_upper == "STRING" and var.string_length is not None:
            element_size = var.string_length + 2
        elif type_upper in S7_PRIMITIVE_SIZES:
            element_size = S7_PRIMITIVE_SIZES[type_upper][0]
        elif var.data_type in self.known_udts:
            element_size = self.known_udts[var.data_type].total_size_in_bytes
        else:
            # Unknown element type: derive the size from the array's total size.
            total_elements = 1
            for dim in dimensions:
                total_elements *= dim.count
            element_size = 0
            if total_elements > 0 and var.size_in_bytes > 0:
                element_size = var.size_in_bytes / total_elements
        return var.byte_offset + (linear_index * element_size)

    @staticmethod
    def _build_path_map(members: List[VariableInfo], path_map: Dict[str, VariableInfo],
                        prefix: str = ""):
        """Register every variable under its dotted path in ``path_map``.

        Arrays get a fresh, empty ``current_element_values`` dictionary.
        """
        for var in members:
            var_path = f"{prefix}{var.name}"
            path_map[var_path] = var
            if var.array_dimensions:
                var.current_element_values = {}
            if var.children:
                S7Parser._build_path_map(var.children, path_map, f"{var_path}.")

    @staticmethod
    def _propagate_initial_values(members: List[VariableInfo]):
        """Copy ``initial_value`` into ``current_value`` wherever none was assigned."""
        for var in members:
            if var.current_value is None and var.initial_value is not None:
                var.current_value = var.initial_value
            if var.children:
                S7Parser._propagate_initial_values(var.children)

    def _parse_begin_block(self, lines: List[str], start_idx: int, db_info: DbInfo) -> int:
        """Parse a BEGIN section and apply its assignments to ``db_info``.

        Assignments to array elements are stored in the array's
        ``current_element_values`` as ``{"value": ..., "offset": ...}``
        keyed by the index text; other assignments set ``current_value``.
        Paths that do not name a known variable are ignored. Afterwards,
        declared initial values are copied to variables left unassigned.

        Returns:
            The index of the END_DATA_BLOCK line, or ``len(lines)`` when the
            section is not terminated.
        """
        # Dotted paths of nested members are keys of this map, so a single
        # lookup resolves hierarchical assignments as well.
        path_to_var_map: Dict[str, VariableInfo] = {}
        self._build_path_map(db_info.members, path_to_var_map)

        idx = start_idx
        while idx < len(lines):
            line_to_parse = lines[idx].strip().partition("//")[0].strip()
            if self.end_db_regex.match(line_to_parse):
                break
            idx += 1
            if not line_to_parse:
                continue

            match = self._ASSIGNMENT_REGEX.match(line_to_parse)
            if not match:
                continue
            path = match.group("path").strip()
            value = match.group("value").strip().rstrip(';').strip()

            if '[' in path and ']' in path:
                # Array element: split "name[indices]" into its two parts.
                bracket_pos = path.find('[')
                array_path = path[:bracket_pos]
                indices = path[bracket_pos + 1:path.find(']')]
                var = path_to_var_map.get(array_path)
                if var is not None:
                    if var.current_element_values is None:
                        var.current_element_values = {}
                    var.current_element_values[indices] = {
                        "value": value,
                        "offset": self._array_element_offset(var, indices),
                    }
            elif path in path_to_var_map:
                path_to_var_map[path].current_value = value

        self._propagate_initial_values(db_info.members)
        return idx

    def parse_file(self, filepath: str) -> ParsedData:
        """Parse ``filepath`` and return the accumulated ``ParsedData``.

        The file is read as UTF-8 (a BOM is tolerated). When it cannot be
        read, a message is printed and the data collected so far is returned.

        Raises:
            ValueError: a member declaration uses an unknown type.
        """
        try:
            with open(filepath, 'r', encoding='utf-8-sig') as f:
                lines = f.readlines()
        except (OSError, ValueError) as e:  # ValueError covers decoding errors
            print(f"Error al leer el archivo {filepath}: {e}")
            return self.parsed_data

        current_block_handler: Optional[Union[UdtInfo, DbInfo]] = None
        active_block_context = OffsetContext()
        parsing_title_value_next_line = False
        idx = 0
        while idx < len(lines):
            stripped_original_line = lines[idx].strip()
            line_to_parse = stripped_original_line.partition("//")[0].strip()

            # The line after "TITLE =" carries the title value of a DB.
            if parsing_title_value_next_line and isinstance(current_block_handler, DbInfo):
                title_value_candidate = stripped_original_line
                if title_value_candidate.startswith("{") and title_value_candidate.endswith("}"):
                    current_block_handler.title = title_value_candidate
                else:
                    print(f"Advertencia: Se esperaba valor de TITLE {{...}} pero se encontró: '{title_value_candidate}'")
                parsing_title_value_next_line = False
                idx += 1
                continue

            type_match = self.type_start_regex.match(line_to_parse)
            db_match = self.db_start_regex.match(line_to_parse)
            if type_match:
                current_block_handler = UdtInfo(name=type_match.group(1))
                self.parsed_data.udts.append(current_block_handler)
                active_block_context = OffsetContext()
                idx += 1
                continue
            if db_match:
                current_block_handler = DbInfo(name=db_match.group(1))
                self.parsed_data.dbs.append(current_block_handler)
                active_block_context = OffsetContext()
                idx += 1
                continue

            # Lines outside any block are ignored.
            if not current_block_handler:
                idx += 1
                continue

            if line_to_parse.upper() == "TITLE =" and isinstance(current_block_handler, DbInfo):
                parsing_title_value_next_line = True
                idx += 1
                continue

            prop_match = self.property_regex.match(stripped_original_line)
            if prop_match and not parsing_title_value_next_line:
                # "KEY : value" header line: stored when the block has an
                # attribute of that name; an existing title is kept.
                attr = prop_match.group(1).lower()
                value = prop_match.group(2).strip()
                if value.endswith(';'):
                    value = value[:-1].strip()
                if hasattr(current_block_handler, attr):
                    if not (attr == 'title' and current_block_handler.title is not None):
                        setattr(current_block_handler, attr, value)
            elif self.struct_start_regex.match(line_to_parse) and not current_block_handler.members:
                idx = self._parse_struct_members(
                    lines, idx + 1, current_block_handler.members, active_block_context, True)
                continue
            elif self.begin_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo):
                current_block_handler.total_size_in_bytes = active_block_context.byte_offset
                idx = self._parse_begin_block(lines, idx + 1, current_block_handler)
                continue
            elif self.end_type_regex.match(line_to_parse) and isinstance(current_block_handler, UdtInfo):
                if current_block_handler.total_size_in_bytes == 0:
                    current_block_handler.total_size_in_bytes = active_block_context.byte_offset
                # From here on the UDT can be referenced by later declarations.
                self.known_udts[current_block_handler.name] = current_block_handler
                current_block_handler = None
                parsing_title_value_next_line = False
            elif self.end_db_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo):
                if current_block_handler.total_size_in_bytes == 0:
                    current_block_handler.total_size_in_bytes = active_block_context.byte_offset
                # Values were already applied while parsing the BEGIN section.
                current_block_handler = None
                parsing_title_value_next_line = False
            idx += 1
        return self.parsed_data
|
|
|
|
def custom_json_serializer(obj: Any) -> Any:
    """``default=`` hook for ``json.dumps`` covering the parser's data classes.

    * ``OffsetContext`` serializes to ``None``.
    * ``ArrayDimension`` serializes to its two bounds plus the derived count.
    * Any other object with a ``__dict__`` serializes to that dictionary
      with ``None`` values and empty lists left out.

    Raises:
        TypeError: the object has no ``__dict__`` and is none of the above.
    """
    if isinstance(obj, OffsetContext):
        return None

    if isinstance(obj, ArrayDimension):
        return {
            'lower_bound': obj.lower_bound,
            'upper_bound': obj.upper_bound,
            'count': obj.count
        }

    if not hasattr(obj, '__dict__'):
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable: {type(obj)}")

    serialized = {}
    for key, val in obj.__dict__.items():
        if val is None:
            continue
        if isinstance(val, list) and not val:
            continue
        serialized[key] = val

    if isinstance(obj, VariableInfo):
        # Always emit the flag, even if it was somehow filtered out above.
        if not obj.is_udt_expanded_member:
            serialized.setdefault('is_udt_expanded_member', False)
        # An empty element map carries no information; a filled one is kept
        # as {index: {value, offset}}.
        if 'current_element_values' in serialized and not serialized['current_element_values']:
            del serialized['current_element_values']

    return serialized
|
|
|
|
|
|
def format_address_for_display(byte_offset: float, bit_size: int = 0) -> str:
    """Render an offset as text, keeping the bit index for BOOL variables.

    Args:
        byte_offset: Offset in bytes; the tenths digit carries the bit index.
        bit_size: Size in bits (greater than 0 for BOOLs).

    Returns:
        ``"X.Y"`` when ``bit_size`` is positive, ``"X"`` for a whole-byte
        offset, otherwise the offset with one decimal place.
    """
    whole_bytes = int(byte_offset)

    if bit_size > 0:
        # The bit index is the first decimal digit of the offset.
        bit_index = int(round((byte_offset - whole_bytes) * 10))
        return f"{whole_bytes}.{bit_index}"

    if byte_offset == float(whole_bytes):
        return str(whole_bytes)
    return f"{byte_offset:.1f}"
|
|
|
|
def compare_offsets(offset1: float, offset2: float) -> int:
    """Compare two ``byte.bit`` offsets by byte first and by bit second.

    Returns:
        -1 if ``offset1`` is lower, 0 if both are equal, 1 if it is higher.
    """
    def split(offset: float):
        # (byte, bit): the bit index is the first decimal digit.
        whole = int(offset)
        return whole, int(round((offset - whole) * 10))

    first = split(offset1)
    second = split(offset2)
    # Tuples compare element by element, i.e. byte first and then bit.
    return (first > second) - (first < second)
|
|
|
|
def calculate_array_element_offset(var: "VariableInfo", indices_str: str,
                                   known_udts: Optional[Dict[str, Any]] = None) -> float:
    """Return the offset of one array element addressed by its indices.

    Handles bit arrays and multi-dimensional arrays; the last dimension
    varies fastest.

    Args:
        var: The array variable (its ``byte_offset``, ``data_type``,
            ``string_length``, ``array_dimensions`` and ``size_in_bytes``
            are read).
        indices_str: Comma-separated indices, e.g. ``"1,2"`` for a
            two-dimensional array.
        known_udts: Optional mapping of UDT name to an object exposing
            ``total_size_in_bytes``, used to size elements of UDT type.
            This module-level function has no parser instance to read the
            UDT table from, so it must be passed in; when omitted, the size
            is derived from the array's total size instead.

    Returns:
        The element offset as a float whose tenths digit is the bit index
        for BOOL arrays. When the number of indices does not match the
        declared dimensions, the base offset of the array is returned.

    Raises:
        ValueError: an index is not an integer literal.
    """
    indices = [int(part.strip()) for part in indices_str.split(',')]

    dimensions = var.array_dimensions
    if not dimensions or len(indices) != len(dimensions):
        return var.byte_offset  # cannot compute; fall back to the base offset

    # Linearise the indices, walking from the innermost dimension outwards
    # and compensating for each dimension's lower bound.
    linear_index = 0
    stride = 1
    for index, dim in zip(reversed(indices), reversed(dimensions)):
        linear_index += (index - dim.lower_bound) * stride
        stride *= dim.count

    type_upper = var.data_type.upper()

    if type_upper == "BOOL":
        # Bit arrays advance bit by bit; the result keeps the byte.bit form
        # with the bit index in the range 0-7.
        base_byte = int(var.byte_offset)
        base_bit = int(round((var.byte_offset - base_byte) * 10))
        byte_delta, bit_position = divmod(base_bit + linear_index, 8)
        return float(base_byte + byte_delta) + (float(bit_position) / 10.0)

    if type_upper == "STRING" and var.string_length is not None:
        element_size = var.string_length + 2  # two header bytes per string
    elif type_upper in S7_PRIMITIVE_SIZES:
        element_size = S7_PRIMITIVE_SIZES[type_upper][0]
    elif known_udts and var.data_type in known_udts:
        element_size = known_udts[var.data_type].total_size_in_bytes
    else:
        # Unknown element type: derive the size from the array's total size.
        total_elements = 1
        for dim in dimensions:
            total_elements *= dim.count
        element_size = 0
        if total_elements > 0 and var.size_in_bytes > 0:
            element_size = var.size_in_bytes / total_elements

    return var.byte_offset + (linear_index * element_size)
|
|
|
|
|
|
def flatten_db_structure(db_info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten a serialized DB/UDT into a single list of variables.

    Nested variables, expanded UDT members and assigned array elements all
    become individual entries. Every entry carries its ``full_path``, an
    ``address_display`` string and a ``_hierarchy_path`` of
    ``{"type", "index"}`` steps leading back to the original variable.

    Returns:
        List[Dict]: The flattened variables, sorted by byte and then bit offset.
    """
    flattened: List[Dict[str, Any]] = []
    seen_expansions = set()  # keys of UDT-expanded members already emitted

    def guess_element_type(node: Dict[str, Any]) -> str:
        # Backward compatibility for data that has no element_type stored.
        if node.get("array_dimensions"):
            return "ARRAY"
        if node.get("children") and node["data_type"].upper() == "STRUCT":
            return "STRUCT"
        if node.get("udt_source_name"):
            return "UDT_INSTANCE"
        return "SIMPLE_VAR"

    def process_variable(var: Dict[str, Any], path_prefix: str = "", is_expansion: bool = False, hierarchy_path=None):
        trail = [] if hierarchy_path is None else hierarchy_path
        qualified_name = f"{path_prefix}{var['name']}"

        # Expanded UDT members are emitted only once per path and offset.
        dedup_key = f"{qualified_name}_{var['byte_offset']}"
        if is_expansion:
            if dedup_key in seen_expansions:
                return
            seen_expansions.add(dedup_key)

        bit_size = var.get("bit_size", 0)
        element_values = var.get("current_element_values")

        if bool(var.get("array_dimensions")) and element_values:
            # Array with per-element assignments: one entry per element
            # instead of one entry for the array itself.
            for index_text, element_data in element_values.items():
                if isinstance(element_data, dict) and "value" in element_data and "offset" in element_data:
                    element_value = element_data["value"]
                    element_offset = element_data["offset"]
                else:
                    # Old format: plain value, no per-element offset.
                    element_value = element_data
                    element_offset = var["byte_offset"]

                entry = {
                    **var,
                    "full_path": f"{qualified_name}[{index_text}]",
                    "is_array_element": True,
                    "array_index": index_text,
                    "current_value": element_value,
                    "byte_offset": element_offset,
                    "address_display": format_address_for_display(element_offset, bit_size),
                    "element_type": "ARRAY_ELEMENT",
                    "_hierarchy_path": copy.deepcopy(trail),
                    "_array_index": index_text,
                }
                # The per-element map would be redundant on a single element.
                entry.pop("current_element_values", None)
                flattened.append(entry)
        else:
            entry = {
                **var,
                "full_path": qualified_name,
                "is_array_element": False,
                "_hierarchy_path": copy.deepcopy(trail),
            }
            if "element_type" not in entry:
                entry["element_type"] = guess_element_type(var)
            entry["address_display"] = format_address_for_display(var["byte_offset"], bit_size)
            flattened.append(entry)

        children = var.get("children")
        if children:
            children_are_expansion = bool(var.get("udt_source_name"))
            for position, child in enumerate(children):
                process_variable(
                    child,
                    f"{qualified_name}.",
                    is_expansion=children_are_expansion,
                    hierarchy_path=copy.deepcopy(trail) + [{"type": "children", "index": position}],
                )

    for position, member in enumerate(db_info.get("members", [])):
        process_variable(member, hierarchy_path=[{"type": "members", "index": position}])

    def offset_key(entry: Dict[str, Any]):
        # (byte, bit): the bit index is the first decimal digit of the offset.
        whole = int(entry["byte_offset"])
        return whole, int(round((entry["byte_offset"] - whole) * 10))

    flattened.sort(key=offset_key)
    return flattened
|
|
|
|
def access_by_hierarchy_path(root_obj: Dict[str, Any], hierarchy_path: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Follow a hierarchy path from ``root_obj`` down to the addressed element.

    Args:
        root_obj: Root object (usually a DB) where navigation starts.
        hierarchy_path: Steps to follow; each has a ``"type"`` (the
            container key, "members" or "children") and an ``"index"``.

    Returns:
        The element reached, or None when the path is empty or a step
        cannot be followed (an error message is printed in that case).
    """
    if not hierarchy_path:
        return None

    node = root_obj
    for step in hierarchy_path:
        key, position = step["type"], step["index"]

        if key not in node:
            print(f"Error: No se encontró el contenedor '{key}' en la ruta jerárquica")
            return None

        siblings = node[key]
        if not isinstance(siblings, list) or position >= len(siblings):
            print(f"Error: Índice {position} fuera de rango en la ruta jerárquica")
            return None

        node = siblings[position]

    return node
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parse every .db / .awl file found in the configured
    # working directory and write one JSON file per source into "<dir>/json".
    working_dir = find_working_directory()
    print(f"Using working directory: {working_dir}")

    output_json_dir = os.path.join(working_dir, "json")
    os.makedirs(output_json_dir, exist_ok=True)
    print(f"Los archivos JSON de salida se guardarán en: {output_json_dir}")

    # All .db files first, then all .awl files.
    all_source_files = [
        found
        for pattern in ("*.db", "*.awl")
        for found in glob.glob(os.path.join(working_dir, pattern))
    ]

    if not all_source_files:
        print(f"No se encontraron archivos .db o .awl en {working_dir}")
    else:
        print(f"Archivos encontrados para procesar: {len(all_source_files)}")

        for filepath in all_source_files:
            # A fresh parser per file keeps UDT tables from leaking across files.
            parser = S7Parser()
            filename = os.path.basename(filepath)
            print(f"\n--- Procesando archivo: {filename} ---")

            parsed_result = parser.parse_file(filepath)

            stem, _extension = os.path.splitext(filename)
            json_output_filename = os.path.join(output_json_dir, f"{stem}.json")

            print(f"Parseo completo. Intentando serializar a JSON: {json_output_filename}")
            try:
                json_output = json.dumps(parsed_result, default=custom_json_serializer, indent=2)
                with open(json_output_filename, "w", encoding='utf-8') as f:
                    f.write(json_output)
                print(f"Resultado guardado en: {json_output_filename}")
            except Exception as e:
                print(f"Error durante la serialización JSON o escritura del archivo {json_output_filename}: {e}")

        print("\n--- Proceso completado ---")