# 335 lines, 18 KiB, Python — file-listing metadata (not part of the script)
import re
|
|
import os
|
|
import sys # Not strictly needed by this version but often kept from original
|
|
import glob
|
|
import pandas as pd # For Excel writing
|
|
|
|
# --- Functions for script operation ---
|
|
|
|
def find_working_directory():
    """Return the directory used for input-file discovery.

    Defaults to the current working directory. Adapt this function if a
    specific configuration source (config file, CLI argument) is needed.

    Returns:
        str: absolute path of the current working directory.
    """
    # Bug fix: the message previously referred to a stale function name
    # (`find_working_directory_from_x1`), which no longer exists here.
    print("Info: `find_working_directory` is using the current directory.")
    return os.getcwd()
|
|
|
|
def extract_sections(content):
    """
    Extracts UDT definitions, main declaration section, and initialization
    section from S7 AWL/DB content.

    All patterns use the inline ``(?is)`` flags (IGNORECASE + DOTALL) so
    keywords match across casings and newlines.

    Returns a 5-tuple:
        (udt_definitions, header_text, declaration_section,
         initialization_section, footer)
    ``header_text`` and ``footer`` are always empty strings in this version.
    """
    # Normalize Windows line endings so the patterns only deal with '\n'.
    normalized = content.replace('\r\n', '\n')

    # Collect every TYPE...END_TYPE block (UDT definitions).
    udt_blocks = [m for m in re.finditer(r'(?is)(TYPE\s+.*?\s+END_TYPE\s*\n?)', normalized)]
    udt_text = "".join(m.group(0) for m in udt_blocks)

    # Everything after the final UDT belongs to the main DATA_BLOCK.
    remainder = normalized[udt_blocks[-1].end():] if udt_blocks else normalized

    # Locate the main DATA_BLOCK header and the start of its STRUCT;
    # fall back to the first bare "STRUCT" when the header pattern fails.
    struct_region = remainder
    header = re.search(r'(?is)^(.*?(?:DATA_BLOCK.*?VERSION.*?\n))(.*?STRUCT)', remainder)
    if header:
        struct_region = remainder[header.start(2):]
    else:
        fallback = re.search(r'(?is)(.*?)(STRUCT)', remainder)
        if fallback:
            struct_region = remainder[fallback.start(2):]
        else:
            # No STRUCT at all -> declaration section will come out empty.
            print(f"Warning: No 'STRUCT' keyword found for main DB declarations in a content block.")

    # Declarations live between STRUCT and BEGIN; current values between
    # BEGIN and END_DATA_BLOCK. The footer after END_DATA_BLOCK is unused.
    decl = re.search(r'(?is)STRUCT\s*(.*?)BEGIN', struct_region)
    init = re.search(r'(?is)BEGIN\s*(.*?)END_DATA_BLOCK', struct_region)

    return (
        udt_text,
        "",  # header placeholder, not used by the comparison logic
        decl.group(1).strip() if decl else "",
        init.group(1).strip() if init else "",
        "",  # footer placeholder
    )
