# Source: virtual_board_member/app/services/document_processor.py
# (483 lines, 18 KiB, Python)
"""
Advanced document processing service with table and graphics extraction capabilities.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Tuple, Any
from pathlib import Path
import io
import pdfplumber
import fitz # PyMuPDF
import pandas as pd
import numpy as np
from PIL import Image
import cv2
import pytesseract
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
import tabula
import camelot
from app.core.config import settings
from app.models.document import Document, DocumentType
from app.models.tenant import Tenant
logger = logging.getLogger(__name__)
class DocumentProcessor:
"""Advanced document processor with table and graphics extraction."""
def __init__(self, tenant: Tenant):
    """Create a processor bound to ``tenant``.

    Also builds ``supported_formats``, the table that maps a lowercase file
    suffix to the coroutine method responsible for that format.
    """
    self.tenant = tenant
    # (suffix, handler) pairs; turned into the lookup table below.
    suffix_handlers = (
        ('.pdf', self._process_pdf),
        ('.pptx', self._process_powerpoint),
        ('.xlsx', self._process_excel),
        ('.docx', self._process_word),
        ('.txt', self._process_text),
    )
    self.supported_formats = dict(suffix_handlers)
async def process_document(self, file_path: Path, document: Document) -> Dict[str, Any]:
    """Run the format-specific handler for ``file_path`` and tag the result.

    The handler is chosen by the file's lowercase suffix from
    ``self.supported_formats``. The dict it returns is extended with
    ``tenant_id`` (stringified) and ``tenant_name`` before being returned.

    Raises:
        ValueError: if the suffix has no registered handler.
        Exception: anything raised by the handler is logged and re-raised.
    """
    try:
        file_extension = file_path.suffix.lower()
        handler = self.supported_formats.get(file_extension)
        if handler is None:
            raise ValueError(f"Unsupported file format: {file_extension}")
        extracted = await handler(file_path, document)
        # Stamp the owning tenant onto whatever the handler produced.
        extracted.update({
            'tenant_id': str(self.tenant.id),
            'tenant_name': self.tenant.name,
        })
        return extracted
    except Exception as e:
        logger.error(f"Error processing document {file_path}: {str(e)}")
        raise
async def _process_pdf(self, file_path: Path, document: Document) -> Dict[str, Any]:
    """Extract text, tables, charts and images from a PDF.

    pdfplumber is used page by page first; afterwards the PyMuPDF, tabula
    and camelot helpers each append their own findings to the same result.
    Page count and file size are stored under ``metadata``.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    result = dict(
        text_content=[],
        tables=[],
        charts=[],
        images=[],
        metadata={},
        structure={},
    )
    # Key in the per-page dict -> key in the aggregated result.
    merge_plan = (
        ('text', 'text_content'),
        ('tables', 'tables'),
        ('charts', 'charts'),
        ('images', 'images'),
    )
    try:
        # Pass 1: pdfplumber for text and simple tables.
        with pdfplumber.open(file_path) as pdf:
            meta = result['metadata']
            meta['pages'] = len(pdf.pages)
            meta['file_size'] = file_path.stat().st_size
            for page_num, page in enumerate(pdf.pages):
                page_result = await self._extract_pdf_page_content(page, page_num)
                for source_key, target_key in merge_plan:
                    result[target_key].extend(page_result[source_key])
        # Pass 2: PyMuPDF graphics, then tabula and camelot tables, in that order.
        await self._extract_pdf_graphics(file_path, result)
        await self._extract_pdf_tables_tabula(file_path, result)
        await self._extract_pdf_tables_camelot(file_path, result)
    except Exception as e:
        logger.error(f"Error processing PDF {file_path}: {str(e)}")
        raise
    return result
