1059 lines
45 KiB
Python
1059 lines
45 KiB
Python
import sys
|
|
import os
|
|
from lxml import etree
|
|
|
|
|
|
def debug_print(message, enabled=True):
|
|
"""Print debug messages if enabled."""
|
|
if enabled:
|
|
print(f"DEBUG: {message}")
|
|
|
|
|
|
def parse_siemens_lad_to_scl(xml_file, debug=True):
|
|
"""
|
|
Parse a Siemens LAD/FUP XML file and convert to SCL using lxml.
|
|
|
|
Args:
|
|
xml_file: Path to the Siemens XML file
|
|
debug: Enable debug output
|
|
|
|
Returns:
|
|
String containing the SCL equivalent code
|
|
"""
|
|
try:
|
|
# Parse the XML file with lxml
|
|
parser = etree.XMLParser(remove_blank_text=True)
|
|
tree = etree.parse(xml_file, parser)
|
|
root = tree.getroot()
|
|
|
|
# Extract namespaces
|
|
nsmap = root.nsmap
|
|
debug_print(f"Namespaces: {nsmap}", debug)
|
|
|
|
# Create namespace dictionary for XPath queries
|
|
ns = {}
|
|
default_ns = nsmap.get(None, "")
|
|
if default_ns:
|
|
ns["d"] = default_ns
|
|
|
|
# Add other namespaces
|
|
for prefix, uri in nsmap.items():
|
|
if prefix is not None:
|
|
ns[prefix] = uri
|
|
|
|
debug_print(f"Namespace dictionary: {ns}", debug)
|
|
|
|
# Find the block type (FC, FB, OB)
|
|
block = None
|
|
block_type = None
|
|
for block_tag in ["SW.Blocks.FC", "SW.Blocks.FB", "SW.Blocks.OB"]:
|
|
# Try with and without namespace
|
|
block = root.find(f".//{block_tag}")
|
|
if block is None and "d" in ns:
|
|
block = root.find(f".//d:{block_tag}", namespaces=ns)
|
|
|
|
if block is not None:
|
|
block_type = block_tag.split(".")[-1]
|
|
debug_print(f"Found block of type {block_type}", debug)
|
|
break
|
|
|
|
if block is None:
|
|
return "Error: No supported block (FC, FB, OB) found in the XML file."
|
|
|
|
# Extract block information
|
|
block_name = block.find(".//Name").text
|
|
block_number = block.find(".//Number").text
|
|
programming_language = block.find(".//ProgrammingLanguage").text
|
|
|
|
debug_print(
|
|
f"Block name: {block_name}, Number: {block_number}, Language: {programming_language}",
|
|
debug,
|
|
)
|
|
|
|
# Start SCL code generation
|
|
scl_code = [
|
|
f"// SCL equivalent of {programming_language} block: {block_name} ({block_type} {block_number})"
|
|
]
|
|
|
|
# Extract block title
|
|
title_element = block.find(".//MultilingualText[@CompositionName='Title']")
|
|
if title_element is not None:
|
|
# Try to find title in any available language
|
|
for text_item in title_element.findall(".//MultilingualTextItem"):
|
|
culture_elem = text_item.find(".//Culture")
|
|
text_elem = text_item.find(".//Text")
|
|
|
|
if text_elem is not None and text_elem.text:
|
|
culture = (
|
|
culture_elem.text if culture_elem is not None else "unknown"
|
|
)
|
|
debug_print(f"Found title in {culture}: {text_elem.text}", debug)
|
|
scl_code.append(f"// Title: {text_elem.text}")
|
|
break
|
|
|
|
# Initialize variable lists
|
|
input_vars = []
|
|
output_vars = []
|
|
inout_vars = []
|
|
temp_vars = []
|
|
static_vars = []
|
|
|
|
# Extract interface information
|
|
interface_elem = block.find(".//Interface")
|
|
if interface_elem is not None:
|
|
interface_text = interface_elem.text
|
|
if interface_text:
|
|
try:
|
|
# Parse interface definition to extract variables
|
|
interface_xml = etree.fromstring(interface_text)
|
|
|
|
# Process each section in the interface
|
|
sections = interface_xml.findall(".//Section")
|
|
for section in sections:
|
|
section_name = section.get("Name")
|
|
members = section.findall("Member")
|
|
|
|
# Store variables by section
|
|
for member in members:
|
|
var_name = member.get("Name")
|
|
var_type = member.get("Datatype")
|
|
|
|
if section_name == "Input":
|
|
input_vars.append((var_name, var_type))
|
|
elif section_name == "Output":
|
|
output_vars.append((var_name, var_type))
|
|
elif section_name == "InOut":
|
|
inout_vars.append((var_name, var_type))
|
|
elif section_name == "Temp":
|
|
temp_vars.append((var_name, var_type))
|
|
elif section_name == "Static":
|
|
static_vars.append((var_name, var_type))
|
|
except Exception as e:
|
|
debug_print(f"Error parsing interface: {str(e)}", debug)
|
|
|
|
# Generate block header with interface
|
|
if block_type == "FC":
|
|
scl_code.append(f'FUNCTION "{block_name}" : VOID')
|
|
elif block_type == "FB":
|
|
scl_code.append(f'FUNCTION_BLOCK "{block_name}"')
|
|
elif block_type == "OB":
|
|
scl_code.append(f'ORGANIZATION_BLOCK "{block_name}"')
|
|
|
|
# Add variable declarations
|
|
if input_vars:
|
|
scl_code.append("VAR_INPUT")
|
|
for name, dtype in input_vars:
|
|
scl_code.append(f" {name} : {dtype};")
|
|
scl_code.append("END_VAR")
|
|
|
|
if output_vars:
|
|
scl_code.append("VAR_OUTPUT")
|
|
for name, dtype in output_vars:
|
|
scl_code.append(f" {name} : {dtype};")
|
|
scl_code.append("END_VAR")
|
|
|
|
if inout_vars:
|
|
scl_code.append("VAR_IN_OUT")
|
|
for name, dtype in inout_vars:
|
|
scl_code.append(f" {name} : {dtype};")
|
|
scl_code.append("END_VAR")
|
|
|
|
if temp_vars:
|
|
scl_code.append("VAR_TEMP")
|
|
for name, dtype in temp_vars:
|
|
scl_code.append(f" {name} : {dtype};")
|
|
scl_code.append("END_VAR")
|
|
|
|
if static_vars and block_type == "FB":
|
|
scl_code.append("VAR")
|
|
for name, dtype in static_vars:
|
|
scl_code.append(f" {name} : {dtype};")
|
|
scl_code.append("END_VAR")
|
|
|
|
# Add BEGIN marker
|
|
scl_code.append("BEGIN")
|
|
|
|
# Process each network
|
|
compile_units = block.findall(".//SW.Blocks.CompileUnit")
|
|
debug_print(f"Found {len(compile_units)} networks", debug)
|
|
|
|
for i, unit in enumerate(compile_units):
|
|
debug_print(f"Processing network {i+1}", debug)
|
|
|
|
# Get network title
|
|
title_element = unit.find(".//MultilingualText[@CompositionName='Title']")
|
|
network_title = f"Network {i+1}"
|
|
|
|
if title_element is not None:
|
|
for text_item in title_element.findall(".//MultilingualTextItem"):
|
|
culture_elem = text_item.find(".//Culture")
|
|
text_elem = text_item.find(".//Text")
|
|
|
|
if text_elem is not None and text_elem.text:
|
|
culture = (
|
|
culture_elem.text if culture_elem is not None else "unknown"
|
|
)
|
|
network_title = f"Network {i+1}: {text_elem.text}"
|
|
debug_print(
|
|
f"Network title ({culture}): {network_title}", debug
|
|
)
|
|
break
|
|
|
|
scl_code.append(f" // {network_title}")
|
|
|
|
# Get network source
|
|
network_source_elem = unit.find(".//NetworkSource")
|
|
if network_source_elem is None:
|
|
debug_print(f"No NetworkSource found for network {i+1}", debug)
|
|
continue
|
|
|
|
# Look for FlgNet - the container for LAD/FUP network logic
|
|
flg_net = None
|
|
|
|
# Try direct search
|
|
flg_net = network_source_elem.find("FlgNet")
|
|
|
|
# Try with namespace
|
|
if flg_net is None and "d" in ns:
|
|
flg_net = network_source_elem.find("d:FlgNet", namespaces=ns)
|
|
|
|
# Try using full XPath with any namespace
|
|
if flg_net is None:
|
|
for elem in network_source_elem:
|
|
if elem.tag.endswith("FlgNet"):
|
|
flg_net = elem
|
|
break
|
|
|
|
if flg_net is None:
|
|
debug_print(f"No FlgNet found for network {i+1}", debug)
|
|
continue
|
|
|
|
# Get FlgNet namespace
|
|
flgnet_ns = {}
|
|
if flg_net.nsmap:
|
|
debug_print(f"FlgNet has its own namespace: {flg_net.nsmap}", debug)
|
|
# Add namespace with 'flg' prefix
|
|
for prefix, uri in flg_net.nsmap.items():
|
|
if prefix is None:
|
|
flgnet_ns["flg"] = uri
|
|
else:
|
|
flgnet_ns[prefix] = uri
|
|
|
|
# Extract Parts and Wires sections - try with proper namespace handling
|
|
parts = None
|
|
wires = None
|
|
|
|
# Try different methods to find Parts and Wires
|
|
# Method 1: Direct access
|
|
parts = flg_net.find("Parts")
|
|
wires = flg_net.find("Wires")
|
|
|
|
# Method 2: With FlgNet namespace
|
|
if (parts is None or wires is None) and flgnet_ns:
|
|
parts = parts or flg_net.find("flg:Parts", namespaces=flgnet_ns)
|
|
wires = wires or flg_net.find("flg:Wires", namespaces=flgnet_ns)
|
|
|
|
# Method 3: Try XPath with any namespace
|
|
if parts is None:
|
|
for child in flg_net:
|
|
if child.tag.endswith("Parts"):
|
|
parts = child
|
|
break
|
|
|
|
if wires is None:
|
|
for child in flg_net:
|
|
if child.tag.endswith("Wires"):
|
|
wires = child
|
|
break
|
|
|
|
if parts is None:
|
|
debug_print("No Parts element found - check XML namespace", debug)
|
|
debug_print(
|
|
f"FlgNet children: {[child.tag for child in flg_net]}", debug
|
|
)
|
|
continue
|
|
|
|
if wires is None:
|
|
debug_print("No Wires element found - check XML namespace", debug)
|
|
continue
|
|
|
|
debug_print(f"Successfully found Parts and Wires elements", debug)
|
|
|
|
# Build dictionaries for Parts and Access elements
|
|
parts_dict = {}
|
|
access_dict = {}
|
|
|
|
# Process all Parts elements (including MOVE, Add, Contact, etc)
|
|
for part in parts.findall("*"):
|
|
if part.tag.endswith("Access"):
|
|
uid = part.get("UId")
|
|
if uid:
|
|
access_dict[uid] = part
|
|
else:
|
|
uid = part.get("UId")
|
|
if uid:
|
|
parts_dict[uid] = part
|
|
|
|
# Process all FlgNet namespace elements too
|
|
if flgnet_ns:
|
|
for part in parts.findall("flg:*", namespaces=flgnet_ns):
|
|
if part.tag.endswith("Access"):
|
|
uid = part.get("UId")
|
|
if uid:
|
|
access_dict[uid] = part
|
|
else:
|
|
uid = part.get("UId")
|
|
if uid:
|
|
parts_dict[uid] = part
|
|
|
|
# Process wires to build connection map
|
|
wire_connections = {}
|
|
for wire in wires.findall("Wire"):
|
|
wire_id = wire.get("UId")
|
|
if not wire_id:
|
|
continue
|
|
|
|
connections = []
|
|
|
|
# Check for powerrail
|
|
has_powerrail = any(child.tag.endswith("Powerrail") for child in wire)
|
|
|
|
# Get all connections in this wire
|
|
for child in wire:
|
|
if child.tag.endswith("NameCon"):
|
|
connections.append(
|
|
{
|
|
"type": "NameCon",
|
|
"uid": child.get("UId"),
|
|
"name": child.get("Name"),
|
|
"has_powerrail": has_powerrail,
|
|
}
|
|
)
|
|
elif child.tag.endswith("IdentCon"):
|
|
connections.append(
|
|
{
|
|
"type": "IdentCon",
|
|
"uid": child.get("UId"),
|
|
"has_powerrail": has_powerrail,
|
|
}
|
|
)
|
|
|
|
wire_connections[wire_id] = connections
|
|
|
|
# Also process FlgNet namespace wires
|
|
if flgnet_ns:
|
|
for wire in wires.findall("flg:Wire", namespaces=flgnet_ns):
|
|
wire_id = wire.get("UId")
|
|
if not wire_id:
|
|
continue
|
|
|
|
connections = []
|
|
|
|
# Check for powerrail
|
|
has_powerrail = any(
|
|
child.tag.endswith("Powerrail") for child in wire
|
|
)
|
|
|
|
# Get all connections in this wire
|
|
for child in wire:
|
|
if child.tag.endswith("NameCon"):
|
|
connections.append(
|
|
{
|
|
"type": "NameCon",
|
|
"uid": child.get("UId"),
|
|
"name": child.get("Name"),
|
|
"has_powerrail": has_powerrail,
|
|
}
|
|
)
|
|
elif child.tag.endswith("IdentCon"):
|
|
connections.append(
|
|
{
|
|
"type": "IdentCon",
|
|
"uid": child.get("UId"),
|
|
"has_powerrail": has_powerrail,
|
|
}
|
|
)
|
|
|
|
wire_connections[wire_id] = connections
|
|
|
|
# Find all contact elements - these are used for conditions
|
|
contacts = {}
|
|
for uid, part in parts_dict.items():
|
|
if part.tag.endswith("Contact") or part.get("Name") == "Contact":
|
|
contacts[uid] = {
|
|
"element": part,
|
|
"condition": None,
|
|
"input_wire": None,
|
|
"output_wire": None,
|
|
}
|
|
|
|
# Connect contacts to their inputs/outputs and variables
|
|
for wire_id, connections in wire_connections.items():
|
|
for conn in connections:
|
|
if conn["type"] == "NameCon" and conn["uid"] in contacts:
|
|
if conn["name"] == "in":
|
|
contacts[conn["uid"]]["input_wire"] = wire_id
|
|
elif conn["name"] == "out":
|
|
contacts[conn["uid"]]["output_wire"] = wire_id
|
|
|
|
# Find the operands for each contact
|
|
for uid, contact in contacts.items():
|
|
for wire_id, connections in wire_connections.items():
|
|
for conn in connections:
|
|
if (
|
|
conn["type"] == "NameCon"
|
|
and conn["uid"] == uid
|
|
and conn["name"] == "operand"
|
|
):
|
|
for conn2 in connections:
|
|
if conn2["type"] == "IdentCon":
|
|
var_uid = conn2["uid"]
|
|
contact["condition"] = var_uid
|
|
|
|
# Get all instruction/operation types from parts
|
|
operations = {}
|
|
for uid, part in parts_dict.items():
|
|
if part.get("Name") == "Contact":
|
|
continue # Skip contacts, we handled them separately
|
|
|
|
part_name = part.get("Name")
|
|
if not part_name:
|
|
continue
|
|
|
|
operations[uid] = {
|
|
"type": part_name,
|
|
"element": part,
|
|
"inputs": {},
|
|
"outputs": {},
|
|
"conditions": [],
|
|
"condition_uids": set(),
|
|
"is_enabled": False,
|
|
}
|
|
|
|
# Find connections between parts - this contains the actual LAD logic
|
|
for wire_id, connections in wire_connections.items():
|
|
# Check if wire has a powerrail (start of ladder rung)
|
|
has_powerrail = any(conn["has_powerrail"] for conn in connections)
|
|
|
|
# Store contact UIDs connected to this wire
|
|
connected_contacts = []
|
|
for conn in connections:
|
|
if conn["type"] == "NameCon" and conn["uid"] in contacts:
|
|
if conn["name"] == "out":
|
|
connected_contacts.append(conn["uid"])
|
|
|
|
# Process connections
|
|
for conn in connections:
|
|
if conn["type"] == "NameCon":
|
|
uid = conn["uid"]
|
|
name = conn["name"]
|
|
|
|
if uid in operations:
|
|
if name == "en":
|
|
# This is an enable input - may be connected to powerrail
|
|
operations[uid]["is_enabled"] |= has_powerrail
|
|
|
|
# Add connected contacts as conditions
|
|
for contact_uid in connected_contacts:
|
|
if contacts[contact_uid]["condition"]:
|
|
operations[uid]["conditions"].append(
|
|
contacts[contact_uid]
|
|
)
|
|
operations[uid]["condition_uids"].add(
|
|
contacts[contact_uid]["condition"]
|
|
)
|
|
|
|
elif name.startswith("in"):
|
|
# Input connection
|
|
operations[uid]["inputs"][name] = wire_id
|
|
elif name.startswith("out"):
|
|
# Output connection
|
|
operations[uid]["outputs"][name] = wire_id
|
|
|
|
# Helper functions for accessing variable/constant values
|
|
def get_access_value(access_uid):
|
|
"""Extract variable name or constant value from an Access element."""
|
|
if access_uid not in access_dict:
|
|
return None
|
|
|
|
access = access_dict[access_uid]
|
|
scope = access.get("Scope")
|
|
|
|
if scope == "LiteralConstant":
|
|
# Extract constant value
|
|
const_elem = access.find(".//Constant")
|
|
if const_elem is None and flgnet_ns:
|
|
const_elem = access.find(
|
|
".//flg:Constant", namespaces=flgnet_ns
|
|
)
|
|
|
|
if const_elem is not None:
|
|
const_value = None
|
|
for child in const_elem:
|
|
if child.tag.endswith("ConstantValue"):
|
|
const_value = child.text
|
|
break
|
|
return const_value
|
|
|
|
elif scope == "GlobalVariable" or scope == "LocalVariable":
|
|
# Extract variable name
|
|
symbol = access.find("Symbol") or (
|
|
access.find("flg:Symbol", namespaces=flgnet_ns)
|
|
if flgnet_ns
|
|
else None
|
|
)
|
|
|
|
if symbol is not None:
|
|
components = []
|
|
for comp in symbol.findall("Component") or (
|
|
symbol.findall("flg:Component", namespaces=flgnet_ns)
|
|
if flgnet_ns
|
|
else []
|
|
):
|
|
components.append(comp.get("Name"))
|
|
|
|
if components:
|
|
return ".".join(components)
|
|
|
|
elif scope == "TypedConstant":
|
|
# Extract typed constant
|
|
const_elem = access.find(".//Constant")
|
|
if const_elem is None and flgnet_ns:
|
|
const_elem = access.find(
|
|
".//flg:Constant", namespaces=flgnet_ns
|
|
)
|
|
|
|
if const_elem is not None:
|
|
const_type = None
|
|
const_value = None
|
|
for child in const_elem:
|
|
if child.tag.endswith("ConstantType"):
|
|
const_type = child.text
|
|
elif child.tag.endswith("ConstantValue"):
|
|
const_value = child.text
|
|
if const_type and const_value:
|
|
return f"{const_type}#{const_value}"
|
|
elif const_value:
|
|
return const_value
|
|
|
|
return None
|
|
|
|
def get_contact_condition(contact_uid):
|
|
"""Get the condition expression for a contact."""
|
|
if contact_uid not in contacts:
|
|
return None
|
|
|
|
condition_uid = contacts[contact_uid]["condition"]
|
|
if not condition_uid:
|
|
return None
|
|
|
|
var_name = get_access_value(condition_uid)
|
|
if not var_name:
|
|
return None
|
|
|
|
return var_name
|
|
|
|
def get_connection_source_uid(wire_id, target_uid, target_port):
|
|
"""Find the source UID connected to the target through the given wire."""
|
|
if wire_id not in wire_connections:
|
|
return None
|
|
|
|
# Find if there's an IdentCon in this wire
|
|
ident_con = None
|
|
name_con_target = None
|
|
|
|
for conn in wire_connections[wire_id]:
|
|
if conn["type"] == "IdentCon":
|
|
ident_con = conn["uid"]
|
|
elif (
|
|
conn["type"] == "NameCon"
|
|
and conn["uid"] == target_uid
|
|
and conn["name"] == target_port
|
|
):
|
|
name_con_target = True
|
|
|
|
if ident_con and name_con_target:
|
|
return ident_con
|
|
|
|
return None
|
|
|
|
# Process operations in the network to generate code
|
|
network_code = []
|
|
|
|
# 1. Process function calls (Call instructions)
|
|
for uid, op in operations.items():
|
|
if op["type"] == "Call":
|
|
# Get call info
|
|
call_info = op["element"].find("CallInfo") or (
|
|
op["element"].find("flg:CallInfo", namespaces=flgnet_ns)
|
|
if flgnet_ns
|
|
else None
|
|
)
|
|
|
|
if call_info is not None:
|
|
function_name = call_info.get("Name")
|
|
block_type = call_info.get("BlockType")
|
|
|
|
if op["is_enabled"]:
|
|
if op["conditions"]:
|
|
# Build condition string using all contacts
|
|
condition_parts = []
|
|
for contact in op["conditions"]:
|
|
var_name = get_contact_condition(
|
|
contact["element"].get("UId")
|
|
)
|
|
if var_name:
|
|
condition_parts.append(var_name)
|
|
|
|
if condition_parts:
|
|
condition_str = " AND ".join(condition_parts)
|
|
network_code.append(f" IF {condition_str} THEN")
|
|
network_code.append(f" {function_name}();")
|
|
network_code.append(f" END_IF;")
|
|
else:
|
|
network_code.append(f" {function_name}();")
|
|
else:
|
|
network_code.append(f" {function_name}();")
|
|
else:
|
|
# It's a conditional call, we'll document it
|
|
network_code.append(
|
|
f" // Conditional call to {function_name}()"
|
|
)
|
|
|
|
# 2. Process MOVE operations
|
|
for uid, op in operations.items():
|
|
if op["type"] == "Move":
|
|
if "in" not in op["inputs"] or "out1" not in op["outputs"]:
|
|
continue
|
|
|
|
# Get source and destination
|
|
source_uid = get_connection_source_uid(
|
|
op["inputs"]["in"], uid, "in"
|
|
)
|
|
dest_uid = get_connection_source_uid(
|
|
op["outputs"]["out1"], uid, "out1"
|
|
)
|
|
|
|
if not source_uid or not dest_uid:
|
|
continue
|
|
|
|
source_value = get_access_value(source_uid)
|
|
dest_value = get_access_value(dest_uid)
|
|
|
|
if not source_value or not dest_value:
|
|
continue
|
|
|
|
# Generate assignment code
|
|
if op["is_enabled"]:
|
|
if op["conditions"]:
|
|
# Build condition string using all contacts
|
|
condition_parts = []
|
|
for contact in op["conditions"]:
|
|
var_name = get_contact_condition(
|
|
contact["element"].get("UId")
|
|
)
|
|
if var_name:
|
|
condition_parts.append(var_name)
|
|
|
|
if condition_parts:
|
|
condition_str = " AND ".join(condition_parts)
|
|
network_code.append(f" IF {condition_str} THEN")
|
|
network_code.append(
|
|
f" {dest_value} := {source_value};"
|
|
)
|
|
network_code.append(f" END_IF;")
|
|
else:
|
|
network_code.append(
|
|
f" {dest_value} := {source_value};"
|
|
)
|
|
else:
|
|
network_code.append(f" {dest_value} := {source_value};")
|
|
else:
|
|
# It's a conditional assignment, document it
|
|
network_code.append(
|
|
f" // Conditional: {dest_value} := {source_value};"
|
|
)
|
|
|
|
# 3. Process ADD operations
|
|
for uid, op in operations.items():
|
|
if op["type"] == "Add":
|
|
if (
|
|
"in1" not in op["inputs"]
|
|
or "in2" not in op["inputs"]
|
|
or "out" not in op["outputs"]
|
|
):
|
|
continue
|
|
|
|
# Get source operands and destination
|
|
source1_uid = get_connection_source_uid(
|
|
op["inputs"]["in1"], uid, "in1"
|
|
)
|
|
source2_uid = get_connection_source_uid(
|
|
op["inputs"]["in2"], uid, "in2"
|
|
)
|
|
dest_uid = get_connection_source_uid(
|
|
op["outputs"]["out"], uid, "out"
|
|
)
|
|
|
|
if not source1_uid or not source2_uid or not dest_uid:
|
|
continue
|
|
|
|
source1_value = get_access_value(source1_uid)
|
|
source2_value = get_access_value(source2_uid)
|
|
dest_value = get_access_value(dest_uid)
|
|
|
|
if not source1_value or not source2_value or not dest_value:
|
|
continue
|
|
|
|
# Generate addition code
|
|
if op["is_enabled"]:
|
|
if op["conditions"]:
|
|
# Build condition string using all contacts
|
|
condition_parts = []
|
|
for contact in op["conditions"]:
|
|
var_name = get_contact_condition(
|
|
contact["element"].get("UId")
|
|
)
|
|
if var_name:
|
|
condition_parts.append(var_name)
|
|
|
|
if condition_parts:
|
|
condition_str = " AND ".join(condition_parts)
|
|
network_code.append(f" IF {condition_str} THEN")
|
|
network_code.append(
|
|
f" {dest_value} := {source1_value} + {source2_value};"
|
|
)
|
|
network_code.append(f" END_IF;")
|
|
else:
|
|
network_code.append(
|
|
f" {dest_value} := {source1_value} + {source2_value};"
|
|
)
|
|
else:
|
|
network_code.append(
|
|
f" {dest_value} := {source1_value} + {source2_value};"
|
|
)
|
|
else:
|
|
# It's a conditional addition, document it
|
|
network_code.append(
|
|
f" // Conditional: {dest_value} := {source1_value} + {source2_value};"
|
|
)
|
|
|
|
# 4. Process EQ (equality comparison) operations
|
|
for uid, op in operations.items():
|
|
if op["type"] == "Eq":
|
|
if "in1" not in op["inputs"] or "in2" not in op["inputs"]:
|
|
continue
|
|
|
|
# Get comparison operands
|
|
source1_uid = get_connection_source_uid(
|
|
op["inputs"]["in1"], uid, "in1"
|
|
)
|
|
source2_uid = get_connection_source_uid(
|
|
op["inputs"]["in2"], uid, "in2"
|
|
)
|
|
|
|
if not source1_uid or not source2_uid:
|
|
continue
|
|
|
|
source1_value = get_access_value(source1_uid)
|
|
source2_value = get_access_value(source2_uid)
|
|
|
|
if not source1_value or not source2_value:
|
|
continue
|
|
|
|
# Find if this feeds into a coil/memory
|
|
coil_name = None
|
|
for wire_id, connections in wire_connections.items():
|
|
# Check if this EQ is connected to the wire
|
|
eq_is_connected = False
|
|
for conn in connections:
|
|
if (
|
|
conn["type"] == "NameCon"
|
|
and conn["uid"] == uid
|
|
and conn["name"] == "out"
|
|
):
|
|
eq_is_connected = True
|
|
break
|
|
|
|
if eq_is_connected:
|
|
# Check if a coil is also connected to this wire
|
|
for conn in connections:
|
|
if conn["type"] == "NameCon" and conn["name"] == "in":
|
|
target_uid = conn["uid"]
|
|
if target_uid in parts_dict:
|
|
target_part = parts_dict[target_uid]
|
|
if target_part.get("Name") == "Coil":
|
|
# Find the coil's operand
|
|
for (
|
|
wire_id2,
|
|
connections2,
|
|
) in wire_connections.items():
|
|
for conn2 in connections2:
|
|
if (
|
|
conn2["type"] == "NameCon"
|
|
and conn2["uid"] == target_uid
|
|
and conn2["name"] == "operand"
|
|
):
|
|
for conn3 in connections2:
|
|
if (
|
|
conn3["type"]
|
|
== "IdentCon"
|
|
):
|
|
var_uid = conn3["uid"]
|
|
var_name = (
|
|
get_access_value(
|
|
var_uid
|
|
)
|
|
)
|
|
if var_name:
|
|
coil_name = var_name
|
|
|
|
if coil_name:
|
|
network_code.append(
|
|
f" IF {source1_value} = {source2_value} THEN"
|
|
)
|
|
network_code.append(f" {coil_name} := TRUE;")
|
|
network_code.append(f" ELSE")
|
|
network_code.append(f" {coil_name} := FALSE;")
|
|
network_code.append(f" END_IF;")
|
|
else:
|
|
network_code.append(
|
|
f" // Comparison: {source1_value} = {source2_value}"
|
|
)
|
|
|
|
# 5. Process Coil operations (setting outputs)
|
|
for uid, op in parts_dict.items():
|
|
if op.get("Name") == "Coil":
|
|
connected_var = None
|
|
# Check if already handled with EQ
|
|
is_handled = False
|
|
for line in network_code:
|
|
if " := TRUE;" in line or " := FALSE;" in line:
|
|
is_handled = True
|
|
break
|
|
|
|
if is_handled:
|
|
continue
|
|
|
|
# Find connected variable
|
|
for wire_id, connections in wire_connections.items():
|
|
for conn in connections:
|
|
if (
|
|
conn["type"] == "NameCon"
|
|
and conn["uid"] == uid
|
|
and conn["name"] == "operand"
|
|
):
|
|
for conn2 in connections:
|
|
if conn2["type"] == "IdentCon":
|
|
var_uid = conn2["uid"]
|
|
connected_var = get_access_value(var_uid)
|
|
|
|
if connected_var:
|
|
# Try to find the wire connected to the coil's input
|
|
input_cond = None
|
|
for wire_id, connections in wire_connections.items():
|
|
for conn in connections:
|
|
if (
|
|
conn["type"] == "NameCon"
|
|
and conn["uid"] == uid
|
|
and conn["name"] == "in"
|
|
):
|
|
# Find if any contacts feed into this
|
|
for (
|
|
wire_id2,
|
|
connections2,
|
|
) in wire_connections.items():
|
|
for conn2 in connections2:
|
|
if (
|
|
conn2["type"] == "NameCon"
|
|
and conn2["name"] == "out"
|
|
and conn2["uid"] in contacts
|
|
):
|
|
var_name = get_contact_condition(
|
|
conn2["uid"]
|
|
)
|
|
if var_name:
|
|
input_cond = var_name
|
|
|
|
if input_cond:
|
|
network_code.append(f" IF {input_cond} THEN")
|
|
network_code.append(f" {connected_var} := TRUE;")
|
|
network_code.append(f" ELSE")
|
|
network_code.append(f" {connected_var} := FALSE;")
|
|
network_code.append(f" END_IF;")
|
|
else:
|
|
network_code.append(f" // Sets output: {connected_var}")
|
|
|
|
# 6. Process more complex operations (like MOD)
|
|
for uid, op in operations.items():
|
|
if op["type"] == "Mod":
|
|
if (
|
|
"in1" not in op["inputs"]
|
|
or "in2" not in op["inputs"]
|
|
or "out" not in op["outputs"]
|
|
):
|
|
continue
|
|
|
|
# Get source operands and destination
|
|
source1_uid = get_connection_source_uid(
|
|
op["inputs"]["in1"], uid, "in1"
|
|
)
|
|
source2_uid = get_connection_source_uid(
|
|
op["inputs"]["in2"], uid, "in2"
|
|
)
|
|
dest_uid = get_connection_source_uid(
|
|
op["outputs"]["out"], uid, "out"
|
|
)
|
|
|
|
if not source1_uid or not source2_uid or not dest_uid:
|
|
continue
|
|
|
|
source1_value = get_access_value(source1_uid)
|
|
source2_value = get_access_value(source2_uid)
|
|
dest_value = get_access_value(dest_uid)
|
|
|
|
if not source1_value or not source2_value or not dest_value:
|
|
continue
|
|
|
|
# Generate modulus code
|
|
if op["is_enabled"]:
|
|
if op["conditions"]:
|
|
# Build condition string using all contacts
|
|
condition_parts = []
|
|
for contact in op["conditions"]:
|
|
var_name = get_contact_condition(
|
|
contact["element"].get("UId")
|
|
)
|
|
if var_name:
|
|
condition_parts.append(var_name)
|
|
|
|
if condition_parts:
|
|
condition_str = " AND ".join(condition_parts)
|
|
network_code.append(f" IF {condition_str} THEN")
|
|
network_code.append(
|
|
f" {dest_value} := {source1_value} MOD {source2_value};"
|
|
)
|
|
network_code.append(f" END_IF;")
|
|
else:
|
|
network_code.append(
|
|
f" {dest_value} := {source1_value} MOD {source2_value};"
|
|
)
|
|
else:
|
|
network_code.append(
|
|
f" {dest_value} := {source1_value} MOD {source2_value};"
|
|
)
|
|
else:
|
|
# It's a conditional operation, document it
|
|
network_code.append(
|
|
f" // Conditional: {dest_value} := {source1_value} MOD {source2_value};"
|
|
)
|
|
|
|
# 7. Process Convert operations
|
|
for uid, op in operations.items():
|
|
if op["type"] == "Convert":
|
|
if "in" not in op["inputs"] or "out" not in op["outputs"]:
|
|
continue
|
|
|
|
# Get source and destination
|
|
source_uid = get_connection_source_uid(
|
|
op["inputs"]["in"], uid, "in"
|
|
)
|
|
dest_uid = get_connection_source_uid(
|
|
op["outputs"]["out"], uid, "out"
|
|
)
|
|
|
|
if not source_uid or not dest_uid:
|
|
continue
|
|
|
|
source_value = get_access_value(source_uid)
|
|
dest_value = get_access_value(dest_uid)
|
|
|
|
if not source_value or not dest_value:
|
|
continue
|
|
|
|
# Get conversion types
|
|
src_type = op["element"].get("SrcType") or ""
|
|
dest_type = op["element"].get("DestType") or ""
|
|
|
|
# Generate conversion code
|
|
if op["is_enabled"]:
|
|
if op["conditions"]:
|
|
# Build condition string using all contacts
|
|
condition_parts = []
|
|
for contact in op["conditions"]:
|
|
var_name = get_contact_condition(
|
|
contact["element"].get("UId")
|
|
)
|
|
if var_name:
|
|
condition_parts.append(var_name)
|
|
|
|
if condition_parts:
|
|
condition_str = " AND ".join(condition_parts)
|
|
network_code.append(f" IF {condition_str} THEN")
|
|
network_code.append(
|
|
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
|
|
)
|
|
network_code.append(f" END_IF;")
|
|
else:
|
|
network_code.append(
|
|
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
|
|
)
|
|
else:
|
|
network_code.append(
|
|
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
|
|
)
|
|
else:
|
|
# It's a conditional conversion, document it
|
|
network_code.append(
|
|
f" // Conditional: {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
|
|
)
|
|
|
|
# Add network code to the SCL code
|
|
# Check if any code was generated
|
|
if network_code:
|
|
scl_code.extend(network_code)
|
|
else:
|
|
# If no operations were documented for this network, add a placeholder
|
|
op_types = set()
|
|
for uid, op in operations.items():
|
|
if op["type"]:
|
|
op_types.add(op["type"])
|
|
for uid, part in parts_dict.items():
|
|
if part.get("Name"):
|
|
op_types.add(part.get("Name"))
|
|
|
|
if op_types:
|
|
scl_code.append(f" // Contains {', '.join(op_types)} operations")
|
|
else:
|
|
scl_code.append(f" // Empty network or complex logic")
|
|
|
|
# Close the block
|
|
if block_type == "FC":
|
|
scl_code.append("END_FUNCTION;")
|
|
elif block_type == "FB":
|
|
scl_code.append("END_FUNCTION_BLOCK;")
|
|
elif block_type == "OB":
|
|
scl_code.append("END_ORGANIZATION_BLOCK;")
|
|
|
|
return "\n".join(scl_code)
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
debug_print(f"Exception occurred: {str(e)}", debug)
|
|
debug_print(traceback.format_exc(), debug)
|
|
return f"Error converting to SCL: {str(e)}"
|
|
|
|
|
|
# Main execution
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) > 1:
|
|
input_file = sys.argv[1]
|
|
if os.path.exists(input_file):
|
|
scl = parse_siemens_lad_to_scl(input_file)
|
|
print(scl)
|
|
else:
|
|
print(f"Error: File {input_file} not found")
|
|
else:
|
|
print("Usage: python script.py <xml_file>")
|