找回密码
 立即注册
首页 业界区 业界 Redis缓存一致性

Redis缓存一致性

迎脾 2025-10-1 13:00:21
Redis 缓存一致性指的是缓存数据与数据库数据保持同步,避免出现缓存数据过时、与数据库数据不匹配的情况。
策略核心思想一致性强度性能影响实现复杂度适用场景Cache-Aside应用层主动管理缓存:读时延后加载,写时更新DB并删除缓存最终一致性读操作可能延迟低最常用,读多写少,如用户信息、商品信息Write-Through将缓存作为主要数据入口,所有写操作同步更新缓存和数据库强一致性写性能较低中写多读少,对一致性要求极高,如金融账户Write-Behind写操作先更新缓存,随后异步批量更新数据库最终一致性写性能高高写操作密集,可容忍数据丢失,如日志、统计Read-Through应用只读缓存,由缓存组件自身负责未命中时从数据库加载取决于背后的写策略读操作平均延迟低中希望简化应用逻辑,有相应缓存组件支持1、Cache-Aside (旁路缓存) - 最常用模式

这是最广泛采用的模式,由应用程序显式地管理缓存和数据库的交互。
读流程:

  • 接收读请求。
  • 首先尝试从 Redis 中读取数据。
  • 如果命中(Cache Hit),直接返回数据。
  • 如果未命中(Cache Miss),则从数据库查询。
  • 将数据库返回的数据写入 Redis(称为“回填”),然后返回响应。
写流程:

  • 接收写请求。
  • 更新数据库。
  • 删除(而非更新)Redis 中对应的缓存键。
为何删除缓存,而不是更新它?
在并发场景下,如果采用更新缓存的方式,可能会因为操作时序问题导致缓存中被写入旧数据。删除缓存是一种更安全、更简单的策略,它确保后续的读请求能从数据库加载最新数据并重新回填缓存。
优点:实现简单,对业务代码侵入性低,缓存中只保留真正被请求的热点数据。
缺点:在极端的并发情况下,仍可能出现短时间的数据不一致(可通过“延迟双删”策略优化)。
代码示例
  1. import redis
  2. import pymysql
  3. import json
  4. import time
  5. # 初始化Redis和数据库连接
  6. r = redis.Redis(host='localhost', port=6379, db=0)
  7. db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb')
  8. def get_product(product_id):
  9.     """读取产品信息"""
  10.     cache_key = f"product:{product_id}"
  11.    
  12.     # 1. 尝试从缓存获取
  13.     product_data = r.get(cache_key)
  14.     if product_data:
  15.         print(f"Cache hit for product {product_id}")
  16.         return json.loads(product_data)
  17.    
  18.     print(f"Cache miss for product {product_id}")
  19.    
  20.     # 2. 缓存未命中,查询数据库
  21.     cursor = db.cursor()
  22.     cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,))
  23.     product = cursor.fetchone()
  24.     cursor.close()
  25.    
  26.     if not product:
  27.         # 3. 数据库也没有,缓存空值防止穿透
  28.         r.setex(cache_key, 300, json.dumps({"status": "not_found"}))  # 5分钟空值缓存
  29.         return None
  30.    
  31.     # 4. 数据库有数据,写入缓存
  32.     product_dict = {
  33.         'id': product[0],
  34.         'name': product[1],
  35.         'price': float(product[2]),
  36.         'stock': product[3]
  37.     }
  38.     r.setex(cache_key, 3600, json.dumps(product_dict))  # 缓存1小时
  39.    
  40.     return product_dict
  41. def update_product(product_id, new_data):
  42.     """更新产品信息"""
  43.     cache_key = f"product:{product_id}"
  44.    
  45.     # 1. 更新数据库
  46.     cursor = db.cursor()
  47.     cursor.execute(
  48.         "UPDATE products SET name = %s, price = %s, stock = %s WHERE id = %s",
  49.         (new_data['name'], new_data['price'], new_data['stock'], product_id)
  50.     )
  51.     db.commit()
  52.     cursor.close()
  53.    
  54.     # 2. 删除缓存
  55.     r.delete(cache_key)
  56.     print(f"Cache invalidated for product {product_id}")
  57.    
  58.     # 3. 可选:预热缓存
  59.     # get_product(product_id)  # 立即重新加载到缓存
