245 lines
8.4 KiB
Python
245 lines
8.4 KiB
Python
|
import xmltodict
|
||
|
import pandas as pd
|
||
|
import re
|
||
|
import sys
|
||
|
|
||
|
def save_json_to_xml(json_data, filename="DB_Structure.xml"):
    """
    Convert a JSON-like structure to XML and save it to a file.

    Args:
        json_data: Nested dict/list structure to serialize.
        filename: Destination path for the XML output.
    """
    # Wrap in a single "root" element because an XML document needs exactly
    # one root node.
    xml_data = xmltodict.unparse({"root": json_data}, pretty=True)
    # Explicit encoding so output does not depend on the platform default.
    with open(filename, "w", encoding="utf-8") as xml_file:
        xml_file.write(xml_data)
    # Fixed: the message printed a literal placeholder instead of the
    # actual filename.
    print(f"XML data saved to {filename}")
|
||
|
|
||
|
|
||
|
def save_dataframe_to_excel(df, filename="DB_Structure.xlsx"):
    """
    Save the provided DataFrame to an Excel file (index column omitted).

    Args:
        df: pandas DataFrame to write.
        filename: Destination path for the Excel output.
    """
    df.to_excel(filename, index=False)
    # Fixed: the message printed a literal placeholder instead of the
    # actual filename.
    print(f"Data saved to {filename}")
|
||
|
|
||
|
|
||
|
def save_dataframe_to_file(df, filename="DB_Structure.csv"):
    """
    Save the provided DataFrame to a CSV file (index column omitted).

    Args:
        df: pandas DataFrame to write.
        filename: Destination path for the CSV output.
    """
    df.to_csv(filename, index=False)
    # Fixed: the message printed a literal placeholder instead of the
    # actual filename.
    print(f"Data saved to {filename}")
|
||
|
|
||
|
|
||
|
# Size in bytes of each Siemens S7 elementary data type, used to lay out
# DB offsets. Fractional values encode bit-level sizes (0.125 byte = 1 bit).
type_sizes = {
    "Byte": 1,
    "Char": 1,
    "Int": 2,
    "DInt": 4,
    "Word": 2,
    "DWord": 4,
    "Real": 4,
    "Date": 2,
    "Time": 4,
    "Time_Of_Day": 4,
    "S5Time": 2,
    "Bool": 0.125,  # Remember, this means 1 bit; Bools are commonly packed 8 bits = 1 byte
    "String": 256,  # Specify as String[256] if needed, which would be 258 bytes (256 chars + 2 length bytes)
    "WString": 512,
    "LReal": 8,  # Double-precision floating point
    "UDInt": 4,  # Unsigned 32-bit integer
    "USInt": 1,  # Unsigned 8-bit integer (Byte)
    "UInt": 2,  # Unsigned 16-bit integer (Word)
    "ULInt": 8,  # Unsigned 64-bit integer (double DWord)
    "LWord": 8,  # Unsigned 64-bit integer (double DWord)
    "LInt": 8,  # Signed 64-bit integer
    "Date_And_Time": 8,  # Combined date and time, 8 bytes
    "DTL": 12,  # Date and time long (date, time and microsecond precision, 12 bytes)
}
|
||
|
|
||
|
|
||
|
def calculate_plc_address(type_name, byte_offset):
    """
    Calculate the S7 PLC address notation for a field.

    Args:
        type_name: S7 data type name (a key of ``type_sizes``).
        byte_offset: Byte offset; the fractional part encodes the bit
            position (0.125 byte = 1 bit).

    Returns:
        An address string such as "DBX2.3", "DBB4", "DBW6" or "DBD8".
    """
    byte_size = type_sizes.get(type_name, 0)
    bit_offset = int((byte_offset - int(byte_offset)) * 8)
    byte_offset = int(byte_offset)
    if type_name == "Bool":
        # Bit-level address. (Removed an unreachable "DBB" fallback: the
        # original guarded on "bit_offset is not None", but bit_offset is
        # always an int here, so the guard could never be False.)
        return f"DBX{byte_offset}.{bit_offset}"
    elif type_name == "Byte":
        return f"DBB{byte_offset}"  # Single-byte address
    elif byte_size == 2:
        return f"DBW{byte_offset}"  # Two-byte word address
    elif byte_size == 4:
        return f"DBD{byte_offset}"  # Four-byte double-word address
    else:
        # Default to a bit address for remaining types (e.g. strings and
        # 8-byte types). NOTE(review): 1-byte types such as "Char" also
        # land here rather than on "DBB" — confirm this is intended.
        return f"DBX{byte_offset}.0"
