1053 lines
49 KiB
Python
1053 lines
49 KiB
Python
"""
|
|
export_io_from_CAx :
|
|
Script que sirve para exraer los IOs de un proyecto de TIA Portal y
|
|
generar un archivo Markdown con la información.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import tkinter as tk
|
|
from tkinter import filedialog
|
|
import traceback
|
|
from lxml import etree as ET
|
|
import json
|
|
from pathlib import Path
|
|
import re
|
|
import math # Needed for ceil
|
|
|
|
# Resolve the directory three levels above the one containing this file
# (four nested dirname() calls on __file__) and append it to sys.path.
# NOTE(review): presumably this is the repository root that holds the
# `backend` package imported right after this block - confirm.
script_root = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
|
|
from backend.script_utils import load_configuration
|
|
|
|
|
|
# --- extract_aml_data function (Unchanged from v15) ---
|
|
def extract_aml_data(root):
    """Extract devices, PLCs, networks and links from a parsed AML tree.

    (v15 logic - Unchanged) Corrects the PLC network association lookup.

    The function makes three passes:

    1. Every ``InternalElement`` that has an ``ID`` becomes an entry in
       ``data["devices"]`` (attributes, IO addresses, interfaces, network
       nodes, parent/child IDs).
    2. Each device is classified as a PLC (order-number prefix, or a type
       name containing "CPU"/"PLC") and/or as a network (a
       ``SupportedRoleClass`` path containing "SUBNET").
    3. Every ``InternalLink`` is either recorded as a node-to-network
       attachment or stored as a generic link.

    Args:
        root: Root element of the AML document. It must provide an
            lxml-style ``xpath()`` method, since the queries rely on
            ``local-name()`` predicates.

    Returns:
        dict with the keys ``plcs``, ``networks``, ``devices``,
        ``links_by_source``, ``links_by_target`` and ``connections``.

    Progress and warnings are reported with ``print``.
    """
    data = {
        "plcs": {},
        "networks": {},
        "devices": {},
        "links_by_source": {},
        "links_by_target": {},
        "connections": [],
    }
    # Maps a network-node element ID to the ID of the element it was found under.
    device_id_to_parent_device = {}
    all_elements = root.xpath(".//*[local-name()='InternalElement']")
    print(
        f"Pass 1: Found {len(all_elements)} InternalElement(s). Populating device dictionary..."
    )
    # --- Pass 1: one device record per InternalElement that carries an ID ---
    for elem in all_elements:
        elem_id = elem.get("ID", None)
        if not elem_id:
            continue
        device_info = {
            "name": elem.get("Name", "N/A"),
            "id": elem_id,
            "class": "N/A",
            "type_identifier": "N/A",
            "order_number": "N/A",
            "type_name": "N/A",
            "firmware_version": "N/A",
            "position": elem.get("PositionNumber", "N/A"),
            "attributes": {},
            "interfaces": [],
            "network_nodes": [],
            "io_addresses": [],
            # Direct InternalElement children only (not all descendants).
            "children_ids": [
                c.get("ID")
                for c in elem.xpath("./*[local-name()='InternalElement']")
                if c.get("ID")
            ],
            # ID of the enclosing InternalElement, or None at the top of the hierarchy.
            "parent_id": (
                elem.xpath("parent::*[local-name()='InternalElement']/@ID")[0]
                if elem.xpath("parent::*[local-name()='InternalElement']")
                else None
            ),
        }
        # Prefer the Path of a SystemUnitClass child; otherwise use RefBaseSystemUnitPath.
        class_tag = elem.xpath("./*[local-name()='SystemUnitClass']")
        device_info["class"] = (
            class_tag[0].get("Path", elem.get("RefBaseSystemUnitPath", "N/A"))
            if class_tag
            else elem.get("RefBaseSystemUnitPath", "N/A")
        )
        attributes = elem.xpath("./*[local-name()='Attribute']")
        for attr in attributes:
            attr_name = attr.get("Name", "")
            value_elem = attr.xpath("./*[local-name()='Value']/text()")
            attr_value = value_elem[0] if value_elem else ""
            # Every direct attribute is kept verbatim; a few are also promoted below.
            device_info["attributes"][attr_name] = attr_value
            if attr_name == "TypeIdentifier":
                device_info["type_identifier"] = attr_value
                # The order number is whatever follows the last "OrderNumber:" marker.
                if "OrderNumber:" in attr_value:
                    device_info["order_number"] = attr_value.split("OrderNumber:")[
                        -1
                    ].strip()
            elif attr_name == "TypeName":
                device_info["type_name"] = attr_value
            elif attr_name == "FirmwareVersion":
                device_info["firmware_version"] = attr_value
            elif attr_name == "Address":
                # Each nested Attribute of "Address" describes one IO area.
                address_parts = attr.xpath("./*[local-name()='Attribute']")
                for part in address_parts:
                    addr_details = {
                        "area": part.get("Name", "?"),
                        "start": "N/A",
                        "length": "N/A",
                        "type": "N/A",
                    }
                    start_val = part.xpath(
                        "./*[local-name()='Attribute'][@Name='StartAddress']/*[local-name()='Value']/text()"
                    )
                    len_val = part.xpath(
                        "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()"
                    )
                    type_val = part.xpath(
                        "./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()"
                    )
                    if start_val:
                        addr_details["start"] = start_val[0]
                    if len_val:
                        addr_details["length"] = len_val[0]
                    if type_val:
                        addr_details["type"] = type_val[0]
                    # Areas without a start address are dropped.
                    if addr_details["start"] != "N/A":
                        device_info["io_addresses"].append(addr_details)
        interfaces = elem.xpath("./*[local-name()='ExternalInterface']")
        for interface in interfaces:
            device_info["interfaces"].append(
                {
                    "name": interface.get("Name", "N/A"),
                    "id": interface.get("ID", "N/A"),
                    "ref_base_class": interface.get("RefBaseClassPath", "N/A"),
                }
            )
        # Network nodes: child elements whose role class path contains "Node" ...
        network_node_elements = elem.xpath(
            "./*[local-name()='InternalElement'][*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]]"
        )
        # ... or, when there are none, the element itself if it has such a role.
        if not network_node_elements and elem.xpath(
            "./*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]"
        ):
            network_node_elements = [elem]
        for node_elem in network_node_elements:
            node_id = node_elem.get("ID")
            if not node_id:
                continue
            node_info = {
                "id": node_id,
                "name": node_elem.get("Name", device_info["name"]),
                "type": "N/A",
                "address": "N/A",
            }
            type_attr = node_elem.xpath(
                "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()"
            )
            addr_attr = node_elem.xpath(
                "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()"
            )
            if type_attr:
                node_info["type"] = type_attr[0]
            if addr_attr:
                node_info["address"] = addr_attr[0]
            # Fall back to the enclosing element's own attributes when the node has none.
            if node_info["address"] == "N/A":
                parent_addr_attr = elem.xpath(
                    "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()"
                )
                if parent_addr_attr:
                    node_info["address"] = parent_addr_attr[0]
            if node_info["type"] == "N/A":
                parent_type_attr = elem.xpath(
                    "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()"
                )
                if parent_type_attr:
                    node_info["type"] = parent_type_attr[0]
            # NOTE(review): nesting reconstructed - the node is assumed to be
            # registered only when an address was resolved; confirm.
            if node_info["address"] != "N/A":
                len_attr = node_elem.xpath(
                    "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()"
                )
                node_info["length"] = len_attr[0] if len_attr else "N/A"
                device_info["network_nodes"].append(node_info)
                device_id_to_parent_device[node_id] = elem_id
        data["devices"][elem_id] = device_info

    # --- Pass 2: classify the collected devices as PLCs and/or networks ---
    print("Pass 2: Identifying PLCs and Networks (Refined v2)...")
    plc_ids_found = set()
    elem_map = {elem.get("ID"): elem for elem in all_elements if elem.get("ID")}
    for dev_id, device in data["devices"].items():
        is_plc = False
        # Order-number prefixes treated as PLC CPUs (rebuilt on every iteration).
        plc_order_prefixes = [
            "6ES7 516-3FP03",
            "6ES7 151",
            "6ES7 31",
            "6ES7 41",
            "6ES7 51",
        ]
        if any(
            device.get("order_number", "N/A").startswith(prefix)
            for prefix in plc_order_prefixes
        ):
            is_plc = True
        elif (
            "CPU" in device.get("type_name", "").upper()
            or "PLC" in device.get("type_name", "").upper()
        ):
            is_plc = True
        if is_plc:
            # A PLC-looking element nested under an already registered PLC is
            # skipped. This depends on ancestors being visited before their
            # descendants in data["devices"].
            parent_id = device.get("parent_id")
            is_child_of_plc = False
            current_parent = parent_id
            while current_parent:
                if current_parent in plc_ids_found:
                    is_child_of_plc = True
                    break
                current_parent = (
                    data["devices"].get(current_parent, {}).get("parent_id")
                )
            if not is_child_of_plc:
                if dev_id not in plc_ids_found:
                    print(
                        f" Identified PLC: {device['name']} ({dev_id}) - Type: {device.get('type_name', 'N/A')} OrderNo: {device.get('order_number', 'N/A')}"
                    )
                    # The PLC entry is the same dict object as the device entry.
                    device["connected_networks"] = {}
                    data["plcs"][dev_id] = device
                    plc_ids_found.add(dev_id)
        is_network = False
        net_type = "Unknown"
        elem = elem_map.get(dev_id)
        if elem is not None:
            role_classes = elem.xpath(
                "./*[local-name()='SupportedRoleClass']/@RefRoleClassPath"
            )
            is_subnet_by_role = any("SUBNET" in rc.upper() for rc in role_classes)
            if is_subnet_by_role:
                is_network = True
                # Network type: first from the role class path ...
                for rc in role_classes:
                    rc_upper = rc.upper()
                    if "PROFINET" in rc_upper or "ETHERNET" in rc_upper:
                        net_type = "Ethernet/Profinet"
                        break
                    elif "PROFIBUS" in rc_upper:
                        net_type = "Profibus"
                        break
                # ... then, failing that, from the element name.
                if net_type == "Unknown":
                    if "PROFIBUS" in device["name"].upper():
                        net_type = "Profibus"
                    elif (
                        "ETHERNET" in device["name"].upper()
                        or "PROFINET" in device["name"].upper()
                    ):
                        net_type = "Ethernet/Profinet"
        if is_network:
            if dev_id not in data["networks"]:
                print(
                    f" Identified Network: {device['name']} ({dev_id}) Type: {net_type}"
                )
                data["networks"][dev_id] = {
                    "name": device["name"],
                    "type": net_type,
                    "devices_on_net": {},
                }

    # --- Pass 3: resolve InternalLinks into network attachments or generic links ---
    print("Pass 3: Processing InternalLinks (Robust Network Mapping & IO)...")
    internal_links = root.xpath(".//*[local-name()='InternalLink']")
    print(f"Found {len(internal_links)} InternalLink(s).")
    conn_id_counter = 0
    for link in internal_links:
        conn_id_counter += 1
        link_name = link.get("Name", f"Link_{conn_id_counter}")
        side_a_ref = link.get("RefPartnerSideA", "")
        side_b_ref = link.get("RefPartnerSideB", "")
        # A reference is split at the first ':' into "<id>" and "<suffix>".
        # An empty reference does not match and yields the ID "N/A".
        side_a_match = re.match(r"([^:]+):?(.*)", side_a_ref)
        side_b_match = re.match(r"([^:]+):?(.*)", side_b_ref)
        side_a_id = side_a_match.group(1) if side_a_match else "N/A"
        # With no suffix present, the ID itself is used as the suffix.
        side_a_suffix = (
            side_a_match.group(2)
            if side_a_match and side_a_match.group(2)
            else side_a_id
        )
        side_b_id = side_b_match.group(1) if side_b_match else "N/A"
        side_b_suffix = (
            side_b_match.group(2)
            if side_b_match and side_b_match.group(2)
            else side_b_id
        )
        network_id, device_node_id = None, None
        side_a_is_network = side_a_id in data["networks"]
        side_b_is_network = side_b_id in data["networks"]
        if side_a_is_network and not side_b_is_network:
            network_id, device_node_id = side_a_id, side_b_id
        elif side_b_is_network and not side_a_is_network:
            network_id, device_node_id = side_b_id, side_a_id
        elif side_a_is_network and side_b_is_network:
            # Network-to-network links are ignored.
            continue
        elif not side_a_is_network and not side_b_is_network:
            # Neither side is a network: handled as a generic link further down.
            pass
        if network_id and device_node_id:
            linked_device_id = device_id_to_parent_device.get(device_node_id)
            if not linked_device_id and device_node_id in data["devices"]:
                linked_device_id = device_node_id
            if linked_device_id and linked_device_id in data["devices"]:
                device_info = data["devices"].get(linked_device_id)
                if not device_info:
                    continue
                # Address lookup order: the node's own network_nodes entry, then
                # its NetworkAddress attribute, then the linked device's attribute.
                address = "N/A"
                node_info_for_addr = data["devices"].get(device_node_id, {})
                for node in node_info_for_addr.get("network_nodes", []):
                    if node.get("id") == device_node_id:
                        address = node.get("address", "N/A")
                        break
                if address == "N/A":
                    address = node_info_for_addr.get("attributes", {}).get(
                        "NetworkAddress", "N/A"
                    )
                if address == "N/A":
                    address = device_info.get("attributes", {}).get(
                        "NetworkAddress", "N/A"
                    )
                node_name_for_log = node_info_for_addr.get("name", device_node_id)
                print(
                    f" Mapping Device/Node '{node_name_for_log}' (NodeID:{device_node_id}, Addr:{address}) to Network '{data['networks'][network_id]['name']}'"
                )
                data["networks"][network_id]["devices_on_net"][device_node_id] = address
                # Find the owning PLC: the node itself, its parent (interface),
                # or the parent of that interface.
                potential_plc_id = None
                interface_id = None
                interface_info = None
                node_check_info = data["devices"].get(device_node_id)
                if node_check_info:
                    if device_node_id in data["plcs"]:
                        potential_plc_id = device_node_id
                    else:
                        interface_id = node_check_info.get("parent_id")
                        if interface_id and interface_id in data["devices"]:
                            interface_info = data["devices"].get(interface_id)
                            if interface_info:
                                if interface_id in data["plcs"]:
                                    potential_plc_id = interface_id
                                elif (
                                    interface_info.get("parent_id")
                                    and interface_info["parent_id"] in data["plcs"]
                                ):
                                    potential_plc_id = interface_info["parent_id"]
                if potential_plc_id:
                    plc_object = data["plcs"][potential_plc_id]
                    if "connected_networks" not in plc_object:
                        plc_object["connected_networks"] = {}
                    if network_id not in plc_object.get("connected_networks", {}):
                        print(
                            f" --> Associating Network '{data['networks'][network_id]['name']}' with PLC '{plc_object.get('name', 'Unknown PLC')}' (via Node '{node_name_for_log}' Addr: {address})"
                        )
                        data["plcs"][potential_plc_id]["connected_networks"][
                            network_id
                        ] = address
                    elif (
                        plc_object["connected_networks"][network_id] == "N/A"
                        and address != "N/A"
                    ):
                        # An earlier link stored no address; replace it with the real one.
                        print(
                            f" --> Updating address for Network '{data['networks'][network_id]['name']}' on PLC '{plc_object.get('name', 'Unknown PLC')}' to: {address}"
                        )
                        data["plcs"][potential_plc_id]["connected_networks"][
                            network_id
                        ] = address
            else:
                print(
                    f" Warning: Could not map linked device ID {linked_device_id} (from NodeID {device_node_id}) to any known device."
                )
            # NOTE(review): nesting reconstructed - this `continue` is assumed to
            # close the network-link branch (either placement behaves the same).
            continue
        if not network_id:  # Generic links
            source_id, source_suffix, target_id, target_suffix = (
                side_a_id,
                side_a_suffix,
                side_b_id,
                side_b_suffix,
            )
            # Swap so the side whose suffix mentions Channel/Parameter is the
            # source when only side B does.
            if ("Channel" in side_b_suffix or "Parameter" in side_b_suffix) and (
                "Channel" not in side_a_suffix and "Parameter" not in side_a_suffix
            ):
                source_id, source_suffix, target_id, target_suffix = (
                    side_b_id,
                    side_b_suffix,
                    side_a_id,
                    side_a_suffix,
                )
            if source_id != "N/A" and target_id != "N/A":
                if source_id not in data["links_by_source"]:
                    data["links_by_source"][source_id] = []
                if target_id not in data["links_by_target"]:
                    data["links_by_target"][target_id] = []
                link_detail = {
                    "name": link_name,
                    "source_id": source_id,
                    "source_suffix": source_suffix,
                    "target_id": target_id,
                    "target_suffix": target_suffix,
                    "source_device_name": data["devices"]
                    .get(source_id, {})
                    .get("name", source_id),
                    "target_device_name": data["devices"]
                    .get(target_id, {})
                    .get("name", target_id),
                }
                # The same dict object is shared by all three indexes.
                data["links_by_source"][source_id].append(link_detail)
                data["links_by_target"][target_id].append(link_detail)
                data["connections"].append(link_detail)
    print("Data extraction and structuring complete.")
    return data
|
|
|
|
|
|
# --- Helper Function for Recursive IO Search (Unchanged from v20) ---
|
|
def find_io_recursively(device_id, project_data, _ancestors=None):
    """Collect the IO addresses of a device and of all its descendants.

    The device's own addresses come first, followed by those of each child
    (depth-first, in ``children_ids`` order).

    Args:
        device_id: Key of the starting device in ``project_data["devices"]``.
        project_data: Dict with a ``"devices"`` mapping whose entries may
            carry ``name``, ``position``, ``io_addresses`` and
            ``children_ids``.
        _ancestors: Internal. IDs on the current recursion path; callers
            should leave it unset.

    Returns:
        A list of dicts, one per IO address, each holding ``module_name``
        and ``module_pos`` plus the fields of the address entry itself.
        Unknown device IDs yield an empty list.
    """
    if _ancestors is None:
        _ancestors = set()

    device_info = project_data.get("devices", {}).get(device_id)
    # A device already on the current path means the child references form
    # a cycle; stop here instead of recursing until RecursionError.
    if not device_info or device_id in _ancestors:
        return []

    io_list = [
        {
            "module_name": device_info.get("name", device_id),
            "module_pos": device_info.get("position", "N/A"),
            # Address fields go last so they take precedence on key clashes.
            **addr,
        }
        for addr in device_info.get("io_addresses") or []
    ]

    # Track only the current path (not every visited node) so that output for
    # acyclic data is exactly what the unguarded recursion produced.
    _ancestors.add(device_id)
    try:
        for child_id in device_info.get("children_ids") or []:
            io_list.extend(find_io_recursively(child_id, project_data, _ancestors))
    finally:
        _ancestors.discard(device_id)
    return io_list
|
|
|
|
|
|
# --- generate_markdown_tree function (v26 - Final Cleaned Version) ---
|
|
def generate_markdown_tree(project_data, md_file_path):
    """Write the hierarchical PLC / network / device / IO summary as Markdown.

    (v26) For every PLC in ``project_data["plcs"]`` the report lists its
    type data, then each connected network, then every other node on that
    network together with its aggregated IO addresses and channel links.

    Args:
        project_data: Dict with the keys ``plcs``, ``networks``, ``devices``,
            ``links_by_source`` and ``links_by_target`` (the shape returned
            by ``extract_aml_data``).
        md_file_path: Destination path; the file is overwritten (UTF-8).

    Returns:
        None. Write errors are printed, not raised.
    """
    # NOTE(review): every list-item literal below starts with a single space,
    # so all bullets land on the same Markdown nesting level - confirm whether
    # deeper indentation was intended for the tree view.
    markdown_lines = ["# Project Hardware & IO Summary (Tree View v26)", ""]

    # No PLCs: write a one-line report and stop.
    if not project_data or not project_data.get("plcs"):
        markdown_lines.append("*No PLC identified in the project data.*")
        try:
            with open(md_file_path, "w", encoding="utf-8") as f:
                f.write("\n".join(markdown_lines))
            print(f"\nMarkdown summary written to: {md_file_path}")
        except Exception as e:
            print(f"ERROR writing Markdown file {md_file_path}: {e}")
        return

    markdown_lines.append(f"Identified {len(project_data['plcs'])} PLC(s).")

    for plc_id, plc_info in project_data.get("plcs", {}).items():
        markdown_lines.append(f"\n## PLC: {plc_info.get('name', plc_id)}")
        type_name = plc_info.get("type_name", "N/A")
        order_num = plc_info.get("order_number", "N/A")
        firmware = plc_info.get("firmware_version", "N/A")
        # Fields still holding the "N/A" placeholder are omitted.
        if type_name and type_name != "N/A":
            markdown_lines.append(f"- **Type Name:** `{type_name}`")
        if order_num and order_num != "N/A":
            markdown_lines.append(f"- **Order Number:** `{order_num}`")
        if firmware and firmware != "N/A":
            markdown_lines.append(f"- **Firmware:** `{firmware}`")
        # ID removed

        plc_networks = plc_info.get("connected_networks", {})
        markdown_lines.append("\n- **Networks:**")
        if not plc_networks:
            markdown_lines.append(
                " - *No network connections found associated with this PLC object.*"
            )
        else:
            # Networks are listed by name, falling back to the ID when unknown.
            sorted_network_items = sorted(
                plc_networks.items(),
                key=lambda item: project_data.get("networks", {})
                .get(item[0], {})
                .get("name", item[0]),
            )

            for net_id, plc_addr_on_net in sorted_network_items:
                net_info = project_data.get("networks", {}).get(net_id)
                if not net_info:
                    markdown_lines.append(
                        f" - !!! Error: Network info missing for ID {net_id} !!!"
                    )
                    continue

                markdown_lines.append(
                    f" - ### {net_info.get('name', net_id)} ({net_info.get('type', 'Unknown')})"
                )
                markdown_lines.append(
                    f" - PLC Address on this Net: `{plc_addr_on_net}`"
                )
                markdown_lines.append(f" - **Devices on Network:**")

                devices_on_this_net = net_info.get("devices_on_net", {})

                def sort_key(item):
                    # Orders nodes by the integer groups found in their address
                    # (e.g. the octets of an IP address).
                    # NOTE(review): the bare except also traps
                    # KeyboardInterrupt/SystemExit, and an address without any
                    # digits yields [] and therefore sorts first - confirm intent.
                    node_id, node_addr = item
                    try:
                        parts = [int(p) for p in re.findall(r"\d+", node_addr)]
                        return parts
                    except:
                        return [float("inf")]

                # IDs that belong to the PLC itself (its nodes, their parents,
                # and the PLC) are excluded from the device list below.
                plc_interface_and_node_ids = set()
                for node in plc_info.get("network_nodes", []):
                    plc_interface_and_node_ids.add(node["id"])
                    interface_id = (
                        project_data["devices"].get(node["id"], {}).get("parent_id")
                    )
                    if interface_id:
                        plc_interface_and_node_ids.add(interface_id)
                plc_interface_and_node_ids.add(plc_id)

                other_device_items = sorted(
                    [
                        (node_id, node_addr)
                        for node_id, node_addr in devices_on_this_net.items()
                        if node_id not in plc_interface_and_node_ids
                    ],
                    key=sort_key,
                )

                if not other_device_items:
                    markdown_lines.append(" - *None (besides PLC interfaces)*")
                else:
                    # --- Display Logic with Sibling IO Aggregation & Aesthetics ---
                    for node_id, node_addr in other_device_items:
                        node_info = project_data.get("devices", {}).get(node_id)
                        if not node_info:
                            markdown_lines.append(
                                f" - !!! Error: Node info missing for ID {node_id} Addr: {node_addr} !!!"
                            )
                            continue

                        # Walk up the parent chain: node -> interface -> device
                        # -> (optional) rack. Each level is resolved only if the
                        # previous one exists.
                        interface_id = node_info.get("parent_id")
                        interface_info = None
                        actual_device_id = None
                        actual_device_info = None
                        rack_id = None
                        # NOTE(review): rack_info is assigned but not read again
                        # in this function.
                        rack_info = None
                        if interface_id:
                            interface_info = project_data.get("devices", {}).get(
                                interface_id
                            )
                            if interface_info:
                                actual_device_id = interface_info.get("parent_id")
                                if actual_device_id:
                                    actual_device_info = project_data.get(
                                        "devices", {}
                                    ).get(actual_device_id)
                                    if actual_device_info:
                                        potential_rack_id = actual_device_info.get(
                                            "parent_id"
                                        )
                                        if potential_rack_id:
                                            potential_rack_info = project_data.get(
                                                "devices", {}
                                            ).get(potential_rack_id)
                                            # NOTE(review): extract_aml_data stores
                                            # "N/A" (never None) for a missing
                                            # position, so the `is None` test only
                                            # matches records lacking the key.
                                            if potential_rack_info and (
                                                "Rack"
                                                in potential_rack_info.get("name", "")
                                                or potential_rack_info.get("position")
                                                is None
                                            ):
                                                rack_id = potential_rack_id
                                                rack_info = potential_rack_info

                        # Show the highest level that could be resolved:
                        # device, else interface, else the node itself.
                        display_info_title = (
                            actual_device_info
                            if actual_device_info
                            else (interface_info if interface_info else node_info)
                        )
                        display_id_title = (
                            actual_device_id
                            if actual_device_info
                            else (interface_id if interface_info else node_id)
                        )

                        # The IO search starts from the same element as the title.
                        io_search_root_id = (
                            actual_device_id
                            if actual_device_info
                            else (interface_id if interface_info else node_id)
                        )
                        io_search_root_info = project_data.get("devices", {}).get(
                            io_search_root_id
                        )

                        # Construct Title
                        display_name = display_info_title.get("name", display_id_title)
                        via_node_name = node_info.get("name", node_id)
                        title_str = f"#### {display_name}"
                        if display_id_title != node_id:
                            title_str += f" (via {via_node_name} @ `{node_addr}`)"
                        else:
                            title_str += f" (@ `{node_addr}`)"
                        markdown_lines.append(f" - {title_str}")

                        # Display Basic Details
                        markdown_lines.append(
                            f" - Address (on net): `{node_addr}`"
                        )
                        type_name_disp = display_info_title.get("type_name", "N/A")
                        order_num_disp = display_info_title.get("order_number", "N/A")
                        pos_disp = display_info_title.get("position", "N/A")
                        if type_name_disp and type_name_disp != "N/A":
                            markdown_lines.append(
                                f" - Type Name: `{type_name_disp}`"
                            )
                        if order_num_disp and order_num_disp != "N/A":
                            markdown_lines.append(
                                f" - Order No: `{order_num_disp}`"
                            )
                        if pos_disp and pos_disp != "N/A":
                            markdown_lines.append(
                                f" - Pos (in parent): `{pos_disp}`"
                            )
                        # Parent structure: the rack when one was recognised,
                        # otherwise the device's direct parent.
                        ultimate_parent_id = rack_id
                        if not ultimate_parent_id and actual_device_info:
                            ultimate_parent_id = actual_device_info.get("parent_id")
                        if (
                            ultimate_parent_id
                            and ultimate_parent_id != display_id_title
                        ):
                            ultimate_parent_info = project_data.get("devices", {}).get(
                                ultimate_parent_id
                            )
                            ultimate_parent_name = (
                                ultimate_parent_info.get("name", "?")
                                if ultimate_parent_info
                                else "?"
                            )
                            markdown_lines.append(
                                f" - Parent Structure: `{ultimate_parent_name}`"
                            )  # Removed ID here

                        # --- IO Aggregation Logic (from v24) ---
                        aggregated_io_addresses = []
                        parent_structure_id = (
                            io_search_root_info.get("parent_id")
                            if io_search_root_info
                            else None
                        )
                        io_search_root_name_disp = (
                            io_search_root_info.get("name", "?")
                            if io_search_root_info
                            else "?"
                        )

                        if parent_structure_id:
                            # Collect IO from every device sharing the search
                            # root's parent (the root itself included).
                            parent_structure_info = project_data.get("devices", {}).get(
                                parent_structure_id
                            )
                            parent_structure_name = (
                                parent_structure_info.get("name", "?")
                                if parent_structure_info
                                else "?"
                            )
                            search_title = f"parent '{parent_structure_name}'"
                            sibling_found_io = False
                            for dev_scan_id, dev_scan_info in project_data.get(
                                "devices", {}
                            ).items():
                                if (
                                    dev_scan_info.get("parent_id")
                                    == parent_structure_id
                                ):
                                    io_from_sibling = find_io_recursively(
                                        dev_scan_id, project_data
                                    )
                                    if io_from_sibling:
                                        aggregated_io_addresses.extend(io_from_sibling)
                                        sibling_found_io = True
                            if (
                                not sibling_found_io and not aggregated_io_addresses
                            ):  # Only show message if list still empty
                                markdown_lines.append(
                                    f" - *No IO Addresses found in modules under {search_title} (ID: {parent_structure_id}).*"
                                )

                        elif io_search_root_id:
                            # No parent: search below the root element only.
                            search_title = f"'{io_search_root_name_disp}'"
                            aggregated_io_addresses = find_io_recursively(
                                io_search_root_id, project_data
                            )
                            if not aggregated_io_addresses:
                                markdown_lines.append(
                                    f" - *No IO Addresses found in modules under {search_title} (ID: {io_search_root_id}).*"
                                )
                        else:
                            markdown_lines.append(
                                f" - *Could not determine structure to search for IO addresses.*"
                            )
                        # --- End IO Aggregation ---

                        # Display aggregated IO Addresses with Siemens format (Cleaned)
                        if aggregated_io_addresses:
                            markdown_lines.append(
                                f" - **IO Addresses (Aggregated from Structure):**"
                            )  # Removed redundant search root name
                            # Order: numeric module position (non-numeric last),
                            # module name, IO type, numeric start (non-numeric last).
                            sorted_agg_io = sorted(
                                aggregated_io_addresses,
                                key=lambda x: (
                                    (
                                        int(x.get("module_pos", "9999"))
                                        if x.get("module_pos", "9999").isdigit()
                                        else 9999
                                    ),
                                    x.get("module_name", ""),
                                    x.get("type", ""),
                                    (
                                        int(x.get("start", "0"))
                                        if x.get("start", "0").isdigit()
                                        else float("inf")
                                    ),
                                ),
                            )
                            last_module_id_key = None
                            for addr_info in sorted_agg_io:
                                # Emit a module header whenever (name, position) changes.
                                current_module_id_key = (
                                    addr_info.get("module_name", "?"),
                                    addr_info.get("module_pos", "?"),
                                )
                                if current_module_id_key != last_module_id_key:
                                    markdown_lines.append(
                                        f" - **From Module:** {addr_info.get('module_name','?')} (Pos: {addr_info.get('module_pos','?')})"
                                    )
                                    last_module_id_key = current_module_id_key

                                # --- Siemens IO Formatting (from v25.1 - keep fixes) ---
                                io_type = addr_info.get("type", "?")
                                start_str = addr_info.get("start", "?")
                                length_str = addr_info.get("length", "?")
                                area_str = addr_info.get("area", "?")
                                siemens_addr = f"FMT_ERROR"  # Default error
                                length_bits = 0
                                try:
                                    start_byte = int(start_str)
                                    # The length value is treated as a bit count
                                    # and rounded up to whole bytes.
                                    length_bits = int(length_str)
                                    length_bytes = math.ceil(
                                        length_bits / 8.0
                                    )  # Use float division
                                    # NOTE(review): unreachable - ceil() of a
                                    # positive value is never 0.
                                    if length_bits > 0 and length_bytes == 0:
                                        length_bytes = 1  # Handle len < 8 bits
                                    end_byte = start_byte + length_bytes - 1
                                    prefix = "P?"
                                    if io_type.lower() == "input":
                                        prefix = "EW"
                                    elif io_type.lower() == "output":
                                        prefix = "AW"
                                    siemens_addr = f"{prefix} {start_byte}..{end_byte}"
                                except Exception:  # Catch any error during calc/format
                                    siemens_addr = (
                                        f"FMT_ERROR({start_str},{length_str})"
                                    )

                                markdown_lines.append(
                                    f" - `{siemens_addr}` (Len={length_bits} bits)"  # Simplified output
                                )
                                # --- End Siemens IO Formatting ---

                        # IO Connections: generic links touching the displayed
                        # element whose suffix on that side mentions "channel".
                        links_from = project_data.get("links_by_source", {}).get(
                            display_id_title, []
                        )
                        links_to = project_data.get("links_by_target", {}).get(
                            display_id_title, []
                        )
                        io_conns = []
                        for link in links_from:
                            if "channel" in link["source_suffix"].lower():
                                target_str = f"{link.get('target_device_name', link['target_id'])}:{link['target_suffix']}"
                                # Links that stay inside the element omit its name.
                                if link["target_id"] == display_id_title:
                                    target_str = link["target_suffix"]
                                io_conns.append(
                                    f"`{link['source_suffix']}` → `{target_str}`"
                                )
                        for link in links_to:
                            if "channel" in link["target_suffix"].lower():
                                source_str = f"{link.get('source_device_name', link['source_id'])}:{link['source_suffix']}"
                                if link["source_id"] == display_id_title:
                                    source_str = link["source_suffix"]
                                io_conns.append(
                                    f"`{source_str}` → `{link['target_suffix']}`"
                                )
                        if io_conns:
                            markdown_lines.append(
                                f" - **IO Connections (Channels):**"
                            )
                            # De-duplicate and sort for a stable listing.
                            for conn in sorted(list(set(io_conns))):
                                markdown_lines.append(f" - {conn}")
                        markdown_lines.append("")  # Spacing
                    # --- *** END Display Logic *** ---

    try:
        with open(md_file_path, "w", encoding="utf-8") as f:
            f.write("\n".join(markdown_lines))
        print(f"\nMarkdown summary written to: {md_file_path}")
    except Exception as e:
        print(f"ERROR writing Markdown file {md_file_path}: {e}")
        traceback.print_exc()
|
|
|
|
|
|
# --- generate_io_upward_tree function (Unchanged from v23) ---
|
|
def generate_io_upward_tree(project_data, md_file_path):
    """Write a debug report that traces every IO module up its parent chain.

    (v23) For each device that owns IO addresses, the report lists those
    addresses and then follows ``parent_id`` links upwards until it reaches
    a node attached to a network, a PLC, the top of the hierarchy, a missing
    parent, or the ancestor limit.

    Args:
        project_data: Dict with ``devices`` and optionally ``networks`` and
            ``plcs``.
        md_file_path: Destination path; the file is overwritten (UTF-8).

    Returns:
        None. Write errors are printed, not raised.
    """
    max_ancestors = 15
    report = ["# IO Address Upward Connection Trace (Debug v23)", ""]

    def _write_report():
        # Persist whatever has been collected so far; failures are only printed.
        try:
            with open(md_file_path, "w", encoding="utf-8") as out:
                out.write("\n".join(report))
            print(f"\nIO upward debug tree written to: {md_file_path}")
        except Exception as err:
            print(f"ERROR writing IO upward debug tree file {md_file_path}: {err}")

    def _position_then_name(entry):
        # Numeric positions first (ascending); everything else ranks as 9999.
        info = entry[1]
        rank = (
            int(info.get("position", "9999"))
            if info.get("position", "9999").isdigit()
            else 9999
        )
        return (rank, info.get("name", ""))

    def _type_then_start(addr):
        # Group by IO type, then by numeric start; non-numeric starts go last.
        start_rank = (
            int(addr.get("start", "0"))
            if addr.get("start", "0").isdigit()
            else float("inf")
        )
        return (addr.get("type", ""), start_rank)

    if not project_data or not project_data.get("devices"):
        report.append("*No device data found.*")
        _write_report()
        return

    all_devices = project_data.get("devices", {})
    known_plcs = project_data.get("plcs", {})

    # node id -> (network id, network name, address of the node on that network)
    attachment_by_node = {
        node_id: (net_id, net_info.get("name", "?"), node_addr)
        for net_id, net_info in project_data.get("networks", {}).items()
        for node_id, node_addr in net_info.get("devices_on_net", {}).items()
    }

    io_modules = [
        (dev_id, dev_info)
        for dev_id, dev_info in all_devices.items()
        if dev_info.get("io_addresses")
    ]

    if not io_modules:
        report.append("*No devices with defined IO Addresses found.*")
        _write_report()
        return

    report.append(
        f"Found {len(io_modules)} device(s)/module(s) with IO addresses. Tracing connections upwards:\n"
    )

    for module_id, module_info in sorted(io_modules, key=_position_then_name):
        report.append(
            f"## IO Module: {module_info.get('name', module_id)} (ID: {module_id})"
        )
        report.append(f"- Position: {module_info.get('position', 'N/A')}")
        report.append("- IO Addresses:")
        for addr in sorted(module_info["io_addresses"], key=_type_then_start):
            report.append(
                f" - `{addr.get('type','?').ljust(6)} Start={addr.get('start','?').ljust(4)} Len={addr.get('length','?').ljust(3)}` (Area: {addr.get('area','?')})"
            )
        report.append("- Upward Path:")

        cursor_id, cursor_info = module_id, module_info
        indent = " "
        reached_anchor = False

        for _ in range(max_ancestors):
            if not cursor_id:
                break

            label = cursor_info.get("name", "?") if cursor_info else "?"
            position = cursor_info.get("position", "N/A") if cursor_info else "N/A"
            report.append(f"{indent}└─ {label} (ID: {cursor_id}, Pos: {position})")

            attachment = attachment_by_node.get(cursor_id)
            if attachment is not None:
                # This element is attached to a network: report it and stop.
                net_id, net_name, node_addr = attachment
                report.append(f"{indent} └─ **Network Connection Point**")
                report.append(f"{indent} - Node: {label} (ID: {cursor_id})")
                report.append(f"{indent} - Network: {net_name} (ID: {net_id})")
                report.append(f"{indent} - Address: `{node_addr}`")
                # Only the first PLC connected to that network is reported.
                owner = next(
                    (
                        (plc_id, plc_info)
                        for plc_id, plc_info in known_plcs.items()
                        if net_id in plc_info.get("connected_networks", {})
                    ),
                    None,
                )
                if owner is None:
                    report.append(
                        f"{indent} - *Network not directly associated with a known PLC in data.*"
                    )
                else:
                    owner_id, owner_info = owner
                    report.append(
                        f"{indent} - **Network associated with PLC:** {owner_info.get('name','?')} (ID: {owner_id})"
                    )
                reached_anchor = True
                break

            if cursor_id in known_plcs:
                report.append(f"{indent} └─ **Is PLC:** {label}")
                reached_anchor = True
                break

            parent_id = cursor_info.get("parent_id") if cursor_info else None
            if not parent_id:
                report.append(f"{indent} └─ Reached top level (no parent).")
                break

            cursor_info = all_devices.get(parent_id)
            if not cursor_info:
                report.append(
                    f"{indent} └─ Parent ID {parent_id} not found. Stopping trace."
                )
                break

            # Step one level up; each level is indented one more space.
            cursor_id = parent_id
            indent += " "
        else:
            # The loop ran max_ancestors full steps without hitting a stop condition.
            report.append(f"{indent} └─ Reached ancestor limit. Stopping trace.")

        if not reached_anchor:
            report.append(
                f"{indent}└─ *Could not trace path to a known Network Node or PLC.*"
            )
        report.append("")

    _write_report()
|
|
|
|
|
|
# --- process_aml_file function (unchanged from v22) ---
|
|
def process_aml_file(
    aml_file_path, json_output_path, md_output_path, md_upward_output_path
):
    """Parse one AML export and produce the JSON dump and both Markdown reports.

    Args:
        aml_file_path: Path of the AML (CAx export) file to read.
        json_output_path: Where the extracted data is dumped as JSON.
        md_output_path: Where the hierarchical Markdown summary is written.
        md_upward_output_path: Where the upward IO debug trace is written.

    Returns:
        None. A missing input file, XML errors and any other failure are
        printed (with a traceback for exceptions) rather than raised.
    """

    def _dump_json(payload):
        # A failed JSON dump is reported but does not stop the Markdown reports.
        try:
            with open(json_output_path, "w", encoding="utf-8") as json_file:
                json.dump(payload, json_file, indent=4, default=str)
            print("JSON data written successfully.")
        except Exception as dump_err:
            print(f"ERROR writing JSON file {json_output_path}: {dump_err}")
            traceback.print_exc()

    print(f"Processing AML file: {aml_file_path}")
    if os.path.exists(aml_file_path):
        try:
            xml_parser = ET.XMLParser(remove_blank_text=True, huge_tree=True)
            document = ET.parse(aml_file_path, xml_parser)
            project_data = extract_aml_data(document.getroot())  # v15 extraction
            print(f"Generating JSON output: {json_output_path}")
            _dump_json(project_data)
            generate_markdown_tree(project_data, md_output_path)  # v26 MD generation
            # v23 upward generation
            generate_io_upward_tree(project_data, md_upward_output_path)
        except ET.LxmlError as xml_err:
            print(f"ERROR parsing XML file {aml_file_path} with lxml: {xml_err}")
            traceback.print_exc()
        except Exception as err:
            print(f"ERROR processing AML file {aml_file_path}: {err}")
            traceback.print_exc()
    else:
        print(f"ERROR: Input AML file not found at {aml_file_path}")
|
|
|
|
|
|
def select_cax_file(initial_dir=None):
    """Ask the user to pick a CAx export (AML) file through a file dialog.

    Args:
        initial_dir: Value handed to the dialog as its ``initialdir`` option.

    Returns:
        The path returned by ``filedialog.askopenfilename``.

    Raises:
        SystemExit: With status 0 when the dialog returns no selection.
    """
    hidden_root = tk.Tk()
    hidden_root.withdraw()  # Hide the root window; only the dialog is needed.

    dialog_options = {
        "title": "Select CAx Export File (AML)",
        "filetypes": [("AML Files", "*.aml"), ("All Files", "*.*")],
        "initialdir": initial_dir,
    }
    selected_file = filedialog.askopenfilename(**dialog_options)
    hidden_root.destroy()

    if selected_file:
        return selected_file

    print("No CAx file selected. Exiting.")
    sys.exit(0)
|
|
|
|
|
|
def select_output_directory():
    """Ask the user to pick the output directory through a folder dialog.

    Returns:
        The directory path returned by ``filedialog.askdirectory``.

    Raises:
        SystemExit: With status 0 when the dialog returns no selection.
    """
    hidden_root = tk.Tk()
    hidden_root.withdraw()  # Hide the root window; only the dialog is needed.

    dialog_title = "Select Output Directory for JSON and MD files"
    chosen_dir = filedialog.askdirectory(title=dialog_title)
    hidden_root.destroy()

    if chosen_dir:
        return chosen_dir

    print("No output directory selected. Exiting.")
    sys.exit(0)
|
|
|
|
|
|
# --- Main Execution ---
|
|
if __name__ == "__main__":
    # Script entry point: resolve the working directory from the loaded
    # configuration, let the user pick an AML file, derive the three output
    # file paths inside the working directory and run the conversion.
    configs = load_configuration()
    working_directory = configs.get("working_directory")

    script_version = "v28 - Working Directory Integration"
    print(
        f"--- AML (CAx Export) to Hierarchical JSON and Obsidian MD Converter ({script_version}) ---"
    )

    # Validate the configured working directory; fall back to the script's
    # own directory, and to the current directory if even that is unusable.
    if not working_directory or not os.path.isdir(working_directory):
        print("ERROR: Working directory not set or invalid in configuration.")
        print("Attempting to use script's directory as fallback.")
        working_directory = os.path.dirname(os.path.abspath(__file__))
        if not os.path.isdir(working_directory):
            working_directory = os.getcwd()
        print(f"Using fallback directory: {working_directory}")
        # NOTE: select_output_directory() can be called here instead if the
        # user should choose the directory interactively on failure.

    # All outputs are written to the working directory.
    output_dir = working_directory
    print(f"Using Working Directory for Output: {output_dir}")

    # 1. Select the input CAx file, with the dialog starting in the working
    #    directory.
    cax_file_path = select_cax_file(initial_dir=working_directory)

    input_path = Path(cax_file_path)
    output_path = Path(output_dir)

    if not input_path.is_file():
        print(f"ERROR: Input file '{input_path}' not found or is not a file.")
        sys.exit(1)

    # Redundant when working_directory is valid, but harmless.
    output_path.mkdir(parents=True, exist_ok=True)

    # 2. Build the output paths from the bare file name only. Joining
    #    output_path with input_path.with_name(...) would keep the input
    #    file's own directory (an absolute right-hand path replaces the
    #    left-hand side), so the Markdown files would land next to the input
    #    instead of in the output directory.
    base_name = input_path.stem
    output_json_file = output_path / input_path.with_suffix(".hierarchical.json").name
    output_md_file = output_path / f"{base_name}_Hardware_Tree.md"
    output_md_upward_file = output_path / f"{base_name}_IO_Upward_Debug.md"

    print(f"Input AML: {input_path.resolve()}")
    print(f"Output Directory: {output_path.resolve()}")
    print(f"Output JSON: {output_json_file.resolve()}")
    print(f"Output Main Tree MD: {output_md_file.resolve()}")
    print(f"Output IO Debug Tree MD: {output_md_upward_file.resolve()}")

    # 3. Process the selected file and write all outputs.
    process_aml_file(
        str(input_path),
        str(output_json_file),
        str(output_md_file),
        str(output_md_upward_file),
    )

    print("\nScript finished.")