feat: Enhance application event logging, improve CSV handling, and add CSV validator utility
parent 4f7b55bd0d
commit e517f40a5d
@@ -3726,8 +3726,210 @@
        "trigger_variable": null,
        "auto_started": true
      }
    },
    {
      "timestamp": "2025-08-15T00:22:03.893270",
      "level": "info",
      "event_type": "application_started",
      "message": "Application initialization completed successfully",
      "details": {}
    },
    {
      "timestamp": "2025-08-15T00:22:03.973434",
      "level": "info",
      "event_type": "dataset_activated",
      "message": "Dataset activated: DAR",
      "details": {
        "dataset_id": "DAR",
        "variables_count": 2,
        "streaming_count": 2,
        "prefix": "gateway_phoenix"
      }
    },
    {
      "timestamp": "2025-08-15T00:22:03.978435",
      "level": "info",
      "event_type": "dataset_activated",
      "message": "Dataset activated: Fast",
      "details": {
        "dataset_id": "Fast",
        "variables_count": 2,
        "streaming_count": 1,
        "prefix": "fast"
      }
    },
    {
      "timestamp": "2025-08-15T00:22:03.984513",
      "level": "info",
      "event_type": "csv_recording_started",
      "message": "CSV recording started: 2 datasets activated",
      "details": {
        "activated_datasets": 2,
        "total_datasets": 3
      }
    },
    {
      "timestamp": "2025-08-15T00:22:03.990721",
      "level": "info",
      "event_type": "udp_streaming_started",
      "message": "UDP streaming to PlotJuggler started",
      "details": {
        "udp_host": "127.0.0.1",
        "udp_port": 9870,
        "datasets_available": 3
      }
    },
    {
      "timestamp": "2025-08-15T00:22:53.358065",
      "level": "info",
      "event_type": "plot_session_created",
      "message": "Plot session 'UR29' created and started",
      "details": {
        "session_id": "plot_1",
        "variables": [
          "UR29_Brix",
          "UR29_ma",
          "AUX Blink_1.0S"
        ],
        "time_window": 20,
        "trigger_variable": null,
        "auto_started": true
      }
    },
    {
      "timestamp": "2025-08-15T00:23:20.844836",
      "level": "info",
      "event_type": "plot_session_created",
      "message": "Plot session 'UR29' created and started",
      "details": {
        "session_id": "plot_1",
        "variables": [
          "UR29_Brix",
          "UR29_ma",
          "AUX Blink_1.0S"
        ],
        "time_window": 20,
        "trigger_variable": null,
        "auto_started": true
      }
    },
    {
      "timestamp": "2025-08-15T00:23:59.638322",
      "level": "info",
      "event_type": "udp_streaming_stopped",
      "message": "UDP streaming to PlotJuggler stopped (CSV recording continues)",
      "details": {}
    },
    {
      "timestamp": "2025-08-15T00:24:35.808418",
      "level": "info",
      "event_type": "plot_session_created",
      "message": "Plot session 'UR29' created and started",
      "details": {
        "session_id": "plot_1",
        "variables": [
          "UR29_Brix",
          "UR29_ma",
          "AUX Blink_1.0S"
        ],
        "time_window": 20,
        "trigger_variable": null,
        "auto_started": true
      }
    },
    {
      "timestamp": "2025-08-15T00:26:15.271771",
      "level": "info",
      "event_type": "application_started",
      "message": "Application initialization completed successfully",
      "details": {}
    },
    {
      "timestamp": "2025-08-15T00:26:15.365923",
      "level": "info",
      "event_type": "dataset_activated",
      "message": "Dataset activated: DAR",
      "details": {
        "dataset_id": "DAR",
        "variables_count": 2,
        "streaming_count": 2,
        "prefix": "gateway_phoenix"
      }
    },
    {
      "timestamp": "2025-08-15T00:26:15.372925",
      "level": "info",
      "event_type": "dataset_activated",
      "message": "Dataset activated: Fast",
      "details": {
        "dataset_id": "Fast",
        "variables_count": 2,
        "streaming_count": 1,
        "prefix": "fast"
      }
    },
    {
      "timestamp": "2025-08-15T00:26:15.377433",
      "level": "info",
      "event_type": "csv_recording_started",
      "message": "CSV recording started: 2 datasets activated",
      "details": {
        "activated_datasets": 2,
        "total_datasets": 3
      }
    },
    {
      "timestamp": "2025-08-15T00:26:31.108086",
      "level": "info",
      "event_type": "plot_session_created",
      "message": "Plot session 'UR29' created and started",
      "details": {
        "session_id": "plot_1",
        "variables": [
          "UR29_Brix",
          "UR29_ma",
          "AUX Blink_1.0S"
        ],
        "time_window": 20,
        "trigger_variable": null,
        "auto_started": true
      }
    },
    {
      "timestamp": "2025-08-15T00:27:24.396654",
      "level": "info",
      "event_type": "plot_session_created",
      "message": "Plot session 'UR29' created and started",
      "details": {
        "session_id": "plot_1",
        "variables": [
          "UR29_Brix",
          "UR29_ma",
          "AUX Blink_1.0S"
        ],
        "time_window": 20,
        "trigger_variable": null,
        "auto_started": true
      }
    },
    {
      "timestamp": "2025-08-15T00:27:40.211134",
      "level": "info",
      "event_type": "plot_session_created",
      "message": "Plot session 'UR29' created and started",
      "details": {
        "session_id": "plot_1",
        "variables": [
          "UR29_Brix",
          "UR29_ma",
          "AUX Blink_1.0S"
        ],
        "time_window": 500,
        "trigger_variable": null,
        "auto_started": true
      }
    }
  ],
  "last_updated": "2025-08-15T00:17:19.296417",
  "total_entries": 344
  "last_updated": "2025-08-15T00:27:40.211134",
  "total_entries": 360
}
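Note: the hunk above appends entries to the application events log. As a minimal sketch of how such a log could be inspected offline, assuming the file is a JSON document whose top level holds an "events" array plus the "last_updated" and "total_entries" keys visible above (the filename "application_events.json" is only illustrative, not taken from this commit):

# Illustrative only: count event types in an events log shaped like the hunk above.
import json
from collections import Counter

with open("application_events.json", "r", encoding="utf-8") as f:
    log = json.load(f)

counts = Counter(entry["event_type"] for entry in log.get("events", []))
print(f"total_entries: {log.get('total_entries')}, last_updated: {log.get('last_updated')}")
for event_type, count in counts.most_common():
    print(f"{event_type}: {count}")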
@@ -7,7 +7,7 @@
  "point_hover_radius": 4,
  "point_radius": 0,
  "stepped": false,
  "time_window": 20,
  "time_window": 500,
  "trigger_enabled": false,
  "trigger_on_true": true,
  "trigger_variable": null,
@@ -146,6 +146,7 @@ const ChartjsPlot = ({ session, height = '400px' }) => {
  const loadHistoricalData = useCallback(async (variables, timeWindow) => {
    try {
      console.log(`📊 Loading historical data for ${variables.length} variables (${timeWindow}s window)...`);
      setIsLoadingHistorical(true);

      const response = await fetch('/api/plots/historical', {
        method: 'POST',
@@ -160,15 +161,41 @@ const ChartjsPlot = ({ session, height = '400px' }) => {
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
        const errorData = await response.json();
        throw new Error(`HTTP ${response.status}: ${errorData.error || 'Unknown error'}`);
      }

      const data = await response.json();
      console.log(`📊 Historical data response:`, data);
      console.log(`📊 Historical data loaded:`, {
        totalPoints: data.total_points || 0,
        variablesFound: data.variables_found || [],
        variablesRequested: variables,
        timeRange: data.time_range,
        dataPreview: data.data?.slice(0, 3) // Show first 3 points
      });

      // Show summary in console for debugging
      if (data.data && data.data.length > 0) {
        const variableStats = {};
        data.data.forEach(point => {
          if (!variableStats[point.variable]) {
            variableStats[point.variable] = { count: 0, min: point.value, max: point.value };
          }
          variableStats[point.variable].count++;
          variableStats[point.variable].min = Math.min(variableStats[point.variable].min, point.value);
          variableStats[point.variable].max = Math.max(variableStats[point.variable].max, point.value);
        });
        console.log(`📊 Variable statistics:`, variableStats);
      }

      return data.data || [];
    } catch (error) {
      console.warn('⚠️ Failed to load historical data:', error);
      // Set error state to show user
      setError(`Historical data loading failed: ${error.message}`);
      return [];
    } finally {
      setIsLoadingHistorical(false);
    }
  }, []);
@@ -1276,6 +1303,42 @@ const ChartjsPlot = ({ session, height = '400px' }) => {
      borderColor="gray.200"
      position="relative"
    >
      {/* Historical data loading indicator */}
      {isLoadingHistorical && (
        <Box
          position="absolute"
          top={2}
          right={2}
          bg="blue.500"
          color="white"
          px={2}
          py={1}
          borderRadius="md"
          fontSize="xs"
          zIndex={10}
        >
          📊 Loading historical data...
        </Box>
      )}

      {/* Data points counter */}
      {dataPointsCount > 0 && (
        <Box
          position="absolute"
          top={2}
          left={2}
          bg="green.500"
          color="white"
          px={2}
          py={1}
          borderRadius="md"
          fontSize="xs"
          zIndex={10}
        >
          📈 {dataPointsCount} points
        </Box>
      )}

      <canvas
        ref={canvasRef}
        style={{
main.py
@@ -1923,14 +1923,29 @@ def get_historical_data():
        try:
            print(f"🔍 DEBUG: Processing CSV file: {csv_file}")

            # Read first line to check if any required variables are present
            with open(csv_file, "r") as f:
                header_line = f.readline().strip()
                if not header_line:
                    print(f"🔍 DEBUG: Empty header in {csv_file}, skipping")
            # Read first line to check if any required variables are present with proper encoding handling
            header_line = None
            for encoding in ['utf-8', 'utf-8-sig', 'utf-16', 'latin-1']:
                try:
                    with open(csv_file, "r", encoding=encoding) as f:
                        header_line = f.readline().strip()
                        if header_line:
                            print(f"🔍 DEBUG: Successfully read header with {encoding} encoding")
                            break
                except (UnicodeDecodeError, UnicodeError):
                    continue
                except Exception:
                    continue

            if not header_line:
                print(f"🔍 DEBUG: Could not read header from {csv_file}, skipping")
                continue

            headers = [h.strip() for h in header_line.split(",")]
            # Clean header line from BOM and normalize
            header_line = header_line.replace('\ufeff', '').replace('\x00', '')
            headers = [h.strip().replace('\x00', '') for h in header_line.split(",")]
            # Clean any remaining unicode artifacts
            headers = [h for h in headers if h and len(h.replace('\x00', '').strip()) > 0]
            print(f"🔍 DEBUG: Headers in {csv_file}: {headers}")

            # Check if any of our variables are in this file
@@ -1943,22 +1958,69 @@ def get_historical_data():
                )
                continue

            # Read the CSV file
            # Read the CSV file with proper encoding and error handling
            print(f"🔍 DEBUG: Reading CSV file with pandas...")
            df = pd.read_csv(csv_file)
            print(f"🔍 DEBUG: CSV loaded, shape: {df.shape}")
            df = None
            for encoding in ['utf-8', 'utf-8-sig', 'utf-16', 'latin-1']:
                try:
                    df = pd.read_csv(csv_file, encoding=encoding, on_bad_lines='skip')
                    print(f"🔍 DEBUG: CSV loaded with {encoding}, shape: {df.shape}")
                    break
                except (UnicodeDecodeError, UnicodeError, pd.errors.ParserError):
                    continue
                except Exception as e:
                    print(f"🔍 DEBUG: Error reading with {encoding}: {e}")
                    continue

            if df is None:
                print(f"Warning: Could not read CSV file {csv_file} with any encoding")
                continue

            # Clean column names from BOM and unicode artifacts
            df.columns = [col.replace('\ufeff', '').replace('\x00', '').strip() for col in df.columns]

            if "timestamp" not in df.columns:
                print(f"🔍 DEBUG: No timestamp column in {csv_file}, skipping")
            # Convert timestamp to datetime with flexible parsing
            print(f"🔍 DEBUG: Converting timestamps...")
            timestamp_col = None
            for col in df.columns:
                if 'timestamp' in col.lower():
                    timestamp_col = col
                    break

            if timestamp_col is None:
                print(f"🔍 DEBUG: No timestamp column found in {csv_file}, skipping")
                continue

            try:
                # Try multiple timestamp formats
                df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors='coerce')
                # Remove rows with invalid timestamps
                df = df.dropna(subset=[timestamp_col])

                if df.empty:
                    print(f"🔍 DEBUG: No valid timestamps in {csv_file}, skipping")
                    continue

                # Normalize column name to 'timestamp'
                if timestamp_col != 'timestamp':
                    df = df.rename(columns={timestamp_col: 'timestamp'})

                print(
                    f"🔍 DEBUG: Timestamp range: {df['timestamp'].min()} to {df['timestamp'].max()}"
                )
                print(f"🔍 DEBUG: Filter range: {start_time} to {end_time}")
            except Exception as e:
                print(f"🔍 DEBUG: Timestamp conversion failed for {csv_file}: {e}")
                continue

            # Convert timestamp to datetime
            print(f"🔍 DEBUG: Converting timestamps...")
            df["timestamp"] = pd.to_datetime(df["timestamp"])
            print(
                f"🔍 DEBUG: Timestamp range: {df['timestamp'].min()} to {df['timestamp'].max()}"
            )
            print(f"🔍 DEBUG: Filter range: {start_time} to {end_time}")
            # Recalculate matching variables after column cleaning
            clean_headers = list(df.columns)
            matching_vars = [var for var in variables if var in clean_headers]
            print(f"🔍 DEBUG: Matching variables after cleaning: {matching_vars}")

            if not matching_vars:
                print(f"🔍 DEBUG: No matching variables after cleaning in {csv_file}, skipping")
                continue

            # Filter by time range
            mask = (df["timestamp"] >= start_time) & (
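Note: the encoding-fallback pattern appears twice in this endpoint (once for the header probe, once for the pandas read). If it were ever factored out, a shared helper could look like the sketch below; this is only an illustration under the assumptions visible above (the encoding list and on_bad_lines='skip'), not code from this commit, and read_csv_any_encoding is a hypothetical name:

# Hypothetical helper consolidating the repeated encoding-fallback reads shown above.
import pandas as pd

ENCODINGS = ['utf-8', 'utf-8-sig', 'utf-16', 'latin-1']

def read_csv_any_encoding(csv_file, encodings=ENCODINGS):
    """Try each encoding in turn; return a DataFrame or None if all attempts fail."""
    for encoding in encodings:
        try:
            return pd.read_csv(csv_file, encoding=encoding, on_bad_lines='skip')
        except (UnicodeDecodeError, UnicodeError, pd.errors.ParserError):
            continue
    return None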
@@ -1976,24 +2038,27 @@ def get_historical_data():
            for _, row in filtered_df.iterrows():
                timestamp = row["timestamp"]
                for var in matching_vars:
                    if var in row:
                    if var in row and pd.notna(row[var]):
                        try:
                            # Convert value to appropriate type
                            value = row[var]
                            if pd.isna(value):
                                continue

                            # Handle boolean values
                            if isinstance(value, str):
                                if value.lower() == "true":
                                value_lower = value.lower().strip()
                                if value_lower == "true":
                                    value = True
                                elif value.lower() == "false":
                                elif value_lower == "false":
                                    value = False
                                else:
                                    try:
                                        value = float(value)
                                    except ValueError:
                                        continue
                            elif isinstance(value, (int, float)):
                                value = float(value)
                            else:
                                continue

                            historical_data.append(
                                {
@@ -2004,6 +2069,7 @@ def get_historical_data():
                            )
                        except Exception as e:
                            # Skip invalid values
                            print(f"🔍 DEBUG: Skipping invalid value for {var}: {e}")
                            continue

        except Exception as e:
@@ -1,13 +1,13 @@
{
  "last_state": {
    "should_connect": true,
    "should_stream": true,
    "should_stream": false,
    "active_datasets": [
      "DAR",
      "Test",
      "Fast"
      "Fast",
      "DAR"
    ]
  },
  "auto_recovery_enabled": true,
  "last_update": "2025-08-15T00:17:13.675666"
  "last_update": "2025-08-15T00:26:15.383017"
}
@@ -0,0 +1,298 @@
#!/usr/bin/env python3
"""
CSV Validator and Cleaner Utility

This utility helps diagnose and fix common CSV issues:
1. Encoding problems (BOM, UTF-16, etc.)
2. Inconsistent number of fields
3. Malformed timestamps
4. Invalid data types

Usage:
    python utils/csv_validator.py --scan records/
    python utils/csv_validator.py --fix records/15-08-2025/problematic_file.csv
    python utils/csv_validator.py --validate records/15-08-2025/
"""

import os
import sys
import argparse
import pandas as pd
import glob
from pathlib import Path
from datetime import datetime


class CSVValidator:
    """Validates and cleans CSV files for the PLC streaming system"""

    def __init__(self):
        self.encodings_to_try = ['utf-8', 'utf-8-sig', 'utf-16', 'latin-1', 'cp1252']
        self.issues_found = []
        self.fixed_files = []

    def detect_encoding(self, file_path):
        """Detect the encoding of a CSV file"""
        for encoding in self.encodings_to_try:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    f.read(1024)  # Read first 1KB
                return encoding
            except (UnicodeDecodeError, UnicodeError):
                continue
        return None

    def read_csv_headers(self, file_path):
        """Read CSV headers with encoding detection"""
        encoding = self.detect_encoding(file_path)
        if not encoding:
            return None, None, "Could not detect encoding"

        try:
            with open(file_path, 'r', encoding=encoding) as f:
                header_line = f.readline().strip()
                if not header_line:
                    return None, encoding, "Empty header line"

                # Clean header from BOM and unicode artifacts
                header_line = header_line.replace('\ufeff', '').replace('\x00', '')
                headers = [h.strip().replace('\x00', '') for h in header_line.split(',')]
                headers = [h for h in headers if h and len(h.strip()) > 0]

                return headers, encoding, None
        except Exception as e:
            return None, encoding, str(e)

    def validate_csv_structure(self, file_path):
        """Validate CSV structure and detect issues"""
        issues = []

        # Check headers
        headers, encoding, header_error = self.read_csv_headers(file_path)
        if header_error:
            issues.append({
                'type': 'header_error',
                'message': header_error,
                'file': file_path
            })
            return issues

        if not headers:
            issues.append({
                'type': 'no_headers',
                'message': 'No valid headers found',
                'file': file_path
            })
            return issues

        # Check for encoding issues in headers
        if any('\x00' in h or 'ÿþ' in h or '\ufffd' in h for h in headers):  # NUL bytes, UTF-16 BOM bytes, or replacement characters
            issues.append({
                'type': 'encoding_artifacts',
                'message': f'Headers contain encoding artifacts: {headers}',
                'file': file_path,
                'encoding': encoding
            })

        # Try to read full CSV with pandas
        try:
            df = None
            for enc in self.encodings_to_try:
                try:
                    df = pd.read_csv(file_path, encoding=enc, on_bad_lines='skip')
                    break
                except (UnicodeDecodeError, UnicodeError, pd.errors.ParserError):
                    continue

            if df is None:
                issues.append({
                    'type': 'read_error',
                    'message': 'Could not read CSV with any encoding',
                    'file': file_path
                })
                return issues

            # Check for inconsistent columns
            expected_cols = len(headers)
            if len(df.columns) != expected_cols:
                issues.append({
                    'type': 'column_mismatch',
                    'message': f'Expected {expected_cols} columns, found {len(df.columns)}',
                    'file': file_path,
                    'expected_headers': headers,
                    'actual_headers': list(df.columns)
                })

            # Check for timestamp column
            timestamp_cols = [col for col in df.columns if 'timestamp' in col.lower()]
            if not timestamp_cols:
                issues.append({
                    'type': 'no_timestamp',
                    'message': 'No timestamp column found',
                    'file': file_path,
                    'columns': list(df.columns)
                })
            else:
                # Validate timestamp format
                timestamp_col = timestamp_cols[0]
                try:
                    df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors='coerce')
                    invalid_timestamps = df[timestamp_col].isna().sum()
                    if invalid_timestamps > 0:
                        issues.append({
                            'type': 'invalid_timestamps',
                            'message': f'{invalid_timestamps} invalid timestamps found',
                            'file': file_path,
                            'timestamp_column': timestamp_col
                        })
                except Exception as e:
                    issues.append({
                        'type': 'timestamp_parse_error',
                        'message': f'Cannot parse timestamps: {str(e)}',
                        'file': file_path,
                        'timestamp_column': timestamp_col
                    })

        except Exception as e:
            issues.append({
                'type': 'general_error',
                'message': f'Error reading CSV: {str(e)}',
                'file': file_path
            })

        return issues

    def scan_directory(self, directory_path):
        """Scan directory for CSV issues"""
        print(f"🔍 Scanning directory: {directory_path}")

        csv_files = glob.glob(os.path.join(directory_path, "**/*.csv"), recursive=True)
        total_files = len(csv_files)

        print(f"📁 Found {total_files} CSV files")

        all_issues = []
        for i, csv_file in enumerate(csv_files, 1):
            print(f"📄 Checking {i}/{total_files}: {os.path.basename(csv_file)}")

            issues = self.validate_csv_structure(csv_file)
            if issues:
                all_issues.extend(issues)
                print(f"   ⚠️ {len(issues)} issues found")
            else:
                print(f"   ✅ OK")

        return all_issues

    def fix_csv_file(self, file_path, backup=True):
        """Fix a problematic CSV file"""
        print(f"🔧 Fixing CSV file: {file_path}")

        if backup:
            backup_path = f"{file_path}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            import shutil
            shutil.copy2(file_path, backup_path)
            print(f"📋 Backup created: {backup_path}")

        # Detect encoding and read file
        encoding = self.detect_encoding(file_path)
        if not encoding:
            print("❌ Could not detect encoding")
            return False

        try:
            # Read with detected encoding
            df = pd.read_csv(file_path, encoding=encoding, on_bad_lines='skip')

            # Clean column names
            df.columns = [col.replace('\ufeff', '').replace('\x00', '').strip() for col in df.columns]

            # Find timestamp column
            timestamp_cols = [col for col in df.columns if 'timestamp' in col.lower()]
            if timestamp_cols:
                timestamp_col = timestamp_cols[0]
                # Normalize to 'timestamp'
                if timestamp_col != 'timestamp':
                    df = df.rename(columns={timestamp_col: 'timestamp'})

                # Fix timestamps
                df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
                # Remove rows with invalid timestamps
                df = df.dropna(subset=['timestamp'])

            # Write fixed file with UTF-8 encoding
            df.to_csv(file_path, index=False, encoding='utf-8')
            print(f"✅ Fixed and saved with UTF-8 encoding")

            return True

        except Exception as e:
            print(f"❌ Error fixing file: {e}")
            return False

    def print_issue_summary(self, issues):
        """Print a summary of issues found"""
        if not issues:
            print("\n🎉 No issues found!")
            return

        print(f"\n📊 Summary: {len(issues)} issues found")

        issue_types = {}
        for issue in issues:
            issue_type = issue['type']
            if issue_type not in issue_types:
                issue_types[issue_type] = []
            issue_types[issue_type].append(issue)

        for issue_type, type_issues in issue_types.items():
            print(f"\n🔸 {issue_type.replace('_', ' ').title()}: {len(type_issues)} files")
            for issue in type_issues[:5]:  # Show first 5
                print(f"   - {os.path.basename(issue['file'])}: {issue['message']}")
            if len(type_issues) > 5:
                print(f"   ... and {len(type_issues) - 5} more")


def main():
    parser = argparse.ArgumentParser(description='CSV Validator and Cleaner for PLC Streaming System')
    parser.add_argument('path', help='Path to CSV file or directory to process')
    parser.add_argument('--scan', action='store_true', help='Scan directory for issues (default)')
    parser.add_argument('--fix', action='store_true', help='Fix individual CSV file')
    parser.add_argument('--fix-all', action='store_true', help='Fix all problematic files in directory')
    parser.add_argument('--no-backup', action='store_true', help='Do not create backup when fixing')

    args = parser.parse_args()

    validator = CSVValidator()

    if args.fix and os.path.isfile(args.path):
        # Fix single file
        success = validator.fix_csv_file(args.path, backup=not args.no_backup)
        sys.exit(0 if success else 1)

    elif os.path.isdir(args.path):
        # Scan directory
        issues = validator.scan_directory(args.path)
        validator.print_issue_summary(issues)

        if args.fix_all and issues:
            print(f"\n🔧 Fixing {len(set(issue['file'] for issue in issues))} problematic files...")

            problematic_files = set(issue['file'] for issue in issues if issue['type'] != 'no_timestamp')
            fixed_count = 0

            for file_path in problematic_files:
                if validator.fix_csv_file(file_path, backup=not args.no_backup):
                    fixed_count += 1

            print(f"✅ Fixed {fixed_count}/{len(problematic_files)} files")

        sys.exit(1 if issues else 0)

    else:
        print(f"❌ Path not found: {args.path}")
        sys.exit(1)


if __name__ == "__main__":
    main()
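Note: besides the CLI shown in the docstring, the validator class could also be used programmatically. A minimal sketch, assuming the module is importable as utils.csv_validator from the project root (that import path and the records/ paths are assumptions, not taken from this commit):

# Hypothetical programmatic use of the new utility.
from utils.csv_validator import CSVValidator

validator = CSVValidator()
issues = validator.scan_directory("records/15-08-2025/")
validator.print_issue_summary(issues)

# Fix a single file, keeping a timestamped backup next to the original.
# validator.fix_csv_file("records/15-08-2025/problematic_file.csv", backup=True)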