Refactor dataset variable areas to uppercase, update plot definitions and variables, and enhance UI schema for better clarity and consistency

- Changed memory area identifiers in dataset_variables.json and related schemas from lowercase to uppercase (e.g., "db" to "DB").
- Added new plot definition for "CTS306 Conductivimeter" in plot_definitions.json.
- Updated plot_variables.json to include new variables for "HMI_Instrument.QTM306.PVFiltered" and "HMI_Instrument.QTM307.PVFiltered".
- Enhanced dataset-variables.schema.json to enforce required fields based on area and type.
- Improved ui schema for dataset variables to provide clearer guidance on field usage.
- Updated PLC client to utilize new snap7.type module and improved error handling.
- Added support for new data types (DWORD) in PlcDataTypeWidget.
- Updated Dashboard.jsx to reflect changes in memory area identifiers and improve default configurations.
- Upgraded python-snap7 dependency to version 2.0.2 for enhanced functionality.
- Modified system_state.json to reflect changes in connection status and last update timestamp.
- Implemented optimized batch reading logic in optimized_batch_reader.py to support new area mappings.
- Added VSCode configuration files for improved development experience.
- Introduced ConditionalObjectFieldTemplate.jsx for dynamic field visibility based on PLC-specific logic.
This commit is contained in:
Miguel 2025-08-25 18:02:24 +02:00
parent aba83f843a
commit 551ec8b4a5
17 changed files with 8318 additions and 9344 deletions

View File

@ -38,7 +38,7 @@ project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
import snap7
from snap7.types import S7DataItem
from snap7.type import S7DataItem
from utils.json_manager import JSONManager
import struct
import ctypes

View File

