配置celery
1. 安装以下环境
pip install celery pip install redis pip install eventlet # celery 4.0+版本以后不支持在windows运行,还需额外安装eventlet库
本文环境为:python3.9.4+Django4.2.11+celery5.3.6+redis5.0.3
2. 配置setting.py文件
在setting.py文件中加入以下代码
# 设置redis消息队列 CELERY_BROKER_URL = 'redis://127.0.0.1:6379/10' # celery内容等消息的格式设置,默认json CELERY_ACCEPT_CONTENT = ['application/json', ] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' # 时间格式化为中国时间 CELERY_TIMEZONE = 'Asia/Shanghai' # 是否使用UTC时间 CELERY_ENABLE_UTC = False # 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。 # 设为0,存储结果永不过期 CELERY_RESULT_EXPIRES = 60 * 60 * 24 # 任务限流 CELERY_TASK_ANNOTATIONS = {'tasks.add': {'rate_limit': '2/s'}} # Worker并发数量,一般默认CPU核数,可以不设置 CELERY_WORKER_CONCURRENCY = 2
3. 在setting.py的同级目录新建一个celery_work.py文件,加入以下代码
import os from celery import Celery from celery.schedules import crontab from datetime import timedelta # 设置环境变量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings') # 实例化,需要改成自己的项目名称 app = Celery('project_name') # namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置 # 但所有Celery配置项必须以CELERY开头,防止冲突 app.config_from_object('django.conf:settings', namespace='CELERY') # 自动从Django的已注册app中发现任务 app.autodiscover_tasks()
4.在setting.py所在目录中的__init__文件中加入以下代码
from .celery_work import app as celery_app __all__ = ('celery_app',)
实现异步任务
1. 创建任务函数
(1)新建一个tasks模块
(2)在tasks模块下创建tasks.py文件,并编写任务函数代码:
from celery import shared_task from time import sleep @shared_task def test_celery_task1(): print("test_celery_task1") @shared_task def test_celery_task2(x, y): sleep(10) print(f"test_celery_task2---->{x}, {y}")
(3)新建一个app,添加url, 在view.py文件中调用任务函数
from rest_framework import status from rest_framework.views import APIView from rest_framework.response import Response from tasks.tasks import test_celery_task1, test_celery_task2 class TestView(APIView): def get(self, request): test_celery_task2.apply_async(args=[111, 222]) test_celery_task1.apply_async(args=[]) return Response({'message': 'test success'}, status=status.HTTP_200_OK)
现在可在终端输入以下命令启动worker:
celery -A celery_study worker -l info -P eventlet # celery_study为项目名称
出现以下内容就是配置成功
运行项目后可在postman测试,得到以下结果:
实现延时任务
1.通过直接设置执行时间
eta参数为指定执行时间
class TestView(APIView): def get(self, request): # test_celery_task2.apply_async(args=[111, 222]) ctime = datetime.now() # 默认使用utc时间 utc_time = datetime.utcfromtimestamp(ctime.timestamp()) task_delay = timedelta(seconds=10) # 定义时间间隔 task_time = utc_time + task_delay print(f"任务时间:{task_time}") test_time_task.apply_async(args=[ctime], eta=task_time) # 10秒后执行 # test_celery_task1.apply_async(args=[]) return Response({'message': 'test success'}, status=status.HTTP_200_OK)
2.通过设置延时时间
countdown参数为延时时间:
class TestView(APIView): def get(self, request): # test_celery_task2.apply_async(args=[111, 222]) ctime = datetime.now() test_time_task.apply_async(args=[ctime], countdown=10) # 10秒后执行 # test_celery_task1.apply_async(args=[]) return Response({'message': 'test success'}, status=status.HTTP_200_OK)
测试结果如下,执行时间延迟了10秒
实现周期定时任务
1.定义任务函数
@shared_task def test_scheduled_task(x, y): print(f'10秒执行一次---参数:{x},{y}')
2. 在celery_work.py文件中加入以下代码
# 导入库 from datetime import timedelta # 设置定时任务 app.conf.beat_schedule = { 'scheduled_task': { 'task': 'celery_app.tasks.scheduled_task', # 任务函数 'schedule': timedelta(seconds=10), # 每10秒钟执行一次 'args': () # 任务函数的参数 }, }
还可使用crontab定义周期:
app.conf.beat_schedule = { 'scheduled_task': { 'task': 'celery_app.tasks.scheduled_task', 'schedule': timedelta(seconds=10), # 每10秒钟执行一次 'args': () }, 'scheduled_task2': { 'task': 'tasks.tasks.test_celery_task', 'schedule': crontab(minute='*/1'), # 每1分钟执行一次 'args': (111, 222) # 参数 }, }
开两个终端,分别执行以下两条命令即可:
# celery_study为项目名称 celery -A celery_study worker -l info -P eventlet # 启动worker命令 celery -A celery_study beat # 启动定时调度器命令
文章版权声明:除非注明,否则均为VPS857原创文章,转载或复制请以超链接形式并注明出处。
还没有评论,来说两句吧...