# ParamManagerScripts/backend/script_groups/S7_DB_Utils/x4.py
# --- x4_refactored.py ---
import json
from typing import List, Dict, Any
import sys
import os
import glob
from x3 import flatten_db_structure
# Make the project root (three directory levels up from this script)
# importable so `backend.script_utils` resolves when the script is run
# directly from its own folder.
script_root = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
def find_working_directory():
    """Return the configured working directory, exiting with an error if unset."""
    working_directory = load_configuration().get("working_directory")
    if working_directory:
        return working_directory
    print("No working directory specified in the configuration file.")
    sys.exit(1)
def format_data_type_for_source(var_info: Dict[str, Any]) -> str:
    """Build the S7 type text for a variable declaration.

    Produces an optional ``ARRAY [lo..hi] OF `` prefix, then the UDT source
    name (when present) or the raw data type, then an optional ``[len]``
    suffix for STRING types with an explicit length.
    """
    parts: List[str] = []
    dims = var_info.get("array_dimensions")
    if dims:
        bounds = ",".join(f"{d['lower_bound']}..{d['upper_bound']}" for d in dims)
        parts.append(f"ARRAY [{bounds}] OF ")
    # Prefer the UDT name over the plain data type when one is recorded.
    parts.append(var_info.get("udt_source_name") or var_info["data_type"])
    if var_info["data_type"].upper() == "STRING" and var_info.get("string_length") is not None:
        parts.append(f"[{var_info['string_length']}]")
    return "".join(parts)
def generate_variable_declaration_for_source(var_info: Dict[str, Any], indent_level: int) -> str:
    """Render one S7 declaration line: name, type, optional init value and comment."""
    decl = f'{" " * indent_level}{var_info["name"]} : {format_data_type_for_source(var_info)}'
    init = var_info.get("initial_value")
    if init is not None:
        # Booleans must be emitted as the S7 keywords TRUE/FALSE.
        if isinstance(init, bool):
            decl += f' := {"TRUE" if init else "FALSE"}'
        else:
            decl += f' := {init}'
    # An inline STRUCT definition continues over following lines, so the
    # terminating ';' belongs to its END_STRUCT, not to this line.
    is_inline_struct = (
        var_info["data_type"].upper() == "STRUCT"
        and not var_info.get("udt_source_name")
        and var_info.get("children")
    )
    if not is_inline_struct:
        decl += ';'
    comment = var_info.get("comment")
    if comment:
        decl += f'\t// {comment}'
    return decl
def generate_struct_members_for_source(members: List[Dict[str, Any]], indent_level: int) -> List[str]:
    """Recursively render STRUCT member declarations.

    Members flagged as expanded UDT children are skipped (the UDT reference
    itself is declared instead); inline STRUCTs recurse one level deeper.
    """
    rendered: List[str] = []
    for member in members:
        if member.get("is_udt_expanded_member"):
            continue
        is_inline_struct = (
            member["data_type"].upper() == "STRUCT"
            and not member.get("udt_source_name")
            and member.get("children")
        )
        if is_inline_struct:
            pad = " " * indent_level
            rendered.append(f'{pad}{member["name"]} : STRUCT')
            rendered.extend(generate_struct_members_for_source(member["children"], indent_level + 1))
            rendered.append(f'{pad}END_STRUCT;')
        else:
            rendered.append(generate_variable_declaration_for_source(member, indent_level))
    return rendered
def generate_begin_block_assignments(db_info: Dict[str, Any], indent_level: int) -> List[str]:
    """Build the BEGIN-block assignment lines for every variable that has a
    current value, in the strict byte.bit offset order that
    flatten_db_structure already provides.
    """
    pad = " " * indent_level
    assignments: List[str] = []
    for var in flatten_db_structure(db_info):
        current = var.get("current_value")
        if current is None:
            continue
        value_str = str(current)
        # Normalize boolean text to the S7 TRUE/FALSE keywords.
        lowered = value_str.lower()
        if lowered == "true":
            value_str = "TRUE"
        elif lowered == "false":
            value_str = "FALSE"
        assignments.append(f"{pad}{var['full_path']} := {value_str};")
    return assignments
def generate_markdown_table(db_info: Dict[str, Any]) -> List[str]:
    """Generate a markdown documentation table for one DB.

    Rows come from flatten_db_structure, already sorted by offset and
    including expanded array elements. Array elements show only their base
    type; all other variables show the full declaration text.
    """
    def _cell(value: Any) -> str:
        # Escape pipes/newlines so the value stays inside its table cell.
        # A None value renders as an empty cell instead of the text "None"
        # (the old str(var.get(k, "")) printed "None" for present-but-null keys).
        if value is None:
            return ""
        return str(value).replace("|", "\\|").replace("\n", " ")

    lines = [
        f"## Documentación para DB: {db_info['name']}",
        "",
        "| Address | Name | Type | Initial Value | Actual Value | Comment |",
        "|---|---|---|---|---|---|",
    ]
    for var in flatten_db_structure(db_info):
        # Array elements carry their element type in data_type; everything
        # else gets the full declaration (ARRAY prefix, STRING length, UDT name).
        if var.get("is_array_element"):
            data_type_str = var["data_type"]
        else:
            data_type_str = format_data_type_for_source(var)
        lines.append(
            f"| {var['address_display']} | {var['full_path']} | {data_type_str} | "
            f"{_cell(var.get('initial_value'))} | {_cell(var.get('current_value'))} | "
            f"{_cell(var.get('comment'))} |"
        )
    return lines
