Compare commits

...

6 Commits

Author SHA1 Message Date
Miguel 835bfc0383 Mejorado de la simulacion de eventos con error 2025-05-30 23:01:16 +02:00
Miguel a28f02b4c9 Add TCP-Server support and enhance connection management
- Implement TCP-Server connection type in ConfigManager and ConnectionManager.
- Update MaselliApp to handle TCP-Server mode, including UI adjustments.
- Modify NetComTab to prevent operation in TCP-Server mode.
- Enhance SimulatorTab to manage client connections and data sending for TCP-Server.
- Update trace_tab.py to ensure compatibility with TCP-Server.
- Improve logging functionality in utils.py to limit log lines.
2025-05-26 16:12:57 +02:00
Miguel 86669fc94c Add method to create ADAM message directly from mA value and update simulator tab to use it 2025-05-23 23:18:32 +02:00
Miguel 0caea3ddc0 Agregado de Volt a Valores Actuales 2025-05-23 12:34:43 +02:00
Miguel 1266fa705d Enhance NetCom configuration options and improve data handling
- Added support for additional NetCom parameters: data bits, parity, stop bits, and flow control options (RTS/CTS, DSR/DTR, XON/XOFF).
- Updated configuration loading and saving to include new parameters.
- Improved data handling in NetComTab for better byte management and logging.
- Refactored message parsing and sending to accommodate new data types and formats.
2025-05-23 12:20:38 +02:00
Miguel e1c9199cb5 Modularizacion y Agregado de NetCom 2025-05-23 10:15:04 +02:00
18 changed files with 3424 additions and 1214 deletions

97
.doc/CAMBIOS.md Normal file
View File

@ -0,0 +1,97 @@
# Resumen de Cambios y Nueva Estructura
## Estructura Modular Creada
El proyecto ha sido dividido en los siguientes módulos:
### Módulos Principales:
1. **main.py** - Punto de entrada de la aplicación
2. **maselli_app.py** - Aplicación principal y coordinación de la GUI
3. **protocol_handler.py** - Manejo del protocolo ADAM/Maselli
4. **connection_manager.py** - Gestión unificada de conexiones (Serial/TCP/UDP)
5. **config_manager.py** - Gestión de configuración y persistencia
6. **utils.py** - Utilidades comunes
### Módulos de Tabs:
1. **tabs/simulator_tab.py** - Lógica del simulador
2. **tabs/trace_tab.py** - Lógica del trace
3. **tabs/netcom_tab.py** - Nueva funcionalidad de gateway
## Nuevas Características Implementadas:
### 1. Tiempo de Ciclo Completo (Simulador)
- Reemplaza el período entre muestras por un tiempo total de ciclo más intuitivo
- Campo "Tiempo Ciclo (s)" configura la duración total de un ciclo completo
- Campo "Muestras/ciclo" permite ajustar la resolución
- El período entre muestras se calcula automáticamente
### 2. Gráfico de Trace Mejorado
- Ahora muestra tanto Brix (eje Y izquierdo, azul) como mA (eje Y derecho, rojo)
- Marcadores diferentes para cada línea (círculos para Brix, cuadrados para mA)
- Actualización en tiempo real de ambos valores
### 3. Modo NetCom (Gateway)
- Nueva pestaña para funcionalidad de bridge/gateway
- Conecta un puerto COM físico con una conexión TCP/UDP
- Log tipo sniffer que muestra:
- Dirección del tráfico (COM→NET o NET→COM)
- Datos raw con caracteres de control visibles
- Parseo opcional de mensajes ADAM
- Estadísticas de transferencias
- Filtros configurables para el log
- Colores diferenciados por tipo de mensaje
## Mejoras Adicionales:
1. **Mejor Organización del Código**
- Separación clara de responsabilidades
- Código más mantenible y extensible
- Fácil agregar nuevas funcionalidades
2. **Gestión de Configuración Mejorada**
- Validación de parámetros
- Migración automática de configuraciones antiguas
- Valores por defecto sensatos
3. **Manejo de Conexiones Unificado**
- Clase ConnectionManager centraliza toda la lógica de comunicación
- Soporte consistente para Serial, TCP y UDP
- Mejor manejo de errores y timeouts
4. **Interfaz de Usuario Mejorada**
- Configuración compartida visible en todo momento
- Estados visuales claros (colores en NetCom)
- Estadísticas en tiempo real
## Archivos de Soporte:
- **README.md** - Documentación completa actualizada
- **requirements.txt** - Dependencias del proyecto
- **.gitignore** - Para control de versiones
- **run.bat** - Script de inicio fácil para Windows
- **maselli_simulator_config.json** - Configuración de ejemplo
## Cómo Ejecutar:
1. Navegar al directorio del proyecto:
```
cd D:\Proyectos\Scripts\Siemens\MaselliSimulatorApp
```
2. Instalar dependencias (solo la primera vez):
```
pip install -r requirements.txt
```
3. Ejecutar la aplicación:
```
python main.py
```
O simplemente doble clic en `run.bat`
## Notas de Migración:
- La configuración existente se migrará automáticamente
- El campo "period" se convierte a "cycle_time"
- Los valores de configuración de NetCom tienen valores por defecto
La aplicación mantiene toda la funcionalidad original y agrega las nuevas características solicitadas de manera integrada.

167
.doc/README.md Normal file
View File

@ -0,0 +1,167 @@
# Maselli Protocol Simulator/Tracer/NetCom Gateway
## Descripción General
Aplicación de escritorio basada en Python para simular, monitorear y hacer bridge de dispositivos que utilizan el protocolo ADAM/Maselli. La aplicación soporta comunicación Serial (RS485/RS232), TCP y UDP, proporcionando una interfaz gráfica intuitiva construida con Tkinter.
## Características Principales
### 1. **Modo Simulador**
- Emula un dispositivo Maselli enviando paquetes de datos en protocolo ADAM
- Patrones de generación de datos:
- **Lineal**: Onda triangular entre valores mínimo y máximo
- **Sinusoidal**: Onda sinusoidal suave
- **Manual**: Envío de valores individuales mediante slider o entrada directa
- **Tiempo de ciclo configurable**: Define el tiempo total para completar un ciclo de simulación
- Muestras por ciclo ajustables para control fino de la resolución
- Visualización en tiempo real de valores Brix y mA
- Gráfico dual con ejes Y independientes para Brix (azul) y mA (rojo)
### 2. **Modo Trace**
- Escucha y registra datos entrantes de dispositivos Maselli reales
- Parseo automático de mensajes del protocolo ADAM
- Conversión mA ↔ Brix basada en mapeo configurable
- Registro de datos en archivo CSV con campos:
- Timestamp
- Dirección ADAM
- Valor mA
- Valor Brix calculado
- Validez del checksum
- Mensaje raw
- **Gráfico mejorado**: Muestra tanto Brix como mA en tiempo real
- Estadísticas de mensajes recibidos y errores de checksum
### 3. **Modo NetCom (Gateway)**
- Actúa como puente transparente entre:
- Un puerto COM físico (configurable)
- Una conexión de red (TCP/UDP usando la configuración compartida)
- **Función Sniffer**:
- Log detallado de todo el tráfico en ambas direcciones
- Identificación visual de la dirección del tráfico (COM→NET, NET→COM)
- Parseo opcional de mensajes ADAM para mostrar valores interpretados
- Filtros de visualización configurables
- Estadísticas de transferencias y errores
## Estructura Modular
```
MaselliSimulatorApp/
├── main.py # Punto de entrada
├── maselli_app.py # Aplicación principal y GUI
├── protocol_handler.py # Manejo del protocolo ADAM
├── connection_manager.py # Gestión de conexiones
├── config_manager.py # Gestión de configuración
├── utils.py # Utilidades comunes
└── tabs/
├── __init__.py
├── simulator_tab.py # Lógica del simulador
├── trace_tab.py # Lógica del trace
└── netcom_tab.py # Lógica del gateway
```
## Protocolo ADAM
Formato de mensaje:
```
#AA[valor_mA][checksum]\r
```
- `#`: Carácter inicial (opcional en respuestas)
- `AA`: Dirección del dispositivo (2 caracteres)
- `valor_mA`: Valor en mA (6 caracteres, formato XX.XXX)
- `checksum`: Suma de verificación (2 caracteres hex)
- `\r`: Carácter de fin
## Requisitos
- Python 3.7+
- Bibliotecas requeridas:
```bash
pip install pyserial matplotlib tkinter
```
## Instalación y Uso
1. Clonar o descargar el proyecto
2. Instalar dependencias:
```bash
pip install -r requirements.txt
```
3. Ejecutar la aplicación:
```bash
python main.py
```
## Configuración
### Parámetros de Conexión
- **Serial**: Puerto COM y velocidad de baudios
- **TCP/UDP**: Dirección IP y puerto
### Mapeo Brix ↔ mA
- **Min Brix [4mA]**: Valor Brix correspondiente a 4mA
- **Max Brix [20mA]**: Valor Brix correspondiente a 20mA
- Interpolación lineal para valores intermedios
### Configuración del Simulador
- **Dirección ADAM**: 2 caracteres (ej: "01")
- **Tiempo de ciclo**: Duración total de un ciclo completo de simulación
- **Muestras/ciclo**: Número de puntos por ciclo (resolución)
### Configuración NetCom
- **Puerto COM físico**: Puerto para el dispositivo real
- **Baud Rate**: Velocidad del puerto COM físico
## Archivos Generados
- `maselli_simulator_config.json`: Configuración guardada
- `maselli_trace_YYYYMMDD_HHMMSS.csv`: Datos capturados en modo Trace
## Iconos
La aplicación buscará automáticamente archivos de icono en el directorio raíz:
- `icon.png` (recomendado)
- `icon.ico` (Windows)
- `icon.gif`
## Uso Típico
### Como Simulador
1. Configurar tipo de conexión y parámetros
2. Seleccionar función de simulación (Lineal/Sinusoidal)
3. Ajustar tiempo de ciclo según necesidad
4. Iniciar simulación
### Como Monitor (Trace)
1. Configurar conexión según el dispositivo a monitorear
2. Iniciar Trace
3. Los datos se mostrarán en tiempo real y se guardarán en CSV
### Como Gateway (NetCom)
1. Configurar puerto COM del dispositivo físico
2. Configurar conexión de red destino
3. Iniciar Gateway
4. Monitorear el tráfico bidireccional en el log
## Notas de Desarrollo
- La aplicación usa threading para operaciones de comunicación sin bloquear la GUI
- Los gráficos se actualizan mediante matplotlib animation
- El protocolo ADAM es parseado con validación de checksum opcional
- Todos los módulos están diseñados para ser reutilizables
## Mejoras Respecto a la Versión Original
1. **Arquitectura modular**: Código dividido en módulos especializados
2. **Tiempo de ciclo configurable**: Control más intuitivo de la velocidad de simulación
3. **Gráfico de Trace mejorado**: Visualización dual de Brix y mA
4. **Modo NetCom**: Nueva funcionalidad de gateway/bridge con sniffer integrado
5. **Mejor manejo de errores**: Validación robusta y recuperación de errores
6. **Estadísticas detalladas**: Contadores de mensajes, errores y transferencias
## Licencia
Este proyecto es de código abierto. Úselo bajo su propia responsabilidad.
## Autor
Desarrollado para monitoreo y simulación de dispositivos Maselli con protocolo ADAM.

188
.gitignore vendored
View File

@ -1,181 +1,33 @@
# Byte-compiled / optimized / DLL files # Python
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class
# C extensions
*.so *.so
# Distribution / packaging
.Python .Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/ env/
venv/ venv/
ENV/ ENV/
env.bak/ .venv
venv.bak/
# Spyder project settings # IDEs
.spyderproject .vscode/
.spyproject .idea/
*.swp
*.swo
# Rope project settings # Archivos generados
.ropeproject *.csv
*.log
# mkdocs documentation # Archivos de configuración local (opcional, quitar si quieres versionar la config)
/site # maselli_simulator_config.json
# mypy # Sistema
.mypy_cache/ .DS_Store
.dmypy.json Thumbs.db
dmypy.json desktop.ini
# Pyre type checker # Iconos (si son específicos del usuario)
.pyre/ # icon.png
# icon.ico
# pytype static type analyzer # icon.gif
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Cursor
# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to
# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data
# refer to https://docs.cursor.com/context/ignore-files
.cursorignore
.cursorindexingignore

File diff suppressed because it is too large Load Diff

169
config_manager.py Normal file
View File

@ -0,0 +1,169 @@
"""
Gestor de configuración para guardar y cargar ajustes
"""
import json
import os
class ConfigManager:
def __init__(self, config_file="maselli_simulator_config.json"):
self.config_file = config_file
self.default_config = {
'connection_type': 'TCP',
'com_port': 'COM8',
'baud_rate': '115200',
'ip_address': '10.1.33.18',
'port': '8899',
'adam_address': '01',
'function_type': 'Sinusoidal',
'min_brix_map': '0',
'max_brix_map': '80',
'cycle_time': '0.5',
'manual_input_type': 'Brix', # Nuevo: 'Brix', 'mA', 'Voltaje'
'manual_value': '10.0', # Nuevo: valor correspondiente al manual_input_type
'random_error_interval': '10.0', # Intervalo para errores aleatorios en el simulador
# Configuración para NetCom
'netcom_com_port': 'COM3',
'netcom_baud_rate': '115200',
'netcom_bytesize': 8, # Data bits (5, 6, 7, 8)
'netcom_parity': 'N', # Parity ('N', 'E', 'O', 'M', 'S')
'netcom_stopbits': 1, # Stop bits (1, 1.5, 2)
'netcom_rtscts': False, # Hardware flow control RTS/CTS
'netcom_dsrdtr': False, # Hardware flow control DSR/DTR
'netcom_xonxoff': False, # Software flow control XON/XOFF
'netcom_bridge_delay': 0.001 # Delay in seconds for the bridge polling loop
}
def save_config(self, config_data):
"""Guarda la configuración en archivo JSON"""
try:
with open(self.config_file, 'w') as f:
json.dump(config_data, f, indent=4)
return True
except Exception as e:
print(f"Error al guardar configuración: {e}")
return False
def load_config(self):
"""Carga la configuración desde archivo JSON"""
if not os.path.exists(self.config_file):
return self.default_config.copy()
try:
with open(self.config_file, 'r') as f:
config = json.load(f)
# Asegurarse de que todas las claves necesarias estén presentes
for key, value in self.default_config.items():
if key not in config:
config[key] = value
# Migrar 'period' a 'cycle_time' si existe
if 'period' in config and 'cycle_time' not in config:
config['cycle_time'] = config['period']
del config['period']
# Migrar 'manual_brix' a 'manual_input_type' y 'manual_value'
if 'manual_brix' in config and 'manual_value' not in config:
config['manual_value'] = config['manual_brix']
config['manual_input_type'] = config.get('manual_input_type', 'Brix') # Asumir Brix si no existe
del config['manual_brix']
return config
except Exception as e:
print(f"Error al cargar configuración: {e}")
return self.default_config.copy()
def get_connection_params(self, config, use_netcom_port=False):
"""Extrae los parámetros de conexión de la configuración"""
conn_type = config.get('connection_type', 'Serial')
if conn_type == "Serial":
if use_netcom_port:
# Para NetCom, usar el puerto COM físico dedicado
return {
'port': config.get('netcom_com_port', 'COM3'),
'baud': int(config.get('netcom_baud_rate', '115200'))
}
else:
return {
'port': config.get('com_port', 'COM3'),
'baud': int(config.get('baud_rate', '115200'))
}
elif conn_type == "TCP-Server":
# Para TCP-Server, la IP es implícitamente '0.0.0.0' (escuchar en todas las interfaces)
# Solo necesitamos el puerto para el bind.
return {
'port': int(config.get('port', '8899')) # Usa el mismo campo de puerto
}
else:
return {
'ip': config.get('ip_address', '192.168.1.100'),
'port': int(config.get('port', '502'))
}
def validate_config(self, config):
"""Valida que la configuración tenga valores correctos"""
errors = []
# Validar dirección ADAM
adam_address = config.get('adam_address', '')
if len(adam_address) != 2:
errors.append("La dirección ADAM debe tener exactamente 2 caracteres")
# Validar rango de Brix
try:
min_brix = float(config.get('min_brix_map', '0'))
max_brix = float(config.get('max_brix_map', '80'))
if min_brix >= max_brix:
errors.append("El valor mínimo de Brix debe ser menor que el máximo")
except ValueError:
errors.append("Los valores de Brix deben ser números válidos")
# Validar tiempo de ciclo
try:
cycle_time = float(config.get('cycle_time', '1.0'))
if cycle_time <= 0:
errors.append("El tiempo de ciclo debe ser mayor que 0")
except ValueError:
errors.append("El tiempo de ciclo debe ser un número válido")
# Validar valor manual
try:
manual_value = float(config.get('manual_value', '0'))
# Aquí se podrían añadir validaciones de rango según el manual_input_type
except ValueError:
errors.append("El valor manual debe ser un número válido")
# Validar intervalo de errores aleatorios del simulador
try:
random_error_interval = float(config.get('random_error_interval', '10.0'))
if random_error_interval <= 0:
errors.append("El intervalo para errores aleatorios debe ser un número positivo.")
except ValueError:
errors.append("El intervalo para errores aleatorios debe ser un número válido.")
# Validar puerto serie
if config.get('connection_type') == 'Serial':
com_port = config.get('com_port', '')
if not com_port.upper().startswith('COM'):
errors.append("El puerto COM debe tener formato 'COMx'")
try:
baud_rate = int(config.get('baud_rate', '9600'))
if baud_rate <= 0:
errors.append("La velocidad de baudios debe ser mayor que 0")
except ValueError:
errors.append("La velocidad de baudios debe ser un número entero")
# Validar configuración TCP/UDP/TCP-Server
else:
try:
port = int(config.get('port', '502'))
if port <= 0 or port > 65535:
errors.append("El puerto debe estar entre 1 y 65535")
except ValueError:
errors.append("El puerto debe ser un número entero")
return errors

