CtrEditor/test_arrays_fix.py

212 lines
8.1 KiB
Python

#!/usr/bin/env python3
"""
Corrección completa de TSNet con arrays para initial_velocity
"""
import sys
import os
import numpy as np
def test_complete_arrays_fix():
"""
Prueba la corrección completa incluyendo arrays para initial_velocity
"""
print("=== CORRECCIÓN COMPLETA CON ARRAYS ===")
try:
import tsnet
import wntr
print(f"✓ TSNet version: {tsnet.__version__}")
print(f"✓ WNTR version: {wntr.__version__}")
except ImportError as e:
print(f"✗ Error al importar: {e}")
return False
# Usar archivo INP existente
temp_dir = r"c:\Users\migue\AppData\Local\Temp\TSNet"
inp_path = os.path.join(temp_dir, "network_20250912_003944.inp")
if not os.path.exists(inp_path):
print(f"✗ Archivo INP no encontrado: {inp_path}")
return False
try:
print("\n=== APLICANDO TODAS LAS CORRECCIONES ===")
# PASO 1: Cargar modelo
print("1. Cargando modelo TSNet...")
tm = tsnet.network.TransientModel(inp_path)
print("✓ Modelo cargado")
# PASO 2: Corrección división por cero
print("\n2. Aplicando corrección división por cero...")
if hasattr(tm, "simulation_period") and tm.simulation_period <= 0:
tm.simulation_period = 1.0
print("✓ simulation_period = 1.0")
if hasattr(tm, "time_step") and tm.time_step <= 0:
tm.time_step = 0.1
print("✓ time_step = 0.1")
if tm.time_step >= tm.simulation_period:
tm.time_step = tm.simulation_period / 10.0
print(f"✓ time_step ajustado = {tm.time_step}")
print(f"simulation_period final = {tm.simulation_period}")
print(f"time_step final = {tm.time_step}")
# PASO 3: Corrección COMPLETA (pipes + bombas)
print("\n3. Aplicando corrección completa de pipes y bombas...")
if hasattr(tm, "pipe_name_list") and hasattr(tm, "get_link"):
pipe_names = tm.pipe_name_list
print(f"Pipes encontrados: {pipe_names}")
for pipe_name in pipe_names:
pipe_obj = tm.get_link(pipe_name)
print(f"\n Pipe {pipe_name}:")
# Obtener longitud del pipe
pipe_length = getattr(pipe_obj, "length", 1.0)
dx = 0.1
num_segments = int(pipe_length / dx) + 1
print(f" Longitud: {pipe_length}, Segmentos: {num_segments}")
# Corrección initial_head (TAMBIÉN array, no escalar)
if not hasattr(pipe_obj, "initial_head"):
pipe_obj.initial_head = np.zeros(num_segments)
print(f" ✓ initial_head = array({num_segments})")
else:
existing_head = pipe_obj.initial_head
if isinstance(existing_head, (int, float)):
pipe_obj.initial_head = np.zeros(num_segments)
print(f" ✓ initial_head convertido a array({num_segments})")
else:
print(f" ✓ initial_head ya es array")
# Corrección initial_velocity (array)
if not hasattr(pipe_obj, "initial_velocity"):
pipe_obj.initial_velocity = np.zeros(num_segments)
print(f" ✓ initial_velocity = array({num_segments})")
else:
existing_vel = pipe_obj.initial_velocity
if isinstance(existing_vel, (int, float)):
pipe_obj.initial_velocity = np.zeros(num_segments)
print(
f" ✓ initial_velocity convertido a array({num_segments})"
)
else:
print(f" ✓ initial_velocity ya es array")
# Corrección wavev (velocidad de onda acústica)
if not hasattr(pipe_obj, "wavev"):
pipe_obj.wavev = 1000.0 # Velocidad de onda típica en agua (m/s)
print(f" ✓ wavev = 1000.0 m/s (velocidad onda acústica)")
# Corrección number_of_segments
if not hasattr(pipe_obj, "number_of_segments"):
pipe_obj.number_of_segments = num_segments
print(f" ✓ number_of_segments = {num_segments}")
# Corrección roughness_height desde roughness
if not hasattr(pipe_obj, "roughness_height") and hasattr(
pipe_obj, "roughness"
):
pipe_obj.roughness_height = pipe_obj.roughness
print(
f" ✓ roughness_height = {pipe_obj.roughness} (desde roughness)"
)
elif not hasattr(pipe_obj, "roughness_height"):
pipe_obj.roughness_height = 0.001 # Valor por defecto razonable
print(f" ✓ roughness_height = 0.001 (valor por defecto)")
# CORRECCIÓN DE BOMBAS: curve_coef
if hasattr(tm, "pump_name_list") and hasattr(tm, "get_link"):
pump_names = tm.pump_name_list
if pump_names:
print(f"\nBombas encontradas: {pump_names}")
for pump_name in pump_names:
pump_obj = tm.get_link(pump_name)
print(f"\n Bomba {pump_name}:")
# Corrección curve_coef desde _curve_coeffs
if not hasattr(pump_obj, "curve_coef") and hasattr(
pump_obj, "_curve_coeffs"
):
pump_obj.curve_coef = pump_obj._curve_coeffs
print(f" ✓ curve_coef asignado desde _curve_coeffs")
elif hasattr(pump_obj, "curve_coef"):
print(f" ✓ curve_coef ya existe")
else:
print(
f" ⚠️ No se encontró _curve_coeffs para asignar curve_coef"
)
else:
print("No se encontraron bombas en el modelo")
# PASO 4: Simulación
print(f"\n4. Ejecutando simulación TSNet...")
try:
results = tsnet.simulation.MOCSimulator(
tm, results_obj="results", friction="steady"
)
print("🎉 ¡SIMULACIÓN TSNET COMPLETAMENTE EXITOSA!")
print("✅ Todas las correcciones aplicadas correctamente")
print("✅ TSNet funciona sin fallback")
print("✅ Division by zero: SOLUCIONADO")
print("✅ initial_head: SOLUCIONADO")
print("✅ initial_velocity arrays: SOLUCIONADO")
return True
except Exception as tsnet_error:
print(f"✗ Error en TSNet: {tsnet_error}")
# Análisis del error
error_str = str(tsnet_error)
if "initial_head" in error_str:
print("❌ Error de initial_head persistente")
elif "initial_velocity" in error_str:
print("❌ Error de initial_velocity persistente")
elif "division by zero" in error_str:
print("❌ Error de división por cero persistente")
elif "subscriptable" in error_str:
print("❌ Error de indexing - verificar arrays")
else:
print(f"❌ Nuevo error: {error_str}")
print("\n--- TRACEBACK DETALLADO ---")
import traceback
traceback.print_exc()
return False
except Exception as e:
print(f"✗ Error general: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Función principal"""
success = test_complete_arrays_fix()
if success:
print("\n🎉 ¡CORRECCIÓN PERFECTA!")
print("TSNet funciona al 100% sin necesidad de fallback")
print("Listo para implementar en CtrEditor")
else:
print("\n⚠️ NECESITA MÁS CORRECCIONES")
print("Revisar el error específico para la siguiente iteración")
return success
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)