ProxyTcpReverse/scripts/nat_client.py

277 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Cliente para gestión del sistema NAT industrial
Permite a PC2 crear conexiones dinámicas a PLCs/SCADA
"""
import requests
import json
import sys
import argparse
import time
from typing import Dict, List, Optional
class IndustrialNATClient:
def __init__(self, manager_url: str = "http://91.99.210.72:8080"):
"""
Cliente para gestionar NAT industrial
Args:
manager_url: URL del gestor NAT (por defecto PC3:8080)
"""
self.manager_url = manager_url.rstrip('/')
def get_status(self) -> Dict:
"""Obtiene estado completo del sistema NAT"""
try:
response = requests.get(f"{self.manager_url}/status", timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise Exception(f"Error obteniendo estado: {e}")
def quick_connect(self, target_ip: str, target_port: int, description: str = "") -> Dict:
"""
Conexión rápida - asigna puerto automáticamente
Args:
target_ip: IP del PLC/dispositivo (ej: 10.1.33.11)
target_port: Puerto del dispositivo (ej: 5900 para VNC)
description: Descripción opcional
Returns:
Dict con información de conexión
"""
try:
data = {
"target_ip": target_ip,
"target_port": target_port,
"description": description or f"Quick connect to {target_ip}:{target_port}"
}
response = requests.post(
f"{self.manager_url}/quick-connect",
json=data,
timeout=30 # Puede tardar en establecer túnel SSH
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise Exception(f"Error creando conexión: {e}")
def add_nat_rule(self, target_ip: str, target_port: int,
external_port: Optional[int] = None, description: str = "") -> Dict:
"""
Añade regla NAT específica
Args:
target_ip: IP del PLC/dispositivo
target_port: Puerto del dispositivo
external_port: Puerto específico en PC3 (opcional)
description: Descripción
"""
try:
data = {
"target_ip": target_ip,
"target_port": target_port,
"description": description
}
if external_port:
data["external_port"] = external_port
response = requests.post(
f"{self.manager_url}/add",
json=data,
timeout=30
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise Exception(f"Error añadiendo regla NAT: {e}")
def remove_nat_rule(self, external_port: int) -> Dict:
"""Elimina regla NAT"""
try:
data = {"external_port": external_port}
response = requests.post(
f"{self.manager_url}/remove",
json=data,
timeout=10
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise Exception(f"Error eliminando regla NAT: {e}")
def list_active_connections(self) -> List[Dict]:
"""Lista todas las conexiones activas"""
status = self.get_status()
return [
rule for rule in status['rules']
if rule['proxy_running'] and rule['tunnel_running']
]
def connect_to_plc(self, plc_ip: str, service: str = "vnc") -> Dict:
"""
Método de conveniencia para conectar a PLCs comunes
Args:
plc_ip: IP del PLC
service: Tipo de servicio ('vnc', 'web', 'modbus', 'ssh', 'telnet')
"""
service_ports = {
'vnc': 5900,
'web': 80,
'https': 443,
'modbus': 502,
'ssh': 22,
'telnet': 23,
'ftp': 21,
'http-alt': 8080
}
if service not in service_ports:
raise ValueError(f"Servicio desconocido: {service}. Disponibles: {list(service_ports.keys())}")
port = service_ports[service]
description = f"PLC {plc_ip} - {service.upper()}"
return self.quick_connect(plc_ip, port, description)
def wait_for_connection(self, external_port: int, timeout: int = 60) -> bool:
"""Espera a que la conexión esté lista"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
status = self.get_status()
for rule in status['rules']:
if (rule['external_port'] == external_port and
rule['proxy_running'] and rule['tunnel_running']):
return True
except Exception:
pass
time.sleep(2)
return False
def main():
parser = argparse.ArgumentParser(description="Cliente NAT Industrial para acceso a PLCs/SCADA")
parser.add_argument("--url", default="http://91.99.210.72:8080",
help="URL del gestor NAT (default: PC3)")
subparsers = parser.add_subparsers(dest="command", help="Comandos disponibles")
# Comando status
subparsers.add_parser("status", help="Mostrar estado del sistema NAT")
# Comando quick-connect
quick_parser = subparsers.add_parser("connect", help="Conexión rápida a PLC/dispositivo")
quick_parser.add_argument("target_ip", help="IP del PLC/dispositivo (ej: 10.1.33.11)")
quick_parser.add_argument("target_port", type=int, help="Puerto del dispositivo")
quick_parser.add_argument("--description", default="", help="Descripción opcional")
quick_parser.add_argument("--wait", action="store_true", help="Esperar a que la conexión esté lista")
# Comando plc (conveniencia)
plc_parser = subparsers.add_parser("plc", help="Conectar a PLC con servicios predefinidos")
plc_parser.add_argument("plc_ip", help="IP del PLC")
plc_parser.add_argument("service", choices=['vnc', 'web', 'https', 'modbus', 'ssh', 'telnet', 'ftp', 'http-alt'],
help="Tipo de servicio")
plc_parser.add_argument("--wait", action="store_true", help="Esperar a que la conexión esté lista")
# Comando add
add_parser = subparsers.add_parser("add", help="Añadir regla NAT específica")
add_parser.add_argument("target_ip", help="IP del dispositivo")
add_parser.add_argument("target_port", type=int, help="Puerto del dispositivo")
add_parser.add_argument("--external-port", type=int, help="Puerto específico en PC3")
add_parser.add_argument("--description", default="", help="Descripción")
# Comando remove
remove_parser = subparsers.add_parser("remove", help="Eliminar regla NAT")
remove_parser.add_argument("external_port", type=int, help="Puerto externo a eliminar")
# Comando list
subparsers.add_parser("list", help="Listar conexiones activas")
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
client = IndustrialNATClient(args.url)
try:
if args.command == "status":
status = client.get_status()
print(json.dumps(status, indent=2))
elif args.command == "connect":
print(f"Conectando a {args.target_ip}:{args.target_port}...")
result = client.quick_connect(args.target_ip, args.target_port, args.description)
if result['success']:
print(f"✅ Conexión establecida!")
print(f" Acceso desde PC2: {result['access_url']}")
print(f" Puerto asignado: {result['external_port']}")
if args.wait:
print("⏳ Esperando que la conexión esté lista...")
if client.wait_for_connection(result['external_port']):
print("✅ Conexión lista!")
else:
print("⚠️ Timeout esperando conexión")
else:
print(f"❌ Error: {result['error']}")
return 1
elif args.command == "plc":
print(f"Conectando a PLC {args.plc_ip} ({args.service})...")
result = client.connect_to_plc(args.plc_ip, args.service)
if result['success']:
print(f"✅ Conexión a PLC establecida!")
print(f" Acceso desde PC2: {result['access_url']}")
print(f" Servicio: {args.service.upper()}")
if args.wait:
print("⏳ Esperando que la conexión esté lista...")
if client.wait_for_connection(result['external_port']):
print("✅ PLC accesible!")
else:
print("⚠️ Timeout esperando conexión")
else:
print(f"❌ Error: {result['error']}")
return 1
elif args.command == "add":
result = client.add_nat_rule(
args.target_ip, args.target_port,
args.external_port, args.description
)
print(json.dumps(result, indent=2))
elif args.command == "remove":
result = client.remove_nat_rule(args.external_port)
print(json.dumps(result, indent=2))
elif args.command == "list":
connections = client.list_active_connections()
print(f"Conexiones activas: {len(connections)}")
for conn in connections:
print(f" {conn['access_url']} -> {conn['target_ip']}:{conn['target_port']} ({conn['description']})")
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())