Add agentic RAG implementation with enhanced document processing and LLM services

This commit is contained in:
Jon
2025-07-27 22:06:13 -04:00
parent c67dab22b4
commit 9c1b6d1327
18 changed files with 5804 additions and 1581 deletions

View File

@@ -1,795 +1,145 @@
import { Router, Request, Response, NextFunction } from 'express';
import { auth } from '../middleware/auth';
import { validateDocumentUpload } from '../middleware/validation';
import { handleFileUpload, cleanupUploadedFile } from '../middleware/upload';
import { fileStorageService } from '../services/fileStorageService';
import { uploadProgressService } from '../services/uploadProgressService';
import { documentProcessingService } from '../services/documentProcessingService';
import { jobQueueService } from '../services/jobQueueService';
import { DocumentModel } from '../models/DocumentModel';
import express from 'express';
import { authenticateToken } from '../middleware/auth';
import { documentController } from '../controllers/documentController';
import { unifiedDocumentProcessor } from '../services/unifiedDocumentProcessor';
import { logger } from '../utils/logger';
import { v4 as uuidv4 } from 'uuid';
import fs from 'fs';
const router = Router();
const router = express.Router();
// Apply authentication middleware to all document routes
router.use(auth);
// Apply authentication to all routes
router.use(authenticateToken);
// GET /api/documents - Get all documents for the authenticated user
router.get('/', async (req: Request, res: Response, next: NextFunction) => {
try {
const userId = (req as any).user.userId;
const documents = await DocumentModel.findByUserId(userId);
res.json({
success: true,
data: documents,
message: 'Documents retrieved successfully',
});
} catch (error) {
next(error);
}
});
// Existing routes
router.post('/upload', documentController.uploadDocument);
router.get('/', documentController.getDocuments);
router.get('/:id', documentController.getDocument);
router.get('/:id/progress', documentController.getDocumentProgress);
router.delete('/:id', documentController.deleteDocument);
// GET /api/documents/:id - Get a specific document
router.get('/:id', async (req: Request, res: Response, next: NextFunction) => {
// New RAG processing routes
router.post('/:id/process-rag', async (req, res) => {
try {
const { id } = req.params;
const userId = req.user?.id;
// Enhanced validation for document ID
if (!id || id === 'undefined' || id === 'null' || id.trim() === '') {
return res.status(400).json({
success: false,
error: 'Invalid document ID provided',
});
}
const userId = (req as any).user.userId;
// Check if user owns the document or is admin
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
if (!userId) {
return res.status(401).json({ error: 'User not authenticated' });
}
return res.json({
success: true,
data: document,
message: 'Document retrieved successfully',
});
} catch (error) {
return next(error);
}
});
// POST /api/documents - Upload and process a new document
router.post('/', validateDocumentUpload, handleFileUpload, async (req: Request, res: Response) => {
const uploadId = uuidv4();
const userId = (req as any).user.userId;
let uploadedFilePath: string | null = null;
try {
if (!req.file) {
return res.status(400).json({
success: false,
error: 'No file uploaded',
message: 'Please select a PDF file to upload',
});
}
const { processImmediately = false } = req.body;
const file = req.file;
uploadedFilePath = file.path;
// Store file using storage service
const storageResult = await fileStorageService.storeFile(file, userId);
// Get document text (you'll need to implement this)
const documentText = await documentController.getDocumentText(id);
if (!storageResult.success) {
throw new Error(storageResult.error || 'Failed to store file');
}
// Add document to database
const document = await DocumentModel.create({
user_id: userId,
original_file_name: file.originalname,
file_path: file.path,
file_size: file.size,
});
// Process document if requested
let processingJobId: string | null = null;
if (processImmediately) {
try {
processingJobId = await jobQueueService.addJob('document_processing', {
documentId: document.id,
userId,
});
logger.info(`Document processing job queued: ${document.id}`, {
jobId: processingJobId,
documentId: document.id,
userId,
});
} catch (processingError) {
logger.error('Failed to queue document processing', {
documentId: document.id,
error: processingError instanceof Error ? processingError.message : 'Unknown error',
});
// Don't fail the upload if processing fails
}
}
// Note: Don't clean up uploaded file here - it will be cleaned up after processing
// cleanupUploadedFile(uploadedFilePath);
return res.json({
success: true,
data: {
id: document.id,
uploadId,
processingJobId,
status: 'uploaded',
filename: file.originalname,
fileSize: file.size,
message: 'Document uploaded successfully',
},
});
} catch (error) {
// Clean up uploaded file on error
if (uploadedFilePath) {
cleanupUploadedFile(uploadedFilePath);
}
logger.error('Document upload failed', {
userId,
filename: req.file?.originalname,
error: error instanceof Error ? error.message : 'Unknown error',
});
return res.status(500).json({
success: false,
error: 'Upload failed',
message: error instanceof Error ? error.message : 'An error occurred during upload',
});
}
});
// POST /api/documents/:id/process - Start processing a document
router.post('/:id/process', async (req: Request, res: Response, next: NextFunction) => {
try {
const { id } = req.params;
// Enhanced validation for document ID
if (!id || id === 'undefined' || id === 'null' || id.trim() === '') {
return res.status(400).json({
success: false,
error: 'Invalid document ID provided',
});
}
const userId = (req as any).user.userId;
const { options } = req.body;
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
// Check if user owns the document or is admin
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
}
// Check if document is already being processed
if (document.status === 'processing_llm' || document.status === 'extracting_text' || document.status === 'generating_pdf') {
return res.status(400).json({
success: false,
error: 'Document is already being processed',
});
}
// Add processing job to queue
const jobId = await jobQueueService.addJob('document_processing', {
documentId: id,
userId,
options: options || {
extractText: true,
generateSummary: true,
performAnalysis: true,
},
}, 0, 3);
// Update document status
await DocumentModel.updateById(id, {
status: 'extracting_text',
processing_started_at: new Date(),
});
logger.info(`Document processing started: ${id}`, {
jobId,
userId,
options,
});
res.json({
success: true,
data: {
jobId,
documentId: id,
status: 'processing',
},
message: 'Document processing started',
});
} catch (error) {
return next(error);
}
});
// GET /api/documents/:id/processing-status - Get document processing status
router.get('/:id/processing-status', async (req: Request, res: Response, next: NextFunction) => {
try {
const { id } = req.params;
// Enhanced validation for document ID
if (!id || id === 'undefined' || id === 'null' || id.trim() === '') {
return res.status(400).json({
success: false,
error: 'Invalid document ID provided',
});
}
const userId = (req as any).user.userId;
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
// Check if user owns the document or is admin
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
}
// Get processing history
const processingHistory = await documentProcessingService.getDocumentProcessingHistory(id);
// Get current job status if processing
let currentJob = null;
if (document.status === 'processing_llm' || document.status === 'extracting_text' || document.status === 'generating_pdf') {
const jobs = jobQueueService.getAllJobs();
currentJob = [...jobs.queue, ...jobs.processing].find(job =>
job.data.documentId === id &&
(job.status === 'pending' || job.status === 'processing')
);
}
res.json({
success: true,
data: {
documentId: id,
status: document.status,
currentJob,
processingHistory,
extractedText: document.extracted_text,
summary: document.generated_summary,
analysis: null, // TODO: Add analysis data field to Document model
},
message: 'Processing status retrieved successfully',
});
} catch (error) {
return next(error);
}
});
// GET /api/documents/:id/progress - Get processing progress for a document
router.get('/:id/progress', async (req: Request, res: Response, next: NextFunction) => {
try {
const { id } = req.params;
// Enhanced validation for document ID
if (!id || id === 'undefined' || id === 'null' || id.trim() === '') {
return res.status(400).json({
success: false,
error: 'Invalid document ID provided',
});
}
const userId = (req as any).user.userId;
// Check if user owns the document or is admin
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
}
// Get progress from progress service
let progress = uploadProgressService.getProgress(id);
// If no progress from service, check document status in database
if (!progress) {
// Check if document is completed in database
if (document.status === 'completed') {
progress = {
documentId: id,
jobId: '', // Document doesn't have job_id, will be empty for completed docs
status: 'completed',
step: 'storage',
progress: 100,
message: 'Document processing completed successfully',
startTime: document.created_at || new Date(),
};
} else if (document.status === 'processing_llm') {
progress = {
documentId: id,
jobId: '', // Document doesn't have job_id, will be empty for processing docs
status: 'processing',
step: 'summary_generation',
progress: 60,
message: 'Processing document with LLM...',
startTime: document.created_at || new Date(),
};
} else if (document.status === 'uploaded') {
progress = {
documentId: id,
jobId: '', // Document doesn't have job_id, will be empty for uploaded docs
status: 'processing',
step: 'validation',
progress: 10,
message: 'Document uploaded, waiting for processing...',
startTime: document.created_at || new Date(),
};
} else {
return res.status(404).json({
success: false,
error: 'No progress tracking found for this document',
});
}
}
return res.json({
success: true,
data: progress,
message: 'Progress retrieved successfully',
});
} catch (error) {
return next(error);
}
});
// GET /api/documents/queue/status - Get job queue status and active jobs
router.get('/queue/status', async (req: Request, res: Response, next: NextFunction) => {
try {
const userId = (req as any).user.userId;
// Get queue statistics
const stats = jobQueueService.getQueueStats();
// Get all jobs and filter to user's documents
const allJobs = jobQueueService.getAllJobs();
const userDocuments = await DocumentModel.findByUserId(userId);
const userDocumentIds = new Set(userDocuments.map(doc => doc.id));
// Filter active jobs to only show user's documents
const activeJobs = [...allJobs.queue, ...allJobs.processing]
.filter(job => userDocumentIds.has(job.data.documentId))
.map(job => ({
id: job.id,
type: job.type,
status: job.status,
createdAt: job.createdAt.toISOString(),
startedAt: job.startedAt?.toISOString(),
completedAt: job.completedAt?.toISOString(),
data: job.data,
}));
return res.json({
success: true,
data: {
stats,
activeJobs,
},
message: 'Queue status retrieved successfully',
});
} catch (error) {
return next(error);
}
});
// GET /api/documents/progress/all - Get all active processing progress
router.get('/progress/all', async (req: Request, res: Response, next: NextFunction) => {
try {
const userId = (req as any).user.userId;
// Get all progress and filter by user's documents
const allProgress = uploadProgressService.getAllProgress();
const userDocuments = await DocumentModel.findByUserId(userId);
const userDocumentIds = new Set(userDocuments.map(doc => doc.id));
// Filter progress to only show user's documents
const userProgress = allProgress.filter(progress =>
userDocumentIds.has(progress.documentId)
const result = await unifiedDocumentProcessor.processDocument(
id,
userId,
documentText,
{ strategy: 'rag' }
);
return res.json({
success: true,
data: userProgress,
message: 'Progress retrieved successfully',
res.json({
success: result.success,
processingStrategy: result.processingStrategy,
processingTime: result.processingTime,
apiCalls: result.apiCalls,
summary: result.summary,
analysisData: result.analysisData,
error: result.error
});
} catch (error) {
return next(error);
logger.error('RAG processing failed', { error });
res.status(500).json({ error: 'RAG processing failed' });
}
});
// POST /api/documents/:id/regenerate-summary - Regenerate summary for a document
router.post('/:id/regenerate-summary', async (req: Request, res: Response, next: NextFunction) => {
router.post('/:id/compare-strategies', async (req, res) => {
try {
const { id } = req.params;
const userId = req.user?.id;
// Enhanced validation for document ID
if (!id || id === 'undefined' || id === 'null' || id.trim() === '') {
return res.status(400).json({
success: false,
error: 'Invalid document ID provided',
});
if (!userId) {
return res.status(401).json({ error: 'User not authenticated' });
}
// Get document text
const documentText = await documentController.getDocumentText(id);
const userId = (req as any).user.userId;
// Check if user owns the document or is admin
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
}
// Check if document has extracted text
if (!document.extracted_text) {
return res.status(400).json({
success: false,
error: 'Document has no extracted text to regenerate summary from',
});
}
// Start regeneration in background
documentProcessingService.regenerateSummary(id).catch(error => {
logger.error('Background summary regeneration failed', {
documentId: id,
error: error instanceof Error ? error.message : 'Unknown error'
});
});
return res.json({
success: true,
message: 'Summary regeneration started. Check document status for progress.',
});
} catch (error) {
return next(error);
}
});
// GET /api/documents/:id/download - Download document summary
router.get('/:id/download', async (req: Request, res: Response, next: NextFunction) => {
try {
const { id } = req.params;
if (!id) {
return res.status(400).json({
success: false,
error: 'Document ID is required',
});
}
const userId = (req as any).user.userId;
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
// Check if user owns the document or is admin
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
}
// Check if document is completed
if (document.status !== 'completed') {
return res.status(400).json({
success: false,
error: 'Document processing not completed',
});
}
// Try to serve PDF first, then markdown
let filePath = null;
let contentType = 'application/pdf';
let fileName = `${document.original_file_name.replace(/\.[^/.]+$/, '')}_summary.pdf`;
if (document.summary_pdf_path && fs.existsSync(document.summary_pdf_path)) {
filePath = document.summary_pdf_path;
} else if (document.summary_markdown_path && fs.existsSync(document.summary_markdown_path)) {
filePath = document.summary_markdown_path;
contentType = 'text/markdown';
fileName = `${document.original_file_name.replace(/\.[^/.]+$/, '')}_summary.md`;
} else {
// Create a simple text file with the summary
const summaryText = document.generated_summary || 'No summary available';
res.setHeader('Content-Type', 'text/plain');
res.setHeader('Content-Disposition', `attachment; filename="${fileName.replace('.pdf', '.txt')}"`);
return res.send(summaryText);
}
if (!filePath) {
return res.status(404).json({
success: false,
error: 'Summary file not found',
});
}
res.setHeader('Content-Type', contentType);
res.setHeader('Content-Disposition', `attachment; filename="${fileName}"`);
res.sendFile(filePath);
logger.info(`Document downloaded: ${id}`, {
const comparison = await unifiedDocumentProcessor.compareProcessingStrategies(
id,
userId,
filename: document.original_file_name,
filePath,
});
} catch (error) {
return next(error);
}
});
// GET /api/documents/:id/file - Stream document file
router.get('/:id/file', async (req: Request, res: Response, next: NextFunction) => {
try {
const { id } = req.params;
if (!id) {
return res.status(400).json({
success: false,
error: 'Document ID is required',
});
}
const userId = (req as any).user.userId;
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
// Check if user owns the document or is admin
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
}
// TODO: Implement actual file streaming
// For now, return a placeholder response
return res.status(404).json({
success: false,
error: 'File not found',
message: 'File serving not yet implemented',
});
} catch (error) {
return next(error);
}
});
// POST /api/documents/:id/feedback - Submit feedback for document regeneration
router.post('/:id/feedback', async (req: Request, res: Response, next: NextFunction) => {
try {
const { id } = req.params;
if (!id) {
return res.status(400).json({
success: false,
error: 'Document ID is required',
});
}
const { feedback: _feedback } = req.body;
const userId = (req as any).user.userId;
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
// Check if user owns the document or is admin
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
}
// TODO: Implement feedback submission
// For now, return a placeholder response
return res.json({
success: true,
data: {
feedbackId: 'temp-feedback-id',
},
message: 'Feedback submitted successfully',
});
} catch (error) {
return next(error);
}
});
// POST /api/documents/:id/regenerate - Regenerate document with feedback
router.post('/:id/regenerate', async (req: Request, res: Response, next: NextFunction) => {
try {
const { id } = req.params;
if (!id) {
return res.status(400).json({
success: false,
error: 'Document ID is required',
});
}
const { feedbackId: _feedbackId } = req.body;
const userId = (req as any).user.userId;
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
// Check if user owns the document or is admin
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
}
// TODO: Implement document regeneration
// For now, return a placeholder response
return res.json({
success: true,
data: {
jobId: 'temp-job-id',
status: 'processing',
},
message: 'Document regeneration started',
});
} catch (error) {
return next(error);
}
});
// DELETE /api/documents/:id - Delete a document
router.delete('/:id', async (req: Request, res: Response, next: NextFunction) => {
try {
const { id } = req.params;
if (!id) {
return res.status(400).json({
success: false,
error: 'Document ID is required',
});
}
const userId = (req as any).user.userId;
const document = await DocumentModel.findById(id);
if (!document) {
return res.status(404).json({
success: false,
error: 'Document not found',
});
}
// Check if user owns the document or is admin
if (document.user_id !== userId && (req as any).user.role !== 'admin') {
return res.status(403).json({
success: false,
error: 'Access denied',
});
}
// Cancel any pending processing jobs
const jobs = jobQueueService.getAllJobs();
const documentJobs = [...jobs.queue, ...jobs.processing].filter(job =>
job.data.documentId === id
documentText
);
documentJobs.forEach(job => {
jobQueueService.cancelJob(job.id);
res.json({
winner: comparison.winner,
performanceMetrics: comparison.performanceMetrics,
chunking: {
success: comparison.chunking.success,
processingTime: comparison.chunking.processingTime,
apiCalls: comparison.chunking.apiCalls,
error: comparison.chunking.error
},
rag: {
success: comparison.rag.success,
processingTime: comparison.rag.processingTime,
apiCalls: comparison.rag.apiCalls,
error: comparison.rag.error
}
});
// Delete the file from storage
if (document.file_path) {
await fileStorageService.deleteFile(document.file_path);
}
// Delete the document record
const deleted = await DocumentModel.delete(id);
if (!deleted) {
return res.status(500).json({
success: false,
error: 'Failed to delete document',
});
}
logger.info(`Document deleted: ${id}`, {
userId,
filename: document.original_file_name,
cancelledJobs: documentJobs.length,
});
return res.json({
success: true,
message: 'Document deleted successfully',
});
} catch (error) {
return next(error);
logger.error('Strategy comparison failed', { error });
res.status(500).json({ error: 'Strategy comparison failed' });
}
});
router.get('/processing-stats', async (req, res) => {
try {
const stats = await unifiedDocumentProcessor.getProcessingStats();
res.json(stats);
} catch (error) {
logger.error('Failed to get processing stats', { error });
res.status(500).json({ error: 'Failed to get processing stats' });
}
});
router.post('/:id/switch-strategy', async (req, res) => {
try {
const { id } = req.params;
const { strategy } = req.body;
const userId = req.user?.id;
if (!userId) {
return res.status(401).json({ error: 'User not authenticated' });
}
if (!['chunking', 'rag'].includes(strategy)) {
return res.status(400).json({ error: 'Invalid strategy. Must be "chunking" or "rag"' });
}
// Get document text
const documentText = await documentController.getDocumentText(id);
const result = await unifiedDocumentProcessor.switchStrategy(
id,
userId,
documentText,
strategy
);
res.json({
success: result.success,
processingStrategy: result.processingStrategy,
processingTime: result.processingTime,
apiCalls: result.apiCalls,
summary: result.summary,
analysisData: result.analysisData,
error: result.error
});
} catch (error) {
logger.error('Strategy switch failed', { error });
res.status(500).json({ error: 'Strategy switch failed' });
}
});