Python asyncio - Async/Await Concurrency Overview
Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.
Key Features:
async/await syntax for readable concurrent code Event loop for managing concurrent operations Tasks for running multiple coroutines concurrently Primitives: locks, semaphores, events, queues HTTP client/server with aiohttp Database async support (asyncpg, aiomysql, motor) FastAPI async endpoints WebSocket support Background task management
Installation:
asyncio is built-in (Python 3.7+)
Async HTTP client
pip install aiohttp
Async HTTP requests (alternative)
pip install httpx
Async database drivers
pip install asyncpg aiomysql motor # PostgreSQL, MySQL, MongoDB
FastAPI with async support
pip install fastapi uvicorn[standard]
Async testing
pip install pytest-asyncio
Basic Async/Await Patterns 1. Simple Async Function import asyncio
async def hello(): """Basic async function (coroutine).""" print("Hello") await asyncio.sleep(1) # Async sleep (non-blocking) print("World") return "Done"
Run async function
result = asyncio.run(hello()) print(result) # "Done"
Key Points:
async def defines a coroutine function await suspends execution until awaitable completes asyncio.run() is the entry point for async programs Coroutines must be awaited or scheduled 2. Multiple Concurrent Tasks import asyncio import time
async def task(name, duration): """Simulate async task.""" print(f"{name}: Starting (duration: {duration}s)") await asyncio.sleep(duration) print(f"{name}: Complete") return f"{name} result"
async def run_concurrent(): """Run multiple tasks concurrently.""" start = time.time()
# Sequential (slow) - 6 seconds total
# result1 = await task("Task 1", 3)
# result2 = await task("Task 2", 2)
# result3 = await task("Task 3", 1)
# Concurrent (fast) - 3 seconds total
results = await asyncio.gather(
task("Task 1", 3),
task("Task 2", 2),
task("Task 3", 1)
)
elapsed = time.time() - start
print(f"Total time: {elapsed:.2f}s")
print(f"Results: {results}")
asyncio.run(run_concurrent())
Output: Total time: 3.00s (tasks ran concurrently)
- Task Creation and Management import asyncio
async def background_task(name): """Long-running background task.""" for i in range(5): print(f"{name}: iteration {i}") await asyncio.sleep(1) return f"{name} complete"
async def main(): # Create task (starts immediately) task1 = asyncio.create_task(background_task("Task-1")) task2 = asyncio.create_task(background_task("Task-2"))
# Do other work while tasks run
print("Main: doing other work")
await asyncio.sleep(2)
# Wait for tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())
- Error Handling in Async Code import asyncio
async def risky_operation(fail=False): """Operation that might fail.""" await asyncio.sleep(1) if fail: raise ValueError("Operation failed") return "Success"
async def handle_errors(): # Individual try/except try: result = await risky_operation(fail=True) except ValueError as e: print(f"Caught error: {e}") result = "Fallback value"
# Gather with error handling
results = await asyncio.gather(
risky_operation(fail=False),
risky_operation(fail=True),
risky_operation(fail=False),
return_exceptions=True # Return exceptions instead of raising
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(handle_errors())
Event Loop Fundamentals 1. Event Loop Lifecycle import asyncio
Modern approach (Python 3.7+)
async def main(): print("Main coroutine") await asyncio.sleep(1)
asyncio.run(main()) # Creates loop, runs main, closes loop
Manual loop management (advanced use cases)
async def manual_example(): loop = asyncio.get_event_loop()
# Schedule coroutine
task = loop.create_task(some_coroutine())
# Schedule callback
loop.call_later(5, callback_function)
# Run until complete
result = await task
return result
Get current event loop
async def get_current_loop(): loop = asyncio.get_running_loop() print(f"Loop: {loop}")
# Schedule callback in event loop
loop.call_soon(lambda: print("Callback executed"))
await asyncio.sleep(0) # Let callback execute
- Loop Scheduling and Callbacks import asyncio from datetime import datetime
def callback(name, loop): """Callback function (not async).""" print(f"{datetime.now()}: {name} callback executed")
# Stop loop after callback
# loop.stop()
async def schedule_callbacks(): loop = asyncio.get_running_loop()
# Schedule immediate callback
loop.call_soon(callback, "Immediate", loop)
# Schedule callback after delay
loop.call_later(2, callback, "Delayed 2s", loop)
# Schedule callback at specific time
loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)
# Wait for callbacks to execute
await asyncio.sleep(5)
asyncio.run(schedule_callbacks())
- Running Blocking Code import asyncio import time
def blocking_io(): """CPU-intensive or blocking I/O operation.""" print("Blocking operation started") time.sleep(2) # Blocks thread print("Blocking operation complete") return "Blocking result"
async def run_in_executor(): """Run blocking code in thread pool.""" loop = asyncio.get_running_loop()
# Run in default executor (thread pool)
result = await loop.run_in_executor(
None, # Use default executor
blocking_io
)
print(f"Result: {result}")
Run blocking operations concurrently
async def concurrent_blocking(): loop = asyncio.get_running_loop()
# These run in thread pool, don't block event loop
results = await asyncio.gather(
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io)
)
print(f"All results: {results}")
asyncio.run(concurrent_blocking())
Asyncio Primitives 1. Locks for Mutual Exclusion import asyncio
Shared resource
counter = 0 lock = asyncio.Lock()
async def increment_with_lock(name): """Increment counter with lock protection.""" global counter
async with lock:
# Critical section - only one task at a time
print(f"{name}: acquired lock")
current = counter
await asyncio.sleep(0.1) # Simulate processing
counter = current + 1
print(f"{name}: released lock, counter={counter}")
async def increment_without_lock(name): """Increment without lock - race condition!""" global counter
current = counter
await asyncio.sleep(0.1) # Race condition window
counter = current + 1
print(f"{name}: counter={counter}")
async def test_locks(): global counter
# Without lock (race condition)
counter = 0
await asyncio.gather(
increment_without_lock("Task-1"),
increment_without_lock("Task-2"),
increment_without_lock("Task-3")
)
print(f"Without lock: {counter}") # Often wrong (< 3)
# With lock (correct)
counter = 0
await asyncio.gather(
increment_with_lock("Task-1"),
increment_with_lock("Task-2"),
increment_with_lock("Task-3")
)
print(f"With lock: {counter}") # Always 3
asyncio.run(test_locks())
- Semaphores for Resource Limiting import asyncio
Limit concurrent operations
semaphore = asyncio.Semaphore(2) # Max 2 concurrent
async def limited_operation(name): """Operation limited by semaphore.""" print(f"{name}: waiting for semaphore")
async with semaphore:
print(f"{name}: acquired semaphore")
await asyncio.sleep(2) # Simulate work
print(f"{name}: releasing semaphore")
async def test_semaphore(): # Create 5 tasks, but only 2 run concurrently await asyncio.gather( limited_operation("Task-1"), limited_operation("Task-2"), limited_operation("Task-3"), limited_operation("Task-4"), limited_operation("Task-5") )
asyncio.run(test_semaphore())
Only 2 tasks hold semaphore at any time
- Events for Signaling import asyncio
event = asyncio.Event()
async def waiter(name): """Wait for event to be set.""" print(f"{name}: waiting for event") await event.wait() # Block until event is set print(f"{name}: event received!")
async def setter(): """Set event after delay.""" await asyncio.sleep(2) print("Setter: setting event") event.set() # Wake up all waiters
async def test_event(): # Create waiters await asyncio.gather( waiter("Waiter-1"), waiter("Waiter-2"), waiter("Waiter-3"), setter() )
asyncio.run(test_event())
- Queues for Task Distribution import asyncio import random
async def producer(queue, name): """Produce items and add to queue.""" for i in range(5): item = f"{name}-item-{i}" await queue.put(item) print(f"{name}: produced {item}") await asyncio.sleep(random.uniform(0.1, 0.5))
# Signal completion
await queue.put(None)
async def consumer(queue, name): """Consume items from queue.""" while True: item = await queue.get() # Block until item available
if item is None: # Shutdown signal
queue.task_done()
break
print(f"{name}: consumed {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def test_queue(): queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
await asyncio.gather(
producer(queue, "Producer-1"),
producer(queue, "Producer-2"),
consumer(queue, "Consumer-1"),
consumer(queue, "Consumer-2"),
consumer(queue, "Consumer-3")
)
# Wait for all items to be processed
await queue.join()
print("All tasks complete")
asyncio.run(test_queue())
- Condition Variables import asyncio
condition = asyncio.Condition() items = []
async def consumer(name): """Wait for items to be available.""" async with condition: # Wait until items are available await condition.wait_for(lambda: len(items) > 0)
item = items.pop(0)
print(f"{name}: consumed {item}")
async def producer(name): """Add items and notify consumers.""" async with condition: item = f"{name}-item" items.append(item) print(f"{name}: produced {item}")
# Notify one waiting consumer
condition.notify(n=1)
# Or notify all: condition.notify_all()
async def test_condition(): await asyncio.gather( consumer("Consumer-1"), consumer("Consumer-2"), producer("Producer-1"), producer("Producer-2") )
asyncio.run(test_condition())
Async HTTP with aiohttp 1. Basic HTTP Client import asyncio import aiohttp
async def fetch_url(session, url): """Fetch single URL.""" async with session.get(url) as response: status = response.status text = await response.text() return {"url": url, "status": status, "length": len(text)}
async def fetch_multiple_urls(): """Fetch multiple URLs concurrently.""" urls = [ "https://httpbin.org/delay/1", "https://httpbin.org/delay/2", "https://httpbin.org/json", ]
async with aiohttp.ClientSession() as session:
# Concurrent requests
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
asyncio.run(fetch_multiple_urls())
- HTTP Client with Error Handling import asyncio import aiohttp from typing import Dict, Any
async def fetch_with_retry( session: aiohttp.ClientSession, url: str, max_retries: int = 3 ) -> Dict[str, Any]: """Fetch URL with retry logic.""" for attempt in range(max_retries): try: async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response: response.raise_for_status() # Raise for 4xx/5xx data = await response.json() return {"success": True, "data": data}
except aiohttp.ClientError as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
return {"success": False, "error": str(e)}
# Exponential backoff
await asyncio.sleep(2 ** attempt)
except asyncio.TimeoutError:
print(f"Attempt {attempt + 1} timed out")
if attempt == max_retries - 1:
return {"success": False, "error": "Timeout"}
await asyncio.sleep(2 ** attempt)
async def parallel_api_calls(): """Make parallel API calls with error handling.""" urls = [ "https://httpbin.org/json", "https://httpbin.org/status/500", # Will fail "https://httpbin.org/delay/1", ]
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
*[fetch_with_retry(session, url) for url in urls],
return_exceptions=True # Don't stop on errors
)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"{url}: Exception - {result}")
elif result["success"]:
print(f"{url}: Success")
else:
print(f"{url}: Failed - {result['error']}")
asyncio.run(parallel_api_calls())
- HTTP Server with aiohttp from aiohttp import web import asyncio
async def handle_hello(request): """Simple GET handler.""" name = request.query.get("name", "World") return web.json_response({"message": f"Hello, {name}!"})
async def handle_post(request): """POST handler with JSON body.""" data = await request.json()
# Simulate async processing
await asyncio.sleep(1)
return web.json_response({
"received": data,
"status": "processed"
})
async def handle_stream(request): """Streaming response.""" response = web.StreamResponse() await response.prepare(request)
for i in range(10):
await response.write(f"Chunk {i}\n".encode())
await asyncio.sleep(0.5)
await response.write_eof()
return response
Create application
app = web.Application() app.router.add_get("/hello", handle_hello) app.router.add_post("/process", handle_post) app.router.add_get("/stream", handle_stream)
Run server
if name == "main": web.run_app(app, host="0.0.0.0", port=8080)
- WebSocket Client import asyncio import aiohttp
async def websocket_client(): """Connect to WebSocket server.""" url = "wss://echo.websocket.org"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url) as ws:
# Send messages
await ws.send_str("Hello WebSocket")
await ws.send_json({"type": "greeting", "data": "test"})
# Receive messages
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(f"Received: {msg.data}")
if msg.data == "close":
await ws.close()
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"Error: {ws.exception()}")
break
asyncio.run(websocket_client())
Async Database Operations 1. PostgreSQL with asyncpg import asyncio import asyncpg
async def database_operations(): """Async PostgreSQL operations.""" # Create connection pool pool = await asyncpg.create_pool( host="localhost", database="mydb", user="user", password="password", min_size=5, max_size=20 )
try:
# Acquire connection from pool
async with pool.acquire() as conn:
# Execute query
rows = await conn.fetch(
"SELECT id, name, email FROM users WHERE active = $1",
True
)
for row in rows:
print(f"User: {row['name']} ({row['email']})")
# Insert data
await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
"Alice", "alice@example.com"
)
# Transaction
async with conn.transaction():
await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")
finally:
await pool.close()
asyncio.run(database_operations())
- MongoDB with motor import asyncio from motor.motor_asyncio import AsyncIOMotorClient
async def mongodb_operations(): """Async MongoDB operations.""" # Create client client = AsyncIOMotorClient("mongodb://localhost:27017") db = client.mydb collection = db.users
try:
# Insert document
result = await collection.insert_one({
"name": "Alice",
"email": "alice@example.com",
"age": 30
})
print(f"Inserted ID: {result.inserted_id}")
# Find documents
cursor = collection.find({"age": {"$gte": 25}})
async for document in cursor:
print(f"User: {document['name']}")
# Update document
await collection.update_one(
{"name": "Alice"},
{"$set": {"age": 31}}
)
# Aggregation pipeline
pipeline = [
{"$match": {"age": {"$gte": 25}}},
{"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
]
async for result in collection.aggregate(pipeline):
print(f"Average age: {result['avg_age']}")
finally:
client.close()
asyncio.run(mongodb_operations())
- Connection Pool Pattern import asyncio import asyncpg from typing import Optional
class DatabasePool: """Async database connection pool manager."""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool: Optional[asyncpg.Pool] = None
async def connect(self):
"""Create connection pool."""
self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)
async def close(self):
"""Close connection pool."""
if self.pool:
await self.pool.close()
async def execute(self, query: str, *args):
"""Execute query."""
async with self.pool.acquire() as conn:
return await conn.execute(query, *args)
async def fetch(self, query: str, *args):
"""Fetch multiple rows."""
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
async def fetchrow(self, query: str, *args):
"""Fetch single row."""
async with self.pool.acquire() as conn:
return await conn.fetchrow(query, *args)
Usage
async def use_pool(): db = DatabasePool("postgresql://user:pass@localhost/mydb") await db.connect()
try:
# Execute operations
rows = await db.fetch("SELECT * FROM users")
for row in rows:
print(row)
finally:
await db.close()
asyncio.run(use_pool())
FastAPI Async Patterns 1. Async Endpoints from fastapi import FastAPI, HTTPException import asyncio import httpx
app = FastAPI()
@app.get("/") async def root(): """Simple async endpoint.""" return {"message": "Hello World"}
@app.get("/delay/{seconds}") async def delayed_response(seconds: int): """Endpoint with async delay.""" await asyncio.sleep(seconds) return {"message": f"Waited {seconds} seconds"}
@app.get("/fetch") async def fetch_external(): """Fetch data from external API.""" async with httpx.AsyncClient() as client: response = await client.get("https://httpbin.org/json") return response.json()
@app.get("/parallel") async def parallel_requests(): """Make parallel API calls.""" async with httpx.AsyncClient() as client: responses = await asyncio.gather( client.get("https://httpbin.org/delay/1"), client.get("https://httpbin.org/delay/2"), client.get("https://httpbin.org/json") )
return {
"results": [r.json() for r in responses]
}
- Background Tasks from fastapi import FastAPI, BackgroundTasks import asyncio
app = FastAPI()
async def send_email(email: str, message: str): """Simulate sending email.""" print(f"Sending email to {email}") await asyncio.sleep(5) # Simulate slow email service print(f"Email sent to {email}: {message}")
@app.post("/send-notification") async def send_notification( email: str, message: str, background_tasks: BackgroundTasks ): """Send notification in background.""" # Add task to background background_tasks.add_task(send_email, email, message)
# Return immediately
return {"status": "notification queued"}
Alternative: manual task creation
@app.post("/send-notification-manual") async def send_notification_manual(email: str, message: str): """Create background task manually.""" asyncio.create_task(send_email(email, message)) return {"status": "notification queued"}
- Async Dependencies from fastapi import FastAPI, Depends import asyncpg
app = FastAPI()
Database pool (global)
db_pool = None
async def get_db(): """Dependency: database connection.""" async with db_pool.acquire() as conn: yield conn
@app.on_event("startup") async def startup(): """Initialize database pool on startup.""" global db_pool db_pool = await asyncpg.create_pool( "postgresql://user:pass@localhost/mydb" )
@app.on_event("shutdown") async def shutdown(): """Close database pool on shutdown.""" await db_pool.close()
@app.get("/users/{user_id}") async def get_user(user_id: int, conn=Depends(get_db)): """Get user with async database dependency.""" user = await conn.fetchrow( "SELECT * FROM users WHERE id = $1", user_id )
if not user:
raise HTTPException(status_code=404, detail="User not found")
return dict(user)
- WebSocket Endpoints from fastapi import FastAPI, WebSocket, WebSocketDisconnect from typing import List import asyncio
app = FastAPI()
Active connections
active_connections: List[WebSocket] = []
@app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint.""" await websocket.accept() active_connections.append(websocket)
try:
while True:
# Receive message
data = await websocket.receive_text()
# Broadcast to all connections
for connection in active_connections:
await connection.send_text(f"Broadcast: {data}")
except WebSocketDisconnect:
active_connections.remove(websocket)
print("Client disconnected")
Background task to send periodic updates
async def broadcast_updates(): """Send periodic updates to all clients.""" while True: await asyncio.sleep(5)
for connection in active_connections:
try:
await connection.send_text("Periodic update")
except:
active_connections.remove(connection)
@app.on_event("startup") async def startup(): """Start background update task.""" asyncio.create_task(broadcast_updates())
Common Patterns and Best Practices 1. Timeout Handling import asyncio
async def slow_operation(): """Slow operation.""" await asyncio.sleep(10) return "Result"
async def with_timeout(): """Run operation with timeout.""" try: result = await asyncio.wait_for(slow_operation(), timeout=5.0) print(f"Result: {result}")
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(with_timeout())
- Cancellation Handling import asyncio
async def cancellable_task(): """Task that can be cancelled.""" try: for i in range(10): print(f"Working: {i}") await asyncio.sleep(1)
return "Complete"
except asyncio.CancelledError:
print("Task was cancelled")
# Cleanup
raise # Re-raise to propagate cancellation
async def cancel_example(): """Example of task cancellation.""" task = asyncio.create_task(cancellable_task())
# Let it run for a bit
await asyncio.sleep(3)
# Cancel the task
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Confirmed: task was cancelled")
asyncio.run(cancel_example())
- Resource Cleanup with Context Managers import asyncio
class AsyncResource: """Async context manager for resource management."""
async def __aenter__(self):
"""Async setup."""
print("Acquiring resource")
await asyncio.sleep(1) # Simulate async setup
self.connection = "connected"
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async cleanup."""
print("Releasing resource")
await asyncio.sleep(1) # Simulate async cleanup
self.connection = None
async def use_resource(): """Use async resource.""" async with AsyncResource() as resource: print(f"Using resource: {resource.connection}") await asyncio.sleep(1) # Resource automatically cleaned up
asyncio.run(use_resource())
- Debouncing and Throttling import asyncio from datetime import datetime
class Debouncer: """Debounce async function calls."""
def __init__(self, delay: float):
self.delay = delay
self.task = None
async def call(self, func, *args, **kwargs):
"""Debounced function call."""
# Cancel previous task
if self.task:
self.task.cancel()
# Create new task
async def delayed_call():
await asyncio.sleep(self.delay)
await func(*args, **kwargs)
self.task = asyncio.create_task(delayed_call())
async def api_call(query: str): """Simulated API call.""" print(f"{datetime.now()}: API call with query: {query}")
async def debounce_example(): """Example of debouncing.""" debouncer = Debouncer(delay=1.0)
# Rapid calls - only last one executes
await debouncer.call(api_call, "query1")
await asyncio.sleep(0.1)
await debouncer.call(api_call, "query2")
await asyncio.sleep(0.1)
await debouncer.call(api_call, "query3")
# Wait for debounced call
await asyncio.sleep(2)
asyncio.run(debounce_example())
Output: Only "query3" API call executes
- Rate Limiting import asyncio from datetime import datetime
class RateLimiter: """Limit rate of async operations."""
def __init__(self, max_calls: int, period: float):
self.max_calls = max_calls
self.period = period
self.semaphore = asyncio.Semaphore(max_calls)
self.calls = []
async def __aenter__(self):
"""Acquire rate limit slot."""
await self.semaphore.acquire()
now = asyncio.get_event_loop().time()
# Remove old calls outside period
self.calls = [t for t in self.calls if now - t < self.period]
if len(self.calls) >= self.max_calls:
# Wait until oldest call expires
sleep_time = self.period - (now - self.calls[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.calls.append(now)
return self
async def __aexit__(self, *args):
"""Release semaphore."""
self.semaphore.release()
async def rate_limited_operation(limiter, name): """Operation with rate limiting.""" async with limiter: print(f"{datetime.now()}: {name}") await asyncio.sleep(0.1)
async def rate_limit_example(): """Example of rate limiting.""" # Max 3 calls per 2 seconds limiter = RateLimiter(max_calls=3, period=2.0)
# Try to make 6 calls
await asyncio.gather(*[
rate_limited_operation(limiter, f"Call-{i}")
for i in range(6)
])
asyncio.run(rate_limit_example())
Debugging Async Code 1. Enable Debug Mode import asyncio import logging
Enable asyncio debug mode
asyncio.run(main(), debug=True)
Or set environment variable:
PYTHONASYNCIODEBUG=1 python script.py
Configure logging
logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(name)
async def debug_example(): logger.debug("Starting operation") await asyncio.sleep(1) logger.debug("Operation complete")
- Detect Blocking Code import asyncio import time
async def problematic_code(): """Code with blocking operation.""" print("Starting")
# BAD: Blocking sleep
time.sleep(2) # This blocks the event loop!
print("Complete")
Run with debug mode to detect blocking
asyncio.run(problematic_code(), debug=True)
Warning: Executing took 2.001 seconds
- Track Pending Tasks import asyncio
async def track_tasks(): """Track all pending tasks.""" # Get all tasks tasks = asyncio.all_tasks()
print(f"Total tasks: {len(tasks)}")
for task in tasks:
print(f" - {task.get_name()}: {task}")
# Check if task is done
if task.done():
try:
result = task.result()
print(f" Result: {result}")
except Exception as e:
print(f" Exception: {e}")
Create some tasks
async def main(): task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task") task2 = asyncio.create_task(track_tasks(), name="tracking")
await task2
task1.cancel()
asyncio.run(main())
Testing Async Code 1. pytest-asyncio Setup
test_async.py
import pytest import asyncio
Mark test as async
@pytest.mark.asyncio async def test_async_function(): """Test async function.""" result = await some_async_function() assert result == "expected"
@pytest.mark.asyncio async def test_async_http(): """Test async HTTP client.""" async with aiohttp.ClientSession() as session: async with session.get("https://httpbin.org/json") as response: assert response.status == 200 data = await response.json() assert "slideshow" in data
Async fixture
@pytest.fixture async def async_client(): """Async test fixture.""" client = await create_async_client() yield client await client.close()
@pytest.mark.asyncio async def test_with_fixture(async_client): """Test using async fixture.""" result = await async_client.fetch_data() assert result is not None
- Mocking Async Functions import pytest from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio async def test_with_mock(): """Test with async mock.""" # Create async mock mock_func = AsyncMock(return_value="mocked result")
result = await mock_func()
assert result == "mocked result"
mock_func.assert_called_once()
@pytest.mark.asyncio @patch("module.async_function", new_callable=AsyncMock) async def test_with_patch(mock_async): """Test with patched async function.""" mock_async.return_value = {"status": "success"}
result = await some_function_that_calls_async()
assert result["status"] == "success"
mock_async.assert_called_once()
Performance Optimization 1. Use asyncio.gather() for Parallelism import asyncio import time
async def slow_task(n): await asyncio.sleep(1) return n * 2
async def optimized(): """Parallel execution.""" start = time.time()
# Sequential (slow) - 5 seconds
# results = []
# for i in range(5):
# result = await slow_task(i)
# results.append(result)
# Parallel (fast) - 1 second
results = await asyncio.gather(*[slow_task(i) for i in range(5)])
elapsed = time.time() - start
print(f"Time: {elapsed:.2f}s, Results: {results}")
asyncio.run(optimized())
- Connection Pooling import asyncio import aiohttp
BAD: Create new session for each request
async def bad_pattern(): for i in range(10): async with aiohttp.ClientSession() as session: async with session.get("https://httpbin.org/json") as response: await response.json()
GOOD: Reuse session with connection pool
async def good_pattern(): async with aiohttp.ClientSession() as session: tasks = [ session.get("https://httpbin.org/json") for i in range(10) ] responses = await asyncio.gather(*tasks) for response in responses: await response.json()
- Avoid Blocking Operations import asyncio
BAD: Blocking I/O in async function
async def bad_file_read(): with open("large_file.txt") as f: # Blocks event loop! data = f.read() return data
GOOD: Use async file I/O or run in executor
async def good_file_read(): loop = asyncio.get_running_loop()
# Run blocking operation in thread pool
data = await loop.run_in_executor(
None,
lambda: open("large_file.txt").read()
)
return data
BETTER: Use aiofiles for async file I/O
import aiofiles
async def better_file_read(): async with aiofiles.open("large_file.txt") as f: data = await f.read() return data
Common Pitfalls ❌ Anti-Pattern 1: Not Awaiting Coroutines
WRONG
async def bad():
result = async_function() # Returns coroutine, doesn't execute!
print(result) # Prints:
CORRECT
async def good(): result = await async_function() # Actually executes print(result)
❌ Anti-Pattern 2: Blocking the Event Loop
WRONG
import time
async def bad(): time.sleep(5) # Blocks entire event loop!
CORRECT
async def good(): await asyncio.sleep(5) # Non-blocking
❌ Anti-Pattern 3: Not Handling Cancellation
WRONG
async def bad(): await asyncio.sleep(10) # No cleanup if cancelled
CORRECT
async def good(): try: await asyncio.sleep(10) except asyncio.CancelledError: # Cleanup resources await cleanup() raise # Re-raise to propagate
❌ Anti-Pattern 4: Creating Event Loop Incorrectly
WRONG (Python 3.7+)
loop = asyncio.get_event_loop() loop.run_until_complete(main())
CORRECT (Python 3.7+)
asyncio.run(main())
❌ Anti-Pattern 5: Not Closing Resources
WRONG
async def bad(): session = aiohttp.ClientSession() response = await session.get(url) # Session never closed - resource leak!
CORRECT
async def good(): async with aiohttp.ClientSession() as session: response = await session.get(url) # Session automatically closed
Best Practices Use asyncio.run() for entry point (Python 3.7+) Always await coroutines - don't forget await Use async context managers for resource cleanup Connection pooling for HTTP and database clients Handle CancelledError for graceful shutdown Use asyncio.gather() for parallel operations Avoid blocking operations in async functions Use timeouts to prevent hanging operations Debug mode during development to catch issues Test async code with pytest-asyncio Quick Reference Common Commands
Run async script
python script.py
Run with debug mode
PYTHONASYNCIODEBUG=1 python script.py
Run tests
pytest -v --asyncio-mode=auto
Install async dependencies
pip install aiohttp asyncpg motor pytest-asyncio
Essential Imports import asyncio import aiohttp import asyncpg from typing import List, Dict, Any
Resources Official Documentation: https://docs.python.org/3/library/asyncio.html aiohttp: https://docs.aiohttp.org/ asyncpg: https://magicstack.github.io/asyncpg/ FastAPI Async: https://fastapi.tiangolo.com/async/ pytest-asyncio: https://pytest-asyncio.readthedocs.io/ Related Skills
When using asyncio, consider these complementary skills:
fastapi-local-dev: FastAPI async server patterns and production deployment pytest: Testing async code with pytest-asyncio and fixtures systematic-debugging: Debugging async race conditions and deadlocks Quick FastAPI Async Patterns (Inlined for Standalone Use)
FastAPI async endpoint pattern
from fastapi import FastAPI, Depends from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker
app = FastAPI()
Async database setup
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db") AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db(): async with AsyncSessionLocal() as session: yield session
@app.get("/users/{user_id}") async def get_user(user_id: int, db: AsyncSession = Depends(get_db)): # Async database query result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") return user
Background tasks with asyncio
@app.post("/send-email") async def send_email_endpoint(email: EmailSchema): # Non-blocking background task asyncio.create_task(send_email_async(email)) return {"status": "email queued"}
Quick pytest-asyncio Patterns (Inlined for Standalone Use)
Testing async functions with pytest
import pytest import pytest_asyncio from httpx import AsyncClient
Async test fixture
@pytest_asyncio.fixture async def async_client(): async with AsyncClient(app=app, base_url="http://test") as client: yield client
Async test function
@pytest.mark.asyncio async def test_get_user(async_client): response = await async_client.get("/users/1") assert response.status_code == 200 assert response.json()["id"] == 1
Testing concurrent operations
@pytest.mark.asyncio async def test_concurrent_requests(): async with AsyncClient(app=app, base_url="http://test") as client: # Run 10 requests concurrently responses = await asyncio.gather( *[client.get(f"/users/{i}") for i in range(1, 11)] ) assert all(r.status_code == 200 for r in responses)
Mock async dependencies
@pytest_asyncio.fixture async def mock_db(): # Setup mock database db = AsyncMock() yield db # Cleanup
Quick Async Debugging Reference (Inlined for Standalone Use)
Common Async Pitfalls:
Blocking the Event Loop
❌ WRONG: Blocking call in async function
async def bad_function(): time.sleep(5) # Blocks entire event loop! return "done"
✅ CORRECT: Use asyncio.sleep
async def good_function(): await asyncio.sleep(5) # Releases event loop return "done"
Debugging Race Conditions
Add logging to track execution order
import logging logging.basicConfig(level=logging.DEBUG)
async def debug_task(name): logging.debug(f"{name}: Starting") await asyncio.sleep(1) logging.debug(f"{name}: Finished") return name
Run with detailed tracing
asyncio.run(asyncio.gather(debug_task("A"), debug_task("B")), debug=True)
Deadlock Detection
Use timeout to detect deadlocks
try: result = await asyncio.wait_for(some_async_function(), timeout=5.0) except asyncio.TimeoutError: logging.error("Deadlock detected: operation timed out") # Investigate what's blocking
Inspecting Running Tasks
Check all pending tasks
tasks = asyncio.all_tasks() for task in tasks: print(f"Task: {task.get_name()}, Done: {task.done()}") if not task.done(): print(f" Current coro: {task.get_coro()}")
[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]
Python Version Compatibility: This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.