ParamManagerScripts/backend/script_groups/EmailCrono/x2.py

504 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Editor de Reglas de Embellecimiento (beautify_rules.json)
Aplicación Flask con UI simple para gestionar las reglas en
`config/beautify_rules.json` (CRUD, reordenamiento y guardado con backup).
"""
import json
import os
import shutil
import threading
import time
from datetime import datetime
from typing import Any, Dict, List
import webbrowser
import sys
import re
import signal
from flask import Flask, jsonify, render_template, request
# Directory that holds this script; the rules file lives in its "config"
# sub-directory.
BASE_DIR = os.path.dirname(__file__)
DATA_PATH = os.path.join(BASE_DIR, "config", "beautify_rules.json")

# Load the launcher configuration helper if it is available.
# The directory four levels above this file is appended to sys.path
# (presumably so that the `backend` package resolves from there).
script_root = os.path.dirname(os.path.dirname(os.path.dirname(BASE_DIR)))
sys.path.append(script_root)

try:
    from backend.script_utils import load_configuration  # type: ignore
except Exception:
    # Any failure while importing just disables the optional integration.
    HAS_CONFIG = False
else:
    HAS_CONFIG = True
def ensure_dirs(app: Flask) -> None:
    """Make sure the template and static folders exist and bind them to *app*.

    Both folders are located directly under ``BASE_DIR``. A folder is created
    only when nothing exists at its path yet; afterwards the app's
    ``template_folder`` and ``static_folder`` attributes are pointed at them.
    """
    folders = {
        name: os.path.join(BASE_DIR, name) for name in ("templates", "static")
    }
    for folder in folders.values():
        if not os.path.exists(folder):
            os.makedirs(folder, exist_ok=True)
    app.template_folder = folders["templates"]
    app.static_folder = folders["static"]
def load_rules_file() -> Dict[str, Any]:
    """Read the rules file at ``DATA_PATH`` and return its decoded JSON.

    Returns:
        Whatever ``json.load`` decodes from the file (annotated as a dict).

    Raises:
        FileNotFoundError: If nothing exists at ``DATA_PATH``.
    """
    if os.path.exists(DATA_PATH):
        with open(DATA_PATH, "r", encoding="utf-8") as rules_file:
            return json.load(rules_file)
    raise FileNotFoundError(f"No existe el archivo: {DATA_PATH}")
def validate_rule(rule: Dict[str, Any]) -> List[str]:
    """Validate a single rule and return the list of problems found.

    Checks performed:
      * ``pattern`` and ``replacement`` must be present and be strings
        (``replacement`` may be empty).
      * ``action`` and ``type`` must be one of the allowed string values.
      * ``priority`` must be present and be an integer. Booleans are
        rejected even though ``bool`` is a subclass of ``int``.
      * ``__comment`` is optional; when present and not ``None`` it must be
        a string.

    Args:
        rule: The rule object to check.

    Returns:
        A list of human-readable error messages; empty when the rule is valid.
    """
    errors: List[str] = []
    allowed_actions = {
        "replace",
        "remove_line",
        "remove_block",
        "add_before",
        "add_after",
    }
    allowed_types = {"string", "regex", "left", "right", "substring"}

    if not isinstance(rule.get("pattern"), str):
        errors.append("'pattern' es obligatorio y debe ser string")

    if not isinstance(rule.get("replacement"), str):
        errors.append(
            "'replacement' es obligatorio y debe ser string (puede ser vacío)"
        )

    # The isinstance guard runs first so that unhashable JSON values
    # (lists, objects) yield a validation error instead of a TypeError from
    # the set membership test.
    action = rule.get("action")
    if not isinstance(action, str) or action not in allowed_actions:
        errors.append(f"'action' debe ser uno de: {sorted(allowed_actions)}")

    rule_type = rule.get("type")
    if not isinstance(rule_type, str) or rule_type not in allowed_types:
        errors.append(f"'type' debe ser uno de: {sorted(allowed_types)}")

    priority = rule.get("priority")
    # JSON true/false decode to bool, which would otherwise pass as int.
    if isinstance(priority, bool) or not isinstance(priority, int):
        errors.append("'priority' es obligatorio y debe ser entero")

    comment = rule.get("__comment")
    if comment is not None and not isinstance(comment, str):
        errors.append("'__comment' debe ser string si está presente")

    return errors
def validate_payload(payload: Dict[str, Any]) -> List[str]:
    """Validate a full save payload and collect every problem found.

    The payload must be a JSON object whose ``rules`` entry is a list. Each
    list item must itself be an object and is checked with ``validate_rule``;
    its messages are prefixed with the 1-based position of the rule.

    Returns:
        A list of error messages; empty when the payload is valid. Structural
        problems with the payload itself are reported alone.
    """
    if not isinstance(payload, dict):
        return ["El payload debe ser un objeto JSON"]

    rules = payload.get("rules")
    if not isinstance(rules, list):
        return ["El payload debe contener 'rules' como lista"]

    problems: List[str] = []
    for position, rule in enumerate(rules, start=1):
        if isinstance(rule, dict):
            problems.extend(
                f"Regla #{position}: {issue}" for issue in validate_rule(rule)
            )
        else:
            problems.append(f"Regla #{position}: debe ser un objeto")
    return problems
def backup_file(path: str) -> str:
    """Copy *path* to a timestamped sibling file and return the copy's path.

    The backup is written next to the original and is named
    ``<name>.backup_<YYYYmmdd_HHMMSS><ext>``. The timestamp only has
    one-second resolution, so when a backup with that name already exists a
    numeric suffix (``_1``, ``_2``, ...) is inserted before the extension
    instead of overwriting the earlier backup.

    Args:
        path: File to back up.

    Returns:
        The path of the backup that was created.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    dirname, filename = os.path.split(path)
    name, ext = os.path.splitext(filename)
    stem = f"{name}.backup_{timestamp}"

    backup_path = os.path.join(dirname, f"{stem}{ext}")
    attempt = 0
    while os.path.exists(backup_path):
        attempt += 1
        backup_path = os.path.join(dirname, f"{stem}_{attempt}{ext}")

    # copy2 copies the file contents together with its metadata.
    shutil.copy2(path, backup_path)
    return backup_path
