
celery文档
https://docs.celeryq.dev/en/stable/index.html
别人写的
https://blog.csdn.net/m0_63953077/article/details/128122734
Celery由三部分构成:
消息中间件(Broker):
官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推荐RabbitMQ
任务执行单元(Worker):
任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心
结果存储(Backend):
官方提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch等
异步任务 定时任务Async Task Celery Beat| ||发送任务 |发送任务| /消息中间件(Broker)___| ||监控Broker |监控Broker| |任务执行单元 任务执行单元Celery Worker Celery Worker| ||保存 |保存| <____|结果存储(Backend)
RabbitMQRabbitMQ 的功能比较齐全、稳定、便于安装。在生产环境来说是首选的,可以用flowerRedisRedis 功能比较全,但是如果突然停止运行或断电会造成数据丢失,可以用flower
pip install celery
## win增加安装,celery4+pip install eventlet
celery4.x以下适用 redis2.10.6celery4.3以上适用redis3.2.0以上:pip install redis==2.10.6
## celery通过flower组件实现管理和监控功能pip install flower
配置官网
https://docs.celeryq.dev/en/latest/userguide/configuration.html
## Broker配置,使用Redis作为消息中间件BROKER_URL = 'redis://127.0.0.1:6379/0'## BACKEND配置,这里使用redisCELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'## 结果序列化方案CELERY_RESULT_SERIALIZER = 'json'## 任务结果过期时间,秒CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24## 时区配置CELERY_TIMEZONE='Asia/Shanghai'## 指定导入的任务模块,可以指定多个##CELERY_IMPORTS = (## 'other_dir.tasks',##)## 设置定时任务CELERY_BEAT_SCHEDULE = {'mul_every_30_seconds': {# 任务路径'task': 'celery_app.tasks.mul',# 每5秒执行一次'schedule': 5,'args': (4, 5)},'xsum_week1_20_20_00': {# 任务路径'task': 'celery_app.tasks.xsum',# 每周一20点20分执行'schedule': crontab(hour=20, minute=20, day_of_week=1),'args': ([1,2,3,4],),},}## task:任务函数## schedule:执行频率,可以是整型(秒数),也可以是timedelta对象,也可以是crontab对象,也可以是自定义类(继承celery.schedules.schedule)## 不需要分钟的话,也一定要写上分钟(minute=0)## args:位置参数,列表或元组## kwargs:关键字参数,字典## options:可选参数,字典,任何 apply_async() 支持的参数## relative:默认是False,取相对于beat的开始时间;设置为True,则取设置的timedelta时间
import osimport djangofrom celery import Celeryfrom django.conf import settings## 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错## celery_study 是当前项目名os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_study.settings')django.setup()## 实例化一个celery类celery_app = Celery('celery_study')## 指定配置文件的位置celery_app.config_from_object('django.conf:settings')## 自动从settings的配置INSTALLED_APPS中的应用目录下加载 tasks.pycelery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
__init__.py
from .celery import celery_app__all__ = ['celery_app'] # 加入celery_app
from celery import shared_task@shared_taskdef add(x, y):return x + y@shared_taskdef mul(x, y):return x * y@shared_taskdef xsum(numbers):return sum(numbers)## 通过 base=MyHookTask 指定任务钩子## Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等## 在装饰器中加入参数 bind=True## 在task函数中的第一个参数设置为self## self对象是celery.app.task.Task的实例,可以用于实现重试等多种功能@shared_task(base=MyHookTask, bind=True)def mul(self, x, y):try:logger.info('-mul' * 10)logger.info(f'{self.name}, id:{self.request.id}')raise Exceptionexcept Exception as e:# 出错每4秒尝试一次,总共尝试4次self.retry(exc=e, countdown=4, max_retries=4)return x * yclass MyHookTask(Task):# 成功后执行def on_success(self, retval, task_id, args, kwargs):logger.info(f'task id:{task_id} , arg:{args} , successful !')# 任务返回时候执行def after_return(self, retval, task_id, args, kwargs):logger.info(f'task id:{task_id} , arg:{args} , successful !')# 失败时候执行def on_failure(self, exc, task_id, args, kwargs, einfo):logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')# 任务重试时候执行def on_retry(self, exc, task_id, args, kwargs, einfo):logger.info(f'task id:{task_id} , arg:{args} , retry ! erros: {exc}')
在cmd窗口中,切换到项目根目录下,执行:
celery beat -A celery_study -l debugcelery worker -A celery_study -l infocelery worker -A celery_study -l info -P eventletcelery worker -A celery_study -l debug -P eventlet## 说明:beat: 表明这是一个定时推送单元worker: 表明这是一个任务执行单元-A celery_study:指定项目-l info:指定日志输出级别-P eventlet:支持windows## 更多celery命令的参数,可以输入:celery --help或celery worker --helpcelery.bin.worker用于启动celery工作实例的程序。celery工人命令(以前称为celeryd)-c, --concurrency处理队列的子进程的数目。默认值是系统上可用的cpu数量。-P, --pool池实现:prefork(默认)、eventlet、gevent、threads或solo。-n, --hostname设置自定义主机名(例如,' w1@%%h ')。扩展:%%h (hostname), %%n (name) and %%d-B, --beat还可以运行celery beat周期任务调度程序。请注意,此服务只能有一个实例。请注意-B用于开发目的。对于生产环境,您需要分别启动celery beat。-Q, --queues为这个worker启用的队列列表,以逗号分隔。默认情况下,所有配置的队列都是启用的。例如:q视频,形象-x, exclude-queues要为此工作程序禁用的队列列表,用逗号分隔。默认情况下,所有配置的队列都是启用的。例子:- x视频,形象。-I, --include用逗号分隔要导入的其他模块列表。例子:我foo.tasks bar.tasks-s, --schedule如果使用-B选项运行,则指向调度数据库的路径。默认为celerybeat-schedule。扩展”。可以将db 附加到文件名中。-o应用优化概要文件。支持:默认情况下,公平--prefetch-multiplier为这个工作者实例设置自定义预取乘数值。--scheduler要使用的调度程序类。默认是celery.beat.PersistentScheduler-S, --statedb到状态数据库的路径。扩展”。可以将db 附加到文件名中。默认值:{默认}-E, --task-events发送可以被监视器捕获的与任务相关的事件,如芹菜事件、celerymon和其他事件。--without-gossip不要订阅其他员工活动。--without-mingle在刚开始的时候不要和其他员工同步。--without-heartbeat不要发送事件心跳。--heartbeat-interval发送工作者心跳的间隔(秒)--purge在启动守护进程之前清除所有等待的任务。警告:这是不可恢复的,任务将从消息传递服务器中删除。--time-limit为任务启用硬时间限制(以秒为单位的int/float)。--soft-time-limit为任务启用软时间限制(以秒为单位int/float)。--max-tasks-per-child池工作程序在终止并被新工作程序替换之前可以执行的最大任务数。--max-memory-per-child在KiB中,子进程在被新进程替换之前可能消耗的最大驻留内存量。如果单个任务导致子进程超过此限制,则该任务将被完成,然后子进程将被替换。默认值:没有限制。--autoscale通过提供max_concurrency、min_concurrency启用自动调焦。例子:--autoscale=10,3(始终保持3个进程,但如果需要,可以增加到10个)--detach启动worker作为后台进程。-f, --logfile日志文件的路径。如果没有指定日志文件,则使用stderr。-l, --loglevel日志级别,在调试、信息、警告、错误、关键或致命之间进行选择。--pidfile可选文件,用于存储进程pid。如果这个文件已经存在并且pid仍然是活动的,程序将不会启动。--uid用户id,或用户的用户名,以在分离后运行。--gid组id,或主组的组名,在分离后更改为。--umask分离后工艺的有效umask(1)(八进制)。默认情况下继承父进程的umask(1)。--workdir可选目录切换到分离后。--executable可执行文件,用于分离的进程。
nohup celery -A tasks flower --address=127.0.0.1 --port=5566 > logs/flower.log 2>&1 &## nohup 表示后台运行## flower日志文件会存储在logs/flower.log文件中## 重启flower后,页面上会重新计数## 可以看到报错日志
from django.shortcuts import render, HttpResponsefrom .tasks import *## Create your views here.def task_add_view(request):# result = add(10,20)# return HttpResponse(f'调用add函数结果:{result}')# 若使用第一种方式测试时(注释shared_task),正常输出,延迟10sar = add.delay(100,200) # 添加到消息中间件,保存结果到redisprint('add.delay(100,200)--------')# 若使用第二种方式测试时(不注释),结果直接保存到return HttpResponse(f'任务调用id:{ar.id}')
from django.shortcuts import render, HttpResponsefrom django.http import JsonResponsefrom .tasks import *from celery import resultdef get_result_by_taskid(request):task_id = request.GET.get('task_id')ar = result.AsyncResult(task_id)if ar.ready(): # 是否执行完成return JsonResponse({'status':ar.state,'result':ar.get()})else:return JsonResponse({'status':ar.state,'result':''})
AsyncResult类的常用的属性和方法:state: 返回任务状态,等同status;task_id: 返回任务id;result: 返回任务结果,同get()方法;ready(): 判断任务是否执行以及有结果,有结果为True,否则False;info(): 获取任务信息,默认为结果;wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;successful(): 判断任务是否成功,成功为True,否则为False;
官网:https://pypi.org/project/flower/
文档:https://flower.readthedocs.io/en/latest
celery通过flower组件实现管理和监控功能 ,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理## 启动flowerflower -A celery_study --port=5555## 说明:-A:项目名–port: 端口号## 访问在浏览器输入:http://127.0.0.1:5555
如果需要nginx转发那么nginx重定向到5555location /flower/ {proxy_pass http://127.0.0.1:5555;}还要修改templates/base.html的id='url_prefix'value改为"/flower"url.py修改1 - 静态资源:static_url_prefixstatic_url_prefix='/static/',static_url_prefix='/flower/static/',修改2 - url:所有的请求路径都加上/flower除了static、login共29个
[unix_http_server]file=/tmp/supervisor.sock ;UNIX socket 文件,supervisorctl 会使用;chmod=0700 ;socket文件的mode,默认是0700;chown=nobody:nogroup ;socket文件的owner,格式:uid:gid[inet_http_server] ;HTTP服务器,提供web管理界面port=*:9002 ;Web管理后台运行的IP和端口,如果开放到公网,需要注意安全性username=user ;登录管理后台的用户名password=123 ;登录管理后台的密码[rpcinterface:supervisor]supervisor.rpcinterface_factory=supervisor.rpcinterface:make_main_rpcinterface[supervisord]logfile=/tmp/supervisord.log ;日志文件,默认是 $CWD/supervisord.loglogfile_maxbytes=50MB ;日志文件大小,超出会rotate,默认 50MB,如果设成0,表示不限制大小logfile_backups=10 ;日志文件保留备份数量默认10,设为0表示不备份loglevel=info ;日志级别,默认info,其它: debug,warn,tracepidfile=/tmp/supervisord.pid ;pid 文件nodaemon=false ;是否在前台启动,默认是false,即以 daemon 的方式启动minfds=1024 ;可以打开的文件描述符的最小值,默认 1024minprocs=200 ;可以打开的进程数的最小值,默认 200[supervisorctl]serverurl=unix:///tmp/supervisor.sock ;通过UNIX socket连接supervisord,路径与unix_http_server部分的file一致;serverurl=http://127.0.0.1:9001 ; 通过HTTP的方式连接supervisord;包含其它配置文件[include]files = /xxx/xxxProject/xxxProject_worker.conf /xxx/xxxProject/xxxProject_beat.conf /xxx/xxxProject/xxxProject_flower.conf ;可以指定一个或多个以.ini结束的配置文件
[program:xxxProject_beat]directory=/xxx/xxxProject/xxxServer/command=celery -A xxxServer beat -l WARNINGautostart=trueautorestart=falsestderr_logfile=/tmp/xxxProject_beat_stderr.logstdout_logfile=/tmp/xxxProject_beat_stdout.log
[program:xxxProject_worker]directory=/xxx/xxxProject/xxxServer/command=celery -A xxxServer worker -l INFO -n 自己命名## 这个命名可以在flower呈现autostart=trueautorestart=falsestderr_logfile=/tmp/xxxProject_worker_stderr.logstdout_logfile=/tmp/xxxProject_worker_stdout.log
[program:xxxProject_flower]directory=/xxx/xxxProject/xxxServer/command=celery -A xxxServer flower --port=5555autostart=trueautorestart=falsestderr_logfile=/tmp/xxxProject_flower_stderr.logstdout_logfile=/tmp/xxxProject_flower_stdout.log
## 进入虚拟环境## 安装supervisorpip install supervisor## 启动supervisorsupervisord -c /xxx/xxxProject/supervisord.conf## 进入supervisor、输入用户、密码supervisorctluser123
| 进入supervisorctl后 | 释义 |
|---|---|
status |
查看所有进程的状态 |
stop all |
停止全部 |
stop es |
停止es |
start all |
启动全部 |
start es |
启动es |
restart all |
重启全部 |
restart |
重启es |
update |
配置文件修改后使用该命令加载新的配置 |
reload |
重新启动配置中的所有程序 |
echo "" > xxx_beat_stderr.log