复制代码
缓存双删代码示例
这第二次删除是为了清除在“更新数据库”到“第一次删除缓存”这个极短时间窗口内,可能被其他读请求回填到缓存中的旧数据。
  1. import threading
  2. def update_product_with_double_delete(product_id, new_data):
  3.     """使用延迟双删策略更新产品"""
  4.     cache_key = f"product:{product_id}"
  5.    
  6.     # 第一次删除缓存
  7.     r.delete(cache_key)
  8.     print(f"First cache deletion for product {product_id}")
  9.    
  10.     # 更新数据库
  11.     cursor = db.cursor()
  12.     cursor.execute(
  13.         "UPDATE products SET name = %s, price = %s, stock = %s WHERE id = %s",
  14.         (new_data['name'], new_data['price'], new_data['stock'], product_id)
  15.     )
  16.     db.commit()
  17.     cursor.close()
  18.    
  19.     # 延迟第二次删除
  20.     def delayed_delete():
  21.         time.sleep(0.5)  # 延迟500ms
  22.         r.delete(cache_key)
  23.         print(f"Second cache deletion for product {product_id}")
  24.    
  25.     threading.Thread(target=delayed_delete).start()
复制代码
2、Write-Through (写穿透) - 强一致性模式

在此模式下,缓存被视为主要的数据源。所有写操作都同步地经过缓存,由缓存组件负责同时更新缓存和数据库。
写流程:

  • 应用将数据写入缓存。
  • 缓存组件同步地将数据写入底层数据库。
  • 写入成功后返回。
优点:保证了强一致性,读写操作都面对缓存,性能通常优于直接读数据库。
缺点:写延迟较高(因为需要等待两个写操作完成),且需要缓存组件或中间件支持此模式。
示例代码
  1. class WriteThroughCache:
  2.     def __init__(self):
  3.         self.redis = redis.Redis(host='localhost', port=6379, db=0)
  4.         self.db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb')
  5.    
  6.     def set(self, key, value, ttl=None):
  7.         """写入数据(同时更新缓存和数据库)"""
  8.         # 1. 更新数据库
  9.         self._update_db(key, value)
  10.         
  11.         # 2. 更新缓存
  12.         if ttl:
  13.             self.redis.setex(key, ttl, json.dumps(value))
  14.         else:
  15.             self.redis.set(key, json.dumps(value))
  16.    
  17.     def get(self, key):
  18.         """读取数据"""
  19.         # 直接从缓存读取
  20.         data = self.redis.get(key)
  21.         if data:
  22.             return json.loads(data)
  23.         return None
  24.    
  25.     def _update_db(self, key, value):
  26.         """更新数据库(根据key类型处理)"""
  27.         if key.startswith("product:"):
  28.             product_id = key.split(":")[1]
  29.             cursor = self.db.cursor()
  30.             cursor.execute(
  31.                 "UPDATE products SET name = %s, price = %s, stock = %s WHERE id = %s",
  32.                 (value['name'], value['price'], value['stock'], product_id)
  33.             )
  34.             self.db.commit()
  35.             cursor.close()
  36. # 使用示例
  37. cache = WriteThroughCache()
  38. product_data = {'id': 123, 'name': 'New Product', 'price': 29.99, 'stock': 100}
  39. cache.set("product:123", product_data, ttl=3600)
  40. # 读取
  41. cached_product = cache.get("product:123")
复制代码
3、Write-Behind (写回) - 高性能写模式

与 Write-Through 类似,写操作首先更新缓存。但不同的是,对数据库的更新是异步批量进行的。
写流程:

  • 应用将数据写入缓存,写入成功后立即返回。
  • 缓存组件在后台异步地、批量地将一段时间内累积的写操作刷新到数据库。
