| Property | DP Memo Table | Infrastructure Cache |
|---|---|---|
| Key | Subproblem inputs | Request parameters (user_id, URL, query) |
| Value | Computed result | Database row, HTTP response, query result |
| Storage | Dictionary in memory | Redis, Memcached, CDN edge |
| Invalidation needed? | No (pure functions) | Yes (data can change) |
| Invalidation mechanism | N/A | TTL, explicit delete, cache-aside, write-through |
| Hit rate target | 100% (subproblems don’t change) | 80–99% (read-heavy workloads) |
| Failure if miss | O(2ⁿ) without memo | Full database query (acceptable but slow) |
Time-to-Live (TTL): every cache entry expires after a fixed duration.
```python
import time

class TTLCache:
    """Cache with per-entry expiration."""

    def __init__(self, ttl_seconds: int):
        self._store: dict[str, tuple] = {}  # key -> (value, expiry_timestamp)
        self._ttl = ttl_seconds

    def get(self, key: str):
        if key not in self._store:
            return None
        value, expiry = self._store[key]
        if time.time() > expiry:
            del self._store[key]
            return None  # cache miss: entry expired
        return value  # cache hit

    def set(self, key: str, value) -> None:
        self._store[key] = (value, time.time() + self._ttl)
```

TTL is the simplest invalidation strategy. Its weakness: data staleness for the full TTL duration. If a user updates their profile, cached profiles remain stale for up to TTL seconds. With a 300-second TTL, users may see outdated profiles for 5 minutes.
Explicit invalidation: when data changes, explicitly delete the cached entry.
```python
def update_user_profile(user_id: int, new_data: dict) -> None:
    db.execute("UPDATE users SET ... WHERE id = %s", (user_id,))  # parameterized, not f-string
    r.delete(f"user:{user_id}:profile")  # invalidate immediately
```

Explicit invalidation eliminates the staleness window — the cache is cleared at the moment data changes. The cost: every write must know which cache keys to invalidate. For simple one-to-one mappings (user_id → profile), this is easy. For complex queries (all users who joined in March), determining which cache keys to invalidate is hard.
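The delete above assumes a cache-aside read path that repopulates the entry on the next miss. A minimal sketch of both sides together, using plain dicts to stand in for Redis (`cache`) and the users table (`db_rows`) — both names are illustrative:

```python
cache: dict[str, dict] = {}                       # stand-in for Redis
db_rows: dict[int, dict] = {1: {"name": "Ada"}}   # stand-in for the users table

def get_user_profile(user_id: int) -> dict:
    key = f"user:{user_id}:profile"
    if key in cache:
        return cache[key]             # hit
    profile = db_rows[user_id]        # miss: read through to the database
    cache[key] = profile              # repopulate for subsequent reads
    return profile

def update_user_profile(user_id: int, new_data: dict) -> None:
    db_rows[user_id] = new_data                   # write to the source of truth
    cache.pop(f"user:{user_id}:profile", None)    # invalidate immediately
```

The pairing is what makes explicit invalidation work: the write path only deletes, and the read path rebuilds the entry lazily from the source of truth.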
Write-through caching: writes go to both the cache and the database simultaneously.
```python
def write_through_update(user_id: int, new_data: dict) -> None:
    db.execute("UPDATE users SET ... WHERE id = %s", (user_id,))
    r.set(f"user:{user_id}:profile", json.dumps(new_data))  # update cache inline
```

Write-through ensures the cache always reflects the latest write. The cost: every write is slower (two writes instead of one). For write-heavy workloads, this doubles write latency.
Consider a product catalogue service. A product’s price is read thousands of times per minute, so the lookup is cached:
```python
def get_product_price(product_id: int) -> float:
    cache_key = f"product:{product_id}:price"
    cached = r.get(cache_key)
    if cached:
        return float(cached)
    price = db.query("SELECT price FROM products WHERE id = %s", (product_id,))
    r.setex(cache_key, 600, price)  # TTL = 10 minutes
    return price
```

A pricing team runs a flash sale and updates the price. The function above knows nothing about the update. Three strategies handle this:
Strategy 1 — TTL expiry: do nothing. The cache entry expires after 600 seconds. Every request for up to 10 minutes sees the pre-sale price. Simple to implement; the staleness window is exactly the TTL.
Strategy 2 — Event invalidation: the pricing system emits an event on every price change. A subscriber listens and deletes the cache key immediately:
```python
def on_price_changed(event: dict) -> None:
    product_id = event["product_id"]
    r.delete(f"product:{product_id}:price")  # cache cleared at the moment of change
```

The staleness window is near-zero. Cost: the event pipeline must be reliable — a dropped event leaves stale data with no expiry (unless a background TTL is kept as a safety net).
Strategy 3 — Write-through: the pricing update service writes to the database and the cache in the same call:
```python
def update_price(product_id: int, new_price: float) -> None:
    db.execute("UPDATE products SET price = %s WHERE id = %s", (new_price, product_id))
    r.setex(f"product:{product_id}:price", 600, new_price)  # cache updated inline
```

No staleness at all for the specific key written. Cost: the write path becomes slower (two writes), and if the cache write fails after the database write succeeds, the cache holds the old value until the TTL expires.
Choosing: TTL alone is acceptable when staleness of up to the TTL duration is tolerable (user profile avatars: 5-minute staleness is fine). Event invalidation is correct when staleness must be near-zero but the event pipeline is reliable (inventory counts, prices). Write-through is correct when you control all write paths and write volume is low (configuration values, feature flags).
Cache entries expire (TTL) or are invalidated (explicit delete). At the moment of expiration, if many processes simultaneously notice a cache miss for the same key, all of them simultaneously query the database. This is the thundering herd (FM7).
```python
import threading
import time

class ThunderingHerdProtectedCache:
    """
    Cache with thundering herd protection via a per-key mutex.
    Only one thread fetches a given key; others wait for the result.
    """

    def __init__(self):
        self._store: dict = {}
        self._locks: dict[str, threading.Event] = {}
        self._meta_lock = threading.Lock()

    def get_or_compute(self, key: str, compute_fn, ttl: int):
        cached = self._store.get(key)
        if cached and time.time() < cached[1]:
            return cached[0]  # cache hit, not expired
        with self._meta_lock:
            # Re-check under lock (another thread may have filled it)
            cached = self._store.get(key)
            if cached and time.time() < cached[1]:
                return cached[0]
            if key in self._locks:
                # Another thread is already computing this key: wait for it
                event = self._locks[key]
                should_compute = False
            else:
                event = threading.Event()
                self._locks[key] = event
                should_compute = True
        if should_compute:
            value = compute_fn()
            self._store[key] = (value, time.time() + ttl)
            with self._meta_lock:
                del self._locks[key]
            event.set()
            return value
        else:
            event.wait()
            return self._store[key][0]
```

The mutex-per-key pattern ensures only one thread fetches a given key at a time. Other threads wait rather than issuing duplicate database queries. In practice, many systems use a simpler approximation: probabilistic early expiration (PER) — start refreshing the cache slightly before expiry so that it is never empty.
A memo table grows unbounded — every subproblem result is stored forever. A production cache has bounded memory. When the cache is full, something must be evicted. The standard policy is Least Recently Used (LRU): evict the entry that was accessed least recently.
```python
from collections import OrderedDict

class LRUCache:
    """
    LRU cache with a capacity limit.
    Uses OrderedDict to maintain access order with O(1) operations.
    """

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._store: OrderedDict = OrderedDict()

    def get(self, key: str):
        if key not in self._store:
            return None
        self._store.move_to_end(key)  # mark as recently used
        return self._store[key]

    def set(self, key: str, value) -> None:
        if key in self._store:
            self._store.move_to_end(key)
        self._store[key] = value
        if len(self._store) > self.capacity:
            self._store.popitem(last=False)  # evict least recently used
```

LRU eviction exploits temporal locality: recently accessed items are more likely to be accessed again. For social media profiles (popular users accessed repeatedly), LRU correctly keeps hot items in cache. For batch data processing (each item accessed once), LRU provides no benefit — every item is evicted exactly when the next item fills the cache.
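Python ships this exact policy in the standard library: `functools.lru_cache` wraps a function with a bounded LRU memo table, which is usually preferable to a hand-rolled class. The function name below is illustrative.

```python
from functools import lru_cache

@lru_cache(maxsize=2)
def expensive_lookup(key: str) -> str:
    # Stand-in for a slow computation or database read.
    return key.upper()

expensive_lookup("a")  # miss: computed and cached
expensive_lookup("a")  # hit: served from the LRU table
expensive_lookup("b")  # miss: cache now holds "a" and "b"
expensive_lookup("c")  # miss: evicts "a" (least recently used)
expensive_lookup("a")  # miss again: "a" was evicted
print(expensive_lookup.cache_info())
# CacheInfo(hits=1, misses=4, maxsize=2, currsize=2)
```

`cache_info()` exposes hit and miss counts directly, which makes the temporal-locality argument measurable: a high hit ratio confirms the workload actually re-accesses recent keys.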