# ParamManagerScripts/backend/script_groups/EmailCrono/utils/beautify.py
#
# 250 lines
# 8.7 KiB
# Python

import json
import re
from pathlib import Path
from collections import defaultdict
from enum import Enum
class PatternType(Enum):
    """How a rule's ``pattern`` is compared against text.

    The member values are the strings accepted in a rule's ``type`` field
    of the rules JSON file.
    """

    # Pattern is a compiled regular expression; a line matches when
    # ``pattern.search`` finds it, and replacements use ``pattern.sub``.
    REGEX = "regex"
    # Plain text: a line matches when, once stripped, it equals the pattern;
    # for replacements every occurrence of the pattern is substituted.
    STRING = "string"
    # Plain text that the stripped line must start with.
    LEFT = "left"
    # Plain text that the stripped line must end with.
    RIGHT = "right"
    # Plain text that must occur anywhere in the line.
    SUBSTRING = "substring"
class BeautifyProcessor:
    """Apply prioritized clean-up rules to a text.

    Rules are read from a JSON file of the form ``{"rules": [...]}`` and are
    grouped by integer priority; groups run in ascending priority order.
    Supported actions are ``replace``, ``remove_line``, ``remove_block``,
    ``add_before`` and ``add_after``. Unknown actions are ignored.

    Note: the line-based actions (``remove_line``, ``add_before``,
    ``add_after`` and ``replace`` with a ``left``/``right`` pattern) split the
    text with ``str.splitlines`` and re-join it with ``"\\n"``, so a trailing
    newline is not preserved by them.
    """

    # Wildcard accepted inside ``remove_block`` patterns ("anything here").
    _BLOCK_WILDCARD = "....."

    def __init__(self, rules_file):
        """Load the rules stored in *rules_file* (path to a JSON file)."""
        self.rules_by_priority = self._load_rules(rules_file)

    def _load_rules(self, rules_file):
        """Read *rules_file* and return the rules grouped by priority.

        Returns a ``defaultdict(list)`` mapping ``priority`` to a list of
        ``(pattern, replacement, action, pattern_type)`` tuples. Loading is
        best-effort: a broken rule is reported with ``print`` and skipped, and
        an unreadable or malformed file is reported and yields no rules.
        """
        rules_by_priority = defaultdict(list)
        try:
            with open(rules_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, dict) or "rules" not in data:
                raise ValueError(
                    "El archivo JSON debe contener un objeto con una clave 'rules'"
                )
            for rule in data["rules"]:
                try:
                    pattern = rule["pattern"]
                    replacement = rule["replacement"]
                    action = rule["action"]
                    # The type is validated on its own so that the "invalid
                    # pattern type" message is only shown for a bad type, not
                    # for e.g. a non-numeric priority.
                    try:
                        pattern_type = PatternType(rule.get("type", "string"))
                    except ValueError:
                        print(
                            f"Error en regla: tipo de patrón inválido {rule.get('type')}"
                        )
                        continue
                    priority = int(rule.get("priority", 999))
                    if action == "remove_block":
                        # Block patterns use "....." as a wildcard and are
                        # always turned into a regular expression.
                        pattern = self._convert_block_pattern_to_regex(pattern)
                        pattern_type = PatternType.REGEX
                    elif pattern_type == PatternType.REGEX:
                        pattern = re.compile(pattern)
                    rules_by_priority[priority].append(
                        (pattern, replacement, action, pattern_type)
                    )
                except KeyError as e:
                    print(f"Error en regla: falta campo requerido {e}")
                except Exception as e:
                    # Bad priority, invalid regex, rule that is not a dict, ...
                    print(f"Error procesando regla: {e}")
        except json.JSONDecodeError as e:
            print(f"Error decodificando JSON: {e}")
        except Exception as e:
            print(f"Error cargando reglas: {e}")
        return rules_by_priority

    def _convert_block_pattern_to_regex(self, pattern):
        """Compile a block pattern into a regular expression.

        Every ``.....`` in *pattern* becomes a non-greedy "match anything"
        (newlines included); all other characters are matched literally.
        """
        # Escaping each literal piece separately avoids a placeholder string
        # that could collide with text actually present in the pattern.
        literal_parts = pattern.split(self._BLOCK_WILDCARD)
        body = ".*?".join(re.escape(part) for part in literal_parts)
        return re.compile(f"(?s){body}")

    @staticmethod
    def _expand_to_whole_lines(text, start, end):
        """Widen the span ``[start, end)`` of *text* to whole lines.

        The returned ``(line_start, line_end)`` covers every line touched by
        the span plus the empty lines directly above and below it.
        """
        line_start = text.rfind("\n", 0, start) + 1
        if end > start and text[end - 1] == "\n":
            # The span already ends on a line break; searching for the next
            # one would swallow the following, unrelated line.
            line_end = end
        else:
            newline_at = text.find("\n", end)
            line_end = len(text) if newline_at == -1 else newline_at + 1
        # Absorb empty lines above (text[line_start - 1] is always "\n" here).
        while line_start > 0 and (line_start == 1 or text[line_start - 2] == "\n"):
            line_start -= 1
        # Absorb empty lines below; only newline characters are consumed.
        while line_end < len(text) and text[line_end] == "\n":
            line_end += 1
        return line_start, line_end

    def _process_remove_block(self, text, pattern):
        """Remove every block matched by the compiled regex *pattern*.

        Each match is removed together with the complete lines it touches and
        the empty lines around it. All spans are computed on the original
        text and merged before cutting, so several matches on the same line
        can never cause unrelated text to be removed.
        """
        spans = sorted(
            self._expand_to_whole_lines(text, *match.span())
            for match in pattern.finditer(text)
        )
        if not spans:
            return text

        merged = []
        for start, end in spans:
            if merged and start <= merged[-1][1]:
                merged[-1][1] = max(merged[-1][1], end)
            else:
                merged.append([start, end])

        kept = []
        cursor = 0
        for start, end in merged:
            kept.append(text[cursor:start])
            cursor = end
        kept.append(text[cursor:])
        return "".join(kept)

    def _line_matches(self, line, pattern, pattern_type):
        """Return True when *line*, stripped of whitespace, matches the rule."""
        line = line.strip()
        if pattern_type == PatternType.REGEX:
            return bool(pattern.search(line))
        if pattern_type == PatternType.LEFT:
            return line.startswith(pattern)
        if pattern_type == PatternType.RIGHT:
            return line.endswith(pattern)
        if pattern_type == PatternType.SUBSTRING:
            return pattern in line
        if pattern_type == PatternType.STRING:
            return line == pattern
        return False

    def _apply_replace(self, text, pattern, replacement, pattern_type):
        """Return *text* with *pattern* replaced by *replacement*.

        ``regex`` uses ``pattern.sub``; ``string`` and ``substring`` replace
        every occurrence; ``left`` / ``right`` work line by line and replace
        only the leading / trailing occurrence on lines that (ignoring
        surrounding whitespace) start / end with the pattern.
        """
        if pattern_type == PatternType.REGEX:
            return pattern.sub(replacement, text)
        if pattern_type in (PatternType.STRING, PatternType.SUBSTRING):
            return text.replace(pattern, replacement)
        if pattern_type == PatternType.LEFT:
            return "\n".join(
                line.replace(pattern, replacement, 1)
                if line.strip().startswith(pattern)
                else line
                for line in text.splitlines()
            )
        if pattern_type == PatternType.RIGHT:
            result_lines = []
            for line in text.splitlines():
                if line.strip().endswith(pattern):
                    cut = line.rindex(pattern)
                    line = line[:cut] + replacement + line[cut + len(pattern):]
                result_lines.append(line)
            return "\n".join(result_lines)
        return text

    def process_text(self, text):
        """Apply every loaded rule to *text* and return the result.

        Empty or ``None`` input is returned unchanged. A rule that raises is
        reported with ``print`` and skipped so the remaining rules still run.
        """
        if not text:
            return text
        result = text
        for priority in sorted(self.rules_by_priority):
            for pattern, replacement, action, pattern_type in self.rules_by_priority[
                priority
            ]:
                try:
                    if action == "remove_block":
                        result = self._process_remove_block(result, pattern)
                    elif action == "replace":
                        result = self._apply_replace(
                            result, pattern, replacement, pattern_type
                        )
                    elif action == "remove_line":
                        result = self._process_remove_line(
                            result, pattern, pattern_type
                        )
                    elif action in ("add_before", "add_after"):
                        result = self._process_line_additions(
                            result, pattern, replacement, action, pattern_type
                        )
                except Exception as e:
                    print(f"Error aplicando regla {pattern}: {e}")
        return result

    def process_file(self, input_file, output_file=None):
        """Process *input_file* and write the result.

        The result goes to *output_file*, or overwrites *input_file* when no
        output file is given. Any error is reported with ``print``.
        """
        try:
            with open(input_file, "r", encoding="utf-8") as f:
                content = f.read()
            processed_content = self.process_text(content)
            output = output_file or input_file
            with open(output, "w", encoding="utf-8") as f:
                f.write(processed_content)
        except Exception as e:
            print(f"Error procesando archivo {input_file}: {e}")

    def _process_remove_line(self, text, pattern, pattern_type):
        """Drop every line matching the rule.

        When the removed line is directly followed by a blank line, that blank
        line is dropped as well so no gap is left behind.
        """
        lines = text.splitlines()
        result_lines = []
        skip_next_empty = False
        for i, line in enumerate(lines):
            if self._line_matches(line, pattern, pattern_type):
                if i < len(lines) - 1 and not lines[i + 1].strip():
                    skip_next_empty = True
                continue
            if skip_next_empty and not line.strip():
                skip_next_empty = False
                continue
            result_lines.append(line)
            skip_next_empty = False
        return "\n".join(result_lines)

    def _process_line_additions(self, text, pattern, replacement, action, pattern_type):
        """Insert *replacement* as a new line next to every matching line.

        With ``action == "add_before"`` the new line goes above the match;
        any other action value places it below.
        """
        result_lines = []
        for line in text.splitlines():
            if not self._line_matches(line, pattern, pattern_type):
                result_lines.append(line)
            elif action == "add_before":
                result_lines.extend((replacement, line))
            else:  # add_after
                result_lines.extend((line, replacement))
        return "\n".join(result_lines)