找回密码
 立即注册
首页 业界区 安全 Django ORM性能优化

Django ORM性能优化

郦珠雨 8 小时前
Django 的数据库操作(ORM)虽然方便,但如果使用不当,很容易出现性能问题(比如查询缓慢、数据库压力大)。数据库优化的核心目标是:减少不必要的查询、减少数据传输量、让查询跑得更快。
1、N+1查询问题

当查询包含外键关联的数据时,如果循环获取关联对象,会产生 “1 次主查询 + N 次关联查询” 的低效操作(N 是主查询结果的数量)。
  1. # models.py
  2. class Author(models.Model):
  3.     name = models.CharField(max_length=100)
  4.     age = models.IntegerField()
  5. class Book(models.Model):
  6.     title = models.CharField(max_length=200)
  7.     author = models.ForeignKey(Author, on_delete=models.CASCADE)  # 外键关联作者
复制代码
1.1 问题
  1. # 1. 先查询所有书籍(1次查询)
  2. books = Book.objects.all()  # SQL: SELECT * FROM book;
  3. # 2. 循环获取每本书的作者(N次查询,N=书籍数量)
  4. for book in books:
  5.     print(book.author.name)  # 每次都会触发:SELECT * FROM author WHERE id=?;
复制代码
1.2 select_related

select_related预加载关联对象,select_related适用于外键、一对一关联,会通过JOIN语句一次性把关联数据查出来。
  1. # 1次查询搞定所有数据(主表+关联表JOIN)
  2. books = Book.objects.select_related('author').all()  # SQL: SELECT * FROM book JOIN author ON ...;
  3. # 循环获取作者时,不会再查数据库
  4. for book in books:
  5.     print(book.author.name)  # 直接从已加载的数据中获取
复制代码
1.3 prefetch_related

prefetch_related适用于多对多 、反向外键(反向查询)关系。它先执行一次主查询,再执行一次查询来获取所有关联对象,然后在 Python 层进行"连接",效率更高。
  1. class Category(models.Model):
  2.     name = models.CharField(max_length=50)
  3. class Book(models.Model):
  4.     # ... 其他字段
  5.     categories = models.ManyToManyField(Category)  # 多对多关联
复制代码
  1. # 用prefetch_related预加载多对多数据
  2. books = Book.objects.prefetch_related('categories').all() #         SELECT ... FROM table1; SELECT ... FROM table2 WHERE id IN (...)
  3. categories = Category.objects.prefetch_related('book_set').all()
  4. for book in books:
  5.     # 不会触发额外查询
  6.     print([c.name for c in book.categories.all()])
复制代码
2、只获取需要的字段

默认情况下,Book.objects.all()会查询表中所有字段,但很多时候我们只需要其中几个字段
2.1 使用 only() 和 defer()
  1. # 只获取需要的字段
  2. books = Book.objects.only('title', 'publication_date')  # 只获取标题和出版日期
  3. # 排除大字段
  4. books = Book.objects.defer('description')  # 不获取描述字段(假设是很大的文本字段)
复制代码
2.2 使用 values() 和 values_list()
  1. # 获取字典列表
  2. book_titles = Book.objects.values('title', 'author__name')  # 返回 [{'title': '...', 'author__name': '...'}]
  3. # 获取元组列表
  4. book_titles = Book.objects.values_list('title', flat=True)  # 返回 ['标题1', '标题2', ...]
复制代码
3、数据库索引

以下情况需要加索引

  • 频繁用filter()、exclude()过滤的字段(比如where author_id=1)
  • 频繁用order_by()排序的字段(比如order by publish_time)
  • 外键字段(Django 会自动加索引)、唯一约束字段(自动加索引)
  1. # models.py
  2. class Book(models.Model):
  3.     title = models.CharField(max_length=200, db_index=True)  # 为标题添加索引
  4.     author = models.ForeignKey(Author, on_delete=models.CASCADE)
  5.     publication_date = models.DateField()
  6.    
  7.     # 复合索引
  8.     class Meta:
  9.         indexes = [
  10.             models.Index(fields=['author', 'publication_date']),  # 复合索引
  11.         ]
