feat: Implement historical data caching with buffer zones for faster data retrieval
parent 11d1d2ad81
commit ae1fc0508d
@@ -8212,8 +8212,15 @@
       "event_type": "application_started",
       "message": "Application initialization completed successfully",
       "details": {}
     },
+    {
+      "timestamp": "2025-08-16T10:01:50.009210",
+      "level": "info",
+      "event_type": "application_started",
+      "message": "Application initialization completed successfully",
+      "details": {}
+    }
   ],
-  "last_updated": "2025-08-16T09:30:47.754146",
-  "total_entries": 684
+  "last_updated": "2025-08-16T10:01:50.009210",
+  "total_entries": 685
 }
@@ -0,0 +1,231 @@
"""
Historical Data Cache Manager
Provides efficient caching of CSV data for historical plot requests with buffer zones
"""

import os
import logging
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import threading
import time
from collections import OrderedDict


class HistoricalDataCache:
    """
    Cache manager for historical CSV data with the following features:
    - 25% buffer around requested time ranges
    - Memory-efficient storage using pandas DataFrames
    - Automatic cache invalidation based on file modification times
    - Thread-safe operations
    - LRU eviction when memory limit is reached
    """

    def __init__(
        self,
        logger: logging.Logger,
        max_cache_entries: int = 50,
        buffer_percentage: float = 0.25,
    ):
        self.logger = logger
        self.max_cache_entries = max_cache_entries
        self.buffer_percentage = buffer_percentage

        # Cache structure: {cache_key: CacheEntry}
        self.cache = OrderedDict()
        self.cache_lock = threading.RLock()

        # File modification time tracking
        self.file_mtimes = {}

        self.logger.info(
            f"Historical data cache initialized with {max_cache_entries} max entries and {buffer_percentage*100}% buffer"
        )

    def _generate_cache_key(
        self, variables: List[str], start_time: datetime, end_time: datetime
    ) -> str:
        """Generate a unique cache key for the given parameters"""
        variables_key = "_".join(sorted(variables))
        time_key = f"{start_time.isoformat()}_{end_time.isoformat()}"
        return f"{variables_key}|{time_key}"

    def _calculate_buffer_range(
        self, start_time: datetime, end_time: datetime
    ) -> Tuple[datetime, datetime]:
        """Calculate buffered time range with 25% extension on both sides"""
        duration = end_time - start_time
        buffer_duration = duration * self.buffer_percentage

        buffered_start = start_time - buffer_duration
        buffered_end = end_time + buffer_duration

        return buffered_start, buffered_end
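
    # Illustration: a 10:00-11:00 request with buffer_percentage=0.25 spans
    # 60 minutes, so 15 minutes are added on each side and the cache entry
    # covers 09:45-11:15; later requests inside that window become cache hits.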

    def _check_file_modifications(self, csv_files: List[str]) -> bool:
        """Check if any CSV files have been modified since last cache"""
        for file_path in csv_files:
            if not os.path.exists(file_path):
                continue

            current_mtime = os.path.getmtime(file_path)
            if (
                file_path not in self.file_mtimes
                or self.file_mtimes[file_path] != current_mtime
            ):
                self.file_mtimes[file_path] = current_mtime
                return True

        return False

    def _find_overlapping_cache(
        self, variables: List[str], start_time: datetime, end_time: datetime
    ) -> Optional[str]:
        """Find existing cache entry that can satisfy the request"""
        with self.cache_lock:
            for cache_key, cache_entry in self.cache.items():
                # Check if variables match
                if set(cache_entry["variables"]) != set(variables):
                    continue

                # Check if time range is covered by cached range
                cached_start = cache_entry["time_range"]["start"]
                cached_end = cache_entry["time_range"]["end"]

                if cached_start <= start_time and cached_end >= end_time:
                    self.logger.debug(f"Found overlapping cache entry: {cache_key}")
                    return cache_key

            return None

    def _filter_data_by_time_range(
        self, data: pd.DataFrame, start_time: datetime, end_time: datetime
    ) -> pd.DataFrame:
        """Filter DataFrame to the requested time range"""
        if data.empty:
            return data

        # Convert timestamp column to datetime if it's not already
        if "timestamp" in data.columns:
            if not pd.api.types.is_datetime64_any_dtype(data["timestamp"]):
                data["timestamp"] = pd.to_datetime(data["timestamp"])

            # Filter by time range
            mask = (data["timestamp"] >= start_time) & (data["timestamp"] <= end_time)
            return data[mask].copy()

        return data

    def get_cached_data(
        self,
        variables: List[str],
        start_time: datetime,
        end_time: datetime,
        csv_files: List[str],
    ) -> Optional[pd.DataFrame]:
        """
        Retrieve data from cache if available and valid
        Returns None if cache miss or invalid
        """
        try:
            # Check if files have been modified
            if self._check_file_modifications(csv_files):
                self.logger.debug("CSV files modified, invalidating cache")
                self.clear_cache()
                return None

            # Look for overlapping cache entry
            cache_key = self._find_overlapping_cache(variables, start_time, end_time)
            if not cache_key:
                return None

            with self.cache_lock:
                cache_entry = self.cache[cache_key]

                # Move to end for LRU
                self.cache.move_to_end(cache_key)

                # Filter data to requested time range
                filtered_data = self._filter_data_by_time_range(
                    cache_entry["data"], start_time, end_time
                )

                self.logger.info(
                    f"Cache hit: {len(filtered_data)} points returned from cache"
                )
                return filtered_data

        except Exception as e:
            self.logger.error(f"Error retrieving cached data: {e}")
            return None

    def store_data(
        self,
        variables: List[str],
        start_time: datetime,
        end_time: datetime,
        data: pd.DataFrame,
    ):
        """
        Store data in cache with buffered time range
        """
        try:
            # Calculate buffered range for cache storage
            buffered_start, buffered_end = self._calculate_buffer_range(
                start_time, end_time
            )

            cache_key = self._generate_cache_key(
                variables, buffered_start, buffered_end
            )

            cache_entry = {
                "variables": variables.copy(),
                "time_range": {"start": buffered_start, "end": buffered_end},
                "original_request": {"start": start_time, "end": end_time},
                "data": data.copy(),
                "cached_at": datetime.now(),
                "size_bytes": data.memory_usage(deep=True).sum(),
            }

            with self.cache_lock:
                # Remove oldest entries if cache is full
                while len(self.cache) >= self.max_cache_entries:
                    oldest_key = next(iter(self.cache))
                    removed_entry = self.cache.pop(oldest_key)
                    self.logger.debug(f"Evicted cache entry: {oldest_key}")

                # Store new entry
                self.cache[cache_key] = cache_entry

                total_size = sum(entry["size_bytes"] for entry in self.cache.values())
                self.logger.info(
                    f"Cached data for {len(variables)} variables, "
                    f"{len(data)} points, buffered range: {buffered_start} to {buffered_end}"
                )
                self.logger.debug(
                    f"Cache stats: {len(self.cache)} entries, {total_size/1024/1024:.2f} MB"
                )

        except Exception as e:
            self.logger.error(f"Error storing data in cache: {e}")

    def clear_cache(self):
        """Clear all cached data"""
        with self.cache_lock:
            self.cache.clear()
            self.file_mtimes.clear()
            self.logger.info("Cache cleared")

    def get_cache_stats(self) -> Dict:
        """Get cache statistics for monitoring"""
        with self.cache_lock:
            total_size = sum(entry["size_bytes"] for entry in self.cache.values())
            return {
                "entries": len(self.cache),
                "total_size_mb": total_size / 1024 / 1024,
                "max_entries": self.max_cache_entries,
                "buffer_percentage": self.buffer_percentage * 100,
            }
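A minimal usage sketch of HistoricalDataCache (illustrative, not taken from the commit). It assumes the module is importable as core.historical_cache, matching the import added to main.py, and uses made-up variable names and timestamps; the long-format DataFrame with timestamp/variable/value columns mirrors what the /api/plots/historical endpoint builds from the CSV files before calling store_data().

import logging
from datetime import datetime

import pandas as pd

from core.historical_cache import HistoricalDataCache

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("historical_cache_demo")

# The class logs unconditionally through the logger it is given, so pass a real one.
cache = HistoricalDataCache(logger=logger, max_cache_entries=50, buffer_percentage=0.25)

# Hypothetical long-format samples: one row per (timestamp, variable, value).
df = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2025-08-16T10:00:00", "2025-08-16T10:30:00", "2025-08-16T11:00:00"
    ]),
    "variable": ["temp_sensor", "temp_sensor", "temp_sensor"],
    "value": [21.5, 22.0, 21.8],
})

start = datetime(2025, 8, 16, 10, 0, 0)
end = datetime(2025, 8, 16, 11, 0, 0)

# store_data() widens the cache entry to the 25% buffered range (09:45-11:15 here).
cache.store_data(["temp_sensor"], start, end, df)

# A narrower follow-up request falls inside the buffered range and is a cache hit;
# an empty csv_files list simply skips the file modification-time check.
subset = cache.get_cached_data(
    ["temp_sensor"],
    datetime(2025, 8, 16, 10, 15, 0),
    datetime(2025, 8, 16, 10, 45, 0),
    csv_files=[],
)
print(subset)
print(cache.get_cache_stats())

The zoom and pan optimization in PlotHistoricalSession below relies on the same containment idea on the client side: as long as the newly requested window sits inside the previously loaded range, no reload is requested at all.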
@@ -106,17 +106,39 @@ export default function PlotHistoricalSession({
   const toast = useToast()
   const { isOpen: isConfigModalOpen, onOpen: onConfigModalOpen, onClose: onConfigModalClose } = useDisclosure()
 
+  // Keep track of the last loaded data range for optimization
+  const [loadedDataRange, setLoadedDataRange] = useState(null)
+
   // Load historical data on component mount and when time range changes
   useEffect(() => {
     loadHistoricalData()
   }, [session.id, localTimeRange])
 
-  const loadHistoricalData = async () => {
+  // Function to check if a range is contained within another range
+  const isRangeContained = (newRange, existingRange) => {
+    if (!existingRange || !newRange) return false
+    const newStart = new Date(newRange.start).getTime()
+    const newEnd = new Date(newRange.end).getTime()
+    const existingStart = new Date(existingRange.start).getTime()
+    const existingEnd = new Date(existingRange.end).getTime()
+
+    return newStart >= existingStart && newEnd <= existingEnd
+  }
+
+  const loadHistoricalData = async (forceReload = false) => {
     if (!session.variables || session.variables.length === 0) {
       setError('No variables defined for this plot')
       return
     }
 
+    // Check if the new range is contained within the previously loaded range
+    if (!forceReload && loadedDataRange && isRangeContained(localTimeRange, loadedDataRange)) {
+      console.log('📊 Zoom optimization: New range is contained within loaded data, skipping reload')
+      console.log('📊 New range:', localTimeRange)
+      console.log('📊 Loaded range:', loadedDataRange)
+      return
+    }
+
     try {
       setIsLoading(true)
       setError(null)
@@ -136,6 +158,7 @@ export default function PlotHistoricalSession({
 
       console.log('📊 Loading historical data for session:', session.id)
       console.log('📊 Request data:', requestData)
+      console.log('📊 Force reload:', forceReload)
 
       // Simulate progress
       const progressInterval = setInterval(() => {
@@ -152,6 +175,13 @@ export default function PlotHistoricalSession({
 
       if (response.data) {
         setHistoricalData(response.data)
 
+        // Update the loaded data range for optimization
+        setLoadedDataRange({
+          start: new Date(localTimeRange.start),
+          end: new Date(localTimeRange.end)
+        })
+
         setDataStats({
           totalPoints: response.total_points || response.data.length,
           variablesFound: response.variables_found || [],
@@ -162,7 +192,8 @@ export default function PlotHistoricalSession({
         console.log('📊 Historical data loaded:', {
           points: response.data.length,
           variables: response.variables_found,
-          loadTime
+          loadTime,
+          loadedRange: localTimeRange
         })
 
         toast({
@@ -206,13 +237,26 @@ export default function PlotHistoricalSession({
   }
 
   const handleZoomToTimeRange = (start, end) => {
-    console.log('📊 Zoom event - loading data for range:', { start, end })
-    setLocalTimeRange({ start: new Date(start), end: new Date(end) })
+    console.log('📊 Zoom event - evaluating range:', { start, end })
+    const newRange = { start: new Date(start), end: new Date(end) }
+
+    // Check if the new range is contained within the loaded data
+    if (loadedDataRange && isRangeContained(newRange, loadedDataRange)) {
+      console.log('📊 Zoom optimization: Range contained in loaded data, skipping reload')
+      setLocalTimeRange(newRange)
+    } else {
+      console.log('📊 Zoom requires data reload - new range outside loaded data')
+      setLocalTimeRange(newRange)
+    }
   }
 
   const handlePanToTimeRange = (start, end) => {
-    console.log('📊 Pan event - loading data for range:', { start, end })
-    setLocalTimeRange({ start: new Date(start), end: new Date(end) })
+    console.log('📊 Pan event - evaluating range:', { start, end })
+    const newRange = { start: new Date(start), end: new Date(end) }
+
+    // Pan always requires checking if we need new data
+    console.log('📊 Pan event - loading data for range:', newRange)
+    setLocalTimeRange(newRange)
   }
 
   // Color mode
@@ -276,7 +320,7 @@ export default function PlotHistoricalSession({
               icon={<RepeatIcon />}
               size="sm"
               variant="ghost"
-              onClick={loadHistoricalData}
+              onClick={() => loadHistoricalData(true)}
               isLoading={isLoading}
             />
           </Tooltip>
main.py
@@ -21,6 +21,7 @@ except ImportError:
     TKINTER_AVAILABLE = False
     print("Warning: tkinter not available. File browse functionality will be limited.")
 from core import PLCDataStreamer
+from core.historical_cache import HistoricalDataCache
 from utils.json_manager import JSONManager, SchemaManager
 from utils.symbol_loader import SymbolLoader
 from utils.symbol_processor import SymbolProcessor
@@ -64,6 +65,7 @@ def project_path(*parts: str) -> str:
 
 # Global instances
 streamer = None
+historical_cache = None
 json_manager = JSONManager()
 schema_manager = SchemaManager()
 
@@ -1838,7 +1840,9 @@ def get_plot_variables():
 
 @app.route("/api/plots/historical", methods=["POST"])
 def get_historical_data():
-    """Get historical data from CSV files for plot initialization"""
+    """Get historical data from CSV files for plot initialization with caching"""
+    global historical_cache
+
     print("🔍 DEBUG: Historical endpoint called")
     try:
         data = request.get_json()
@@ -1851,7 +1855,7 @@ def get_historical_data():
         variables = data.get("variables", [])
         time_window_seconds = data.get("time_window", 60)
 
-        # NEW: Support for explicit start_time and end_time parameters
+        # Support for explicit start_time and end_time parameters
         start_time_param = data.get("start_time")
         end_time_param = data.get("end_time")
 
@@ -1873,33 +1877,29 @@ def get_historical_data():
         variables = valid_variables
         print(f"🔍 DEBUG: Valid variables after filtering: {variables}")
 
-        # Import pandas and glob (datetime already imported globally)
+        # Import required modules
         try:
-            print("🔍 DEBUG: Importing pandas...")
+            print("🔍 DEBUG: Importing modules...")
             import pandas as pd
 
-            print("🔍 DEBUG: Importing glob...")
             import glob
 
-            print("🔍 DEBUG: Importing timedelta...")
             from datetime import timedelta
 
             print("🔍 DEBUG: All imports successful")
         except ImportError as e:
             print(f"❌ DEBUG: Import failed: {e}")
             return jsonify({"error": f"pandas import failed: {str(e)}"}), 500
-        except Exception as e:
-            print(f"❌ DEBUG: Unexpected import error: {e}")
-            import traceback
-
-            traceback.print_exc()
-            return jsonify({"error": f"Import error: {str(e)}"}), 500
+
+        # Initialize cache if not exists
+        if historical_cache is None:
+            historical_cache = HistoricalDataCache(
+                logger=streamer.logger if streamer and streamer.logger else None,
+                max_cache_entries=50,
+                buffer_percentage=0.25
+            )
 
         # Calculate time range
         try:
             print("🔍 DEBUG: Calculating time range...")
 
             # NEW: Use explicit start/end times if provided, otherwise fall back to time_window
             if start_time_param and end_time_param:
                 start_time = datetime.fromisoformat(start_time_param.replace('Z', '+00:00'))
                 end_time = datetime.fromisoformat(end_time_param.replace('Z', '+00:00'))
@@ -1923,145 +1923,119 @@ def get_historical_data():
 
         except Exception as e:
             print(f"❌ DEBUG: Time calculation error: {e}")
             import traceback
 
             traceback.print_exc()
             return jsonify({"error": f"Time calculation failed: {str(e)}"}), 500
 
-        # Get records directory
-        try:
-            print("🔍 DEBUG: Getting records directory...")
-            records_dir = os.path.join(os.path.dirname(__file__), "records")
-            print(f"🔍 DEBUG: Records directory: {records_dir}")
-            print(f"🔍 DEBUG: Records dir exists: {os.path.exists(records_dir)}")
-
-            if not os.path.exists(records_dir):
-                print("🔍 DEBUG: Records directory not found, returning empty data")
-                return jsonify({
-                    "data": [],
-                    "time_range": {
-                        "start": start_time.isoformat(),
-                        "end": end_time.isoformat(),
-                    },
-                    "variables_found": [],
-                    "total_points": 0,
-                })
-
-            historical_data = []
-
-            # NEW: Improved date folder calculation to support wider time ranges
-            print("🔍 DEBUG: Calculating date folders...")
-            current_date = start_time.date()
-            end_date = end_time.date()
-            date_folders = []
+        # Get relevant CSV files for cache checking
+        records_dir = os.path.join(os.path.dirname(__file__), "records")
+        csv_files = []
+
+        if os.path.exists(records_dir):
+            # Calculate buffered time range for file discovery (same as cache buffer)
+            duration = end_time - start_time
+            buffer_duration = duration * 0.25
+            buffer_start = start_time - buffer_duration
+            buffer_end = end_time + buffer_duration
+
+            # Generate all dates in the range
+            current_date = buffer_start.date()
+            end_date = buffer_end.date()
+
+            # Collect all relevant CSV files
             while current_date <= end_date:
                 date_str = current_date.strftime("%d-%m-%Y")
                 folder_path = os.path.join(records_dir, date_str)
                 if os.path.exists(folder_path):
-                    date_folders.append(folder_path)
-                    print(f"🔍 DEBUG: Added date folder: {folder_path}")
+                    csv_files.extend(glob.glob(os.path.join(folder_path, "*.csv")))
                 current_date += timedelta(days=1)
 
+        print(f"🔍 DEBUG: Found {len(csv_files)} CSV files for cache checking")
+
+        # Try to get data from cache first
+        cached_data = historical_cache.get_cached_data(variables, start_time, end_time, csv_files)
+
+        if cached_data is not None:
+            print("🔍 DEBUG: Cache hit! Returning cached data")
 
-            print(f"🔍 DEBUG: Found date folders: {date_folders}")
-        except Exception as e:
-            print(f"❌ DEBUG: Records directory error: {e}")
-            import traceback
+            # Convert DataFrame to the expected format
+            historical_data = []
+            for _, row in cached_data.iterrows():
+                if 'timestamp' in row and 'variable' in row and 'value' in row:
+                    historical_data.append({
+                        "timestamp": row['timestamp'].isoformat() if pd.api.types.is_datetime64_any_dtype(row['timestamp']) else str(row['timestamp']),
+                        "variable": str(row['variable']),
+                        "value": float(row['value']) if pd.api.types.is_numeric_dtype(type(row['value'])) else row['value']
+                    })
+
+            return jsonify({
+                "data": historical_data,
+                "time_range": {
+                    "start": start_time.isoformat(),
+                    "end": end_time.isoformat(),
+                },
+                "variables_found": list(set([item["variable"] for item in historical_data])),
+                "total_points": len(historical_data),
+                "cached": True  # Indicate this was from cache
+            })
 
-            traceback.print_exc()
-            return jsonify({"error": f"Records directory error: {str(e)}"}), 500
+        # Cache miss - load data from CSV files
+        print("🔍 DEBUG: Cache miss - loading from CSV files")
+        historical_data = []
+
+        if not os.path.exists(records_dir):
+            print("🔍 DEBUG: Records directory not found, returning empty data")
+            return jsonify({
+                "data": [],
+                "time_range": {
+                    "start": start_time.isoformat(),
+                    "end": end_time.isoformat(),
+                },
+                "variables_found": [],
+                "total_points": 0,
+            })
 
-        # Search for CSV files with any of the required variables
+        # Calculate extended range for cache storage (25% buffer)
+        duration = end_time - start_time
+        buffer_duration = duration * 0.25
+        buffered_start = start_time - buffer_duration
+        buffered_end = end_time + buffer_duration
+
+        # Search for CSV files in the buffered range
+        current_date = buffered_start.date()
+        end_date = buffered_end.date()
+        date_folders = []
+
+        while current_date <= end_date:
+            date_str = current_date.strftime("%d-%m-%Y")
+            folder_path = os.path.join(records_dir, date_str)
+            if os.path.exists(folder_path):
+                date_folders.append(folder_path)
+            current_date += timedelta(days=1)
+
+        print(f"🔍 DEBUG: Processing {len(date_folders)} date folders with buffer")
+
+        # Process CSV files and collect all data (including buffer)
+        all_data_for_cache = []
 
         for folder_path in date_folders:
-            csv_files = glob.glob(os.path.join(folder_path, "*.csv"))
-            print(f"🔍 DEBUG: CSV files in {folder_path}: {csv_files}")
-
-            for csv_file in csv_files:
+            csv_files_in_folder = glob.glob(os.path.join(folder_path, "*.csv"))
+
+            for csv_file in csv_files_in_folder:
                 try:
                     print(f"🔍 DEBUG: Processing CSV file: {csv_file}")
 
                     # Read first line to check if any required variables are present with proper encoding handling
                     header_line = None
                     for encoding in ["utf-8", "utf-8-sig", "utf-16", "latin-1"]:
                         try:
                             with open(csv_file, "r", encoding=encoding) as f:
                                 header_line = f.readline().strip()
                                 if header_line:
                                     print(
                                         f"🔍 DEBUG: Successfully read header with {encoding} encoding"
                                     )
                                     break
                         except (UnicodeDecodeError, UnicodeError):
                             continue
                         except Exception:
                             continue
 
                     if not header_line:
                         print(
                             f"🔍 DEBUG: Could not read header from {csv_file}, skipping"
                         )
                         continue
 
                     # Clean header line from BOM and normalize
                     header_line = header_line.replace("\ufeff", "").replace("\x00", "")
                     headers = [
                         h.strip().replace("\x00", "") for h in header_line.split(",")
                     ]
                     # Clean any remaining unicode artifacts
                     headers = [
                         h
                         for h in headers
                         if h and len(h.replace("\x00", "").strip()) > 0
                     ]
                     print(f"🔍 DEBUG: Headers in {csv_file}: {headers}")
 
                     # Check if any of our variables are in this file
                     matching_vars = [var for var in variables if var in headers]
                     print(f"🔍 DEBUG: Matching variables: {matching_vars}")
 
                     if not matching_vars:
                         print(
                             f"🔍 DEBUG: No matching variables in {csv_file}, skipping"
                         )
                         continue
 
-                    # Read the CSV file with proper encoding and error handling
-                    print(f"🔍 DEBUG: Reading CSV file with pandas...")
+                    # Read CSV file with encoding detection
                     df = None
                     for encoding in ["utf-8", "utf-8-sig", "utf-16", "latin-1"]:
                         try:
-                            df = pd.read_csv(
-                                csv_file, encoding=encoding, on_bad_lines="skip"
-                            )
-                            print(
-                                f"🔍 DEBUG: CSV loaded with {encoding}, shape: {df.shape}"
-                            )
+                            df = pd.read_csv(csv_file, encoding=encoding, on_bad_lines="skip")
                             break
-                        except (
-                            UnicodeDecodeError,
-                            UnicodeError,
-                            pd.errors.ParserError,
-                        ):
+                        except:
                             continue
-                        except Exception as e:
-                            print(f"🔍 DEBUG: Error reading with {encoding}: {e}")
-                            continue
 
                     if df is None:
                         print(
                             f"Warning: Could not read CSV file {csv_file} with any encoding"
                         )
                         continue
 
-                    # Clean column names from BOM and unicode artifacts
-                    df.columns = [
-                        col.replace("\ufeff", "").replace("\x00", "").strip()
-                        for col in df.columns
-                    ]
+                    # Clean column names
+                    df.columns = [col.replace("\ufeff", "").replace("\x00", "").strip() for col in df.columns]
 
-                    # Convert timestamp to datetime with flexible parsing
-                    print(f"🔍 DEBUG: Converting timestamps...")
+                    # Find timestamp column
                     timestamp_col = None
                     for col in df.columns:
                         if "timestamp" in col.lower():
@@ -2069,74 +2043,40 @@ def get_historical_data():
                                 break
 
                     if timestamp_col is None:
                         print(
                             f"🔍 DEBUG: No timestamp column found in {csv_file}, skipping"
                         )
                         continue
 
-                    try:
-                        # Try multiple timestamp formats
-                        df[timestamp_col] = pd.to_datetime(
-                            df[timestamp_col], errors="coerce"
-                        )
-                        # Remove rows with invalid timestamps
-                        df = df.dropna(subset=[timestamp_col])
-
-                        if df.empty:
-                            print(
-                                f"🔍 DEBUG: No valid timestamps in {csv_file}, skipping"
-                            )
-                            continue
-
-                        # Normalize column name to 'timestamp'
-                        if timestamp_col != "timestamp":
-                            df = df.rename(columns={timestamp_col: "timestamp"})
-
-                        print(
-                            f"🔍 DEBUG: Timestamp range: {df['timestamp'].min()} to {df['timestamp'].max()}"
-                        )
-                        print(f"🔍 DEBUG: Filter range: {start_time} to {end_time}")
-                    except Exception as e:
-                        print(
-                            f"🔍 DEBUG: Timestamp conversion failed for {csv_file}: {e}"
-                        )
+                    # Convert timestamps
+                    df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors="coerce")
+                    df = df.dropna(subset=[timestamp_col])
+
+                    if df.empty:
                         continue
 
-                    # Recalculate matching variables after column cleaning
-                    clean_headers = list(df.columns)
-                    matching_vars = [var for var in variables if var in clean_headers]
-                    print(
-                        f"🔍 DEBUG: Matching variables after cleaning: {matching_vars}"
-                    )
+                    # Normalize column name
+                    if timestamp_col != "timestamp":
+                        df = df.rename(columns={timestamp_col: "timestamp"})
 
-                    if not matching_vars:
-                        print(
-                            f"🔍 DEBUG: No matching variables after cleaning in {csv_file}, skipping"
-                        )
-                        continue
-
-                    # Filter by time range
-                    mask = (df["timestamp"] >= start_time) & (
-                        df["timestamp"] <= end_time
-                    )
+                    # Filter to buffered time range
+                    mask = (df["timestamp"] >= buffered_start) & (df["timestamp"] <= buffered_end)
                     filtered_df = df[mask]
-                    print(f"🔍 DEBUG: Filtered dataframe shape: {filtered_df.shape}")
 
                     if filtered_df.empty:
-                        print(f"🔍 DEBUG: No data in time range for {csv_file}")
                         continue
 
-                    # Extract data for matching variables only
-                    print(f"🔍 DEBUG: Extracting data for variables: {matching_vars}")
+                    # Check for matching variables
+                    matching_vars = [var for var in variables if var in filtered_df.columns]
+
+                    if not matching_vars:
+                        continue
+
+                    # Extract data for cache
                     for _, row in filtered_df.iterrows():
                         timestamp = row["timestamp"]
                         for var in matching_vars:
                             if var in row and pd.notna(row[var]):
                                 try:
-                                    # Convert value to appropriate type
                                     value = row[var]
 
-                                    # Handle boolean values
+                                    # Type conversion
                                     if isinstance(value, str):
                                         value_lower = value.lower().strip()
                                         if value_lower == "true":
@@ -2153,63 +2093,101 @@ def get_historical_data():
                                         else:
                                             continue
 
-                                    historical_data.append(
-                                        {
-                                            "timestamp": timestamp.isoformat(),
-                                            "variable": var,
-                                            "value": value,
-                                        }
-                                    )
-                                except Exception as e:
-                                    # Skip invalid values
-                                    print(
-                                        f"🔍 DEBUG: Skipping invalid value for {var}: {e}"
-                                    )
+                                    all_data_for_cache.append({
+                                        "timestamp": timestamp,
+                                        "variable": var,
+                                        "value": value,
+                                    })
+                                except:
                                     continue
 
                 except Exception as e:
                     # Skip files that can't be read
                     print(f"Warning: Could not read CSV file {csv_file}: {e}")
                     continue
 
-        # Sort by timestamp
-        historical_data.sort(key=lambda x: x["timestamp"])
+        # Convert to DataFrame for caching
+        if all_data_for_cache:
+            cache_df = pd.DataFrame(all_data_for_cache)
+            cache_df = cache_df.sort_values('timestamp')
+
+            # Store in cache
+            historical_cache.store_data(variables, start_time, end_time, cache_df)
+
+            # Filter to actual requested range for response
+            response_mask = (cache_df["timestamp"] >= start_time) & (cache_df["timestamp"] <= end_time)
+            response_df = cache_df[response_mask]
+
+            # Convert to response format
+            for _, row in response_df.iterrows():
+                historical_data.append({
+                    "timestamp": row["timestamp"].isoformat(),
+                    "variable": row["variable"],
+                    "value": row["value"]
+                })
 
-        print(f"🔍 DEBUG: Total historical data points found: {len(historical_data)}")
-        print(
-            f"🔍 DEBUG: Variables found: {list(set([item['variable'] for item in historical_data]))}"
-        )
+        print(f"🔍 DEBUG: Loaded {len(historical_data)} data points for response")
+        print(f"🔍 DEBUG: Cached {len(all_data_for_cache)} total data points")
 
-        return jsonify(
-            {
-                "data": historical_data,
-                "time_range": {
-                    "start": start_time.isoformat(),
-                    "end": end_time.isoformat(),
-                },
-                "variables_found": list(
-                    set([item["variable"] for item in historical_data])
-                ),
-                "total_points": len(historical_data),
-            }
-        )
+        return jsonify({
+            "data": historical_data,
+            "time_range": {
+                "start": start_time.isoformat(),
+                "end": end_time.isoformat(),
+            },
+            "variables_found": list(set([item["variable"] for item in historical_data])),
+            "total_points": len(historical_data),
+            "cached": False  # Indicate this was loaded fresh
+        })
 
     except ImportError as e:
         return (
             jsonify(
                 {
                     "error": f"pandas is required for historical data processing: {str(e)}"
                 }
             ),
             500,
         )
     except Exception as e:
         import traceback
 
         traceback.print_exc()
         return jsonify({"error": f"Internal server error: {str(e)}"}), 500
 
 
+@app.route("/api/plots/historical/cache/stats", methods=["GET"])
+def get_cache_stats():
+    """Get cache statistics for monitoring"""
+    global historical_cache
+
+    try:
+        if historical_cache is None:
+            return jsonify({
+                "cache_initialized": False,
+                "stats": {
+                    "entries": 0,
+                    "total_size_mb": 0,
+                    "max_entries": 0,
+                    "buffer_percentage": 0
+                }
+            })
+
+        stats = historical_cache.get_cache_stats()
+        return jsonify({
+            "cache_initialized": True,
+            "stats": stats
+        })
+
+    except Exception as e:
+        return jsonify({"error": str(e)}), 500
+
+
+@app.route("/api/plots/historical/cache/clear", methods=["POST"])
+def clear_cache():
+    """Clear the historical data cache"""
+    global historical_cache
+
+    try:
+        if historical_cache is not None:
+            historical_cache.clear_cache()
+            return jsonify({"success": True, "message": "Cache cleared successfully"})
+        else:
+            return jsonify({"success": False, "message": "Cache not initialized"})
+
+    except Exception as e:
+        return jsonify({"error": str(e)}), 500
+
+
 @app.route("/api/plots/sessions/<plot_id>", methods=["GET"])
 def get_plot_sessions(plot_id):
     """Get all session IDs for a specific plot ID"""
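A hedged smoke test for the endpoints added above (illustrative, not part of the commit). It assumes the Flask app in main.py is reachable at http://localhost:5000 — the real host and port depend on how the app is launched — and uses a hypothetical variable name; the JSON fields match what get_historical_data() reads (variables plus either start_time/end_time or time_window).

import requests

BASE = "http://localhost:5000"  # assumed address; adjust to the actual deployment

payload = {
    "variables": ["temp_sensor"],           # hypothetical variable name
    "start_time": "2025-08-16T10:00:00",
    "end_time": "2025-08-16T11:00:00",
}

first = requests.post(f"{BASE}/api/plots/historical", json=payload).json()
second = requests.post(f"{BASE}/api/plots/historical", json=payload).json()
print(first.get("cached"), second.get("cached"))  # expected: False, then True

print(requests.get(f"{BASE}/api/plots/historical/cache/stats").json())
print(requests.post(f"{BASE}/api/plots/historical/cache/clear").json())

If the second call still reports cached as False, the usual reasons are that no matching CSV data was found for the window or that the files changed between the two requests, which invalidates the cache.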