376 lines
12 KiB
Python
376 lines
12 KiB
Python
"""
|
|
Vector database operations endpoints for the Virtual Board Member AI System.
|
|
Implements Week 3 functionality for vector search, indexing, and performance monitoring.
|
|
"""
|
|
|
|
import logging
import time
from typing import List, Dict, Any, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel

from app.core.auth import get_current_user
from app.models.user import User
from app.models.tenant import Tenant
from app.services.vector_service import vector_service
from app.services.document_chunking import DocumentChunkingService
|
|
|
|
# Router that the endpoint functions below register themselves on; it is
# presumably included into the application elsewhere (confirm mount prefix).
router = APIRouter()

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SearchRequest(BaseModel):
    """Request body for the semantic ``/search`` endpoint.

    Every field is forwarded unchanged to ``vector_service.search_similar``.
    No validation beyond the declared types is applied here.
    """

    # Query text to search for.
    query: str
    # Maximum number of results to request from the vector service.
    limit: int = 10
    # Score cut-off forwarded to the service; presumably a minimum similarity
    # score. Its exact semantics are defined by vector_service, so confirm there.
    score_threshold: float = 0.7
    # Optional list of chunk types to restrict the search to (forwarded as-is).
    chunk_types: Optional[List[str]] = None
    # Optional metadata filters; the expected schema is defined by vector_service.
    filters: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class StructuredDataSearchRequest(BaseModel):
    """Request body for the ``/search/structured`` endpoint (tables and charts).

    Fields are forwarded to ``vector_service.search_structured_data``.
    ``data_type`` is also echoed in the response as ``structured_<data_type>``.
    """

    # Query text to search for.
    query: str
    data_type: str = "table"  # "table" or "chart" (not validated here; any string is accepted)
    # Maximum number of results to request from the vector service.
    limit: int = 10
    # Score cut-off forwarded to the service; semantics defined by vector_service.
    score_threshold: float = 0.7
    # Optional metadata filters; the expected schema is defined by vector_service.
    filters: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class HybridSearchRequest(BaseModel):
    """Request body for the ``/search/hybrid`` endpoint.

    All fields are forwarded to ``vector_service.hybrid_search``. The two
    weights are not validated here: they are not required to be non-negative
    or to sum to 1.
    """

    # Query text to search for.
    query: str
    # Maximum number of results to request from the vector service.
    limit: int = 10
    # Score cut-off forwarded to the service; semantics defined by vector_service.
    score_threshold: float = 0.7
    # Weight given to the semantic (vector) component of the hybrid score.
    semantic_weight: float = 0.7
    # Weight given to the keyword component of the hybrid score.
    keyword_weight: float = 0.3
    # Optional metadata filters; the expected schema is defined by vector_service.
    filters: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class DocumentChunkingRequest(BaseModel):
    """Request body for the ``/chunk-document`` endpoint."""

    # Identifier of the document being chunked, echoed back in the response.
    document_id: str
    # Raw document content passed to DocumentChunkingService.chunk_document_content;
    # the expected keys are defined by that service (not visible here).
    content: Dict[str, Any]
|
|
|
|
|
|
class SearchResponse(BaseModel):
    """Response body shared by the semantic, structured and hybrid search endpoints."""

    # Result records exactly as returned by the vector service.
    results: List[Dict[str, Any]]
    # Set by the endpoints to len(results).
    total_results: int
    # The query text echoed back from the request.
    query: str
    # "semantic", "hybrid", or "structured_<data_type>" depending on the endpoint.
    search_type: str
    # Latency of the vector-service call in milliseconds, rounded to 2 decimals.
    execution_time_ms: float
|
|
|
|
|
|
class PerformanceMetricsResponse(BaseModel):
    """Response body for ``/performance/metrics``.

    Built by unpacking the dict returned by
    ``vector_service.get_performance_metrics``, so these fields must match
    that dict's keys.
    """

    # Tenant identifier reported by the vector service.
    tenant_id: str
    # Timestamp string from the service; the format is not fixed here (confirm, e.g. ISO 8601).
    timestamp: str
    # Per-collection metrics; the structure is defined by vector_service.
    collections: Dict[str, Any]
    # Name of the embedding model in use.
    embedding_model: str
    # Dimensionality of the embedding vectors.
    embedding_dimension: int
