# CtrEditor/Scripts/HydraulicSystemTests.py
#
# NOTE: the text originally found here ("630 lines", "22 KiB", "Python",
# "Raw Blame History", and a warning that the file contains ambiguous Unicode
# characters) was repository web-viewer page chrome captured together with the
# file. It is not part of the script and is kept only as this comment.

#!/usr/bin/env python3
"""
Sistema de Pruebas Hidráulicas para CtrEditor
Basado en FluidManagementSystem.md y MCP_LLM_Guide.md
Este script ejecuta pruebas sistemáticas del sistema hidráulico:
- Equilibrio de flujo entre tanques
- Cálculos de unidades correctos
- Comportamiento con diferentes tipos de fluidos
- Verificación de niveles después de tiempo determinado
"""
import json
import math
import time
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional, Tuple

import requests
@dataclass
class FluidProperties:
    """Physical properties of a single fluid."""

    name: str
    density: float  # kg/m³
    viscosity: float  # Pa·s
    temperature: float  # °C
@dataclass
class TankState:
    """Snapshot of one hydraulic tank."""

    id: str
    name: str
    level_m: float  # current level, metres (per field name)
    volume_l: float  # current volume, litres (per field name)
    max_volume_l: float  # capacity, litres (per field name)
    fluid_primary: FluidProperties
    fluid_secondary: FluidProperties
    primary_percentage: float  # share of the primary fluid, on a 0-100 scale
@dataclass
class PumpState:
    """Snapshot of one pump."""

    id: str
    name: str
    is_running: bool
    current_flow: float  # m³/s
    max_flow: float  # m³/s
    pump_head: float  # m
@dataclass
class PipeState:
    """Snapshot of one pipe."""

    id: str
    name: str
    current_flow: float  # m³/s
    pressure_drop: float  # Pa
    fluid_type: str
