AutoExportFromTia_Openness_CAx/x1.py

336 lines
13 KiB
Python

import os
import sys
import traceback
# Import lxml
from lxml import etree as ET
import json
from pathlib import Path
import re
# --- Configuration ---
# (No changes needed here)
# CORRECTED function from v6
def extract_aml_data_v7(root):
"""Extracts device information using lxml and local-name() XPath. (Corrected)"""
project_data = {"devices": {}, "connections": []} # Use dict for devices
instance_hierarchies = root.xpath(".//*[local-name()='InstanceHierarchy']")
if not instance_hierarchies:
print("ERROR: Could not find 'InstanceHierarchy'.")
return project_data
ih = instance_hierarchies[0]
print(f"Processing InstanceHierarchy: {ih.get('Name', 'N/A')}")
internal_elements = ih.xpath(".//*[local-name()='InternalElement']")
print(f"Found {len(internal_elements)} InternalElement(s). Analyzing...")
# --- Device Loop ---
for elem in internal_elements:
elem_id = elem.get("ID", None)
if not elem_id:
continue
device_info = {
"name": elem.get("Name", "N/A"),
"id": elem_id,
"class": "N/A",
"type_identifier": "N/A",
"order_number": "N/A",
"type_name": "N/A",
"firmware_version": "N/A", # Added firmware field
"position": elem.get("PositionNumber", "N/A"),
"attributes": {},
"interfaces": [],
"network_nodes": [],
"io_addresses": [],
}
# Get Device Class/Type
class_tag = elem.xpath("./*[local-name()='SystemUnitClass']")
if class_tag:
device_info["class"] = class_tag[0].get(
"Path", elem.get("RefBaseSystemUnitPath", "N/A")
)
else:
device_info["class"] = elem.get("RefBaseSystemUnitPath", "N/A")
# Extract Attributes
attributes = elem.xpath(
"./*[local-name()='Attribute']"
) # Direct attributes first
if not attributes:
attributes = elem.xpath(".//*[local-name()='Attribute']") # Fallback nested
for attr in attributes:
attr_name = attr.get("Name", "")
# Get text value of the direct child 'Value' tag
value_elem = attr.xpath(
"./*[local-name()='Value']/text()"
) # CORRECT variable name
attr_value = (
value_elem[0] if value_elem else ""
) # USE CORRECT variable name here
# Store common identifying attributes directly
if attr_name == "TypeIdentifier":
device_info["type_identifier"] = attr_value
if "OrderNumber:" in attr_value:
device_info["order_number"] = attr_value.split("OrderNumber:")[-1]
elif attr_name == "TypeName":
device_info["type_name"] = attr_value
elif attr_name == "FirmwareVersion":
device_info["firmware_version"] = attr_value
# Store all attributes for reference
device_info["attributes"][attr_name] = attr_value
# Extract Detailed IO Addresses
if attr_name == "Address":
address_parts = attr.xpath("./*[local-name()='Attribute']")
for part in address_parts:
addr_details = {
"area": part.get("Name", "?"),
"start": "N/A",
"length": "N/A",
"type": "N/A",
}
start_val = part.xpath(
"./*[local-name()='Attribute'][@Name='StartAddress']/*[local-name()='Value']/text()"
)
len_val = part.xpath(
"./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()"
)
type_val = part.xpath(
"./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()"
)
if start_val:
addr_details["start"] = start_val[0]
if len_val:
addr_details["length"] = len_val[0]
if type_val:
addr_details["type"] = type_val[0]
if addr_details["start"] != "N/A":
device_info["io_addresses"].append(addr_details)
# Extract External Interfaces
interfaces = elem.xpath("./*[local-name()='ExternalInterface']")
for interface in interfaces:
interface_info = {
"name": interface.get("Name", "N/A"),
"id": interface.get("ID", "N/A"),
"ref_base_class": interface.get("RefBaseClassPath", "N/A"),
}
device_info["interfaces"].append(interface_info)
# Extract Network Nodes
network_nodes = elem.xpath(
".//*[local-name()='InternalElement']/*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]"
)
for node_role in network_nodes:
node_elem = node_role.getparent()
node_info = {
"name": node_elem.get("Name", "N/A"),
"type": "N/A",
"address": "N/A",
}
type_attr = node_elem.xpath(
"./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()"
)
addr_attr = node_elem.xpath(
"./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()"
)
if type_attr:
node_info["type"] = type_attr[0]
if addr_attr:
node_info["address"] = addr_attr[0]
if node_info["address"] != "N/A":
device_info["network_nodes"].append(node_info)
project_data["devices"][elem_id] = device_info
# Find and process InternalLinks
internal_links = root.xpath(".//*[local-name()='InternalLink']")
print(f"Found {len(internal_links)} InternalLink(s) globally.")
for link in internal_links:
side_a_match = re.match(r"([^:]+):?(.*)", link.get("RefPartnerSideA", ""))
side_b_match = re.match(r"([^:]+):?(.*)", link.get("RefPartnerSideB", ""))
side_a_id = side_a_match.group(1) if side_a_match else "N/A"
side_a_suffix = (
side_a_match.group(2)
if side_a_match and side_a_match.group(2)
else side_a_id
)
side_b_id = side_b_match.group(1) if side_b_match else "N/A"
side_b_suffix = (
side_b_match.group(2)
if side_b_match and side_b_match.group(2)
else side_b_id
)
link_info = {
"name": link.get("Name", "N/A"),
"side_a_id": side_a_id,
"side_a_ref_suffix": side_a_suffix,
"side_b_id": side_b_id,
"side_b_ref_suffix": side_b_suffix,
"side_a_device_name": project_data["devices"]
.get(side_a_id, {})
.get("name", side_a_id),
"side_b_device_name": project_data["devices"]
.get(side_b_id, {})
.get("name", side_b_id),
}
project_data["connections"].append(link_info)
return project_data
# --- generate_markdown_obsidian function remains the same as in v6 ---
def generate_markdown_obsidian(project_data, md_file_path):
"""Generates structured Markdown output for Obsidian."""
def generate_table(headers, rows):
lines = []
if not rows:
return ["No data available."]
lines.append("| " + " | ".join(headers) + " |")
lines.append("|" + "---|" * len(headers))
for row in rows:
lines.append("| " + " | ".join(map(str, row)) + " |")
return lines
markdown_lines = ["# Project Hardware & IO Summary (from CAx Export)", ""]
network_rows = []
for device_id, device in project_data["devices"].items():
for node in device.get("network_nodes", []):
network_rows.append(
[
device.get("name", "N/A"),
node.get("name", "N/A"),
node.get("type", "N/A"),
node.get("address", "N/A"),
]
)
markdown_lines.append("## Network Configuration")
markdown_lines.extend(
generate_table(
["Parent Device", "Interface/Node Name", "Type", "Address (IP/DP)"],
network_rows,
)
)
markdown_lines.append("")
io_module_rows = []
for device_id, device in project_data["devices"].items():
if device.get("io_addresses"):
address_strs = [
f"{addr['type']} Start:{addr['start']} Len:{addr['length']} (Area:{addr['area']})"
for addr in device["io_addresses"]
]
io_module_rows.append(
[
device.get("name", "N/A"),
device.get("type_name", "N/A"),
device.get("order_number", "N/A"),
device.get("position", "N/A"),
"<br>".join(address_strs),
]
)
markdown_lines.append("## I/O Modules & Addresses")
markdown_lines.extend(
generate_table(
[
"Module Name",
"Type Name",
"Order Number",
"Slot/Pos",
"Logical Addresses",
],
io_module_rows,
)
)
markdown_lines.append("")
connection_rows = []
for i, conn in enumerate(project_data.get("connections", [])):
if i >= 50: # Limit links shown in MD
connection_rows.append(["...", "...", "..."])
break
source = f"{conn.get('side_a_device_name', 'UNKNOWN')}::{conn.get('side_a_ref_suffix', 'N/A')}"
target = f"{conn.get('side_b_device_name', 'UNKNOWN')}::{conn.get('side_b_ref_suffix', 'N/A')}"
connection_rows.append([conn.get("name", "N/A"), f"`{source}`", f"`{target}`"])
markdown_lines.append("## Connections / IO Tag Links")
markdown_lines.extend(
generate_table(
[
"Link Name",
"Source (Device::Channel/Interface)",
"Target (Device::Tag/Interface)",
],
connection_rows,
)
)
markdown_lines.append("")
try:
with open(md_file_path, "w", encoding="utf-8") as f:
f.write("\n".join(markdown_lines))
print(f"Markdown summary written to: {md_file_path}")
except Exception as e:
print(f"ERROR writing Markdown file {md_file_path}: {e}")
# --- process_aml_file_v7 function calls the corrected extraction ---
def process_aml_file_v7(aml_file_path, json_output_path, md_output_path):
"""Main function using lxml with local-name() and corrected extraction."""
print(f"Processing AML file: {aml_file_path}")
if not os.path.exists(aml_file_path):
print(f"ERROR: Input AML file not found at {aml_file_path}")
return
try:
parser = ET.XMLParser(remove_blank_text=True)
tree = ET.parse(aml_file_path, parser)
root = tree.getroot()
project_data = extract_aml_data_v7(root) # Call corrected function
print(f"Generating JSON output: {json_output_path}")
try:
with open(json_output_path, "w", encoding="utf-8") as f:
json.dump(project_data, f, indent=4, default=str)
print(f"JSON data written successfully.")
except Exception as e:
print(f"ERROR writing JSON file {json_output_path}: {e}")
generate_markdown_obsidian(
project_data, md_output_path
) # Use the same MD generator
except ET.LxmlError as xml_err:
print(f"ERROR parsing XML file {aml_file_path} with lxml: {xml_err}")
except Exception as e:
print(f"ERROR processing AML file {aml_file_path}: {e}")
traceback.print_exc()
# --- Main Execution ---
if __name__ == "__main__":
print("--- AML (CAx Export) to JSON and Obsidian MD Converter (v7 - Corrected) ---")
input_aml_file = "SAE196_c0.2.XML_CAx_Export.xml"
input_path = Path(input_aml_file)
if not input_path.is_file():
print(f"ERROR: Input file '{input_aml_file}' not found.")
sys.exit(1)
output_json_file = input_path.with_suffix(".detailed.json")
output_md_file = input_path.with_name(f"{input_path.stem}_Obsidian_Summary.md")
print(f"Input AML: {input_path}")
print(f"Output JSON: {output_json_file}")
print(f"Output Markdown: {output_md_file}")
process_aml_file_v7(
str(input_path), str(output_json_file), str(output_md_file)
) # Call v7
print("\nScript finished.")