Django Celery Expert Overview
This skill provides expert guidance for Django applications using Celery for asynchronous task processing. It covers task design patterns, worker configuration, error handling, monitoring, and production deployment strategies.
Key Capabilities:
Task design and implementation patterns Worker configuration and scaling Error handling and retry strategies Periodic/scheduled task management Monitoring and observability Production deployment best practices When to Use
Invoke this skill when you encounter these triggers:
Task Design:
"Create a Celery task for..." "Move this to a background job" "Process this asynchronously" "Handle this outside the request"
Configuration & Setup:
"Configure Celery for Django" "Set up task queues" "Configure Celery workers" "Set up Celery Beat for scheduling"
Error Handling:
"Handle task failures" "Implement retry logic" "Task keeps failing" "Set up dead letter queue"
Performance & Scaling:
"Scale Celery workers" "Optimize task throughput" "Tasks are too slow" "Handle high task volume"
Monitoring:
"Monitor Celery tasks" "Set up Flower" "Track task progress" "Debug stuck tasks" Instructions
Follow this workflow when handling Django Celery requests:
- Analyze the Request
Identify the task type:
Simple background task (fire-and-forget) Task with result tracking (need to poll for completion) Chained/grouped tasks (workflow orchestration) Periodic/scheduled tasks (cron-like behavior) Long-running tasks (need progress tracking)
Key questions:
Does the caller need the result? Should failures be retried? Is idempotency required? What's the expected execution time? How critical is guaranteed execution? 2. Load Relevant Reference Documentation
Based on the task type, reference the appropriate bundled documentation:
Django-specific patterns -> references/django-integration.md Task implementation -> references/task-design-patterns.md Configuration & setup -> references/configuration-guide.md Error handling & retries -> references/error-handling.md Periodic tasks -> references/periodic-tasks.md Monitoring & debugging -> references/monitoring-observability.md Production deployment -> references/production-deployment.md 3. Implement Following Best Practices
Task design principles:
Keep tasks small and focused Design for idempotency when possible Use explicit task names Bind tasks for access to self Pass serializable arguments only (IDs, not objects)
Error handling:
Configure appropriate retry behavior Use exponential backoff Set max retry limits Handle specific exceptions appropriately Log failures with context
Performance:
Use appropriate serializers (JSON for safety, pickle for Python objects) Configure prefetch limits Use task routing for prioritization Batch operations when appropriate Monitor memory usage 4. Validate Implementation
Before presenting the solution:
Verify task is idempotent if retries enabled Check serialization of arguments Ensure proper error handling Verify monitoring/logging is in place Consider failure scenarios Bundled Resources
references/ - Comprehensive Celery documentation loaded into context as needed
references/django-integration.md
transaction.on_commit() for safe task queuing Database as source of truth with recovery tasks Request-task correlation with django-guid Testing Django Celery tasks
references/task-design-patterns.md
Task signatures and calling patterns Binding and accessing task properties Task inheritance and base classes Workflow patterns (chains, groups, chords) Idempotency and exactly-once delivery
references/configuration-guide.md
Django-Celery integration setup Broker configuration (Redis, RabbitMQ) Result backend options Worker settings and concurrency Task routing and queues
references/error-handling.md
Retry strategies and backoff Exception handling patterns Dead letter queues Task rejection and requeue Timeout handling
references/periodic-tasks.md
Celery Beat configuration Crontab and interval schedules Django database scheduler Dynamic schedule management Timezone considerations
references/monitoring-observability.md
Flower setup and usage Prometheus/Grafana integration Task event monitoring Logging best practices Debugging stuck tasks
references/production-deployment.md
Worker deployment patterns Process supervision (systemd, supervisor) Containerized deployments Scaling strategies Health checks and graceful shutdown Examples Example 1: Basic Background Task
User Request:
"Send welcome emails in the background after user registration"
Implementation:
tasks.py
from celery import shared_task from django.core.mail import send_mail
@shared_task(bind=True, max_retries=3) def send_welcome_email(self, user_id): from users.models import User
try:
user = User.objects.get(id=user_id)
send_mail(
subject="Welcome!",
message=f"Hi {user.name}, welcome to our platform!",
from_email="noreply@example.com",
recipient_list=[user.email],
)
except User.DoesNotExist:
pass # User deleted, skip
except Exception as exc:
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
views.py
def register(request): user = User.objects.create(...) send_welcome_email.delay(user.id) # Fire and forget return redirect('dashboard')
Example 2: Task with Progress Tracking
User Request:
"Process a large CSV import with progress updates"
Implementation:
@shared_task(bind=True) def import_csv(self, file_path, total_rows): from myapp.models import Record
with open(file_path) as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader):
Record.objects.create(**row)
if i % 100 == 0:
self.update_state(
state='PROGRESS',
meta={'current': i, 'total': total_rows}
)
return {'status': 'complete', 'processed': total_rows}
Check progress
result = import_csv.AsyncResult(task_id) if result.state == 'PROGRESS': progress = result.info.get('current', 0) / result.info.get('total', 1)
Example 3: Workflow with Chains
User Request:
"Process an order: validate inventory, charge payment, then send confirmation"
Implementation:
from celery import chain
@shared_task def validate_inventory(order_id): # Returns order_id if valid, raises if not order = Order.objects.get(id=order_id) if not order.items_in_stock(): raise ValueError("Items out of stock") return order_id
@shared_task def charge_payment(order_id): order = Order.objects.get(id=order_id) order.charge() return order_id
@shared_task def send_confirmation(order_id): order = Order.objects.get(id=order_id) order.send_confirmation_email()
def process_order(order_id): workflow = chain( validate_inventory.s(order_id), charge_payment.s(), send_confirmation.s() ) workflow.delay()
Additional Notes
Common Pitfalls:
Passing Django model instances instead of IDs Not handling task idempotency with retries Missing timeout configuration for long tasks Not monitoring task queue depth Ignoring result backend cleanup
Django Integration:
Use django-celery-beat for database-backed schedules Use django-celery-results for storing task results in Django Configure CELERY_ settings in Django settings.py Use @shared_task for reusable apps
Security:
Never pass sensitive data in task arguments Use signed serializers if pickle is required Restrict Flower access in production Validate task arguments