Files
cim_summary/backend/src/services/rag/chunkProcessing.ts
admin 87c6da4225 Refactor LLM service architecture and improve document processing
- Refactor LLM service with provider pattern (Anthropic, OpenAI, OpenRouter)
- Add structured LLM prompts and utilities (token estimation, cost calculation, JSON extraction)
- Implement RAG improvements with optimized chunking and embedding services
- Add financial extraction monitoring service
- Add parallel document processor
- Improve error handling with dedicated error handlers
- Add comprehensive TypeScript types for LLM, document, and processing
- Update optimized agentic RAG processor and simple document processor
2025-11-11 21:04:42 -05:00

81 lines
2.4 KiB
TypeScript

import { logger } from '../../utils/logger';
import type { ProcessingChunk } from './types';
// Number of chunks handled per iteration in processChunksInBatches —
// bounds concurrent work to manage memory and API limits.
const BATCH_SIZE = 10;
/**
 * Enrich chunk metadata with additional analysis derived from the chunk text.
 *
 * Only `chunk.content` is read; the chunk itself is not mutated.
 *
 * @param chunk - Chunk whose content is analyzed.
 * @returns Derived metadata: character/word/sentence counts, content-type
 *          heuristic flags, and an ISO-8601 processing timestamp.
 */
export function enrichChunkMetadata(chunk: ProcessingChunk): Record<string, any> {
  // Fix: splitting an empty or whitespace-padded string on /\s+/ yields empty
  // tokens ("".split(/\s+/) === [""]), which previously reported wordCount = 1
  // for empty content. Filter them out so the count reflects real words.
  const words = chunk.content.split(/\s+/).filter(word => word.length > 0);
  const metadata: Record<string, any> = {
    chunkSize: chunk.content.length,
    wordCount: words.length,
    // Counts runs of terminators, so "..." or "?!" count as one sentence end.
    sentenceCount: (chunk.content.match(/[.!?]+/g) || []).length,
    hasNumbers: /\d/.test(chunk.content),
    hasFinancialData: /revenue|ebitda|profit|margin|growth|valuation/i.test(chunk.content),
    hasTechnicalData: /technology|software|platform|api|database/i.test(chunk.content),
    processingTimestamp: new Date().toISOString()
  };
  return metadata;
}
/**
 * Process chunks in batches to manage memory and API limits.
 *
 * Chunks are taken BATCH_SIZE at a time; within a batch each chunk is handled
 * concurrently. A chunk whose processing throws is logged and dropped from the
 * result rather than failing the whole document.
 *
 * @param chunks - Chunks to process; a chunk's `metadata` may be replaced in place
 *                 when enrichment is enabled.
 * @param documentId - Used for log context only.
 * @param options - `enableMetadataEnrichment` toggles enrichChunkMetadata;
 *                  `similarityThreshold` is accepted but not used in this function.
 * @returns The successfully processed chunks, in batch order.
 */
export async function processChunksInBatches(
  chunks: ProcessingChunk[],
  documentId: string,
  options: {
    enableMetadataEnrichment?: boolean;
    similarityThreshold?: number;
  }
): Promise<ProcessingChunk[]> {
  const results: ProcessingChunk[] = [];
  const totalBatches = Math.ceil(chunks.length / BATCH_SIZE);

  for (let offset = 0; offset < chunks.length; offset += BATCH_SIZE) {
    const batchNumber = Math.floor(offset / BATCH_SIZE) + 1;
    logger.info(`Processing batch ${batchNumber}/${totalBatches} for document: ${documentId}`);

    const settled = await Promise.all(
      chunks.slice(offset, offset + BATCH_SIZE).map(async (chunk, positionInBatch) => {
        try {
          // Brief pause for every chunk after the first in the batch.
          // NOTE(review): all batch promises start concurrently, so this is one
          // shared 100ms wait rather than a per-chunk stagger — confirm intent.
          if (positionInBatch > 0) {
            await new Promise(resolve => setTimeout(resolve, 100));
          }
          if (options.enableMetadataEnrichment) {
            chunk.metadata = { ...chunk.metadata, ...enrichChunkMetadata(chunk) };
          }
          return chunk;
        } catch (error) {
          logger.error(`Failed to process chunk ${chunk.chunkIndex}`, error);
          return null;
        }
      })
    );

    for (const chunk of settled) {
      if (chunk !== null) {
        results.push(chunk);
      }
    }

    // Hint the VM to collect between batches (only when node ran with --expose-gc).
    if (global.gc) {
      global.gc();
    }

    const memoryUsage = process.memoryUsage();
    logger.info(`Batch completed. Memory usage: ${Math.round(memoryUsage.heapUsed / 1024 / 1024)}MB`);
  }

  return results;
}