Python Caching Strategies and Implementation

1. Basic Concepts of Caching

Caching is an optimization that trades space for time: computed results or frequently accessed data are stored somewhere fast to reach, which avoids repeated computation or I/O.

Key metrics and design questions for a cache:
- Hit rate: cache hits / total requests
- Eviction policy: which entry to remove when the cache is full
- Expiration policy: when an entry becomes invalid
- Consistency: keeping the cache in sync with the source data

2. functools.lru_cache

    from functools import lru_cache, cache

    # lru_cache: least-recently-used cache
    @lru_cache(maxsize=128)
    def fibonacci(n):
        if n < 2:
            return n
        return fibonacci(n - 1) + fibonacci(n - 2)

    print(fibonacci(100))
    print(fibonacci.cache_info())
    # CacheInfo(hits=98, misses=101, maxsize=128, currsize=101)

    # cache: unbounded cache (Python 3.9+), equivalent to lru_cache(maxsize=None)
    @cache
    def factorial(n):
        return n * factorial(n - 1) if n else 1

    # Clear the cache
    fibonacci.cache_clear()

    # Note: lru_cache requires hashable arguments.
    # Unhashable arguments must be converted first.
    def make_hashable(obj):
        if isinstance(obj, dict):
            return tuple(sorted((k, make_hashable(v)) for k, v in obj.items()))
        if isinstance(obj, (list, tuple)):
            return tuple(make_hashable(x) for x in obj)
        return obj

3. Cache with Expiration

    import time
    from functools import wraps

    def timed_cache(seconds=300, maxsize=128):
        """Caching decorator with a time-to-live (TTL)."""
        def decorator(func):
            cache = {}
            access_order = []  # insertion order, so eviction is oldest-first

            @wraps(func)
            def wrapper(*args, **kwargs):
                key = (args, tuple(sorted(kwargs.items())))
                now = time.time()

                # Check the cache
                if key in cache:
                    value, timestamp = cache[key]
                    if now - timestamp < seconds:
                        return value
                    # Expired: drop the entry and recompute
                    del cache[key]
                    access_order.remove(key)

                # Compute the result
                result = func(*args, **kwargs)
                cache[key] = (result, now)
                access_order.append(key)

                # Evict entries beyond capacity
                while len(cache) > maxsize:
                    oldest_key = access_order.pop(0)
                    cache.pop(oldest_key, None)

                return result

            wrapper.cache_clear = lambda: (cache.clear(), access_order.clear())
            wrapper.cache_info = lambda: {"size": len(cache), "maxsize": maxsize, "ttl": seconds}
            return wrapper
        return decorator

    @timed_cache(seconds=60, maxsize=100)
    def get_user_profile(user_id):
        """User profile, cached for 60 seconds."""
        # `database` stands for the application's DB client (not defined here).
        # In real code, pass user_id as a bound parameter instead of
        # formatting it into the SQL string.
        return database.query(f"SELECT * FROM users WHERE id = {user_id}")

4. Multi-Level Cache

    import time

    class MultiLevelCache:
        """Multi-level cache: memory -> Redis -> database."""

        def __init__(self):
            self.l1_cache = {}  # in-memory cache
            self.l1_ttl = {}    # key -> expiry timestamp
            self.l1_max_size = 1000
            self.l1_default_ttl = 60

        def get(self, key, fetch_func=None):
            # L1: in-memory cache
            value = self._l1_get(key)
            if value is not None:
                return value

            # L2: Redis cache (illustrative)
            value = self._l2_get(key)
            if value is not None:
                self._l1_set(key, value)
                return value

            # L3: data source
            if fetch_func:
                value = fetch_func()
                if value is not None:
                    self._l1_set(key, value)
                    self._l2_set(key, value)
                return value

            return None

        def _l1_get(self, key):
            if key in self.l1_cache:
                if time.time() < self.l1_ttl.get(key, 0):
                    return self.l1_cache[key]
                # Expired: remove the entry
                del self.l1_cache[key]
                self.l1_ttl.pop(key, None)
            return None

        def _l1_set(self, key, value, ttl=None):
            # Evict only when adding a new key to a full cache
            if key not in self.l1_cache and len(self.l1_cache) >= self.l1_max_size:
                # Evict the entry that expires soonest
                oldest = min(self.l1_ttl, key=self.l1_ttl.get)
                del self.l1_cache[oldest]
                del self.l1_ttl[oldest]
            self.l1_cache[key] = value
            self.l1_ttl[key] = time.time() + (ttl or self.l1_default_ttl)

        def _l2_get(self, key):
            # A real project would read from Redis here
            return None

        def _l2_set(self, key, value, ttl=300):
            # A real project would write to Redis here
            pass

        def invalidate(self, key):
            """Invalidate a cache entry."""
            self.l1_cache.pop(key, None)
            self.l1_ttl.pop(key, None)
            # self.redis.delete(key)

5. Cache Eviction Policies

    from collections import OrderedDict

    class LRUCache:
        """Least recently used."""

        def __init__(self, capacity):
            self.cache = OrderedDict()
            self.capacity = capacity

        def get(self, key):
            if key in self.cache:
                self.cache.move_to_end(key)
                return self.cache[key]
            return None

        def put(self, key, value):
            if key in self.cache:
                self.cache.move_to_end(key)
            self.cache[key] = value
            if len(self.cache) > self.capacity:
                self.cache.popitem(last=False)

    class LFUCache:
        """Least frequently used."""

        def __init__(self, capacity):
            self.capacity = capacity
            self.cache = {}     # key -> value
            self.freq = {}      # key -> frequency
            self.freq_map = {}  # frequency -> OrderedDict of keys
            self.min_freq = 0

        def get(self, key):
            if key not in self.cache:
                return None
            self._increase_freq(key)
            return self.cache[key]

        def put(self, key, value):
            if self.capacity <= 0:
                return
            if key in self.cache:
                self.cache[key] = value
                self._increase_freq(key)
                return
            if len(self.cache) >= self.capacity:
                self._evict()
            self.cache[key] = value
            self.freq[key] = 1
            self.freq_map.setdefault(1, OrderedDict())[key] = None
            self.min_freq = 1

        def _increase_freq(self, key):
            f = self.freq[key]
            self.freq[key] = f + 1
            del self.freq_map[f][key]
            if not self.freq_map[f]:
                del self.freq_map[f]
                if self.min_freq == f:
                    self.min_freq += 1
            self.freq_map.setdefault(f + 1, OrderedDict())[key] = None

        def _evict(self):
            # Among the least-used keys, evict the oldest one
            keys = self.freq_map[self.min_freq]
            evict_key, _ = keys.popitem(last=False)
            if not keys:
                del self.freq_map[self.min_freq]
            del self.cache[evict_key]
            del self.freq[evict_key]

6. Caching Patterns

6.1 Cache-Aside

    class CacheAside:
        def __init__(self, cache, db):
            self.cache = cache
            self.db = db

        def read(self, key):
            # Check the cache first
            value = self.cache.get(key)
            if value is not None:
                return value
            # Cache miss: query the database
            value = self.db.get(key)
            if value is not None:
                self.cache.set(key, value)
            return value

        def write(self, key, value):
            # Update the database first
            self.db.set(key, value)
            # Then delete the cache entry (rather than updating it)
            # to avoid concurrency problems
            self.cache.delete(key)

6.2 Write-Through

    class WriteThrough:
        def __init__(self, cache, db):
            self.cache = cache
            self.db = db

        def write(self, key, value):
            # Write to both the database and the cache in the same call.
            # The database goes first so a failed write does not leave
            # an unpersisted value in the cache.
            self.db.set(key, value)
            self.cache.set(key, value)

6.3 Write-Behind

    import threading
    import time
    from queue import Empty, Queue

    class WriteBehind:
        def __init__(self, cache, db, flush_interval=5):
            self.cache = cache
            self.db = db
            self.write_queue = Queue()
            self._start_flush_thread(flush_interval)

        def write(self, key, value):
            self.cache.set(key, value)
            self.write_queue.put((key, value))

        def _start_flush_thread(self, interval):
            def flush():
                while True:
                    batch = []
                    # Drain everything currently queued
                    while True:
                        try:
                            batch.append(self.write_queue.get_nowait())
                        except Empty:
                            break
                    if batch:
                        self.db.batch_write(batch)
                    time.sleep(interval)

            # Daemon thread: writes still queued at interpreter exit are lost
            thread = threading.Thread(target=flush, daemon=True)
            thread.start()

7. Cache Penetration, Breakdown, and Avalanche

    import random
    import time

    class RobustCache:
        def __init__(self, cache):
            self.cache = cache
            self.null_keys = set()  # simplified stand-in for a Bloom filter

        def get_with_protection(self, key, fetch_func, ttl=300):
            """Guard against penetration by remembering keys that have no value."""
            if key in self.null_keys:
                return None

            value = self.cache.get(key)
            if value is not None:
                return value

            value = fetch_func(key)
            if value is None:
                # Record the missing key (never expires in this simplified version)
                self.null_keys.add(key)
                return None

            # Guard against avalanche: add a random offset to the TTL
            jitter = random.randint(0, 60)
            self.cache.set(key, value, ttl=ttl + jitter)
            return value

        def get_with_mutex(self, key, fetch_func, ttl=300):
            """Guard against breakdown with a mutex lock."""
            value = self.cache.get(key)
            if value is not None:
                return value

            lock_key = f"lock:{key}"
            if self.cache.set_nx(lock_key, 1, ttl=10):  # acquire the lock
                try:
                    value = fetch_func(key)
                    self.cache.set(key, value, ttl=ttl)
                    return value
                finally:
                    self.cache.delete(lock_key)
            else:
                # Wait for another thread to fill the cache.
                # This may still return None if the fill takes longer than the wait.
                time.sleep(0.1)
                return self.cache.get(key)

8. Async Cache

    import asyncio
    import time

    class AsyncCache:
        def __init__(self, maxsize=1000, ttl=300):
            self._cache = {}
            self._ttl = {}    # key -> expiry timestamp
            self._locks = {}  # key -> asyncio.Lock
            self._maxsize = maxsize
            self._default_ttl = ttl

        async def get_or_set(self, key, factory, ttl=None):
            """Return the cached value, creating it with factory if absent."""
            # Check the cache
            if key in self._cache and time.time() < self._ttl.get(key, 0):
                return self._cache[key]

            # Use a per-key lock to prevent concurrent duplicate computation
            if key not in self._locks:
                self._locks[key] = asyncio.Lock()

            async with self._locks[key]:
                # Double check: another coroutine may have filled the entry
                if key in self._cache and time.time() < self._ttl.get(key, 0):
                    return self._cache[key]

                value = await factory()
                self._cache[key] = value
                self._ttl[key] = time.time() + (ttl or self._default_ttl)

                # Remove expired and excess entries
                self._cleanup()
                return value

        def _cleanup(self):
            now = time.time()
            expired = [k for k, t in self._ttl.items() if t < now]
            for k in expired:
                self._cache.pop(k, None)
                self._ttl.pop(k, None)
            # Over capacity: evict the entries that expire soonest
            while len(self._cache) > self._maxsize:
                oldest = min(self._ttl, key=self._ttl.get)
                self._cache.pop(oldest, None)
                self._ttl.pop(oldest, None)

    # Usage
    cache = AsyncCache(ttl=60)

    async def get_user(user_id):
        # `db` stands for the application's async DB client (not defined here)
        return await cache.get_or_set(
            f"user:{user_id}",
            lambda: db.fetch_user(user_id),
        )

9. Cache Decorator Factory

    import time
    from functools import wraps

    def cached(ttl=300, key_func=None, cache_none=False):
        """General-purpose caching decorator."""
        def decorator(func):
            _cache = {}

            def default_key(*args, **kwargs):
                return (args, tuple(sorted(kwargs.items())))

            _key_func = key_func or default_key

            @wraps(func)
            def wrapper(*args, **kwargs):
                key = _key_func(*args, **kwargs)
                now = time.time()

                if key in _cache:
                    value, expire_at = _cache[key]
                    if now < expire_at:
                        return value

                result = func(*args, **kwargs)
                # None results are cached only when cache_none is set
                if result is not None or cache_none:
                    _cache[key] = (result, now + ttl)
                return result

            wrapper.invalidate = lambda *args, **kwargs: _cache.pop(_key_func(*args, **kwargs), None)
            wrapper.clear = _cache.clear
            return wrapper
        return decorator

    # Usage
    @cached(ttl=120, key_func=lambda user_id: f"profile:{user_id}")
    def get_profile(user_id):
        return db.query_profile(user_id)

    # Manual invalidation
    get_profile.invalidate(123)

10. Summary

Choosing a caching strategy:
1. Read-heavy, write-light workloads -> Cache-Aside
2. Strong consistency requirements -> Write-Through
3. Frequent writes -> Write-Behind (asynchronous write-back)
4. Hot-spot data -> LFU eviction
5. Strong temporal locality -> LRU eviction

Notes:
- Caching is not a silver bullet; it adds system complexity.
- Cache invalidation and data consistency must always be considered.
- Monitor the hit rate; a low hit rate indicates a problem with the caching strategy.
- Set a sensible TTL: too long and stale data lingers, too short and the cache thrashes.
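The advice to monitor the hit rate can be tried out directly with functools.lru_cache, whose cache_info() reports hit and miss counters. The following is a minimal sketch, not a production monitoring setup; the helper name `hit_rate` and the toy function `square` are hypothetical and exist only for illustration.

```python
from functools import lru_cache


def hit_rate(cached_func):
    """Return hits / (hits + misses) for an lru_cache-wrapped function."""
    info = cached_func.cache_info()
    total = info.hits + info.misses
    # Avoid division by zero before the first call
    return info.hits / total if total else 0.0


# Deliberately tiny cache so that eviction affects the hit rate
@lru_cache(maxsize=2)
def square(n):
    return n * n


# 1 and 2 miss, then both hit; 3 misses and evicts 1; the final 1 misses again
for n in [1, 2, 1, 2, 3, 1]:
    square(n)

print(square.cache_info())
print(f"hit rate: {hit_rate(square):.2f}")  # 2 hits out of 6 calls -> 0.33
```

With this access pattern, a cache of size 2 is too small for a working set of three keys, which is the kind of problem a low hit rate tends to reveal. Raising maxsize to 3 would turn the last call into a hit.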