Enhance SCL code generation by adding support for contact conditions and improving network processing logic

This commit is contained in:
Miguel 2025-04-15 17:32:20 +02:00
parent 29700f0d32
commit 06b2698ada
2 changed files with 341 additions and 93 deletions

Binary file not shown.

View File

@ -376,9 +376,46 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
wire_connections[wire_id] = connections wire_connections[wire_id] = connections
# Find all contact elements - these are used for conditions
contacts = {}
for uid, part in parts_dict.items():
if part.tag.endswith("Contact") or part.get("Name") == "Contact":
contacts[uid] = {
"element": part,
"condition": None,
"input_wire": None,
"output_wire": None,
}
# Connect contacts to their inputs/outputs and variables
for wire_id, connections in wire_connections.items():
for conn in connections:
if conn["type"] == "NameCon" and conn["uid"] in contacts:
if conn["name"] == "in":
contacts[conn["uid"]]["input_wire"] = wire_id
elif conn["name"] == "out":
contacts[conn["uid"]]["output_wire"] = wire_id
# Find the operands for each contact
for uid, contact in contacts.items():
for wire_id, connections in wire_connections.items():
for conn in connections:
if (
conn["type"] == "NameCon"
and conn["uid"] == uid
and conn["name"] == "operand"
):
for conn2 in connections:
if conn2["type"] == "IdentCon":
var_uid = conn2["uid"]
contact["condition"] = var_uid
# Get all instruction/operation types from parts # Get all instruction/operation types from parts
operations = {} operations = {}
for uid, part in parts_dict.items(): for uid, part in parts_dict.items():
if part.get("Name") == "Contact":
continue # Skip contacts, we handled them separately
part_name = part.get("Name") part_name = part.get("Name")
if not part_name: if not part_name:
continue continue
@ -389,6 +426,7 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
"inputs": {}, "inputs": {},
"outputs": {}, "outputs": {},
"conditions": [], "conditions": [],
"condition_uids": set(),
"is_enabled": False, "is_enabled": False,
} }
@ -397,6 +435,13 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
# Check if wire has a powerrail (start of ladder rung) # Check if wire has a powerrail (start of ladder rung)
has_powerrail = any(conn["has_powerrail"] for conn in connections) has_powerrail = any(conn["has_powerrail"] for conn in connections)
# Store contact UIDs connected to this wire
connected_contacts = []
for conn in connections:
if conn["type"] == "NameCon" and conn["uid"] in contacts:
if conn["name"] == "out":
connected_contacts.append(conn["uid"])
# Process connections # Process connections
for conn in connections: for conn in connections:
if conn["type"] == "NameCon": if conn["type"] == "NameCon":
@ -407,6 +452,17 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
if name == "en": if name == "en":
# This is an enable input - may be connected to powerrail # This is an enable input - may be connected to powerrail
operations[uid]["is_enabled"] |= has_powerrail operations[uid]["is_enabled"] |= has_powerrail
# Add connected contacts as conditions
for contact_uid in connected_contacts:
if contacts[contact_uid]["condition"]:
operations[uid]["conditions"].append(
contacts[contact_uid]
)
operations[uid]["condition_uids"].add(
contacts[contact_uid]["condition"]
)
elif name.startswith("in"): elif name.startswith("in"):
# Input connection # Input connection
operations[uid]["inputs"][name] = wire_id operations[uid]["inputs"][name] = wire_id
@ -468,12 +524,35 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
) )
if const_elem is not None: if const_elem is not None:
const_type = None
const_value = None
for child in const_elem: for child in const_elem:
if child.tag.endswith("ConstantValue"): if child.tag.endswith("ConstantType"):
return child.text const_type = child.text
elif child.tag.endswith("ConstantValue"):
const_value = child.text
if const_type and const_value:
return f"{const_type}#{const_value}"
elif const_value:
return const_value
return None return None
def get_contact_condition(contact_uid):
"""Get the condition expression for a contact."""
if contact_uid not in contacts:
return None
condition_uid = contacts[contact_uid]["condition"]
if not condition_uid:
return None
var_name = get_access_value(condition_uid)
if not var_name:
return None
return var_name
def get_connection_source_uid(wire_id, target_uid, target_port): def get_connection_source_uid(wire_id, target_uid, target_port):
"""Find the source UID connected to the target through the given wire.""" """Find the source UID connected to the target through the given wire."""
if wire_id not in wire_connections: if wire_id not in wire_connections:
@ -498,8 +577,8 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
return None return None
# Process operations in the network # Process operations in the network to generate code
executed_operations = [] network_code = []
# 1. Process function calls (Call instructions) # 1. Process function calls (Call instructions)
for uid, op in operations.items(): for uid, op in operations.items():
@ -516,25 +595,35 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
block_type = call_info.get("BlockType") block_type = call_info.get("BlockType")
if op["is_enabled"]: if op["is_enabled"]:
executed_operations.append( if op["conditions"]:
{ # Build condition string using all contacts
"type": "call", condition_parts = []
"function": function_name, for contact in op["conditions"]:
"block_type": block_type, var_name = get_contact_condition(
"condition": None, contact["element"].get("UId")
} )
) if var_name:
scl_code.append(f" {function_name}();") condition_parts.append(var_name)
if condition_parts:
condition_str = " AND ".join(condition_parts)
network_code.append(f" IF {condition_str} THEN")
network_code.append(f" {function_name}();")
network_code.append(f" END_IF;")
else:
network_code.append(f" {function_name}();")
else:
network_code.append(f" {function_name}();")
else: else:
# It's a conditional call, we'll document it # It's a conditional call, we'll document it
scl_code.append( network_code.append(
f" // Conditional call to {function_name}()" f" // Conditional call to {function_name}()"
) )
# 2. Process MOVE operations # 2. Process MOVE operations
for uid, op in operations.items(): for uid, op in operations.items():
if op["type"] == "Move": if op["type"] == "Move":
if not "in" in op["inputs"] or not "out1" in op["outputs"]: if "in" not in op["inputs"] or "out1" not in op["outputs"]:
continue continue
# Get source and destination # Get source and destination
@ -556,18 +645,32 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
# Generate assignment code # Generate assignment code
if op["is_enabled"]: if op["is_enabled"]:
executed_operations.append( if op["conditions"]:
{ # Build condition string using all contacts
"type": "move", condition_parts = []
"source": source_value, for contact in op["conditions"]:
"destination": dest_value, var_name = get_contact_condition(
"condition": None, contact["element"].get("UId")
} )
) if var_name:
scl_code.append(f" {dest_value} := {source_value};") condition_parts.append(var_name)
if condition_parts:
condition_str = " AND ".join(condition_parts)
network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" {dest_value} := {source_value};"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {source_value};"
)
else:
network_code.append(f" {dest_value} := {source_value};")
else: else:
# It's a conditional assignment, document it # It's a conditional assignment, document it
scl_code.append( network_code.append(
f" // Conditional: {dest_value} := {source_value};" f" // Conditional: {dest_value} := {source_value};"
) )
@ -575,9 +678,9 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
for uid, op in operations.items(): for uid, op in operations.items():
if op["type"] == "Add": if op["type"] == "Add":
if ( if (
not "in1" in op["inputs"] "in1" not in op["inputs"]
or not "in2" in op["inputs"] or "in2" not in op["inputs"]
or not "out" in op["outputs"] or "out" not in op["outputs"]
): ):
continue continue
@ -604,28 +707,41 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
# Generate addition code # Generate addition code
if op["is_enabled"]: if op["is_enabled"]:
executed_operations.append( if op["conditions"]:
{ # Build condition string using all contacts
"type": "add", condition_parts = []
"operand1": source1_value, for contact in op["conditions"]:
"operand2": source2_value, var_name = get_contact_condition(
"destination": dest_value, contact["element"].get("UId")
"condition": None, )
} if var_name:
) condition_parts.append(var_name)
scl_code.append(
f" {dest_value} := {source1_value} + {source2_value};" if condition_parts:
) condition_str = " AND ".join(condition_parts)
network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" {dest_value} := {source1_value} + {source2_value};"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {source1_value} + {source2_value};"
)
else:
network_code.append(
f" {dest_value} := {source1_value} + {source2_value};"
)
else: else:
# It's a conditional addition, document it # It's a conditional addition, document it
scl_code.append( network_code.append(
f" // Conditional: {dest_value} := {source1_value} + {source2_value};" f" // Conditional: {dest_value} := {source1_value} + {source2_value};"
) )
# 4. Process EQ (equality comparison) operations # 4. Process EQ (equality comparison) operations
for uid, op in operations.items(): for uid, op in operations.items():
if op["type"] == "Eq": if op["type"] == "Eq":
if not "in1" in op["inputs"] or not "in2" in op["inputs"]: if "in1" not in op["inputs"] or "in2" not in op["inputs"]:
continue continue
# Get comparison operands # Get comparison operands
@ -645,37 +761,136 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
if not source1_value or not source2_value: if not source1_value or not source2_value:
continue continue
# Document the comparison # Find if this feeds into a coil/memory
scl_code.append( coil_name = None
f" // Comparison: {source1_value} = {source2_value}" for wire_id, connections in wire_connections.items():
) # Check if this EQ is connected to the wire
eq_is_connected = False
for conn in connections:
if (
conn["type"] == "NameCon"
and conn["uid"] == uid
and conn["name"] == "out"
):
eq_is_connected = True
break
if eq_is_connected:
# Check if a coil is also connected to this wire
for conn in connections:
if conn["type"] == "NameCon" and conn["name"] == "in":
target_uid = conn["uid"]
if target_uid in parts_dict:
target_part = parts_dict[target_uid]
if target_part.get("Name") == "Coil":
# Find the coil's operand
for (
wire_id2,
connections2,
) in wire_connections.items():
for conn2 in connections2:
if (
conn2["type"] == "NameCon"
and conn2["uid"] == target_uid
and conn2["name"] == "operand"
):
for conn3 in connections2:
if (
conn3["type"]
== "IdentCon"
):
var_uid = conn3["uid"]
var_name = (
get_access_value(
var_uid
)
)
if var_name:
coil_name = var_name
if coil_name:
network_code.append(
f" IF {source1_value} = {source2_value} THEN"
)
network_code.append(f" {coil_name} := TRUE;")
network_code.append(f" ELSE")
network_code.append(f" {coil_name} := FALSE;")
network_code.append(f" END_IF;")
else:
network_code.append(
f" // Comparison: {source1_value} = {source2_value}"
)
# 5. Process Coil operations (setting outputs) # 5. Process Coil operations (setting outputs)
for uid, op in operations.items(): for uid, op in parts_dict.items():
if op["type"] == "Coil": if op.get("Name") == "Coil":
# Just document any coils
connected_var = None connected_var = None
for wire_id in wire_connections.values(): # Check if already handled with EQ
for conn in wire_id: is_handled = False
for line in network_code:
if " := TRUE;" in line or " := FALSE;" in line:
is_handled = True
break
if is_handled:
continue
# Find connected variable
for wire_id, connections in wire_connections.items():
for conn in connections:
if ( if (
conn["type"] == "NameCon" conn["type"] == "NameCon"
and conn["uid"] == uid and conn["uid"] == uid
and conn["name"] == "operand" and conn["name"] == "operand"
): ):
if "uid" in conn: for conn2 in connections:
var_uid = conn["uid"] if conn2["type"] == "IdentCon":
connected_var = get_access_value(var_uid) var_uid = conn2["uid"]
connected_var = get_access_value(var_uid)
if connected_var: if connected_var:
scl_code.append(f" // Sets output: {connected_var}") # Try to find the wire connected to the coil's input
input_cond = None
for wire_id, connections in wire_connections.items():
for conn in connections:
if (
conn["type"] == "NameCon"
and conn["uid"] == uid
and conn["name"] == "in"
):
# Find if any contacts feed into this
for (
wire_id2,
connections2,
) in wire_connections.items():
for conn2 in connections2:
if (
conn2["type"] == "NameCon"
and conn2["name"] == "out"
and conn2["uid"] in contacts
):
var_name = get_contact_condition(
conn2["uid"]
)
if var_name:
input_cond = var_name
if input_cond:
network_code.append(f" IF {input_cond} THEN")
network_code.append(f" {connected_var} := TRUE;")
network_code.append(f" ELSE")
network_code.append(f" {connected_var} := FALSE;")
network_code.append(f" END_IF;")
else:
network_code.append(f" // Sets output: {connected_var}")
# 6. Process more complex operations (like MOD) # 6. Process more complex operations (like MOD)
for uid, op in operations.items(): for uid, op in operations.items():
if op["type"] == "Mod": if op["type"] == "Mod":
if ( if (
not "in1" in op["inputs"] "in1" not in op["inputs"]
or not "in2" in op["inputs"] or "in2" not in op["inputs"]
or not "out" in op["outputs"] or "out" not in op["outputs"]
): ):
continue continue
@ -702,28 +917,41 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
# Generate modulus code # Generate modulus code
if op["is_enabled"]: if op["is_enabled"]:
executed_operations.append( if op["conditions"]:
{ # Build condition string using all contacts
"type": "mod", condition_parts = []
"operand1": source1_value, for contact in op["conditions"]:
"operand2": source2_value, var_name = get_contact_condition(
"destination": dest_value, contact["element"].get("UId")
"condition": None, )
} if var_name:
) condition_parts.append(var_name)
scl_code.append(
f" {dest_value} := {source1_value} MOD {source2_value};" if condition_parts:
) condition_str = " AND ".join(condition_parts)
network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" {dest_value} := {source1_value} MOD {source2_value};"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {source1_value} MOD {source2_value};"
)
else:
network_code.append(
f" {dest_value} := {source1_value} MOD {source2_value};"
)
else: else:
# It's a conditional operation, document it # It's a conditional operation, document it
scl_code.append( network_code.append(
f" // Conditional: {dest_value} := {source1_value} MOD {source2_value};" f" // Conditional: {dest_value} := {source1_value} MOD {source2_value};"
) )
# 7. Process Convert operations # 7. Process Convert operations
for uid, op in operations.items(): for uid, op in operations.items():
if op["type"] == "Convert": if op["type"] == "Convert":
if not "in" in op["inputs"] or not "out" in op["outputs"]: if "in" not in op["inputs"] or "out" not in op["outputs"]:
continue continue
# Get source and destination # Get source and destination
@ -749,33 +977,53 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
# Generate conversion code # Generate conversion code
if op["is_enabled"]: if op["is_enabled"]:
executed_operations.append( if op["conditions"]:
{ # Build condition string using all contacts
"type": "convert", condition_parts = []
"source": source_value, for contact in op["conditions"]:
"destination": dest_value, var_name = get_contact_condition(
"src_type": src_type, contact["element"].get("UId")
"dest_type": dest_type, )
"condition": None, if var_name:
} condition_parts.append(var_name)
)
scl_code.append( if condition_parts:
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}" condition_str = " AND ".join(condition_parts)
) network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
)
else:
network_code.append(
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
)
else: else:
# It's a conditional conversion, document it # It's a conditional conversion, document it
scl_code.append( network_code.append(
f" // Conditional: {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}" f" // Conditional: {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
) )
# If no operations were documented for this network, add a placeholder # Add network code to the SCL code
if len(executed_operations) == 0: # Check if any code was generated
# Check for ANY operations if network_code:
op_types = [op["type"] for op in operations.values()] scl_code.extend(network_code)
else:
# If no operations were documented for this network, add a placeholder
op_types = set()
for uid, op in operations.items():
if op["type"]:
op_types.add(op["type"])
for uid, part in parts_dict.items():
if part.get("Name"):
op_types.add(part.get("Name"))
if op_types: if op_types:
scl_code.append( scl_code.append(f" // Contains {', '.join(op_types)} operations")
f" // Contains {', '.join(set(op_types))} operations"
)
else: else:
scl_code.append(f" // Empty network or complex logic") scl_code.append(f" // Empty network or complex logic")