django入门(五) celery使用

云惠网小编 2021年12月2日19:18:17
评论
7222字阅读24分4秒
摘要

免费分享,平时搬砖,周末会录制匹配的视频。配套视频地址:https://www.bilibili.com/video/BV1eQ4y1U7os/当你想要放弃的时候,想想当初为什么坚持到了现在!!!文章仅仅针对celery入门,进阶使用,会在实战项目中涉及1.业务场景双11在即,既然是光棍节,就应该品点新茶叶。为所有二级、三级会员推送新茶介绍。首先这种业务不应该影响主逻辑,也就是后台人员编辑好内容,选择好目标会员后点击就返回提交成功,后台处理中,不是在那里等着后台处理完。这个实现挺简单,访问后台

广告也精彩

在这里插入图片描述

celery -A task worker -l info -P eventlet -E
task是文件名称
info 是日志级别
-P eventlet  表示使用什么池实现协程 eventlet,gevent 或者 solo等
其它参数后面谈论

views添加代码

在这里插入图片描述

from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app. 确保celey被加载
from .celery import app as celery_app   # app对应celery.py中的app
__all__ = ('celery_app',)
上面包含了celery的完整架构图:
1.async task 一般的异步任务 生产者
2.beat:定时异步任务        生产者
3.消息中间件(rabbitmq/redis等) 存储消息,官方建议用rabbitmq
4.监控flower 一个项目会有很多任务用到celery,就不得不监控,方便我们
排查问题,找到性能瓶颈完成调优等
5.worker:执行任务的,也称为消费者,首先主进程接收到任务,
启动子进程去执行任务(也可以通过配置使用eventlet创建协程或者绿色线程等)
6.backend:执行完毕得到结果,方便发送方/消费方获知结果
7.celery可以跟项目一起部署,也可以单独部署到服务器
pis:在整个任务流程中,会生成一个任务id唯一记录当次任务,这个很好理解,
只有通过唯一标识去分辨那么多任务。

在这里插入图片描述

3.celery安装入门

在这里插入图片描述

app.conf.beat_schedule = {
'schedule_task': {  # 随便取名字
'task': 'celerytest.task.schedule_task',  # 指定需要定时的任务
'schedule': crontab(),  # 删除就可以了 可以点进去这个方法看 都是*
},
}

1.业务场景

celery是python开发,类似上面的场景都可以用celery实现

from django.urls import path, include, re_path
from .views import celery_test
urlpatterns = [
re_path(r'^celerytest/', celery_test.as_view()),
]
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time
# 可以指定shared_task的name为任务名称 不指定默认采用方法名
# 注意这个名字跟celery.py里面配置的完全一样 所以是定时任务
@shared_task
def schedule_task():
print('定时任务执行了,因为在celery指定了这个名字的任务加入定时任务')
@shared_task(name="send_email")
def send_email():
print("发送邮箱的普通任务需要手动调用!!!")
time.sleep(20)
print('邮箱发送完毕,异步调用不影响主业务')
return "s"

test.py: 用来触发普通任务的

再次启动

其它简单的可以比如redis安装等参考前面的博客

免费分享,平时搬砖,周末会录制匹配的视频。
配套视频地址:https://www.bilibili.com/video/BV1eQ4y1U7os/
配套源码 https://github.com/xmccxwj/django-celery

总的urls下面添加

因为terminal已经被用了,我们打开cmd启动定时任务

在这里插入图片描述

5.2在setting.py同级目录新建 celery.py

5.3编辑setting.py同级目录的__init__.py

为了查看效果,将上面celery.py的配置改为每分钟执行一次

from django.contrib import admin
from django.urls import path, include, re_path
urlpatterns = [
path('admin/', admin.site.urls),
re_path(r'^', include(('celerytest.urls', 'celerytest'), namespace='celerytest'))
]
pip install flower==0.9.1 -i https://pypi.douban.com/simple

大家记住上面的介绍,就知道在celery中我们需要做什么了。比如celery需要使用消息队列,并且已经整合好,我们至少得告诉它使用哪个消息队列,地址是多少吧。 比如celery已经完成得异步,不需要我们编写,但是我们得告诉它哪个方法需要异步吧,它已经整合好了结果存储,那么我们得告诉它存储的地址吧。

3.3启动程序

2.celery介绍

生产者:生成任务的 消费者:处理任务的

5.4启动测试

启动项目

在这里插入图片描述