# --- Helpers used to preview a rule against cronologia.md -----------------


def _convert_block_pattern_to_regex(pattern: str) -> re.Pattern[str]:
    """Compile a block pattern into a regular expression.

    Every literal ``.....`` (five dots) in *pattern* becomes a lazy
    wildcard (``.*?``); everything else is escaped and matched literally.
    The regex is compiled with the ``(?s)`` flag so the wildcard also spans
    newlines.
    """
    marker = "__BLOCK_MARKER__"
    marked = pattern.replace(".....", marker)
    escaped = re.escape(marked)
    final_rx = escaped.replace(marker, ".*?")
    return re.compile(f"(?s){final_rx}")


def _line_matches(line: str, patt: Any, ptype: str) -> bool:
    """Tell whether *line*, stripped of surrounding whitespace, matches.

    *ptype* selects the comparison: ``regex`` (``patt.search``), ``left``
    (prefix), ``right`` (suffix), ``substring`` (containment) or ``string``
    (equality). Any other value never matches.
    """
    sline = line.strip()
    if ptype == "regex":
        return bool(patt.search(sline))
    if ptype == "left":
        return sline.startswith(patt)
    if ptype == "right":
        return sline.endswith(patt)
    if ptype == "substring":
        return patt in sline
    if ptype == "string":
        return sline == patt
    return False


def _apply_replace(text: str, patt: Any, repl: str, ptype: str) -> str:
    """Apply the ``replace`` action to *text* and return the result.

    ``regex`` uses ``patt.sub``; ``string`` and ``substring`` replace every
    occurrence; ``left``/``right`` replace only the first/last occurrence on
    lines whose stripped content starts/ends with *patt*. Unknown types
    return *text* unchanged. The ``left``/``right`` branches rebuild the text
    from ``splitlines()``, so a trailing newline is not preserved.
    """
    if ptype == "regex":
        return patt.sub(repl, text)
    if ptype in ("string", "substring"):
        return text.replace(patt, repl)
    if ptype == "left":
        out: List[str] = []
        for ln in text.splitlines():
            if ln.strip().startswith(patt):
                out.append(ln.replace(patt, repl, 1))
            else:
                out.append(ln)
        return "\n".join(out)
    if ptype == "right":
        out = []
        for ln in text.splitlines():
            if ln.strip().endswith(patt):
                idx = ln.rindex(patt)
                out.append(ln[:idx] + repl + ln[idx + len(patt):])
            else:
                out.append(ln)
        return "\n".join(out)
    return text


