168 lines
7.0 KiB
Python
168 lines
7.0 KiB
Python
import copy
import json
import re

import pandas as pd
|
|
|
|
def expand_udt_references(db_struct, udts):
    """
    Recursively expand UDT references in the given DB structure using the UDT definitions.

    Only string values stored under the key ``"type"`` are treated as
    references. Surrounding double quotes are stripped from the value before
    it is looked up in ``udts``. When the name is found, an independent deep
    copy of the definition is stored under ``"fields"`` of the dict that holds
    the ``"type"`` key, and the inserted copy is expanded in turn so that UDTs
    nested inside UDTs are resolved as well.

    Args:
        db_struct: dict or list to expand in place. Any other value is ignored.
        udts: mapping of UDT name to its definition. It is never modified.

    Returns:
        None. ``db_struct`` is modified in place.
    """

    def _expand(node, in_progress):
        # `in_progress` holds the UDT names currently being expanded on this
        # path; it stops a self-referential definition from recursing forever.
        if isinstance(node, dict):
            # Iterate over a snapshot because "fields" may be added to `node`.
            for key, value in list(node.items()):
                if isinstance(value, dict):
                    _expand(value, in_progress)
                elif key == "type" and isinstance(value, str):
                    # Quotes may wrap UDT names that contain spaces.
                    type_name = value.strip('"')
                    if type_name not in udts or type_name in in_progress:
                        continue
                    # Deep copy: a shallow copy would share the nested member
                    # dicts between every field that uses this UDT (and with
                    # the definition itself), so later in-place annotations
                    # on one instance would leak into all the others.
                    expanded = copy.deepcopy(udts[type_name])
                    node["fields"] = expanded
                    print(f"Expanded UDT '{type_name}' at field '{key}' ")
                    # The new "fields" entry is not part of the snapshot being
                    # iterated, so it has to be expanded explicitly.
                    _expand(expanded, in_progress | {type_name})
        elif isinstance(node, list):
            for item in node:
                _expand(item, in_progress)

    _expand(db_struct, frozenset())
|
|
|
|
|
|
def handle_array_types(db_struct):
    """
    Handle array types to expand them into multiple fields as sub-elements while preserving comments.

    Modifies the structure in place. For every nested dict whose ``"type"`` is
    a string starting with ``Array[<lo>..<hi>] of <name>``, an ``"Array"``
    sub-dictionary is created (replacing any existing one) holding one entry
    per index, keyed ``"[<i>]"``. Each entry carries the element type, the
    parent's comment (``''`` when absent) and ``is_array_element: True``. The
    original ``"type"`` string is left untouched.

    Only non-negative integer bounds and a single word-like element type name
    are recognised by the pattern; other ``"type"`` values are left alone.

    Args:
        db_struct: dict to process in place. Any other value is ignored.

    Returns:
        None.
    """
    if not isinstance(db_struct, dict):
        return

    for value in list(db_struct.values()):
        if not isinstance(value, dict):
            continue

        # Process the whole subtree exactly once. The previous version called
        # this a second time further down for the same value, which added
        # nothing to the result but repeated the traversal at every level.
        handle_array_types(value)

        type_name = value.get("type")
        if not isinstance(type_name, str):
            # No type, or a non-string one, cannot be an array declaration.
            continue

        match = re.match(r"Array\[(\d+)\.\.(\d+)\] of (\w+)", type_name)
        if match is None:
            continue

        lower_bound = int(match.group(1))
        upper_bound = int(match.group(2))
        base_type = match.group(3)
        comment = value.get("comment", "")

        # Keep the original field and hang the elements beneath it.
        value["Array"] = {
            f"[{index}]": {
                "type": base_type,
                "comment": comment,
                "is_array_element": True,
            }
            for index in range(lower_bound, upper_bound + 1)
        }
|
|
|
|
|
|
|
|
|
|
|
|
# Nominal size in bytes for each elementary type name, used for offset and
# address calculation.
type_sizes = dict(
    Int=2,
    DInt=4,
    Word=2,
    Real=4,
    # Nominal value only; `calculate_offsets` applies its own handling to Bool fields.
    Bool=2,
    # Size of a bare `String`; `calculate_offsets` sizes `String[n]` from n instead.
    String=1,
)
|
|
|
|
|
|
def calculate_plc_address(type_name, byte_offset, bit_offset=None):
    """
    Build the PLC address string for a field from its type and position.

    Args:
        type_name: type name, looked up in the module-level ``type_sizes``.
        byte_offset: byte offset placed in the address.
        bit_offset: bit index, used only for ``"Bool"``.

    Returns:
        ``"DBX<byte>.<bit>"`` for a Bool with a bit offset, ``"DBB<byte>"`` for
        a Bool without one, ``"DBW<byte>"`` for 2-byte types, ``"DBD<byte>"``
        for 4-byte types, and ``"DBX<byte>.0"`` for everything else (unknown
        types and sizes other than 2 or 4).
    """
    width = type_sizes.get(type_name, 0)

    # Bool is decided by name, regardless of its entry in the size table.
    if type_name == "Bool":
        if bit_offset is None:
            return f"DBB{byte_offset}"
        return f"DBX{byte_offset}.{bit_offset}"

    if width == 2:
        return f"DBW{byte_offset}"
    if width == 4:
        return f"DBD{byte_offset}"

    # Fallback for any other size, e.g. strings.
    return f"DBX{byte_offset}.0"
