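"""AML (CAx export) to JSON / Obsidian Markdown converter (v7).

Parses an AutomationML CAx export with lxml, collects devices (InternalElements)
with their attributes, network nodes and I/O addresses, plus InternalLink
connections, then writes a detailed JSON file and an Obsidian-friendly
Markdown summary. (Docstring added as a summary of the code below.)
"""
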
import os
import sys
import traceback

# Import lxml
from lxml import etree as ET
import json
from pathlib import Path
import re


# --- Configuration ---
# (No changes needed here)

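# Note: CAx/AutomationML exports typically carry a CAEX default namespace, so the
# XPath queries below use local-name() to match elements regardless of namespace.
# They expect the usual CAEX attribute layout, roughly (illustrative fragment only):
#   <Attribute Name="TypeIdentifier">
#     <Value>OrderNumber:...</Value>
#   </Attribute>
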
# CORRECTED function from v6
def extract_aml_data_v7(root):
    """Extracts device information using lxml and local-name() XPath. (Corrected)"""
    project_data = {"devices": {}, "connections": []}  # Use dict for devices

    instance_hierarchies = root.xpath(".//*[local-name()='InstanceHierarchy']")
    if not instance_hierarchies:
        print("ERROR: Could not find 'InstanceHierarchy'.")
        return project_data
    ih = instance_hierarchies[0]
    print(f"Processing InstanceHierarchy: {ih.get('Name', 'N/A')}")

    internal_elements = ih.xpath(".//*[local-name()='InternalElement']")
    print(f"Found {len(internal_elements)} InternalElement(s). Analyzing...")

    # --- Device Loop ---
    for elem in internal_elements:
        elem_id = elem.get("ID", None)
        if not elem_id:
            continue

        device_info = {
            "name": elem.get("Name", "N/A"),
            "id": elem_id,
            "class": "N/A",
            "type_identifier": "N/A",
            "order_number": "N/A",
            "type_name": "N/A",
            "firmware_version": "N/A",  # Added firmware field
            "position": elem.get("PositionNumber", "N/A"),
            "attributes": {},
            "interfaces": [],
            "network_nodes": [],
            "io_addresses": [],
        }

        # Get Device Class/Type
        class_tag = elem.xpath("./*[local-name()='SystemUnitClass']")
        if class_tag:
            device_info["class"] = class_tag[0].get(
                "Path", elem.get("RefBaseSystemUnitPath", "N/A")
            )
        else:
            device_info["class"] = elem.get("RefBaseSystemUnitPath", "N/A")

        # Extract Attributes
        attributes = elem.xpath("./*[local-name()='Attribute']")  # Direct attributes first
        if not attributes:
            attributes = elem.xpath(".//*[local-name()='Attribute']")  # Fallback: nested

        for attr in attributes:
            attr_name = attr.get("Name", "")
            # Get text value of the direct child 'Value' tag
            value_elem = attr.xpath("./*[local-name()='Value']/text()")
            attr_value = value_elem[0] if value_elem else ""

            # Store common identifying attributes directly
            if attr_name == "TypeIdentifier":
                device_info["type_identifier"] = attr_value
                if "OrderNumber:" in attr_value:
                    device_info["order_number"] = attr_value.split("OrderNumber:")[-1]
            elif attr_name == "TypeName":
                device_info["type_name"] = attr_value
            elif attr_name == "FirmwareVersion":
                device_info["firmware_version"] = attr_value

            # Store all attributes for reference
            device_info["attributes"][attr_name] = attr_value

            # Extract Detailed IO Addresses
            if attr_name == "Address":
                address_parts = attr.xpath("./*[local-name()='Attribute']")
                for part in address_parts:
                    addr_details = {
                        "area": part.get("Name", "?"),
                        "start": "N/A",
                        "length": "N/A",
                        "type": "N/A",
                    }
                    start_val = part.xpath(
                        "./*[local-name()='Attribute'][@Name='StartAddress']/*[local-name()='Value']/text()"
                    )
                    len_val = part.xpath(
                        "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()"
                    )
                    type_val = part.xpath(
                        "./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()"
                    )
                    if start_val:
                        addr_details["start"] = start_val[0]
                    if len_val:
                        addr_details["length"] = len_val[0]
                    if type_val:
                        addr_details["type"] = type_val[0]
                    if addr_details["start"] != "N/A":
                        device_info["io_addresses"].append(addr_details)

        # Extract External Interfaces
        interfaces = elem.xpath("./*[local-name()='ExternalInterface']")
        for interface in interfaces:
            interface_info = {
                "name": interface.get("Name", "N/A"),
                "id": interface.get("ID", "N/A"),
                "ref_base_class": interface.get("RefBaseClassPath", "N/A"),
            }
            device_info["interfaces"].append(interface_info)

        # Extract Network Nodes
        network_nodes = elem.xpath(
            ".//*[local-name()='InternalElement']/*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]"
        )
        for node_role in network_nodes:
            node_elem = node_role.getparent()
            node_info = {
                "name": node_elem.get("Name", "N/A"),
                "type": "N/A",
                "address": "N/A",
            }
            type_attr = node_elem.xpath(
                "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()"
            )
            addr_attr = node_elem.xpath(
                "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()"
            )
            if type_attr:
                node_info["type"] = type_attr[0]
            if addr_attr:
                node_info["address"] = addr_attr[0]
            if node_info["address"] != "N/A":
                device_info["network_nodes"].append(node_info)

        project_data["devices"][elem_id] = device_info

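    # The regex below assumes partner references of the form "<ID>:<suffix>" and
    # splits off the leading ID so it can be looked up in project_data["devices"].
    # Hypothetical example: "GUID-1234:Channel_0" -> id "GUID-1234", suffix "Channel_0".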
    # Find and process InternalLinks
    internal_links = root.xpath(".//*[local-name()='InternalLink']")
    print(f"Found {len(internal_links)} InternalLink(s) globally.")
    for link in internal_links:
        side_a_match = re.match(r"([^:]+):?(.*)", link.get("RefPartnerSideA", ""))
        side_b_match = re.match(r"([^:]+):?(.*)", link.get("RefPartnerSideB", ""))
        side_a_id = side_a_match.group(1) if side_a_match else "N/A"
        side_a_suffix = (
            side_a_match.group(2)
            if side_a_match and side_a_match.group(2)
            else side_a_id
        )
        side_b_id = side_b_match.group(1) if side_b_match else "N/A"
        side_b_suffix = (
            side_b_match.group(2)
            if side_b_match and side_b_match.group(2)
            else side_b_id
        )

        link_info = {
            "name": link.get("Name", "N/A"),
            "side_a_id": side_a_id,
            "side_a_ref_suffix": side_a_suffix,
            "side_b_id": side_b_id,
            "side_b_ref_suffix": side_b_suffix,
            "side_a_device_name": project_data["devices"]
            .get(side_a_id, {})
            .get("name", side_a_id),
            "side_b_device_name": project_data["devices"]
            .get(side_b_id, {})
            .get("name", side_b_id),
        }
        project_data["connections"].append(link_info)

    return project_data

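# Rough shape of the data returned by extract_aml_data_v7 (key sketch only; values
# depend on the export):
# {
#     "devices": {
#         "<InternalElement ID>": {
#             "name", "id", "class", "type_identifier", "order_number", "type_name",
#             "firmware_version", "position", "attributes", "interfaces",
#             "network_nodes", "io_addresses",
#         },
#     },
#     "connections": [
#         {"name", "side_a_id", "side_a_ref_suffix", "side_b_id",
#          "side_b_ref_suffix", "side_a_device_name", "side_b_device_name"},
#     ],
# }
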
# --- generate_markdown_obsidian function remains the same as in v6 ---
def generate_markdown_obsidian(project_data, md_file_path):
    """Generates structured Markdown output for Obsidian."""

    def generate_table(headers, rows):
        lines = []
        if not rows:
            return ["No data available."]
        lines.append("| " + " | ".join(headers) + " |")
        lines.append("|" + "---|" * len(headers))
        for row in rows:
            lines.append("| " + " | ".join(map(str, row)) + " |")
        return lines

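    # For example, generate_table(["A", "B"], [[1, 2]]) yields:
    #   | A | B |
    #   |---|---|
    #   | 1 | 2 |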
    markdown_lines = ["# Project Hardware & IO Summary (from CAx Export)", ""]
    network_rows = []
    for device_id, device in project_data["devices"].items():
        for node in device.get("network_nodes", []):
            network_rows.append(
                [
                    device.get("name", "N/A"),
                    node.get("name", "N/A"),
                    node.get("type", "N/A"),
                    node.get("address", "N/A"),
                ]
            )
    markdown_lines.append("## Network Configuration")
    markdown_lines.extend(
        generate_table(
            ["Parent Device", "Interface/Node Name", "Type", "Address (IP/DP)"],
            network_rows,
        )
    )
    markdown_lines.append("")

    io_module_rows = []
    for device_id, device in project_data["devices"].items():
        if device.get("io_addresses"):
            address_strs = [
                f"{addr['type']} Start:{addr['start']} Len:{addr['length']} (Area:{addr['area']})"
                for addr in device["io_addresses"]
            ]
            io_module_rows.append(
                [
                    device.get("name", "N/A"),
                    device.get("type_name", "N/A"),
                    device.get("order_number", "N/A"),
                    device.get("position", "N/A"),
                    "<br>".join(address_strs),
                ]
            )
    markdown_lines.append("## I/O Modules & Addresses")
    markdown_lines.extend(
        generate_table(
            [
                "Module Name",
                "Type Name",
                "Order Number",
                "Slot/Pos",
                "Logical Addresses",
            ],
            io_module_rows,
        )
    )
    markdown_lines.append("")

    connection_rows = []
    for i, conn in enumerate(project_data.get("connections", [])):
        if i >= 50:  # Limit links shown in MD
            connection_rows.append(["...", "...", "..."])
            break
        source = f"{conn.get('side_a_device_name', 'UNKNOWN')}::{conn.get('side_a_ref_suffix', 'N/A')}"
        target = f"{conn.get('side_b_device_name', 'UNKNOWN')}::{conn.get('side_b_ref_suffix', 'N/A')}"
        connection_rows.append([conn.get("name", "N/A"), f"`{source}`", f"`{target}`"])
    markdown_lines.append("## Connections / IO Tag Links")
    markdown_lines.extend(
        generate_table(
            [
                "Link Name",
                "Source (Device::Channel/Interface)",
                "Target (Device::Tag/Interface)",
            ],
            connection_rows,
        )
    )
    markdown_lines.append("")

    try:
        with open(md_file_path, "w", encoding="utf-8") as f:
            f.write("\n".join(markdown_lines))
        print(f"Markdown summary written to: {md_file_path}")
    except Exception as e:
        print(f"ERROR writing Markdown file {md_file_path}: {e}")


# --- process_aml_file_v7 function calls the corrected extraction ---
def process_aml_file_v7(aml_file_path, json_output_path, md_output_path):
    """Main function using lxml with local-name() and corrected extraction."""
    print(f"Processing AML file: {aml_file_path}")
    if not os.path.exists(aml_file_path):
        print(f"ERROR: Input AML file not found at {aml_file_path}")
        return

    try:
        parser = ET.XMLParser(remove_blank_text=True)
        tree = ET.parse(aml_file_path, parser)
        root = tree.getroot()
        project_data = extract_aml_data_v7(root)  # Call corrected function

        print(f"Generating JSON output: {json_output_path}")
        try:
            with open(json_output_path, "w", encoding="utf-8") as f:
                json.dump(project_data, f, indent=4, default=str)
            print("JSON data written successfully.")
        except Exception as e:
            print(f"ERROR writing JSON file {json_output_path}: {e}")

        generate_markdown_obsidian(project_data, md_output_path)  # Use the same MD generator

    except ET.LxmlError as xml_err:
        print(f"ERROR parsing XML file {aml_file_path} with lxml: {xml_err}")
    except Exception as e:
        print(f"ERROR processing AML file {aml_file_path}: {e}")
        traceback.print_exc()


# --- Main Execution ---
if __name__ == "__main__":
    print("--- AML (CAx Export) to JSON and Obsidian MD Converter (v7 - Corrected) ---")
    input_aml_file = "SAE196_c0.2.XML_CAx_Export.xml"
    input_path = Path(input_aml_file)
    if not input_path.is_file():
        print(f"ERROR: Input file '{input_aml_file}' not found.")
        sys.exit(1)
    output_json_file = input_path.with_suffix(".detailed.json")
    output_md_file = input_path.with_name(f"{input_path.stem}_Obsidian_Summary.md")
    print(f"Input AML: {input_path}")
    print(f"Output JSON: {output_json_file}")
    print(f"Output Markdown: {output_md_file}")
    process_aml_file_v7(str(input_path), str(output_json_file), str(output_md_file))  # Call v7
    print("\nScript finished.")
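# Usage note: place this script next to the CAx export named above (or change
# `input_aml_file`) and run it with Python 3 and lxml installed, e.g.:
#   python aml_cax_to_obsidian_v7.py   # script filename is only an example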