class HydraulicTestManager:
    """Run hydraulic-system checks against a CtrEditor simulation over MCP.

    Every interaction goes through :meth:`send_mcp_request`, which POSTs a
    JSON-RPC 2.0 payload to ``mcp_url``.  Each ``test_*`` method returns a
    dictionary with the keys ``test_name``, ``description``, ``success``,
    ``details`` and ``measurements``.
    """

    # Seconds to wait before polling again after a failed status request.
    _STATUS_RETRY_DELAY_S = 0.5

    def __init__(self, mcp_url: str = "http://localhost:3000"):
        """Store the MCP endpoint URL and start with an empty result list."""
        self.mcp_url = mcp_url
        self.test_results = []

    # ------------------------------------------------------------------
    # MCP transport
    # ------------------------------------------------------------------
    def send_mcp_request(self, method: str, params: Optional[Dict] = None) -> Dict:
        """Send one JSON-RPC request and return the decoded response.

        Args:
            method: JSON-RPC method name.
            params: Method parameters; an empty object is sent when omitted.

        Returns:
            The decoded JSON object, or ``{"error": <message>}`` when the
            request fails, the server replies with an HTTP error status, or
            the body is not a JSON object.  Those failures are reported, not
            raised.
        """
        payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params or {}}
        try:
            response = requests.post(self.mcp_url, json=payload, timeout=10)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            print(f"Error en solicitud MCP: {e}")
            return {"error": str(e)}
        if not isinstance(data, dict):
            # Callers index the response by key, so anything else is an error.
            message = f"Respuesta MCP inesperada: {type(data).__name__}"
            print(f"Error en solicitud MCP: {message}")
            return {"error": message}
        return data

    @staticmethod
    def _extract_result(response: Dict) -> Optional[Dict]:
        """Return the ``result`` object of a response, or None if absent or not a dict."""
        result = response.get("result")
        return result if isinstance(result, dict) else None

    def _call_succeeded(self, method: str) -> bool:
        """Call a parameterless MCP method and report its ``success`` flag."""
        result = self._extract_result(self.send_mcp_request(method))
        return bool(result and result.get("success", False))

    # ------------------------------------------------------------------
    # Simulation state readers
    # ------------------------------------------------------------------
    def get_simulation_objects(self) -> List[Dict]:
        """Return every object reported by ``list_objects`` (empty list on failure)."""
        result = self._extract_result(self.send_mcp_request("list_objects"))
        if result is None:
            return []
        return result.get("objects") or []

    @staticmethod
    def _properties_of(obj: Dict) -> Dict:
        """Return an object's ``properties`` mapping, tolerating a missing or null value."""
        return obj.get("properties") or {}

    @staticmethod
    def _read_fluid(
        props: Dict, prefix: str, name: str, density: float, viscosity: float
    ) -> "FluidProperties":
        """Build a FluidProperties from the ``<prefix>Fluid*`` keys, with fallbacks."""
        return FluidProperties(
            name=props.get(f"{prefix}FluidName", name),
            density=props.get(f"{prefix}FluidDensity", density),
            viscosity=props.get(f"{prefix}FluidViscosity", viscosity),
            temperature=props.get(f"{prefix}FluidTemperature", 20.0),
        )

    def get_tanks(self, objects: Optional[List[Dict]] = None) -> List[TankState]:
        """Return the state of every ``osHydTank`` object.

        Args:
            objects: Object list to read from.  When omitted, a fresh list is
                fetched with :meth:`get_simulation_objects`.  Passing one list
                to several getters makes them describe the same snapshot.
        """
        if objects is None:
            objects = self.get_simulation_objects()
        tanks = []
        for obj in objects:
            if obj.get("type") != "osHydTank":
                continue
            props = self._properties_of(obj)
            tanks.append(
                TankState(
                    id=obj["id"],
                    name=obj["name"],
                    level_m=props.get("CurrentLevelM", 0.0),
                    volume_l=props.get("CurrentVolumeL", 0.0),
                    max_volume_l=props.get("MaxVolumeL", 1000.0),
                    fluid_primary=self._read_fluid(
                        props, "Primary", "Water", 1000.0, 0.001
                    ),
                    fluid_secondary=self._read_fluid(
                        props, "Secondary", "Air", 1.225, 1.8e-5
                    ),
                    primary_percentage=props.get("PrimaryFluidPercentage", 100.0),
                )
            )
        return tanks

    def get_pumps(self, objects: Optional[List[Dict]] = None) -> List[PumpState]:
        """Return the state of every ``osHydPump`` object.

        Args:
            objects: Object list to read from; fetched when omitted.
        """
        if objects is None:
            objects = self.get_simulation_objects()
        pumps = []
        for obj in objects:
            if obj.get("type") != "osHydPump":
                continue
            props = self._properties_of(obj)
            pumps.append(
                PumpState(
                    id=obj["id"],
                    name=obj["name"],
                    is_running=props.get("IsRunning", False),
                    current_flow=props.get("CurrentFlow", 0.0),
                    max_flow=props.get("MaxFlow", 0.02),
                    pump_head=props.get("PumpHead", 75.0),
                )
            )
        return pumps

    def get_pipes(self, objects: Optional[List[Dict]] = None) -> List[PipeState]:
        """Return the state of every ``osHydPipe`` object.

        Args:
            objects: Object list to read from; fetched when omitted.
        """
        if objects is None:
            objects = self.get_simulation_objects()
        pipes = []
        for obj in objects:
            if obj.get("type") != "osHydPipe":
                continue
            props = self._properties_of(obj)
            pipes.append(
                PipeState(
                    id=obj["id"],
                    name=obj["name"],
                    current_flow=props.get("CurrentFlow", 0.0),
                    pressure_drop=props.get("PressureDrop", 0.0),
                    fluid_type=props.get("FluidType", "Unknown"),
                )
            )
        return pipes

    # ------------------------------------------------------------------
    # Simulation control
    # ------------------------------------------------------------------
    def start_simulation(self) -> bool:
        """Ask the server to start the simulation; True when it reports success."""
        return self._call_succeeded("start_simulation")

    def stop_simulation(self) -> bool:
        """Ask the server to stop the simulation; True when it reports success."""
        return self._call_succeeded("stop_simulation")

    def reset_simulation_timing(self) -> bool:
        """Ask the server to reset its timing counters; True when it reports success."""
        return self._call_succeeded("reset_simulation_timing")

    def wait_simulation_time(
        self,
        target_seconds: float,
        timeout_factor: float = 3,
        poll_interval: float = 0.1,
    ) -> bool:
        """Poll the simulation until it has run for ``target_seconds``.

        Args:
            target_seconds: Simulated time to wait for, in seconds.
            timeout_factor: The wait is abandoned after
                ``target_seconds * timeout_factor`` seconds of real time.
            poll_interval: Pause between successful status polls, in seconds.

        Returns:
            True when ``simulation_elapsed_ms`` reached the target; False when
            the wait timed out or the simulation reported it is not running.
        """
        print(f"⏰ Esperando {target_seconds} segundos reales de simulación...")
        target_ms = target_seconds * 1000
        max_wait_time = target_seconds * timeout_factor
        # Monotonic clock: the timeout must not depend on wall-clock changes.
        deadline = time.monotonic() + max_wait_time
        while True:
            if time.monotonic() > deadline:
                print(f"⚠️ Timeout después de {max_wait_time} segundos reales")
                return False
            result = self._extract_result(
                self.send_mcp_request("get_simulation_status")
            )
            if result is None:
                # Transient failure: back off and retry until the deadline.
                print("⚠️ Error obteniendo estado de simulación")
                time.sleep(self._STATUS_RETRY_DELAY_S)
                continue
            simulation_ms = result.get("simulation_elapsed_ms", 0)
            if simulation_ms >= target_ms:
                print(
                    f"✅ Simulación completó {simulation_ms}ms ({simulation_ms/1000:.3f}s)"
                )
                return True
            if not result.get("is_running", False):
                print(f"⚠️ Simulación detenida en {simulation_ms}ms")
                return False
            time.sleep(poll_interval)

    # ------------------------------------------------------------------
    # Calculations and small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _primary_fluid_mass_kg(tanks) -> float:
        """Sum volume × primary density × primary fraction over ``tanks``.

        ``volume_l`` is divided by 1000 before being multiplied by the
        density, and ``primary_percentage`` is divided by 100.
        """
        return sum(
            (
                (tank.volume_l / 1000.0)
                * tank.fluid_primary.density
                * (tank.primary_percentage / 100.0)
                for tank in tanks
            ),
            0.0,
        )

    @staticmethod
    def _pair_tanks_by_id(initial_tanks, final_tanks):
        """Pair each initial tank with the final tank that has the same id.

        Tanks missing from ``final_tanks`` are skipped.  Pairing by id keeps
        the comparison correct even if the server lists objects in a
        different order between two requests.
        """
        final_by_id = {tank.id: tank for tank in final_tanks}
        return [
            (tank, final_by_id[tank.id])
            for tank in initial_tanks
            if tank.id in final_by_id
        ]

    @staticmethod
    def _print_tank_levels(tanks) -> None:
        """Print one line per tank with its level and volume."""
        for tank in tanks:
            print(f" - {tank.name}: {tank.level_m:.2f}m ({tank.volume_l:.1f}L)")

    def calculate_mass_balance(
        self, initial_tanks: List[TankState], final_tanks: List[TankState]
    ) -> Dict:
        """Compare the primary-fluid mass held by two tank snapshots.

        Returns:
            A dict with ``initial_mass_kg``, ``final_mass_kg``,
            ``mass_difference_kg`` (final minus initial) and
            ``conservation_percentage`` (final / initial × 100, or 0.0 when
            the initial mass is not positive).
        """
        # NOTE(review): only the primary fluid contributes; the secondary
        # fluid's share of the volume is ignored.  Confirm this is intended.
        initial_mass = self._primary_fluid_mass_kg(initial_tanks)
        final_mass = self._primary_fluid_mass_kg(final_tanks)
        return {
            "initial_mass_kg": initial_mass,
            "final_mass_kg": final_mass,
            "mass_difference_kg": final_mass - initial_mass,
            "conservation_percentage": (
                (final_mass / initial_mass * 100.0) if initial_mass > 0 else 0.0
            ),
        }

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------
    def test_basic_flow_equilibrium(self) -> Dict:
        """Run the simulation for 30 s and check mass conservation and transfer.

        The test succeeds when the conservation percentage is within 5 points
        of 100 and at least one tank's volume changed by more than 1 litre.
        """
        print("\n🧪 EJECUTANDO: Prueba de Equilibrio de Flujo Básico")
        test_result = {
            "test_name": "basic_flow_equilibrium",
            "description": "Verificar equilibrio de flujo entre dos tanques con bomba",
            "success": False,
            "details": {},
            "measurements": {},
        }
        try:
            # Reset timing so the elapsed-time measurement starts from zero.
            self.reset_simulation_timing()

            initial_tanks = self.get_tanks()
            if len(initial_tanks) < 2:
                test_result["details"][
                    "error"
                ] = "Se requieren al menos 2 tanques para la prueba"
                return test_result
            print("📊 Estado inicial:")
            self._print_tank_levels(initial_tanks)

            if not self.start_simulation():
                test_result["details"]["error"] = "No se pudo iniciar la simulación"
                return test_result

            target_simulation_time = 30.0
            try:
                target_reached = self.wait_simulation_time(target_simulation_time)
                # One object list for all three readers: a consistent snapshot.
                final_objects = self.get_simulation_objects()
                final_tanks = self.get_tanks(final_objects)
                final_pumps = self.get_pumps(final_objects)
                final_pipes = self.get_pipes(final_objects)
                print("📊 Estado final:")
                self._print_tank_levels(final_tanks)
            finally:
                # Never leave the simulation running, even if a read failed.
                self.stop_simulation()

            mass_balance = self.calculate_mass_balance(initial_tanks, final_tanks)

            status = (
                self._extract_result(self.send_mcp_request("get_simulation_status"))
                or {}
            )
            actual_simulation_ms = status.get("simulation_elapsed_ms", 0)

            test_result["measurements"] = {
                "target_simulation_time_s": target_simulation_time,
                "actual_simulation_time_ms": actual_simulation_ms,
                "actual_simulation_time_s": actual_simulation_ms / 1000.0,
                "simulation_target_reached": target_reached,
                "initial_tanks": [asdict(tank) for tank in initial_tanks],
                "final_tanks": [asdict(tank) for tank in final_tanks],
                "pumps": [asdict(pump) for pump in final_pumps],
                "pipes": [asdict(pipe) for pipe in final_pipes],
                "mass_balance": mass_balance,
            }

            # Mass conservation with a 5-point tolerance.
            conservation_ok = (
                abs(mass_balance["conservation_percentage"] - 100.0) <= 5.0
            )
            # Some fluid must actually have moved.
            volume_change = any(
                abs(final.volume_l - initial.volume_l) > 1.0
                for initial, final in self._pair_tanks_by_id(
                    initial_tanks, final_tanks
                )
            )
            test_result["success"] = conservation_ok and volume_change
            test_result["details"] = {
                "conservation_percentage": mass_balance["conservation_percentage"],
                "volume_transfer_detected": volume_change,
                "mass_conserved": conservation_ok,
            }
            if test_result["success"]:
                print(
                    "✅ Prueba EXITOSA: Equilibrio de flujo funcionando correctamente"
                )
            else:
                print("❌ Prueba FALLIDA: Problemas en equilibrio de flujo")
        except Exception as e:
            # Test boundary: report the failure in the result instead of raising.
            test_result["details"]["error"] = str(e)
            print(f"❌ Error en prueba: {e}")
        return test_result

    def test_fluid_properties_consistency(self) -> Dict:
        """Check that tank fluid densities are plausible and pipes name a fluid.

        Primary densities must lie in [500, 2000] kg/m³ and secondary
        densities in [0.5, 2000] kg/m³; every pipe must report a fluid type
        other than ``"Unknown"`` or empty.
        """
        print("\n🧪 EJECUTANDO: Prueba de Consistencia de Propiedades de Fluidos")
        test_result = {
            "test_name": "fluid_properties_consistency",
            "description": "Verificar que las propiedades de fluidos se mantienen consistentes",
            "success": False,
            "details": {},
            "measurements": {},
        }
        try:
            objects = self.get_simulation_objects()
            tanks = self.get_tanks(objects)
            pipes = self.get_pipes(objects)
            if len(tanks) < 2:
                test_result["details"][
                    "error"
                ] = "Se requieren al menos 2 tanques para la prueba"
                return test_result

            density_checks = [
                {
                    "tank_name": tank.name,
                    "primary_density": tank.fluid_primary.density,
                    "secondary_density": tank.fluid_secondary.density,
                    "primary_ok": 500.0 <= tank.fluid_primary.density <= 2000.0,
                    "secondary_ok": 0.5 <= tank.fluid_secondary.density <= 2000.0,
                }
                for tank in tanks
            ]
            fluid_consistency = all(
                check["primary_ok"] and check["secondary_ok"]
                for check in density_checks
            )

            pipe_fluid_info = [
                {
                    "pipe_name": pipe.name,
                    "fluid_type": pipe.fluid_type,
                    # A null/empty value counts as missing, like "Unknown".
                    "has_fluid_info": bool(pipe.fluid_type)
                    and pipe.fluid_type != "Unknown",
                }
                for pipe in pipes
            ]
            test_result["measurements"] = {
                "density_checks": density_checks,
                "pipe_fluid_info": pipe_fluid_info,
            }
            # all() of an empty list is True, so "no pipes" passes this check.
            pipe_info_ok = all(info["has_fluid_info"] for info in pipe_fluid_info)

            test_result["success"] = fluid_consistency and pipe_info_ok
            test_result["details"] = {
                "fluid_densities_valid": fluid_consistency,
                "pipe_fluid_info_available": pipe_info_ok,
                "tanks_checked": len(tanks),
                "pipes_checked": len(pipes),
            }
            if test_result["success"]:
                print("✅ Prueba EXITOSA: Propiedades de fluidos consistentes")
            else:
                print("❌ Prueba FALLIDA: Inconsistencias en propiedades de fluidos")
        except Exception as e:
            # Test boundary: report the failure in the result instead of raising.
            test_result["details"]["error"] = str(e)
            print(f"❌ Error en prueba: {e}")
        return test_result

    def test_mixed_fluid_behavior(self) -> Dict:
        """Run the simulation for 10 s and inspect primary-fluid percentages.

        Records which tanks hold a mix (primary percentage strictly between 0
        and 100) and whether each tank's percentage moved by at most 10 points.
        """
        print("\n🧪 EJECUTANDO: Prueba de Comportamiento de Fluidos Mezclados")
        test_result = {
            "test_name": "mixed_fluid_behavior",
            "description": "Verificar comportamiento con fluidos primarios y secundarios",
            "success": False,
            "details": {},
            "measurements": {},
        }
        try:
            tanks = self.get_tanks()
            if len(tanks) < 2:
                test_result["details"][
                    "error"
                ] = "Se requieren al menos 2 tanques para la prueba"
                return test_result

            mixed_tanks = [
                {
                    "tank_name": tank.name,
                    "primary_percentage": tank.primary_percentage,
                    "has_mixed_fluid": 0.0 < tank.primary_percentage < 100.0,
                    "primary_fluid": tank.fluid_primary.name,
                    "secondary_fluid": tank.fluid_secondary.name,
                }
                for tank in tanks
            ]

            if not self.start_simulation():
                test_result["details"]["error"] = "No se pudo iniciar la simulación"
                return test_result
            try:
                self.wait_simulation_time(10.0)
                final_tanks = self.get_tanks()
            finally:
                # Never leave the simulation running, even if a read failed.
                self.stop_simulation()

            percentage_stability = []
            for initial, final in self._pair_tanks_by_id(tanks, final_tanks):
                percentage_change = abs(
                    final.primary_percentage - initial.primary_percentage
                )
                percentage_stability.append(
                    {
                        "tank_name": initial.name,
                        "initial_percentage": initial.primary_percentage,
                        "final_percentage": final.primary_percentage,
                        "change": percentage_change,
                        "stable": percentage_change <= 10.0,  # 10-point tolerance
                    }
                )

            test_result["measurements"] = {
                "mixed_tanks": mixed_tanks,
                "percentage_stability": percentage_stability,
            }
            has_mixed_fluids = any(tank["has_mixed_fluid"] for tank in mixed_tanks)
            percentages_stable = all(
                check["stable"] for check in percentage_stability
            )
            # NOTE(review): with `or`, any mixed-fluid tank makes the test
            # pass even when percentages are unstable.  Confirm whether the
            # stability check was meant to be mandatory.
            test_result["success"] = has_mixed_fluids or percentages_stable
            test_result["details"] = {
                "mixed_fluids_detected": has_mixed_fluids,
                "percentages_stable": percentages_stable,
                "tanks_with_mixed_fluids": sum(
                    1 for tank in mixed_tanks if tank["has_mixed_fluid"]
                ),
            }
            if test_result["success"]:
                print(
                    "✅ Prueba EXITOSA: Comportamiento de fluidos mezclados correcto"
                )
            else:
                print("❌ Prueba FALLIDA: Problemas con fluidos mezclados")
        except Exception as e:
            # Test boundary: report the failure in the result instead of raising.
            test_result["details"]["error"] = str(e)
            print(f"❌ Error en prueba: {e}")
        return test_result

    def run_all_tests(self) -> Dict:
        """Run every test in order and return the results with a summary.

        Returns:
            A dict with ``test_suite``, ``timestamp``, ``tests`` (one result
            dict per test) and ``summary`` (total, successful and failed
            counts plus ``success_rate`` as a percentage).
        """
        print("🚀 INICIANDO PRUEBAS DEL SISTEMA HIDRÁULICO")
        print("=" * 60)
        all_results = {
            "test_suite": "HydraulicSystemTests",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "tests": [],
            "summary": {},
        }
        tests = [
            self.test_basic_flow_equilibrium,
            self.test_fluid_properties_consistency,
            self.test_mixed_fluid_behavior,
        ]
        for test_func in tests:
            all_results["tests"].append(test_func())
        successful_tests = sum(
            1 for result in all_results["tests"] if result["success"]
        )

        all_results["summary"] = {
            "total_tests": len(tests),
            "successful_tests": successful_tests,
            "failed_tests": len(tests) - successful_tests,
            "success_rate": (successful_tests / len(tests)) * 100.0,
        }
        print("\n" + "=" * 60)
        print("📋 RESUMEN DE PRUEBAS")
        print(f"Total de pruebas: {all_results['summary']['total_tests']}")
        print(f"Exitosas: {all_results['summary']['successful_tests']}")
        print(f"Fallidas: {all_results['summary']['failed_tests']}")
        print(f"Tasa de éxito: {all_results['summary']['success_rate']:.1f}%")
        return all_results
def main():
    """Run the full hydraulic test suite and save the results as JSON.

    Returns:
        The dictionary returned by ``HydraulicTestManager.run_all_tests()``.
    """
    banner_rule = "=" * 50
    print("🔧 Sistema de Pruebas Hidráulicas para CtrEditor")
    print(banner_rule)

    suite_results = HydraulicTestManager().run_all_tests()

    # The Unix-timestamp suffix gives runs started in different seconds
    # distinct report files.
    output_name = f"hydraulic_test_results_{int(time.time())}.json"
    with open(output_name, "w", encoding="utf-8") as report:
        json.dump(suite_results, report, indent=2, ensure_ascii=False)

    print(f"\n💾 Resultados guardados en: {output_name}")
    return suite_results
# Run the suite only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()