python-background-jobs

Installs: 2.6K
Rank: #787

Install:

```shell
npx skills add https://github.com/wshobson/agents --skill python-background-jobs
```

# Python Background Jobs & Task Queues

Decouple long-running or unreliable work from the request/response cycle. Return immediately to the user while background workers handle the heavy lifting asynchronously.

## When to Use This Skill

- Processing tasks that take longer than a few seconds
- Sending emails, notifications, or webhooks
- Generating reports or exporting data
- Processing uploads or media transformations
- Integrating with unreliable external services
- Building event-driven architectures

## Core Concepts

1. **Task Queue Pattern**: The API accepts a request, enqueues a job, and returns immediately with a job ID. Workers process jobs asynchronously.
2. **Idempotency**: Tasks may be retried on failure. Design for safe re-execution.
3. **Job State Machine**: Jobs transition through states: pending → running → succeeded/failed.
4. **At-Least-Once Delivery**: Most queues guarantee at-least-once delivery. Your code must handle duplicates.

## Quick Start

The examples in this skill use Celery, a widely adopted task queue. Alternatives like RQ, Dramatiq, and cloud-native solutions (AWS SQS, GCP Tasks) are equally valid choices.

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # This runs in a background worker
    email_client.send(to, subject, body)

# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
```

## Fundamental Patterns

### Pattern 1: Return Job ID Immediately

For operations exceeding a few seconds, return a job ID and process asynchronously.

```python
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None
```

```python
# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist job record
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.utcnow(),
    ))

    # Enqueue task for background processing
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Return immediately with job ID
    return JobResponse(
        job_id=job_id,
        status="pending",
        poll_url=f"/jobs/{job_id}",
    )
```

### Pattern 2: Celery Task Configuration

Configure Celery tasks with proper retry and timeout settings.

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")
```

```python
# Global configuration
app.conf.update(
    task_time_limit=3600,          # Hard limit: 1 hour
    task_soft_time_limit=3000,     # Soft limit: 50 minutes
    task_acks_late=True,           # Acknowledge after completion
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,  # Don't prefetch too many tasks
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Don't retry permanent failures
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```
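The backoff expression above doubles the delay on each attempt (60s, then 120s, then 240s). A common refinement is to add random jitter so that many failed tasks don't all retry at the same moment. A minimal sketch (the helper name and cap are illustrative, not part of Celery):

```python
import random

def retry_countdown(retries: int, base: int = 60, cap: int = 3600) -> int:
    """Exponential backoff with full jitter, capped at one hour."""
    delay = min(cap, base * 2 ** retries)
    return random.randint(0, delay)

# Upper bounds per attempt: 0 -> 60s, 1 -> 120s, 2 -> 240s, capped at 3600s.
# Usage inside a bound task:
#   raise self.retry(exc=e, countdown=retry_countdown(self.request.retries))
```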

### Pattern 3: Make Tasks Idempotent

Workers may retry on crash or timeout. Design for safe re-execution.

```python
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    order = orders_repo.get(order_id)

    # Already processed? Return early
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    # Already in progress? Check if we should continue
    if order.status == OrderStatus.PROCESSING:
        # Use idempotency key to avoid double-charging
        pass

    # Process with idempotency key
    result = payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",  # Critical!
    )
    orders_repo.update(order_id, status=OrderStatus.COMPLETED)
```

Idempotency strategies:

- **Check-before-write**: verify state before acting
- **Idempotency keys**: use unique tokens with external services
- **Upsert patterns**: `INSERT ... ON CONFLICT UPDATE`
- **Deduplication window**: track processed IDs for N hours
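The last strategy, a deduplication window, can be sketched in memory (a production version would typically use Redis `SET NX EX` so the window is shared across workers; the class name here is illustrative):

```python
import time

class DedupWindow:
    """Track processed task IDs for a fixed window; duplicates are skipped."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._seen: dict[str, float] = {}  # task ID -> expiry timestamp

    def mark_if_new(self, task_id: str) -> bool:
        """Return True if this ID has not been processed within the window."""
        now = time.monotonic()
        # Evict expired entries
        self._seen = {k: exp for k, exp in self._seen.items() if exp > now}
        if task_id in self._seen:
            return False
        self._seen[task_id] = now + self._ttl
        return True

dedup = DedupWindow(ttl_seconds=3600)
if dedup.mark_if_new("order-123"):
    pass  # process the task
# A duplicate delivery within the hour reports False and is skipped:
assert dedup.mark_if_new("order-123") is False
```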

### Pattern 4: Job State Management

Persist job state transitions for visibility and debugging.

```python
class JobRepository:
    """Repository for managing job state."""

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
               VALUES ($1, $2, $3)""",
            job.id,
            job.status.value,
            job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status with timestamp."""
        updates = {"status": status.value, **fields}
        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.utcnow()
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.utcnow()
        await self._db.execute(
            "UPDATE jobs SET status = $1, ... WHERE id = $2",
            updates,
            job_id,
        )
        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )
```

## Advanced Patterns

### Pattern 5: Dead Letter Queue

Handle permanently failed tasks for manual inspection.

```python
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
    """Process webhook with DLQ for failures."""
    try:
        result = send_webhook(payload)
        if not result.success:
            raise WebhookFailedError(result.error)
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue for manual inspection
            dead_letter_queue.send({
                "task": "process_webhook",
                "webhook_id": webhook_id,
                "payload": payload,
                "error": str(e),
                "attempts": self.request.retries + 1,
                "failed_at": datetime.utcnow().isoformat(),
            })
            logger.error(
                "Webhook moved to DLQ after max retries",
                webhook_id=webhook_id,
                error=str(e),
            )
            return
        # Exponential backoff retry
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```

### Pattern 6: Status Polling Endpoint

Provide an endpoint for clients to check job status.

```python
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
    """Get current status of a background job."""
    job = await jobs_repo.get(job_id)
    if job is None:
        raise HTTPException(404, f"Job {job_id} not found")
    return JobStatusResponse(
        job_id=job.id,
        status=job.status.value,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
        result=job.result if job.status == JobStatus.SUCCEEDED else None,
        error=job.error if job.status == JobStatus.FAILED else None,
        # Helpful for clients
        is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
    )
```
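On the client side, this endpoint is typically consumed by polling until the job reports a terminal state. A minimal sketch, assuming `fetch_status` is any zero-argument callable that returns the JSON body of `GET /jobs/{job_id}` as a dict (for example, a `requests.get(...).json()` wrapper):

```python
import time

def poll_until_done(fetch_status, interval: float = 2.0, timeout: float = 300.0) -> dict:
    """Poll a job-status callable until the job reaches a terminal state."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status()
        if status.get("is_terminal"):
            return status
        time.sleep(interval)
    raise TimeoutError("job did not finish within the timeout")

# Usage with a stubbed status sequence standing in for real HTTP calls:
responses = iter([
    {"status": "pending", "is_terminal": False},
    {"status": "running", "is_terminal": False},
    {"status": "succeeded", "is_terminal": True, "result": {"rows": 42}},
])
final = poll_until_done(lambda: next(responses), interval=0.0)
assert final["status"] == "succeeded"
```

A real client would also back off between polls rather than using a fixed interval.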

### Pattern 7: Task Chaining and Workflows

Compose complex workflows from simple tasks.

```python
from celery import chain, group, chord

# Simple chain: A → B → C
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)

# Parallel execution: A, B, C all at once
parallel = group(
    send_email.s(user_email),
    send_sms.s(user_phone),
    update_analytics.s(event_data),
)

# Chord: run tasks in parallel, then a callback.
# Process all items, then send a completion notification.
workflow = chord(
    [process_item.s(item_id) for item_id in item_ids],
    send_completion_notification.s(batch_id),
)
workflow.apply_async()
```
### Pattern 8: Alternative Task Queues

Choose the right tool for your needs.

**RQ (Redis Queue)**: simple, Redis-based.

```python
from rq import Queue
from redis import Redis

queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")
```

**Dramatiq**: a modern Celery alternative.

```python
import dramatiq
from dramatiq.brokers.redis import RedisBroker

dramatiq.set_broker(RedisBroker())

@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
    email_client.send(to, subject, body)
```

Cloud-native options: AWS SQS + Lambda, Google Cloud Tasks, Azure Functions.

## Best Practices Summary

- **Return immediately**: don't block requests for long operations
- **Persist job state**: enable status polling and debugging
- **Make tasks idempotent**: safe to retry on any failure
- **Use idempotency keys**: for external service calls
- **Set timeouts**: both soft and hard limits
- **Implement a DLQ**: capture permanently failed tasks
- **Log transitions**: track job state changes
- **Retry appropriately**: exponential backoff for transient errors
- **Don't retry permanent failures**: validation errors, invalid credentials
- **Monitor queue depth**: alert on backlog growth
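The last practice, monitoring queue depth, can be sketched against a Redis broker. This assumes Celery's default Redis transport, which keeps each queue's pending tasks in a Redis list named after the queue, so `LLEN` reports the backlog; the queue name, threshold, and alert hook are illustrative:

```python
def check_queue_depth(redis_client, queue: str = "celery", threshold: int = 1000) -> int:
    """Return the current backlog and warn when it exceeds the threshold.

    Assumption: the broker is Redis with Celery's default transport, where
    pending tasks sit in a Redis list keyed by the queue name.
    """
    depth = redis_client.llen(queue)
    if depth > threshold:
        # Hook your alerting here (Slack, PagerDuty, a metric, ...)
        print(f"ALERT: queue '{queue}' backlog at {depth} tasks")
    return depth

# Usage against a real broker would look like:
#   from redis import Redis
#   check_queue_depth(Redis(), queue="celery", threshold=1000)
```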
