ParamManagerScripts/backend/script_groups/S7_DB_Utils/x7_value_updater.py

393 lines
16 KiB
Python

# --- x7.py ---
import json
import os
import glob
import sys
import copy
from typing import Dict, List, Tuple, Any, Optional
# Importar load_configuration desde backend.script_utils
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# Importar lo necesario desde x3.py
sys.path.append(os.path.dirname(__file__))
from x3 import S7Parser, find_working_directory, custom_json_serializer, ParsedData
def find_matching_files(working_dir: str) -> List[Tuple[str, str]]:
"""
Busca pares de archivos _data y _format con extensión .db o .awl.
"""
# Buscar archivos _data
data_files_db = glob.glob(os.path.join(working_dir, "*_data.db"))
data_files_awl = glob.glob(os.path.join(working_dir, "*_data.awl"))
all_data_files = data_files_db + data_files_awl
# Buscar archivos _format
format_files_db = glob.glob(os.path.join(working_dir, "*_format.db"))
format_files_awl = glob.glob(os.path.join(working_dir, "*_format.awl"))
all_format_files = format_files_db + format_files_awl
# Emparejar archivos _data y _format
matched_pairs = []
for data_file in all_data_files:
base_name = os.path.basename(data_file).replace("_data", "").split('.')[0]
format_candidates = [f for f in all_format_files if os.path.basename(f).startswith(f"{base_name}_format")]
if format_candidates:
matched_pairs.append((data_file, format_candidates[0]))
return matched_pairs
def parse_files_to_json(data_file: str, format_file: str, json_dir: str) -> Tuple[Dict, Dict]:
"""
Parsea los archivos _data y _format usando S7Parser y guarda los resultados como JSON.
"""
# Instancias separadas del parser para cada archivo
data_parser = S7Parser()
format_parser = S7Parser()
print(f"Parseando archivo data: {os.path.basename(data_file)}")
data_result = data_parser.parse_file(data_file)
print(f"Parseando archivo format: {os.path.basename(format_file)}")
format_result = format_parser.parse_file(format_file)
# Guardar resultados como JSON
data_base = os.path.splitext(os.path.basename(data_file))[0]
format_base = os.path.splitext(os.path.basename(format_file))[0]
data_json_path = os.path.join(json_dir, f"{data_base}.json")
format_json_path = os.path.join(json_dir, f"{format_base}.json")
# Serializar y guardar como JSON
data_json = json.dumps(data_result, default=custom_json_serializer, indent=2)
format_json = json.dumps(format_result, default=custom_json_serializer, indent=2)
with open(data_json_path, "w", encoding='utf-8') as f:
f.write(data_json)
with open(format_json_path, "w", encoding='utf-8') as f:
f.write(format_json)
print(f"Archivos JSON generados: {os.path.basename(data_json_path)} y {os.path.basename(format_json_path)}")
# Cargar de nuevo como objetos para procesamiento
data_obj = json.loads(data_json)
format_obj = json.loads(format_json)
return data_obj, format_obj
def create_offset_path_map(members: List[Dict], path_prefix: str = "") -> Dict[float, str]:
"""
Crea un mapa que asocia cada offset con la ruta completa de la variable.
Esto será usado para actualizar las asignaciones del bloque BEGIN.
"""
offset_to_path = {}
def process_member(member: Dict, current_path_prefix: str):
offset = member["byte_offset"]
full_path = f"{current_path_prefix}{member['name']}"
# Mapear offset a ruta completa
offset_to_path[offset] = full_path
# Procesar hijos recursivamente
if "children" in member and member["children"]:
for child in member["children"]:
process_member(child, f"{full_path}.")
# Procesar todos los miembros
for member in members:
process_member(member, path_prefix)
return offset_to_path
def flatten_variables_by_offset(data: Dict) -> Dict[float, Dict]:
"""
Aplana completamente todas las variables por offset, similar a flatten_members_for_markdown.
Incluye UDTs expandidos, estructuras anidadas, etc.
"""
offset_map = {}
processed_expanded_members = set()
def process_members(members: List[Dict], prefix: str = "", is_expansion: bool = False):
for var_idx, var in enumerate(members):
# Control para miembros UDT expandidos (evitar duplicados)
member_id = f"{prefix}{var['name']}_{var_idx}"
if is_expansion and member_id in processed_expanded_members:
continue
if is_expansion:
processed_expanded_members.add(member_id)
# Extraer offset e información de la variable
offset = var["byte_offset"]
var_info = {
"path": f"{prefix}{var['name']}",
"data_type": var["data_type"],
"size_in_bytes": var["size_in_bytes"],
"bit_size": var.get("bit_size", 0),
"initial_value": var.get("initial_value"),
"current_value": var.get("current_value"),
"current_element_values": var.get("current_element_values")
}
# Guardar en mapa por offset
offset_map[offset] = var_info
# Procesar recursivamente los hijos
if "children" in var and var["children"]:
process_members(
var["children"],
f"{prefix}{var['name']}.",
is_expansion=bool(var.get("udt_source_name"))
)
# Procesar todos los DBs
for db in data.get("dbs", []):
process_members(db.get("members", []))
return offset_map
def create_path_to_offset_map(members: List[Dict], path_prefix: str = "") -> Dict[str, float]:
"""
Crea un mapa que asocia cada ruta (path) completa con su offset.
Esto será usado para actualizar las asignaciones del bloque BEGIN.
"""
path_to_offset = {}
processed_expanded_members = set()
def process_member(member: Dict, current_path_prefix: str, is_expansion: bool = False):
member_id = f"{current_path_prefix}{member['name']}"
# Evitar duplicados para miembros expandidos de UDTs
if is_expansion and member_id in processed_expanded_members:
return
if is_expansion:
processed_expanded_members.add(member_id)
offset = member["byte_offset"]
path = f"{current_path_prefix}{member['name']}"
# Mapear ruta a offset
path_to_offset[path] = offset
# Para arrays, también mapear rutas con índices si hay valores iniciales
if member.get("array_dimensions") and member.get("current_element_values"):
for index in member["current_element_values"].keys():
array_path = f"{path}[{index}]"
path_to_offset[array_path] = offset
# Procesar hijos recursivamente
if "children" in member and member["children"]:
for child in member["children"]:
process_member(
child,
f"{path}.",
is_expansion=bool(member.get("udt_source_name"))
)
# Procesar todos los miembros
for member in members:
process_member(member, path_prefix)
return path_to_offset
def compare_structures_by_offset(data_vars: Dict[float, Dict], format_vars: Dict[float, Dict]) -> Tuple[bool, List[str]]:
"""
Compara variables por offset, verificando compatibilidad.
"""
issues = []
# Recopilar todos los offsets únicos de ambos conjuntos
all_offsets = sorted(set(list(data_vars.keys()) + list(format_vars.keys())))
# Verificar que todos los offsets existan en ambos conjuntos
for offset in all_offsets:
if offset not in data_vars:
issues.append(f"Offset {offset} existe en _format pero no en _data")
continue
if offset not in format_vars:
issues.append(f"Offset {offset} existe en _data pero no en _format")
continue
# Verificar coincidencia de tipos
data_type = data_vars[offset]["data_type"].upper()
format_type = format_vars[offset]["data_type"].upper()
if data_type != format_type:
issues.append(f"Tipo de dato diferente en offset {offset}: {data_type} ({data_vars[offset]['path']}) vs {format_type} ({format_vars[offset]['path']})")
# Verificar tamaño
data_size = data_vars[offset]["size_in_bytes"]
format_size = format_vars[offset]["size_in_bytes"]
if data_size != format_size:
issues.append(f"Tamaño diferente en offset {offset}: {data_size} bytes ({data_vars[offset]['path']}) vs {format_size} bytes ({format_vars[offset]['path']})")
# Verificar tamaño en bits para BOOLs
data_bit_size = data_vars[offset]["bit_size"]
format_bit_size = format_vars[offset]["bit_size"]
if data_bit_size != format_bit_size:
issues.append(f"Tamaño en bits diferente en offset {offset}: {data_bit_size} ({data_vars[offset]['path']}) vs {format_bit_size} ({format_vars[offset]['path']})")
return len(issues) == 0, issues
def update_values_recursive(target_member: Dict, data_offset_map: Dict[float, Dict]):
"""
Actualiza los valores de target_member con valores de data_offset_map basado en offset.
"""
offset = target_member["byte_offset"]
# Si encontramos una variable con el mismo offset en _data, tomar sus valores
if offset in data_offset_map:
data_var = data_offset_map[offset]
# Actualizar initial_value
if "initial_value" in data_var and data_var["initial_value"] is not None:
target_member["initial_value"] = data_var["initial_value"]
# Actualizar current_value
if "current_value" in data_var and data_var["current_value"] is not None:
target_member["current_value"] = data_var["current_value"]
# Actualizar current_element_values (para arrays)
if "current_element_values" in data_var and data_var["current_element_values"]:
target_member["current_element_values"] = data_var["current_element_values"]
# Actualizar recursivamente los hijos
if "children" in target_member and target_member["children"]:
for child in target_member["children"]:
update_values_recursive(child, data_offset_map)
def create_updated_json(data_json: Dict, format_json: Dict) -> Dict:
"""
Crea JSON actualizado basado en la estructura de _format con valores de _data.
"""
# Copia profunda de format_json para no modificar el original
updated_json = copy.deepcopy(format_json)
# Extraer todas las variables flat por offset
data_offset_map = flatten_variables_by_offset(data_json)
# Crear mapas de offsets y rutas para cada BD
db_maps = {}
for db_idx, db in enumerate(format_json.get("dbs", [])):
db_name = db["name"]
db_maps[db_name] = {
"offset_to_path": create_offset_path_map(db.get("members", [])),
"path_to_offset": create_path_to_offset_map(db.get("members", []))
}
# Actualizar valores de variables en la estructura de formato
for db_idx, db in enumerate(updated_json.get("dbs", [])):
for member in db.get("members", []):
update_values_recursive(member, data_offset_map)
# Actualizar también asignaciones del bloque BEGIN
db_name = db["name"]
data_db = next((d for d in data_json.get("dbs", []) if d["name"] == db_name), None)
if data_db and "_begin_block_assignments_ordered" in data_db:
# Obtener los mapas para este DB
offset_to_path = db_maps[db_name]["offset_to_path"]
# Crear una nueva lista de asignaciones con las rutas correctas
updated_assignments = []
# Para cada asignación en los datos de origen
for path, value in data_db["_begin_block_assignments_ordered"]:
# Búsqueda por offset si es posible
data_db_path_to_offset = create_path_to_offset_map(data_db.get("members", []))
if path in data_db_path_to_offset:
# Obtener el offset de la ruta original
offset = data_db_path_to_offset[path]
# Buscar la ruta correspondiente en el formato usando el offset
if offset in offset_to_path:
new_path = offset_to_path[offset]
updated_assignments.append([new_path, value])
print(f"Mapeando {path} -> {new_path} (offset {offset})")
else:
print(f"Advertencia: No se encontró un mapeo para el offset {offset} ({path})")
else:
print(f"Advertencia: No se pudo determinar el offset para la ruta {path}")
# Actualizar asignaciones en el JSON actualizado
db["_begin_block_assignments_ordered"] = updated_assignments
# También actualizar el diccionario _initial_values_from_begin_block
if "_initial_values_from_begin_block" in data_db:
updated_values = {}
for path, value in updated_assignments:
updated_values[path] = value
db["_initial_values_from_begin_block"] = updated_values
return updated_json
def main():
# Obtener directorio de trabajo
working_dir = find_working_directory()
print(f"Using working directory: {working_dir}")
# Crear directorio para JSON si no existe
output_json_dir = os.path.join(working_dir, "json")
os.makedirs(output_json_dir, exist_ok=True)
print(f"Los archivos JSON se guardarán en: {output_json_dir}")
# Buscar pares de archivos _data y _format
matched_pairs = find_matching_files(working_dir)
if not matched_pairs:
print("No se encontraron pares de archivos _data y _format para procesar.")
return
print(f"Se encontraron {len(matched_pairs)} pares de archivos para procesar.")
for data_file, format_file in matched_pairs:
print(f"\n--- Procesando par de archivos ---")
print(f"Data file: {os.path.basename(data_file)}")
print(f"Format file: {os.path.basename(format_file)}")
# Parsear archivos a JSON
data_json, format_json = parse_files_to_json(data_file, format_file, output_json_dir)
# Aplanar variables por offset
print("Aplanando variables por offset...")
data_offset_map = flatten_variables_by_offset(data_json)
format_offset_map = flatten_variables_by_offset(format_json)
# Comparar estructuras usando offset como clave
print(f"Comparando estructuras: {len(data_offset_map)} variables en _data, {len(format_offset_map)} variables en _format")
compatible, issues = compare_structures_by_offset(data_offset_map, format_offset_map)
if not compatible:
print("\nSe encontraron problemas de compatibilidad entre los archivos:")
for issue in issues:
print(f" - {issue}")
print("\nAbortando el proceso para este par de archivos.")
continue
print("\nLos archivos son compatibles. Creando el archivo _updated...")
# Crear JSON actualizado usando el mapa de offsets de _data
updated_json = create_updated_json(data_json, format_json)
# Guardar la versión actualizada
base_name = os.path.basename(format_file).replace("_format", "").split('.')[0]
updated_json_path = os.path.join(output_json_dir, f"{base_name}_updated.json")
with open(updated_json_path, "w", encoding='utf-8') as f:
json.dump(updated_json, f, default=custom_json_serializer, indent=2)
print(f"Archivo _updated generado: {updated_json_path}")
print("\n--- Proceso completado ---")
if __name__ == "__main__":
main()