Software Base
This commit is contained in:
commit
b0150a58dd
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"input_dir": "D:\\Proyectos\\Scripts\\EmailCrono",
|
||||
"output_dir": "C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\04-InLavoro\\HENKEL\\93040 - HENKEL - BowlingGreen\\Description\\HENKEL - ALPLA - AUTEFA - Batch Data",
|
||||
"cronologia_file": "cronologia.md",
|
||||
"attachments_dir": "adjuntos"
|
||||
}
|
Binary file not shown.
|
@ -0,0 +1,44 @@
|
|||
# config/config.py
|
||||
import json
|
||||
import os
|
||||
|
||||
class Config:
|
||||
def __init__(self, config_file='config.json'):
|
||||
self.config_file = config_file
|
||||
self.config = self._load_config()
|
||||
|
||||
def _load_config(self):
|
||||
if not os.path.exists(self.config_file):
|
||||
default_config = {
|
||||
'input_dir': '.',
|
||||
'output_dir': '.',
|
||||
'cronologia_file': 'cronologia.md',
|
||||
'attachments_dir': 'adjuntos'
|
||||
}
|
||||
self._save_config(default_config)
|
||||
return default_config
|
||||
|
||||
with open(self.config_file, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
|
||||
def _save_config(self, config):
|
||||
with open(self.config_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(config, f, indent=4)
|
||||
|
||||
def get_input_dir(self):
|
||||
return self.config.get('input_dir', '.')
|
||||
|
||||
def get_output_dir(self):
|
||||
return self.config.get('output_dir', '.')
|
||||
|
||||
def get_cronologia_file(self):
|
||||
return os.path.join(
|
||||
self.get_output_dir(),
|
||||
self.config.get('cronologia_file', 'cronologia.md')
|
||||
)
|
||||
|
||||
def get_attachments_dir(self):
|
||||
return os.path.join(
|
||||
self.get_output_dir(),
|
||||
self.config.get('attachments_dir', 'adjuntos')
|
||||
)
|
|
@ -0,0 +1,52 @@
|
|||
# main.py
|
||||
import os
|
||||
from pathlib import Path
|
||||
from utils.email_parser import procesar_eml
|
||||
from utils.markdown_handler import cargar_cronologia_existente
|
||||
from config.config import Config
|
||||
|
||||
def main():
|
||||
config = Config()
|
||||
|
||||
# Debug prints
|
||||
print(f"Input directory: {config.get_input_dir()}")
|
||||
print(f"Output directory: {config.get_output_dir()}")
|
||||
print(f"Cronologia file: {config.get_cronologia_file()}")
|
||||
print(f"Attachments directory: {config.get_attachments_dir()}")
|
||||
|
||||
# Ensure directories exist
|
||||
os.makedirs(config.get_output_dir(), exist_ok=True)
|
||||
os.makedirs(config.get_attachments_dir(), exist_ok=True)
|
||||
|
||||
# Check if input directory exists and has files
|
||||
input_path = Path(config.get_input_dir())
|
||||
if not input_path.exists():
|
||||
print(f"Error: Input directory {input_path} does not exist")
|
||||
return
|
||||
|
||||
eml_files = list(input_path.glob('*.eml'))
|
||||
print(f"Found {len(eml_files)} .eml files")
|
||||
|
||||
# mensajes = cargar_cronologia_existente(config.get_cronologia_file())
|
||||
mensajes = []
|
||||
print(f"Loaded {len(mensajes)} existing messages")
|
||||
mensajes_hash = {msg.hash for msg in mensajes}
|
||||
|
||||
for archivo in eml_files:
|
||||
print(f"Processing {archivo}")
|
||||
nuevos_mensajes = procesar_eml(archivo, config.get_attachments_dir())
|
||||
for msg in nuevos_mensajes:
|
||||
if msg.hash not in mensajes_hash:
|
||||
mensajes.append(msg)
|
||||
mensajes_hash.add(msg.hash)
|
||||
|
||||
mensajes.sort(key=lambda x: x.fecha)
|
||||
|
||||
output_file = config.get_cronologia_file()
|
||||
print(f"Writing to {output_file}")
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
for msg in mensajes:
|
||||
f.write(msg.to_markdown())
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Binary file not shown.
|
@ -0,0 +1,81 @@
|
|||
# models/mensaje_email.py
|
||||
import re
|
||||
import hashlib
|
||||
from datetime import datetime
|
||||
from email.utils import parseaddr, parsedate_to_datetime
|
||||
|
||||
class MensajeEmail:
|
||||
def __init__(self, remitente, fecha, contenido, subject=None, adjuntos=None):
|
||||
self.remitente = self._estandarizar_remitente(remitente)
|
||||
self.fecha = self._estandarizar_fecha(fecha)
|
||||
self.subject = subject
|
||||
self.contenido = self._limpiar_contenido(contenido)
|
||||
self.adjuntos = adjuntos if adjuntos else []
|
||||
self.hash = self._generar_hash()
|
||||
|
||||
def _limpiar_contenido(self, contenido):
|
||||
if not contenido:
|
||||
return ""
|
||||
|
||||
# Eliminar líneas de metadatos
|
||||
lines = contenido.split('\n')
|
||||
cleaned_lines = []
|
||||
|
||||
for line in lines:
|
||||
# Skip metadata lines
|
||||
if line.strip().startswith(('Da: ', 'Inviato: ', 'A: ', 'From: ', 'Sent: ', 'To: ')) or line.strip().startswith('Oggetto: '):
|
||||
continue
|
||||
cleaned_lines.append(line)
|
||||
|
||||
# Unir las líneas
|
||||
text = '\n'.join(cleaned_lines)
|
||||
|
||||
# Reemplazar 3 o más saltos de línea por dos
|
||||
text = re.sub(r'\n{3,}', '\n\n', text)
|
||||
|
||||
return text.strip()
|
||||
|
||||
def to_markdown(self):
|
||||
fecha_formato = self.fecha.strftime('%Y%m%d%H%M%S')
|
||||
md = f"## {fecha_formato}|{self.remitente}\n\n"
|
||||
if self.subject:
|
||||
md += f"**Asunto**: {self.subject}\n\n"
|
||||
md += self.contenido + "\n\n"
|
||||
if self.adjuntos:
|
||||
md += "### Adjuntos\n"
|
||||
for adj in self.adjuntos:
|
||||
md += f"- [[{adj}]]\n"
|
||||
md += "---\n\n"
|
||||
return md
|
||||
|
||||
def _estandarizar_remitente(self, remitente):
|
||||
if 'Da:' in remitente:
|
||||
remitente = remitente.split('Da:')[1].split('Inviato:')[0]
|
||||
elif 'From:' in remitente:
|
||||
remitente = remitente.split('From:')[1].split('Sent:')[0]
|
||||
|
||||
nombre, email = parseaddr(remitente)
|
||||
if not nombre and email:
|
||||
nombre = email.split('@')[0]
|
||||
elif not nombre and not email:
|
||||
nombre_match = re.search(r'([A-Za-z\s]+)\s*<', remitente)
|
||||
if nombre_match:
|
||||
nombre = nombre_match.group(1)
|
||||
else:
|
||||
return "Remitente Desconocido"
|
||||
|
||||
nombre = re.sub(r'[<>:"/\\|?*]', '', nombre.strip())
|
||||
nombre = nombre.encode('ascii', 'ignore').decode('ascii')
|
||||
return nombre
|
||||
|
||||
def _estandarizar_fecha(self, fecha):
|
||||
if isinstance(fecha, str):
|
||||
try:
|
||||
return parsedate_to_datetime(fecha)
|
||||
except:
|
||||
return datetime.now()
|
||||
return fecha
|
||||
|
||||
def _generar_hash(self):
|
||||
texto = f"{self.remitente}{self.fecha.isoformat()}{self.contenido}"
|
||||
return hashlib.md5(texto.encode()).hexdigest()
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,33 @@
|
|||
# utils/attachment_handler.py
|
||||
import os
|
||||
import hashlib
|
||||
import re
|
||||
|
||||
def guardar_adjunto(parte, dir_adjuntos):
|
||||
nombre = parte.get_filename()
|
||||
if not nombre:
|
||||
return None
|
||||
|
||||
nombre = re.sub(r'[<>:"/\\|?*]', '_', nombre)
|
||||
ruta = os.path.join(dir_adjuntos, nombre)
|
||||
|
||||
if os.path.exists(ruta):
|
||||
contenido_nuevo = parte.get_payload(decode=True)
|
||||
hash_nuevo = hashlib.md5(contenido_nuevo).hexdigest()
|
||||
|
||||
with open(ruta, 'rb') as f:
|
||||
hash_existente = hashlib.md5(f.read()).hexdigest()
|
||||
|
||||
if hash_nuevo == hash_existente:
|
||||
return ruta
|
||||
|
||||
base, ext = os.path.splitext(nombre)
|
||||
i = 1
|
||||
while os.path.exists(ruta):
|
||||
ruta = os.path.join(dir_adjuntos, f"{base}_{i}{ext}")
|
||||
i += 1
|
||||
|
||||
with open(ruta, 'wb') as f:
|
||||
f.write(parte.get_payload(decode=True))
|
||||
|
||||
return ruta
|
|
@ -0,0 +1,134 @@
|
|||
# utils/email_parser.py
|
||||
import email
|
||||
from email import policy
|
||||
from email.parser import BytesParser
|
||||
from datetime import datetime
|
||||
import re
|
||||
from pathlib import Path
|
||||
from bs4 import BeautifulSoup
|
||||
from email.utils import parsedate_to_datetime
|
||||
from models.mensaje_email import MensajeEmail
|
||||
from utils.attachment_handler import guardar_adjunto
|
||||
|
||||
def _html_a_markdown(html):
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
|
||||
# Convert tables, keeping all newlines
|
||||
for table in soup.find_all('table'):
|
||||
rows = table.find_all('tr')
|
||||
|
||||
if rows:
|
||||
markdown_table = []
|
||||
# Get maximum width for each column
|
||||
max_widths = []
|
||||
for row in rows:
|
||||
cells = row.find_all(['th', 'td'])
|
||||
while len(max_widths) < len(cells):
|
||||
max_widths.append(0)
|
||||
for i, cell in enumerate(cells):
|
||||
max_widths[i] = max(max_widths[i], len(cell.get_text().strip()))
|
||||
|
||||
# Build table rows
|
||||
header_row = rows[0].find_all(['th', 'td'])
|
||||
header = '| ' + ' | '.join(cell.get_text().strip().ljust(max_widths[i])
|
||||
for i, cell in enumerate(header_row)) + ' |'
|
||||
separator = '|' + '|'.join('-' * (width + 2) for width in max_widths) + '|'
|
||||
|
||||
markdown_table.append(header)
|
||||
markdown_table.append(separator)
|
||||
|
||||
for row in rows[1:]:
|
||||
cells = row.find_all(['td', 'th'])
|
||||
row_text = '| ' + ' | '.join(cell.get_text().strip().ljust(max_widths[i])
|
||||
for i, cell in enumerate(cells)) + ' |'
|
||||
markdown_table.append(row_text)
|
||||
|
||||
# Join with newlines and replace
|
||||
new_text = '\n' + '\n'.join(markdown_table)
|
||||
table.replace_with(soup.new_string(new_text))
|
||||
|
||||
# Handle basic HTML elements
|
||||
for br in soup.find_all('br'):
|
||||
br.replace_with('\n')
|
||||
|
||||
# Get text content
|
||||
text = soup.get_text()
|
||||
|
||||
# Only extract subject and remove basic email headers
|
||||
lines = text.split('\n')
|
||||
cleaned_lines = []
|
||||
subject = None
|
||||
|
||||
for line in lines:
|
||||
# Extract subject if present
|
||||
if line.startswith('Oggetto: '):
|
||||
subject = line[9:].strip()
|
||||
continue
|
||||
|
||||
# Skip only the most basic email headers
|
||||
if line.startswith(('Da: ', 'Inviato: ', 'A: ', 'From: ', 'Sent: ', 'To: ')):
|
||||
continue
|
||||
|
||||
# Keep the line as is, with all its spacing
|
||||
cleaned_lines.append(line)
|
||||
|
||||
# Join lines preserving all newlines
|
||||
text = '\n'.join(cleaned_lines)
|
||||
|
||||
return subject, text
|
||||
|
||||
def procesar_eml(ruta_archivo, dir_adjuntos):
|
||||
with open(ruta_archivo, 'rb') as eml:
|
||||
mensaje = BytesParser(policy=policy.default).parse(eml)
|
||||
|
||||
remitente = mensaje.get('from', '')
|
||||
fecha_str = mensaje.get('date', '')
|
||||
fecha = _parsear_fecha(fecha_str)
|
||||
|
||||
contenido = ""
|
||||
subject = None
|
||||
adjuntos = []
|
||||
|
||||
if mensaje.is_multipart():
|
||||
for parte in mensaje.walk():
|
||||
if parte.get_content_type() == "text/plain":
|
||||
text = parte.get_payload(decode=True).decode(parte.get_content_charset() or 'utf-8', errors='ignore')
|
||||
contenido += text
|
||||
elif parte.get_content_type() == "text/html":
|
||||
html_content = parte.get_payload(decode=True).decode(parte.get_content_charset() or 'utf-8', errors='ignore')
|
||||
part_subject, text = _html_a_markdown(html_content)
|
||||
if part_subject and not subject:
|
||||
subject = part_subject
|
||||
contenido += text
|
||||
elif parte.get_content_disposition() == 'attachment':
|
||||
ruta_adjunto = guardar_adjunto(parte, dir_adjuntos)
|
||||
if ruta_adjunto:
|
||||
adjuntos.append(Path(ruta_adjunto).name)
|
||||
else:
|
||||
if mensaje.get_content_type() == "text/html":
|
||||
html_content = mensaje.get_payload(decode=True).decode(mensaje.get_content_charset() or 'utf-8', errors='ignore')
|
||||
subject, contenido = _html_a_markdown(html_content)
|
||||
else:
|
||||
contenido = mensaje.get_payload(decode=True).decode(mensaje.get_content_charset() or 'utf-8', errors='ignore')
|
||||
|
||||
return [MensajeEmail(remitente=remitente, fecha=fecha, contenido=contenido, subject=subject, adjuntos=adjuntos)]
|
||||
|
||||
def _parsear_fecha(fecha_str):
|
||||
try:
|
||||
fecha = parsedate_to_datetime(fecha_str)
|
||||
return fecha.replace(tzinfo=None) # Remove timezone info
|
||||
except:
|
||||
try:
|
||||
fecha_match = re.search(r'venerd=EC (\d{1,2}) (\w+) (\d{4}) (\d{1,2}):(\d{2})', fecha_str)
|
||||
if fecha_match:
|
||||
dia, mes, año, hora, minuto = fecha_match.groups()
|
||||
meses_it = {
|
||||
'gennaio': 1, 'febbraio': 2, 'marzo': 3, 'aprile': 4,
|
||||
'maggio': 5, 'giugno': 6, 'luglio': 7, 'agosto': 8,
|
||||
'settembre': 9, 'ottobre': 10, 'novembre': 11, 'dicembre': 12
|
||||
}
|
||||
mes_num = meses_it.get(mes.lower(), 1)
|
||||
return datetime(int(año), mes_num, int(dia), int(hora), int(minuto))
|
||||
except:
|
||||
pass
|
||||
return datetime.now()
|
|
@ -0,0 +1,39 @@
|
|||
# utils/markdown_handler.py
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime
|
||||
from models.mensaje_email import MensajeEmail
|
||||
|
||||
def cargar_cronologia_existente(archivo):
|
||||
mensajes = []
|
||||
if not os.path.exists(archivo):
|
||||
return mensajes
|
||||
|
||||
with open(archivo, 'r', encoding='utf-8') as f:
|
||||
contenido = f.read()
|
||||
|
||||
bloques = contenido.split('---\n\n')
|
||||
for bloque in bloques:
|
||||
if not bloque.strip():
|
||||
continue
|
||||
|
||||
match = re.match(r'## (\d{14})\|(.*?)\n\n(.*)', bloque.strip(), re.DOTALL)
|
||||
if match:
|
||||
fecha_str, remitente, contenido = match.groups()
|
||||
fecha = datetime.strptime(fecha_str, '%Y%m%d%H%M%S')
|
||||
|
||||
adjuntos = []
|
||||
if '### Adjuntos' in contenido:
|
||||
contenido_principal, lista_adjuntos = contenido.split('### Adjuntos')
|
||||
adjuntos = [adj.strip()[2:-2] for adj in lista_adjuntos.strip().split('\n')]
|
||||
contenido = contenido_principal.strip()
|
||||
|
||||
mensajes.append(MensajeEmail(
|
||||
remitente=remitente,
|
||||
fecha=fecha,
|
||||
contenido=contenido,
|
||||
adjuntos=adjuntos
|
||||
))
|
||||
|
||||
return mensajes
|
||||
|
Loading…
Reference in New Issue