Implementado cierta logica de IF THEN y algebra

This commit is contained in:
Miguel 2025-04-15 21:03:19 +02:00
parent 7e0bf2df96
commit b70a5983ee
2 changed files with 362 additions and 269 deletions

Binary file not shown.

View File

@ -102,7 +102,32 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
interface_elem = block.find(".//Interface")
if interface_elem is not None:
interface_text = interface_elem.text
if interface_text:
if not interface_text:
# Check if Interface has child elements instead of text
sections_elem = interface_elem.find(".//Sections")
if sections_elem is not None:
interface_xml = sections_elem
sections = interface_xml.findall(".//Section")
for section in sections:
section_name = section.get("Name")
members = section.findall(".//Member")
# Store variables by section
for member in members:
var_name = member.get("Name")
var_type = member.get("Datatype")
if section_name == "Input":
input_vars.append((var_name, var_type))
elif section_name == "Output":
output_vars.append((var_name, var_type))
elif section_name == "InOut":
inout_vars.append((var_name, var_type))
elif section_name == "Temp":
temp_vars.append((var_name, var_type))
elif section_name == "Static":
static_vars.append((var_name, var_type))
else:
try:
# Parse interface definition to extract variables
interface_xml = etree.fromstring(interface_text)
@ -282,6 +307,9 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
# Build dictionaries for Parts and Access elements
parts_dict = {}
access_dict = {}
# Dictionary to store negated status for parts
negated_parts = {}
# Process all Parts elements (including MOVE, Add, Contact, etc)
for part in parts.findall("*"):
@ -293,6 +321,12 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
uid = part.get("UId")
if uid:
parts_dict[uid] = part
# Check if it's a negated contact
negated = part.find(".//Negated")
if negated is not None:
negated_parts[uid] = negated.get("Name") == "operand"
else:
negated_parts[uid] = False
# Process all FlgNet namespace elements too
if flgnet_ns:
@ -305,6 +339,12 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
uid = part.get("UId")
if uid:
parts_dict[uid] = part
# Check if it's a negated contact
negated = part.find(".//flg:Negated", namespaces=flgnet_ns)
if negated is not None:
negated_parts[uid] = negated.get("Name") == "operand"
else:
negated_parts[uid] = False
# Process wires to build connection map
wire_connections = {}
@ -385,6 +425,7 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
"condition": None,
"input_wire": None,
"output_wire": None,
"negated": negated_parts.get(uid, False),
}
# Connect contacts to their inputs/outputs and variables
@ -410,6 +451,45 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
var_uid = conn2["uid"]
contact["condition"] = var_uid
# Build a graph of contacts and their connections for better condition tracing
contact_graph = {}
for uid, contact in contacts.items():
contact_graph[uid] = []
output_wire = contact["output_wire"]
if output_wire:
for conn in wire_connections.get(output_wire, []):
if conn["type"] == "NameCon" and conn["name"] == "in" and conn["uid"] in contacts:
contact_graph[uid].append(conn["uid"])
elif conn["type"] == "NameCon" and conn["name"] == "en" and conn["uid"] in parts_dict:
# This contact enables an operation
contact_graph[uid].append(conn["uid"])
# Helper function to trace conditions
def trace_conditions(contact_uid, visited=None):
if visited is None:
visited = set()
if contact_uid in visited:
return []
visited.add(contact_uid)
if contact_uid not in contacts:
return []
contact = contacts[contact_uid]
conditions = []
if contact["condition"]:
var_name = get_access_value(contact["condition"])
if var_name:
if contact["negated"]:
conditions.append(f"NOT {var_name}")
else:
conditions.append(var_name)
for next_uid in contact_graph.get(contact_uid, []):
conditions.extend(trace_conditions(next_uid, visited))
return conditions
# Get all instruction/operation types from parts
operations = {}
for uid, part in parts_dict.items():
@ -428,6 +508,8 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
"conditions": [],
"condition_uids": set(),
"is_enabled": False,
"enable_wire": None,
"negated": negated_parts.get(uid, False),
}
# Find connections between parts - this contains the actual LAD logic
@ -436,11 +518,11 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
has_powerrail = any(conn["has_powerrail"] for conn in connections)
# Store contact UIDs connected to this wire
connected_contacts = []
contact_outcons = []
for conn in connections:
if conn["type"] == "NameCon" and conn["uid"] in contacts:
if conn["name"] == "out":
connected_contacts.append(conn["uid"])
contact_outcons.append(conn["uid"])
# Process connections
for conn in connections:
@ -450,11 +532,12 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
if uid in operations:
if name == "en":
# This is an enable input - may be connected to powerrail
# This is an enable input - may be connected to powerrail or contacts
operations[uid]["is_enabled"] |= has_powerrail
operations[uid]["enable_wire"] = wire_id
# Add connected contacts as conditions
for contact_uid in connected_contacts:
for contact_uid in contact_outcons:
if contacts[contact_uid]["condition"]:
operations[uid]["conditions"].append(
contacts[contact_uid]
@ -488,12 +571,15 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
)
if const_elem is not None:
const_type = None
const_value = None
for child in const_elem:
if child.tag.endswith("ConstantValue"):
if child.tag.endswith("ConstantType"):
const_type = child.text
elif child.tag.endswith("ConstantValue"):
const_value = child.text
break
return const_value
if const_value:
return const_value
elif scope == "GlobalVariable" or scope == "LocalVariable":
# Extract variable name
@ -551,7 +637,10 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
if not var_name:
return None
return var_name
if contacts[contact_uid]["negated"]:
return f"NOT {var_name}"
else:
return var_name
def get_connection_source_uid(wire_id, target_uid, target_port):
"""Find the source UID connected to the target through the given wire."""
@ -577,111 +666,35 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
return None
def get_all_conditions_for_operation(op_uid):
"""Get all conditions that affect an operation, handling complex logic."""
if op_uid not in operations:
return []
op = operations[op_uid]
if not op["enable_wire"]:
return []
# Find all contacts connected to the enable wire
contact_uids = []
for conn in wire_connections.get(op["enable_wire"], []):
if conn["type"] == "NameCon" and conn["uid"] in contacts and conn["name"] == "out":
contact_uids.append(conn["uid"])
# Trace conditions from these contacts
all_conditions = []
for contact_uid in contact_uids:
all_conditions.extend(trace_conditions(contact_uid))
return all_conditions
# Process operations in the network to generate code
network_code = []
# 1. Process function calls (Call instructions)
for uid, op in operations.items():
if op["type"] == "Call":
# Get call info
call_info = op["element"].find("CallInfo") or (
op["element"].find("flg:CallInfo", namespaces=flgnet_ns)
if flgnet_ns
else None
)
if call_info is not None:
function_name = call_info.get("Name")
block_type = call_info.get("BlockType")
if op["is_enabled"]:
if op["conditions"]:
# Build condition string using all contacts
condition_parts = []
for contact in op["conditions"]:
var_name = get_contact_condition(
contact["element"].get("UId")
)
if var_name:
condition_parts.append(var_name)
if condition_parts:
condition_str = " AND ".join(condition_parts)
network_code.append(f" IF {condition_str} THEN")
network_code.append(f" {function_name}();")
network_code.append(f" END_IF;")
else:
network_code.append(f" {function_name}();")
else:
network_code.append(f" {function_name}();")
else:
# It's a conditional call, we'll document it
network_code.append(
f" // Conditional call to {function_name}()"
)
# 2. Process MOVE operations
for uid, op in operations.items():
if op["type"] == "Move":
if "in" not in op["inputs"] or "out1" not in op["outputs"]:
continue
# Get source and destination
source_uid = get_connection_source_uid(
op["inputs"]["in"], uid, "in"
)
dest_uid = get_connection_source_uid(
op["outputs"]["out1"], uid, "out1"
)
if not source_uid or not dest_uid:
continue
source_value = get_access_value(source_uid)
dest_value = get_access_value(dest_uid)
if not source_value or not dest_value:
continue
# Generate assignment code
if op["is_enabled"]:
if op["conditions"]:
# Build condition string using all contacts
condition_parts = []
for contact in op["conditions"]:
var_name = get_contact_condition(
contact["element"].get("UId")
)
if var_name:
condition_parts.append(var_name)
if condition_parts:
condition_str = " AND ".join(condition_parts)
network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" {dest_value} := {source_value};"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {source_value};"
)
else:
network_code.append(f" {dest_value} := {source_value};")
else:
# It's a conditional assignment, document it
network_code.append(
f" // Conditional: {dest_value} := {source_value};"
)
# 3. Process ADD operations
# 1. Process ADD operations with improved condition handling
for uid, op in operations.items():
if op["type"] == "Add":
if (
"in1" not in op["inputs"]
or "in2" not in op["inputs"]
or "out" not in op["outputs"]
):
if "in1" not in op["inputs"] or "in2" not in op["inputs"] or "out" not in op["outputs"]:
continue
# Get source operands and destination
@ -705,40 +718,60 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
if not source1_value or not source2_value or not dest_value:
continue
# Get all conditions for this operation
conditions = get_all_conditions_for_operation(uid)
# Generate addition code
if op["is_enabled"]:
if op["conditions"]:
# Build condition string using all contacts
condition_parts = []
for contact in op["conditions"]:
var_name = get_contact_condition(
contact["element"].get("UId")
)
if var_name:
condition_parts.append(var_name)
if condition_parts:
condition_str = " AND ".join(condition_parts)
network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" {dest_value} := {source1_value} + {source2_value};"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {source1_value} + {source2_value};"
)
else:
network_code.append(
f" {dest_value} := {source1_value} + {source2_value};"
)
if conditions:
condition_str = " AND ".join(conditions)
network_code.append(f" IF {condition_str} THEN")
network_code.append(f" {dest_value} := {source1_value} + {source2_value};")
network_code.append(f" END_IF;")
else:
# It's a conditional addition, document it
network_code.append(
f" // Conditional: {dest_value} := {source1_value} + {source2_value};"
)
network_code.append(f" {dest_value} := {source1_value} + {source2_value};")
# 4. Process EQ (equality comparison) operations
# 2. Process MOVE operations with improved condition handling
for uid, op in operations.items():
if op["type"] == "Move":
if "in" not in op["inputs"] or not any(out.startswith("out") for out in op["outputs"]):
continue
# Get source and destination
source_uid = get_connection_source_uid(
op["inputs"]["in"], uid, "in"
)
# Find all output connections
destinations = []
for out_port, wire_id in op["outputs"].items():
dest_uid = get_connection_source_uid(wire_id, uid, out_port)
if dest_uid:
dest_value = get_access_value(dest_uid)
if dest_value:
destinations.append(dest_value)
if not source_uid or not destinations:
continue
source_value = get_access_value(source_uid)
if not source_value:
continue
# Get all conditions for this operation
conditions = get_all_conditions_for_operation(uid)
# Generate move code
if conditions:
condition_str = " AND ".join(conditions)
network_code.append(f" IF {condition_str} THEN")
for dest_value in destinations:
network_code.append(f" {dest_value} := {source_value};")
network_code.append(f" END_IF;")
else:
for dest_value in destinations:
network_code.append(f" {dest_value} := {source_value};")
# 3. Process EQ (equality comparison) operations with improved handling
for uid, op in operations.items():
if op["type"] == "Eq":
if "in1" not in op["inputs"] or "in2" not in op["inputs"]:
@ -821,70 +854,7 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
f" // Comparison: {source1_value} = {source2_value}"
)
# 5. Process Coil operations (setting outputs)
for uid, op in parts_dict.items():
if op.get("Name") == "Coil":
connected_var = None
# Check if already handled with EQ
is_handled = False
for line in network_code:
if " := TRUE;" in line or " := FALSE;" in line:
is_handled = True
break
if is_handled:
continue
# Find connected variable
for wire_id, connections in wire_connections.items():
for conn in connections:
if (
conn["type"] == "NameCon"
and conn["uid"] == uid
and conn["name"] == "operand"
):
for conn2 in connections:
if conn2["type"] == "IdentCon":
var_uid = conn2["uid"]
connected_var = get_access_value(var_uid)
if connected_var:
# Try to find the wire connected to the coil's input
input_cond = None
for wire_id, connections in wire_connections.items():
for conn in connections:
if (
conn["type"] == "NameCon"
and conn["uid"] == uid
and conn["name"] == "in"
):
# Find if any contacts feed into this
for (
wire_id2,
connections2,
) in wire_connections.items():
for conn2 in connections2:
if (
conn2["type"] == "NameCon"
and conn2["name"] == "out"
and conn2["uid"] in contacts
):
var_name = get_contact_condition(
conn2["uid"]
)
if var_name:
input_cond = var_name
if input_cond:
network_code.append(f" IF {input_cond} THEN")
network_code.append(f" {connected_var} := TRUE;")
network_code.append(f" ELSE")
network_code.append(f" {connected_var} := FALSE;")
network_code.append(f" END_IF;")
else:
network_code.append(f" // Sets output: {connected_var}")
# 6. Process more complex operations (like MOD)
# 4. Process MOD operations with improved condition handling
for uid, op in operations.items():
if op["type"] == "Mod":
if (
@ -915,40 +885,23 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
if not source1_value or not source2_value or not dest_value:
continue
# Generate modulus code
if op["is_enabled"]:
if op["conditions"]:
# Build condition string using all contacts
condition_parts = []
for contact in op["conditions"]:
var_name = get_contact_condition(
contact["element"].get("UId")
)
if var_name:
condition_parts.append(var_name)
# Get all conditions for this operation
conditions = get_all_conditions_for_operation(uid)
if condition_parts:
condition_str = " AND ".join(condition_parts)
network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" {dest_value} := {source1_value} MOD {source2_value};"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {source1_value} MOD {source2_value};"
)
else:
network_code.append(
f" {dest_value} := {source1_value} MOD {source2_value};"
)
else:
# It's a conditional operation, document it
# Generate modulus code
if conditions:
condition_str = " AND ".join(conditions)
network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" // Conditional: {dest_value} := {source1_value} MOD {source2_value};"
f" {dest_value} := {source1_value} MOD {source2_value};"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {source1_value} MOD {source2_value};"
)
# 7. Process Convert operations
# 5. Process Convert operations with improved condition handling
for uid, op in operations.items():
if op["type"] == "Convert":
if "in" not in op["inputs"] or "out" not in op["outputs"]:
@ -975,54 +928,191 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
src_type = op["element"].get("SrcType") or ""
dest_type = op["element"].get("DestType") or ""
# Generate conversion code
if op["is_enabled"]:
if op["conditions"]:
# Build condition string using all contacts
condition_parts = []
for contact in op["conditions"]:
var_name = get_contact_condition(
contact["element"].get("UId")
)
if var_name:
condition_parts.append(var_name)
# Get all conditions for this operation
conditions = get_all_conditions_for_operation(uid)
if condition_parts:
condition_str = " AND ".join(condition_parts)
network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
)
else:
network_code.append(
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
)
else:
# It's a conditional conversion, document it
# Generate conversion code
if conditions:
condition_str = " AND ".join(conditions)
network_code.append(f" IF {condition_str} THEN")
network_code.append(
f" // Conditional: {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
)
network_code.append(f" END_IF;")
else:
network_code.append(
f" {dest_value} := {dest_type}#{source_value}; // Convert from {src_type}"
)
# 6. Process Coil operations (setting outputs) with improved handling
for uid, part in parts_dict.items():
if part.get("Name") == "Coil":
# Skip if already handled with EQ
is_handled = False
for line in network_code:
if " := TRUE;" in line and " := FALSE;" in line:
is_handled = True
break
if is_handled:
continue
# Find connected variable
connected_var = None
for wire_id, connections in wire_connections.items():
for conn in connections:
if (
conn["type"] == "NameCon"
and conn["uid"] == uid
and conn["name"] == "operand"
):
for conn2 in connections:
if conn2["type"] == "IdentCon":
var_uid = conn2["uid"]
connected_var = get_access_value(var_uid)
if not connected_var:
continue
# Try to find the wire connected to the coil's input and trace conditions
all_conditions = []
input_wire = None
for wire_id, connections in wire_connections.items():
for conn in connections:
if (
conn["type"] == "NameCon"
and conn["uid"] == uid
and conn["name"] == "in"
):
input_wire = wire_id
# Find contacts that feed into this wire
for conn2 in connections:
if (
conn2["type"] == "NameCon"
and conn2["name"] == "out"
and conn2["uid"] in contacts
):
all_conditions.extend(trace_conditions(conn2["uid"]))
if all_conditions:
condition_str = " AND ".join(all_conditions)
# Check if it's a negated coil
if negated_parts.get(uid, False):
network_code.append(f" IF {condition_str} THEN")
network_code.append(f" {connected_var} := FALSE;")
network_code.append(f" ELSE")
network_code.append(f" {connected_var} := TRUE;")
network_code.append(f" END_IF;")
else:
network_code.append(f" IF {condition_str} THEN")
network_code.append(f" {connected_var} := TRUE;")
network_code.append(f" ELSE")
network_code.append(f" {connected_var} := FALSE;")
network_code.append(f" END_IF;")
else:
# If we can't find conditions but have an input wire, try fallback approach
if input_wire:
# Look for a direct powerrail connection
has_powerrail = any(
conn.get("has_powerrail", False)
for conn in wire_connections.get(input_wire, [])
)
if has_powerrail:
network_code.append(f" {connected_var} := TRUE; // Direct from powerrail")
else:
network_code.append(f" // Sets {connected_var} based on complex condition")
else:
network_code.append(f" // Sets output: {connected_var}")
# If no specific operations were handled, try to generate more general code
if not network_code:
# Look for simple powerrail to coil connections (direct assignments)
for uid, part in parts_dict.items():
if part.get("Name") == "Coil":
coil_var = None
direct_connection = False
# Get coil variable
for wire_id, connections in wire_connections.items():
for conn in connections:
if conn["type"] == "NameCon" and conn["uid"] == uid and conn["name"] == "operand":
for conn2 in connections:
if conn2["type"] == "IdentCon":
var_uid = conn2["uid"]
coil_var = get_access_value(var_uid)
if not coil_var:
continue
# Check if directly connected to powerrail
for wire_id, connections in wire_connections.items():
has_powerrail = any(conn.get("has_powerrail", False) for conn in connections)
if has_powerrail:
for conn in connections:
if conn["type"] == "NameCon" and conn["uid"] == uid and conn["name"] == "in":
direct_connection = True
break
if direct_connection:
break
if direct_connection:
if negated_parts.get(uid, False):
network_code.append(f" {coil_var} := FALSE; // Direct negated coil")
else:
network_code.append(f" {coil_var} := TRUE; // Direct coil")
# Add network code to the SCL code
# Check if any code was generated
if network_code:
scl_code.extend(network_code)
else:
# If no operations were documented for this network, add a placeholder
# If no operations were documented for this network, try to extract operations
op_types = set()
op_values = []
# Extract all operation types for documentation
for uid, op in operations.items():
if op["type"]:
op_types.add(op["type"])
# Try to extract values from the operation for better comments
if op["type"] == "Add":
if "in1" in op["inputs"] and "in2" in op["inputs"] and "out" in op["outputs"]:
source1_uid = get_connection_source_uid(op["inputs"]["in1"], uid, "in1")
source2_uid = get_connection_source_uid(op["inputs"]["in2"], uid, "in2")
dest_uid = get_connection_source_uid(op["outputs"]["out"], uid, "out")
if source1_uid and source2_uid and dest_uid:
source1 = get_access_value(source1_uid)
source2 = get_access_value(source2_uid)
dest = get_access_value(dest_uid)
if source1 and source2 and dest:
op_values.append(f"{dest} := {source1} + {source2}")
elif op["type"] == "Move":
if "in" in op["inputs"] and any(out.startswith("out") for out in op["outputs"]):
source_uid = get_connection_source_uid(op["inputs"]["in"], uid, "in")
if source_uid:
source = get_access_value(source_uid)
# Get all destinations
for out_name, wire_id in op["outputs"].items():
dest_uid = get_connection_source_uid(wire_id, uid, out_name)
if dest_uid:
dest = get_access_value(dest_uid)
if source and dest:
op_values.append(f"{dest} := {source}")
for uid, part in parts_dict.items():
if part.get("Name"):
op_types.add(part.get("Name"))
if op_types:
if op_values:
for value in op_values:
scl_code.append(f" // Conditional: {value};")
elif op_types:
scl_code.append(f" // Contains {', '.join(op_types)} operations")
else:
scl_code.append(f" // Empty network or complex logic")
@ -1049,10 +1139,13 @@ def parse_siemens_lad_to_scl(xml_file, debug=True):
if __name__ == "__main__":
if len(sys.argv) > 1:
input_file = sys.argv[1]
if os.path.exists(input_file):
# Verificar que el archivo existe y tiene extensión .xml
if not os.path.exists(input_file):
print(f"Error: Archivo {input_file} no encontrado")
elif not input_file.lower().endswith('.xml'):
print(f"Error: El archivo de entrada debe ser un archivo XML. Recibido: {input_file}")
else:
scl = parse_siemens_lad_to_scl(input_file)
print(scl)
else:
print(f"Error: File {input_file} not found")
else:
print("Usage: python script.py <xml_file>")
print("Uso: python script.py <archivo_xml>")