141 lines
4.2 KiB
Python
141 lines
4.2 KiB
Python
"""
|
|
Celery configuration for background task processing.
|
|
"""
|
|
|
|
from datetime import datetime, timezone

import structlog
from celery import Celery
from celery.schedules import crontab

from app.core.config import settings
|
|
|
|
# Structured logger used by the utility tasks defined in this module.
logger = structlog.get_logger()

# Create Celery app
# Broker and result backend URLs are read from application settings. The
# `include` list names the task modules handed to Celery for import.
# NOTE(review): the beat schedule and task routes in this file also reference
# "app.tasks.system.*", which is not listed in `include` -- confirm that
# module is registered some other way.
celery_app = Celery(
    main="virtual_board_member",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    include=[
        "app.tasks.document_processing",
        "app.tasks.commitment_extraction",
        "app.tasks.notifications",
        "app.tasks.analytics",
    ],
)
|
|
|
|
# Configure Celery
|
|
celery_app.conf.update(
    # --- Serialization (all taken from application settings) ---
    task_serializer=settings.CELERY_TASK_SERIALIZER,
    result_serializer=settings.CELERY_RESULT_SERIALIZER,
    accept_content=settings.CELERY_ACCEPT_CONTENT,
    # --- Time handling ---
    timezone=settings.CELERY_TIMEZONE,
    enable_utc=settings.CELERY_ENABLE_UTC,
    # --- Task execution limits ---
    task_track_started=True,
    task_time_limit=30 * 60,  # hard limit: 30 minutes, in seconds
    task_soft_time_limit=25 * 60,  # soft limit: 25 minutes, in seconds
    # --- Worker behaviour ---
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
    # --- Results ---
    result_expires=3600,  # 1 hour, in seconds
    # --- Testing ---
    # When settings.TESTING is truthy, tasks run synchronously in-process.
    task_always_eager=settings.TESTING,
)
|
|
|
|
# Configure periodic tasks
|
|
# Each entry maps a schedule name to the dotted task name to run and the
# crontab expression that triggers it.
# NOTE(review): the two "app.tasks.system.*" entries name a module that is not
# in the `include` list passed to Celery(...) in this file -- confirm those
# tasks are registered elsewhere.
celery_app.conf.beat_schedule = {
    # Commitment reminders: every day at 09:00
    "daily-commitment-reminders": {
        "task": "app.tasks.notifications.send_commitment_reminders",
        "schedule": crontab(minute=0, hour=9),
    },
    # Analytics report: weekly, day_of_week=1 (Monday) at 08:00
    "weekly-analytics-report": {
        "task": "app.tasks.analytics.generate_weekly_report",
        "schedule": crontab(minute=0, hour=8, day_of_week=1),
    },
    # Document processing cleanup: every day at 02:00
    "daily-document-cleanup": {
        "task": "app.tasks.document_processing.cleanup_old_documents",
        "schedule": crontab(minute=0, hour=2),
    },
    # Health check: at minute 0 of every hour
    "hourly-health-check": {
        "task": "app.tasks.system.health_check",
        "schedule": crontab(minute=0),
    },
    # Audit log cleanup: every day at 03:00
    "daily-audit-cleanup": {
        "task": "app.tasks.system.cleanup_audit_logs",
        "schedule": crontab(minute=0, hour=3),
    },
}
|
|
|
|
# Task routing
|
|
# Route every task under a given task module (glob pattern on the task name)
# to a queue named after that module.
celery_app.conf.task_routes = {
    "app.tasks.document_processing.*": {
        "queue": "document_processing",
    },
    "app.tasks.commitment_extraction.*": {
        "queue": "commitment_extraction",
    },
    "app.tasks.notifications.*": {
        "queue": "notifications",
    },
    "app.tasks.analytics.*": {
        "queue": "analytics",
    },
    "app.tasks.system.*": {
        "queue": "system",
    },
}
|
|
|
|
# Task annotations for specific configurations
|
|
# Per-task overrides keyed by full task name: a rate limit ("<count>/<unit>")
# and a time limit in seconds for each listed task.
celery_app.conf.task_annotations = {
    # Large documents: at most 10 per minute, up to 30 minutes each
    "app.tasks.document_processing.process_large_document": {
        "rate_limit": "10/m",
        "time_limit": 1800,
    },
    # Commitment extraction: at most 50 per minute, up to 5 minutes each
    "app.tasks.commitment_extraction.extract_commitments": {
        "rate_limit": "50/m",
        "time_limit": 300,
    },
    # Weekly report: at most 1 per hour, up to 10 minutes
    "app.tasks.analytics.generate_weekly_report": {
        "rate_limit": "1/h",
        "time_limit": 600,
    },
}
|
|
|
|
# Error handling
|
|
@celery_app.task(bind=True)
def debug_task(self):
    """Log the repr of the current task request.

    Bound task (receives the task instance as ``self``); returns ``None``.
    """
    request_repr = repr(self.request)
    logger.info(f"Request: {request_repr}")
|
|
|
|
|
|
# Task failure handling
|
|
@celery_app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 3},
)
def retry_task(self, *args, **kwargs):
    """Template task demonstrating retry handling.

    The task body is currently a placeholder. If it raises, the failure is
    logged with the task name, task id, error text and current retry count,
    and the task is re-queued via ``self.retry`` with a countdown of
    ``60 * 2 ** retries`` seconds (60s, 120s, 240s, ...).

    Args:
        *args: Accepted but unused by the placeholder body.
        **kwargs: Accepted but unused by the placeholder body.
    """
    try:
        # Task logic here
        pass
    except Exception as exc:
        attempts_so_far = self.request.retries
        # Exponential backoff: double the wait after every attempt.
        backoff_seconds = 60 * (2 ** attempts_so_far)
        logger.error(
            "Task failed",
            task_name=self.name,
            task_id=self.request.id,
            error=str(exc),
            retry_count=attempts_so_far,
        )
        raise self.retry(exc=exc, countdown=backoff_seconds)
|
|
|
|
|
|
# Health check task
|
|
@celery_app.task
def health_check():
    """Health check task for monitoring.

    Returns:
        dict: ``{"status": "healthy", "timestamp": <UTC time>}`` where the
        timestamp is the moment the task ran, formatted as
        ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    # Report the real execution time; a fixed literal would make every health
    # check look like it ran at the same instant and hide a stalled worker.
    checked_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    logger.info("Celery health check completed")
    return {"status": "healthy", "timestamp": checked_at}
|
|
|
|
|
|
# Cleanup task
|
|
@celery_app.task
def cleanup_old_data():
    """Cleanup old data and temporary files.

    The cleanup itself is not implemented yet: the task only logs its start
    and end and reports zero cleaned items.

    Returns:
        dict: ``{"status": "completed", "cleaned_items": 0}``.
    """
    logger.info("Starting data cleanup")
    # TODO: Implement cleanup logic
    summary = {"status": "completed", "cleaned_items": 0}
    logger.info("Data cleanup completed")
    return summary
|