找回密码
 立即注册
首页 业界区 业界 Django+Celery 进阶:动态定时任务的添加、修改与智能调 ...

Django+Celery 进阶:动态定时任务的添加、修改与智能调度实战

注思 8 小时前
一、Celery定时任务

Celery Beat 介绍

Celery Beat 是 Celery 框架的一个内置组件,专门用于定时任务调度。它可以按照预设的时间规则(如固定间隔、特定时间点、CRON 表达式等)自动触发 Celery 任务,广泛应用于需要周期性执行的场景(如定时数据备份、日志清理、报表生成等)。
工作原理

  • Beat 进程:独立运行的调度进程,负责解析定时规则并生成任务消息。
  • 任务发送:当到达预设时间,Beat 进程将任务发送到 Celery 的消息队列(如 Redis)。
  • 任务执行:Celery Worker 进程从队列中获取任务并执行。
简单来说,Celery Beat 是 “定时发令枪”,而 Worker 是 “执行者”。
Celery Beat 配置持久化

默认情况下,任务配置存储在内存中,重启后会丢失。需要通过后端存储(如数据库)实现持久化,确保任务配置不丢失。
项目名称说明django-celery-beat通过数据库实现任务配置持久化django-celery-results通过数据库实现任务结果持久化django-celery只支持Celery 3.X版本(不推荐)二、Celery Beat与Django集成

安装配置

安装
  1. pip install django-celery-beat django-celery-results
复制代码
在Django项目settings.py中添加
  1. INSTALLED_APPS = [
  2.     ...
  3.     'django_celery_beat',
  4.     'django_celery_results'
  5. ]
  6. ### Celery 配置
  7. CELERY_BROKER_URL = f"{REDIS_URL}/{REDIS_DB}"  # 使用Redis作为消息代理
  8. CELERY_RESULT_BACKEND = "django-db"  # 使用数据库存储结果
  9. CELERY_BEAT_SCHEDULER = (
  10.     "django_celery_beat.schedulers:DatabaseScheduler"  # 使用数据库保存定时任务
  11. )
  12. CELERY_TIMEZONE = "Asia/Shanghai"
  13. CELERY_ENABLE_UTC = True
  14. CELERY_RESULT_EXTENDED = True  # 启用后才会记录 task_name、date_started 等字段
  15. CELERY_TASK_TRACK_STARTED = True  # 记录任务开始时间
复制代码
定义 Celery 实例:创建文件mysite\mysite\celery.py
  1. """定义和配置 Celery 实例"""
  2. import os
  3. from celery import Celery
  4. from django.conf import settings
  5. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
  6. # 创建 Celery 实例
  7. app = Celery("mysite")
  8. # 加载配置文件中的 Celery 配置
  9. app.config_from_object("django.conf:settings", namespace="CELERY")
  10. # 自动发现并加载任务
  11. app.autodiscover_tasks(["myapp_infra", "myapp_system"] + settings.MY_APPS, force=True)
复制代码
配置 Django 启动时会加载应用:修改文件mysite\mysite\__init__.py
  1. """Django 启动时加载Celery实例"""
  2. from .celery import app as celery_app
  3. __all__ = ("celery_app",)
复制代码
数据库迁移

执行数据库迁移,创建相关数据库表。其中:

  • django_celery_beat_periodictask:用于存储任务名称、任务路径、任务参数等元数据。
  • django_celery_beat_crontabschedule:用于存储CRON表达式。
  1. # 在Django项目根目录(包括manage.py的目录)执行
  2. python manage.py migrate django_celery_beat
  3. python manage.py migrate django_celery_results
复制代码
1.png

三、Celery Beat项目实战

定义Celery任务

发现任务:Celery 将自动从所有已安装的应用APP中发现任务,需要遵守以下目录结构
  1. - myapp_system/
  2.     - tasks.py
  3.     - models.py
  4. - myapp_infra/
  5.     - tasks.py
  6.     - models.py
