160 lines
6.1 KiB
Python
160 lines
6.1 KiB
Python
"""
|
|
Manejador del protocolo ADAM/Maselli
|
|
Contiene las funciones para formatear mensajes, calcular checksums y parsear respuestas
|
|
"""
|
|
|
|
class ProtocolHandler:
|
|
@staticmethod
|
|
def calculate_checksum(message_part):
|
|
"""Calcula el checksum de un mensaje ADAM"""
|
|
s = sum(ord(c) for c in message_part)
|
|
checksum_byte = s % 256
|
|
return f"{checksum_byte:02X}"
|
|
|
|
@staticmethod
|
|
def format_ma_value(ma_val):
|
|
"""Formatea un valor mA al formato ADAM: XX.XXX (6 caracteres)"""
|
|
return f"{ma_val:06.3f}"
|
|
|
|
@staticmethod
|
|
def scale_to_ma(brix_value, min_brix_map, max_brix_map):
|
|
"""Convierte valor Brix a mA usando el mapeo configurado"""
|
|
if max_brix_map == min_brix_map:
|
|
return 4.0
|
|
|
|
percentage = (brix_value - min_brix_map) / (max_brix_map - min_brix_map)
|
|
percentage = max(0.0, min(1.0, percentage))
|
|
|
|
ma_value = 4.0 + percentage * 16.0
|
|
return ma_value
|
|
|
|
@staticmethod
|
|
def ma_to_brix(ma_value, min_brix_map, max_brix_map):
|
|
"""Convierte valor mA a Brix usando el mapeo configurado"""
|
|
try:
|
|
if ma_value <= 4.0:
|
|
return min_brix_map
|
|
elif ma_value >= 20.0:
|
|
return max_brix_map
|
|
else:
|
|
# Interpolación lineal
|
|
percentage = (ma_value - 4.0) / 16.0
|
|
return min_brix_map + percentage * (max_brix_map - min_brix_map)
|
|
except:
|
|
return 0.0
|
|
|
|
@staticmethod
|
|
def create_adam_message(adam_address, brix_value, min_brix_map, max_brix_map):
|
|
"""Crea un mensaje completo ADAM (como bytes) a partir de un valor Brix"""
|
|
ma_val = ProtocolHandler.scale_to_ma(brix_value, min_brix_map, max_brix_map)
|
|
ma_str = ProtocolHandler.format_ma_value(ma_val)
|
|
|
|
message_part = f"#{adam_address}{ma_str}"
|
|
checksum = ProtocolHandler.calculate_checksum(message_part)
|
|
full_message_str = f"{message_part}{checksum}\r"
|
|
|
|
return full_message_str.encode('ascii'), ma_val
|
|
|
|
@staticmethod
|
|
def ma_to_voltage(ma_value):
|
|
"""Convierte valor mA a Voltaje (0-10V). 0mA -> 0V, 20mA -> 10V."""
|
|
# Escala lineal: Voltage = (ma_value / 20mA) * 10V
|
|
voltage = (ma_value / 20.0) * 10.0
|
|
return max(0.0, min(10.0, voltage)) # Asegurar que esté en el rango 0-10V
|
|
|
|
@staticmethod
|
|
def voltage_to_ma(voltage_value):
|
|
"""Convierte valor Voltaje (0-10V) a mA. 0V -> 0mA, 10V -> 20mA."""
|
|
# Escala lineal: mA = (voltage_value / 10V) * 20mA
|
|
ma = (voltage_value / 10.0) * 20.0
|
|
return max(0.0, min(20.0, ma)) # Asegurar que esté en el rango 0-20mA
|
|
|
|
@staticmethod
|
|
def format_voltage_display(voltage_value):
|
|
"""Formatea un valor de Voltaje para mostrar."""
|
|
return f"{voltage_value:.2f} V"
|
|
|
|
@staticmethod
|
|
def parse_adam_message(data):
|
|
"""
|
|
Parsea un mensaje del protocolo ADAM y retorna el valor en mA
|
|
Formato esperado: #AA[valor_mA][checksum]\r
|
|
Donde:
|
|
- # : Carácter inicial (opcional en algunas respuestas)
|
|
- AA : Dirección del dispositivo (2 caracteres)
|
|
- valor_mA : Valor en mA (6 caracteres, formato XX.XXX)
|
|
- checksum : Suma de verificación (2 caracteres hex)
|
|
- \r : Carácter de fin (opcional)
|
|
|
|
Retorna: dict con 'address' y 'ma', o None si no es válido
|
|
"""
|
|
try:
|
|
# Formato esperado: #AA[valor_mA][checksum]\r
|
|
# Pero también manejar respuestas sin # inicial o sin \r final
|
|
data = data.strip()
|
|
|
|
# Si empieza con #, es un mensaje estándar
|
|
if data.startswith('#'):
|
|
data = data[1:] # Remover #
|
|
|
|
# Si termina con \r, removerlo
|
|
if data.endswith('\r'):
|
|
data = data[:-1]
|
|
|
|
# Verificar longitud mínima
|
|
if len(data) < 8: # 2 addr + 6 valor mínimo
|
|
return None
|
|
|
|
address = data[:2]
|
|
value_str = data[2:8] # 6 caracteres para el valor (XX.XXX)
|
|
|
|
# Verificar si hay checksum
|
|
checksum_valid = True
|
|
if len(data) >= 10:
|
|
checksum = data[8:10] # 2 caracteres para checksum
|
|
|
|
# Verificar checksum
|
|
message_part = f"#{address}{value_str}"
|
|
calculated_checksum = ProtocolHandler.calculate_checksum(message_part)
|
|
|
|
if checksum != calculated_checksum:
|
|
checksum_valid = False
|
|
|
|
# Convertir valor a float
|
|
try:
|
|
ma_value = float(value_str)
|
|
return {
|
|
'address': address,
|
|
'ma': ma_value,
|
|
'checksum_valid': checksum_valid
|
|
}
|
|
except ValueError:
|
|
return None
|
|
|
|
except Exception:
|
|
return None
|
|
|
|
@staticmethod
|
|
def format_for_display(message, hex_non_printable=False):
|
|
"""Formatea un mensaje (bytes o str) para mostrar en el log"""
|
|
if isinstance(message, bytes):
|
|
if hex_non_printable:
|
|
parts = []
|
|
for byte_val in message:
|
|
# Caracteres ASCII imprimibles (32 a 126) se dejan como están.
|
|
# Otros se convierten a \xHH.
|
|
if 32 <= byte_val <= 126:
|
|
parts.append(chr(byte_val))
|
|
else:
|
|
parts.append(f'\\x{byte_val:02x}')
|
|
message_str = "".join(parts)
|
|
else:
|
|
message_str = message.decode('ascii', errors='replace') # 'replace' muestra para no decodificables
|
|
else: # Asumir que es str
|
|
message_str = str(message)
|
|
|
|
# Si no es formato hexadecimal, reemplazar caracteres de control comunes por representaciones legibles.
|
|
if not hex_non_printable:
|
|
message_str = message_str.replace('\r', '<CR>').replace('\n', '<LF>').replace('\t', '<TAB>')
|
|
return message_str
|