def _process_remove_block(text: str, patt: re.Pattern[str]) -> str:
    """Remove every match of *patt* together with the lines it touches.

    Each match is widened to whole lines and then over adjacent newline
    characters, following the two ``while`` conditions below. Matches are
    located once on the original text and removed from last to first, so the
    offsets of earlier matches are not shifted by later removals.
    """
    result = text
    matches = list(patt.finditer(result))
    for m in reversed(matches):
        start, end = m.span()
        # rfind returns -1 when there is no previous newline, giving 0 here.
        line_start = result.rfind("\n", 0, start) + 1
        line_end = result.find("\n", end)
        if line_end == -1:
            line_end = len(result)
        else:
            line_end += 1
        # Extend backwards over preceding blank lines.
        while (
            line_start > 0
            and result[line_start - 1:line_start] == "\n"
            and (
                line_start == 1
                or result[line_start - 2:line_start - 1] == "\n"
            )
        ):
            line_start -= 1
        # Extend forwards over following blank lines.
        while (
            line_end < len(result)
            and result[line_end - 1:line_end] == "\n"
            and (
                line_end == len(result) - 1
                or result[line_end:line_end + 1] == "\n"
            )
        ):
            line_end += 1
        result = result[:line_start] + result[line_end:]
    return result


def _process_remove_line(text: str, patt: Any, ptype: str) -> str:
    """Drop every matching line, plus one blank line directly after it."""
    lines = text.splitlines()
    out: List[str] = []
    skip_next_empty = False
    for i, ln in enumerate(lines):
        if _line_matches(ln, patt, ptype):
            if i < len(lines) - 1 and not lines[i + 1].strip():
                skip_next_empty = True
            continue
        if skip_next_empty and not ln.strip():
            skip_next_empty = False
            continue
        out.append(ln)
        skip_next_empty = False
    return "\n".join(out)


def _process_line_additions(
    text: str, patt: Any, repl: str, action: str, ptype: str
) -> str:
    """Insert *repl* as a new line before or after every matching line.

    ``add_before`` inserts above the match; any other *action* inserts below.
    """
    out: List[str] = []
    for ln in text.splitlines():
        if _line_matches(ln, patt, ptype):
            if action == "add_before":
                out.append(repl)
                out.append(ln)
            else:
                out.append(ln)
                out.append(repl)
        else:
            out.append(ln)
    return "\n".join(out)


def _trim(txt: str, limit: int) -> str:
    """Cut *txt* to *limit* characters, flagging the cut with a marker."""
    if len(txt) <= limit:
        return txt
    return txt[:limit] + "\n... (recortado)"


def _json_error(message: str, status: int) -> Any:
    """Build the ``(response, status)`` pair used for every error reply."""
    return jsonify({"status": "error", "message": message}), status


