"""PLC Data Block adapter.

Transfers initial values (declaration section) and current values (BEGIN
section) from a source DATA_BLOCK file (*_data.db/.awl/.db.txt) into a
target "format" DATA_BLOCK file (*_format.*) strictly by position, writing
an *_updated output file. Handles UDT (TYPE ... END_TYPE) definitions that
precede the DATA_BLOCK.
"""

import re
import os
import sys
import json
import glob

# Walk four directories up from this file so the project root is importable.
script_root = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration


def find_working_directory():
    """Return the working directory from the loaded configuration.

    Exits the process with status 1 if the configuration does not
    specify one, since nothing else can proceed without it.
    """
    configs = load_configuration()
    working_directory = configs.get("working_directory")
    if not working_directory:
        print("No working directory specified in the configuration file.")
        sys.exit(1)
    return working_directory


def find_data_files(working_dir, source_pattern_suffix="_data.", target_pattern_suffix="_format."):
    """Find source and target files based on glob patterns and suffixes.

    Scans *.db, *.awl and *.txt (to catch ".db.txt" exports) in
    ``working_dir``. A file is a source match when its stem ends with the
    source suffix (or, for ".db.txt" names, the part before ".db" does);
    target matching is analogous. Falls back to a looser substring search
    ("_data" / "_format") when the strict match finds nothing.

    Returns:
        (source_file, target_file) — absolute paths or None when not found.
        When multiple candidates exist, the first one found is taken.
    """
    all_db_files = glob.glob(os.path.join(working_dir, "*.db"))
    all_awl_files = glob.glob(os.path.join(working_dir, "*.awl"))
    all_txt_files = glob.glob(os.path.join(working_dir, "*.txt"))  # For .db.txt style
    potential_files = all_db_files + all_awl_files + all_txt_files

    source_files_found = []
    target_files_found = []

    for f_path in potential_files:
        f_name = os.path.basename(f_path)
        # Check for source pattern (e.g., ends with _data.db or _data.db.txt).
        # We check if `source_pattern_suffix` is part of the name before the
        # final extension.
        name_part, ext_part = os.path.splitext(f_name)
        if name_part.endswith(source_pattern_suffix.rstrip('.')):
            source_files_found.append(f_path)
        elif source_pattern_suffix.rstrip('.') in name_part and f_name.endswith(".txt") and ".db" in name_part:
            # For names like xxx_data.db.txt: compare the part before ".db".
            if name_part.split(".db")[0].endswith(source_pattern_suffix.rstrip('.')):
                source_files_found.append(f_path)

        # Check for target pattern (same logic with the target suffix).
        name_part_target, ext_part_target = os.path.splitext(f_name)
        if name_part_target.endswith(target_pattern_suffix.rstrip('.')):
            target_files_found.append(f_path)
        elif target_pattern_suffix.rstrip('.') in name_part_target and f_name.endswith(".txt") and ".db" in name_part_target:
            # For names like xxx_format.db.txt
            if name_part_target.split(".db")[0].endswith(target_pattern_suffix.rstrip('.')):
                target_files_found.append(f_path)

    if not source_files_found:
        print(f"Warning: No source files found matching pattern ending with '{source_pattern_suffix}*' in '{working_dir}'.")
        # Try a broader search for any file containing '_data' if the strict one fails.
        source_files_found = [f for f in potential_files if "_data" in os.path.basename(f)]
        if source_files_found:
            print(f"Found potential source files with less strict '_data' search: {source_files_found}")

    if not target_files_found:
        print(f"Warning: No target files found matching pattern ending with '{target_pattern_suffix}*' in '{working_dir}'.")
        # Try a broader search for any file containing '_format'.
        target_files_found = [f for f in potential_files if "_format" in os.path.basename(f)]
        if target_files_found:
            print(f"Found potential target files with less strict '_format' search: {target_files_found}")

    # Logic to select the best match if multiple are found (e.g. prefer .db
    # over .txt, or based on modification time) could go here.
    # For now, just take the first one found.
    source_file = source_files_found[0] if source_files_found else None
    target_file = target_files_found[0] if target_files_found else None

    if source_file:
        print(f"Selected source file: {os.path.basename(source_file)}")
    if target_file:
        print(f"Selected target file: {os.path.basename(target_file)}")

    return source_file, target_file