|
|
|
|
|
|
def find_comparison_files_detailed(working_dir, data_suffix="_data", format_suffix="_format", updated_suffix_part="_updated"):
    """Finds data, format, and _updated files based on naming conventions.

    Scans *working_dir* for files with common S7 export extensions
    (.db, .awl, .txt) and classifies them by substring matching on the
    lowercased basename:

      * "data" file: contains ``data_suffix`` but not ``updated_suffix_part``
      * "format" file: contains ``format_suffix`` but not ``updated_suffix_part``
      * "updated" file: first derived from the format file's name
        (Strategy 1), otherwise found by searching for
        ``updated_suffix_part`` (Strategy 2)

    Returns:
        tuple: ``(data_path, format_path, updated_path)``; any element may
        be None when no matching file is found.
    """
    all_files_in_dir = []
    for ext_pattern in ["*.db", "*.awl", "*.txt"]: # Common S7 export extensions
        all_files_in_dir.extend(glob.glob(os.path.join(working_dir, ext_pattern)))
    # Normalize paths for consistent comparisons and ensure uniqueness
    all_files_in_dir = sorted(list(set(os.path.normpath(f) for f in all_files_in_dir)))

    found_paths = {'data': None, 'format': None, 'updated': None}

    def select_best_file(file_list):
        # Pick the preferred candidate from a list of paths, or None.
        if not file_list: return None
        # Prioritize: .db, then .awl, then .txt
        # (False sorts before True, so a name containing '.db' wins.)
        file_list.sort(key=lambda x: ('.db' not in x.lower(), '.awl' not in x.lower(), '.txt' not in x.lower()))
        return file_list[0]

    # Find _data file: contains data_suffix, does not contain updated_suffix_part
    data_candidates = [f for f in all_files_in_dir if data_suffix in os.path.basename(f).lower() and updated_suffix_part not in os.path.basename(f).lower()]
    found_paths['data'] = select_best_file(data_candidates)

    # Find _format file: contains format_suffix, does not contain updated_suffix_part
    format_candidates = [f for f in all_files_in_dir if format_suffix in os.path.basename(f).lower() and updated_suffix_part not in os.path.basename(f).lower()]
    if found_paths['data'] and format_candidates: # Ensure it's not the same as _data file
        format_candidates = [f for f in format_candidates if f != found_paths['data']]
    found_paths['format'] = select_best_file(format_candidates)

    # Find _updated file:
    # Strategy 1: Based on format_file name (most reliable if format_file found)
    if found_paths['format']:
        format_basename = os.path.basename(found_paths['format'])
        name_part, first_ext = os.path.splitext(format_basename)
        updated_basename_candidate = ""
        # Handle double extensions like ".db.txt" or ".awl.txt"
        if first_ext.lower() == ".txt" and ('.db' in name_part.lower() or '.awl' in name_part.lower()):
            base_name_for_main_ext, second_ext = os.path.splitext(name_part)
            updated_basename_candidate = base_name_for_main_ext + updated_suffix_part + second_ext + first_ext
        else: # Single extension
            updated_basename_candidate = name_part + updated_suffix_part + first_ext

        # Only accept the derived name if it actually exists AND was picked
        # up by the extension glob above.
        potential_updated_path = os.path.join(working_dir, updated_basename_candidate)
        if os.path.exists(potential_updated_path) and potential_updated_path in all_files_in_dir:
            found_paths['updated'] = potential_updated_path

    # Strategy 2: If not found by deriving from format_file, search more broadly
    if not found_paths['updated']:
        updated_candidates = [f for f in all_files_in_dir if updated_suffix_part in os.path.basename(f).lower()]
        if found_paths['format'] and updated_candidates: # Prefer updated file related to format file's base name
            # NOTE(review): this containment check is case-sensitive, unlike
            # the lowercased matching above — confirm whether intentional.
            format_base = os.path.basename(found_paths['format']).split(format_suffix)[0]
            updated_candidates = [f for f in updated_candidates if format_base in os.path.basename(f)]

        # Exclude already identified data and format files
        if found_paths['data'] and updated_candidates: updated_candidates = [f for f in updated_candidates if f != found_paths['data']]
        if found_paths['format'] and updated_candidates: updated_candidates = [f for f in updated_candidates if f != found_paths['format']]
        found_paths['updated'] = select_best_file(updated_candidates)

    print("Identified files for comparison:")
    for key, val in found_paths.items():
        print(f" {key.capitalize()} file: {os.path.basename(val) if val else 'Not found'}")
    return found_paths['data'], found_paths['format'], found_paths['updated']
