Redis State Management
A comprehensive skill for mastering Redis state management patterns in distributed systems. This skill covers caching strategies, session management, pub/sub messaging, distributed locks, data structures, and production-ready patterns using redis-py.
When to Use This Skill
Use this skill when:
- Implementing high-performance caching layers for web applications
- Managing user sessions in distributed environments
- Building real-time messaging and event distribution systems
- Coordinating distributed processes with locks and synchronization
- Storing and querying structured data with Redis data structures
- Optimizing application performance with Redis
- Scaling applications horizontally with shared state
- Implementing rate limiting, counters, and analytics
- Building microservices with Redis as a communication layer
- Managing temporary data with automatic expiration (TTL)
- Implementing leaderboards, queues, and real-time features

Core Concepts

Redis Fundamentals
Redis (Remote Dictionary Server) is an in-memory data structure store used as:
- Database: Persistent key-value storage
- Cache: High-speed data layer
- Message Broker: Pub/sub and stream messaging
- Session Store: Distributed session management
Key Characteristics:
- In-memory storage (microsecond latency)
- Optional persistence (RDB snapshots, AOF logs)
- Rich data structures beyond key-value
- Atomic operations on complex data types
- Built-in replication and clustering
- Pub/sub messaging support
- Lua scripting for complex operations
- Pipelining for batch operations

Redis Data Structures
Redis provides multiple data types for different use cases:
Strings: Simple key-value pairs, binary safe
- Use for: Cache values, counters, flags, JSON objects
- Max size: 512 MB
- Commands: SET, GET, INCR, APPEND
Hashes: Field-value maps (objects)
- Use for: User profiles, configuration objects, small entities
- Efficient for storing objects with multiple fields
- Commands: HSET, HGET, HMGET, HINCRBY
Lists: Ordered collections (linked lists)
- Use for: Queues, activity feeds, recent items
- Operations at head/tail are O(1)
- Commands: LPUSH, RPUSH, LPOP, RPOP, LRANGE
Sets: Unordered unique collections
- Use for: Tags, unique visitors, relationships
- Set operations: union, intersection, difference
- Commands: SADD, SMEMBERS, SISMEMBER, SINTER
Sorted Sets: Ordered sets with scores
- Use for: Leaderboards, time-series, priority queues
- Range queries by score or rank
- Commands: ZADD, ZRANGE, ZRANGEBYSCORE, ZRANK
Streams: Append-only logs with consumer groups
- Use for: Event sourcing, activity logs, message queues
- Built-in consumer group support
- Commands: XADD, XREAD, XREADGROUP
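As a quick orientation before the detailed sections below, the sketch touches each structure once with redis-py; the key names and values are illustrative only.

import redis

r = redis.Redis(decode_responses=True)

# String: simple value plus a counter
r.set("page:home:title", "Welcome")
r.incr("page:home:views")

# Hash: one object, many fields
r.hset("user:1", mapping={"name": "Alice", "plan": "pro"})

# List: newest items first
r.lpush("recent:posts", "post:42")

# Set: unique members only
r.sadd("post:42:tags", "redis", "caching")

# Sorted set: members ordered by score
r.zadd("scores", {"alice": 1500, "bob": 2000})

# Stream: append-only event log
r.xadd("events", {"type": "signup", "user": "1"})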
Connection Management

Connection Pools: Redis connections are expensive to create. Always use connection pools:
import redis
Connection pool (recommended)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)
Direct connection (avoid in production)
r = redis.Redis(host='localhost', port=6379, db=0)
Best Practices:
- Use connection pools for all applications
- Set appropriate max_connections based on workload
- Enable decode_responses=True for string data
- Configure socket_timeout and socket_keepalive
- Handle connection errors with retries (see the retry sketch below)
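For the last point, redis-py (4.x and later) ships a retry helper the client can apply automatically; a minimal sketch, assuming that client version:

import redis
from redis.retry import Retry
from redis.backoff import ExponentialBackoff
from redis.exceptions import ConnectionError, TimeoutError

# Retry transient failures up to 3 times with exponential backoff
retry = Retry(ExponentialBackoff(), 3)

r = redis.Redis(
    host='localhost',
    port=6379,
    decode_responses=True,
    socket_timeout=5,
    retry=retry,
    retry_on_error=[ConnectionError, TimeoutError],
)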
Data Persistence

Redis offers two persistence mechanisms:
RDB (Redis Database): Point-in-time snapshots
- Compact binary format
- Fast restart times
- Lower disk I/O
- Potential data loss between snapshots
AOF (Append-Only File): Log of write operations
- Better durability (configurable fsync policies)
- Larger files, slower restarts
- Can be automatically rewritten/compacted
- Minimal data loss potential
Hybrid Approach: RDB + AOF for best of both worlds
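Persistence is configured on the server side (redis.conf), but redis-py can inspect or adjust it at runtime; a small sketch, assuming the connection is allowed to run CONFIG and BGSAVE:

import redis

r = redis.Redis(decode_responses=True)

# Inspect current persistence settings
print(r.config_get('save'))        # RDB snapshot schedule
print(r.config_get('appendonly'))  # AOF enabled or not

# Enable AOF at runtime (persist the change in redis.conf as well)
r.config_set('appendonly', 'yes')

# Trigger a background RDB snapshot explicitly
r.bgsave()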
RESP 3 Protocol
Redis Serialization Protocol version 3 offers:
- Client-side caching support
- Better data type support
- Push notifications
- Performance improvements

import redis
from redis.cache import CacheConfig
Enable RESP3 with client-side caching
r = redis.Redis(host='localhost', port=6379, protocol=3, cache_config=CacheConfig())
Caching Strategies

Cache-Aside (Lazy Loading)
Pattern: Application checks cache first, loads from database on miss
import redis
import json
from typing import Optional, Dict, Any
r = redis.Redis(decode_responses=True)
def get_user(user_id: int) -> Optional[Dict[str, Any]]:
    """Cache-aside pattern for user data."""
    cache_key = f"user:{user_id}"

    # Try cache first
    cached_data = r.get(cache_key)
    if cached_data:
        return json.loads(cached_data)

    # Cache miss - load from database
    user_data = database.get_user(user_id)  # Your DB query
    if user_data:
        # Store in cache with 1 hour TTL
        r.setex(cache_key, 3600, json.dumps(user_data))

    return user_data
Advantages:
- Only requested data is cached (efficient memory usage)
- Cache failures don't break the application
- Simple to implement
Disadvantages:
- Cache miss penalty (latency spike)
- Thundering herd on popular items
- Stale data until cache expiration

Write-Through Cache
Pattern: Write to cache and database simultaneously
def update_user(user_id: int, user_data: Dict[str, Any]) -> bool:
    """Write-through pattern for user updates."""
    cache_key = f"user:{user_id}"

    # Write to database first
    success = database.update_user(user_id, user_data)

    if success:
        # Update cache immediately
        r.setex(cache_key, 3600, json.dumps(user_data))

    return success
Advantages:
- Cache always consistent with database
- No read penalty for recently written data
Disadvantages:
- Write latency increases
- Unused data may be cached
- Extra cache write overhead

Write-Behind (Write-Back) Cache
Pattern: Write to cache immediately, sync to database asynchronously
import redis
import json
from queue import Queue
from threading import Thread

r = redis.Redis(decode_responses=True)
write_queue = Queue()

def async_writer():
    """Background worker to sync cache to database."""
    while True:
        user_id, user_data = write_queue.get()
        try:
            database.update_user(user_id, user_data)
        except Exception as e:
            # Log error, potentially retry
            print(f"Failed to write user {user_id}: {e}")
        finally:
            write_queue.task_done()
Start background writer
Thread(target=async_writer, daemon=True).start()
def update_user_fast(user_id: int, user_data: Dict[str, Any]):
    """Write-behind pattern for fast writes."""
    cache_key = f"user:{user_id}"

    # Write to cache immediately (fast)
    r.setex(cache_key, 3600, json.dumps(user_data))

    # Queue database write (async)
    write_queue.put((user_id, user_data))
Advantages:
- Minimal write latency
- Can batch database writes
- Handles write spikes
Disadvantages:
- Risk of data loss if cache fails
- Complex error handling
- Consistency challenges

Cache Invalidation Strategies
Time-based Expiration (TTL):
Set key with expiration
r.setex("session:abc123", 1800, session_data) # 30 minutes
Or set TTL on existing key
r.expire("user:profile:123", 3600) # 1 hour
Check remaining TTL
ttl = r.ttl("user:profile:123")
Event-based Invalidation:
def update_product(product_id: int, product_data: dict):
    """Invalidate cache on update."""
    # Update database
    database.update_product(product_id, product_data)

    # Invalidate related caches
    r.delete(f"product:{product_id}")
    r.delete(f"product_list:category:{product_data['category']}")
    r.delete("products:featured")
Pattern-based Invalidation:
Delete all keys matching pattern
def invalidate_user_cache(user_id: int):
    """Invalidate all cache entries for a user."""
    pattern = f"user:{user_id}:*"

    # Find and delete matching keys
    for key in r.scan_iter(match=pattern, count=100):
        r.delete(key)
Cache Stampede Prevention
Problem: Multiple requests simultaneously miss cache and query database
Solution 1: Probabilistic Early Expiration
import time
import random
def get_with_early_expiration(key: str, ttl: int = 3600, beta: float = 1.0):
    """Prevent stampede with probabilistic early recomputation."""
    value = r.get(key)
    current_time = time.time()

    if value is None:
        # Cache miss - compute, cache, and record when we computed it
        value = compute_value(key)
        r.setex(key, ttl, value)
        r.set(f"{key}:timestamp", current_time)
        return value

    # Check if we should recompute early
    delta = current_time - float(r.get(f"{key}:timestamp") or 0)
    expiry = ttl * random.random() * beta

    if delta > expiry:
        # Recompute before the real TTL expires
        value = compute_value(key)
        r.setex(key, ttl, value)
        r.set(f"{key}:timestamp", current_time)

    return value
Solution 2: Locking
from contextlib import contextmanager
@contextmanager
def cache_lock(key: str, timeout: int = 10):
    """Acquire lock for cache computation."""
    lock_key = f"{key}:lock"
    identifier = str(time.time())

    # Try to acquire lock
    if r.set(lock_key, identifier, nx=True, ex=timeout):
        try:
            yield True
        finally:
            # Release lock
            if r.get(lock_key) == identifier:
                r.delete(lock_key)
    else:
        yield False
def get_with_lock(key: str):
    """Use lock to prevent stampede."""
    value = r.get(key)

    if value is None:
        with cache_lock(key) as acquired:
            if acquired:
                # We got the lock - compute value
                value = compute_value(key)
                r.setex(key, 3600, value)
            else:
                # Someone else is computing - wait and retry
                time.sleep(0.1)
                value = r.get(key) or compute_value(key)

    return value
Session Management

Distributed Session Storage
Basic Session Management:
import redis
import json
import uuid
from datetime import datetime, timedelta
r = redis.Redis(decode_responses=True)
class SessionManager:
    def __init__(self, ttl: int = 1800):
        """Session manager with Redis backend.

        Args:
            ttl: Session timeout in seconds (default 30 minutes)
        """
        self.ttl = ttl
def create_session(self, user_id: int, data: dict = None) -> str:
"""Create new session and return session ID."""
session_id = str(uuid.uuid4())
session_key = f"session:{session_id}"
session_data = {
"user_id": user_id,
"created_at": datetime.utcnow().isoformat(),
"data": data or {}
}
r.setex(session_key, self.ttl, json.dumps(session_data))
return session_id
def get_session(self, session_id: str) -> dict:
"""Retrieve session data and refresh TTL."""
session_key = f"session:{session_id}"
session_data = r.get(session_key)
if session_data:
# Refresh TTL on access (sliding expiration)
r.expire(session_key, self.ttl)
return json.loads(session_data)
return None
def update_session(self, session_id: str, data: dict) -> bool:
"""Update session data."""
session_key = f"session:{session_id}"
session_data = self.get_session(session_id)
if session_data:
session_data["data"].update(data)
r.setex(session_key, self.ttl, json.dumps(session_data))
return True
return False
def delete_session(self, session_id: str) -> bool:
"""Delete session (logout)."""
session_key = f"session:{session_id}"
return r.delete(session_key) > 0
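A short usage sketch for the class above (the user ID and payload are illustrative):

session_mgr = SessionManager(ttl=1800)

# Log a user in
session_id = session_mgr.create_session(user_id=42, data={"role": "editor"})

# Later requests look the session up (and slide its TTL)
session = session_mgr.get_session(session_id)
if session:
    print(session["user_id"], session["data"])

# Log out
session_mgr.delete_session(session_id)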
Session with Hash Storage
More efficient for session objects:
class HashSessionManager:
    """Session manager using Redis hashes for better performance."""
def __init__(self, ttl: int = 1800):
self.ttl = ttl
def create_session(self, user_id: int, **kwargs) -> str:
"""Create session using hash."""
session_id = str(uuid.uuid4())
session_key = f"session:{session_id}"
# Store as hash for efficient field access
session_fields = {
"user_id": str(user_id),
"created_at": datetime.utcnow().isoformat(),
**{k: str(v) for k, v in kwargs.items()}
}
r.hset(session_key, mapping=session_fields)
r.expire(session_key, self.ttl)
return session_id
def get_field(self, session_id: str, field: str) -> str:
"""Get single session field efficiently."""
session_key = f"session:{session_id}"
value = r.hget(session_key, field)
if value:
r.expire(session_key, self.ttl) # Refresh TTL
return value
def set_field(self, session_id: str, field: str, value: str) -> bool:
"""Update single session field."""
session_key = f"session:{session_id}"
if r.exists(session_key):
r.hset(session_key, field, value)
r.expire(session_key, self.ttl)
return True
return False
def get_all(self, session_id: str) -> dict:
"""Get all session fields."""
session_key = f"session:{session_id}"
data = r.hgetall(session_key)
if data:
r.expire(session_key, self.ttl)
return data
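And a brief usage sketch for the hash-backed variant (the field names are illustrative):

hash_mgr = HashSessionManager(ttl=1800)
session_id = hash_mgr.create_session(user_id=42, theme="dark")

# Read or update a single field without touching the rest of the session
print(hash_mgr.get_field(session_id, "theme"))
hash_mgr.set_field(session_id, "theme", "light")

# Or fetch everything at once
print(hash_mgr.get_all(session_id))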
User Activity Tracking

def track_user_activity(user_id: int, action: str):
    """Track user activity with automatic expiration."""
    activity_key = f"user:{user_id}:activity"
    timestamp = datetime.utcnow().isoformat()

    # Add activity to list
    r.lpush(activity_key, json.dumps({"action": action, "timestamp": timestamp}))

    # Keep only last 100 activities
    r.ltrim(activity_key, 0, 99)

    # Set expiration (30 days)
    r.expire(activity_key, 2592000)
def get_recent_activity(user_id: int, limit: int = 10) -> list:
    """Get recent user activities."""
    activity_key = f"user:{user_id}:activity"
    activities = r.lrange(activity_key, 0, limit - 1)
    return [json.loads(a) for a in activities]
Pub/Sub Patterns

Basic Publisher/Subscriber
Publisher:
import redis
r = redis.Redis(decode_responses=True)
def publish_event(channel: str, message: dict):
    """Publish event to channel."""
    import json
    r.publish(channel, json.dumps(message))
Example usage
publish_event("notifications", { "type": "user_signup", "user_id": 12345, "timestamp": datetime.utcnow().isoformat() })
Subscriber:
import redis
import json

def handle_message(message):
    """Process received message."""
    data = json.loads(message['data'])
    print(f"Received: {data}")
Initialize pubsub
r = redis.Redis(decode_responses=True)
p = r.pubsub()
Subscribe to channels
p.subscribe('notifications', 'alerts')
Listen for messages
for message in p.listen():
    if message['type'] == 'message':
        handle_message(message)
Pattern-Based Subscriptions
Subscribe to multiple channels with patterns
p = r.pubsub()
p.psubscribe('user:*', 'notification:*')
Get messages from pattern subscriptions
for message in p.listen():
    if message['type'] == 'pmessage':
        channel = message['channel']
        pattern = message['pattern']
        data = message['data']
        print(f"Pattern {pattern} matched {channel}: {data}")
Async Pub/Sub with Background Thread

import redis
import time

r = redis.Redis(decode_responses=True)
p = r.pubsub()

def message_handler(message):
    """Handle messages in background thread."""
    print(f"Handler received: {message['data']}")
Subscribe with handler
p.subscribe(**{'notifications': message_handler, 'alerts': message_handler})
Run in background thread
thread = p.run_in_thread(sleep_time=0.001)
Publish some messages
r.publish('notifications', 'Hello!')
r.publish('alerts', 'Warning!')
time.sleep(1)
Stop background thread
thread.stop()
Async Pub/Sub with asyncio

import asyncio
import redis.asyncio as redis

async def reader(channel: redis.client.PubSub):
    """Async message reader."""
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
        if message is not None:
            print(f"Received: {message}")
# Stop on specific message
if message["data"].decode() == "STOP":
break
async def pubsub_example():
    """Async pub/sub example."""
    r = await redis.from_url("redis://localhost")
async with r.pubsub() as pubsub:
# Subscribe to channels
await pubsub.subscribe("channel:1", "channel:2")
# Create reader task
reader_task = asyncio.create_task(reader(pubsub))
# Publish messages
await r.publish("channel:1", "Hello")
await r.publish("channel:2", "World")
await r.publish("channel:1", "STOP")
# Wait for reader to finish
await reader_task
await r.close()
Run async example
asyncio.run(pubsub_example())
Sharded Pub/Sub (Redis 7.0+)

from redis.cluster import RedisCluster, ClusterNode
Connect to cluster
rc = RedisCluster(startup_nodes=[
    ClusterNode('localhost', 6379),
    ClusterNode('localhost', 6380)
])
Create sharded pubsub
p = rc.pubsub()
p.ssubscribe('foo')
Get message from specific node
message = p.get_sharded_message(target_node=ClusterNode('localhost', 6379))
Distributed Locks

Simple Lock Implementation

import redis
import time
import uuid

class RedisLock:
    """Simple distributed lock using Redis."""
def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
self.redis = redis_client
self.key = f"lock:{key}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
"""Acquire lock."""
end_time = time.time() + (timeout or self.timeout)
while True:
# Try to set lock with NX (only if not exists) and EX (expiration)
if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
return True
if not blocking:
return False
if timeout and time.time() > end_time:
return False
# Wait before retry
time.sleep(0.01)
def release(self) -> bool:
"""Release lock only if we own it."""
# Use Lua script for atomic check-and-delete
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.redis.eval(lua_script, 1, self.key, self.identifier)
return result == 1
def __enter__(self):
"""Context manager support."""
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager cleanup."""
self.release()
Usage example
r = redis.Redis()
lock = RedisLock(r, "resource:123", timeout=5)

with lock:
    # Critical section - only one process at a time
    print("Processing resource 123")
    process_resource()

Advanced Lock with Auto-Renewal

import threading

class RenewableLock:
    """Distributed lock with automatic renewal."""
def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
self.redis = redis_client
self.key = f"lock:{key}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
self.renewal_thread = None
self.stop_renewal = threading.Event()
def _renew_lock(self):
"""Background task to renew lock."""
while not self.stop_renewal.is_set():
time.sleep(self.timeout / 3) # Renew at 1/3 of timeout
# Renew only if we still own the lock
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
"""
result = self.redis.eval(lua_script, 1, self.key,
self.identifier, self.timeout)
if result == 0:
# We lost the lock
self.stop_renewal.set()
def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
"""Acquire lock and start auto-renewal."""
if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
# Start renewal thread
self.stop_renewal.clear()
self.renewal_thread = threading.Thread(target=self._renew_lock, daemon=True)
self.renewal_thread.start()
return True
return False
def release(self) -> bool:
"""Release lock and stop renewal."""
self.stop_renewal.set()
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.redis.eval(lua_script, 1, self.key, self.identifier)
return result == 1
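A minimal usage sketch for the renewable lock; the resource name is illustrative and generate_nightly_report stands in for whatever long-running work the lock protects:

r = redis.Redis(decode_responses=True)
lock = RenewableLock(r, "report:nightly", timeout=10)

if lock.acquire():
    try:
        # Long-running work; the background thread keeps extending the TTL
        generate_nightly_report()
    finally:
        lock.release()
else:
    print("Another worker holds the lock")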
Redlock Algorithm (Multiple Redis Instances)

class Redlock:
    """Redlock algorithm for distributed locking across multiple Redis instances."""
def __init__(self, redis_instances: list):
"""
Args:
redis_instances: List of Redis client connections
"""
self.instances = redis_instances
self.quorum = len(redis_instances) // 2 + 1
def acquire(self, resource: str, ttl: int = 10000) -> tuple:
"""
Acquire lock across multiple Redis instances.
Returns:
(success: bool, lock_identifier: str)
"""
identifier = str(uuid.uuid4())
start_time = int(time.time() * 1000)
# Try to acquire lock on all instances
acquired = 0
for instance in self.instances:
try:
if instance.set(f"lock:{resource}", identifier,
nx=True, px=ttl):
acquired += 1
except Exception:
pass
# Calculate elapsed time
elapsed = int(time.time() * 1000) - start_time
validity_time = ttl - elapsed - 100 # drift compensation
# Check if we got quorum
if acquired >= self.quorum and validity_time > 0:
return True, identifier
else:
# Release locks if we didn't get quorum
self._release_all(resource, identifier)
return False, None
def _release_all(self, resource: str, identifier: str):
"""Release lock on all instances."""
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
for instance in self.instances:
try:
instance.eval(lua_script, 1, f"lock:{resource}", identifier)
except Exception:
pass
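A usage sketch for the Redlock class, assuming three independent (non-replicated) Redis instances on the ports shown; settle_orders is a placeholder for the protected work, and because the class only exposes _release_all, the sketch calls it directly (a public release() wrapper would normally be added):

nodes = [
    redis.Redis(host='localhost', port=6379),
    redis.Redis(host='localhost', port=6380),
    redis.Redis(host='localhost', port=6381),
]
redlock = Redlock(nodes)

acquired, token = redlock.acquire("orders:settlement", ttl=10000)
if acquired:
    try:
        settle_orders()  # critical section
    finally:
        redlock._release_all("orders:settlement", token)
else:
    print("Could not reach quorum; try again later")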
Data Structures and Operations

Working with Hashes
User profile storage
def save_user_profile(user_id: int, profile: dict):
    """Save user profile as hash."""
    key = f"user:profile:{user_id}"
    r.hset(key, mapping=profile)
    r.expire(key, 86400)  # 24 hour TTL

def get_user_profile(user_id: int) -> dict:
    """Get complete user profile."""
    key = f"user:profile:{user_id}"
    return r.hgetall(key)

def update_user_field(user_id: int, field: str, value: str):
    """Update single profile field."""
    key = f"user:profile:{user_id}"
    r.hset(key, field, value)
Example usage
save_user_profile(123, {
    "username": "alice",
    "email": "alice@example.com",
    "age": "30"
})
Atomic increment
r.hincrby("user:profile:123", "login_count", 1)
Working with Lists
Job queue implementation
def enqueue_job(queue_name: str, job_data: dict):
    """Add job to queue."""
    key = f"queue:{queue_name}"
    r.rpush(key, json.dumps(job_data))

def dequeue_job(queue_name: str, timeout: int = 0) -> dict:
    """Get job from queue (blocking)."""
    key = f"queue:{queue_name}"

    if timeout > 0:
        # Blocking pop with timeout
        result = r.blpop(key, timeout=timeout)
        if result:
            _, job_data = result
            return json.loads(job_data)
    else:
        # Non-blocking pop
        job_data = r.lpop(key)
        if job_data:
            return json.loads(job_data)

    return None
Activity feed
def add_to_feed(user_id: int, activity: dict):
    """Add activity to user feed."""
    key = f"feed:{user_id}"
    r.lpush(key, json.dumps(activity))
    r.ltrim(key, 0, 99)  # Keep only latest 100 items
    r.expire(key, 604800)  # 7 days

def get_feed(user_id: int, start: int = 0, end: int = 19) -> list:
    """Get user feed with pagination."""
    key = f"feed:{user_id}"
    items = r.lrange(key, start, end)
    return [json.loads(item) for item in items]
Working with Sets
Tags and relationships
def add_tags(item_id: int, tags: list):
    """Add tags to item."""
    key = f"item:{item_id}:tags"
    r.sadd(key, *tags)

def get_tags(item_id: int) -> set:
    """Get all tags for item."""
    key = f"item:{item_id}:tags"
    return r.smembers(key)

def find_items_with_all_tags(tags: list) -> set:
    """Find items having all specified tags."""
    # Simplified - in practice you'd maintain a reverse index that maps
    # each tag to the set of item IDs carrying it, then intersect those sets
    keys = [f"tag:{tag}:items" for tag in tags]
    return r.sinter(keys)
Online users tracking
def user_online(user_id: int):
    """Mark user as online."""
    r.sadd("users:online", user_id)
    # Per-user heartbeat key with a 60 second TTL; a cleanup job can use it
    # to detect members that went stale without calling user_offline()
    r.setex(f"user:{user_id}:heartbeat", 60, 1)

def user_offline(user_id: int):
    """Mark user as offline."""
    r.srem("users:online", user_id)

def get_online_users() -> set:
    """Get all online users."""
    return r.smembers("users:online")

def get_online_count() -> int:
    """Get count of online users."""
    return r.scard("users:online")
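One caveat with the plain-set approach: Redis cannot expire individual set members, so users who disconnect without calling user_offline linger in users:online. A common alternative, sketched below with hypothetical helper names, keeps last-seen timestamps in a sorted set and trims stale entries on read (the 60-second threshold is illustrative):

def heartbeat(user_id: int):
    """Record that the user was just seen."""
    r.zadd("users:last_seen", {str(user_id): time.time()})

def get_recently_seen(threshold: int = 60) -> list:
    """Users seen within the last `threshold` seconds."""
    cutoff = time.time() - threshold
    # Drop entries older than the cutoff, then return what is left
    r.zremrangebyscore("users:last_seen", '-inf', cutoff)
    return r.zrange("users:last_seen", 0, -1)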
Working with Sorted Sets
Leaderboard implementation
def update_score(leaderboard: str, user_id: int, score: float):
    """Update user score in leaderboard."""
    key = f"leaderboard:{leaderboard}"
    r.zadd(key, {user_id: score})

def get_leaderboard(leaderboard: str, start: int = 0, end: int = 9) -> list:
    """Get top players (descending order)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANGE for descending order (highest scores first)
    return r.zrevrange(key, start, end, withscores=True)

def get_user_rank(leaderboard: str, user_id: int) -> int:
    """Get user's rank (0-indexed)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANK for descending rank
    rank = r.zrevrank(key, user_id)
    return rank if rank is not None else -1

def get_user_score(leaderboard: str, user_id: int) -> float:
    """Get user's score."""
    key = f"leaderboard:{leaderboard}"
    score = r.zscore(key, user_id)
    return score if score is not None else 0.0

def get_score_range(leaderboard: str, min_score: float, max_score: float) -> list:
    """Get users within score range."""
    key = f"leaderboard:{leaderboard}"
    return r.zrangebyscore(key, min_score, max_score, withscores=True)
Time-based sorted set (activity stream)
def add_activity(user_id: int, activity: str):
    """Add timestamped activity."""
    key = f"user:{user_id}:activities"
    timestamp = time.time()
    r.zadd(key, {activity: timestamp})

    # Keep only last 24 hours
    cutoff = timestamp - 86400
    r.zremrangebyscore(key, '-inf', cutoff)

def get_recent_activities(user_id: int, count: int = 10) -> list:
    """Get recent activities."""
    key = f"user:{user_id}:activities"
    # Get most recent (highest timestamps)
    return r.zrevrange(key, 0, count - 1, withscores=True)
Working with Streams
Event stream
def add_event(stream_key: str, event_data: dict) -> str:
    """Add event to stream."""
    # Returns auto-generated ID (timestamp-sequence)
    event_id = r.xadd(stream_key, event_data)
    return event_id

def read_events(stream_key: str, count: int = 10, start_id: str = '0') -> list:
    """Read events from stream."""
    events = r.xread({stream_key: start_id}, count=count)

    # events format: [(stream_name, [(id, data), (id, data), ...])]
    if events:
        _, event_list = events[0]
        return event_list
    return []
Consumer groups
def create_consumer_group(stream_key: str, group_name: str):
    """Create consumer group for stream."""
    try:
        r.xgroup_create(name=stream_key, groupname=group_name, id='0')
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise

def read_from_group(stream_key: str, group_name: str, consumer_name: str, count: int = 10) -> list:
    """Read events as consumer in group."""
    # Read new messages with '>'
    events = r.xreadgroup(
        groupname=group_name,
        consumername=consumer_name,
        streams={stream_key: '>'},
        count=count,
        block=5000  # 5 second timeout
    )

    if events:
        _, event_list = events[0]
        return event_list
    return []

def acknowledge_event(stream_key: str, group_name: str, event_id: str):
    """Acknowledge processed event."""
    r.xack(stream_key, group_name, event_id)
Example: Processing events with consumer group
def process_events(stream_key: str, group_name: str, consumer_name: str):
    """Process events from stream."""
    create_consumer_group(stream_key, group_name)

    while True:
        events = read_from_group(stream_key, group_name, consumer_name, count=10)

        for event_id, event_data in events:
            try:
                # Process event
                process_event(event_data)

                # Acknowledge successful processing
                acknowledge_event(stream_key, group_name, event_id)
            except Exception as e:
                print(f"Failed to process event {event_id}: {e}")
                # Event remains unacknowledged for retry
Performance Optimization

Pipelining for Batch Operations
Without pipelining (slow - multiple round trips)
for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")
With pipelining (fast - single round trip)
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()
Pipelining with reads
pipe = r.pipeline()
for i in range(100):
    pipe.get(f"key:{i}")
values = pipe.execute()
Builder pattern with pipeline
class DataLoader:
    def __init__(self):
        self.pipeline = r.pipeline()
def add_user(self, user_id: int, user_data: dict):
"""Add user data."""
self.pipeline.hset(f"user:{user_id}", mapping=user_data)
return self
def add_to_set(self, set_name: str, value: str):
"""Add to set."""
self.pipeline.sadd(set_name, value)
return self
def execute(self):
"""Execute all pipelined commands."""
return self.pipeline.execute()
Usage
loader = DataLoader()
results = (loader
    .add_user(1, {"name": "Alice", "email": "alice@example.com"})
    .add_user(2, {"name": "Bob", "email": "bob@example.com"})
    .add_to_set("active_users", "1")
    .add_to_set("active_users", "2")
    .execute())
Transactions with WATCH
Optimistic locking with WATCH
def transfer_credits(from_user: int, to_user: int, amount: int) -> bool:
    """Transfer credits between users with optimistic locking."""
with r.pipeline() as pipe:
while True:
try:
# Watch the keys we're going to modify
pipe.watch(f"user:{from_user}:credits", f"user:{to_user}:credits")
# Get current values
from_credits = int(pipe.get(f"user:{from_user}:credits") or 0)
to_credits = int(pipe.get(f"user:{to_user}:credits") or 0)
# Check if transfer is possible
if from_credits < amount:
pipe.unwatch()
return False
# Start transaction
pipe.multi()
pipe.set(f"user:{from_user}:credits", from_credits - amount)
pipe.set(f"user:{to_user}:credits", to_credits + amount)
# Execute transaction
pipe.execute()
return True
except redis.WatchError:
# Key was modified by another client - retry
continue
Lua scripts for atomic operations
increment_script = """ local current = redis.call('GET', KEYS[1]) if not current then current = 0 end local new_val = tonumber(current) + tonumber(ARGV[1]) redis.call('SET', KEYS[1], new_val) return new_val """
Register and use Lua script
increment = r.register_script(increment_script)
new_value = increment(keys=['counter:views'], args=[1])
Lua Scripts for Complex Operations
Rate limiting with Lua
rate_limit_script = """ local key = KEYS[1] local limit = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local current = redis.call('INCR', key)
if current == 1 then redis.call('EXPIRE', key, window) end
if current > limit then return 0 else return 1 end """
rate_limiter = r.register_script(rate_limit_script)
def is_allowed(user_id: int, limit: int = 100, window: int = 60) -> bool:
    """Check if user is within rate limit."""
    key = f"rate_limit:{user_id}"
    result = rate_limiter(keys=[key], args=[limit, window])
    return result == 1
Get-or-set pattern with Lua
get_or_set_script = """ local value = redis.call('GET', KEYS[1]) if value then return value else redis.call('SET', KEYS[1], ARGV[1]) redis.call('EXPIRE', KEYS[1], ARGV[2]) return ARGV[1] end """
get_or_set = r.register_script(get_or_set_script)
def get_or_compute(key: str, compute_fn, ttl: int = 3600):
    """Get value from cache or compute and cache it."""
    value = get_or_set(keys=[key], args=["__COMPUTING__", ttl])

    if value == "__COMPUTING__":
        # We set the placeholder - compute the real value
        computed = compute_fn()
        r.setex(key, ttl, computed)
        return computed

    return value
Production Patterns

High Availability with Sentinel

from redis.sentinel import Sentinel
Connect to Sentinel
sentinel = Sentinel([
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379)
], socket_timeout=0.5)
Get master connection
master = sentinel.master_for('mymaster', socket_timeout=0.5)
Get replica connection (for read-only operations)
replica = sentinel.slave_for('mymaster', socket_timeout=0.5)
Use master for writes
master.set('key', 'value')
Use replica for reads (optional, for load distribution)
value = replica.get('key')
Async Redis with asyncio

import asyncio
import redis.asyncio as redis

async def async_redis_operations():
    """Async Redis operations example."""
    # Create async connection
    r = await redis.from_url("redis://localhost")
try:
# Async operations
await r.set("async_key", "async_value")
value = await r.get("async_key")
print(f"Value: {value}")
# Async pipeline
async with r.pipeline(transaction=True) as pipe:
await pipe.set("key1", "value1")
await pipe.set("key2", "value2")
await pipe.get("key1")
results = await pipe.execute()
print(f"Pipeline results: {results}")
finally:
await r.close()
Run async operations
asyncio.run(async_redis_operations())
Connection Pool Configuration
Production-ready connection pool
import socket

pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,            # Max pool size
    socket_timeout=5,              # Socket timeout
    socket_connect_timeout=5,      # Connection timeout
    socket_keepalive=True,         # Keep TCP connection alive
    socket_keepalive_options={
        socket.TCP_KEEPIDLE: 60,
        socket.TCP_KEEPINTVL: 10,
        socket.TCP_KEEPCNT: 3
    },
    retry_on_timeout=True,         # Retry on timeout
    health_check_interval=30,      # Health check every 30s
    decode_responses=True          # Auto-decode bytes to strings
)
r = redis.Redis(connection_pool=pool)
Error Handling and Resilience

import redis
from redis.exceptions import ConnectionError, TimeoutError
import time

class ResilientRedisClient:
    """Redis client with retry logic and exponential backoff."""
def __init__(self, max_retries: int = 3, backoff: float = 0.1):
self.redis = redis.Redis(
host='localhost',
port=6379,
socket_timeout=5,
retry_on_timeout=True
)
self.max_retries = max_retries
self.backoff = backoff
def get_with_retry(self, key: str, default=None):
"""Get value with exponential backoff retry."""
for attempt in range(self.max_retries):
try:
return self.redis.get(key) or default
except (ConnectionError, TimeoutError) as e:
if attempt == self.max_retries - 1:
# Log error and return default
print(f"Redis error after {self.max_retries} attempts: {e}")
return default
# Exponential backoff
wait_time = self.backoff * (2 ** attempt)
time.sleep(wait_time)
def set_with_retry(self, key: str, value: str, ttl: int = None) -> bool:
"""Set value with retry logic."""
for attempt in range(self.max_retries):
try:
if ttl:
return self.redis.setex(key, ttl, value)
else:
return self.redis.set(key, value)
except (ConnectionError, TimeoutError) as e:
if attempt == self.max_retries - 1:
print(f"Redis error after {self.max_retries} attempts: {e}")
return False
wait_time = self.backoff * (2 ** attempt)
time.sleep(wait_time)
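A quick usage sketch for the resilient wrapper (the key names and default value are illustrative):

client = ResilientRedisClient(max_retries=3, backoff=0.1)

# Falls back to the default if Redis stays unreachable
flags = client.get_with_retry("config:feature_flags", default="{}")

# Returns False instead of raising if every attempt fails
client.set_with_retry("config:feature_flags", '{"beta": true}', ttl=300)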
Monitoring and Metrics

def get_redis_info(section: str = None) -> dict:
    """Get Redis server information."""
    return r.info(section=section)

def monitor_memory_usage():
    """Monitor Redis memory usage."""
    info = r.info('memory')
used_memory = info['used_memory_human']
peak_memory = info['used_memory_peak_human']
memory_fragmentation = info['mem_fragmentation_ratio']
print(f"Used Memory: {used_memory}")
print(f"Peak Memory: {peak_memory}")
print(f"Fragmentation Ratio: {memory_fragmentation}")
return info
def monitor_stats():
    """Monitor Redis statistics."""
    info = r.info('stats')
total_connections = info['total_connections_received']
total_commands = info['total_commands_processed']
ops_per_sec = info['instantaneous_ops_per_sec']
print(f"Total Connections: {total_connections}")
print(f"Total Commands: {total_commands}")
print(f"Ops/sec: {ops_per_sec}")
return info
def get_slow_log(count: int = 10):
    """Get slow query log."""
    slow_log = r.slowlog_get(count)
for entry in slow_log:
print(f"Command: {entry['command']}")
print(f"Duration: {entry['duration']} microseconds")
print(f"Time: {entry['start_time']}")
print("---")
return slow_log
Best Practices

Key Naming Conventions
Use consistent, hierarchical naming:
Good naming patterns
user:123:profile             # User profile data
user:123:sessions:abc        # User session
cache:product:456            # Cached product
queue:emails:pending         # Email queue
lock:resource:789            # Resource lock
counter:api:requests:daily   # Daily API request counter
leaderboard:global:score     # Global leaderboard
Avoid
u123                 # Too cryptic
user_profile_123     # Underscores less common
123:user             # Wrong hierarchy
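One way to keep names consistent is to centralize key construction in a small helper; a sketch (make_key is a hypothetical convenience function, not part of redis-py):

def make_key(*parts) -> str:
    """Join key parts with ':' so every key follows the same hierarchy."""
    return ":".join(str(p) for p in parts)

profile_key = make_key("user", 123, "profile")    # user:123:profile
product_key = make_key("cache", "product", 456)   # cache:product:456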
Memory Management
Set TTL on all temporary data
r.setex("temp:data", 3600, value) # Expires in 1 hour
Limit collection sizes
r.lpush("activity_log", entry) r.ltrim("activity_log", 0, 999) # Keep only 1000 items
Use appropriate data structures
Hash is more memory-efficient than multiple keys
r.hset("user:123", mapping={"name": "Alice", "email": "alice@example.com"})
vs
r.set("user:123:name", "Alice") r.set("user:123:email", "alice@example.com")
Monitor memory usage
if r.info('memory')['used_memory'] > threshold:
    # Implement eviction or cleanup
    cleanup_old_data()
Security
Use authentication
r = redis.Redis(
    host='localhost',
    port=6379,
    password='your-secure-password',
    username='your-username'  # Redis 6+
)
Use SSL/TLS for production
pool = redis.ConnectionPool(
    host='redis.example.com',
    port=6380,
    connection_class=redis.SSLConnection,
    ssl_cert_reqs='required',
    ssl_ca_certs='/path/to/ca-cert.pem'
)
Credential provider pattern
from redis import UsernamePasswordCredentialProvider
creds_provider = UsernamePasswordCredentialProvider("username", "password")
r = redis.Redis(
    host="localhost",
    port=6379,
    credential_provider=creds_provider
)
Testing

import fakeredis
import pytest

@pytest.fixture
def redis_client():
    """Provide fake Redis client for testing."""
    return fakeredis.FakeRedis(decode_responses=True)

def test_caching(redis_client):
    """Test caching logic."""
    # Test cache miss
    assert redis_client.get("test_key") is None
# Test cache set
redis_client.setex("test_key", 60, "test_value")
assert redis_client.get("test_key") == "test_value"
# Test expiration
assert redis_client.ttl("test_key") <= 60
def test_session_management(redis_client):
    """Test session operations."""
    # UserSessionManager (Example 1 below) accepts an injected client,
    # which makes it straightforward to test against fakeredis
    session_manager = UserSessionManager(redis_client)
# Create session
session_id = session_manager.create_session(user_id=123)
assert session_id is not None
# Get session
session = session_manager.get_session(session_id)
assert session['user_id'] == 123
# Delete session
assert session_manager.delete_session(session_id) is True
assert session_manager.get_session(session_id) is None
Examples

Example 1: User Session Management with Redis

import redis
import json
import uuid
from datetime import datetime, timedelta

class UserSessionManager:
    """Complete user session management with Redis."""
def __init__(self, redis_client: redis.Redis, ttl: int = 1800):
self.redis = redis_client
self.ttl = ttl
def create_session(self, user_id: int, user_data: dict = None) -> str:
"""Create new user session."""
session_id = str(uuid.uuid4())
session_key = f"session:{session_id}"
session_data = {
"user_id": user_id,
"created_at": datetime.utcnow().isoformat(),
"last_accessed": datetime.utcnow().isoformat(),
"data": user_data or {}
}
# Store session with TTL
self.redis.setex(session_key, self.ttl, json.dumps(session_data))
# Track user's active sessions
self.redis.sadd(f"user:{user_id}:sessions", session_id)
return session_id
def get_session(self, session_id: str) -> dict:
"""Get session and refresh TTL."""
session_key = f"session:{session_id}"
session_data = self.redis.get(session_key)
if session_data:
session = json.loads(session_data)
session['last_accessed'] = datetime.utcnow().isoformat()
# Refresh TTL
self.redis.setex(session_key, self.ttl, json.dumps(session))
return session
return None
def delete_session(self, session_id: str) -> bool:
"""Delete session."""
session = self.get_session(session_id)
if not session:
return False
user_id = session['user_id']
# Remove session
self.redis.delete(f"session:{session_id}")
# Remove from user's session set
self.redis.srem(f"user:{user_id}:sessions", session_id)
return True
def delete_all_user_sessions(self, user_id: int):
"""Delete all sessions for a user."""
sessions_key = f"user:{user_id}:sessions"
session_ids = self.redis.smembers(sessions_key)
for session_id in session_ids:
self.redis.delete(f"session:{session_id}")
self.redis.delete(sessions_key)
def get_user_sessions(self, user_id: int) -> list:
"""Get all active sessions for a user."""
sessions_key = f"user:{user_id}:sessions"
session_ids = self.redis.smembers(sessions_key)
sessions = []
for session_id in session_ids:
session = self.get_session(session_id)
if session:
session['session_id'] = session_id
sessions.append(session)
return sessions
Usage
r = redis.Redis(decode_responses=True)
session_mgr = UserSessionManager(r)
Create session
session_id = session_mgr.create_session(
    user_id=123,
    user_data={"role": "admin", "permissions": ["read", "write"]}
)
Get session
session = session_mgr.get_session(session_id)
print(f"User ID: {session['user_id']}")
List all user sessions
sessions = session_mgr.get_user_sessions(123)
print(f"Active sessions: {len(sessions)}")
Logout (delete session)
session_mgr.delete_session(session_id)
Example 2: Real-Time Leaderboard

import redis
import time

class Leaderboard:
    """Real-time leaderboard using Redis sorted sets."""
def __init__(self, redis_client: redis.Redis, name: str):
self.redis = redis_client
self.key = f"leaderboard:{name}"
def add_score(self, player_id: str, score: float):
"""Add or update player score."""
self.redis.zadd(self.key, {player_id: score})
def increment_score(self, player_id: str, increment: float):
"""Increment player score."""
self.redis.zincrby(self.key, increment, player_id)
def get_top(self, count: int = 10) -> list:
"""Get top players."""
# ZREVRANGE for highest scores first
players = self.redis.zrevrange(self.key, 0, count - 1, withscores=True)
return [
{
"rank": idx + 1,
"player_id": player_id,
"score": score
}
for idx, (player_id, score) in enumerate(players)
]
def get_rank(self, player_id: str) -> dict:
"""Get player rank and score."""
score = self.redis.zscore(self.key, player_id)
if score is None:
return None
# ZREVRANK for rank (0-indexed, highest first)
rank = self.redis.zrevrank(self.key, player_id)
return {
"player_id": player_id,
"rank": rank + 1 if rank is not None else None,
"score": score
}
def get_around(self, player_id: str, count: int = 5) -> list:
"""Get players around a specific player."""
rank = self.redis.zrevrank(self.key, player_id)
if rank is None:
return []
# Get players before and after
start = max(0, rank - count)
end = rank + count
players = self.redis.zrevrange(self.key, start, end, withscores=True)
return [
{
"rank": start + idx + 1,
"player_id": pid,
"score": score,
"is_current": pid == player_id
}
for idx, (pid, score) in enumerate(players)
]
def get_total_players(self) -> int:
"""Get total number of players."""
return self.redis.zcard(self.key)
def remove_player(self, player_id: str) -> bool:
"""Remove player from leaderboard."""
return self.redis.zrem(self.key, player_id) > 0
Usage
r = redis.Redis(decode_responses=True)
leaderboard = Leaderboard(r, "global")
Add scores
leaderboard.add_score("alice", 1500) leaderboard.add_score("bob", 2000) leaderboard.add_score("charlie", 1800) leaderboard.increment_score("alice", 200) # alice now at 1700
Get top 10
top_players = leaderboard.get_top(10)
for player in top_players:
    print(f"#{player['rank']}: {player['player_id']} - {player['score']}")
Get player rank
alice_stats = leaderboard.get_rank("alice")
print(f"Alice is rank {alice_stats['rank']} with {alice_stats['score']} points")
Get players around alice
nearby = leaderboard.get_around("alice", count=2)
for player in nearby:
    marker = " <-- YOU" if player['is_current'] else ""
    print(f"#{player['rank']}: {player['player_id']} - {player['score']}{marker}")
Example 3: Distributed Rate Limiter

import redis
import time

class RateLimiter:
    """Distributed rate limiter using Redis."""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# Lua script for atomic rate limiting
self.rate_limit_script = self.redis.register_script("""
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return {0, limit, current - 1}
else
return {1, limit, current}
end
""")
def check_rate_limit(self, identifier: str, limit: int, window: int) -> dict:
"""
Check if request is within rate limit.
Args:
identifier: User ID, IP address, or API key
limit: Maximum requests allowed
window: Time window in seconds
Returns:
dict with allowed (bool), limit, current, remaining
"""
key = f"rate_limit:{identifier}:{int(time.time() // window)}"
allowed, max_limit, current = self.rate_limit_script(
keys=[key],
args=[limit, window]
)
return {
"allowed": bool(allowed),
"limit": max_limit,
"current": current,
"remaining": max(0, max_limit - current),
"reset_at": (int(time.time() // window) + 1) * window
}
def sliding_window_check(self, identifier: str, limit: int, window: int) -> dict:
"""
Sliding window rate limiter using sorted sets.
More accurate but slightly more expensive.
"""
key = f"rate_limit:sliding:{identifier}"
now = time.time()
window_start = now - window
# Remove old entries
self.redis.zremrangebyscore(key, 0, window_start)
# Count current requests
current = self.redis.zcard(key)
if current < limit:
# Add new request
self.redis.zadd(key, {str(now): now})
self.redis.expire(key, window)
return {
"allowed": True,
"limit": limit,
"current": current + 1,
"remaining": limit - current - 1
}
else:
return {
"allowed": False,
"limit": limit,
"current": current,
"remaining": 0
}
Usage
r = redis.Redis(decode_responses=True)
limiter = RateLimiter(r)
API rate limiting: 100 requests per minute
user_id = "user_123" result = limiter.check_rate_limit(user_id, limit=100, window=60)
if result["allowed"]: print(f"Request allowed. {result['remaining']} requests remaining.") # Process request else: print(f"Rate limit exceeded. Try again at {result['reset_at']}") # Return 429 Too Many Requests
More accurate sliding window
result = limiter.sliding_window_check(user_id, limit=100, window=60)
Example 4: Distributed Job Queue

import redis
import json
import time
import uuid
from typing import Optional, Callable

class JobQueue:
    """Distributed job queue with Redis."""
def __init__(self, redis_client: redis.Redis, queue_name: str = "default"):
self.redis = redis_client
self.queue_name = queue_name
self.queue_key = f"queue:{queue_name}"
self.processing_key = f"queue:{queue_name}:processing"
def enqueue(self, job_type: str, payload: dict, priority: int = 0) -> str:
"""
Add job to queue.
Args:
job_type: Type of job (for routing to workers)
payload: Job data
priority: Higher priority = processed first (0 = normal)
Returns:
job_id
"""
job_id = str(uuid.uuid4())
job_data = {
"id": job_id,
"type": job_type,
"payload": payload,
"enqueued_at": time.time(),
"attempts": 0
}
# Add to queue (use ZADD for priority queue)
score = -priority # Negative for higher priority first
self.redis.zadd(self.queue_key, {json.dumps(job_data): score})
return job_id
def dequeue(self, timeout: int = 0) -> Optional[dict]:
"""
Get next job from queue.
Args:
timeout: Block for this many seconds (0 = no blocking)
Returns:
Job data or None
"""
# Get highest priority job (lowest score)
jobs = self.redis.zrange(self.queue_key, 0, 0)
if not jobs:
if timeout > 0:
time.sleep(min(timeout, 1))
return self.dequeue(timeout - 1)
return None
job_json = jobs[0]
# Move to processing set atomically
pipe = self.redis.pipeline()
pipe.zrem(self.queue_key, job_json)
pipe.zadd(self.processing_key, {job_json: time.time()})
pipe.execute()
job_data = json.loads(job_json)
job_data['attempts'] += 1
return job_data
def complete(self, job_data: dict):
"""Mark job as completed."""
job_json = json.dumps({
k: v for k, v in job_data.items()
if k != 'attempts'
})
# Remove from processing
self.redis.zrem(self.processing_key, job_json)
def retry(self, job_data: dict, delay: int = 0):
"""Retry failed job."""
job_json = json.dumps({
k: v for k, v in job_data.items()
if k != 'attempts'
})
# Remove from processing
self.redis.zrem(self.processing_key, job_json)
# Re-enqueue with delay
if delay > 0:
time.sleep(delay)
self.redis.zadd(self.queue_key, {job_json: 0})
def get_stats(self) -> dict:
"""Get queue statistics."""
return {
"queued": self.redis.zcard(self.queue_key),
"processing": self.redis.zcard(self.processing_key)
}
Worker example
class Worker:
    """Job worker."""
def __init__(self, queue: JobQueue, handlers: dict):
self.queue = queue
self.handlers = handlers
def process_jobs(self):
"""Process jobs from queue."""
print("Worker started. Waiting for jobs...")
while True:
job = self.queue.dequeue(timeout=5)
if job:
print(f"Processing job {job['id']} (type: {job['type']})")
try:
# Get handler for job type
handler = self.handlers.get(job['type'])
if handler:
handler(job['payload'])
self.queue.complete(job)
print(f"Job {job['id']} completed")
else:
print(f"No handler for job type: {job['type']}")
self.queue.complete(job)
except Exception as e:
print(f"Job {job['id']} failed: {e}")
if job['attempts'] < 3:
# Retry with exponential backoff
delay = 2 ** job['attempts']
print(f"Retrying in {delay}s...")
self.queue.retry(job, delay=delay)
else:
print(f"Job {job['id']} failed permanently")
self.queue.complete(job)
Usage
r = redis.Redis(decode_responses=True)
queue = JobQueue(r, "email_queue")
Enqueue jobs
job_id = queue.enqueue("send_email", { "to": "user@example.com", "subject": "Welcome!", "body": "Thanks for signing up." }, priority=1)
Define handlers
def send_email_handler(payload):
    print(f"Sending email to {payload['to']}")
    # Email sending logic here
    time.sleep(1)  # Simulate work

handlers = {
    "send_email": send_email_handler
}
Start worker
worker = Worker(queue, handlers)
worker.process_jobs() # This blocks - run in separate process
Example 5: Real-Time Event Streaming

import redis
import json
import time
from typing import Callable, Optional

class EventStream:
    """Real-time event streaming with Redis Streams."""
def __init__(self, redis_client: redis.Redis, stream_name: str):
self.redis = redis_client
self.stream_name = stream_name
def publish(self, event_type: str, data: dict) -> str:
"""Publish event to stream."""
event = {
"type": event_type,
"data": json.dumps(data),
"timestamp": time.time()
}
# Add to stream (returns auto-generated ID)
event_id = self.redis.xadd(self.stream_name, event, maxlen=10000)
return event_id
def read_events(self, last_id: str = '0', count: int = 10) -> list:
"""Read events from stream."""
events = self.redis.xread(
{self.stream_name: last_id},
count=count,
block=1000 # 1 second timeout
)
if not events:
return []
_, event_list = events[0]
return [
{
"id": event_id,
"type": event_data[b'type'].decode(),
"data": json.loads(event_data[b'data'].decode()),
"timestamp": float(event_data[b'timestamp'])
}
for event_id, event_data in event_list
]
def create_consumer_group(self, group_name: str):
"""Create consumer group for parallel processing."""
try:
self.redis.xgroup_create(
name=self.stream_name,
groupname=group_name,
id='0',
mkstream=True
)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
def consume_events(self, group_name: str, consumer_name: str,
count: int = 10) -> list:
"""Consume events as part of consumer group."""
events = self.redis.xreadgroup(
groupname=group_name,
consumername=consumer_name,
streams={self.stream_name: '>'},
count=count,
block=5000
)
if not events:
return []
_, event_list = events[0]
return [
{
"id": event_id,
"type": event_data[b'type'].decode(),
"data": json.loads(event_data[b'data'].decode()),
"timestamp": float(event_data[b'timestamp'])
}
for event_id, event_data in event_list
]
def acknowledge(self, group_name: str, event_id: str):
"""Acknowledge processed event."""
self.redis.xack(self.stream_name, group_name, event_id)
def get_pending(self, group_name: str) -> list:
"""Get pending (unacknowledged) events."""
pending = self.redis.xpending_range(
name=self.stream_name,
groupname=group_name,
min='-',
max='+',
count=100
)
return pending
Usage Example: Activity Feed
r = redis.Redis()
activity_stream = EventStream(r, "user_activity")
Publish events
activity_stream.publish("user_signup", { "user_id": 123, "email": "alice@example.com" })
activity_stream.publish("post_created", { "user_id": 123, "post_id": 456, "title": "My First Post" })
Read events (simple consumer)
last_id = '0'
while True:
    events = activity_stream.read_events(last_id, count=10)
for event in events:
print(f"Event: {event['type']}")
print(f"Data: {event['data']}")
last_id = event['id']
if not events:
break
Consumer group example
activity_stream.create_consumer_group("processors")
Worker consuming events
while True:
    events = activity_stream.consume_events(
        group_name="processors",
        consumer_name="worker-1",
        count=10
    )
for event in events:
try:
# Process event
process_event(event)
# Acknowledge
activity_stream.acknowledge("processors", event['id'])
except Exception as e:
print(f"Failed to process event {event['id']}: {e}")
# Event remains unacknowledged for retry
Example 6: Cache-Aside Pattern with Multi-Level Caching

import redis
import json
import time
from typing import Optional, Any, Callable

class MultiLevelCache:
    """Multi-level caching with Redis and local cache."""
def __init__(self, redis_client: redis.Redis,
local_cache_size: int = 100,
local_ttl: int = 60,
redis_ttl: int = 3600):
self.redis = redis_client
self.local_cache = {}
self.local_cache_size = local_cache_size
self.local_ttl = local_ttl
self.redis_ttl = redis_ttl
def _make_key(self, namespace: str, key: str) -> str:
"""Generate cache key."""
return f"cache:{namespace}:{key}"
def get(self, namespace: str, key: str,
compute_fn: Optional[Callable] = None) -> Optional[Any]:
"""
Get value from cache with fallback to compute function.
Lookup order: Local cache → Redis → Compute function
"""
cache_key = self._make_key(namespace, key)
# Level 1: Local cache
if cache_key in self.local_cache:
entry = self.local_cache[cache_key]
if time.time() < entry['expires_at']:
return entry['value']
else:
del self.local_cache[cache_key]
# Level 2: Redis cache
redis_value = self.redis.get(cache_key)
if redis_value:
value = json.loads(redis_value)
# Populate local cache
self._set_local(cache_key, value)
return value
# Level 3: Compute function
if compute_fn:
value = compute_fn()
if value is not None:
self.set(namespace, key, value)
return value
return None
def set(self, namespace: str, key: str, value: Any):
"""Set value in both cache levels."""
cache_key = self._make_key(namespace, key)
serialized = json.dumps(value)
# Set in Redis
self.redis.setex(cache_key, self.redis_ttl, serialized)
# Set in local cache
self._set_local(cache_key, value)
def _set_local(self, key: str, value: Any):
"""Set value in local cache with LRU eviction."""
# Simple LRU: remove oldest if at capacity
if len(self.local_cache) >= self.local_cache_size:
# Remove oldest entry
oldest_key = min(
self.local_cache.keys(),
key=lambda k: self.local_cache[k]['expires_at']
)
del self.local_cache[oldest_key]
self.local_cache[key] = {
'value': value,
'expires_at': time.time() + self.local_ttl
}
def delete(self, namespace: str, key: str):
"""Delete from all cache levels."""
cache_key = self._make_key(namespace, key)
# Delete from Redis
self.redis.delete(cache_key)
# Delete from local cache
if cache_key in self.local_cache:
del self.local_cache[cache_key]
def invalidate_namespace(self, namespace: str):
"""Invalidate all keys in namespace."""
pattern = f"cache:{namespace}:*"
# Delete from Redis
for key in self.redis.scan_iter(match=pattern, count=100):
self.redis.delete(key)
# Delete from local cache
to_delete = [
k for k in self.local_cache.keys()
if k.startswith(f"cache:{namespace}:")
]
for k in to_delete:
del self.local_cache[k]
Usage
r = redis.Redis(decode_responses=True)
cache = MultiLevelCache(r)

def get_user(user_id: int) -> dict:
    """Get user with multi-level caching."""
    return cache.get(
        namespace="users",
        key=str(user_id),
        compute_fn=lambda: database.query_user(user_id)
    )
First call: Queries database, caches result
user = get_user(123)
Second call: Returns from local cache (fastest)
user = get_user(123)
Update user
def update_user(user_id: int, data: dict):
    database.update_user(user_id, data)

    # Invalidate cache
    cache.delete("users", str(user_id))
Invalidate all user caches
cache.invalidate_namespace("users")
Example 7: Geo-Location with Redis

import redis

class GeoLocation:
    """Geo-spatial indexing and queries with Redis."""
def __init__(self, redis_client: redis.Redis, index_name: str):
self.redis = redis_client
self.key = f"geo:{index_name}"
def add_location(self, location_id: str, longitude: float, latitude: float):
"""Add location to geo index."""
# redis-py 4+ expects a sequence of (longitude, latitude, member)
self.redis.geoadd(self.key, (longitude, latitude, location_id))
def add_locations(self, locations: list):
"""Batch add locations.
Args:
locations: List of (location_id, longitude, latitude) tuples
"""
self.redis.geoadd(self.key, [
    item for loc in locations
    for item in (loc[1], loc[2], loc[0])
])
def get_position(self, location_id: str) -> tuple:
"""Get coordinates of a location."""
result = self.redis.geopos(self.key, location_id)
if result and result[0]:
return result[0] # (longitude, latitude)
return None
def find_nearby(self, longitude: float, latitude: float,
radius: float, unit: str = 'km', count: int = None) -> list:
"""
Find locations within radius.
Args:
longitude: Center longitude
latitude: Center latitude
radius: Search radius
unit: Distance unit ('m', 'km', 'mi', 'ft')
count: Maximum results
"""
args = {
'longitude': longitude,
'latitude': latitude,
'radius': radius,
'unit': unit,
'withdist': True,
'withcoord': True,
'sort': 'ASC'
}
if count:
args['count'] = count
results = self.redis.georadius(self.key, **args)
return [
{
'location_id': location_id,
'distance': distance,
'coordinates': (longitude, latitude)
}
for location_id, distance, (longitude, latitude) in results
]
def find_nearby_member(self, location_id: str, radius: float,
unit: str = 'km', count: int = None) -> list:
"""Find locations near an existing member."""
args = {
'member': location_id,
'radius': radius,
'unit': unit,
'withdist': True,
'sort': 'ASC'
}
if count:
args['count'] = count
results = self.redis.georadiusbymember(self.key, **args)
return [
{
'location_id': loc_id,
'distance': distance
}
for loc_id, distance in results
if loc_id != location_id # Exclude self
]
def distance_between(self, location_id1: str, location_id2: str,
unit: str = 'km') -> float:
"""Calculate distance between two locations."""
return self.redis.geodist(self.key, location_id1, location_id2, unit)
Usage Example: Restaurant finder
r = redis.Redis(decode_responses=True)
restaurants = GeoLocation(r, "restaurants")
Add restaurants
restaurants.add_locations([
    ("rest1", -122.4194, 37.7749),  # San Francisco
    ("rest2", -122.4068, 37.7849),
    ("rest3", -122.4312, 37.7652),
])
Find restaurants near coordinates
nearby = restaurants.find_nearby(
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    count=10
)

for restaurant in nearby:
    print(f"{restaurant['location_id']}: {restaurant['distance']:.2f} km away")
Find restaurants near a specific restaurant
similar = restaurants.find_nearby_member("rest1", radius=2, unit='km')
Get distance between two restaurants
distance = restaurants.distance_between("rest1", "rest2", unit='km')
print(f"Distance: {distance:.2f} km")
Quick Reference

Common Operations
Connection
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
Strings
r.set('key', 'value')
r.setex('key', 3600, 'value')  # With TTL
r.get('key')
r.incr('counter')
Hashes
r.hset('user:123', 'name', 'Alice')
r.hset('user:123', mapping={'name': 'Alice', 'age': 30})
r.hget('user:123', 'name')
r.hgetall('user:123')
Lists
r.lpush('queue', 'item')
r.rpush('queue', 'item')
r.lpop('queue')
r.lrange('queue', 0, -1)
Sets
r.sadd('tags', 'python', 'redis')
r.smembers('tags')
r.sismember('tags', 'python')
Sorted Sets
r.zadd('leaderboard', {'alice': 100, 'bob': 200})
r.zrange('leaderboard', 0, -1, withscores=True)
r.zrank('leaderboard', 'alice')
Expiration
r.expire('key', 3600)
r.ttl('key')
Pipelining
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()
Time Complexity

- GET, SET: O(1)
- HGET, HSET: O(1)
- LPUSH, RPUSH, LPOP, RPOP: O(1)
- SADD, SREM, SISMEMBER: O(1)
- ZADD, ZREM: O(log(N))
- ZRANGE, ZREVRANGE: O(log(N)+M) where M is the result size
- SCAN, SSCAN, HSCAN, ZSCAN: O(1) per iteration
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: State Management, Distributed Systems, Performance Optimization
Compatible With: redis-py, Redis 6.0+, Redis 7.0+