def extract_sections(content):
    """Split DATA_BLOCK text into its five logical sections.

    Returns a 5-tuple:
        (udt_definitions, header, decl_section, init_section, footer)
    where udt_definitions is every TYPE..END_TYPE block, header is the text
    up to (not including) STRUCT, decl_section is STRUCT..BEGIN, init_section
    is BEGIN..END_DATA_BLOCK, and footer is everything after END_DATA_BLOCK.
    On a file with no STRUCT at all, returns the remaining content as the
    "header" and empty strings for the rest.
    """
    content = content.replace('\r\n', '\n')  # Normalize line endings

    # Collect all UDT (TYPE ... END_TYPE) definitions that precede the block.
    udt_definitions_content = ""
    udt_matches = list(re.finditer(r'(?s)(TYPE\s+.*?\s+END_TYPE\s*\n?)', content, re.IGNORECASE))
    content_after_udts = content
    if udt_matches:
        udt_definitions_content = "".join(match.group(0) for match in udt_matches)
        last_udt_end = udt_matches[-1].end()
        content_after_udts = content[last_udt_end:]

    header_match = re.search(
        r'(?s)^(.*?(?:DATA_BLOCK.*?VERSION.*?\n))(.*?STRUCT)',
        content_after_udts, re.IGNORECASE)
    if header_match:
        header = header_match.group(1)
        # rest_of_content starts from the "STRUCT" keyword.
        rest_of_content = content_after_udts[header_match.start(2):]
    else:
        # Fallback if the specific DATA_BLOCK header is not found.
        header_fallback_match = re.search(r'(?s)(.*?)(STRUCT)', content_after_udts, re.IGNORECASE)
        if header_fallback_match:
            header = header_fallback_match.group(1)
            # BUGFIX: the previous code sliced len("STRUCT") characters
            # *before* group 2 and then prepended "STRUCT" again, duplicating
            # the keyword. group 2 is the literal STRUCT match, so slicing at
            # its start always yields content beginning with "STRUCT".
            rest_of_content = content_after_udts[header_fallback_match.start(2):]
        else:
            # No STRUCT found at all after UDTs.
            print("Critical Warning: No 'STRUCT' keyword found for DATA_BLOCK content.")
            return udt_definitions_content, content_after_udts, "", "", ""

    decl_match = re.search(r'(?s)STRUCT\s+(.*?)BEGIN', rest_of_content, re.IGNORECASE)
    decl_section = decl_match.group(1) if decl_match else ""

    init_match = re.search(r'(?s)BEGIN\s+(.*?)END_DATA_BLOCK', rest_of_content, re.IGNORECASE)
    init_section = init_match.group(1) if init_match else ""

    footer_match = re.search(r'(?s)END_DATA_BLOCK(.*?)$', rest_of_content, re.IGNORECASE)
    footer = footer_match.group(1) if footer_match else ""

    return udt_definitions_content, header, decl_section, init_section, footer