def generate_s7_source_code_lines(data: Dict[str, Any]) -> List[str]:
    """Reconstruct S7 source text (TYPE and DATA_BLOCK sections) from parsed JSON."""
    source: List[str] = []

    def _append_attrs(block: Dict[str, Any]) -> None:
        # FAMILY / VERSION attributes are optional on both UDTs and DBs.
        if block.get("family"):
            source.append(f' FAMILY : {block["family"]}')
        if block.get("version"):
            source.append(f' VERSION : {block["version"]}')

    for udt in data.get("udts", []):
        source.append(f'TYPE "{udt["name"]}"')
        _append_attrs(udt)
        source.extend(["", " STRUCT"])
        source.extend(generate_struct_members_for_source(udt["members"], 2))
        source.extend([" END_STRUCT;", "END_TYPE", ""])

    for db in data.get("dbs", []):
        source.append(f'DATA_BLOCK "{db["name"]}"')
        if db.get("title"):
            source.append(f' TITLE = {db["title"]}')
        _append_attrs(db)
        source.extend(["", " STRUCT"])
        source.extend(generate_struct_members_for_source(db["members"], 2))
        source.append(" END_STRUCT;")
        assignments = generate_begin_block_assignments(db, 1)
        # Only emit a BEGIN section when there is at least one assignment.
        if assignments:
            source.append("BEGIN")
            source.extend(assignments)
        source.extend(["END_DATA_BLOCK", ""])
    return source
def _write_s7_source(data_from_json: Dict[str, Any], s7_output_filename: str) -> None:
    """Write the reconstructed S7 source (.txt) for one JSON file; report errors instead of raising."""
    s7_code_lines = generate_s7_source_code_lines(data_from_json)
    try:
        with open(s7_output_filename, 'w', encoding='utf-8') as f:
            # Batch the writes instead of one f.write per line.
            f.writelines(line + "\n" for line in s7_code_lines)
        print(f"Archivo S7 reconstruido generado: {s7_output_filename}")
    except Exception as e:
        print(f"Error al escribir el archivo S7 {s7_output_filename}: {e}")

def _write_markdown_doc(data_from_json: Dict[str, Any], json_filename_base: str,
                        current_json_filename: str, md_output_filename: str) -> None:
    """Write the markdown documentation (.md) covering every DB in one JSON file.

    When the JSON holds no DBs a stub document is written so the output set
    stays complete.
    """
    if data_from_json.get("dbs"):
        all_db_markdown_lines = [
            f"# Documentación S7 para {json_filename_base}",
            f"_Fuente JSON: {current_json_filename}_",
            "",
        ]
        for db_index, db_to_document in enumerate(data_from_json["dbs"]):
            if db_index > 0:
                # Horizontal rule between consecutive DB tables.
                all_db_markdown_lines.append("\n---\n")
            all_db_markdown_lines.extend(generate_markdown_table(db_to_document))
            all_db_markdown_lines.append("")
        try:
            with open(md_output_filename, 'w', encoding='utf-8') as f:
                f.writelines(line + "\n" for line in all_db_markdown_lines)
            print(f"Archivo Markdown de documentación generado: {md_output_filename}")
        except Exception as e:
            print(f"Error al escribir el archivo Markdown {md_output_filename}: {e}")
    else:
        print(f"No se encontraron DBs en {current_json_filename} para generar documentación Markdown.")
        with open(md_output_filename, 'w', encoding='utf-8') as f:
            f.write(f"# Documentación S7 para {json_filename_base}\n\n_Fuente JSON: {current_json_filename}_\n\nNo se encontraron Bloques de Datos (DBs) en este archivo JSON.\n")
        print(f"Archivo Markdown generado (sin DBs): {md_output_filename}")

def main():
    """Convert every JSON DB description in <working_dir>/json into an S7
    source file (.txt) and a markdown documentation file (.md) under
    <working_dir>/documentation.
    """
    working_dir = find_working_directory()
    print(f"Using working directory: {working_dir}")
    input_json_dir = os.path.join(working_dir, "json")
    documentation_dir = os.path.join(working_dir, "documentation")
    os.makedirs(documentation_dir, exist_ok=True)
    print(f"Los archivos de documentación generados se guardarán en: {documentation_dir}")
    json_files_to_process = glob.glob(os.path.join(input_json_dir, "*.json"))
    if not json_files_to_process:
        print(f"No se encontraron archivos .json en {input_json_dir}")
        return
    print(f"Archivos JSON encontrados para procesar: {len(json_files_to_process)}")
    for json_input_filepath in json_files_to_process:
        json_filename_base = os.path.splitext(os.path.basename(json_input_filepath))[0]
        current_json_filename = os.path.basename(json_input_filepath)
        print(f"\n--- Procesando archivo JSON: {current_json_filename} ---")
        s7_output_filename = os.path.join(documentation_dir, f"{json_filename_base}.txt")
        md_output_filename = os.path.join(documentation_dir, f"{json_filename_base}.md")
        try:
            with open(json_input_filepath, 'r', encoding='utf-8') as f:
                data_from_json = json.load(f)
            print(f"Archivo JSON '{current_json_filename}' cargado correctamente.")
        except Exception as e:
            # A malformed JSON file must not abort the whole batch.
            print(f"Error al cargar/leer {current_json_filename}: {e}")
            continue
        _write_s7_source(data_from_json, s7_output_filename)
        _write_markdown_doc(data_from_json, json_filename_base, current_json_filename, md_output_filename)
    print("\n--- Proceso de generación de documentación completado ---")

if __name__ == "__main__":
    main()