|
||
|
|
||
|
|
||
|
def calculate_plc_size(size):
    """
    Format a size in bytes using PLC "byte.bit" notation.

    Args:
        size: Size in bytes; the fractional part encodes bits
            (0.125 byte = 1 bit).

    Returns:
        "<bytes>.<bits>" when there is a fractional bit part,
        otherwise just "<bytes>".
    """
    # Removed an unused local ("byte_size = size") from the original.
    bit_part = int((size - int(size)) * 8)
    whole_bytes = int(size)
    if bit_part > 0:
        return f"{whole_bytes}.{bit_part}"
    return f"{whole_bytes}"
|
||
|
|
||
|
|
||
|
class OffsetState:
    """
    Mutable cursor used while walking a DB structure to assign offsets.

    Attributes:
        last_key_was_bool: True when the previously processed field was a
            Bool (so following fields may need bit/byte realignment).
        last_bit_offset: Bit position tracked within the current byte.
        current_offset: Running byte offset; may be fractional while Bool
            bits are being packed.
    """

    def __init__(self):
        # Held as instance attributes (the original declared them as class
        # attributes, which risks shared state if ever mutated on the class).
        self.last_key_was_bool = False
        self.last_bit_offset = 0
        self.current_offset = 0
|
||
|
|
||
|
|
||
|
def calculate_offsets(value, state):
    """
    Assign offset, PLC address and size to one field dict, mutating *state*.

    Mutates ``value`` in place, adding "offset", "plc_address" and "size"
    keys, and advances ``state.current_offset`` by the field's size.
    Aborts the program (sys.exit) when the type is unknown and not a UDT.

    Args:
        value: Field dict; must contain "type" and may contain
            "is_array_element", "array_definition", "is_udt_definition".
        state: OffsetState carrying the running offset between calls.

    Returns:
        The (mutated) state, for chaining.
    """

    type_name = value["type"]
    is_array_element = value.get("is_array_element", False)
    is_array_definition = value.get("array_definition", False)
    is_udt_definition = value.get("is_udt_definition", False)

    # A field following a packed Bool run is treated like an array element
    # so it skips the even-offset alignment below.
    if state.last_key_was_bool:
        is_array_element = True
    size = 0

    if not is_array_element:
        if state.current_offset % 2 != 0:
            state.current_offset += (
                1  # Align to the next even offset if it's not an array element
            )

    # Adjusting Bool sizes based on grouping
    if type_name == "Bool":
        state.last_key_was_bool = True
        size += 1 / 8  # One bit (0.125 byte)

    else:
        if state.last_key_was_bool:  # After a run of Bools
            state.last_key_was_bool = False  ## Current field is not a Bool
            # Round the fractional (bit-packed) offset up to a whole byte.
            if (
                state.last_bit_offset > 0
                or int(state.current_offset) != state.current_offset
            ):
                state.last_bit_offset = 0
                state.current_offset = int(state.current_offset) + 1
            if state.current_offset % 2 != 0:
                state.current_offset += (
                    1  # Align to the next even offset if it's not an array element
                )

        # Special handling for String types
        if type_name.startswith("String"):
            match = re.match(r"String\[(\d+)\]", type_name)
            state.last_bit_offset = 0
            if match:
                length = int(match.group(1))
                size = (
                    length + 2
                )  # Account for null-termination and string length prefix
            else:
                size = type_sizes.get(type_name, 0)  ## Standard size for strings

        else:  ## Other data types
            if is_array_definition:
                # The array header itself occupies no space; only align it.
                size = 0
                if state.current_offset % 2 != 0:
                    state.current_offset += (
                        1  # Align to the next even offset if it's not an array element
                    )
            else:
                size = type_sizes.get(
                    type_name, -1
                )  # -1 marks an unrecognized type (checked below)

            if size == -1 and not is_udt_definition:
                # Unknown type that is not a UDT definition: abort the run.
                print(f"UDT o DataType '{type_name}' no encontrado. Abortando.")
                sys.exit()

    plc_address = calculate_plc_address(type_name, state.current_offset)
    value["offset"] = int(state.current_offset)
    value["plc_address"] = plc_address  # Store the calculated PLC address
    value["size"] = calculate_plc_size(size)
    # print(f"Offset '{state.current_offset}' at field '{key}' ")
    state.current_offset += size
    return state
