celery文档
https://docs.celeryq.dev/en/stable/index.html
别人写的
https://blog.csdn.net/m0_63953077/article/details/128122734
Celery由三部分构成:
消息中间件(Broker):
官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推荐RabbitMQ
任务执行单元(Worker):
任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心
结果存储(Backend):
官方提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch等
异步任务 定时任务
Async Task Celery Beat
| |
|发送任务 |发送任务
| /
消息中间件(Broker)___
| |
|监控Broker |监控Broker
| |
任务执行单元 任务执行单元
Celery Worker Celery Worker
| |
|保存 |保存
| <____|
结果存储(Backend)
RabbitMQ
RabbitMQ 的功能比较齐全、稳定、便于安装。在生产环境来说是首选的,可以用flower
Redis
Redis 功能比较全,但是如果突然停止运行或断电会造成数据丢失,可以用flower
pip install celery
## win增加安装,celery4+
pip install eventlet
celery4.x以下适用 redis2.10.6
celery4.3以上适用redis3.2.0以上:
pip install redis==2.10.6
## celery通过flower组件实现管理和监控功能
pip install flower
配置官网
https://docs.celeryq.dev/en/latest/userguide/configuration.html
## Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://127.0.0.1:6379/0'
## BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
## 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
## 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
## 时区配置
CELERY_TIMEZONE='Asia/Shanghai'
## 指定导入的任务模块,可以指定多个
##CELERY_IMPORTS = (
## 'other_dir.tasks',
##)
## 设置定时任务
CELERY_BEAT_SCHEDULE = {
'mul_every_30_seconds': {
# 任务路径
'task': 'celery_app.tasks.mul',
# 每5秒执行一次
'schedule': 5,
'args': (4, 5)
},
'xsum_week1_20_20_00': {
# 任务路径
'task': 'celery_app.tasks.xsum',
# 每周一20点20分执行
'schedule': crontab(hour=20, minute=20, day_of_week=1),
'args': ([1,2,3,4],),
},
}
## task:任务函数
## schedule:执行频率,可以是整型(秒数),也可以是timedelta对象,也可以是crontab对象,也可以是自定义类(继承celery.schedules.schedule)
## 不需要分钟的话,也一定要写上分钟(minute=0)
## args:位置参数,列表或元组
## kwargs:关键字参数,字典
## options:可选参数,字典,任何 apply_async() 支持的参数
## relative:默认是False,取相对于beat的开始时间;设置为True,则取设置的timedelta时间
import os
import django
from celery import Celery
from django.conf import settings
## 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错
## celery_study 是当前项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_study.settings')
django.setup()
## 实例化一个celery类
celery_app = Celery('celery_study')
## 指定配置文件的位置
celery_app.config_from_object('django.conf:settings')
## 自动从settings的配置INSTALLED_APPS中的应用目录下加载 tasks.py
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
__init__.py
from .celery import celery_app
__all__ = ['celery_app'] # 加入celery_app
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
## 通过 base=MyHookTask 指定任务钩子
## Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等
## 在装饰器中加入参数 bind=True
## 在task函数中的第一个参数设置为self
## self对象是celery.app.task.Task的实例,可以用于实现重试等多种功能
@shared_task(base=MyHookTask, bind=True)
def mul(self, x, y):
try:
logger.info('-mul' * 10)
logger.info(f'{self.name}, id:{self.request.id}')
raise Exception
except Exception as e:
# 出错每4秒尝试一次,总共尝试4次
self.retry(exc=e, countdown=4, max_retries=4)
return x * y
class MyHookTask(Task):
# 成功后执行
def on_success(self, retval, task_id, args, kwargs):
logger.info(f'task id:{task_id} , arg:{args} , successful !')
# 任务返回时候执行
def after_return(self, retval, task_id, args, kwargs):
logger.info(f'task id:{task_id} , arg:{args} , successful !')
# 失败时候执行
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')
# 任务重试时候执行
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.info(f'task id:{task_id} , arg:{args} , retry ! erros: {exc}')
在cmd窗口中,切换到项目根目录下,执行:
celery beat -A celery_study -l debug
celery worker -A celery_study -l info
celery worker -A celery_study -l info -P eventlet
celery worker -A celery_study -l debug -P eventlet
## 说明:
beat: 表明这是一个定时推送单元
worker: 表明这是一个任务执行单元
-A celery_study:指定项目
-l info:指定日志输出级别
-P eventlet:支持windows
## 更多celery命令的参数,可以输入:
celery --help
或
celery worker --help
celery.bin.worker
用于启动celery工作实例的程序。
celery工人命令(以前称为celeryd)
-c, --concurrency
处理队列的子进程的数目。默认值是系统上可用的cpu数量。
-P, --pool
池实现:
prefork(默认)、eventlet、gevent、threads或solo。
-n, --hostname
设置自定义主机名(例如,' w1@%%h ')。扩展:%%h (hostname), %%n (name) and %%d
-B, --beat
还可以运行celery beat周期任务调度程序。请注意,此服务只能有一个实例。
请注意
-B用于开发目的。对于生产环境,您需要分别启动celery beat。
-Q, --queues
为这个worker启用的队列列表,以逗号分隔。默认情况下,所有配置的队列都是启用的。例如:q视频,形象
-x, exclude-queues
要为此工作程序禁用的队列列表,用逗号分隔。默认情况下,所有配置的队列都是启用的。例子:- x视频,形象。
-I, --include
用逗号分隔要导入的其他模块列表。例子:我foo.tasks bar.tasks
-s, --schedule
如果使用-B选项运行,则指向调度数据库的路径。默认为celerybeat-schedule。扩展”。可以将db 附加到文件名中。
-o
应用优化概要文件。支持:默认情况下,公平
--prefetch-multiplier
为这个工作者实例设置自定义预取乘数值。
--scheduler
要使用的调度程序类。默认是celery.beat.PersistentScheduler
-S, --statedb
到状态数据库的路径。扩展”。可以将db 附加到文件名中。默认值:{默认}
-E, --task-events
发送可以被监视器捕获的与任务相关的事件,如芹菜事件、celerymon和其他事件。
--without-gossip
不要订阅其他员工活动。
--without-mingle
在刚开始的时候不要和其他员工同步。
--without-heartbeat
不要发送事件心跳。
--heartbeat-interval
发送工作者心跳的间隔(秒)
--purge
在启动守护进程之前清除所有等待的任务。警告:这是不可恢复的,任务将从消息传递服务器中删除。
--time-limit
为任务启用硬时间限制(以秒为单位的int/float)。
--soft-time-limit
为任务启用软时间限制(以秒为单位int/float)。
--max-tasks-per-child
池工作程序在终止并被新工作程序替换之前可以执行的最大任务数。
--max-memory-per-child
在KiB中,子进程在被新进程替换之前可能消耗的最大驻留内存量。如果单个任务导致子进程超过此限制,则该任务将被完成,然后子进程将被替换。默认值:没有限制。
--autoscale
通过提供max_concurrency、min_concurrency启用自动调焦。例子:
--autoscale=10,3
(始终保持3个进程,但如果需要,可以增加到10个)
--detach
启动worker作为后台进程。
-f, --logfile
日志文件的路径。如果没有指定日志文件,则使用stderr。
-l, --loglevel
日志级别,在调试、信息、警告、错误、关键或致命之间进行选择。
--pidfile
可选文件,用于存储进程pid。
如果这个文件已经存在并且pid仍然是活动的,程序将不会启动。
--uid
用户id,或用户的用户名,以在分离后运行。
--gid
组id,或主组的组名,在分离后更改为。
--umask
分离后工艺的有效umask(1)(八进制)。默认情况下继承父进程的umask(1)。
--workdir
可选目录切换到分离后。
--executable
可执行文件,用于分离的进程。
nohup celery -A tasks flower --address=127.0.0.1 --port=5566 > logs/flower.log 2>&1 &
## nohup 表示后台运行
## flower日志文件会存储在logs/flower.log文件中
## 重启flower后,页面上会重新计数
## 可以看到报错日志
from django.shortcuts import render, HttpResponse
from .tasks import *
## Create your views here.
def task_add_view(request):
# result = add(10,20)
# return HttpResponse(f'调用add函数结果:{result}')
# 若使用第一种方式测试时(注释shared_task),正常输出,延迟10s
ar = add.delay(100,200) # 添加到消息中间件,保存结果到redis
print('add.delay(100,200)--------')
# 若使用第二种方式测试时(不注释),结果直接保存到
return HttpResponse(f'任务调用id:{ar.id}')
from django.shortcuts import render, HttpResponse
from django.http import JsonResponse
from .tasks import *
from celery import result
def get_result_by_taskid(request):
task_id = request.GET.get('task_id')
ar = result.AsyncResult(task_id)
if ar.ready(): # 是否执行完成
return JsonResponse({'status':ar.state,'result':ar.get()})
else:
return JsonResponse({'status':ar.state,'result':''})
AsyncResult类的常用的属性和方法:
state: 返回任务状态,等同status;
task_id: 返回任务id;
result: 返回任务结果,同get()方法;
ready(): 判断任务是否执行以及有结果,有结果为True,否则False;
info(): 获取任务信息,默认为结果;
wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
successful(): 判断任务是否成功,成功为True,否则为False;
官网:https://pypi.org/project/flower/
文档:https://flower.readthedocs.io/en/latest
celery通过flower组件实现管理和监控功能 ,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理
## 启动flower
flower -A celery_study --port=5555
## 说明:
-A:项目名
–port: 端口号
## 访问
在浏览器输入:http://127.0.0.1:5555
如果需要nginx转发
那么nginx重定向到5555
location /flower/ {
proxy_pass http://127.0.0.1:5555;
}
还要修改templates/base.html
的id='url_prefix'
value改为"/flower"
url.py
修改1 - 静态资源:
static_url_prefix
static_url_prefix='/static/',
static_url_prefix='/flower/static/',
修改2 - url:
所有的请求路径都加上/flower
除了static、login
共29个
[unix_http_server]
file=/tmp/supervisor.sock ;UNIX socket 文件,supervisorctl 会使用
;chmod=0700 ;socket文件的mode,默认是0700
;chown=nobody:nogroup ;socket文件的owner,格式:uid:gid
[inet_http_server] ;HTTP服务器,提供web管理界面
port=*:9002 ;Web管理后台运行的IP和端口,如果开放到公网,需要注意安全性
username=user ;登录管理后台的用户名
password=123 ;登录管理后台的密码
[rpcinterface:supervisor]
supervisor.rpcinterface_factory=supervisor.rpcinterface:make_main_rpcinterface
[supervisord]
logfile=/tmp/supervisord.log ;日志文件,默认是 $CWD/supervisord.log
logfile_maxbytes=50MB ;日志文件大小,超出会rotate,默认 50MB,如果设成0,表示不限制大小
logfile_backups=10 ;日志文件保留备份数量默认10,设为0表示不备份
loglevel=info ;日志级别,默认info,其它: debug,warn,trace
pidfile=/tmp/supervisord.pid ;pid 文件
nodaemon=false ;是否在前台启动,默认是false,即以 daemon 的方式启动
minfds=1024 ;可以打开的文件描述符的最小值,默认 1024
minprocs=200 ;可以打开的进程数的最小值,默认 200
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ;通过UNIX socket连接supervisord,路径与unix_http_server部分的file一致
;serverurl=http://127.0.0.1:9001 ; 通过HTTP的方式连接supervisord
;包含其它配置文件
[include]
files = /xxx/xxxProject/xxxProject_worker.conf /xxx/xxxProject/xxxProject_beat.conf /xxx/xxxProject/xxxProject_flower.conf ;可以指定一个或多个以.ini结束的配置文件
[program:xxxProject_beat]
directory=/xxx/xxxProject/xxxServer/
command=celery -A xxxServer beat -l WARNING
autostart=true
autorestart=false
stderr_logfile=/tmp/xxxProject_beat_stderr.log
stdout_logfile=/tmp/xxxProject_beat_stdout.log
[program:xxxProject_worker]
directory=/xxx/xxxProject/xxxServer/
command=celery -A xxxServer worker -l INFO -n 自己命名
## 这个命名可以在flower呈现
autostart=true
autorestart=false
stderr_logfile=/tmp/xxxProject_worker_stderr.log
stdout_logfile=/tmp/xxxProject_worker_stdout.log
[program:xxxProject_flower]
directory=/xxx/xxxProject/xxxServer/
command=celery -A xxxServer flower --port=5555
autostart=true
autorestart=false
stderr_logfile=/tmp/xxxProject_flower_stderr.log
stdout_logfile=/tmp/xxxProject_flower_stdout.log
## 进入虚拟环境
## 安装supervisor
pip install supervisor
## 启动supervisor
supervisord -c /xxx/xxxProject/supervisord.conf
## 进入supervisor、输入用户、密码
supervisorctl
user
123
进入supervisorctl后 | 释义 |
---|---|
status |
查看所有进程的状态 |
stop all |
停止全部 |
stop es |
停止es |
start all |
启动全部 |
start es |
启动es |
restart all |
重启全部 |
restart |
重启es |
update |
配置文件修改后使用该命令加载新的配置 |
reload |
重新启动配置中的所有程序 |
echo "" > xxx_beat_stderr.log