345
connection_manager.py Normal file
View File

@ -0,0 +1,345 @@
"""
Gestor de conexiones para Serial, TCP y UDP
"""
import serial
import socket
import time
class ConnectionManagerError(Exception):
"""Clase base para excepciones de ConnectionManager."""
pass
class ClientDisconnectedError(ConnectionManagerError):
"""Excepción personalizada para cuando un cliente TCP se desconecta."""
pass
class NoClientConnectedError(ConnectionManagerError):
"""Excepción personalizada para cuando se intenta enviar datos sin un cliente TCP conectado."""
pass
class ConnectionManager:
def __init__(self):
self.connection = None # Para Serial (Serial obj), TCP Client (socket), UDP (socket)
self.server_socket = None # Para TCP Server (listening socket)
self.client_socket = None # Para TCP Server (accepted client connection)
self.client_address = None # Para TCP Server (address of accepted client)
self.connection_type = None
self.dest_address = None # Para UDP
self.ClientDisconnectedError = ClientDisconnectedError
self.NoClientConnectedError = NoClientConnectedError
def open_connection(self, conn_type, conn_params):
"""Abre una conexión según el tipo especificado"""
try:
listening_info = None # Información sobre dónde está escuchando el servidor
if conn_type == "Serial":
self.connection = serial.Serial(
port=conn_params['port'],
baudrate=conn_params['baudrate'], # Standard pyserial parameter name
timeout=conn_params.get('timeout', 1),
bytesize=conn_params.get('bytesize', serial.EIGHTBITS), # Use provided or default
parity=conn_params.get('parity', serial.PARITY_NONE), # Use provided or default
stopbits=conn_params.get('stopbits', serial.STOPBITS_ONE), # Use provided or default
xonxoff=conn_params.get('xonxoff', False),
rtscts=conn_params.get('rtscts', False),
dsrdtr=conn_params.get('dsrdtr', False)
)
self.connection_type = "Serial"
elif conn_type == "TCP":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5.0)
sock.connect((conn_params['ip'], conn_params['port']))
self.connection = sock
self.connection_type = "TCP"
elif conn_type == "UDP":
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1.0)
self.dest_address = (conn_params['ip'], conn_params['port'])
self.connection = sock
self.connection_type = "UDP"
elif conn_type == "TCP-Server":
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# conn_params para TCP-Server solo debería tener 'port'
# listen_ip_param es la IP que se usará para bind (e.g., '0.0.0.0' o una IP específica)
listen_ip_param = conn_params.get('ip', '0.0.0.0')
listen_port = conn_params['port']
actual_listen_description = ""
if listen_ip_param == '0.0.0.0':
try:
hostname = socket.gethostname()
ip_list = socket.gethostbyname_ex(hostname)[2] # Direcciones IPv4
non_loopback_ips = [ip for ip in ip_list if not ip.startswith("127.")]
display_ips = non_loopback_ips if non_loopback_ips else ip_list
if display_ips:
actual_listen_description = f"en IPs: {', '.join(display_ips)} en puerto {listen_port}"
else:
actual_listen_description = f"en todas las interfaces (0.0.0.0) puerto {listen_port} (no se pudieron determinar IPs específicas)"
except socket.gaierror:
actual_listen_description = f"en todas las interfaces (0.0.0.0) puerto {listen_port} (error al resolver hostname para IPs específicas)"
except Exception as e_get_ip:
print(f"Advertencia: Error obteniendo IPs locales: {e_get_ip}")
actual_listen_description = f"en todas las interfaces (0.0.0.0) puerto {listen_port} (error obteniendo IPs locales)"
else: # Se especificó una IP para escuchar
actual_listen_description = f"en IP: {listen_ip_param}:{listen_port}"
self.server_socket.bind((listen_ip_param, listen_port))
self.server_socket.listen(1) # Escuchar hasta 1 conexión en cola
self.connection_type = "TCP-Server"
self.connection = self.server_socket # self.connection apunta al socket principal
listening_info = f"TCP Server escuchando {actual_listen_description}"
print(listening_info) # Log para la consola
return self.connection, listening_info
except Exception as e:
raise Exception(f"Error al abrir conexión {conn_type}: {e}")
def close_connection(self):
"""Cierra la conexión actual"""
try:
if self.connection_type == "Serial":
if self.connection and self.connection.is_open:
self.connection.close()
elif self.connection_type in ["TCP", "UDP"]:
if self.connection:
self.connection.close()
elif self.connection_type == "TCP-Server":
if self.client_socket:
try: self.client_socket.close()
except Exception as e_client: print(f"Error cerrando client_socket: {e_client}")
self.client_socket = None
self.client_address = None
if self.server_socket: # server_socket es self.connection en este modo
try: self.server_socket.close()
except Exception as e_server: print(f"Error cerrando server_socket: {e_server}")
self.server_socket = None
self.connection = None # Asegurar que self.connection también se limpie
except Exception as e:
print(f"Error al cerrar conexión: {e}")
finally:
self.connection = None
self.connection_type = None
# No limpiar server_socket, client_socket aquí, ya se hizo arriba si era TCP-Server
self.dest_address = None
def send_data(self, data_bytes):
"""Envía datos por la conexión actual"""
if not self.connection:
raise Exception("No hay conexión activa")
data_to_send = None
if isinstance(data_bytes, str):
# Esto no debería suceder si el llamador (NetComTab, SimulatorTab) funciona como se espera.
# Loguear una advertencia e intentar codificar como último recurso.
print(f"ADVERTENCIA: ConnectionManager.send_data recibió str, se esperaba bytes. Intentando codificar a ASCII. Datos: {data_bytes!r}")
try:
data_to_send = data_bytes.encode('ascii')
except UnicodeEncodeError as uee:
print(f"ERROR CRÍTICO: No se pudo codificar la cadena (str) a ASCII antes de enviar: {uee}. Datos: {data_bytes!r}")
# Elevar una excepción clara porque no se puede continuar si la codificación falla.
raise Exception(f"Error al enviar datos: la cadena no pudo ser codificada a ASCII: {uee}") from uee
elif isinstance(data_bytes, (bytes, bytearray)):
data_to_send = data_bytes # Ya es bytes o bytearray (que .write/.send aceptan)
else:
# Si no es ni str ni bytes/bytearray, es un error de tipo fundamental.
print(f"ERROR CRÍTICO: ConnectionManager.send_data recibió un tipo inesperado: {type(data_bytes)}. Se esperaba bytes. Datos: {data_bytes!r}")
raise TypeError(f"Error al enviar datos: se esperaba un objeto tipo bytes, pero se recibió {type(data_bytes)}")
try:
if self.connection_type == "Serial":
self.connection.write(data_to_send)
elif self.connection_type == "TCP":
self.connection.send(data_to_send)
elif self.connection_type == "UDP":
self.connection.sendto(data_to_send, self.dest_address)
elif self.connection_type == "TCP-Server":
if self.client_socket:
try:
self.client_socket.sendall(data_to_send) # sendall es más robusto
except (socket.error, BrokenPipeError, ConnectionResetError) as e_send:
print(f"TCP Server: Cliente desconectado durante el envío: {e_send}")
self.reset_client_connection() # Limpiar el socket del cliente
raise self.ClientDisconnectedError(f"Cliente desconectado: {e_send}") from e_send
else:
# Opción: ser silencioso si no hay cliente, o lanzar NoClientConnectedError
# print(f"TCP Server: No hay cliente conectado, datos no enviados: {data_to_send!r}")
pass # No enviar si no hay cliente, la simulación puede continuar
except self.ClientDisconnectedError: # Permitir que esta pase tal cual
raise
except self.NoClientConnectedError: # Permitir que esta pase tal cual
raise
except Exception as e:
# Solo envolver otras excepciones no manejadas específicamente
raise Exception(f"Error al enviar datos ({self.connection_type}): {e}") from e
def read_response(self, timeout=0.5):
"""Intenta leer una respuesta del dispositivo"""
if not self.connection:
return None
try:
response = None
if self.connection_type == "Serial":
# Guardar timeout original
original_timeout = self.connection.timeout
self.connection.timeout = timeout
# Esperar un poco para que llegue la respuesta
time.sleep(0.05)
# Leer todos los bytes disponibles
response_bytes = b""
start_time = time.time()
while (time.time() - start_time) < timeout:
if self.connection.in_waiting > 0:
response_bytes += self.connection.read(self.connection.in_waiting)
# Si encontramos un terminador, salir
if b'\r' in response_bytes or b'\n' in response_bytes:
break
else:
time.sleep(0.01)
if response_bytes:
response = response_bytes.decode('ascii', errors='ignore')
self.connection.timeout = original_timeout
elif self.connection_type == "TCP":
self.connection.settimeout(timeout)
try:
response = self.connection.recv(1024).decode('ascii', errors='ignore')
except socket.timeout:
pass
elif self.connection_type == "UDP":
self.connection.settimeout(timeout)
try:
response, addr = self.connection.recvfrom(1024)
response = response.decode('ascii', errors='ignore')
except socket.timeout:
pass
elif self.connection_type == "TCP-Server":
if self.client_socket:
self.client_socket.settimeout(timeout)
try:
response_bytes = self.client_socket.recv(1024)
if response_bytes:
response = response_bytes.decode('ascii', errors='ignore')
else: # Cliente cerró conexión
self.reset_client_connection()
print("TCP Server: Cliente cerró conexión durante lectura.")
except socket.timeout:
pass # Sin datos en timeout
except (socket.error, ConnectionResetError) as e_read:
print(f"TCP Server: Error leyendo de cliente o cliente desconectado: {e_read}")
self.reset_client_connection()
else:
# Sin cliente conectado, no hay respuesta posible
pass
return response
except Exception as e:
print(f"Error al leer respuesta: {e}")
return None
def read_data_non_blocking(self):
"""Lee datos disponibles sin bloquear (para modo trace y netcom)"""
if not self.connection:
return None
try:
data = None
if self.connection_type == "Serial":
if self.connection.in_waiting > 0:
data = self.connection.read(self.connection.in_waiting) # Returns bytes
elif self.connection_type == "TCP":
self.connection.settimeout(0.1)
try:
data = self.connection.recv(1024) # Returns bytes
if not data: # Conexión cerrada
return None
except socket.timeout:
pass
elif self.connection_type == "UDP":
self.connection.settimeout(0.1)
try:
data, addr = self.connection.recvfrom(1024)
# data is already bytes
except socket.timeout:
pass
elif self.connection_type == "TCP-Server":
if self.client_socket:
self.client_socket.settimeout(0.01) # Timeout muy corto para no bloquear
try:
data = self.client_socket.recv(1024) # Retorna bytes
if not data: # Cliente cerró conexión
self.reset_client_connection()
print("TCP Server: Cliente cerró conexión (lectura no bloqueante).")
return None # Indicar que la conexión se cerró
except socket.timeout:
pass # Sin datos disponibles
except (socket.error, ConnectionResetError) as e_read_nb:
print(f"TCP Server: Error leyendo (no bloqueante) o cliente desconectado: {e_read_nb}")
self.reset_client_connection()
data = None # Error
return data
except Exception as e:
print(f"Error al leer datos: {e}")
return None
def is_connected(self):
"""Verifica si hay una conexión activa"""
if self.connection_type == "Serial":
return self.connection and self.connection.is_open
elif self.connection_type == "TCP-Server":
# "Conectado" significa que el servidor está escuchando.
# Para enviar datos, is_client_connected() es más relevante.
return self.server_socket is not None
else:
return self.connection is not None
def accept_client(self, timeout=None):
"""Acepta una conexión de cliente (solo para modo TCP-Server)."""
if self.connection_type != "TCP-Server" or not self.server_socket:
return False
if self.client_socket: # Ya hay un cliente conectado
return True
original_timeout = self.server_socket.gettimeout()
self.server_socket.settimeout(timeout)
try:
self.client_socket, self.client_address = self.server_socket.accept()
self.client_socket.settimeout(None) # Volver a modo bloqueante para send/recv en client_socket
print(f"TCP Server: Cliente conectado desde {self.client_address}")
self.server_socket.settimeout(original_timeout)
return True
except socket.timeout:
self.server_socket.settimeout(original_timeout)
return False
except Exception as e:
print(f"TCP Server: Error aceptando cliente: {e}")
self.server_socket.settimeout(original_timeout)
return False
def is_client_connected(self): # Específico para TCP-Server
return self.connection_type == "TCP-Server" and self.client_socket is not None
def reset_client_connection(self): # Específico para TCP-Server
if self.client_socket:
try: self.client_socket.close()
except Exception: pass
self.client_socket = None
self.client_address = None
print("TCP Server: Conexión con cliente reseteada.")

21
main.py Normal file
View File

@ -0,0 +1,21 @@
"""
Punto de entrada principal para la aplicación Maselli Simulator/Trace/NetCom
"""
import tkinter as tk
import sys
import os
# Agregar el directorio actual al path para importaciones
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from maselli_app import MaselliApp
def main():
"""Función principal que inicia la aplicación"""
root = tk.Tk()
app = MaselliApp(root)
root.mainloop()
if __name__ == "__main__":
main()

421
maselli_app.py Normal file
View File

