Redis Caching Patterns for Web Applications
Redis is a lightning-fast in-memory data store that's perfect for caching. By reducing database load and response times, proper caching can transform your application's performance. Let's explore the most effective patterns.
Why Redis for Caching?
- Sub-millisecond latency for reads and writes
- Rich data structures: strings, hashes, lists, sets, sorted sets
- Built-in TTL (time-to-live) for automatic expiration
- Atomic operations prevent race conditions
- Persistence options if you need durability
- Cluster mode for horizontal scaling
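A quick taste of a few of these features from Python, assuming a local Redis on the default port and the redis-py client (key names here are illustrative):

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Built-in TTL: this key disappears on its own after 60 seconds
r.setex("session:abc123", 60, "user-42")
print(r.ttl("session:abc123"))  # seconds remaining

# Atomic increment: safe even with many concurrent clients
r.incr("pageviews:homepage")

# Rich data structures: a sorted set as a simple leaderboard
r.zadd("leaderboard", {"alice": 120, "bob": 95})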
Pattern 1: Cache-Aside (Lazy Loading)
The most common pattern: check cache first, fall back to database if miss, then populate cache for next time.
import redis
import json
from typing import Optional

# `db` in these examples stands in for your application's database layer.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user(user_id: int) -> Optional[dict]:
    """Get user with cache-aside pattern."""
    cache_key = f"user:{user_id}"

    # 1. Check cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Cache miss - query database
    user = db.query("SELECT * FROM users WHERE id = %s", [user_id])
    if user is None:
        return None

    # 3. Populate cache for next time (TTL: 1 hour)
    r.setex(cache_key, 3600, json.dumps(user))
    return user

def invalidate_user(user_id: int) -> None:
    """Call this when user data changes."""
    r.delete(f"user:{user_id}")
Pattern 2: Write-Through
Update cache synchronously when data changes. Ensures cache is always fresh but adds latency to writes.
def update_user(user_id: int, data: dict) -> dict:
    """Update user with write-through caching."""
    # 1. Update database
    db.execute(
        "UPDATE users SET name = %s, email = %s WHERE id = %s",
        [data["name"], data["email"], user_id]
    )

    # 2. Update cache immediately
    cache_key = f"user:{user_id}"
    r.setex(cache_key, 3600, json.dumps(data))
    return data

def create_user(data: dict) -> dict:
    """Create user and warm cache."""
    # 1. Insert into database
    user_id = db.execute(
        "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
        [data["name"], data["email"]]
    )
    data["id"] = user_id

    # 2. Warm the cache
    cache_key = f"user:{user_id}"
    r.setex(cache_key, 3600, json.dumps(data))
    return data
Pattern 3: Cache with Refresh-Ahead
Proactively refresh cache entries before they expire. Prevents cache misses for frequently accessed data.
def get_user_with_refresh(user_id: int) -> Optional[dict]:
    """Get user with refresh-ahead pattern."""
    cache_key = f"user:{user_id}"

    # Check cache
    cached = r.get(cache_key)
    if cached:
        # Refresh proactively once the TTL drops below 5 minutes
        ttl = r.ttl(cache_key)
        if 0 < ttl < 300:
            # Trigger async refresh (don't block the request)
            refresh_cache_async.delay(user_id)
        return json.loads(cached)

    # Cache miss - fetch and cache
    user = db.query("SELECT * FROM users WHERE id = %s", [user_id])
    if user:
        r.setex(cache_key, 3600, json.dumps(user))
    return user

# Celery task for async refresh
@celery.task
def refresh_cache_async(user_id: int):
    user = db.query("SELECT * FROM users WHERE id = %s", [user_id])
    if user:
        r.setex(f"user:{user_id}", 3600, json.dumps(user))
Pattern 4: Request Coalescing
Prevent cache stampede by ensuring only one request fetches data while others wait:
import time
from contextlib import contextmanager

@contextmanager
def cache_lock(key: str, timeout: int = 10):
    """Distributed lock using Redis.

    Note: this simple lock deletes the key unconditionally on release; a
    production lock should store a unique token and only delete its own lock.
    """
    lock_key = f"lock:{key}"
    # Try to acquire lock (NX = only set if it doesn't already exist)
    acquired = r.set(lock_key, "1", nx=True, ex=timeout)
    try:
        yield acquired
    finally:
        if acquired:
            r.delete(lock_key)

def get_user_coalesced(user_id: int) -> Optional[dict]:
    """Get user with request coalescing."""
    cache_key = f"user:{user_id}"

    # Check cache
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # Try to acquire lock
    with cache_lock(cache_key) as acquired:
        if acquired:
            # We got the lock - fetch from database
            user = db.query("SELECT * FROM users WHERE id = %s", [user_id])
            if user:
                r.setex(cache_key, 3600, json.dumps(user))
            return user
        else:
            # Another request is fetching - wait and retry
            for _ in range(50):  # Wait up to 5 seconds
                time.sleep(0.1)
                cached = r.get(cache_key)
                if cached:
                    return json.loads(cached)
            # Timeout - fetch ourselves
            return db.query("SELECT * FROM users WHERE id = %s", [user_id])
Pattern 5: Rate Limiting
Redis is perfect for rate limiting due to its atomic operations:
def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
    """
    Sliding window rate limiter.

    Args:
        user_id: User to check
        limit: Maximum requests per window
        window: Time window in seconds

    Returns:
        True if rate limited, False otherwise
    """
    key = f"ratelimit:{user_id}"
    now = time.time()
    window_start = now - window

    # Use a pipeline (MULTI/EXEC) so the commands execute atomically
    pipe = r.pipeline()
    # Remove old entries outside the window
    pipe.zremrangebyscore(key, 0, window_start)
    # Count requests in current window
    pipe.zcard(key)
    # Add current request
    pipe.zadd(key, {str(now): now})
    # Set expiry on the key
    pipe.expire(key, window)
    results = pipe.execute()

    request_count = results[1]
    return request_count >= limit

# Usage in a view (assumes a Django REST framework-style Response)
def api_endpoint(request):
    if is_rate_limited(request.user.id):
        return Response({"error": "Rate limited"}, status=429)
    # ... handle request
Caching Best Practices
- Set appropriate TTLs - balance freshness vs. hit rate
- Use consistent key naming: {type}:{id}:{qualifier}
- Serialize efficiently - JSON for readability, MessagePack for speed
- Monitor hit rates - low hit rate means wasted memory
- Handle cache failures gracefully - fall back to the database (see the sketch after this list)
- Invalidate conservatively - when in doubt, invalidate
- Consider cache warming for critical data
- Use Redis pipelining for multiple operations
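A minimal sketch of the "handle cache failures gracefully" practice, reusing the `r` and `db` helpers from the examples above: if Redis is unreachable, serve from the database instead of raising.

def get_user_safe(user_id: int) -> Optional[dict]:
    """Cache-aside, but a Redis outage never takes the page down."""
    cache_key = f"user:{user_id}"
    try:
        cached = r.get(cache_key)
        if cached:
            return json.loads(cached)
    except redis.RedisError:
        pass  # Cache unavailable - fall through to the database

    user = db.query("SELECT * FROM users WHERE id = %s", [user_id])
    if user:
        try:
            r.setex(cache_key, 3600, json.dumps(user))
        except redis.RedisError:
            pass  # Best effort - skip repopulating the cache this time
    return user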
Advanced Tips
Cache Invalidation
Cache invalidation is famously one of the two hard problems in computer science. Strategies:
- TTL-based: set a reasonable expiry and accept some staleness
- Event-based: invalidate on write operations
- Versioned keys: write user:123:v2 instead of updating user:123 in place (see the sketch below)
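A minimal sketch of the versioned-key strategy, reusing the `r` client from above (function names are illustrative): readers build the key from a version counter, and writers simply bump the counter so stale entries are orphaned and age out via TTL.

def user_cache_key(user_id: int) -> str:
    """Build the current versioned key, e.g. user:123:v7."""
    version = r.get(f"user:{user_id}:version") or 0
    return f"user:{user_id}:v{version}"

def invalidate_user_versioned(user_id: int) -> None:
    """Bump the version; old entries are never touched and expire on their own."""
    r.incr(f"user:{user_id}:version")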
Memory Management
- Set maxmemory in redis.conf so the cache never grows unbounded (example below)
- Choose an eviction policy: allkeys-lru is a good default for pure caching workloads
- Monitor with redis-cli INFO memory
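For example, a cache-only instance can be capped in redis.conf or adjusted at runtime with CONFIG SET (the 2gb figure is just a placeholder - size it for your workload):

# redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru

# Or at runtime, without a restart
redis-cli CONFIG SET maxmemory 2gb
redis-cli CONFIG SET maxmemory-policy allkeys-lru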
Serialization
- JSON: human-readable, roughly ~1ms per object
- MessagePack: binary, roughly ~0.1ms per object (see the sketch below)
- Pickle: Python-only, security concerns with untrusted data
- Protocol Buffers: schema-based, very fast
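A rough sketch of swapping JSON for MessagePack, assuming the msgpack package is installed. Note that binary payloads need a connection without decode_responses=True, so this uses a separate client.

import msgpack
import redis

# Binary-safe connection (MessagePack payloads are not UTF-8 text)
r_bin = redis.Redis(host='localhost', port=6379)

def cache_user_msgpack(user_id: int, user: dict) -> None:
    r_bin.setex(f"user:{user_id}", 3600, msgpack.packb(user))

def get_user_msgpack(user_id: int):
    cached = r_bin.get(f"user:{user_id}")
    return msgpack.unpackb(cached, raw=False) if cached else None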
Monitoring Cache Performance
# Redis CLI commands for monitoring
redis-cli INFO stats # Hit rate, operations/sec
redis-cli INFO memory # Memory usage
redis-cli SLOWLOG GET 10 # Slow operations
redis-cli MONITOR # Real-time command log (careful in prod!)
# Key metrics to track:
# - keyspace_hits / (keyspace_hits + keyspace_misses) = hit rate
# - used_memory vs maxmemory
# - connected_clients
# - instantaneous_ops_per_sec
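The same hit-rate metric can also be pulled from application code via redis-py's INFO command; a small sketch reusing the `r` client from above:

def cache_hit_rate() -> float:
    """Fraction of reads served from cache, from Redis's own counters."""
    stats = r.info("stats")
    hits = stats.get("keyspace_hits", 0)
    misses = stats.get("keyspace_misses", 0)
    total = hits + misses
    return hits / total if total else 0.0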
Proper caching can reduce database load by 90%+ and cut response times from hundreds of milliseconds to single digits. Start with cache-aside, measure, and evolve your strategy based on actual usage patterns.