def analyze_source_file(decl_section, init_section):
    """Extract ordered value lists from the source block's two sections.

    Returns:
        (source_decl_values, source_init_values) — lists of dicts. Each
        declaration entry carries index, type, value (None when no ":="
        assignment), comment and the original line; each init entry carries
        index, value, comment and the original line. Order is positional
        and is what the transfer step matches against.
    """
    source_decl_values = []
    source_init_values = []

    # --- Declaration section: every ';'-terminated line is a variable. ---
    decl_idx = 0
    for line_content in decl_section.split('\n'):
        line = line_content.strip()
        # Skip blanks, comments and bare STRUCT/END_STRUCT markers.
        if not line or line.startswith('//') or \
           (line.upper().startswith('STRUCT') and ';' not in line) or \
           (line.upper().startswith('END_STRUCT') and ';' not in line):
            continue
        if ';' in line:
            type_match = re.search(r':\s*([^:=;]+)', line)
            var_type = type_match.group(1).strip() if type_match else ""
            value = None
            comment = ''
            assignment_match = re.search(r':=\s*([^;]+)', line)
            if assignment_match:
                value = assignment_match.group(1).strip()
                # Comment, if any, follows the ';' after the assignment.
                comment_match = re.search(r';(.*)', line[assignment_match.end():])
                if comment_match:
                    comment = comment_match.group(1).strip()
            else:
                comment_match = re.search(r';(.*)', line)
                if comment_match:
                    comment = comment_match.group(1).strip()
            source_decl_values.append({
                "index": decl_idx,
                "type": var_type,
                "value": value,
                "comment": comment,
                "original_line_for_debug": line
            })
            decl_idx += 1

    # --- BEGIN section: only lines with both ':=' and ';' are assignments. ---
    init_idx = 0
    for line_content in init_section.split('\n'):
        line = line_content.strip()
        if not line or line.startswith('//'):
            continue
        assignment_match = re.search(r':=\s*([^;]+)', line)
        if assignment_match and ';' in line:
            value = assignment_match.group(1).strip()
            comment_match = re.search(r';(.*)', line[assignment_match.end():])
            comment = comment_match.group(1).strip() if comment_match else ""
            source_init_values.append({
                "index": init_idx,
                "value": value,
                "comment": comment,
                "original_line_for_debug": line
            })
            init_idx += 1

    return source_decl_values, source_init_values


def analyze_target_declarations(decl_section):
    """Describe every line of the target declaration section.

    Returns one dict per line (including blanks/comments, with var_idx -1)
    so callers can rewrite the section in place by line index. Variable
    lines get a sequential var_idx, their parsed type, and whether they are
    a UDT instance (type quoted like : "MyUDT").
    """
    target_decl_info_list = []
    current_var_idx = 0
    decl_lines_split = decl_section.split('\n')

    for line_num, line_content in enumerate(decl_lines_split):
        original_line = line_content
        line = line_content.strip()
        is_udt_ref = False
        udt_name = None
        var_type_str = None

        entry = {
            "line_index_in_section": line_num,
            "var_idx": -1,
            "is_udt_instance": False,
            "udt_name_if_any": None,
            "original_line": original_line,
            "type": None
        }

        # Non-variable lines keep var_idx == -1 and are passed through.
        if not line or line.startswith('//') or \
           (line.upper().startswith('STRUCT') and ';' not in line and ':' not in line) or \
           (line.upper().startswith('END_STRUCT') and ';' not in line and ':' not in line):
            target_decl_info_list.append(entry)
            continue

        if ';' in line:
            var_type_match = re.search(r':\s*([^:=;]+)', line)
            var_type_str = var_type_match.group(1).strip() if var_type_match else ""
            # A quoted type name (: "TypeName") marks a UDT instance.
            udt_match = re.search(r':\s*"(.*?)"', line)
            if udt_match:
                is_udt_ref = True
                udt_name = udt_match.group(1)
            entry.update({
                "var_idx": current_var_idx,
                "is_udt_instance": is_udt_ref,
                "udt_name_if_any": udt_name,
                "type": var_type_str
            })
            current_var_idx += 1

        target_decl_info_list.append(entry)

    return target_init_info_list if False else target_decl_info_list


def analyze_target_assignments(init_section):
    """Describe every line of the target BEGIN section.

    Returns one dict per line; lines containing both ':=' and ';' receive a
    sequential assign_idx, all others keep -1 and are passed through.
    """
    target_init_info_list = []
    current_assign_idx = 0
    init_lines_split = init_section.split('\n')

    for line_num, line_content in enumerate(init_lines_split):
        original_line = line_content
        line = line_content.strip()
        entry = {"line_index_in_section": line_num, "assign_idx": -1, "original_line": original_line}

        if not line or line.startswith('//'):
            target_init_info_list.append(entry)
            continue

        if ':=' in line and ';' in line:
            entry["assign_idx"] = current_assign_idx
            current_assign_idx += 1

        target_init_info_list.append(entry)

    return target_init_info_list


