Files
virtual_board_member/app/api/v1/endpoints/vector_operations.py
2025-08-08 17:17:56 -04:00

376 lines
12 KiB
Python

"""
Vector database operations endpoints for the Virtual Board Member AI System.
Implements Week 3 functionality for vector search, indexing, and performance monitoring.
"""
import logging
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from app.core.auth import get_current_user
from app.models.user import User
from app.models.tenant import Tenant
from app.services.vector_service import vector_service
from app.services.document_chunking import DocumentChunkingService
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(name=__name__)
# Router that every vector-operation endpoint below registers itself on;
# presumably included into the application elsewhere.
router = APIRouter()
class SearchRequest(BaseModel):
    """Request model for vector search operations."""
    # Free-text query forwarded to vector_service.search_similar.
    query: str
    # Maximum number of results requested; no bounds are enforced here.
    limit: int = 10
    # Forwarded as score_threshold; presumably a minimum similarity score
    # for returned hits — confirm against vector_service.
    score_threshold: float = 0.7
    # Optional chunk-type restriction; None presumably means "no restriction".
    chunk_types: Optional[List[str]] = None
    # Optional extra filters, forwarded verbatim to the vector service.
    filters: Optional[Dict[str, Any]] = None
class StructuredDataSearchRequest(BaseModel):
    """Request model for structured data search."""
    # Free-text query forwarded to vector_service.search_structured_data.
    query: str
    # Intended values: "table" or "chart". Typed as plain str, so other
    # values are not rejected at this layer.
    data_type: str = "table" # "table" or "chart"
    # Maximum number of results requested; no bounds are enforced here.
    limit: int = 10
    # Forwarded as score_threshold; presumably a minimum similarity score.
    score_threshold: float = 0.7
    # Optional extra filters, forwarded verbatim to the vector service.
    filters: Optional[Dict[str, Any]] = None
class HybridSearchRequest(BaseModel):
    """Request model for hybrid search operations."""
    # Free-text query forwarded to vector_service.hybrid_search.
    query: str
    # Maximum number of results requested; no bounds are enforced here.
    limit: int = 10
    # Forwarded as score_threshold; presumably a minimum combined score.
    score_threshold: float = 0.7
    # Relative weights of the semantic and keyword components, forwarded
    # as-is; nothing here checks that they are non-negative or sum to 1.
    semantic_weight: float = 0.7
    keyword_weight: float = 0.3
    # Optional extra filters, forwarded verbatim to the vector service.
    filters: Optional[Dict[str, Any]] = None
class DocumentChunkingRequest(BaseModel):
    """Request model for document chunking operations."""
    # Identifier of the document being chunked; echoed back in the response.
    document_id: str
    # Document content passed to DocumentChunkingService.chunk_document_content;
    # its expected key structure is defined by that service (not visible here).
    content: Dict[str, Any]
class SearchResponse(BaseModel):
    """Response model for search operations."""
    # Hits exactly as returned by the vector service.
    results: List[Dict[str, Any]]
    # Always len(results) as populated by the search endpoints.
    total_results: int
    # Echo of the query text that was searched.
    query: str
    # "semantic", "hybrid", or "structured_<data_type>" depending on endpoint.
    search_type: str
    # Time spent in the vector-service call, in milliseconds, rounded to 2 dp.
    execution_time_ms: float
class PerformanceMetricsResponse(BaseModel):
    """Response model for performance metrics."""
    # Populated by unpacking the dict from vector_service.get_performance_metrics,
    # so these field names must match that dict's keys.
    tenant_id: str
    # Timestamp string; its format is decided by the vector service.
    timestamp: str
    # Per-collection metrics; structure defined by the vector service.
    collections: Dict[str, Any]
    embedding_model: str
    embedding_dimension: int
