# ParamManagerScripts/backend/script_groups/S7_DB_Utils/x3.py
# (533 lines, 30 KiB, Python)
import re
import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Union, Tuple, Any
import copy
@dataclass
class ArrayDimension:
    """One dimension of an S7 ARRAY declaration, e.g. the ``1..10`` part."""
    lower_bound: int  # first valid index (inclusive)
    upper_bound: int  # last valid index (inclusive)

    @property
    def count(self) -> int:
        """Number of elements spanned by this dimension, bounds inclusive."""
        return self.upper_bound - self.lower_bound + 1
@dataclass
class VariableInfo:
    """A single declared variable (or flattened UDT member) of a DB/UDT.

    ``byte_offset`` uses the S7 'byte.bit' float encoding: 3.5 means
    byte 3, bit 5.
    """
    name: str
    data_type: str
    byte_offset: float
    size_in_bytes: int  # BOOL arrays: bytes spanned; a single BOOL: 0
    bit_size: int = 0  # 1 for BOOL; otherwise usually 0
    udt_source_name: Optional[str] = None  # quoted UDT name when this is a UDT instance
    string_length: Optional[int] = None  # declared length for STRING[n]
    array_dimensions: List['ArrayDimension'] = field(default_factory=list)
    initial_value: Optional[str] = None  # ':=' value from the declaration part
    current_value: Optional[str] = None  # value applied from the BEGIN block
    comment: Optional[str] = None  # trailing '//' comment, if any
    children: List['VariableInfo'] = field(default_factory=list)
    is_udt_expanded_member: bool = False  # True for members copied out of a UDT definition
@dataclass
class UdtInfo:
    """A parsed ``TYPE ... END_TYPE`` block (user-defined type)."""
    name: str
    family: Optional[str] = None  # FAMILY header property, if present
    version: Optional[str] = None  # VERSION header property, if present
    members: List['VariableInfo'] = field(default_factory=list)
    total_size_in_bytes: int = 0  # padded to an even byte count by the parser
@dataclass
class DbInfo:
    """A parsed ``DATA_BLOCK ... END_DATA_BLOCK`` block."""
    name: str
    title: Optional[str] = None  # TITLE header property, if present
    family: Optional[str] = None  # FAMILY header property, if present
    version: Optional[str] = None  # VERSION header property, if present
    members: List['VariableInfo'] = field(default_factory=list)
    total_size_in_bytes: int = 0  # declaration-part size, padded even
    # Raw 'path := value' assignments captured from the BEGIN section.
    _initial_values_from_begin_block: Dict[str, str] = field(default_factory=dict)
@dataclass
class ParsedData:
    """Top-level parse result: every UDT and DB found in one source file."""
    udts: List['UdtInfo'] = field(default_factory=list)
    dbs: List['DbInfo'] = field(default_factory=list)
@dataclass
class OffsetContext:
    """Running byte/bit cursor used while laying out member offsets."""
    byte_offset: int = 0
    bit_offset: int = 0  # always normalized into 0..7

    def get_combined_offset(self) -> float:
        """Encode the cursor as S7-style 'byte.bit' (e.g. 3.5 = byte 3, bit 5)."""
        if self.bit_offset == 0:
            return float(self.byte_offset)
        return float(self.byte_offset * 10 + self.bit_offset) / 10.0

    def advance_bits(self, num_bits: int):
        """Move the cursor forward by ``num_bits``, carrying into whole bytes."""
        total_bits = self.bit_offset + num_bits
        self.byte_offset += total_bits // 8
        self.bit_offset = total_bits % 8

    def align_to_byte(self):
        """Round up to the next byte boundary if mid-byte."""
        if self.bit_offset:
            self.bit_offset = 0
            self.byte_offset += 1

    def align_to_word(self):
        """Round up to the next even-byte (word) boundary."""
        self.align_to_byte()
        if self.byte_offset % 2:
            self.byte_offset += 1
