ParamManagerScripts/backend/script_groups/S7_DB_Utils/x3.py

# --- x3.py (v_final_4 revisions: adds 'count' to ArrayDimension serialization and adjusts the debug message) ---
import re
import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Union, Tuple, Any
import os  # needed for the path handling below
import glob  # used to locate the source files to process
import copy
import sys
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
def find_working_directory():
configs = load_configuration()
working_directory = configs.get("working_directory")
if not working_directory:
print("No working directory specified in the configuration file.")
sys.exit(1)
return working_directory
# --- Data Structures ---
@dataclass
class ArrayDimension:
lower_bound: int
upper_bound: int
@property
    def count(self) -> int:  # 'count' is derived from the bounds
return self.upper_bound - self.lower_bound + 1
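    # Illustrative (values assumed): ArrayDimension(1, 8).count == 8, matching the
    # element count of an S7 declaration like ARRAY [1..8] OF INT.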
@dataclass
class VariableInfo:  # unchanged from v_final_3
name: str
data_type: str
byte_offset: float
size_in_bytes: int
bit_size: int = 0
udt_source_name: Optional[str] = None
string_length: Optional[int] = None
array_dimensions: List[ArrayDimension] = field(default_factory=list)
initial_value: Optional[str] = None
current_value: Optional[str] = None
comment: Optional[str] = None
children: List['VariableInfo'] = field(default_factory=list)
is_udt_expanded_member: bool = False
current_element_values: Optional[Dict[str, str]] = None
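    # Illustrative shape (assumed declaration): for MyArr : ARRAY [1..2] OF INT with
    # BEGIN assignments MyArr[1] := 5; MyArr[2] := 10;, current_element_values would
    # be {"1": "5", "2": "10"} (keys are the index text inside the brackets).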
@dataclass
class UdtInfo:  # unchanged from v_final_3
name: str
family: Optional[str] = None
version: Optional[str] = None
members: List[VariableInfo] = field(default_factory=list)
total_size_in_bytes: int = 0
@dataclass
class DbInfo:  # unchanged from v_final_3
name: str
title: Optional[str] = None
family: Optional[str] = None
version: Optional[str] = None
members: List[VariableInfo] = field(default_factory=list)
total_size_in_bytes: int = 0
_begin_block_assignments_ordered: List[Tuple[str, str]] = field(default_factory=list)
_initial_values_from_begin_block: Dict[str, str] = field(default_factory=dict)
@dataclass
class ParsedData:  # unchanged
udts: List[UdtInfo] = field(default_factory=list)
dbs: List[DbInfo] = field(default_factory=list)
@dataclass
class OffsetContext:  # unchanged
byte_offset: int = 0
bit_offset: int = 0
def get_combined_offset(self) -> float:
if self.bit_offset == 0: return float(self.byte_offset)
return float(self.byte_offset * 10 + self.bit_offset) / 10.0
    def advance_bits(self, num_bits: int):
        self.bit_offset += num_bits
        self.byte_offset += self.bit_offset // 8
        self.bit_offset %= 8
    def align_to_byte(self):
        if self.bit_offset > 0:
            self.byte_offset += 1
            self.bit_offset = 0
def align_to_word(self):
self.align_to_byte()
if self.byte_offset % 2 != 0: self.byte_offset += 1
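    # Worked example (values assumed for illustration): with byte_offset=4 and
    # bit_offset=3, get_combined_offset() returns 4.3 (byte 4, bit 3 in S7
    # notation); advance_bits(6) then leaves byte_offset=5, bit_offset=1.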
# --- End Data Structures ---
S7_PRIMITIVE_SIZES = {  # (size_in_bytes, alignment_in_bytes, is_bool); unchanged
"BOOL": (0, 1, True), "BYTE": (1, 1, False), "CHAR": (1, 1, False),
"SINT": (1, 1, False), "USINT": (1, 1, False), "WORD": (2, 2, False),
"INT": (2, 2, False), "UINT": (2, 2, False), "S5TIME": (2, 2, False),
"DATE": (2, 2, False), "DWORD": (4, 2, False), "DINT": (4, 2, False),
"UDINT": (4, 2, False), "REAL": (4, 2, False), "TIME": (4, 2, False),
"TIME_OF_DAY": (4, 2, False), "TOD": (4, 2, False),
"LREAL": (8, 2, False), "LINT": (8, 2, False), "ULINT": (8, 2, False),
"LWORD": (8, 2, False), "DATE_AND_TIME": (8, 2, False), "DT": (8, 2, False),
}
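# Illustrative lookup: S7_PRIMITIVE_SIZES["REAL"] == (4, 2, False), i.e. a REAL
# occupies 4 bytes, must start on a word (2-byte) boundary, and is not bit-addressed.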
class S7Parser:  # __init__ unchanged from v_final_3
def __init__(self):
self.parsed_data = ParsedData()
self.known_udts: Dict[str, UdtInfo] = {}
self.type_start_regex = re.compile(r'^\s*TYPE\s+"([^"]+)"', re.IGNORECASE)
self.db_start_regex = re.compile(r'^\s*DATA_BLOCK\s+"([^"]+)"', re.IGNORECASE)
self.property_regex = re.compile(r'^\s*([A-Z_]+)\s*:\s*(.+?)\s*(?://.*)?$', re.IGNORECASE)
self.struct_start_regex = re.compile(r'^\s*STRUCT\b', re.IGNORECASE)
self.end_struct_regex = re.compile(r'^\s*END_STRUCT\b', re.IGNORECASE)
self.end_type_regex = re.compile(r'^\s*END_TYPE\b', re.IGNORECASE)
self.end_db_regex = re.compile(r'^\s*END_DATA_BLOCK\b', re.IGNORECASE)
self.begin_regex = re.compile(r'^\s*BEGIN\b', re.IGNORECASE)
self.var_regex_simplified = re.compile(
r'^\s*(?P<name>[a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*'
r'(?P<typefull>'
r'(?:ARRAY\s*\[(?P<arraydims>[^\]]+?)\]\s*OF\s*)?'
r'(?P<basetype>(?:"[^"]+"|[a-zA-Z_][a-zA-Z0-9_]*))'
r'(?:\s*\[\s*(?P<stringlength>\d+)\s*\])?'
r')'
r'(?:\s*:=\s*(?P<initval>[^;]*?))??\s*'
r';?\s*$',
re.IGNORECASE
)
self.array_dim_regex = re.compile(r'(\d+)\s*\.\.\s*(\d+)')
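        # Declarations the simplified regex is meant to match (illustrative lines):
        #   MyVar : REAL := 1.5;            -> name=MyVar, basetype=REAL, initval=1.5
        #   Flags : ARRAY [1..10] OF BOOL;  -> arraydims="1..10", basetype=BOOL
        #   Txt   : STRING [32];            -> basetype=STRING, stringlength=32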
    def _get_type_details(self, type_name_raw_cleaned: str) -> Tuple[int, int, bool, str]:  # unchanged
type_name_upper = type_name_raw_cleaned.upper()
if type_name_upper in S7_PRIMITIVE_SIZES:
size, align, is_bool = S7_PRIMITIVE_SIZES[type_name_upper]
return size, align, is_bool, type_name_upper
elif type_name_raw_cleaned in self.known_udts:
udt = self.known_udts[type_name_raw_cleaned]
return udt.total_size_in_bytes, 2, False, type_name_raw_cleaned
elif type_name_upper == "STRUCT":
return 0, 2, False, "STRUCT"
        raise ValueError(f"Unknown data type or undefined UDT: '{type_name_raw_cleaned}'")
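    # Illustrative returns: _get_type_details("REAL") -> (4, 2, False, "REAL"); for a
    # previously parsed UDT named, say, "Motor" (hypothetical name), it returns
    # (that UDT's total_size_in_bytes, 2, False, "Motor").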
    @staticmethod
    def _adjust_children_offsets(children: List[VariableInfo], base_offset_add: float):  # unchanged
for child in children:
child.byte_offset += base_offset_add
if child.byte_offset == float(int(child.byte_offset)):
child.byte_offset = float(int(child.byte_offset))
if child.children:
S7Parser._adjust_children_offsets(child.children, base_offset_add)
    def _parse_struct_members(self, lines: List[str], current_line_idx: int,
                              parent_members_list: List[VariableInfo],
                              active_context: OffsetContext,
                              is_top_level_struct_in_block: bool = False) -> int:  # debug-message condition adjusted
idx_to_process = current_line_idx
while idx_to_process < len(lines):
original_line_text = lines[idx_to_process].strip()
line_to_parse = original_line_text
line_comment = None
comment_marker_idx = original_line_text.find("//")
if comment_marker_idx != -1:
line_to_parse = original_line_text[:comment_marker_idx].strip()
line_comment = original_line_text[comment_marker_idx + 2:].strip()
line_index_for_return = idx_to_process
idx_to_process += 1
if not line_to_parse: continue
is_nested_end_struct = self.end_struct_regex.match(line_to_parse) and not is_top_level_struct_in_block
is_main_block_end_struct = self.end_struct_regex.match(line_to_parse) and is_top_level_struct_in_block
is_block_terminator = is_top_level_struct_in_block and \
(self.end_type_regex.match(line_to_parse) or \
self.end_db_regex.match(line_to_parse) or \
self.begin_regex.match(line_to_parse))
if is_nested_end_struct:
active_context.align_to_byte()
if active_context.byte_offset % 2 != 0: active_context.byte_offset += 1
return idx_to_process
if is_block_terminator:
active_context.align_to_byte()
if active_context.byte_offset % 2 != 0: active_context.byte_offset += 1
return line_index_for_return
            if is_main_block_end_struct:  # simply ignored here; handled by END_TYPE/END_DATA_BLOCK
                pass
var_match = self.var_regex_simplified.match(line_to_parse)
            if var_match:  # var_match logic unchanged from v_final_3
var_data = var_match.groupdict()
raw_base_type_from_regex = var_data['basetype'].strip()
clean_data_type = raw_base_type_from_regex.strip('"')
udt_source_name_val = raw_base_type_from_regex if raw_base_type_from_regex.startswith('"') else None
var_info = VariableInfo(name=var_data['name'],
data_type=clean_data_type,
byte_offset=0, size_in_bytes=0,
udt_source_name=udt_source_name_val)
if var_data.get('initval'): var_info.initial_value = var_data['initval'].strip()
if line_comment: var_info.comment = line_comment
num_array_elements = 1
if var_data['arraydims']:
for dim_match in self.array_dim_regex.finditer(var_data['arraydims']):
var_info.array_dimensions.append(ArrayDimension(int(dim_match.group(1)), int(dim_match.group(2))))
if var_info.array_dimensions:
for dim in var_info.array_dimensions: num_array_elements *= dim.count
if var_info.data_type.upper() == "STRUCT":
active_context.align_to_word(); var_info.byte_offset = active_context.get_combined_offset()
nested_struct_context = OffsetContext()
idx_after_nested_struct = self._parse_struct_members(lines, idx_to_process, var_info.children, nested_struct_context, False)
var_info.size_in_bytes = nested_struct_context.byte_offset
for child in var_info.children:
child.byte_offset += var_info.byte_offset
if child.byte_offset == float(int(child.byte_offset)): child.byte_offset = float(int(child.byte_offset))
if child.children: S7Parser._adjust_children_offsets(child.children, var_info.byte_offset)
active_context.byte_offset += var_info.size_in_bytes; idx_to_process = idx_after_nested_struct
elif var_info.data_type.upper() == "STRING" and var_data['stringlength']:
var_info.string_length = int(var_data['stringlength']); unit_size = var_info.string_length + 2
active_context.align_to_word(); var_info.byte_offset = active_context.get_combined_offset()
var_info.size_in_bytes = unit_size * num_array_elements
active_context.byte_offset += var_info.size_in_bytes
                else:
                    unit_size_bytes, unit_alignment_req, is_bool, type_name_for_udt_lookup = self._get_type_details(var_info.data_type)
                    if is_bool:
                        var_info.bit_size = 1
                        var_info.byte_offset = active_context.get_combined_offset()
                        active_context.advance_bits(num_array_elements)
                        start_byte_abs = int(var_info.byte_offset)
                        start_bit_in_byte = int(round((var_info.byte_offset - start_byte_abs) * 10))
                        if num_array_elements == 1:
                            var_info.size_in_bytes = 0  # a lone BOOL occupies a bit, not a whole byte
                        else:
                            # Count the bytes a packed BOOL array spans from its starting bit.
                            # Worked example (values assumed): 10 BOOLs starting at byte 4,
                            # bit 5 use the 3 bits left in byte 4 plus 7 more bits, so they
                            # span 2 bytes.
                            bits_rem = num_array_elements
                            bytes_spanned = 0
                            if start_bit_in_byte > 0:
                                bits_in_first = 8 - start_bit_in_byte
                                if bits_rem <= bits_in_first:
                                    bytes_spanned = 1
                                else:
                                    bytes_spanned = 1
                                    bits_rem -= bits_in_first
                                    bytes_spanned += (bits_rem + 7) // 8
                            else:
                                bytes_spanned = (bits_rem + 7) // 8
                            var_info.size_in_bytes = bytes_spanned
                    else:
                        active_context.align_to_byte()
                        if unit_alignment_req == 2:
                            active_context.align_to_word()
                        var_info.byte_offset = active_context.get_combined_offset()
                        var_info.size_in_bytes = unit_size_bytes * num_array_elements
                        active_context.byte_offset += var_info.size_in_bytes
                    if type_name_for_udt_lookup in self.known_udts and not is_bool:
                        udt_def = self.known_udts[type_name_for_udt_lookup]
                        udt_instance_abs_start_offset = var_info.byte_offset
                        for udt_member_template in udt_def.members:
                            expanded_member = copy.deepcopy(udt_member_template)
                            expanded_member.is_udt_expanded_member = True
                            expanded_member.byte_offset += udt_instance_abs_start_offset
                            if expanded_member.byte_offset == float(int(expanded_member.byte_offset)):
                                expanded_member.byte_offset = float(int(expanded_member.byte_offset))
                            if expanded_member.children:
                                S7Parser._adjust_children_offsets(expanded_member.children, udt_instance_abs_start_offset)
                            var_info.children.append(expanded_member)
parent_members_list.append(var_info)
            # Debug-message condition: only report lines that are not a known terminator.
            elif line_to_parse and \
                 not self.struct_start_regex.match(line_to_parse) and \
                 not is_main_block_end_struct and \
                 not is_nested_end_struct and \
                 not is_block_terminator:
                print(f"DEBUG (_parse_struct_members): Line not parsed: Original='{original_line_text}' | Processed='{line_to_parse}'")
return idx_to_process
    def _parse_begin_block(self, lines: List[str], start_idx: int, db_info: DbInfo) -> int:  # unchanged
idx = start_idx
assignment_regex = re.compile(r'^\s*(?P<path>.+?)\s*:=\s*(?P<value>.+?)\s*;?\s*$', re.IGNORECASE)
while idx < len(lines):
            original_line = lines[idx].strip()
            line_to_parse = original_line
            comment_marker = original_line.find("//")
            if comment_marker != -1:
                line_to_parse = original_line[:comment_marker].strip()
if self.end_db_regex.match(line_to_parse): return idx
idx += 1
if not line_to_parse: continue
match = assignment_regex.match(line_to_parse)
if match:
path, value = match.group("path").strip(), match.group("value").strip().rstrip(';').strip()
db_info._begin_block_assignments_ordered.append((path, value))
db_info._initial_values_from_begin_block[path] = value
        raise SyntaxError("Expected END_DATA_BLOCK after the BEGIN section.")
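    # BEGIN-section lines this method parses look like (illustrative examples):
    #   MyStruct.Speed := 1.500000e+01;
    #   Flags[3] := TRUE;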
    def _apply_current_values(self, members: List[VariableInfo], begin_values: Dict[str, str], current_path_prefix: str = ""):  # unchanged
for var_info in members:
full_member_path = f"{current_path_prefix}{var_info.name}"
if var_info.array_dimensions:
var_info.current_element_values = {}
prefix_for_search = full_member_path + "["
for key_in_begin, val_in_begin in begin_values.items():
if key_in_begin.startswith(prefix_for_search) and key_in_begin.endswith("]"):
try:
indices_str = key_in_begin[len(prefix_for_search):-1]
var_info.current_element_values[indices_str] = val_in_begin
                        except Exception:
                            print(f"Warning: could not parse the index for: {key_in_begin}")
if not var_info.current_element_values: var_info.current_element_values = None
            if full_member_path in begin_values:
                var_info.current_value = begin_values[full_member_path]
            elif var_info.initial_value is not None:
                var_info.current_value = var_info.initial_value
            if var_info.children and not var_info.is_udt_expanded_member:
                self._apply_current_values(var_info.children, begin_values, f"{full_member_path}.")
            elif var_info.udt_source_name and var_info.children:
                self._apply_current_values(var_info.children, begin_values, f"{full_member_path}.")
    def parse_file(self, filepath: str) -> ParsedData:  # unchanged from v_final_3
        try:
            with open(filepath, 'r', encoding='utf-8-sig') as f:
                lines = f.readlines()
        except Exception as e:
            print(f"Error reading file {filepath}: {e}")
            return self.parsed_data
current_block_handler: Optional[Union[UdtInfo, DbInfo]] = None
        active_block_context = OffsetContext()
        parsing_title_value_next_line = False
        idx = 0
        while idx < len(lines):
            original_line_text = lines[idx]
            stripped_original_line = original_line_text.strip()
            line_to_parse = stripped_original_line
            comment_marker = stripped_original_line.find("//")
            if comment_marker != -1:
                line_to_parse = stripped_original_line[:comment_marker].strip()
            if parsing_title_value_next_line and isinstance(current_block_handler, DbInfo):
                title_value_candidate = original_line_text.strip()
                if title_value_candidate.startswith("{") and title_value_candidate.endswith("}"):
                    current_block_handler.title = title_value_candidate
                else:
                    print(f"Warning: expected a TITLE value {{...}} but found: '{title_value_candidate}'")
                parsing_title_value_next_line = False
                idx += 1
                continue
            type_match = self.type_start_regex.match(line_to_parse)
            db_match = self.db_start_regex.match(line_to_parse)
            if type_match:
                current_block_handler = UdtInfo(name=type_match.group(1))
                self.parsed_data.udts.append(current_block_handler)
                active_block_context = OffsetContext()
                idx += 1; continue
            elif db_match:
                current_block_handler = DbInfo(name=db_match.group(1))
                self.parsed_data.dbs.append(current_block_handler)
                active_block_context = OffsetContext()
                idx += 1; continue
            if not current_block_handler:
                idx += 1; continue
            if line_to_parse.upper() == "TITLE =" and isinstance(current_block_handler, DbInfo):
                parsing_title_value_next_line = True
                idx += 1; continue
            prop_match = self.property_regex.match(stripped_original_line)
            struct_keyword_match = self.struct_start_regex.match(line_to_parse)
            if prop_match and not parsing_title_value_next_line:
                key, value = prop_match.group(1).upper(), prop_match.group(2).strip()
                if value.endswith(';'):
                    value = value[:-1].strip()
                attr = key.lower()
                if hasattr(current_block_handler, attr):
                    if attr == 'title' and current_block_handler.title is not None:
                        pass  # a TITLE already captured from the {...} line takes precedence
                    else:
                        setattr(current_block_handler, attr, value)
            elif struct_keyword_match and not current_block_handler.members:
                idx = self._parse_struct_members(lines, idx + 1, current_block_handler.members,
                                                 active_block_context, True)
                continue
            elif self.begin_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo):
                current_block_handler.total_size_in_bytes = active_block_context.byte_offset
                idx = self._parse_begin_block(lines, idx + 1, current_block_handler)
                continue
            elif self.end_type_regex.match(line_to_parse) and isinstance(current_block_handler, UdtInfo):
                if current_block_handler.total_size_in_bytes == 0:
                    current_block_handler.total_size_in_bytes = active_block_context.byte_offset
                self.known_udts[current_block_handler.name] = current_block_handler
                # print(f"Parsed UDT: {current_block_handler.name}, Size: {current_block_handler.total_size_in_bytes}b, Members: {len(current_block_handler.members)}")
                current_block_handler = None
                parsing_title_value_next_line = False
            elif self.end_db_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo):
                if current_block_handler.total_size_in_bytes == 0:
                    current_block_handler.total_size_in_bytes = active_block_context.byte_offset
                self._apply_current_values(current_block_handler.members, current_block_handler._initial_values_from_begin_block)
                # print(f"Parsed DB: {current_block_handler.name}, Decl.Size: {current_block_handler.total_size_in_bytes}b, Members: {len(current_block_handler.members)}, BEGIN assigns: {len(current_block_handler._begin_block_assignments_ordered)}")
                current_block_handler = None
                parsing_title_value_next_line = False
idx += 1
return self.parsed_data
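# Minimal usage sketch (the filename is hypothetical; the __main__ block below
# shows the full workflow):
#   parser = S7Parser()
#   data = parser.parse_file("Example.db")
#   print([db.name for db in data.dbs])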
def custom_json_serializer(obj: Any) -> Any:
if isinstance(obj, OffsetContext): return None
    # Handle ArrayDimension explicitly so the computed 'count' property is included.
    if isinstance(obj, ArrayDimension):
        return {
            'lower_bound': obj.lower_bound,
            'upper_bound': obj.upper_bound,
            'count': obj.count  # the computed property is serialized here
        }
    if hasattr(obj, '__dict__'):
        d = {k: v for k, v in obj.__dict__.items()
             if not (v is None or (isinstance(v, list) and not v))}  # empty dicts (e.g. _initial_values_from_begin_block) pass through
if isinstance(obj, VariableInfo):
            if not obj.is_udt_expanded_member and 'is_udt_expanded_member' not in d:
d['is_udt_expanded_member'] = False
if not obj.current_element_values and 'current_element_values' in d:
del d['current_element_values']
        if isinstance(obj, DbInfo):  # ensure these bookkeeping fields appear even when empty
            if '_begin_block_assignments_ordered' not in d and obj._begin_block_assignments_ordered == []:
                d['_begin_block_assignments_ordered'] = []  # keep the empty list
            if '_initial_values_from_begin_block' not in d and obj._initial_values_from_begin_block == {}:
                d['_initial_values_from_begin_block'] = {}  # keep the empty dict
return d
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable: {type(obj)}")
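# Example use (mirrors the __main__ block below, illustrative):
#   json.dumps(parsed_data, default=custom_json_serializer, indent=2)
# serializes the whole dataclass tree; ArrayDimension objects come out with their
# computed 'count' field included.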
if __name__ == "__main__":
working_dir = find_working_directory()
print(f"Using working directory: {working_dir}")
output_json_dir = os.path.join(working_dir, "json")
os.makedirs(output_json_dir, exist_ok=True)
print(f"Los archivos JSON de salida se guardarán en: {output_json_dir}")
source_files_db = glob.glob(os.path.join(working_dir, "*.db"))
source_files_awl = glob.glob(os.path.join(working_dir, "*.awl"))
all_source_files = source_files_db + source_files_awl
    if not all_source_files:
        print(f"No .db or .awl files found in {working_dir}")
    else:
        print(f"Files found to process: {len(all_source_files)}")
        for filepath in all_source_files:
            parser = S7Parser()  # fresh parser per file to avoid residual state
            filename = os.path.basename(filepath)
            print(f"\n--- Processing file: {filename} ---")
            parsed_result = parser.parse_file(filepath)
            output_filename_base = os.path.splitext(filename)[0]
            json_output_filename = os.path.join(output_json_dir, f"{output_filename_base}.json")
            print(f"Parsing complete. Attempting to serialize to JSON: {json_output_filename}")
            try:
                json_output = json.dumps(parsed_result, default=custom_json_serializer, indent=2)
                with open(json_output_filename, "w", encoding='utf-8') as f:
                    f.write(json_output)
                print(f"Result saved to: {json_output_filename}")
            except Exception as e:
                print(f"Error during JSON serialization or while writing {json_output_filename}: {e}")
    print("\n--- Process finished ---")