|
|
|
|
|
|
def calculate_offsets(db_struct, current_offset=0, parent=None):
    """
    Recursively calculate byte offsets for each field in the DB structure considering special types.

    Every dict value that carries a "type" key receives an "offset" and a
    "plc_address" entry (written in place), and the running offset advances by
    that field's size. Every dict value, typed or not, is then recursed into.

    NOTE(review): the nesting of this function was reconstructed from a
    flattened copy of the source; confirm it against the original file.

    Args:
        db_struct: dict or list to annotate in place; other values are ignored.
        current_offset: byte offset at which this (sub)structure starts.
        parent: only forwarded to the recursive call made for list items; it
            is not otherwise read in this function.

    Returns:
        The byte offset following the last field processed.
    """
    # Both trackers are local to this call, so every nested dict starts a
    # fresh Bool run.
    last_key_was_bool = False
    last_bit_offset = 0  # To track bit offsets within a byte
    if isinstance(db_struct, dict):
        for key, value in list(db_struct.items()):
            if isinstance(value, dict):
                if "type" in value:
                    type_name = value["type"]
                    is_array_element = value.get('is_array_element', False)
                    size = type_sizes.get(
                        type_name, 0
                    )  # Types missing from type_sizes contribute 0 bytes here

                    if not is_array_element and current_offset % 2 != 0:
                        current_offset += 1  # Align to the next even offset if it's not an array element
                        last_bit_offset = 0

                    plc_address = value.get('plc_address', False)
                    if plc_address is not False:
                        # Only reports the situation; the address is recomputed
                        # and overwritten below regardless.
                        print("Address Already Calcolated!")

                    # NOTE(review): the address is built from last_bit_offset
                    # *before* the Bool branch below updates it — confirm that
                    # this ordering is intended.
                    plc_address = calculate_plc_address(type_name, current_offset, last_bit_offset)

                    # Special handling for String types
                    if "String" in type_name:
                        match = re.match(r"String\[(\d+)\]", type_name)
                        last_bit_offset = 0
                        if match:
                            length = int(match.group(1))
                            # Declared length plus 2 extra bytes.
                            # TODO confirm what the 2 bytes represent in the target string layout.
                            size = length + 2
                        else:
                            size = type_sizes.get(type_name, 1)  # No "[n]" suffix: table size, or 1 if the name is unknown

                    # Adjusting Bool sizes based on grouping
                    if type_name == "Bool":
                        if last_key_was_bool:  # This is a grouped bool
                            # Only the bit counter moves; `size` keeps the
                            # type_sizes value, so the byte offset still
                            # advances below.
                            # NOTE(review): confirm this is the intended packing.
                            last_bit_offset += 1
                        else:
                            # First Bool of a run: 2 bytes, bit counter restarted.
                            # NOTE(review): an earlier comment said "a full byte"; the code uses 2.
                            size = 2
                            last_bit_offset = 0
                        last_key_was_bool = True
                    else:
                        last_key_was_bool = False

                    value["offset"] = current_offset
                    value['plc_address'] = plc_address  # Store the calculated PLC address
                    current_offset += size

                current_offset = calculate_offsets(
                    value, current_offset, value
                )  # Recurse into nested structs
    elif isinstance(db_struct, list):
        for item in db_struct:
            current_offset = calculate_offsets(item, current_offset, parent)
    return current_offset
|
|
|
|
|
|
def expand_dbs(udts, dbs):
    """
    Run the full expansion pipeline on every DB, in place.

    For each entry of ``dbs`` the content is passed, in this order, through
    ``expand_udt_references`` (with ``udts``), ``handle_array_types`` and
    ``calculate_offsets``. Progress is printed before and after each DB.

    Args:
        udts: UDT definitions handed to ``expand_udt_references``.
        dbs: mapping of DB name to DB content; the contents are modified in place.

    Returns:
        None.
    """
    for db_name, structure in dbs.items():
        print(f"Expanding DB: {db_name}")

        # Order matters: references first, then arrays, then addressing.
        expand_udt_references(structure, udts)
        handle_array_types(structure)
        calculate_offsets(structure)  # Returned end offset is not used here.

        print(f"Completed expansion for DB: {db_name}")
|