找回密码
 立即注册
首页 业界区 业界 taskiq异步分布式任务管理器 适用fastapi

taskiq异步分布式任务管理器 适用fastapi

溶绚 2025-6-21 14:41:48
taskiq 异步分布式任务管理器

https://taskiq-python.github.io/
将 taskiq 视为 asyncio celery 实现。它使用几乎相同的模式,但它更加现代和灵活。
它不是任何其他任务管理器的直接替代品。它具有不同的库生态系统和一组不同的功能。此外,它不适用于同步项目。将无法同步发送任务。
1 安装taskiq
  1. pip install taskiq
复制代码
2 使用

我这里使用的是fastapi+rabbitmq,所以需要多装一个taskiq-aio-pika包来使用
  1. pip install taskiq-aio-pika
复制代码
项目路径如下:
1.png

broker.py:
  1. from taskiq_aio_pika import AioPikaBroker
  2. from app.core.config import settings
  3. broker = AioPikaBroker(url="amqp://guest:guest@localhost:5672//") # 此处替换broker_url
复制代码
这里必须要定义一个worker.py,显式的导入你的tasks和broker。不然会报如下错误:
  1. task "xxxx" is not found. Maybe you forgot to import it?
复制代码
worker.py:
  1. from app.tasks.broker import broker
  2. import app.tasks.notify_tasks
复制代码
xxx_tasks.py:
  1. from app.tasks.broker import broker
  2. @broker.task
  3. async def test_tasks():
  4.         # 现在就可以支持async await使用 例如:
  5.         async with httpx.AsyncClient() as client:
  6.                 await client.post("jd.com", json=body)
  7.         return
复制代码
3 cli启动命令:
  1. taskiq worker app.tasks.worker:broker
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册