# type_name: (size_in_bytes, alignment_in_bytes_for_start, is_bool_type)
# Grouped by size; TOD and DT are aliases for TIME_OF_DAY / DATE_AND_TIME.
# STRING is handled specially because of its length component.
S7_PRIMITIVE_SIZES = {
    "BOOL": (0, 1, True),  # bit-addressed; byte span tracked separately
    **{t: (1, 1, False) for t in ("BYTE", "CHAR", "SINT", "USINT")},
    **{t: (2, 2, False) for t in ("WORD", "INT", "UINT", "S5TIME", "DATE")},
    **{t: (4, 2, False) for t in ("DWORD", "DINT", "UDINT", "REAL", "TIME",
                                  "TIME_OF_DAY", "TOD")},
    **{t: (8, 2, False) for t in ("LREAL", "LINT", "ULINT", "LWORD",
                                  "DATE_AND_TIME", "DT")},
}
class S7Parser:
    """Parser for STEP 7 source text containing TYPE (UDT) and DATA_BLOCK blocks."""

    def __init__(self):
        # Accumulated results plus the UDT registry used to expand instances.
        self.parsed_data = ParsedData()
        self.known_udts: Dict[str, UdtInfo] = {}
        # Block delimiters (all case-insensitive).
        self.type_start_regex = re.compile(r'^\s*TYPE\s+"([^"]+)"', re.IGNORECASE)
        self.db_start_regex = re.compile(r'^\s*DATA_BLOCK\s+"([^"]+)"', re.IGNORECASE)
        self.struct_start_regex = re.compile(r'^\s*STRUCT\b', re.IGNORECASE)
        self.end_struct_regex = re.compile(r'^\s*END_STRUCT\b', re.IGNORECASE)
        self.end_type_regex = re.compile(r'^\s*END_TYPE\b', re.IGNORECASE)
        self.end_db_regex = re.compile(r'^\s*END_DATA_BLOCK\b', re.IGNORECASE)
        self.begin_regex = re.compile(r'^\s*BEGIN\b', re.IGNORECASE)
        # Header properties such as "FAMILY : ..." / "VERSION : ...".
        self.property_regex = re.compile(r'^\s*([A-Z_]+)\s*:\s*(.+?)(?:\s*;)?\s*(?://.*)?$', re.IGNORECASE)
        # Declaration: name : [ARRAY [dims] OF] type [len] [:= init] ;
        self.var_regex_simplified = re.compile(
            r'^\s*(?P<name>[a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*'
            r'(?P<typefull>'
            r'(?:ARRAY\s*\[(?P<arraydims>[^\]]+?)\]\s*OF\s*)?'
            r'(?P<basetype>(?:"[^"]+"|[a-zA-Z_][a-zA-Z0-9_]*))'  # UDTs in quotes, primitives bare
            r'(?:\s*\[\s*(?P<stringlength>\d+)\s*\])?'  # optional STRING length
            r')'
            r'(?:\s*:=\s*(?P<initval>[^;]*?))??\s*'  # initial value: non-greedy, stops at ';'
            r';?\s*$',  # optional trailing semicolon
            re.IGNORECASE
        )
        # "lo .. hi" bounds inside an ARRAY dimension list.
        self.array_dim_regex = re.compile(r'(\d+)\s*\.\.\s*(\d+)')
