#!/usr/bin/env python3
"""
Beautify rules editor (beautify_rules.json).

Flask application with a simple UI for managing the rules stored in
`config/beautify_rules.json` (CRUD, reordering and saving with backup).
"""

import json
import os
import shutil
import threading
import time
from datetime import datetime
from typing import Any, Dict, List
import webbrowser
import sys
import re
import signal

from flask import Flask, jsonify, render_template, request


BASE_DIR = os.path.dirname(__file__)
DATA_PATH = os.path.join(BASE_DIR, "config", "beautify_rules.json")

# Load the launcher configuration helper when it is available.
script_root = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
try:
    from backend.script_utils import load_configuration  # type: ignore

    HAS_CONFIG = True
except Exception:
    HAS_CONFIG = False


def ensure_dirs(app: Flask) -> None:
    """Create the ``templates``/``static`` folders under BASE_DIR if missing
    and point *app* at them."""
    templates_dir = os.path.join(BASE_DIR, "templates")
    static_dir = os.path.join(BASE_DIR, "static")
    for folder in (templates_dir, static_dir):
        if not os.path.exists(folder):
            os.makedirs(folder, exist_ok=True)
    app.template_folder = templates_dir
    app.static_folder = static_dir


def load_rules_file() -> Dict[str, Any]:
    """Read DATA_PATH and return its parsed JSON content.

    Raises:
        FileNotFoundError: if DATA_PATH does not exist.
    """
    if not os.path.exists(DATA_PATH):
        raise FileNotFoundError(f"No existe el archivo: {DATA_PATH}")
    with open(DATA_PATH, "r", encoding="utf-8") as f:
        return json.load(f)


def validate_rule(rule: Dict[str, Any]) -> List[str]:
    """Return the list of validation error messages for one rule.

    An empty list means the rule is valid.
    """
    errors: List[str] = []
    allowed_actions = {
        "replace",
        "remove_line",
        "remove_block",
        "add_before",
        "add_after",
    }
    allowed_types = {"string", "regex", "left", "right", "substring"}

    if "pattern" not in rule or not isinstance(rule["pattern"], str):
        errors.append("'pattern' es obligatorio y debe ser string")
    if "replacement" not in rule or not isinstance(rule["replacement"], str):
        msg = "'replacement' es obligatorio y debe ser string " "(puede ser vacío)"
        errors.append(msg)
    if "action" not in rule or rule["action"] not in allowed_actions:
        errors.append(f"'action' debe ser uno de: {sorted(allowed_actions)}")
    if "type" not in rule or rule["type"] not in allowed_types:
        errors.append(f"'type' debe ser uno de: {sorted(allowed_types)}")

    # bool is a subclass of int, so JSON true/false must be rejected
    # explicitly; otherwise they would pass as a valid priority.
    priority = rule.get("priority")
    if isinstance(priority, bool) or not isinstance(priority, int):
        errors.append("'priority' es obligatorio y debe ser entero")

    if (
        "__comment" in rule
        and rule["__comment"] is not None
        and not isinstance(rule["__comment"], str)
    ):
        errors.append("'__comment' debe ser string si está presente")
    return errors


def validate_payload(payload: Dict[str, Any]) -> List[str]:
    """Validate the whole save payload and return all error messages.

    The payload must be a dict holding a ``rules`` list; each rule is checked
    with :func:`validate_rule` and its errors are prefixed with its 1-based
    position.
    """
    errors: List[str] = []
    if not isinstance(payload, dict):
        return ["El payload debe ser un objeto JSON"]
    if "rules" not in payload or not isinstance(payload["rules"], list):
        return ["El payload debe contener 'rules' como lista"]
    for idx, rule in enumerate(payload["rules"]):
        if not isinstance(rule, dict):
            errors.append(f"Regla #{idx + 1}: debe ser un objeto")
            continue
        rule_errors = validate_rule(rule)
        if rule_errors:
            prefix = f"Regla #{idx + 1}: "
            errors.extend(prefix + e for e in rule_errors)
    return errors