|
||
|
|
||
|
|
||
|
def collect_data_for_table(
    db_struct, offset_state, level=0, parent_prefix="", collected_data=None
):
    """
    Recursively collect data from the DB structure to display in a tabular
    format, omitting 'fields' and 'Struct' in the names.

    Args:
        db_struct: Nested dict/list DB structure.
        offset_state: OffsetState threaded through the traversal.
        level: Current nesting depth (for display).
        parent_prefix: Dotted name path accumulated so far.
        collected_data: Accumulator list; a fresh one is created when None.
            (Fixed: the original used a mutable default argument ``[]``,
            so rows accumulated across successive top-level calls.)

    Returns:
        The list of row dicts collected so far.
    """
    if collected_data is None:
        collected_data = []
    is_array_element = False
    increase_level = 0

    if isinstance(db_struct, dict):
        for key, value in db_struct.items():
            # Skip 'fields', 'Struct' and 'Array' keys in the name path
            if key == "fields" or key == "Struct" or key == "Array":
                next_prefix = parent_prefix  # Continue with the current prefix
            else:
                if isinstance(value, dict):
                    is_array_element = value.get("is_array_element", False)
                if not is_array_element:
                    next_prefix = f"{parent_prefix}.{key}" if parent_prefix else key
                else:
                    # Array elements (e.g. "[3]") are appended without a dot.
                    next_prefix = f"{parent_prefix}{key}" if parent_prefix else key

            if (
                isinstance(value, dict) and "type" in value
            ):  # Directly a field with 'type'
                offset_state = calculate_offsets(value, offset_state)
                field_data = {
                    "Nombre": next_prefix,
                    "Tipo": value.get("type", "N/A"),
                    "Offset": value.get("offset", "N/A"),
                    "Size": value.get("size", "N/A"),
                    "Level": level,
                    "Dirección PLC": value.get("plc_address", "N/A"),
                    "Comentario": value.get("comment", "N/A"),
                }
                collected_data.append(field_data)
                increase_level = 1

            # Recursively handle nested dictionaries and lists
            if isinstance(value, dict) or isinstance(value, list):
                collect_data_for_table(
                    value,
                    offset_state,
                    level + increase_level,
                    next_prefix,
                    collected_data,
                )
    elif isinstance(db_struct, list):
        for index, item in enumerate(db_struct):
            item_prefix = f"{parent_prefix}[{index}]" if parent_prefix else f"[{index}]"
            collect_data_for_table(
                item, offset_state, level + increase_level, item_prefix, collected_data
            )

    return collected_data
|
||
|
|
||
|
|
||
|
def convert_to_table(db_struct):
    """
    Flatten one DB structure into a list of row dicts, starting from a
    fresh offset state.
    """
    offset_state = OffsetState()
    # Pass an explicit fresh accumulator: collect_data_for_table declares a
    # mutable default argument, so relying on the default would make rows
    # accumulate across successive calls to this function.
    return collect_data_for_table(db_struct, offset_state, collected_data=[])
|
||
|
|
||
|
|
||
|
def display_as_table(dbs):
    """
    Convert collected DB data into a pandas DataFrame and display it.

    Args:
        dbs: Mapping of DB name -> DB structure.

    Returns:
        A DataFrame with one row per collected field across all DBs.
    """
    rows = []
    for name, content in dbs.items():
        print(f"Processing DB: {name}")
        rows += convert_to_table(content)
    return pd.DataFrame(rows)
|