hand
_1_35_35
4
python3.X - Web - Django3.2.9
共43篇
python3.X - Web - Django3.2.9
返回栏目
0k
0.6k
0.1k
0.2k
0.1k
0.2k
6k
1k
6k
2k
2k
0.4k
0.3k
0.3k
1k
0.5k
3k
2k
0.6k
0.9k
1k
1k
3k
0.1k
0.3k
0.4k
0.4k
0.1k
0.3k
2k
1k
1k
1k
5k
1k
1k
0k
3k
2k
0k
0.1k
0.3k
0k
返回python3.X - Web - Django3.2.9栏目
作者:
贺及楼
成为作者
更新日期:2024-06-13 21:03:45
celery文档
https://docs.celeryq.dev/en/stable/index.html
别人写的
https://blog.csdn.net/m0_63953077/article/details/128122734
Celery由三部分构成:
消息中间件(Broker):
官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推荐RabbitMQ
任务执行单元(Worker):
任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心
结果存储(Backend):
官方提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch等
异步任务 定时任务
Async Task Celery Beat
| |
|发送任务 |发送任务
| /
消息中间件(Broker)___
| |
|监控Broker |监控Broker
| |
任务执行单元 任务执行单元
Celery Worker Celery Worker
| |
|保存 |保存
| <____|
结果存储(Backend)
RabbitMQ
RabbitMQ 的功能比较齐全、稳定、便于安装。在生产环境来说是首选的,可以用flower
Redis
Redis 功能比较全,但是如果突然停止运行或断电会造成数据丢失,可以用flower
pip install celery
## win增加安装,celery4+
pip install eventlet
celery4.x以下适用 redis2.10.6
celery4.3以上适用redis3.2.0以上:
pip install redis==2.10.6
## celery通过flower组件实现管理和监控功能
pip install flower
配置官网
https://docs.celeryq.dev/en/latest/userguide/configuration.html
## Broker配置,使用Redis作为消息中间件
BROKER_URL = 'redis://127.0.0.1:6379/0'
## BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
## 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
## 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
## 时区配置
CELERY_TIMEZONE='Asia/Shanghai'
## 指定导入的任务模块,可以指定多个
##CELERY_IMPORTS = (
## 'other_dir.tasks',
##)
## 设置定时任务
CELERY_BEAT_SCHEDULE = {
'mul_every_30_seconds': {
# 任务路径
'task': 'celery_app.tasks.mul',
# 每5秒执行一次
'schedule': 5,
'args': (4, 5)
},
'xsum_week1_20_20_00': {
# 任务路径
'task': 'celery_app.tasks.xsum',
# 每周一20点20分执行
'schedule': crontab(hour=20, minute=20, day_of_week=1),
'args': ([1,2,3,4],),
},
}
## task:任务函数
## schedule:执行频率,可以是整型(秒数),也可以是timedelta对象,也可以是crontab对象,也可以是自定义类(继承celery.schedules.schedule)
## 不需要分钟的话,也一定要写上分钟(minute=0)
## args:位置参数,列表或元组
## kwargs:关键字参数,字典
## options:可选参数,字典,任何 apply_async() 支持的参数
## relative:默认是False,取相对于beat的开始时间;设置为True,则取设置的timedelta时间
import os
import django
from celery import Celery
from django.conf import settings
## 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错
## celery_study 是当前项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_study.settings')
django.setup()
## 实例化一个celery类
celery_app = Celery('celery_study')
## 指定配置文件的位置
celery_app.config_from_object('django.conf:settings')
## 自动从settings的配置INSTALLED_APPS中的应用目录下加载 tasks.py
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
__init__.py
from .celery import celery_app
__all__ = ['celery_app'] # 加入celery_app
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
## 通过 base=MyHookTask 指定任务钩子
## Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等
## 在装饰器中加入参数 bind=True
## 在task函数中的第一个参数设置为self
## self对象是celery.app.task.Task的实例,可以用于实现重试等多种功能
@shared_task(base=MyHookTask, bind=True)
def mul(self, x, y):
try:
logger.info('-mul' * 10)
logger.info(f'{self.name}, id:{self.request.id}')
raise Exception
except Exception as e:
# 出错每4秒尝试一次,总共尝试4次
self.retry(exc=e, countdown=4, max_retries=4)
return x * y
class MyHookTask(Task):
# 成功后执行
def on_success(self, retval, task_id, args, kwargs):
logger.info(f'task id:{task_id} , arg:{args} , successful !')
# 任务返回时候执行
def after_return(self, retval, task_id, args, kwargs):
logger.info(f'task id:{task_id} , arg:{args} , successful !')
# 失败时候执行
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')
# 任务重试时候执行
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.info(f'task id:{task_id} , arg:{args} , retry ! erros: {exc}')
在cmd窗口中,切换到项目根目录下,执行:
celery beat -A celery_study -l debug
celery worker -A celery_study -l info
celery worker -A celery_study -l info -P eventlet
celery worker -A celery_study -l debug -P eventlet
## 说明:
beat: 表明这是一个定时推送单元
worker: 表明这是一个任务执行单元
-A celery_study:指定项目
-l info:指定日志输出级别
-P eventlet:支持windows
## 更多celery命令的参数,可以输入:
celery --help
或
celery worker --help
celery.bin.worker
用于启动celery工作实例的程序。
celery工人命令(以前称为celeryd)
-c, --concurrency
处理队列的子进程的数目。默认值是系统上可用的cpu数量。
-P, --pool
池实现:
prefork(默认)、eventlet、gevent、threads或solo。
-n, --hostname
设置自定义主机名(例如,' w1@%%h ')。扩展:%%h (hostname), %%n (name) and %%d
-B, --beat
还可以运行celery beat周期任务调度程序。请注意,此服务只能有一个实例。
请注意
-B用于开发目的。对于生产环境,您需要分别启动celery beat。
-Q, --queues
为这个worker启用的队列列表,以逗号分隔。默认情况下,所有配置的队列都是启用的。例如:q视频,形象
-x, exclude-queues
要为此工作程序禁用的队列列表,用逗号分隔。默认情况下,所有配置的队列都是启用的。例子:- x视频,形象。
-I, --include
用逗号分隔要导入的其他模块列表。例子:我foo.tasks bar.tasks
-s, --schedule
如果使用-B选项运行,则指向调度数据库的路径。默认为celerybeat-schedule。扩展”。可以将db 附加到文件名中。
-o
应用优化概要文件。支持:默认情况下,公平
--prefetch-multiplier
为这个工作者实例设置自定义预取乘数值。
--scheduler
要使用的调度程序类。默认是celery.beat.PersistentScheduler
-S, --statedb
到状态数据库的路径。扩展”。可以将db 附加到文件名中。默认值:{默认}
-E, --task-events
发送可以被监视器捕获的与任务相关的事件,如芹菜事件、celerymon和其他事件。
--without-gossip
不要订阅其他员工活动。
--without-mingle
在刚开始的时候不要和其他员工同步。
--without-heartbeat
不要发送事件心跳。
--heartbeat-interval
发送工作者心跳的间隔(秒)
--purge
在启动守护进程之前清除所有等待的任务。警告:这是不可恢复的,任务将从消息传递服务器中删除。
--time-limit
为任务启用硬时间限制(以秒为单位的int/float)。
--soft-time-limit
为任务启用软时间限制(以秒为单位int/float)。
--max-tasks-per-child
池工作程序在终止并被新工作程序替换之前可以执行的最大任务数。
--max-memory-per-child
在KiB中,子进程在被新进程替换之前可能消耗的最大驻留内存量。如果单个任务导致子进程超过此限制,则该任务将被完成,然后子进程将被替换。默认值:没有限制。
--autoscale
通过提供max_concurrency、min_concurrency启用自动调焦。例子:
--autoscale=10,3
(始终保持3个进程,但如果需要,可以增加到10个)
--detach
启动worker作为后台进程。
-f, --logfile
日志文件的路径。如果没有指定日志文件,则使用stderr。
-l, --loglevel
日志级别,在调试、信息、警告、错误、关键或致命之间进行选择。
--pidfile
可选文件,用于存储进程pid。
如果这个文件已经存在并且pid仍然是活动的,程序将不会启动。
--uid
用户id,或用户的用户名,以在分离后运行。
--gid
组id,或主组的组名,在分离后更改为。
--umask
分离后工艺的有效umask(1)(八进制)。默认情况下继承父进程的umask(1)。
--workdir
可选目录切换到分离后。
--executable
可执行文件,用于分离的进程。
nohup celery -A tasks flower --address=127.0.0.1 --port=5566 > logs/flower.log 2>&1 &
## nohup 表示后台运行
## flower日志文件会存储在logs/flower.log文件中
## 重启flower后,页面上会重新计数
## 可以看到报错日志
from django.shortcuts import render, HttpResponse
from .tasks import *
## Create your views here.
def task_add_view(request):
# result = add(10,20)
# return HttpResponse(f'调用add函数结果:{result}')
# 若使用第一种方式测试时(注释shared_task),正常输出,延迟10s
ar = add.delay(100,200) # 添加到消息中间件,保存结果到redis
print('add.delay(100,200)--------')
# 若使用第二种方式测试时(不注释),结果直接保存到
return HttpResponse(f'任务调用id:{ar.id}')
from django.shortcuts import render, HttpResponse
from django.http import JsonResponse
from .tasks import *
from celery import result
def get_result_by_taskid(request):
task_id = request.GET.get('task_id')
ar = result.AsyncResult(task_id)
if ar.ready(): # 是否执行完成
return JsonResponse({'status':ar.state,'result':ar.get()})
else:
return JsonResponse({'status':ar.state,'result':''})
AsyncResult类的常用的属性和方法:
state: 返回任务状态,等同status;
task_id: 返回任务id;
result: 返回任务结果,同get()方法;
ready(): 判断任务是否执行以及有结果,有结果为True,否则False;
info(): 获取任务信息,默认为结果;
wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
successful(): 判断任务是否成功,成功为True,否则为False;
官网:https://pypi.org/project/flower/
文档:https://flower.readthedocs.io/en/latest
celery通过flower组件实现管理和监控功能 ,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理
## 启动flower
flower -A celery_study --port=5555
## 说明:
-A:项目名
–port: 端口号
## 访问
在浏览器输入:http://127.0.0.1:5555
如果需要nginx转发
那么nginx重定向到5555
location /flower/ {
proxy_pass http://127.0.0.1:5555;
}
还要修改templates/base.html
的id='url_prefix'
value改为"/flower"
url.py
修改1 - 静态资源:
static_url_prefix
static_url_prefix='/static/',
static_url_prefix='/flower/static/',
修改2 - url:
所有的请求路径都加上/flower
除了static、login
共29个
[unix_http_server]
file=/tmp/supervisor.sock ;UNIX socket 文件,supervisorctl 会使用
;chmod=0700 ;socket文件的mode,默认是0700
;chown=nobody:nogroup ;socket文件的owner,格式:uid:gid
[inet_http_server] ;HTTP服务器,提供web管理界面
port=*:9002 ;Web管理后台运行的IP和端口,如果开放到公网,需要注意安全性
username=user ;登录管理后台的用户名
password=123 ;登录管理后台的密码
[rpcinterface:supervisor]
supervisor.rpcinterface_factory=supervisor.rpcinterface:make_main_rpcinterface
[supervisord]
logfile=/tmp/supervisord.log ;日志文件,默认是 $CWD/supervisord.log
logfile_maxbytes=50MB ;日志文件大小,超出会rotate,默认 50MB,如果设成0,表示不限制大小
logfile_backups=10 ;日志文件保留备份数量默认10,设为0表示不备份
loglevel=info ;日志级别,默认info,其它: debug,warn,trace
pidfile=/tmp/supervisord.pid ;pid 文件
nodaemon=false ;是否在前台启动,默认是false,即以 daemon 的方式启动
minfds=1024 ;可以打开的文件描述符的最小值,默认 1024
minprocs=200 ;可以打开的进程数的最小值,默认 200
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ;通过UNIX socket连接supervisord,路径与unix_http_server部分的file一致
;serverurl=http://127.0.0.1:9001 ; 通过HTTP的方式连接supervisord
;包含其它配置文件
[include]
files = /xxx/xxxProject/xxxProject_worker.conf /xxx/xxxProject/xxxProject_beat.conf /xxx/xxxProject/xxxProject_flower.conf ;可以指定一个或多个以.ini结束的配置文件
[program:xxxProject_beat]
directory=/xxx/xxxProject/xxxServer/
command=celery -A xxxServer beat -l WARNING
autostart=true
autorestart=false
stderr_logfile=/tmp/xxxProject_beat_stderr.log
stdout_logfile=/tmp/xxxProject_beat_stdout.log
[program:xxxProject_worker]
directory=/xxx/xxxProject/xxxServer/
command=celery -A xxxServer worker -l INFO -n 自己命名
## 这个命名可以在flower呈现
autostart=true
autorestart=false
stderr_logfile=/tmp/xxxProject_worker_stderr.log
stdout_logfile=/tmp/xxxProject_worker_stdout.log
[program:xxxProject_flower]
directory=/xxx/xxxProject/xxxServer/
command=celery -A xxxServer flower --port=5555
autostart=true
autorestart=false
stderr_logfile=/tmp/xxxProject_flower_stderr.log
stdout_logfile=/tmp/xxxProject_flower_stdout.log
## 进入虚拟环境
## 安装supervisor
pip install supervisor
## 启动supervisor
supervisord -c /xxx/xxxProject/supervisord.conf
## 进入supervisor、输入用户、密码
supervisorctl
user
123
进入supervisorctl后 | 释义 |
---|---|
status |
查看所有进程的状态 |
stop all |
停止全部 |
stop es |
停止es |
start all |
启动全部 |
start es |
启动es |
restart all |
重启全部 |
restart |
重启es |
update |
配置文件修改后使用该命令加载新的配置 |
reload |
重新启动配置中的所有程序 |
echo "" > xxx_beat_stderr.log
python3.X - Web - Django3.2.9
整章节共43节
快分享给你的小伙伴吧 ~