def is_compatible_type(source_value_str, target_type_str):
    """Heuristically check that a source value literal fits a target type.

    Permissive by design: unknown types, ARRAY types, and missing values all
    return True; only clearly-mismatched literals (e.g. an unquoted STRING,
    a non-numeric INT) return False.
    """
    if source_value_str is None:
        return True
    if not target_type_str:
        return True

    s_val = source_value_str.upper()
    t_type = target_type_str.upper()

    if "STRING" in t_type:
        return s_val.startswith("'") and s_val.endswith("'")
    if "BOOL" == t_type:
        return s_val in ["TRUE", "FALSE", "1", "0"]
    if "BYTE" == t_type:
        return s_val.startswith(("B#16#", "16#")) or (s_val.isdigit() and 0 <= int(s_val) <= 255)
    if "WORD" == t_type or "DWORD" == t_type:
        return s_val.startswith(("W#16#", "DW#16#", "16#"))
    if "INT" == t_type:
        try:
            int(s_val)
            return True
        except ValueError:
            return False
    if "DINT" == t_type:
        try:
            # DINT literals may carry an L# prefix (e.g. L#123456).
            int(s_val[2:]) if s_val.startswith("L#") else int(s_val)
            return True
        except ValueError:
            return False
    if "REAL" == t_type:
        try:
            float(s_val.replace('E', 'e'))
            return True
        except ValueError:
            return False
    if t_type.startswith("ARRAY"):
        return True
    return True


