import { EventEmitter } from 'events';
import path from 'path';
import { logger, StructuredLogger } from '../utils/logger';
import { config } from '../config/env';
import { unifiedDocumentProcessor } from './unifiedDocumentProcessor';
import { uploadMonitoringService } from './uploadMonitoringService';

/**
 * Options forwarded to the document processor.
 *
 * Defined locally since documentProcessingService was removed. The index
 * signature allows additional keys to be passed through untouched.
 */
export interface ProcessingOptions {
  strategy?: string;
  fileBuffer?: Buffer;
  fileName?: string;
  mimeType?: string;
  [key: string]: any;
}

/** A unit of work tracked by the in-memory job queue. */
export interface Job {
  id: string;
  type: 'document_processing';
  data: {
    documentId: string;
    userId: string;
    options?: ProcessingOptions;
  };
  status: 'pending' | 'processing' | 'completed' | 'failed' | 'retrying';
  priority: number;
  attempts: number;
  maxAttempts: number;
  createdAt: Date;
  startedAt?: Date;
  completedAt?: Date;
  error?: string;
  result?: any;
}

/** Tunables for {@link JobQueueService}; all durations are in milliseconds. */
export interface JobQueueConfig {
  maxConcurrentJobs: number;
  defaultMaxAttempts: number;
  retryDelayMs: number;
  maxRetryDelayMs: number;
  cleanupIntervalMs: number;
  maxJobAgeMs: number;
}

/**
 * In-memory, priority-ordered job queue with bounded concurrency, exponential
 * back-off retries and periodic cleanup of old/stuck jobs.
 *
 * Jobs live only in this process's memory. The queue does not process anything
 * until {@link JobQueueService.start} (or {@link JobQueueService.resume}) is called.
 *
 * Emitted events: `job:added`, `job:started`, `job:completed`, `job:failed`,
 * `job:retrying`, `job:max_attempts_exceeded`, `job:cancelled`, `queue:started`,
 * `queue:paused`, `queue:resumed`, `queue:cleared`, `queue:cleaned`.
 */
class JobQueueService extends EventEmitter {
  private queue: Job[] = [];
  private processing: Job[] = [];
  private config: JobQueueConfig;
  private isRunning = false;
  private cleanupInterval: ReturnType<typeof setInterval> | null = null;

  /**
   * Ids of jobs that were cancelled (or abandoned by `stop()`) while their
   * execution was still in flight. The in-flight promise cannot be aborted, so
   * its outcome is discarded when it eventually settles.
   */
  private readonly cancelledJobIds = new Set<string>();

  /** Pending retry timers, tracked so `stop()` can cancel them. */
  private readonly retryTimers = new Set<ReturnType<typeof setTimeout>>();

  /**
   * @param config Partial overrides merged over the built-in defaults.
   */
  constructor(config: Partial<JobQueueConfig> = {}) {
    super();
    this.config = {
      maxConcurrentJobs: 3,
      defaultMaxAttempts: 3,
      retryDelayMs: 5000,
      maxRetryDelayMs: 300000, // 5 minutes
      cleanupIntervalMs: 300000, // 5 minutes
      maxJobAgeMs: 24 * 60 * 60 * 1000, // 24 hours
      ...config,
    };
    this.startCleanupInterval();
  }