from celery import Celery
from celery.task import periodic_task
from celery.schedules import crontab
import time
# 定义一个celery对象
# shuozhongdian_celery 为这个celery改的名字 随便都可以
# broker:表示任务队列的位置
# backend:任务完成后结果存放的位置
# 其它参数 后面详解
app = Celery('shuozhongdian_celery', broker='redis://127.0.0.1:6379/6', backend='redis://127.0.0.1:6379/7')
# 普通任务 由 方法名 send_email.delay()触发
@app.task
def send_email(level, content):
print('去数据库根据等级查询用户:'+"_".join(level))
print('发送的内容为:' + content)
time.sleep(20)
print('发送邮箱结束')
return "send_email success"
# 定时任务 每分钟执行一次 后续详解
@periodic_task(run_every=crontab())
def task():
print('定时任务 task 开始!!!!!')
time.sleep(10)
print('定时任务 task 结束!!!!!')
return 'task success'

3.2编写代码

问题:

4.flower-celery监控

1.业务发展方面,市场调研发现一次性给用户推送茶叶,内容太多,
客户大多不看,效果不理想,于是计划每天发送一批茶叶即可,
让客户每天都有新鲜感.也挺简单,事先编辑好茶叶的发送日期,
一个定时任务+多进程即可完成
2.业务再次发展,喝茶得用一系列工具。。这些都可以作为商品推送,
也就是每天要发送得任务量越来越大,茶叶,蜡烛,皮鞭等等一系列东西。。
工作人员肯定是上班时间一次性录入,不可能等你后台发完一个在录入一个吧?
比如后台一次性接收到要发送的任务是500个,只能一个一个发,
那么剩下的这些任务得找个地方存呀,
也挺简单,一个queue(消息队列)就搞定了,java/python都简单,
实在后面任务更大500到5000再到5w。。那就用第三方能实现消息队列的
就可以了 比如rabbitmq/redis/mongo等
3.吩咐其它进程做了业务,你想知道结果。也很简单。
如果用的rabbitmq那么可以监听一个队列,
消费者那边做完了把结果发到这个队列,生产者监听这个队列即可。
如果是redis,利用发布订阅,或者blpop即可
4.当整个项目这种业务越来越多,当前服务器就那么大,
都给这种业务了,那些项目最基本的业务都没有地方跑了,
这个也简单可以把做这种任务的单独抽取到一个服务器,
不影响原来的基本业务服务器
5.任务太多了,全部打过去celery扛不住压力,就需要rate-limit限流什么的
其它:中断任务,监控任务,超时机制...........

打开test.py 右键run就可以了

# SHUOZHONGDIAN是前缀这里注意大写
SHUOZHONGDIAN_BROKER_URL = 'redis://127.0.0.1:6379/1'  # 任务队列的位置
SHUOZHONGDIAN_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'  # 任务执行结果存放
SHUOZHONGDIAN_TIMEZONG = TIME_ZONE  # 项目中有定时任务 设置时区  这里跟项目一致 引用变量
SHUOZHONGDIAN_CELERY_ACCEPT_CONTENT  = ["json"]  # 配置celery可以接受哪些格式
SHUOZHONGDIAN_TASK_SERIALIZER = "json"  # 任务序列化格式 从上面的数组中选择
SHUOZHONGDIAN_RESULT_SERIALIZER = "json"  # 结果序列化数据格式
# 其它配置
# https://docs.celeryproject.org/en/stable/userguide/configuration.html

terminal被我们用来启动celery了

celery -A shuozhongdian beat -l info

再次启动celery,启动项目,然后再cmd输入

5.6测试

6.总结

文档配套视频

3.1首先创建项目安装celery

cd /usr/local
yum install wget –y
wget http://download.redis.io/releases/redis-6.2.5.tar.gz
tar zxvf redis-6.2.5.tar.gz
cd redis-6.2.5
make
cd src
yum -y install gcc gcc-c++ libstdc++-devel
make MALLOC=libc
make install PREFIX=/usr/local/redis
外部访问需要修改配置文件
cd ..
yum install vim -y
vim redis.conf
搜索里面的这两个个修改.
protected-mode后的yes改为no
找到bind 127.0.0.1注释掉(或改为0.0.0.0)
开放端口6379(防火墙、阿里云安全策略。。。)
cd src
./redis-server ../redis.conf
ps -ef|grep redis
查看是否开启

双11在即,既然是光棍节,就应该品点新茶叶。为所有二级、三级会员推送新茶介绍。

进入shuozhongdian目录,打开cmd执行

5.5添加任务

这个实现挺简单,访问后台接口,后台启动一个进程(在java可以起线程,异步),查询二级、三级会员,然后把事先编辑好的内容以邮箱或者短信的形式推送即可。

在这里插入图片描述

也可以在linux安装redis

文章目录

    • 1.业务场景
    • 2.celery介绍
    • 3.celery安装入门
      • 3.1首先创建项目安装celery
      • 3.2编写代码
      • 3.3启动程序
    • 4.flower-celery监控
    • 5.django整合celery
      • 5.1.在setting.py中加入配置
      • 5.2在setting.py同级目录新建 celery.py
      • 5.3编辑setting.py同级目录的__init__.py
      • 5.4启动测试
      • 5.5添加任务
      • 5.6测试
      • 5.7启动定时任务
    • 6.总结

