Simatic_XML_Parser_to_SCL/ToUpload/x1_to_json.py

625 lines
28 KiB
Python

# ToUpload/x1_to_json.py
# -*- coding: utf-8 -*-
import json
import argparse
import os
import sys
import traceback
import importlib
from lxml import etree
from collections import defaultdict
import copy
# Importar funciones comunes y namespaces desde el nuevo módulo de utils
try:
from parsers.parser_utils import ns, get_multilingual_text, parse_interface_members
except ImportError as e:
print(
f"Error crítico: No se pudieron importar funciones desde parsers.parser_utils: {e}"
)
print(
"Asegúrate de que el directorio 'parsers' y 'parsers/parser_utils.py' existen y son correctos."
)
sys.exit(1)
# --- NUEVAS FUNCIONES DE PARSEO para UDT y Tag Table ---
def parse_udt(udt_element):
"""Parsea un elemento <SW.Types.PlcStruct> (UDT)."""
print(" -> Detectado: PlcStruct (UDT)")
block_data = {
"block_name": "UnknownUDT",
"block_type": "PlcUDT", # Identificador para x3
"language": "UDT", # Lenguaje específico
"interface": {},
"networks": [], # Los UDTs no tienen redes
"block_comment": "",
}
# Extraer nombre y comentario del UDT (similar a como se hace con bloques)
attribute_list_node = udt_element.xpath("./AttributeList")
if attribute_list_node:
attr_list = attribute_list_node[0]
name_node = attr_list.xpath("./Name/text()")
block_data["block_name"] = name_node[0].strip() if name_node else "UnknownUDT"
# Comentario del UDT
comment_node_list = udt_element.xpath(
"./ObjectList/MultilingualText[@CompositionName='Comment']"
)
if comment_node_list:
block_data["block_comment"] = get_multilingual_text(comment_node_list[0])
else: # Fallback
comment_attr_node = attr_list.xpath(
"../ObjectList/MultilingualText[@CompositionName='Comment']"
) # Buscar desde el padre
if comment_attr_node:
block_data["block_comment"] = get_multilingual_text(
comment_attr_node[0]
)
# Extraer interfaz (miembros)
# La interfaz de un UDT suele estar directamente en <Interface><Sections><Section Name="None">
interface_node_list = udt_element.xpath(
"./AttributeList/Interface/iface:Sections/iface:Section[@Name='None']",
namespaces=ns,
)
if interface_node_list:
section_node = interface_node_list[0]
members_in_section = section_node.xpath("./iface:Member", namespaces=ns)
if members_in_section:
# Usar la función existente para parsear miembros
block_data["interface"]["None"] = parse_interface_members(
members_in_section
)
else:
print(
f"Advertencia: Sección 'None' encontrada en UDT '{block_data['block_name']}' pero sin miembros."
)
else:
# Intentar buscar interfaz directamente si no está en AttributeList (menos común)
interface_node_direct = udt_element.xpath(
".//iface:Interface/iface:Sections/iface:Section[@Name='None']",
namespaces=ns,
)
if interface_node_direct:
section_node = interface_node_direct[0]
members_in_section = section_node.xpath("./iface:Member", namespaces=ns)
if members_in_section:
block_data["interface"]["None"] = parse_interface_members(
members_in_section
)
else:
print(
f"Advertencia: Sección 'None' encontrada directamente en UDT '{block_data['block_name']}' pero sin miembros."
)
else:
print(
f"Advertencia: No se encontró la sección 'None' de la interfaz para UDT '{block_data['block_name']}'."
)
if not block_data["interface"]:
print(
f"Advertencia: No se pudo extraer la interfaz del UDT '{block_data['block_name']}'."
)
return block_data
def parse_tag_table(tag_table_element):
"""Parsea un elemento <SW.Tags.PlcTagTable>."""
print(" -> Detectado: PlcTagTable")
table_data = {
"block_name": "UnknownTagTable",
"block_type": "PlcTagTable", # Identificador para x3
"language": "TagTable", # Lenguaje específico
"tags": [],
"networks": [], # Las Tag Tables no tienen redes
"block_comment": "", # Las tablas de tags no suelen tener comentario de bloque
}
# Extraer nombre de la tabla
attribute_list_node = tag_table_element.xpath("./AttributeList")
if attribute_list_node:
name_node = attribute_list_node[0].xpath("./Name/text()")
table_data["block_name"] = (
name_node[0].strip() if name_node else "UnknownTagTable"
)
# Extraer tags
tag_elements = tag_table_element.xpath("./ObjectList/SW.Tags.PlcTag")
print(f" - Encontrados {len(tag_elements)} tags.")
for tag_elem in tag_elements:
tag_info = {
"name": "UnknownTag",
"datatype": "Unknown",
"address": None,
"comment": "",
}
tag_attr_list = tag_elem.xpath("./AttributeList")
if tag_attr_list:
attr_list = tag_attr_list[0]
name_node = attr_list.xpath("./Name/text()")
tag_info["name"] = name_node[0].strip() if name_node else "UnknownTag"
dtype_node = attr_list.xpath("./DataTypeName/text()")
tag_info["datatype"] = dtype_node[0].strip() if dtype_node else "Unknown"
addr_node = attr_list.xpath("./LogicalAddress/text()")
tag_info["address"] = addr_node[0].strip() if addr_node else None
# Extraer comentario del tag
comment_node_list = tag_elem.xpath(
"./ObjectList/MultilingualText[@CompositionName='Comment']"
)
if comment_node_list:
tag_info["comment"] = get_multilingual_text(comment_node_list[0])
table_data["tags"].append(tag_info)
return table_data
# --- Cargador Dinámico de Parsers (sin cambios) ---
def load_parsers(parsers_dir="parsers"):
"""
Escanea el directorio de parsers, importa módulos y construye
un mapa de lenguaje a función de parseo.
"""
parser_map = {}
# Verificar si el directorio existe
script_dir = os.path.dirname(__file__)
parsers_dir_path = os.path.join(script_dir, parsers_dir)
if not os.path.isdir(parsers_dir_path):
print(f"Error: Directorio de parsers no encontrado: '{parsers_dir_path}'")
return parser_map # Devuelve mapa vacío
print(f"Cargando parsers desde: '{parsers_dir_path}'")
parsers_package = os.path.basename(parsers_dir)
for filename in os.listdir(parsers_dir_path):
# Buscar archivos que empiecen con 'parse_' y terminen en '.py'
# Excluir '__init__.py' y 'parser_utils.py'
if (
filename.startswith("parse_")
and filename.endswith(".py")
and filename not in ["__init__.py", "parser_utils.py"]
):
module_name_rel = filename[:-3] # Nombre sin .py (e.g., parse_lad_fbd)
full_module_name = (
f"{parsers_package}.{module_name_rel}" # e.g., parsers.parse_lad_fbd
)
try:
# Importar el módulo dinámicamente
module = importlib.import_module(full_module_name)
# Verificar si el módulo tiene la función get_parser_info
if hasattr(module, "get_parser_info") and callable(
module.get_parser_info
):
parser_info = module.get_parser_info()
# Esperamos un diccionario con 'language' (lista) y 'parser_func'
if (
isinstance(parser_info, dict)
and "language" in parser_info
and "parser_func" in parser_info
):
languages = parser_info["language"]
parser_func = parser_info["parser_func"]
if isinstance(languages, list) and callable(parser_func):
# Añadir la función al mapa para cada lenguaje que soporta
for lang in languages:
lang_upper = lang.upper() # Usar mayúsculas como clave
if lang_upper in parser_map:
print(
f" Advertencia: Parser para '{lang_upper}' en {full_module_name} sobrescribe definición anterior."
)
parser_map[lang_upper] = parser_func
print(
f" - Cargado parser para '{lang_upper}' desde {module_name_rel}.py"
)
else:
print(
f" Advertencia: Formato inválido en get_parser_info de {full_module_name} (language debe ser lista, parser_func callable)."
)
else:
print(
f" Advertencia: get_parser_info en {full_module_name} no devolvió el diccionario esperado."
)
else:
print(
f" Advertencia: Módulo {module_name_rel}.py no tiene la función 'get_parser_info'."
)
except ImportError as e:
print(f"Error importando {full_module_name}: {e}")
except Exception as e:
print(f"Error procesando {full_module_name}: {e}")
traceback.print_exc()
print(f"\nTotal de lenguajes con parser cargado: {len(parser_map)}")
print(f"Lenguajes soportados: {list(parser_map.keys())}")
return parser_map
# --- Función Principal de Conversión (MODIFICADA) ---
def convert_xml_to_json(xml_filepath, json_filepath, parser_map):
"""Convierte XML a JSON, detectando tipo de bloque (FC/FB/OB/DB/UDT/TagTable)."""
print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
if not os.path.exists(xml_filepath):
print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
return False # Indicar fallo
try:
print("Paso 1: Parseando archivo XML...")
parser = etree.XMLParser(
remove_blank_text=True, recover=True
) # recover=True puede ayudar
tree = etree.parse(xml_filepath, parser)
root = tree.getroot()
print("Paso 1: Parseo XML completado.")
result = None # Inicializar resultado
# --- Detección del tipo de bloque/objeto principal ---
print("Paso 2: Detectando tipo de objeto principal...")
# Buscar UDT
udt_element = root.find(".//SW.Types.PlcStruct", namespaces=root.nsmap)
if udt_element is not None:
result = parse_udt(udt_element)
# Buscar Tag Table si no es UDT
if result is None:
tag_table_element = root.find(
".//SW.Tags.PlcTagTable", namespaces=root.nsmap
)
if tag_table_element is not None:
result = parse_tag_table(tag_table_element)
# Buscar bloque FC/FB/OB/GlobalDB si no es UDT ni Tag Table
if result is None:
print("Paso 2: No es UDT ni Tag Table. Buscando SW.Blocks.* ...")
# Usar local-name() para ignorar namespaces en esta búsqueda inicial
block_list = root.xpath(
"//*[local-name()='SW.Blocks.FC' or local-name()='SW.Blocks.FB' or local-name()='SW.Blocks.GlobalDB' or local-name()='SW.Blocks.OB']"
)
# (Resto de la lógica de detección de bloques FC/FB/OB/DB como estaba antes...)
block_type_found = None
the_block = None
if block_list:
the_block = block_list[0]
block_tag_name = etree.QName(the_block.tag).localname
if block_tag_name == "SW.Blocks.FC":
block_type_found = "FC"
elif block_tag_name == "SW.Blocks.FB":
block_type_found = "FB"
elif block_tag_name == "SW.Blocks.GlobalDB":
block_type_found = "GlobalDB"
elif block_tag_name == "SW.Blocks.OB":
block_type_found = "OB"
print(
f"Paso 2b: Bloque {block_tag_name} (Tipo: {block_type_found}) encontrado (ID={the_block.get('ID')})."
)
else:
print(
"Error Crítico: No se encontró el elemento raíz del bloque (<SW.Blocks.FC/FB/GlobalDB/OB>) ni UDT ni Tag Table."
)
return False # Fallo si no se encuentra ningún objeto principal
# --- Si es FC/FB/OB/DB, continuar con el parseo original ---
if the_block is not None:
print("Paso 3: Extrayendo atributos del bloque...")
# (Extracción de atributos Name, Number, Language como antes...)
attribute_list_node = the_block.xpath("./AttributeList")
block_name_val, block_number_val, block_lang_val = (
"Unknown",
None,
"Unknown",
)
if attribute_list_node:
attr_list = attribute_list_node[0]
name_node = attr_list.xpath("./Name/text()")
block_name_val = (
name_node[0].strip() if name_node else block_name_val
)
num_node = attr_list.xpath("./Number/text()")
try:
block_number_val = int(num_node[0]) if num_node else None
except (ValueError, TypeError):
block_number_val = None
lang_node = attr_list.xpath("./ProgrammingLanguage/text()")
block_lang_val = (
lang_node[0].strip()
if lang_node
else ("DB" if block_type_found == "GlobalDB" else "Unknown")
)
print(
f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje Bloque='{block_lang_val}'"
)
else:
print(
f"Advertencia: No se encontró AttributeList para el bloque {block_type_found}."
)
if block_type_found == "GlobalDB":
block_lang_val = "DB"
# (Extracción de comentario como antes...)
block_comment_val = ""
comment_node_list = the_block.xpath(
"./ObjectList/MultilingualText[@CompositionName='Comment']"
)
if comment_node_list:
block_comment_val = get_multilingual_text(comment_node_list[0])
else: # Fallback
comment_attr_node = the_block.xpath(
"./AttributeList/Comment"
) # Buscar desde AttributeList
if comment_attr_node:
block_comment_val = get_multilingual_text(comment_attr_node[0])
print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'")
# Crear diccionario resultado
result = {
"block_name": block_name_val,
"block_number": block_number_val,
"language": block_lang_val,
"block_type": block_type_found,
"block_comment": block_comment_val,
"interface": {},
"networks": [], # Inicializar networks aquí
}
# (Extracción de interfaz como antes...)
print("Paso 4: Extrayendo la interfaz del bloque...")
interface_node_list = (
attribute_list_node[0].xpath("./Interface")
if attribute_list_node
else []
)
if interface_node_list:
interface_node = interface_node_list[0]
all_sections = interface_node.xpath(
".//iface:Section", namespaces=ns
)
if all_sections:
processed_sections = set()
for section in all_sections:
section_name = section.get("Name")
if not section_name or section_name in processed_sections:
continue
members_in_section = section.xpath(
"./iface:Member", namespaces=ns
)
if members_in_section:
result["interface"][section_name] = (
parse_interface_members(members_in_section)
)
processed_sections.add(section_name)
else:
print(
"Advertencia: Nodo Interface no contiene secciones <iface:Section>."
)
if not result["interface"]:
print(
"Advertencia: Interface encontrada pero sin secciones procesables."
)
elif block_type_found == "GlobalDB":
static_members = the_block.xpath(
".//iface:Section[@Name='Static']/iface:Member", namespaces=ns
)
if static_members:
print(
"Paso 4: Encontrada sección Static para GlobalDB (sin nodo Interface)."
)
result["interface"]["Static"] = parse_interface_members(
static_members
)
else:
print(
"Advertencia: No se encontró sección 'Static' para GlobalDB."
)
else:
print(
f"Advertencia: No se encontró <Interface> para bloque {block_type_found}."
)
if not result["interface"]:
print("Advertencia: No se pudo extraer información de la interfaz.")
# (Procesamiento de redes como antes, SOLO si NO es GlobalDB)
if block_type_found != "GlobalDB":
print("Paso 5: Buscando y PROCESANDO redes (CompileUnits)...")
networks_processed_count = 0
result["networks"] = []
object_list_node = the_block.xpath("./ObjectList")
if object_list_node:
compile_units = object_list_node[0].xpath(
"./SW.Blocks.CompileUnit"
)
print(
f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit."
)
# Bucle de parseo de redes (igual que antes)
for network_elem in compile_units:
networks_processed_count += 1
network_id = network_elem.get("ID")
if not network_id:
continue
network_lang = "LAD"
net_attr_list = network_elem.xpath("./AttributeList")
if net_attr_list:
lang_node = net_attr_list[0].xpath(
"./ProgrammingLanguage/text()"
)
if lang_node:
network_lang = lang_node[0].strip()
print(
f" - Procesando Red ID={network_id}, Lenguaje Red={network_lang}"
)
parser_func = parser_map.get(network_lang.upper())
parsed_network_data = None
if parser_func:
try:
parsed_network_data = parser_func(network_elem)
except Exception as e_parse:
print(
f" ERROR durante el parseo de Red {network_id} ({network_lang}): {e_parse}"
)
traceback.print_exc()
parsed_network_data = {
"id": network_id,
"language": network_lang,
"logic": [],
"error": f"Parser failed: {e_parse}",
}
else:
print(
f" Advertencia: Lenguaje de red '{network_lang}' no soportado."
)
parsed_network_data = {
"id": network_id,
"language": network_lang,
"logic": [],
"error": f"Unsupported language: {network_lang}",
}
if parsed_network_data:
title_element = network_elem.xpath(
".//iface:MultilingualText[@CompositionName='Title']",
namespaces=ns,
)
parsed_network_data["title"] = (
get_multilingual_text(title_element[0])
if title_element
else f"Network {network_id}"
)
comment_elem_net = network_elem.xpath(
"./ObjectList/MultilingualText[@CompositionName='Comment']",
namespaces=ns,
)
if not comment_elem_net:
comment_elem_net = network_elem.xpath(
".//MultilingualText[@CompositionName='Comment']",
namespaces=ns,
) # Fallback
parsed_network_data["comment"] = (
get_multilingual_text(comment_elem_net[0])
if comment_elem_net
else ""
)
result["networks"].append(parsed_network_data)
if networks_processed_count == 0:
print(
f"Advertencia: ObjectList para {block_type_found} sin SW.Blocks.CompileUnit."
)
else:
print(
f"Advertencia: No se encontró ObjectList para el bloque {block_type_found}."
)
else: # Es GlobalDB
print("Paso 5: Saltando procesamiento de redes para GlobalDB.")
# --- Escritura del JSON (si se encontró un objeto) ---
if result:
print("Paso 6: Escribiendo el resultado en el archivo JSON...")
# Validaciones finales
if (
result.get("block_type") not in ["PlcUDT", "PlcTagTable"]
and not result["interface"]
):
print("ADVERTENCIA FINAL: 'interface' está vacía en el JSON.")
if (
result.get("block_type") not in ["PlcUDT", "PlcTagTable", "GlobalDB"]
and not result["networks"]
):
print("ADVERTENCIA FINAL: 'networks' está vacía en el JSON.")
try:
with open(json_filepath, "w", encoding="utf-8") as f:
json.dump(result, f, indent=4, ensure_ascii=False)
print("Paso 6: Escritura JSON completada.")
print(
f"Conversión finalizada. JSON guardado en: '{os.path.relpath(json_filepath)}'"
)
return True # Indicar éxito
except IOError as e:
print(
f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}"
)
return False
except TypeError as e:
print(f"Error Crítico: Problema al serializar a JSON. Error: {e}")
return False
else:
print(
"Error Crítico: No se pudo determinar el tipo de objeto principal en el XML."
)
return False
except etree.XMLSyntaxError as e:
print(
f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}"
)
return False # Indicar fallo
except Exception as e:
print(f"Error Crítico: Error inesperado durante la conversión: {e}")
traceback.print_exc()
return False # Indicar fallo
# --- Punto de Entrada Principal (__main__) ---
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Convert Simatic XML (FC/FB/OB/DB/UDT/TagTable) to simplified JSON using dynamic parsers." # Actualizado
)
parser.add_argument(
"xml_filepath",
help="Path to the input XML file passed from the main script (x0_main.py).",
)
args = parser.parse_args()
xml_input_file = args.xml_filepath
if not os.path.exists(xml_input_file):
print(
f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'",
file=sys.stderr,
)
sys.exit(1)
# --- Cargar Parsers Dinámicamente ---
loaded_parsers = load_parsers() # Carga parsers LAD/FBD/STL/SCL
if not loaded_parsers:
# Continuar incluso sin parsers de red, ya que podríamos estar parseando UDT/TagTable
print(
"Advertencia (x1): No se cargaron parsers de red. Se continuará para UDT/TagTable/DB."
)
# sys.exit(1) # Ya no salimos si no hay parsers de red
# Derivar nombre de salida JSON
xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0]
base_dir = os.path.dirname(xml_input_file)
output_dir = os.path.join(base_dir, "parsing")
os.makedirs(output_dir, exist_ok=True)
json_output_file = os.path.join(output_dir, f"{xml_filename_base}.json")
print(
f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'"
)
# Llamar a la función de conversión principal
success = convert_xml_to_json(xml_input_file, json_output_file, loaded_parsers)
# Salir con código de error apropiado
if success:
sys.exit(0) # Éxito
else:
print(
f"\nError durante la conversión de '{os.path.relpath(xml_input_file)}'.",
file=sys.stderr,
)
sys.exit(1) # Fallo