241 lines
8.2 KiB
Python
241 lines
8.2 KiB
Python
|
# services/excel/excel_service.py
|
||
|
"""
|
||
|
Excel file handling service with retry and formatting capabilities
|
||
|
"""
|
||
|
import pandas as pd
|
||
|
import time
|
||
|
from typing import Optional, Union, Dict, Any
|
||
|
from pathlib import Path
|
||
|
import openpyxl
|
||
|
from openpyxl.utils import get_column_letter
|
||
|
from openpyxl.styles import PatternFill, Alignment, Font
|
||
|
from openpyxl.worksheet.worksheet import Worksheet
|
||
|
|
||
|
class ExcelService:
    """Service for reading and writing Excel files with retries and formatting.

    Reads go through ``pandas.read_excel`` (openpyxl engine) and normalize
    line breaks in string cells.  Writes go through ``pandas.ExcelWriter`` and
    then apply freeze panes, column widths and header styling.  Both
    operations are attempted up to ``max_retries`` times, pausing
    ``retry_delay`` seconds between attempts.
    """

    # Fragment of the ValueError message that this service treats as a sign
    # that worksheet auto-filters are preventing the workbook from being read.
    _FILTER_ERROR_MARKER = "must be either numerical or a string containing a wildcard"

    def __init__(self, max_retries: int = 5, retry_delay: int = 5):
        """
        Configure the retry policy.

        Args:
            max_retries: Maximum number of attempts for a read or a save
            retry_delay: Seconds to wait between two consecutive attempts
        """
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def read_excel(
        self,
        file_path: Union[str, Path],
        sheet_name: str = "Sheet1",
        **kwargs
    ) -> pd.DataFrame:
        """
        Read Excel file with retries and cleanup

        Args:
            file_path: Path to Excel file
            sheet_name: Name of sheet to read
            **kwargs: Additional arguments for pd.read_excel

        Returns:
            DataFrame with the Excel content, line breaks normalized

        Raises:
            Exception: If the file could not be read after ``max_retries``
                attempts; the last underlying error is chained as the cause.
        """
        last_error = None
        for attempt in range(1, self.max_retries + 1):
            try:
                # Try to read the file with openpyxl
                df = pd.read_excel(
                    file_path, engine="openpyxl", sheet_name=sheet_name, **kwargs
                )
                df = self._clean_dataframe(df)
                print(f"Archivo leído y limpiado exitosamente: {file_path}")
                return df

            except ValueError as ve:
                last_error = ve
                if self._FILTER_ERROR_MARKER in str(ve):
                    print(f"Error al leer el archivo: {ve}")
                    print("Intentando eliminar filtros y leer el archivo nuevamente...")
                    try:
                        df = self._read_without_filters(file_path, sheet_name, **kwargs)
                        print(f"Archivo leído y limpiado exitosamente: {file_path}")
                        return df
                    except Exception as e:
                        last_error = e
                        print(f"Error al intentar eliminar filtros y leer el archivo: {e}")
                else:
                    print(f"Error de valor: {ve}")

            except PermissionError as e:
                last_error = e
                print(
                    f"Error de permiso: {e}. Por favor cierre el archivo. "
                    f"Reintentando en {self.retry_delay} segundos..."
                )
            except Exception as e:
                last_error = e
                print(f"Error inesperado: {e}. Reintentando en {self.retry_delay} segundos...")

            # Only wait when another attempt will actually follow.
            if attempt < self.max_retries:
                time.sleep(self.retry_delay)

        raise Exception(
            f"No se pudo leer el archivo después de {self.max_retries} intentos."
        ) from last_error

    def _read_without_filters(
        self,
        file_path: Union[str, Path],
        sheet_name: str,
        **kwargs
    ) -> pd.DataFrame:
        """Read ``sheet_name`` from a temporary copy of the workbook with auto-filters removed."""
        # Load the workbook and drop the auto-filter range of every worksheet
        wb = openpyxl.load_workbook(filename=file_path)
        for sheet in wb.worksheets:
            sheet.auto_filter.ref = None

        # Save a temporary filter-free copy next to the original file
        temp_file = str(file_path) + "_temp.xlsx"
        try:
            wb.save(temp_file)
            # Read the same sheet the caller asked for from the temporary copy
            df = pd.read_excel(
                temp_file, engine="openpyxl", sheet_name=sheet_name, **kwargs
            )
        finally:
            # Always remove the temporary file, even if saving or reading failed
            Path(temp_file).unlink(missing_ok=True)

        return self._clean_dataframe(df)

    def _clean_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply ``_clean_special_chars`` to every cell of ``df`` (in place) and return it."""
        for col in df.columns:
            # Non-string values (numbers, NaN, dates) pass through unchanged
            df[col] = df[col].apply(self._clean_special_chars)
        return df

    def save_excel(
        self,
        df: pd.DataFrame,
        file_path: Union[str, Path],
        sheet_name: str = "Sheet1",
        format_options: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> None:
        """
        Save DataFrame to Excel with formatting

        Args:
            df: DataFrame to save
            file_path: Path to save Excel file
            sheet_name: Name of sheet
            format_options: Dictionary with formatting options
                (see ``_format_worksheet`` for the recognized keys)
            **kwargs: Additional arguments for DataFrame.to_excel; ``index``
                defaults to False unless given explicitly

        Raises:
            Exception: If the file could not be saved after ``max_retries``
                attempts; the last underlying error is chained as the cause.
        """
        options = {} if format_options is None else format_options
        # The index is omitted by default, but the caller may override it.
        to_excel_kwargs = {"index": False, **kwargs}

        last_error = None
        for attempt in range(1, self.max_retries + 1):
            try:
                with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
                    # Save DataFrame
                    df.to_excel(writer, sheet_name=sheet_name, **to_excel_kwargs)

                    # Apply formatting before the writer closes and saves
                    self._format_worksheet(writer.sheets[sheet_name], options)

                print(f"Archivo guardado exitosamente en: {file_path}")
                return

            except PermissionError as e:
                last_error = e
                print(
                    f"Error de permiso: {e}. Por favor cierre el archivo. "
                    f"Reintentando en {self.retry_delay} segundos..."
                )
            except Exception as e:
                last_error = e
                print(f"Error inesperado: {e}. Reintentando en {self.retry_delay} segundos...")

            # Only wait when another attempt will actually follow.
            if attempt < self.max_retries:
                time.sleep(self.retry_delay)

        raise Exception(
            f"No se pudo guardar el archivo después de {self.max_retries} intentos."
        ) from last_error

    def _format_worksheet(self, worksheet: "Worksheet", options: Dict[str, Any]) -> None:
        """
        Apply formatting to worksheet

        Args:
            worksheet: Worksheet to format
            options: Formatting options. Recognized keys and defaults:
                ``freeze_row`` (2), ``freeze_col`` (1),
                ``max_column_width`` (50), ``min_column_width`` (8),
                ``wrap_threshold`` (50), ``header_row`` (1),
                ``header_color`` ('F2F2F2')
        """
        # Freeze panes if specified; a missing/zero coordinate falls back to 1
        # so that a valid cell reference is always produced.
        freeze_row = options.get('freeze_row', 2)
        freeze_col = options.get('freeze_col', 1)
        if freeze_row or freeze_col:
            worksheet.freeze_panes = (
                f"{get_column_letter(freeze_col or 1)}{freeze_row or 1}"
            )

        # Auto-adjust column widths
        max_width = options.get('max_column_width', 50)
        min_width = options.get('min_column_width', 8)
        wrap_threshold = options.get('wrap_threshold', 50)

        for column_cells in worksheet.columns:
            # Derive the letter from the numeric index rather than relying on
            # the first cell exposing a ``column_letter`` attribute.
            column_letter = get_column_letter(column_cells[0].column)
            longest = 0

            for cell in column_cells:
                if cell.value is None:
                    continue
                text = str(cell.value)
                text_length = len(text)
                if text_length > wrap_threshold:
                    # Long text is wrapped; size the column by its longest
                    # word, capped at the wrap threshold.
                    cell.alignment = Alignment(wrap_text=True, vertical='top')
                    text_length = min(
                        wrap_threshold,
                        max((len(word) for word in text.split()), default=0)
                    )
                longest = max(longest, text_length)

            adjusted_width = min(max_width, max(min_width, longest + 2))
            worksheet.column_dimensions[column_letter].width = adjusted_width

        # Apply custom styles to the header row
        header_row = options.get('header_row', 1)
        if header_row:
            header_color = options.get('header_color', 'F2F2F2')
            header_fill = PatternFill(
                start_color=header_color,
                end_color=header_color,
                fill_type='solid'
            )
            header_font = Font(bold=True)

            for cell in worksheet[header_row]:
                cell.fill = header_fill
                cell.font = header_font

    def _clean_special_chars(self, text: Any) -> Any:
        """Normalize line breaks in strings; return any other value unchanged."""
        if isinstance(text, str):
            # Normalize Windows (\r\n) and old Mac (\r) line breaks to \n
            return text.replace('\r\n', '\n').replace('\r', '\n')
        return text
|
||
|
|
||
|
# Example usage:
"""
from services.excel.excel_service import ExcelService

# Create service
excel_service = ExcelService()

# Read Excel file
try:
    df = excel_service.read_excel("input.xlsx")
    print("Data loaded successfully")

    # Modify data...

    # Save with formatting
    format_options = {
        'freeze_row': 2,
        'freeze_col': 1,
        'max_column_width': 50,
        'min_column_width': 8,
        'wrap_threshold': 50,
        'header_color': 'E6E6E6'
    }

    excel_service.save_excel(
        df,
        "output.xlsx",
        format_options=format_options
    )

except Exception as e:
    print(f"Error handling Excel file: {e}")
"""
|