def backup_file(path: str) -> str:
    """Copy *path* to a timestamped sibling file and return the backup path.

    The backup is named ``<name>.backup_<YYYYmmdd_HHMMSS><ext>`` and lives in
    the same directory as *path*.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    dirname, filename = os.path.dirname(path), os.path.basename(path)
    name, ext = os.path.splitext(filename)
    backup_name = f"{name}.backup_{timestamp}{ext}"
    backup_path = os.path.join(dirname, backup_name)
    shutil.copy2(path, backup_path)
    return backup_path


# Helpers used by the pattern test endpoint (applied to cronologia.md text).


def _convert_block_pattern_to_regex(pattern: str) -> re.Pattern[str]:
    """Compile a block pattern into a regex.

    Every ``.....`` (five dots) in *pattern* becomes a non-greedy ``.*?``
    wildcard; the rest of the pattern is matched literally. The regex is
    compiled with ``(?s)`` so the wildcard also spans newlines.
    """
    marker = "__BLOCK_MARKER__"
    marked = pattern.replace(".....", marker)
    escaped = re.escape(marked)
    final_rx = escaped.replace(marker, ".*?")
    return re.compile(f"(?s){final_rx}")


def _line_matches(line: str, patt: Any, ptype: str) -> bool:
    """Tell whether the stripped *line* matches *patt* according to *ptype*.

    *patt* is a compiled regex when *ptype* is ``"regex"`` and a plain string
    otherwise. Unknown types never match.
    """
    sline = line.strip()
    if ptype == "regex":
        return bool(patt.search(sline))
    if ptype == "left":
        return sline.startswith(patt)
    if ptype == "right":
        return sline.endswith(patt)
    if ptype == "substring":
        return patt in sline
    if ptype == "string":
        return sline == patt
    return False


def _apply_replace(text: str, patt: Any, repl: str, ptype: str) -> str:
    """Apply a ``replace`` rule to *text* and return the result.

    ``regex`` uses ``patt.sub``; ``string``/``substring`` use ``str.replace``
    over the whole text; ``left``/``right`` work line by line and only touch
    lines whose stripped content starts/ends with *patt*. Unknown types leave
    the text unchanged.
    """
    if ptype == "regex":
        return patt.sub(repl, text)
    if ptype in ("string", "substring"):
        return text.replace(patt, repl)
    if ptype == "left":
        out: List[str] = []
        for ln in text.splitlines():
            if ln.strip().startswith(patt):
                # Only the first occurrence in the line is replaced.
                out.append(ln.replace(patt, repl, 1))
            else:
                out.append(ln)
        return "\n".join(out)
    if ptype == "right":
        out = []
        for ln in text.splitlines():
            if ln.strip().endswith(patt):
                # Only the last occurrence in the line is replaced.
                idx = ln.rindex(patt)
                out.append(ln[:idx] + repl + ln[idx + len(patt) :])
            else:
                out.append(ln)
        return "\n".join(out)
    return text


def _process_remove_block(text: str, patt: re.Pattern[str]) -> str:
    """Remove every match of *patt* from *text*, widened to whole lines.

    Each removal is extended to the start of the first matched line and the
    end of the last one, and also swallows the blank lines directly before
    and after the block.
    """
    result = text
    matches = list(patt.finditer(result))
    # Walk backwards so the spans of earlier matches stay valid after a cut.
    for m in reversed(matches):
        start, end = m.span()
        # rfind returns -1 when there is no previous newline, giving 0.
        line_start = result.rfind("\n", 0, start) + 1
        line_end = result.find("\n", end)
        if line_end == -1:
            line_end = len(result)
        else:
            line_end += 1

        # Swallow blank lines immediately preceding the block.
        while (
            line_start > 0
            and result[line_start - 1 : line_start] == "\n"
            and (line_start == 1 or result[line_start - 2 : line_start - 1] == "\n")
        ):
            line_start -= 1

        # Swallow blank lines immediately following the block. Only a newline
        # at line_end may be consumed: the previous extra clause
        # `line_end == len(result) - 1` also ate a trailing non-newline
        # character (e.g. "abc\nX" lost its "X").
        # NOTE(review): if the production beautifier shares this logic, it
        # probably needs the same fix - confirm.
        while (
            line_end < len(result)
            and result[line_end - 1 : line_end] == "\n"
            and result[line_end : line_end + 1] == "\n"
        ):
            line_end += 1

        result = result[:line_start] + result[line_end:]
    return result


def _process_remove_line(text: str, patt: Any, ptype: str) -> str:
    """Drop every line matching the pattern, plus one blank line right after
    it when present. Lines are re-joined with ``"\\n"``."""
    lines = text.splitlines()
    out: List[str] = []
    skip_next_empty = False
    for i, ln in enumerate(lines):
        if _line_matches(ln, patt, ptype):
            if i < len(lines) - 1 and not lines[i + 1].strip():
                skip_next_empty = True
            continue
        if skip_next_empty and not ln.strip():
            skip_next_empty = False
            continue
        out.append(ln)
        skip_next_empty = False
    return "\n".join(out)


def _process_line_additions(
    text: str, patt: Any, repl: str, action: str, ptype: str
) -> str:
    """Insert *repl* as a new line before (``add_before``) or after (any
    other *action*) every line matching the pattern."""
    lines = text.splitlines()
    out: List[str] = []
    for ln in lines:
        if _line_matches(ln, patt, ptype):
            if action == "add_before":
                out.append(repl)
                out.append(ln)
            else:
                out.append(ln)
                out.append(repl)
        else:
            out.append(ln)
    return "\n".join(out)


def _trim(txt: str, limit: int) -> str:
    """Return *txt* cut to *limit* characters, with a marker when truncated."""
    if len(txt) <= limit:
        return txt
    return txt[:limit] + "\n... (recortado)"


def create_app() -> Flask:
    """Build the Flask application: routes, config and inactivity monitor.

    Side effects: ensures the templates/static folders exist and starts a
    daemon thread that requests shutdown when no heartbeat arrives within
    the configured timeout.
    """
    app = Flask(__name__)
    ensure_dirs(app)

    # Resolve the path to cronologia.md from the launcher configuration.
    crono_path: str | None = None
    if HAS_CONFIG:
        try:
            configs = load_configuration()
            working_directory = configs.get("working_directory")
            group_config = configs.get("level2", {})
            cronologia_file = group_config.get("cronologia_file", "cronologia.md")
            if working_directory:
                crono_path = os.path.join(working_directory, cronologia_file)
        except Exception:
            # Best effort: without configuration the test endpoint reports 404.
            crono_path = None
    app.config["CRONO_PATH"] = crono_path

    # Auto-shutdown on inactivity (seconds); never below 10.
    try:
        inactivity_env = os.environ.get("INACTIVITY_TIMEOUT_SECONDS", "60").strip()
        inactivity_timeout = int(inactivity_env)
    except Exception:
        inactivity_timeout = 60
    app.config["INACTIVITY_TIMEOUT_SECONDS"] = max(10, inactivity_timeout)
    app.config["LAST_HEARTBEAT"] = time.time()

    def _request_shutdown() -> None:
        # Graceful shutdown, similar to .example/manager.py: send SIGINT to
        # ourselves from a helper thread, falling back to a hard exit.
        def _do():
            time.sleep(0.1)
            try:
                os.kill(os.getpid(), signal.SIGINT)
            except Exception:
                os._exit(0)

        t = threading.Thread(target=_do, daemon=True)
        t.start()

    @app.route("/")
    def index() -> Any:
        return render_template("index.html")

    @app.get("/api/rules")
    def api_get_rules() -> Any:
        try:
            data = load_rules_file()
            return jsonify({"status": "success", "data": data})
        except Exception as e:
            return jsonify({"status": "error", "message": str(e)}), 500

    @app.get("/api/heartbeat")
    def api_heartbeat() -> Any:
        try:
            # Each heartbeat resets the inactivity clock.
            app.config["LAST_HEARTBEAT"] = time.time()
            timeout_seconds = app.config["INACTIVITY_TIMEOUT_SECONDS"]
            return jsonify(
                {
                    "status": "success",
                    "server_time": datetime.now().isoformat(),
                    "timeout_seconds": timeout_seconds,
                }
            )
        except Exception as e:
            return jsonify({"status": "error", "message": str(e)}), 500

    @app.post("/api/rules")
    def api_save_rules() -> Any:
        try:
            payload = request.get_json(silent=True)
            if payload is None:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "JSON inválido",
                        }
                    ),
                    400,
                )
            errors = validate_payload(payload)
            if errors:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "\n".join(errors),
                        }
                    ),
                    400,
                )
            if not os.path.exists(DATA_PATH):
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": f"No existe {DATA_PATH}",
                        }
                    ),
                    404,
                )
            # Keep a timestamped copy before overwriting the rules file.
            backup_path = backup_file(DATA_PATH)
            with open(DATA_PATH, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=4)
            return jsonify(
                {
                    "status": "success",
                    "message": "Reglas guardadas correctamente",
                    "backup": backup_path,
                }
            )
        except Exception as e:
            return jsonify({"status": "error", "message": str(e)}), 500

    @app.get("/api/meta")
    def api_meta() -> Any:
        try:
            data_exists = os.path.exists(DATA_PATH)
            size = os.path.getsize(DATA_PATH) if data_exists else 0
            mtime = os.path.getmtime(DATA_PATH) if data_exists else None
            crono = app.config.get("CRONO_PATH")
            crono_exists = os.path.exists(crono) if isinstance(crono, str) else False
            return jsonify(
                {
                    "status": "success",
                    "path": DATA_PATH,
                    "exists": data_exists,
                    "size": size,
                    "modified": (
                        datetime.fromtimestamp(mtime).isoformat() if mtime else None
                    ),
                    "cronologia_path": crono,
                    "cronologia_exists": crono_exists,
                }
            )
        except Exception as e:
            return jsonify({"status": "error", "message": str(e)}), 500

    @app.post("/api/test_pattern")
    def api_test_pattern() -> Any:
        try:
            crono = app.config.get("CRONO_PATH")
            if not isinstance(crono, str) or not os.path.exists(crono):
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "cronologia.md no encontrado",
                        }
                    ),
                    404,
                )

            payload = request.get_json(silent=True) or {}
            # A JSON array/scalar body is a client error, not a server fault.
            if not isinstance(payload, dict):
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "El payload debe ser un objeto JSON",
                        }
                    ),
                    400,
                )
            pattern = payload.get("pattern", "")
            replacement = payload.get("replacement", "")
            action = payload.get("action", "replace")
            ptype = payload.get("type", "string")
            try:
                # Clamp to 0 so a negative limit cannot slice from the end.
                max_chars = max(0, int(payload.get("max_chars", 4000)))
            except (TypeError, ValueError):
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "'max_chars' debe ser entero",
                        }
                    ),
                    400,
                )

            with open(crono, "r", encoding="utf-8") as f:
                original = f.read()

            try:
                # Build the effective pattern.
                if action == "remove_block":
                    patt_obj = _convert_block_pattern_to_regex(pattern)
                    ptype_eff = "regex"
                elif ptype == "regex":
                    patt_obj = re.compile(pattern)
                    ptype_eff = "regex"
                else:
                    patt_obj = pattern
                    ptype_eff = ptype

                # Apply the action.
                if action == "replace":
                    processed = _apply_replace(
                        original,
                        patt_obj,
                        replacement,
                        ptype_eff,
                    )
                elif action == "remove_line":
                    processed = _process_remove_line(original, patt_obj, ptype_eff)
                elif action in ("add_before", "add_after"):
                    processed = _process_line_additions(
                        original,
                        patt_obj,
                        replacement,
                        action,
                        ptype_eff,
                    )
                elif action == "remove_block":
                    processed = _process_remove_block(original, patt_obj)
                else:
                    processed = original
            except re.error as exc:
                # Invalid user-supplied regex or replacement template.
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": f"Expresión regular inválida: {exc}",
                        }
                    ),
                    400,
                )

            return jsonify(
                {
                    "status": "success",
                    "original_excerpt": _trim(original, max_chars),
                    "processed_excerpt": _trim(processed, max_chars),
                    "total_len": len(original),
                }
            )
        except Exception as e:
            return jsonify({"status": "error", "message": str(e)}), 500

    @app.post("/_shutdown")
    def shutdown_route() -> Any:
        _request_shutdown()
        return jsonify({"status": "success", "message": "Cerrando editor..."})

    # Start the inactivity monitor.
    def _inactivity_monitor() -> None:
        check_period = 5
        timeout = app.config["INACTIVITY_TIMEOUT_SECONDS"]
        while True:
            time.sleep(check_period)
            last = app.config.get("LAST_HEARTBEAT")
            if isinstance(last, (int, float)):
                if time.time() - float(last) > timeout:
                    _request_shutdown()
                    break

    threading.Thread(target=_inactivity_monitor, daemon=True).start()

    return app


def main() -> None:
    """Create the app, optionally open the browser, and serve on localhost."""
    app = create_app()
    port = 5137
    url = f"http://127.0.0.1:{port}"
    print("🧩 Editor de Reglas - iniciado")
    print(f"Archivo: {DATA_PATH}")
    print(f"URL: {url}")
    print("Ctrl+C para cerrar")

    # Open the browser automatically (disable with AUTO_OPEN_BROWSER=0).
    auto_open_env = os.environ.get("AUTO_OPEN_BROWSER", "1").lower()
    auto_open_browser = auto_open_env not in ("0", "false", "no")

    if auto_open_browser:

        def _open_browser() -> None:
            time.sleep(0.8)
            try:
                webbrowser.open(url)
            except Exception:
                # Best effort: the URL is already printed on the console.
                pass

        threading.Timer(0.5, _open_browser).start()

    app.run(host="127.0.0.1", port=port, debug=False, use_reloader=False)


if __name__ == "__main__":
    main()