找回密码
 立即注册
首页 业界区 业界 Django 实战:Celery 异步任务从环境搭建到调用全掌握 ...

Django 实战:Celery 异步任务从环境搭建到调用全掌握

押疙 2025-7-14 06:47:25
一、Celery入门

介绍

Celery 是一个简单、灵活且可靠的分布式任务队列系统,专注于实时处理的异步任务队列,同时也支持任务调度。Celery是实现异步任务、定时任务的一种工具。
Celery 的核心功能

  • 异步任务处理:将耗时的任务异步执行,不阻塞主程序,从而提高系统的响应速度和扩展性。例如邮件发送、消息推送等。
  • 定时任务调度:可以按照预设的时间间隔或特定时间点执行任务,例如定时清理日志、定时统计数据等。
  • 分布式任务执行:支持在多台服务器上运行 worker 进程,扩展到分布式环境中。
  • 任务状态跟踪和结果存储:可以跟踪任务的执行状态,并将任务的执行结果存储。
Celery 的架构

  • 消息中间件 (Broker):负责接收任务生产者发送的消息并将任务存入队列。常用的消息中间件有 Redis 和 RabbitMQ
  • 任务执行单元 (Worker):执行任务的实际工作进程,会从消息队列中取出任务并执行  。
  • 任务结果存储 (Backend):用于存储任务执行结果,可以是 Redis、RabbitMQ 或数据库  。
  • 任务调度器 (Beat):用于调度定时任务,会周期性地将到期需要执行的任务发送给消息队列  。
Celery 的工作流程

  • 任务生产者将任务发送到消息队列。
  • 消息队列存储任务,直到任务消费者获取它们。
  • 任务消费者从消息队列中获取任务,并在本地执行。
  • 执行完成后,任务结果存储到结果存储后端。
  • 任务生产者可以通过 AsyncResult 查询任务的状态和结果。
生产环境建议

  • Windows 平台上安装Celery,只能用于开发环境或测试环境。生产环境,建议使用 Linux 平台。
安装

安装Redis 作为消息中间件(过程略)
安装 Celery 和 Redis客户端
  1. pip install redis
  2. pip install celery
复制代码
二、Celery与Django集成实战

配置Celery实例

Django项目结构示例
  1. - mysite/
  2.   - manage.py
  3.   - mysite/
  4.     - __init__.py
  5.     - settings.py
  6.     - urls.py
复制代码
定义 Celery 实例:创建文件mysite\mysite\celery.py
  1. """定义和配置 Celery 实例"""
  2. import os
  3. from celery import Celery
  4. from django.conf import settings
  5. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
  6. # 创建 Celery 实例
  7. app = Celery("mysite")
  8. # 加载配置文件中的 Celery 配置
  9. app.config_from_object("django.conf:settings", namespace="CELERY")
  10. # 自动发现并加载任务
  11. app.autodiscover_tasks(["myapp_infra", "myapp_system"] + settings.MY_APPS, force=True)
复制代码
配置 Django 启动时会加载应用:修改文件mysite\mysite\__init__.py
  1. """Django 启动时加载Celery实例"""
  2. from .celery import app as celery_app
  3. __all__ = ("celery_app",)
复制代码
配置Celery:修改Django项目文件settings.py
  1. ### Celery配置
  2. CELERY_RESULT_BACKEND = "redis://localhost:6379/2"
  3. CELERY_BROKER_URL = "redis://localhost:6379/3"
  4. CELERY_TIMEZONE = "Asia/Shanghai"
  5. CELERY_ENABLE_UTC = True
  6. CELERY_RESULT_EXTENDED = True  # 启用后才会记录 task_name、date_started 等字段
  7. CELERY_TASK_TRACK_STARTED = True  # 记录任务开始时间
复制代码
定义任务

发现任务:Celery 将自动从所有已安装的应用APP中发现任务,需要遵守以下目录结构
  1. - myapp_system/
  2.     - tasks.py
  3.     - models.py
  4. - myapp_infra/
  5.     - tasks.py
  6.     - models.py
