Simatic_XML_Parser_to_SCL/ToUpload/x3_generate_scl.py

553 lines
22 KiB
Python

# x3_generate_scl.py
# -*- coding: utf-8 -*-
import json
import os
import re
import argparse
import sys
import traceback # Importar traceback para errores
# --- Importar Utilidades y Constantes (Asumiendo ubicación) ---
try:
# Intenta importar desde el paquete de procesadores si está estructurado así
from processors.processor_utils import format_variable_name
# Definir SCL_SUFFIX aquí o importarlo si está centralizado
SCL_SUFFIX = "_sympy_processed" # Asegúrate que coincida con x2_process.py
GROUPED_COMMENT = (
"// Logic included in grouped IF" # Opcional, si se usa para filtrar
)
except ImportError:
print(
"Advertencia: No se pudo importar 'format_variable_name' desde processors.processor_utils."
)
print(
"Usando una implementación local básica (¡PUEDE FALLAR CON NOMBRES COMPLEJOS!)."
)
# Implementación local BÁSICA como fallback (MENOS RECOMENDADA)
def format_variable_name(name):
if not name:
return "_INVALID_NAME_"
if name.startswith('"') and name.endswith('"'):
return name # Mantener comillas
prefix = "#" if name.startswith("#") else ""
if prefix:
name = name[1:]
if name and name[0].isdigit():
name = "_" + name
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
return prefix + name
SCL_SUFFIX = "_sympy_processed"
GROUPED_COMMENT = "// Logic included in grouped IF"
# para formatear valores iniciales
def format_scl_start_value(value, datatype):
"""Formatea un valor para la inicialización SCL según el tipo."""
if value is None:
return None
datatype_lower = datatype.lower() if datatype else ""
value_str = str(value)
if "bool" in datatype_lower:
return "TRUE" if value_str.lower() == "true" else "FALSE"
elif "string" in datatype_lower:
escaped_value = value_str.replace("'", "''")
if escaped_value.startswith("'") and escaped_value.endswith("'"):
escaped_value = escaped_value[1:-1]
return f"'{escaped_value}'"
elif "char" in datatype_lower: # Añadido Char
escaped_value = value_str.replace("'", "''")
if escaped_value.startswith("'") and escaped_value.endswith("'"):
escaped_value = escaped_value[1:-1]
return f"'{escaped_value}'"
elif any(
t in datatype_lower
for t in [
"int",
"byte",
"word",
"dint",
"dword",
"lint",
"lword",
"sint",
"usint",
"uint",
"udint",
"ulint",
]
): # Ampliado
try:
return str(int(value_str))
except ValueError:
if re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", value_str):
return value_str
return f"'{value_str}'" # O como string si no es entero ni símbolo
elif "real" in datatype_lower or "lreal" in datatype_lower:
try:
f_val = float(value_str)
s_val = str(f_val)
if "." not in s_val and "e" not in s_val.lower():
s_val += ".0"
return s_val
except ValueError:
if re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", value_str):
return value_str
return f"'{value_str}'"
elif "time" in datatype_lower: # Añadido Time, S5Time, LTime
# Quitar T#, LT#, S5T# si existen
prefix = ""
if value_str.upper().startswith("T#"):
prefix = "T#"
value_str = value_str[2:]
elif value_str.upper().startswith("LT#"):
prefix = "LT#"
value_str = value_str[3:]
elif value_str.upper().startswith("S5T#"):
prefix = "S5T#"
value_str = value_str[4:]
# Devolver con el prefijo correcto o T# por defecto si no había
if prefix:
return f"{prefix}{value_str}"
elif "s5time" in datatype_lower:
return f"S5T#{value_str}"
elif "ltime" in datatype_lower:
return f"LT#{value_str}"
else:
return f"T#{value_str}" # Default a TIME
elif "date" in datatype_lower: # Añadido Date, DT, TOD
if value_str.upper().startswith("D#"):
return value_str
elif "dt" in datatype_lower or "date_and_time" in datatype_lower:
if value_str.upper().startswith("DT#"):
return value_str
else:
return f"DT#{value_str}" # Añadir prefijo DT#
elif "tod" in datatype_lower or "time_of_day" in datatype_lower:
if value_str.upper().startswith("TOD#"):
return value_str
else:
return f"TOD#{value_str}" # Añadir prefijo TOD#
else:
return f"D#{value_str}" # Default a Date
# Fallback genérico
else:
if re.match(
r'^[a-zA-Z_][a-zA-Z0-9_."#\[\]]+$', value_str
): # Permitir más caracteres en símbolos/tipos
# Si es un UDT o Struct complejo, podría venir con comillas, quitarlas
if value_str.startswith('"') and value_str.endswith('"'):
return value_str[1:-1]
return value_str
else:
escaped_value = value_str.replace("'", "''")
return f"'{escaped_value}'"
# --- NUEVA FUNCIÓN RECURSIVA para generar declaraciones SCL (VAR/STRUCT/ARRAY) ---
def generate_scl_declarations(variables, indent_level=1):
"""Genera las líneas SCL para declarar variables, structs y arrays."""
scl_lines = []
indent = " " * indent_level
for var in variables:
var_name_scl = format_variable_name(var.get("name"))
var_dtype_raw = var.get("datatype", "VARIANT")
# Limpiar comillas de tipos de datos UDT ("MyType" -> MyType)
var_dtype = (
var_dtype_raw.strip('"')
if var_dtype_raw.startswith('"') and var_dtype_raw.endswith('"')
else var_dtype_raw
)
var_comment = var.get("comment")
start_value = var.get("start_value")
children = var.get("children") # Para structs
array_elements = var.get("array_elements") # Para arrays
# Manejar tipos de datos Array especiales
array_match = re.match(r"(Array\[.*\]\s+of\s+)(.*)", var_dtype, re.IGNORECASE)
base_type_for_init = var_dtype
declaration_dtype = var_dtype
if array_match:
array_prefix = array_match.group(1)
base_type_raw = array_match.group(2).strip()
# Limpiar comillas del tipo base del array
base_type_for_init = (
base_type_raw.strip('"')
if base_type_raw.startswith('"') and base_type_raw.endswith('"')
else base_type_raw
)
declaration_dtype = (
f'{array_prefix}"{base_type_for_init}"'
if '"' not in base_type_raw
else f"{array_prefix}{base_type_raw}"
) # Reconstruir con comillas si es UDT
# Reconstruir declaración con comillas si es UDT y no array
elif (
not array_match and var_dtype != base_type_for_init
): # Es un tipo que necesita comillas (UDT)
declaration_dtype = f'"{var_dtype}"'
declaration_line = f"{indent}{var_name_scl} : {declaration_dtype}"
init_value = None
# ---- Arrays ----
if array_elements:
# Ordenar índices (asumiendo que son numéricos)
try:
sorted_indices = sorted(array_elements.keys(), key=int)
except ValueError:
sorted_indices = sorted(
array_elements.keys()
) # Fallback a orden alfabético
init_values = [
format_scl_start_value(array_elements[idx], base_type_for_init)
for idx in sorted_indices
]
valid_inits = [v for v in init_values if v is not None]
if valid_inits:
init_value = f"[{', '.join(valid_inits)}]"
# ---- Structs ----
elif children:
# No añadir comentario // Struct aquí, es redundante
scl_lines.append(declaration_line) # Añadir línea de declaración base
scl_lines.append(f"{indent}STRUCT")
scl_lines.extend(generate_scl_declarations(children, indent_level + 1))
scl_lines.append(f"{indent}END_STRUCT;")
if var_comment:
scl_lines.append(f"{indent}// {var_comment}")
scl_lines.append("") # Línea extra
continue # Saltar resto para Struct
# ---- Tipos Simples ----
else:
if start_value is not None:
init_value = format_scl_start_value(start_value, var_dtype)
# Añadir inicialización si existe
if init_value:
declaration_line += f" := {init_value}"
declaration_line += ";"
if var_comment:
declaration_line += f" // {var_comment}"
scl_lines.append(declaration_line)
return scl_lines
# --- Función Principal de Generación SCL ---
def generate_scl(processed_json_filepath, output_scl_filepath):
"""Genera un archivo SCL a partir del JSON procesado (FC/FB o DB)."""
if not os.path.exists(processed_json_filepath):
print(
f"Error: Archivo JSON procesado no encontrado en '{processed_json_filepath}'"
)
return
print(f"Cargando JSON procesado desde: {processed_json_filepath}")
try:
with open(processed_json_filepath, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
print(f"Error al cargar o parsear JSON: {e}")
traceback.print_exc()
return
# --- Extracción de Información del Bloque (Común) ---
block_name = data.get("block_name", "UnknownBlock")
block_number = data.get("block_number")
block_lang_original = data.get("language", "Unknown") # Será "DB" para Data Blocks
block_type = data.get("block_type", "Unknown") # FC, FB, GlobalDB
block_comment = data.get("block_comment", "")
scl_block_name = format_variable_name(block_name) # Nombre SCL seguro
print(
f"Generando SCL para: {block_type} '{scl_block_name}' (Original: {block_name}, Lang: {block_lang_original})"
)
scl_output = []
# --- GENERACIÓN PARA DATA BLOCK (DB) ---
if block_lang_original == "DB":
print("Modo de generación: DATA_BLOCK")
scl_output.append(f"// Block Type: {block_type}")
scl_output.append(f"// Block Name (Original): {block_name}")
if block_number:
scl_output.append(f"// Block Number: {block_number}")
if block_comment:
scl_output.append(f"// Block Comment: {block_comment}")
scl_output.append("")
scl_output.append(f'DATA_BLOCK "{scl_block_name}"')
scl_output.append("{ S7_Optimized_Access := 'TRUE' }") # Asumir optimizado
scl_output.append("VERSION : 0.1")
scl_output.append("")
interface_data = data.get("interface", {})
static_vars = interface_data.get("Static", [])
if static_vars:
scl_output.append("VAR")
scl_output.extend(generate_scl_declarations(static_vars, indent_level=1))
scl_output.append("END_VAR")
scl_output.append("")
else:
print(
"Advertencia: No se encontró sección 'Static' o está vacía en la interfaz del DB."
)
scl_output.append("VAR")
scl_output.append("END_VAR")
scl_output.append("")
scl_output.append("BEGIN")
scl_output.append("")
scl_output.append("END_DATA_BLOCK")
# --- GENERACIÓN PARA FUNCTION BLOCK / FUNCTION (FC/FB) ---
else:
print("Modo de generación: FUNCTION_BLOCK / FUNCTION")
scl_block_keyword = "FUNCTION_BLOCK" if block_type == "FB" else "FUNCTION"
# Cabecera del Bloque
scl_output.append(f"// Block Type: {block_type}")
scl_output.append(f"// Block Name (Original): {block_name}")
if block_number:
scl_output.append(f"// Block Number: {block_number}")
scl_output.append(f"// Original Language: {block_lang_original}")
if block_comment:
scl_output.append(f"// Block Comment: {block_comment}")
scl_output.append("")
# Manejar tipo de retorno para FUNCTION
return_type = "Void" # Default
interface_data = data.get("interface", {})
if scl_block_keyword == "FUNCTION" and interface_data.get("Return"):
return_member = interface_data["Return"][
0
] # Asumir un solo valor de retorno
return_type_raw = return_member.get("datatype", "Void")
return_type = (
return_type_raw.strip('"')
if return_type_raw.startswith('"') and return_type_raw.endswith('"')
else return_type_raw
)
# Añadir comillas si es UDT
if return_type != return_type_raw:
return_type = f'"{return_type}"'
scl_output.append(
f'{scl_block_keyword} "{scl_block_name}" : {return_type}'
if scl_block_keyword == "FUNCTION"
else f'{scl_block_keyword} "{scl_block_name}"'
)
scl_output.append("{ S7_Optimized_Access := 'TRUE' }")
scl_output.append("VERSION : 0.1")
scl_output.append("")
# Declaraciones de Interfaz FC/FB
section_order = [
"Input",
"Output",
"InOut",
"Static",
"Temp",
"Constant",
] # Return ya está en cabecera
declared_temps = set()
for section_name in section_order:
vars_in_section = interface_data.get(section_name, [])
if vars_in_section:
scl_section_keyword = f"VAR_{section_name.upper()}"
if section_name == "Static":
scl_section_keyword = "VAR_STAT"
if section_name == "Temp":
scl_section_keyword = "VAR_TEMP"
if section_name == "Constant":
scl_section_keyword = "CONSTANT"
scl_output.append(scl_section_keyword)
scl_output.extend(
generate_scl_declarations(vars_in_section, indent_level=1)
)
if section_name == "Temp":
declared_temps.update(
format_variable_name(v.get("name"))
for v in vars_in_section
if v.get("name")
)
scl_output.append("END_VAR")
scl_output.append("")
# Declaraciones VAR_TEMP adicionales detectadas
temp_vars = set()
temp_pattern = re.compile(
r'"?#(_temp_[a-zA-Z0-9_]+)"?|"?(_temp_[a-zA-Z0-9_]+)"?'
)
for network in data.get("networks", []):
for instruction in network.get("logic", []):
scl_code = instruction.get("scl", "")
edge_update_code = instruction.get("_edge_mem_update_scl", "")
code_to_scan = (
(scl_code if scl_code else "")
+ "\n"
+ (edge_update_code if edge_update_code else "")
)
if code_to_scan:
found_temps = temp_pattern.findall(code_to_scan)
for temp_tuple in found_temps:
temp_name = next((t for t in temp_tuple if t), None)
if temp_name:
temp_vars.add(
"#" + temp_name
if not temp_name.startswith("#")
else temp_name
)
additional_temps = sorted(list(temp_vars - declared_temps))
if additional_temps:
if not interface_data.get("Temp"):
scl_output.append("VAR_TEMP")
for var_name in additional_temps:
scl_name = format_variable_name(var_name)
inferred_type = "Bool" # Asumir Bool
scl_output.append(
f" {scl_name} : {inferred_type}; // Auto-generated temporary"
)
if not interface_data.get("Temp"):
scl_output.append("END_VAR")
scl_output.append("")
# Cuerpo del Bloque FC/FB
scl_output.append("BEGIN")
scl_output.append("")
# Iterar por redes y lógica (como antes, incluyendo manejo STL Markdown)
for i, network in enumerate(data.get("networks", [])):
network_title = network.get("title", f'Network {network.get("id")}')
network_comment = network.get("comment", "")
network_lang = network.get("language", "LAD")
scl_output.append(
f" // Network {i+1}: {network_title} (Original Language: {network_lang})"
)
if network_comment:
for line in network_comment.splitlines():
scl_output.append(f" // {line}")
scl_output.append("")
network_has_code = False
if network_lang == "STL":
network_has_code = True
if (
network.get("logic")
and network["logic"][0].get("type") == "RAW_STL_CHUNK"
):
raw_stl_code = network["logic"][0].get(
"stl", "// ERROR: STL code missing"
)
scl_output.append(f" {'//'} ```STL")
for stl_line in raw_stl_code.splitlines():
scl_output.append(f" {stl_line}")
scl_output.append(f" {'//'} ```")
else:
scl_output.append(" // ERROR: Contenido STL inesperado.")
else: # LAD, FBD, SCL, etc.
for instruction in network.get("logic", []):
instruction_type = instruction.get("type", "")
scl_code = instruction.get("scl", "")
is_grouped = instruction.get("grouped", False)
if is_grouped:
continue
if (
instruction_type.endswith(SCL_SUFFIX)
or instruction_type in ["RAW_SCL_CHUNK", "UNSUPPORTED_LANG"]
) and scl_code:
is_only_comment = all(
line.strip().startswith("//")
for line in scl_code.splitlines()
if line.strip()
)
is_if_block = scl_code.strip().startswith("IF")
if not is_only_comment or is_if_block:
network_has_code = True
for line in scl_code.splitlines():
scl_output.append(f" {line}")
if network_has_code:
scl_output.append("")
else:
scl_output.append(f" // Network did not produce printable SCL code.")
scl_output.append("")
# Fin del bloque FC/FB
scl_output.append(f"END_{scl_block_keyword}")
# --- Escritura del Archivo SCL (Común) ---
print(f"Escribiendo archivo SCL en: {output_scl_filepath}")
try:
with open(output_scl_filepath, "w", encoding="utf-8") as f:
for line in scl_output:
f.write(line + "\n")
print("Generación de SCL completada.")
except Exception as e:
print(f"Error al escribir el archivo SCL: {e}")
traceback.print_exc()
# --- Ejecución ---
if __name__ == "__main__":
# Imports necesarios solo para la ejecución como script principal
import argparse
import os
import sys
import traceback # Asegurarse que traceback está importado si se usa en generate_scl
# Configurar ArgumentParser para recibir la ruta del XML original obligatoria
parser = argparse.ArgumentParser(
description="Generate final SCL file from processed JSON (_simplified_processed.json). Expects original XML filepath as argument."
)
parser.add_argument(
"source_xml_filepath", # Argumento posicional obligatorio
help="Path to the original source XML file (passed from x0_main.py, used to derive input/output names).",
)
args = parser.parse_args() # Parsea los argumentos de sys.argv
source_xml_file = args.source_xml_filepath # Obtiene la ruta del XML original
# Verificar si el archivo XML original existe (como referencia)
if not os.path.exists(source_xml_file):
print(
f"Advertencia (x3): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON procesado."
)
# No salir necesariamente.
# Derivar nombres de archivos de entrada (JSON procesado) y salida (SCL)
xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0]
# Asumir que los archivos están en el mismo directorio que el XML original
base_dir = os.path.dirname(source_xml_file) # Directorio del XML original
input_json_file = os.path.join(
base_dir, f"{xml_filename_base}_simplified_processed.json"
)
output_scl_file = os.path.join(
base_dir, f"{xml_filename_base}_simplified_processed.scl"
)
print(
f"(x3) Generando SCL: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_scl_file)}'"
)
# Verificar si el archivo JSON procesado de entrada EXISTE
if not os.path.exists(input_json_file):
print(
f"Error Fatal (x3): Archivo JSON procesado no encontrado: '{input_json_file}'"
)
print(
f"Asegúrate de que 'x2_process.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'."
)
sys.exit(1) # Salir si el archivo necesario no está
else:
# Llamar a la función principal de generación SCL del script
# Asumiendo que tu función principal se llama generate_scl(input_json_path, output_scl_path)
try:
generate_scl(input_json_file, output_scl_file)
except Exception as e:
print(
f"Error Crítico (x3) durante la generación de SCL desde '{input_json_file}': {e}"
)
# traceback ya debería estar importado si generate_scl lo necesita
traceback.print_exc()
sys.exit(1) # Salir con error si la función principal falla