|
|
|
|
|
|
class BenchmarkResponse(BaseModel):
    """Response body for ``/performance/benchmarks``.

    Built by unpacking the dict returned by
    ``vector_service.create_performance_benchmarks``.
    """

    # Tenant identifier reported by the vector service.
    tenant_id: str
    # Timestamp string from the service; the format is not fixed here.
    timestamp: str
    # Benchmark results; the structure is defined by vector_service.
    results: Dict[str, Any]
|
|
|
|
|
|
@router.post("/search", response_model=SearchResponse)
async def search_documents(
    request: SearchRequest,
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, the same
    # dependency as `current_user`. The object here is therefore the user, and
    # `tenant.id` below is the user's id, not a tenant id. Confirm the
    # intended tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Search documents using semantic similarity.

    Args:
        request: Query text, result limit, score threshold, optional
            chunk-type restriction and optional filters.
        current_user: Authenticated user from ``get_current_user``.
        tenant: Object whose ``id`` is sent to the vector service as ``tenant_id``.

    Returns:
        SearchResponse with the service's results, their count, the echoed
        query, ``search_type="semantic"`` and the latency of the
        vector-service call in milliseconds (rounded to 2 decimals).

    Raises:
        HTTPException: re-raised unchanged if one escapes the service call;
            any other failure is reported as a 500.
    """
    try:
        # perf_counter is monotonic, so wall-clock adjustments cannot skew
        # (or make negative) the measured latency the way time.time() can.
        start = time.perf_counter()

        results = await vector_service.search_similar(
            tenant_id=str(tenant.id),
            query=request.query,
            limit=request.limit,
            score_threshold=request.score_threshold,
            chunk_types=request.chunk_types,
            filters=request.filters,
        )

        execution_time = (time.perf_counter() - start) * 1000

        return SearchResponse(
            results=results,
            total_results=len(results),
            query=request.query,
            search_type="semantic",
            execution_time_ms=round(execution_time, 2),
        )
    except HTTPException:
        # Preserve deliberate HTTP errors instead of re-wrapping them as a 500.
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Search failed: %s", e)
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") from e
|
|
|
|
|
|
@router.post("/search/structured", response_model=SearchResponse)
async def search_structured_data(
    request: StructuredDataSearchRequest,
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, the same
    # dependency as `current_user`. The object here is therefore the user, and
    # `tenant.id` below is the user's id, not a tenant id. Confirm the
    # intended tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Search specifically for structured data (tables and charts).

    Args:
        request: Query text, ``data_type`` (e.g. ``"table"`` or ``"chart"``),
            result limit, score threshold and optional filters.
        current_user: Authenticated user from ``get_current_user``.
        tenant: Object whose ``id`` is sent to the vector service as ``tenant_id``.

    Returns:
        SearchResponse whose ``search_type`` is ``"structured_<data_type>"``
        and whose ``execution_time_ms`` is the latency of the vector-service
        call (rounded to 2 decimals).

    Raises:
        HTTPException: re-raised unchanged if one escapes the service call;
            any other failure is reported as a 500.
    """
    try:
        # Monotonic clock: immune to wall-clock jumps, unlike time.time().
        start = time.perf_counter()

        results = await vector_service.search_structured_data(
            tenant_id=str(tenant.id),
            query=request.query,
            data_type=request.data_type,
            limit=request.limit,
            score_threshold=request.score_threshold,
            filters=request.filters,
        )

        execution_time = (time.perf_counter() - start) * 1000

        return SearchResponse(
            results=results,
            total_results=len(results),
            query=request.query,
            search_type=f"structured_{request.data_type}",
            execution_time_ms=round(execution_time, 2),
        )
    except HTTPException:
        # Preserve deliberate HTTP errors instead of re-wrapping them as a 500.
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Structured data search failed: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Structured data search failed: {str(e)}"
        ) from e