@ -19,18 +19,19 @@ project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
import snap7
from snap7.types import S7DataItem
from snap7.type import S7DataItem
from utils.json_manager import JSONManager
from utils.optimized_batch_reader import OptimizedBatchReader
import struct
import ctypes
class SimpleDataIntegrityVerifier:
"""
Sistema simplificado de verificación de integridad de datos.
Compara directamente métodos optimizado vs individual.
"""
def __init__(self):
self.json_manager = JSONManager()
self.plc = None
@ -38,46 +39,46 @@ class SimpleDataIntegrityVerifier:
"test_info": {
"start_time": datetime.now().isoformat(),
"plc_ip": None,
"total_variables": 0
"total_variables": 0,
},
"results": {}
"results": {},
}
def connect_plc(self) -> bool:
"""Conectar al PLC."""
try:
print("🔌 Conectando al PLC...")
# Cargar configuración
config_data = self.json_manager.read_json("plc")
plc_config = config_data.get("plc_config", {})
ip = plc_config.get("ip")
rack = plc_config.get("rack", 0)
slot = plc_config.get("slot", 2)
self.test_results["test_info"]["plc_ip"] = ip
# Conectar
self.plc = snap7.client.Client()
self.plc.connect(ip, rack, slot)
print(f"✅ Conectado a PLC: {ip}:{rack}.{slot}")
return True
except Exception as e:
print(f"❌ Error conectando PLC: {e}")
return False
def generate_test_variables(self) -> Dict[str, Dict[str, Any]]:
"""Generar conjunto comprehensivo de variables de test."""
variables = {}
print("🔧 Generando variables de test...")
# 1. DB1011 Variables - diferentes tipos
print(" 📊 Variables DB1011...")
# REALs
for i in range(10):
offset = i * 4
@ -86,9 +87,9 @@ class SimpleDataIntegrityVerifier:
"area": "db",
"db": 1011,
"offset": offset,
"type": "real"
"type": "real",
}
# INTs
for i in range(5):
offset = 100 + (i * 2)
@ -97,9 +98,9 @@ class SimpleDataIntegrityVerifier:
"area": "db",
"db": 1011,
"offset": offset,
"type": "int"
"type": "int",
}
# BOOLs
for byte_offset in range(50, 53):
for bit in range(0, 4):
@ -109,45 +110,33 @@ class SimpleDataIntegrityVerifier:
"db": 1011,
"offset": byte_offset,
"type": "bool",
"bit": bit
"bit": bit,
}
# 2. Memory Variables
print(" 🧠 Variables Memory...")
# Memory REALs
for i in range(5):
offset = 100 + (i * 4)
var_name = f"M{offset}_real"
variables[var_name] = {
"area": "m",
"offset": offset,
"type": "real"
}
variables[var_name] = {"area": "m", "offset": offset, "type": "real"}
# Memory INTs
for i in range(3):
offset = 200 + (i * 2)
var_name = f"M{offset}_int"
variables[var_name] = {
"area": "m",
"offset": offset,
"type": "int"
}
variables[var_name] = {"area": "m", "offset": offset, "type": "int"}
# 3. Input Variables
print(" 📥 Variables Input...")
# Input Words
for i in range(3):
offset = 300 + (i * 2)
var_name = f"PEW{offset}"
variables[var_name] = {
"area": "e",
"offset": offset,
"type": "int"
}
variables[var_name] = {"area": "e", "offset": offset, "type": "int"}
# Input Bits
for byte_offset in range(0, 2):
for bit in range(0, 4):
@ -156,26 +145,28 @@ class SimpleDataIntegrityVerifier:
"area": "e",
"offset": byte_offset,
"type": "bool",
"bit": bit
"bit": bit,
}
self.test_results["test_info"]["total_variables"] = len(variables)
print(f"✅ Generadas {len(variables)} variables de test")
return variables
def read_with_individual_method(self, variables: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
def read_with_individual_method(
self, variables: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""Leer variables usando método individual (legacy)."""
print("📖 Leyendo con método INDIVIDUAL...")
results = {}
for var_name, config in variables.items():
try:
area = config.get("area", "db").lower()
offset = config.get("offset", 0)
var_type = config.get("type", "real").lower()
if area == "db":
db = config.get("db", 0)
if var_type == "real":
@ -218,37 +209,39 @@ class SimpleDataIntegrityVerifier:
value = None
else:
value = None
results[var_name] = value
# Pequeña pausa entre lecturas individuales
time.sleep(0.001)
except Exception as e:
print(f" ❌ Error leyendo {var_name}: {e}")
results[var_name] = None
successful = len([v for v in results.values() if v is not None])
print(f"{successful}/{len(variables)} variables leídas exitosamente")
return results
def read_with_optimized_method(self, variables: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
def read_with_optimized_method(
self, variables: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""Leer variables usando método optimizado (read_multi_vars) con chunking."""
print("🚀 Leyendo con método OPTIMIZADO...")
results = {}
CHUNK_SIZE = 19 # Límite seguro para S7 (detectamos que el límite es 20)
try:
# Preparar S7DataItems
all_items = []
all_var_map = []
for var_name, config in variables.items():
try:
item = S7DataItem()
# Configurar área
area = config.get("area", "db").lower()
if area == "db":
@ -259,7 +252,7 @@ class SimpleDataIntegrityVerifier:
item.Area = 129
else:
continue
# Configurar tipo
var_type = config.get("type", "real").lower()
if var_type == "real":
@ -273,56 +266,60 @@ class SimpleDataIntegrityVerifier:
buffer_size = 1
else:
continue
item.DBNumber = config.get("db", 0)
item.Start = config.get("offset", 0)
item.Amount = 1
# Para BOOLs, ajustar offset
if var_type == "bool" and "bit" in config:
bit = config["bit"]
item.Start = (item.Start * 8) + bit
# Allocar buffer
buffer = (ctypes.c_ubyte * buffer_size)()
item.pData = ctypes.cast(buffer, ctypes.POINTER(ctypes.c_ubyte))
all_items.append(item)
all_var_map.append({"name": var_name, "config": config})
except Exception as e:
print(f" ❌ Error preparando {var_name}: {e}")
results[var_name] = None
if not all_items:
return results
print(f" 📊 Procesando {len(all_items)} variables en chunks de {CHUNK_SIZE}")
print(
f" 📊 Procesando {len(all_items)} variables en chunks de {CHUNK_SIZE}"
)
# Procesar en chunks
for chunk_start in range(0, len(all_items), CHUNK_SIZE):
chunk_end = min(chunk_start + CHUNK_SIZE, len(all_items))
chunk_items = all_items[chunk_start:chunk_end]
chunk_var_map = all_var_map[chunk_start:chunk_end]
print(f" 🔄 Procesando chunk {chunk_start//CHUNK_SIZE + 1}: variables {chunk_start+1}-{chunk_end}")
print(
f" 🔄 Procesando chunk {chunk_start//CHUNK_SIZE + 1}: variables {chunk_start+1}-{chunk_end}"
)
# Convertir chunk a ctypes array
items_array = (S7DataItem * len(chunk_items))(*chunk_items)
# Llamar read_multi_vars para este chunk
result = self.plc.read_multi_vars(items_array)
if isinstance(result, tuple) and len(result) == 2:
ret_code, returned_items = result
if ret_code == 0:
for i, item in enumerate(returned_items):
var_name = chunk_var_map[i]["name"]
config = chunk_var_map[i]["config"]
var_type = config.get("type", "real").lower()
if item.Result == 0:
try:
if var_type == "real":
@ -333,58 +330,67 @@ class SimpleDataIntegrityVerifier:
value = snap7.util.get_bool(item.pData, 0, 0)
else:
value = None
results[var_name] = value
except Exception as e:
print(f" ❌ Error extrayendo {var_name}: {e}")
results[var_name] = None
else:
print(f" ❌ Error leyendo {var_name}: código {item.Result}")
print(
f" ❌ Error leyendo {var_name}: código {item.Result}"
)
results[var_name] = None
else:
print(f" ❌ Chunk falló: código {ret_code}")
for var_info in chunk_var_map:
results[var_info["name"]] = None
else:
print(f" ❌ Formato de resultado inesperado para chunk: {type(result)}")
print(
f" ❌ Formato de resultado inesperado para chunk: {type(result)}"
)
for var_info in chunk_var_map:
results[var_info["name"]] = None
# Pequeña pausa entre chunks
time.sleep(0.01)
except Exception as e:
print(f" ❌ Error en método optimizado: {e}")
import traceback
traceback.print_exc()
for var_name in variables.keys():
if var_name not in results:
results[var_name] = None
successful = len([v for v in results.values() if v is not None])
print(f"{successful}/{len(variables)} variables leídas exitosamente")
return results
def compare_results(self, individual_results: Dict[str, Any],
optimized_results: Dict[str, Any], pass_name: str) -> Dict[str, Any]:
def compare_results(
self,
individual_results: Dict[str, Any],
optimized_results: Dict[str, Any],
pass_name: str,
) -> Dict[str, Any]:
"""Comparar resultados entre métodos."""
print(f"🔍 Comparando resultados - {pass_name}...")
comparison = {
"identical": [],
"different": [],
"individual_errors": [],
"optimized_errors": [],
"both_errors": []
"both_errors": [],
}
total_vars = len(individual_results)
for var_name in individual_results.keys():
individual_val = individual_results.get(var_name)
optimized_val = optimized_results.get(var_name)
if individual_val is None and optimized_val is None:
comparison["both_errors"].append(var_name)
elif individual_val is None:
@ -396,126 +402,136 @@ class SimpleDataIntegrityVerifier:
if self._values_equal(individual_val, optimized_val):
comparison["identical"].append(var_name)
else:
comparison["different"].append({
"variable": var_name,
"individual": individual_val,
"optimized": optimized_val
})
comparison["different"].append(
{
"variable": var_name,
"individual": individual_val,
"optimized": optimized_val,
}
)
# Estadísticas
identical_count = len(comparison["identical"])
different_count = len(comparison["different"])
print(f" 📊 Resultados {pass_name}:")
print(f" ✅ Idénticas: {identical_count}/{total_vars} ({identical_count/total_vars*100:.1f}%)")
print(
f" ✅ Idénticas: {identical_count}/{total_vars} ({identical_count/total_vars*100:.1f}%)"
)
print(f" ❌ Diferentes: {different_count}/{total_vars}")
print(f" ⚠️ Errores individual: {len(comparison['individual_errors'])}")
print(f" ⚠️ Errores optimizado: {len(comparison['optimized_errors'])}")
print(f" ⚠️ Errores ambos: {len(comparison['both_errors'])}")
return comparison
def _values_equal(self, val1: Any, val2: Any, tolerance: float = 1e-6) -> bool:
"""Comparar valores con tolerancia para floats."""
if type(val1) != type(val2):
return False
if isinstance(val1, float):
return abs(val1 - val2) <= tolerance
else:
return val1 == val2
def run_verification(self) -> bool:
"""Ejecutar verificación de integridad."""
print("🔍 === VERIFICACIÓN DE INTEGRIDAD DE DATOS ===")
print("Sistema de doble pasada para validación de consistencia")
print("=" * 60)
try:
# 1. Conectar PLC
if not self.connect_plc():
return False
# 2. Generar variables
variables = self.generate_test_variables()
# 3. PASADA 1
print(f"\n🔄 PASADA 1")
print("-" * 20)
time.sleep(0.5)
individual_1 = self.read_with_individual_method(variables)
time.sleep(0.2)
optimized_1 = self.read_with_optimized_method(variables)
comparison_1 = self.compare_results(individual_1, optimized_1, "Pasada 1")
# 4. PASADA 2
print(f"\n🔄 PASADA 2")
print("-" * 20)
time.sleep(0.5)
individual_2 = self.read_with_individual_method(variables)
time.sleep(0.2)
optimized_2 = self.read_with_optimized_method(variables)
comparison_2 = self.compare_results(individual_2, optimized_2, "Pasada 2")
# 5. Análisis final
print(f"\n🔬 ANÁLISIS FINAL")
print("-" * 20)
identical_1 = set(comparison_1["identical"])
identical_2 = set(comparison_2["identical"])
consistently_identical = identical_1.intersection(identical_2)
total_vars = len(variables)
success_rate = len(consistently_identical) / total_vars * 100
print(f"📊 Resultados Finales:")
print(f" ✅ Variables consistentemente idénticas: {len(consistently_identical)}/{total_vars} ({success_rate:.1f}%)")
print(f" 🔄 Variables que cambiaron entre pasadas: {len(identical_1.symmetric_difference(identical_2))}")
print(
f" ✅ Variables consistentemente idénticas: {len(consistently_identical)}/{total_vars} ({success_rate:.1f}%)"
)
print(
f" 🔄 Variables que cambiaron entre pasadas: {len(identical_1.symmetric_difference(identical_2))}"
)
# Mostrar variables diferentes si las hay
if comparison_1["different"] or comparison_2["different"]:
print(f"\n❌ Variables con diferencias detectadas:")
for diff in comparison_1["different"]:
print(f" {diff['variable']}: Individual={diff['individual']}, Optimizado={diff['optimized']}")
print(
f" {diff['variable']}: Individual={diff['individual']}, Optimizado={diff['optimized']}"
)
# 6. Guardar resultado
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"data_integrity_simple_{timestamp}.json"
report = {
"test_info": self.test_results["test_info"],
"pass_1": {
"individual": individual_1,
"optimized": optimized_1,
"comparison": comparison_1
"comparison": comparison_1,
},
"pass_2": {
"individual": individual_2,
"optimized": optimized_2,
"comparison": comparison_2
"comparison": comparison_2,
},
"final_analysis": {
"success_rate": success_rate,
"consistently_identical": list(consistently_identical),
"total_variables": total_vars
}
"total_variables": total_vars,
},
}
with open(filename, 'w', encoding='utf-8') as f:
with open(filename, "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"📄 Reporte guardado: {filename}")
# 7. Cleanup
self.plc.disconnect()
print("✅ PLC desconectado")
# Resultado final
if success_rate >= 95.0:
print(f"\n🎉 ¡VERIFICACIÓN EXITOSA! ({success_rate:.1f}%)")
@ -523,10 +539,11 @@ class SimpleDataIntegrityVerifier:
else:
print(f"\n⚠️ VERIFICACIÓN CON OBSERVACIONES ({success_rate:.1f}%)")
return False
except Exception as e:
print(f"❌ Error durante verificación: {e}")
import traceback
traceback.print_exc()
return False
@ -535,15 +552,15 @@ def main():
"""Función principal."""
print("🔍 SIMPLE DATA INTEGRITY VERIFICATION")
print("🚀 Iniciando verificación...")
verifier = SimpleDataIntegrityVerifier()
success = verifier.run_verification()
if success:
print("✅ ¡Sistema optimizado mantiene integridad de datos!")
else:
print("⚠️ Revisar reporte para detalles")
return success

0
.vscode/launch.json vendored Normal file
View File

0
.vscode/tasks.json vendored Normal file
View File

File diff suppressed because it is too large Load Diff

View File

@ -5,7 +5,7 @@
"variables": [
{
"configType": "manual",
"area": "db",
"area": "DB",
"db": 1011,
"name": "HMI_Instrument.QTM307.PVFiltered",
"offset": 1322,
@ -14,7 +14,7 @@
},
{
"configType": "manual",
"area": "db",
"area": "DB",
"db": 1011,
"name": "HMI_Instrument.QTM306.PVFiltered",
"offset": 1296,
@ -23,7 +23,7 @@
},
{
"configType": "manual",
"area": "db",
"area": "DB",
"db": 1011,
"name": "HMI_Instrument.CTS306.PVFiltered",
"offset": 1348,
@ -32,7 +32,7 @@
},
{
"configType": "manual",
"area": "pew",
"area": "PEW",
"type": "word",
"streaming": false,
"name": "CTS306_PEW",
@ -44,14 +44,14 @@
"dataset_id": "Fast",
"variables": [
{
"area": "db",
"area": "DB",
"configType": "symbol",
"streaming": true,
"symbol": "AUX Blink_2.0S",
"type": "real"
},
{
"area": "m",
"area": "M",
"bit": 1,
"configType": "manual",
"name": "M50.1",
@ -60,7 +60,7 @@
"type": "bool"
},
{
"area": "m",
"area": "M",
"bit": 2,
"configType": "manual",
"name": "M50.2",

View File

@ -11,6 +11,18 @@
"time_window": 60,
"trigger_enabled": false,
"trigger_on_true": true
},
{
"id": "CTS306",
"line_tension": 0.4,
"name": "CTS306 Conductivimeter",
"point_hover_radius": 4,
"point_radius": 1,
"stacked": true,
"stepped": true,
"time_window": 60,
"trigger_enabled": false,
"trigger_on_true": true
}
]
}

View File

@ -32,22 +32,60 @@
"plot_id": "Clock",
"variables": [
{
"variable_name": "AUX Blink_2.0S",
"color": "#db3376",
"enabled": true,
"line_width": 2,
"y_axis": "left",
"enabled": true
"variable_name": "AUX Blink_2.0S",
"y_axis": "left"
},
{
"color": "#3498db",
"enabled": true,
"line_width": 2,
"variable_name": "M50.1",
"y_axis": "left"
},
{
"color": "#3edb33",
"enabled": true,
"line_width": 2,
"variable_name": "M50.2",
"y_axis": "left"
}
]
},
{
"plot_id": "DAR",
"variables": [
{
"color": "#3498db",
"enabled": true,
"line_width": 2,
"variable_name": "HMI_Instrument.QTM306.PVFiltered",
"y_axis": "left"
},
{
"color": "#e30d4d",
"enabled": true,
"line_width": 2,
"variable_name": "HMI_Instrument.QTM307.PVFiltered",
"y_axis": "left"
}
]
},
{
"plot_id": "CTS306",
"variables": [
{
"variable_name": "CTS306_PEW",
"color": "#3498db",
"line_width": 2,
"y_axis": "left",
"enabled": true
},
{
"variable_name": "M50.2",
"color": "#3edb33",
"variable_name": "HMI_Instrument.CTS306.PVFiltered",
"color": "#1bf38e",
"line_width": 2,
"y_axis": "left",
"enabled": true

View File

@ -38,16 +38,16 @@
"type": "string",
"title": "Memory Area",
"enum": [
"db",
"mw",
"m",
"pew",
"pe",
"paw",
"pa",
"e",
"a",
"mb"
"DB",
"MW",
"M",
"PEW",
"PE",
"PAW",
"PA",
"E",
"A",
"MB"
]
},
"db": {
@ -87,7 +87,8 @@
"uint",
"udint",
"sint",
"usint"
"usint",
"dword"
]
},
"streaming": {
@ -102,6 +103,32 @@
"area",
"offset",
"type"
],
"allOf": [
{
"if": {
"properties": {
"area": {
"const": "DB"
}
}
},
"then": {
"required": ["db"]
}
},
{
"if": {
"properties": {
"type": {
"const": "bool"
}
}
},
"then": {
"required": ["bit"]
}
}
]
},
{

View File

@ -83,43 +83,43 @@
"ui:options": {
"enumOptions": [
{
"value": "db",
"value": "DB",
"label": "🗃️ DB (Data Block)"
},
{
"value": "mw",
"value": "MW",
"label": "📊 MW (Memory Word)"
},
{
"value": "m",
"value": "M",
"label": "💾 M (Memory)"
},
{
"value": "pew",
"value": "PEW",
"label": "📥 PEW (Process Input Word)"
},
{
"value": "pe",
"value": "PE",
"label": "📥 PE (Process Input)"
},
{
"value": "paw",
"value": "PAW",
"label": "📤 PAW (Process Output Word)"
},
{
"value": "pa",
"value": "PA",
"label": "📤 PA (Process Output)"
},
{
"value": "e",
"value": "E",
"label": "🔌 E (Input)"
},
{
"value": "a",
"value": "A",
"label": "🔌 A (Output)"
},
{
"value": "mb",
"value": "MB",
"label": "💾 MB (Memory Byte)"
}
]
@ -127,8 +127,9 @@
},
"db": {
"ui:widget": "updown",
"ui:help": "Data Block number (required for DB area)",
"ui:placeholder": "1011"
"ui:help": "⚠️ Data Block number (only required for DB area - will be ignored for other areas like PE, PA, MW, etc.)",
"ui:placeholder": "1011",
"ui:description": "🗃️ This field is only used when Area = 'DB (Data Block)'"
},
"offset": {
"ui:widget": "updown",
@ -136,7 +137,8 @@
},
"bit": {
"ui:widget": "updown",
"ui:help": "Bit position (0-7) for bit-addressable areas"
"ui:help": "⚠️ Bit position (0-7) - only required for BOOL data type, will be ignored for other types",
"ui:description": "✅ This field is only used when Type = 'BOOL (1-bit boolean)'"
},
"type": {
"ui:widget": "select",
@ -182,6 +184,10 @@
{
"value": "usint",
"label": "🔢 USINT (8-bit unsigned)"
},
{
"value": "dword",
"label": "🔢 DWORD (32-bit unsigned)"
}
]
}

View File

@ -1,5 +1,6 @@
import snap7
import snap7.util
import snap7.type
import struct
import time
import threading
@ -8,6 +9,7 @@ from typing import Dict, Any, Optional
# 🚀 OPTIMIZATION: Check if optimized batch reader is available
try:
import utils.optimized_batch_reader
OPTIMIZED_BATCH_READER_AVAILABLE = True
except ImportError as e:
OPTIMIZED_BATCH_READER_AVAILABLE = False
@ -63,10 +65,11 @@ class PLCClient:
try:
# Import here to avoid circular imports
from utils.optimized_batch_reader import OptimizedBatchReader
self.batch_reader = OptimizedBatchReader(
plc_client=self,
logger=logger,
inter_read_delay=self.inter_read_delay_seconds
plc_client=self,
logger=logger,
inter_read_delay=self.inter_read_delay_seconds,
)
if logger:
logger.info("🚀 OptimizedBatchReader initialized successfully")
@ -324,7 +327,7 @@ class PLCClient:
result = self._read_memory_variable(offset, var_type)
elif area_type in [
"pew",
"pe",
"pe",
"i", # Process Input area
"ped", # Process Input Double word (REAL)
"peb", # Process Input Byte
@ -413,19 +416,21 @@ class PLCClient:
return None
def read_variables_batch(
self, variables_config: Dict[str, Dict[str, Any]], use_optimized_reading: bool = None
self,
variables_config: Dict[str, Dict[str, Any]],
use_optimized_reading: bool = None,
) -> Dict[str, Any]:
"""🚀 OPTIMIZED: Read multiple variables using advanced batch operations
This method can use either the optimized read_multi_vars method or fall back
to the legacy grouping method based on the use_optimized_reading parameter
or the global USE_OPTIMIZED_BATCH_READING setting.
When optimization is enabled and available:
- Uses snap7.read_multi_vars with automatic chunking
- Handles scattered variables across different memory areas
- Significantly reduces network overhead and improves performance
When optimization is disabled or unavailable:
- Falls back to the original grouping and batch reading method
- Maintains compatibility with older snap7 versions
@ -444,21 +449,27 @@ class PLCClient:
# <20> Determine which reading method to use
# Priority: dataset-specific setting > global setting
should_use_optimized = (
use_optimized_reading
if use_optimized_reading is not None
use_optimized_reading
if use_optimized_reading is not None
else USE_OPTIMIZED_BATCH_READING
)
# 🚀 Check if we should use the optimized batch reader
if (
should_use_optimized
and self.batch_reader is not None
should_use_optimized
and self.batch_reader is not None
and OPTIMIZED_BATCH_READER_AVAILABLE
):
# Use the optimized read_multi_vars method
if self.logger:
source = "dataset config" if use_optimized_reading is not None else "global config"
self.logger.debug(f"🚀 Using optimized batch reading for {len(variables_config)} variables (from {source})")
source = (
"dataset config"
if use_optimized_reading is not None
else "global config"
)
self.logger.debug(
f"🚀 Using optimized batch reading for {len(variables_config)} variables (from {source})"
)
return self.batch_reader.read_variables_batch(variables_config)
else:
# Fall back to the legacy grouping method
@ -474,7 +485,7 @@ class PLCClient:
self, variables_config: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""Legacy batch reading method (original implementation)
This method groups variables by DB/Area and performs batch reads when possible,
reducing the number of snap7 calls compared to individual reads.
"""
@ -734,48 +745,48 @@ class PLCClient:
def _read_input_variable(self, offset: int, var_type: str) -> Any:
"""Read from Process Inputs using correct area code (0x81)"""
try:
# Use snap7.types.Areas.PE (0x81) for Process Inputs
# Use snap7.type.Areas.PE (0x81) for Process Inputs
# read_area(area, dbnumber, start, size) - only 4 parameters!
if var_type == "real":
# For REAL (32-bit float), read 4 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 4)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 4)
return struct.unpack(">f", raw_data)[0]
elif var_type == "int":
# For INT (16-bit signed), read 2 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 2)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 2)
return struct.unpack(">h", raw_data)[0]
elif var_type == "word":
# For WORD (16-bit unsigned), read 2 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 2)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 2)
return struct.unpack(">H", raw_data)[0]
elif var_type == "dint":
# For DINT (32-bit signed), read 4 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 4)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 4)
return struct.unpack(">l", raw_data)[0]
elif var_type == "dword":
# For DWORD (32-bit unsigned), read 4 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 4)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 4)
return struct.unpack(">L", raw_data)[0]
elif var_type == "byte":
# For BYTE (8-bit), read 1 byte
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 1)
return struct.unpack(">B", raw_data)[0]
elif var_type == "bool":
# For BOOL, we need to read the byte and extract the specific bit
# Default to bit 0 if not specified
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 1)
return bool(raw_data[0] & 0x01)
elif var_type == "uint":
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 2)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 2)
return struct.unpack(">H", raw_data)[0]
elif var_type == "udint":
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 4)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 4)
return struct.unpack(">L", raw_data)[0]
elif var_type == "sint":
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 1)
return struct.unpack(">b", raw_data)[0]
elif var_type == "usint":
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 1)
return struct.unpack(">B", raw_data)[0]
except Exception as e:
if self.logger:
@ -791,48 +802,48 @@ class PLCClient:
def _read_output_variable(self, offset: int, var_type: str) -> Any:
"""Read from Process Outputs using correct area code (0x82)"""
try:
# Use snap7.types.Areas.PA (0x82) for Process Outputs
# Use snap7.type.Areas.PA (0x82) for Process Outputs
# read_area(area, dbnumber, start, size) - only 4 parameters!
if var_type == "real":
# For REAL (32-bit float), read 4 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 4)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 4)
return struct.unpack(">f", raw_data)[0]
elif var_type == "int":
# For INT (16-bit signed), read 2 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 2)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 2)
return struct.unpack(">h", raw_data)[0]
elif var_type == "word":
# For WORD (16-bit unsigned), read 2 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 2)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 2)
return struct.unpack(">H", raw_data)[0]
elif var_type == "dint":
# For DINT (32-bit signed), read 4 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 4)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 4)
return struct.unpack(">l", raw_data)[0]
elif var_type == "dword":
# For DWORD (32-bit unsigned), read 4 bytes
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 4)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 4)
return struct.unpack(">L", raw_data)[0]
elif var_type == "byte":
# For BYTE (8-bit), read 1 byte
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 1)
return struct.unpack(">B", raw_data)[0]
elif var_type == "bool":
# For BOOL, we need to read the byte and extract the specific bit
# Default to bit 0 if not specified
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 1)
return bool(raw_data[0] & 0x01)
elif var_type == "uint":
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 2)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 2)
return struct.unpack(">H", raw_data)[0]
elif var_type == "udint":
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 4)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 4)
return struct.unpack(">L", raw_data)[0]
elif var_type == "sint":
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 1)
return struct.unpack(">b", raw_data)[0]
elif var_type == "usint":
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 1)
return struct.unpack(">B", raw_data)[0]
except Exception as e:
if self.logger:
@ -848,9 +859,9 @@ class PLCClient:
def _read_input_bit(self, offset: int, bit: int) -> bool:
"""Read from Process Input Bits using correct area code (0x81)"""
try:
# Use snap7.types.Areas.PE (0x81) for Process Inputs
# Use snap7.type.Areas.PE (0x81) for Process Inputs
# read_area(area, dbnumber, start, size) - only 4 parameters!
raw_data = self.plc.read_area(snap7.types.Areas.PE, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PE, 0, offset, 1)
return snap7.util.get_bool(raw_data, 0, bit)
except Exception as e:
if self.logger:
@ -860,9 +871,9 @@ class PLCClient:
def _read_output_bit(self, offset: int, bit: int) -> bool:
"""Read from Process Output Bits using correct area code (0x82)"""
try:
# Use snap7.types.Areas.PA (0x82) for Process Outputs
# Use snap7.type.Areas.PA (0x82) for Process Outputs
# read_area(area, dbnumber, start, size) - only 4 parameters!
raw_data = self.plc.read_area(snap7.types.Areas.PA, 0, offset, 1)
raw_data = self.plc.read_area(snap7.type.Areas.PA, 0, offset, 1)
return snap7.util.get_bool(raw_data, 0, bit)
except Exception as e:
if self.logger:
@ -1106,7 +1117,7 @@ class PLCClient:
"batch_reader_initialized": self.batch_reader is not None,
"inter_read_delay": self.inter_read_delay_seconds,
}
# Add detailed stats from the batch reader if available
if self.batch_reader is not None:
try:
@ -1115,5 +1126,5 @@ class PLCClient:
except Exception as e:
if self.logger:
self.logger.warning(f"Error getting batch reader stats: {e}")
return base_stats

