找回密码
 立即注册
首页 业界区 业界 驾驭FastAPI多数据库:从读写分离到跨库事务的艺术 ...

驾驭FastAPI多数据库:从读写分离到跨库事务的艺术

峰襞副 2025-6-8 13:38:06
title: 驾驭FastAPI多数据库:从读写分离到跨库事务的艺术
date: 2025/05/16 00:58:24
updated: 2025/05/16 00:58:24
author:  cmdragon
excerpt:
在微服务架构中,FastAPI 多数据库配置管理通过独立数据存储实现隔离性、扩展性和性能优化。配置主从数据库时,使用 SQLAlchemy 创建异步引擎和会话工厂,并通过中间件实现动态数据库路由,实现读写分离。跨库事务处理采用 Saga 事务模式,确保分布式事务的一致性。以电商订单系统为例,展示了如何在 PostgreSQL、MongoDB 和 MySQL 之间进行跨库操作,并通过补偿机制处理事务失败。常见报错解决方案包括精确查询条件、正确管理会话和处理事务回滚。
categories:

  • 后端开发
  • FastAPI
tags:

  • FastAPI
  • 多数据库配置
  • 微服务架构
  • 分布式事务
  • Saga模式
  • 数据库连接池
  • 电商系统
1.jpeg
2.jpg
扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
探索数千个预构建的 AI 应用,开启你的下一个伟大创意:https://tools.cmdragon.cn/
1. FastAPI多数据库配置管理实战

1.1 微服务架构下的数据库挑战

在微服务架构中,每个服务通常需要独立的数据存储。就像大型图书馆需要将不同学科的书籍分馆存放一样,电商系统可能将用户数据、订单数据、商品数据分别存储在不同数据库。这种架构带来三个核心需求:

  • 隔离性:每个服务的数据库独立运行,避免单点故障
  • 扩展性:不同数据库可按需选择存储引擎(如MySQL、MongoDB)
  • 性能优化:读写分离配置可提升系统吞吐量
1.2 多数据库配置实现

以下示例展示如何在FastAPI中配置主从数据库:
  1. # database.py
  2. from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
  3. from sqlalchemy.orm import sessionmaker
  4. # 主数据库配置(写操作)
  5. MASTER_DATABASE_URL = "postgresql+asyncpg://user:password@master-host/dbname"
  6. master_engine = create_async_engine(MASTER_DATABASE_URL, pool_size=10)
  7. # 从数据库配置(读操作)
  8. REPLICA_DATABASE_URL = "postgresql+asyncpg://user:password@replica-host/dbname"
  9. replica_engine = create_async_engine(REPLICA_DATABASE_URL, pool_size=20)
  10. # 创建会话工厂
  11. MasterSession = sessionmaker(master_engine, class_=AsyncSession, expire_on_commit=False)
  12. ReplicaSession = sessionmaker(replica_engine, class_=AsyncSession, expire_on_commit=False)
复制代码
关键配置参数说明:

  • pool_size:连接池大小,根据服务负载调整
  • max_overflow:允许超出连接池数量的临时连接
  • pool_timeout:获取连接的超时时间(秒)
1.3 动态数据库路由

通过中间件实现读写分离:
  1. # dependencies.py
  2. from fastapi import Request, Depends
  3. from database import MasterSession, ReplicaSession
  4. async def get_db(request: Request):
  5.     """智能路由数据库连接"""
  6.     # 写操作路由到主库
  7.     if request.method in ['POST', 'PUT', 'DELETE']:
  8.         db = MasterSession()
  9.     else:  # 读操作使用从库
  10.         db = ReplicaSession()
  11.     try:
  12.         yield db
  13.     finally:
  14.         await db.close()
  15. # 在路由中使用
  16. @app.post("/orders")
  17. async def create_order(
  18.         order: OrderSchema,
  19.         db: AsyncSession = Depends(get_db)
  20. ):
  21. # 业务逻辑
复制代码
2. 跨库事务处理方案

2.1 分布式事务的挑战