复制代码
定义任务:创建文件myapp_infra/tasks.py,使用@shared_task装饰器定义 Celery 任务
  1. """定义 Celery 任务"""
  2. from celery import shared_task
  3. from time import sleep
  4. @shared_task
  5. def send_email_task(subject, message, recipient_list):
  6.     """
  7.     发送电子邮件的任务
  8.     """
  9.     print("发送邮件任务开始执行...")
  10.     sleep(3)
  11.     print(f"主题: {subject}")
  12.     print(f"内容: {message}")
  13.     print(f"收件人: {recipient_list}")
  14.     print("#" * 10, "\n")
复制代码
调用任务

调用任务:在视图或其他代码中,使用 .delay() 方法将任务发送到 Celery 队列中。
  1. from rest_framework.views import APIView
  2. from rest_framework.response import Response
  3. from .tasks import send_email_task
  4. class SendEmailView(APIView):
  5.     def get(self, request):
  6.         subject = "Hello from Celery"
  7.         message = "This is a test email sent using Celery."
  8.         recipient_list = ["user@example.com"]
  9.         # 异步发送邮件
  10.         send_email_task.delay(subject, message, recipient_list)
  11.         return Response({"message": "Email sent successfully"})
复制代码
启动

启动Celery工作进程:在开发和测试环境中,使用下面命令启动Celery工作进程
  1. # 进入Django项目目录(包含manage.py的目录)
  2. celery -A mysite worker -l INFO -P solo
复制代码
启动Django项目
  1. # 进入Django项目目录(包含manage.py的目录)
  2. python manage.py runserver
复制代码
实战效果

当访问SendEmailView 视图,会看到日志中显示任务已触发和执行。
  1. [2025-04-14 09:11:53,151: INFO/MainProcess] Task mybooks.tasks.send_email_task[c37a8725-aa59-43b4-9949-74a753c019a2] received
  2. [2025-04-14 09:11:55,185: WARNING/MainProcess] 发送邮件任务开始执行...
  3. [2025-04-14 09:11:58,186: WARNING/MainProcess] 主题: Hello from Celery
  4. [2025-04-14 09:11:58,186: WARNING/MainProcess] 内容: This is a test email sent using Celery.
  5. [2025-04-14 09:11:58,186: WARNING/MainProcess] 收件人: ['user@example.com']
  6. [2025-04-14 09:11:58,187: WARNING/MainProcess] ##########
  7. [2025-04-14 09:11:58,187: WARNING/MainProcess]
复制代码
三、delay_on_commit

使用场景

delay_on_commit 是 Celery 提供的一个用于确保任务在数据库事务提交后执行的机制。考虑以下场景:

  • send_email任务可能会在视图将事务提交到数据库之前启动,因此任务可能无法找到用户。
  • 如果事务回滚,任务仍然会执行,处理一个不存在或无效的订单。
  1. # views.py
  2. def create_user(request):
  3.     user = User.objects.create(username=request.POST['username'])
  4.     send_email.delay(user.pk)
  5.     return HttpResponse('User created')
  6. # task.py
  7. @shared_task
  8. def send_email(user_pk):
  9.     user = User.objects.get(pk=user_pk)
  10.     # send email ...
复制代码
使用方法

delay_on_commit 会将任务调度延迟到当前事务成功提交后执行。如果事务回滚,任务也不会被调度。
  1. # views.py
  2. from django.db import transaction
  3. from .tasks import send_email_task
  4. @transaction.atomic
  5. def create_user(request):
  6.     user = User.objects.create(username="zhangsan")  # 数据库操作
  7.     # 正确:事务提交后才会触发任务
  8.     send_email_task.delay_on_commit(user.id)
  9.     # 如果此处抛出异常导致事务回滚,任务不会被执行
  10.     return HttpResponse("OK")
复制代码
点击查看完整代码
您正在阅读的是《Django从入门到实战》专栏!关注不迷路~

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册