def create_app() -> Flask:
    """Build the Flask application for the rules editor.

    Besides registering the routes, this:
      * stores the path to the cronologia file (taken from the launcher
        configuration when available) in ``app.config["CRONO_PATH"]``;
      * reads ``INACTIVITY_TIMEOUT_SECONDS`` from the environment (default
        60, minimum 10) and starts a daemon thread that requests a shutdown
        once no heartbeat has arrived within that time.

    Routes: ``/`` (UI), ``GET/POST /api/rules``, ``GET /api/heartbeat``,
    ``GET /api/meta``, ``POST /api/test_pattern`` and ``POST /_shutdown``.
    """
    app = Flask(__name__)
    ensure_dirs(app)

    # Work out the path to cronologia.md from the launcher configuration.
    crono_path: str | None = None
    if HAS_CONFIG:
        try:
            configs = load_configuration()
            working_directory = configs.get("working_directory")
            group_config = configs.get("level2", {})
            cronologia_file = group_config.get(
                "cronologia_file", "cronologia.md"
            )
            if working_directory:
                crono_path = os.path.join(working_directory, cronologia_file)
        except Exception:
            # Best effort: without a usable configuration the pattern
            # preview is simply unavailable.
            crono_path = None
    app.config["CRONO_PATH"] = crono_path

    # Auto-close after a period of inactivity (seconds).
    try:
        inactivity_timeout = int(
            os.environ.get("INACTIVITY_TIMEOUT_SECONDS", "60").strip()
        )
    except ValueError:
        inactivity_timeout = 60
    app.config["INACTIVITY_TIMEOUT_SECONDS"] = max(10, inactivity_timeout)
    app.config["LAST_HEARTBEAT"] = time.time()

    def _request_shutdown() -> None:
        # Graceful shutdown, similar to .example/manager.py: send SIGINT to
        # our own process from a helper thread after a short delay.
        def _do() -> None:
            time.sleep(0.1)
            try:
                os.kill(os.getpid(), signal.SIGINT)
            except Exception:
                os._exit(0)

        threading.Thread(target=_do, daemon=True).start()

    @app.route("/")
    def index() -> Any:
        return render_template("index.html")

    @app.get("/api/rules")
    def api_get_rules() -> Any:
        try:
            data = load_rules_file()
            return jsonify({"status": "success", "data": data})
        except Exception as e:
            return _json_error(str(e), 500)

    @app.get("/api/heartbeat")
    def api_heartbeat() -> Any:
        try:
            app.config["LAST_HEARTBEAT"] = time.time()
            timeout_seconds = app.config["INACTIVITY_TIMEOUT_SECONDS"]
            return jsonify(
                {
                    "status": "success",
                    "server_time": datetime.now().isoformat(),
                    "timeout_seconds": timeout_seconds,
                }
            )
        except Exception as e:
            return _json_error(str(e), 500)

    @app.post("/api/rules")
    def api_save_rules() -> Any:
        try:
            payload = request.get_json(silent=True)
            if payload is None:
                return _json_error("JSON inválido", 400)
            errors = validate_payload(payload)
            if errors:
                return _json_error("\n".join(errors), 400)
            if not os.path.exists(DATA_PATH):
                return _json_error(f"No existe {DATA_PATH}", 404)

            # Serialise and test-encode BEFORE opening the file for writing:
            # opening with "w" truncates it, so a failure afterwards (for
            # example a lone surrogate that cannot be encoded as UTF-8)
            # would leave an empty or partial rules file behind.
            serialized = json.dumps(payload, ensure_ascii=False, indent=4)
            serialized.encode("utf-8")

            backup_path = backup_file(DATA_PATH)
            with open(DATA_PATH, "w", encoding="utf-8") as f:
                f.write(serialized)
            return jsonify(
                {
                    "status": "success",
                    "message": "Reglas guardadas correctamente",
                    "backup": backup_path,
                }
            )
        except Exception as e:
            return _json_error(str(e), 500)

    @app.get("/api/meta")
    def api_meta() -> Any:
        try:
            data_exists = os.path.exists(DATA_PATH)
            size = os.path.getsize(DATA_PATH) if data_exists else 0
            mtime = os.path.getmtime(DATA_PATH) if data_exists else None
            crono = app.config.get("CRONO_PATH")
            crono_exists = (
                os.path.exists(crono) if isinstance(crono, str) else False
            )
            return jsonify(
                {
                    "status": "success",
                    "path": DATA_PATH,
                    "exists": data_exists,
                    "size": size,
                    "modified": (
                        datetime.fromtimestamp(mtime).isoformat()
                        if mtime
                        else None
                    ),
                    "cronologia_path": crono,
                    "cronologia_exists": crono_exists,
                }
            )
        except Exception as e:
            return _json_error(str(e), 500)

    @app.post("/api/test_pattern")
    def api_test_pattern() -> Any:
        try:
            crono = app.config.get("CRONO_PATH")
            if not isinstance(crono, str) or not os.path.exists(crono):
                return _json_error("cronologia.md no encontrado", 404)

            # Client mistakes are answered with 400 rather than falling
            # through to the generic 500 handler below.
            payload = request.get_json(silent=True) or {}
            if not isinstance(payload, dict):
                return _json_error("El payload debe ser un objeto JSON", 400)
            pattern = payload.get("pattern", "")
            replacement = payload.get("replacement", "")
            action = payload.get("action", "replace")
            ptype = payload.get("type", "string")
            if not isinstance(pattern, str) or not isinstance(
                replacement, str
            ):
                return _json_error(
                    "'pattern' y 'replacement' deben ser string", 400
                )
            try:
                # Clamp at zero: a negative limit would slice from the end.
                max_chars = max(0, int(payload.get("max_chars", 4000)))
            except (TypeError, ValueError, OverflowError):
                return _json_error("'max_chars' debe ser entero", 400)

            with open(crono, "r", encoding="utf-8") as f:
                original = f.read()

            try:
                # Build the effective pattern.
                patt_obj: Any
                if action == "remove_block":
                    patt_obj = _convert_block_pattern_to_regex(pattern)
                    ptype_eff = "regex"
                elif ptype == "regex":
                    patt_obj = re.compile(pattern)
                    ptype_eff = "regex"
                else:
                    patt_obj = pattern
                    ptype_eff = ptype

                # Apply the action.
                if action == "replace":
                    processed = _apply_replace(
                        original, patt_obj, replacement, ptype_eff
                    )
                elif action == "remove_line":
                    processed = _process_remove_line(
                        original, patt_obj, ptype_eff
                    )
                elif action in ("add_before", "add_after"):
                    processed = _process_line_additions(
                        original, patt_obj, replacement, action, ptype_eff
                    )
                elif action == "remove_block":
                    processed = _process_remove_block(original, patt_obj)
                else:
                    processed = original
            except re.error as exc:
                # Raised for an invalid pattern or replacement template.
                return _json_error(f"Patrón regex inválido: {exc}", 400)

            return jsonify(
                {
                    "status": "success",
                    "original_excerpt": _trim(original, max_chars),
                    "processed_excerpt": _trim(processed, max_chars),
                    "total_len": len(original),
                }
            )
        except Exception as e:
            return _json_error(str(e), 500)

    @app.post("/_shutdown")
    def shutdown_route() -> Any:
        _request_shutdown()
        return jsonify({"status": "success", "message": "Cerrando editor..."})

    # Start the inactivity monitor.
    def _inactivity_monitor() -> None:
        check_period = 5
        timeout = app.config["INACTIVITY_TIMEOUT_SECONDS"]
        while True:
            time.sleep(check_period)
            last = app.config.get("LAST_HEARTBEAT")
            if isinstance(last, (int, float)):
                if time.time() - float(last) > timeout:
                    _request_shutdown()
                    break

    threading.Thread(target=_inactivity_monitor, daemon=True).start()
    return app