复制代码
定义任务:创建文件myapp_infra/tasks.py,使用@shared_task装饰器定义 Celery 任务
  1. """定义 Celery 任务"""
  2. from time import sleep
  3. from celery import shared_task
  4. from django.utils import timezone
  5. @shared_task
  6. def send_daily_report():
  7.     # 示例:发送日报
  8.     print(f"开始发送日报,现在时间:{timezone.now()}")
  9.     sleep(30)
  10.     print("发送成功")
  11.     return "发送成功"
  12. @shared_task
  13. def cleanup_expired_data():
  14.     # 示例:清理过期数据
  15.     print("清理过期数据")
  16.     sleep(15)
  17.     print("清理完成")
  18.     return "清理完成"
复制代码
通过视图集动态管理定时任务

下面是通过 DRF 视图集,动态管理定时任务示例,实现对定时任务的增删改查、手动触发、开启暂停等操作

  • 定义视图:myapp_infra\job\views.py
  1. import json
  2. from celery import current_app
  3. from django_celery_beat.models import PeriodicTask
  4. from drf_spectacular.utils import extend_schema
  5. from rest_framework.decorators import action
  6. from rest_framework.generics import get_object_or_404
  7. from mars_framework.viewsets.base import CustomModelViewSetNoSimple
  8. from mars_framework.permissions.base import HasPermission
  9. from mars_framework.response.base import CommonResponse
  10. from .serializers import JobSaveSerializer, JobSerializer
  11. from .filters import JobFilter
  12. from .services import infra_job_service
  13. @extend_schema(tags=["管理后台-infra-定时任务"])
  14. class JobViewSet(CustomModelViewSetNoSimple):
  15.     queryset = PeriodicTask.objects.all()
  16.     serializer_class = JobSerializer
  17.     filterset_class = JobFilter
  18.     action_serializers = {
  19.         "create": JobSaveSerializer,
  20.         "update": JobSaveSerializer,
  21.     }
  22.     action_permissions = {
  23.         "create": [HasPermission("infra:job:create")],
  24.         "destroy": [HasPermission("infra:job:delete")],  # TODO 是否需要删除对应shedule
  25.         "update": [HasPermission("infra:job:update")],
  26.         "retrieve": [HasPermission("infra:job:query")],
  27.         "list": [HasPermission("infra:job:query")],
  28.         "export": [HasPermission("infra:job:export")],
  29.         "update_status": [HasPermission("infra:job:update")],
  30.         "trigger": [HasPermission("infra:job:trigger")],
  31.         "sync": [HasPermission("infra:job:create")],
  32.         "get_next_times": [HasPermission("infra:job:query")],
  33.     }
  34.     action_querysets = {
  35.         # 排除name=celery.backend_cleanup
  36.         "list": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),
  37.         "export": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),
  38.     }
  39.     export_name = "定时任务列表"
  40.     export_fields_labels = {
  41.         "id": "任务编号",
  42.         "name": "任务名称",
  43.         "task": "处理器名字",
  44.         "kwargs": "处理器参数",
  45.         "cron_expression": "CRON表达式",
  46.         "status": "任务状态",
  47.     }
  48.     export_data_map = {
  49.         "status": {1: "开启", 2: "暂停"},
  50.     }
  51.     @extend_schema(summary="新增")
  52.     def create(self, request, *args, **kwargs):
  53.         """创建定时任务"""
  54.         serializer = self.get_serializer(data=request.data)
  55.         serializer.is_valid(raise_exception=True)
  56.         # CRON表达式
  57.         cron_expression = serializer.validated_data.pop("cron_expression")
  58.         schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)
  59.         # 创建任务
  60.         task_data = {
  61.             "name": serializer.validated_data.get("name"),
  62.             "task": serializer.validated_data.get("task"),
  63.             "kwargs": serializer.validated_data.get("kwargs"),
  64.             "crontab": schedule,
  65.             "enabled": False,  # 默认禁用
  66.         }
  67.         PeriodicTask.objects.create(**task_data)
  68.         return CommonResponse.success()
  69.     @extend_schema(summary="更新")
  70.     def update(self, request, *args, **kwargs):
  71.         """更新定时任务"""
  72.         instance = self.get_object()
  73.         serializer = self.get_serializer(instance, data=request.data)
  74.         serializer.is_valid(raise_exception=True)
  75.         # 任务CRON表达式
  76.         cron_expression = serializer.validated_data.pop("cron_expression")
  77.         schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)
  78.         # 更新任务
  79.         task_data = {
  80.             "name": serializer.validated_data.get("name"),
  81.             "task": serializer.validated_data.get("task"),
  82.             "kwargs": serializer.validated_data.get("kwargs"),
  83.             "crontab": schedule,
  84.         }
  85.         PeriodicTask.objects.filter(id=instance.id).update(**task_data)
  86.         return CommonResponse.success()
  87.     @extend_schema(summary="触发定时任务")
  88.     @action(
  89.         methods=["put"],
  90.         detail=True,
  91.         url_path="trigger",
  92.     )
  93.     def trigger(self, request, *args, **kwargs):
  94.         """触发定时任务"""
  95.         instance = self.get_object()
  96.         # 获取任务函数并手动触发
  97.         task_name = instance.task  # 任务路径如 "myapp_infra.tasks.send_daily_report"
  98.         task_kwargs = json.loads(instance.kwargs or "{}")
  99.         try:
  100.             # 动态加载任务函数
  101.             task = current_app.tasks[task_name]
  102.             task.delay(**task_kwargs)
  103.             return CommonResponse.success()
  104.         except KeyError:
  105.             return CommonResponse.error(
  106.                 code=121101,
  107.                 msg=f"找不到 {task_name}  任务,或该任务未注册",
  108.             )
  109.         except Exception as e:
  110.             return CommonResponse.error(
  111.                 code=121102,
  112.                 msg=f"触发任务 {task_name} 失败,错误信息:{e}",
  113.             )
  114.     @extend_schema(summary="更新定时任务状态")
  115.     @action(
  116.         methods=["put"],
  117.         detail=True,
  118.         url_path="status",
  119.     )
  120.     def update_status(self, request, *args, **kwargs):
  121.         """更新定时任务状态"""
  122.         status = request.query_params.get("status")
  123.         if status is None or status not in ["1", "2"]:  # 1:开启 2:暂停
  124.             return CommonResponse.error(code=121104, msg="任务状态值错误")
  125.         instance = get_object_or_404(PeriodicTask, pk=kwargs.get("pk"))
  126.         instance.enabled = status == "1"
  127.         instance.save()
  128.         return CommonResponse.success()
  129.     @extend_schema(summary="获取定时任务的下 n 次执行时间")
  130.     @action(
  131.         methods=["get"],
  132.         detail=True,
  133.         url_path="next-times",
  134.     )
  135.     def get_next_times(self, request, *args, **kwargs):
  136.         """获取定时任务的下 n 次执行时间"""
  137.         count = int(request.query_params.get("count", 5))
  138.         task = self.get_object()
  139.         # 生成CORN 表达式
  140.         crontab = task.crontab
  141.         cron_expression = f"{crontab.minute} {crontab.hour} {crontab.day_of_month} {crontab.month_of_year} {crontab.day_of_week}"
  142.         try:
  143.             data = infra_job_service.get_next_times(cron_expression, count)
  144.         except Exception as e:
  145.             return CommonResponse.error(code=121102, msg=str(e))
  146.         return CommonResponse.success(data=data)
复制代码

  • 配置路由:myapp_infra\urls.py
  1. from .job.views import JobViewSet
  2. # 管理后台 - 定时任务
  3. router.register(r"job", JobViewSet, basename="job")
复制代码
启动Celery Beat


  • 启动Celery Worker和Celery Beat调度器
  1. # 在项目目录(与manage.py同级),启动Celery Worker
  2. celery -A mysite worker -l info -P solo
  3. # 新建另一个终端窗口,在项目目录(与manage.py同级),启动Celery Beat
  4. celery -A mysite beat -l info -S django_celery_beat.schedulers:DatabaseScheduler
  5. # 新建另一个终端窗口,在项目目录(与manage.py同级),启动Django
  6. python manage.py runserver
复制代码
四、实战效果

通过上面定义的DRF视图集API,配合 Vue3 前端界面实现效果

  • 定时任务的增、删、开启暂停功能
2.png


  • 定时任务的修改功能

    • 处理器名字:填写定义任务的全路径名称
    • CRON表达式:填写标准的CRON表达式

3.png


  • 定时任务执行结果查询功能:能看到定时任务的执行时间、状态、返回结果等信息
4.png

点击查看完整代码
您正在阅读的是《Django从入门到实战》专栏!关注不迷路~

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册