class BenchmarkResponse(BaseModel):
    """Response model for performance benchmarks."""
    # Populated by unpacking the dict from
    # vector_service.create_performance_benchmarks.
    tenant_id: str
    # Timestamp string; its format is decided by the vector service.
    timestamp: str
    # Benchmark results; structure defined by the vector service.
    results: Dict[str, Any]
@router.post("/search", response_model=SearchResponse)
async def search_documents(
    request: SearchRequest,
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Search documents using semantic similarity.

    Forwards the request parameters to the vector service and returns the
    hits with the time spent in the service call. Responds with HTTP 500 if
    the search fails.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so `tenant.id` is presumably the user's id, not a
    # tenant id — confirm and swap in a real tenant dependency.
    import time

    try:
        # perf_counter is monotonic; time.time() follows the wall clock and
        # can yield skewed or negative durations when the clock is adjusted.
        start_time = time.perf_counter()
        results = await vector_service.search_similar(
            tenant_id=str(tenant.id),
            query=request.query,
            limit=request.limit,
            score_threshold=request.score_threshold,
            chunk_types=request.chunk_types,
            filters=request.filters,
        )
        execution_time = (time.perf_counter() - start_time) * 1000
        return SearchResponse(
            results=results,
            total_results=len(results),
            query=request.query,
            search_type="semantic",
            execution_time_ms=round(execution_time, 2),
        )
    except Exception as e:
        # logger.exception keeps the traceback that logger.error discarded.
        logger.exception("Search failed: %s", e)
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") from e
@router.post("/search/structured", response_model=SearchResponse)
async def search_structured_data(
    request: StructuredDataSearchRequest,
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Search specifically for structured data (tables and charts).

    The response's ``search_type`` is ``structured_<data_type>``. Responds
    with HTTP 500 if the search fails.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so `tenant.id` is presumably the user's id, not a
    # tenant id — confirm and swap in a real tenant dependency.
    import time

    try:
        # Monotonic clock: immune to wall-clock adjustments mid-request.
        start_time = time.perf_counter()
        results = await vector_service.search_structured_data(
            tenant_id=str(tenant.id),
            query=request.query,
            data_type=request.data_type,
            limit=request.limit,
            score_threshold=request.score_threshold,
            filters=request.filters,
        )
        execution_time = (time.perf_counter() - start_time) * 1000
        return SearchResponse(
            results=results,
            total_results=len(results),
            query=request.query,
            search_type=f"structured_{request.data_type}",
            execution_time_ms=round(execution_time, 2),
        )
    except Exception as e:
        # logger.exception keeps the traceback that logger.error discarded.
        logger.exception("Structured data search failed: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Structured data search failed: {str(e)}"
        ) from e
@router.post("/search/hybrid", response_model=SearchResponse)
async def hybrid_search(
    request: HybridSearchRequest,
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Perform hybrid search combining semantic and keyword matching.

    The semantic/keyword weights from the request are passed through to the
    vector service unchanged. Responds with HTTP 500 if the search fails.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so `tenant.id` is presumably the user's id, not a
    # tenant id — confirm and swap in a real tenant dependency.
    import time

    try:
        # Monotonic clock: immune to wall-clock adjustments mid-request.
        start_time = time.perf_counter()
        results = await vector_service.hybrid_search(
            tenant_id=str(tenant.id),
            query=request.query,
            limit=request.limit,
            score_threshold=request.score_threshold,
            filters=request.filters,
            semantic_weight=request.semantic_weight,
            keyword_weight=request.keyword_weight,
        )
        execution_time = (time.perf_counter() - start_time) * 1000
        return SearchResponse(
            results=results,
            total_results=len(results),
            query=request.query,
            search_type="hybrid",
            execution_time_ms=round(execution_time, 2),
        )
    except Exception as e:
        # logger.exception keeps the traceback that logger.error discarded.
        logger.exception("Hybrid search failed: %s", e)
        raise HTTPException(status_code=500, detail=f"Hybrid search failed: {str(e)}") from e
@router.post("/chunk-document")
async def chunk_document(
    request: DocumentChunkingRequest,
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Chunk a document for vector indexing.

    Splits the submitted content with DocumentChunkingService and returns the
    chunks alongside the service's statistics for them. Any failure is logged
    and reported as HTTP 500.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so the chunking service presumably receives the user
    # object rather than a tenant — confirm.
    try:
        chunker = DocumentChunkingService(tenant)
        produced_chunks = await chunker.chunk_document_content(
            document_id=request.document_id,
            content=request.content,
        )
        chunk_stats = await chunker.get_chunk_statistics(produced_chunks)
    except Exception as exc:
        failure_message = f"Document chunking failed: {str(exc)}"
        logger.error(failure_message)
        raise HTTPException(status_code=500, detail=failure_message)

    return {
        "document_id": request.document_id,
        "chunks": produced_chunks,
        "statistics": chunk_stats,
        "status": "success",
    }
@router.post("/index-document")
async def index_document(
    document_id: str,
    chunks: Dict[str, List[Dict[str, Any]]],
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Index document chunks in the vector database.

    Responds with HTTP 500 if the vector service reports failure or raises.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so `tenant.id` is presumably the user's id, not a
    # tenant id — confirm and swap in a real tenant dependency.
    try:
        success = await vector_service.add_document_vectors(
            tenant_id=str(tenant.id),
            document_id=document_id,
            chunks=chunks,
        )
        if not success:
            logger.error("Document indexing failed: vector service reported failure for %s", document_id)
            raise HTTPException(status_code=500, detail="Failed to index document")
        return {
            "document_id": document_id,
            "status": "indexed",
            "message": "Document successfully indexed in vector database",
        }
    except HTTPException:
        # Deliberate HTTP errors propagate unchanged; previously the generic
        # handler below re-wrapped them into "...failed: 500: ..." details.
        raise
    except Exception as e:
        logger.exception("Document indexing failed: %s", e)
        raise HTTPException(status_code=500, detail=f"Document indexing failed: {str(e)}") from e
@router.get("/collections/stats")
async def get_collection_statistics(
    collection_type: str = Query("documents", description="Type of collection"),
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Get statistics for a specific collection.

    Responds with HTTP 404 when the vector service returns no statistics for
    the collection, and HTTP 500 when the lookup itself fails.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so `tenant.id` is presumably the user's id, not a
    # tenant id — confirm and swap in a real tenant dependency.
    try:
        stats = await vector_service.get_collection_stats(
            tenant_id=str(tenant.id),
            collection_type=collection_type,
        )
        if stats:
            return stats
        raise HTTPException(status_code=404, detail="Collection not found")
    except HTTPException:
        # Without this, the 404 above was caught below and turned into a 500.
        raise
    except Exception as e:
        logger.exception("Failed to get collection stats: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to get collection stats: {str(e)}"
        ) from e
@router.get("/performance/metrics", response_model=PerformanceMetricsResponse)
async def get_performance_metrics(
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Get performance metrics for vector database operations.

    If the vector service returns a dict containing an ``error`` key, that
    message is returned as the HTTP 500 detail.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so `tenant.id` is presumably the user's id, not a
    # tenant id — confirm and swap in a real tenant dependency.
    try:
        metrics = await vector_service.get_performance_metrics(str(tenant.id))
        if "error" in metrics:
            logger.error("Failed to get performance metrics: %s", metrics["error"])
            raise HTTPException(status_code=500, detail=metrics["error"])
        return PerformanceMetricsResponse(**metrics)
    except HTTPException:
        # Keep the service's own error message instead of re-wrapping it.
        raise
    except Exception as e:
        logger.exception("Failed to get performance metrics: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to get performance metrics: {str(e)}"
        ) from e
@router.post("/performance/benchmarks", response_model=BenchmarkResponse)
async def create_performance_benchmarks(
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Create performance benchmarks for vector operations.

    If the vector service returns a dict containing an ``error`` key, that
    message is returned as the HTTP 500 detail.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so `tenant.id` is presumably the user's id, not a
    # tenant id — confirm and swap in a real tenant dependency.
    try:
        benchmarks = await vector_service.create_performance_benchmarks(str(tenant.id))
        if "error" in benchmarks:
            logger.error("Failed to create performance benchmarks: %s", benchmarks["error"])
            raise HTTPException(status_code=500, detail=benchmarks["error"])
        return BenchmarkResponse(**benchmarks)
    except HTTPException:
        # Keep the service's own error message instead of re-wrapping it.
        raise
    except Exception as e:
        logger.exception("Failed to create performance benchmarks: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to create performance benchmarks: {str(e)}"
        ) from e
@router.post("/optimize")
async def optimize_collections(
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Optimize vector database collections for performance.

    If the vector service returns a dict containing an ``error`` key, that
    message is returned as the HTTP 500 detail.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so `tenant.id` is presumably the user's id, not a
    # tenant id — confirm and swap in a real tenant dependency.
    try:
        optimization_results = await vector_service.optimize_collections(str(tenant.id))
        if "error" in optimization_results:
            logger.error("Collection optimization failed: %s", optimization_results["error"])
            raise HTTPException(status_code=500, detail=optimization_results["error"])
        return {
            "tenant_id": str(tenant.id),
            "optimization_results": optimization_results,
            "status": "optimization_completed",
        }
    except HTTPException:
        # Keep the service's own error message instead of re-wrapping it.
        raise
    except Exception as e:
        logger.exception("Collection optimization failed: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Collection optimization failed: {str(e)}"
        ) from e
@router.delete("/documents/{document_id}")
async def delete_document_vectors(
    document_id: str,
    collection_type: str = Query("documents", description="Type of collection"),
    current_user: User = Depends(get_current_user),
    tenant: Tenant = Depends(get_current_user),
):
    """Delete all vectors for a specific document.

    Responds with HTTP 500 if the vector service reports failure or raises.
    """
    # NOTE(review): `tenant` uses the same get_current_user dependency as
    # `current_user`, so `tenant.id` is presumably the user's id, not a
    # tenant id — confirm and swap in a real tenant dependency.
    try:
        success = await vector_service.delete_document_vectors(
            tenant_id=str(tenant.id),
            document_id=document_id,
            collection_type=collection_type,
        )
        if not success:
            logger.error("Failed to delete document vectors: vector service reported failure for %s", document_id)
            raise HTTPException(status_code=500, detail="Failed to delete document vectors")
        return {
            "document_id": document_id,
            "status": "deleted",
            "message": "Document vectors successfully deleted",
        }
    except HTTPException:
        # Deliberate HTTP errors propagate unchanged rather than being
        # re-wrapped into a "...: 500: ..." detail by the handler below.
        raise
    except Exception as e:
        logger.exception("Failed to delete document vectors: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to delete document vectors: {str(e)}"
        ) from e
@router.get("/health")
async def vector_service_health():
    """Check the health of the vector service.

    Returns a small status payload when the service reports healthy and
    responds with HTTP 503 otherwise (including when the check itself fails).
    """
    try:
        is_healthy = await vector_service.health_check()
        if not is_healthy:
            logger.error("Vector service health check failed: service reported unhealthy")
            raise HTTPException(status_code=503, detail="Vector service is unhealthy")
        return {
            "status": "healthy",
            "service": "vector_database",
            # Fall back to a fixed label when no embedding model object is set.
            "embedding_model": (
                vector_service.embedding_model.__class__.__name__
                if vector_service.embedding_model
                else "Voyage-3-large API"
            ),
        }
    except HTTPException:
        # Keep the clean "unhealthy" 503 instead of re-wrapping it below.
        raise
    except Exception as e:
        logger.exception("Vector service health check failed: %s", e)
        raise HTTPException(
            status_code=503, detail=f"Vector service health check failed: {str(e)}"
        ) from e