async def _extract_pdf_page_content(self, page, page_num: int) -> Dict[str, Any]:
"""Extract content from a single PDF page."""
page_result = {
'text': [],
'tables': [],
'charts': [],
'images': []
}
# Extract text
text = page.extract_text()
if text:
page_result['text'].append({
'page': page_num + 1,
'content': text,
'bbox': page.bbox
})
# Extract tables using pdfplumber
tables = page.extract_tables()
for table_num, table in enumerate(tables):
if table and len(table) > 1: # Ensure table has content
table_data = {
'page': page_num + 1,
'table_number': table_num + 1,
'data': table,
'rows': len(table),
'columns': len(table[0]) if table else 0,
'extraction_method': 'pdfplumber'
}
page_result['tables'].append(table_data)
# Extract images
images = page.images
for img_num, img in enumerate(images):
image_data = {
'page': page_num + 1,
'image_number': img_num + 1,
'bbox': img['bbox'],
'width': img['width'],
'height': img['height'],
'type': img.get('name', 'unknown')
}
page_result['images'].append(image_data)
return page_result
async def _extract_pdf_graphics(self, file_path: Path, result: Dict[str, Any]):
    """Append image and vector-line information found by PyMuPDF to ``result``.

    Best-effort: any exception is logged and swallowed, so ``result`` may be
    left partially extended.

    Args:
        file_path: PDF to open with ``fitz``.
        result: Aggregated result dict; ``result['images']`` and
            ``result['charts']`` are extended in place.
    """
    try:
        doc = fitz.open(file_path)
        try:
            for page_num in range(len(doc)):
                page = doc[page_num]
                page_label = page_num + 1

                # Embedded raster images.
                for img_index, img in enumerate(page.get_images(), start=1):
                    xref = img[0]
                    pix = fitz.Pixmap(doc, xref)
                    if pix.n - pix.alpha < 4:  # GRAY or RGB
                        # Some pixmaps (e.g. stencil masks) have no colorspace.
                        colorspace = pix.colorspace
                        result['images'].append({
                            'page': page_label,
                            'image_number': img_index,
                            'width': pix.width,
                            'height': pix.height,
                            'colorspace': colorspace.name if colorspace is not None else None,
                            'extraction_method': 'PyMuPDF'
                        })

                # Vector drawings. In PyMuPDF's get_drawings() output the path
                # 'type' is a fill/stroke code ('f', 's', 'fs'); line segments
                # are entries in 'items' whose first element is 'l'. The old
                # test on 'type' alone therefore never matched.
                for drawing in page.get_drawings():
                    items = drawing.get('items') or ()
                    has_line = any(item and item[0] == 'l' for item in items)
                    if has_line or drawing.get('type') == 'l':
                        result['charts'].append({
                            'page': page_label,
                            'type': 'chart_element',
                            'bbox': drawing.get('rect'),
                            'extraction_method': 'PyMuPDF'
                        })
        finally:
            # Close the document even when extraction fails part-way.
            doc.close()
    except Exception as e:
        logger.error(f"Error extracting PDF graphics: {str(e)}")
async def _extract_pdf_tables_tabula(self, file_path: Path, result: Dict[str, Any]):
    """Append tables found by tabula-py to ``result['tables']``.

    Best-effort: any exception is logged and swallowed.

    ``tabula.read_pdf(..., multiple_tables=True)`` returns one flat list of
    DataFrames for the whole document. The previous nested loop treated each
    DataFrame as a per-page list, iterated its column labels and failed on
    ``.empty``, so no tabula table was ever recorded.

    Args:
        file_path: PDF to read.
        result: Aggregated result dict, extended in place.
    """
    try:
        tables = tabula.read_pdf(str(file_path), pages='all', multiple_tables=True)
        for table_number, table in enumerate(tables or [], start=1):
            if table is None or table.empty:
                continue
            result['tables'].append({
                # The DataFrame output carries no page number, so the page is
                # reported as unknown rather than guessed.
                'page': None,
                # Position within the whole document, not within a page.
                'table_number': table_number,
                'data': table.to_dict('records'),
                'rows': len(table),
                'columns': len(table.columns),
                'extraction_method': 'tabula'
            })
    except Exception as e:
        logger.error(f"Error extracting tables with tabula: {str(e)}")
async def _extract_pdf_tables_camelot(self, file_path: Path, result: Dict[str, Any]):
    """Append tables found by camelot-py to ``result['tables']``.

    Each non-empty table is recorded with camelot's own page, order,
    accuracy and whitespace values. Best-effort: any exception is logged
    and swallowed.
    """
    try:
        for table in camelot.read_pdf(str(file_path), pages='all'):
            # Skip tables without a usable DataFrame.
            if table.df is None or table.df.empty:
                continue
            frame = table.df
            result['tables'].append(dict(
                page=table.page,
                table_number=table.order,
                data=frame.to_dict('records'),
                rows=len(frame),
                columns=len(frame.columns),
                accuracy=table.accuracy,
                whitespace=table.whitespace,
                extraction_method='camelot',
            ))
    except Exception as e:
        logger.error(f"Error extracting tables with camelot: {str(e)}")