|
|
|
|
|
|
def get_variables_from_section_content(section_str, section_type="declaration"):
    """ Parses a declaration or initialization section string and returns a list of variable dicts. """
    parsed = []
    position = 0

    for raw_line in section_str.replace('\r\n', '\n').split('\n'):
        stripped = raw_line.strip()
        # Ignore blank lines and // comment lines.
        if not stripped or stripped.startswith('//'):
            continue

        upper = stripped.upper()
        # Purely structural keywords are skipped, unless the same line also
        # carries a full declaration/assignment (':' or ':=' plus ';').
        structural = (
            upper == 'STRUCT'
            or upper.startswith('TYPE ')
            or upper in ('END_STRUCT', 'BEGIN', 'END_DATA_BLOCK')
        )
        if structural and not ((':' in stripped and ';' in stripped) or (':=' in stripped and ';' in stripped)):
            continue

        name = dtype = val = None

        if section_type == "declaration":
            # Expected shape: VarName : VarType [:= InitialValue] ;
            if ':' not in stripped or ';' not in stripped:
                continue
            # Name: token before ':' (simple or "quoted" names).
            m_name = re.match(r'^\s*(\"(?:\\\"|[^\"])*\"|[a-zA-Z_][\w]*)', stripped, re.IGNORECASE)
            if m_name:
                name = m_name.group(1).strip().replace('"', "")
            # Type: between ':' and a potential ':=' or ';' ("UDT", simple, ARRAY).
            m_type = re.search(r':\s*(\"[^\"]+\"|[^:=;]+)', stripped, re.IGNORECASE)
            if m_type:
                dtype = m_type.group(1).strip().replace('"', "")
            # Optional initial value: between ':=' and ';'.
            m_assign = re.search(r':=\s*([^;]+)', stripped, re.IGNORECASE)
            if m_assign:
                val = m_assign.group(1).strip()
            # A declaration needs both a name and a type.
            if not name or not dtype:
                continue
        elif section_type == "initialization":
            # Expected shape: VarNameOrPath := Value ;
            if ':=' not in stripped or ';' not in stripped:
                continue
            # Name/path: before ':=' ("Quoted", Simple.Path, Array[1].Path).
            m_name = re.match(r'^\s*(\"(?:\\\"|[^\"])*\"|[a-zA-Z_][\w"\[\],\.]*(?:\[.*?\]|\.[a-zA-Z_][\w"\[\],\.]*)*)\s*:=', stripped, re.IGNORECASE)
            if m_name:
                name = m_name.group(1).strip().replace('"', "")
            # Value: between ':=' and ';'.
            m_value = re.search(r':=\s*([^;]+)', stripped, re.IGNORECASE)
            if m_value:
                val = m_value.group(1).strip()
            # An assignment needs both a name and a value.
            if not name or val is None:
                continue

        # Record the entry only when a name was successfully captured.
        if name is not None:
            parsed.append({
                "index": position, "name": name, "type": dtype, "value": val,
                "original_line": raw_line
            })
            position += 1
    return parsed
|
|
|
|
def process_file_for_vars(file_path):
    """
    Reads a file, extracts main STRUCT declarations and BEGIN block
    initializations.

    UDT definitions themselves are not included in the returned declaration
    list. Returns a pair of lists ``(main_struct_decl_vars,
    begin_block_init_vars)``; both are empty when the file is missing or
    unreadable.
    """
    # Missing path (or None/empty) -> nothing to parse.
    if not file_path or not os.path.exists(file_path):
        return [], []

    try:
        # utf-8-sig transparently strips a BOM if present.
        with open(file_path, 'r', encoding='utf-8-sig') as handle:
            text = handle.read()
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        return [], []

    # UDT definitions are extracted but not used for the comparison lists.
    _udts, _header, decl_text, init_text, _footer = extract_sections(text)

    # Initial values come from the main DATA_BLOCK's STRUCT section;
    # current values come from the BEGIN...END_DATA_BLOCK section.
    return (
        get_variables_from_section_content(decl_text, "declaration"),
        get_variables_from_section_content(init_text, "initialization"),
    )
|
|
|
|
|
|
def _build_comparison_rows(data_vars, format_vars, updated_vars, resolve_type):
    """Positionally align three variable lists into Excel row dicts.

    Rows are matched by index, padding shorter lists with blanks. When
    *resolve_type* is True the "Data Type" column is filled from the first
    available type (priority: format, then updated, then data); otherwise it
    is "N/A" (initialization lines do not re-declare types).
    """
    placeholder = {"name": "", "type": "", "value": "", "original_line": ""}
    lengths = [len(lst) for lst in [data_vars, format_vars, updated_vars] if lst is not None]
    max_len = max(lengths) if lengths else 0

    rows = []
    for i in range(max_len):
        var_d = data_vars[i] if data_vars and i < len(data_vars) else placeholder
        var_f = format_vars[i] if format_vars and i < len(format_vars) else placeholder
        var_u = updated_vars[i] if updated_vars and i < len(updated_vars) else placeholder

        # Combined name from _data and _format, falling back to _updated.
        name_d_str = var_d['name'] if var_d['name'] else ""
        name_f_str = var_f['name'] if var_f['name'] else ""
        combined_name = f"{name_d_str} / {name_f_str}".strip(" /")
        if not combined_name:
            combined_name = var_u['name'] or name_d_str or name_f_str

        if resolve_type:
            type_to_use = var_f['type'] or var_u['type'] or var_d['type'] or "N/A"
        else:
            type_to_use = "N/A"

        rows.append({
            "Variable Name (_data / _format)": combined_name,
            "Data Type": type_to_use,
            "Value (_data)": str(var_d['value']) if var_d['value'] is not None else "",
            "Value (_format)": str(var_f['value']) if var_f['value'] is not None else "",
            "Value (_updated)": str(var_u['value']) if var_u['value'] is not None else ""
        })
    return rows