@ -0,0 +1,421 @@
"""
Aplicación principal del Simulador/Trace Maselli
Une todos los módulos y maneja la interfaz principal
"""
import tkinter as tk
from tkinter import ttk, messagebox
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.animation as animation
from config_manager import ConfigManager
from utils import Utils
from tabs.simulator_tab import SimulatorTab
from tabs.trace_tab import TraceTab
from tabs.netcom_tab import NetComTab
class MaselliApp:
def __init__(self, root):
self.root = root
self.root.title("Simulador/Trace/NetCom Protocolo Maselli")
self.root.geometry("1000x800")
# Cargar icono
Utils.load_icon(self.root)
# Gestor de configuración
self.config_manager = ConfigManager()
self.config = self.config_manager.load_config()
# Diccionario para compartir configuración entre tabs
self.shared_config = {
'config_manager': self.config_manager
}
# Crear interfaz
self.create_widgets()
# Cargar configuración inicial
self.load_config_to_gui()
# Configurar eventos
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
# Inicializar animaciones de gráficos
self.sim_ani = animation.FuncAnimation(
self.sim_fig, self.update_sim_graph, interval=100, blit=False, cache_frame_data=False
)
self.trace_ani = animation.FuncAnimation(
self.trace_fig, self.update_trace_graph, interval=100, blit=False, cache_frame_data=False
)
def create_widgets(self):
"""Crea todos los widgets de la aplicación"""
# Frame principal
main_frame = ttk.Frame(self.root)
main_frame.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
# Configurar pesos
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
main_frame.columnconfigure(0, weight=1)
main_frame.rowconfigure(1, weight=1) # Notebook
# Frame de configuración compartida
self.create_shared_config_frame(main_frame)
# Notebook para tabs
self.notebook = ttk.Notebook(main_frame)
self.notebook.grid(row=1, column=0, sticky="nsew")
# Tab Simulador
sim_frame = ttk.Frame(self.notebook)
self.notebook.add(sim_frame, text="Simulador")
self.simulator_tab = SimulatorTab(sim_frame, self.shared_config)
# Tab Trace
trace_frame = ttk.Frame(self.notebook)
self.notebook.add(trace_frame, text="Trace")
self.trace_tab = TraceTab(trace_frame, self.shared_config)
# Tab NetCom
netcom_frame = ttk.Frame(self.notebook)
self.notebook.add(netcom_frame, text="NetCom (Gateway)")
self.netcom_tab = NetComTab(netcom_frame, self.shared_config)
# Crear gráficos
self.create_graphs()
# Establecer callbacks para actualización de gráficos
self.simulator_tab.graph_update_callback = self.update_sim_graph
self.trace_tab.graph_update_callback = self.update_trace_graph
def create_shared_config_frame(self, parent):
"""Crea el frame de configuración compartida"""
config_frame = ttk.LabelFrame(parent, text="Configuración de Conexión")
config_frame.grid(row=0, column=0, sticky="ew", pady=(0, 5))
# Tipo de conexión
ttk.Label(config_frame, text="Tipo:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.connection_type_var = tk.StringVar()
self.connection_type_combo = ttk.Combobox(
config_frame, textvariable=self.connection_type_var,
values=["Serial", "TCP", "UDP", "TCP-Server"], state="readonly", width=10
)
self.connection_type_combo.grid(row=0, column=1, padx=5, pady=5)
self.connection_type_combo.bind("<<ComboboxSelected>>", self.on_connection_type_change)
# Frame para Serial
self.serial_frame = ttk.Frame(config_frame)
self.serial_frame.grid(row=0, column=2, columnspan=6, padx=5, pady=5, sticky="ew") # Ajustado columnspan
ttk.Label(self.serial_frame, text="Puerto:").grid(row=0, column=0, padx=5, sticky="w")
self.com_port_var = tk.StringVar()
self.com_port_entry = ttk.Entry(self.serial_frame, textvariable=self.com_port_var, width=10)
self.com_port_entry.grid(row=0, column=1, padx=5)
ttk.Label(self.serial_frame, text="Baud:").grid(row=0, column=2, padx=5, sticky="w")
self.baud_rate_var = tk.StringVar()
self.baud_rate_entry = ttk.Entry(self.serial_frame, textvariable=self.baud_rate_var, width=10)
self.baud_rate_entry.grid(row=0, column=3, padx=5)
# Frame para Ethernet
self.ethernet_frame = ttk.Frame(config_frame)
self.ethernet_frame.grid(row=0, column=2, columnspan=6, padx=5, pady=5, sticky="ew") # Aumentado columnspan
self.ethernet_frame.grid_remove()
self.ip_address_label_widget = ttk.Label(self.ethernet_frame, text="IP:")
self.ip_address_label_widget.grid(row=0, column=0, padx=5, sticky="w")
self.ip_address_var = tk.StringVar()
self.ip_address_entry = ttk.Entry(self.ethernet_frame, textvariable=self.ip_address_var, width=15)
self.ip_address_entry.grid(row=0, column=1, padx=5)
self.port_label_widget = ttk.Label(self.ethernet_frame, text="Puerto:")
self.port_label_widget.grid(row=0, column=2, padx=5, sticky="w")
self.port_var = tk.StringVar()
self.port_entry = ttk.Entry(self.ethernet_frame, textvariable=self.port_var, width=8)
self.port_entry.grid(row=0, column=3, padx=(0,5), pady=5) # Ajustado padx
# Label para mostrar el cliente conectado en modo TCP-Server
self.client_connected_label_widget = ttk.Label(self.ethernet_frame, text="Cliente Conectado:")
# Se mostrará/ocultará en on_connection_type_change
self.client_connected_var = tk.StringVar(value="Ninguno")
self.client_connected_display = ttk.Label(self.ethernet_frame, textvariable=self.client_connected_var, width=25)
# Parámetros de mapeo
ttk.Label(config_frame, text="Min Brix [4mA]:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
self.min_brix_map_var = tk.StringVar()
self.min_brix_map_entry = ttk.Entry(config_frame, textvariable=self.min_brix_map_var, width=10)
self.min_brix_map_entry.grid(row=1, column=1, padx=5, pady=5)
ttk.Label(config_frame, text="Max Brix [20mA]:").grid(row=1, column=2, padx=5, pady=5, sticky="w")
self.max_brix_map_var = tk.StringVar()
self.max_brix_map_entry = ttk.Entry(config_frame, textvariable=self.max_brix_map_var, width=10)
self.max_brix_map_entry.grid(row=1, column=3, padx=5, pady=5)
# Botones
ttk.Button(config_frame, text="Guardar Config",
command=self.save_config).grid(row=1, column=4, padx=5, pady=5)
ttk.Button(config_frame, text="Cargar Config",
command=self.load_config).grid(row=1, column=5, padx=5, pady=5)
# Guardar referencias para compartir
self.shared_config.update({
'connection_type_var': self.connection_type_var,
'com_port_var': self.com_port_var,
'baud_rate_var': self.baud_rate_var,
'ip_address_var': self.ip_address_var,
'port_var': self.port_var,
'min_brix_map_var': self.min_brix_map_var,
'max_brix_map_var': self.max_brix_map_var,
'client_connected_var': self.client_connected_var, # Para actualizar desde el simulador
'shared_widgets': [
self.connection_type_combo,
self.com_port_entry,
self.baud_rate_entry,
self.ip_address_entry,
self.port_entry,
self.min_brix_map_entry,
self.max_brix_map_entry,
# self.client_connected_display # No deshabilitar el display, solo su contenido
]
})
def create_graphs(self):
"""Crea los gráficos para simulador y trace"""
# Gráfico del simulador
sim_graph_frame = self.simulator_tab.get_graph_frame()
self.sim_fig = Figure(figsize=(8, 3.5), dpi=100)
self.sim_ax1 = self.sim_fig.add_subplot(111)
self.sim_ax2 = self.sim_ax1.twinx()
self.sim_ax1.set_xlabel('Tiempo (s)')
self.sim_ax1.set_ylabel('Brix', color='b')
self.sim_ax2.set_ylabel('mA', color='r')
self.sim_ax1.tick_params(axis='y', labelcolor='b')
self.sim_ax2.tick_params(axis='y', labelcolor='r')
self.sim_ax1.grid(True, alpha=0.3)
self.sim_line_brix, = self.sim_ax1.plot([], [], 'b-', label='Brix', linewidth=2)
self.sim_line_ma, = self.sim_ax2.plot([], [], 'r-', label='mA', linewidth=2)
self.sim_canvas = FigureCanvasTkAgg(self.sim_fig, master=sim_graph_frame)
self.sim_canvas.draw()
self.sim_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Gráfico del trace (ahora con doble eje Y)
trace_graph_frame = self.trace_tab.get_graph_frame()
self.trace_fig = Figure(figsize=(8, 4), dpi=100)
self.trace_ax1 = self.trace_fig.add_subplot(111)
self.trace_ax2 = self.trace_ax1.twinx()
self.trace_ax1.set_xlabel('Tiempo (s)')
self.trace_ax1.set_ylabel('Brix', color='b')
self.trace_ax2.set_ylabel('mA', color='r')
self.trace_ax1.tick_params(axis='y', labelcolor='b')
self.trace_ax2.tick_params(axis='y', labelcolor='r')
self.trace_ax1.grid(True, alpha=0.3)
self.trace_line_brix, = self.trace_ax1.plot([], [], 'b-', label='Brix', linewidth=2, marker='o', markersize=4)
self.trace_line_ma, = self.trace_ax2.plot([], [], 'r-', label='mA', linewidth=2, marker='s', markersize=3)
self.trace_canvas = FigureCanvasTkAgg(self.trace_fig, master=trace_graph_frame)
self.trace_canvas.draw()
self.trace_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
def update_sim_graph(self, frame=None):
"""Actualiza el gráfico del simulador"""
time_data = list(self.simulator_tab.time_data)
brix_data = list(self.simulator_tab.brix_data)
ma_data = list(self.simulator_tab.ma_data)
if len(time_data) > 0:
self.sim_line_brix.set_data(time_data, brix_data)
self.sim_line_ma.set_data(time_data, ma_data)
if len(time_data) > 1:
self.sim_ax1.set_xlim(min(time_data), max(time_data))
if brix_data:
brix_min = min(brix_data) - 1
brix_max = max(brix_data) + 1
self.sim_ax1.set_ylim(brix_min, brix_max)
if ma_data:
ma_min = min(ma_data) - 0.5
ma_max = max(ma_data) + 0.5
self.sim_ax2.set_ylim(ma_min, ma_max)
self.sim_canvas.draw_idle()
return self.sim_line_brix, self.sim_line_ma
def update_trace_graph(self, frame=None):
"""Actualiza el gráfico del trace"""
time_data = list(self.trace_tab.time_data)
brix_data = list(self.trace_tab.brix_data)
ma_data = list(self.trace_tab.ma_data)
if len(time_data) > 0:
self.trace_line_brix.set_data(time_data, brix_data)
self.trace_line_ma.set_data(time_data, ma_data)
if len(time_data) > 1:
self.trace_ax1.set_xlim(min(time_data), max(time_data))
if brix_data:
brix_min = min(brix_data) - 1
brix_max = max(brix_data) + 1
self.trace_ax1.set_ylim(brix_min, brix_max)
if ma_data:
ma_min = min(ma_data) - 0.5
ma_max = max(ma_data) + 0.5
self.trace_ax2.set_ylim(ma_min, ma_max)
self.trace_canvas.draw_idle()
return self.trace_line_brix, self.trace_line_ma
def on_connection_type_change(self, event=None):
"""Maneja el cambio de tipo de conexión"""
conn_type = self.connection_type_var.get()
is_server_mode = (conn_type == "TCP-Server")
if conn_type == "Serial":
self.ethernet_frame.grid_remove()
self.serial_frame.grid()
# No es necesario manipular los widgets dentro de ethernet_frame si está oculto
self.client_connected_label_widget.grid_remove()
self.client_connected_display.grid_remove()
self.client_connected_var.set("Ninguno")
elif conn_type == "TCP-Server":
self.serial_frame.grid_remove()
self.ethernet_frame.grid()
self.ip_address_label_widget.grid_remove() # Ocultar etiqueta IP
self.ip_address_entry.config(state=tk.DISABLED) # IP no se usa para el servidor
self.ip_address_entry.grid_remove() # Ocultar campo IP
self.port_label_widget.grid(row=0, column=2, padx=5, sticky="w") # Asegurar que la etiqueta Puerto esté visible
self.port_entry.config(state=tk.NORMAL) # Puerto es para escuchar
self.port_entry.grid(row=0, column=3, padx=(0,5), pady=5) # Asegurar que el campo Puerto esté visible
self.client_connected_label_widget.grid(row=0, column=4, padx=(10,2), pady=5, sticky="w")
self.client_connected_display.grid(row=0, column=5, padx=(0,5), pady=5, sticky="w")
else: # TCP, UDP
self.serial_frame.grid_remove()
self.ethernet_frame.grid()
self.ip_address_label_widget.grid(row=0, column=0, padx=5, sticky="w") # Asegurar que la etiqueta IP esté visible
self.ip_address_entry.config(state=tk.NORMAL)
self.ip_address_entry.grid(row=0, column=1, padx=5) # Asegurar que el campo IP esté visible
self.port_label_widget.grid(row=0, column=2, padx=5, sticky="w") # Asegurar que la etiqueta Puerto esté visible
self.port_entry.config(state=tk.NORMAL)
self.port_entry.grid(row=0, column=3, padx=(0,5), pady=5) # Asegurar que el campo Puerto esté visible
self.client_connected_label_widget.grid_remove()
self.client_connected_display.grid_remove()
self.client_connected_var.set("Ninguno")
# Actualizar info en NetCom
if hasattr(self, 'netcom_tab'):
self.netcom_tab.update_net_info()
# Habilitar/deshabilitar botones Start en otras pestañas según compatibilidad
if hasattr(self, 'simulator_tab'):
# El simulador maneja TCP-Server, su lógica de botón es interna
pass
if hasattr(self, 'trace_tab'):
if is_server_mode:
self.trace_tab.start_button.config(state=tk.DISABLED)
if self.trace_tab.tracing: # Si estaba trazando y el modo cambió
self.trace_tab.stop_trace()
messagebox.showinfo("Trace Detenido", "El modo Trace se detuvo porque el tipo de conexión cambió a TCP-Server, que no es compatible.")
elif not self.trace_tab.tracing : # Habilitar solo si no está trazando
self.trace_tab.start_button.config(state=tk.NORMAL)
if hasattr(self, 'netcom_tab'):
if is_server_mode:
self.netcom_tab.start_button.config(state=tk.DISABLED)
if self.netcom_tab.bridging: # Si estaba en modo bridge
self.netcom_tab.stop_bridge()
messagebox.showinfo("NetCom Detenido", "El modo NetCom se detuvo porque el tipo de conexión cambió a TCP-Server, que no es compatible.")
elif not self.netcom_tab.bridging: # Habilitar solo si no está en modo bridge
self.netcom_tab.start_button.config(state=tk.NORMAL)
def save_config(self):
"""Guarda la configuración actual"""
# Recopilar configuración de todos los componentes
config = {
'connection_type': self.connection_type_var.get(),
'com_port': self.com_port_var.get(),
'baud_rate': self.baud_rate_var.get(),
'ip_address': self.ip_address_var.get(),
'port': self.port_var.get(),
'min_brix_map': self.min_brix_map_var.get(),
'max_brix_map': self.max_brix_map_var.get()
}
# Agregar configuración de cada tab
config.update(self.simulator_tab.get_config())
config.update(self.netcom_tab.get_config())
# Validar configuración
errors = self.config_manager.validate_config(config)
if errors:
messagebox.showerror("Error de Configuración", "\n".join(errors))
return
# Guardar
if self.config_manager.save_config(config):
messagebox.showinfo("Éxito", "Configuración guardada correctamente.")
else:
messagebox.showerror("Error", "No se pudo guardar la configuración.")
def load_config(self):
"""Carga la configuración desde archivo"""
self.config = self.config_manager.load_config()
self.load_config_to_gui()
messagebox.showinfo("Éxito", "Configuración cargada correctamente.")
def load_config_to_gui(self):
"""Carga la configuración en los widgets de la GUI"""
# Configuración compartida
self.connection_type_var.set(self.config.get('connection_type', 'Serial'))
self.com_port_var.set(self.config.get('com_port', 'COM3'))
self.baud_rate_var.set(self.config.get('baud_rate', '115200'))
self.ip_address_var.set(self.config.get('ip_address', '192.168.1.100'))
self.port_var.set(self.config.get('port', '502'))
self.min_brix_map_var.set(self.config.get('min_brix_map', '0'))
self.max_brix_map_var.set(self.config.get('max_brix_map', '80'))
# Configuración específica de cada tab
self.simulator_tab.set_config(self.config)
self.netcom_tab.set_config(self.config)
# Actualizar vista
self.on_connection_type_change()
if self.connection_type_var.get() != "TCP-Server":
self.client_connected_var.set("Ninguno")
def on_closing(self):
"""Maneja el cierre de la aplicación"""
# Detener cualquier operación activa
if hasattr(self.simulator_tab, 'simulating') and self.simulator_tab.simulating:
self.simulator_tab.stop_simulation()
if hasattr(self.trace_tab, 'tracing') and self.trace_tab.tracing:
self.trace_tab.stop_trace()
if hasattr(self.netcom_tab, 'bridging') and self.netcom_tab.bridging:
self.netcom_tab.stop_bridge()
# Cerrar ventana
self.root.destroy()

View File

@ -1,13 +1,21 @@
{ {
"connection_type": "TCP", "connection_type": "TCP-Server",
"com_port": "COM8", "com_port": "COM8",
"baud_rate": "115200", "baud_rate": "115200",
"ip_address": "10.1.33.18", "ip_address": "10.1.33.18",
"port": "8899", "port": "8899",
"min_brix_map": "0",
"max_brix_map": "60",
"adam_address": "01", "adam_address": "01",
"function_type": "Sinusoidal", "function_type": "Sinusoidal",
"min_brix_map": "0", "cycle_time": "15",
"max_brix_map": "80", "samples_per_cycle": "70",
"period": "0.5", "manual_input_type": "Brix",
"manual_brix": "10.0" "manual_value": "0.00",
"random_error_interval": "2.0",
"netcom_com_port": "COM11",
"netcom_baud_rate": "9600",
"netcom_rtscts": false,
"netcom_dsrdtr": false,
"netcom_xonxoff": false
} }

View File

@ -1 +0,0 @@
Timestamp,mA,Brix,Raw_Message
1 Timestamp mA Brix Raw_Message

197
protocol_handler.py Normal file
View File

@ -0,0 +1,197 @@
"""
Manejador del protocolo ADAM/Maselli
Contiene las funciones para formatear mensajes, calcular checksums y parsear respuestas
"""
class ProtocolHandler:
@staticmethod
def calculate_checksum(message_part):
"""Calcula el checksum de un mensaje ADAM"""
s = sum(ord(c) for c in message_part)
checksum_byte = s % 256
return f"{checksum_byte:02X}"
@staticmethod
def format_ma_value(ma_val):
"""Formatea un valor mA al formato ADAM: XX.XXX (6 caracteres)"""
return f"{ma_val:06.3f}"
@staticmethod
def scale_to_ma(brix_value, min_brix_map, max_brix_map):
"""Convierte valor Brix a mA usando el mapeo configurado"""
if max_brix_map == min_brix_map:
return 4.0
percentage = (brix_value - min_brix_map) / (max_brix_map - min_brix_map)
percentage = max(0.0, min(1.0, percentage))
ma_value = 4.0 + percentage * 16.0
return ma_value
@staticmethod
def ma_to_brix(ma_value, min_brix_map, max_brix_map):
"""Convierte valor mA a Brix usando el mapeo configurado"""
try:
if ma_value <= 4.0:
return min_brix_map
elif ma_value >= 20.0:
return max_brix_map
else:
# Interpolación lineal
percentage = (ma_value - 4.0) / 16.0
return min_brix_map + percentage * (max_brix_map - min_brix_map)
except:
return 0.0
@staticmethod
def create_adam_message(adam_address, brix_value, min_brix_map, max_brix_map):
"""Crea un mensaje completo ADAM (como bytes) a partir de un valor Brix"""
ma_val = ProtocolHandler.scale_to_ma(brix_value, min_brix_map, max_brix_map)
ma_str = ProtocolHandler.format_ma_value(ma_val)
message_part = f"#{adam_address}{ma_str}"
checksum = ProtocolHandler.calculate_checksum(message_part)
full_message_str = f"{message_part}{checksum}\r"
return full_message_str.encode('ascii'), ma_val
@staticmethod
def create_adam_message_from_ma(adam_address, ma_value):
"""
Crea un mensaje completo ADAM (como bytes) directamente desde un valor mA.
Permite enviar valores mA fuera del rango 4-20mA si es necesario.
"""
ma_str = ProtocolHandler.format_ma_value(ma_value) # Formato XX.XXX
message_part = f"#{adam_address}{ma_str}"
checksum = ProtocolHandler.calculate_checksum(message_part)
full_message_str = f"{message_part}{checksum}\r"
return full_message_str.encode('ascii'), ma_value
@staticmethod
def create_adam_message_with_bad_checksum(adam_address, ma_value):
"""
Crea un mensaje completo ADAM (como bytes) directamente desde un valor mA,
pero con un checksum deliberadamente incorrecto.
"""
ma_str = ProtocolHandler.format_ma_value(ma_value) # Formato XX.XXX
message_part = f"#{adam_address}{ma_str}"
correct_checksum = ProtocolHandler.calculate_checksum(message_part)
# Generar un checksum incorrecto.
bad_checksum_str = "XX" # Valor por defecto si algo falla
try:
correct_sum_val = int(correct_checksum, 16)
# Sumar 1 (o cualquier otro valor) y tomar módulo 256 para que siga siendo un byte.
# Asegurarse de que sea diferente al original.
bad_sum_val = (correct_sum_val + 1) % 256
if f"{bad_sum_val:02X}" == correct_checksum: # En caso de que correct_checksum fuera FF
bad_sum_val = (correct_sum_val + 2) % 256
bad_checksum_str = f"{bad_sum_val:02X}"
except ValueError: # Si correct_checksum no es un hexadecimal válido (no debería pasar)
pass # bad_checksum_str se queda como "XX"
full_message_str = f"{message_part}{bad_checksum_str}\r"
return full_message_str.encode('ascii'), ma_value
@staticmethod
def ma_to_voltage(ma_value):
"""Convierte valor mA a Voltaje (0-10V). 0mA -> 0V, 20mA -> 10V."""
# Escala lineal: Voltage = (ma_value / 20mA) * 10V
voltage = (ma_value / 20.0) * 10.0
return max(0.0, min(10.0, voltage)) # Asegurar que esté en el rango 0-10V
@staticmethod
def voltage_to_ma(voltage_value):
"""Convierte valor Voltaje (0-10V) a mA. 0V -> 0mA, 10V -> 20mA."""
# Escala lineal: mA = (voltage_value / 10V) * 20mA
ma = (voltage_value / 10.0) * 20.0
return max(0.0, min(20.0, ma)) # Asegurar que esté en el rango 0-20mA
@staticmethod
def format_voltage_display(voltage_value):
"""Formatea un valor de Voltaje para mostrar."""
return f"{voltage_value:.2f} V"
@staticmethod
def parse_adam_message(data):
"""
Parsea un mensaje del protocolo ADAM y retorna el valor en mA
Formato esperado: #AA[valor_mA][checksum]\r
Donde:
- # : Carácter inicial (opcional en algunas respuestas)
- AA : Dirección del dispositivo (2 caracteres)
- valor_mA : Valor en mA (6 caracteres, formato XX.XXX)
- checksum : Suma de verificación (2 caracteres hex)
- \r : Carácter de fin (opcional)
Retorna: dict con 'address' y 'ma', o None si no es válido
"""
try:
# Formato esperado: #AA[valor_mA][checksum]\r
# Pero también manejar respuestas sin # inicial o sin \r final
data = data.strip()
# Si empieza con #, es un mensaje estándar
if data.startswith('#'):
data = data[1:] # Remover #
# Si termina con \r, removerlo
if data.endswith('\r'):
data = data[:-1]
# Verificar longitud mínima
if len(data) < 8: # 2 addr + 6 valor mínimo
return None
address = data[:2]
value_str = data[2:8] # 6 caracteres para el valor (XX.XXX)
# Verificar si hay checksum
checksum_valid = True
if len(data) >= 10:
checksum = data[8:10] # 2 caracteres para checksum
# Verificar checksum
message_part = f"#{address}{value_str}"
calculated_checksum = ProtocolHandler.calculate_checksum(message_part)
if checksum != calculated_checksum:
checksum_valid = False
# Convertir valor a float
try:
ma_value = float(value_str)
return {
'address': address,
'ma': ma_value,
'checksum_valid': checksum_valid
}
except ValueError:
return None
except Exception:
return None
@staticmethod
def format_for_display(message, hex_non_printable=False):
"""Formatea un mensaje (bytes o str) para mostrar en el log"""
if isinstance(message, bytes):
if hex_non_printable:
parts = []
for byte_val in message:
# Caracteres ASCII imprimibles (32 a 126) se dejan como están.
# Otros se convierten a \xHH.
if 32 <= byte_val <= 126:
parts.append(chr(byte_val))
else:
parts.append(f'\\x{byte_val:02x}')
message_str = "".join(parts)
else:
message_str = message.decode('ascii', errors='replace') # 'replace' muestra para no decodificables
else: # Asumir que es str
message_str = str(message)
# Si no es formato hexadecimal, reemplazar caracteres de control comunes por representaciones legibles.
if not hex_non_printable:
message_str = message_str.replace('\r', '<CR>').replace('\n', '<LF>').replace('\t', '<TAB>')
return message_str

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
# Dependencias para Maselli Simulator/Trace/NetCom
pyserial==3.5
matplotlib==3.7.1

4
run.bat Normal file
View File

@ -0,0 +1,4 @@
@echo off
echo Iniciando Maselli Simulator/Trace/NetCom...
python main.py
pause

1
tabs/__init__.py Normal file
View File

@ -0,0 +1 @@
# Paquete para los tabs de la aplicación

565
tabs/netcom_tab.py Normal file
View File

@ -0,0 +1,565 @@
"""
Tab NetCom - Gateway/Bridge entre puerto COM físico y conexión TCP/UDP
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import threading
import time
from datetime import datetime
from connection_manager import ConnectionManager
from protocol_handler import ProtocolHandler
from utils import Utils
class NetComTab:
def __init__(self, parent_frame, shared_config):
self.frame = parent_frame
self.shared_config = shared_config
# Estado del gateway
self.bridging = False
self.bridge_thread = None
# Conexiones
self.com_connection = ConnectionManager()
self.net_connection = ConnectionManager()
# Estadísticas
self.com_to_net_count = 0
self.net_to_com_count = 0
self.error_count = 0
self.create_widgets()
def create_widgets(self):
"""Crea los widgets del tab NetCom"""
# Frame de configuración COM física
com_config_frame = ttk.LabelFrame(self.frame, text="Configuración Puerto COM Físico")
com_config_frame.grid(row=0, column=0, columnspan=2, padx=10, pady=5, sticky="ew")
ttk.Label(com_config_frame, text="Puerto COM:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.com_port_var = tk.StringVar(value=self.shared_config.get('netcom_com_port', 'COM3'))
self.com_port_entry = ttk.Entry(com_config_frame, textvariable=self.com_port_var, width=10)
self.com_port_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
ttk.Label(com_config_frame, text="Baud Rate:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
self.baud_rate_var = tk.StringVar(value=self.shared_config.get('netcom_baud_rate', '115200'))
self.baud_rate_entry = ttk.Entry(com_config_frame, textvariable=self.baud_rate_var, width=10)
self.baud_rate_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
# Data bits, Parity, Stop bits
ttk.Label(com_config_frame, text="Data Bits:").grid(row=0, column=4, padx=5, pady=5, sticky="w")
self.bytesize_var = tk.StringVar(value=str(self.shared_config.get('netcom_bytesize', 8)))
self.bytesize_combo = ttk.Combobox(com_config_frame, textvariable=self.bytesize_var,
values=["5", "6", "7", "8"], state="readonly", width=5)
self.bytesize_combo.grid(row=0, column=5, padx=5, pady=5, sticky="ew")
ttk.Label(com_config_frame, text="Parity:").grid(row=0, column=6, padx=5, pady=5, sticky="w")
self.parity_var = tk.StringVar(value=self.shared_config.get('netcom_parity', 'N'))
self.parity_combo = ttk.Combobox(com_config_frame, textvariable=self.parity_var,
values=["N", "E", "O", "M", "S"], state="readonly", width=5) # N: None, E: Even, O: Odd, M: Mark, S: Space
self.parity_combo.grid(row=0, column=7, padx=5, pady=5, sticky="ew")
ttk.Label(com_config_frame, text="Stop Bits:").grid(row=0, column=8, padx=5, pady=5, sticky="w")
self.stopbits_var = tk.StringVar(value=str(self.shared_config.get('netcom_stopbits', 1)))
self.stopbits_combo = ttk.Combobox(com_config_frame, textvariable=self.stopbits_var,
values=["1", "1.5", "2"], state="readonly", width=5)
self.stopbits_combo.grid(row=0, column=9, padx=5, pady=5, sticky="ew")
# Flow control options
self.rtscts_var = tk.BooleanVar(value=self.shared_config.get('netcom_rtscts', False))
self.rtscts_check = ttk.Checkbutton(com_config_frame, text="RTS/CTS", variable=self.rtscts_var)
self.rtscts_check.grid(row=1, column=0, padx=5, pady=5, sticky="w")
self.dsrdtr_var = tk.BooleanVar(value=self.shared_config.get('netcom_dsrdtr', False))
self.dsrdtr_check = ttk.Checkbutton(com_config_frame, text="DSR/DTR", variable=self.dsrdtr_var)
self.dsrdtr_check.grid(row=1, column=1, padx=5, pady=5, sticky="w")
self.xonxoff_var = tk.BooleanVar(value=self.shared_config.get('netcom_xonxoff', False))
self.xonxoff_check = ttk.Checkbutton(com_config_frame, text="XON/XOFF", variable=self.xonxoff_var)
self.xonxoff_check.grid(row=1, column=2, padx=5, pady=5, sticky="w")
# Bridge delay
ttk.Label(com_config_frame, text="Retardo Bridge (s):").grid(row=1, column=3, padx=5, pady=5, sticky="w")
self.bridge_delay_var = tk.StringVar(value=str(self.shared_config.get('netcom_bridge_delay', 0.001)))
self.bridge_delay_entry = ttk.Entry(com_config_frame, textvariable=self.bridge_delay_var, width=8)
self.bridge_delay_entry.grid(row=1, column=4, padx=5, pady=5, sticky="ew")
# Info frame
info_frame = ttk.LabelFrame(self.frame, text="Información de Conexión")
info_frame.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky="ew")
ttk.Label(info_frame, text="Conexión de Red:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.net_info_var = tk.StringVar(value="No configurada")
ttk.Label(info_frame, textvariable=self.net_info_var, font=("Courier", 10)).grid(row=0, column=1, padx=5, pady=5, sticky="w")
ttk.Label(info_frame, text="Estado:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
self.status_var = tk.StringVar(value="Desconectado")
self.status_label = ttk.Label(info_frame, textvariable=self.status_var, font=("Courier", 10, "bold"))
self.status_label.grid(row=1, column=1, padx=5, pady=5, sticky="w")
# Control Frame
control_frame = ttk.LabelFrame(self.frame, text="Control Gateway")
control_frame.grid(row=2, column=0, padx=10, pady=5, sticky="ew")
self.start_button = ttk.Button(control_frame, text="Iniciar Gateway", command=self.start_bridge)
self.start_button.pack(side=tk.LEFT, padx=5)
self.stop_button = ttk.Button(control_frame, text="Detener Gateway", command=self.stop_bridge, state=tk.DISABLED)
self.stop_button.pack(side=tk.LEFT, padx=5)
self.clear_log_button = ttk.Button(control_frame, text="Limpiar Log", command=self.clear_log)
self.clear_log_button.pack(side=tk.LEFT, padx=5)
# Statistics Frame
stats_frame = ttk.LabelFrame(self.frame, text="Estadísticas")
stats_frame.grid(row=2, column=1, padx=10, pady=5, sticky="ew")
ttk.Label(stats_frame, text="COM → NET:").grid(row=0, column=0, padx=5, pady=2, sticky="w")
self.com_to_net_var = tk.StringVar(value="0")
ttk.Label(stats_frame, textvariable=self.com_to_net_var).grid(row=0, column=1, padx=5, pady=2, sticky="w")
ttk.Label(stats_frame, text="NET → COM:").grid(row=0, column=2, padx=5, pady=2, sticky="w")
self.net_to_com_var = tk.StringVar(value="0")
ttk.Label(stats_frame, textvariable=self.net_to_com_var).grid(row=0, column=3, padx=5, pady=2, sticky="w")
ttk.Label(stats_frame, text="Errores:").grid(row=1, column=0, padx=5, pady=2, sticky="w")
self.errors_var = tk.StringVar(value="0")
ttk.Label(stats_frame, textvariable=self.errors_var, foreground="red").grid(row=1, column=1, padx=5, pady=2, sticky="w")
# Log Frame con filtros
log_control_frame = ttk.Frame(self.frame)
log_control_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=(10,0), sticky="ew")
ttk.Label(log_control_frame, text="Filtros:").pack(side=tk.LEFT, padx=5)
self.show_com_to_net_var = tk.BooleanVar(value=True)
ttk.Checkbutton(log_control_frame, text="COM→NET",
variable=self.show_com_to_net_var).pack(side=tk.LEFT, padx=5)
self.show_net_to_com_var = tk.BooleanVar(value=True)
ttk.Checkbutton(log_control_frame, text="NET→COM",
variable=self.show_net_to_com_var).pack(side=tk.LEFT, padx=5)
self.show_parsed_var = tk.BooleanVar(value=True)
ttk.Checkbutton(log_control_frame, text="Mostrar datos parseados",
variable=self.show_parsed_var).pack(side=tk.LEFT, padx=5)
# Log Frame
log_frame = ttk.LabelFrame(self.frame, text="Log de Gateway (Sniffer)")
log_frame.grid(row=4, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
self.log_text = scrolledtext.ScrolledText(log_frame, height=20, width=80, wrap=tk.WORD, state=tk.DISABLED)
self.log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
# Configurar tags para colores
self.log_text.tag_config("com_to_net", foreground="blue")
self.log_text.tag_config("net_to_com", foreground="green")
self.log_text.tag_config("error", foreground="red")
self.log_text.tag_config("info", foreground="black")
self.log_text.tag_config("parsed", foreground="purple")
# Configurar pesos
self.frame.columnconfigure(0, weight=1)
self.frame.columnconfigure(1, weight=1)
self.frame.rowconfigure(4, weight=1)
# Actualizar info de red
self.update_net_info()
def update_net_info(self):
"""Actualiza la información de la conexión de red configurada"""
conn_type = self.shared_config['connection_type_var'].get()
if conn_type == "Serial":
port = self.shared_config['com_port_var'].get()
baud = self.shared_config['baud_rate_var'].get()
self.net_info_var.set(f"Serial: {port} @ {baud} bps")
elif conn_type == "TCP":
ip = self.shared_config['ip_address_var'].get()
port = self.shared_config['port_var'].get()
self.net_info_var.set(f"TCP: {ip}:{port}")
elif conn_type == "UDP":
ip = self.shared_config['ip_address_var'].get()
port = self.shared_config['port_var'].get()
self.net_info_var.set(f"UDP: {ip}:{port}")
def log_message(self, message, tag="info", force=False):
"""Log con formato especial para el sniffer"""
# Verificar filtros
if not force:
if tag == "com_to_net" and not self.show_com_to_net_var.get():
return
if tag == "net_to_com" and not self.show_net_to_com_var.get():
return
self.log_text.configure(state=tk.NORMAL)
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
# Agregar prefijo según la dirección
if tag == "com_to_net":
prefix = "[COM→NET]"
elif tag == "net_to_com":
prefix = "[NET→COM]"
elif tag == "error":
prefix = "[ERROR] "
elif tag == "parsed":
prefix = "[PARSED] "
else:
prefix = "[INFO] "
full_message = f"[{timestamp}] {prefix} {message}\n"
# Insertar con color
start_index = self.log_text.index(tk.END)
self.log_text.insert(tk.END, full_message)
end_index = self.log_text.index(tk.END)
self.log_text.tag_add(tag, start_index, end_index)
self.log_text.see(tk.END)
self.log_text.configure(state=tk.DISABLED)
def start_bridge(self):
"""Inicia el gateway/bridge"""
if self.bridging:
messagebox.showwarning("Advertencia", "El gateway ya está activo.")
return
# Verificar si el tipo de conexión global es compatible con el lado de red de NetCom
global_conn_type_for_network_side = self.shared_config['connection_type_var'].get()
if global_conn_type_for_network_side == "TCP-Server":
messagebox.showerror("Modo No Compatible", "El lado de red de NetCom no puede operar en modo TCP-Server (configuración global). Seleccione TCP, UDP o Serial para la conexión de red.")
return
# Actualizar info de red
self.update_net_info()
# Validar configuración
try:
com_port = self.com_port_var.get()
baud_rate = int(self.baud_rate_var.get())
bridge_delay_str = self.bridge_delay_var.get()
if not com_port.upper().startswith('COM'):
raise ValueError("Puerto COM inválido")
if baud_rate <= 0:
raise ValueError("Baud rate debe ser mayor que 0")
try:
self.current_bridge_delay = float(bridge_delay_str)
if self.current_bridge_delay < 0:
raise ValueError("El retardo del bridge no puede ser negativo.")
except ValueError:
raise ValueError("Retardo del bridge inválido. Debe ser un número (ej: 0.001).")
except ValueError as e:
messagebox.showerror("Error", f"Configuración inválida: {e}")
return
# Abrir conexión COM física
try:
# open_connection ahora devuelve (connection_object, listening_info)
_, _ = self.com_connection.open_connection("Serial", { # Ignoramos listening_info para Serial
'port': com_port,
'baudrate': baud_rate,
'bytesize': int(self.bytesize_var.get()),
'parity': self.parity_var.get(),
'stopbits': float(self.stopbits_var.get()),
'rtscts': self.rtscts_var.get(),
'dsrdtr': self.dsrdtr_var.get(),
'xonxoff': self.xonxoff_var.get()
})
# Log basic serial config
serial_config_log = f"{com_port} @ {baud_rate} bps, {self.bytesize_var.get()}{self.parity_var.get()}{self.stopbits_var.get()}"
fc_log = []
if self.rtscts_var.get(): fc_log.append("RTS/CTS")
if self.dsrdtr_var.get(): fc_log.append("DSR/DTR")
if self.xonxoff_var.get(): fc_log.append("XON/XOFF")
self.log_message(f"Puerto COM abierto: {serial_config_log}. Flow control: {', '.join(fc_log) if fc_log else 'Ninguno'}")
except Exception as e:
messagebox.showerror("Error", f"No se pudo abrir puerto COM: {e}")
return
# Abrir conexión de red
try:
# For the network side of the bridge, use shared connection settings
net_conn_type_actual = self.shared_config['connection_type_var'].get()
current_shared_config_values = {
'connection_type': self.shared_config['connection_type_var'].get(),
'com_port': self.shared_config['com_port_var'].get(),
'baud_rate': self.shared_config['baud_rate_var'].get(),
'ip_address': self.shared_config['ip_address_var'].get(),
'port': self.shared_config['port_var'].get(),
}
# The first argument to get_connection_params is the dictionary it will read from.
net_conn_params = self.shared_config['config_manager'].get_connection_params(current_shared_config_values)
# open_connection ahora devuelve (connection_object, listening_info)
_, net_listening_details = self.net_connection.open_connection(net_conn_type_actual, net_conn_params)
if net_conn_type_actual == "TCP-Server" and net_listening_details: # Aunque NetCom no usa TCP-Server globalmente
self.log_message(f"{net_listening_details}")
else:
self.log_message(f"Conexión de red ({net_conn_type_actual}) abierta: {self.net_info_var.get()}")
except Exception as e:
self.com_connection.close_connection()
messagebox.showerror("Error", f"No se pudo abrir conexión de red: {e}")
return
# Resetear estadísticas
self.com_to_net_count = 0
self.net_to_com_count = 0
self.error_count = 0
self.update_stats()
# Iniciar bridge
self.bridging = True
self.status_var.set("Conectado")
self.status_label.config(foreground="green")
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self._set_entries_state(tk.DISABLED)
self.bridge_thread = threading.Thread(target=self.run_bridge, daemon=True)
self.bridge_thread.start()
self.log_message("Gateway iniciado - Modo bridge activo")
def stop_bridge(self):
"""Detiene el gateway/bridge"""
if not self.bridging:
return
self.bridging = False
# Esperar a que termine el thread
if self.bridge_thread and self.bridge_thread.is_alive():
self.bridge_thread.join(timeout=2.0)
# Cerrar conexiones
self.com_connection.close_connection()
self.net_connection.close_connection()
self.status_var.set("Desconectado")
self.status_label.config(foreground="black")
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self._set_entries_state(tk.NORMAL)
self.log_message("Gateway detenido")
self.log_message(f"Total transferencias - COM→NET: {self.com_to_net_count}, NET→COM: {self.net_to_com_count}, Errores: {self.error_count}")
def run_bridge(self):
"""Thread principal del bridge"""
com_buffer = bytearray()
# net_buffer = bytearray() # Ya no se usa para el flujo NET->COM si pasamos los datos directamente
current_delay = self.current_bridge_delay
while self.bridging:
try:
# Leer del COM físico
com_data = self.com_connection.read_data_non_blocking()
if com_data:
com_buffer += com_data
# Buscar mensajes completos para logging
# Adaptar la condición para bytearray
while self._find_message_end_conditions(com_buffer):
end_idx = self._find_message_end(com_buffer)
if end_idx > 0:
message_bytes = bytes(com_buffer[:end_idx])
com_buffer = com_buffer[end_idx:]
# Log y parseo
use_hex_format_for_raw = self.show_parsed_var.get()
display_msg = ProtocolHandler.format_for_display(message_bytes, hex_non_printable=use_hex_format_for_raw)
self.log_message(f"Data: {display_msg}", "com_to_net")
# Intentar parsear si está habilitado
if self.show_parsed_var.get():
# Decodificar solo para el parseo
message_str_for_parse = message_bytes.decode('ascii', errors='ignore')
parsed = ProtocolHandler.parse_adam_message(message_str_for_parse)
if parsed:
# Obtener valores de mapeo
min_brix = float(self.shared_config['min_brix_map_var'].get())
max_brix = float(self.shared_config['max_brix_map_var'].get())
brix_value = ProtocolHandler.ma_to_brix(parsed['ma'], min_brix, max_brix)
self.log_message(
f"ADAM - Addr: {parsed['address']}, "
f"mA: {parsed['ma']:.3f}, "
f"Brix: {brix_value:.3f}, "
f"Checksum: {'OK' if parsed.get('checksum_valid', True) else 'ERROR'}",
"parsed"
)
# Reenviar a la red
try:
self.net_connection.send_data(message_bytes)
self.com_to_net_count += 1
self.update_stats()
except Exception as e:
self.log_message(f"Error enviando a red: {e}", "error")
self.error_count += 1
self.update_stats()
else:
break
# Leer de la red
net_data = self.net_connection.read_data_non_blocking()
if net_data:
# Los datos de la red (net_data) son bytes.
# Se reenvían directamente al puerto COM.
# Log y parseo (opcional, sobre los datos recibidos directamente)
use_hex_format_for_raw = self.show_parsed_var.get()
display_msg = ProtocolHandler.format_for_display(net_data, hex_non_printable=use_hex_format_for_raw)
self.log_message(f"Data: {display_msg}", "net_to_com")
# Intentar parsear si está habilitado (puede ser sobre fragmentos)
if self.show_parsed_var.get():
# Decodificar solo para el parseo
# Nota: parsear fragmentos puede no ser siempre significativo para protocolos como ADAM.
message_str_for_parse = net_data.decode('ascii', errors='ignore')
parsed = ProtocolHandler.parse_adam_message(message_str_for_parse)
if parsed:
min_brix = float(self.shared_config['min_brix_map_var'].get())
max_brix = float(self.shared_config['max_brix_map_var'].get())
brix_value = ProtocolHandler.ma_to_brix(parsed['ma'], min_brix, max_brix)
self.log_message(
f"ADAM (datos red) - Addr: {parsed['address']}, "
f"mA: {parsed['ma']:.3f}, "
f"Brix: {brix_value:.3f}, "
f"Checksum: {'OK' if parsed.get('checksum_valid', True) else 'ERROR'}",
"parsed"
)
# Reenviar al COM
try:
self.com_connection.send_data(net_data) # Enviar los bytes tal cual se recibieron
self.net_to_com_count += len(net_data) # Contar bytes en lugar de "mensajes"
self.update_stats()
except Exception as e:
self.log_message(f"Error enviando a COM: {e}", "error")
self.error_count += 1
self.update_stats()
except Exception as e:
if self.bridging:
self.log_message(f"Error en bridge: {e}", "error")
self.error_count += 1
self.update_stats()
break
# Pequeña pausa para no consumir demasiado CPU
if not com_data and not net_data:
time.sleep(current_delay)
# Asegurar que el estado se actualice
if not self.bridging:
self.frame.after(0, self._ensure_stopped_state)
def _find_message_end_conditions(self, buffer_bytes: bytearray):
"""Verifica si hay condiciones para buscar el final de un mensaje."""
if not buffer_bytes:
return False
has_terminator = any(byte_val in (ord(b'\r'), ord(b'\n')) for byte_val in buffer_bytes)
return has_terminator or len(buffer_bytes) >= 10
def _find_message_end(self, buffer_bytes: bytearray):
"""Encuentra el final de un mensaje en el buffer de bytes."""
# Buscar terminadores
for i, byte_val in enumerate(buffer_bytes):
if byte_val == ord(b'\r') or byte_val == ord(b'\n'):
return i + 1
# Si no hay terminador pero el buffer es largo, buscar mensaje ADAM completo
# Esta parte es una heurística para mensajes tipo ADAM que podrían no tener terminador
# y debe usarse con cuidado para no cortar mensajes prematuramente.
if len(buffer_bytes) >= 10:
starts_with_hash = (buffer_bytes[0] == ord(b'#'))
is_adam_value_like = False
if len(buffer_bytes) >= 8: # Asegurar que el slice buffer_bytes[2:8] sea válido
try:
# Convertir la parte del valor a string para una verificación más sencilla
value_part_str = bytes(buffer_bytes[2:8]).decode('ascii')
# Formato ADAM es XX.XXX (6 caracteres)
if len(value_part_str) == 6 and value_part_str[2] == '.' and \
value_part_str[0:2].isdigit() and value_part_str[3:6].isdigit():
is_adam_value_like = True
except UnicodeDecodeError:
pass # No es ASCII, no es el formato ADAM esperado
if starts_with_hash or is_adam_value_like:
# Heurística: si parece ADAM y tiene al menos 10 bytes.
# Si después de 10 bytes hay un terminador, incluirlo.
if len(buffer_bytes) > 10 and \
(buffer_bytes[10] == ord(b'\r') or buffer_bytes[10] == ord(b'\n')):
return 11
else:
# Asumir un mensaje de 10 bytes (ej: #AAXX.XXXCC)
return 10
return -1
def update_stats(self):
"""Actualiza las estadísticas en la GUI"""
self.com_to_net_var.set(str(self.com_to_net_count))
# Si net_to_com_count ahora cuenta bytes, el label "NET → COM:" seguido de un número
# podría interpretarse como mensajes. Para mayor claridad, se podría cambiar el label
# o el formato del valor (ej. self.net_to_com_var.set(f"{self.net_to_com_count} bytes")).
# Por ahora, solo actualizamos el valor; el label no cambia.
self.net_to_com_var.set(str(self.net_to_com_count))
self.errors_var.set(str(self.error_count))
def clear_log(self):
"""Limpia el log"""
self.log_text.configure(state=tk.NORMAL)
self.log_text.delete(1.0, tk.END)
self.log_text.configure(state=tk.DISABLED)
self.log_message("Log limpiado", force=True)
def _set_entries_state(self, state):
"""Habilita/deshabilita los controles durante el bridge"""
self.com_port_entry.config(state=state)
self.baud_rate_entry.config(state=state)
self.bytesize_combo.config(state=state)
self.parity_combo.config(state=state)
self.stopbits_combo.config(state=state)
self.rtscts_check.config(state=state)
self.dsrdtr_check.config(state=state)
self.xonxoff_check.config(state=state)
self.bridge_delay_entry.config(state=state)
# También deshabilitar controles compartidos
if 'shared_widgets' in self.shared_config:
Utils.set_widgets_state(self.shared_config['shared_widgets'], state)
def _ensure_stopped_state(self):
"""Asegura que la GUI refleje el estado detenido"""
self.status_var.set("Desconectado")
self.status_label.config(foreground="black")
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self._set_entries_state(tk.NORMAL)
def get_config(self):
"""Obtiene la configuración actual del NetCom"""
return {
'netcom_com_port': self.com_port_var.get(),
'netcom_baud_rate': self.baud_rate_var.get(),
'netcom_rtscts': self.rtscts_var.get(),
'netcom_dsrdtr': self.dsrdtr_var.get(),
'netcom_xonxoff': self.xonxoff_var.get()
}
def set_config(self, config):
"""Establece la configuración del NetCom"""
self.com_port_var.set(config.get('netcom_com_port', 'COM3'))
self.baud_rate_var.set(config.get('netcom_baud_rate', '115200'))
self.rtscts_var.set(config.get('netcom_rtscts', False))
self.dsrdtr_var.set(config.get('netcom_dsrdtr', False))
self.xonxoff_var.set(config.get('netcom_xonxoff', False))

938
tabs/simulator_tab.py Normal file
View File

@ -0,0 +1,938 @@
"""
Tab del Simulador - Genera valores de prueba en protocolo ADAM
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import threading
import random
import time
import math
from collections import deque
from protocol_handler import ProtocolHandler
from connection_manager import ConnectionManager
from utils import Utils
class SimulatorTab:
def __init__(self, parent_frame, shared_config):
self.frame = parent_frame
self.shared_config = shared_config
# Estado del simulador
self.simulating = False
self.simulation_thread = None
self.simulation_step = 0
self.connection_manager = ConnectionManager()
# Datos para el gráfico
self.max_points = 100
self.time_data = deque(maxlen=self.max_points)
self.brix_data = deque(maxlen=self.max_points)
self.ma_data = deque(maxlen=self.max_points)
self.start_time = time.time()
# Cargar configuración inicial para obtener valores por defecto para StringVars
# Esto es para asegurar que las StringVars tengan un valor inicial antes de que set_config sea llamado
# por maselli_app.py.
initial_config = self.shared_config['config_manager'].load_config()
self.adam_address_var = tk.StringVar(value=initial_config.get('adam_address', '01'))
self.function_type_var = tk.StringVar(value=initial_config.get('function_type', 'Lineal'))
self.cycle_time_var = tk.StringVar(value=initial_config.get('cycle_time', '10.0'))
self.samples_per_cycle_var = tk.StringVar(value=initial_config.get('samples_per_cycle', '100'))
# Configuración para modo manual y errores
self.manual_input_type_var = tk.StringVar(value=initial_config.get('manual_input_type', 'Brix'))
self.manual_value_var = tk.StringVar(value=initial_config.get('manual_value', '10.0'))
try:
manual_value_float = float(initial_config.get('manual_value', '10.0'))
except ValueError:
manual_value_float = 10.0 # Fallback
self.manual_slider_var = tk.DoubleVar(value=manual_value_float)
self.current_brix_var = tk.StringVar(value="---")
self.current_ma_var = tk.StringVar(value="--.-- mA")
self.current_voltage_var = tk.StringVar(value="-.-- V") # Nueva para voltaje
# Para simulación de errores
self.random_error_timer = None
self.random_error_timer_stop_event = threading.Event()
self.replace_normal_with_error_var = tk.BooleanVar(value=False)
self.next_frame_is_error_event = threading.Event()
self.random_error_interval_var = tk.StringVar(value=initial_config.get('random_error_interval', '10.0'))
self.error_details_for_replacement = None # (message_bytes, log_suffix, error_type_str)
self.create_widgets()
def create_widgets(self):
"""Crea los widgets del tab simulador"""
# Frame de configuración del simulador
config_frame = ttk.LabelFrame(self.frame, text="Configuración Simulador")
config_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew", columnspan=2)
# Dirección ADAM
ttk.Label(config_frame, text="ADAM Address (2c):").grid(row=0, column=0, padx=5, pady=5, sticky="w")
# self.adam_address_var inicializada en __init__
self.adam_address_entry = ttk.Entry(config_frame, textvariable=self.adam_address_var, width=5)
self.adam_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
# Función
ttk.Label(config_frame, text="Función:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
# self.function_type_var inicializada en __init__
self.function_type_combo = ttk.Combobox(config_frame, textvariable=self.function_type_var,
values=["Lineal", "Sinusoidal", "Manual"],
state="readonly", width=10)
self.function_type_combo.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
self.function_type_combo.bind("<<ComboboxSelected>>", self.on_function_type_change)
# Tiempo de ciclo completo (nueva característica)
ttk.Label(config_frame, text="Tiempo Ciclo (s):").grid(row=0, column=4, padx=5, pady=5, sticky="w")
# self.cycle_time_var inicializada en __init__
self.cycle_time_entry = ttk.Entry(config_frame, textvariable=self.cycle_time_var, width=8)
self.cycle_time_entry.grid(row=0, column=5, padx=5, pady=5, sticky="ew")
# Velocidad de muestreo (calculada automáticamente)
ttk.Label(config_frame, text="Muestras/ciclo:").grid(row=0, column=6, padx=5, pady=5, sticky="w")
# self.samples_per_cycle_var inicializada en __init__
self.samples_per_cycle_entry = ttk.Entry(config_frame, textvariable=self.samples_per_cycle_var, width=8)
self.samples_per_cycle_entry.grid(row=0, column=7, padx=5, pady=5, sticky="ew")
# --- Frame para modo Manual (Modificado) ---
manual_frame = ttk.LabelFrame(config_frame, text="Modo Manual")
manual_frame.grid(row=1, column=0, columnspan=8, padx=5, pady=5, sticky="ew")
ttk.Label(manual_frame, text="Entrada Por:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
# self.manual_input_type_var inicializada en __init__
self.manual_input_type_combo = ttk.Combobox(manual_frame, textvariable=self.manual_input_type_var,
values=["Brix", "mA", "Voltaje"], state="readonly", width=8)
self.manual_input_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
self.manual_input_type_combo.bind("<<ComboboxSelected>>", self.on_manual_input_type_change)
self.manual_value_label = ttk.Label(manual_frame, text="Valor Brix:") # Se actualiza dinámicamente
self.manual_value_label.grid(row=1, column=0, padx=5, pady=5, sticky="w")
# self.manual_value_var y self.manual_slider_var inicializadas en __init__
self.manual_value_entry = ttk.Entry(manual_frame, textvariable=self.manual_value_var, width=10, state=tk.DISABLED)
self.manual_value_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew")
self.manual_value_entry.bind('<Return>', lambda e: self.update_slider_from_entry())
self.manual_value_entry.bind('<FocusOut>', lambda e: self.update_slider_from_entry())
# Slider
self.manual_slider = ttk.Scale(manual_frame, orient=tk.HORIZONTAL, # from_ y to_ se configuran dinámicamente
variable=self.manual_slider_var, command=self.on_slider_change,
state=tk.DISABLED, length=200)
self.manual_slider.grid(row=1, column=2, padx=5, pady=5, sticky="ew")
manual_frame.columnconfigure(2, weight=1)
# Controls Frame
controls_frame = ttk.LabelFrame(self.frame, text="Control Simulación")
controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")
self.start_button = ttk.Button(controls_frame, text="Iniciar", command=self.start_simulation)
self.start_button.pack(side=tk.LEFT, padx=5)
self.stop_button = ttk.Button(controls_frame, text="Detener", command=self.stop_simulation, state=tk.DISABLED)
self.stop_button.pack(side=tk.LEFT, padx=5)
self.clear_graph_button = ttk.Button(controls_frame, text="Limpiar Gráfico", command=self.clear_graph)
self.clear_graph_button.pack(side=tk.LEFT, padx=5)
# Display Frame
display_frame = ttk.LabelFrame(self.frame, text="Valores Actuales")
display_frame.grid(row=1, column=1, padx=10, pady=5, sticky="ew")
ttk.Label(display_frame, text="Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
# self.current_brix_var inicializada en __init__
ttk.Label(display_frame, textvariable=self.current_brix_var,
font=("Courier", 14, "bold")).grid(row=0, column=1, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, text="mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
# self.current_ma_var inicializada en __init__
ttk.Label(display_frame, textvariable=self.current_ma_var,
font=("Courier", 14, "bold")).grid(row=1, column=1, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, text="Voltaje:").grid(row=2, column=0, padx=5, pady=5, sticky="w")
# self.current_voltage_var inicializada en __init__
ttk.Label(display_frame, textvariable=self.current_voltage_var,
font=("Courier", 14, "bold")).grid(row=2, column=1, padx=5, pady=5, sticky="w")
# Log Frame
log_frame = ttk.LabelFrame(self.frame, text="Log de Comunicación")
log_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
self.log_text = scrolledtext.ScrolledText(log_frame, height=8, width=70, wrap=tk.WORD, state=tk.DISABLED)
self.log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
# --- Frame para Simulación de Errores ---
self._setup_error_simulation_ui() # Se añade al final de create_widgets
# Configurar pesos
self.frame.columnconfigure(0, weight=1)
self.frame.columnconfigure(1, weight=1)
self.frame.rowconfigure(2, weight=1) # Log frame
# Inicializar estado
self.on_function_type_change()
def get_graph_frame(self):
"""Crea y retorna el frame para el gráfico"""
graph_frame = ttk.LabelFrame(self.frame, text="Gráfico Simulador")
graph_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
# El rowconfigure para el gráfico se hace aquí, y el de errores abajo
self.frame.rowconfigure(3, weight=1) # Graph frame (se mueve una fila abajo)
return graph_frame
def _setup_error_simulation_ui(self):
"""Crea los controles para la simulación de errores."""
error_frame = ttk.LabelFrame(self.frame, text="Simulación de Errores (Modo TCP Server)")
error_frame.grid(row=4, column=0, columnspan=2, padx=10, pady=10, sticky="ew")
self.frame.rowconfigure(4, weight=0) # Error frame no se expande tanto como el log o gráfico
ttk.Label(error_frame, text="Tipo de Error:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.error_type_var = tk.StringVar(value="Ninguno")
self.error_type_combo = ttk.Combobox(
error_frame,
textvariable=self.error_type_var,
state="disabled", # Se habilita/deshabilita dinámicamente
values=[
"Ninguno", # Para enviar una trama normal desde este control
"ID Erróneo",
"Valor Fuera de Escala (mA)",
"Checksum Erróneo",
"Longitud Errónea (Aleatoria)",
"Trama Faltante (Omitir Envío)"
]
)
self.error_type_combo.current(0)
self.error_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
self.send_error_button = ttk.Button(error_frame, text="Enviar Trama Errónea",
command=self.send_selected_error_manually, state=tk.DISABLED)
self.send_error_button.grid(row=0, column=2, padx=5, pady=5)
self.random_error_var = tk.BooleanVar(value=False)
self.random_error_check = ttk.Checkbutton(
error_frame,
text="Errores Aleatorios (cada ~10s)",
variable=self.random_error_var,
command=self.toggle_random_errors,
state="disabled" # Se habilita/deshabilita dinámicamente
)
self.random_error_check.grid(row=1, column=0, columnspan=2, padx=5, pady=5, sticky="w")
# Checkbox para reemplazar trama normal con error (ahora en su propia fila para claridad)
self.replace_with_error_check = ttk.Checkbutton(
error_frame,
text="Reemplazar trama normal con error",
variable=self.replace_normal_with_error_var,
state="disabled" # Se habilita/deshabilita dinámicamente
)
self.replace_with_error_check.grid(row=1, column=2, padx=(10,5), pady=5, sticky="w")
ttk.Label(error_frame, text="Intervalo Errores Aleatorios (s):").grid(row=2, column=0, padx=5, pady=5, sticky="w")
self.random_error_interval_entry = ttk.Entry(
error_frame,
textvariable=self.random_error_interval_var,
width=8, # El Entry solo necesita el parent, textvariable, width y state.
state="disabled" # Se habilita/deshabilita dinámicamente
)
self.random_error_interval_entry.grid(row=2, column=1, padx=5, pady=5, sticky="ew") # Añadir el grid para el Entry
# El grid para self.replace_with_error_check ya está definido donde se crea ese widget.
error_frame.columnconfigure(1, weight=1)
self.update_error_controls_state() # Establecer estado inicial
def update_error_controls_state(self):
"""Habilita o deshabilita los controles de error según el modo de conexión."""
# Asegurarse de que los widgets de error existan antes de intentar configurarlos
if not hasattr(self, 'error_type_combo'):
return
is_tcp_server_mode = self.shared_config['connection_type_var'].get() == "TCP-Server"
# Considerar si la simulación (conexión) está activa para habilitar el envío
# is_connection_active = self.simulating # O una propiedad más directa de ConnectionManager
# Los controles de error solo tienen sentido si estamos en modo TCP-Server
# y la conexión está activa (es decir, la simulación principal está corriendo o
# el servidor está escuchando de alguna forma).
# Por ahora, lo basaremos en is_tcp_server_mode y self.simulating
enable_controls = is_tcp_server_mode and self.simulating
new_state_tk = tk.NORMAL if enable_controls else tk.DISABLED
new_state_str = "normal" if enable_controls else "disabled" # Para Checkbutton
self.error_type_combo.config(state=new_state_tk if is_tcp_server_mode else tk.DISABLED) # Combo siempre según modo
self.send_error_button.config(state=new_state_tk)
self.random_error_check.config(state=new_state_str)
# El entry del intervalo de errores aleatorios depende de que el check de errores aleatorios esté activo
interval_entry_state_tk = tk.NORMAL if enable_controls and self.random_error_var.get() else tk.DISABLED
self.random_error_interval_entry.config(state=interval_entry_state_tk)
# El check de "Reemplazar trama normal" se habilita si los controles de error están habilitados
self.replace_with_error_check.config(state=new_state_str)
if not enable_controls and self.random_error_var.get():
self.random_error_var.set(False)
self.toggle_random_errors() # Detiene el timer si estaba activo y se deshabilitan controles
def get_current_error_sim_parameters(self):
"""Obtiene parámetros para la simulación de errores (dirección ADAM, valor mA base)."""
adam_address = self.adam_address_var.get()
base_ma_value = 12.345 # Valor por defecto
if self.function_type_var.get() == "Manual":
try:
manual_val = float(self.manual_value_var.get())
input_type = self.manual_input_type_var.get()
if input_type == "Brix":
min_b = float(self.shared_config['min_brix_map_var'].get())
max_b = float(self.shared_config['max_brix_map_var'].get())
base_ma_value = ProtocolHandler.scale_to_ma(manual_val, min_b, max_b)
elif input_type == "mA":
base_ma_value = manual_val
elif input_type == "Voltaje":
base_ma_value = ProtocolHandler.voltage_to_ma(manual_val)
except (ValueError, KeyError, TypeError):
Utils.log_message(self.log_text, "Error Sim: Usando valor mA base por defecto para error.")
else: # Si no es manual, o para tener un valor si la simulación principal no corre
# Podríamos tomar el self.current_ma_var si la simulación está corriendo
# pero para simplicidad, un valor fijo si no es manual.
pass # Mantiene 12.345
return adam_address, base_ma_value
def on_function_type_change(self, event=None):
"""Maneja el cambio de tipo de función"""
func_type = self.function_type_var.get()
is_manual_mode = (func_type == "Manual")
# Si la simulación está corriendo y el tipo de función cambia, detenerla.
if self.simulating:
self.stop_simulation()
# Configurar controles de entrada manual
manual_specific_state = tk.NORMAL if is_manual_mode else tk.DISABLED
self.manual_input_type_combo.config(state=manual_specific_state)
self.manual_value_entry.config(state=manual_specific_state)
self.manual_slider.config(state=manual_specific_state)
# Tiempo de ciclo y muestras por ciclo ahora están habilitados para todos los modos continuos
self.cycle_time_entry.config(state=tk.NORMAL)
self.samples_per_cycle_entry.config(state=tk.NORMAL)
if is_manual_mode:
self.on_manual_input_type_change() # Actualizar rangos de slider/entry y valor actual
# El estado de los botones Start/Stop depende de si la simulación está (o estaba) corriendo.
# Como stop_simulation() se llama arriba si estaba corriendo, self.simulating debería ser False aquí.
if not self.simulating:
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
else:
# Este estado idealmente no se alcanzaría si stop_simulation()
# establece correctamente self.simulating a False y actualiza los botones.
# Sin embargo, como salvaguarda:
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.update_error_controls_state() # Actualizar estado de controles de error
def on_manual_input_type_change(self, event=None):
"""Maneja el cambio de tipo de entrada manual (Brix, mA, Voltaje)"""
input_type = self.manual_input_type_var.get()
min_val, max_val, default_val, label_text, precision = 0, 100, 10.0, "Valor Brix:", 2
if input_type == "Brix":
try:
min_val = float(self.shared_config['min_brix_map_var'].get())
max_val = float(self.shared_config['max_brix_map_var'].get())
if min_val >= max_val: min_val, max_val = 0.0, 80.0 # Fallback
default_val = min_val + (max_val - min_val) / 4
except (ValueError, KeyError, TypeError):
min_val, max_val = 0.0, 80.0
default_val = 10.0
label_text = "Valor Brix:"
precision = 2
elif input_type == "mA":
min_val, max_val = 0.0, 20.0
default_val = 12.0
label_text = "Valor mA:"
precision = 3
elif input_type == "Voltaje":
min_val, max_val = 0.0, 10.0
default_val = 5.0
label_text = "Valor Voltaje:"
precision = 2
self.manual_value_label.config(text=label_text)
self.manual_slider.config(from_=min_val, to=max_val)
try:
current_numeric_val = float(self.manual_value_var.get())
if not (min_val <= current_numeric_val <= max_val):
self.manual_value_var.set(f"{default_val:.{precision}f}")
self.manual_slider_var.set(default_val)
else:
self.manual_slider_var.set(current_numeric_val)
self.manual_value_var.set(f"{current_numeric_val:.{precision}f}")
except ValueError:
self.manual_value_var.set(f"{default_val:.{precision}f}")
self.manual_slider_var.set(default_val)
def on_slider_change(self, value_str):
"""Actualiza el valor del entry cuando cambia el slider"""
value = float(value_str)
input_type = self.manual_input_type_var.get()
precision = 2
if input_type == "Brix": precision = 2
elif input_type == "mA": precision = 3
elif input_type == "Voltaje": precision = 2
self.manual_value_var.set(f"{value:.{precision}f}")
def update_slider_from_entry(self):
"""Actualiza el slider cuando cambia el entry"""
try:
value = float(self.manual_value_var.get())
input_type = self.manual_input_type_var.get()
min_val, max_val, precision = 0,100,2
if input_type == "Brix":
min_val = float(self.shared_config['min_brix_map_var'].get())
max_val = float(self.shared_config['max_brix_map_var'].get())
if min_val >= max_val: min_val, max_val = 0.0, 80.0
precision = 2
elif input_type == "mA": min_val, max_val, precision = 0.0, 20.0, 3
elif input_type == "Voltaje": min_val, max_val, precision = 0.0, 10.0, 2
value = max(min_val, min(max_val, value)) # Clampear al rango
self.manual_slider_var.set(value)
self.manual_value_var.set(f"{value:.{precision}f}")
except (ValueError, KeyError, TypeError):
# Si el valor no es un número o shared_config no está listo, resetear al valor del slider
current_slider_val = self.manual_slider_var.get()
precision_fallback = 2
if self.manual_input_type_var.get() == "mA": precision_fallback = 3
self.manual_value_var.set(f"{current_slider_val:.{precision_fallback}f}")
def start_simulation(self):
"""Inicia la simulación continua"""
if self.simulating:
messagebox.showwarning("Advertencia", "La simulación ya está en curso.")
return
try:
adam_address = self.adam_address_var.get()
if len(adam_address) != 2:
messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.")
return
cycle_time = float(self.cycle_time_var.get())
if cycle_time <= 0:
messagebox.showerror("Error", "El tiempo de ciclo debe ser mayor que 0.")
return
samples_per_cycle = int(self.samples_per_cycle_var.get())
if samples_per_cycle <= 0:
messagebox.showerror("Error", "Las muestras por ciclo deben ser mayor que 0.")
return
# Validar mapeo Brix
float(self.shared_config['min_brix_map_var'].get())
float(self.shared_config['max_brix_map_var'].get())
except (ValueError, KeyError, TypeError):
messagebox.showerror("Error", "Valores inválidos en la configuración (ADAM, ciclo, muestras o mapeo Brix).")
return
try:
current_config_values = {
'connection_type': self.shared_config['connection_type_var'].get(),
'com_port': self.shared_config['com_port_var'].get(),
'baud_rate': self.shared_config['baud_rate_var'].get(),
'ip_address': self.shared_config['ip_address_var'].get(),
'port': self.shared_config['port_var'].get(),
}
conn_type = current_config_values['connection_type']
conn_params = self.shared_config['config_manager'].get_connection_params(current_config_values)
# open_connection ahora devuelve (connection_object, listening_info)
# El connection_object se guarda internamente en self.connection_manager
_, listening_details = self.connection_manager.open_connection(conn_type, conn_params)
if conn_type == "TCP-Server":
Utils.log_message(self.log_text, f"{listening_details} para simulación.")
elif conn_type != "TCP-Server": # Para otros tipos, el mensaje genérico
Utils.log_message(self.log_text, f"Conexión {conn_type} abierta para simulación.")
except Exception as e:
messagebox.showerror("Error de Conexión", str(e))
return
self.simulating = True
self.simulation_step = 0
self.start_time = time.time() # Reset start time for graph
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self._set_entries_state(tk.DISABLED)
self.update_error_controls_state() # Habilitar controles de error si es TCP Server
if conn_type == "TCP-Server":
self.shared_config['client_connected_var'].set("Esperando...")
self.simulation_thread = threading.Thread(target=self.run_simulation, daemon=True)
self.simulation_thread.start()
Utils.log_message(self.log_text, "Simulación iniciada.")
def stop_simulation(self):
"""Detiene la simulación"""
if not self.simulating:
return
self.simulating = False
# Detener el timer de errores aleatorios primero
if self.random_error_timer and self.random_error_timer.is_alive():
self.random_error_timer_stop_event.set()
self.random_error_timer.join(timeout=1.0) # Esperar un poco
self.random_error_timer = None
self.next_frame_is_error_event.clear()
self.error_details_for_replacement = None
if self.simulation_thread and self.simulation_thread.is_alive():
self.simulation_thread.join(timeout=2.0)
self.connection_manager.close_connection()
Utils.log_message(self.log_text, "Conexión cerrada.")
self._set_entries_state(tk.NORMAL)
self.on_function_type_change() # Re-evaluar estado de controles manuales
if self.connection_manager.connection_type == "TCP-Server": # Limpiar info del cliente
self.shared_config['client_connected_var'].set("Ninguno")
Utils.log_message(self.log_text, "Simulación detenida.")
self.current_brix_var.set("---")
self.current_ma_var.set("--.-- mA")
self.current_voltage_var.set("-.-- V")
self.start_button.config(state=tk.NORMAL) # Mover después de _set_entries_state y on_function_type_change
self.stop_button.config(state=tk.DISABLED)
self.update_error_controls_state() # Deshabilitar controles de error
def run_simulation(self):
"""Thread principal de simulación"""
try:
adam_address = self.adam_address_var.get()
min_brix_map = float(self.shared_config['min_brix_map_var'].get())
max_brix_map = float(self.shared_config['max_brix_map_var'].get())
function_type = self.function_type_var.get()
cycle_time = float(self.cycle_time_var.get())
samples_per_cycle = int(self.samples_per_cycle_var.get())
conn_type = self.connection_manager.connection_type # Obtener el tipo de conexión actual
# Obtener la configuración actual para el log del puerto en TCP-Server
current_config_values = {
'connection_type': self.shared_config['connection_type_var'].get(),
'com_port': self.shared_config['com_port_var'].get(),
'baud_rate': self.shared_config['baud_rate_var'].get(),
'ip_address': self.shared_config['ip_address_var'].get(),
'port': self.shared_config['port_var'].get(),
}
sample_period = cycle_time / samples_per_cycle
while self.simulating:
message_to_send = None
ma_value_for_message_generation = 0.0 # mA que se usaría para generar la trama (normal o base para error)
# --- Determinar valores base de la simulación para este ciclo (Brix, mA) ---
# Esta lógica calcula los valores que se mostrarían y graficarían,
# y que se usarían para generar una trama normal.
target_brix = 0.0 # Brix consistente con target_ma para display/graph
# target_ma es el valor de mA que se usaría para generar el mensaje ADAM si fuera normal
# o el valor base si un error lo reemplaza.
current_manual_input_type = self.manual_input_type_var.get() # Cache para este ciclo
if function_type == "Manual": # Lógica para modo Manual
manual_input_type = self.manual_input_type_var.get()
manual_numeric_value = 0.0
try:
manual_numeric_value = float(self.manual_value_var.get())
except ValueError:
Utils.log_message(self.log_text, f"Valor manual inválido: '{self.manual_value_var.get()}'. Usando valor por defecto.")
if manual_input_type == "Brix": manual_numeric_value = min_brix_map
elif manual_input_type == "mA": manual_numeric_value = 4.0
elif manual_input_type == "Voltaje": manual_numeric_value = ProtocolHandler.ma_to_voltage(4.0)
if manual_input_type == "Brix":
target_brix = manual_numeric_value
ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map)
elif manual_input_type == "mA":
ma_value_for_message_generation = manual_numeric_value
target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map)
elif manual_input_type == "Voltaje":
voltage_input = manual_numeric_value
ma_value_for_message_generation = ProtocolHandler.voltage_to_ma(voltage_input)
target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map)
elif function_type == "Lineal":
cycle_progress = (self.simulation_step % (2 * samples_per_cycle)) / samples_per_cycle
if cycle_progress > 1.0:
cycle_progress = 2.0 - cycle_progress
target_brix = min_brix_map + (max_brix_map - min_brix_map) * cycle_progress
ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map)
elif function_type == "Sinusoidal":
progress = (self.simulation_step % samples_per_cycle) / samples_per_cycle
phase = progress * 2 * math.pi
sin_val = (math.sin(phase) + 1) / 2
target_brix = min_brix_map + (max_brix_map - min_brix_map) * sin_val
ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map)
# ma_value_in_message es el valor de mA que realmente se usaría en la trama o que se mostraría
# Si la trama es reemplazada por un error, este valor sigue siendo el de la simulación normal
# para la UI, pero la trama enviada será diferente.
ma_value_for_ui_display = ma_value_for_message_generation
voltage_value_display = ProtocolHandler.ma_to_voltage(ma_value_for_ui_display)
# --- Preparar la trama a enviar (normal o error de reemplazo) ---
log_prefix_for_send = "Enviando"
log_suffix_for_send = ""
actual_error_type_sent = "Normal" # Para el log
if self.next_frame_is_error_event.is_set() and \
self.error_details_for_replacement is not None and \
self.replace_normal_with_error_var.get():
error_msg_bytes, error_log_suffix, error_type_str = self.error_details_for_replacement
message_to_send = error_msg_bytes
log_prefix_for_send = "Error Sim (Reemplazo Programado)"
log_suffix_for_send = error_log_suffix
actual_error_type_sent = error_type_str
self.next_frame_is_error_event.clear()
self.error_details_for_replacement = None
else:
# Generar trama normal
message_to_send, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, ma_value_for_message_generation)
# Preparar texto para display
brix_display_text = ""
if ma_value_for_ui_display < 4.0 and function_type == "Manual" and \
(current_manual_input_type == "mA" or current_manual_input_type == "Voltaje"):
brix_display_text = "Error (Sub 4mA)"
else:
brix_display_text = Utils.format_brix_display(target_brix)
# Actualizar GUI (StringVars son thread-safe para .set())
self.current_brix_var.set(brix_display_text)
self.current_ma_var.set(Utils.format_ma_display(ma_value_for_ui_display))
self.current_voltage_var.set(ProtocolHandler.format_voltage_display(voltage_value_display))
# Agregar punto de datos al gráfico (desde el thread GUI)
self.frame.after(0, lambda b=target_brix, m=ma_value_for_ui_display: self.add_data_point(b, m))
# --- Enviar la trama (normal o de error) ---
if message_to_send: # Si hay algo que enviar (no es "Trama Faltante" de reemplazo)
try:
if conn_type == "TCP-Server":
if not self.connection_manager.is_client_connected():
if not hasattr(self, '_waiting_for_client_logged') or not self._waiting_for_client_logged:
port_to_log = self.shared_config['config_manager'].get_connection_params(current_config_values)['port']
Utils.log_message(self.log_text, f"TCP Server: Esperando cliente en puerto {port_to_log}...")
self._waiting_for_client_logged = True
if self.connection_manager.accept_client(timeout=0.05):
Utils.log_message(self.log_text, f"TCP Server: Cliente conectado desde {self.connection_manager.client_address}")
client_info = f"{self.connection_manager.client_address[0]}:{self.connection_manager.client_address[1]}"
self.shared_config['client_connected_var'].set(client_info)
self._waiting_for_client_logged = False
elif not self.connection_manager.is_client_connected() and \
self.shared_config['client_connected_var'].get() != "Esperando...":
self.shared_config['client_connected_var'].set("Esperando...")
log_content = ProtocolHandler.format_for_display(message_to_send, hex_non_printable=True)
if actual_error_type_sent != "Normal" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"):
Utils.log_message(self.log_text, f"{log_prefix_for_send}: Trama '{actual_error_type_sent}'{log_suffix_for_send} -> {log_content}")
else:
Utils.log_message(self.log_text, f"{log_prefix_for_send}: {log_content}")
self.connection_manager.send_data(message_to_send)
if conn_type != "TCP-Server": # No leer respuesta en modo servidor
response = self.connection_manager.read_response(timeout=0.1)
if response and response.strip():
Utils.log_message(self.log_text, f"Respuesta: {ProtocolHandler.format_for_display(response)}")
parsed = ProtocolHandler.parse_adam_message(response)
if parsed:
brix_resp = ProtocolHandler.ma_to_brix(parsed['ma'], min_brix_map, max_brix_map)
Utils.log_message(self.log_text,
f" -> Addr: {parsed['address']}, "
f"mA: {parsed['ma']:.3f}, "
f"Brix: {brix_resp:.3f}")
except self.connection_manager.ClientDisconnectedError:
Utils.log_message(self.log_text, "TCP Server: Cliente desconectado. Esperando nueva conexión.")
if conn_type == "TCP-Server":
self.shared_config['client_connected_var'].set("Esperando...")
self._waiting_for_client_logged = False
except Exception as e:
Utils.log_message(self.log_text, f"Error en comunicación ({conn_type}): {e}")
self.frame.after(0, self.stop_simulation_error)
break
elif actual_error_type_sent == "Trama Faltante (Omitir Envío)" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"):
# Loguear que se omitió una trama debido al reemplazo por "Trama Faltante"
Utils.log_message(self.log_text, f"{log_prefix_for_send}: Simulación de '{actual_error_type_sent}'{log_suffix_for_send}. No se envió trama.")
self.simulation_step += 1
time.sleep(sample_period)
except Exception as e: # Catches errors in parameter fetching or main loop logic
Utils.log_message(self.log_text, f"Error en simulación: {e}")
if self.simulating: # Ensure stop is called only if an error occurs while simulating
self.frame.after(0, self.stop_simulation_error)
def stop_simulation_error(self):
"""Detiene la simulación debido a un error y muestra mensaje"""
if self.simulating: # Solo actuar si la simulación estaba activa
messagebox.showerror("Error de Simulación", "Error durante la simulación. Simulación detenida.")
self.stop_simulation() # Llama al método normal de parada
def generate_erroneous_message_logic(self, error_type, adam_address, base_ma_value):
"""Genera la trama (bytes) según el tipo de error."""
message_bytes = None
log_message_suffix = ""
if error_type == "ID Erróneo":
wrong_adam_address = "99" if adam_address != "99" else "98"
message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(wrong_adam_address, base_ma_value)
log_message_suffix = f" (ID cambiado a {wrong_adam_address})"
elif error_type == "Valor Fuera de Escala (mA)":
out_of_scale_ma = 2.500 if random.random() < 0.5 else 22.500
message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, out_of_scale_ma)
log_message_suffix = f" (valor mA: {out_of_scale_ma:.3f})"
elif error_type == "Checksum Erróneo":
message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value)
log_message_suffix = " (checksum incorrecto)"
elif error_type == "Longitud Errónea (Aleatoria)":
base_msg_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value)
if len(base_msg_bytes) > 1:
if random.choice([True, False]): # Acortar
cut_len = random.randint(1, max(1, len(base_msg_bytes) // 2))
message_bytes = base_msg_bytes[:-cut_len]
log_message_suffix = f" (longitud acortada en {cut_len} bytes)"
else: # Alargar
add_len = random.randint(1, 5) # Aumentado un poco el largo posible
garbage = bytes([random.randint(32, 126) for _ in range(add_len)])
message_bytes = base_msg_bytes + garbage # Podría ser al final o en medio
log_message_suffix = f" (longitud aumentada en {add_len} bytes)"
else:
message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value)
log_message_suffix = " (longitud errónea -> fallback a checksum incorrecto)"
elif error_type == "Trama Faltante (Omitir Envío)":
log_message_suffix = " (trama omitida)"
return None, log_message_suffix
elif error_type == "Ninguno": # Enviar trama normal
message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value)
log_message_suffix = " (trama normal)"
else:
Utils.log_message(self.log_text, f"Error Sim: Tipo de error '{error_type}' desconocido.")
return None, f" (tipo de error '{error_type}' desconocido)"
return message_bytes, log_message_suffix
def send_selected_error_manually(self):
"""Manejador del botón 'Enviar Trama Errónea'."""
if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating):
messagebox.showwarning("No Activo", "La simulación de errores manuales requiere modo TCP-Server y simulación activa.")
return
if not self.connection_manager.is_client_connected():
Utils.log_message(self.log_text, "Error Sim: No hay cliente conectado para enviar trama errónea.")
# messagebox.showinfo("Sin Cliente", "No hay cliente conectado para enviar la trama errónea.")
# return # Permitir enviar aunque no haya cliente, el log lo indicará
error_type = self.error_type_var.get()
adam_address, base_ma_value = self.get_current_error_sim_parameters()
message_bytes, log_suffix_from_gen = self.generate_erroneous_message_logic(error_type, adam_address, base_ma_value)
if self.replace_normal_with_error_var.get():
# Programar para reemplazo en el siguiente ciclo de simulación
self.error_details_for_replacement = (message_bytes, log_suffix_from_gen, error_type)
self.next_frame_is_error_event.set()
if error_type == "Trama Faltante (Omitir Envío)":
Utils.log_message(self.log_text, f"Error Sim Manual: Programada OMISIÓN de trama '{error_type}'{log_suffix_from_gen} para reemplazo.")
elif message_bytes:
Utils.log_message(self.log_text, f"Error Sim Manual: Programada trama '{error_type}'{log_suffix_from_gen} para reemplazo.")
else: # Error en generación o tipo desconocido
Utils.log_message(self.log_text, f"Error Sim Manual: No se pudo programar trama '{error_type}'{log_suffix_from_gen} para reemplazo.")
else:
# Enviar inmediatamente como trama adicional
if message_bytes:
try:
self.connection_manager.send_data(message_bytes)
Utils.log_message(self.log_text, f"Error Sim Manual (Adicional): Trama '{error_type}'{log_suffix_from_gen} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}")
except Exception as e:
Utils.log_message(self.log_text, f"Error Sim Manual (Adicional): Fallo al enviar trama: {e}")
elif error_type == "Trama Faltante (Omitir Envío)":
Utils.log_message(self.log_text, f"Error Sim Manual (Adicional): Simulación de '{error_type}'{log_suffix_from_gen}. No se envió trama adicional.")
# else: Ya logueado por generate_erroneous_message_logic si message_bytes es None y no es "Trama Faltante"
def toggle_random_errors(self):
"""Activa o desactiva el envío de errores aleatorios."""
# self.random_error_var.get() refleja el nuevo estado del checkbox debido al clic del usuario
can_actually_start_random_errors = (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating)
if self.random_error_var.get(): # Si el usuario intenta activar los errores aleatorios
if not can_actually_start_random_errors:
Utils.log_message(self.log_text, "Error Sim: Errores aleatorios solo en TCP-Server con simulación activa.")
self.random_error_var.set(False) # Forzar a False ya que las condiciones no se cumplen
# El timer no se iniciará. update_error_controls_state() al final se encargará.
else: # Las condiciones se cumplen, iniciar el timer si no está ya activo
try:
interval_val = float(self.random_error_interval_var.get())
if interval_val <= 0:
messagebox.showerror("Error de Intervalo", "El intervalo para errores aleatorios debe ser un número positivo.")
self.random_error_var.set(False)
self.update_error_controls_state()
return
except ValueError:
messagebox.showerror("Error de Intervalo", "Valor inválido para el intervalo de errores aleatorios.")
self.random_error_var.set(False)
self.update_error_controls_state()
return
# Las condiciones se cumplen, iniciar el timer si no está ya activo
if self.random_error_timer is None or not self.random_error_timer.is_alive():
self.random_error_timer_stop_event.clear()
self.random_error_timer = threading.Thread(target=self._random_error_loop, args=(interval_val,), daemon=True)
self.random_error_timer.start()
else: # Si el usuario intenta desactivar los errores aleatorios (el checkbox ahora está desmarcado)
if self.random_error_timer and self.random_error_timer.is_alive():
Utils.log_message(self.log_text, "Error Sim: Deteniendo envío de errores aleatorios.")
self.random_error_timer_stop_event.set()
# No es necesario join aquí, se hará en stop_simulation o al cerrar.
# Actualizar siempre el estado de los controles al final, basado en el estado final de self.random_error_var
self.update_error_controls_state()
def _random_error_loop(self, initial_interval_s):
"""Bucle del hilo que envía errores aleatorios."""
possible_error_types = [val for val in self.error_type_combo['values'] if val != "Ninguno"]
if not possible_error_types: return
current_interval = initial_interval_s
Utils.log_message(self.log_text, f"Error Sim: Hilo de errores aleatorios iniciado con intervalo {current_interval:.2f}s.")
while not self.random_error_timer_stop_event.is_set():
if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating and self.connection_manager.is_client_connected()):
self.random_error_timer_stop_event.wait(1.0) # Esperar si no hay cliente o no está activo
continue
selected_random_error = random.choice(possible_error_types)
adam_address, base_ma_value = self.get_current_error_sim_parameters()
message_bytes, log_suffix = self.generate_erroneous_message_logic(selected_random_error, adam_address, base_ma_value)
if self.replace_normal_with_error_var.get():
# Programar el error para que reemplace la siguiente trama normal
self.error_details_for_replacement = (message_bytes, log_suffix, selected_random_error)
self.next_frame_is_error_event.set()
# El log de este envío se hará en run_simulation cuando efectivamente se envíe/omita
Utils.log_message(self.log_text, f"Error Sim Aleatorio: Programada trama '{selected_random_error}'{log_suffix} para reemplazo.")
else:
# Enviar el error inmediatamente, además de las tramas normales
if message_bytes:
try:
self.connection_manager.send_data(message_bytes)
Utils.log_message(self.log_text, f"Error Sim Aleatorio (Adicional): Trama '{selected_random_error}'{log_suffix} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}")
except Exception as e:
Utils.log_message(self.log_text, f"Error Sim Aleatorio (Adicional): Fallo al enviar: {e}")
elif selected_random_error == "Trama Faltante (Omitir Envío)":
Utils.log_message(self.log_text, f"Error Sim Aleatorio (Adicional): Simulación de '{selected_random_error}'{log_suffix}. No se envió trama adicional.")
# Permitir que el intervalo se actualice dinámicamente
try:
new_interval = float(self.random_error_interval_var.get())
if new_interval > 0 and new_interval != current_interval:
current_interval = new_interval
Utils.log_message(self.log_text, f"Error Sim: Intervalo de errores aleatorios actualizado a {current_interval:.2f}s.")
except ValueError:
pass # Mantener el intervalo actual si el nuevo valor es inválido
self.random_error_timer_stop_event.wait(timeout=current_interval)
Utils.log_message(self.log_text, "Error Sim: Hilo de errores aleatorios detenido.")
def add_data_point(self, brix_value, ma_value):
"""Agrega un punto de datos al gráfico"""
current_time = time.time() - self.start_time
self.time_data.append(current_time)
self.brix_data.append(brix_value)
self.ma_data.append(ma_value)
if hasattr(self, 'graph_update_callback'):
self.graph_update_callback()
def clear_graph(self):
"""Limpia los datos del gráfico"""
Utils.clear_graph_data(self.time_data, self.brix_data, self.ma_data)
self.start_time = time.time()
Utils.log_message(self.log_text, "Gráfico limpiado.")
if hasattr(self, 'graph_update_callback'):
self.graph_update_callback()
def _set_entries_state(self, state):
"""Habilita/deshabilita los controles durante la simulación"""
sim_specific_widgets = [
self.adam_address_entry,
self.function_type_combo,
self.cycle_time_entry,
self.samples_per_cycle_entry
]
# No deshabilitar controles de modo manual aquí, se manejan en on_function_type_change
Utils.set_widgets_state(sim_specific_widgets, state)
if 'shared_widgets' in self.shared_config:
Utils.set_widgets_state(self.shared_config['shared_widgets'], state)
# self.update_error_controls_state() # El estado de los controles de error depende también de self.simulating
def get_config(self):
"""Obtiene la configuración actual del simulador"""
return {
'adam_address': self.adam_address_var.get(),
'function_type': self.function_type_var.get(),
'cycle_time': self.cycle_time_var.get(),
'samples_per_cycle': self.samples_per_cycle_var.get(),
'manual_input_type': self.manual_input_type_var.get(),
'manual_value': self.manual_value_var.get(),
'random_error_interval': self.random_error_interval_var.get()
}
def set_config(self, config):
"""Establece la configuración del simulador"""
self.adam_address_var.set(config.get('adam_address', '01'))
self.function_type_var.set(config.get('function_type', 'Lineal'))
self.cycle_time_var.set(config.get('cycle_time', '10.0'))
self.samples_per_cycle_var.set(config.get('samples_per_cycle', '100'))
self.manual_input_type_var.set(config.get('manual_input_type', 'Brix'))
self.manual_value_var.set(config.get('manual_value', '10.0'))
self.random_error_interval_var.set(config.get('random_error_interval', '10.0'))
try:
self.manual_slider_var.set(float(self.manual_value_var.get()))
except ValueError:
# Si el valor no es un float válido, intentar con un default o el valor del tipo
# Esto se manejará mejor en on_manual_input_type_change
pass
self.on_function_type_change() # Esto llamará a on_manual_input_type_change si es necesario
self.update_error_controls_state() # Actualizar estado de controles de error al cargar config
def on_app_close(self):
"""Llamado cuando la aplicación se está cerrando para limpiar recursos."""
if self.simulating:
self.stop_simulation() # Asegura que todo se detenga y limpie correctamente