View File

@ -0,0 +1,95 @@
import React from 'react'
import { SimpleGrid, Box, Heading, Text, Stack } from '@chakra-ui/react'
// ConditionalObjectFieldTemplate with PLC-specific field visibility logic
// Hides/shows fields based on PLC memory area and data type rules
export default function ConditionalObjectFieldTemplate(props) {
const { TitleField, DescriptionField, title, description, properties = [], uiSchema, formData } = props
const layout = uiSchema && uiSchema['ui:layout']
// Logic to determine if a field should be visible
const shouldShowField = (fieldName) => {
if (!formData) return true
const area = formData.area
const type = formData.type
// DB Number field logic
if (fieldName === 'db') {
// Only show DB field when area is 'db'
return area === 'db'
}
// Bit Position field logic
if (fieldName === 'bit') {
// Only show bit field for boolean types
return type === 'bool'
}
// Show all other fields by default
return true
}
// Filter properties based on visibility rules
const visibleProperties = properties.filter(prop => shouldShowField(prop.name))
if (!layout) {
return (
<Stack spacing={3}>
{title && (
TitleField ? (
<TitleField id={`${props.idSchema.$id}__title`} title={title} />
) : (
<Heading as="h5" size="sm">{title}</Heading>
)
)}
{description && (
DescriptionField ? (
<DescriptionField id={`${props.idSchema.$id}__desc`} description={description} />
) : (
<Text color="gray.500">{description}</Text>
)
)}
<Stack spacing={2}>
{visibleProperties.map((prop) => (
<Box key={prop.name}>{prop.content}</Box>
))}
</Stack>
</Stack>
)
}
// Map property name to its renderer
const propMap = new Map(visibleProperties.map((p) => [p.name, p]))
return (
<Stack spacing={3}>
{title && (
TitleField ? (
<TitleField id={`${props.idSchema.$id}__title`} title={title} />
) : (
<Heading as="h5" size="sm">{title}</Heading>
)
)}
{description && (
DescriptionField ? (
<DescriptionField id={`${props.idSchema.$id}__desc`} description={description} />
) : (
<Text color="gray.500">{description}</Text>
)
)}
{layout.map((row, rowIdx) => (
<SimpleGrid key={rowIdx} columns={12} spacing={3}>
{row.map((cell, cellIdx) => {
const prop = propMap.get(cell.name)
if (!prop) return null
const col = Math.min(Math.max(cell.width || 12, 1), 12)
return (
<Box key={`${rowIdx}-${cellIdx}`} gridColumn={`span ${col}`}>{prop.content}</Box>
)
})}
</SimpleGrid>
))}
</Stack>
)
}