def _get_type_details(self, type_name_raw: str) -> Tuple[int, int, bool, str]:
type_name_cleaned = type_name_raw.strip('"').upper()
udt_original_case_name = type_name_raw.strip('"')
if type_name_cleaned in S7_PRIMITIVE_SIZES:
size, align, is_bool = S7_PRIMITIVE_SIZES[type_name_cleaned]
return size, align, is_bool, type_name_cleaned
elif udt_original_case_name in self.known_udts:
udt = self.known_udts[udt_original_case_name]
return udt.total_size_in_bytes, 2, False, udt_original_case_name # UDTs align like structs (word)
elif type_name_cleaned == "STRUCT": # For explicit STRUCT members
return 0, 2, False, "STRUCT" # Size determined by members, aligns to word
raise ValueError(f"Unknown data type or UDT not defined: '{type_name_raw}' (Cleaned: '{type_name_cleaned}', Original UDT: '{udt_original_case_name}')")
@staticmethod
def _adjust_children_offsets(children: List[VariableInfo], base_offset_add: float):
for child in children:
child.byte_offset += base_offset_add
if child.byte_offset == float(int(child.byte_offset)):
child.byte_offset = float(int(child.byte_offset))
if child.children:
S7Parser._adjust_children_offsets(child.children, base_offset_add) # Pass the original base_offset_add
def _parse_struct_members(self, lines: List[str], current_line_idx: int,
parent_members_list: List[VariableInfo],
active_context: OffsetContext,
is_top_level_struct_in_block: bool = False) -> int:
idx_to_process = current_line_idx
while idx_to_process < len(lines):
original_line_text_with_leading_space = lines[idx_to_process]
original_line_text = original_line_text_with_leading_space.strip()
line_to_parse = original_line_text
line_comment = None
comment_marker_idx = original_line_text.find("//")
if comment_marker_idx != -1:
line_to_parse = original_line_text[:comment_marker_idx].strip()
line_comment = original_line_text[comment_marker_idx + 2:].strip()
line_index_for_return = idx_to_process # Save index before incrementing
idx_to_process += 1 # Pre-increment for next loop or return
if not line_to_parse: # If line is empty after stripping comments, or was originally empty
continue
# Handle block endings (STRUCT, TYPE, DB, BEGIN) based on line_to_parse
if self.end_struct_regex.match(line_to_parse):
if not is_top_level_struct_in_block: # End of a nested STRUCT member
active_context.align_to_byte() # Ensure current byte is finished
if active_context.byte_offset % 2 != 0: # Struct total size must be even
active_context.byte_offset += 1
return idx_to_process # Return line number AFTER END_STRUCT
# If is_top_level_struct_in_block, END_STRUCT is for the main block, handled by END_TYPE/DB
if is_top_level_struct_in_block and \
(self.end_type_regex.match(line_to_parse) or \
self.end_db_regex.match(line_to_parse) or \
self.begin_regex.match(line_to_parse)):
active_context.align_to_byte() # Finish current byte for the whole UDT/DB declaration part
if active_context.byte_offset % 2 != 0: # Total size must be even
active_context.byte_offset += 1
return line_index_for_return # Return index OF the BEGIN/END_TYPE/END_DB line
# Check for STRUCT start for a member
if self.struct_start_regex.match(line_to_parse): # This is a nested STRUCT member declaration
# This line should be matched by var_regex as a variable of type STRUCT
# If we are here, it means var_regex didn't match it, or it's an anonymous struct.
# For simplicity, we assume named structs are parsed by var_regex.
# This path might need review if anonymous structs are common.
# For now, we assume explicit STRUCT members are named and caught by var_regex
pass # Let var_regex handle it if it's a named struct
var_match = self.var_regex_simplified.match(line_to_parse)
if var_match:
var_data = var_match.groupdict()
var_info = VariableInfo(name=var_data['name'], data_type="", byte_offset=0, size_in_bytes=0)
initial_val_str = var_data.get('initval')
if initial_val_str:
var_info.initial_value = initial_val_str.strip()
if line_comment:
var_info.comment = line_comment
raw_base_type = var_data['basetype'].strip()
var_info.data_type = raw_base_type.strip('"') # Store clean type for logic
if raw_base_type.startswith('"') and raw_base_type.endswith('"'):
var_info.udt_source_name = raw_base_type # Store with quotes if UDT
num_array_elements = 1
if var_data['arraydims']:
for dim_match in self.array_dim_regex.finditer(var_data['arraydims']):
var_info.array_dimensions.append(ArrayDimension(int(dim_match.group(1)), int(dim_match.group(2))))
if var_info.array_dimensions:
for dim in var_info.array_dimensions:
num_array_elements *= dim.count
# --- Offset and Size Calculation ---
if var_info.data_type.upper() == "STRUCT": # Member is explicitly 'STRUCT'
active_context.align_to_word()
var_info.byte_offset = active_context.get_combined_offset()
nested_struct_context = OffsetContext() # Children offsets are relative to this new context
# We need to find the line "STRUCT" and parse from the line AFTER it
# The current idx_to_process is already advanced.
# The _parse_struct_members call will continue from where the var_match line was,
# which is not right for a struct definition that spans multiple lines.
# This means "STRUCT" members need to be handled by the parser finding "STRUCT" keyword,
# not just by var_regex matching "Variable : STRUCT".
# For now, assume if var_regex matched, it's a named struct.
# The call to _parse_struct_members for children should start from the *next line* in the input lines list.
idx_after_nested_struct = self._parse_struct_members(
lines,
idx_to_process, # Start parsing members of this struct from the next available line
var_info.children,
nested_struct_context,
is_top_level_struct_in_block=False # This is a nested struct
)
var_info.size_in_bytes = nested_struct_context.byte_offset # This is the calculated size of the nested struct
# Adjust children offsets to be absolute
# The children's byte_offset are currently relative to the start of the nested_struct_context (0.0)
# They need to be relative to the DB/UDT.
# var_info.byte_offset is the absolute start of this STRUCT member.
for child in var_info.children:
child.byte_offset += var_info.byte_offset
if child.byte_offset == float(int(child.byte_offset)):
child.byte_offset = float(int(child.byte_offset))
if child.children: # If UDTs within struct had their own structs
S7Parser._adjust_children_offsets(child.children, var_info.byte_offset)
active_context.byte_offset += var_info.size_in_bytes # Advance parent context
idx_to_process = idx_after_nested_struct # Update main loop's line index
elif var_info.data_type.upper() == "STRING" and var_data['stringlength']:
var_info.string_length = int(var_data['stringlength'])
unit_size = var_info.string_length + 2
active_context.align_to_word() # STRINGs are word-aligned
var_info.byte_offset = active_context.get_combined_offset()
var_info.size_in_bytes = unit_size * num_array_elements
active_context.byte_offset += var_info.size_in_bytes
else: # Primitive or UDT instance
# Use var_info.data_type (cleaned name) for _get_type_details
unit_size_bytes, unit_alignment_req, is_bool, type_name_for_udt_lookup = self._get_type_details(var_info.data_type)
if is_bool:
var_info.bit_size = 1 # A single BOOL is 1 bit
# For an array of BOOLs, record offset of the first bit
var_info.byte_offset = active_context.get_combined_offset()
active_context.advance_bits(num_array_elements) # Advance context by total bits
# Calculate effective byte span for the BOOL or BOOL array
start_byte_abs = int(var_info.byte_offset)
start_bit_in_byte = int(round((var_info.byte_offset - start_byte_abs) * 10))
if num_array_elements == 1:
var_info.size_in_bytes = 0 # Convention for single bit
else: # Array of BOOLs
bits_remaining = num_array_elements
bytes_spanned = 0
if start_bit_in_byte > 0: # Starts mid-byte
bits_in_first_byte = 8 - start_bit_in_byte
if bits_remaining <= bits_in_first_byte:
bytes_spanned = 1
else:
bytes_spanned = 1
bits_remaining -= bits_in_first_byte
bytes_spanned += (bits_remaining + 7) // 8 # Ceiling division for remaining full bytes
else: # Starts on a byte boundary
bytes_spanned = (bits_remaining + 7) // 8
var_info.size_in_bytes = bytes_spanned
else: # Non-BOOL primitive or UDT
active_context.align_to_byte() # Finish any pending bits
if unit_alignment_req == 2: # WORD, DWORD, REAL, UDT, etc.
active_context.align_to_word()
var_info.byte_offset = active_context.get_combined_offset()
var_info.size_in_bytes = unit_size_bytes * num_array_elements
active_context.byte_offset += var_info.size_in_bytes
# If it's a UDT instance, expand its members
if type_name_for_udt_lookup in self.known_udts and not is_bool:
udt_def = self.known_udts[type_name_for_udt_lookup]
udt_instance_absolute_start_offset = var_info.byte_offset
for udt_member_template in udt_def.members:
expanded_member = copy.deepcopy(udt_member_template)
expanded_member.is_udt_expanded_member = True
# udt_member_template.byte_offset is relative to UDT start (0.0)
expanded_member.byte_offset += udt_instance_absolute_start_offset
if expanded_member.byte_offset == float(int(expanded_member.byte_offset)):
expanded_member.byte_offset = float(int(expanded_member.byte_offset))
# If the UDT member itself has children (e.g., a struct within the UDT)
# their offsets also need to be made absolute relative to the DB.
# The base_offset_add for _adjust_children_offsets should be the
# absolute start of the current UDT instance.
if expanded_member.children:
S7Parser._adjust_children_offsets(expanded_member.children, udt_instance_absolute_start_offset)
var_info.children.append(expanded_member)
parent_members_list.append(var_info)
else: # Line not matched by var_regex
# Check if it's a STRUCT definition line that var_regex MISSED
# This is a fallback / debug for when 'STRUCT' starts a definition block for a member
struct_keyword_match = self.struct_start_regex.match(line_to_parse)
if struct_keyword_match and not var_match : # An unnamed struct or parsing issue
print(f"DEBUG: Found 'STRUCT' keyword on line but not parsed by var_regex: '{original_line_text}' | Processed='{line_to_parse}'")
# This case might need more robust handling if anonymous structs are used or if var_regex is too strict for named structs
elif line_to_parse and \
not self.end_struct_regex.match(line_to_parse) and \
not (is_top_level_struct_in_block and \
(self.end_type_regex.match(line_to_parse) or \
self.end_db_regex.match(line_to_parse) or \
self.begin_regex.match(line_to_parse))):
print(f"DEBUG: Line not parsed as variable or known keyword: Original='{original_line_text}' | Processed='{line_to_parse}'")
# This final padding should ideally be handled when END_STRUCT or END_TYPE/DB is detected
# For is_top_level_struct_in_block, it's handled by BEGIN/END_TYPE/DB detection.
# For nested structs, it's handled by END_STRUCT detection.
return idx_to_process
def _parse_begin_block(self, lines: List[str], start_idx: int, db_info: DbInfo) -> int:
idx = start_idx
# Regex for assignment: path := value ;
# Path can contain dots, array indices. Value can be complex.
assignment_regex = re.compile(r'^\s*(?P<path>[a-zA-Z0-9_."\[\],\s]+?)\s*:=\s*(?P<value>.+?)\s*;?\s*$', re.IGNORECASE)
while idx < len(lines):
original_line = lines[idx].strip()
line_to_parse = original_line
comment_marker_idx = original_line.find("//")
if comment_marker_idx != -1:
line_to_parse = original_line[:comment_marker_idx].strip()
# comment = original_line[comment_marker_idx+2:].strip() # Comment in BEGIN usually not stored
if self.end_db_regex.match(line_to_parse): # END_DATA_BLOCK terminates BEGIN section
return idx # Return index of END_DATA_BLOCK
idx += 1 # Advance to next line
if not line_to_parse: continue # Skip empty lines
match = assignment_regex.match(line_to_parse)
if match:
path = match.group("path").strip()
value = match.group("value").strip().rstrip(';').strip()
db_info._initial_values_from_begin_block[path] = value
# else: # Optional: print lines in BEGIN that don't match assignment
# print(f"DEBUG: Line in BEGIN not matched as assignment: '{original_line}'")
raise SyntaxError("Expected END_DATA_BLOCK after BEGIN section, but not found.")
def _apply_current_values(self, members: List[VariableInfo], begin_values: Dict[str, str], current_path_prefix: str = ""):
for var_info in members:
# Construct full path, handling array indices if necessary (simplification: not handling array element assignment here)
# For UDTs, the path in BEGIN block directly names the UDT member, e.g., "MyUdtVar._Name"
full_member_path = f"{current_path_prefix}{var_info.name}"
if var_info.is_udt_expanded_member: # Path comes from the UDT parent
# This requires careful reconstruction if the assignment path is more complex
# For now, assume direct member access for expanded UDTs.
# Example: If parent is "Actual_Recipe", and child is "_Name", path is "Actual_Recipe._Name"
# current_path_prefix should be the name of the UDT variable instance.
pass # The full_member_path is already constructed above with current_path_prefix
if full_member_path in begin_values:
var_info.current_value = begin_values[full_member_path]
elif var_info.initial_value is not None: # Fallback to declaration initial value
var_info.current_value = var_info.initial_value
# If this member itself has children (it's a parsed STRUCT or an expanded UDT that contained STRUCTs),
# recurse into them.
if var_info.children and not var_info.is_udt_expanded_member: # Recurse for normal structs
self._apply_current_values(var_info.children, begin_values, f"{full_member_path}.")
# For expanded UDT members (is_udt_expanded_member = True), their values are set directly,
# and if THEY had children (structs within the UDT def), those are part of the UDT expansion.
# The BEGIN block paths would typically be like "MyUdtInstance.StructInUdt.Member".
# This simplified _apply_current_values might need enhancement for complex paths into UDTs.
def parse_file(self, filepath: str) -> ParsedData:
try:
with open(filepath, 'r', encoding='utf-8-sig') as f:
lines = f.readlines()
except Exception as e:
print(f"Error reading file {filepath}: {e}")
return self.parsed_data
current_block_handler: Optional[Union[UdtInfo, DbInfo]] = None
active_block_context = OffsetContext()
idx = 0
while idx < len(lines):
original_line = lines[idx].strip()
line_to_parse = original_line
comment_marker_idx = original_line.find("//")
if comment_marker_idx != -1:
line_to_parse = original_line[:comment_marker_idx].strip()
# Top-level comments usually not stored with block definition
type_match = self.type_start_regex.match(line_to_parse)
db_match = self.db_start_regex.match(line_to_parse)
if type_match:
if current_block_handler: print(f"Warning: Starting new TYPE block for '{type_match.group(1)}' before previous block '{current_block_handler.name}' ended.")
udt_name = type_match.group(1)
current_block_handler = UdtInfo(name=udt_name)
self.parsed_data.udts.append(current_block_handler)
active_block_context = OffsetContext()
idx += 1; continue
elif db_match:
if current_block_handler: print(f"Warning: Starting new DATA_BLOCK for '{db_match.group(1)}' before previous block '{current_block_handler.name}' ended.")
db_name = db_match.group(1)
current_block_handler = DbInfo(name=db_name)
self.parsed_data.dbs.append(current_block_handler)
active_block_context = OffsetContext()
idx += 1; continue
if not current_block_handler:
idx += 1; continue
# Inside a UDT or DB block definition part (before BEGIN for DBs)
prop_match = self.property_regex.match(original_line) # Properties can have comments
struct_keyword_on_line = self.struct_start_regex.match(line_to_parse) # Check for "STRUCT" keyword line
if prop_match:
key, value = prop_match.group(1).upper(), prop_match.group(2).strip()
attr_name = key.lower()
if hasattr(current_block_handler, attr_name):
setattr(current_block_handler, attr_name, value)
elif struct_keyword_on_line and not current_block_handler.members: # Start of main STRUCT for UDT/DB
# The line 'STRUCT' itself is consumed. Parsing of members starts from the next line.
idx = self._parse_struct_members(
lines, idx + 1, # Start from line AFTER "STRUCT"
current_block_handler.members,
active_block_context,
is_top_level_struct_in_block=True
)
# idx is now the line number of BEGIN, END_TYPE, or END_DB
continue # Let the main loop handle this new line index
elif self.begin_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo):
# Finalize size from declaration part
current_block_handler.total_size_in_bytes = active_block_context.byte_offset
idx = self._parse_begin_block(lines, idx + 1, current_block_handler) # idx + 1 to start after BEGIN
# idx is now the line of END_DATA_BLOCK
continue
elif self.end_type_regex.match(line_to_parse) and isinstance(current_block_handler, UdtInfo):
if not hasattr(current_block_handler, 'total_size_in_bytes') or current_block_handler.total_size_in_bytes == 0:
current_block_handler.total_size_in_bytes = active_block_context.byte_offset # Size from declarations
self.known_udts[current_block_handler.name] = current_block_handler
print(f"Parsed UDT: {current_block_handler.name}, Size: {current_block_handler.total_size_in_bytes} bytes. Members: {len(current_block_handler.members)}")
current_block_handler = None
elif self.end_db_regex.match(line_to_parse) and isinstance(current_block_handler, DbInfo):
if not hasattr(current_block_handler, 'total_size_in_bytes') or current_block_handler.total_size_in_bytes == 0: # If no BEGIN block, size is from declarations
current_block_handler.total_size_in_bytes = active_block_context.byte_offset
self._apply_current_values(current_block_handler.members, current_block_handler._initial_values_from_begin_block)
print(f"Parsed DB: {current_block_handler.name}, Decl. Size: {current_block_handler.total_size_in_bytes} bytes. Members: {len(current_block_handler.members)}")
current_block_handler = None
idx += 1
return self.parsed_data
def custom_json_serializer(obj: Any) -> Any:
    """``json.dumps`` default hook: dataclass instance -> filtered dict.

    Drops None values, empty lists and the private BEGIN-block value map;
    OffsetContext instances (parser internals) serialize as null. Variables
    always carry an explicit is_udt_expanded_member flag, even when False.
    """
    if isinstance(obj, OffsetContext):
        return None  # layout cursors are internal state, not data
    if not hasattr(obj, '__dict__'):
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
    result = {}
    for attr, value in obj.__dict__.items():
        if attr == '_initial_values_from_begin_block':
            continue  # private parser scratch data
        if value is None or (isinstance(value, list) and not value):
            continue  # suppress empty noise
        result[attr] = value
    if isinstance(obj, VariableInfo) and obj.is_udt_expanded_member is not None:
        # Force the flag into the output even when it is False (filtered-safe:
        # truthy values survive the loop above and are simply rewritten).
        result['is_udt_expanded_member'] = obj.is_udt_expanded_member
    return result
if __name__ == "__main__":
    parser = S7Parser()
    # IMPORTANT: point this at the actual source file on disk; the sample
    # was renamed to .txt for upload, so adjust the extension if needed.
    filepath = "db1001_format.db.txt" # Or "db1001_format.db" if that's the actual name
    print(f"Attempting to parse: {filepath}")
    parsed_result = parser.parse_file(filepath)
    json_output_filename = "parsed_s7_data_expanded.json"
    print(f"\nParsing complete. Attempting to serialize to JSON.")
    try:
        json_output = json.dumps(parsed_result, default=custom_json_serializer, indent=2)
        with open(json_output_filename, "w", encoding='utf-8') as f:
            f.write(json_output)
        print(f"Result saved to {json_output_filename}")
    except Exception as e:
        print(f"Error during JSON serialization or file writing: {e}")