360
tabs/trace_tab.py Normal file
View File

@ -0,0 +1,360 @@
"""
Tab del Trace - Escucha datos de un dispositivo Maselli real
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import threading
import time
import csv
from collections import deque
from datetime import datetime
import sys # Add sys import
import os # Add os import
# If this script is run directly, add the parent directory to sys.path
# to allow imports of modules like protocol_handler, connection_manager, utils
if __name__ == "__main__" and __package__ is None:
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from protocol_handler import ProtocolHandler
from connection_manager import ConnectionManager
from utils import Utils
class TraceTab:
def __init__(self, parent_frame, shared_config):
self.frame = parent_frame
self.shared_config = shared_config
# Estado del trace
self.tracing = False
self.trace_thread = None
self.connection_manager = ConnectionManager()
# Archivo CSV
self.csv_file = None
self.csv_writer = None
# Datos para el gráfico (ahora con mA también)
self.max_points = 100
self.time_data = deque(maxlen=self.max_points)
self.brix_data = deque(maxlen=self.max_points)
self.ma_data = deque(maxlen=self.max_points) # Nueva línea para mA
self.start_time = time.time()
self.create_widgets()
def create_widgets(self):
"""Crea los widgets del tab trace"""
# Control Frame
control_frame = ttk.LabelFrame(self.frame, text="Control Trace")
control_frame.grid(row=0, column=0, columnspan=2, padx=10, pady=5, sticky="ew")
self.start_button = ttk.Button(control_frame, text="Iniciar Trace", command=self.start_trace)
self.start_button.pack(side=tk.LEFT, padx=5)
self.stop_button = ttk.Button(control_frame, text="Detener Trace", command=self.stop_trace, state=tk.DISABLED)
self.stop_button.pack(side=tk.LEFT, padx=5)
self.clear_graph_button = ttk.Button(control_frame, text="Limpiar Gráfico", command=self.clear_graph)
self.clear_graph_button.pack(side=tk.LEFT, padx=5)
ttk.Label(control_frame, text="Archivo CSV:").pack(side=tk.LEFT, padx=(20, 5))
self.csv_filename_var = tk.StringVar(value="Sin archivo")
ttk.Label(control_frame, textvariable=self.csv_filename_var).pack(side=tk.LEFT, padx=5)
# Display Frame
display_frame = ttk.LabelFrame(self.frame, text="Último Valor Recibido")
display_frame.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky="ew")
ttk.Label(display_frame, text="Timestamp:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.timestamp_var = tk.StringVar(value="---")
ttk.Label(display_frame, textvariable=self.timestamp_var,
font=("Courier", 12)).grid(row=0, column=1, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, text="Dirección:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
self.address_var = tk.StringVar(value="--")
ttk.Label(display_frame, textvariable=self.address_var,
font=("Courier", 12)).grid(row=0, column=3, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, text="Valor mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
self.ma_var = tk.StringVar(value="---")
ttk.Label(display_frame, textvariable=self.ma_var,
font=("Courier", 12, "bold"), foreground="red").grid(row=1, column=1, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, text="Valor Brix:").grid(row=1, column=2, padx=5, pady=5, sticky="w")
self.brix_var = tk.StringVar(value="---")
ttk.Label(display_frame, textvariable=self.brix_var,
font=("Courier", 12, "bold"), foreground="blue").grid(row=1, column=3, padx=5, pady=5, sticky="w")
# Statistics Frame
stats_frame = ttk.LabelFrame(self.frame, text="Estadísticas")
stats_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="ew")
ttk.Label(stats_frame, text="Mensajes recibidos:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.msg_count_var = tk.StringVar(value="0")
ttk.Label(stats_frame, textvariable=self.msg_count_var).grid(row=0, column=1, padx=5, pady=5, sticky="w")
ttk.Label(stats_frame, text="Errores checksum:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
self.checksum_errors_var = tk.StringVar(value="0")
ttk.Label(stats_frame, textvariable=self.checksum_errors_var).grid(row=0, column=3, padx=5, pady=5, sticky="w")
# Log Frame
log_frame = ttk.LabelFrame(self.frame, text="Log de Recepción")
log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
self.log_text = scrolledtext.ScrolledText(log_frame, height=10, width=70, wrap=tk.WORD, state=tk.DISABLED)
self.log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
# Configurar pesos
self.frame.columnconfigure(0, weight=1)
self.frame.columnconfigure(1, weight=1)
self.frame.rowconfigure(3, weight=1)
# Contadores
self.message_count = 0
self.checksum_error_count = 0
def get_graph_frame(self):
"""Crea y retorna el frame para el gráfico"""
graph_frame = ttk.LabelFrame(self.frame, text="Gráfico Trace (Brix y mA)")
graph_frame.grid(row=4, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
self.frame.rowconfigure(4, weight=1)
return graph_frame
def start_trace(self):
"""Inicia el modo trace"""
if self.tracing:
messagebox.showwarning("Advertencia", "El trace ya está en curso.")
return
# Verificar si el tipo de conexión global es compatible
global_conn_type = self.shared_config['connection_type_var'].get()
if global_conn_type == "TCP-Server":
messagebox.showerror("Modo No Compatible", "El modo Trace no es compatible cuando el tipo de conexión global es TCP-Server.")
return
# Crear archivo CSV
csv_filename = Utils.create_csv_filename("maselli_trace")
try:
self.csv_file = open(csv_filename, 'w', newline='', encoding='utf-8')
self.csv_writer = csv.writer(self.csv_file)
self.csv_writer.writerow(['Timestamp', 'Address', 'mA', 'Brix', 'Checksum_Valid', 'Raw_Message'])
self.csv_filename_var.set(csv_filename)
except Exception as e:
messagebox.showerror("Error", f"No se pudo crear archivo CSV: {e}")
return
# Abrir conexión
try:
# Construct a dictionary of current config values for get_connection_params
current_config_values = {
'connection_type': self.shared_config['connection_type_var'].get(),
'com_port': self.shared_config['com_port_var'].get(),
'baud_rate': self.shared_config['baud_rate_var'].get(),
'ip_address': self.shared_config['ip_address_var'].get(),
'port': self.shared_config['port_var'].get(),
}
conn_type = current_config_values['connection_type']
conn_params = self.shared_config['config_manager'].get_connection_params(current_config_values)
self.connection_manager.open_connection(conn_type, conn_params)
Utils.log_message(self.log_text, f"Conexión {conn_type} abierta para trace.")
except Exception as e:
messagebox.showerror("Error de Conexión", str(e))
if self.csv_file:
self.csv_file.close()
return
# Resetear contadores
self.message_count = 0
self.checksum_error_count = 0
self.msg_count_var.set("0")
self.checksum_errors_var.set("0")
self.tracing = True
self.start_time = time.time()
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self._set_entries_state(tk.DISABLED)
# Iniciar thread de recepción
self.trace_thread = threading.Thread(target=self.run_trace, daemon=True)
self.trace_thread.start()
Utils.log_message(self.log_text, "Trace iniciado.")
def stop_trace(self):
"""Detiene el modo trace"""
if not self.tracing:
return
self.tracing = False
# Esperar a que termine el thread
if self.trace_thread and self.trace_thread.is_alive():
self.trace_thread.join(timeout=2.0)
# Cerrar conexión
self.connection_manager.close_connection()
Utils.log_message(self.log_text, "Conexión cerrada.")
# Cerrar archivo CSV
if self.csv_file:
self.csv_file.close()
self.csv_file = None
self.csv_writer = None
Utils.log_message(self.log_text, f"Archivo CSV guardado: {self.csv_filename_var.get()}")
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self._set_entries_state(tk.NORMAL)
Utils.log_message(self.log_text, "Trace detenido.")
Utils.log_message(self.log_text, f"Total mensajes: {self.message_count}, Errores checksum: {self.checksum_error_count}")
def run_trace(self):
"""Thread principal para recepción de datos"""
buffer = bytearray() # Cambiar buffer a bytearray
while self.tracing:
try:
# Leer datos disponibles
data = self.connection_manager.read_data_non_blocking()
if data:
buffer.extend(data) # Usar extend para bytearray
# Las condiciones de búsqueda ahora deben usar bytes
while b'\r' in buffer or b'\n' in buffer or len(buffer) >= 10: # Encontrar el primer terminador
end_idx = -1
# Iterar sobre los valores de byte
for i, byte_val in enumerate(buffer):
if byte_val == ord(b'\r') or byte_val == ord(b'\n'):
end_idx = i + 1
break
# Si no hay terminador pero el buffer es largo, buscar mensaje completo
if end_idx == -1 and len(buffer) >= 10:
# Verificar si hay un mensaje ADAM completo
# Heurística: si empieza con '#' o parece un valor ADAM
# Decodificar solo la parte necesaria para la heurística
is_adam_like = False
try:
temp_str_for_check = buffer[:10].decode('ascii', errors='ignore')
if temp_str_for_check.startswith('#') or \
(len(temp_str_for_check) >= 8 and temp_str_for_check[2:8].replace('.', '').isdigit()):
is_adam_like = True
except: pass
if is_adam_like:
end_idx = 10 # Longitud de un mensaje ADAM sin terminador explícito
if len(buffer) > 10 and (buffer[10] == ord(b'\r') or buffer[10] == ord(b'\n')):
end_idx = 11
if end_idx > 0:
message_bytes = bytes(buffer[:end_idx]) # Extraer como bytes
buffer = buffer[end_idx:]
# Procesar mensaje si tiene contenido
message_str = message_bytes.decode('ascii', errors='ignore') # Decodificar a string
if message_str.strip(): # Procesar si la cadena decodificada tiene contenido
self._process_message(message_str)
else:
break
except Exception as e:
if self.tracing:
Utils.log_message(self.log_text, f"Error en trace: {e}")
break
# Pequeña pausa para no consumir demasiado CPU
if not data:
time.sleep(0.01)
def _process_message(self, message):
"""Procesa un mensaje recibido"""
# Log del mensaje raw
display_msg = ProtocolHandler.format_for_display(message)
Utils.log_message(self.log_text, f"Recibido: {display_msg}")
# Parsear mensaje
parsed = ProtocolHandler.parse_adam_message(message)
if parsed:
# Obtener valores de mapeo
min_brix = float(self.shared_config['min_brix_map_var'].get())
max_brix = float(self.shared_config['max_brix_map_var'].get())
ma_value = parsed['ma']
brix_value = ProtocolHandler.ma_to_brix(ma_value, min_brix, max_brix)
timestamp = datetime.now()
# Actualizar contadores
self.message_count += 1
self.msg_count_var.set(str(self.message_count))
if not parsed.get('checksum_valid', True):
self.checksum_error_count += 1
self.checksum_errors_var.set(str(self.checksum_error_count))
# Actualizar display
self.timestamp_var.set(timestamp.strftime("%H:%M:%S.%f")[:-3])
self.address_var.set(parsed['address'])
self.ma_var.set(Utils.format_ma_display(ma_value))
self.brix_var.set(Utils.format_brix_display(brix_value))
# Log con detalles
checksum_status = "OK" if parsed.get('checksum_valid', True) else "ERROR"
Utils.log_message(self.log_text,
f" -> Addr: {parsed['address']}, "
f"mA: {ma_value:.3f}, "
f"Brix: {brix_value:.3f}, "
f"Checksum: {checksum_status}")
# Guardar en CSV
if self.csv_writer:
self.csv_writer.writerow([
timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
parsed['address'],
f"{ma_value:.3f}",
f"{brix_value:.3f}",
parsed.get('checksum_valid', True),
display_msg
])
if self.csv_file:
self.csv_file.flush()
# Agregar al gráfico
current_time = time.time() - self.start_time
self.time_data.append(current_time)
self.brix_data.append(brix_value)
self.ma_data.append(ma_value) # Agregar también mA
# Actualizar gráfico
if hasattr(self, 'graph_update_callback'):
self.frame.after(0, self.graph_update_callback)
else:
# Mensaje no válido
Utils.log_message(self.log_text, f" -> Mensaje no válido ADAM")
def clear_graph(self):
"""Limpia los datos del gráfico"""
Utils.clear_graph_data(self.time_data, self.brix_data, self.ma_data)
self.start_time = time.time()
Utils.log_message(self.log_text, "Gráfico limpiado.")
if hasattr(self, 'graph_update_callback'):
self.graph_update_callback()
def _set_entries_state(self, state):
"""Habilita/deshabilita los controles durante el trace"""
# Deshabilitar controles compartidos
if 'shared_widgets' in self.shared_config:
Utils.set_widgets_state(self.shared_config['shared_widgets'], state)
def get_config(self):
"""Obtiene la configuración actual (no hay configuración específica para trace)"""
return {}
def set_config(self, config):
"""Establece la configuración (no hay configuración específica para trace)"""
pass

103
utils.py Normal file
View File

@ -0,0 +1,103 @@
"""
Utilidades comunes para el proyecto
"""
import tkinter as tk
from datetime import datetime
import os
MAX_LOG_LINES = 100 # Número máximo de líneas en el log
class Utils:
@staticmethod
def log_message(log_widget, message):
"""Escribe un mensaje con timestamp en el widget de log especificado, limitando el número de líneas."""
if log_widget:
log_widget.configure(state=tk.NORMAL)
timestamp = datetime.now().strftime('%H:%M:%S')
log_widget.insert(tk.END, f"[{timestamp}] {message}\n")
# Limitar el número de líneas
num_lines = int(log_widget.index('end-1c').split('.')[0])
if num_lines > MAX_LOG_LINES:
lines_to_delete = num_lines - MAX_LOG_LINES
# Sumamos 1.0 porque delete va hasta el índice anterior al segundo parámetro
log_widget.delete('1.0', f"{lines_to_delete + 1}.0")
log_widget.see(tk.END)
log_widget.configure(state=tk.DISABLED)
@staticmethod
def load_icon(root):
"""Intenta cargar un icono para la ventana"""
icon_loaded = False
for icon_file in ['icon.png', 'icon.ico', 'icon.gif']:
if os.path.exists(icon_file):
try:
if icon_file.endswith('.ico'):
root.iconbitmap(icon_file)
else:
icon = tk.PhotoImage(file=icon_file)
root.iconphoto(True, icon)
icon_loaded = True
break
except Exception as e:
print(f"No se pudo cargar {icon_file}: {e}")
if not icon_loaded:
print("No se encontró ningún archivo de icono (icon.png, icon.ico, icon.gif)")
@staticmethod
def create_csv_filename(prefix="maselli"):
"""Crea un nombre de archivo CSV con timestamp"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return f"{prefix}_{timestamp}.csv"
@staticmethod
def validate_number_entry(value, min_val=None, max_val=None, is_float=True):
"""Valida que una entrada sea un número válido dentro del rango especificado"""
try:
num = float(value) if is_float else int(value)
if min_val is not None and num < min_val:
return False
if max_val is not None and num > max_val:
return False
return True
except ValueError:
return False
@staticmethod
def set_widgets_state(widgets, state):
"""Establece el estado de múltiples widgets"""
for widget in widgets:
try:
widget.config(state=state)
except:
pass # Algunos widgets pueden no tener la propiedad state
@staticmethod
def format_brix_display(brix_value):
"""Formatea un valor Brix para mostrar"""
return f"{brix_value:.3f} Brix"
@staticmethod
def format_ma_display(ma_value):
"""Formatea un valor mA para mostrar"""
return f"{ma_value:.3f} mA"
@staticmethod
def clear_graph_data(*data_containers):
"""Limpia los contenedores de datos del gráfico"""
for container in data_containers:
if hasattr(container, 'clear'):
container.clear()
@staticmethod
def clear_log_widget(log_widget):
"""Limpia el contenido del widget de log especificado."""
if log_widget:
log_widget.configure(state=tk.NORMAL)
log_widget.delete('1.0', tk.END)
log_widget.configure(state=tk.DISABLED)