def transfer_values_by_position(source_file_path, target_file_path, output_file_path):
    """Copy values from source into target block by position; write output.

    Declaration values (and UDT member initializers) are consumed in order
    via a single shared pointer across the target's UDT definitions and its
    declaration section; BEGIN-section assignments are matched by assignment
    index. Incompatible or missing source values strip the target's ':='
    clause instead. Returns True on success, False on any error (errors are
    printed, never raised).
    """
    try:
        with open(source_file_path, 'r', encoding='utf-8-sig') as f:
            source_content = f.read()
        with open(target_file_path, 'r', encoding='utf-8-sig') as f:
            target_content = f.read()

        source_udt_defs_ignored, source_header_ignored, source_decl_sec, source_init_sec, source_footer_ignored = extract_sections(source_content)
        target_udt_defs, target_header, target_decl_sec, target_init_sec, target_footer = extract_sections(target_content)

        source_decl_values, source_init_values = analyze_source_file(source_decl_sec, source_init_sec)

        s_decl_ptr = 0  # Shared positional cursor over source_decl_values.
        decl_values_transferred_count = 0
        init_values_transferred_count = 0

        # --- Pass 1: rewrite member initializers inside target UDT defs. ---
        processed_target_udt_lines = []
        if target_udt_defs:
            udt_section_lines = target_udt_defs.split('\n')
            in_udt_struct_definition = False
            for udt_line_content in udt_section_lines:
                line_ws = udt_line_content  # Preserve original whitespace.
                stripped_line = udt_line_content.strip()
                modified_udt_line = line_ws

                if stripped_line.upper().startswith("TYPE"):
                    in_udt_struct_definition = False
                # A STRUCT immediately after a TYPE line opens a UDT body.
                if stripped_line.upper().startswith("STRUCT") and not stripped_line.upper().startswith("END_STRUCT"):
                    prev_lines = [l.strip().upper() for l in processed_target_udt_lines if l.strip()]
                    if prev_lines and prev_lines[-1].startswith("TYPE"):
                        in_udt_struct_definition = True
                if stripped_line.upper().startswith("END_STRUCT"):
                    in_udt_struct_definition = False

                if in_udt_struct_definition and ';' in stripped_line and \
                   not stripped_line.upper().startswith(("STRUCT", "END_STRUCT", "//")):
                    if s_decl_ptr < len(source_decl_values):
                        src_data = source_decl_values[s_decl_ptr]
                        src_val_str = src_data["value"]
                        src_comment = src_data["comment"]
                        type_m = re.search(r':\s*([^:=;]+)', stripped_line)
                        target_member_type = type_m.group(1).strip() if type_m else ""
                        if src_val_str is not None:
                            if is_compatible_type(src_val_str, target_member_type):
                                parts = line_ws.split(';', 1)
                                decl_part = parts[0]
                                comment_part = f";{parts[1]}" if len(parts) > 1 else ";"
                                if ':=' in decl_part:
                                    mod_decl = re.sub(r':=\s*[^;]+', f':= {src_val_str}', decl_part.rstrip())
                                else:
                                    mod_decl = decl_part.rstrip() + f' := {src_val_str}'
                                final_comment = comment_part
                                # Carry the source comment when target had none.
                                if comment_part == ";" and src_comment:
                                    final_comment = f"; {src_comment}"
                                modified_udt_line = mod_decl + final_comment
                                decl_values_transferred_count += 1
                        else:
                            # Source has no value: strip any target ':=' clause.
                            parts = line_ws.split(';', 1)
                            decl_part = parts[0]
                            comment_part = f";{parts[1]}" if len(parts) > 1 else ";"
                            if ':=' in decl_part:
                                mod_decl = re.sub(r'\s*:=\s*[^;]+', '', decl_part.rstrip())
                                modified_udt_line = mod_decl + comment_part
                        s_decl_ptr += 1
                processed_target_udt_lines.append(modified_udt_line)
            target_udt_defs_updated = '\n'.join(processed_target_udt_lines)
        else:
            target_udt_defs_updated = target_udt_defs

        # --- Pass 2: rewrite the declaration section (skip UDT instances). ---
        target_decl_block_info = analyze_target_declarations(target_decl_sec)
        output_decl_block_lines = target_decl_sec.split('\n')

        for target_info in target_decl_block_info:
            line_idx_in_sec = target_info["line_index_in_section"]
            if target_info["var_idx"] == -1 or target_info["is_udt_instance"]:
                continue
            if s_decl_ptr < len(source_decl_values):
                src_data = source_decl_values[s_decl_ptr]
                src_val_str = src_data["value"]
                src_comment = src_data["comment"]
                target_type = target_info["type"]
                original_target_line_ws = target_info["original_line"]

                if src_val_str is not None:
                    if is_compatible_type(src_val_str, target_type):
                        parts = original_target_line_ws.split(';', 1)
                        decl_part = parts[0]
                        comment_part = f";{parts[1]}" if len(parts) > 1 else ";"
                        if ':=' in decl_part:
                            mod_decl = re.sub(r':=\s*[^;]+', f':= {src_val_str}', decl_part.rstrip())
                        else:
                            mod_decl = decl_part.rstrip() + f' := {src_val_str}'
                        final_comment = comment_part
                        if comment_part == ";" and src_comment:
                            final_comment = f"; {src_comment}"
                        output_decl_block_lines[line_idx_in_sec] = mod_decl + final_comment
                        decl_values_transferred_count += 1
                else:
                    parts = original_target_line_ws.split(';', 1)
                    decl_part = parts[0]
                    comment_part = f";{parts[1]}" if len(parts) > 1 else ";"
                    if ':=' in decl_part:
                        mod_decl = re.sub(r'\s*:=\s*[^;]+', '', decl_part.rstrip())
                        output_decl_block_lines[line_idx_in_sec] = mod_decl + comment_part
                s_decl_ptr += 1
            else:
                pass  # Source exhausted; leave remaining target lines untouched.

        # --- Pass 3: rewrite BEGIN-section assignments by assignment index. ---
        target_init_block_info = analyze_target_assignments(target_init_sec)
        output_init_block_lines = target_init_sec.split('\n')

        for target_info in target_init_block_info:
            line_idx_in_sec = target_info["line_index_in_section"]
            if target_info["assign_idx"] == -1:
                continue
            current_target_assign_idx = target_info["assign_idx"]
            original_target_line_ws = target_info["original_line"]

            if current_target_assign_idx < len(source_init_values):
                src_data = source_init_values[current_target_assign_idx]
                src_val_str = src_data["value"]
                src_comment = src_data["comment"]
                if src_val_str is not None:
                    parts = original_target_line_ws.split(';', 1)
                    assign_part_target = parts[0]
                    comment_part_target = f";{parts[1]}" if len(parts) > 1 else ";"
                    mod_assign = re.sub(r':=\s*.*$', f':= {src_val_str}', assign_part_target.rstrip())
                    final_comment = comment_part_target
                    if comment_part_target == ";" and src_comment:
                        final_comment = f"; {src_comment}"
                    output_init_block_lines[line_idx_in_sec] = mod_assign + final_comment
                    init_values_transferred_count += 1

        # --- Reassemble the output file from the rewritten sections. ---
        final_parts = []
        if target_udt_defs_updated.strip():
            final_parts.append(target_udt_defs_updated.rstrip('\n') + '\n\n')  # Ensure space after UDTs
        elif target_udt_defs:
            final_parts.append(target_udt_defs)

        if target_header.strip():
            final_parts.append(target_header)  # Header already includes its spacing
        elif target_header and not target_udt_defs_updated.strip():
            # Header has only newlines but no UDTs before it.
            final_parts.append(target_header)

        if target_decl_sec.strip():
            final_parts.append("STRUCT\n")
            final_parts.append('\n'.join(output_decl_block_lines))
            final_parts.append("\n")
        elif target_decl_sec:
            final_parts.append(target_decl_sec)

        final_parts.append("BEGIN\n")
        final_parts.append('\n'.join(output_init_block_lines))
        # Ensure END_DATA_BLOCK is on its own line or correctly spaced.
        final_parts.append("\nEND_DATA_BLOCK")
        if target_footer:
            final_parts.append(target_footer.rstrip('\n') + '\n' if target_footer.strip() else target_footer)

        final_content = "".join(final_parts)
        # Ensure there's a newline at the end of the file.
        if not final_content.endswith('\n'):
            final_content += '\n'
        # Remove potential multiple blank lines at the end, keep one.
        final_content = re.sub(r'\n\s*\n$', '\n', final_content)

        with open(output_file_path, 'w', encoding='utf-8') as f:
            f.write(final_content)

        print(f"\nSuccessfully transferred {decl_values_transferred_count} initial values and {init_values_transferred_count} current values.")
        print(f"Output file created: {output_file_path}")
        return True

    except FileNotFoundError:
        print(f"Error: File not found. Source: '{source_file_path}', Target: '{target_file_path}'")
        return False
    except Exception as e:
        print(f"An error occurred during transfer: {e}")
        import traceback
        print(traceback.format_exc())
        return False


