feat: Update dataset configurations and improve PLC communication
- Changed sampling interval for "Fast" dataset from 0.1 to 0.5 seconds. - Updated variable definitions for "Fast" dataset, including renaming and modifying properties for AUX Blink and M50 variables. - Adjusted plot definitions to reduce time window from 36 to 10 seconds. - Enhanced plot variables to ensure correct labeling and enablement of variables. - Increased inter-read delay in PLCClient for improved stability and added batch reading functionality to optimize variable reads. - Implemented asynchronous CSV writing system in DataStreamer to reduce I/O operations and improve performance. - Updated ChartjsPlot component to modify refresh rates for real-time data handling. - Adjusted system state to disable automatic connection and clear active datasets.
This commit is contained in:
parent
61365240d6
commit
ac87ce2568
11454
application_events.json
11454
application_events.json
File diff suppressed because it is too large
Load Diff
|
@ -14,7 +14,7 @@
|
|||
"id": "Fast",
|
||||
"name": "Fast",
|
||||
"prefix": "fast",
|
||||
"sampling_interval": 0.1
|
||||
"sampling_interval": 0.5
|
||||
},
|
||||
{
|
||||
"enabled": true,
|
||||
|
|
|
@ -36,20 +36,29 @@
|
|||
"dataset_id": "Fast",
|
||||
"variables": [
|
||||
{
|
||||
"area": "db",
|
||||
"configType": "symbol",
|
||||
"area": "db",
|
||||
"streaming": true,
|
||||
"symbol": "AUX Blink_1.0S",
|
||||
"symbol": "AUX Blink_2.0S",
|
||||
"type": "real"
|
||||
},
|
||||
{
|
||||
"area": "m",
|
||||
"bit": 6,
|
||||
"configType": "manual",
|
||||
"name": "AUX Blink_1.6S",
|
||||
"offset": 0,
|
||||
"streaming": true,
|
||||
"area": "m",
|
||||
"bit": 1,
|
||||
"name": "M50.1",
|
||||
"offset": 50,
|
||||
"streaming": false,
|
||||
"type": "bool"
|
||||
},
|
||||
{
|
||||
"configType": "manual",
|
||||
"area": "m",
|
||||
"type": "bool",
|
||||
"streaming": false,
|
||||
"offset": 50,
|
||||
"bit": 2,
|
||||
"name": "M50.2"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
"point_radius": 2.5,
|
||||
"stacked": true,
|
||||
"stepped": true,
|
||||
"time_window": 36,
|
||||
"time_window": 10,
|
||||
"trigger_enabled": false,
|
||||
"trigger_on_true": true,
|
||||
"trigger_variable": null,
|
||||
|
|
|
@ -4,34 +4,27 @@
|
|||
"plot_id": "plot_1",
|
||||
"variables": [
|
||||
{
|
||||
"variable_name": "UR29_Brix",
|
||||
"color": "#3498db",
|
||||
"enabled": true,
|
||||
"label": "Brix",
|
||||
"color": "#3498db",
|
||||
"line_width": 2,
|
||||
"y_axis": "left",
|
||||
"enabled": true
|
||||
"variable_name": "UR29_Brix",
|
||||
"y_axis": "left"
|
||||
},
|
||||
{
|
||||
"variable_name": "UR29_ma",
|
||||
"label": "ma",
|
||||
"color": "#dce740",
|
||||
"enabled": true,
|
||||
"label": "ma",
|
||||
"line_width": 2,
|
||||
"y_axis": "left",
|
||||
"enabled": true
|
||||
"variable_name": "UR29_ma",
|
||||
"y_axis": "left"
|
||||
},
|
||||
{
|
||||
"variable_name": "AUX Blink_1.0S",
|
||||
"color": "#3498db",
|
||||
"enabled": true,
|
||||
"line_width": 2,
|
||||
"y_axis": "right",
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"variable_name": "AUX Blink_1.6S",
|
||||
"color": "#630de3",
|
||||
"line_width": 2,
|
||||
"y_axis": "right",
|
||||
"enabled": true
|
||||
"variable_name": "AUX Blink_2.0S",
|
||||
"y_axis": "right"
|
||||
}
|
||||
]
|
||||
},
|
||||
|
@ -39,18 +32,25 @@
|
|||
"plot_id": "Clock",
|
||||
"variables": [
|
||||
{
|
||||
"color": "#3498db",
|
||||
"enabled": true,
|
||||
"variable_name": "AUX Blink_2.0S",
|
||||
"color": "#db3376",
|
||||
"line_width": 2,
|
||||
"variable_name": "AUX Blink_1.0S",
|
||||
"y_axis": "left"
|
||||
"y_axis": "left",
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"color": "#87db33",
|
||||
"enabled": true,
|
||||
"variable_name": "M50.1",
|
||||
"color": "#3498db",
|
||||
"line_width": 2,
|
||||
"variable_name": "AUX Blink_1.6S",
|
||||
"y_axis": "left"
|
||||
"y_axis": "left",
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"variable_name": "M50.2",
|
||||
"color": "#3edb33",
|
||||
"line_width": 2,
|
||||
"y_axis": "left",
|
||||
"enabled": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -38,8 +38,9 @@ class PLCClient:
|
|||
# Global I/O serialization to avoid concurrent snap7 calls
|
||||
# Acts as a simple read queue to prevent 'CLI : Job pending'
|
||||
self.io_lock = threading.RLock()
|
||||
# Small inter-read delay to give PLC time between requests (seconds)
|
||||
self.inter_read_delay_seconds = 0.002
|
||||
# 🚨 CRITICAL FIX: Increased inter-read delay for industrial PLC stability
|
||||
# Original 0.002s was too aggressive, causing timing issues and lost points
|
||||
self.inter_read_delay_seconds = 0.01 # 10ms between reads for stability
|
||||
|
||||
def connect(self, ip: str, rack: int, slot: int) -> bool:
|
||||
"""Connect to S7-315 PLC"""
|
||||
|
@ -281,7 +282,13 @@ class PLCClient:
|
|||
"md",
|
||||
"mb",
|
||||
]: # Memory Word, Memory, Memory Double, Memory Byte
|
||||
result = self._read_memory_variable(offset, var_type)
|
||||
# 🚨 CRITICAL FIX: Handle memory bit reads correctly
|
||||
if var_type == "bool" and bit is not None:
|
||||
# Specific bit read (e.g., M50.1, M50.2, etc.)
|
||||
result = self._read_memory_bit(offset, bit)
|
||||
else:
|
||||
# Standard memory variable read
|
||||
result = self._read_memory_variable(offset, var_type)
|
||||
elif area_type in [
|
||||
"pew",
|
||||
"pe",
|
||||
|
@ -307,7 +314,7 @@ class PLCClient:
|
|||
self.logger.error(f"Unsupported area type: {area_type}")
|
||||
result = None
|
||||
|
||||
# Small pacing delay between PLC I/O to avoid job overlap
|
||||
# 🚨 CRITICAL: Increased pacing delay for industrial PLC stability
|
||||
if self.inter_read_delay_seconds and self.inter_read_delay_seconds > 0:
|
||||
time.sleep(self.inter_read_delay_seconds)
|
||||
|
||||
|
@ -346,6 +353,201 @@ class PLCClient:
|
|||
|
||||
return None
|
||||
|
||||
def read_variables_batch(
|
||||
self, variables_config: Dict[str, Dict[str, Any]]
|
||||
) -> Dict[str, Any]:
|
||||
"""🚨 NEW: Read multiple variables in optimized batch operations to reduce snap7 calls
|
||||
|
||||
Args:
|
||||
variables_config: Dict of {var_name: var_config}
|
||||
|
||||
Returns:
|
||||
Dict of {var_name: value} or {var_name: None} if read failed
|
||||
|
||||
This method groups variables by DB/Area and performs batch reads when possible,
|
||||
significantly reducing the number of snap7 calls and improving timing performance.
|
||||
"""
|
||||
if not self.is_connected():
|
||||
return {name: None for name in variables_config.keys()}
|
||||
|
||||
results = {}
|
||||
|
||||
# Ensure only one snap7 operation at a time
|
||||
with self.io_lock:
|
||||
try:
|
||||
# Group variables by area and DB for batch reading
|
||||
groups = self._group_variables_for_batch_reading(variables_config)
|
||||
|
||||
for group_key, var_group in groups.items():
|
||||
area_type, db_number = group_key
|
||||
|
||||
if area_type == "db" and len(var_group) > 1:
|
||||
# Batch read for DB variables
|
||||
batch_results = self._batch_read_db_variables(
|
||||
db_number, var_group
|
||||
)
|
||||
results.update(batch_results)
|
||||
else:
|
||||
# Fall back to individual reads for non-batchable variables
|
||||
for var_name, var_config in var_group.items():
|
||||
results[var_name] = self.read_variable(var_config)
|
||||
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Error in batch read operation: {e}")
|
||||
|
||||
# Fall back to individual reads if batch fails
|
||||
for var_name, var_config in variables_config.items():
|
||||
results[var_name] = self.read_variable(var_config)
|
||||
|
||||
return results
|
||||
|
||||
def _group_variables_for_batch_reading(
|
||||
self, variables_config: Dict[str, Dict[str, Any]]
|
||||
) -> Dict[tuple, Dict[str, Dict[str, Any]]]:
|
||||
"""Group variables by area and DB for efficient batch reading"""
|
||||
groups = {}
|
||||
|
||||
for var_name, var_config in variables_config.items():
|
||||
area_type = var_config.get("area", "db").lower()
|
||||
db_number = var_config.get("db", None)
|
||||
|
||||
# Create group key
|
||||
group_key = (area_type, db_number)
|
||||
|
||||
if group_key not in groups:
|
||||
groups[group_key] = {}
|
||||
|
||||
groups[group_key][var_name] = var_config
|
||||
|
||||
return groups
|
||||
|
||||
def _batch_read_db_variables(
|
||||
self, db_number: int, variables: Dict[str, Dict[str, Any]]
|
||||
) -> Dict[str, Any]:
|
||||
"""Perform optimized batch read for DB variables in the same data block"""
|
||||
if not variables:
|
||||
return {}
|
||||
|
||||
results = {}
|
||||
|
||||
try:
|
||||
# Find min and max offsets to determine read range
|
||||
offsets = []
|
||||
for var_config in variables.values():
|
||||
offset = var_config.get("offset", 0)
|
||||
var_type = var_config.get("type", "real").lower()
|
||||
|
||||
# Calculate size based on type
|
||||
type_sizes = {
|
||||
"real": 4,
|
||||
"dint": 4,
|
||||
"int": 2,
|
||||
"bool": 1,
|
||||
"word": 2,
|
||||
"byte": 1,
|
||||
"uint": 2,
|
||||
"udint": 4,
|
||||
"sint": 1,
|
||||
"usint": 1,
|
||||
}
|
||||
size = type_sizes.get(var_type, 4)
|
||||
|
||||
offsets.append((offset, offset + size))
|
||||
|
||||
if not offsets:
|
||||
return results
|
||||
|
||||
# Calculate optimal read range
|
||||
min_offset = min(start for start, end in offsets)
|
||||
max_offset = max(end for start, end in offsets)
|
||||
read_size = max_offset - min_offset
|
||||
|
||||
# If range is reasonable (< 256 bytes), do single batch read
|
||||
if read_size <= 256 and len(variables) > 1:
|
||||
raw_data = self.plc.db_read(db_number, min_offset, read_size)
|
||||
|
||||
# Extract individual variable values from the batch
|
||||
for var_name, var_config in variables.items():
|
||||
try:
|
||||
offset = var_config.get("offset", 0)
|
||||
var_type = var_config.get("type", "real").lower()
|
||||
bit = var_config.get("bit")
|
||||
|
||||
# Calculate relative offset in the batch
|
||||
relative_offset = offset - min_offset
|
||||
|
||||
# Extract value based on type
|
||||
value = self._extract_value_from_batch(
|
||||
raw_data, relative_offset, var_type, bit
|
||||
)
|
||||
results[var_name] = value
|
||||
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.warning(
|
||||
f"Error extracting variable {var_name} from batch: {e}"
|
||||
)
|
||||
results[var_name] = None
|
||||
|
||||
# Single delay for the entire batch
|
||||
if self.inter_read_delay_seconds > 0:
|
||||
time.sleep(self.inter_read_delay_seconds)
|
||||
|
||||
else:
|
||||
# Range too large or single variable, fall back to individual reads
|
||||
for var_name, var_config in variables.items():
|
||||
results[var_name] = self.read_variable(var_config)
|
||||
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.warning(
|
||||
f"Batch read failed for DB{db_number}, falling back to individual reads: {e}"
|
||||
)
|
||||
|
||||
# Fall back to individual reads
|
||||
for var_name, var_config in variables.items():
|
||||
results[var_name] = self.read_variable(var_config)
|
||||
|
||||
return results
|
||||
|
||||
def _extract_value_from_batch(
|
||||
self, raw_data: bytes, offset: int, var_type: str, bit: Optional[int] = None
|
||||
) -> Any:
|
||||
"""Extract a specific variable value from batch read data"""
|
||||
try:
|
||||
if var_type == "real":
|
||||
return struct.unpack(">f", raw_data[offset : offset + 4])[0]
|
||||
elif var_type == "int":
|
||||
return struct.unpack(">h", raw_data[offset : offset + 2])[0]
|
||||
elif var_type == "bool":
|
||||
if bit is not None:
|
||||
return snap7.util.get_bool(raw_data[offset : offset + 1], 0, bit)
|
||||
else:
|
||||
return bool(raw_data[offset] & 0x01)
|
||||
elif var_type == "dint":
|
||||
return struct.unpack(">l", raw_data[offset : offset + 4])[0]
|
||||
elif var_type == "word":
|
||||
return struct.unpack(">H", raw_data[offset : offset + 2])[0]
|
||||
elif var_type == "byte":
|
||||
return struct.unpack(">B", raw_data[offset : offset + 1])[0]
|
||||
elif var_type == "uint":
|
||||
return struct.unpack(">H", raw_data[offset : offset + 2])[0]
|
||||
elif var_type == "udint":
|
||||
return struct.unpack(">L", raw_data[offset : offset + 4])[0]
|
||||
elif var_type == "sint":
|
||||
return struct.unpack(">b", raw_data[offset : offset + 1])[0]
|
||||
elif var_type == "usint":
|
||||
return struct.unpack(">B", raw_data[offset : offset + 1])[0]
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.warning(
|
||||
f"Error extracting {var_type} at offset {offset}: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
def _read_db_variable(
|
||||
self, var_config: Dict[str, Any], offset: int, var_type: str, bit: Optional[int]
|
||||
) -> Any:
|
||||
|
|
269
core/streamer.py
269
core/streamer.py
|
@ -74,7 +74,15 @@ class DataStreamer:
|
|||
self.dataset_csv_hours = {} # dataset_id -> current hour
|
||||
self.dataset_using_modification_files = {} # dataset_id -> bool
|
||||
|
||||
# 📊 CACHE SYSTEM - Central data storage
|
||||
# <20> NEW: Asynchronous CSV Writing System
|
||||
# Buffer system to accumulate data and write in batches every 5 seconds
|
||||
self.csv_buffer = {} # dataset_id -> list of [timestamp, data_dict]
|
||||
self.csv_buffer_lock = threading.Lock()
|
||||
self.csv_flush_interval = 5.0 # seconds
|
||||
self.csv_flush_thread = None
|
||||
self.csv_flush_active = False
|
||||
|
||||
# <20>📊 CACHE SYSTEM - Central data storage
|
||||
# This is the ONLY source of data for all APIs and interfaces
|
||||
# Updated ONLY by read_dataset_variables() during streaming cycles
|
||||
self.last_read_values = {} # dataset_id -> {var_name: value}
|
||||
|
@ -334,7 +342,7 @@ class DataStreamer:
|
|||
)
|
||||
|
||||
def write_dataset_csv_data(self, dataset_id: str, data: Dict[str, Any]):
|
||||
"""Write data to CSV file for a specific dataset"""
|
||||
"""🚨 DEPRECATED: Use buffer_dataset_csv_data instead for async writing"""
|
||||
if dataset_id not in self.config_manager.active_datasets:
|
||||
return
|
||||
|
||||
|
@ -366,6 +374,119 @@ class DataStreamer:
|
|||
f"Error writing CSV data for dataset {dataset_id}: {e}"
|
||||
)
|
||||
|
||||
def buffer_dataset_csv_data(self, dataset_id: str, data: Dict[str, Any]):
|
||||
"""🚨 NEW: Buffer CSV data for asynchronous writing to reduce I/O operations"""
|
||||
if dataset_id not in self.config_manager.active_datasets:
|
||||
return
|
||||
|
||||
timestamp = datetime.now()
|
||||
|
||||
with self.csv_buffer_lock:
|
||||
if dataset_id not in self.csv_buffer:
|
||||
self.csv_buffer[dataset_id] = []
|
||||
|
||||
# Add data to buffer with timestamp
|
||||
self.csv_buffer[dataset_id].append({
|
||||
'timestamp': timestamp,
|
||||
'data': data.copy()
|
||||
})
|
||||
|
||||
def _csv_flush_loop(self):
|
||||
"""🚨 Background thread to flush CSV buffers every flush_interval seconds"""
|
||||
while self.csv_flush_active:
|
||||
try:
|
||||
time.sleep(self.csv_flush_interval)
|
||||
self._flush_csv_buffers()
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Error in CSV flush loop: {e}")
|
||||
|
||||
def _flush_csv_buffers(self):
|
||||
"""🚨 Flush all buffered CSV data to files"""
|
||||
flush_start_time = time.time()
|
||||
total_points_written = 0
|
||||
datasets_flushed = 0
|
||||
|
||||
with self.csv_buffer_lock:
|
||||
for dataset_id, buffer_data in self.csv_buffer.items():
|
||||
if not buffer_data:
|
||||
continue
|
||||
|
||||
try:
|
||||
self.setup_dataset_csv_file(dataset_id)
|
||||
|
||||
if dataset_id in self.dataset_csv_writers:
|
||||
dataset_variables = self.config_manager.get_dataset_variables(dataset_id)
|
||||
|
||||
for entry in buffer_data:
|
||||
timestamp_str = entry['timestamp'].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
||||
data = entry['data']
|
||||
|
||||
# Create row with all variables for this dataset
|
||||
row = [timestamp_str]
|
||||
for var_name in dataset_variables.keys():
|
||||
value = data.get(var_name, None)
|
||||
# Convert boolean values to 0 or 1 for CSV consistency
|
||||
if isinstance(value, bool):
|
||||
value = 1 if value else 0
|
||||
row.append(value)
|
||||
|
||||
self.dataset_csv_writers[dataset_id].writerow(row)
|
||||
total_points_written += 1
|
||||
|
||||
# Flush file to disk
|
||||
self.dataset_csv_files[dataset_id].flush()
|
||||
datasets_flushed += 1
|
||||
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Error flushing CSV buffer for dataset {dataset_id}: {e}")
|
||||
|
||||
# Clear all buffers after successful flush
|
||||
self.csv_buffer.clear()
|
||||
|
||||
flush_time = time.time() - flush_start_time
|
||||
|
||||
if total_points_written > 0 and self.logger:
|
||||
# 🚨 FIX: Avoid division by zero when flush_time is 0
|
||||
throughput = total_points_written / max(flush_time, 0.001) # Minimum 1ms to avoid division by zero
|
||||
self.logger.debug(
|
||||
f"📝 CSV Batch Flush: {total_points_written} points written across {datasets_flushed} datasets "
|
||||
f"in {flush_time:.3f}s ({throughput:.1f} points/sec)"
|
||||
)
|
||||
|
||||
def start_csv_flush_thread(self):
|
||||
"""🚨 Start the asynchronous CSV flush thread"""
|
||||
if self.csv_flush_active:
|
||||
return
|
||||
|
||||
self.csv_flush_active = True
|
||||
self.csv_flush_thread = threading.Thread(
|
||||
target=self._csv_flush_loop,
|
||||
name="csv_flush_thread",
|
||||
daemon=True
|
||||
)
|
||||
self.csv_flush_thread.start()
|
||||
|
||||
if self.logger:
|
||||
self.logger.info(f"📝 CSV async flush thread started (interval: {self.csv_flush_interval}s)")
|
||||
|
||||
def stop_csv_flush_thread(self):
|
||||
"""🚨 Stop the asynchronous CSV flush thread and flush remaining data"""
|
||||
if not self.csv_flush_active:
|
||||
return
|
||||
|
||||
self.csv_flush_active = False
|
||||
|
||||
# Final flush of any remaining data
|
||||
self._flush_csv_buffers()
|
||||
|
||||
if self.csv_flush_thread and self.csv_flush_thread.is_alive():
|
||||
self.csv_flush_thread.join(timeout=5.0)
|
||||
|
||||
if self.logger:
|
||||
self.logger.info("📝 CSV async flush thread stopped, final flush completed")
|
||||
|
||||
def create_new_dataset_csv_file_for_variable_modification(self, dataset_id: str):
|
||||
"""Create a new CSV file for a dataset when variables are modified during active recording"""
|
||||
if dataset_id not in self.config_manager.active_datasets:
|
||||
|
@ -455,44 +576,92 @@ class DataStreamer:
|
|||
|
||||
Called by: dataset_streaming_loop() at configured intervals
|
||||
Updates: self.last_read_values cache for use by all other functions
|
||||
|
||||
🚨 CRITICAL FIX: Returns None if ANY variable fails - prevents corrupt CSV data
|
||||
🚀 PERFORMANCE FIX: Uses batch reading to reduce snap7 calls and improve timing
|
||||
"""
|
||||
data = {}
|
||||
errors = {}
|
||||
timestamp = datetime.now()
|
||||
failed_variables = []
|
||||
|
||||
for var_name, var_config in variables.items():
|
||||
try:
|
||||
value = self.plc_client.read_variable(var_config)
|
||||
data[var_name] = value
|
||||
try:
|
||||
# 🚀 NEW: Use batch reading for improved performance
|
||||
batch_results = self.plc_client.read_variables_batch(variables)
|
||||
|
||||
for var_name, value in batch_results.items():
|
||||
if value is not None:
|
||||
data[var_name] = value
|
||||
|
||||
# Clear any previous error for this variable
|
||||
if (
|
||||
dataset_id in self.last_read_errors
|
||||
and var_name in self.last_read_errors[dataset_id]
|
||||
):
|
||||
del self.last_read_errors[dataset_id][var_name]
|
||||
else:
|
||||
# Variable read failed
|
||||
var_config = variables.get(var_name, {})
|
||||
error_msg = f"Batch read failed for variable {var_name}"
|
||||
|
||||
if self.logger:
|
||||
self.logger.warning(
|
||||
f"Error reading variable {var_name} in dataset {dataset_id}: {error_msg}"
|
||||
)
|
||||
data[var_name] = None
|
||||
errors[var_name] = error_msg
|
||||
failed_variables.append(var_name)
|
||||
|
||||
except Exception as e:
|
||||
# Fall back to individual reads if batch reading fails completely
|
||||
if self.logger:
|
||||
self.logger.warning(f"Batch reading failed for dataset {dataset_id}, falling back to individual reads: {e}")
|
||||
|
||||
for var_name, var_config in variables.items():
|
||||
try:
|
||||
value = self.plc_client.read_variable(var_config)
|
||||
data[var_name] = value
|
||||
|
||||
# Clear any previous error for this variable
|
||||
if (
|
||||
dataset_id in self.last_read_errors
|
||||
and var_name in self.last_read_errors[dataset_id]
|
||||
):
|
||||
del self.last_read_errors[dataset_id][var_name]
|
||||
# Clear any previous error for this variable
|
||||
if (
|
||||
dataset_id in self.last_read_errors
|
||||
and var_name in self.last_read_errors[dataset_id]
|
||||
):
|
||||
del self.last_read_errors[dataset_id][var_name]
|
||||
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.warning(
|
||||
f"Error reading variable {var_name} in dataset {dataset_id}: {e}"
|
||||
)
|
||||
data[var_name] = None
|
||||
errors[var_name] = f"Read error: {str(e)}"
|
||||
except Exception as read_error:
|
||||
if self.logger:
|
||||
self.logger.warning(
|
||||
f"Error reading variable {var_name} in dataset {dataset_id}: {read_error}"
|
||||
)
|
||||
data[var_name] = None
|
||||
errors[var_name] = f"Read error: {str(read_error)}"
|
||||
failed_variables.append(var_name)
|
||||
|
||||
# Update cache with latest values and timestamp
|
||||
self.last_read_values[dataset_id] = data.copy()
|
||||
self.last_read_timestamps[dataset_id] = timestamp
|
||||
|
||||
# Update errors cache
|
||||
if errors:
|
||||
# 🚨 CRITICAL FIX: If ANY variable failed, return None to prevent CSV corruption
|
||||
# This ensures that incomplete data is never written to CSV
|
||||
if failed_variables:
|
||||
if self.logger:
|
||||
dataset_name = self.config_manager.datasets.get(dataset_id, {}).get('name', dataset_id)
|
||||
self.logger.warning(
|
||||
f"🚨 CRITICAL: Dataset '{dataset_name}' read failed - {len(failed_variables)} variables failed: {failed_variables}. "
|
||||
f"Skipping CSV write to prevent data corruption."
|
||||
)
|
||||
# Still update cache for monitoring purposes, but return None for CSV
|
||||
self.last_read_values[dataset_id] = data.copy()
|
||||
self.last_read_timestamps[dataset_id] = timestamp
|
||||
if dataset_id not in self.last_read_errors:
|
||||
self.last_read_errors[dataset_id] = {}
|
||||
self.last_read_errors[dataset_id].update(errors)
|
||||
elif dataset_id in self.last_read_errors:
|
||||
# Clear all errors if this read was completely successful
|
||||
if all(value is not None for value in data.values()):
|
||||
self.last_read_errors[dataset_id] = {}
|
||||
return None
|
||||
|
||||
# Update cache with latest values and timestamp ONLY if all reads succeeded
|
||||
self.last_read_values[dataset_id] = data.copy()
|
||||
self.last_read_timestamps[dataset_id] = timestamp
|
||||
|
||||
# Clear all errors if this read was completely successful
|
||||
if dataset_id in self.last_read_errors:
|
||||
self.last_read_errors[dataset_id] = {}
|
||||
|
||||
return data
|
||||
|
||||
|
@ -635,29 +804,31 @@ class DataStreamer:
|
|||
all_data = self.read_dataset_variables(dataset_id, dataset_variables)
|
||||
|
||||
read_time = time.time() - read_start
|
||||
read_success = bool(all_data)
|
||||
# 🚨 CRITICAL FIX: Proper validation - all_data is None if ANY variable failed
|
||||
read_success = all_data is not None
|
||||
|
||||
if all_data:
|
||||
if all_data is not None:
|
||||
consecutive_errors = 0
|
||||
|
||||
# 📝 CSV Recording: ALWAYS FIRST - HIGHEST PRIORITY with timing
|
||||
# 📝 CSV Recording: ALWAYS FIRST - HIGHEST PRIORITY with ASYNC BUFFERING
|
||||
if self.csv_recording_enabled:
|
||||
csv_start = time.time()
|
||||
try:
|
||||
self.write_dataset_csv_data(dataset_id, all_data)
|
||||
# 🚨 NEW: Use async buffering instead of immediate write
|
||||
self.buffer_dataset_csv_data(dataset_id, all_data)
|
||||
csv_write_time = time.time() - csv_start
|
||||
|
||||
# 📊 Record successful CSV write
|
||||
# 📊 Record successful CSV buffer operation (much faster)
|
||||
self.performance_monitor.record_csv_write(dataset_id, csv_write_time, success=True)
|
||||
|
||||
except Exception as csv_error:
|
||||
csv_write_time = time.time() - csv_start
|
||||
|
||||
# 📊 Record CSV write error
|
||||
# 📊 Record CSV buffer error
|
||||
self.performance_monitor.record_csv_write(dataset_id, csv_write_time, success=False)
|
||||
|
||||
if self.logger:
|
||||
self.logger.error(f"🚨 CSV WRITE ERROR for dataset '{dataset_info['name']}': {csv_error}")
|
||||
self.logger.error(f"🚨 CSV BUFFER ERROR for dataset '{dataset_info['name']}': {csv_error}")
|
||||
|
||||
# 📡 UDP Streaming: Lower priority - only if enabled
|
||||
if self.udp_streaming_enabled:
|
||||
|
@ -673,7 +844,14 @@ class DataStreamer:
|
|||
)
|
||||
|
||||
else:
|
||||
# 🚨 CRITICAL: All data read failed - increment error counter
|
||||
consecutive_errors += 1
|
||||
if self.logger:
|
||||
self.logger.warning(
|
||||
f"🚨 CRITICAL: Dataset '{dataset_info['name']}' read completely failed. "
|
||||
f"No CSV data written to prevent corruption (consecutive errors: {consecutive_errors})"
|
||||
)
|
||||
|
||||
if consecutive_errors >= max_consecutive_errors:
|
||||
self.event_logger.log_event(
|
||||
"error",
|
||||
|
@ -940,6 +1118,9 @@ class DataStreamer:
|
|||
|
||||
# 📊 START PERFORMANCE MONITORING
|
||||
self.performance_monitor.start_monitoring()
|
||||
|
||||
# 🚨 START ASYNC CSV FLUSH THREAD
|
||||
self.start_csv_flush_thread()
|
||||
|
||||
# Activate all datasets that have variables for CSV recording
|
||||
activated_count = 0
|
||||
|
@ -974,21 +1155,26 @@ class DataStreamer:
|
|||
self.event_logger.log_event(
|
||||
"info",
|
||||
"csv_recording_started",
|
||||
f"🔥 CRITICAL PRIORITY: CSV recording started with MAXIMUM PRIORITY and performance monitoring: {activated_count} datasets activated",
|
||||
f"🔥 CRITICAL PRIORITY: CSV recording started with MAXIMUM PRIORITY, async buffering, and performance monitoring: {activated_count} datasets activated",
|
||||
{
|
||||
"activated_datasets": activated_count,
|
||||
"total_datasets": len(self.config_manager.datasets),
|
||||
"priority": "CRITICAL",
|
||||
"recording_protection": True,
|
||||
"performance_monitoring": True
|
||||
"performance_monitoring": True,
|
||||
"async_csv_buffering": True,
|
||||
"csv_flush_interval": self.csv_flush_interval
|
||||
},
|
||||
)
|
||||
return True
|
||||
|
||||
def stop_csv_recording(self):
|
||||
"""🔥 CRITICAL: Stop CSV recording safely with performance monitoring"""
|
||||
"""🔥 CRITICAL: Stop CSV recording safely with performance monitoring and async flush"""
|
||||
self.csv_recording_enabled = False
|
||||
|
||||
# 🚨 STOP ASYNC CSV FLUSH THREAD AND FLUSH REMAINING DATA
|
||||
self.stop_csv_flush_thread()
|
||||
|
||||
# 🔥 DISABLE RECORDING PROTECTION MODE
|
||||
self.recording_protector.stop_recording_protection()
|
||||
|
||||
|
@ -1488,9 +1674,12 @@ class DataStreamer:
|
|||
# 1. Stop performance monitoring first
|
||||
self.performance_monitor.stop_monitoring()
|
||||
|
||||
# 2. Stop CSV recording first (graceful stop)
|
||||
# 2. Stop CSV recording first (graceful stop with buffer flush)
|
||||
if self.csv_recording_enabled:
|
||||
self.stop_csv_recording()
|
||||
else:
|
||||
# Ensure CSV flush thread is stopped even if recording wasn't active
|
||||
self.stop_csv_flush_thread()
|
||||
|
||||
# 3. Stop UDP streaming
|
||||
if self.udp_streaming_enabled:
|
||||
|
|
|
@ -55,7 +55,7 @@ const ChartjsPlot = ({ session, height = '400px' }) => {
|
|||
insertNaNOnNextIngest: false,
|
||||
isPaused: false,
|
||||
isRealTimeMode: true,
|
||||
refreshRate: 1000,
|
||||
refreshRate: 500,
|
||||
userOverrideUntil: 0,
|
||||
userPaused: false,
|
||||
sessionId: null
|
||||
|
@ -702,7 +702,7 @@ const ChartjsPlot = ({ session, height = '400px' }) => {
|
|||
realtime: {
|
||||
duration: (config.time_window || 60) * 1000,
|
||||
refresh: sessionDataRef.current.refreshRate,
|
||||
delay: 0,
|
||||
delay: sessionDataRef.current.refreshRate,
|
||||
frameRate: 30,
|
||||
pause: !session.is_active || session.is_paused,
|
||||
onRefresh: (chart) => {
|
||||
|
|
|
@ -1,12 +1,9 @@
|
|||
{
|
||||
"last_state": {
|
||||
"should_connect": true,
|
||||
"should_connect": false,
|
||||
"should_stream": false,
|
||||
"active_datasets": [
|
||||
"Fast",
|
||||
"DAR"
|
||||
]
|
||||
"active_datasets": []
|
||||
},
|
||||
"auto_recovery_enabled": true,
|
||||
"last_update": "2025-08-17T10:48:18.548096"
|
||||
"last_update": "2025-08-17T11:41:21.785119"
|
||||
}
|
Loading…
Reference in New Issue