5.7启动定时任务

在这里插入图片描述

可以右键manage.py启动项目–报错

celery入门就到这里了,项目中再去详细使用。

django入门(五) celery使用
在这里插入图片描述

1.打开项目所在文件的cmd

mkvirtualenv -p python3 shuozhongdian    创建虚拟环境
workon shuozhongdian   进入虚拟环境
pip install django==2.2.1 -i https://pypi.douban.com/simple  安装django后续要整合
pip install redis==3.2.1 -i  https://pypi.douban.com/simple  使用redis做消息队列
pip3 install celery==4.4.2 -i https://pypi.douban.com/simple 安装celery
pip install eventlet==0.25.1 -i https://pypi.douban.com/simple  安装eventlet 在windows中高版本celery需要
django-admin startproject shuozhongdian 创建项目

2.普通任务

5.1.在setting.py中加入配置

首先这种业务不应该影响主逻辑,也就是后台人员编辑好内容,选择好目标会员后点击就返回提交成功,后台处理中,不是在那里等着后台处理完。

运行命令:
flower -A task --port=5555

上面随便针对一个项目中的业务,其实类似的需求很多很多,上续的问题都可以采用这样那样的实现方式,虽然不难也挺麻烦,所以就有了celery,它已经把上续功能基本都实现了,比如已经配置好了如何链接redis/rabbitmq,我们只需要告诉它链接那个redis,也就是地址,也已经写好了定时器,我们只需要告诉他定再什么时间就可以了…

celery -A task beat --loglevel=info

在这里插入图片描述

如果没有celery,这些功能都需要程序员手动去实现。

使用pycharm打开项目,选择好虚拟环境,创建task.py /test.py

task.py:

再次启动

没有采用官方听不懂的语言,相信这样介绍,大家应该能明白。

5.django整合celery

 celery -A shuozhongdian worker -l info -P eventlet
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from celery.schedules import crontab
# 指定Django默认配置文件模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'shuozhongdian.settings')
app = Celery('abcde')
# 这里指定从django的settings.py里读取celery配置
app.config_from_object('django.conf:settings', namespace='SHUOZHONGDIAN')
# beat:定时任务配置
# 详细可参考参考https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
app.conf.beat_schedule = {
'schedule_task': {  # 随便取名字
'task': 'celerytest.task.schedule_task',  # 指定需要定时的任务
'schedule': crontab(minute=0, hour=0),  # 每天0:00 执行
},
}
#  发现任务文件每个app下的task.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

在这里插入图片描述

当然也可以阻塞等待结果

项目下新建应用celerytest,加入配置新建task.py

import task
import datetime
def send_email_to_level():
import time
start = time.time()
print("1. 接收前台参数,数据处理巴拉巴拉")
print("2. 调用celery...完成业务")
# 可以获得返回值 后续会讲解
result = task.send_email.delay(level=('2', '3'), content="[email protected]")
print("3. 因为celery是异步的这里可以直接返回,提交成功,后台处理发送中,celery睡了20秒 这里查看下结束时间")
print("耗时:%s 秒 " % (time.time() - start), ',当前时间为: %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# 调用下方法
if __name__ == '__main__':
send_email_to_level()

在celerytest模块下添加urls

在这里插入图片描述

在celerytest下面的task.py里面编写

编写view视图

在这里插入图片描述

在这里插入图片描述

访问http://127.0.0.1:8000/celerytest

from django.shortcuts import render
from django.views import View
from .task import send_email
from django.http import JsonResponse
# Create your views here.
class celery_test(View):
def get(self, request):
print("手动调用celery异步任务")
send_email.delay()  # 执行异步任务
return JsonResponse({'code': 200, 'msg': '成功'}, safe=False)

3.定时任务

本文转自 https://blog.csdn.net/weixin_39160689/article/details/121649918

腾讯云618
云惠网小编
SpringCloud -- Config、Bus解析 java

SpringCloud — Config、Bus解析

1、Config1.1、概述简介1. 分布式面临的问题微服务意味着要将单体应用中的业务拆分成一个个子服务,每个服务的粒度相对较小,因此系统中会出现大量的服务。由于每个服务都需要必要...
Java数据结构-了解复杂度 java

Java数据结构-了解复杂度

2.实例分析与计算  四.写在最后  // 计算斐波那契递归fibonacci的时间复杂度 int fibonacci(int N) { return N < 2 ? N : fibonacci...
Java数据结构-认识顺序表 java

Java数据结构-认识顺序表

目录二.顺序表1.概念及结构2.顺序表的实现打印顺序表获取顺序表的有效长度在pos位置新增元素判断是否包含某个元素查找某个元素对应的位置获取/查找pos位置的元素给pos位置的元素...
腾讯云618

发表评论