View File

@ -86,7 +86,8 @@ export function PlcDataTypeWidget(props) {
'uint': '16-bit',
'udint': '32-bit',
'sint': '8-bit',
'usint': '8-bit'
'usint': '8-bit',
'dword': '32-bit'
}
const typeColors = {
@ -99,7 +100,8 @@ export function PlcDataTypeWidget(props) {
'uint': 'green',
'udint': 'green',
'sint': 'green',
'usint': 'green'
'usint': 'green',
'dword': 'orange'
}
return (

View File

@ -1405,8 +1405,8 @@ function DatasetManager() {
area: {
type: "string",
title: "Memory Area",
enum: ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"],
default: "db"
enum: ["DB", "MW", "M", "PEW", "PE", "PAW", "PA", "E", "A", "MB"],
default: "DB"
},
db: { type: "integer", title: "DB Number", minimum: 1, maximum: 9999 },
offset: { type: "integer", title: "Offset", minimum: 0, maximum: 8191 },
@ -1527,7 +1527,7 @@ function DatasetManager() {
// Build the configuration object, only including relevant fields
const config = {
name: processedVar.name || symbolName,
area: processedVar.area || "db",
area: processedVar.area || "DB",
offset: processedVar.offset !== undefined && processedVar.offset !== null ? processedVar.offset : 0,
type: processedVar.type || "real",
streaming: currentVariable.streaming || false
@ -1536,7 +1536,7 @@ function DatasetManager() {
// Only include db field if it's actually present and area requires it
if (processedVar.db !== undefined && processedVar.db !== null) {
config.db = processedVar.db
} else if (config.area === "db") {
} else if (config.area === "DB") {
// Default to 1 only for DB area if no DB number was provided
config.db = 1
}
@ -1554,7 +1554,7 @@ function DatasetManager() {
// If backend processing failed, return basic defaults
const fallbackConfig = {
name: currentVariable.name || symbolName,
area: "db", // Default to DB area
area: "DB", // Default to DB area
offset: 0,
type: "real",
bit: 0,
@ -1562,7 +1562,7 @@ function DatasetManager() {
}
// Only add db field for DB area
if (fallbackConfig.area === "db") {
if (fallbackConfig.area === "DB") {
fallbackConfig.db = 1
}
@ -1573,7 +1573,7 @@ function DatasetManager() {
// Return basic defaults on error
const errorConfig = {
name: currentVariable.name || symbolName,
area: "db", // Default to DB area
area: "DB", // Default to DB area
offset: 0,
type: "real",
bit: 0,
@ -1581,7 +1581,7 @@ function DatasetManager() {
}
// Only add db field for DB area
if (errorConfig.area === "db") {
if (errorConfig.area === "DB") {
errorConfig.db = 1
}

View File

@ -3,7 +3,7 @@ Flask==2.3.3
Flask-Cors==4.0.0
# PLC Communication
python-snap7==1.3
python-snap7==2.0.2
# System Monitoring & Process Management
psutil==5.9.5

View File

@ -1,11 +1,10 @@
{
"last_state": {
"should_connect": true,
"should_connect": false,
"should_stream": false,
"active_datasets": [
"DAR"
]
"active_datasets": []
},
"auto_recovery_enabled": true,
"last_update": "2025-08-25T16:01:38.412207"
"last_update": "2025-08-25T17:50:18.320817",
"plotjuggler_path": "C:\\Program Files\\PlotJuggler\\plotjuggler.exe"
}

View File

@ -32,7 +32,7 @@ except ImportError:
import snap7
import snap7.util
import snap7.types
import snap7.type
import time
import threading
import ctypes
@ -40,7 +40,7 @@ from typing import Dict, Any, Optional, List
# Try to import S7DataItem with fallback for different snap7 versions
try:
from snap7.types import S7DataItem
from snap7.type import S7DataItem
SNAP7_TYPES_AVAILABLE = True
except ImportError:
@ -213,15 +213,19 @@ class OptimizedBatchReader:
# Convert to ctypes array for read_multi_vars (CRITICAL for snap7 v2)
items_array = (S7DataItem * len(items_to_read))(*items_to_read)
# Perform the multi-variable read for the current chunk
result = self.plc_client.plc.read_multi_vars(items_array)
# Handle result format (result code, array of items)
if isinstance(result, tuple) and len(result) == 2:
ret_code, read_results = result
if ret_code != 0:
error_msg = snap7.util.get_error_text(ret_code) if hasattr(snap7.util, 'get_error_text') else f"Error code: {ret_code}"
error_msg = (
snap7.util.get_error_text(ret_code)
if hasattr(snap7.util, "get_error_text")
else f"Error code: {ret_code}"
)
self._log_error(f"read_multi_vars failed: {error_msg}")
for var_name, _ in chunk:
chunk_results[var_name] = None
@ -244,7 +248,11 @@ class OptimizedBatchReader:
chunk_results[var_name] = None
else:
# Handle read error
error_msg = snap7.util.get_error_text(item_result.Result) if hasattr(snap7.util, 'get_error_text') else f"Error: {item_result.Result}"
error_msg = (
snap7.util.get_error_text(item_result.Result)
if hasattr(snap7.util, "get_error_text")
else f"Error: {item_result.Result}"
)
self._log_error(f"Failed to read '{var_name}': {error_msg}")
chunk_results[var_name] = None
@ -292,6 +300,14 @@ class OptimizedBatchReader:
"mw": 131,
"md": 131,
"mb": 131,
# PEW/PAW area mappings
"pew": 129, # Process Input Words
"paw": 130, # Process Output Words
# Additional PE/PA area mappings for consistency with plc_client.py
"ped": 129, # Process Input Double word (REAL)
"peb": 129, # Process Input Byte
"pad": 130, # Process Output Double word (REAL)
"pab": 130, # Process Output Byte
}
return area_map.get(area_str.lower(), 132) # Default to DB