"""
Qdrant vector database service for the Virtual Board Member AI System.

Enhanced with Voyage-3-large embeddings and multi-modal support for Week 3.
"""

import asyncio
import functools
import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import requests
from qdrant_client import QdrantClient, models
from qdrant_client.http import models as rest

from app.core.config import settings
from app.models.tenant import Tenant

logger = logging.getLogger(__name__)

# Collection types that exist for every tenant.
_TENANT_COLLECTION_TYPES = ("documents", "tables", "charts")

# Voyage AI embeddings endpoint and the model name sent in each request.
_VOYAGE_EMBEDDINGS_URL = "https://api.voyageai.com/v1/embeddings"
_VOYAGE_MODEL = "voyage-3-large"


class VectorService:
    """Qdrant vector database service with tenant isolation.

    Every tenant gets its own collections (named ``<tenant_id>_<type>``) and
    every stored point additionally carries ``tenant_id`` in its payload,
    which all queries filter on.

    Embeddings come from the Voyage API when an API key is configured and
    from a local sentence-transformers model otherwise.

    NOTE: Qdrant calls use the synchronous ``QdrantClient`` and therefore run
    inline inside the ``async`` methods.
    """

    def __init__(self):
        self.client = None
        self.embedding_model = None
        self.voyage_api_key = None
        self._init_client()
        self._init_embedding_model()

    # ------------------------------------------------------------------
    # Initialisation
    # ------------------------------------------------------------------

    def _init_client(self):
        """Initialize Qdrant client.

        On failure the error is logged and ``self.client`` stays ``None``;
        public methods check for that and degrade gracefully.
        """
        try:
            self.client = QdrantClient(
                host=settings.QDRANT_HOST,
                port=settings.QDRANT_PORT,
                timeout=settings.QDRANT_TIMEOUT,
            )
            logger.info("Qdrant client initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Qdrant client: {e}")
            self.client = None

    def _init_embedding_model(self):
        """Initialize Voyage-3-large embedding model.

        Voyage is used through its HTTP API, so "initialising" it only means
        reading the API key. If the configured model is not Voyage, or the
        key is missing, the local fallback model is loaded instead.
        """
        try:
            # For Voyage-3-large, we'll use API calls instead of local model
            if settings.EMBEDDING_MODEL == "voyageai/voyage-3-large":
                self.voyage_api_key = settings.VOYAGE_API_KEY
                if not self.voyage_api_key:
                    logger.warning("Voyage API key not found, falling back to sentence-transformers")
                    self._init_fallback_embedding_model()
                else:
                    logger.info("Voyage-3-large embedding model configured successfully")
            else:
                self._init_fallback_embedding_model()
        except Exception as e:
            logger.error(f"Failed to initialize embedding model: {e}")
            self._init_fallback_embedding_model()

    def _init_fallback_embedding_model(self):
        """Initialize fallback sentence-transformers model.

        The import is local so the package is only required when the
        fallback is actually used. On failure ``self.embedding_model`` is
        set to ``None``.
        """
        try:
            from sentence_transformers import SentenceTransformer

            fallback_model = "sentence-transformers/all-MiniLM-L6-v2"
            self.embedding_model = SentenceTransformer(fallback_model)
            logger.info(f"Fallback embedding model {fallback_model} loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load fallback embedding model: {e}")
            self.embedding_model = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_collection_name(self, tenant_id: str, collection_type: str = "documents") -> str:
        """Generate tenant-isolated collection name."""
        return f"{tenant_id}_{collection_type}"

    @staticmethod
    def _build_filter(
        tenant_id: str,
        filters: Optional[Dict[str, Any]] = None,
        chunk_types: Optional[List[str]] = None,
    ) -> models.Filter:
        """Build a Qdrant filter that always restricts results to one tenant.

        Args:
            tenant_id: Required ``tenant_id`` payload value.
            filters: Extra payload conditions; a list value matches any of
                its elements, any other value must match exactly.
            chunk_types: If given, restrict ``chunk_type`` to these values.
        """
        conditions = [
            models.FieldCondition(
                key="tenant_id",
                match=models.MatchValue(value=tenant_id),
            )
        ]

        if chunk_types:
            conditions.append(
                models.FieldCondition(
                    key="chunk_type",
                    match=models.MatchAny(any=chunk_types),
                )
            )

        if filters:
            for key, value in filters.items():
                if isinstance(value, list):
                    match = models.MatchAny(any=value)
                else:
                    match = models.MatchValue(value=value)
                conditions.append(models.FieldCondition(key=key, match=match))

        return models.Filter(must=conditions)

    @staticmethod
    def _format_hit(point: Any, score: float) -> Dict[str, Any]:
        """Flatten a Qdrant point into the result dict returned by searches."""
        payload = point.payload
        return {
            "id": point.id,
            "score": score,
            "payload": payload,
            "text": payload.get("text", ""),
            "document_id": payload.get("document_id"),
            "chunk_type": payload.get("chunk_type", "text"),
            "token_count": payload.get("token_count", 0),
            "page_numbers": payload.get("page_numbers", []),
            "metadata": payload.get("metadata", {}),
        }

    @staticmethod
    def _build_point(
        document_id: str,
        tenant_id: str,
        chunk: Dict[str, Any],
        chunk_type: str,
        embedding: List[float],
    ) -> models.PointStruct:
        """Create the Qdrant point for one chunk.

        ``chunk`` must contain ``id``, ``chunk_index`` and ``text``; the
        remaining payload fields fall back to defaults when absent.
        """
        metadata = chunk.get("metadata", {})
        return models.PointStruct(
            id=chunk["id"],
            vector=embedding,
            payload={
                "document_id": document_id,
                "tenant_id": tenant_id,
                "chunk_index": chunk["chunk_index"],
                "text": chunk["text"],
                "chunk_type": chunk_type,
                "token_count": chunk.get("token_count", 0),
                "page_numbers": chunk.get("page_numbers", []),
                "metadata": metadata,
                "created_at": metadata.get("created_at", datetime.utcnow().isoformat()),
            },
        )

    async def _post_voyage_embeddings(self, payload_input: Any, input_type: str, timeout: int) -> Dict[str, Any]:
        """POST an embedding request to the Voyage API and return the JSON body.

        ``requests`` is blocking, so the call runs in the default executor to
        keep the event loop responsive while waiting on the network.

        Raises:
            requests.RequestException: On transport errors or non-2xx status.
        """
        headers = {
            "Authorization": f"Bearer {self.voyage_api_key}",
            "Content-Type": "application/json",
        }
        data = {
            "model": _VOYAGE_MODEL,
            "input": payload_input,
            "input_type": input_type,
        }
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            functools.partial(
                requests.post,
                _VOYAGE_EMBEDDINGS_URL,
                headers=headers,
                json=data,
                timeout=timeout,
            ),
        )
        response.raise_for_status()
        return response.json()

    # ------------------------------------------------------------------
    # Collection management
    # ------------------------------------------------------------------

    async def create_tenant_collections(self, tenant: Tenant) -> bool:
        """Create all necessary collections for a tenant.

        All three collections are attempted even if an earlier one fails.

        Returns:
            ``True`` only if every collection exists afterwards; ``False`` if
            the client is unavailable or any creation failed.
        """
        if not self.client:
            logger.error("Qdrant client not available")
            return False

        try:
            tenant_id = str(tenant.id)
            labels = {"documents": "Document", "tables": "Table", "charts": "Chart"}

            failed = []
            for collection_type in _TENANT_COLLECTION_TYPES:
                collection_name = self._get_collection_name(tenant_id, collection_type)
                created = await self._create_collection(
                    collection_name=collection_name,
                    vector_size=settings.EMBEDDING_DIMENSION,
                    description=f"{labels[collection_type]} embeddings for tenant {tenant.name}",
                )
                # _create_collection reports failure via its return value, so
                # it must be checked here rather than assumed successful.
                if not created:
                    failed.append(collection_name)

            if failed:
                logger.error(
                    f"Failed to create collections for tenant {tenant.id}: {', '.join(failed)}"
                )
                return False

            logger.info(f"Created collections for tenant {tenant.name} ({tenant_id})")
            return True
        except Exception as e:
            logger.error(f"Failed to create collections for tenant {tenant.id}: {e}")
            return False

    async def _create_collection(self, collection_name: str, vector_size: int, description: str) -> bool:
        """Create a collection with proper configuration.

        Args:
            collection_name: Name of the collection to create.
            vector_size: Dimension of the stored vectors.
            description: Human-readable description; only used in the log
                message, it is not sent to Qdrant.

        Returns:
            ``True`` if the collection already existed or was created,
            ``False`` on any error.
        """
        try:
            # Check if collection already exists
            collections = self.client.get_collections()
            existing_collections = [col.name for col in collections.collections]
            if collection_name in existing_collections:
                logger.info(f"Collection {collection_name} already exists")
                return True

            # Create collection with optimized settings
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=vector_size,
                    distance=models.Distance.COSINE,
                    on_disk=True,  # Store vectors on disk for large collections
                ),
                optimizers_config=models.OptimizersConfigDiff(
                    memmap_threshold=10000,  # Use memory mapping for collections > 10k points
                    default_segment_number=2,  # Optimize for parallel processing
                ),
                replication_factor=1,  # Single replica for development
            )

            # Re-apply the segment configuration (kept from the original
            # implementation; this call does not attach a description).
            self.client.update_collection(
                collection_name=collection_name,
                optimizers_config=models.OptimizersConfigDiff(
                    default_segment_number=2,
                ),
            )

            logger.info(f"Created collection {collection_name}: {description}")
            return True
        except Exception as e:
            logger.error(f"Failed to create collection {collection_name}: {e}")
            return False

    async def delete_tenant_collections(self, tenant_id: str) -> bool:
        """Delete all collections for a tenant.

        Deletion is best-effort: a failure on one collection is logged as a
        warning and the remaining collections are still attempted.

        Returns:
            ``False`` if the client is unavailable, ``True`` otherwise.
        """
        if not self.client:
            return False

        try:
            collections_to_delete = [
                self._get_collection_name(tenant_id, collection_type)
                for collection_type in _TENANT_COLLECTION_TYPES
            ]
            for collection_name in collections_to_delete:
                try:
                    self.client.delete_collection(collection_name)
                    logger.info(f"Deleted collection {collection_name}")
                except Exception as e:
                    logger.warning(f"Failed to delete collection {collection_name}: {e}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete collections for tenant {tenant_id}: {e}")
            return False

    # ------------------------------------------------------------------
    # Embedding generation
    # ------------------------------------------------------------------

    async def generate_embedding(self, text: str) -> Optional[List[float]]:
        """Generate embedding for text using Voyage-3-large or fallback model.

        Returns:
            The embedding vector, or ``None`` if no model is available or
            generation failed.
        """
        try:
            # Try Voyage-3-large first
            if self.voyage_api_key:
                return await self._generate_voyage_embedding(text)

            # Fallback to sentence-transformers
            if self.embedding_model:
                embedding = self.embedding_model.encode(text)
                return embedding.tolist()

            logger.error("No embedding model available")
            return None
        except Exception as e:
            logger.error(f"Failed to generate embedding: {e}")
            return None

    async def _generate_voyage_embedding(self, text: str) -> Optional[List[float]]:
        """Generate embedding using Voyage-3-large API.

        The request uses ``input_type="query"``. Returns ``None`` on any
        error or if the response carries no embedding.
        """
        try:
            result = await self._post_voyage_embeddings(text, "query", 30)
            if "data" in result and len(result["data"]) > 0:
                return result["data"][0]["embedding"]

            logger.error("No embedding data in Voyage API response")
            return None
        except Exception as e:
            logger.error(f"Failed to generate Voyage embedding: {e}")
            return None

    async def generate_batch_embeddings(self, texts: List[str]) -> List[Optional[List[float]]]:
        """Generate embeddings for a batch of texts.

        Returns:
            A list with exactly one entry per input text, in input order.
            Entries are ``None`` where no embedding could be produced.
        """
        if not texts:
            # Nothing to embed; avoid a pointless API round-trip.
            return []

        try:
            # Try Voyage-3-large first
            if self.voyage_api_key:
                return await self._generate_voyage_batch_embeddings(texts)

            # Fallback to sentence-transformers
            if self.embedding_model:
                embeddings = self.embedding_model.encode(texts)
                return [emb.tolist() for emb in embeddings]

            logger.error("No embedding model available")
            return [None] * len(texts)
        except Exception as e:
            logger.error(f"Failed to generate batch embeddings: {e}")
            return [None] * len(texts)

    async def _generate_voyage_batch_embeddings(self, texts: List[str]) -> List[Optional[List[float]]]:
        """Generate batch embeddings using Voyage-3-large API.

        The request uses ``input_type="document"``. The returned list is
        always ``len(texts)`` long: each response item is placed at the
        position given by its ``index`` field (falling back to its position
        in the response), and any slot the API did not fill stays ``None``.
        This keeps embeddings aligned with their source texts even if the
        response is short or reordered.
        """
        try:
            result = await self._post_voyage_embeddings(texts, "document", 60)

            if "data" not in result:
                logger.error("No embedding data in Voyage API response")
                return [None] * len(texts)

            embeddings: List[Optional[List[float]]] = [None] * len(texts)
            for position, item in enumerate(result["data"]):
                index = item.get("index", position)
                if isinstance(index, int) and 0 <= index < len(texts):
                    embeddings[index] = item["embedding"]

            missing = sum(1 for emb in embeddings if emb is None)
            if missing:
                logger.warning(
                    f"Voyage API returned no embedding for {missing} of {len(texts)} texts"
                )
            return embeddings
        except Exception as e:
            logger.error(f"Failed to generate Voyage batch embeddings: {e}")
            return [None] * len(texts)

    # ------------------------------------------------------------------
    # Indexing
    # ------------------------------------------------------------------

    async def add_document_vectors(
        self,
        tenant_id: str,
        document_id: str,
        chunks: Dict[str, List[Dict[str, Any]]],
        collection_type: str = "documents",
    ) -> bool:
        """Add document chunks to vector database with batch processing.

        Args:
            tenant_id: Owning tenant.
            document_id: Document the chunks belong to.
            chunks: Mapping that may contain ``text_chunks``,
                ``table_chunks`` and ``chart_chunks`` lists.
            collection_type: Target collection type for all chunks.

        Returns:
            ``True`` if at least one vector was upserted; ``False`` if there
            was nothing to add, no embedding could be produced, the client is
            unavailable, or an error occurred.
        """
        if not self.client:
            logger.error("Qdrant client not available")
            return False

        try:
            collection_name = self._get_collection_name(tenant_id, collection_type)

            # Collect all chunks and their types for single batch processing
            all_chunks: List[Dict[str, Any]] = []
            chunk_types: List[str] = []
            for key, chunk_type in (
                ("text_chunks", "text"),
                ("table_chunks", "table"),
                ("chart_chunks", "chart"),
            ):
                if key in chunks:
                    all_chunks.extend(chunks[key])
                    chunk_types.extend([chunk_type] * len(chunks[key]))

            if not all_chunks:
                return False

            # Process all chunks in a single batch
            all_points = await self._process_all_chunks_batch(
                document_id, tenant_id, all_chunks, chunk_types
            )
            if not all_points:
                return False

            # Upsert points in batches
            batch_size = settings.EMBEDDING_BATCH_SIZE
            for start in range(0, len(all_points), batch_size):
                self.client.upsert(
                    collection_name=collection_name,
                    points=all_points[start:start + batch_size],
                )

            logger.info(f"Added {len(all_points)} vectors to collection {collection_name}")
            return True
        except Exception as e:
            logger.error(f"Failed to add document vectors: {e}")
            return False

    async def _process_all_chunks_batch(
        self,
        document_id: str,
        tenant_id: str,
        chunks: List[Dict[str, Any]],
        chunk_types: List[str],
    ) -> List[models.PointStruct]:
        """Process all chunks in a single batch and generate embeddings.

        ``chunk_types`` is parallel to ``chunks``. Chunks whose embedding is
        missing are skipped. Returns ``[]`` on any error.
        """
        try:
            # Generate embeddings in batch (single call)
            texts = [chunk["text"] for chunk in chunks]
            embeddings = await self.generate_batch_embeddings(texts)

            return [
                self._build_point(document_id, tenant_id, chunk, chunk_type, embedding)
                for chunk, embedding, chunk_type in zip(chunks, embeddings, chunk_types)
                if embedding
            ]
        except Exception as e:
            logger.error(f"Failed to process all chunks batch: {e}")
            return []

    async def _process_chunk_batch(
        self,
        document_id: str,
        tenant_id: str,
        chunks: List[Dict[str, Any]],
        chunk_type: str,
    ) -> List[models.PointStruct]:
        """Process a batch of chunks and generate embeddings.

        Same as :meth:`_process_all_chunks_batch` but every chunk shares one
        ``chunk_type``. Returns ``[]`` on any error.
        """
        try:
            # Generate embeddings in batch
            texts = [chunk["text"] for chunk in chunks]
            embeddings = await self.generate_batch_embeddings(texts)

            return [
                self._build_point(document_id, tenant_id, chunk, chunk_type, embedding)
                for chunk, embedding in zip(chunks, embeddings)
                if embedding
            ]
        except Exception as e:
            logger.error(f"Failed to process {chunk_type} chunk batch: {e}")
            return []

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    async def search_similar(
        self,
        tenant_id: str,
        query: str,
        limit: int = 10,
        score_threshold: float = 0.7,
        collection_type: str = "documents",
        filters: Optional[Dict[str, Any]] = None,
        chunk_types: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """Search for similar vectors with multi-modal support.

        Args:
            tenant_id: Tenant whose collection is searched.
            query: Free-text query; embedded before searching.
            limit: Maximum number of hits.
            score_threshold: Minimum similarity score passed to Qdrant.
            collection_type: Which tenant collection to search.
            filters: Extra payload filters (list values match any element).
            chunk_types: Restrict hits to these ``chunk_type`` values.

        Returns:
            Result dicts (see :meth:`_format_hit`); ``[]`` if the client is
            unavailable, the query cannot be embedded, or an error occurs.
        """
        if not self.client:
            return []

        try:
            collection_name = self._get_collection_name(tenant_id, collection_type)

            # Generate query embedding
            query_embedding = await self.generate_embedding(query)
            if not query_embedding:
                return []

            # Perform search
            search_result = self.client.search(
                collection_name=collection_name,
                query_vector=query_embedding,
                query_filter=self._build_filter(tenant_id, filters, chunk_types),
                limit=limit,
                score_threshold=score_threshold,
                with_payload=True,
            )

            # Format results with enhanced metadata
            return [self._format_hit(point, point.score) for point in search_result]
        except Exception as e:
            logger.error(f"Failed to search vectors: {e}")
            return []

    async def search_structured_data(
        self,
        tenant_id: str,
        query: str,
        data_type: str = "table",  # "table" or "chart"
        limit: int = 10,
        score_threshold: float = 0.7,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Search specifically for structured data (tables and charts).

        Delegates to :meth:`search_similar` on the ``documents`` collection
        with ``chunk_types=[data_type]``.
        """
        return await self.search_similar(
            tenant_id=tenant_id,
            query=query,
            limit=limit,
            score_threshold=score_threshold,
            collection_type="documents",
            filters=filters,
            chunk_types=[data_type],
        )

    async def hybrid_search(
        self,
        tenant_id: str,
        query: str,
        limit: int = 10,
        score_threshold: float = 0.7,
        filters: Optional[Dict[str, Any]] = None,
        semantic_weight: float = 0.7,
        keyword_weight: float = 0.3,
    ) -> List[Dict[str, Any]]:
        """Perform hybrid search combining semantic and keyword matching.

        Both searches fetch ``limit * 2`` candidates; the semantic search
        uses a threshold lowered to 80% of ``score_threshold``. Candidates
        are merged by weighted score and the top ``limit`` are returned.
        Returns ``[]`` on error.
        """
        try:
            # Semantic search
            semantic_results = await self.search_similar(
                tenant_id=tenant_id,
                query=query,
                limit=limit * 2,  # Get more results for re-ranking
                score_threshold=score_threshold * 0.8,  # Lower threshold for semantic
                filters=filters,
            )

            # Keyword search (simple implementation)
            keyword_results = await self._keyword_search(
                tenant_id=tenant_id,
                query=query,
                limit=limit * 2,
                filters=filters,
            )

            # Combine and re-rank results
            combined_results = await self._combine_search_results(
                semantic_results, keyword_results, semantic_weight, keyword_weight
            )

            # Return top results
            return combined_results[:limit]
        except Exception as e:
            logger.error(f"Failed to perform hybrid search: {e}")
            return []

    async def _keyword_search(
        self,
        tenant_id: str,
        query: str,
        limit: int = 10,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Simple keyword search implementation.

        Scores each scanned point by the fraction of lower-cased,
        whitespace-split query terms that occur as substrings of its text.
        Only the first scroll page (up to 1000 points) of the tenant's
        ``documents`` collection is scanned.
        """
        if not self.client:
            return []

        try:
            # This is a simplified keyword search
            # In a production system, you might use Elasticsearch or similar
            query_terms = query.lower().split()

            collection_name = self._get_collection_name(tenant_id, "documents")

            # scroll() returns (points, next_page_offset); only the first
            # page is used here.
            all_points = self.client.scroll(
                collection_name=collection_name,
                scroll_filter=self._build_filter(tenant_id, filters),
                limit=1000,  # Adjust based on your data size
                with_payload=True,
            )[0]

            # Score by keyword matches
            keyword_results = []
            for point in all_points:
                text = point.payload.get("text", "").lower()
                matches = sum(1 for term in query_terms if term in text)
                # matches > 0 implies query_terms is non-empty, so the
                # normalising division below cannot divide by zero.
                if matches > 0:
                    keyword_results.append(
                        self._format_hit(point, matches / len(query_terms))
                    )

            # Sort by score and return top results
            keyword_results.sort(key=lambda x: x["score"], reverse=True)
            return keyword_results[:limit]
        except Exception as e:
            logger.error(f"Failed to perform keyword search: {e}")
            return []

    async def _combine_search_results(
        self,
        semantic_results: List[Dict[str, Any]],
        keyword_results: List[Dict[str, Any]],
        semantic_weight: float,
        keyword_weight: float,
    ) -> List[Dict[str, Any]]:
        """Combine and re-rank search results.

        Results are merged by ``id``. Each merged entry gains
        ``semantic_score``, ``keyword_score`` and ``combined_score`` (the
        weighted sum), and the list is sorted by ``combined_score``
        descending. On error the semantic results are returned unchanged.
        """
        try:
            # Create a map of results by ID
            combined_map: Dict[Any, Dict[str, Any]] = {}

            # Add semantic results
            for result in semantic_results:
                combined_map[result["id"]] = {
                    **result,
                    "semantic_score": result["score"],
                    "keyword_score": 0.0,
                    "combined_score": result["score"] * semantic_weight,
                }

            # Add keyword results
            for result in keyword_results:
                existing = combined_map.get(result["id"])
                if existing is not None:
                    # Update existing result
                    existing["keyword_score"] = result["score"]
                    existing["combined_score"] += result["score"] * keyword_weight
                else:
                    # Add new result
                    combined_map[result["id"]] = {
                        **result,
                        "semantic_score": 0.0,
                        "keyword_score": result["score"],
                        "combined_score": result["score"] * keyword_weight,
                    }

            # Convert to list and sort by combined score
            combined_results = list(combined_map.values())
            combined_results.sort(key=lambda x: x["combined_score"], reverse=True)
            return combined_results
        except Exception as e:
            logger.error(f"Failed to combine search results: {e}")
            return semantic_results  # Fallback to semantic results

    # ------------------------------------------------------------------
    # Deletion, stats and health
    # ------------------------------------------------------------------

    async def delete_document_vectors(self, tenant_id: str, document_id: str, collection_type: str = "documents") -> bool:
        """Delete all vectors for a specific document.

        Points are selected by payload filter on both ``document_id`` and
        ``tenant_id``.

        Returns:
            ``True`` if the delete request was issued, ``False`` if the
            client is unavailable or the request failed.
        """
        if not self.client:
            return False

        try:
            collection_name = self._get_collection_name(tenant_id, collection_type)

            # Delete points with document_id filter
            self.client.delete(
                collection_name=collection_name,
                points_selector=models.FilterSelector(
                    filter=models.Filter(
                        must=[
                            models.FieldCondition(
                                key="document_id",
                                match=models.MatchValue(value=document_id),
                            ),
                            models.FieldCondition(
                                key="tenant_id",
                                match=models.MatchValue(value=tenant_id),
                            ),
                        ]
                    )
                ),
            )

            logger.info(f"Deleted vectors for document {document_id} from collection {collection_name}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete document vectors: {e}")
            return False

    async def get_collection_stats(self, tenant_id: str, collection_type: str = "documents") -> Optional[Dict[str, Any]]:
        """Get collection statistics.

        Returns:
            A dict with the collection name, tenant id, tenant-filtered
            vector count, vector size, distance metric and status; ``None``
            if the client is unavailable or the lookup failed.
        """
        if not self.client:
            return None

        try:
            collection_name = self._get_collection_name(tenant_id, collection_type)
            info = self.client.get_collection(collection_name)
            count = self.client.count(
                collection_name=collection_name,
                count_filter=self._build_filter(tenant_id),
            )

            return {
                "collection_name": collection_name,
                "tenant_id": tenant_id,
                "vector_count": count.count,
                "vector_size": info.config.params.vectors.size,
                "distance": info.config.params.vectors.distance,
                "status": info.status,
            }
        except Exception as e:
            logger.error(f"Failed to get collection stats: {e}")
            return None

    async def health_check(self) -> bool:
        """Check if vector service is healthy.

        Healthy means: the Qdrant client answers ``get_collections``, an
        embedding backend is configured, and a test embedding can be
        generated.
        """
        if not self.client:
            return False

        try:
            # Check client connection (raises if Qdrant is unreachable)
            self.client.get_collections()

            # Check embedding model (either Voyage or fallback)
            if not self.voyage_api_key and not self.embedding_model:
                return False

            # Test embedding generation
            test_embedding = await self.generate_embedding("test")
            return bool(test_embedding)
        except Exception as e:
            logger.error(f"Vector service health check failed: {e}")
            return False

    # ------------------------------------------------------------------
    # Optimisation, metrics and benchmarks
    # ------------------------------------------------------------------

    async def optimize_collections(self, tenant_id: str) -> Dict[str, Any]:
        """Optimize vector database collections for performance.

        Applies a new optimizer configuration to each tenant collection.

        Returns:
            A dict keyed by collection type with either an ``optimized``
            entry (counts and timestamp) or a ``failed`` entry carrying the
            error text; ``{"error": ...}`` on an unexpected failure.
        """
        try:
            optimization_results = {}

            # Optimize each collection type
            for collection_type in _TENANT_COLLECTION_TYPES:
                collection_name = self._get_collection_name(tenant_id, collection_type)
                try:
                    # Force collection optimization
                    self.client.update_collection(
                        collection_name=collection_name,
                        optimizers_config=models.OptimizersConfigDiff(
                            default_segment_number=4,  # Increase for better parallelization
                            memmap_threshold=5000,  # Lower threshold for memory mapping
                            vacuum_min_vector_number=1000,  # Optimize vacuum threshold
                        ),
                    )

                    # Get collection info
                    info = self.client.get_collection(collection_name)
                    optimization_results[collection_type] = {
                        "status": "optimized",
                        "vector_count": info.points_count,
                        "segments": info.segments_count,
                        "optimized_at": datetime.utcnow().isoformat(),
                    }
                except Exception as e:
                    logger.warning(f"Failed to optimize collection {collection_name}: {e}")
                    optimization_results[collection_type] = {
                        "status": "failed",
                        "error": str(e),
                    }

            return optimization_results
        except Exception as e:
            logger.error(f"Failed to optimize collections: {e}")
            return {"error": str(e)}

    async def get_performance_metrics(self, tenant_id: str) -> Dict[str, Any]:
        """Get performance metrics for vector database operations.

        Returns:
            A dict with tenant id, timestamp, embedding configuration and a
            ``collections`` mapping holding per-collection metrics (or an
            ``error`` entry for collections that could not be read);
            ``{"error": ...}`` on an unexpected failure.
        """
        try:
            metrics = {
                "tenant_id": tenant_id,
                "timestamp": datetime.utcnow().isoformat(),
                "collections": {},
                "embedding_model": settings.EMBEDDING_MODEL,
                "embedding_dimension": settings.EMBEDDING_DIMENSION,
            }

            # Get metrics for each collection
            for collection_type in _TENANT_COLLECTION_TYPES:
                collection_name = self._get_collection_name(tenant_id, collection_type)
                try:
                    info = self.client.get_collection(collection_name)
                    count = self.client.count(
                        collection_name=collection_name,
                        count_filter=self._build_filter(tenant_id),
                    )

                    metrics["collections"][collection_type] = {
                        "vector_count": count.count,
                        "segments": info.segments_count,
                        "status": info.status,
                        "vector_size": info.config.params.vectors.size,
                        "distance": info.config.params.vectors.distance,
                    }
                except Exception as e:
                    logger.warning(f"Failed to get metrics for collection {collection_name}: {e}")
                    metrics["collections"][collection_type] = {
                        "error": str(e),
                    }

            return metrics
        except Exception as e:
            logger.error(f"Failed to get performance metrics: {e}")
            return {"error": str(e)}

    async def create_performance_benchmarks(self, tenant_id: str) -> Dict[str, Any]:
        """Create performance benchmarks for vector operations.

        Times one single embedding, one batch of ten embeddings and (if the
        single embedding succeeded) one similarity search.

        Returns:
            A dict with tenant id, timestamp and a ``results`` mapping of
            timings in milliseconds; ``search_time_ms`` is ``None`` when the
            search was skipped. ``{"error": ...}`` on failure.
        """
        try:
            benchmarks = {
                "tenant_id": tenant_id,
                "timestamp": datetime.utcnow().isoformat(),
                "results": {},
            }

            # perf_counter is monotonic and high-resolution, which is what
            # interval measurements need.

            # Single embedding benchmark
            start_time = time.perf_counter()
            test_embedding = await self.generate_embedding(
                "This is a test document for benchmarking purposes."
            )
            single_embedding_time = time.perf_counter() - start_time

            # Batch embedding benchmark
            test_texts = [f"Test document {i} for batch benchmarking." for i in range(10)]
            start_time = time.perf_counter()
            await self.generate_batch_embeddings(test_texts)
            batch_embedding_time = time.perf_counter() - start_time

            # Search benchmark
            search_time = None
            if test_embedding:
                start_time = time.perf_counter()
                await self.search_similar(
                    tenant_id=tenant_id,
                    query="test query",
                    limit=5,
                )
                search_time = time.perf_counter() - start_time

            benchmarks["results"] = {
                "single_embedding_time_ms": round(single_embedding_time * 1000, 2),
                "batch_embedding_time_ms": round(batch_embedding_time * 1000, 2),
                "avg_embedding_per_text_ms": round((batch_embedding_time / len(test_texts)) * 1000, 2),
                # Compare against None so a measured 0.0 is still reported.
                "search_time_ms": round(search_time * 1000, 2) if search_time is not None else None,
                "embedding_model": settings.EMBEDDING_MODEL,
                "embedding_dimension": settings.EMBEDDING_DIMENSION,
            }

            return benchmarks
        except Exception as e:
            logger.error(f"Failed to create performance benchmarks: {e}")
            return {"error": str(e)}


# Global vector service instance
vector_service = VectorService()