async def _process_powerpoint(self, file_path: Path, document: Document) -> Dict[str, Any]:
    """Extract text, tables, charts and images from a PowerPoint file.

    Slides are processed in order and their findings concatenated. Slide
    count and file size are stored under ``metadata``.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    result = dict(
        text_content=[],
        tables=[],
        charts=[],
        images=[],
        metadata={},
        structure={},
    )
    # Key in the per-slide dict -> key in the aggregated result.
    merge_plan = (
        ('text', 'text_content'),
        ('tables', 'tables'),
        ('charts', 'charts'),
        ('images', 'images'),
    )
    try:
        prs = Presentation(file_path)
        meta = result['metadata']
        meta['slides'] = len(prs.slides)
        meta['file_size'] = file_path.stat().st_size
        for slide_num, slide in enumerate(prs.slides):
            slide_result = await self._extract_powerpoint_slide_content(slide, slide_num)
            for source_key, target_key in merge_plan:
                result[target_key].extend(slide_result[source_key])
    except Exception as e:
        logger.error(f"Error processing PowerPoint {file_path}: {str(e)}")
        raise
    return result
async def _extract_powerpoint_slide_content(self, slide, slide_num: int) -> Dict[str, Any]:
    """Collect text, tables, charts and pictures from one slide.

    Every shape with non-blank text contributes a text entry. Independently
    of that, table, chart and picture shapes are handed to the matching
    helper. ``slide_num`` is zero-based and reported 1-based.
    """
    slide_result = {'text': [], 'tables': [], 'charts': [], 'images': []}
    # Shape type -> (helper coroutine, bucket that receives its output).
    routes = {
        MSO_SHAPE_TYPE.TABLE: (self._extract_powerpoint_table, 'tables'),
        MSO_SHAPE_TYPE.CHART: (self._extract_powerpoint_chart, 'charts'),
        MSO_SHAPE_TYPE.PICTURE: (self._extract_powerpoint_image, 'images'),
    }
    for shape in slide.shapes:
        # Text entry for any shape that exposes non-blank text.
        stripped = shape.text.strip() if hasattr(shape, 'text') else ''
        if stripped:
            slide_result['text'].append({
                'slide': slide_num + 1,
                'content': stripped,
                'shape_type': str(shape.shape_type),
                'bbox': (shape.left, shape.top, shape.width, shape.height)
            })
        # Structured content by shape type; other types are ignored.
        route = routes.get(shape.shape_type)
        if route is not None:
            helper, bucket = route
            slide_result[bucket].append(await helper(shape, slide_num))
    return slide_result
async def _extract_powerpoint_table(self, shape, slide_num: int) -> Dict[str, Any]:
"""Extract table data from PowerPoint shape."""
table = shape.table
table_data = []
for row in table.rows:
row_data = []
for cell in row.cells:
row_data.append(cell.text.strip())
table_data.append(row_data)
return {
'slide': slide_num + 1,
'table_number': 1, # Assuming one table per slide for now
'data': table_data,
'rows': len(table_data),
'columns': len(table_data[0]) if table_data else 0,
'extraction_method': 'python-pptx'
}
async def _extract_powerpoint_chart(self, shape, slide_num: int) -> Dict[str, Any]:
"""Extract chart data from PowerPoint shape."""
chart = shape.chart
chart_data = {
'slide': slide_num + 1,
'chart_type': str(chart.chart_type),
'title': chart.chart_title.text if chart.chart_title else '',
'bbox': (shape.left, shape.top, shape.width, shape.height),
'extraction_method': 'python-pptx'
}
# Extract chart data if available
if hasattr(chart, 'part') and chart.part:
# This would require additional processing to extract actual chart data
chart_data['has_data'] = True
return chart_data
async def _extract_powerpoint_image(self, shape, slide_num: int) -> Dict[str, Any]:
"""Extract image data from PowerPoint shape."""
image = shape.image
image_data = {
'slide': slide_num + 1,
'image_number': 1, # Assuming one image per shape
'width': shape.width,
'height': shape.height,
'bbox': (shape.left, shape.top, shape.width, shape.height),
'extraction_method': 'python-pptx'
}
return image_data
async def _process_excel(self, file_path: Path, document: Document) -> Dict[str, Any]:
    """Extract every non-empty worksheet of an Excel file as one table.

    The workbook is opened once and closed when done; previously the
    ``ExcelFile`` handle was never closed and the file was re-opened for
    each sheet.

    Returns:
        Result dict whose ``tables`` has one entry per non-empty sheet and
        whose ``metadata`` holds the sheet names and the file size.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    result = {
        'text_content': [],
        'tables': [],
        'charts': [],
        'images': [],
        'metadata': {},
        'structure': {}
    }
    try:
        # The context manager releases the underlying file handle.
        with pd.ExcelFile(file_path) as excel_file:
            sheet_names = excel_file.sheet_names
            result['metadata']['sheets'] = sheet_names
            result['metadata']['file_size'] = file_path.stat().st_size
            for sheet_name in sheet_names:
                # Read from the already-open workbook instead of the path.
                df = pd.read_excel(excel_file, sheet_name=sheet_name)
                if df.empty:
                    continue
                result['tables'].append({
                    'sheet': sheet_name,
                    'table_number': 1,
                    'data': df.to_dict('records'),
                    'rows': len(df),
                    'columns': len(df.columns),
                    'extraction_method': 'pandas'
                })
    except Exception as e:
        logger.error(f"Error processing Excel {file_path}: {str(e)}")
        raise
    return result
