feat: Implement CSV header validation and automatic renaming for dataset files

Miguel 2025-08-14 22:33:39 +02:00
parent bd0e169757
commit 032959f491
8 changed files with 1034 additions and 25 deletions
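For orientation, the mechanism this commit adds can be summarized in a short sketch. This is an illustrative, self-contained approximation, not the committed code: the helper names mirror those introduced in data_streamer.py below, while the standalone setup_csv_file wrapper is hypothetical.

# Illustrative sketch only; mirrors the helpers added in data_streamer.py in this commit.
import csv
import os
import shutil
from datetime import datetime
from typing import List

def read_csv_headers(file_path: str) -> List[str]:
    """Return the header row of an existing CSV file, or [] if it cannot be read."""
    try:
        with open(file_path, "r", newline="", encoding="utf-8") as f:
            return next(csv.reader(f), [])
    except IOError:
        return []

def rename_csv_file_with_timestamp(original_path: str, prefix: str) -> str:
    """Move a mismatched file aside as <prefix>_to_HH_MM_SS.csv in the same folder."""
    directory = os.path.dirname(original_path)
    timestamp = datetime.now().strftime("%H_%M_%S")
    new_path = os.path.join(directory, f"{prefix}_to_{timestamp}.csv")
    shutil.move(original_path, new_path)
    return new_path

def setup_csv_file(csv_path: str, prefix: str, expected_headers: List[str]) -> None:
    """Ensure the hourly CSV at csv_path starts with the configured header row."""
    if os.path.exists(csv_path):
        existing = read_csv_headers(csv_path)
        if existing and existing != expected_headers:
            # Header mismatch: preserve the old data under a "_to_" name ...
            rename_csv_file_with_timestamp(csv_path, prefix)
    if not os.path.exists(csv_path):
        # ... and recreate the original filename with the correct headers.
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            csv.writer(f).writerow(expected_headers)

In the commit itself this logic lives in DataStreamer.setup_dataset_csv_file(), which additionally logs a csv_file_renamed event and re-opens the file in append mode for the CSV writer.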

View File

@@ -1482,8 +1482,639 @@
"trigger_variable": null,
"auto_started": true
}
},
{
"timestamp": "2025-08-14T21:40:27.128935",
"level": "info",
"event_type": "plot_session_created",
"message": "Plot session 'UR29' created and started",
"details": {
"session_id": "plot_1",
"variables": [
"UR29_Brix",
"UR29_ma",
"AUX Blink_1.0S"
],
"time_window": 20,
"trigger_variable": null,
"auto_started": true
}
},
{
"timestamp": "2025-08-14T21:40:34.404349",
"level": "info",
"event_type": "plot_session_created",
"message": "Plot session 'UR29' created and started",
"details": {
"session_id": "plot_1",
"variables": [
"UR29_Brix",
"UR29_ma",
"AUX Blink_1.0S"
],
"time_window": 20,
"trigger_variable": null,
"auto_started": true
}
},
{
"timestamp": "2025-08-14T21:40:43.787236",
"level": "info",
"event_type": "plot_session_created",
"message": "Plot session 'UR29' created and started",
"details": {
"session_id": "plot_1",
"variables": [
"UR29_Brix",
"UR29_ma",
"AUX Blink_1.0S"
],
"time_window": 20,
"trigger_variable": null,
"auto_started": true
}
},
{
"timestamp": "2025-08-14T21:40:53.670324",
"level": "info",
"event_type": "plot_session_created",
"message": "Plot session 'UR29' created and started",
"details": {
"session_id": "plot_1",
"variables": [
"UR29_Brix",
"UR29_ma",
"AUX Blink_1.0S"
],
"time_window": 20,
"trigger_variable": null,
"auto_started": true
}
},
{
"timestamp": "2025-08-14T21:55:42.281211",
"level": "info",
"event_type": "csv_recording_stopped",
"message": "CSV recording stopped (dataset threads continue for UDP streaming)",
"details": {}
},
{
"timestamp": "2025-08-14T21:55:42.285178",
"level": "info",
"event_type": "udp_streaming_stopped",
"message": "UDP streaming to PlotJuggler stopped (CSV recording continues)",
"details": {}
},
{
"timestamp": "2025-08-14T21:55:42.289219",
"level": "info",
"event_type": "dataset_deactivated",
"message": "Dataset deactivated: test",
"details": {
"dataset_id": "Test"
}
},
{
"timestamp": "2025-08-14T21:55:42.327691",
"level": "info",
"event_type": "dataset_deactivated",
"message": "Dataset deactivated: Fast",
"details": {
"dataset_id": "Fast"
}
},
{
"timestamp": "2025-08-14T21:55:42.481025",
"level": "info",
"event_type": "dataset_deactivated",
"message": "Dataset deactivated: DAR",
"details": {
"dataset_id": "DAR"
}
},
{
"timestamp": "2025-08-14T21:55:42.485065",
"level": "info",
"event_type": "plc_disconnection",
"message": "Disconnected from PLC 10.1.33.11 (stopped recording and streaming)",
"details": {}
},
{
"timestamp": "2025-08-14T21:56:24.835314",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: DAR",
"details": {
"dataset_id": "DAR",
"variables_count": 2,
"streaming_count": 2,
"prefix": "gateway_phoenix"
}
},
{
"timestamp": "2025-08-14T21:56:24.838909",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: Fast",
"details": {
"dataset_id": "Fast",
"variables_count": 1,
"streaming_count": 1,
"prefix": "fast"
}
},
{
"timestamp": "2025-08-14T21:56:24.842990",
"level": "info",
"event_type": "csv_recording_started",
"message": "CSV recording started: 2 datasets activated",
"details": {
"activated_datasets": 2,
"total_datasets": 3
}
},
{
"timestamp": "2025-08-14T21:56:24.845281",
"level": "info",
"event_type": "plc_connection",
"message": "Successfully connected to PLC 10.1.33.11 and auto-started CSV recording for 2 datasets",
"details": {
"ip": "10.1.33.11",
"rack": 0,
"slot": 2,
"symbols_path": "C:/Users/migue/Downloads/symSAE452.asc",
"auto_started_recording": true,
"recording_datasets": 2,
"dataset_names": [
"Fast",
"DAR"
]
}
},
{
"timestamp": "2025-08-14T21:56:57.393755",
"level": "info",
"event_type": "csv_recording_stopped",
"message": "CSV recording stopped (dataset threads continue for UDP streaming)",
"details": {}
},
{
"timestamp": "2025-08-14T21:56:57.396262",
"level": "info",
"event_type": "udp_streaming_stopped",
"message": "UDP streaming to PlotJuggler stopped (CSV recording continues)",
"details": {}
},
{
"timestamp": "2025-08-14T21:56:57.883359",
"level": "info",
"event_type": "dataset_deactivated",
"message": "Dataset deactivated: Fast",
"details": {
"dataset_id": "Fast"
}
},
{
"timestamp": "2025-08-14T21:56:58.057431",
"level": "info",
"event_type": "dataset_deactivated",
"message": "Dataset deactivated: DAR",
"details": {
"dataset_id": "DAR"
}
},
{
"timestamp": "2025-08-14T21:56:58.061430",
"level": "info",
"event_type": "plc_disconnection",
"message": "Disconnected from PLC 10.1.33.11 (stopped recording and streaming)",
"details": {}
},
{
"timestamp": "2025-08-14T21:57:06.030039",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-08-14T21:57:10.291305",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: DAR",
"details": {
"dataset_id": "DAR",
"variables_count": 2,
"streaming_count": 2,
"prefix": "gateway_phoenix"
}
},
{
"timestamp": "2025-08-14T21:57:10.295814",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: Fast",
"details": {
"dataset_id": "Fast",
"variables_count": 2,
"streaming_count": 1,
"prefix": "fast"
}
},
{
"timestamp": "2025-08-14T21:57:10.299816",
"level": "info",
"event_type": "csv_recording_started",
"message": "CSV recording started: 2 datasets activated",
"details": {
"activated_datasets": 2,
"total_datasets": 3
}
},
{
"timestamp": "2025-08-14T21:57:10.302320",
"level": "info",
"event_type": "plc_connection",
"message": "Successfully connected to PLC 10.1.33.11 and auto-started CSV recording for 3 datasets",
"details": {
"ip": "10.1.33.11",
"rack": 0,
"slot": 2,
"symbols_path": "C:/Users/migue/Downloads/symSAE452.asc",
"auto_started_recording": true,
"recording_datasets": 3,
"dataset_names": [
"Fast",
"DAR",
"test"
]
}
},
{
"timestamp": "2025-08-14T21:57:10.484412",
"level": "info",
"event_type": "csv_file_renamed",
"message": "CSV file renamed due to header mismatch for dataset 'Fast': fast_21.csv -> fast_to_21_57_10.csv",
"details": {
"dataset_id": "Fast",
"original_file": "records\\14-08-2025\\fast_21.csv",
"renamed_file": "records\\14-08-2025\\fast_to_21_57_10.csv",
"expected_headers": [
"timestamp",
"AUX Blink_1.0S",
"AUX Blink_1.6S"
],
"existing_headers": [
"timestamp",
"AUX Blink_1.0S"
],
"reason": "header_mismatch"
}
},
{
"timestamp": "2025-08-14T22:28:56.805851",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-08-14T22:28:56.932394",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: DAR",
"details": {
"dataset_id": "DAR",
"variables_count": 2,
"streaming_count": 2,
"prefix": "gateway_phoenix"
}
},
{
"timestamp": "2025-08-14T22:28:56.935393",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: Fast",
"details": {
"dataset_id": "Fast",
"variables_count": 3,
"streaming_count": 1,
"prefix": "fast"
}
},
{
"timestamp": "2025-08-14T22:28:56.938395",
"level": "info",
"event_type": "csv_recording_started",
"message": "CSV recording started: 2 datasets activated",
"details": {
"activated_datasets": 2,
"total_datasets": 3
}
},
{
"timestamp": "2025-08-14T22:28:57.166532",
"level": "info",
"event_type": "csv_file_renamed",
"message": "CSV file renamed due to header mismatch for dataset 'Fast': fast_22.csv -> fast_to_22_28_57.csv",
"details": {
"dataset_id": "Fast",
"original_file": "records\\14-08-2025\\fast_22.csv",
"renamed_file": "records\\14-08-2025\\fast_to_22_28_57.csv",
"expected_headers": [
"timestamp",
"AUX Blink_1.0S",
"AUX Blink_1.6S",
"AUX Blink_2.0S"
],
"existing_headers": [
"timestamp",
"AUX Blink_1.0S",
"AUX Blink_1.6S"
],
"reason": "header_mismatch"
}
},
{
"timestamp": "2025-08-14T22:29:09.720553",
"level": "info",
"event_type": "config_reload",
"message": "Dataset configuration reloaded from files",
"details": {
"datasets_count": 3,
"active_datasets_count": 3
}
},
{
"timestamp": "2025-08-14T22:29:18.107433",
"level": "info",
"event_type": "config_reload",
"message": "Dataset configuration reloaded from files",
"details": {
"datasets_count": 3,
"active_datasets_count": 3
}
},
{
"timestamp": "2025-08-14T22:29:34.913083",
"level": "info",
"event_type": "csv_recording_stopped",
"message": "CSV recording stopped (dataset threads continue for UDP streaming)",
"details": {}
},
{
"timestamp": "2025-08-14T22:29:34.916673",
"level": "info",
"event_type": "udp_streaming_stopped",
"message": "UDP streaming to PlotJuggler stopped (CSV recording continues)",
"details": {}
},
{
"timestamp": "2025-08-14T22:29:35.040978",
"level": "info",
"event_type": "dataset_deactivated",
"message": "Dataset deactivated: Fast",
"details": {
"dataset_id": "Fast"
}
},
{
"timestamp": "2025-08-14T22:29:35.416658",
"level": "info",
"event_type": "dataset_deactivated",
"message": "Dataset deactivated: DAR",
"details": {
"dataset_id": "DAR"
}
},
{
"timestamp": "2025-08-14T22:29:35.419648",
"level": "info",
"event_type": "dataset_deactivated",
"message": "Dataset deactivated: test",
"details": {
"dataset_id": "Test"
}
},
{
"timestamp": "2025-08-14T22:29:35.422649",
"level": "info",
"event_type": "plc_disconnection",
"message": "Disconnected from PLC 10.1.33.11 (stopped recording and streaming)",
"details": {}
},
{
"timestamp": "2025-08-14T22:29:37.405052",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: DAR",
"details": {
"dataset_id": "DAR",
"variables_count": 2,
"streaming_count": 2,
"prefix": "gateway_phoenix"
}
},
{
"timestamp": "2025-08-14T22:29:37.409057",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: Fast",
"details": {
"dataset_id": "Fast",
"variables_count": 2,
"streaming_count": 1,
"prefix": "fast"
}
},
{
"timestamp": "2025-08-14T22:29:37.411052",
"level": "info",
"event_type": "csv_recording_started",
"message": "CSV recording started: 2 datasets activated",
"details": {
"activated_datasets": 2,
"total_datasets": 3
}
},
{
"timestamp": "2025-08-14T22:29:37.414066",
"level": "info",
"event_type": "plc_connection",
"message": "Successfully connected to PLC 10.1.33.11 and auto-started CSV recording for 2 datasets",
"details": {
"ip": "10.1.33.11",
"rack": 0,
"slot": 2,
"symbols_path": "C:/Users/migue/Downloads/symSAE452.asc",
"auto_started_recording": true,
"recording_datasets": 2,
"dataset_names": [
"Fast",
"DAR"
]
}
},
{
"timestamp": "2025-08-14T22:29:37.623310",
"level": "info",
"event_type": "csv_file_renamed",
"message": "CSV file renamed due to header mismatch for dataset 'Fast': fast_22.csv -> fast_to_22_29_37.csv",
"details": {
"dataset_id": "Fast",
"original_file": "records\\14-08-2025\\fast_22.csv",
"renamed_file": "records\\14-08-2025\\fast_to_22_29_37.csv",
"expected_headers": [
"timestamp",
"AUX Blink_1.0S",
"AUX Blink_1.6S"
],
"existing_headers": [
"timestamp",
"AUX Blink_1.0S",
"AUX Blink_1.6S",
"AUX Blink_2.0S"
],
"reason": "header_mismatch"
}
},
{
"timestamp": "2025-08-14T22:30:22.434058",
"level": "info",
"event_type": "config_reload",
"message": "Dataset configuration reloaded from files",
"details": {
"datasets_count": 3,
"active_datasets_count": 3
}
},
{
"timestamp": "2025-08-14T22:32:00.333252",
"level": "info",
"event_type": "udp_streaming_started",
"message": "UDP streaming to PlotJuggler started",
"details": {
"udp_host": "127.0.0.1",
"udp_port": 9870,
"datasets_available": 3
}
},
{
"timestamp": "2025-08-14T22:33:00.673187",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-08-14T22:33:00.754407",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: DAR",
"details": {
"dataset_id": "DAR",
"variables_count": 2,
"streaming_count": 2,
"prefix": "gateway_phoenix"
}
},
{
"timestamp": "2025-08-14T22:33:00.757714",
"level": "info",
"event_type": "dataset_activated",
"message": "Dataset activated: Fast",
"details": {
"dataset_id": "Fast",
"variables_count": 3,
"streaming_count": 1,
"prefix": "fast"
}
},
{
"timestamp": "2025-08-14T22:33:00.760854",
"level": "info",
"event_type": "csv_recording_started",
"message": "CSV recording started: 2 datasets activated",
"details": {
"activated_datasets": 2,
"total_datasets": 3
}
},
{
"timestamp": "2025-08-14T22:33:00.764562",
"level": "info",
"event_type": "udp_streaming_started",
"message": "UDP streaming to PlotJuggler started",
"details": {
"udp_host": "127.0.0.1",
"udp_port": 9870,
"datasets_available": 3
}
},
{
"timestamp": "2025-08-14T22:33:01.026299",
"level": "info",
"event_type": "csv_file_renamed",
"message": "CSV file renamed due to header mismatch for dataset 'Fast': fast_22.csv -> fast_to_22_33_01.csv",
"details": {
"dataset_id": "Fast",
"original_file": "records\\14-08-2025\\fast_22.csv",
"renamed_file": "records\\14-08-2025\\fast_to_22_33_01.csv",
"expected_headers": [
"timestamp",
"AUX Blink_1.0S",
"AUX Blink_1.6S",
"AUX Blink_2.0S"
],
"existing_headers": [
"timestamp",
"AUX Blink_1.0S",
"AUX Blink_1.6S"
],
"reason": "header_mismatch"
}
},
{
"timestamp": "2025-08-14T22:33:16.675301",
"level": "warning",
"event_type": "csv_headers_mismatch_after_config_reload",
"message": "CSV header mismatches detected and resolved for 1 datasets after configuration reload",
"details": {
"mismatched_datasets": 1,
"total_validated": 2,
"details": [
{
"dataset_id": "Fast",
"dataset_name": "Fast",
"original_file": "records\\14-08-2025\\fast_22.csv",
"renamed_file": "records\\14-08-2025\\fast_to_22_33_16.csv",
"expected_headers": [
"timestamp",
"AUX Blink_1.0S",
"AUX Blink_1.6S"
],
"existing_headers": [
"timestamp",
"AUX Blink_1.0S",
"AUX Blink_1.6S",
"AUX Blink_2.0S"
]
}
]
}
},
{
"timestamp": "2025-08-14T22:33:16.680664",
"level": "info",
"event_type": "config_reload",
"message": "Dataset configuration reloaded from files with CSV header validation",
"details": {
"datasets_count": 3,
"active_datasets_count": 3,
"csv_recording_active": true
}
}
],
"last_updated": "2025-08-14T18:46:27.723966",
"total_entries": 154
"last_updated": "2025-08-14T22:33:16.680664",
"total_entries": 207
}

View File

@@ -32,6 +32,13 @@
"streaming": true,
"symbol": "AUX Blink_1.0S",
"type": "real"
},
{
"configType": "symbol",
"area": "db",
"streaming": false,
"symbol": "AUX Blink_1.6S",
"type": "real"
}
]
}

View File

@@ -2,11 +2,11 @@
"plots": [
{
"id": "plot_1",
"line_tension": 0,
"line_tension": 0.1,
"name": "UR29",
"point_hover_radius": 4,
"point_radius": 4,
"stepped": true,
"point_radius": 0,
"stepped": false,
"time_window": 20,
"trigger_enabled": false,
"trigger_on_true": true,

View File

@@ -200,6 +200,126 @@ class PLCDataStreamer:
f"Disconnected from PLC {self.config_manager.plc_config['ip']} (stopped recording and streaming)",
)
# Configuration Management Methods
def reload_dataset_configuration(self):
"""Reload dataset configuration from JSON files and validate CSV headers"""
try:
self.config_manager.load_datasets()
self.config_manager.sync_streaming_variables()
# 🔍 NEW: Validate CSV headers for active datasets after configuration reload
self._validate_csv_headers_after_config_change()
self.event_logger.log_event(
"info",
"config_reload",
"Dataset configuration reloaded from files with CSV header validation",
{
"datasets_count": len(self.config_manager.datasets),
"active_datasets_count": len(self.config_manager.active_datasets),
"csv_recording_active": self.data_streamer.is_csv_recording(),
}
)
if self.logger:
self.logger.info("Dataset configuration reloaded successfully with CSV header validation")
except Exception as e:
self.event_logger.log_event(
"error",
"config_reload_failed",
f"Failed to reload dataset configuration: {str(e)}",
{"error": str(e)}
)
if self.logger:
self.logger.error(f"Failed to reload dataset configuration: {e}")
raise
def _validate_csv_headers_after_config_change(self):
"""Validate CSV headers for all active datasets after configuration changes"""
if not self.data_streamer.is_csv_recording():
if self.logger:
self.logger.debug("CSV recording not active, skipping header validation")
return
validated_datasets = []
header_mismatches = []
for dataset_id in self.config_manager.active_datasets:
try:
# Check if this dataset has an active CSV file
if dataset_id not in self.data_streamer.dataset_csv_files:
continue
# Get current CSV file path
csv_path = self.data_streamer.get_dataset_csv_file_path(dataset_id)
if not os.path.exists(csv_path):
continue
# Get expected headers based on current configuration
dataset_variables = self.config_manager.get_dataset_variables(dataset_id)
expected_headers = ["timestamp"] + list(dataset_variables.keys())
# Read existing headers from the file
existing_headers = self.data_streamer.read_csv_headers(csv_path)
# Compare headers
if existing_headers and not self.data_streamer.compare_headers(existing_headers, expected_headers):
# Header mismatch detected - close current file and rename it
if dataset_id in self.data_streamer.dataset_csv_files:
self.data_streamer.dataset_csv_files[dataset_id].close()
del self.data_streamer.dataset_csv_files[dataset_id]
del self.data_streamer.dataset_csv_writers[dataset_id]
# Rename the file with timestamp
prefix = self.config_manager.datasets[dataset_id]["prefix"]
renamed_path = self.data_streamer.rename_csv_file_with_timestamp(csv_path, prefix)
header_mismatches.append({
"dataset_id": dataset_id,
"dataset_name": self.config_manager.datasets[dataset_id]["name"],
"original_file": csv_path,
"renamed_file": renamed_path,
"expected_headers": expected_headers,
"existing_headers": existing_headers
})
# Create new file with correct headers (will be done on next write)
# The setup_dataset_csv_file method will handle creating the new file
if self.logger:
self.logger.info(
f"CSV header mismatch detected for dataset '{self.config_manager.datasets[dataset_id]['name']}' "
f"after configuration reload. File renamed: {os.path.basename(csv_path)} -> {os.path.basename(renamed_path)}"
)
validated_datasets.append({
"dataset_id": dataset_id,
"dataset_name": self.config_manager.datasets[dataset_id]["name"],
"headers_match": len(header_mismatches) == 0 or dataset_id not in [h["dataset_id"] for h in header_mismatches],
"expected_headers": expected_headers,
"existing_headers": existing_headers
})
except Exception as e:
if self.logger:
self.logger.warning(f"Error validating CSV headers for dataset {dataset_id}: {e}")
# Log summary of validation results
if header_mismatches:
self.event_logger.log_event(
"warning",
"csv_headers_mismatch_after_config_reload",
f"CSV header mismatches detected and resolved for {len(header_mismatches)} datasets after configuration reload",
{
"mismatched_datasets": len(header_mismatches),
"total_validated": len(validated_datasets),
"details": header_mismatches
}
)
else:
if validated_datasets and self.logger:
self.logger.info(f"CSV headers validated for {len(validated_datasets)} active datasets - all headers match")
# Configuration Methods
def update_plc_config(self, ip: str, rack: int, slot: int):
"""Update PLC configuration"""
@@ -598,6 +718,11 @@ class PLCDataStreamer:
"""Get streaming status (backward compatibility)"""
return self.data_streamer.is_streaming()
# Methods for backward compatibility
def load_datasets(self):
"""Load datasets (backward compatibility - delegates to reload_dataset_configuration)"""
self.reload_dataset_configuration()
# DEPRECATED: save_datasets() method removed
# Data is now saved directly from frontend via RJSF and API endpoints
# Use load_datasets() to reload configuration when needed
# Use reload_dataset_configuration() to reload configuration when needed

View File

@ -5,8 +5,9 @@ import threading
import csv
import os
import sys
import shutil
from datetime import datetime
from typing import Dict, Any, Optional, Set
from typing import Dict, Any, Optional, Set, List
from pathlib import Path
from .plot_manager import PlotManager
@@ -33,6 +34,14 @@ class DataStreamer:
Each dataset thread handles both CSV writing and UDP streaming,
but UDP transmission is controlled by independent flag.
🔍 CSV HEADER VALIDATION
========================
Automatic header validation ensures CSV file integrity:
- When opening existing CSV files, headers are compared with current variable configuration
- If headers don't match, the existing file is renamed with format: prefix_to_HH_MM_SS.csv
- A new CSV file with correct headers is created using the original filename: prefix_HH.csv
- This prevents CSV corruption while preserving historical data
"""
def __init__(self, config_manager, plc_client, event_logger, logger=None):
@@ -149,8 +158,55 @@
directory = self.get_csv_directory_path()
return os.path.join(directory, filename)
def read_csv_headers(self, file_path: str) -> List[str]:
"""Read the header row from an existing CSV file"""
try:
with open(file_path, "r", newline="", encoding="utf-8") as file:
reader = csv.reader(file)
headers = next(reader, [])
return headers
except (IOError, StopIteration) as e:
if self.logger:
self.logger.warning(f"Could not read headers from {file_path}: {e}")
return []
def compare_headers(
self, existing_headers: List[str], new_headers: List[str]
) -> bool:
"""Compare two header lists for equality"""
return existing_headers == new_headers
def rename_csv_file_with_timestamp(self, original_path: str, prefix: str) -> str:
"""Rename CSV file with 'to' timestamp format"""
try:
directory = os.path.dirname(original_path)
timestamp = datetime.now().strftime("%H_%M_%S")
new_filename = f"{prefix}_to_{timestamp}.csv"
new_path = os.path.join(directory, new_filename)
# Ensure the new filename is unique
counter = 1
while os.path.exists(new_path):
new_filename = f"{prefix}_to_{timestamp}_{counter}.csv"
new_path = os.path.join(directory, new_filename)
counter += 1
shutil.move(original_path, new_path)
if self.logger:
self.logger.info(
f"Renamed CSV file due to header mismatch: {os.path.basename(original_path)} -> {os.path.basename(new_path)}"
)
return new_path
except Exception as e:
if self.logger:
self.logger.error(f"Error renaming CSV file {original_path}: {e}")
raise
def setup_dataset_csv_file(self, dataset_id: str):
"""Setup CSV file for a specific dataset"""
"""Setup CSV file for a specific dataset with header validation"""
current_hour = datetime.now().hour
# If we're using a modification file and the hour hasn't changed, keep using it
@@ -176,9 +232,72 @@
self.ensure_csv_directory()
csv_path = self.get_dataset_csv_file_path(dataset_id)
# Check if file exists to determine if we need headers
file_exists = os.path.exists(csv_path)
# Get current dataset variables and create expected headers
dataset_variables = self.config_manager.get_dataset_variables(dataset_id)
expected_headers = ["timestamp"] + list(dataset_variables.keys())
# Check if file exists and validate headers
file_exists = os.path.exists(csv_path)
need_to_rename_file = False
if file_exists and dataset_variables:
# Read existing headers from the file
existing_headers = self.read_csv_headers(csv_path)
# Compare headers - if they don't match, we need to rename the old file
if existing_headers and not self.compare_headers(
existing_headers, expected_headers
):
need_to_rename_file = True
prefix = self.config_manager.datasets[dataset_id]["prefix"]
if self.logger:
self.logger.info(
f"Header mismatch detected in CSV file for dataset '{self.config_manager.datasets[dataset_id]['name']}'. "
f"Expected: {expected_headers}, Found: {existing_headers}"
)
# Rename the existing file if headers don't match
if need_to_rename_file:
try:
prefix = self.config_manager.datasets[dataset_id]["prefix"]
renamed_path = self.rename_csv_file_with_timestamp(csv_path, prefix)
# Log the rename event
self.event_logger.log_event(
"info",
"csv_file_renamed",
f"CSV file renamed due to header mismatch for dataset '{self.config_manager.datasets[dataset_id]['name']}': "
f"{os.path.basename(csv_path)} -> {os.path.basename(renamed_path)}",
{
"dataset_id": dataset_id,
"original_file": csv_path,
"renamed_file": renamed_path,
"expected_headers": expected_headers,
"existing_headers": existing_headers,
"reason": "header_mismatch",
},
)
# File no longer exists after rename, so we'll create a new one
file_exists = False
except Exception as e:
if self.logger:
self.logger.error(f"Failed to rename CSV file {csv_path}: {e}")
# Continue with the existing file despite the header mismatch
self.event_logger.log_event(
"error",
"csv_file_rename_failed",
f"Failed to rename CSV file for dataset '{self.config_manager.datasets[dataset_id]['name']}': {str(e)}",
{
"dataset_id": dataset_id,
"file_path": csv_path,
"error": str(e),
},
)
# Open the file for appending (or creating if it doesn't exist)
self.dataset_csv_files[dataset_id] = open(
csv_path, "a", newline="", encoding="utf-8"
)
@@ -190,16 +309,19 @@
# Reset modification file flag when creating regular hourly file
self.dataset_using_modification_files[dataset_id] = False
# Write headers if it's a new file
dataset_variables = self.config_manager.get_dataset_variables(dataset_id)
if not file_exists and dataset_variables:
headers = ["timestamp"] + list(dataset_variables.keys())
self.dataset_csv_writers[dataset_id].writerow(headers)
# Write headers if it's a new file or if the file was renamed
if (not file_exists or need_to_rename_file) and dataset_variables:
self.dataset_csv_writers[dataset_id].writerow(expected_headers)
self.dataset_csv_files[dataset_id].flush()
if self.logger:
action = (
"recreated with correct headers"
if need_to_rename_file
else "created"
)
self.logger.info(
f"CSV file created for dataset '{self.config_manager.datasets[dataset_id]['name']}': {csv_path}"
f"CSV file {action} for dataset '{self.config_manager.datasets[dataset_id]['name']}': {csv_path}"
)
def write_dataset_csv_data(self, dataset_id: str, data: Dict[str, Any]):

main.py (31 lines changed)
View File

@@ -208,13 +208,32 @@ def write_config(config_id):
# Write the data
json_manager.write_json(config_id, payload)
# Notify backend to reload if it's PLC config
if config_id == "plc" and streamer:
# Automatically reload backend configuration for specific config types
if streamer:
try:
streamer.config_manager.load_configuration()
if config_id == "plc":
streamer.config_manager.load_configuration()
elif config_id in ["dataset-definitions", "dataset-variables"]:
# Reload dataset configuration to pick up changes
streamer.reload_dataset_configuration()
if streamer.logger:
streamer.logger.info(
f"Auto-reloaded backend configuration after updating {config_id}"
)
elif config_id in ["plot-definitions", "plot-variables"]:
# Plot configurations don't need backend reload currently
pass
except Exception as e:
# Log the error but don't fail the save operation
print(f"Warning: Could not reload config in backend: {e}")
if streamer and streamer.logger:
streamer.logger.warning(
f"Could not auto-reload {config_id} config in backend: {e}"
)
else:
print(
f"Warning: Could not auto-reload {config_id} config in backend: {e}"
)
return jsonify(
{
@@ -260,8 +279,8 @@ def reload_config(config_id):
if config_id == "plc":
streamer.config_manager.load_configuration()
elif config_id in ["dataset-definitions", "dataset-variables"]:
# Reload dataset configuration
streamer.load_datasets()
# Reload dataset configuration using the new method
streamer.reload_dataset_configuration()
elif config_id in ["plot-definitions", "plot-variables"]:
# Reload plot configuration if needed
pass

View File

@@ -3,11 +3,11 @@
"should_connect": true,
"should_stream": true,
"active_datasets": [
"Test",
"DAR",
"Fast",
"DAR"
"Test"
]
},
"auto_recovery_enabled": true,
"last_update": "2025-08-14T18:46:16.622461"
"last_update": "2025-08-14T22:33:00.768192"
}

test_header_validation.py (new file, 105 lines)
View File

@@ -0,0 +1,105 @@
"""
Test script for CSV header validation functionality
"""
import csv
import os
import tempfile
import shutil
from datetime import datetime
def test_header_validation():
"""Test the header validation logic without full system dependencies"""
# Create a temporary test directory
test_dir = tempfile.mkdtemp()
try:
# Test 1: Create a CSV file with old headers
old_csv_path = os.path.join(test_dir, "test_data_14.csv")
old_headers = ["timestamp", "var1", "var2"]
with open(old_csv_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(old_headers)
writer.writerow(["2025-08-14 12:00:00", "100", "200"])
writer.writerow(["2025-08-14 12:00:01", "101", "201"])
print(f"✅ Created test CSV file: {old_csv_path}")
# Test 2: Read headers function
def read_csv_headers(file_path):
try:
with open(file_path, "r", newline="", encoding="utf-8") as file:
reader = csv.reader(file)
headers = next(reader, [])
return headers
except (IOError, StopIteration) as e:
print(f"Could not read headers from {file_path}: {e}")
return []
# Test 3: Compare headers function
def compare_headers(existing_headers, new_headers):
return existing_headers == new_headers
# Test 4: Rename file function
def rename_csv_file_with_timestamp(original_path, prefix):
directory = os.path.dirname(original_path)
timestamp = datetime.now().strftime("%H_%M_%S")
new_filename = f"{prefix}_to_{timestamp}.csv"
new_path = os.path.join(directory, new_filename)
# Ensure the new filename is unique
counter = 1
while os.path.exists(new_path):
new_filename = f"{prefix}_to_{timestamp}_{counter}.csv"
new_path = os.path.join(directory, new_filename)
counter += 1
shutil.move(original_path, new_path)
return new_path
# Test the functions
existing_headers = read_csv_headers(old_csv_path)
new_headers = ["timestamp", "var1", "var2", "var3"] # Different headers
print(f"Existing headers: {existing_headers}")
print(f"New headers: {new_headers}")
print(f"Headers match: {compare_headers(existing_headers, new_headers)}")
# Test header mismatch scenario
if not compare_headers(existing_headers, new_headers):
print("❌ Header mismatch detected! Renaming file...")
renamed_path = rename_csv_file_with_timestamp(old_csv_path, "test_data")
print(f"✅ File renamed to: {os.path.basename(renamed_path)}")
# Create new file with correct headers
new_csv_path = old_csv_path # Same original path
with open(new_csv_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(new_headers)
writer.writerow(["2025-08-14 12:00:02", "102", "202", "302"])
print(
f"✅ Created new CSV file with correct headers: {os.path.basename(new_csv_path)}"
)
# Verify the files
print(f"\nFiles in test directory:")
for file in os.listdir(test_dir):
if file.endswith(".csv"):
file_path = os.path.join(test_dir, file)
headers = read_csv_headers(file_path)
print(f" {file}: {headers}")
print("\n✅ All tests passed!")
finally:
# Clean up
shutil.rmtree(test_dir)
print(f"🧹 Cleaned up test directory: {test_dir}")
if __name__ == "__main__":
test_header_validation()