"""
|
|
Advanced document processing service with table and graphics extraction capabilities.
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from pathlib import Path
|
|
import io
|
|
|
|
import pdfplumber
|
|
import fitz # PyMuPDF
|
|
import pandas as pd
|
|
import numpy as np
|
|
from PIL import Image
|
|
import cv2
|
|
import pytesseract
|
|
from pptx import Presentation
|
|
from pptx.enum.shapes import MSO_SHAPE_TYPE
|
|
import tabula
|
|
import camelot
|
|
|
|
from app.core.config import settings
|
|
from app.models.document import Document, DocumentType
|
|
from app.models.tenant import Tenant
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DocumentProcessor:
    """Advanced document processor with table and graphics extraction."""

    def __init__(self, tenant: "Tenant"):
        """Bind this processor to a tenant and register per-format handlers.

        Args:
            tenant: Tenant the processed documents belong to; its ``id`` and
                ``name`` are stamped onto every processing result.
        """
        self.tenant = tenant
        # Dispatch table: lowercase file extension (with dot) -> async handler
        # coroutine taking (file_path, document).
        self.supported_formats = {
            '.pdf': self._process_pdf,
            '.pptx': self._process_powerpoint,
            '.xlsx': self._process_excel,
            '.docx': self._process_word,
            '.txt': self._process_text,
        }
|
|
|
|
async def process_document(self, file_path: Path, document: Document) -> Dict[str, Any]:
|
|
"""Process a document and extract all content including tables and graphics."""
|
|
try:
|
|
file_extension = file_path.suffix.lower()
|
|
|
|
if file_extension not in self.supported_formats:
|
|
raise ValueError(f"Unsupported file format: {file_extension}")
|
|
|
|
processor = self.supported_formats[file_extension]
|
|
result = await processor(file_path, document)
|
|
|
|
# Add tenant-specific processing
|
|
result['tenant_id'] = str(self.tenant.id)
|
|
result['tenant_name'] = self.tenant.name
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing document {file_path}: {str(e)}")
|
|
raise
|
|
|
|
async def _process_pdf(self, file_path: Path, document: Document) -> Dict[str, Any]:
|
|
"""Process PDF with advanced table and graphics extraction."""
|
|
result = {
|
|
'text_content': [],
|
|
'tables': [],
|
|
'charts': [],
|
|
'images': [],
|
|
'metadata': {},
|
|
'structure': {}
|
|
}
|
|
|
|
try:
|
|
# Use pdfplumber for text and table extraction
|
|
with pdfplumber.open(file_path) as pdf:
|
|
result['metadata']['pages'] = len(pdf.pages)
|
|
result['metadata']['file_size'] = file_path.stat().st_size
|
|
|
|
for page_num, page in enumerate(pdf.pages):
|
|
page_result = await self._extract_pdf_page_content(page, page_num)
|
|
result['text_content'].extend(page_result['text'])
|
|
result['tables'].extend(page_result['tables'])
|
|
result['charts'].extend(page_result['charts'])
|
|
result['images'].extend(page_result['images'])
|
|
|
|
# Use PyMuPDF for additional graphics extraction
|
|
await self._extract_pdf_graphics(file_path, result)
|
|
|
|
# Use tabula for complex table extraction
|
|
await self._extract_pdf_tables_tabula(file_path, result)
|
|
|
|
# Use camelot for lattice table extraction
|
|
await self._extract_pdf_tables_camelot(file_path, result)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing PDF {file_path}: {str(e)}")
|
|
raise
|
|
|
|
return result
|
|
|
|
async def _extract_pdf_page_content(self, page, page_num: int) -> Dict[str, Any]:
|
|
"""Extract content from a single PDF page."""
|
|
page_result = {
|
|
'text': [],
|
|
'tables': [],
|
|
'charts': [],
|
|
'images': []
|
|
}
|
|
|
|
# Extract text
|
|
text = page.extract_text()
|
|
if text:
|
|
page_result['text'].append({
|
|
'page': page_num + 1,
|
|
'content': text,
|
|
'bbox': page.bbox
|
|
})
|
|
|
|
# Extract tables using pdfplumber
|
|
tables = page.extract_tables()
|
|
for table_num, table in enumerate(tables):
|
|
if table and len(table) > 1: # Ensure table has content
|
|
table_data = {
|
|
'page': page_num + 1,
|
|
'table_number': table_num + 1,
|
|
'data': table,
|
|
'rows': len(table),
|
|
'columns': len(table[0]) if table else 0,
|
|
'extraction_method': 'pdfplumber'
|
|
}
|
|
page_result['tables'].append(table_data)
|
|
|
|
# Extract images
|
|
images = page.images
|
|
for img_num, img in enumerate(images):
|
|
image_data = {
|
|
'page': page_num + 1,
|
|
'image_number': img_num + 1,
|
|
'bbox': img['bbox'],
|
|
'width': img['width'],
|
|
'height': img['height'],
|
|
'type': img.get('name', 'unknown')
|
|
}
|
|
page_result['images'].append(image_data)
|
|
|
|
return page_result
|
|
|
|
async def _extract_pdf_graphics(self, file_path: Path, result: Dict[str, Any]):
|
|
"""Extract graphics and charts from PDF using PyMuPDF."""
|
|
try:
|
|
doc = fitz.open(file_path)
|
|
|
|
for page_num in range(len(doc)):
|
|
page = doc[page_num]
|
|
|
|
# Extract images
|
|
image_list = page.get_images()
|
|
for img_index, img in enumerate(image_list):
|
|
xref = img[0]
|
|
pix = fitz.Pixmap(doc, xref)
|
|
|
|
if pix.n - pix.alpha < 4: # GRAY or RGB
|
|
image_data = {
|
|
'page': page_num + 1,
|
|
'image_number': img_index + 1,
|
|
'width': pix.width,
|
|
'height': pix.height,
|
|
'colorspace': pix.colorspace.name,
|
|
'extraction_method': 'PyMuPDF'
|
|
}
|
|
result['images'].append(image_data)
|
|
|
|
# Extract drawings and shapes
|
|
drawings = page.get_drawings()
|
|
for drawing in drawings:
|
|
if drawing.get('type') == 'l': # Line
|
|
chart_data = {
|
|
'page': page_num + 1,
|
|
'type': 'chart_element',
|
|
'bbox': drawing.get('rect'),
|
|
'extraction_method': 'PyMuPDF'
|
|
}
|
|
result['charts'].append(chart_data)
|
|
|
|
doc.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting PDF graphics: {str(e)}")
|
|
|
|
async def _extract_pdf_tables_tabula(self, file_path: Path, result: Dict[str, Any]):
|
|
"""Extract tables using tabula-py."""
|
|
try:
|
|
tables = tabula.read_pdf(str(file_path), pages='all', multiple_tables=True)
|
|
|
|
for page_num, page_tables in enumerate(tables):
|
|
for table_num, table in enumerate(page_tables):
|
|
if not table.empty:
|
|
table_data = {
|
|
'page': page_num + 1,
|
|
'table_number': table_num + 1,
|
|
'data': table.to_dict('records'),
|
|
'rows': len(table),
|
|
'columns': len(table.columns),
|
|
'extraction_method': 'tabula'
|
|
}
|
|
result['tables'].append(table_data)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting tables with tabula: {str(e)}")
|
|
|
|
async def _extract_pdf_tables_camelot(self, file_path: Path, result: Dict[str, Any]):
|
|
"""Extract tables using camelot-py."""
|
|
try:
|
|
tables = camelot.read_pdf(str(file_path), pages='all')
|
|
|
|
for table in tables:
|
|
if table.df is not None and not table.df.empty:
|
|
table_data = {
|
|
'page': table.page,
|
|
'table_number': table.order,
|
|
'data': table.df.to_dict('records'),
|
|
'rows': len(table.df),
|
|
'columns': len(table.df.columns),
|
|
'accuracy': table.accuracy,
|
|
'whitespace': table.whitespace,
|
|
'extraction_method': 'camelot'
|
|
}
|
|
result['tables'].append(table_data)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error extracting tables with camelot: {str(e)}")
|
|
|
|
async def _process_powerpoint(self, file_path: Path, document: Document) -> Dict[str, Any]:
|
|
"""Process PowerPoint with table and graphics extraction."""
|
|
result = {
|
|
'text_content': [],
|
|
'tables': [],
|
|
'charts': [],
|
|
'images': [],
|
|
'metadata': {},
|
|
'structure': {}
|
|
}
|
|
|
|
try:
|
|
prs = Presentation(file_path)
|
|
result['metadata']['slides'] = len(prs.slides)
|
|
result['metadata']['file_size'] = file_path.stat().st_size
|
|
|
|
for slide_num, slide in enumerate(prs.slides):
|
|
slide_result = await self._extract_powerpoint_slide_content(slide, slide_num)
|
|
result['text_content'].extend(slide_result['text'])
|
|
result['tables'].extend(slide_result['tables'])
|
|
result['charts'].extend(slide_result['charts'])
|
|
result['images'].extend(slide_result['images'])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing PowerPoint {file_path}: {str(e)}")
|
|
raise
|
|
|
|
return result
|
|
|
|
async def _extract_powerpoint_slide_content(self, slide, slide_num: int) -> Dict[str, Any]:
|
|
"""Extract content from a single PowerPoint slide."""
|
|
slide_result = {
|
|
'text': [],
|
|
'tables': [],
|
|
'charts': [],
|
|
'images': []
|
|
}
|
|
|
|
for shape in slide.shapes:
|
|
# Extract text
|
|
if hasattr(shape, 'text') and shape.text.strip():
|
|
text_data = {
|
|
'slide': slide_num + 1,
|
|
'content': shape.text.strip(),
|
|
'shape_type': str(shape.shape_type),
|
|
'bbox': (shape.left, shape.top, shape.width, shape.height)
|
|
}
|
|
slide_result['text'].append(text_data)
|
|
|
|
# Extract tables
|
|
if shape.shape_type == MSO_SHAPE_TYPE.TABLE:
|
|
table_data = await self._extract_powerpoint_table(shape, slide_num)
|
|
slide_result['tables'].append(table_data)
|
|
|
|
# Extract charts
|
|
elif shape.shape_type == MSO_SHAPE_TYPE.CHART:
|
|
chart_data = await self._extract_powerpoint_chart(shape, slide_num)
|
|
slide_result['charts'].append(chart_data)
|
|
|
|
# Extract images
|
|
elif shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
|
|
image_data = await self._extract_powerpoint_image(shape, slide_num)
|
|
slide_result['images'].append(image_data)
|
|
|
|
return slide_result
|
|
|
|
async def _extract_powerpoint_table(self, shape, slide_num: int) -> Dict[str, Any]:
|
|
"""Extract table data from PowerPoint shape."""
|
|
table = shape.table
|
|
table_data = []
|
|
|
|
for row in table.rows:
|
|
row_data = []
|
|
for cell in row.cells:
|
|
row_data.append(cell.text.strip())
|
|
table_data.append(row_data)
|
|
|
|
return {
|
|
'slide': slide_num + 1,
|
|
'table_number': 1, # Assuming one table per slide for now
|
|
'data': table_data,
|
|
'rows': len(table_data),
|
|
'columns': len(table_data[0]) if table_data else 0,
|
|
'extraction_method': 'python-pptx'
|
|
}
|
|
|
|
async def _extract_powerpoint_chart(self, shape, slide_num: int) -> Dict[str, Any]:
|
|
"""Extract chart data from PowerPoint shape."""
|
|
chart = shape.chart
|
|
|
|
chart_data = {
|
|
'slide': slide_num + 1,
|
|
'chart_type': str(chart.chart_type),
|
|
'title': chart.chart_title.text if chart.chart_title else '',
|
|
'bbox': (shape.left, shape.top, shape.width, shape.height),
|
|
'extraction_method': 'python-pptx'
|
|
}
|
|
|
|
# Extract chart data if available
|
|
if hasattr(chart, 'part') and chart.part:
|
|
# This would require additional processing to extract actual chart data
|
|
chart_data['has_data'] = True
|
|
|
|
return chart_data
|
|
|
|
async def _extract_powerpoint_image(self, shape, slide_num: int) -> Dict[str, Any]:
|
|
"""Extract image data from PowerPoint shape."""
|
|
image = shape.image
|
|
|
|
image_data = {
|
|
'slide': slide_num + 1,
|
|
'image_number': 1, # Assuming one image per shape
|
|
'width': shape.width,
|
|
'height': shape.height,
|
|
'bbox': (shape.left, shape.top, shape.width, shape.height),
|
|
'extraction_method': 'python-pptx'
|
|
}
|
|
|
|
return image_data
|
|
|
|
async def _process_excel(self, file_path: Path, document: Document) -> Dict[str, Any]:
|
|
"""Process Excel file with table extraction."""
|
|
result = {
|
|
'text_content': [],
|
|
'tables': [],
|
|
'charts': [],
|
|
'images': [],
|
|
'metadata': {},
|
|
'structure': {}
|
|
}
|
|
|
|
try:
|
|
# Read all sheets
|
|
excel_file = pd.ExcelFile(file_path)
|
|
result['metadata']['sheets'] = excel_file.sheet_names
|
|
result['metadata']['file_size'] = file_path.stat().st_size
|
|
|
|
for sheet_name in excel_file.sheet_names:
|
|
df = pd.read_excel(file_path, sheet_name=sheet_name)
|
|
|
|
if not df.empty:
|
|
table_data = {
|
|
'sheet': sheet_name,
|
|
'table_number': 1,
|
|
'data': df.to_dict('records'),
|
|
'rows': len(df),
|
|
'columns': len(df.columns),
|
|
'extraction_method': 'pandas'
|
|
}
|
|
result['tables'].append(table_data)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing Excel {file_path}: {str(e)}")
|
|
raise
|
|
|
|
return result
|
|
|
|
async def _process_word(self, file_path: Path, document: Document) -> Dict[str, Any]:
|
|
"""Process Word document."""
|
|
# TODO: Implement Word document processing
|
|
return {
|
|
'text_content': [],
|
|
'tables': [],
|
|
'charts': [],
|
|
'images': [],
|
|
'metadata': {},
|
|
'structure': {}
|
|
}
|
|
|
|
async def _process_text(self, file_path: Path, document: Document) -> Dict[str, Any]:
|
|
"""Process text file."""
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
return {
|
|
'text_content': [{'content': content, 'page': 1}],
|
|
'tables': [],
|
|
'charts': [],
|
|
'images': [],
|
|
'metadata': {'file_size': file_path.stat().st_size},
|
|
'structure': {}
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error processing text file {file_path}: {str(e)}")
|
|
raise
|
|
|
|
def analyze_table_structure(self, table_data: List[List[str]]) -> Dict[str, Any]:
|
|
"""Analyze table structure and extract metadata."""
|
|
if not table_data or len(table_data) < 2:
|
|
return {}
|
|
|
|
analysis = {
|
|
'header_row': table_data[0] if table_data else [],
|
|
'data_rows': len(table_data) - 1,
|
|
'columns': len(table_data[0]) if table_data else 0,
|
|
'column_types': [],
|
|
'has_numeric_data': False,
|
|
'has_date_data': False
|
|
}
|
|
|
|
# Analyze column types
|
|
if len(table_data) > 1:
|
|
for col_idx in range(len(table_data[0])):
|
|
col_values = [row[col_idx] for row in table_data[1:] if len(row) > col_idx]
|
|
col_type = self._infer_column_type(col_values)
|
|
analysis['column_types'].append(col_type)
|
|
|
|
if col_type == 'numeric':
|
|
analysis['has_numeric_data'] = True
|
|
elif col_type == 'date':
|
|
analysis['has_date_data'] = True
|
|
|
|
return analysis
|
|
|
|
def _infer_column_type(self, values: List[str]) -> str:
|
|
"""Infer the data type of a column."""
|
|
if not values:
|
|
return 'text'
|
|
|
|
numeric_count = 0
|
|
date_count = 0
|
|
|
|
for value in values:
|
|
if value and value.strip():
|
|
# Check if numeric
|
|
try:
|
|
float(value.replace(',', '').replace('$', '').replace('%', ''))
|
|
numeric_count += 1
|
|
except ValueError:
|
|
pass
|
|
|
|
# Check if date (basic check)
|
|
if any(separator in value for separator in ['/', '-', '.']):
|
|
date_count += 1
|
|
|
|
total = len([v for v in values if v and v.strip()])
|
|
if total == 0:
|
|
return 'text'
|
|
|
|
numeric_ratio = numeric_count / total
|
|
date_ratio = date_count / total
|
|
|
|
if numeric_ratio > 0.8:
|
|
return 'numeric'
|
|
elif date_ratio > 0.8:
|
|
return 'date'
|
|
else:
|
|
return 'text'
|