Implement OptimizedBatchReader for efficient PLC variable reading

- Added OptimizedBatchReader class to handle batch reading of PLC variables using snap7's read_multi_vars function.
- Introduced automatic chunking to respect S7 PDU limits, allowing for efficient handling of large variable lists.
- Replaced the existing read_variables_batch method in PLCClient with the new optimized implementation.
- Updated system_state.json to reflect the latest last_update timestamp.
- Removed commented-out code related to the previous batch reading method.
This commit is contained in:
Miguel 2025-08-19 10:41:09 +02:00
parent a1c004f11b
commit 4a064937d3
4 changed files with 986 additions and 1129 deletions

File diff suppressed because it is too large Load Diff

View File

@ -5,6 +5,11 @@ import time
import threading
from typing import Dict, Any, Optional
# from utils.optimized_batch_reader import OptimizedBatchReader
# Reemplazar el método read_variables_batch actual
# self.batch_reader = OptimizedBatchReader(self.plc, self.logger)
class PLCClient:
"""Handles PLC communication operations"""

View File

@ -9,5 +9,5 @@
]
},
"auto_recovery_enabled": true,
"last_update": "2025-08-18T18:51:02.665774"
"last_update": "2025-08-19T10:28:32.782080"
}

View File

@ -0,0 +1,582 @@
"""
🚀 OPTIMIZED BATCH READER - High Performance PLC Variable Reading
================================================================
This module implements an optimized batch reading system using snap7's read_multi_vars
function with automatic chunking to handle large variable lists efficiently.
Key Features:
- Uses snap7.read_multi_vars for maximum efficiency (single network request)
- Automatic chunking to respect S7 PDU limits (19 variables per chunk)
- Works with scattered variables across different memory areas
- No requirement for consecutive variable addresses
- Comprehensive error handling and fallback mechanisms
Usage:
reader = OptimizedBatchReader(plc_client, logger=None)
results = reader.read_variables_batch(variables_config)
For integration into PLCClient:
from utils.optimized_batch_reader import OptimizedBatchReader
# Then replace the read_variables_batch method
"""
import snap7
import snap7.util
import time
import threading
from typing import Dict, Any, Optional, List
# Try to import S7DataItem with fallback for different snap7 versions
try:
from snap7.types import S7DataItem
SNAP7_TYPES_AVAILABLE = True
except ImportError:
try:
from snap7 import S7DataItem
SNAP7_TYPES_AVAILABLE = True
except ImportError:
# Create a placeholder for testing
class S7DataItem:
def __init__(self):
self.Area = 0
self.WordLen = 0
self.DBNumber = 0
self.Start = 0
self.Amount = 0
self.Result = 0
self.pData = None
self.Value = b""
SNAP7_TYPES_AVAILABLE = False
class OptimizedBatchReader:
"""
High-performance batch reader using snap7's read_multi_vars with automatic chunking.
This class can be used standalone for testing or integrated into PLCClient.
"""
def __init__(self, plc_client=None, logger=None, inter_read_delay=0.01):
"""
Initialize the optimized batch reader.
Args:
plc_client: snap7.Client instance (if None, testing mode)
logger: Logger instance for debugging
inter_read_delay: Delay between batch operations (seconds)
"""
self.plc_client = plc_client
self.logger = logger
self.inter_read_delay_seconds = inter_read_delay
# Thread safety for integration with existing PLCClient
self.io_lock = threading.RLock()
# 🔑 S7 PDU Limit: Safe chunk size for S7-300/400 CPUs
# Each variable consumes ~12 bytes in the request packet
# With 240-byte PDU limit: 240/12 ≈ 20, we use 19 for safety margin
self.CHUNK_SIZE = 19
def read_variables_batch(
self, variables_config: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""
🚨 OPTIMIZED: Read multiple variables using read_multi_vars with automatic chunking.
This method splits variable lists larger than the S7 protocol limit into
multiple optimized requests, ensuring reliability for large datasets.
Falls back to individual reads if read_multi_vars is not available.
Args:
variables_config: Dict of {var_name: var_config}
where var_config contains:
- area: "db", "m", "e", "a" (memory area)
- db: data block number (for DB area)
- offset: byte offset
- type: "real", "int", "bool", "dint", etc.
- bit: bit number (for bool type)
Returns:
Dict of {var_name: value} or {var_name: None} if read failed
"""
if not self._is_connected() or not variables_config:
return {name: None for name in variables_config}
# Check if we can use the optimized method
if not SNAP7_TYPES_AVAILABLE or not hasattr(self.plc_client, "read_multi_vars"):
self._log_error(
"read_multi_vars not available, falling back to individual reads"
)
return self._read_variables_individual(variables_config)
all_results = {}
variable_items = list(variables_config.items())
with self.io_lock:
# Process variables in chunks to respect S7 PDU limits
for i in range(0, len(variable_items), self.CHUNK_SIZE):
chunk = variable_items[i : i + self.CHUNK_SIZE]
chunk_results = self._read_variables_chunk(chunk)
all_results.update(chunk_results)
# Apply delay after the entire batch operation
if self.inter_read_delay_seconds > 0:
time.sleep(self.inter_read_delay_seconds)
return all_results
def _read_variables_chunk(self, chunk: List[tuple]) -> Dict[str, Any]:
"""
Read a single chunk of variables using read_multi_vars.
Args:
chunk: List of (var_name, config) tuples
Returns:
Dict of {var_name: value} for this chunk
"""
chunk_results = {}
try:
if not chunk:
return chunk_results
items_to_read = []
var_map = [] # Maps results back to var_name and config
# Prepare S7DataItem list for the chunk
for var_name, config in chunk:
try:
item = S7DataItem()
item.Area = self._get_area_code(config.get("area", "db"))
item.WordLen = self._get_word_len(config["type"])
item.DBNumber = config.get("db", 0)
item.Start = self._calculate_start_offset(config)
item.Amount = 1 # We always read 1 item of the specified WordLen
items_to_read.append(item)
var_map.append({"name": var_name, "config": config})
except Exception as e:
self._log_error(f"Error preparing variable {var_name}: {e}")
chunk_results[var_name] = None
if not items_to_read:
return chunk_results
# Perform the multi-variable read for the current chunk
read_results = self.plc_client.read_multi_vars(items_to_read)
# Unpack the results for the chunk
for j, item_result in enumerate(read_results):
var_name = var_map[j]["name"]
config = var_map[j]["config"]
if item_result.Result == 0: # Success
try:
chunk_results[var_name] = self._unpack_s7_data_item(
item_result, config["type"]
)
except Exception as e:
self._log_error(f"Error unpacking {var_name}: {e}")
chunk_results[var_name] = None
else:
# Handle read error
error_msg = snap7.util.get_error_text(item_result.Result)
self._log_error(f"Failed to read '{var_name}': {error_msg}")
chunk_results[var_name] = None
except Exception as e:
self._log_error(f"Batch read operation failed for chunk: {e}")
# If chunk fails catastrophically, mark all its variables as None
for var_name, _ in chunk:
chunk_results[var_name] = None
return chunk_results
def _get_area_code(self, area_str: str) -> int:
"""Maps string area name to snap7 area code."""
# Use constants that are compatible with different snap7 versions
area_map = {
"pe": 129,
"pa": 130,
"mk": 131,
"db": 132,
"ct": 28,
"tm": 29,
# Aliases commonly used in configurations
"e": 129,
"a": 130,
"i": 129,
"q": 130,
"m": 131,
"mw": 131,
"md": 131,
"mb": 131,
}
return area_map.get(area_str.lower(), 132) # Default to DB
def _get_word_len(self, type_str: str) -> int:
"""Maps string type to snap7 WordLen code."""
# Use constants that are compatible with different snap7 versions
type_map = {
"bool": 1,
"byte": 2,
"word": 4,
"dword": 6,
"int": 5,
"dint": 7,
"real": 8,
"sint": 2,
"usint": 2,
"uint": 4,
"udint": 6,
}
return type_map.get(type_str.lower(), 2) # Default to Byte
def _calculate_start_offset(self, config: Dict[str, Any]) -> int:
"""
Calculates the start offset for S7DataItem.
For bit operations on bool variables, the offset is encoded as:
(byte_offset * 8) + bit_offset
For other types, it's just the byte offset.
"""
offset = config.get("offset", 0)
bit = config.get("bit")
if config.get("type", "").lower() == "bool" and bit is not None:
return (offset * 8) + bit
return offset
def _unpack_s7_data_item(self, item: S7DataItem, var_type: str) -> Any:
"""
Unpacks the value from a returned S7DataItem's internal buffer.
Args:
item: S7DataItem with read data
var_type: Variable type string
Returns:
Unpacked value of appropriate Python type
"""
var_type = var_type.lower()
# Use snap7's high-level getters that read from the item's pData buffer
if var_type == "real":
return snap7.util.get_real(item.pData, 0)
elif var_type == "dint":
return snap7.util.get_dint(item.pData, 0)
elif var_type == "int":
return snap7.util.get_int(item.pData, 0)
elif var_type == "bool":
return snap7.util.get_bool(item.pData, 0, 0) # Bit 0 for boolean items
elif var_type == "word":
return snap7.util.get_word(item.pData, 0)
elif var_type == "byte":
return snap7.util.get_byte(item.pData, 0)
elif var_type == "uint":
return snap7.util.get_uint(item.pData, 0)
elif var_type == "udint":
return snap7.util.get_udint(item.pData, 0)
elif var_type == "sint":
return snap7.util.get_sint(item.pData, 0)
elif var_type == "usint":
return snap7.util.get_usint(item.pData, 0)
elif var_type == "dword":
return snap7.util.get_dword(item.pData, 0)
else:
# Fallback: return raw bytes
return bytes(item.pData[: item.Amount])
def _is_connected(self) -> bool:
"""Check if PLC client is connected."""
if self.plc_client is None:
return False # Testing mode
return hasattr(self.plc_client, "plc") and getattr(
self.plc_client, "connected", False
)
def _log_error(self, message: str):
"""Log error message if logger is available."""
if self.logger:
self.logger.error(message)
def _read_variables_individual(
self, variables_config: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""
Fallback method: Read variables individually when read_multi_vars is not available.
This provides compatibility with older snap7 versions.
"""
results = {}
with self.io_lock:
for var_name, config in variables_config.items():
try:
# Use the PLC client's existing read_variable method if available
if hasattr(self.plc_client, "read_variable"):
value = self.plc_client.read_variable(config)
else:
# Fallback to basic snap7 read
value = self._read_single_variable(config)
results[var_name] = value
except Exception as e:
self._log_error(f"Error reading variable {var_name}: {e}")
results[var_name] = None
# Small delay between individual reads
if self.inter_read_delay_seconds > 0:
time.sleep(
self.inter_read_delay_seconds / 10
) # Smaller delay for individual reads
return results
def _read_single_variable(self, config: Dict[str, Any]) -> Any:
"""
Read a single variable using basic snap7 methods.
This is a minimal implementation for fallback purposes.
"""
area = config.get("area", "db").lower()
offset = config.get("offset", 0)
var_type = config.get("type", "real").lower()
if area == "db":
db = config.get("db", 0)
if var_type == "real":
data = self.plc_client.db_read(db, offset, 4)
return snap7.util.get_real(data, 0)
elif var_type == "int":
data = self.plc_client.db_read(db, offset, 2)
return snap7.util.get_int(data, 0)
elif var_type == "bool":
bit = config.get("bit", 0)
data = self.plc_client.db_read(db, offset, 1)
return snap7.util.get_bool(data, 0, bit)
elif var_type == "dint":
data = self.plc_client.db_read(db, offset, 4)
return snap7.util.get_dint(data, 0)
return None
def get_performance_stats(self) -> Dict[str, Any]:
"""Get performance statistics about the batch reader."""
return {
"chunk_size": self.CHUNK_SIZE,
"inter_read_delay": self.inter_read_delay_seconds,
"threading_enabled": True,
}
# ================================================================
# TESTING AND EXAMPLE USAGE
# ================================================================
def create_test_variables_config() -> Dict[str, Dict[str, Any]]:
"""
Create a test configuration with various variable types and locations.
This simulates a real dataset configuration.
"""
return {
# DB variables
"temperature_1": {"area": "db", "db": 10, "offset": 0, "type": "real"},
"temperature_2": {"area": "db", "db": 10, "offset": 4, "type": "real"},
"pressure": {"area": "db", "db": 10, "offset": 8, "type": "real"},
"flow_rate": {"area": "db", "db": 10, "offset": 12, "type": "real"},
"level": {"area": "db", "db": 10, "offset": 16, "type": "real"},
# Integer variables
"counter_1": {"area": "db", "db": 20, "offset": 0, "type": "int"},
"counter_2": {"area": "db", "db": 20, "offset": 2, "type": "int"},
"timer_value": {"area": "db", "db": 20, "offset": 4, "type": "dint"},
# Boolean variables
"pump_1_status": {
"area": "db",
"db": 30,
"offset": 0,
"bit": 0,
"type": "bool",
},
"pump_2_status": {
"area": "db",
"db": 30,
"offset": 0,
"bit": 1,
"type": "bool",
},
"alarm_active": {"area": "db", "db": 30, "offset": 0, "bit": 2, "type": "bool"},
"maintenance_mode": {
"area": "db",
"db": 30,
"offset": 0,
"bit": 3,
"type": "bool",
},
# Memory variables
"system_status": {"area": "m", "offset": 100, "type": "word"},
"error_code": {"area": "m", "offset": 102, "type": "int"},
# Input/Output variables
"digital_input_1": {"area": "e", "offset": 0, "bit": 0, "type": "bool"},
"digital_output_1": {"area": "a", "offset": 0, "bit": 0, "type": "bool"},
# Additional variables to test chunking (more than 19)
"extra_var_1": {"area": "db", "db": 40, "offset": 0, "type": "real"},
"extra_var_2": {"area": "db", "db": 40, "offset": 4, "type": "real"},
"extra_var_3": {"area": "db", "db": 40, "offset": 8, "type": "real"},
"extra_var_4": {"area": "db", "db": 40, "offset": 12, "type": "real"},
"extra_var_5": {"area": "db", "db": 40, "offset": 16, "type": "real"},
"extra_var_6": {"area": "db", "db": 40, "offset": 20, "type": "real"},
}
def test_batch_reader_standalone():
"""
Test the batch reader in standalone mode (without actual PLC connection).
This is useful for validating the logic without hardware.
"""
print("🧪 Testing OptimizedBatchReader in standalone mode...")
# Create test reader without PLC client
reader = OptimizedBatchReader(plc_client=None, logger=None)
# Test configuration parsing
test_config = create_test_variables_config()
print(f"📊 Test configuration has {len(test_config)} variables")
# Test chunking logic
variable_items = list(test_config.items())
chunks = []
for i in range(0, len(variable_items), reader.CHUNK_SIZE):
chunk = variable_items[i : i + reader.CHUNK_SIZE]
chunks.append(chunk)
print(f"📦 Variables split into {len(chunks)} chunks:")
for i, chunk in enumerate(chunks, 1):
print(f" Chunk {i}: {len(chunk)} variables")
# Test helper functions
print("\n🔧 Testing helper functions:")
# Test area code mapping
areas_to_test = ["db", "m", "e", "a", "pe", "pa", "mk"]
for area in areas_to_test:
code = reader._get_area_code(area)
print(f" Area '{area}' -> Code {code}")
# Test word length mapping
types_to_test = ["real", "int", "bool", "dint", "word", "byte"]
for var_type in types_to_test:
wordlen = reader._get_word_len(var_type)
print(f" Type '{var_type}' -> WordLen {wordlen}")
# Test offset calculation
test_configs = [
{"offset": 10, "type": "real"},
{"offset": 5, "bit": 3, "type": "bool"},
{"offset": 20, "type": "int"},
]
for config in test_configs:
offset = reader._calculate_start_offset(config)
print(f" Config {config} -> Start offset {offset}")
print("\n✅ Standalone testing completed successfully!")
def test_with_real_plc(ip: str, rack: int = 0, slot: int = 2):
"""
Test the batch reader with a real PLC connection.
Args:
ip: PLC IP address
rack: PLC rack number
slot: PLC slot number
"""
print(f"🔌 Testing OptimizedBatchReader with real PLC at {ip}...")
try:
# Create PLC client
plc = snap7.client.Client()
plc.connect(ip, rack, slot)
print("✅ Connected to PLC")
# Create reader with real PLC client
reader = OptimizedBatchReader(plc_client=plc, logger=None)
# Use a smaller test configuration for real testing
test_config = {
"test_real": {"area": "db", "db": 1, "offset": 0, "type": "real"},
"test_int": {"area": "db", "db": 1, "offset": 4, "type": "int"},
"test_bool": {"area": "db", "db": 1, "offset": 6, "bit": 0, "type": "bool"},
}
print(f"📊 Reading {len(test_config)} test variables...")
# Perform batch read
start_time = time.time()
results = reader.read_variables_batch(test_config)
read_time = time.time() - start_time
print(f"⏱️ Batch read completed in {read_time:.3f} seconds")
print("📋 Results:")
for var_name, value in results.items():
print(f" {var_name}: {value}")
# Disconnect
plc.disconnect()
print("🔌 Disconnected from PLC")
except Exception as e:
print(f"❌ Error testing with real PLC: {e}")
if __name__ == "__main__":
"""
Main testing entry point.
Run this script directly to test the batch reader.
"""
print("🚀 OptimizedBatchReader Testing Suite")
print("=" * 50)
# Always run standalone tests
test_batch_reader_standalone()
# Test with even larger dataset to verify chunking
print("\n🧪 Testing with large dataset (50+ variables)...")
large_config = {}
for i in range(55): # Create 55 variables to test multiple chunks
large_config[f"var_{i:02d}"] = {
"area": "db",
"db": 1 + (i // 20), # Spread across multiple DBs
"offset": (i % 20) * 4,
"type": "real",
}
reader = OptimizedBatchReader(plc_client=None, logger=None)
variable_items = list(large_config.items())
chunks = []
for i in range(0, len(variable_items), reader.CHUNK_SIZE):
chunk = variable_items[i : i + reader.CHUNK_SIZE]
chunks.append(chunk)
print(f"📦 {len(large_config)} variables split into {len(chunks)} chunks:")
for i, chunk in enumerate(chunks, 1):
print(f" Chunk {i}: {len(chunk)} variables")
# Uncomment and modify the following line to test with a real PLC
# test_with_real_plc("192.168.1.100")
print("\n🎯 Integration Instructions:")
print("1. Test this script thoroughly with your PLC configuration")
print("2. Modify the IP address in test_with_real_plc() and uncomment the call")
print("3. Once tested, import OptimizedBatchReader into plc_client.py")
print("4. Replace the existing read_variables_batch method")
print("5. Ensure proper error handling integration")