def _rows_to_ordered_df(rows, column_order):
    """Build a DataFrame from row dicts and enforce a fixed column order."""
    df = pd.DataFrame(rows)
    if not df.empty:
        for col in column_order:
            if col not in df.columns:
                df[col] = ""  # Ensure all expected columns exist
        df = df[column_order]
    return df


def generate_excel_comparison(data_file, format_file, updated_file, output_excel_path):
    """Generates an Excel file with two sheets comparing variables from three source files.

    Sheet 1 ("Declarations (Initial Values)") aligns the STRUCT-section
    declarations; sheet 2 ("Initializations (Current Values)") aligns the
    BEGIN-block assignments. Alignment is positional, not name-based.

    Refactor note: the two sheet-building loops were near-duplicates
    (~60 lines); they now share _build_comparison_rows/_rows_to_ordered_df.
    """
    print(f"\nProcessing _data file: {os.path.basename(data_file) if data_file else 'N/A'}")
    data_decl_vars, data_init_vars = process_file_for_vars(data_file)
    print(f" Found {len(data_decl_vars)} declaration vars, {len(data_init_vars)} initialization vars in _data file.")

    print(f"Processing _format file: {os.path.basename(format_file) if format_file else 'N/A'}")
    format_decl_vars, format_init_vars = process_file_for_vars(format_file)
    print(f" Found {len(format_decl_vars)} declaration vars, {len(format_init_vars)} initialization vars in _format file.")

    print(f"Processing _updated file: {os.path.basename(updated_file) if updated_file else 'N/A'}")
    updated_decl_vars, updated_init_vars = process_file_for_vars(updated_file)
    print(f" Found {len(updated_decl_vars)} declaration vars, {len(updated_init_vars)} initialization vars in _updated file.")

    # Single column order shared by both sheets.
    column_order = ["Variable Name (_data / _format)", "Data Type", "Value (_data)", "Value (_format)", "Value (_updated)"]

    # --- "Declarations (Initial Values)" sheet (STRUCT section) ---
    decl_rows = _build_comparison_rows(data_decl_vars, format_decl_vars, updated_decl_vars, resolve_type=True)
    print(f"\nComparing {len(decl_rows)} positional declaration entries (STRUCT section)...")
    df_declarations = _rows_to_ordered_df(decl_rows, column_order)

    # --- "Initializations (Current Values)" sheet (BEGIN block) ---
    init_rows = _build_comparison_rows(data_init_vars, format_init_vars, updated_init_vars, resolve_type=False)
    print(f"Comparing {len(init_rows)} positional initialization entries (BEGIN block)...")
    df_initializations = _rows_to_ordered_df(init_rows, column_order)

    # --- Write to Excel with two sheets ---
    try:
        with pd.ExcelWriter(output_excel_path, engine='openpyxl') as writer:
            if not df_declarations.empty:
                df_declarations.to_excel(writer, sheet_name='Declarations (Initial Values)', index=False)
                print(f"Written 'Declarations (Initial Values)' sheet with {len(df_declarations)} rows.")
            else:
                print("No data for 'Declarations (Initial Values)' sheet.")

            if not df_initializations.empty:
                df_initializations.to_excel(writer, sheet_name='Initializations (Current Values)', index=False)
                print(f"Written 'Initializations (Current Values)' sheet with {len(df_initializations)} rows.")
            else:
                print("No data for 'Initializations (Current Values)' sheet.")

        if df_declarations.empty and df_initializations.empty:
            print("No data written to Excel as both datasets are empty.")
        else:
            print(f"\nSuccessfully generated Excel comparison: {output_excel_path}")

    except Exception as e:
        print(f"Error writing Excel file {output_excel_path}: {e}")
|
|
|
|
|
|
def main_comparator():
    """Entry point: locate the three input files and produce the workbook."""
    print("S7 Data Block Comparator to Excel (Multi-Sheet)")
    print("==============================================")

    working_dir = find_working_directory()
    print(f"Using working directory: {working_dir}")

    data_f, format_f, updated_f = find_comparison_files_detailed(working_dir)

    # Bail out only when none of the three inputs could be located.
    if not any([data_f, format_f, updated_f]):
        print("\nError: Could not find a sufficient set of input files (_data, _format, _updated). Exiting.")
        return

    output_excel_file = os.path.join(working_dir, "S7_DB_Comparison_MultiSheet.xlsx")
    generate_excel_comparison(data_f, format_f, updated_f, output_excel_file)
|
|
|
|
# Run the comparator only when executed as a script (not on import).
if __name__ == "__main__":
    main_comparator()