Arch/services/schema_service.py

170 lines
4.9 KiB
Python

import os
import json
from datetime import datetime
import pytz
from flask import current_app
from utils.file_utils import load_json_file, save_json_file
def get_schema_file_path():
"""Get path to the schema storage file."""
storage_path = current_app.config["STORAGE_PATH"]
return os.path.join(storage_path, "schemas", "schema.json")
def load_schemas():
"""Load all schemas from storage."""
file_path = get_schema_file_path()
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
return {}
def save_schemas(schemas):
"""Save schemas to storage."""
file_path = get_schema_file_path()
with open(file_path, "w", encoding="utf-8") as f:
json.dump(schemas, f, ensure_ascii=False, indent=2)
return True
def get_all_schemas():
"""Get all schemas as a list."""
schemas_dict = load_schemas()
return [
{**schema_data, "id": schema_id}
for schema_id, schema_data in schemas_dict.items()
]
def get_schema_by_id(schema_id):
"""Get a specific schema by ID."""
schemas = load_schemas()
if schema_id in schemas:
return {**schemas[schema_id], "id": schema_id}
return None
def create_schema(schema_data, user_id):
"""
Crear un nuevo esquema.
Args:
schema_data (dict): Datos del esquema
user_id (str): ID del usuario que crea el esquema
Returns:
tuple: (éxito, mensaje)
"""
schemas = load_schemas()
# Verificar si ya existe un esquema con ese código
schema_id = schema_data["codigo"]
if schema_id in schemas:
return False, f"Ya existe un esquema con el código {schema_id}."
# Añadir metadatos
schema_data["fecha_creacion"] = datetime.now(pytz.UTC).isoformat()
schema_data["creado_por"] = user_id
# Guardar esquema
schemas[schema_id] = schema_data
save_schemas(schemas)
return True, f"Esquema '{schema_data['descripcion']}' creado correctamente."
def update_schema(schema_id, schema_data):
"""Update an existing schema."""
schemas = load_schemas()
if schema_id not in schemas:
return False
# Update modification timestamp
schema_data["updated_at"] = datetime.now().isoformat()
# Remove id from data before saving (it's used as the key)
data_to_save = {k: v for k, v in schema_data.items() if k != "id"}
schemas[schema_id] = data_to_save
return save_schemas(schemas)
def delete_schema(schema_id):
"""
Eliminar un esquema.
Args:
schema_id (str): ID del esquema a eliminar
Returns:
tuple: (éxito, mensaje)
"""
schemas = load_schemas()
# Verificar si existe el esquema
if schema_id not in schemas:
return False, f"No existe un esquema con el código {schema_id}."
# Verificar si el esquema está en uso
# [Implementar verificación si el esquema está asociado a proyectos]
# Eliminar esquema
schema_desc = schemas[schema_id].get("descripcion", schema_id)
del schemas[schema_id]
save_schemas(schemas)
return True, f"Esquema '{schema_desc}' eliminado correctamente."
def initialize_default_schemas():
"""Initialize default schemas if none exist."""
schemas = load_schemas()
if not schemas:
default_schemas = {
"default": {
"name": "Documento Estándar",
"description": "Esquema básico para documentos",
"fields": [
{"name": "title", "type": "text", "required": True},
{"name": "content", "type": "textarea", "required": True},
{"name": "tags", "type": "tags", "required": False},
],
"created_by": "system",
"created_at": datetime.now().isoformat(),
},
"report": {
"name": "Informe",
"description": "Esquema para informes formales",
"fields": [
{"name": "title", "type": "text", "required": True},
{"name": "summary", "type": "textarea", "required": True},
{"name": "body", "type": "richtext", "required": True},
{"name": "date", "type": "date", "required": True},
{"name": "author", "type": "text", "required": True},
],
"created_by": "system",
"created_at": datetime.now().isoformat(),
},
}
save_schemas(default_schemas)
def get_schema_document_types(schema_id):
"""
Obtener los tipos de documentos definidos en un esquema.
Args:
schema_id (str): ID del esquema
Returns:
list: Lista de tipos de documentos en el esquema, o lista vacía si no existe
"""
schema = get_schema_by_id(schema_id)
if not schema or "documentos" not in schema:
return []
return schema["documentos"]