当订单服务需要同时更新订单库和扣减库存库时,传统ACID事务不再适用。这就像需要同时在两个不同银行账户之间转账,必须保证要么全部成功,要么全部失败。
2.2 Saga事务模式实现
  1. # services/transaction_coordinator.py
  2. from typing import List
  3. from fastapi import HTTPException
  4. class SagaCoordinator:
  5.     def __init__(self):
  6.         self.compensation_actions = []
  7.     async def execute_transaction(self, steps: List[callable]):
  8.         """执行Saga事务"""
  9.         try:
  10.             for step in steps:
  11.                 await step()
  12.         except Exception as e:
  13.             await self.compensate()
  14.             raise HTTPException(500, "Transaction failed")
  15.     async def compensate(self):
  16.         """补偿操作执行"""
  17.         for action in reversed(self.compensation_actions):
  18.             try:
  19.                 await action()
  20.             except Exception as compen_e:
  21.                 # 记录补偿失败日志
  22.                 logger.error(f"Compensation failed: {compen_e}")
  23. # 使用示例
  24. async def create_order_transaction():
  25.     coordinator = SagaCoordinator()
  26.     async def deduct_inventory():
  27.         # 预留库存
  28.         coordinator.compensation_actions.append(restore_inventory)
  29.     async def create_order_record():
  30.         # 创建订单记录
  31.         coordinator.compensation_actions.append(delete_order_record)
  32.     await coordinator.execute_transaction([
  33.         deduct_inventory,
  34.         create_order_record
  35.     ])
复制代码
3. 企业级案例:电商订单系统

3.1 场景描述

用户下单时需要同时操作:

  • 订单数据库(PostgreSQL)
  • 库存数据库(MongoDB)
  • 用户积分数据库(MySQL)
3.2 完整实现代码
  1. # models.py
  2. from pydantic import BaseModel
  3. class OrderCreate(BaseModel):
  4.     user_id: int
  5.     product_id: str
  6.     quantity: int
  7. # services/order_service.py
  8. from sqlalchemy import text
  9. from motor.motor_asyncio import AsyncIOMotorClient
  10. class OrderService:
  11.     def __init__(self):
  12.         # 初始化各数据库连接
  13.         self.pg_pool = MasterSession
  14.         self.mongo_client = AsyncIOMotorClient(MONGO_URI)
  15.         self.mysql_pool = create_async_engine(MYSQL_URI)
  16.     async def create_order(self, order_data: OrderCreate):
  17.         """创建订单事务"""
  18.         async with self.pg_pool() as pg_session,
  19.                 self.mysql_pool.begin() as mysql_conn:
  20.             # 步骤1:扣减MySQL库存
  21.             mysql_update = text("""
  22.                 UPDATE inventory
  23.                 SET stock = stock - :quantity
  24.                 WHERE product_id = :product_id
  25.                 AND stock >= :quantity
  26.             """)
  27.             await mysql_conn.execute(
  28.                 mysql_update,
  29.                 product_id=order_data.product_id,
  30.                 quantity=order_data.quantity
  31.             )
  32.             # 步骤2:创建PostgreSQL订单
  33.             pg_insert = text("""
  34.                 INSERT INTO orders (user_id, product_id, quantity)
  35.                 VALUES (:user_id, :product_id, :quantity)
  36.             """)
  37.             await pg_session.execute(pg_insert, order_data.dict())
  38.             # 步骤3:更新MongoDB用户行为
  39.             mongo_db = self.mongo_client.user_behavior
  40.             await mongo_db.events.insert_one({
  41.                 "user_id": order_data.user_id,
  42.                 "event_type": "order_created",
  43.                 "timestamp": datetime.now()
  44.             })
  45.             # 提交PostgreSQL事务
  46.             await pg_session.commit()
复制代码
课后Quiz

问题1: 当使用多个数据库时,如何保证跨库查询的事务一致性?
A. 使用数据库自带的分布式事务功能
B. 采用最终一致性模式配合补偿机制
C. 强制所有操作使用同个数据库
D. 增加重试机制自动处理失败
答案: B
解析: 在微服务架构中,不同服务通常使用不同数据库实例,传统ACID事务难以实施。采用Saga模式等最终一致性方案,配合补偿事务(如订单取消时的库存回补),是更可行的解决方案。
常见报错解决方案

