""" Advanced document processing service with table and graphics extraction capabilities. """ import asyncio import logging from typing import Dict, List, Optional, Tuple, Any from pathlib import Path import io import pdfplumber import fitz # PyMuPDF import pandas as pd import numpy as np from PIL import Image import cv2 import pytesseract from pptx import Presentation from pptx.enum.shapes import MSO_SHAPE_TYPE import tabula import camelot from app.core.config import settings from app.models.document import Document, DocumentType from app.models.tenant import Tenant logger = logging.getLogger(__name__) class DocumentProcessor: """Advanced document processor with table and graphics extraction.""" def __init__(self, tenant: Tenant): self.tenant = tenant self.supported_formats = { '.pdf': self._process_pdf, '.pptx': self._process_powerpoint, '.xlsx': self._process_excel, '.docx': self._process_word, '.txt': self._process_text } async def process_document(self, file_path: Path, document: Document) -> Dict[str, Any]: """Process a document and extract all content including tables and graphics.""" try: file_extension = file_path.suffix.lower() if file_extension not in self.supported_formats: raise ValueError(f"Unsupported file format: {file_extension}") processor = self.supported_formats[file_extension] result = await processor(file_path, document) # Add tenant-specific processing result['tenant_id'] = str(self.tenant.id) result['tenant_name'] = self.tenant.name return result except Exception as e: logger.error(f"Error processing document {file_path}: {str(e)}") raise async def _process_pdf(self, file_path: Path, document: Document) -> Dict[str, Any]: """Process PDF with advanced table and graphics extraction.""" result = { 'text_content': [], 'tables': [], 'charts': [], 'images': [], 'metadata': {}, 'structure': {} } try: # Use pdfplumber for text and table extraction with pdfplumber.open(file_path) as pdf: result['metadata']['pages'] = len(pdf.pages) result['metadata']['file_size'] = file_path.stat().st_size for page_num, page in enumerate(pdf.pages): page_result = await self._extract_pdf_page_content(page, page_num) result['text_content'].extend(page_result['text']) result['tables'].extend(page_result['tables']) result['charts'].extend(page_result['charts']) result['images'].extend(page_result['images']) # Use PyMuPDF for additional graphics extraction await self._extract_pdf_graphics(file_path, result) # Use tabula for complex table extraction await self._extract_pdf_tables_tabula(file_path, result) # Use camelot for lattice table extraction await self._extract_pdf_tables_camelot(file_path, result) except Exception as e: logger.error(f"Error processing PDF {file_path}: {str(e)}") raise return result async def _extract_pdf_page_content(self, page, page_num: int) -> Dict[str, Any]: """Extract content from a single PDF page.""" page_result = { 'text': [], 'tables': [], 'charts': [], 'images': [] } # Extract text text = page.extract_text() if text: page_result['text'].append({ 'page': page_num + 1, 'content': text, 'bbox': page.bbox }) # Extract tables using pdfplumber tables = page.extract_tables() for table_num, table in enumerate(tables): if table and len(table) > 1: # Ensure table has content table_data = { 'page': page_num + 1, 'table_number': table_num + 1, 'data': table, 'rows': len(table), 'columns': len(table[0]) if table else 0, 'extraction_method': 'pdfplumber' } page_result['tables'].append(table_data) # Extract images images = page.images for img_num, img in enumerate(images): image_data = { 'page': page_num + 1, 'image_number': img_num + 1, 'bbox': img['bbox'], 'width': img['width'], 'height': img['height'], 'type': img.get('name', 'unknown') } page_result['images'].append(image_data) return page_result async def _extract_pdf_graphics(self, file_path: Path, result: Dict[str, Any]): """Extract graphics and charts from PDF using PyMuPDF.""" try: doc = fitz.open(file_path) for page_num in range(len(doc)): page = doc[page_num] # Extract images image_list = page.get_images() for img_index, img in enumerate(image_list): xref = img[0] pix = fitz.Pixmap(doc, xref) if pix.n - pix.alpha < 4: # GRAY or RGB image_data = { 'page': page_num + 1, 'image_number': img_index + 1, 'width': pix.width, 'height': pix.height, 'colorspace': pix.colorspace.name, 'extraction_method': 'PyMuPDF' } result['images'].append(image_data) # Extract drawings and shapes drawings = page.get_drawings() for drawing in drawings: if drawing.get('type') == 'l': # Line chart_data = { 'page': page_num + 1, 'type': 'chart_element', 'bbox': drawing.get('rect'), 'extraction_method': 'PyMuPDF' } result['charts'].append(chart_data) doc.close() except Exception as e: logger.error(f"Error extracting PDF graphics: {str(e)}") async def _extract_pdf_tables_tabula(self, file_path: Path, result: Dict[str, Any]): """Extract tables using tabula-py.""" try: tables = tabula.read_pdf(str(file_path), pages='all', multiple_tables=True) for page_num, page_tables in enumerate(tables): for table_num, table in enumerate(page_tables): if not table.empty: table_data = { 'page': page_num + 1, 'table_number': table_num + 1, 'data': table.to_dict('records'), 'rows': len(table), 'columns': len(table.columns), 'extraction_method': 'tabula' } result['tables'].append(table_data) except Exception as e: logger.error(f"Error extracting tables with tabula: {str(e)}") async def _extract_pdf_tables_camelot(self, file_path: Path, result: Dict[str, Any]): """Extract tables using camelot-py.""" try: tables = camelot.read_pdf(str(file_path), pages='all') for table in tables: if table.df is not None and not table.df.empty: table_data = { 'page': table.page, 'table_number': table.order, 'data': table.df.to_dict('records'), 'rows': len(table.df), 'columns': len(table.df.columns), 'accuracy': table.accuracy, 'whitespace': table.whitespace, 'extraction_method': 'camelot' } result['tables'].append(table_data) except Exception as e: logger.error(f"Error extracting tables with camelot: {str(e)}") async def _process_powerpoint(self, file_path: Path, document: Document) -> Dict[str, Any]: """Process PowerPoint with table and graphics extraction.""" result = { 'text_content': [], 'tables': [], 'charts': [], 'images': [], 'metadata': {}, 'structure': {} } try: prs = Presentation(file_path) result['metadata']['slides'] = len(prs.slides) result['metadata']['file_size'] = file_path.stat().st_size for slide_num, slide in enumerate(prs.slides): slide_result = await self._extract_powerpoint_slide_content(slide, slide_num) result['text_content'].extend(slide_result['text']) result['tables'].extend(slide_result['tables']) result['charts'].extend(slide_result['charts']) result['images'].extend(slide_result['images']) except Exception as e: logger.error(f"Error processing PowerPoint {file_path}: {str(e)}") raise return result async def _extract_powerpoint_slide_content(self, slide, slide_num: int) -> Dict[str, Any]: """Extract content from a single PowerPoint slide.""" slide_result = { 'text': [], 'tables': [], 'charts': [], 'images': [] } for shape in slide.shapes: # Extract text if hasattr(shape, 'text') and shape.text.strip(): text_data = { 'slide': slide_num + 1, 'content': shape.text.strip(), 'shape_type': str(shape.shape_type), 'bbox': (shape.left, shape.top, shape.width, shape.height) } slide_result['text'].append(text_data) # Extract tables if shape.shape_type == MSO_SHAPE_TYPE.TABLE: table_data = await self._extract_powerpoint_table(shape, slide_num) slide_result['tables'].append(table_data) # Extract charts elif shape.shape_type == MSO_SHAPE_TYPE.CHART: chart_data = await self._extract_powerpoint_chart(shape, slide_num) slide_result['charts'].append(chart_data) # Extract images elif shape.shape_type == MSO_SHAPE_TYPE.PICTURE: image_data = await self._extract_powerpoint_image(shape, slide_num) slide_result['images'].append(image_data) return slide_result async def _extract_powerpoint_table(self, shape, slide_num: int) -> Dict[str, Any]: """Extract table data from PowerPoint shape.""" table = shape.table table_data = [] for row in table.rows: row_data = [] for cell in row.cells: row_data.append(cell.text.strip()) table_data.append(row_data) return { 'slide': slide_num + 1, 'table_number': 1, # Assuming one table per slide for now 'data': table_data, 'rows': len(table_data), 'columns': len(table_data[0]) if table_data else 0, 'extraction_method': 'python-pptx' } async def _extract_powerpoint_chart(self, shape, slide_num: int) -> Dict[str, Any]: """Extract chart data from PowerPoint shape.""" chart = shape.chart chart_data = { 'slide': slide_num + 1, 'chart_type': str(chart.chart_type), 'title': chart.chart_title.text if chart.chart_title else '', 'bbox': (shape.left, shape.top, shape.width, shape.height), 'extraction_method': 'python-pptx' } # Extract chart data if available if hasattr(chart, 'part') and chart.part: # This would require additional processing to extract actual chart data chart_data['has_data'] = True return chart_data async def _extract_powerpoint_image(self, shape, slide_num: int) -> Dict[str, Any]: """Extract image data from PowerPoint shape.""" image = shape.image image_data = { 'slide': slide_num + 1, 'image_number': 1, # Assuming one image per shape 'width': shape.width, 'height': shape.height, 'bbox': (shape.left, shape.top, shape.width, shape.height), 'extraction_method': 'python-pptx' } return image_data async def _process_excel(self, file_path: Path, document: Document) -> Dict[str, Any]: """Process Excel file with table extraction.""" result = { 'text_content': [], 'tables': [], 'charts': [], 'images': [], 'metadata': {}, 'structure': {} } try: # Read all sheets excel_file = pd.ExcelFile(file_path) result['metadata']['sheets'] = excel_file.sheet_names result['metadata']['file_size'] = file_path.stat().st_size for sheet_name in excel_file.sheet_names: df = pd.read_excel(file_path, sheet_name=sheet_name) if not df.empty: table_data = { 'sheet': sheet_name, 'table_number': 1, 'data': df.to_dict('records'), 'rows': len(df), 'columns': len(df.columns), 'extraction_method': 'pandas' } result['tables'].append(table_data) except Exception as e: logger.error(f"Error processing Excel {file_path}: {str(e)}") raise return result async def _process_word(self, file_path: Path, document: Document) -> Dict[str, Any]: """Process Word document.""" # TODO: Implement Word document processing return { 'text_content': [], 'tables': [], 'charts': [], 'images': [], 'metadata': {}, 'structure': {} } async def _process_text(self, file_path: Path, document: Document) -> Dict[str, Any]: """Process text file.""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() return { 'text_content': [{'content': content, 'page': 1}], 'tables': [], 'charts': [], 'images': [], 'metadata': {'file_size': file_path.stat().st_size}, 'structure': {} } except Exception as e: logger.error(f"Error processing text file {file_path}: {str(e)}") raise def analyze_table_structure(self, table_data: List[List[str]]) -> Dict[str, Any]: """Analyze table structure and extract metadata.""" if not table_data or len(table_data) < 2: return {} analysis = { 'header_row': table_data[0] if table_data else [], 'data_rows': len(table_data) - 1, 'columns': len(table_data[0]) if table_data else 0, 'column_types': [], 'has_numeric_data': False, 'has_date_data': False } # Analyze column types if len(table_data) > 1: for col_idx in range(len(table_data[0])): col_values = [row[col_idx] for row in table_data[1:] if len(row) > col_idx] col_type = self._infer_column_type(col_values) analysis['column_types'].append(col_type) if col_type == 'numeric': analysis['has_numeric_data'] = True elif col_type == 'date': analysis['has_date_data'] = True return analysis def _infer_column_type(self, values: List[str]) -> str: """Infer the data type of a column.""" if not values: return 'text' numeric_count = 0 date_count = 0 for value in values: if value and value.strip(): # Check if numeric try: float(value.replace(',', '').replace('$', '').replace('%', '')) numeric_count += 1 except ValueError: pass # Check if date (basic check) if any(separator in value for separator in ['/', '-', '.']): date_count += 1 total = len([v for v in values if v and v.strip()]) if total == 0: return 'text' numeric_ratio = numeric_count / total date_ratio = date_count / total if numeric_ratio > 0.8: return 'numeric' elif date_ratio > 0.8: return 'date' else: return 'text'