一、Celery定时任务
Celery Beat 介绍
Celery Beat 是 Celery 框架的一个内置组件,专门用于定时任务调度。它可以按照预设的时间规则(如固定间隔、特定时间点、CRON 表达式等)自动触发 Celery 任务,广泛应用于需要周期性执行的场景(如定时数据备份、日志清理、报表生成等)。
工作原理
- Beat 进程:独立运行的调度进程,负责解析定时规则并生成任务消息。
- 任务发送:当到达预设时间,Beat 进程将任务发送到 Celery 的消息队列(如 Redis)。
- 任务执行:Celery Worker 进程从队列中获取任务并执行。
简单来说,Celery Beat 是 “定时发令枪”,而 Worker 是 “执行者”。
Celery Beat 配置持久化
默认情况下,任务配置存储在内存中,重启后会丢失。需要通过后端存储(如数据库)实现持久化,确保任务配置不丢失。
项目名称说明django-celery-beat通过数据库实现任务配置持久化django-celery-results通过数据库实现任务结果持久化django-celery只支持Celery 3.X版本(不推荐)二、Celery Beat与Django集成
安装配置
安装- pip install django-celery-beat django-celery-results
复制代码 在Django项目settings.py中添加- INSTALLED_APPS = [
- ...
- 'django_celery_beat',
- 'django_celery_results'
- ]
- ### Celery 配置
- CELERY_BROKER_URL = f"{REDIS_URL}/{REDIS_DB}" # 使用Redis作为消息代理
- CELERY_RESULT_BACKEND = "django-db" # 使用数据库存储结果
- CELERY_BEAT_SCHEDULER = (
- "django_celery_beat.schedulers:DatabaseScheduler" # 使用数据库保存定时任务
- )
- CELERY_TIMEZONE = "Asia/Shanghai"
- CELERY_ENABLE_UTC = True
- CELERY_RESULT_EXTENDED = True # 启用后才会记录 task_name、date_started 等字段
- CELERY_TASK_TRACK_STARTED = True # 记录任务开始时间
复制代码 定义 Celery 实例:创建文件mysite\mysite\celery.py- """定义和配置 Celery 实例"""
- import os
- from celery import Celery
- from django.conf import settings
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
- # 创建 Celery 实例
- app = Celery("mysite")
- # 加载配置文件中的 Celery 配置
- app.config_from_object("django.conf:settings", namespace="CELERY")
- # 自动发现并加载任务
- app.autodiscover_tasks(["myapp_infra", "myapp_system"] + settings.MY_APPS, force=True)
复制代码 配置 Django 启动时会加载应用:修改文件mysite\mysite\__init__.py- """Django 启动时加载Celery实例"""
- from .celery import app as celery_app
- __all__ = ("celery_app",)
复制代码 数据库迁移
执行数据库迁移,创建相关数据库表。其中:
- django_celery_beat_periodictask:用于存储任务名称、任务路径、任务参数等元数据。
- django_celery_beat_crontabschedule:用于存储CRON表达式。
- # 在Django项目根目录(包括manage.py的目录)执行
- python manage.py migrate django_celery_beat
- python manage.py migrate django_celery_results
复制代码
三、Celery Beat项目实战
定义Celery任务
发现任务:Celery 将自动从所有已安装的应用APP中发现任务,需要遵守以下目录结构- - myapp_system/
- - tasks.py
- - models.py
- - myapp_infra/
- - tasks.py
- - models.py
复制代码 定义任务:创建文件myapp_infra/tasks.py,使用@shared_task装饰器定义 Celery 任务- """定义 Celery 任务"""
- from time import sleep
- from celery import shared_task
- from django.utils import timezone
- @shared_task
- def send_daily_report():
- # 示例:发送日报
- print(f"开始发送日报,现在时间:{timezone.now()}")
- sleep(30)
- print("发送成功")
- return "发送成功"
- @shared_task
- def cleanup_expired_data():
- # 示例:清理过期数据
- print("清理过期数据")
- sleep(15)
- print("清理完成")
- return "清理完成"
复制代码 通过视图集动态管理定时任务
下面是通过 DRF 视图集,动态管理定时任务示例,实现对定时任务的增删改查、手动触发、开启暂停等操作
- 定义视图:myapp_infra\job\views.py
- import json
- from celery import current_app
- from django_celery_beat.models import PeriodicTask
- from drf_spectacular.utils import extend_schema
- from rest_framework.decorators import action
- from rest_framework.generics import get_object_or_404
- from mars_framework.viewsets.base import CustomModelViewSetNoSimple
- from mars_framework.permissions.base import HasPermission
- from mars_framework.response.base import CommonResponse
- from .serializers import JobSaveSerializer, JobSerializer
- from .filters import JobFilter
- from .services import infra_job_service
- @extend_schema(tags=["管理后台-infra-定时任务"])
- class JobViewSet(CustomModelViewSetNoSimple):
- queryset = PeriodicTask.objects.all()
- serializer_class = JobSerializer
- filterset_class = JobFilter
- action_serializers = {
- "create": JobSaveSerializer,
- "update": JobSaveSerializer,
- }
- action_permissions = {
- "create": [HasPermission("infra:job:create")],
- "destroy": [HasPermission("infra:job:delete")], # TODO 是否需要删除对应shedule
- "update": [HasPermission("infra:job:update")],
- "retrieve": [HasPermission("infra:job:query")],
- "list": [HasPermission("infra:job:query")],
- "export": [HasPermission("infra:job:export")],
- "update_status": [HasPermission("infra:job:update")],
- "trigger": [HasPermission("infra:job:trigger")],
- "sync": [HasPermission("infra:job:create")],
- "get_next_times": [HasPermission("infra:job:query")],
- }
- action_querysets = {
- # 排除name=celery.backend_cleanup
- "list": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),
- "export": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),
- }
- export_name = "定时任务列表"
- export_fields_labels = {
- "id": "任务编号",
- "name": "任务名称",
- "task": "处理器名字",
- "kwargs": "处理器参数",
- "cron_expression": "CRON表达式",
- "status": "任务状态",
- }
- export_data_map = {
- "status": {1: "开启", 2: "暂停"},
- }
- @extend_schema(summary="新增")
- def create(self, request, *args, **kwargs):
- """创建定时任务"""
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- # CRON表达式
- cron_expression = serializer.validated_data.pop("cron_expression")
- schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)
- # 创建任务
- task_data = {
- "name": serializer.validated_data.get("name"),
- "task": serializer.validated_data.get("task"),
- "kwargs": serializer.validated_data.get("kwargs"),
- "crontab": schedule,
- "enabled": False, # 默认禁用
- }
- PeriodicTask.objects.create(**task_data)
- return CommonResponse.success()
- @extend_schema(summary="更新")
- def update(self, request, *args, **kwargs):
- """更新定时任务"""
- instance = self.get_object()
- serializer = self.get_serializer(instance, data=request.data)
- serializer.is_valid(raise_exception=True)
- # 任务CRON表达式
- cron_expression = serializer.validated_data.pop("cron_expression")
- schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)
- # 更新任务
- task_data = {
- "name": serializer.validated_data.get("name"),
- "task": serializer.validated_data.get("task"),
- "kwargs": serializer.validated_data.get("kwargs"),
- "crontab": schedule,
- }
- PeriodicTask.objects.filter(id=instance.id).update(**task_data)
- return CommonResponse.success()
- @extend_schema(summary="触发定时任务")
- @action(
- methods=["put"],
- detail=True,
- url_path="trigger",
- )
- def trigger(self, request, *args, **kwargs):
- """触发定时任务"""
- instance = self.get_object()
- # 获取任务函数并手动触发
- task_name = instance.task # 任务路径如 "myapp_infra.tasks.send_daily_report"
- task_kwargs = json.loads(instance.kwargs or "{}")
- try:
- # 动态加载任务函数
- task = current_app.tasks[task_name]
- task.delay(**task_kwargs)
- return CommonResponse.success()
- except KeyError:
- return CommonResponse.error(
- code=121101,
- msg=f"找不到 {task_name} 任务,或该任务未注册",
- )
- except Exception as e:
- return CommonResponse.error(
- code=121102,
- msg=f"触发任务 {task_name} 失败,错误信息:{e}",
- )
- @extend_schema(summary="更新定时任务状态")
- @action(
- methods=["put"],
- detail=True,
- url_path="status",
- )
- def update_status(self, request, *args, **kwargs):
- """更新定时任务状态"""
- status = request.query_params.get("status")
- if status is None or status not in ["1", "2"]: # 1:开启 2:暂停
- return CommonResponse.error(code=121104, msg="任务状态值错误")
- instance = get_object_or_404(PeriodicTask, pk=kwargs.get("pk"))
- instance.enabled = status == "1"
- instance.save()
- return CommonResponse.success()
- @extend_schema(summary="获取定时任务的下 n 次执行时间")
- @action(
- methods=["get"],
- detail=True,
- url_path="next-times",
- )
- def get_next_times(self, request, *args, **kwargs):
- """获取定时任务的下 n 次执行时间"""
- count = int(request.query_params.get("count", 5))
- task = self.get_object()
- # 生成CORN 表达式
- crontab = task.crontab
- cron_expression = f"{crontab.minute} {crontab.hour} {crontab.day_of_month} {crontab.month_of_year} {crontab.day_of_week}"
- try:
- data = infra_job_service.get_next_times(cron_expression, count)
- except Exception as e:
- return CommonResponse.error(code=121102, msg=str(e))
- return CommonResponse.success(data=data)
复制代码- from .job.views import JobViewSet
- # 管理后台 - 定时任务
- router.register(r"job", JobViewSet, basename="job")
复制代码 启动Celery Beat
- 启动Celery Worker和Celery Beat调度器
- # 在项目目录(与manage.py同级),启动Celery Worker
- celery -A mysite worker -l info -P solo
- # 新建另一个终端窗口,在项目目录(与manage.py同级),启动Celery Beat
- celery -A mysite beat -l info -S django_celery_beat.schedulers:DatabaseScheduler
- # 新建另一个终端窗口,在项目目录(与manage.py同级),启动Django
- python manage.py runserver
复制代码 四、实战效果
通过上面定义的DRF视图集API,配合 Vue3 前端界面实现效果
- 定时任务的修改功能
- 处理器名字:填写定义任务的全路径名称
- CRON表达式:填写标准的CRON表达式
- 定时任务执行结果查询功能:能看到定时任务的执行时间、状态、返回结果等信息
点击查看完整代码
您正在阅读的是《Django从入门到实战》专栏!关注不迷路~
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |