"""
|
|
Qdrant vector database service for the Virtual Board Member AI System.
|
|
Enhanced with Voyage-3-large embeddings and multi-modal support for Week 3.
|
|
"""
|
|
import logging
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from qdrant_client import QdrantClient, models
|
|
from qdrant_client.http import models as rest
|
|
import numpy as np
|
|
import requests
|
|
import json
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
from app.core.config import settings
|
|
from app.models.tenant import Tenant
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class VectorService:
    """Qdrant vector database service with tenant isolation.

    Collections are namespaced per tenant ("<tenant_id>_<type>").  Embeddings
    come from the Voyage API when a key is configured, otherwise from a local
    sentence-transformers fallback model.
    """

    def __init__(self):
        """Eagerly set up the Qdrant client and the embedding backend."""
        # Qdrant client handle; left as None if initialization fails.
        self.client = None
        # Local sentence-transformers model (fallback path only).
        self.embedding_model = None
        # Voyage API key; a non-None value selects the remote embedding path.
        self.voyage_api_key = None
        # Order matters: attributes above must exist before the init helpers run.
        self._init_client()
        self._init_embedding_model()
|
|
|
|
def _init_client(self):
|
|
"""Initialize Qdrant client."""
|
|
try:
|
|
self.client = QdrantClient(
|
|
host=settings.QDRANT_HOST,
|
|
port=settings.QDRANT_PORT,
|
|
timeout=settings.QDRANT_TIMEOUT
|
|
)
|
|
logger.info("Qdrant client initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize Qdrant client: {e}")
|
|
self.client = None
|
|
|
|
def _init_embedding_model(self):
|
|
"""Initialize Voyage-3-large embedding model."""
|
|
try:
|
|
# For Voyage-3-large, we'll use API calls instead of local model
|
|
if settings.EMBEDDING_MODEL == "voyageai/voyage-3-large":
|
|
self.voyage_api_key = settings.VOYAGE_API_KEY
|
|
if not self.voyage_api_key:
|
|
logger.warning("Voyage API key not found, falling back to sentence-transformers")
|
|
self._init_fallback_embedding_model()
|
|
else:
|
|
logger.info("Voyage-3-large embedding model configured successfully")
|
|
else:
|
|
self._init_fallback_embedding_model()
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize embedding model: {e}")
|
|
self._init_fallback_embedding_model()
|
|
|
|
def _init_fallback_embedding_model(self):
|
|
"""Initialize fallback sentence-transformers model."""
|
|
try:
|
|
from sentence_transformers import SentenceTransformer
|
|
fallback_model = "sentence-transformers/all-MiniLM-L6-v2"
|
|
self.embedding_model = SentenceTransformer(fallback_model)
|
|
logger.info(f"Fallback embedding model {fallback_model} loaded successfully")
|
|
except Exception as e:
|
|
logger.error(f"Failed to load fallback embedding model: {e}")
|
|
self.embedding_model = None
|
|
|
|
def _get_collection_name(self, tenant_id: str, collection_type: str = "documents") -> str:
|
|
"""Generate tenant-isolated collection name."""
|
|
return f"{tenant_id}_{collection_type}"
|
|
|
|
async def create_tenant_collections(self, tenant: Tenant) -> bool:
|
|
"""Create all necessary collections for a tenant."""
|
|
if not self.client:
|
|
logger.error("Qdrant client not available")
|
|
return False
|
|
|
|
try:
|
|
tenant_id = str(tenant.id)
|
|
|
|
# Create main documents collection
|
|
documents_collection = self._get_collection_name(tenant_id, "documents")
|
|
await self._create_collection(
|
|
collection_name=documents_collection,
|
|
vector_size=settings.EMBEDDING_DIMENSION,
|
|
description=f"Document embeddings for tenant {tenant.name}"
|
|
)
|
|
|
|
# Create tables collection for structured data
|
|
tables_collection = self._get_collection_name(tenant_id, "tables")
|
|
await self._create_collection(
|
|
collection_name=tables_collection,
|
|
vector_size=settings.EMBEDDING_DIMENSION,
|
|
description=f"Table embeddings for tenant {tenant.name}"
|
|
)
|
|
|
|
# Create charts collection for visual data
|
|
charts_collection = self._get_collection_name(tenant_id, "charts")
|
|
await self._create_collection(
|
|
collection_name=charts_collection,
|
|
vector_size=settings.EMBEDDING_DIMENSION,
|
|
description=f"Chart embeddings for tenant {tenant.name}"
|
|
)
|
|
|
|
logger.info(f"Created collections for tenant {tenant.name} ({tenant_id})")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to create collections for tenant {tenant.id}: {e}")
|
|
return False
|
|
|
|
async def _create_collection(self, collection_name: str, vector_size: int, description: str) -> bool:
|
|
"""Create a collection with proper configuration."""
|
|
try:
|
|
# Check if collection already exists
|
|
collections = self.client.get_collections()
|
|
existing_collections = [col.name for col in collections.collections]
|
|
|
|
if collection_name in existing_collections:
|
|
logger.info(f"Collection {collection_name} already exists")
|
|
return True
|
|
|
|
# Create collection with optimized settings
|
|
self.client.create_collection(
|
|
collection_name=collection_name,
|
|
vectors_config=models.VectorParams(
|
|
size=vector_size,
|
|
distance=models.Distance.COSINE,
|
|
on_disk=True # Store vectors on disk for large collections
|
|
),
|
|
optimizers_config=models.OptimizersConfigDiff(
|
|
memmap_threshold=10000, # Use memory mapping for collections > 10k points
|
|
default_segment_number=2 # Optimize for parallel processing
|
|
),
|
|
replication_factor=1 # Single replica for development
|
|
)
|
|
|
|
# Add collection description
|
|
self.client.update_collection(
|
|
collection_name=collection_name,
|
|
optimizers_config=models.OptimizersConfigDiff(
|
|
default_segment_number=2
|
|
)
|
|
)
|
|
|
|
logger.info(f"Created collection {collection_name}: {description}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to create collection {collection_name}: {e}")
|
|
return False
|
|
|
|
async def delete_tenant_collections(self, tenant_id: str) -> bool:
|
|
"""Delete all collections for a tenant."""
|
|
if not self.client:
|
|
return False
|
|
|
|
try:
|
|
collections_to_delete = [
|
|
self._get_collection_name(tenant_id, "documents"),
|
|
self._get_collection_name(tenant_id, "tables"),
|
|
self._get_collection_name(tenant_id, "charts")
|
|
]
|
|
|
|
for collection_name in collections_to_delete:
|
|
try:
|
|
self.client.delete_collection(collection_name)
|
|
logger.info(f"Deleted collection {collection_name}")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to delete collection {collection_name}: {e}")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete collections for tenant {tenant_id}: {e}")
|
|
return False
|
|
|
|
async def generate_embedding(self, text: str) -> Optional[List[float]]:
|
|
"""Generate embedding for text using Voyage-3-large or fallback model."""
|
|
try:
|
|
# Try Voyage-3-large first
|
|
if self.voyage_api_key:
|
|
return await self._generate_voyage_embedding(text)
|
|
|
|
# Fallback to sentence-transformers
|
|
if self.embedding_model:
|
|
embedding = self.embedding_model.encode(text)
|
|
return embedding.tolist()
|
|
|
|
logger.error("No embedding model available")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate embedding: {e}")
|
|
return None
|
|
|
|
async def _generate_voyage_embedding(self, text: str) -> Optional[List[float]]:
|
|
"""Generate embedding using Voyage-3-large API."""
|
|
try:
|
|
url = "https://api.voyageai.com/v1/embeddings"
|
|
headers = {
|
|
"Authorization": f"Bearer {self.voyage_api_key}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
data = {
|
|
"model": "voyage-3-large",
|
|
"input": text,
|
|
"input_type": "query" # or "document" for longer texts
|
|
}
|
|
|
|
response = requests.post(url, headers=headers, json=data, timeout=30)
|
|
response.raise_for_status()
|
|
|
|
result = response.json()
|
|
if "data" in result and len(result["data"]) > 0:
|
|
return result["data"][0]["embedding"]
|
|
|
|
logger.error("No embedding data in Voyage API response")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate Voyage embedding: {e}")
|
|
return None
|
|
|
|
async def generate_batch_embeddings(self, texts: List[str]) -> List[Optional[List[float]]]:
|
|
"""Generate embeddings for a batch of texts."""
|
|
try:
|
|
# Try Voyage-3-large first
|
|
if self.voyage_api_key:
|
|
return await self._generate_voyage_batch_embeddings(texts)
|
|
|
|
# Fallback to sentence-transformers
|
|
if self.embedding_model:
|
|
embeddings = self.embedding_model.encode(texts)
|
|
return [emb.tolist() for emb in embeddings]
|
|
|
|
logger.error("No embedding model available")
|
|
return [None] * len(texts)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate batch embeddings: {e}")
|
|
return [None] * len(texts)
|
|
|
|
async def _generate_voyage_batch_embeddings(self, texts: List[str]) -> List[Optional[List[float]]]:
|
|
"""Generate batch embeddings using Voyage-3-large API."""
|
|
try:
|
|
url = "https://api.voyageai.com/v1/embeddings"
|
|
headers = {
|
|
"Authorization": f"Bearer {self.voyage_api_key}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
data = {
|
|
"model": "voyage-3-large",
|
|
"input": texts,
|
|
"input_type": "document" # Use document type for batch processing
|
|
}
|
|
|
|
response = requests.post(url, headers=headers, json=data, timeout=60)
|
|
response.raise_for_status()
|
|
|
|
result = response.json()
|
|
if "data" in result:
|
|
return [item["embedding"] for item in result["data"]]
|
|
|
|
logger.error("No embedding data in Voyage API response")
|
|
return [None] * len(texts)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate Voyage batch embeddings: {e}")
|
|
return [None] * len(texts)
|
|
|
|
async def add_document_vectors(
|
|
self,
|
|
tenant_id: str,
|
|
document_id: str,
|
|
chunks: Dict[str, List[Dict[str, Any]]],
|
|
collection_type: str = "documents"
|
|
) -> bool:
|
|
"""Add document chunks to vector database with batch processing."""
|
|
if not self.client:
|
|
logger.error("Qdrant client not available")
|
|
return False
|
|
|
|
try:
|
|
collection_name = self._get_collection_name(tenant_id, collection_type)
|
|
|
|
# Collect all chunks and their types for single batch processing
|
|
all_chunks = []
|
|
chunk_types = []
|
|
|
|
# Collect text chunks
|
|
if "text_chunks" in chunks:
|
|
all_chunks.extend(chunks["text_chunks"])
|
|
chunk_types.extend(["text"] * len(chunks["text_chunks"]))
|
|
|
|
# Collect table chunks
|
|
if "table_chunks" in chunks:
|
|
all_chunks.extend(chunks["table_chunks"])
|
|
chunk_types.extend(["table"] * len(chunks["table_chunks"]))
|
|
|
|
# Collect chart chunks
|
|
if "chart_chunks" in chunks:
|
|
all_chunks.extend(chunks["chart_chunks"])
|
|
chunk_types.extend(["chart"] * len(chunks["chart_chunks"]))
|
|
|
|
if all_chunks:
|
|
# Process all chunks in a single batch
|
|
all_points = await self._process_all_chunks_batch(
|
|
document_id, tenant_id, all_chunks, chunk_types
|
|
)
|
|
|
|
if all_points:
|
|
# Upsert points in batches
|
|
batch_size = settings.EMBEDDING_BATCH_SIZE
|
|
for i in range(0, len(all_points), batch_size):
|
|
batch = all_points[i:i + batch_size]
|
|
self.client.upsert(
|
|
collection_name=collection_name,
|
|
points=batch
|
|
)
|
|
|
|
logger.info(f"Added {len(all_points)} vectors to collection {collection_name}")
|
|
return True
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to add document vectors: {e}")
|
|
return False
|
|
|
|
async def _process_all_chunks_batch(
|
|
self,
|
|
document_id: str,
|
|
tenant_id: str,
|
|
chunks: List[Dict[str, Any]],
|
|
chunk_types: List[str]
|
|
) -> List[models.PointStruct]:
|
|
"""Process all chunks in a single batch and generate embeddings."""
|
|
points = []
|
|
|
|
try:
|
|
# Extract texts for batch embedding generation
|
|
texts = [chunk["text"] for chunk in chunks]
|
|
|
|
# Generate embeddings in batch (single call)
|
|
embeddings = await self.generate_batch_embeddings(texts)
|
|
|
|
# Create points with embeddings
|
|
for i, (chunk, embedding, chunk_type) in enumerate(zip(chunks, embeddings, chunk_types)):
|
|
if not embedding:
|
|
continue
|
|
|
|
# Create point with enhanced metadata
|
|
point = models.PointStruct(
|
|
id=chunk["id"],
|
|
vector=embedding,
|
|
payload={
|
|
"document_id": document_id,
|
|
"tenant_id": tenant_id,
|
|
"chunk_index": chunk["chunk_index"],
|
|
"text": chunk["text"],
|
|
"chunk_type": chunk_type,
|
|
"token_count": chunk.get("token_count", 0),
|
|
"page_numbers": chunk.get("page_numbers", []),
|
|
"metadata": chunk.get("metadata", {}),
|
|
"created_at": chunk.get("metadata", {}).get("created_at", datetime.utcnow().isoformat())
|
|
}
|
|
)
|
|
points.append(point)
|
|
|
|
return points
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to process all chunks batch: {e}")
|
|
return []
|
|
|
|
async def _process_chunk_batch(
|
|
self,
|
|
document_id: str,
|
|
tenant_id: str,
|
|
chunks: List[Dict[str, Any]],
|
|
chunk_type: str
|
|
) -> List[models.PointStruct]:
|
|
"""Process a batch of chunks and generate embeddings."""
|
|
points = []
|
|
|
|
try:
|
|
# Extract texts for batch embedding generation
|
|
texts = [chunk["text"] for chunk in chunks]
|
|
|
|
# Generate embeddings in batch
|
|
embeddings = await self.generate_batch_embeddings(texts)
|
|
|
|
# Create points with embeddings
|
|
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
|
|
if not embedding:
|
|
continue
|
|
|
|
# Create point with enhanced metadata
|
|
point = models.PointStruct(
|
|
id=chunk["id"],
|
|
vector=embedding,
|
|
payload={
|
|
"document_id": document_id,
|
|
"tenant_id": tenant_id,
|
|
"chunk_index": chunk["chunk_index"],
|
|
"text": chunk["text"],
|
|
"chunk_type": chunk_type,
|
|
"token_count": chunk.get("token_count", 0),
|
|
"page_numbers": chunk.get("page_numbers", []),
|
|
"metadata": chunk.get("metadata", {}),
|
|
"created_at": chunk.get("metadata", {}).get("created_at", datetime.utcnow().isoformat())
|
|
}
|
|
)
|
|
points.append(point)
|
|
|
|
return points
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to process {chunk_type} chunk batch: {e}")
|
|
return []
|
|
|
|
async def search_similar(
|
|
self,
|
|
tenant_id: str,
|
|
query: str,
|
|
limit: int = 10,
|
|
score_threshold: float = 0.7,
|
|
collection_type: str = "documents",
|
|
filters: Optional[Dict[str, Any]] = None,
|
|
chunk_types: Optional[List[str]] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Search for similar vectors with multi-modal support."""
|
|
if not self.client:
|
|
return []
|
|
|
|
try:
|
|
collection_name = self._get_collection_name(tenant_id, collection_type)
|
|
|
|
# Generate query embedding
|
|
query_embedding = await self.generate_embedding(query)
|
|
if not query_embedding:
|
|
return []
|
|
|
|
# Build search filter
|
|
search_filter = models.Filter(
|
|
must=[
|
|
models.FieldCondition(
|
|
key="tenant_id",
|
|
match=models.MatchValue(value=tenant_id)
|
|
)
|
|
]
|
|
)
|
|
|
|
# Add chunk type filter if specified
|
|
if chunk_types:
|
|
search_filter.must.append(
|
|
models.FieldCondition(
|
|
key="chunk_type",
|
|
match=models.MatchAny(any=chunk_types)
|
|
)
|
|
)
|
|
|
|
# Add additional filters
|
|
if filters:
|
|
for key, value in filters.items():
|
|
if isinstance(value, list):
|
|
search_filter.must.append(
|
|
models.FieldCondition(
|
|
key=key,
|
|
match=models.MatchAny(any=value)
|
|
)
|
|
)
|
|
else:
|
|
search_filter.must.append(
|
|
models.FieldCondition(
|
|
key=key,
|
|
match=models.MatchValue(value=value)
|
|
)
|
|
)
|
|
|
|
# Perform search
|
|
search_result = self.client.search(
|
|
collection_name=collection_name,
|
|
query_vector=query_embedding,
|
|
query_filter=search_filter,
|
|
limit=limit,
|
|
score_threshold=score_threshold,
|
|
with_payload=True
|
|
)
|
|
|
|
# Format results with enhanced metadata
|
|
results = []
|
|
for point in search_result:
|
|
results.append({
|
|
"id": point.id,
|
|
"score": point.score,
|
|
"payload": point.payload,
|
|
"text": point.payload.get("text", ""),
|
|
"document_id": point.payload.get("document_id"),
|
|
"chunk_type": point.payload.get("chunk_type", "text"),
|
|
"token_count": point.payload.get("token_count", 0),
|
|
"page_numbers": point.payload.get("page_numbers", []),
|
|
"metadata": point.payload.get("metadata", {})
|
|
})
|
|
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to search vectors: {e}")
|
|
return []
|
|
|
|
async def search_structured_data(
|
|
self,
|
|
tenant_id: str,
|
|
query: str,
|
|
data_type: str = "table", # "table" or "chart"
|
|
limit: int = 10,
|
|
score_threshold: float = 0.7,
|
|
filters: Optional[Dict[str, Any]] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Search specifically for structured data (tables and charts)."""
|
|
return await self.search_similar(
|
|
tenant_id=tenant_id,
|
|
query=query,
|
|
limit=limit,
|
|
score_threshold=score_threshold,
|
|
collection_type="documents",
|
|
filters=filters,
|
|
chunk_types=[data_type]
|
|
)
|
|
|
|
async def hybrid_search(
|
|
self,
|
|
tenant_id: str,
|
|
query: str,
|
|
limit: int = 10,
|
|
score_threshold: float = 0.7,
|
|
filters: Optional[Dict[str, Any]] = None,
|
|
semantic_weight: float = 0.7,
|
|
keyword_weight: float = 0.3
|
|
) -> List[Dict[str, Any]]:
|
|
"""Perform hybrid search combining semantic and keyword matching."""
|
|
try:
|
|
# Semantic search
|
|
semantic_results = await self.search_similar(
|
|
tenant_id=tenant_id,
|
|
query=query,
|
|
limit=limit * 2, # Get more results for re-ranking
|
|
score_threshold=score_threshold * 0.8, # Lower threshold for semantic
|
|
filters=filters
|
|
)
|
|
|
|
# Keyword search (simple implementation)
|
|
keyword_results = await self._keyword_search(
|
|
tenant_id=tenant_id,
|
|
query=query,
|
|
limit=limit * 2,
|
|
filters=filters
|
|
)
|
|
|
|
# Combine and re-rank results
|
|
combined_results = await self._combine_search_results(
|
|
semantic_results, keyword_results, semantic_weight, keyword_weight
|
|
)
|
|
|
|
# Return top results
|
|
return combined_results[:limit]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to perform hybrid search: {e}")
|
|
return []
|
|
|
|
async def _keyword_search(
|
|
self,
|
|
tenant_id: str,
|
|
query: str,
|
|
limit: int = 10,
|
|
filters: Optional[Dict[str, Any]] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""Simple keyword search implementation."""
|
|
try:
|
|
# This is a simplified keyword search
|
|
# In a production system, you might use Elasticsearch or similar
|
|
query_terms = query.lower().split()
|
|
|
|
# Get all documents and filter by keywords
|
|
collection_name = self._get_collection_name(tenant_id, "documents")
|
|
|
|
# Build filter
|
|
search_filter = models.Filter(
|
|
must=[
|
|
models.FieldCondition(
|
|
key="tenant_id",
|
|
match=models.MatchValue(value=tenant_id)
|
|
)
|
|
]
|
|
)
|
|
|
|
if filters:
|
|
for key, value in filters.items():
|
|
if isinstance(value, list):
|
|
search_filter.must.append(
|
|
models.FieldCondition(
|
|
key=key,
|
|
match=models.MatchAny(any=value)
|
|
)
|
|
)
|
|
else:
|
|
search_filter.must.append(
|
|
models.FieldCondition(
|
|
key=key,
|
|
match=models.MatchValue(value=value)
|
|
)
|
|
)
|
|
|
|
# Get all points and filter by keywords
|
|
all_points = self.client.scroll(
|
|
collection_name=collection_name,
|
|
scroll_filter=search_filter,
|
|
limit=1000, # Adjust based on your data size
|
|
with_payload=True
|
|
)[0]
|
|
|
|
# Score by keyword matches
|
|
keyword_results = []
|
|
for point in all_points:
|
|
text = point.payload.get("text", "").lower()
|
|
score = sum(1 for term in query_terms if term in text)
|
|
if score > 0:
|
|
keyword_results.append({
|
|
"id": point.id,
|
|
"score": score / len(query_terms), # Normalize score
|
|
"payload": point.payload,
|
|
"text": point.payload.get("text", ""),
|
|
"document_id": point.payload.get("document_id"),
|
|
"chunk_type": point.payload.get("chunk_type", "text"),
|
|
"token_count": point.payload.get("token_count", 0),
|
|
"page_numbers": point.payload.get("page_numbers", []),
|
|
"metadata": point.payload.get("metadata", {})
|
|
})
|
|
|
|
# Sort by score and return top results
|
|
keyword_results.sort(key=lambda x: x["score"], reverse=True)
|
|
return keyword_results[:limit]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to perform keyword search: {e}")
|
|
return []
|
|
|
|
async def _combine_search_results(
|
|
self,
|
|
semantic_results: List[Dict[str, Any]],
|
|
keyword_results: List[Dict[str, Any]],
|
|
semantic_weight: float,
|
|
keyword_weight: float
|
|
) -> List[Dict[str, Any]]:
|
|
"""Combine and re-rank search results."""
|
|
try:
|
|
# Create a map of results by ID
|
|
combined_map = {}
|
|
|
|
# Add semantic results
|
|
for result in semantic_results:
|
|
result_id = result["id"]
|
|
combined_map[result_id] = {
|
|
**result,
|
|
"semantic_score": result["score"],
|
|
"keyword_score": 0.0,
|
|
"combined_score": result["score"] * semantic_weight
|
|
}
|
|
|
|
# Add keyword results
|
|
for result in keyword_results:
|
|
result_id = result["id"]
|
|
if result_id in combined_map:
|
|
# Update existing result
|
|
combined_map[result_id]["keyword_score"] = result["score"]
|
|
combined_map[result_id]["combined_score"] += result["score"] * keyword_weight
|
|
else:
|
|
# Add new result
|
|
combined_map[result_id] = {
|
|
**result,
|
|
"semantic_score": 0.0,
|
|
"keyword_score": result["score"],
|
|
"combined_score": result["score"] * keyword_weight
|
|
}
|
|
|
|
# Convert to list and sort by combined score
|
|
combined_results = list(combined_map.values())
|
|
combined_results.sort(key=lambda x: x["combined_score"], reverse=True)
|
|
|
|
return combined_results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to combine search results: {e}")
|
|
return semantic_results # Fallback to semantic results
|
|
|
|
async def delete_document_vectors(self, tenant_id: str, document_id: str, collection_type: str = "documents") -> bool:
|
|
"""Delete all vectors for a specific document."""
|
|
if not self.client:
|
|
return False
|
|
|
|
try:
|
|
collection_name = self._get_collection_name(tenant_id, collection_type)
|
|
|
|
# Delete points with document_id filter
|
|
self.client.delete(
|
|
collection_name=collection_name,
|
|
points_selector=models.FilterSelector(
|
|
filter=models.Filter(
|
|
must=[
|
|
models.FieldCondition(
|
|
key="document_id",
|
|
match=models.MatchValue(value=document_id)
|
|
),
|
|
models.FieldCondition(
|
|
key="tenant_id",
|
|
match=models.MatchValue(value=tenant_id)
|
|
)
|
|
]
|
|
)
|
|
)
|
|
)
|
|
|
|
logger.info(f"Deleted vectors for document {document_id} from collection {collection_name}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete document vectors: {e}")
|
|
return False
|
|
|
|
async def get_collection_stats(self, tenant_id: str, collection_type: str = "documents") -> Optional[Dict[str, Any]]:
|
|
"""Get collection statistics."""
|
|
if not self.client:
|
|
return None
|
|
|
|
try:
|
|
collection_name = self._get_collection_name(tenant_id, collection_type)
|
|
|
|
info = self.client.get_collection(collection_name)
|
|
count = self.client.count(
|
|
collection_name=collection_name,
|
|
count_filter=models.Filter(
|
|
must=[
|
|
models.FieldCondition(
|
|
key="tenant_id",
|
|
match=models.MatchValue(value=tenant_id)
|
|
)
|
|
]
|
|
)
|
|
)
|
|
|
|
return {
|
|
"collection_name": collection_name,
|
|
"tenant_id": tenant_id,
|
|
"vector_count": count.count,
|
|
"vector_size": info.config.params.vectors.size,
|
|
"distance": info.config.params.vectors.distance,
|
|
"status": info.status
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get collection stats: {e}")
|
|
return None
|
|
|
|
async def health_check(self) -> bool:
|
|
"""Check if vector service is healthy."""
|
|
if not self.client:
|
|
return False
|
|
|
|
try:
|
|
# Check client connection
|
|
collections = self.client.get_collections()
|
|
|
|
# Check embedding model (either Voyage or fallback)
|
|
if not self.voyage_api_key and not self.embedding_model:
|
|
return False
|
|
|
|
# Test embedding generation
|
|
test_embedding = await self.generate_embedding("test")
|
|
if not test_embedding:
|
|
return False
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Vector service health check failed: {e}")
|
|
return False
|
|
|
|
async def optimize_collections(self, tenant_id: str) -> Dict[str, Any]:
|
|
"""Optimize vector database collections for performance."""
|
|
try:
|
|
optimization_results = {}
|
|
|
|
# Optimize each collection type
|
|
for collection_type in ["documents", "tables", "charts"]:
|
|
collection_name = self._get_collection_name(tenant_id, collection_type)
|
|
|
|
try:
|
|
# Force collection optimization
|
|
self.client.update_collection(
|
|
collection_name=collection_name,
|
|
optimizers_config=models.OptimizersConfigDiff(
|
|
default_segment_number=4, # Increase for better parallelization
|
|
memmap_threshold=5000, # Lower threshold for memory mapping
|
|
vacuum_min_vector_number=1000 # Optimize vacuum threshold
|
|
)
|
|
)
|
|
|
|
# Get collection info
|
|
info = self.client.get_collection(collection_name)
|
|
optimization_results[collection_type] = {
|
|
"status": "optimized",
|
|
"vector_count": info.points_count,
|
|
"segments": info.segments_count,
|
|
"optimized_at": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to optimize collection {collection_name}: {e}")
|
|
optimization_results[collection_type] = {
|
|
"status": "failed",
|
|
"error": str(e)
|
|
}
|
|
|
|
return optimization_results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to optimize collections: {e}")
|
|
return {"error": str(e)}
|
|
|
|
async def get_performance_metrics(self, tenant_id: str) -> Dict[str, Any]:
|
|
"""Get performance metrics for vector database operations."""
|
|
try:
|
|
metrics = {
|
|
"tenant_id": tenant_id,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"collections": {},
|
|
"embedding_model": settings.EMBEDDING_MODEL,
|
|
"embedding_dimension": settings.EMBEDDING_DIMENSION
|
|
}
|
|
|
|
# Get metrics for each collection
|
|
for collection_type in ["documents", "tables", "charts"]:
|
|
collection_name = self._get_collection_name(tenant_id, collection_type)
|
|
|
|
try:
|
|
info = self.client.get_collection(collection_name)
|
|
count = self.client.count(
|
|
collection_name=collection_name,
|
|
count_filter=models.Filter(
|
|
must=[
|
|
models.FieldCondition(
|
|
key="tenant_id",
|
|
match=models.MatchValue(value=tenant_id)
|
|
)
|
|
]
|
|
)
|
|
)
|
|
|
|
metrics["collections"][collection_type] = {
|
|
"vector_count": count.count,
|
|
"segments": info.segments_count,
|
|
"status": info.status,
|
|
"vector_size": info.config.params.vectors.size,
|
|
"distance": info.config.params.vectors.distance
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to get metrics for collection {collection_name}: {e}")
|
|
metrics["collections"][collection_type] = {
|
|
"error": str(e)
|
|
}
|
|
|
|
return metrics
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get performance metrics: {e}")
|
|
return {"error": str(e)}
|
|
|
|
async def create_performance_benchmarks(self, tenant_id: str) -> Dict[str, Any]:
|
|
"""Create performance benchmarks for vector operations."""
|
|
try:
|
|
benchmarks = {
|
|
"tenant_id": tenant_id,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"results": {}
|
|
}
|
|
|
|
# Benchmark embedding generation
|
|
import time
|
|
|
|
# Single embedding benchmark
|
|
start_time = time.time()
|
|
test_embedding = await self.generate_embedding("This is a test document for benchmarking purposes.")
|
|
single_embedding_time = time.time() - start_time
|
|
|
|
# Batch embedding benchmark
|
|
test_texts = [f"Test document {i} for batch benchmarking." for i in range(10)]
|
|
start_time = time.time()
|
|
batch_embeddings = await self.generate_batch_embeddings(test_texts)
|
|
batch_embedding_time = time.time() - start_time
|
|
|
|
# Search benchmark
|
|
if test_embedding:
|
|
start_time = time.time()
|
|
search_results = await self.search_similar(
|
|
tenant_id=tenant_id,
|
|
query="test query",
|
|
limit=5
|
|
)
|
|
search_time = time.time() - start_time
|
|
else:
|
|
search_time = None
|
|
|
|
benchmarks["results"] = {
|
|
"single_embedding_time_ms": round(single_embedding_time * 1000, 2),
|
|
"batch_embedding_time_ms": round(batch_embedding_time * 1000, 2),
|
|
"avg_embedding_per_text_ms": round((batch_embedding_time / len(test_texts)) * 1000, 2),
|
|
"search_time_ms": round(search_time * 1000, 2) if search_time else None,
|
|
"embedding_model": settings.EMBEDDING_MODEL,
|
|
"embedding_dimension": settings.EMBEDDING_DIMENSION
|
|
}
|
|
|
|
return benchmarks
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to create performance benchmarks: {e}")
|
|
return {"error": str(e)}
|
|
|
|
# Global vector service instance
# NOTE: constructed at import time, so importing this module connects to
# Qdrant and configures the embedding backend as a side effect.
vector_service = VectorService()
|