def main() -> None:
    """Create the app, optionally open the browser, and run the server.

    The server listens on ``127.0.0.1:5137``. Unless the environment
    variable ``AUTO_OPEN_BROWSER`` is ``0``, ``false`` or ``no``
    (case-insensitive), the default browser is pointed at the editor shortly
    after start-up.
    """
    app = create_app()
    port = 5137
    url = f"http://127.0.0.1:{port}"
    print("🧩 Editor de Reglas - iniciado")
    print(f"Archivo: {DATA_PATH}")
    print(f"URL: {url}")
    print("Ctrl+C para cerrar")

    # Open the browser automatically (disable with AUTO_OPEN_BROWSER=0).
    auto_open_env = os.environ.get("AUTO_OPEN_BROWSER", "1").strip().lower()
    if auto_open_env not in ("0", "false", "no"):

        def _open_browser() -> None:
            try:
                webbrowser.open(url)
            except Exception:
                # Opening the browser is a convenience only; the URL has
                # already been printed above.
                pass

        # One timer covers the previous 0.5 s timer + 0.8 s sleep. It is a
        # daemon so that, if the server fails to start, the process exits
        # right away instead of lingering and opening a dead URL.
        opener = threading.Timer(1.3, _open_browser)
        opener.daemon = True
        opener.start()

    app.run(host="127.0.0.1", port=port, debug=False, use_reloader=False)


if __name__ == "__main__":
    main()