初识Celery:
Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,可将一些耗时的任务放入该消息队列中处理,一些定时任务也可以放入队列中自动执行,如定期去统计日志,数据备份,或者其他的统计任务。
Celery基本工作流程:
Celery的安装与配置
pip install celerypip install celery-with-redispip install django-celeryapt install redis-server
django中的配置:
ALLOWED_HOSTS = ['*']INSTALLED_APPS = ( ... 'djcelery', }import djcelerydjcelery.setup_loader()BROKER_URL='redis://localhost:6379/1' #任务队列存放的位置CELERY_CONCURRENCY=2 #设置worker的并发数量CELERY_RESULT_BACKEND = 'redis://localhost:6379/2' #结果存放的位置
在settings的同级目录下新建一个celery.py的文件
from __future__ import absolute_import #绝对路径导入from celery import Celery from django.conf import settingsimport os#设置系统的环境配置用的是Django的os.environ.setdefault('DJANGO_SETTING_MODULE','day9_ex.settings')#实例化celeryapp = Celery('mycelery')#设置时区app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'#指定celery的配置来源 用的是项目的配置文件settings.pyapp.config_from_object('django.conf:settings')#让celery 自动去发现我们的任务(task)app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)
在settings.py的同级目录的__init__.py文件中导入
#要写在第一行from __future__ import absolute_import #导入工程目录下celery中的app并起别名from day9_ex.celery import app as celery_app
Celery的使用
在需要使用异步任务的app目录下新建tasks.py,将要执行的异步任务放在这里面
from celery import taskimport time@taskdef test(n): for i in range(n): print(i) time.sleep(3)
在views视图函数中进行调用
from django.http import HttpResponsefrom django.shortcuts import renderfrom app.tasks import testdef test_celery(req): test.delay(6) return HttpResponse('ok')
开始执行之前要先进行数据库表的迁移
然后启动worker,命令为python manage.py celery worker --loglevel=info (或者celery -A 工程名 worker -I info) (PS:日志级别可以不写)
然后在浏览器输入url启动项目
from datetime import timedeltaCELERYBEAT_SCHEDULE = { 'schedule-test': { 'task': 'app.tasks.test2', #指定要执行的函数 'schedule': timedelta(seconds=6), # 执行的计划时间,每6秒执行一次,可自定义 'args': () #参数 },}
还可使用下面这种配置
from celery.schedules import crontabCELERYBEAT_SCHEDULE = { 'schedule-test': { 'task': 'app.tasks.test2', #指定要执行的函数 'schedule': crontab(minute=48,hour=11), # 指定具体执行的计划时间 'args': () #参数 },}
crontab的参数设置如上图,也可以一个参数设置多个值,如day_of_week='1,2',表示每周一周二。
具体使用同异步处理。
先启动python manage.py celery worker,再启动python manage.py celery beat