#!/usr/bin/env python3 """ Script de gestión de proxies TCP - Versión Python Permite gestionar los proxies desde Python para automatización """ import requests import json import sys import argparse import subprocess import time class ProxyManager: def __init__(self, base_url="http://localhost:8080"): self.base_url = base_url def check_container(self): """Verifica que el contenedor esté ejecutándose""" try: result = subprocess.run( ["docker", "ps", "--filter", "name=tcp-proxy-container", "--format", "{{.Names}}"], capture_output=True, text=True, check=True ) if "tcp-proxy-container" not in result.stdout: raise Exception("Contenedor tcp-proxy no está ejecutándose") except subprocess.CalledProcessError: raise Exception("Error verificando estado del contenedor") def get_status(self): """Obtiene el estado de todos los proxies""" try: self.check_container() response = requests.get(f"{self.base_url}/status", timeout=10) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: raise Exception(f"Error obteniendo estado: {e}") def add_proxy(self, local_port, remote_port): """Añade un nuevo proxy""" try: self.check_container() data = { "local_port": int(local_port), "remote_port": int(remote_port) } response = requests.post( f"{self.base_url}/add", json=data, timeout=10 ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: raise Exception(f"Error añadiendo proxy: {e}") def remove_proxy(self, local_port): """Elimina un proxy existente""" try: self.check_container() data = {"local_port": int(local_port)} response = requests.post( f"{self.base_url}/remove", json=data, timeout=10 ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: raise Exception(f"Error eliminando proxy: {e}") def wait_for_service(self, timeout=30): """Espera a que el servicio esté disponible""" start_time = time.time() while time.time() - start_time < timeout: try: requests.get(f"{self.base_url}/status", timeout=5) return True except: time.sleep(1) return False def main(): parser = argparse.ArgumentParser(description="Gestión de proxies TCP") parser.add_argument("--url", default="http://localhost:8080", help="URL base del servicio (default: http://localhost:8080)") subparsers = parser.add_subparsers(dest="command", help="Comandos disponibles") # Comando status subparsers.add_parser("status", help="Mostrar estado de todos los proxies") # Comando add add_parser = subparsers.add_parser("add", help="Añadir nuevo proxy") add_parser.add_argument("local_port", type=int, help="Puerto local") add_parser.add_argument("remote_port", type=int, help="Puerto remoto") # Comando remove remove_parser = subparsers.add_parser("remove", help="Eliminar proxy") remove_parser.add_argument("local_port", type=int, help="Puerto local a eliminar") # Comando wait wait_parser = subparsers.add_parser("wait", help="Esperar a que el servicio esté listo") wait_parser.add_argument("--timeout", type=int, default=30, help="Timeout en segundos") args = parser.parse_args() if not args.command: parser.print_help() return 1 manager = ProxyManager(args.url) try: if args.command == "status": status = manager.get_status() print(json.dumps(status, indent=2)) elif args.command == "add": result = manager.add_proxy(args.local_port, args.remote_port) print(json.dumps(result, indent=2)) elif args.command == "remove": result = manager.remove_proxy(args.local_port) print(json.dumps(result, indent=2)) elif args.command == "wait": print(f"Esperando a que el servicio esté disponible (timeout: {args.timeout}s)...") if manager.wait_for_service(args.timeout): print("✅ Servicio disponible") return 0 else: print("❌ Timeout esperando al servicio") return 1 return 0 except Exception as e: print(f"Error: {e}", file=sys.stderr) return 1 if __name__ == "__main__": sys.exit(main())