|
|
|
|
|
|
@router.post("/search/hybrid", response_model=SearchResponse)
async def hybrid_search(
    request: HybridSearchRequest,
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, the same
    # dependency as `current_user`. The object here is therefore the user, and
    # `tenant.id` below is the user's id, not a tenant id. Confirm the
    # intended tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Perform hybrid search combining semantic and keyword matching.

    Args:
        request: Query text, limit, score threshold, optional filters, and
            the semantic/keyword weights. The weights are forwarded unvalidated.
        current_user: Authenticated user from ``get_current_user``.
        tenant: Object whose ``id`` is sent to the vector service as ``tenant_id``.

    Returns:
        SearchResponse with ``search_type="hybrid"`` and the latency of the
        vector-service call in milliseconds (rounded to 2 decimals).

    Raises:
        HTTPException: re-raised unchanged if one escapes the service call;
            any other failure is reported as a 500.
    """
    try:
        # Monotonic clock: immune to wall-clock jumps, unlike time.time().
        start = time.perf_counter()

        results = await vector_service.hybrid_search(
            tenant_id=str(tenant.id),
            query=request.query,
            limit=request.limit,
            score_threshold=request.score_threshold,
            filters=request.filters,
            semantic_weight=request.semantic_weight,
            keyword_weight=request.keyword_weight,
        )

        execution_time = (time.perf_counter() - start) * 1000

        return SearchResponse(
            results=results,
            total_results=len(results),
            query=request.query,
            search_type="hybrid",
            execution_time_ms=round(execution_time, 2),
        )
    except HTTPException:
        # Preserve deliberate HTTP errors instead of re-wrapping them as a 500.
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Hybrid search failed: %s", e)
        raise HTTPException(status_code=500, detail=f"Hybrid search failed: {str(e)}") from e
|
|
|
|
|
|
@router.post("/chunk-document")
async def chunk_document(
    request: DocumentChunkingRequest,
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, so the object
    # handed to DocumentChunkingService below is the current user, not a
    # Tenant. Confirm the intended tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Chunk a document's content and report chunk statistics.

    The chunks are returned to the caller only; this endpoint does not write
    them to the vector database (see ``/index-document`` for that).

    Args:
        request: The document id and its raw content.
        current_user: Authenticated user from ``get_current_user``.
        tenant: Passed to ``DocumentChunkingService`` as its constructor argument.

    Returns:
        dict with ``document_id``, ``chunks`` (from ``chunk_document_content``),
        ``statistics`` (from ``get_chunk_statistics``) and ``status="success"``.

    Raises:
        HTTPException: re-raised unchanged if one escapes the service;
            any other failure is reported as a 500.
    """
    try:
        chunking_service = DocumentChunkingService(tenant)

        chunks = await chunking_service.chunk_document_content(
            document_id=request.document_id,
            content=request.content,
        )

        # Summary statistics computed over the chunks just produced.
        statistics = await chunking_service.get_chunk_statistics(chunks)

        return {
            "document_id": request.document_id,
            "chunks": chunks,
            "statistics": statistics,
            "status": "success",
        }
    except HTTPException:
        # Preserve deliberate HTTP errors instead of re-wrapping them as a 500.
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Document chunking failed: %s", e)
        raise HTTPException(status_code=500, detail=f"Document chunking failed: {str(e)}") from e
|
|
|
|
|
|
@router.post("/index-document")
async def index_document(
    document_id: str,
    chunks: Dict[str, List[Dict[str, Any]]],
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, so `tenant.id`
    # below is the current user's id, not a tenant id. Confirm the intended
    # tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Index document chunks in the vector database.

    Args:
        document_id: Identifier of the document being indexed. As a plain
            ``str`` parameter, FastAPI reads it from the query string.
        chunks: Chunk lists keyed by string, forwarded unchanged to
            ``vector_service.add_document_vectors``. The keys are presumably
            chunk types (verify against the service).
        current_user: Authenticated user from ``get_current_user``.
        tenant: Object whose ``id`` is sent to the vector service as ``tenant_id``.

    Returns:
        dict with ``document_id``, ``status="indexed"`` and a success message.

    Raises:
        HTTPException: 500 "Failed to index document" when the service
            returns a falsy result; 500 with the error text when it raises.
    """
    try:
        success = await vector_service.add_document_vectors(
            tenant_id=str(tenant.id),
            document_id=document_id,
            chunks=chunks,
        )

        if not success:
            logger.error("Document indexing failed for %s: service reported failure", document_id)
            # Raised inside the try: the HTTPException clause below must let
            # it through, or it would be re-wrapped as "...: 500: ...".
            raise HTTPException(status_code=500, detail="Failed to index document")

        return {
            "document_id": document_id,
            "status": "indexed",
            "message": "Document successfully indexed in vector database",
        }
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Document indexing failed: %s", e)
        raise HTTPException(status_code=500, detail=f"Document indexing failed: {str(e)}") from e
|
|
|
|
|
|
@router.get("/collections/stats")
async def get_collection_statistics(
    collection_type: str = Query("documents", description="Type of collection"),
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, so `tenant.id`
    # below is the current user's id, not a tenant id. Confirm the intended
    # tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Get statistics for a specific collection.

    Args:
        collection_type: Which collection to inspect (query parameter,
            default ``"documents"``).
        current_user: Authenticated user from ``get_current_user``.
        tenant: Object whose ``id`` is sent to the vector service as ``tenant_id``.

    Returns:
        The value returned by ``vector_service.get_collection_stats``, unchanged.

    Raises:
        HTTPException: 404 when the service returns a falsy result; 500 when
            the service call raises.
    """
    try:
        stats = await vector_service.get_collection_stats(
            tenant_id=str(tenant.id),
            collection_type=collection_type,
        )

        if not stats:
            # Raised inside the try: the HTTPException clause below keeps this
            # a 404 instead of letting the generic handler turn it into a 500.
            raise HTTPException(status_code=404, detail="Collection not found")

        return stats
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Failed to get collection stats: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to get collection stats: {str(e)}"
        ) from e
|
|
|
|
|
|
@router.get("/performance/metrics", response_model=PerformanceMetricsResponse)
async def get_performance_metrics(
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, so `tenant.id`
    # below is the current user's id, not a tenant id. Confirm the intended
    # tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Get performance metrics for vector database operations.

    Args:
        current_user: Authenticated user from ``get_current_user``.
        tenant: Object whose ``id`` is passed to the vector service.

    Returns:
        PerformanceMetricsResponse built by unpacking the service's dict.

    Raises:
        HTTPException: 500 whose detail is the service's ``"error"`` value
            when the returned dict contains that key; 500 with the error text
            when the call or response construction raises.
    """
    try:
        metrics = await vector_service.get_performance_metrics(str(tenant.id))

        if "error" in metrics:
            logger.error("Failed to get performance metrics: %s", metrics["error"])
            # Raised inside the try: the HTTPException clause below keeps the
            # service's message as the detail instead of double-wrapping it.
            raise HTTPException(status_code=500, detail=metrics["error"])

        return PerformanceMetricsResponse(**metrics)
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Failed to get performance metrics: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to get performance metrics: {str(e)}"
        ) from e
|
|
|
|
|
|
@router.post("/performance/benchmarks", response_model=BenchmarkResponse)
async def create_performance_benchmarks(
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, so `tenant.id`
    # below is the current user's id, not a tenant id. Confirm the intended
    # tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Ask the vector service to create performance benchmarks for this tenant.

    Args:
        current_user: Authenticated user from ``get_current_user``.
        tenant: Object whose ``id`` is passed to the vector service.

    Returns:
        BenchmarkResponse built by unpacking the service's dict.

    Raises:
        HTTPException: 500 whose detail is the service's ``"error"`` value
            when the returned dict contains that key; 500 with the error text
            when the call or response construction raises.
    """
    try:
        benchmarks = await vector_service.create_performance_benchmarks(str(tenant.id))

        if "error" in benchmarks:
            logger.error("Failed to create performance benchmarks: %s", benchmarks["error"])
            # Raised inside the try: the HTTPException clause below keeps the
            # service's message as the detail instead of double-wrapping it.
            raise HTTPException(status_code=500, detail=benchmarks["error"])

        return BenchmarkResponse(**benchmarks)
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Failed to create performance benchmarks: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to create performance benchmarks: {str(e)}"
        ) from e
|
|
|
|
|
|
@router.post("/optimize")
async def optimize_collections(
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, so `tenant.id`
    # below is the current user's id, not a tenant id. Confirm the intended
    # tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Optimize vector database collections for performance.

    Args:
        current_user: Authenticated user from ``get_current_user``.
        tenant: Object whose ``id`` is passed to the vector service.

    Returns:
        dict with ``tenant_id``, the service's ``optimization_results`` and
        ``status="optimization_completed"``.

    Raises:
        HTTPException: 500 whose detail is the service's ``"error"`` value
            when the returned dict contains that key; 500 with the error text
            when the call raises.
    """
    try:
        optimization_results = await vector_service.optimize_collections(str(tenant.id))

        if "error" in optimization_results:
            logger.error("Collection optimization failed: %s", optimization_results["error"])
            # Raised inside the try: the HTTPException clause below keeps the
            # service's message as the detail instead of double-wrapping it.
            raise HTTPException(status_code=500, detail=optimization_results["error"])

        return {
            "tenant_id": str(tenant.id),
            "optimization_results": optimization_results,
            "status": "optimization_completed",
        }
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Collection optimization failed: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Collection optimization failed: {str(e)}"
        ) from e
|
|
|
|
|
|
@router.delete("/documents/{document_id}")
async def delete_document_vectors(
    document_id: str,
    collection_type: str = Query("documents", description="Type of collection"),
    current_user: User = Depends(get_current_user),
    # NOTE(review): `tenant` resolves through get_current_user, so `tenant.id`
    # below is the current user's id, not a tenant id. Confirm the intended
    # tenant dependency.
    tenant: Tenant = Depends(get_current_user)
):
    """Delete all vectors for a specific document.

    Args:
        document_id: Path parameter identifying the document.
        collection_type: Collection to delete from (query parameter, default
            ``"documents"``).
        current_user: Authenticated user from ``get_current_user``.
        tenant: Object whose ``id`` is sent to the vector service as ``tenant_id``.

    Returns:
        dict with ``document_id``, ``status="deleted"`` and a success message.

    Raises:
        HTTPException: 500 "Failed to delete document vectors" when the
            service returns a falsy result; 500 with the error text when it raises.
    """
    try:
        success = await vector_service.delete_document_vectors(
            tenant_id=str(tenant.id),
            document_id=document_id,
            collection_type=collection_type,
        )

        if not success:
            logger.error("Failed to delete document vectors for %s: service reported failure", document_id)
            # Raised inside the try: the HTTPException clause below must let
            # it through, or it would be re-wrapped as "...: 500: ...".
            raise HTTPException(status_code=500, detail="Failed to delete document vectors")

        return {
            "document_id": document_id,
            "status": "deleted",
            "message": "Document vectors successfully deleted",
        }
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Failed to delete document vectors: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to delete document vectors: {str(e)}"
        ) from e
|
|
|
|
|
|
@router.get("/health")
async def vector_service_health():
    """Check the health of the vector service.

    This endpoint declares no authentication dependency.

    Returns:
        dict with ``status="healthy"``, ``service="vector_database"`` and
        ``embedding_model``. That field is the class name of
        ``vector_service.embedding_model``, or the literal
        ``"Voyage-3-large API"`` when that attribute is falsy (presumably
        embeddings then come from a remote API; confirm in vector_service).

    Raises:
        HTTPException: 503 "Vector service is unhealthy" when
            ``health_check()`` returns a falsy value; 503 with the error text
            when the check raises.
    """
    try:
        is_healthy = await vector_service.health_check()

        if not is_healthy:
            logger.error("Vector service health check failed: service reported unhealthy")
            # Raised inside the try: the HTTPException clause below keeps the
            # plain detail instead of re-wrapping it as "...: 503: ...".
            raise HTTPException(status_code=503, detail="Vector service is unhealthy")

        embedding_model = vector_service.embedding_model
        return {
            "status": "healthy",
            "service": "vector_database",
            "embedding_model": embedding_model.__class__.__name__ if embedding_model else "Voyage-3-large API",
        }
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        logger.exception("Vector service health check failed: %s", e)
        raise HTTPException(
            status_code=503, detail=f"Vector service health check failed: {str(e)}"
        ) from e
|