复制代码
index_together
  1. class Customer(models.Model):
  2.     first_name = models.CharField(max_length=100, db_index=True)
  3.     last_name = models.CharField(max_length=100, db_index=True)
  4.     email = models.EmailField(unique=True)  # 唯一约束自动创建索引
  5.    
  6.     class Meta:
  7.         # 复合索引
  8.         index_together = [
  9.             ['first_name', 'last_name'],
  10.         ]
复制代码
4、批量操作代替循环操作

用bulk_create(批量创建)和bulk_update(批量更新)
4.1 使用 bulk_create() 一次性创建多个对象
  1. # 不好的做法
  2. books = []
  3. for i in range(1000):
  4.     book = Book(title=f"Book {i}", author=some_author)
  5.     book.save()  # 每次保存都执行一次INSERT
  6. # 好的做法
  7. books = [Book(title=f"Book {i}", author=some_author) for i in range(1000)]
  8. Book.objects.bulk_create(books)  # 一次性执行批量INSERT
复制代码
4.2 使用 bulk_update() 批量更新对象
  1. # 批量更新
  2. books = Book.objects.filter(publication_date__year=2020)
  3. for book in books:
  4.     book.price = book.price * 0.9  # 打9折
  5. Book.objects.bulk_update(books, ['price'])  # 一次性批量更新
  6. # 使用F表达式
  7. Book.objects.filter(publication_date__year=2020).update(price=F('price') * 0.8)
复制代码
5、使用连接池

在高并发场景下,为每个请求创建和销毁数据库连接开销很大。使用数据库连接池可以复用连接,显著提升性能。
安装django-db-connection-pool
  1. pip install django-db-connection-pool
复制代码
settings.py配置
  1. DATABASES = {
  2.     'default': {
  3.         'ENGINE': 'dj_db_conn_pool.backends.mysql',
  4.         'NAME': 'your_db',
  5.         'USER': 'your_user',
  6.         'PASSWORD': 'your_password',
  7.         'HOST': 'localhost',
  8.         'PORT': '3306',
  9.         'OPTIONS': {
  10.             'POOL_SIZE': 20,       # 连接池大小
  11.             'MAX_OVERFLOW': 10,    # 允许超过POOL_SIZE的最大连接数
  12.             'POOL_RECYCLE': 3600,  # 连接回收时间(秒)
  13.         }
  14.     }
  15. }
复制代码
6、使用聚合和注解

annotate和 aggregate是 Django ORM 中用于执行数据库聚合操作的两个强大工具,它们允许你在数据库层面进行计算,避免将大量数据拉到 Python 中进行处理,从而显著提升性能。
模型示例:
  1. from django.db import models
  2. class Author(models.Model):
  3.     name = models.CharField(max_length=100)
  4.     country = models.CharField(max_length=50)
  5. class Book(models.Model):
  6.     title = models.CharField(max_length=200)
  7.     author = models.ForeignKey(Author, on_delete=models.CASCADE, related_name='books')
  8.     price = models.DecimalField(max_digits=6, decimal_places=2)
  9.     published_date = models.DateField()
  10.     rating = models.IntegerField(choices=[(i, i) for i in range(1, 6)])
复制代码
6.1 annotate()

annotate()为查询集中的每个对象添加计算字段(注解)。
6.1.1 基本用法:为作者添加书籍计数
  1. from django.db.models import Count
  2. # 获取所有作者,并为每个作者添加书籍数量字段
  3. authors = Author.objects.annotate(book_count=Count('books'))
  4. for author in authors:
  5.     print(f"{author.name} 写了 {author.book_count} 本书")
