-- =============================================================================
-- Processing Jobs Table
--
-- Stores document processing jobs that need to be executed.
-- Replaces the in-memory job queue with persistent database storage.
--
-- Prerequisites (not created by this script):
--   * a documents table with a UUID id column (target of the foreign key)
--   * uuid_generate_v4(), which PostgreSQL ships in the uuid-ossp extension
--
-- The script is written to be re-runnable: IF NOT EXISTS / OR REPLACE /
-- DROP ... IF EXISTS guards are used on every object it creates.
-- =============================================================================

CREATE TABLE IF NOT EXISTS processing_jobs (
    -- Primary key
    id            UUID PRIMARY KEY DEFAULT uuid_generate_v4(),

    -- Job data
    -- Jobs are deleted together with the document they belong to.
    document_id   UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    -- Stored as free text; no foreign key is declared for the user here.
    user_id       TEXT NOT NULL,

    -- Job status and progress
    -- No default: every INSERT must supply one of the allowed status values.
    status        TEXT NOT NULL
                  CHECK (status IN ('pending', 'processing', 'completed', 'failed', 'retrying')),
    attempts      INTEGER NOT NULL DEFAULT 0,
    max_attempts  INTEGER NOT NULL DEFAULT 3,

    -- Processing options (stored as JSONB)
    options       JSONB,

    -- Timestamps
    created_at    TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    started_at    TIMESTAMP WITH TIME ZONE,
    completed_at  TIMESTAMP WITH TIME ZONE,
    -- Refreshed on every UPDATE by the set_processing_jobs_updated_at trigger.
    updated_at    TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Error tracking
    error         TEXT,
    last_error_at TIMESTAMP WITH TIME ZONE,

    -- Result storage
    result        JSONB
);

-- Indexes for efficient querying
CREATE INDEX IF NOT EXISTS idx_processing_jobs_status
    ON processing_jobs(status);

CREATE INDEX IF NOT EXISTS idx_processing_jobs_created_at
    ON processing_jobs(created_at);

CREATE INDEX IF NOT EXISTS idx_processing_jobs_document_id
    ON processing_jobs(document_id);

CREATE INDEX IF NOT EXISTS idx_processing_jobs_user_id
    ON processing_jobs(user_id);

-- Partial index: contains only rows whose status is 'pending',
-- keyed by (status, created_at).
CREATE INDEX IF NOT EXISTS idx_processing_jobs_pending
    ON processing_jobs(status, created_at)
    WHERE status = 'pending';

-- Function to automatically update updated_at timestamp.
-- Trigger function: overwrites NEW.updated_at with NOW() and returns the row
-- so the UPDATE proceeds with the refreshed value.
CREATE OR REPLACE FUNCTION update_processing_jobs_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Trigger to call the update function.
-- Dropped first so that re-running this script does not fail on an
-- already-existing trigger.
DROP TRIGGER IF EXISTS set_processing_jobs_updated_at ON processing_jobs;

CREATE TRIGGER set_processing_jobs_updated_at
    BEFORE UPDATE ON processing_jobs
    FOR EACH ROW
    EXECUTE FUNCTION update_processing_jobs_updated_at();

-- Permissions / row level security (adjust as needed).
-- Left disabled here; uncomment to enable row level security on this table.
-- ALTER TABLE processing_jobs ENABLE ROW LEVEL SECURITY;

-- Optional: Create a view for monitoring.
-- One row per status value with:
--   count                 number of jobs in that status
--   avg_duration_seconds  average seconds from created_at to completed_at;
--                         jobs without completed_at are measured up to NOW()
--   latest_created_at     most recent created_at in that status
CREATE OR REPLACE VIEW processing_jobs_summary AS
SELECT
    status,
    COUNT(*) AS count,
    AVG(EXTRACT(EPOCH FROM (COALESCE(completed_at, NOW()) - created_at))) AS avg_duration_seconds,
    MAX(created_at) AS latest_created_at
FROM processing_jobs
GROUP BY status;

-- Comments for documentation
COMMENT ON TABLE processing_jobs
    IS 'Stores document processing jobs for async background processing';
COMMENT ON COLUMN processing_jobs.status
    IS 'Current status: pending, processing, completed, failed, retrying';
COMMENT ON COLUMN processing_jobs.attempts
    IS 'Number of processing attempts made';
COMMENT ON COLUMN processing_jobs.max_attempts
    IS 'Maximum number of retry attempts allowed';
COMMENT ON COLUMN processing_jobs.options
    IS 'Processing options and configuration (JSON)';
COMMENT ON COLUMN processing_jobs.error
    IS 'Last error message if processing failed';