ParamManagerScripts/backend/script_groups/ragex/x1.py

"""
Este script realiza la ingesta de los datos alammacenados en el subdirectorio de ingesta.
"""
import glob
import json
import os

from chromadb.config import Settings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings  # Switched to OpenAI embeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter


def load_documents(directory):
    """Load all markdown and plain-text files under `directory` (recursively)."""
    documents = []
    # Load markdown files
    for md_file in glob.glob(os.path.join(directory, "**/*.md"), recursive=True):
        with open(md_file, "r", encoding="utf-8") as f:
            content = f.read()
        documents.append(
            {
                "content": content,
                "metadata": {"source": md_file, "type": "markdown"},
            }
        )
    # Load plain-text files
    for txt_file in glob.glob(os.path.join(directory, "**/*.txt"), recursive=True):
        with open(txt_file, "r", encoding="utf-8") as f:
            content = f.read()
        documents.append(
            {"content": content, "metadata": {"source": txt_file, "type": "text"}}
        )
    return documents
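

# Each item in `documents` consumed by process_documents() below has the shape
# produced by load_documents() above (the path shown is illustrative):
#   {"content": "<full file text>",
#    "metadata": {"source": "docs/guide.md", "type": "markdown"}}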
def process_documents(documents, db_directory):
    # Use OpenAI embeddings instead of HuggingFace
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    # Split documents into chunks; heading separators are tried first, so chunks
    # tend to align with markdown sections
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        separators=["\n## ", "\n### ", "\n#### ", "\n", " ", ""],
        keep_separator=True,
    )
    docs = []
    for doc in documents:
        chunks = text_splitter.split_text(doc["content"])
        for i, chunk in enumerate(chunks):
            docs.append(
                Document(
                    page_content=chunk,
                    metadata={
                        **doc["metadata"],
                        "chunk_id": i,
                        "chunk": chunk[:100] + "...",  # Excerpt kept for reference
                    },
                )
            )
    # Create or update the vector database with explicit client settings,
    # configuring Chroma to avoid the ONNX dependency
    db = Chroma.from_documents(
        docs,
        embeddings,
        persist_directory=db_directory,
        client_settings=Settings(anonymized_telemetry=False, is_persistent=True),
    )
    db.persist()
    print(f"Processed {len(docs)} chunks from {len(documents)} documents")
    return db


def main():
    # Load configuration from the environment
    configs = json.loads(os.environ.get("SCRIPT_CONFIGS", "{}"))
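    # Expected SCRIPT_CONFIGS shape, inferred from the keys read below
    # (all values are illustrative):
    # {
    #     "working_directory": "/path/to/workdir",
    #     "level2": { ... },               # group-level settings
    #     "level3": { "in_dir": "docs" }   # work-level settings
    # }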
    # Resolve the working directory
    working_directory = configs.get("working_directory", ".")
    # Level-2 (group) and level-3 (work) settings
    group_config = configs.get("level2", {})  # currently unused
    work_config = configs.get("level3", {})
    in_dir = work_config.get("in_dir", ".")
    docs_directory = os.path.join(working_directory, in_dir)
    db_directory = os.path.join(working_directory, "chroma_db")
    print("Loading documents...")
    documents = load_documents(docs_directory)
    print(f"Found {len(documents)} documents.")
    print("Processing and indexing documents...")
    db = process_documents(documents, db_directory)
    print("Ingestion completed successfully!")


if __name__ == "__main__":
    main()