""" Document organization service for managing hierarchical folder structures, tagging, categorization, and metadata with multi-tenant support. """ import asyncio import logging from typing import Dict, List, Optional, Any, Set from datetime import datetime import uuid from pathlib import Path import json from sqlalchemy.orm import Session from sqlalchemy import and_, or_, func from app.models.document import Document, DocumentTag, DocumentType from app.models.tenant import Tenant from app.core.database import get_db logger = logging.getLogger(__name__) class DocumentOrganizationService: """Service for organizing documents with hierarchical structures and metadata.""" def __init__(self, tenant: Tenant): self.tenant = tenant self.default_categories = { DocumentType.BOARD_PACK: ["Board Meetings", "Strategic Planning", "Governance"], DocumentType.MINUTES: ["Board Meetings", "Committee Meetings", "Executive Meetings"], DocumentType.STRATEGIC_PLAN: ["Strategic Planning", "Business Planning", "Long-term Planning"], DocumentType.FINANCIAL_REPORT: ["Financial", "Reports", "Performance"], DocumentType.COMPLIANCE_REPORT: ["Compliance", "Regulatory", "Audit"], DocumentType.POLICY_DOCUMENT: ["Policies", "Procedures", "Governance"], DocumentType.CONTRACT: ["Legal", "Contracts", "Agreements"], DocumentType.PRESENTATION: ["Presentations", "Communications", "Training"], DocumentType.SPREADSHEET: ["Data", "Analysis", "Reports"], DocumentType.OTHER: ["General", "Miscellaneous"] } async def create_folder_structure(self, db: Session, folder_path: str, description: str = None) -> Dict[str, Any]: """ Create a hierarchical folder structure. """ try: # Parse folder path (e.g., "Board Meetings/2024/Q1") folders = folder_path.strip("/").split("/") # Create folder metadata folder_metadata = { "type": "folder", "path": folder_path, "name": folders[-1], "parent_path": "/".join(folders[:-1]) if len(folders) > 1 else "", "description": description, "created_at": datetime.utcnow().isoformat(), "tenant_id": str(self.tenant.id) } # Store folder metadata in document table with special type folder_document = Document( id=uuid.uuid4(), title=folder_path, description=description or f"Folder: {folder_path}", document_type=DocumentType.OTHER, filename="", # Folders don't have files file_path="", file_size=0, mime_type="application/x-folder", uploaded_by=None, # System-created organization_id=self.tenant.id, processing_status="completed", document_metadata=folder_metadata ) db.add(folder_document) db.commit() db.refresh(folder_document) return { "id": str(folder_document.id), "path": folder_path, "name": folders[-1], "parent_path": folder_metadata["parent_path"], "description": description, "created_at": folder_document.created_at.isoformat() } except Exception as e: logger.error(f"Error creating folder structure {folder_path}: {str(e)}") raise async def move_document_to_folder(self, db: Session, document_id: str, folder_path: str) -> bool: """ Move a document to a specific folder. """ try: document = db.query(Document).filter( and_( Document.id == document_id, Document.organization_id == self.tenant.id ) ).first() if not document: raise ValueError("Document not found") # Update document metadata with folder information if not document.document_metadata: document.document_metadata = {} document.document_metadata["folder_path"] = folder_path document.document_metadata["folder_name"] = folder_path.split("/")[-1] document.document_metadata["moved_at"] = datetime.utcnow().isoformat() db.commit() return True except Exception as e: logger.error(f"Error moving document {document_id} to folder {folder_path}: {str(e)}") return False async def get_documents_in_folder(self, db: Session, folder_path: str, skip: int = 0, limit: int = 100) -> Dict[str, Any]: """ Get all documents in a specific folder. """ try: # Query documents with folder metadata query = db.query(Document).filter( and_( Document.organization_id == self.tenant.id, Document.document_metadata.contains({"folder_path": folder_path}) ) ) total = query.count() documents = query.offset(skip).limit(limit).all() return { "folder_path": folder_path, "documents": [ { "id": str(doc.id), "title": doc.title, "description": doc.description, "document_type": doc.document_type, "filename": doc.filename, "file_size": doc.file_size, "processing_status": doc.processing_status, "created_at": doc.created_at.isoformat(), "tags": [{"id": str(tag.id), "name": tag.name} for tag in doc.tags] } for doc in documents ], "total": total, "skip": skip, "limit": limit } except Exception as e: logger.error(f"Error getting documents in folder {folder_path}: {str(e)}") return {"folder_path": folder_path, "documents": [], "total": 0, "skip": skip, "limit": limit} async def get_folder_structure(self, db: Session, root_path: str = "") -> Dict[str, Any]: """ Get the complete folder structure. """ try: # Get all folder documents folder_query = db.query(Document).filter( and_( Document.organization_id == self.tenant.id, Document.mime_type == "application/x-folder" ) ) folders = folder_query.all() # Build hierarchical structure folder_tree = self._build_folder_tree(folders, root_path) return { "root_path": root_path, "folders": folder_tree, "total_folders": len(folders) } except Exception as e: logger.error(f"Error getting folder structure: {str(e)}") return {"root_path": root_path, "folders": [], "total_folders": 0} async def auto_categorize_document(self, db: Session, document: Document) -> List[str]: """ Automatically categorize a document based on its type and content. """ try: categories = [] # Add default categories based on document type if document.document_type in self.default_categories: categories.extend(self.default_categories[document.document_type]) # Add categories based on extracted text content if document.extracted_text: text_categories = await self._extract_categories_from_text(document.extracted_text) categories.extend(text_categories) # Add categories based on metadata if document.document_metadata: metadata_categories = await self._extract_categories_from_metadata(document.document_metadata) categories.extend(metadata_categories) # Remove duplicates and limit to top categories unique_categories = list(set(categories))[:10] return unique_categories except Exception as e: logger.error(f"Error auto-categorizing document {document.id}: {str(e)}") return [] async def create_or_get_tag(self, db: Session, tag_name: str, description: str = None, color: str = None) -> DocumentTag: """ Create a new tag or get existing one. """ try: # Check if tag already exists tag = db.query(DocumentTag).filter( and_( DocumentTag.name == tag_name, # In a real implementation, you'd have tenant_id in DocumentTag ) ).first() if not tag: tag = DocumentTag( id=uuid.uuid4(), name=tag_name, description=description or f"Tag: {tag_name}", color=color or "#3B82F6" # Default blue color ) db.add(tag) db.commit() db.refresh(tag) return tag except Exception as e: logger.error(f"Error creating/getting tag {tag_name}: {str(e)}") raise async def add_tags_to_document(self, db: Session, document_id: str, tag_names: List[str]) -> bool: """ Add multiple tags to a document. """ try: document = db.query(Document).filter( and_( Document.id == document_id, Document.organization_id == self.tenant.id ) ).first() if not document: raise ValueError("Document not found") for tag_name in tag_names: tag = await self.create_or_get_tag(db, tag_name.strip()) if tag not in document.tags: document.tags.append(tag) db.commit() return True except Exception as e: logger.error(f"Error adding tags to document {document_id}: {str(e)}") return False async def remove_tags_from_document(self, db: Session, document_id: str, tag_names: List[str]) -> bool: """ Remove tags from a document. """ try: document = db.query(Document).filter( and_( Document.id == document_id, Document.organization_id == self.tenant.id ) ).first() if not document: raise ValueError("Document not found") for tag_name in tag_names: tag = db.query(DocumentTag).filter(DocumentTag.name == tag_name).first() if tag and tag in document.tags: document.tags.remove(tag) db.commit() return True except Exception as e: logger.error(f"Error removing tags from document {document_id}: {str(e)}") return False async def get_documents_by_tags(self, db: Session, tag_names: List[str], skip: int = 0, limit: int = 100) -> Dict[str, Any]: """ Get documents that have specific tags. """ try: query = db.query(Document).filter(Document.organization_id == self.tenant.id) # Add tag filters for tag_name in tag_names: query = query.join(Document.tags).filter(DocumentTag.name == tag_name) total = query.count() documents = query.offset(skip).limit(limit).all() return { "tag_names": tag_names, "documents": [ { "id": str(doc.id), "title": doc.title, "description": doc.description, "document_type": doc.document_type, "filename": doc.filename, "file_size": doc.file_size, "processing_status": doc.processing_status, "created_at": doc.created_at.isoformat(), "tags": [{"id": str(tag.id), "name": tag.name} for tag in doc.tags] } for doc in documents ], "total": total, "skip": skip, "limit": limit } except Exception as e: logger.error(f"Error getting documents by tags {tag_names}: {str(e)}") return {"tag_names": tag_names, "documents": [], "total": 0, "skip": skip, "limit": limit} async def get_popular_tags(self, db: Session, limit: int = 20) -> List[Dict[str, Any]]: """ Get the most popular tags. """ try: # Count tag usage tag_counts = db.query( DocumentTag.name, func.count(DocumentTag.documents).label('count') ).join(DocumentTag.documents).filter( Document.organization_id == self.tenant.id ).group_by(DocumentTag.name).order_by( func.count(DocumentTag.documents).desc() ).limit(limit).all() return [ { "name": tag_name, "count": count, "percentage": round((count / sum(t[1] for t in tag_counts)) * 100, 2) } for tag_name, count in tag_counts ] except Exception as e: logger.error(f"Error getting popular tags: {str(e)}") return [] async def extract_metadata(self, document: Document) -> Dict[str, Any]: """ Extract metadata from document content and structure. """ try: metadata = { "extraction_timestamp": datetime.utcnow().isoformat(), "tenant_id": str(self.tenant.id) } # Extract basic metadata if document.filename: metadata["original_filename"] = document.filename metadata["file_extension"] = Path(document.filename).suffix.lower() # Extract metadata from content if document.extracted_text: text_metadata = await self._extract_text_metadata(document.extracted_text) metadata.update(text_metadata) # Extract metadata from document structure if document.document_metadata: structure_metadata = await self._extract_structure_metadata(document.document_metadata) metadata.update(structure_metadata) return metadata except Exception as e: logger.error(f"Error extracting metadata for document {document.id}: {str(e)}") return {} def _build_folder_tree(self, folders: List[Document], root_path: str) -> List[Dict[str, Any]]: """ Build hierarchical folder tree structure. """ tree = [] for folder in folders: folder_metadata = folder.document_metadata or {} folder_path = folder_metadata.get("path", "") if folder_path.startswith(root_path): relative_path = folder_path[len(root_path):].strip("/") if "/" not in relative_path: # Direct child tree.append({ "id": str(folder.id), "name": folder_metadata.get("name", folder.title), "path": folder_path, "description": folder_metadata.get("description"), "created_at": folder.created_at.isoformat(), "children": self._build_folder_tree(folders, folder_path + "/") }) return tree async def _extract_categories_from_text(self, text: str) -> List[str]: """ Extract categories from document text content. """ categories = [] # Simple keyword-based categorization text_lower = text.lower() # Financial categories if any(word in text_lower for word in ["revenue", "profit", "loss", "financial", "budget", "cost"]): categories.append("Financial") # Risk categories if any(word in text_lower for word in ["risk", "threat", "vulnerability", "compliance", "audit"]): categories.append("Risk & Compliance") # Strategic categories if any(word in text_lower for word in ["strategy", "planning", "objective", "goal", "initiative"]): categories.append("Strategic Planning") # Operational categories if any(word in text_lower for word in ["operation", "process", "procedure", "workflow"]): categories.append("Operations") # Technology categories if any(word in text_lower for word in ["technology", "digital", "system", "platform", "software"]): categories.append("Technology") return categories async def _extract_categories_from_metadata(self, metadata: Dict[str, Any]) -> List[str]: """ Extract categories from document metadata. """ categories = [] # Extract from tables if "tables" in metadata: categories.append("Data & Analytics") # Extract from charts if "charts" in metadata: categories.append("Visualizations") # Extract from images if "images" in metadata: categories.append("Media Content") return categories async def _extract_text_metadata(self, text: str) -> Dict[str, Any]: """ Extract metadata from text content. """ metadata = {} # Word count metadata["word_count"] = len(text.split()) # Character count metadata["character_count"] = len(text) # Line count metadata["line_count"] = len(text.splitlines()) # Language detection (simplified) metadata["language"] = "en" # Default to English # Content type detection text_lower = text.lower() if any(word in text_lower for word in ["board", "director", "governance"]): metadata["content_type"] = "governance" elif any(word in text_lower for word in ["financial", "revenue", "profit"]): metadata["content_type"] = "financial" elif any(word in text_lower for word in ["strategy", "planning", "objective"]): metadata["content_type"] = "strategic" else: metadata["content_type"] = "general" return metadata async def _extract_structure_metadata(self, structure_metadata: Dict[str, Any]) -> Dict[str, Any]: """ Extract metadata from document structure. """ metadata = {} # Page count if "pages" in structure_metadata: metadata["page_count"] = structure_metadata["pages"] # Table count if "tables" in structure_metadata: metadata["table_count"] = len(structure_metadata["tables"]) # Chart count if "charts" in structure_metadata: metadata["chart_count"] = len(structure_metadata["charts"]) # Image count if "images" in structure_metadata: metadata["image_count"] = len(structure_metadata["images"]) return metadata