import sys import os from lxml import etree def debug_print(message, enabled=True): """Print debug messages if enabled.""" if enabled: print(f"DEBUG: {message}") def parse_siemens_lad_to_scl(xml_file, debug=True): """ Parse a Siemens LAD/FUP XML file and convert to SCL using lxml. Args: xml_file: Path to the Siemens XML file debug: Enable debug output Returns: String containing the SCL equivalent code """ try: # Parse the XML file with lxml parser = etree.XMLParser(remove_blank_text=True) tree = etree.parse(xml_file, parser) root = tree.getroot() # Extract namespaces nsmap = root.nsmap debug_print(f"Namespaces: {nsmap}", debug) # Create namespace dictionary for XPath queries ns = {} default_ns = nsmap.get(None, "") if default_ns: ns["d"] = default_ns # Add other namespaces for prefix, uri in nsmap.items(): if prefix is not None: ns[prefix] = uri debug_print(f"Namespace dictionary: {ns}", debug) # Find the block type (FC, FB, OB) block = None block_type = None for block_tag in ["SW.Blocks.FC", "SW.Blocks.FB", "SW.Blocks.OB"]: # Try with and without namespace block = root.find(f".//{block_tag}") if block is None and "d" in ns: block = root.find(f".//d:{block_tag}", namespaces=ns) if block is not None: block_type = block_tag.split(".")[-1] debug_print(f"Found block of type {block_type}", debug) break if block is None: return "Error: No supported block (FC, FB, OB) found in the XML file." # Extract block information block_name = block.find(".//Name").text block_number = block.find(".//Number").text programming_language = block.find(".//ProgrammingLanguage").text debug_print( f"Block name: {block_name}, Number: {block_number}, Language: {programming_language}", debug, ) # Start SCL code generation scl_code = [ f"// SCL equivalent of {programming_language} block: {block_name} ({block_type} {block_number})" ] # Extract block title title_element = block.find(".//MultilingualText[@CompositionName='Title']") if title_element is not None: # Try to find title in any available language for text_item in title_element.findall(".//MultilingualTextItem"): culture_elem = text_item.find(".//Culture") text_elem = text_item.find(".//Text") if text_elem is not None and text_elem.text: culture = ( culture_elem.text if culture_elem is not None else "unknown" ) debug_print(f"Found title in {culture}: {text_elem.text}", debug) scl_code.append(f"// Title: {text_elem.text}") break # Initialize variable lists input_vars = [] output_vars = [] inout_vars = [] temp_vars = [] static_vars = [] # Extract interface information interface_elem = block.find(".//Interface") if interface_elem is not None: interface_text = interface_elem.text if interface_text: try: # Parse interface definition to extract variables interface_xml = etree.fromstring(interface_text) # Process each section in the interface sections = interface_xml.findall(".//Section") for section in sections: section_name = section.get("Name") members = section.findall("Member") # Store variables by section for member in members: var_name = member.get("Name") var_type = member.get("Datatype") if section_name == "Input": input_vars.append((var_name, var_type)) elif section_name == "Output": output_vars.append((var_name, var_type)) elif section_name == "InOut": inout_vars.append((var_name, var_type)) elif section_name == "Temp": temp_vars.append((var_name, var_type)) elif section_name == "Static": static_vars.append((var_name, var_type)) except Exception as e: debug_print(f"Error parsing interface: {str(e)}", debug) # Generate block header with interface if block_type == "FC": scl_code.append(f'FUNCTION "{block_name}" : VOID') elif block_type == "FB": scl_code.append(f'FUNCTION_BLOCK "{block_name}"') elif block_type == "OB": scl_code.append(f'ORGANIZATION_BLOCK "{block_name}"') # Add variable declarations if input_vars: scl_code.append("VAR_INPUT") for name, dtype in input_vars: scl_code.append(f" {name} : {dtype};") scl_code.append("END_VAR") if output_vars: scl_code.append("VAR_OUTPUT") for name, dtype in output_vars: scl_code.append(f" {name} : {dtype};") scl_code.append("END_VAR") if inout_vars: scl_code.append("VAR_IN_OUT") for name, dtype in inout_vars: scl_code.append(f" {name} : {dtype};") scl_code.append("END_VAR") if temp_vars: scl_code.append("VAR_TEMP") for name, dtype in temp_vars: scl_code.append(f" {name} : {dtype};") scl_code.append("END_VAR") if static_vars and block_type == "FB": scl_code.append("VAR") for name, dtype in static_vars: scl_code.append(f" {name} : {dtype};") scl_code.append("END_VAR") # Add BEGIN marker scl_code.append("BEGIN") # Process each network compile_units = block.findall(".//SW.Blocks.CompileUnit") debug_print(f"Found {len(compile_units)} networks", debug) for i, unit in enumerate(compile_units): debug_print(f"Processing network {i+1}", debug) # Get network title title_element = unit.find(".//MultilingualText[@CompositionName='Title']") network_title = f"Network {i+1}" if title_element is not None: for text_item in title_element.findall(".//MultilingualTextItem"): culture_elem = text_item.find(".//Culture") text_elem = text_item.find(".//Text") if text_elem is not None and text_elem.text: culture = ( culture_elem.text if culture_elem is not None else "unknown" ) network_title = f"Network {i+1}: {text_elem.text}" debug_print( f"Network title ({culture}): {network_title}", debug ) break scl_code.append(f" // {network_title}") # Get network source network_source_elem = unit.find(".//NetworkSource") if network_source_elem is None: debug_print(f"No NetworkSource found for network {i+1}", debug) continue # Look for FlgNet - the container for LAD/FUP network logic flg_net = None # Try direct search flg_net = network_source_elem.find("FlgNet") # Try with namespace if flg_net is None and "d" in ns: flg_net = network_source_elem.find("d:FlgNet", namespaces=ns) # Try using full XPath with any namespace if flg_net is None: for elem in network_source_elem: if elem.tag.endswith("FlgNet"): flg_net = elem break if flg_net is None: debug_print(f"No FlgNet found for network {i+1}", debug) continue # Get FlgNet namespace flgnet_ns = {} if flg_net.nsmap: debug_print(f"FlgNet has its own namespace: {flg_net.nsmap}", debug) # Add namespace with 'flg' prefix for prefix, uri in flg_net.nsmap.items(): if prefix is None: flgnet_ns["flg"] = uri else: flgnet_ns[prefix] = uri # Extract Parts and Wires sections - try with proper namespace handling parts = None wires = None # Try different methods to find Parts and Wires # Method 1: Direct access parts = flg_net.find("Parts") wires = flg_net.find("Wires") # Method 2: With FlgNet namespace if (parts is None or wires is None) and flgnet_ns: parts = parts or flg_net.find("flg:Parts", namespaces=flgnet_ns) wires = wires or flg_net.find("flg:Wires", namespaces=flgnet_ns) # Method 3: Try XPath with any namespace if parts is None: for child in flg_net: if child.tag.endswith("Parts"): parts = child break if wires is None: for child in flg_net: if child.tag.endswith("Wires"): wires = child break if parts is None: debug_print("No Parts element found - check XML namespace", debug) debug_print( f"FlgNet children: {[child.tag for child in flg_net]}", debug ) continue if wires is None: debug_print("No Wires element found - check XML namespace", debug) continue debug_print(f"Successfully found Parts and Wires elements", debug) # Build dictionaries for Parts and Access elements parts_dict = {} access_dict = {} # Process all Parts elements (including MOVE, Add, Contact, etc) for part in parts.findall("*"): if part.tag.endswith("Access"): uid = part.get("UId") if uid: access_dict[uid] = part else: uid = part.get("UId") if uid: parts_dict[uid] = part # Process all FlgNet namespace elements too if flgnet_ns: for part in parts.findall("flg:*", namespaces=flgnet_ns): if part.tag.endswith("Access"): uid = part.get("UId") if uid: access_dict[uid] = part else: uid = part.get("UId") if uid: parts_dict[uid] = part # Process wires to build connection map wire_connections = {} for wire in wires.findall("Wire"): wire_id = wire.get("UId") if not wire_id: continue connections = [] # Check for powerrail has_powerrail = any(child.tag.endswith("Powerrail") for child in wire) # Get all connections in this wire for child in wire: if child.tag.endswith("NameCon"): connections.append( { "type": "NameCon", "uid": child.get("UId"), "name": child.get("Name"), "has_powerrail": has_powerrail, } ) elif child.tag.endswith("IdentCon"): connections.append( { "type": "IdentCon", "uid": child.get("UId"), "has_powerrail": has_powerrail, } ) wire_connections[wire_id] = connections # Also process FlgNet namespace wires if flgnet_ns: for wire in wires.findall("flg:Wire", namespaces=flgnet_ns): wire_id = wire.get("UId") if not wire_id: continue connections = [] # Check for powerrail has_powerrail = any( child.tag.endswith("Powerrail") for child in wire ) # Get all connections in this wire for child in wire: if child.tag.endswith("NameCon"): connections.append( { "type": "NameCon", "uid": child.get("UId"), "name": child.get("Name"), "has_powerrail": has_powerrail, } ) elif child.tag.endswith("IdentCon"): connections.append( { "type": "IdentCon", "uid": child.get("UId"), "has_powerrail": has_powerrail, } ) wire_connections[wire_id] = connections # Find all contact elements - these are used for conditions contacts = {} for uid, part in parts_dict.items(): if part.tag.endswith("Contact") or part.get("Name") == "Contact": contacts[uid] = { "element": part, "condition": None, "input_wire": None, "output_wire": None, } # Connect contacts to their inputs/outputs and variables for wire_id, connections in wire_connections.items(): for conn in connections: if conn["type"] == "NameCon" and conn["uid"] in contacts: if conn["name"] == "in": contacts[conn["uid"]]["input_wire"] = wire_id elif conn["name"] == "out": contacts[conn["uid"]]["output_wire"] = wire_id # Find the operands for each contact for uid, contact in contacts.items(): for wire_id, connections in wire_connections.items(): for conn in connections: if ( conn["type"] == "NameCon" and conn["uid"] == uid and conn["name"] == "operand" ): for conn2 in connections: if conn2["type"] == "IdentCon": var_uid = conn2["uid"] contact["condition"] = var_uid # Get all instruction/operation types from parts operations = {} for uid, part in parts_dict.items(): if part.get("Name") == "Contact": continue # Skip contacts, we handled them separately part_name = part.get("Name") if not part_name: continue operations[uid] = { "type": part_name, "element": part, "inputs": {}, "outputs": {}, "conditions": [], "condition_uids": set(), "is_enabled": False, } # Find connections between parts - this contains the actual LAD logic for wire_id, connections in wire_connections.items(): # Check if wire has a powerrail (start of ladder rung) has_powerrail = any(conn["has_powerrail"] for conn in connections) # Store contact UIDs connected to this wire connected_contacts = [] for conn in connections: if conn["type"] == "NameCon" and conn["uid"] in contacts: if conn["name"] == "out": connected_contacts.append(conn["uid"]) # Process connections for conn in connections: if conn["type"] == "NameCon": uid = conn["uid"] name = conn["name"] if uid in operations: if name == "en": # This is an enable input - may be connected to powerrail operations[uid]["is_enabled"] |= has_powerrail # Add connected contacts as conditions for contact_uid in connected_contacts: if contacts[contact_uid]["condition"]: operations[uid]["conditions"].append( contacts[contact_uid] ) operations[uid]["condition_uids"].add( contacts[contact_uid]["condition"] ) elif name.startswith("in"): # Input connection operations[uid]["inputs"][name] = wire_id elif name.startswith("out"): # Output connection operations[uid]["outputs"][name] = wire_id # Helper functions for accessing variable/constant values def get_access_value(access_uid): """Extract variable name or constant value from an Access element.""" if access_uid not in access_dict: return None access = access_dict[access_uid] scope = access.get("Scope") if scope == "LiteralConstant": # Extract constant value const_elem = access.find(".//Constant") if const_elem is None and flgnet_ns: const_elem = access.find( ".//flg:Constant", namespaces=flgnet_ns ) if const_elem is not None: const_value = None for child in const_elem: if child.tag.endswith("ConstantValue"): const_value = child.text break return const_value elif scope == "GlobalVariable" or scope == "LocalVariable": # Extract variable name symbol = access.find("Symbol") or ( access.find("flg:Symbol", namespaces=flgnet_ns) if flgnet_ns else None ) if symbol is not None: components = [] for comp in symbol.findall("Component") or ( symbol.findall("flg:Component", namespaces=flgnet_ns) if flgnet_ns else [] ): components.append(comp.get("Name")) if components: return ".".join(components) elif scope == "TypedConstant": # Extract typed constant const_elem = access.find(".//Constant") if const_elem is None and flgnet_ns: const_elem = access.find( ".//flg:Constant", namespaces=flgnet_ns ) if const_elem is not None: const_type = None const_value = None for child in const_elem: if child.tag.endswith("ConstantType"): const_type = child.text elif child.tag.endswith("ConstantValue"): const_value = child.text if const_type and const_value: return f"{const_type}#{const_value}" elif const_value: return const_value return None def get_contact_condition(contact_uid): """Get the condition expression for a contact.""" if contact_uid not in contacts: return None condition_uid = contacts[contact_uid]["condition"] if not condition_uid: return None var_name = get_access_value(condition_uid) if not var_name: return None return var_name def get_connection_source_uid(wire_id, target_uid, target_port): """Find the source UID connected to the target through the given wire.""" if wire_id not in wire_connections: return None # Find if there's an IdentCon in this wire ident_con = None name_con_target = None for conn in wire_connections[wire_id]: if conn["type"] == "IdentCon": ident_con = conn["uid"] elif ( conn["type"] == "NameCon" and conn["uid"] == target_uid and conn["name"] == target_port ): name_con_target = True if ident_con and name_con_target: return ident_con return None # Process operations in the network to generate code network_code = [] # 1. Process function calls (Call instructions) for uid, op in operations.items(): if op["type"] == "Call": # Get call info call_info = op["element"].find("CallInfo") or ( op["element"].find("flg:CallInfo", namespaces=flgnet_ns) if flgnet_ns else None ) if call_info is not None: function_name = call_info.get("Name") block_type = call_info.get("BlockType") if op["is_enabled"]: if op["conditions"]: # Build condition string using all contacts condition_parts = [] for contact in op["conditions"]: var_name = get_contact_condition( contact["element"].get("UId") ) if var_name: condition_parts.append(var_name) if condition_parts: condition_str = " AND ".join(condition_parts) network_code.append(f" IF {condition_str} THEN") network_code.append(f" {function_name}();") network_code.append(f" END_IF;") else: network_code.append(f" {function_name}();") else: network_code.append(f" {function_name}();") else: # It's a conditional call, we'll document it network_code.append( f" // Conditional call to {function_name}()" ) # 2. Process MOVE operations for uid, op in operations.items(): if op["type"] == "Move": if "in" not in op["inputs"] or "out1" not in op["outputs"]: continue # Get source and destination source_uid = get_connection_source_uid( op["inputs"]["in"], uid, "in" ) dest_uid = get_connection_source_uid( op["outputs"]["out1"], uid, "out1" ) if not source_uid or not dest_uid: continue source_value = get_access_value(source_uid) dest_value = get_access_value(dest_uid) if not source_value or not dest_value: continue # Generate assignment code if op["is_enabled"]: if op["conditions"]: # Build condition string using all contacts condition_parts = [] for contact in op["conditions"]: var_name = get_contact_condition( contact["element"].get("UId") ) if var_name: condition_parts.append(var_name) if condition_parts: condition_str = " AND ".join(condition_parts) network_code.append(f" IF {condition_str} THEN") network_code.append( f" {dest_value} := {source_value};" ) network_code.append(f" END_IF;") else: network_code.append( f" {dest_value} := {source_value};" ) else: network_code.append(f" {dest_value} := {source_value};") else: # It's a conditional assignment, document it network_code.append( f" // Conditional: {dest_value} := {source_value};" ) # 3. Process ADD operations for uid, op in operations.items(): if op["type"] == "Add": if ( "in1" not in op["inputs"] or "in2" not in op["inputs"] or "out" not in op["outputs"] ): continue # Get source operands and destination source1_uid = get_connection_source_uid( op["inputs"]["in1"], uid, "in1" ) source2_uid = get_connection_source_uid( op["inputs"]["in2"], uid, "in2" ) dest_uid = get_connection_source_uid( op["outputs"]["out"], uid, "out" ) if not source1_uid or not source2_uid or not dest_uid: continue source1_value = get_access_value(source1_uid) source2_value = get_access_value(source2_uid) dest_value = get_access_value(dest_uid) if not source1_value or not source2_value or not dest_value: continue # Generate addition code if op["is_enabled"]: if op["conditions"]: # Build condition string using all contacts condition_parts = [] for contact in op["conditions"]: var_name = get_contact_condition( contact["element"].get("UId") ) if var_name: condition_parts.append(var_name) if condition_parts: condition_str = " AND ".join(condition_parts) network_code.append(f" IF {condition_str} THEN") network_code.append( f" {dest_value} := {source1_value} + {source2_value};" ) network_code.append(f" END_IF;") else: network_code.append( f" {dest_value} := {source1_value} + {source2_value};" ) else: network_code.append( f" {dest_value} := {source1_value} + {source2_value};" ) else: # It's a conditional addition, document it network_code.append( f" // Conditional: {dest_value} := {source1_value} + {source2_value};" ) # 4. Process EQ (equality comparison) operations for uid, op in operations.items(): if op["type"] == "Eq": if "in1" not in op["inputs"] or "in2" not in op["inputs"]: continue # Get comparison operands source1_uid = get_connection_source_uid( op["inputs"]["in1"], uid, "in1" ) source2_uid = get_connection_source_uid( op["inputs"]["in2"], uid, "in2" ) if not source1_uid or not source2_uid: continue source1_value = get_access_value(source1_uid) source2_value = get_access_value(source2_uid) if not source1_value or not source2_value: continue # Find if this feeds into a coil/memory coil_name = None for wire_id, connections in wire_connections.items(): # Check if this EQ is connected to the wire eq_is_connected = False for conn in connections: if ( conn["type"] == "NameCon" and conn["uid"] == uid and conn["name"] == "out" ): eq_is_connected = True break if eq_is_connected: # Check if a coil is also connected to this wire for conn in connections: if conn["type"] == "NameCon" and conn["name"] == "in": target_uid = conn["uid"] if target_uid in parts_dict: target_part = parts_dict[target_uid] if target_part.get("Name") == "Coil": # Find the coil's operand for ( wire_id2, connections2, ) in wire_connections.items(): for conn2 in connections2: if ( conn2["type"] == "NameCon" and conn2["uid"] == target_uid and conn2["name"] == "operand" ): for conn3 in connections2: if ( conn3["type"] == "IdentCon" ): var_uid = conn3["uid"] var_name = ( get_access_value( var_uid ) ) if var_name: coil_name = var_name if coil_name: network_code.append( f" IF {source1_value} = {source2_value} THEN" ) network_code.append(f" {coil_name} := TRUE;") network_code.append(f" ELSE") network_code.append(f" {coil_name} := FALSE;") network_code.append(f" END_IF;") else: network_code.append( f" // Comparison: {source1_value} = {source2_value}" ) # 5. Process Coil operations (setting outputs) for uid, op in parts_dict.items(): if op.get("Name") == "Coil": connected_var = None # Check if already handled with EQ is_handled = False for line in network_code: if " := TRUE;" in line or " := FALSE;" in line: is_handled = True break if is_handled: continue # Find connected variable for wire_id, connections in wire_connections.items(): for conn in connections: if ( conn["type"] == "NameCon" and conn["uid"] == uid and conn["name"] == "operand" ): for conn2 in connections: if conn2["type"] == "IdentCon": var_uid = conn2["uid"] connected_var = get_access_value(var_uid) if connected_var: # Try to find the wire connected to the coil's input input_cond = None for wire_id, connections in wire_connections.items(): for conn in connections: if ( conn["type"] == "NameCon" and conn["uid"] == uid and conn["name"] == "in" ): # Find if any contacts feed into this for ( wire_id2, connections2, ) in wire_connections.items(): for conn2 in connections2: if ( conn2["type"] == "NameCon" and conn2["name"] == "out" and conn2["uid"] in contacts ): var_name = get_contact_condition( conn2["uid"] ) if var_name: input_cond = var_name if input_cond: network_code.append(f" IF {input_cond} THEN") network_code.append(f" {connected_var} := TRUE;") network_code.append(f" ELSE") network_code.append(f" {connected_var} := FALSE;") network_code.append(f" END_IF;") else: network_code.append(f" // Sets output: {connected_var}") # 6. Process more complex operations (like MOD) for uid, op in operations.items(): if op["type"] == "Mod": if ( "in1" not in op["inputs"] or "in2" not in op["inputs"] or "out" not in op["outputs"] ): continue # Get source operands and destination source1_uid = get_connection_source_uid( op["inputs"]["in1"], uid, "in1" ) source2_uid = get_connection_source_uid( op["inputs"]["in2"], uid, "in2" ) dest_uid = get_connection_source_uid( op["outputs"]["out"], uid, "out" ) if not source1_uid or not source2_uid or not dest_uid: continue source1_value = get_access_value(source1_uid) source2_value = get_access_value(source2_uid) dest_value = get_access_value(dest_uid) if not source1_value or not source2_value or not dest_value: continue # Generate modulus code if op["is_enabled"]: if op["conditions"]: # Build condition string using all contacts condition_parts = [] for contact in op["conditions"]: var_name = get_contact_condition( contact["element"].get("UId") ) if var_name: condition_parts.append(var_name) if condition_parts: condition_str = " AND ".join(condition_parts) network_code.append(f" IF {condition_str} THEN") network_code.append( f" {dest_value} := {source1_value} MOD {source2_value};" ) network_code.append(f" END_IF;") else: network_code.append( f" {dest_value} := {source1_value} MOD {source2_value};" ) else: network_code.append( f" {dest_value} := {source1_value} MOD {source2_value};" ) else: # It's a conditional operation, document it network_code.append( f" // Conditional: {dest_value} := {source1_value} MOD {source2_value};" ) # 7. Process Convert operations for uid, op in operations.items(): if op["type"] == "Convert": if "in" not in op["inputs"] or "out" not in op["outputs"]: continue # Get source and destination source_uid = get_connection_source_uid( op["inputs"]["in"], uid, "in" ) dest_uid = get_connection_source_uid( op["outputs"]["out"], uid, "out" ) if not source_uid or not dest_uid: continue source_value = get_access_value(source_uid) dest_value = get_access_value(dest_uid) if not source_value or not dest_value: continue # Get conversion types src_type = op["element"].get("SrcType") or "" dest_type = op["element"].get("DestType") or "" # Generate conversion code if op["is_enabled"]: if op["conditions"]: # Build condition string using all contacts condition_parts = [] for contact in op["conditions"]: var_name = get_contact_condition( contact["element"].get("UId") ) if var_name: condition_parts.append(var_name) if condition_parts: condition_str = " AND ".join(condition_parts) network_code.append(f" IF {condition_str} THEN") network_code.append( f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}" ) network_code.append(f" END_IF;") else: network_code.append( f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}" ) else: network_code.append( f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}" ) else: # It's a conditional conversion, document it network_code.append( f" // Conditional: {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}" ) # Add network code to the SCL code # Check if any code was generated if network_code: scl_code.extend(network_code) else: # If no operations were documented for this network, add a placeholder op_types = set() for uid, op in operations.items(): if op["type"]: op_types.add(op["type"]) for uid, part in parts_dict.items(): if part.get("Name"): op_types.add(part.get("Name")) if op_types: scl_code.append(f" // Contains {', '.join(op_types)} operations") else: scl_code.append(f" // Empty network or complex logic") # Close the block if block_type == "FC": scl_code.append("END_FUNCTION;") elif block_type == "FB": scl_code.append("END_FUNCTION_BLOCK;") elif block_type == "OB": scl_code.append("END_ORGANIZATION_BLOCK;") return "\n".join(scl_code) except Exception as e: import traceback debug_print(f"Exception occurred: {str(e)}", debug) debug_print(traceback.format_exc(), debug) return f"Error converting to SCL: {str(e)}" # Main execution if __name__ == "__main__": if len(sys.argv) > 1: input_file = sys.argv[1] if os.path.exists(input_file): scl = parse_siemens_lad_to_scl(input_file) print(scl) else: print(f"Error: File {input_file} not found") else: print("Usage: python script.py ")