ParamManagerScripts/backend/script_groups/XML Parser to SCL/parsers/parser_utils.py

552 lines
23 KiB
Python

# ToUpload/parsers/parser_utils.py
# -*- coding: utf-8 -*-
from lxml import etree
import traceback
# Definición de 'ns' (asegúrate de que esté definida correctamente en tu archivo)
ns = {
"iface": "http://www.siemens.com/automation/Openness/SW/Interface/v5",
"flg": "http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4",
"st": "http://www.siemens.com/automation/Openness/SW/NetworkSource/StructuredText/v3",
"stl": "http://www.siemens.com/automation/Openness/SW/NetworkSource/StatementList/v4",
# Añade otros namespaces si son necesarios
}
# --- Funciones Comunes de Extracción de Texto y Nodos ---
def get_multilingual_text(element, default_lang="en-US-it-IT", fallback_lang=None):
"""
Extrae texto multilingüe de un elemento XML. (v5.2 - DEBUG + XPath ObjectList)
"""
# print(f"--- DEBUG get_multilingual_text v5.2: Iniciando para elemento {element.tag if element is not None else 'None'}, default='{default_lang}' ---")
if element is None: return ""
combined_texts = []
languages_to_try = []
# --- Lógica Combinada ---
is_combined_mode = default_lang and '-' in default_lang and len(default_lang.split('-')) >= 2
if is_combined_mode:
# print(f"--- DEBUG v5.2: Detectado modo combinado: '{default_lang}' ---")
parts = default_lang.split('-')
target_langs = []
if len(parts) % 2 == 0:
for i in range(0, len(parts), 2): target_langs.append(f"{parts[i]}-{parts[i+1]}")
else: target_langs = []
if target_langs:
# print(f"--- DEBUG v5.2: Culturas combinadas a buscar: {target_langs} ---")
try:
for lang in target_langs:
# --- CORRECCIÓN XPath v5.2: Añadir ObjectList ---
xpath_find_item = f"./ObjectList/MultilingualTextItem[AttributeList/Culture='{lang}']"
found_items = element.xpath(xpath_find_item, namespaces=ns)
# print(f" DEBUG Combinado v5.2: Items encontrados para '{lang}': {len(found_items)}")
if found_items:
xpath_get_text = "./AttributeList/Text/text()"
text_nodes = found_items[0].xpath(xpath_get_text, namespaces=ns)
# print(f" DEBUG Combinado v5.2: Nodos de texto encontrados: {len(text_nodes)}")
if text_nodes:
text_content = text_nodes[0].strip()
# print(f" DEBUG Combinado v5.2: Texto encontrado para '{lang}': '{text_content[:50]}...'")
if text_content: combined_texts.append(text_content)
# --- FIN CORRECCIÓN XPath v5.2 ---
if combined_texts:
# print(f"--- DEBUG v5.2: Modo combinado retornando: '{' - '.join(combined_texts)}' ---")
return " - ".join(combined_texts)
else:
# print(f"--- DEBUG v5.2: Modo combinado no encontró textos. Intentando fallback... ---")
default_lang = None
except Exception as e_comb:
print(f" Advertencia: Error procesando modo combinado '{default_lang}': {e_comb}")
default_lang = None
else: default_lang = None
# --- Fin Lógica Combinada ---
# --- Lógica Normal / Fallback ---
# print("--- DEBUG v5.2: Iniciando lógica Normal/Fallback ---")
if default_lang: languages_to_try.append(default_lang)
if fallback_lang: languages_to_try.append(fallback_lang)
# print(f" DEBUG v5.2: Idiomas específicos a probar: {languages_to_try}")
try:
if languages_to_try:
for lang in languages_to_try:
# --- CORRECCIÓN XPath v5.2: Añadir ObjectList ---
xpath_find_item = f"./ObjectList/MultilingualTextItem[AttributeList/Culture='{lang}']"
found_items = element.xpath(xpath_find_item, namespaces=ns)
# print(f" DEBUG Fallback v5.2: Items encontrados para '{lang}': {len(found_items)}")
if found_items:
xpath_get_text = "./AttributeList/Text/text()"
text_nodes = found_items[0].xpath(xpath_get_text, namespaces=ns)
# print(f" DEBUG Fallback v5.2: Nodos de texto encontrados: {len(text_nodes)}")
if text_nodes:
text_content = text_nodes[0].strip()
# print(f" DEBUG Fallback v5.2: Texto encontrado para '{lang}': '{text_content[:50]}...'")
if text_content:
# print(f"--- DEBUG v5.2: Fallback retornando texto de '{lang}' ---")
return text_content
# --- FIN CORRECCIÓN XPath v5.2 ---
# Fallback final: buscar cualquier texto no vacío
# print(" DEBUG v5.2: Probando cualquier idioma con texto no vacío...")
xpath_any_text = "./ObjectList/MultilingualTextItem/AttributeList/Text/text()" # .// busca en cualquier nivel
all_text_nodes = element.xpath(xpath_any_text, namespaces=ns)
# print(f" DEBUG Fallback Any v5.2: Nodos de texto encontrados: {len(all_text_nodes)}")
for text_content_raw in all_text_nodes:
text_content = text_content_raw.strip()
if text_content:
# print(f"--- DEBUG v5.2: Fallback 'Any' retornando texto: '{text_content}' ---")
return text_content
# print("--- DEBUG v5.2: No se encontró ningún texto no vacío. Retornando '' ---")
return ""
except Exception as e:
# print(f"--- DEBUG v5.2: EXCEPCIÓN en lógica Normal/Fallback: {e} ---")
# traceback.print_exc()
return ""
def get_symbol_name(symbol_element):
"""Obtiene el nombre completo de un símbolo desde un elemento <flg:Symbol>."""
if symbol_element is None:
return None
try:
components = symbol_element.xpath("./flg:Component/@Name", namespaces=ns)
return (
".".join(
f'"{c}"' if not c.startswith("#") and '"' not in c else c
for c in components
)
if components
else None
)
except Exception as e:
print(f"Advertencia: Excepción en get_symbol_name: {e}")
return None
def parse_access(access_element):
"""Parsea un nodo <flg:Access> devolviendo un diccionario con su información."""
if access_element is None:
return None
uid = access_element.get("UId")
scope = access_element.get("Scope")
info = {"uid": uid, "scope": scope, "type": "unknown"}
# Manejo específico para direcciones (Address)
if scope == "Address":
address_elem = access_element.xpath("./flg:Address", namespaces=ns)
if address_elem:
addr = address_elem[0]
# Extraer toda la información disponible sobre la dirección
info["type"] = "unknown_structure" # Mantener compatible con el código existente
info["Area"] = addr.get("Area", "DB")
info["BitOffset"] = addr.get("BitOffset", "0")
info["BlockNumber"] = addr.get("BlockNumber", "")
info["Type"] = addr.get("Type", "Word") # Tipo por defecto: Word
return info
symbol = access_element.xpath("./flg:Symbol", namespaces=ns)
constant = access_element.xpath("./flg:Constant", namespaces=ns)
if symbol:
info["type"] = "variable"
info["name"] = get_symbol_name(symbol[0])
if info["name"] is None:
info["type"] = "error_parsing_symbol"
print(f"Error: No se pudo parsear nombre símbolo Access UID={uid}")
raw_text = "".join(symbol[0].xpath(".//text()")).strip()
info["name"] = (
f'"_ERR_PARSING_{raw_text[:20]}"'
if raw_text
else f'"_ERR_PARSING_EMPTY_SYMBOL_ACCESS_{uid}"'
)
elif constant:
info["type"] = "constant"
const_type_elem = constant[0].xpath("./flg:ConstantType", namespaces=ns)
const_val_elem = constant[0].xpath("./flg:ConstantValue", namespaces=ns)
info["datatype"] = (
const_type_elem[0].text.strip()
if const_type_elem and const_type_elem[0].text is not None
else "Unknown"
)
value_str = (
const_val_elem[0].text.strip()
if const_val_elem and const_val_elem[0].text is not None
else None
)
if value_str is None:
info["type"] = "error_parsing_constant"
info["value"] = None
print(f"Error: Constante sin valor Access UID={uid}")
if info["datatype"] == "Unknown" and value_str:
val_lower = value_str.lower()
if val_lower in ["true", "false"]:
info["datatype"] = "Bool"
elif value_str.isdigit() or (
value_str.startswith("-") and value_str[1:].isdigit()
):
info["datatype"] = "Int"
elif "." in value_str:
try:
float(value_str)
info["datatype"] = "Real"
except ValueError:
pass
elif "#" in value_str:
parts = value_str.split("#", 1)
prefix = parts[0].upper()
if prefix == "T":
info["datatype"] = "Time"
elif prefix == "LT":
info["datatype"] = "LTime"
elif prefix == "S5T":
info["datatype"] = "S5Time"
elif prefix == "D":
info["datatype"] = "Date"
elif prefix == "DT":
info["datatype"] = "DT"
elif prefix == "DTL":
info["datatype"] = "DTL"
elif prefix == "TOD":
info["datatype"] = "Time_Of_Day"
elif value_str.startswith("'") and value_str.endswith("'"):
info["datatype"] = "String"
else:
info["datatype"] = "TypedConstant"
elif value_str.startswith("'") and value_str.endswith("'"):
info["datatype"] = "String"
info["value"] = value_str
dtype_lower = info["datatype"].lower()
val_str_processed = value_str
if isinstance(value_str, str):
if "#" in value_str:
val_str_processed = value_str.split("#", 1)[-1]
if (
val_str_processed.startswith("'")
and val_str_processed.endswith("'")
and len(val_str_processed) > 1
):
val_str_processed = val_str_processed[1:-1]
try:
if dtype_lower in [
"int",
"dint",
"udint",
"sint",
"usint",
"lint",
"ulint",
"word",
"dword",
"lword",
"byte",
]:
info["value"] = int(val_str_processed)
elif dtype_lower == "bool":
info["value"] = (
val_str_processed.lower() == "true" or val_str_processed == "1"
)
elif dtype_lower in ["real", "lreal"]:
info["value"] = float(val_str_processed)
except (ValueError, TypeError):
info["value"] = value_str
else:
info["type"] = "unknown_structure"
print(f"Advertencia: Access UID={uid} no es Symbol ni Constant.")
if info["type"] == "variable" and info.get("name") is None:
print(f"Error Interno: parse_access var sin nombre UID {uid}.")
info["type"] = "error_no_name"
return info
def parse_part(part_element):
"""Parsea un nodo <flg:Part> de LAD/FBD."""
if part_element is None:
return None
uid = part_element.get("UId")
name = part_element.get("Name")
if not uid or not name:
print(
f"Error: Part sin UID o Name: {etree.tostring(part_element, encoding='unicode')}"
)
return None
template_values = {}
negated_pins = {}
try:
for tv in part_element.xpath("./TemplateValue"):
tv_name = tv.get("Name")
tv_type = tv.get("Type")
if tv_name and tv_type:
template_values[tv_name] = tv_type
except Exception as e:
print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}")
try:
for negated_elem in part_element.xpath("./Negated"):
negated_pin_name = negated_elem.get("Name")
if negated_pin_name:
negated_pins[negated_pin_name] = True
except Exception as e:
print(f"Advertencia: Error extrayendo Negated Pins Part UID={uid}: {e}")
return {
"uid": uid,
"type": name,
"template_values": template_values,
"negated_pins": negated_pins,
}
def parse_call(call_element):
"""Parsea un nodo <flg:Call> de LAD/FBD."""
if call_element is None:
return None
uid = call_element.get("UId")
if not uid:
print(
f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}"
)
return None
call_info_elem = call_element.xpath("./flg:CallInfo", namespaces=ns)
if not call_info_elem:
call_info_elem_no_ns = call_element.xpath("./CallInfo")
if not call_info_elem_no_ns:
print(f"Error: Call UID {uid} sin elemento CallInfo.")
return {"uid": uid, "type": "Call_error", "error": "Missing CallInfo"}
else:
print(f"Advertencia: Call UID {uid} encontró CallInfo SIN namespace.")
call_info = call_info_elem_no_ns[0]
else:
call_info = call_info_elem[0]
block_name = call_info.get("Name")
block_type = call_info.get("BlockType")
if not block_name or not block_type:
print(f"Error: CallInfo para UID {uid} sin Name o BlockType.")
return {
"uid": uid,
"type": "Call_error",
"error": "Missing Name or BlockType in CallInfo",
}
instance_name, instance_scope = None, None
if block_type == "FB":
instance_elem_list = call_info.xpath("./flg:Instance", namespaces=ns)
if instance_elem_list:
instance_elem = instance_elem_list[0]
instance_scope = instance_elem.get("Scope")
component_elem_list = instance_elem.xpath("./flg:Component", namespaces=ns)
if component_elem_list:
component_elem = component_elem_list[0]
db_name_raw = component_elem.get("Name")
if db_name_raw:
instance_name = (
f'"{db_name_raw}"'
if not db_name_raw.startswith('"')
else db_name_raw
)
else:
print(
f"Advertencia: <flg:Component> en <flg:Instance> FB Call UID {uid} sin 'Name'."
)
else:
print(
f"Advertencia: No se encontró <flg:Component> en <flg:Instance> FB Call UID {uid}."
)
else:
print(
f"Advertencia: FB Call '{block_name}' UID {uid} sin <flg:Instance>. ¿Llamada a multi-instancia STAT?"
)
call_scope = call_element.get("Scope")
if call_scope == "LocalVariable":
instance_name = f'"{block_name}"'
instance_scope = "Static"
print(
f"INFO: Asumiendo instancia STAT '{instance_name}' para FB Call UID {uid}."
)
call_data = {
"uid": uid,
"type": "Call",
"block_name": block_name,
"block_type": block_type,
}
if instance_name:
call_data["instance_db"] = instance_name
if instance_scope:
call_data["instance_scope"] = instance_scope
return call_data
def parse_interface_members(member_elements):
"""Parsea recursivamente miembros de interfaz/estructura, incluyendo InstanceOfName."""
members_data = []
if not member_elements:
return members_data
for member in member_elements:
member_name = member.get("Name")
member_dtype_raw = member.get("Datatype")
member_version = member.get("Version")
member_remanence = member.get("Remanence", "NonRetain")
member_accessibility = member.get("Accessibility", "Public")
# <-- NUEVO: Obtener InstanceOfName -->
member_instance_of = member.get("InstanceOfName") # Puede ser None
# <-- FIN NUEVO -->
if not member_name or not member_dtype_raw:
print("Advertencia: Miembro sin nombre o tipo de dato. Saltando.")
continue
# Combinar tipo y versión si existe versión
member_dtype = (
f"{member_dtype_raw}:v{member_version}"
if member_version
else member_dtype_raw
)
member_info = {
"name": member_name,
"datatype": member_dtype,
"remanence": member_remanence,
"accessibility": member_accessibility,
"start_value": None,
"comment": None,
"children": [],
"array_elements": {},
}
# <-- NUEVO: Añadir instance_of_name si existe -->
if member_instance_of:
member_info["instance_of_name"] = member_instance_of
# <-- FIN NUEVO -->
# Extraer comentario
comment_node = member.xpath("./iface:Comment", namespaces=ns)
if comment_node:
member_info["comment"] = get_multilingual_text(comment_node[0])
# Extraer valor inicial
start_value_node = member.xpath("./iface:StartValue", namespaces=ns)
if start_value_node:
constant_name = start_value_node[0].get("ConstantName")
member_info["start_value"] = (
constant_name
if constant_name
else (
start_value_node[0].text
if start_value_node[0].text is not None
else None
)
)
# Procesar miembros anidados (Struct)
nested_sections = member.xpath(
"./iface:Sections/iface:Section[@Name='None']/iface:Member", namespaces=ns
)
if nested_sections:
member_info["children"] = parse_interface_members(nested_sections)
# Procesar valores iniciales de Array
if isinstance(member_dtype, str) and member_dtype.lower().startswith("array["):
subelements = member.xpath("./iface:Subelement", namespaces=ns)
for sub in subelements:
path = sub.get("Path") # Índice del array, ej "0", "1"
sub_start_value_node = sub.xpath("./iface:StartValue", namespaces=ns)
if path and sub_start_value_node:
constant_name = sub_start_value_node[0].get("ConstantName")
value = (
constant_name
if constant_name
else (
sub_start_value_node[0].text
if sub_start_value_node[0].text is not None
else None
)
)
# Guardar valor y comentario del subelemento
sub_comment_node = sub.xpath("./iface:Comment", namespaces=ns)
sub_comment_text = (
get_multilingual_text(sub_comment_node[0])
if sub_comment_node
else None
)
if sub_comment_text:
member_info["array_elements"][path] = {
"value": value,
"comment": sub_comment_text,
}
else:
member_info["array_elements"][path] = {
"value": value
} # Guardar como dict simple si no hay comment
members_data.append(member_info)
return members_data
# --- NUEVA FUNCIÓN: Adaptación dinámica de namespaces ---
def adapt_namespaces(root):
"""Actualiza dinámicamente los valores en el diccionario global `ns` para que
coincidan con los namespaces reales presentes en el XML exportado por TIA.
Debe llamarse después de obtener la raíz (`root = tree.getroot()`). Si en el
XML aparecen nuevas versiones (p.ej. v6) de los URIs, esta función las
detectará y sobreescribirá las entradas correspondientes en `ns`.
"""
if root is None:
return # nada que hacer
detected = {}
# 1) Examinar los namespaces declarados en la raíz (cuando existan)
if hasattr(root, "nsmap") and root.nsmap:
for uri in root.nsmap.values():
if not uri or "siemens.com/automation" not in str(uri):
continue
_assign_uri_to_prefix(str(uri), detected)
# 2) Escaneo rápido por elementos clave si aún no hemos encontrado URIs
# Utilizamos búsquedas sin namespace (local-name) para localizar el primer
# elemento relevante y extraer su URI real.
# helper interno
def find_uri_by_localname(tag_local):
elem = root.xpath(f'//*[local-name()="{tag_local}"]')
if elem:
return etree.QName(elem[0]).namespace
return None
if "flg" not in detected or not detected.get("flg"):
flg_uri = find_uri_by_localname("FlgNet")
if flg_uri:
detected["flg"] = flg_uri
if "st" not in detected or not detected.get("st"):
st_uri = find_uri_by_localname("StructuredText")
if st_uri:
detected["st"] = st_uri
if "stl" not in detected or not detected.get("stl"):
stl_uri = find_uri_by_localname("StatementList")
if stl_uri:
detected["stl"] = stl_uri
if "iface" not in detected or not detected.get("iface"):
iface_uri = find_uri_by_localname("Sections")
if iface_uri and "/Interface/" in iface_uri:
detected["iface"] = iface_uri
if detected:
ns.update(detected)
# --- función auxiliar privada para adapt_namespaces ---
def _assign_uri_to_prefix(uri_str: str, out_dict: dict):
"""Asigna un URI concreto al prefijo adecuado en `out_dict`."""
if "/Interface/" in uri_str:
out_dict["iface"] = uri_str
elif "/NetworkSource/FlgNet/" in uri_str:
out_dict["flg"] = uri_str
elif "/NetworkSource/StructuredText/" in uri_str:
out_dict["st"] = uri_str
elif "/NetworkSource/StatementList/" in uri_str:
out_dict["stl"] = uri_str