  /**
   * Add a job to the queue.
   *
   * @param type Job type (only `document_processing` is supported).
   * @param data Payload identifying the document, its owner and processing options.
   * @param priority Higher values are processed first. Defaults to 0.
   * @param maxAttempts Attempt limit; falls back to `defaultMaxAttempts` when omitted or 0.
   * @returns The generated job id.
   */
  async addJob(
    type: 'document_processing',
    data: { documentId: string; userId: string; options?: ProcessingOptions },
    priority: number = 0,
    maxAttempts?: number
  ): Promise<string> {
    // slice(2, 11) keeps the same 9 random characters the deprecated substr(2, 9) produced.
    const jobId = `job_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
    const structuredLogger = new StructuredLogger();

    const job: Job = {
      id: jobId,
      type,
      data,
      status: 'pending',
      priority,
      attempts: 0,
      // `||` is kept deliberately: callers passing 0 have always received the default.
      maxAttempts: maxAttempts || this.config.defaultMaxAttempts,
      createdAt: new Date(),
    };

    this.queue.push(job);
    this.queue.sort((a, b) => b.priority - a.priority); // Higher priority first

    // Track job queue operation
    uploadMonitoringService.trackUploadEvent({
      userId: data.userId,
      fileInfo: {
        originalName: `document_${data.documentId}`,
        size: 0,
        mimetype: 'application/octet-stream',
      },
      status: 'started',
      stage: 'job_queued',
      correlationId: jobId,
    });

    structuredLogger.jobQueueOperation('add_job', jobId, 'pending');

    logger.info(`Job added to queue: ${jobId}`, {
      type,
      documentId: data.documentId,
      userId: data.userId,
      priority,
      queueLength: this.queue.length,
    });

    this.emit('job:added', job);
    this.fillProcessingSlots();

    return jobId;
  }

  /**
   * Start queued jobs until the concurrency limit is reached, the queue is
   * empty, or the queue is not running.
   *
   * `processNextJob` claims its job synchronously (before its first `await`),
   * so each loop iteration either consumes a queue entry or stops the loop.
   */
  private fillProcessingSlots(): void {
    while (
      this.isRunning &&
      this.queue.length > 0 &&
      this.processing.length < this.config.maxConcurrentJobs
    ) {
      // Fire-and-forget: attach a handler so an unexpected throw (for example
      // from an event listener) cannot surface as an unhandled rejection.
      this.processNextJob().catch(error => {
        logger.error('Unexpected error while processing job queue', {
          error: error instanceof Error ? error.message : String(error),
        });
      });
    }
  }

  /**
   * Take the next job off the queue and run it to completion, handling
   * success/failure bookkeeping, retries and slot release.
   */
  private async processNextJob(): Promise<void> {
    if (!this.isRunning || this.processing.length >= this.config.maxConcurrentJobs) {
      return;
    }

    const job = this.queue.shift();
    if (!job) {
      return;
    }

    const startedAt = new Date();
    this.processing.push(job);
    job.status = 'processing';
    job.startedAt = startedAt;
    job.attempts++;

    logger.info(`Starting job processing: ${job.id}`, {
      type: job.type,
      attempts: job.attempts,
      processingCount: this.processing.length,
    });

    this.emit('job:started', job);

    logger.info(`Job execution started: ${job.id}`, {
      jobId: job.id,
      type: job.type,
      documentId: job.data.documentId,
      userId: job.data.userId,
      attempts: job.attempts,
      maxAttempts: job.maxAttempts
    });

    try {
      const result = await this.executeJob(job);

      // The job was cancelled/abandoned while running: keep the cancelled
      // state instead of overwriting it with 'completed'.
      if (this.cancelledJobIds.has(job.id)) {
        logger.info(`Job finished after cancellation, discarding result: ${job.id}`, {
          jobId: job.id,
          documentId: job.data.documentId
        });
        return;
      }

      logger.info(`Job execution completed successfully: ${job.id}`, {
        jobId: job.id,
        documentId: job.data.documentId
      });

      const completedAt = new Date();
      job.status = 'completed';
      job.completedAt = completedAt;
      job.result = result;

      const processingTime = completedAt.getTime() - startedAt.getTime();

      // Track job completion
      uploadMonitoringService.trackUploadEvent({
        userId: job.data.userId,
        fileInfo: {
          originalName: `document_${job.data.documentId}`,
          size: 0,
          mimetype: 'application/octet-stream',
        },
        status: 'success',
        stage: 'job_completed',
        processingTime,
        correlationId: job.id,
      });

      const structuredLogger = new StructuredLogger();
      structuredLogger.jobQueueOperation('job_completed', job.id, 'completed');

      logger.info(`Job completed successfully: ${job.id}`, {
        processingTime,
      });

      this.emit('job:completed', job);
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error';
      const errorStack = error instanceof Error ? error.stack : undefined;

      // A cancelled/abandoned job must not be reported as a failure or retried.
      if (this.cancelledJobIds.has(job.id)) {
        logger.info(`Job failed after cancellation, skipping retry: ${job.id}`, {
          jobId: job.id,
          error: errorMessage
        });
        return;
      }

      logger.error(`Job ${job.id} execution failed`, {
        jobId: job.id,
        documentId: job.data.documentId,
        error: errorMessage,
        stack: errorStack,
        attempts: job.attempts,
        maxAttempts: job.maxAttempts
      });

      job.error = errorMessage;
      job.status = 'failed';

      const processingTime = Date.now() - startedAt.getTime();

      // Track job failure
      uploadMonitoringService.trackUploadEvent({
        userId: job.data.userId,
        fileInfo: {
          originalName: `document_${job.data.documentId}`,
          size: 0,
          mimetype: 'application/octet-stream',
        },
        status: 'failed',
        stage: 'job_failed',
        error: {
          message: errorMessage,
          type: 'job_processing_error',
        },
        processingTime,
        correlationId: job.id,
      });

      const structuredLogger = new StructuredLogger();
      structuredLogger.jobQueueOperation('job_failed', job.id, 'failed', error);

      logger.error(`Job failed: ${job.id}`, {
        error: errorMessage,
        attempts: job.attempts,
        maxAttempts: job.maxAttempts,
      });

      this.emit('job:failed', job);

      // Retry logic
      if (job.attempts < job.maxAttempts) {
        await this.retryJob(job);
      } else {
        logger.error(`Job exceeded max attempts: ${job.id}`, {
          attempts: job.attempts,
          maxAttempts: job.maxAttempts,
        });
        this.emit('job:max_attempts_exceeded', job);
      }
    } finally {
      // Remove from processing array (already absent if cancelled or cleaned up).
      const index = this.processing.findIndex(j => j.id === job.id);
      if (index !== -1) {
        this.processing.splice(index, 1);
      }
      this.cancelledJobIds.delete(job.id);

      // Hand the freed slot(s) to waiting jobs on the next tick.
      setImmediate(() => this.fillProcessingSlots());
    }
  }

  /**
   * Execute a specific job, racing it against a 15 minute timeout.
   *
   * @throws The job's own error, or a timeout error if the limit is reached first.
   */
  private async executeJob(job: Job): Promise<unknown> {
    // Add timeout handling to prevent stuck jobs
    const timeoutMs = 15 * 60 * 1000; // 15 minutes timeout
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined;

    const timeoutPromise = new Promise<never>((_, reject) => {
      timeoutHandle = setTimeout(() => {
        reject(new Error(`Job ${job.id} timed out after ${timeoutMs / 1000 / 60} minutes`));
      }, timeoutMs);
    });

    const jobPromise = (async () => {
      switch (job.type) {
        case 'document_processing':
          return await this.processDocumentJob(job);
        default:
          throw new Error(`Unknown job type: ${job.type}`);
      }
    })();

    try {
      return await Promise.race([jobPromise, timeoutPromise]);
    } catch (error) {
      logger.error(`Job ${job.id} failed or timed out`, {
        jobId: job.id,
        error: error instanceof Error ? error.message : 'Unknown error'
      });
      throw error;
    } finally {
      // Without this the timer stays armed for the full 15 minutes after
      // every job, keeping the closure (and the event loop) alive.
      if (timeoutHandle !== undefined) {
        clearTimeout(timeoutHandle);
      }
    }
  }

  /**
   * Process a document processing job: load the document record, download its
   * file (up to 3 attempts), run the unified processor, persist the results
   * and, when a summary exists, generate and upload a review PDF.
   *
   * On failure the document is marked `failed` (best effort) and the error is
   * rethrown so the queue can apply its retry policy.
   *
   * @returns The processor's result object.
   */
  private async processDocumentJob(job: Job): Promise<unknown> {
    const { documentId, userId, options } = job.data;

    logger.info('Starting document processing job', {
      jobId: job.id,
      documentId,
      userId,
      strategy: options?.strategy
    });

    // Update job status in database
    await this.updateJobStatus(job.id, 'processing');

    // Get document record to find file path
    const { DocumentModel } = await import('../models/DocumentModel');
    const document = await DocumentModel.findById(documentId);

    if (!document) {
      throw new Error(`Document ${documentId} not found`);
    }

    logger.info('Document found, downloading file', {
      documentId,
      filePath: document.file_path,
      fileName: document.original_file_name
    });

    // Download file from GCS for processing
    const { fileStorageService } = await import('./fileStorageService');
    let fileBuffer: Buffer | null = null;

    // Retry file download up to 3 times, waiting 2s * attempt before each retry.
    for (let attempt = 1; attempt <= 3; attempt++) {
      try {
        const waitTime = 2000 * attempt;
        if (attempt > 1) {
          logger.info(`File download retry attempt ${attempt}`, { documentId, waitTime });
          await new Promise(resolve => setTimeout(resolve, waitTime));
        }

        fileBuffer = await fileStorageService.getFile(document.file_path);
        if (fileBuffer) {
          logger.info(`File downloaded successfully on attempt ${attempt}`, {
            documentId,
            fileSize: fileBuffer.length
          });
          break;
        }
      } catch (error) {
        logger.error(`File download attempt ${attempt} failed`, {
          documentId,
          error: error instanceof Error ? error.message : String(error),
          attempt
        });
        if (attempt === 3) {
          throw new Error(`Failed to download file after ${attempt} attempts: ${error instanceof Error ? error.message : String(error)}`);
        }
      }
    }

    // Covers the case where every attempt returned an empty value without throwing.
    if (!fileBuffer) {
      throw new Error('Failed to download file from storage');
    }

    // Use unified processor for strategy-aware processing
    const strategy = options?.strategy || config.processingStrategy;
    logger.info('Processing document with unified processor', {
      documentId,
      strategy,
      jobId: job.id,
      fileSize: fileBuffer.length,
      fileName: document.original_file_name
    });

    try {
      const result = await unifiedDocumentProcessor.processDocument(
        documentId,
        userId,
        '', // text will be extracted by the processor
        {
          strategy,
          fileBuffer: fileBuffer,
          fileName: document.original_file_name,
          mimeType: 'application/pdf',
          ...options
        }
      );

      // Update document with processing results
      const updateData: any = {
        status: 'completed',
        processing_completed_at: new Date().toISOString()
      };

      const hasAnalysisData =
        !!result.analysisData && Object.keys(result.analysisData).length > 0;

      // Check if result has valid analysis data
      if (result.success && hasAnalysisData) {
        updateData.analysis_data = result.analysisData;
        logger.info('Analysis data saved to document', {
          documentId,
          analysisDataKeys: Object.keys(result.analysisData),
          hasSummary: !!result.summary,
          summaryLength: result.summary?.length || 0
        });
      } else {
        logger.warn('Processing completed but analysisData is empty or invalid', {
          documentId,
          success: result.success,
          hasAnalysisData: !!result.analysisData,
          analysisDataKeys: result.analysisData ? Object.keys(result.analysisData) : [],
          hasSummary: !!result.summary,
          error: result.error
        });

        // If no analysis data, mark as failed
        if (!hasAnalysisData) {
          throw new Error(result.error || 'Processing completed but no analysis data was generated');
        }

        // Still save whatever we have, but the issue has been logged above
        updateData.analysis_data = result.analysisData;
      }

      if (result.summary) {
        // Save generated summary
        updateData.generated_summary = result.summary;

        // Generate PDF from the analysis data; failures here are logged and
        // do not fail the job.
        try {
          const { pdfGenerationService } = await import('./pdfGenerationService');

          // Generate PDF buffer
          const pdfBuffer = await pdfGenerationService.generateCIMReviewPDF(result.analysisData);

          if (pdfBuffer) {
            // Save PDF to GCS
            const timestamp = Date.now();
            const pdfFilename = `${documentId}_cim_review_${timestamp}.pdf`;
            const pdfPath = `summaries/${pdfFilename}`;

            // Upload PDF buffer to GCS using the new method
            const saved = await fileStorageService.saveBuffer(pdfBuffer, pdfPath, 'application/pdf');

            if (saved) {
              // Note: summary_pdf_path column doesn't exist in current database schema
              // updateData.summary_pdf_path = pdfPath;
              logger.info(`PDF generated and uploaded to GCS successfully for document: ${documentId}`, { pdfPath });
            } else {
              logger.warn(`Failed to upload PDF to GCS for document: ${documentId}`);
            }
          } else {
            logger.warn(`Failed to generate PDF for document: ${documentId}`);
          }
        } catch (error) {
          logger.error(`Error generating PDF for document: ${documentId}`, { error });
        }
      }

      await DocumentModel.updateById(documentId, updateData);

      logger.info(`Document ${documentId} processing completed successfully`, {
        jobId: job.id,
        processingTime: result.processingTime,
        strategy: result.processingStrategy
      });

      // Update job status in database
      await this.updateJobStatus(job.id, 'completed');

      return result;
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Processing failed';
      const errorStack = error instanceof Error ? error.stack : undefined;

      logger.error(`Document ${documentId} processing failed in job queue`, {
        jobId: job.id,
        documentId,
        userId,
        error: errorMessage,
        stack: errorStack,
        errorDetails: error instanceof Error ? {
          name: error.name,
          message: error.message,
          stack: error.stack
        } : {
          type: typeof error,
          value: String(error)
        }
      });

      // Update document status to failed (best effort; the original error wins)
      try {
        await DocumentModel.updateById(documentId, {
          status: 'failed',
          error_message: errorMessage
        });
        logger.info('Document status updated to failed', { documentId });
      } catch (updateError) {
        logger.error('Failed to update document status to failed', {
          documentId,
          updateError: updateError instanceof Error ? updateError.message : String(updateError)
        });
      }

      // Update job status to failed
      await this.updateJobStatus(job.id, 'failed');

      throw error;
    }
  }

  /**
   * Schedule a failed job to be re-queued after an exponential back-off delay
   * (`retryDelayMs * 2^(attempts - 1)`, capped at `maxRetryDelayMs`).
   */
  private async retryJob(job: Job): Promise<void> {
    const delay = Math.min(
      this.config.retryDelayMs * Math.pow(2, job.attempts - 1),
      this.config.maxRetryDelayMs
    );

    job.status = 'retrying';

    logger.info(`Scheduling job retry: ${job.id}`, {
      delay,
      attempts: job.attempts,
      maxAttempts: job.maxAttempts,
    });

    this.emit('job:retrying', job);

    const timer = setTimeout(() => {
      this.retryTimers.delete(timer);
      job.status = 'pending';
      this.queue.push(job);
      this.queue.sort((a, b) => b.priority - a.priority);
      this.fillProcessingSlots();
    }, delay);
    // Tracked so stop() can cancel retries that have not fired yet.
    this.retryTimers.add(timer);
  }

  /**
   * Get job status.
   *
   * Only jobs that are currently processing or queued are visible; finished
   * jobs and jobs waiting on a retry timer are not retained.
   *
   * @returns The live job object, or `null` when it is not found.
   */
  getJobStatus(jobId: string): Job | null {
    // Check processing jobs
    const processingJob = this.processing.find(j => j.id === jobId);
    if (processingJob) {
      return processingJob;
    }

    // Check queued jobs
    const queuedJob = this.queue.find(j => j.id === jobId);
    if (queuedJob) {
      return queuedJob;
    }

    return null;
  }

  /**
   * Get all jobs.
   *
   * @returns Shallow copies of the queued and processing lists.
   */
  getAllJobs(): { queue: Job[]; processing: Job[] } {
    return {
      queue: [...this.queue],
      processing: [...this.processing],
    };
  }

  /**
   * Clear stuck jobs that have been processing for more than 20 minutes.
   *
   * This only releases the processing slot; it does not abort the underlying work.
   *
   * @returns The number of jobs removed from the processing list.
   */
  clearStuckJobs(): number {
    const stuckThreshold = 20 * 60 * 1000; // 20 minutes
    const now = new Date();
    let clearedCount = 0;

    this.processing = this.processing.filter(job => {
      if (job.startedAt && (now.getTime() - job.startedAt.getTime()) > stuckThreshold) {
        logger.warn(`Clearing stuck job: ${job.id}`, {
          jobId: job.id,
          startedAt: job.startedAt,
          processingTime: now.getTime() - job.startedAt.getTime()
        });
        clearedCount++;
        return false;
      }
      return true;
    });

    return clearedCount;
  }

  /**
   * Get queue statistics.
   *
   * `completedJobs` and `failedJobs` are not tracked yet and are always 0.
   */
  getQueueStats(): {
    queueLength: number;
    processingCount: number;
    totalJobs: number;
    completedJobs: number;
    failedJobs: number;
  } {
    return {
      queueLength: this.queue.length,
      processingCount: this.processing.length,
      totalJobs: this.queue.length + this.processing.length,
      completedJobs: 0, // TODO: Track completed jobs
      failedJobs: 0, // TODO: Track failed jobs
    };
  }

  /**
   * Get queue statistics for a specific user, or for the whole queue when no
   * user id is given.
   *
   * `completed` and `failed` are not tracked yet and are always 0.
   */
  getUserQueueStats(userId?: string): {
    pending: number;
    processing: number;
    completed: number;
    failed: number;
  } {
    if (!userId) {
      return {
        pending: this.queue.length,
        processing: this.processing.length,
        completed: 0,
        failed: 0
      };
    }

    const userQueueJobs = this.queue.filter(job => job.data.userId === userId);
    const userProcessingJobs = this.processing.filter(job => job.data.userId === userId);

    return {
      pending: userQueueJobs.length,
      processing: userProcessingJobs.length,
      completed: 0, // TODO: Track completed jobs per user
      failed: 0 // TODO: Track failed jobs per user
    };
  }

  /**
   * Cancel a job that is currently processing or queued.
   *
   * A processing job cannot be aborted mid-flight; its slot is released
   * immediately and its eventual result (or failure) is discarded.
   *
   * @returns `true` when the job was found and cancelled, `false` otherwise.
   */
  cancelJob(jobId: string): boolean {
    // Check processing jobs
    const processingIndex = this.processing.findIndex(j => j.id === jobId);
    if (processingIndex !== -1) {
      const job = this.processing[processingIndex];
      if (job) {
        job.status = 'failed';
        job.error = 'Job cancelled';
        this.processing.splice(processingIndex, 1);
        // Remember the cancellation so the in-flight run neither flips the
        // status back to 'completed' nor schedules a retry.
        this.cancelledJobIds.add(jobId);
      }

      logger.info(`Job cancelled: ${jobId}`);
      this.emit('job:cancelled', job);
      return true;
    }

    // Check queued jobs
    const queueIndex = this.queue.findIndex(j => j.id === jobId);
    if (queueIndex !== -1) {
      const job = this.queue[queueIndex];
      if (job) {
        job.status = 'failed';
        job.error = 'Job cancelled';
        this.queue.splice(queueIndex, 1);
      }

      logger.info(`Job cancelled: ${jobId}`);
      this.emit('job:cancelled', job);
      return true;
    }

    return false;
  }

  /**
   * Start the job queue. No-op when already running.
   */
  start(): void {
    if (this.isRunning) {
      return;
    }

    this.isRunning = true;

    // stop() tears the cleanup timer down; re-arm it on restart.
    if (!this.cleanupInterval) {
      this.startCleanupInterval();
    }

    logger.info('Job queue started', {
      maxConcurrentJobs: this.config.maxConcurrentJobs,
    });

    this.emit('queue:started');
    this.fillProcessingSlots();
  }

  /**
   * Pause the job queue. Jobs already processing continue; no new jobs start.
   */
  pause(): void {
    this.isRunning = false;
    logger.info('Job queue paused');
    this.emit('queue:paused');
  }

  /**
   * Resume the job queue and start as many queued jobs as there are free slots.
   */
  resume(): void {
    this.isRunning = true;
    logger.info('Job queue resumed');
    this.emit('queue:resumed');
    this.fillProcessingSlots();
  }

  /**
   * Clear the queue of pending jobs (processing jobs are untouched).
   *
   * @returns The number of jobs removed.
   */
  clearQueue(): number {
    const count = this.queue.length;
    this.queue = [];
    logger.info(`Queue cleared: ${count} jobs removed`);
    this.emit('queue:cleared', count);
    return count;
  }

  /**
   * Start cleanup interval.
   */
  private startCleanupInterval(): void {
    this.cleanupInterval = setInterval(() => {
      this.cleanupOldJobs();
    }, this.config.cleanupIntervalMs);
  }

  /**
   * Remove stuck jobs, then drop processing and queued jobs created more than
   * `maxJobAgeMs` ago. Emits `queue:cleaned` when anything was removed.
   */
  private cleanupOldJobs(): void {
    const cutoffTime = Date.now() - this.config.maxJobAgeMs;
    let cleanedCount = 0;

    // Clear stuck jobs first
    const stuckJobsCleared = this.clearStuckJobs();
    cleanedCount += stuckJobsCleared;

    // Clean up processing jobs that are too old
    this.processing = this.processing.filter(job => {
      if (job.createdAt.getTime() < cutoffTime) {
        cleanedCount++;
        logger.info(`Cleaned up old processing job: ${job.id}`);
        return false;
      }
      return true;
    });

    // Clean up queued jobs that are too old
    this.queue = this.queue.filter(job => {
      if (job.createdAt.getTime() < cutoffTime) {
        cleanedCount++;
        logger.info(`Cleaned up old queued job: ${job.id}`);
        return false;
      }
      return true;
    });

    if (cleanedCount > 0) {
      logger.info(`Cleaned up ${cleanedCount} old/stuck jobs (${stuckJobsCleared} stuck)`);
      this.emit('queue:cleaned', cleanedCount);
    }
  }

  /**
   * Update job status in database.
   *
   * Currently only logs at debug level.
   */
  private async updateJobStatus(jobId: string, status: string): Promise<void> {
    // Note: Job queue service manages jobs in memory, database jobs are separate
    // This method is kept for potential future integration but currently disabled
    // to avoid warnings about missing job_id values in database
    logger.debug(`Job queue status update (in-memory): ${jobId} -> ${status}`);
  }

  /**
   * Stop the service and cleanup: halts scheduling, cancels the cleanup and
   * retry timers, drops all tracked jobs and removes every event listener.
   */
  stop(): void {
    this.isRunning = false;

    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }

    // Pending retries would otherwise fire later and re-queue their jobs.
    for (const timer of this.retryTimers) {
      clearTimeout(timer);
    }
    this.retryTimers.clear();

    // In-flight jobs cannot be aborted; mark them so their outcome is
    // discarded instead of scheduling retries after shutdown.
    for (const job of this.processing) {
      this.cancelledJobIds.add(job.id);
    }

    this.queue = [];
    this.processing = [];
    this.removeAllListeners();

    logger.info('Job queue service stopped');
  }
}

export const jobQueueService = new JobQueueService();
export default jobQueueService;