ParamManagerScripts/backend/script_groups/EmailCrono/utils/beautify.py

225 lines
8.6 KiB
Python

import json
import re
from pathlib import Path
from collections import defaultdict
from enum import Enum
class PatternType(Enum):
    """How a rule's ``pattern`` is compared against text.

    The values are the strings accepted in a rule's optional ``type`` field.
    """

    # Compiled regular expression, searched for inside the text/line.
    REGEX = "regex"
    # Literal text; for line tests the stripped line must equal the pattern.
    STRING = "string"
    # Literal text that the stripped line must start with.
    LEFT = "left"
    # Literal text that the stripped line must end with.
    RIGHT = "right"
    # Literal text that may appear anywhere in the line.
    SUBSTRING = "substring"
class BeautifyProcessor:
    """Apply prioritised clean-up rules, loaded from a JSON file, to text.

    The rules file must hold a JSON object with a ``rules`` list. Every rule
    is an object with the required keys ``pattern``, ``replacement`` and
    ``action`` and the optional keys ``type`` (a ``PatternType`` value,
    default ``"string"``) and ``priority`` (integer, default 999). Rules are
    applied in ascending priority order, and in file order within the same
    priority.

    Actions applied by ``process_text``: ``replace``, ``remove_line``,
    ``remove_block``, ``add_before`` and ``add_after``. Rules carrying any
    other action are loaded but never applied.

    NOTE: the line-based operations (``remove_line``, ``add_before``,
    ``add_after`` and ``replace`` with ``left``/``right`` patterns) split the
    text with ``str.splitlines`` and re-join it with a single newline
    character, so they normalise line endings and drop a trailing newline.
    """

    def __init__(self, rules_file):
        """Load the rules found in ``rules_file`` (path to a JSON file).

        Loading problems are printed, not raised; the processor then simply
        ends up with fewer (or no) rules.
        """
        self.rules_by_priority = self._load_rules(rules_file)

    def _load_rules(self, rules_file):
        """Read ``rules_file`` and group its valid rules by priority.

        Returns:
            A ``defaultdict(list)`` mapping priority -> list of
            ``(pattern, replacement, action, pattern_type)`` tuples.
            ``pattern`` is a compiled regex for ``regex`` rules and for
            ``remove_block`` rules, otherwise the raw string.

        Invalid rules are reported on stdout and skipped; an unreadable or
        malformed file is reported and yields an empty mapping.
        """
        rules_by_priority = defaultdict(list)
        try:
            with open(rules_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, dict) or 'rules' not in data:
                raise ValueError("El archivo JSON debe contener un objeto con una clave 'rules'")

            for rule in data['rules']:
                try:
                    pattern = rule['pattern']
                    replacement = rule['replacement']
                    action = rule['action']
                    pattern_type = PatternType(rule.get('type', 'string'))
                except KeyError as e:
                    print(f"Error en regla: falta campo requerido {e}")
                    continue
                except ValueError:
                    # Only PatternType(...) can raise ValueError here.
                    print(f"Error en regla: tipo de patrón inválido {rule.get('type')}")
                    continue
                except Exception as e:
                    # e.g. the rule is not a JSON object at all.
                    print(f"Error procesando regla: {e}")
                    continue

                # Parsed separately so a bad priority is not misreported as
                # an invalid pattern type.
                try:
                    priority = int(rule.get('priority', 999))
                except (TypeError, ValueError):
                    print(f"Error en regla: prioridad inválida {rule.get('priority')}")
                    continue

                try:
                    if action == "remove_block":
                        # Block patterns use "....." as a wildcard; they are
                        # always turned into a regex, whatever 'type' says.
                        pattern = self._convert_block_pattern_to_regex(pattern)
                        pattern_type = PatternType.REGEX
                    elif pattern_type == PatternType.REGEX:
                        pattern = re.compile(pattern)
                except Exception as e:
                    # Typically re.error for an invalid regular expression.
                    print(f"Error procesando regla: {e}")
                    continue

                rules_by_priority[priority].append(
                    (pattern, replacement, action, pattern_type)
                )
        except json.JSONDecodeError as e:
            print(f"Error decodificando JSON: {e}")
        except Exception as e:
            print(f"Error cargando reglas: {e}")
        return rules_by_priority

    def _convert_block_pattern_to_regex(self, pattern):
        """Turn a block pattern containing ``.....`` wildcards into a regex.

        Every run of five dots becomes a non-greedy ``.*?``; all other text
        is matched literally. The regex is compiled in DOTALL mode (``(?s)``)
        so a wildcard can span several lines.
        """
        # Escape the literal pieces individually instead of going through a
        # placeholder string, so no pattern text can be mistaken for one.
        literal_parts = pattern.split(".....")
        body = ".*?".join(re.escape(part) for part in literal_parts)
        return re.compile(f'(?s){body}')

    def _process_remove_block(self, text, pattern):
        """Delete every match of the compiled regex ``pattern`` from ``text``.

        Each match is widened to whole lines (from the start of the line
        where it begins to the end of the line where it ends, newline
        included) and additionally swallows the blank lines directly above
        and below it.
        """
        size = len(text)
        spans = []
        for match in pattern.finditer(text):
            start, end = match.span()

            # Widen the match to full lines.
            cut_from = text.rfind('\n', 0, start) + 1
            newline_at = text.find('\n', end)
            cut_to = size if newline_at == -1 else newline_at + 1

            # Swallow blank lines directly above: the line before cut_from is
            # empty when it is the very first character or is itself preceded
            # by a newline.
            while cut_from > 0 and (cut_from == 1 or text[cut_from - 2] == '\n'):
                cut_from -= 1
            # Swallow blank lines directly below. Only real newlines are
            # consumed, so a last line without a newline is never removed.
            while cut_to < size and text[cut_to - 1] == '\n' and text[cut_to] == '\n':
                cut_to += 1

            spans.append((cut_from, cut_to))

        if not spans:
            return text

        # All spans refer to the original text; cutting them in one pass
        # (merging overlaps, e.g. two matches on the same line) avoids using
        # offsets that an earlier removal has invalidated.
        spans.sort()
        kept = []
        cursor = 0
        for cut_from, cut_to in spans:
            if cut_from > cursor:
                kept.append(text[cursor:cut_from])
            cursor = max(cursor, cut_to)
        kept.append(text[cursor:])
        return ''.join(kept)

    def _line_matches(self, line, pattern, pattern_type):
        """Return True when ``line``, stripped of whitespace, matches."""
        line = line.strip()
        if pattern_type == PatternType.REGEX:
            return bool(pattern.search(line))
        if pattern_type == PatternType.LEFT:
            return line.startswith(pattern)
        if pattern_type == PatternType.RIGHT:
            return line.endswith(pattern)
        if pattern_type == PatternType.SUBSTRING:
            return pattern in line
        if pattern_type == PatternType.STRING:
            return line == pattern
        return False

    def _apply_replace(self, text, pattern, replacement, pattern_type):
        """Replace occurrences of ``pattern`` in ``text`` with ``replacement``.

        * ``REGEX``: ``pattern.sub`` over the whole text.
        * ``STRING`` / ``SUBSTRING``: plain ``str.replace`` over the whole text.
        * ``LEFT``: per line, replace the first occurrence when the stripped
          line starts with the pattern.
        * ``RIGHT``: per line, replace the last occurrence when the stripped
          line ends with the pattern.

        Any other pattern type returns ``text`` unchanged.
        """
        if pattern_type == PatternType.REGEX:
            return pattern.sub(replacement, text)
        if pattern_type in (PatternType.STRING, PatternType.SUBSTRING):
            return text.replace(pattern, replacement)
        if pattern_type == PatternType.LEFT:
            result_lines = []
            for line in text.splitlines():
                if line.strip().startswith(pattern):
                    line = line.replace(pattern, replacement, 1)
                result_lines.append(line)
            return '\n'.join(result_lines)
        if pattern_type == PatternType.RIGHT:
            result_lines = []
            for line in text.splitlines():
                if line.strip().endswith(pattern):
                    cut = line.rindex(pattern)
                    line = line[:cut] + replacement + line[cut + len(pattern):]
                result_lines.append(line)
            return '\n'.join(result_lines)
        return text

    def process_text(self, text):
        """Apply every loaded rule to ``text`` and return the result.

        Empty or ``None`` input is returned as is. A rule that raises is
        reported on stdout and skipped; the remaining rules still run.
        """
        if not text:
            return text
        result = text
        for priority in sorted(self.rules_by_priority.keys()):
            rules = self.rules_by_priority[priority]
            print(f"Aplicando reglas de prioridad {priority}")
            for pattern, replacement, action, pattern_type in rules:
                try:
                    if action == "remove_block":
                        result = self._process_remove_block(result, pattern)
                    elif action == "replace":
                        result = self._apply_replace(result, pattern, replacement, pattern_type)
                    elif action == "remove_line":
                        result = self._process_remove_line(result, pattern, pattern_type)
                    elif action in ["add_before", "add_after"]:
                        result = self._process_line_additions(
                            result, pattern, replacement, action, pattern_type
                        )
                    # Any other action is silently ignored.
                except Exception as e:
                    print(f"Error aplicando regla {pattern}: {e}")
                    continue
        return result

    def process_file(self, input_file, output_file=None):
        """Process ``input_file`` and write the result as UTF-8.

        The result goes to ``output_file`` when given, otherwise it
        overwrites ``input_file``. Errors are printed, not raised.
        """
        try:
            with open(input_file, 'r', encoding='utf-8') as f:
                content = f.read()
            processed_content = self.process_text(content)
            output = output_file or input_file
            with open(output, 'w', encoding='utf-8') as f:
                f.write(processed_content)
        except Exception as e:
            print(f"Error procesando archivo {input_file}: {e}")

    def _process_remove_line(self, text, pattern, pattern_type):
        """Drop every matching line, plus one blank line right after it."""
        result_lines = []
        # True right after a removed line: the next line is dropped too if it
        # is blank (and does not itself match).
        skip_next_empty = False
        for line in text.splitlines():
            if self._line_matches(line, pattern, pattern_type):
                skip_next_empty = True
                continue
            if skip_next_empty and not line.strip():
                skip_next_empty = False
                continue
            result_lines.append(line)
            skip_next_empty = False
        return '\n'.join(result_lines)

    def _process_line_additions(self, text, pattern, replacement, action, pattern_type):
        """Insert ``replacement`` as a new line next to each matching line.

        ``action == "add_before"`` inserts above the matching line; any other
        value (in practice ``"add_after"``) inserts below it.
        """
        result_lines = []
        for line in text.splitlines():
            if not self._line_matches(line, pattern, pattern_type):
                result_lines.append(line)
            elif action == "add_before":
                result_lines.extend((replacement, line))
            else:  # add_after
                result_lines.extend((line, replacement))
        return '\n'.join(result_lines)