优点:写性能极高,非常适合写操作非常密集的场景。
缺点:有数据丢失的风险(如果缓存宕机,尚未持久化到数据库的数据就会丢失),只能保证最终一致性。实现复杂度最高。
示例代码
  1. class WriteBehindCache:
  2.     def __init__(self):
  3.         self.redis = redis.Redis(host='localhost', port=6379, db=0)
  4.         self.db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb')
  5.         self.write_queue = []
  6.         self.batch_size = 100
  7.         self.batch_interval = 10  # 秒
  8.         self.running = True
  9.         self.lock = threading.Lock()
  10.         
  11.         # 启动后台批处理线程
  12.         self.batch_thread = threading.Thread(target=self._batch_writer)
  13.         self.batch_thread.daemon = True
  14.         self.batch_thread.start()
  15.    
  16.     def set(self, key, value):
  17.         """写入数据(先更新缓存,异步更新数据库)"""
  18.         # 1. 立即更新缓存
  19.         self.redis.set(key, json.dumps(value))
  20.         
  21.         # 2. 将数据库更新加入队列
  22.         with self.lock:
  23.             self.write_queue.append((key, value))
  24.    
  25.     def get(self, key):
  26.         """读取数据"""
  27.         data = self.redis.get(key)
  28.         if data:
  29.             return json.loads(data)
  30.         return None
  31.    
  32.     def _batch_writer(self):
  33.         """后台批处理线程"""
  34.         while self.running:
  35.             time.sleep(self.batch_interval)
  36.             
  37.             if not self.write_queue:
  38.                 continue
  39.                
  40.             # 获取当前批量的数据
  41.             with self.lock:
  42.                 batch = self.write_queue[:self.batch_size]
  43.                 self.write_queue = self.write_queue[self.batch_size:]
  44.             
  45.             if not batch:
  46.                 continue
  47.                
  48.             try:
  49.                 # 批量更新数据库
  50.                 cursor = self.db.cursor()
  51.                 for key, value in batch:
  52.                     if key.startswith("product:"):
  53.                         product_id = key.split(":")[1]
  54.                         cursor.execute(
  55.                             "INSERT INTO products (id, name, price, stock) VALUES (%s, %s, %s, %s) "
  56.                             "ON DUPLICATE KEY UPDATE name=%s, price=%s, stock=%s",
  57.                             (product_id, value['name'], value['price'], value['stock'],
  58.                              value['name'], value['price'], value['stock'])
  59.                         )
  60.                 self.db.commit()
  61.                 cursor.close()
  62.                 print(f"Batch updated {len(batch)} items to database")
  63.             except Exception as e:
  64.                 print(f"Batch update failed: {e}")
  65.                 # 将失败的任务重新加入队列
  66.                 with self.lock:
  67.                     self.write_queue = batch + self.write_queue
  68.    
  69.     def stop(self):
  70.         """停止服务"""
  71.         self.running = False
  72.         self.batch_thread.join()
  73.         # 处理剩余队列
  74.         self._batch_writer()
  75. # 使用示例
  76. cache = WriteBehindCache()
  77. # 高频率写入
  78. for i in range(1000):
  79.     product_data = {'id': i, 'name': f'Product {i}', 'price': i*1.1, 'stock': i%100}
  80.     cache.set(f"product:{i}", product_data)
  81. # 停止服务时
  82. cache.stop()
复制代码
4、 Read-Through (读穿透)

此模式是 Cache-Aside 的变体,旨在简化应用逻辑。应用只从缓存读数据,当缓存未命中时,由缓存组件自身(而不是应用程序)负责从数据库加载数据并回填,然后返回给应用。
读流程:

  • 应用向缓存请求数据。
  • 如果缓存命中,直接返回。
  • 如果缓存未命中,缓存组件从数据库加载数据,存入缓存,然后返回。
优点:将缓存逻辑从应用中解耦,应用代码更简洁。
缺点:需要缓存组件(或智能客户端)支持此功能。
示例代码
  1. class ReadThroughCache:
  2.     def __init__(self):
  3.         self.redis = redis.Redis(host='localhost', port=6379, db=0)
  4.         self.db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb')
  5.    
  6.     def get(self, key):
  7.         """读取数据(缓存未命中时自动加载)"""
  8.         # 1. 尝试从缓存获取
  9.         data = self.redis.get(key)
  10.         if data:
  11.             return json.loads(data)
  12.         
  13.         # 2. 缓存未命中,从数据库加载
  14.         value = self._load_from_db(key)
  15.         if value is None:
  16.             # 缓存空值防止穿透
  17.             self.redis.setex(key, 300, json.dumps({"status": "not_found"}))
  18.             return None
  19.         
  20.         # 3. 写入缓存
  21.         self.redis.setex(key, 3600, json.dumps(value))
  22.         return value
  23.    
  24.     def _load_from_db(self, key):
  25.         """从数据库加载数据"""
  26.         if key.startswith("product:"):
  27.             product_id = key.split(":")[1]
  28.             cursor = self.db.cursor()
  29.             cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,))
  30.             product = cursor.fetchone()
  31.             cursor.close()
  32.             
  33.             if product:
  34.                 return {
  35.                     'id': product[0],
  36.                     'name': product[1],
  37.                     'price': float(product[2]),
  38.                     'stock': product[3]
  39.                 }
  40.         return None
  41. # 使用示例
  42. cache = ReadThroughCache()
  43. product = cache.get("product:123")
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册