复制代码
6.1.2 多字段注解:计算平均价格和最高评分
  1. from django.db.models import Avg, Max
  2. # 为每个作者添加平均价格和最高评分字段
  3. authors = Author.objects.annotate(
  4.     avg_price=Avg('books__price'),
  5.     max_rating=Max('books__rating')
  6. )
  7. for author in authors:
  8.     print(f"{author.name}: 平均价格 ${author.avg_price:.2f}, 最高评分 {author.max_rating}")
复制代码
6.1.3 过滤后注解:计算特定年份的书籍数量
  1. from django.db.models import Count
  2. # 为每个作者添加2020年出版的书籍数量
  3. authors = Author.objects.annotate(
  4.     books_2020=Count('books', filter=models.Q(books__published_date__year=2020))
  5. )
  6. for author in authors:
  7.     print(f"{author.name} 在2020年出版了 {author.books_2020} 本书")
复制代码
6.1.4 链式注解:复杂计算
  1. from django.db.models import F, Count, Value
  2. # 计算每个作者的平均评分和总价值
  3. authors = Author.objects.annotate(
  4.     book_count=Count('books'),
  5.     total_value=Sum('books__price'),
  6. ).annotate(
  7.     avg_rating=Avg('books__rating'),
  8.     value_per_book=F('total_value') / F('book_count')
  9. )
  10. for author in authors:
  11.     print(f"{author.name}: 每本书平均价值 ${author.value_per_book:.2f}")
复制代码
6.2 aggregate()

aggregate()计算整个查询集的统计值,返回一个字典。
6.2.1 基本聚合:计算所有书籍的总价和平均价
  1. from django.db.models import Sum, Avg
  2. # 计算所有书籍的总价和平均价
  3. stats = Book.objects.aggregate(
  4.     total_price=Sum('price'),
  5.     average_price=Avg('price')
  6. )
  7. print(f"所有书籍总价: ${stats['total_price']}")
  8. print(f"平均价格: ${stats['average_price']:.2f}")
复制代码
6.2.2 多字段聚合:最高和最低评分
  1. from django.db.models import Max, Min
  2. # 获取最高和最低评分
  3. rating_stats = Book.objects.aggregate(
  4.     highest_rating=Max('rating'),
  5.     lowest_rating=Min('rating')
  6. )
  7. print(f"最高评分: {rating_stats['highest_rating']}")
  8. print(f"最低评分: {rating_stats['lowest_rating']}")
复制代码
6.2.3 过滤后聚合:特定作者书籍统计
  1. from django.db.models import Count, Avg
  2. # 统计某位作者的书籍
  3. author_stats = Book.objects.filter(
  4.     author__name="J.K. Rowling"
  5. ).aggregate(
  6.     book_count=Count('id'),
  7.     avg_rating=Avg('rating')
  8. )
  9. print(f"J.K. Rowling 写了 {author_stats['book_count']} 本书")
  10. print(f"平均评分: {author_stats['avg_rating']:.1f}")
复制代码
7、其他

7.1 用count()和exists()代替全量查询
  1. # 好:直接查数量(高效)
  2. total = Book.objects.count()
  3. # 差:先查所有数据再算长度(低效,尤其数据量大时)
  4. total = len(Book.objects.all())
复制代码
  1. # 好:存在即返回True(查到1条就停止)
  2. has_book = Book.objects.filter(title='Django入门').exists()
  3. # 差:查所有数据再判断(可能查很多条)
  4. has_book = len(Book.objects.filter(title='Django入门')) > 0
复制代码
7.2 适当使用原生SQL

如果 ORM 查询太复杂(比如多表复杂 JOIN),可以用原生 SQL:
  1. from django.db import connection
  2. def get_book_stats():
  3.     with connection.cursor() as cursor:
  4.         cursor.execute("""
  5.             SELECT author.name, COUNT(book.id)
  6.             FROM author
  7.             JOIN book ON author.id = book.author_id
  8.             GROUP BY author.id
  9.         """)
  10.         # 获取查询结果((作者名, 书籍数量), ...)
  11.         result = cursor.fetchall()
  12.     return result
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册