def main():
    """Locate source/target files, derive the output name, run the transfer.

    Returns True on success, False otherwise.
    """
    print("PLC Data Block Adapter - Advanced UDT Handling (Restored Auto File Find)")
    print("========================================================================")

    working_dir = find_working_directory()
    print(f"Using working directory: {working_dir}")

    # Using automatic file finding based on patterns.
    # "_data" will match _data.db, _data.awl, _data.db.txt
    # (the .db.txt case is handled inside find_data_files).
    source_f, target_f = find_data_files(working_dir, source_pattern_suffix="_data", target_pattern_suffix="_format")

    if not source_f or not target_f:
        print("Error: Could not automatically find required source or target files using patterns.")
        print("Please ensure files ending with e.g., '_data.db' (source) and '_format.db' (target) exist.")
        return False

    # Construct output name: insert "_updated" before the (possibly double)
    # extension of the target file.
    target_basename = os.path.basename(target_f)
    name_part, first_ext = os.path.splitext(target_basename)
    if first_ext.lower() == ".txt" and ".db" in name_part.lower():
        # Handles .db.txt: name_part becomes "xxx_format".
        name_part, second_ext = os.path.splitext(name_part)
        output_basename = name_part + "_updated" + second_ext + first_ext  # e.g. xxx_format_updated.db.txt
    elif first_ext.lower() in ['.db', '.awl']:
        output_basename = name_part + "_updated" + first_ext
    else:
        # Fallback for other extensions or no extension.
        output_basename = target_basename.rsplit('.', 1)[0] if '.' in target_basename else target_basename
        output_basename += "_updated" + ('.' + target_basename.rsplit('.', 1)[1] if '.' in target_basename else ".db")

    output_f = os.path.join(working_dir, output_basename)

    print(f"\nProcessing:")
    print(f"  Source: {os.path.basename(source_f)}")
    print(f"  Target: {os.path.basename(target_f)}")
    print(f"  Output: {os.path.basename(output_f)}")

    success = transfer_values_by_position(source_f, target_f, output_f)
    if success:
        print(f"\nSUCCESS: Script finished. Output: '{os.path.basename(output_f)}'")
    else:
        print(f"\nERROR: Script failed. Please check messages above.")
    return success


if __name__ == "__main__":
    main()