async def _process_word(self, file_path: Path, document: Document) -> Dict[str, Any]:
    """Placeholder for Word (.docx) processing; extracts nothing yet.

    Returns an empty result in the same shape as the other processors. A
    warning is logged so an empty result for a .docx file is not mistaken
    for a document without content.
    """
    # TODO: Implement Word document processing
    logger.warning(
        f"Word document processing is not implemented; no content extracted from {file_path}"
    )
    return {
        'text_content': [],
        'tables': [],
        'charts': [],
        'images': [],
        'metadata': {},
        'structure': {}
    }
async def _process_text(self, file_path: Path, document: Document) -> Dict[str, Any]:
    """Read a text file and return its content as a single page.

    The file is decoded as UTF-8. ``utf-8-sig`` is used so that a leading
    byte-order mark, if present, is dropped instead of appearing as
    ``\\ufeff`` at the start of the content; files without a BOM decode
    exactly as before.

    Raises:
        Exception: read or decode failures are logged and re-raised.
    """
    try:
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            content = f.read()
        return {
            'text_content': [{'content': content, 'page': 1}],
            'tables': [],
            'charts': [],
            'images': [],
            'metadata': {'file_size': file_path.stat().st_size},
            'structure': {}
        }
    except Exception as e:
        logger.error(f"Error processing text file {file_path}: {str(e)}")
        raise
def analyze_table_structure(self, table_data: List[List[str]]) -> Dict[str, Any]:
    """Summarise a table given as a list of rows, the first being the header.

    Returns an empty dict when there are fewer than two rows. Otherwise the
    result holds the header row, the number of data rows, the column count
    (taken from the header), one inferred type per column, and flags for
    whether any column was inferred as numeric or as date. Rows shorter
    than the header simply contribute no value to the missing columns.
    """
    if not table_data or len(table_data) < 2:
        return {}
    header, *body = table_data
    # One inferred type per header column, in column order.
    column_types = [
        self._infer_column_type(
            [row[position] for row in body if len(row) > position]
        )
        for position in range(len(header))
    ]
    return {
        'header_row': header,
        'data_rows': len(body),
        'columns': len(header),
        'column_types': column_types,
        'has_numeric_data': 'numeric' in column_types,
        'has_date_data': 'date' in column_types,
    }
def _infer_column_type(self, values: List[str]) -> str:
"""Infer the data type of a column."""
if not values:
return 'text'
numeric_count = 0
date_count = 0
for value in values:
if value and value.strip():
# Check if numeric
try:
float(value.replace(',', '').replace('$', '').replace('%', ''))
numeric_count += 1
except ValueError:
pass
# Check if date (basic check)
if any(separator in value for separator in ['/', '-', '.']):
date_count += 1
total = len([v for v in values if v and v.strip()])
if total == 0:
return 'text'
numeric_ratio = numeric_count / total
date_ratio = date_count / total
if numeric_ratio > 0.8:
return 'numeric'
elif date_ratio > 0.8:
return 'date'
else:
return 'text'