错误1: MultipleResultsFound: Multiple rows were found when one was required
原因: 查询语句返回了多个结果,但期望单个结果
解决:

  • 检查查询条件是否足够精确
  • 使用.first()代替.one()
  • 添加LIMIT 1子句
错误2: InterfaceError: Connection already closed
原因: 数据库连接过早关闭
预防:

  • 使用上下文管理器管理会话
  • 检查连接池配置
  • 增加连接存活检测
  1. # 正确使用方式
  2. async def get_db():
  3.     async with Session() as session:
  4.         yield session
复制代码
错误3: DBAPIError: Can't reconnect until invalid transaction is rolled back
原因: 未正确处理事务回滚
解决:

  • 在异常处理中添加显式回滚
  • 设置事务自动回滚
  1. async def safe_transaction():
  2.     async with session.begin():
  3.         try:
  4.             # 业务操作
  5.             await session.commit()
  6.         except:
  7.             await session.rollback()
  8.             raise
复制代码
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:驾驭FastAPI多数据库:从读写分离到跨库事务的艺术 | cmdragon's Blog
往期文章归档:


  • 数据库事务隔离与Alembic数据恢复的实战艺术 | cmdragon's Blog
  • FastAPI与Alembic:数据库迁移的隐秘艺术 | cmdragon's Blog
  • 飞行中的引擎更换:生产环境数据库迁移的艺术与科学 | cmdragon's Blog
  • Alembic迁移脚本冲突的智能检测与优雅合并之道 | cmdragon's Blog
  • 多数据库迁移的艺术:Alembic在复杂环境中的精妙应用 | cmdragon's Blog
  • 数据库事务回滚:FastAPI中的存档与读档大法 | cmdragon's Blog
  • Alembic迁移脚本:让数据库变身时间旅行者 | cmdragon's Blog
  • 数据库连接池:从银行柜台到代码世界的奇妙旅程 | cmdragon's Blog
  • 点赞背后的技术大冒险:分布式事务与SAGA模式 | cmdragon's Blog
  • N+1查询:数据库性能的隐形杀手与终极拯救指南 | cmdragon's Blog
  • FastAPI与Tortoise-ORM开发的神奇之旅 | cmdragon's Blog
  • DDD分层设计与异步职责划分:让你的代码不再“异步”混乱 | cmdragon's Blog
  • 异步数据库事务锁:电商库存扣减的防超卖秘籍 | cmdragon's Blog
  • FastAPI中的复杂查询与原子更新指南 | cmdragon's Blog
  • 深入解析Tortoise-ORM关系型字段与异步查询 | cmdragon's Blog
  • FastAPI与Tortoise-ORM模型配置及aerich迁移工具 | cmdragon's Blog
  • 异步IO与Tortoise-ORM的数据库 | cmdragon's Blog
  • FastAPI数据库连接池配置与监控 | cmdragon's Blog
  • 分布式事务在点赞功能中的实现 | cmdragon's Blog
  • Tortoise-ORM级联查询与预加载性能优化 | cmdragon's Blog
  • 使用Tortoise-ORM和FastAPI构建评论系统 | cmdragon's Blog
  • 分层架构在博客评论功能中的应用与实现 | cmdragon's Blog
  • 深入解析事务基础与原子操作原理 | cmdragon's Blog
  • 掌握Tortoise-ORM高级异步查询技巧 | cmdragon's Blog
  • FastAPI与Tortoise-ORM实现关系型数据库关联 | cmdragon's Blog
  • Tortoise-ORM与FastAPI集成:异步模型定义与实践 | cmdragon's Blog
  • 异步编程与Tortoise-ORM框架 | cmdragon's Blog
  • FastAPI数据库集成与事务管理 | cmdragon's Blog
  • FastAPI与SQLAlchemy数据库集成 | cmdragon's Blog
  • FastAPI与SQLAlchemy数据库集成与CRUD操作 | cmdragon's Blog
  • FastAPI与SQLAlchemy同步数据库集成 | cmdragon's Blog
  • SQLAlchemy 核心概念与同步引擎配置详解 | cmdragon's Blog
  • FastAPI依赖注入性能优化策略 | cmdragon's Blog
  • FastAPI安全认证中的依赖组合 | cmdragon's Blog
  • XML Sitemap


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册